
Logs from different streams can arrive out of order #401

Merged (2 commits) Dec 14, 2018
24 changes: 15 additions & 9 deletions lib/kubernetes-deploy/container_logs.rb
@@ -18,7 +18,7 @@ def initialize(parent_id:, container_name:, namespace:, context:, logger:)
     def sync
       new_logs = fetch_latest
       return unless new_logs.present?
-      @lines += deduplicate(new_logs)
+      @lines += sort_and_deduplicate(new_logs)
     end

def empty?
@@ -60,18 +60,24 @@ def rfc3339_timestamp(time)
       time.strftime("%FT%T.%N%:z")
     end

-    def deduplicate(logs)
-      deduped = []
-      check_for_duplicate = true
+    def sort_and_deduplicate(logs)
+      parsed_lines = logs.map { |line| split_timestamped_line(line) }
+      sorted_lines = parsed_lines.sort do |(timestamp1, _msg1), (timestamp2, _msg2)|
+        if timestamp1.nil?
+          -1
+        elsif timestamp2.nil?
+          1
+        else
+          timestamp1 <=> timestamp2
+        end
+      end

-      logs.each do |line|
-        timestamp, msg = split_timestamped_line(line)
-        next if check_for_duplicate && likely_duplicate?(timestamp)
-        check_for_duplicate = false # logs are ordered, so once we've seen a new one, assume all subsequent logs are new
Contributor commented on the removed line above:

> womp womp lol

Contributor (author) replied:

> 😆

+      deduped = []
+      sorted_lines.each do |timestamp, msg|
+        next if likely_duplicate?(timestamp)
Contributor commented:

Hm. likely_duplicate? means that timestamp <= @last_timestamp. But how can this ever be true now that we've sorted the logs by timestamp? Or is it possibly true because of a previous call to this method? i.e. an older value of @last_timestamp, not one that was set during this method call?

Contributor (author) replied:

> Or is it possibly true because of a previous call to this method

Exactly. We fetch logs (and then process them here) repeatedly, and since the API does not support subsecond granularity, we inevitably sometimes get the same log lines in more than one batch.
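The cross-batch deduplication being discussed can be sketched in isolation (a simplified model with an invented class name, not the gem's actual API): each call remembers the newest timestamp it kept, so lines that reappear at the start of the next overlapping fetch are skipped.

```ruby
# Simplified model (invented name) of timestamp-based dedup across
# repeated fetches: batches can overlap because the logs API only
# supports whole-second granularity for "since".
class BatchDeduper
  def initialize
    @last_timestamp = nil
  end

  # lines: array of [Time, String] pairs, possibly overlapping the
  # previous batch. Returns only the messages not seen before.
  def add_batch(lines)
    kept = []
    lines.sort_by { |timestamp, _msg| timestamp }.each do |timestamp, msg|
      next if @last_timestamp && timestamp <= @last_timestamp # likely duplicate
      @last_timestamp = timestamp
      kept << msg
    end
    kept
  end
end

deduper = BatchDeduper.new
batch1 = [[Time.at(1, 0), "Line 1"], [Time.at(1, 200_000), "Line 2"]]
# The next fetch re-returns "Line 2" (same second) plus one new line:
batch2 = [[Time.at(1, 200_000), "Line 2"], [Time.at(1, 500_000), "Line 3"]]

p deduper.add_batch(batch1) # => ["Line 1", "Line 2"]
p deduper.add_batch(batch2) # => ["Line 3"]
```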

         @last_timestamp = timestamp if timestamp
         deduped << msg
       end
 
       deduped
     end
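The comparator in sort_and_deduplicate puts lines whose timestamp failed to parse (nil) ahead of every timestamped line in the batch, which is why the updated test expects the "No timestamp" lines at the start of each batch. A minimal reproduction of that ordering, with invented sample data:

```ruby
# Reproduction of the nil-first sort used by sort_and_deduplicate,
# with invented sample data. A line whose timestamp failed to parse
# (nil) sorts ahead of every timestamped line.
parsed_lines = [
  [Time.utc(2018, 12, 13, 12, 17, 24), "Line B"],
  [nil, "No timestamp"],
  [Time.utc(2018, 12, 13, 12, 17, 23), "Line A"],
]

sorted = parsed_lines.sort do |(timestamp1, _msg1), (timestamp2, _msg2)|
  if timestamp1.nil?
    -1
  elsif timestamp2.nil?
    1
  else
    timestamp1 <=> timestamp2
  end
end

p sorted.map(&:last) # => ["No timestamp", "Line A", "Line B"]
```

Note the comparator is only well-defined when at most one side of a comparison is nil; with several unparseable lines in one batch, their relative order depends on the sort algorithm.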

50 changes: 47 additions & 3 deletions test/unit/container_logs_test.rb
@@ -81,16 +81,23 @@ def test_print_latest_supports_prefixing
   end
 
   def test_logs_without_timestamps_are_not_deduped
-    logs_response_1_with_anomaly = logs_response_1 + "Line 3.5"
-    logs_response_2_with_anomaly = "Line 3.5\n" + logs_response_2
+    logs_response_1_with_anomaly = logs_response_1 + "No timestamp"
+    logs_response_2_with_anomaly = "No timestamp 2\n" + logs_response_2
     KubernetesDeploy::Kubectl.any_instance.stubs(:run)
       .returns([logs_response_1_with_anomaly, "", ""])
       .then.returns([logs_response_2_with_anomaly, "", ""])
 
     @logs.sync
     @logs.sync
     @logs.print_all
-    assert_logs_match("Line 3.5", 2)
+    assert_logs_match_all([
+      "No timestamp", # moved to start of batch 1
+      "Line 1",
+      "Line 2",
+      "Line 3",
+      "No timestamp 2", # moved to start of batch 2
+      "Line 4"
+    ], in_order: true)
end

def test_deduplication_works_when_exact_same_batch_is_returned_more_than_once
@@ -108,6 +115,43 @@ def test_deduplication_works_when_exact_same_batch_is_returned_more_than_once
assert_logs_match("Line 2", 1)
end

+  def test_deduplication_works_when_last_line_is_out_of_order
+    regression_data = <<~STRING
+      2018-12-13T12:17:23.727605598Z Line 1
+      2018-12-13T12:17:23.727696012Z Line 2
+      2018-12-13T12:17:23.728538913Z Line 3
+      2018-12-13T12:17:23.7287293Z Line 4
+      2018-12-13T12:17:23.729694842Z Line 5
+      2018-12-13T12:17:23.731259592Z Line 7
+      2018-12-13T12:17:23.73127007Z Line 8
+      2018-12-13T12:17:23.731273672Z Line 9
+      2018-12-13T12:17:23.731276862Z Line 10
+      2018-12-13T12:17:23.731284069Z Line 11
+      2018-12-13T12:17:23.731287054Z Line 12
+      2018-12-13T12:17:23.731289959Z Line 13
+      2018-12-13T12:17:23.731292814Z Line 14
+      2018-12-13T12:17:23.731295298Z Line 15
+      2018-12-13T12:17:23.731297747Z Line 16
+      2018-12-13T12:17:23.731297748Z Line 17
+      2018-12-13T12:17:23.729851532Z Line 6
+    STRING
+
+    KubernetesDeploy::Kubectl.any_instance.stubs(:run)
+      .returns([regression_data, "", ""]).times(12)
+
+    12.times do
+      @logs.sync
+      @logs.print_latest
+    end
+
+    expected_lines = generate_log_messages(1..17)
+
+    expected_lines.each do |line|
+      assert_logs_match(/#{line}$/, 1) # no duplicates
+    end
+    assert_logs_match_all(expected_lines, in_order: true) # sorted correctly
+  end

private

def generate_log_messages(range)