# save.py
from kombu import Connection, Exchange, Queue, Producer
from packaging import version
from pymongo import MongoClient
from pyspark.sql import DataFrame

from optimus.helpers.columns import parse_columns
from optimus.helpers.decorators import *
from optimus.helpers.logger import logger
from optimus.spark import Spark


def save(self):
    @add_attr(save)
    def json(path, mode="overwrite", num_partitions=1):
        """
        Save the dataframe as a JSON file.
        :param path: path where the dataframe will be saved.
        :param mode: Specifies the behavior of the save operation when data already exists.
                "append": Append contents of this DataFrame to existing data.
                "overwrite" (default case): Overwrite existing data.
                "ignore": Silently ignore this operation if data already exists.
                "error": Throw an exception if data already exists.
        :param num_partitions: the number of partitions of the DataFrame
        :return:
        """
        try:
            # na.fill forces null-valued keys to appear in the JSON output
            self.na.fill("") \
                .repartition(num_partitions) \
                .write \
                .format("json") \
                .mode(mode) \
                .save(path)
        except IOError as e:
            logger.print(e)
            raise
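
    # A minimal usage sketch (``df`` is assumed to be a Spark dataframe with the
    # Optimus accessors attached; the output path is hypothetical). Spark writes
    # a directory of part files at the given path:
    #   df.save.json("data/people_json", mode="append", num_partitions=4)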

    @add_attr(save)
    def csv(path, header="true", mode="overwrite", sep=",", num_partitions=1):
        """
        Save the dataframe as a CSV file.
        :param path: path where the dataframe will be saved.
        :param header: "true" or "false" to include the column names as a header row
        :param mode: Specifies the behavior of the save operation when data already exists.
                "append": Append contents of this DataFrame to existing data.
                "overwrite" (default case): Overwrite existing data.
                "ignore": Silently ignore this operation if data already exists.
                "error": Throw an exception if data already exists.
        :param sep: sets a single character as the separator for each field and value. If None is set,
                it uses the default value.
        :param num_partitions: the number of partitions of the DataFrame
        :return: Dataframe written in CSV format to the specified path.
        """
        try:
            df = self

            # Cast types the CSV writer cannot handle directly to string
            columns = parse_columns(self, "*", filter_by_column_dtypes=["date", "array", "vector", "binary", "null"])
            df = df.cols.cast(columns, "str").repartition(num_partitions)

            # Save to csv
            df.write.options(header=header).mode(mode).csv(path, sep=sep)
        except IOError as error:
            logger.print(error)
            raise
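
    # A minimal usage sketch (hypothetical dataframe ``df`` and output path);
    # the header flag is passed as the string "true"/"false", matching the default:
    #   df.save.csv("data/people_csv", header="true", sep=";", num_partitions=1)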

    @add_attr(save)
    def parquet(path, mode="overwrite", num_partitions=1):
        """
        Save the dataframe as a Parquet file.
        :param path: path where the dataframe will be saved.
        :param mode: Specifies the behavior of the save operation when data already exists.
                "append": Append contents of this DataFrame to existing data.
                "overwrite" (default case): Overwrite existing data.
                "ignore": Silently ignore this operation if data already exists.
                "error": Throw an exception if data already exists.
        :param num_partitions: the number of partitions of the DataFrame
        :return:
        """
        # These characters are not allowed in Parquet column names, so they are replaced with "_"
        invalid_character = [" ", ",", ";", "{", "}", "(", ")", "\n", "\t", "="]

        def func(col_name):
            for i in invalid_character:
                col_name = col_name.replace(i, "_")
            return col_name

        df = self.cols.rename(func)

        # Use the renamed dataframe here so the cast targets the sanitized column names
        columns = parse_columns(df, "*", filter_by_column_dtypes=["null"])
        df = df.cols.cast(columns, "str")

        try:
            df.coalesce(num_partitions) \
                .write \
                .mode(mode) \
                .parquet(path)
        except IOError as e:
            logger.print(e)
            raise
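
    # A minimal usage sketch (hypothetical path). Because of the renaming above,
    # a column such as "first name" is written as "first_name":
    #   df.save.parquet("data/people_parquet", num_partitions=8)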

    @add_attr(save)
    def avro(path, mode="overwrite", num_partitions=1):
        """
        Save the dataframe as an Avro file.
        :param path: path where the dataframe will be saved.
        :param mode: Specifies the behavior of the save operation when data already exists.
                "append": Append contents of this DataFrame to existing data.
                "overwrite" (default case): Overwrite existing data.
                "ignore": Silently ignore this operation if data already exists.
                "error": Throw an exception if data already exists.
        :param num_partitions: the number of partitions of the DataFrame
        :return:
        """
        try:
            # Spark < 2.4 relies on the external Databricks Avro package;
            # from Spark 2.4 on the data source is registered under the short name "avro"
            if version.parse(Spark.instance.spark.version) < version.parse("2.4"):
                avro_version = "com.databricks.spark.avro"
            else:
                avro_version = "avro"

            self.coalesce(num_partitions) \
                .write.format(avro_version) \
                .mode(mode) \
                .save(path)
        except IOError as e:
            logger.print(e)
            raise
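
    # A minimal usage sketch (hypothetical path; the matching spark-avro package
    # must be available on the cluster classpath):
    #   df.save.avro("data/people_avro", mode="overwrite")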

    @add_attr(save)
    def rabbit_mq(host, exchange_name=None, queue_name=None, routing_key=None, parallelism=None):
        """
        Send a dataframe to a RabbitMQ queue
        # https://medium.com/python-pandemonium/talking-to-rabbitmq-with-python-and-kombu-6cbee93b1298
        # https://medium.com/python-pandemonium/building-robust-rabbitmq-consumers-with-python-and-kombu-part-1-ccd660d17271
        :param host: RabbitMQ broker host or connection URL
        :param exchange_name: exchange the messages are published to
        :param queue_name: queue bound to the exchange
        :param routing_key: routing key used to bind the queue and publish the messages
        :param parallelism: number of partitions to coalesce the dataframe to before publishing
        :return:
        """
        df = self
        if parallelism:
            df = df.coalesce(parallelism)

        def _rabbit_mq(messages):
            # Each partition opens its own connection and publishes its rows
            conn = Connection(host)
            channel = conn.channel()

            exchange = Exchange(exchange_name, type="direct")
            queue = Queue(name=queue_name, exchange=exchange, routing_key=routing_key)
            queue.maybe_bind(conn)
            queue.declare()

            producer = Producer(exchange=exchange, channel=channel, routing_key=routing_key)
            for message in messages:
                # as_dict = message.asDict(recursive=True)
                producer.publish(message)

            channel.close()
            conn.release()
            return messages

        # Use the (possibly coalesced) dataframe so the parallelism setting takes effect
        df.rdd.mapPartitions(_rabbit_mq).count()
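
    # A hedged usage sketch (broker URL, exchange and queue names are hypothetical):
    #   df.save.rabbit_mq("amqp://guest:guest@localhost//",
    #                     exchange_name="events", queue_name="events",
    #                     routing_key="events", parallelism=2)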

    @add_attr(save)
    def mongo(host, port=None, db_name=None, collection_name=None, parallelism=None):
        """
        Send a dataframe to a MongoDB collection
        :param host: MongoDB server host
        :param port: MongoDB server port
        :param db_name: database that holds the target collection
        :param collection_name: collection the rows are inserted into
        :param parallelism: number of partitions to coalesce the dataframe to before inserting
        :return:
        """
        df = self
        if parallelism:
            df = df.coalesce(parallelism)

        def _mongo(messages):
            # Each partition opens its own client and inserts its rows one by one
            client = MongoClient(host, port)
            db = client[db_name]
            collection = db[collection_name]

            for message in messages:
                as_dict = message.asDict(recursive=True)
                collection.insert_one(as_dict)

            client.close()
            return messages

        df.rdd.mapPartitions(_mongo).count()
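
    # A hedged usage sketch (host, database and collection names are hypothetical):
    #   df.save.mongo("localhost", port=27017, db_name="optimus_db",
    #                 collection_name="people", parallelism=2)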

    return save


DataFrame.save = property(save)
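
# Exposing save as a property means every access to df.save calls save(df),
# which rebuilds the namespace of writers (json, csv, parquet, avro, rabbit_mq,
# mongo) via @add_attr, each one closing over the dataframe it was accessed on.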