OBSDATA-8643 Adding a column in druid metric#316
Indrajeet Garse (igarse) merged 29 commits into 30.0.1-confluent from …ate-arriving-data
Conversation
```java
    event.put("delayed_minutes", delayMinutes);
  }
  catch (Exception e) {
    log.warn(e, "Could not extract create_time from KafkaRecordEntity");
```
This should be an error log (`log.error` instead of `log.warn`).
```java
  long timeUnixNano = dataPoint.getTimeUnixNano();
  try {
    // Get the getRecord method reflectively
    Method getRecordMethod = entity.getClass().getMethod("getRecord");
```
We should avoid using Java reflection; it can cause problems at runtime. Instead, typecast the entity to KafkaConsumerRecordEntity, something like:

```java
if (entity instanceof KafkaConsumerRecordEntity) {
  KafkaConsumerRecordEntity ke = (KafkaConsumerRecordEntity) entity;
  ConsumerRecord<?, ?> record = ke.record();
  long timestamp = record.timestamp();
}
```
```dockerfile
COPY --chown=druid:druid --from=builder /opt /opt
COPY distribution/docker/druid.sh /druid.sh
COPY distribution/docker/peon.sh /peon.sh
COPY --chown=druid:druid distribution/docker/extra_jars/ /opt/druid/lib/
```
Why is this removed?
Checking locally, I will add it again.
```java
  catch (ClassCastException e) {
    log.error(e, "Failed to cast source entity to TimestampedEntity.");
  }
  catch (NullPointerException e) {
```
We should not catch NPEs. Instead, check that the value is non-null before accessing it.
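A minimal sketch of the suggested pattern (the class and method names here are hypothetical, not the PR's actual code):

```java
public class NullCheckExample {
    // Guard with an explicit null check instead of catching NullPointerException.
    static long timestampOrDefault(Long recordTimestampMillis, long fallback) {
        if (recordTimestampMillis == null) {
            return fallback; // value absent: fall back instead of letting an NPE propagate
        }
        return recordTimestampMillis;
    }

    public static void main(String[] args) {
        System.out.println(timestampOrDefault(null, -1L));               // -1
        System.out.println(timestampOrDefault(1_700_000_000_000L, -1L)); // 1700000000000
    }
}
```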
```java
  try {
    long timeUnixNano = dataPoint.getTimeUnixNano();
    long createdTime = ((TimestampedEntity) source.getEntity()).getRecordTimestampMillis();
```
It's better to check the instance type here instead of catching the exception and throwing away the event:

```java
Object entity = source.getEntity();
if (!(entity instanceof TimestampedEntity)) {
  throw new ClassCastException("Entity is not TimestampedEntity");
}
```
Let me know your thoughts?
```java
    long deviated_seconds = (createdTime - (timeUnixNano / NANOS_TO_MILLIS)) / MILLIS_PER_SECOND;
    long deviated_minutes = deviated_seconds / 60;
    event.put("deviated_seconds", deviated_seconds);
    event.put("deviated_minutes", deviated_minutes);
```
Why another derived column for minutes?
Earlier we added only 'minutes', but later it was decided to also add 'seconds', for more clarity and easier bucketing.
Bucketing will anyway be done using transformSpec during ingestion into Druid. IMO, deviated_seconds alone was sufficient.
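For reference, the minutes bucket could be derived at ingestion time with a Druid expression transform, roughly like this (a sketch; the column names are taken from this PR, the surrounding spec is assumed):

```json
{
  "transformSpec": {
    "transforms": [
      {
        "type": "expression",
        "name": "deviated_minutes",
        "expression": "floor(\"deviated_seconds\" / 60)"
      }
    ]
  }
}
```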
```java
 * This provides a way for extensions to safely access record timestamp information
 * across ClassLoader boundaries without causing ClassCastExceptions.
```
Let's not mention this in JavaDocs.
* run "services" tests separately in semaphore
* Avoid local artifacts for running tests of "services"
* update command for "services" tests
* Run "services" tests sequentially
* pass params in MAVEN_OPTS
* fix MAVEN_OPTS in "Services"
…cient (#309)
* Add maxInterval to kill config and make kill tasks efficient
* resolve CI
…nfig from 1M to 1S (#317)
…ervisor is in suspended state (#321)
4bbee04
…ate-arriving-data
Description
Adds the columns 'deviated_minutes' and 'deviated_seconds' to the metrics. They represent the time difference from the Telemetry Kafka emitter to ingestion in Druid, in minutes and in seconds respectively.
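The derivation can be sketched as follows (a minimal standalone version; the constant values are assumptions inferred from their names in the diff):

```java
public class DeviationExample {
    // Assumed values, inferred from the constant names used in the diff.
    static final long NANOS_TO_MILLIS = 1_000_000L;
    static final long MILLIS_PER_SECOND = 1_000L;

    // Seconds elapsed between the metric's event time (nanos since epoch)
    // and the Kafka record's creation time (millis since epoch).
    static long deviatedSeconds(long createdTimeMillis, long timeUnixNano) {
        return (createdTimeMillis - (timeUnixNano / NANOS_TO_MILLIS)) / MILLIS_PER_SECOND;
    }

    public static void main(String[] args) {
        long timeUnixNano = 1_700_000_000_000L * NANOS_TO_MILLIS; // metric emitted
        long createdTime = 1_700_000_090_000L;                    // record created 90s later
        long seconds = deviatedSeconds(createdTime, timeUnixNano);
        System.out.println(seconds);      // 90
        System.out.println(seconds / 60); // 1
    }
}
```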
Release note
Key changed/added classes in this PR
* OpenTelemetryMetricsProtobufReader
* KafkaRecordEntity
* KafkaEntity

This PR has: