add kinesis lag metric #9509

Merged: 7 commits, Mar 17, 2020

@clintropolis clintropolis commented Mar 12, 2020

Fixes #6739.

Description

This PR adds an additional set of SeekableStreamSupervisor lag metrics, measured as the time in milliseconds behind the latest offsets of the stream, which allows the Kinesis indexing service extension to report a lag metric. scheduleReporting and emitLag were pushed down from KafkaSupervisor into SeekableStreamSupervisor, and two abstract methods were added, getPartitionRecordLag and getPartitionTimeLag, which collect the two types of lag metrics per partition of the stream.

Kafka's existing lag metrics are still collected in the same format for backwards compatibility:

ingest/{supervisor type}/lag
ingest/{supervisor type}/maxLag
ingest/{supervisor type}/avgLag

The new time metrics have a /time suffix:

ingest/{supervisor type}/lag/time
ingest/{supervisor type}/maxLag/time
ingest/{supervisor type}/avgLag/time

Currently KafkaSupervisor only emits the former set of metrics, and KinesisSupervisor, after this PR, will emit only the latter. It seems possible for KafkaSupervisor to emit both sets of metrics, but I will save that for a follow-up PR.
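
As a rough illustration of how the metric names above relate to per-partition lag, the sketch below aggregates a per-partition lag map into the three metrics. It assumes that lag is the sum across partitions, maxLag is the largest single partition, and avgLag is the integer mean. LagMetricsSketch, LagStats, aggregate, and toMetrics are hypothetical names used for illustration only, not classes or methods from this PR, and the second shard in main is made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LagMetricsSketch
{
  /** Hypothetical holder for the three aggregate values; not a Druid class. */
  static final class LagStats
  {
    final long total;
    final long max;
    final long avg;

    LagStats(long total, long max, long avg)
    {
      this.total = total;
      this.max = max;
      this.avg = avg;
    }
  }

  /** Assumption: total = sum, max = largest partition, avg = integer mean. */
  static LagStats aggregate(Map<String, Long> partitionLag)
  {
    long total = 0;
    long max = 0;
    for (long lag : partitionLag.values()) {
      total += lag;
      max = Math.max(max, lag);
    }
    long avg = partitionLag.isEmpty() ? 0 : total / partitionLag.size();
    return new LagStats(total, max, avg);
  }

  /** Builds metric name -> value; suffix is "" for record lag or "/time" for time lag. */
  static Map<String, Long> toMetrics(String supervisorType, Map<String, Long> partitionLag, String suffix)
  {
    LagStats stats = aggregate(partitionLag);
    Map<String, Long> metrics = new LinkedHashMap<>();
    metrics.put("ingest/" + supervisorType + "/lag" + suffix, stats.total);
    metrics.put("ingest/" + supervisorType + "/maxLag" + suffix, stats.max);
    metrics.put("ingest/" + supervisorType + "/avgLag" + suffix, stats.avg);
    return metrics;
  }

  public static void main(String[] args)
  {
    Map<String, Long> timeLag = new LinkedHashMap<>();
    timeLag.put("shardId-000000000000", 86243000L);
    timeLag.put("shardId-000000000001", 1000L); // made-up second shard
    toMetrics("kinesis", timeLag, "/time")
        .forEach((name, value) -> System.out.println(name + " = " + value));
  }
}
```

Passing an empty suffix with "kafka" as the supervisor type would produce the record-based names in the first list.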

Lag is also available on the supervisor status reports:

Example Kinesis supervisor status report:

{
  "dataSource": "clint-test-kinesis",
  "stream": "clint-test",
  "partitions": 1,
  "replicas": 1,
  "durationSeconds": 3600,
  "activeTasks": [
    {
      "id": "index_kinesis_clint-test-kinesis_0dbe7907dfdf424_omdghmmd",
      "startingOffsets": {
        "shardId-000000000000": "49605075714895969944729166984744978015226161388004048898"
      },
      "startTime": null,
      "remainingSeconds": null,
      "type": "ACTIVE",
      "currentOffsets": {},
      "lag": {},
      "lagMillis": {
        "shardId-000000000000": 86244000
      }
    }
  ],
  "publishingTasks": [],
  "minimumLagMillis": {
    "shardId-000000000000": 86243000
  },
  "aggregateLagMillis": 86243000,
  "suspended": false,
  "healthy": true,
  "state": "RUNNING",
  "detailedState": "RUNNING",
  "recentErrors": []
}

Example status report with a suspended supervisor:

{
  "dataSource": "clint-test-kinesis",
  "stream": "clint-test",
  "partitions": 1,
  "replicas": 1,
  "durationSeconds": 3600,
  "activeTasks": [],
  "publishingTasks": [],
  "minimumLagMillis": {
    "shardId-000000000000": 22463000
  },
  "aggregateLagMillis": 22463000,
  "suspended": true,
  "healthy": true,
  "state": "SUSPENDED",
  "detailedState": "SUSPENDED",
  "recentErrors": []
}
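
The lag values in these reports are raw milliseconds, so a small helper can make them easier to read. This is only a convenience sketch built on java.time.Duration; LagMillisFormat and describe are hypothetical names and not part of Druid.

```java
import java.time.Duration;
import java.util.Locale;

public class LagMillisFormat
{
  /** Renders a millisecond lag value as hours, minutes, and seconds. */
  static String describe(long lagMillis)
  {
    Duration d = Duration.ofMillis(lagMillis);
    long hours = d.toHours();
    long minutes = d.toMinutes() % 60;
    long seconds = d.getSeconds() % 60;
    return String.format(Locale.ROOT, "%dh %dm %ds", hours, minutes, seconds);
  }

  public static void main(String[] args)
  {
    // aggregateLagMillis values taken from the two example reports
    System.out.println(describe(86243000L)); // 23h 57m 23s
    System.out.println(describe(22463000L)); // 6h 14m 23s
  }
}
```

Read this way, the running supervisor in the first report is roughly a day behind the latest record, and the suspended one about six and a quarter hours.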

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths.
  • added integration tests.
  • been tested in a test Druid cluster.

Key changed/added classes in this PR
  • SeekableStreamSupervisor
  • KafkaSupervisor
  • KinesisSupervisor

@codecov-io

Codecov Report

Merging #9509 into master will increase coverage by 0.98%.
The diff coverage is 57.89%.

@@             Coverage Diff              @@
##             master    #9509      +/-   ##
============================================
+ Coverage     63.56%   64.55%   +0.98%     
- Complexity    23489    23571      +82     
============================================
  Files          3040     2932     -108     
  Lines        125895   119897    -5998     
  Branches      17410    16048    -1362     
============================================
- Hits          80026    77394    -2632     
+ Misses        38673    35519    -3154     
+ Partials       7196     6984     -212

| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| ...pervisor/SeekableStreamSupervisorTuningConfig.java | 0% <ø> (ø) | 0 <0> (ø) ⬇️ |
| .../kafka/supervisor/KafkaSupervisorTuningConfig.java | 100% <ø> (ø) | 12 <0> (ø) ⬇️ |
| ...xing/seekablestream/supervisor/TaskReportData.java | 0% <0%> (ø) | 0 <0> (ø) ⬇️ |
| ...esis/supervisor/KinesisSupervisorTuningConfig.java | 100% <100%> (ø) | 12 <1> (+1) ⬆️ |
| .../druid/indexing/kinesis/KinesisRecordSupplier.java | 55.27% <100%> (+0.84%) | 35 <2> (+2) ⬆️ |
| ...uid/indexing/kafka/supervisor/KafkaSupervisor.java | 79.51% <20%> (+10.49%) | 24 <1> (-1) ⬇️ |
| ...blestream/supervisor/SeekableStreamSupervisor.java | 27.25% <59.18%> (+1.5%) | 62 <5> (+7) ⬆️ |
| ...indexing/kinesis/supervisor/KinesisSupervisor.java | 75% <83.33%> (+0.21%) | 33 <2> (+1) ⬆️ |
| ...main/java/org/apache/druid/guice/DruidBinders.java | 33.33% <0%> (-8.34%) | 3% <0%> (ø) |
| ...org/apache/druid/segment/realtime/FireHydrant.java | 58.33% <0%> (-8.34%) | 14% <0%> (-1%) |

... and 148 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 7401bb3...646a70b.

@clintropolis clintropolis removed the WIP label Mar 12, 2020

public Map<String, Long> getPartitionTimeLag(Map<String, String> currentOffsets)
{
return partitionResources.entrySet()

The number of partitions can be on the order of hundreds. Would it make sense to avoid streams here for performance? (I remember various stream performance issues in the past.)
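
For reference, a minimal sketch of what a loop-based alternative could look like next to a stream-based one. The inputs (latest and current record timestamps per shard, in millis) and every name here are hypothetical, since the snippet under review is truncated. The sketch only shows that the two forms produce the same map and makes no claim about which is faster in practice.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionLagLoopSketch
{
  /** Stream-based version: time lag = latest timestamp minus current timestamp. */
  static Map<String, Long> lagWithStream(Map<String, Long> latest, Map<String, Long> current)
  {
    return latest.entrySet()
                 .stream()
                 .collect(Collectors.toMap(
                     Map.Entry::getKey,
                     e -> e.getValue() - current.getOrDefault(e.getKey(), 0L)
                 ));
  }

  /** Loop-based version computing the same result without the Stream API. */
  static Map<String, Long> lagWithLoop(Map<String, Long> latest, Map<String, Long> current)
  {
    Map<String, Long> lag = new HashMap<>();
    for (Map.Entry<String, Long> e : latest.entrySet()) {
      lag.put(e.getKey(), e.getValue() - current.getOrDefault(e.getKey(), 0L));
    }
    return lag;
  }

  public static void main(String[] args)
  {
    Map<String, Long> latest = new HashMap<>();
    latest.put("shardId-000000000000", 5000L); // made-up timestamps
    Map<String, Long> current = new HashMap<>();
    current.put("shardId-000000000000", 2000L);

    System.out.println(lagWithStream(latest, current));
    System.out.println(lagWithLoop(latest, current));
  }
}
```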

protected abstract Map<PartitionIdType, Long> getPartitionRecordLag();

/**
* Gets 'lag'of currently processed offset behind latest offset as a measure of the difference in time inserted.

'lag'of -> 'lag' of

final String offsetToUse;
if (offset == null || KinesisSupervisor.NOT_SET.equals(offset)) {
// this should probably check if will start processing earliest or latest rather than assuming earliest
// if latest we could skip this because latest will not be behing latest so lag is 0.

behing -> behind

…s from kinesis supervisor report since always null, review stuffs
@jon-wei jon-wei merged commit 142742f into apache:master Mar 17, 2020
@clintropolis clintropolis deleted the kinesis-lag-metric branch March 19, 2020 10:53
@jihoonson jihoonson added this to the 0.18.0 milestone Mar 26, 2020
jihoonson added a commit to jihoonson/druid that referenced this pull request May 4, 2020
jihoonson added a commit to jihoonson/druid that referenced this pull request May 5, 2020
ccaominh pushed a commit to implydata/druid-public that referenced this pull request May 5, 2020
jihoonson added a commit that referenced this pull request May 5, 2020
clintropolis added a commit to implydata/druid-public that referenced this pull request May 19, 2020
clintropolis added a commit to implydata/druid-public that referenced this pull request May 20, 2020
* Revert "Revert "add kinesis lag metric (apache#9509)" (#88)"

This reverts commit 05fbcab.

* refactor SeekableStreamSupervisor usage of RecordSupplier (apache#9819)

* refactor SeekableStreamSupervisor usage of RecordSupplier to reduce contention between background threads and main thread, refactor KinesisRecordSupplier, refactor Kinesis lag metric collection and emitting

* fix style and test

* cleanup, refactor, javadocs, test

* fixes

* keep collecting current offsets and lag if unhealthy in background reporting thread

* review stuffs

* add comment

Successfully merging this pull request may close these issues.

Implement approximate time to latest in Kinesis Indexing Service
4 participants