/
idle-bikes.flow.yaml
49 lines (44 loc) 路 1.64 KB
/
idle-bikes.flow.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import:
- rides.flow.yaml
collections:
# Derive idle bikes via two transforms of rides:
# * One reads in real-time, and stores the ride timestamp in a register.
# * The second reads with a delay, and checks whether register timestamp
# hasn't been updated since this (delayed) ride.
examples/citi-bike/idle-bikes:
schema:
type: object
properties:
bike_id: { type: integer }
station: { $ref: ride.schema.yaml#/$defs/terminus }
required: [bike_id, station]
key: [/bike_id, /station/timestamp]
derive:
using:
sqlite:
migrations:
- |
CREATE TABLE last_rides (
bike_id INTEGER PRIMARY KEY NOT NULL,
time TEXT NOT NULL
);
transforms:
- name: liveRides
source:
name: examples/citi-bike/rides
shuffle: { key: [/bike_id] }
lambda: |
INSERT INTO last_rides (bike_id, time) VALUES ($bike_id, $end$timestamp)
ON CONFLICT DO UPDATE SET time = $end$timestamp;
- name: delayedRides
source:
name: examples/citi-bike/rides
shuffle: { key: [/bike_id] }
# Use a 2-day read delay, relative to the document's ingestion,
# to fetch records where the bike hasn't moved in that time.
# To see read delays in action within a short-lived
# testing contexts, try using a smaller value (e.g., 2m).
readDelay: "48h"
lambda: |
SELECT $bike_id, $end AS station FROM last_rides r
WHERE r.bike_id = $bike_id AND r.time = $end$timestamp;