-
Notifications
You must be signed in to change notification settings - Fork 0
/
avro_sample.py
92 lines (53 loc) · 2 KB
/
avro_sample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# Databricks notebook source
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Notebook parameters selecting which capture day to process.
# Defaults are zero-padded to match the year/month/day folder names in the capture paths.
for _widget_name, _widget_default, _widget_label in (
    ("year", "2019", "Year"),
    ("month", "03", "Month"),
    ("day", "01", "Day"),
):
    dbutils.widgets.text(_widget_name, _widget_default, _widget_label)
# COMMAND ----------
# Read the selected capture date back from the text widgets; y/m/d feed every path below.
y, m, d = (dbutils.widgets.get(_key) for _key in ("year", "month", "day"))
# COMMAND ----------
# COMMAND ----------
# Root folder written by Event Hubs Capture for this hub.
capture_root = "wasbs://ehcapture-analytics@storagenrtanalytics.blob.core.windows.net/ehnrtanalytics/ehnrtanalytics-output"
# Sanity check: list what partition 1 captured on the selected day. The path is built from the
# widgets (y/m/d) rather than a hard-coded timestamp, so it matches the data loaded below.
dbutils.fs.ls("{}/1/{}/{}/{}".format(capture_root, y, m, d))
# COMMAND ----------
# Path format: partition/year/month/day/hour/minute/
# Wildcards select every partition, hour and minute for the chosen day.
input_loc = "{}/*/{}/{}/{}/*/*/".format(capture_root, y, m, d)
# COMMAND ----------
# Load every Avro capture file matched by input_loc for the selected day.
df = spark.read.load(input_loc, format="avro")
# COMMAND ----------
df.printSchema()
# COMMAND ----------
df.count()
# COMMAND ----------
# Body presumably holds the raw (binary) event payload; cast it to text for parsing.
df2 = df.select(df["Body"].cast("string"))
# COMMAND ----------
display(df2)
# COMMAND ----------
# Write the decoded bodies as JSON to a scratch folder, then read them back as dfjs.
# NOTE(review): temp_out is not date-scoped and is overwritten on every run, and from_json
# could likely be applied to df2.Body directly without this write/read. Confirm before changing.
temp_out = "wasbs://ehcapture-analytics@storagenrtanalytics.blob.core.windows.net/temp_out/"
df2.write.mode("overwrite").json(temp_out)
# COMMAND ----------
dbutils.fs.ls(temp_out)
# COMMAND ----------
dfjs = spark.read.json(temp_out)
# COMMAND ----------
# Fields extracted from each JSON body; every field is read as a nullable string.
schema = StructType([StructField(field_name, StringType())
                     for field_name in ("version", "userid", "platform")])
dfx = dfjs.select(from_json(dfjs["Body"], schema).alias("B"))
# COMMAND ----------
display(dfx)
# COMMAND ----------
# Flatten the parsed struct into top-level version/userid/platform columns.
dfx2 = dfx.select("B.*")
# COMMAND ----------
display(dfx2)
# COMMAND ----------
# Number of events per platform for the selected day.
platform_counts = dfx2.groupBy("platform").count()
display(platform_counts)
# COMMAND ----------
# Write the parsed events for the selected day as CSV under csv_out/<year>/<month>/<day>.
# The widget values are bound to y/m/d when they are read near the top of the notebook;
# there are no year/month/day variables, so using those names would raise NameError.
out_loc = "wasbs://ehcapture-analytics@storagenrtanalytics.blob.core.windows.net/csv_out/{}/{}/{}".format(
    y, m, d)
dfx2.write.csv(out_loc, header=True, mode="overwrite")
# COMMAND ----------