
[DSM] - Set DSM checkpoints for S3 put / get operations #6859

Merged
merged 47 commits into master on Apr 19, 2024

Conversation

@kr-igor (Contributor) commented Mar 29, 2024

What Does This Do

  • Adds DSM checkpoints for S3 put/get operations (a rough sketch follows this list).
  • Adds request/response content length to span tags (only if DSM is enabled).
  • Introduces the concept of data sources for DSM checkpoints, in line with the OpenLineage spec (ds.name / ds.namespace).
  • Changes the way DSM stats are aggregated: instead of hash-based aggregation, stats are now aggregated per data source (the data source hash is generated using the checkpoint hash as a base).
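
As a rough illustration of the checkpoint tags, a minimal sketch is below. The ds.name / ds.namespace keys come from the description above; the checkpoint helper, the tag assignments, and the concrete values are assumptions for illustration, not the exact dd-trace-java internals.

import java.util.LinkedHashMap;

// Illustration only: tag keys follow the ds.name / ds.namespace naming from the
// PR description; the checkpoint helper and concrete values are hypothetical.
public class S3CheckpointSketch {
  public static void main(String[] args) {
    String bucket = "my-bucket";             // hypothetical bucket name
    String key = "events/2024/03/data.json"; // hypothetical object key

    // Insertion order matters because the checkpoint hash is computed over the tags.
    LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
    sortedTags.put("direction", "in");      // assumption: "in" for GetObject, "out" for PutObject
    sortedTags.put("ds.name", key);         // assumption: object key as the data source name
    sortedTags.put("ds.namespace", bucket); // assumption: bucket as the data source namespace
    sortedTags.put("type", "s3");

    setCheckpoint(sortedTags, 1024L);
  }

  // Stand-in for the real DSM checkpoint call that would be made on the active span.
  static void setCheckpoint(LinkedHashMap<String, String> tags, long payloadSizeBytes) {
    System.out.println("checkpoint tags=" + tags + " payloadSize=" + payloadSizeBytes);
  }
}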

Motivation

Setting "data source" checkpoints unlocks end-to-end pipeline lineage. For instance, this particular change allows visualizing data flow between streaming and batch processing. The same mechanism can be used to support more types of data sources.

Additional Notes

Jira ticket

@kr-igor marked this pull request as ready for review April 2, 2024 20:16
@kr-igor requested a review from a team as a code owner April 2, 2024 20:16

if (key != null && bucket != null && awsOperation != null) {
  if ("GetObject".equalsIgnoreCase(awsOperation.toString())) {
    LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
@kr-igor (Contributor, Author):
LinkedHashMap can be replaced with a fixed-size array. This will be done in a follow-up PR in Q2.

Contributor:

I think a simple struct-like object would be better than an array.

@@ -88,6 +92,9 @@ public AgentSpan onRequest(final AgentSpan span, final Request request) {
CharSequence awsRequestName = AwsNameCache.getQualifiedName(request);

span.setResourceName(awsRequestName, RPC_COMMAND_NAME);
if ("Amazon S3".equalsIgnoreCase(awsServiceName) && span.traceConfig().isDataStreamsEnabled()) {
Contributor:

Basing this on service name seems brittle to me; it would make more sense to base this check on the raw information -- e.g. the type of Request.

@kr-igor (Contributor, Author):

I've switched to the "s3" service name for both versions of the AWS SDK. I didn't use the request type to avoid adding new dependencies.


Object key = span.getTag(InstrumentationTags.AWS_OBJECT_KEY);
Object bucket = span.getTag(InstrumentationTags.AWS_BUCKET_NAME);
Object awsOperation = span.getTag(InstrumentationTags.AWS_OPERATION);
Contributor:

There are a lot of repetitive toString calls here.
If these are Strings or CharSequences, I would type check and cast them sooner.

Or, just create variables to hold the String values immediately after calling getTag.
Something like the line below would be fine...
String keyStr = (key == null) ? null : key.toString();

payloadSize = (long) requestSize;
}

LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
Contributor:

I would like to see DSM replace the use of LinkedHashMap with a simple struct-like object or builder.
LinkedHashMap is creating a lot of unnecessary allocation.
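
A struct-like replacement could look roughly like the sketch below; the class and field names are hypothetical, not part of this PR.

// Hypothetical sketch of a fixed-field alternative to LinkedHashMap for DSM
// checkpoint tags; the class and field names are illustrative only.
final class DataStreamsTags {
  final String direction;   // "in" or "out"
  final String dsName;      // e.g. the S3 object key
  final String dsNamespace; // e.g. the S3 bucket
  final String type;        // e.g. "s3"

  DataStreamsTags(String direction, String dsName, String dsNamespace, String type) {
    this.direction = direction;
    this.dsName = dsName;
    this.dsNamespace = dsNamespace;
    this.type = type;
  }
}

A fixed set of fields avoids the per-checkpoint map allocation and keeps the tag ordering implicit in the field order.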

final String awsServiceName = attributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME);
final String awsOperationName = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
onOperation(span, awsServiceName, awsOperationName);

// S3
request.getValueForField("Bucket", String.class).ifPresent(name -> setBucketName(span, name));
if ("s3".equalsIgnoreCase(awsServiceName) && span.traceConfig().isDataStreamsEnabled()) {
Contributor:

Why is the service name different here?

@kr-igor (Contributor, Author):

SDK v1 and v2 have different naming for services. I'll check if this can be unified.

@kr-igor (Contributor, Author):

Both implementations now use 's3'.

private final StringBuilder builder;

public DataSetHashBuilder() {
  builder = new StringBuilder();
Contributor:

Builder should be given an initial size if possible.


public long generateDataSourceHash(long parentHash) {
  builder.append(parentHash);
  return FNV64Hash.generateHash(builder.toString(), FNV64Hash.Version.v1);
Contributor:

It seems like the builder & string aren't actually needed for the hash calculation.
I presume the FNV64Hash computation could be done in a streaming fashion as the tags are added.
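
For reference, a minimal sketch of what streaming FNV-1 accumulation could look like; the constants are the standard 64-bit FNV-1 offset basis and prime, but the class and method names are hypothetical and not the existing FNV64Hash API.

// Sketch of accumulating an FNV-1 64-bit hash incrementally as tags are added,
// instead of buffering them in a StringBuilder first. Hypothetical names.
final class StreamingFnv1Hash {
  private static final long OFFSET_BASIS = 0xcbf29ce484222325L;
  private static final long PRIME = 0x100000001b3L;

  private long hash = OFFSET_BASIS;

  // Folds another value (a tag, or the parent hash rendered as text) into the running hash.
  StreamingFnv1Hash add(String value) {
    for (int i = 0; i < value.length(); i++) {
      hash = (hash * PRIME) ^ (value.charAt(i) & 0xff); // byte-wise fold; matches UTF-8 for ASCII tags
    }
    return this;
  }

  long value() {
    return hash;
  }
}

Because FNV-1 is a sequential fold over the input bytes, folding the tags in one at a time should produce the same result as hashing their concatenation, so no intermediate string needs to be built.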

@kr-igor (Contributor, Author):

Makes sense, I'll update the code.

@dougqh (Contributor) left a comment:

I think there's some opportunity for clean-up and optimization.

@kr-igor requested a review from dougqh April 5, 2024 14:38
@kr-igor merged commit 487e069 into master Apr 19, 2024
79 checks passed
@kr-igor deleted the kr-igor/dsm-s3-checkpoints branch April 19, 2024 17:30
@github-actions bot added this to the 1.34.0 milestone Apr 19, 2024