-
Notifications
You must be signed in to change notification settings - Fork 12
/
actions.py
144 lines (121 loc) · 5.27 KB
/
actions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#
# Copyright 2020 IBM Corp.
# SPDX-License-Identifier: Apache-2.0
#
import pandas as pd
import pyarrow as pa
import hashlib
from time import time
from datetime import datetime
from fybrik_python_transformation import Action, PandasAction
class Filter(PandasAction):
    """Keep only the DataFrame rows that match a pandas query expression.

    The expression is read from the ``query`` option. When the option is
    missing or empty the DataFrame is returned unchanged.
    """

    def __init__(self, description, columns, options):
        """Store the query expression found in ``options``.

        Args:
            description: forwarded to the base class constructor.
            columns: forwarded to the base class constructor.
            options: mapping of action options, or None. Only the ``query``
                key is read here.
        """
        super().__init__(description, columns, options)
        # Tolerate options=None instead of failing with AttributeError.
        self.query = (options or {}).get('query', '')

    def __dftransform__(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply the configured query to ``df``.

        Args:
            df (pd.DataFrame): data frame to filter

        Returns:
            pd.DataFrame: ``df.query(self.query)`` when a query is configured,
            otherwise ``df`` itself.
        """
        if not self.query:
            return df
        # NOTE(review): DataFrame.query evaluates the expression string, so
        # the query option is presumably trusted configuration - confirm.
        return df.query(self.query)
class AgeFilter(PandasAction):
    """Keep only rows whose date columns lie before an age-based cutoff.

    The cutoff is the construction time moved back by ``age`` years (option
    ``age``, default 18). A row survives only if, for every configured
    column, the value parsed with ``pd.to_datetime`` is strictly earlier than
    the cutoff.
    """

    def __init__(self, description, columns, options):
        """Compute the cutoff datetime from the ``age`` option.

        Args:
            description: forwarded to the base class constructor.
            columns: forwarded to the base class constructor.
            options: mapping of action options, or None. Only the ``age``
                key is read here; it is converted with ``int``.

        Raises:
            ValueError: if ``age`` cannot be converted to an int or the
                resulting year is outside the range ``datetime`` supports.
        """
        super().__init__(description, columns, options)
        # Tolerate options=None instead of failing with AttributeError.
        age = int((options or {}).get('age', 18))
        now = datetime.now()  # naive local time, same as fromtimestamp(time())
        target_year = now.year - age
        try:
            self.cutoff = now.replace(year=target_year)
        except ValueError:
            # replace() rejects Feb 29 when the target year is not a leap
            # year; any other ValueError (e.g. year out of range) is real.
            if (now.month, now.day) != (2, 29):
                raise
            self.cutoff = now.replace(year=target_year, month=2, day=28)

    def __dftransform__(self, df: pd.DataFrame) -> pd.DataFrame:
        """Drop rows whose configured columns are not older than the cutoff.

        Args:
            df (pd.DataFrame): data frame to filter

        Returns:
            pd.DataFrame: the filtered frame, or ``df`` itself when no
            columns are configured.
        """
        # Filters are applied one column after another, so a row must pass
        # the check for every configured column.
        for column in self.columns or ():
            df = df[pd.to_datetime(df[column]) < self.cutoff]
        return df
class Redact(Action):
    """Replace every value of the configured columns with a constant string.

    The constant is read from the ``redactValue`` option and defaults to
    ``"XXXXXXXXXX"``.
    """

    def __init__(self, description, columns, options):
        """Store the replacement value found in ``options``.

        Args:
            description: forwarded to the base class constructor.
            columns: forwarded to the base class constructor.
            options: mapping of action options, or None. Only the
                ``redactValue`` key is read here.
        """
        super().__init__(description, columns, options)
        # Tolerate options=None instead of failing with AttributeError.
        self.redact_value = (options or {}).get("redactValue", "XXXXXXXXXX")

    def __call__(self, records: pa.RecordBatch) -> pa.RecordBatch:
        """Transformation logic for Redact action.

        Args:
            records (pa.RecordBatch): record batch to transform

        Returns:
            pa.RecordBatch: transformed record batch
        """
        source_schema = records.schema
        # Configured columns that are absent from this batch are ignored.
        present = [name for name in self.columns if name in source_schema.names]
        # One constant string array is shared by all redacted columns.
        redacted = pa.array([self.redact_value] * len(records), type=pa.string())
        new_columns = records.columns
        for name in present:
            new_columns[source_schema.get_field_index(name)] = redacted
        # self.schema is not defined in this class; presumably the base class
        # derives the output schema using field_type() - confirm.
        new_schema = self.schema(source_schema)
        return pa.RecordBatch.from_arrays(new_columns, schema=new_schema)

    def field_type(self):
        """Overrides field_type to calculate transformed schema correctly."""
        return pa.string()  # redacted value is a string
class RemoveColumns(Action):
    """Hide the configured columns from the schema and reject batches that
    still contain any of them."""

    def __call__(self, records: pa.RecordBatch) -> pa.RecordBatch:
        """Pass the batch through after checking it has no forbidden column.

        Args:
            records (pa.RecordBatch): record batch to check

        Returns:
            pa.RecordBatch: ``records`` itself, untouched.

        Raises:
            RuntimeError: if the batch contains any configured column.
        """
        batch_names = records.schema.names
        forbidden = [name for name in self.columns if name in batch_names]
        if not forbidden:
            return records  # nothing to transform
        raise RuntimeError("Access to {} is forbidden".format(forbidden))

    def schema(self, original):
        """Return ``original`` with the configured columns removed.

        Args:
            original (pa.Schema): schema before the transformation

        Returns:
            pa.Schema: schema without the configured columns that were
            present in ``original``.
        """
        original_names = original.names
        trimmed: pa.Schema = original
        for name in self.columns:
            if name not in original_names:
                continue
            # Look the index up on the shrinking schema, since earlier
            # removals shift the positions of the remaining fields.
            trimmed = trimmed.remove(trimmed.get_field_index(name))
        return trimmed
class FilterColumns(Action):
    """Project record batches down to the configured columns.

    The output schema is computed on first use and then cached on the
    instance.
    """

    def __init__(self, description, columns, options):
        """Initialise the action with an empty schema cache.

        Args:
            description: forwarded to the base class constructor.
            columns: forwarded to the base class constructor.
            options: forwarded to the base class constructor.
        """
        super().__init__(description, columns, options)
        self._schema = None  # filled in by schema() on first use

    def __call__(self, records: pa.RecordBatch) -> pa.RecordBatch:
        """Return a batch holding only the configured columns.

        Args:
            records (pa.RecordBatch): record batch to transform

        Returns:
            pa.RecordBatch: batch with the configured columns that exist in
            ``records``, in the configured order.
        """
        source_schema = records.schema
        kept = [name for name in self.columns if name in source_schema.names]
        indices = [source_schema.get_field_index(name) for name in kept]
        arrays = records.columns
        return pa.RecordBatch.from_arrays(
            [arrays[i] for i in indices],
            schema=self.schema(source_schema))

    def schema(self, original):
        """Return the projected schema, computing and caching it if needed.

        Args:
            original (pa.Schema): schema before the transformation

        Returns:
            pa.Schema: schema made of the configured columns found in
            ``original``. Once a non-empty schema is cached, it is returned
            regardless of ``original``.
        """
        # Truthiness check kept from the original code: an empty cached
        # schema counts as "not cached" and is recomputed.
        if self._schema:
            # Fixed: this used to return the bound method self.schema.
            return self._schema
        kept = [name for name in self.columns if name in original.names]
        self._schema = pa.schema(
            [pa.field(name, original.field(name).type) for name in kept])
        return self._schema
class HashRedact(Action):
    """Replace the values of the configured columns with their hex digest.

    The hash algorithm is read from the ``algo`` option (``md5``, ``sha256``
    or ``sha512``, case-insensitive) and defaults to ``md5``.
    """

    # Supported algorithm names mapped to their hashlib constructors.
    _HASHERS = {
        "md5": hashlib.md5,
        "sha256": hashlib.sha256,
        "sha512": hashlib.sha512,
    }

    def __init__(self, description, columns, options):
        """Store the hash algorithm name found in ``options``.

        Args:
            description: forwarded to the base class constructor.
            columns: forwarded to the base class constructor.
            options: mapping of action options, or None. Only the ``algo``
                key is read here.
        """
        super().__init__(description, columns, options)
        # NOTE(review): md5 is the default; unsalted digests of guessable
        # values can be reversed by brute force - confirm this is acceptable.
        self.hash_algo = "md5" if options is None else options.get("algo", "md5")

    @staticmethod
    def _digest(hash_func, value):
        """Return the hex digest of one cell value, or None for a null cell."""
        if value is None:
            return None  # keep nulls as nulls instead of crashing on encode()
        data = value if isinstance(value, bytes) else str(value).encode()
        return hash_func(data).hexdigest()

    def __call__(self, records: pa.RecordBatch) -> pa.RecordBatch:
        """Transformation logic for HashRedact action.

        Args:
            records (pa.RecordBatch): record batch to transform

        Returns:
            pa.RecordBatch: transformed record batch

        Raises:
            ValueError: if the configured algorithm is not supported.
        """
        source_schema = records.schema
        # Configured columns that are absent from this batch are ignored.
        present = [name for name in self.columns if name in source_schema.names]
        indices = [source_schema.get_field_index(name) for name in present]
        new_columns = records.columns
        algo = self.hash_algo.lower()
        hash_func = self._HASHERS.get(algo)
        if hash_func is None:
            raise ValueError(f"Algorithm {algo} is not supported!")
        for i in indices:
            # The explicit string type keeps empty or all-null columns from
            # being inferred as a null-typed array.
            new_columns[i] = pa.array(
                [self._digest(hash_func, cell.as_py()) for cell in records.column(i)],
                type=pa.string())
        # self.schema is not defined in this class; presumably the base class
        # derives the output schema using field_type() - confirm.
        new_schema = self.schema(source_schema)
        return pa.RecordBatch.from_arrays(new_columns, schema=new_schema)

    def field_type(self):
        """Overrides field_type to calculate transformed schema correctly."""
        return pa.string()  # redacted value is a string