In [0]:
#Reading the order details from schema and save it as in delta table
from pyspark.sql import functions as F
orderSchemaDDL = "orderNumber int, productCode string, qty int, unitPrice double, orderLineNumber int"
orderdetailDF = spark.read.option("header","true").schema(orderSchemaDDL).csv("dbfs:/FileStore/data/cmdata/orderdetail.csv")
orderdetailDF.display()

orderdetailDF.write.format("delta").save("/output/trg2809/orderdetailDT")

orderNumber,productCode,qty,unitPrice,orderLineNumber
10100,S18_1749,30,136.0,3
10100,S18_2248,50,55.09,2
10100,S18_4409,22,75.46,4
10100,S24_3969,49,35.29,1
10101,S18_2325,25,108.06,4
10101,S18_2795,26,167.06,1
10101,S24_1937,45,32.53,3
10101,S24_2022,46,44.35,2
10102,S18_1342,39,95.55,2
10102,S18_1367,41,43.13,1


In [0]:
%fs ls /output/trg2809/orderdetailDT

path,name,size,modificationTime
dbfs:/output/trg2809/orderdetailDT/_delta_log/,_delta_log/,0,0
dbfs:/output/trg2809/orderdetailDT/part-00000-fd2d01c9-bf47-429f-89ed-83f6a7551f43-c000.snappy.parquet,part-00000-fd2d01c9-bf47-429f-89ed-83f6a7551f43-c000.snappy.parquet,24233,1664466618000


In [0]:
%fs ls /output/trg2809/orderdetailDT/_delta_log/

path,name,size,modificationTime
dbfs:/output/trg2809/orderdetailDT/_delta_log/.s3-optimization-0,.s3-optimization-0,0,1664466633000
dbfs:/output/trg2809/orderdetailDT/_delta_log/.s3-optimization-1,.s3-optimization-1,0,1664466633000
dbfs:/output/trg2809/orderdetailDT/_delta_log/.s3-optimization-2,.s3-optimization-2,0,1664466634000
dbfs:/output/trg2809/orderdetailDT/_delta_log/00000000000000000000.crc,00000000000000000000.crc,2133,1664466632000
dbfs:/output/trg2809/orderdetailDT/_delta_log/00000000000000000000.json,00000000000000000000.json,1822,1664466619000


In [0]:
%fs head /output/trg2809/orderdetailDT/_delta_log/00000000000000000000.json

In [0]:
#Read the Delta table

from delta.tables import *

orderdetailDT = DeltaTable.forPath(spark, "/output/trg2809/orderdetailDT/")
orderdetailDT.toDF().display()

orderNumber,productCode,qty,unitPrice,orderLineNumber
10100,S18_1749,30,136.0,3
10100,S18_2248,50,55.09,2
10100,S18_4409,22,75.46,4
10100,S24_3969,49,35.29,1
10101,S18_2325,25,108.06,4
10101,S18_2795,26,167.06,1
10101,S24_1937,45,32.53,3
10101,S24_2022,46,44.35,2
10102,S18_1342,39,95.55,2
10102,S18_1367,41,43.13,1


In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sales;
CREATE TABLE sales.orderdetailDT2809
USING DELTA
location '/output/trg2809/orderdetailDT/';

--Create physical metadata table in SQL


In [0]:
%sql
SELECT * FROM sales.orderdetailDT2809 LIMIT 5;
SELECT * FROM delta.`/output/trg2809/orderdetailDT/` limit 5;

orderNumber,productCode,qty,unitPrice,orderLineNumber
10100,S18_1749,30,136.0,3
10100,S18_2248,50,55.09,2
10100,S18_4409,22,75.46,4
10100,S24_3969,49,35.29,1
10101,S18_2325,25,108.06,4


In [0]:
#Delete on DELTA table
from pyspark.sql.functions import *

orderdetailDT.delete(col("orderNumber") == 10100)

In [0]:
%sql
DELETE FROM sales.orderdetailDT2809
WHERE orderNumber = 10101
AND productCode = "S24_1937";

num_affected_rows
1


In [0]:
#Update on DELTA table
from delta.tables import *
from pyspark.sql.functions import *


orderdetailDT.update(
        condition = (col("orderNumber") == 10101) & (col("productCode") == 'S18_2795'),
        set = {"qty" : lit(30)}
)


In [0]:
%sql
UPDATE sales.orderdetailDT2809
SET qty = 40
WHERE orderNumber = 10101 AND productCode = 'S18_2795'

num_affected_rows
1


In [0]:
orderdetailStageDF = spark.read.option("header","true").schema(orderSchemaDDL).csv("dbfs:/FileStore/data/cmdata/orderdetail_delta.csv")
orderdetailStageDF.display()
#orderdetailDF.write.format("delta").save("/output/trg2809/orderdetailDT")

orderNumber,productCode,qty,unitPrice,orderLineNumber
70111,S18_1749,30,136.0,3
70111,S18_2248,50,55.09,2
70111,S18_4409,22,75.46,1
10102,S18_1342,40,95.55,2
10102,S18_1367,45,43.13,1


In [0]:
#Upsert into a table using merge

orderdetailDT.alias('prod') \
  .merge(
    orderdetailStageDF.alias('stage'),
    'prod.orderNumber = stage.orderNumber and prod.productCode = stage.productCode'
  ) \
  .whenMatchedUpdate(set =
    {
      "orderNumber": "stage.orderNumber",
      "productCode": "stage.productCode",
      "qty": "stage.qty",
      "unitprice": "stage.unitPrice",
      "orderLineNumber": "stage.orderLineNumber"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "orderNumber": "stage.orderNumber",
      "productCode": "stage.productCode",
      "qty": "stage.qty",
      "unitprice": "stage.unitPrice",
      "orderLineNumber": "stage.orderLineNumber"
    }
  ) \
  .execute()

In [0]:
display(orderdetailDT.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
6,2022-09-29T16:59:45.000+0000,2498077672718400,kundan.kumar23@wipro.com,MERGE,"Map(predicate -> ((prod.orderNumber = stage.orderNumber) AND (prod.productCode = stage.productCode)), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(101824190164321),0929-154154-qn1wx3xa,5.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, executionTimeMs -> 6332, numTargetRowsInserted -> 0, scanTimeMs -> 3598, numTargetRowsUpdated -> 5, numOutputRows -> 5, numTargetChangeFilesAdded -> 0, numSourceRows -> 5, numTargetFilesRemoved -> 1, rewriteTimeMs -> 2364)",,Databricks-Runtime/10.4.x-scala2.12
5,2022-09-29T16:58:49.000+0000,2498077672718400,kundan.kumar23@wipro.com,MERGE,"Map(predicate -> ((prod.orderNumber = stage.orderNumber) AND (prod.productCode = stage.productCode)), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(101824190164321),0929-154154-qn1wx3xa,4.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 2989, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, executionTimeMs -> 7825, numTargetRowsInserted -> 3, scanTimeMs -> 4125, numTargetRowsUpdated -> 2, numOutputRows -> 2994, numTargetChangeFilesAdded -> 0, numSourceRows -> 5, numTargetFilesRemoved -> 1, rewriteTimeMs -> 3022)",,Databricks-Runtime/10.4.x-scala2.12
4,2022-09-29T16:36:12.000+0000,2498077672718400,kundan.kumar23@wipro.com,UPDATE,Map(predicate -> ((orderNumber#2650 = 10101) AND (productCode#2651 = S18_2795))),,List(101824190164321),0929-154154-qn1wx3xa,3.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 2990, numAddedChangeFiles -> 0, executionTimeMs -> 3544, scanTimeMs -> 426, numAddedFiles -> 1, numUpdatedRows -> 1, rewriteTimeMs -> 3118)",,Databricks-Runtime/10.4.x-scala2.12
3,2022-09-29T16:34:37.000+0000,2498077672718400,kundan.kumar23@wipro.com,UPDATE,Map(predicate -> ((orderNumber#674 = 10101) AND (productCode#675 = S18_2795))),,List(101824190164321),0929-154154-qn1wx3xa,2.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 2990, numAddedChangeFiles -> 0, executionTimeMs -> 3015, scanTimeMs -> 297, numAddedFiles -> 1, numUpdatedRows -> 1, rewriteTimeMs -> 2674)",,Databricks-Runtime/10.4.x-scala2.12
2,2022-09-29T16:28:13.000+0000,2498077672718400,kundan.kumar23@wipro.com,DELETE,"Map(predicate -> [""((spark_catalog.sales.orderdetailDT2809.orderNumber = 10101) AND (spark_catalog.sales.orderdetailDT2809.productCode = 'S24_1937'))""])",,List(101824190164321),0929-154154-qn1wx3xa,1.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 2991, numAddedChangeFiles -> 0, executionTimeMs -> 3582, numDeletedRows -> 1, scanTimeMs -> 1973, numAddedFiles -> 1, rewriteTimeMs -> 1609)",,Databricks-Runtime/10.4.x-scala2.12
1,2022-09-29T16:24:45.000+0000,2498077672718400,kundan.kumar23@wipro.com,DELETE,"Map(predicate -> [""(orderNumber = 10100)""])",,List(101824190164321),0929-154154-qn1wx3xa,0.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 2992, numAddedChangeFiles -> 0, executionTimeMs -> 4413, numDeletedRows -> 4, scanTimeMs -> 2457, numAddedFiles -> 1, rewriteTimeMs -> 1949)",,Databricks-Runtime/10.4.x-scala2.12
0,2022-09-29T15:50:19.000+0000,2498077672718400,kundan.kumar23@wipro.com,WRITE,"Map(mode -> ErrorIfExists, partitionBy -> [])",,List(101824190164321),0929-154154-qn1wx3xa,,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 2996, numOutputBytes -> 24233)",,Databricks-Runtime/10.4.x-scala2.12


In [0]:
%fs ls /FileStore/data/cmstream

path,name,size,modificationTime
dbfs:/FileStore/data/cmstream/orderdetail.csv,orderdetail.csv,79571,1664210449000
dbfs:/FileStore/data/cmstream/orderdetail1.csv,orderdetail1.csv,79571,1664210459000
dbfs:/FileStore/data/cmstream/orderdetail10.csv,orderdetail10.csv,79571,1664210456000
dbfs:/FileStore/data/cmstream/orderdetail11.csv,orderdetail11.csv,79571,1664210457000
dbfs:/FileStore/data/cmstream/orderdetail12.csv,orderdetail12.csv,79571,1664210453000
dbfs:/FileStore/data/cmstream/orderdetail13.csv,orderdetail13.csv,79571,1664210458000
dbfs:/FileStore/data/cmstream/orderdetail14.csv,orderdetail14.csv,79571,1664210457000
dbfs:/FileStore/data/cmstream/orderdetail15.csv,orderdetail15.csv,79571,1664210455000
dbfs:/FileStore/data/cmstream/orderdetail16.csv,orderdetail16.csv,79571,1664210459000
dbfs:/FileStore/data/cmstream/orderdetail17.csv,orderdetail17.csv,79571,1664210450000


In [0]:
#https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts

from pyspark.sql.functions import *
orderSchemaDDL = "orderNumber int, productCode string, qty int, unitPrice double, orderLineNumber int"
orderdetailStreamDF =(spark
                      .readStream
                      .option("header","true")
                      .option("maxFileParTrigger", 1)
                      .schema(orderSchemaDDL)
                      .csv("dbfs:/FileStore/data/cmstream")
                     )



In [0]:
productDF = spark.read.option("header","true").option("inferSchema","true").csv("/FileStore/data/cmdata/product.csv")
orderSummaryStreamDF = (orderdetailStreamDF
                                .join(productDF, "productCode", "inner")
                                .select("orderNumber", "productCode", "productLine", "qty",  "unitPrice","MSRP")
                       )

In [0]:
orderStream = (orderSummaryStreamDF
               .writeStream
               .format("Delta")
               .option("checkpointLocation", "output/trg2809V1/orderdetailDT/check")
               .outputMode("append")
               .start("/output/trg2809V1/orderdetailDT/out")
              )

In [0]:
orderStream.awaitTermination(5)
orderStream.stop()

In [0]:
%fs ls /output/trg2809V1/orderdetailDT/out

path,name,size,modificationTime
dbfs:/output/trg2809V1/orderdetailDT/out/_delta_log/,_delta_log/,0,0
dbfs:/output/trg2809V1/orderdetailDT/out/part-00000-1a6573c8-f175-4d54-890f-93d464620d5f-c000.snappy.parquet,part-00000-1a6573c8-f175-4d54-890f-93d464620d5f-c000.snappy.parquet,41671,1664472779000
dbfs:/output/trg2809V1/orderdetailDT/out/part-00001-27fd4765-7ff1-4997-a901-c838114eb0d5-c000.snappy.parquet,part-00001-27fd4765-7ff1-4997-a901-c838114eb0d5-c000.snappy.parquet,41671,1664472779000
dbfs:/output/trg2809V1/orderdetailDT/out/part-00002-836cbbcc-1e9d-483a-83bb-a3e958b80128-c000.snappy.parquet,part-00002-836cbbcc-1e9d-483a-83bb-a3e958b80128-c000.snappy.parquet,41671,1664472779000
dbfs:/output/trg2809V1/orderdetailDT/out/part-00003-ce2ba823-4e8a-446f-954a-59a2d584d303-c000.snappy.parquet,part-00003-ce2ba823-4e8a-446f-954a-59a2d584d303-c000.snappy.parquet,41671,1664472779000
dbfs:/output/trg2809V1/orderdetailDT/out/part-00004-dba3f056-62db-4694-93fe-f2076e89c3ee-c000.snappy.parquet,part-00004-dba3f056-62db-4694-93fe-f2076e89c3ee-c000.snappy.parquet,41671,1664472779000
dbfs:/output/trg2809V1/orderdetailDT/out/part-00005-ca5c9d47-0926-4dab-ba84-aa4d9bc924da-c000.snappy.parquet,part-00005-ca5c9d47-0926-4dab-ba84-aa4d9bc924da-c000.snappy.parquet,41671,1664472779000
dbfs:/output/trg2809V1/orderdetailDT/out/part-00006-4aafb2f0-f4f0-42a0-9851-1f289c7248b0-c000.snappy.parquet,part-00006-4aafb2f0-f4f0-42a0-9851-1f289c7248b0-c000.snappy.parquet,41671,1664472779000
dbfs:/output/trg2809V1/orderdetailDT/out/part-00007-bce20963-fffc-4c62-903c-46fd44652387-c000.snappy.parquet,part-00007-bce20963-fffc-4c62-903c-46fd44652387-c000.snappy.parquet,26968,1664472777000


In [0]:
%fs ls /output/trg2809V1/orderdetailDT/out/_delta_log

path,name,size,modificationTime
dbfs:/output/trg2809V1/orderdetailDT/out/_delta_log/.s3-optimization-0,.s3-optimization-0,0,1664472783000
dbfs:/output/trg2809V1/orderdetailDT/out/_delta_log/.s3-optimization-1,.s3-optimization-1,0,1664472783000
dbfs:/output/trg2809V1/orderdetailDT/out/_delta_log/.s3-optimization-2,.s3-optimization-2,0,1664472784000
dbfs:/output/trg2809V1/orderdetailDT/out/_delta_log/00000000000000000000.crc,00000000000000000000.crc,2208,1664472783000
dbfs:/output/trg2809V1/orderdetailDT/out/_delta_log/00000000000000000000.json,00000000000000000000.json,7047,1664472780000
