Skip to content
This repository has been archived by the owner on Dec 17, 2021. It is now read-only.

Commit

Permalink
add examples/re-key, modeling a typical anon ID => stable ID join pro…
Browse files Browse the repository at this point in the history
…blem
  • Loading branch information
jgraettinger committed Dec 8, 2020
1 parent 6263bcc commit e04c7d2
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Update the VARIANT arg in docker-compose.yml to pick a Node version: 10, 12, 14
ARG VARIANT=12

FROM quay.io/estuary/flow:v0.1.0-99-g07ad64b
FROM quay.io/estuary/flow:v0.1.0-102-ga915d43
FROM mcr.microsoft.com/vscode/devcontainers/javascript-node:${VARIANT}

# Copy in Estuary Flow release binaries.
Expand Down
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
],
"editor.formatOnSave": true,
"editor.formatOnSaveMode": "file",
"restructuredtext.confPath": "${workspaceFolder}"
"restructuredtext.confPath": "${workspaceFolder}",
"restructuredtext.sphinxBuildPath": "/usr/local/bin/sphinx-build"
}
3 changes: 2 additions & 1 deletion examples/flow.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import:
- citi-bike/flow.yaml
- net-trace/flow.yaml
- wiki/flow.yaml
- re-key/flow.yaml
- shopping/flow.yaml
- wiki/flow.yaml
114 changes: 114 additions & 0 deletions examples/re-key/flow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# This example models a problem that's common to customer registration or login
# workflows, where an application may have interactions with a user that occur
# *before* the user completes a registration / login.
#
# Here, we capture events with on anonymous IDs (e.x. a session cookie),
# and join through a mapping of anonymous <=> stable IDs. As new anonymous IDs
# are used, events are collected within a register and then published once an
# associated stable ID is learned. Thereafter, further anonymous events are
# immediately mapped and published.

collections:
# A collection of some interesting events, having (only) anonymous IDs.
- name: examples/re-key/anonymous_events
schema: schema.yaml#/$defs/anonymous_event
key: [/event_id]

# Learned mappings of anonymous ID <=> stable ID,
# e.x. from a login or registration workflow.
- name: examples/re-key/mappings
schema: schema.yaml#/$defs/id_mapping
key: [/anonymous_id, /stable_id]

# Derivation of interesting events, now enriched with stable IDs.
- name: examples/re-key/stable_events
schema: schema.yaml#/$defs/stable_event
key: [/event_id]

derivation:
register:
schema: schema.yaml#/$defs/join_register
initial: { events: [] }

transform:
fromAnonymousEvents:
source: { name: examples/re-key/anonymous_events }
shuffle: [/anonymous_id]
update:
# Reduce this event into |register.events|. If stable_id is already known,
# then register.events is null and this is a no-op.
nodeJS: |
return [{events: [source]}];
publish:
# If the stable ID for this event is known, enrich the source event and publish.
# Otherwise, we've updated this source event into |register.events| and will
# publish once its stable ID becomes known.
nodeJS: |
if (register.stable_id) {
return [{ stable_id: register.stable_id, ...source }];
}
return [];
fromIdMappings:
source: { name: examples/re-key/mappings }
shuffle: [/anonymous_id]
update:
# Update the register with the associated stable ID of this anonymous ID.
# Set events to null, so that future "append" reductions are no-ops.
nodeJS: |
return [{ events: null, stable_id: source.stable_id }];
publish:
# Publish previous register.events, enriched with the just-learned stable ID.
nodeJS: |
let out = [];
if (register.stable_id && previous.events) {
for (var event of previous.events) {
out.push({ stable_id: register.stable_id, ...event });
}
}
return out;
tests:
"Expect we join anonymous events through stable ID mappings":
# Events which are recorded before a mapping is available.
- ingest:
collection: examples/re-key/anonymous_events
documents:
- { anonymous_id: "anon-one", event_id: "ev-1" }
- { anonymous_id: "anon-two", event_id: "ev-2" }
- { anonymous_id: "anon-one", event_id: "ev-3" }
# Learn an "anon-one" => "stable-one" mapping.
- ingest:
collection: examples/re-key/mappings
documents:
- { anonymous_id: "anon-one", stable_id: "stable-one" }
# More events are recorded.
- ingest:
collection: examples/re-key/anonymous_events
documents:
- { anonymous_id: "anon-two", event_id: "ev-4" }
- { anonymous_id: "anon-one", event_id: "ev-5" }
# Verify we derive all "stable-one" events (and only these events),
# from before and after its ID mapping was learned.
- verify:
collection: examples/re-key/stable_events
documents:
- { stable_id: "stable-one", event_id: "ev-1" }
- { stable_id: "stable-one", event_id: "ev-3" }
- { stable_id: "stable-one", event_id: "ev-5" }
# We duplicate the stable-one mapping (this is a no-op),
# and learn a new mapping for anon-two.
- ingest:
collection: examples/re-key/mappings
documents:
- { anonymous_id: "anon-one", stable_id: "stable-one" }
- { anonymous_id: "anon-two", stable_id: "stable-two" }
# Now the derivation contains all events, with stable IDs.
- verify:
collection: examples/re-key/stable_events
documents:
- { stable_id: "stable-one", event_id: "ev-1" }
- { stable_id: "stable-two", event_id: "ev-2" }
- { stable_id: "stable-one", event_id: "ev-3" }
- { stable_id: "stable-two", event_id: "ev-4" }
- { stable_id: "stable-one", event_id: "ev-5" }
52 changes: 52 additions & 0 deletions examples/re-key/schema.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
$schema: https://json-schema.org/draft/2019-09/schema

$defs:
anonymous_event:
description: "An interesting event, keyed on an anonymous ID"
type: object
properties:
anonymous_id: { type: string }
event_id: { type: string }
required: [anonymous_id, event_id]

id_mapping:
description: "A learned association of an anonymous ID <=> stable ID"
type: object
properties:
anonymous_id: { type: string }
stable_id: { type: string }
required: [anonymous_id, stable_id]

stable_event:
description: "An event enriched with a stable ID"
$ref: "#/$defs/anonymous_event"
properties:
stable_id: { type: string }
required: [stable_id]

join_register:
description: |
Register that's keyed on anonymous ID, which:
1) Stores anonymous events prior to a stable ID being known, and thereafter
2) Stores a mapped stable ID for this anonymous ID.
type: object
reduce: { strategy: merge }

oneOf:
# Case 1: stable_id is undefined, and we're appending anonymous
# events in the hope that it becomes known later.
- properties:
stable_id: false
events:
type: array
items: { $ref: schema.yaml#/$defs/anonymous_event }
reduce: { strategy: append }
required: [events]

# Case 2: stable_id is defined. |events| is now null,
# such that further reductions are a no-op.
- properties:
stable_id: { type: string }
events: { const: null }
required: [events, stable_id]
7 changes: 7 additions & 0 deletions reductions/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ append
.. literalinclude:: append.flow.yaml
:language: yaml

The right-hand side *must* always be an array. The left-hand side *may* be null, in which case
the reduction is treated as a no-op and its result remains null. This can be combined
with schema conditionals to "toggle" whether reduction reduction should be done or not.

firstWriteWins / lastWriteWins
--------------------------------------

Expand Down Expand Up @@ -98,6 +102,9 @@ merging sorted arrays of scalars.
.. literalinclude:: merge_key.flow.yaml
:language: yaml

As with ``append``, the left-hand side of ``merge`` *may* be null, in which case
the reduction is treated as a no-op and its result remains null.

minimize / maximize
-------------------

Expand Down

0 comments on commit e04c7d2

Please sign in to comment.