In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql.functions import sum as _sum

In [None]:
conf = SparkConf()
conf.set("spark.hadoop.fs.s3a.impl",
                 "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set("spark.hadoop.fs.s3a.access.key", "admin")
conf.set("spark.hadoop.fs.s3a.secret.key", "123456789")
conf.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
conf.set("spark.hadoop.fs.s3a.path.style.access", "true")
conf.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
conf.set('spark.hadoop.fs.s3a.aws.credentials.provider',
            'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
conf.set(
    "spark.jars.packages",
    "io.delta:delta-core_2.12:1.0.0,"
    "org.apache.hadoop:hadoop-aws:3.1.1,"
    "com.amazonaws:aws-java-sdk:1.11.271,"
    "com.amazonaws:aws-java-sdk-bundle:1.11.271,"
    "software.amazon.awssdk:url-connection-client:2.15.40",
)
conf.set("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension")
conf.set(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)
conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled","true")

spark = (
    SparkSession
    .builder
    .config(conf=conf)
    .master("local[*]")
    .getOrCreate()
)

In [None]:
orderDF = spark.read.format("delta").load("s3a://datalake/brozen/cdc.inventory.orders")
orderDF.orderBy("order_number").show()

In [None]:
productDF = spark.read.format("delta").load("s3a://datalake/brozen/cdc.inventory.products")
productDF.orderBy("id").show()

In [None]:
customerDF = spark.read.format("delta").load("s3a://datalake/brozen/cdc.inventory.customers")
customerDF.show()

In [None]:
path_table = "s3a://datalake/brozen/cdc.inventory.orders"
df = spark.read.format("delta").option("versionAsOf", 0).load(path_table)
df.show()

In [None]:
delta_table = DeltaTable.forPath(spark, "s3a://datalake/brozen/cdc.inventory.orders")
history = delta_table.history()
history.show()

In [None]:
df = spark.read.format("delta").option("versionAsOf", 0).load("s3a://datalake/brozen/cdc.inventory.orders")
df.show()

In [None]:
data = [(10004, 16852, 1003, 2000, 107, "u")]
columns = ["order_number", "order_date", "purchaser", "quantity", "product_id", "op"]
df = spark.createDataFrame(data, schema=columns)
df.show()

In [None]:
orderDF = spark.readStream.option("ignoreChanges", "true").format("delta").load("s3a://datalake/brozen/cdc.inventory.orders")
productDF = spark.readStream.option("ignoreChanges", "true").format("delta").load("s3a://datalake/brozen/cdc.inventory.products")
customerDF = spark.readStream.option("ignoreChanges", "true").format("delta").load("s3a://datalake/brozen/cdc.inventory.customers")

# orderDF.join(customerDF, customerDF.id == orderDF.purchaser, "inner") \
#                     .join(productDF, productDF.id == orderDF.product_id, "inner") \
#                     .selectExpr("order_number", "order_date as order_time", "email", "purchaser", 
#                                 "name as product_name", "quantity", "weight as unit_price") \
orderDF.writeStream.format("delta").option("checkpointLocation", "s3a://datalake/sliver/checkpoint/aggTable").outputMode("append").start("s3a://datalake/sliver/aggTable")

In [None]:
df = spark.read.format("delta").load("s3a://datalake/sliver/aggTable")
df.show()

In [None]:
delta_table = DeltaTable.forPath(spark, "s3a://datalake/sliver/aggTable")
history = delta_table.history()
history.show()

In [None]:
df = spark.read.format("delta").load("s3a://datalake/sliver/calTable")
df.count()

In [32]:
orderDF = spark.readStream.option("ignoreChanges", "true").format("delta").load("s3a://datalake/brozen/cdc.inventory.orders")
productDF = spark.readStream.option("ignoreChanges", "true").format("delta").load("s3a://datalake/brozen/cdc.inventory.products")
customerDF = spark.readStream.option("ignoreChanges", "true").format("delta").load("s3a://datalake/brozen/cdc.inventory.customers")

joinDF = orderDF.join(customerDF, customerDF.id == orderDF.purchaser, "inner") \
                    .join(productDF, productDF.id == orderDF.product_id, "inner") \
                    .selectExpr("order_number", "order_date as order_time", "email", "purchaser", 
                                "name as product_name", "quantity", "weight as unit_price")

calDF = joinDF.withColumn(
  	    "total_price", joinDF.quantity * joinDF.unit_price)

total_spent_DF = calDF \
        .groupBy("email") \
        .agg(_sum("total_price")) 

# product_DF = calDF \
#             .groupBy("product_name") \
#             .agg(
#                 _sum("quantity").alias("products_selled"), 
#                 _sum("total_price").alias("total_price")
#             )

# product_price = calDF \
#             .groupBy("order_time","product_name") \
#             .agg(
#                 avg("unit_price").alias("ave_unit_price")
            # )

calDF.writeStream.format("delta").option("checkpointLocation", "s3a://datalake/sliver/checkpoint/calTable") \
                    .outputMode("append").start("s3a://datalake/sliver/calTable")

total_spent_DF.writeStream.format("delta").option("checkpointLocation", "s3a://datalake/sliver/checkpoint/total_spent_Df") \
                    .outputMode("append").start("s3a://datalake/sliver/total_spent_Df")

# product_DF.writeStream.format("delta").option("checkpointLocation", "s3a://datalake/sliver/checkpoint/product_DF") \
#                     .outputMode("append").start("s3a://datalake/sliver/product_DF")

# product_price.writeStream.format("delta").option("checkpointLocation", "s3a://datalake/sliver/checkpoint/product_price") \
#                     .outputMode("append").start("s3a://datalake/sliver/product_price")

23/03/17 15:10:00 WARN StreamingQueryManager: Stopping existing streaming query [id=c5f28b59-15cf-45b9-a5da-233245351362, runId=3c4bb777-cd3f-4b70-abd0-3d42989ac196], as a new run is being started.


AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
Aggregate [email#23682], [email#23682, sum(cast(total_price#23753 as double)) AS sum(total_price)#23771]
+- Project [order_number#23647, order_time#23743, email#23682, purchaser#23649, product_name#23744, quantity#23650, unit_price#23745, (cast(quantity#23650 as float) * unit_price#23745) AS total_price#23753]
   +- Project [order_number#23647, order_date#23648 AS order_time#23743, email#23682, purchaser#23649, name#23665 AS product_name#23744, quantity#23650, weight#23667 AS unit_price#23745]
      +- Join Inner, (id#23664 = product_id#23651)
         :- Join Inner, (id#23679 = purchaser#23649)
         :  :- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@6f692efb,delta,List(),None,List(),None,Map(ignoreChanges -> true, path -> s3a://datalake/brozen/cdc.inventory.orders),None), delta, [order_number#23647, order_date#23648, purchaser#23649, quantity#23650, product_id#23651, op#23652]
         :  +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@6f692efb,delta,List(),None,List(),None,Map(ignoreChanges -> true, path -> s3a://datalake/brozen/cdc.inventory.customers),None), delta, [id#23679, first_name#23680, last_name#23681, email#23682, op#23683]
         +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@6f692efb,delta,List(),None,List(),None,Map(ignoreChanges -> true, path -> s3a://datalake/brozen/cdc.inventory.products),None), delta, [id#23664, name#23665, description#23666, weight#23667, op#23668]


23/03/17 15:15:22 ERROR MicroBatchExecution: Query [id = 28dc1a50-aeaa-4cd8-9e82-9cd39dafa6bf, runId = 00879f97-05e2-4835-bd3b-93c1f141cc21] terminated with error
java.io.IOException: rename from s3a://datalake/sliver/checkpoint/calTable/offsets/.1.fe339b9c-6b90-4694-82b3-1290ff2ae67d.tmp to s3a://datalake/sliver/checkpoint/calTable/offsets/1 failed.
	at org.apache.hadoop.fs.FileSystem.rename(FileSystem.java:1548)
	at org.apache.hadoop.fs.DelegateToFileSystem.renameInternal(DelegateToFileSystem.java:204)
	at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:769)
	at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
	at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:335)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager