/
lineage_script.py
96 lines (87 loc) · 3.46 KB
/
lineage_script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""This is a script at linearly walks through some lineage functionality that Hamilton has.
It mirrors the code that was presented for the Lineage + Hamilton in 10 minutes blog post.
"""
import pprint
import data_loading
import features
import model_pipeline
import sets
from hamilton import base, driver
# Determine configuration for creating the DAG.
config = {} # This example has no configuration.
# instantiate the driver
adapter = base.DefaultAdapter()
dr = driver.Driver(config, data_loading, features, sets, model_pipeline, adapter=adapter)
# --- (1) What sequence of operations produced this data/model? ---
# How do the feature encoders get computed and what flows into them?
# Inputs satisfy the DAG's external dependencies (data location plus model/split knobs).
inputs = dict(
    location="data/train.csv",
    index_col="passengerid",
    target_col="survived",
    random_state=42,
    max_depth=None,
    validation_size_fraction=0.33,
)
pprint.pprint("Visualizing how the encoders are computed")
# Render the execution path that produces the encoders as a PNG.
dr.visualize_execution([features.encoders], "encoder_lineage", {"format": "png"}, inputs=inputs)
# --- (2) Whose/What data sources led to this artifact/model? ---
# There is something funky with the Random Forest model and we want to double-check
# for the current production model, what the data sources are and who owns them,
# so we can go ping them.
upstream_nodes = dr.what_is_upstream_of("fit_random_forest")
# Collect owner/function/source for each upstream node tagged as a data source.
# A list comprehension replaces the manual loop-and-append (ruff PERF401).
teams = [
    {
        "team": node.tags.get("owner"),
        "function": node.name,
        "source": node.tags.get("source"),
    }
    for node in upstream_nodes
    if node.tags.get("source")
]
pprint.pprint("Upstream data sources of fit_random_forest")
pprint.pprint(teams)
# --- (3) Who/What is downstream of this transform? ---
# Say we're on data engineering and want to change the source data. How could we determine
# what the artifacts that use this data are and who owns them?
downstream_nodes = dr.what_is_downstream_of("titanic_data")
# Keep only the nodes tagged as artifacts, recording the owning team for each.
artifacts = [
    {
        "team": node.tags.get("owner"),
        "function": node.name,
        "artifact": node.tags.get("artifact"),
    }
    for node in downstream_nodes
    if node.tags.get("artifact")
]
pprint.pprint("Downstream artifacts of titanic_data")
pprint.pprint(artifacts)
# --- (4) What is defined as PII data, and what does it end up in? ---
# Let's say our compliance team has come to us to understand how we're using PII data,
# i.e. what artifacts does it end up in? They want this report every month
pii_nodes = [n for n in dr.list_available_variables() if n.tags.get("PII") == "true"]
# Map each PII node name to the artifact nodes downstream of it
# (owner team, function name, and artifact tag for each).
# A dict comprehension over a nested list comprehension replaces the manual
# loop-and-append construction (ruff PERF403); PII nodes with no downstream
# artifacts still appear, mapped to an empty list, matching the original behavior.
pii_to_artifacts = {
    node.name: [
        {
            "team": dwn_node.tags.get("owner"),
            "function": dwn_node.name,
            "artifact": dwn_node.tags.get("artifact"),
        }
        for dwn_node in dr.what_is_downstream_of(node.name)
        if dwn_node.tags.get("artifact")
    ]
    for node in pii_nodes
}
pprint.pprint("PII to artifacts")
pprint.pprint(pii_to_artifacts)