4 changes: 3 additions & 1 deletion .gitignore
@@ -20,4 +20,6 @@ test-output
# Local config to handle using Java 8 vs java 11.
.java-version
*.tgz

# helm
charts/
Chart.lock
9 changes: 5 additions & 4 deletions hypertrace-ingester/build.gradle.kts
@@ -25,12 +25,13 @@ hypertraceDocker {
}

dependencies {
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.9")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.9")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.8")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.15")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.18")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.18")
implementation("org.hypertrace.core.datamodel:data-model:0.1.12")
implementation("org.hypertrace.core.viewgenerator:view-generator-framework:0.1.21")
implementation("com.typesafe:config:1.4.0")
implementation("org.apache.commons:commons-lang3:3.11")

implementation(project(":span-normalizer:span-normalizer"))
implementation(project(":raw-spans-grouper:raw-spans-grouper"))
@@ -40,7 +41,7 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
testImplementation("org.mockito:mockito-core:3.6.0")
testImplementation("org.junit-pioneer:junit-pioneer:1.0.0")
testImplementation("org.apache.kafka:kafka-streams-test-utils:5.5.1-ccs")
testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs")
testImplementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))
testImplementation(project(":span-normalizer:span-normalizer-api"))
}
@@ -65,12 +65,6 @@ dependencies {
implementation(project(":span-normalizer:span-normalizer-constants"))
implementation("org.hypertrace.entity.service:entity-service-api:0.1.23")

constraints {
implementation("com.google.guava:guava:30.0-jre") {
because("https://snyk.io/vuln/SNYK-JAVA-COMGOOGLEGUAVA-1015415")
}
}

testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
testImplementation("org.mockito:mockito-core:3.6.28")
}
@@ -8,10 +8,32 @@ data:
application.conf: |-
kafka.streams.config {
application.id = structured-traces-enrichment-job
metrics.recording.level = "{{ .Values.traceEnricherConfig.kafka.streams.config.metricsRecordingLevel }}"
num.stream.threads = "{{ .Values.traceEnricherConfig.kafka.streams.config.numStreamThreads }}"
bootstrap.servers = "{{ .Values.traceEnricherConfig.kafka.streams.config.bootstrapServers }}"
schema.registry.url = "{{ .Values.traceEnricherConfig.kafka.streams.config.schemaRegistryUrl }}"
bootstrap.servers = "{{ .Values.traceEnricherConfig.kafkaStreamsConfig.bootstrapServers }}"
schema.registry.url = "{{ .Values.traceEnricherConfig.kafkaStreamsConfig.schemaRegistryUrl }}"
# kafka streams config
num.stream.threads = "{{ int .Values.traceEnricherConfig.kafkaStreamsConfig.numStreamThreads }}"
commit.interval.ms = "{{ int .Values.traceEnricherConfig.kafkaStreamsConfig.commitIntervalMs }}"
# Common client (producer, consumer, admin) configs
receive.buffer.bytes = "{{ int .Values.traceEnricherConfig.kafkaStreamsConfig.receiveBufferBytes }}"
send.buffer.bytes = "{{ int .Values.traceEnricherConfig.kafkaStreamsConfig.sendBufferBytes }}"
# Producer configs
producer.acks = "{{ .Values.traceEnricherConfig.kafkaStreamsConfig.producerAcks }}"
producer.batch.size = "{{ int .Values.traceEnricherConfig.kafkaStreamsConfig.producerBatchSize }}"
producer.linger.ms = "{{ int .Values.traceEnricherConfig.kafkaStreamsConfig.producerLingerMs }}"
producer.compression.type = "{{ .Values.traceEnricherConfig.kafkaStreamsConfig.producerCompressionType }}"
producer.max.request.size = "{{ int .Values.traceEnricherConfig.kafkaStreamsConfig.producerMaxRequestSize }}"
producer.buffer.memory = "{{ int .Values.traceEnricherConfig.kafkaStreamsConfig.producerBufferMemory }}"
# Consumer configs
consumer.max.partition.fetch.bytes = "{{ int .Values.traceEnricherConfig.kafkaStreamsConfig.consumerMaxPartitionFetchBytes }}"
consumer.max.poll.records = "{{ int .Values.traceEnricherConfig.kafkaStreamsConfig.consumerMaxPollRecords }}"
consumer.session.timeout.ms = "{{ int .Values.traceEnricherConfig.kafkaStreamsConfig.consumerSessionTimeoutMs }}"
# Others
metrics.recording.level = "{{ .Values.traceEnricherConfig.kafkaStreamsConfig.metricsRecordingLevel }}"
{{- if .Values.traceEnricherConfig.extraKafkaStreamsConfig }}
{{- range $key,$value := .Values.traceEnricherConfig.extraKafkaStreamsConfig }}
{{ $key }} = {{ $value }}
{{- end }}
{{- end }}
}
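For orientation, an illustrative rendering of the templated block above (not part of the change): with the default values from hypertrace-trace-enricher/helm/values.yaml further down in this diff and an empty extraKafkaStreamsConfig, the kafka.streams.config section of application.conf comes out roughly as:

    kafka.streams.config {
      application.id = structured-traces-enrichment-job
      bootstrap.servers = "bootstrap:9092"
      schema.registry.url = "http://schema-registry-service:8081"
      num.stream.threads = "2"
      commit.interval.ms = "30000"
      producer.acks = "all"
      producer.compression.type = "gzip"
      metrics.recording.level = "INFO"
    }

(the remaining buffer, fetch, and timeout settings are omitted here for brevity)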

enricher {
44 changes: 32 additions & 12 deletions hypertrace-trace-enricher/helm/values.yaml
@@ -25,7 +25,7 @@ nodeLabels: {}
# This is defined in resources/configs/trace-enricher/application.conf as service.admin.port
containerAdminPort: 8099

javaOpts: "-Xms512M -Xmx1024M"
javaOpts: "-XX:InitialRAMPercentage=50.0 -XX:MaxRAMPercentage=75.0 -XX:MaxDirectMemorySize=128M"

livenessProbe:
initialDelaySeconds: 10
@@ -45,7 +45,7 @@ resources:
memory: 1024Mi
limits:
cpu: 1.0
memory: 1536Mi
memory: 1024Mi

podLabels:
app: hypertrace-trace-enricher
@@ -65,13 +65,32 @@ deploymentSelectorMatchLabels:
###########
traceEnricherConfig:
name: hypertrace-trace-enricher-config
kafka:
streams:
config:
metricsRecordingLevel: INFO
numStreamThreads: 2
bootstrapServers: "bootstrap:9092"
schemaRegistryUrl: "http://schema-registry-service:8081"
# Important kafka streams configurations which are used in the config template go here.
kafkaStreamsConfig:
bootstrapServers: "bootstrap:9092"
schemaRegistryUrl: "http://schema-registry-service:8081"
# Core config
numStreamThreads: 2 # default = 1
commitIntervalMs: 30000 # default = 30000
Contributor: If we are keeping the default from Kafka, does it make sense to add an if in the chart and thus not require us to pass a default? cc @JBAhire

Contributor (author): That makes the chart more complex by adding many if-else branches. These are very important, handpicked kafka/kafka-streams configs that need to be tuned per the application's performance/throughput requirements.

Contributor: Q: Do you think we can put it as a commented block when the default is the same, so it can be uncommented if some values need to be changed?

    # default = 30000 override it for Xyz
    # commitIntervalMs: 30000

Contributor (@kotharironak, Dec 30, 2020): Yeah, the above would again need an if/else block in the template, which you mentioned. Got it now that this was done to keep the template simple.

Contributor: if/else blocks save us the need to define the defaults ourselves, because defining them adds noise to an already big set of settings. I don't think we will have complex if { if else } else { if else }; it is just a simple if.
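For reference, a minimal sketch of the if-guarded alternative discussed above, using commit.interval.ms as the example (illustrative only, not what was merged):

    {{- if .Values.traceEnricherConfig.kafkaStreamsConfig.commitIntervalMs }}
    commit.interval.ms = "{{ int .Values.traceEnricherConfig.kafkaStreamsConfig.commitIntervalMs }}"
    {{- end }}

With a guard like this, an unset value falls back to the Kafka Streams default instead of being restated in values.yaml, at the cost of one guard per setting.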

# Common client (producer, consumer, admin) configs
receiveBufferBytes: 4194304 # default = 32768 (kafka streams default)
sendBufferBytes: 4194304 # default = 131072 (kafka streams default)
# Producer configs
producerAcks: all # default: 1
producerBatchSize: 524288 # default = 16384
producerLingerMs: 1000 # default = 100 (kafka streams default)
producerCompressionType: "gzip" # default = none
producerMaxRequestSize: 10485760 # default = 1048576
producerBufferMemory: 134217728 # default = 33554432
# Consumer configs
consumerMaxPartitionFetchBytes: 4194304 # default = 1048576
consumerMaxPollRecords: 1000 # default = 1000 (kafka streams default)
consumerSessionTimeoutMs: 10000 # default = 10000
# Others
metricsRecordingLevel: INFO # default = INFO
# All other streams config goes here.
# Remove the flower braces and add key: value pair here.
extraKafkaStreamsConfig: {}
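A usage sketch for the pass-through above (keys and values here are hypothetical, chosen only to illustrate the mechanism): any Kafka Streams property not templated explicitly can be supplied under extraKafkaStreamsConfig, for example

    extraKafkaStreamsConfig:
      max.task.idle.ms: 500
      buffered.records.per.partition: 2000

and the range loop in the config template then emits each pair verbatim into application.conf, e.g. max.task.idle.ms = 500.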

logConfig:
name: hypertrace-trace-enricher-log-config
@@ -88,11 +107,12 @@ kafka-topic-creator:
kafka:
topics:
- name: enriched-structured-traces
replicationFactor: 1
replicationFactor: 3
partitions: 8
configs:
- retention.bytes=4294967296
- retention.ms=259200000
- retention.bytes=8589934592 # default = -1
- retention.ms=86400000 # default = 604800000 (7 days)
- max.message.bytes=10485760 # Allow larger messages for traces
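A quick unit check on the figures above (plain arithmetic on the values as written):

    retention.bytes   = 8589934592 bytes -> about 8 GiB retained per partition
    retention.ms      = 86400000 ms      -> 24 hours
    max.message.bytes = 10485760 bytes   -> 10 MiB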
zookeeper:
address: zookeeper:2181
imagePullSecrets: []
@@ -14,10 +14,9 @@ dependencies {
implementation("org.slf4j:slf4j-api:1.7.30")
implementation("org.apache.commons:commons-lang3:3.11")
constraints {
implementation("com.google.guava:guava:30.0-jre") {
because("https://snyk.io/vuln/SNYK-JAVA-COMGOOGLEGUAVA-1015415")
implementation("com.google.guava:guava:30.1-jre") {
because("Information Disclosure [Medium Severity][https://snyk.io/vuln/SNYK-JAVA-COMGOOGLEGUAVA-1015415] in com.google.guava:guava@29.0-android")
}
}

testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
}
@@ -34,28 +34,22 @@ tasks.test {
}

dependencies {
constraints {
implementation("org.hibernate.validator:hibernate-validator:6.1.5.Final") {
because("Cross-site Scripting (XSS) [Medium Severity][https://snyk.io/vuln/SNYK-JAVA-ORGHIBERNATEVALIDATOR-541187] in org.hibernate.validator:hibernate-validator@6.0.17.Final\n" +
" introduced by io.confluent:kafka-avro-serializer@5.5.0 > io.confluent:kafka-schema-registry-client@5.5.0 > org.glassfish.jersey.ext:jersey-bean-validation@2.30 > org.hibernate.validator:hibernate-validator@6.0.17.Final")
}
implementation("org.yaml:snakeyaml:1.26") {
because("Denial of Service (DoS) [Medium Severity][https://snyk.io/vuln/SNYK-JAVA-ORGYAML-537645] in org.yaml:snakeyaml@1.23\n" +
" introduced by io.confluent:kafka-avro-serializer@5.5.0 > io.confluent:kafka-schema-registry-client@5.5.0 > io.swagger:swagger-core@1.5.3 > com.fasterxml.jackson.dataformat:jackson-dataformat-yaml@2.4.5 > org.yaml:snakeyaml@1.12")
}
}

implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher-impl"))
implementation("org.hypertrace.core.datamodel:data-model:0.1.12")
implementation("org.hypertrace.core.flinkutils:flink-utils:0.1.6")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.18")
implementation("org.hypertrace.entity.service:entity-service-client:0.1.23")

implementation("com.typesafe:config:1.4.1")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.13")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.15")
constraints {
implementation("com.google.guava:guava:30.1-jre") {
because("Information Disclosure [Medium Severity][https://snyk.io/vuln/SNYK-JAVA-COMGOOGLEGUAVA-1015415] in com.google.guava:guava@29.0-android")
}
}

// Required for the GRPC clients.
runtimeOnly("io.grpc:grpc-netty-shaded:1.33.1")

// Logging
implementation("org.slf4j:slf4j-api:1.7.30")
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.14.0")
Expand All @@ -64,5 +58,5 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
testImplementation("org.mockito:mockito-core:3.6.28")
testImplementation("org.junit-pioneer:junit-pioneer:1.1.0")
testImplementation("org.apache.kafka:kafka-streams-test-utils:5.5.1-ccs")
testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs")
}
@@ -7,11 +7,6 @@ dependencies {

implementation("org.json:json:20201115")
implementation("org.apache.commons:commons-lang3:3.11")
constraints {
implementation("com.google.guava:guava:30.0-jre") {
because("https://snyk.io/vuln/SNYK-JAVA-COMGOOGLEGUAVA-1015415")
}
}
}

description = "Trace Visualizer to help visualize a structured trace."
5 changes: 5 additions & 0 deletions hypertrace-trace-enricher/trace-reader/build.gradle.kts
@@ -15,6 +15,11 @@ dependencies {
implementation("org.hypertrace.core.grpcutils:grpc-client-rx-utils:0.3.2")
implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.3.2")
implementation("io.reactivex.rxjava3:rxjava:3.0.7")
constraints {
implementation("com.google.guava:guava:30.1-jre") {
because("Information Disclosure [Medium Severity][https://snyk.io/vuln/SNYK-JAVA-COMGOOGLEGUAVA-1015415] in com.google.guava:guava@29.0-android")
}
}

testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
testImplementation("org.mockito:mockito-core:3.6.28")
@@ -19,9 +19,6 @@ dependencies {
implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))
implementation("org.hypertrace.core.viewcreator:view-creator-framework:0.1.21")
constraints {
implementation("com.google.guava:guava:30.0-jre") {
because("https://snyk.io/vuln/SNYK-JAVA-COMGOOGLEGUAVA-1015415")
}
// to have calcite libs on the same version
implementation("org.apache.calcite:calcite-babel:1.26.0") {
because("https://snyk.io/vuln/SNYK-JAVA-ORGAPACHECALCITE-1038296")
70 changes: 38 additions & 32 deletions raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml
@@ -6,44 +6,50 @@ metadata:
release: {{ .Release.Name }}
data:
application.conf: |-

kafka.streams.config = {
# Core configs
application.id = raw-spans-to-structured-traces-grouping-job
bootstrap.servers = "{{ .Values.rawSpansGrouperConfig.kafkaStreamsConfig.bootstrapServers }}"
schema.registry.url = "{{ .Values.rawSpansGrouperConfig.kafkaStreamsConfig.schemaRegistryUrl }}"
value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
# Core configs - For applications with state
num.stream.threads = "{{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.numStreamThreads }}"
commit.interval.ms = "{{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.commitIntervalMs }}"
group.instance.id = ${?POD_NAME}
cache.max.bytes.buffering = "{{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.cacheMaxBytesBuffering }}"
# Common client (producer, consumer, admin) configs
receive.buffer.bytes = "{{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.receiveBufferBytes }}"
send.buffer.bytes = "{{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.sendBufferBytes }}"
# Producer configs
producer.acks = "{{ .Values.rawSpansGrouperConfig.kafkaStreamsConfig.producerAcks }}"
producer.batch.size = "{{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.producerBatchSize }}"
producer.linger.ms = "{{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.producerLingerMs }}"
producer.compression.type = "{{ .Values.rawSpansGrouperConfig.kafkaStreamsConfig.producerCompressionType }}"
producer.max.request.size = "{{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.producerMaxRequestSize }}"
producer.buffer.memory = "{{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.producerBufferMemory }}"
# Consumer configs
consumer.max.partition.fetch.bytes = "{{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.consumerMaxPartitionFetchBytes }}"
consumer.max.poll.records = "{{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.consumerMaxPollRecords }}"
consumer.session.timeout.ms = "{{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.consumerSessionTimeoutMs }}"
# Changelog topic configs
replication.factor = "{{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.replicationFactor }}"
topic.cleanup.policy = "delete,compact"
Contributor (@ravisingal, Dec 24, 2020): I always wonder: shouldn't it be compact,delete? What does delete,compact mean?

Contributor (author): compact,delete and delete,compact are the same. It's basically a list of values; the order doesn't matter.
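To illustrate the reply above: cleanup.policy is a list-valued topic config, so both orderings declare the same combined compact-and-delete behaviour (snippet is illustrative only):

    topic.cleanup.policy = "delete,compact"
    topic.cleanup.policy = "compact,delete"   # equivalent: same set of policies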

# RocksDB state store configs
state.dir = "/var/data/"
metrics.recording.level = "{{ .Values.rawSpansGrouperConfig.kafka.streams.config.metricsRecordingLevel }}"
num.stream.threads = "{{ .Values.rawSpansGrouperConfig.kafka.streams.config.numStreamThreads }}"
num.standby.replicas = {{ int .Values.rawSpansGrouperConfig.kafka.streams.config.numStandbyReplicas }}
replication.factor = {{ int .Values.rawSpansGrouperConfig.kafka.streams.config.replicationFactor }}
producer.max.request.size = {{ int .Values.rawSpansGrouperConfig.kafka.streams.config.producerMaxRequestSize }}
cache.max.bytes.buffering = {{ int .Values.rawSpansGrouperConfig.kafka.streams.config.cacheMaxBytesBuffering }}
{{- if .Values.rawSpansGrouperConfig.kafka.streams.config.sessionTimeoutMs }}
session.timeout.ms = {{ int .Values.rawSpansGrouperConfig.kafka.streams.config.sessionTimeoutMs }}
rocksdb.block.cache.size = {{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.rocksdbBlockCacheSize }}
rocksdb.write.buffer.size = {{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.rocksdbWriteBufferSize }}
rocksdb.max.write.buffers = {{ int .Values.rawSpansGrouperConfig.kafkaStreamsConfig.rocksdbMaxWriteBuffers }}
rocksdb.cache.index.and.filter.blocks = {{ .Values.rawSpansGrouperConfig.kafkaStreamsConfig.rocksdbCacheIndexAndFilterBlocks }}
# Exception handler configs
default.production.exception.handler = {{ .Values.rawSpansGrouperConfig.kafkaStreamsConfig.defaultProductionExceptionHandler }}
ignore.production.exception.classes = {{ .Values.rawSpansGrouperConfig.kafkaStreamsConfig.ignoreProductionExceptionClasses }}
# Others
metrics.recording.level = "{{ .Values.rawSpansGrouperConfig.kafkaStreamsConfig.metricsRecordingLevel }}"
{{- if .Values.rawSpansGrouperConfig.extraKafkaStreamsConfig }}
{{- range $key,$value := .Values.rawSpansGrouperConfig.extraKafkaStreamsConfig }}
{{ $key }} = {{ $value }}
{{- end }}
{{- if .Values.rawSpansGrouperConfig.kafka.streams.config.commitIntervalMs }}
commit.interval.ms = {{ int .Values.rawSpansGrouperConfig.kafka.streams.config.commitIntervalMs }}
{{- end }}
{{- if .Values.rawSpansGrouperConfig.kafka.streams.config.producerBatchSize }}
producer.batch.size = {{ int .Values.rawSpansGrouperConfig.kafka.streams.config.producerBatchSize }}
{{- end }}
{{- if .Values.rawSpansGrouperConfig.kafka.streams.config.producerLingerMs }}
producer.linger.ms = {{ int .Values.rawSpansGrouperConfig.kafka.streams.config.producerLingerMs }}
{{- end }}
default.production.exception.handler = {{ .Values.rawSpansGrouperConfig.kafka.streams.config.defaultProductionExceptionHandler }}
ignore.production.exception.classes = {{ .Values.rawSpansGrouperConfig.kafka.streams.config.ignoreProductionExceptionClasses }}

topic.cleanup.policy = "delete,compact"
bootstrap.servers = "{{ .Values.rawSpansGrouperConfig.kafka.streams.config.bootstrapServers }}"

schema.registry.url = "{{ .Values.rawSpansGrouperConfig.kafka.streams.config.schemaRegistryUrl }}"
specific.avro.reader = true

rocksdb.block.cache.size = {{ int .Values.rawSpansGrouperConfig.kafka.streams.config.rocksdbBlockCacheSize }}
rocksdb.write.buffer.size = {{ int .Values.rawSpansGrouperConfig.kafka.streams.config.rocksdbWriteBufferSize }}
rocksdb.max.write.buffers = {{ int .Values.rawSpansGrouperConfig.kafka.streams.config.rocksdbMaxWriteBuffers }}
rocksdb.cache.index.and.filter.blocks = {{ .Values.rawSpansGrouperConfig.kafka.streams.config.rocksdbCacheIndexAndFilterBlocks }}

value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
}

span.groupby.session.window.interval = {{ .Values.rawSpansGrouperConfig.span.groupby.internal }}