In [0]:

%run "./Classroom-Setup"

In [0]:
%run "./Delta-Lab-1-Prep"

In [0]:
%python

sqlContext.setConf("spark.sql.shuffle.partitions", "8")

In [0]:
# Locations under the user's home directory: the source Delta data and the
# per-customer aggregate that this cell (re)builds.
DeltaPath = userhome + "/delta/customer-data/"
CustomerCountsPath = userhome + "/delta/customer_counts/"

# Clear any previous aggregate output (True = remove recursively).
dbutils.fs.rm(CustomerCountsPath, True)

# Stage 1: read the source Delta data.
customerDataDF = spark.read.format("delta").load(DeltaPath)

# Stage 2: one row per (CustomerID, Country) with the number of source rows,
# exposed as `total_orders`.
customerCountsDF = (customerDataDF
  .groupBy("CustomerID", "Country")
  .count()
  .withColumnRenamed("count", "total_orders"))

# Stage 3: write the aggregate as Delta, partitioned by Country.
customerCountsWriter = (customerCountsDF.write
  .format("delta")
  .mode("overwrite")
  .partitionBy("Country"))
customerCountsWriter.save(CustomerCountsPath)

In [0]:
# (Re)register `customer_counts` as a Delta table backed by the files at
# CustomerCountsPath: drop any existing definition, then create it again.
dropCustomerCountsSql = """
  DROP TABLE IF EXISTS customer_counts
"""
createCustomerCountsSqlTemplate = """
  CREATE TABLE customer_counts
  USING DELTA
  LOCATION '{}'
"""

spark.sql(dropCustomerCountsSql)
# The `{}` placeholder receives the Delta location of the aggregate.
spark.sql(createCustomerCountsSqlTemplate.format(CustomerCountsPath))

In [0]:
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType

# Explicit schema for the incoming retail records, as (column name, Spark type)
# pairs in declaration order. Every column is declared nullable.
inputColumns = [
  ("InvoiceNo", IntegerType()),
  ("StockCode", StringType()),
  ("Description", StringType()),
  ("Quantity", IntegerType()),
  ("InvoiceDate", StringType()),
  ("UnitPrice", DoubleType()),
  ("CustomerID", IntegerType()),
  ("Country", StringType()),
]

inputSchema = StructType([
  StructField(columnName, columnType, True)
  for columnName, columnType in inputColumns
])

In [0]:
# CSV file holding the new batch of records.
newDataPath = "/mnt/training/online_retail/outdoor-products/outdoor-products-small.csv"

# Start from a clean slate so saveAsTable below can create the table afresh.
spark.sql("DROP TABLE IF EXISTS new_customer_counts")

# Read the CSV with the explicit schema; the first line of the file is a header.
newDataReader = spark.read.schema(inputSchema).option("header", "true")
newDataDF = newDataReader.csv(newDataPath)

# Aggregate the new batch the same way as `customer_counts`: one row per
# (CustomerID, Country) with the row count exposed as `total_orders`.
newCustomerCountsDF = (newDataDF
  .groupBy("CustomerID", "Country")
  .count()
  .withColumnRenamed("count", "total_orders"))

# Persist the aggregate as the `new_customer_counts` table (source of the MERGE).
newCustomerCountsDF.write.saveAsTable("new_customer_counts")

In [0]:
%sql

-- Upsert the new per-customer counts into customer_counts:
--   * a (Country, CustomerID) pair already present has its total_orders
--     increased by the new batch's count;
--   * a pair not yet present is inserted as-is.
-- Both key columns are declared nullable, and `NULL = NULL` is never true, so
-- plain `=` would treat rows with a NULL key as "not matched" and insert a
-- duplicate on every merge. The null-safe `<=>` operator makes NULL keys match
-- each other so those rows are updated like any other.
MERGE INTO customer_counts
USING new_customer_counts
ON customer_counts.Country <=> new_customer_counts.Country
AND customer_counts.CustomerID <=> new_customer_counts.CustomerID
WHEN MATCHED THEN
  UPDATE SET total_orders = customer_counts.total_orders + new_customer_counts.total_orders
WHEN NOT MATCHED THEN
  INSERT *

In [0]:
# Append the raw new records to the source Delta data at DeltaPath.
# Because the mode is "append", each run of this cell adds the rows again.
newDataWriter = newDataDF.write.format("delta").mode("append")
newDataWriter.save(DeltaPath)
