/
utils.py
131 lines (97 loc) · 3.6 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# generic imports
import django
from lxml import etree
import os
import sys
# pylint: disable=wrong-import-position
# Standalone bootstrap: if the imported `django` module exposes no `apps`
# attribute, Django is treated as not yet set up for this process, so it is
# configured here. This must run before the `django.conf` / `django.db`
# imports that follow.
if not hasattr(django, 'apps'):
    # tell Django which settings module to load
    os.environ['DJANGO_SETTINGS_MODULE'] = 'combine.settings'
    # presumably the Combine project root, added so `combine.settings` is importable
    sys.path.append('/opt/combine')
    django.setup()
# import django settings
from django.conf import settings
from django.db import connection
def refresh_django_db_connection():
    """
    Close and immediately re-open Django's DB connection.

    Python files shipped to a Spark context through Livy do not open and
    close MySQL connections the way a normal Django process does. When two
    jobs run further apart than MySQL's `wait_timeout`, the stale connection
    fails with (2006, 'MySQL server has gone away'). Call this before a job
    so the Livy-side code talks to MySQL over a fresh connection.

    Args:
        None

    Returns:
        None
    """
    # discard the current (possibly timed-out) connection ...
    connection.close()
    # ... and establish a new one right away
    connection.connect()
class PythonUDFRecord():
    """
    Slimmed-down version of core.models.Record, used for Spark UDF functions
    and for previewing python based validations and transformations.

    Attributes set during construction (depending on the input mode):
        id: DB id of the row (row input only, when the row has an `_id` field)
        record_id: record identifier
        document: raw document string
        error: error value copied from the row (row input only)
        xml: parsed lxml root element, or None if the document failed to parse
        nsmap: prefix -> URI namespace map without the default (None) prefix,
            or None if the document failed to parse
        nsmap_inv: URI -> prefix inversion of `nsmap`, or None if the document
            failed to parse
    """

    def __init__(self, record_input, non_row_input=False, record_id=None, document=None):
        """
        Instantiated in one of two ways
            1) from a DB row representing a Record in its entirety
            2) manually passed record_id or document (or both), triggered by non_row_input Flag
                - for example, this is used for testing record_id transformations

        Args:
            record_input: row object exposing `record_id`, `document`, `error`
                and optionally `_id`; ignored when `non_row_input` is truthy
            non_row_input (bool): if truthy, build from `record_id` / `document`
            record_id: record identifier, only stored when truthy
            document (str): document string, only stored and parsed when truthy
        """
        if non_row_input:
            # if record_id provided, set
            if record_id:
                self.record_id = record_id

            # if document provided, set and parse
            if document:
                self.document = document
                self._parse_document()

        else:
            # row
            self._row = record_input

            # get db id; rows without an `_id` field simply get no `id` attribute
            try:
                self.id = self._row._id
            except AttributeError:
                pass

            # get record id
            self.record_id = self._row.record_id

            # document string
            self.document = self._row.document

            # set error
            self.error = self._row.error

            self._parse_document()

    def _parse_document(self):
        """
        Parse `self.document` and populate `xml`, `nsmap` and `nsmap_inv`.

        Parsing is best-effort: any failure (malformed XML, a document that is
        not a string, ...) leaves all three attributes set to None instead of
        raising, so callers can test `xml is None`.
        """
        try:
            # parse XML string, save
            self.xml = etree.fromstring(self.document.encode('utf-8'))

            # get namespace map, dropping the default namespace (keyed by None)
            _nsmap = self.xml.nsmap.copy()
            _nsmap.pop(None, None)
            self.nsmap = _nsmap

            # set inverted nsmap
            self.nsmap_inv = {uri: prefix for prefix, uri in _nsmap.items()}

        except Exception:  # deliberate best-effort, but let KeyboardInterrupt/SystemExit through
            self.xml = None
            self.nsmap = None
            self.nsmap_inv = None
def df_union_all(dfs):
    """
    Function to merge list of DataFrames

    The frames are folded left to right with `unionAll`, so the result holds
    the rows of every input in input order. The fold is iterative, so the
    number of DataFrames is not limited by Python's recursion depth.

    Args:
        dfs (iterable): one or more objects exposing `unionAll(other)`

    Returns:
        The single input unchanged when only one is given, otherwise the
        result of chaining `unionAll` across all inputs.

    Raises:
        IndexError: if `dfs` is empty
    """
    frames = list(dfs)
    if not frames:
        raise IndexError('df_union_all requires at least one DataFrame')

    merged = frames[0]
    for frame in frames[1:]:
        merged = merged.unionAll(frame)
    return merged