Skip to content

Commit

Permalink
Merge pull request #1 from sptramer/pep8
Browse files Browse the repository at this point in the history
Run PEP8 on content
  • Loading branch information
BrianPeek committed Oct 15, 2019
2 parents 61439e7 + 4491b05 commit 414dcb9
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions avro_sample.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Databricks notebook source

from pyspark.sql.types import *
from pyspark.sql.functions import *
dbutils.widgets.text("year", "2019", "Year")
dbutils.widgets.text("month", "03", "Month")
dbutils.widgets.text("day", "01", "Day")
Expand All @@ -12,8 +14,6 @@

# COMMAND ----------

from pyspark.sql.functions import *
from pyspark.sql.types import *

# COMMAND ----------

Expand All @@ -22,7 +22,8 @@
# COMMAND ----------

# Path format: partition/year/month/day/hour/minute/
input_loc = "wasbs://ehcapture-analytics@storagenrtanalytics.blob.core.windows.net/ehnrtanalytics/ehnrtanalytics-output/*/{}/{}/{}/*/*/".format(y, m, d)
input_loc = "wasbs://ehcapture-analytics@storagenrtanalytics.blob.core.windows.net/ehnrtanalytics/ehnrtanalytics-output/*/{}/{}/{}/*/*/".format(
y, m, d)


# COMMAND ----------
Expand All @@ -47,19 +48,23 @@

# COMMAND ----------

df2.write.json('wasbs://ehcapture-analytics@storagenrtanalytics.blob.core.windows.net/temp_out/', mode="overwrite")
df2.write.json(
'wasbs://ehcapture-analytics@storagenrtanalytics.blob.core.windows.net/temp_out/', mode="overwrite")

# COMMAND ----------

dbutils.fs.ls("wasbs://ehcapture-analytics@storagenrtanalytics.blob.core.windows.net/temp_out/")
dbutils.fs.ls(
"wasbs://ehcapture-analytics@storagenrtanalytics.blob.core.windows.net/temp_out/")

# COMMAND ----------

dfjs = spark.read.json("wasbs://ehcapture-analytics@storagenrtanalytics.blob.core.windows.net/temp_out/")
dfjs = spark.read.json(
"wasbs://ehcapture-analytics@storagenrtanalytics.blob.core.windows.net/temp_out/")

# COMMAND ----------

schema = StructType([StructField("version", StringType()), StructField("userid", StringType()), StructField("platform", StringType())])
schema = StructType([StructField("version", StringType()), StructField(
"userid", StringType()), StructField("platform", StringType())])
dfx = dfjs.select(from_json("Body", schema).alias("B"))

# COMMAND ----------
Expand All @@ -80,9 +85,8 @@

# COMMAND ----------

out_loc = "wasbs://ehcapture-analytics@storagenrtanalytics.blob.core.windows.net/csv_out/{}/{}/{}".format(year, month, day)
out_loc = "wasbs://ehcapture-analytics@storagenrtanalytics.blob.core.windows.net/csv_out/{}/{}/{}".format(
year, month, day)
dfx2.write.csv(out_loc, header=True, mode="overwrite")

# COMMAND ----------


0 comments on commit 414dcb9

Please sign in to comment.