Skip to content

Commit

Permalink
Davinchia/tracking client refactor (#7595)
Browse files Browse the repository at this point in the history
* Add doc string to tracking interface.

* Add doc string with minor refactor.

* Update airbyte-analytics/src/main/java/io/airbyte/analytics/SegmentTrackingClient.java

* Format javadoc. Add user_id trait.

* Better formatting javadocs.

* Turn on spotless.

* Format before checking again.

* Clean this up.
  • Loading branch information
davinchia committed Nov 10, 2021
1 parent 504580d commit 7f6d12a
Show file tree
Hide file tree
Showing 12 changed files with 5,117 additions and 4,604 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ jobs:
- name: Build
run: SUB_BUILD=PLATFORM ./gradlew build --scan

- name: Ensure no file change
run: git --no-pager diff && test -z "$(git --no-pager diff)"
- name: Test if Seed spec is updated
run: SUB_BUILD=PLATFORM ./gradlew format && git --no-pager diff && test -z "$(git --no-pager diff)"

# todo (cgardens) - scope by platform.
- name: Check documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,36 @@
import com.segment.analytics.messages.AliasMessage;
import com.segment.analytics.messages.IdentifyMessage;
import com.segment.analytics.messages.TrackMessage;
import io.airbyte.config.StandardWorkspace;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

/**
* This class is a wrapper around the Segment backend Java SDK.
* <p>
* In general, the Segment SDK events have two pieces to them, a top-level userId field and a map of
* properties.
* <p>
* As of 2021/11/03, the top-level userId field is standardised on the
* {@link StandardWorkspace#getCustomerId()} field. This field is a random UUID generated when a
* workspace model is created. This standardisation applies across both OSS Airbyte and Cloud
* Airbyte. This join key now underpins Airbyte OSS Segment tracking. Although the id is meaningless
* and its name confusing, it is not worth performing a migration at this time. Interested parties
* can look at https://github.com/airbytehq/airbyte/issues/7456 for more context.
* <p>
* Consumers utilising this class must understand that the top-level userId field is subject to this
* constraint.
* <p>
* See the following document for details on tracked events. Please update this document if tracked
* events change.
* https://docs.google.com/spreadsheets/d/1lGLmLIhiSPt_-oaEf3CpK-IxXnCO0NRHurvmWldoA2w/edit#gid=1567609168
*/
public class SegmentTrackingClient implements TrackingClient {

public static final String CUSTOMER_ID_KEY = "user_id";
private static final String SEGMENT_WRITE_KEY = "7UDdp5K55CyiGgsauOr2pNNujGvmhaeu";
private static final String AIRBYTE_VERSION_KEY = "airbyte_version";
private static final String AIRBYTE_ROLE = "airbyte_role";
Expand Down Expand Up @@ -68,15 +90,17 @@ public void identify(final UUID workspaceId) {
identityMetadata.put(AIRBYTE_ROLE, airbyteRole);
}

final String joinKey = trackingIdentity.getCustomerId().toString();
analytics.enqueue(IdentifyMessage.builder()
// user id is scoped by workspace. there is no cross-workspace tracking.
.userId(trackingIdentity.getCustomerId().toString())
.userId(joinKey)
.traits(identityMetadata));
}

@Override
public void alias(final UUID workspaceId, final String previousCustomerId) {
analytics.enqueue(AliasMessage.builder(previousCustomerId).userId(identityFetcher.apply(workspaceId).getCustomerId().toString()));
final var joinKey = identityFetcher.apply(workspaceId).getCustomerId().toString();
analytics.enqueue(AliasMessage.builder(previousCustomerId).userId(joinKey));
}

@Override
Expand All @@ -88,12 +112,17 @@ public void track(final UUID workspaceId, final String action) {
public void track(final UUID workspaceId, final String action, final Map<String, Object> metadata) {
final Map<String, Object> mapCopy = new HashMap<>(metadata);
final TrackingIdentity trackingIdentity = identityFetcher.apply(workspaceId);

// Always add these traits.
mapCopy.put(AIRBYTE_VERSION_KEY, trackingIdentity.getAirbyteVersion().serialize());
mapCopy.put(CUSTOMER_ID_KEY, trackingIdentity.getCustomerId());
if (!metadata.isEmpty()) {
trackingIdentity.getEmail().ifPresent(email -> mapCopy.put("email", email));
}

final var joinKey = trackingIdentity.getCustomerId().toString();
analytics.enqueue(TrackMessage.builder(action)
.userId(trackingIdentity.getCustomerId().toString())
.userId(joinKey)
.properties(mapCopy));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,21 @@
import java.util.Map;
import java.util.UUID;

/**
* General interface for user-level Airbyte usage reporting. We use Segment for behavioural
* reporting, so this interface mirrors the Segment backend API SDK.
* <p>
* For more information see
* https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/.
* <p>
* This interface allows us to easily stub this out via the {@link LoggingTrackingClient}. The main
* implementation is in {@link SegmentTrackingClient}.
* <p>
* Although the methods take a workspace id, this id is used to look up further tracking metadata.
* See {@link SegmentTrackingClient} for more information.
* <p>
* Keep in mind that this interface is also relied on in Airbyte Cloud.
*/
public interface TrackingClient {

void identify(UUID workspaceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ void testIdentify() {
verify(analytics).enqueue(mockBuilder.capture());
final IdentifyMessage actual = mockBuilder.getValue().build();
final Map<String, Object> expectedTraits = ImmutableMap.<String, Object>builder()
.put("anonymized", IDENTITY.isAnonymousDataCollection())
.put("airbyte_version", AIRBYTE_VERSION.serialize())
.put("deployment_env", DEPLOYMENT.getDeploymentEnv())
.put("deployment_mode", DEPLOYMENT.getDeploymentMode())
.put("deployment_id", DEPLOYMENT.getDeploymentId())
.put("airbyte_version", AIRBYTE_VERSION.serialize())
.put("email", IDENTITY.getEmail().get())
.put("anonymized", IDENTITY.isAnonymousDataCollection())
.put("subscribed_newsletter", IDENTITY.isNews())
.put("subscribed_security", IDENTITY.isSecurityUpdates())
.build();
Expand All @@ -83,15 +83,15 @@ void testIdentifyWithRole() {
verify(analytics).enqueue(mockBuilder.capture());
final IdentifyMessage actual = mockBuilder.getValue().build();
final Map<String, Object> expectedTraits = ImmutableMap.<String, Object>builder()
.put("airbyte_role", "role")
.put("airbyte_version", AIRBYTE_VERSION.serialize())
.put("anonymized", IDENTITY.isAnonymousDataCollection())
.put("deployment_env", DEPLOYMENT.getDeploymentEnv())
.put("deployment_mode", DEPLOYMENT.getDeploymentMode())
.put("deployment_id", DEPLOYMENT.getDeploymentId())
.put("airbyte_version", AIRBYTE_VERSION.serialize())
.put("email", IDENTITY.getEmail().get())
.put("anonymized", IDENTITY.isAnonymousDataCollection())
.put("subscribed_newsletter", IDENTITY.isNews())
.put("subscribed_security", IDENTITY.isSecurityUpdates())
.put("airbyte_role", "role")
.build();
assertEquals(IDENTITY.getCustomerId().toString(), actual.userId());
assertEquals(expectedTraits, actual.traits());
Expand All @@ -100,7 +100,8 @@ void testIdentifyWithRole() {
@Test
void testTrack() {
final ArgumentCaptor<TrackMessage.Builder> mockBuilder = ArgumentCaptor.forClass(TrackMessage.Builder.class);
final ImmutableMap<String, Object> metadata = ImmutableMap.of("airbyte_version", AIRBYTE_VERSION.serialize());
final ImmutableMap<String, Object> metadata =
ImmutableMap.of("airbyte_version", AIRBYTE_VERSION.serialize(), "user_id", IDENTITY.getCustomerId());

segmentTrackingClient.track(WORKSPACE_ID, "jump");

Expand All @@ -115,9 +116,10 @@ void testTrack() {
void testTrackWithMetadata() {
final ArgumentCaptor<TrackMessage.Builder> mockBuilder = ArgumentCaptor.forClass(TrackMessage.Builder.class);
final ImmutableMap<String, Object> metadata = ImmutableMap.of(
"height", "80 meters",
"airbyte_version", AIRBYTE_VERSION.serialize(),
"email", EMAIL,
"airbyte_version", AIRBYTE_VERSION.serialize());
"height", "80 meters",
"user_id", IDENTITY.getCustomerId());

segmentTrackingClient.track(WORKSPACE_ID, "jump", metadata);

Expand Down
Loading

0 comments on commit 7f6d12a

Please sign in to comment.