In [0]:
import random
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
db = "sales_db"

# Recreate the demo database from scratch. CASCADE is needed on re-runs: the
# `sales` table written at the end of this notebook lives in this database, and a
# plain DROP DATABASE (RESTRICT by default) refuses to drop a non-empty database.
spark.sql(f"DROP DATABASE IF EXISTS {db} CASCADE")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
spark.sql(f"USE {db}")

# Databricks/Delta session settings.
# NOTE(review): presumably disables Delta's guard against reading a Delta
# directory with a non-Delta format (e.g. parquet) — confirm it is still needed.
spark.sql("SET spark.databricks.delta.formatCheck.enabled = false")
# Default `autoOptimize.optimizeWrite` table property for Delta tables created in
# this session (such as the `sales` table saved below).
spark.sql("SET spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true")

In [0]:
# Scratch locations for this demo: where the streaming sink writes parquet
# files, and where the streaming query keeps its checkpoint.
_stream_root = "/tmp/Streaming"
output_path = f"{_stream_root}/test_parquet/"
checkpoint_path = f"{_stream_root}/test-checkpoint"

In [0]:
# Start from a clean slate: wipe and recreate the sink folder and then the
# checkpoint folder, and drop any previous copy of the output table.
for target_dir in (output_path, checkpoint_path):
    dbutils.fs.rm(target_dir, True)
    dbutils.fs.mkdirs(target_dir)

stale_tables = ("sales_db.sales",)
for table in stale_tables:
    spark.sql(f"DROP TABLE IF EXISTS {table}")

In [0]:
def stop_all_streams(session=None):
    """Stop every active Structured Streaming query and report the outcome.

    Args:
        session: Spark session whose active streaming queries are stopped.
            Defaults to the notebook's global ``spark``, so the existing
            ``stop_all_streams()`` calls keep working.

    Each query is stopped independently: if one fails to stop, the error is
    printed and the remaining queries are still stopped (best-effort).
    """
    session = spark if session is None else session
    failures = 0
    for stream in session.streams.active:
        # Prefer the human-readable query name; fall back to the id for
        # queries started without .queryName(...).
        label = stream.name or stream.id
        try:
            stream.stop()
        except Exception as exc:  # keep going so one bad query can't block the rest
            failures += 1
            print(f"Stream {label} could not be stopped: {exc}")
        else:
            # Only report success after stop() actually returned.
            print(f"Stream {label} was stopped")
    if failures:
        print(f"{failures} stream(s) could not be stopped")
    else:
        print("All streams were stopped")

In [0]:
@udf(returnType=StringType())
def random_region(val1="r", val2=4):
    """Return a random region label: ``val1`` followed by an integer in ``[1, val2)``.

    With the defaults this is one of ``"r1"``, ``"r2"``, ``"r3"``.
    """
    return f"{val1}{random.randrange(1, val2)}"


# Spark assumes UDFs are deterministic and may eliminate or repeat calls during
# optimization; this one returns random values, so mark it non-deterministic.
random_region = random_region.asNondeterministic()

In [0]:
@udf(returnType=StringType())
def random_manager(val1="m", val2=7):
    """Return a random manager label: ``val1`` followed by an integer in ``[1, val2)``.

    With the defaults this is one of ``"m1"`` … ``"m6"``.
    """
    return f"{val1}{random.randrange(1, val2)}"


# Random output: tell the optimizer not to treat this UDF as deterministic.
random_manager = random_manager.asNondeterministic()

In [0]:
@udf(returnType=StringType())
def random_product(val1="pr", val2=10):
    """Return a random product label: ``val1`` followed by an integer in ``[1, val2)``.

    With the defaults this is one of ``"pr1"`` … ``"pr9"``.
    """
    return f"{val1}{random.randrange(1, val2)}"


# Random output: tell the optimizer not to treat this UDF as deterministic.
random_product = random_product.asNondeterministic()

In [0]:
@udf(returnType=IntegerType())
def random_val(val1=1, val2=100):
    """Return a uniformly random integer in the closed range ``[val1, val2]``.

    Unlike the label UDFs above, the upper bound is inclusive (``random.randint``).
    """
    return random.randint(val1, val2)


# Random output: tell the optimizer not to treat this UDF as deterministic.
random_val = random_val.asNondeterministic()

In [0]:
# Synthetic sales feed built on the "rate" source (1 row per second).
# The rate source's `value` becomes `sale_id`; `timestamp` is replaced with
# current_timestamp(); the remaining columns come from the random UDFs.
stream_data = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
    .withColumn("timestamp", current_timestamp())
    .withColumn("sale_id", col("value"))
    .withColumn("region", random_region())
    .withColumn("manager", random_manager())
    .withColumn("product", random_product())
    .withColumn("val", random_val())
    .select("timestamp", "sale_id", "region", "manager", "product", "val")
)

In [0]:
# Sanity check: expected to display True, since stream_data is built from spark.readStream.
stream_data.isStreaming

In [0]:
# Show the columns selected for the stream: timestamp, sale_id, region, manager, product, val.
stream_data.printSchema()

In [0]:
# Append the synthetic rows to `output_path` as parquet, firing a micro-batch
# every 5 seconds. The query is named "sales", and its checkpoint state is
# stored under `checkpoint_path`.
sales_writer = (
    stream_data.writeStream
    .queryName("sales")
    .format("parquet")
    .outputMode("append")
    .trigger(processingTime='5 seconds')
    .option("checkpointLocation", checkpoint_path)
    .option("path", output_path)
)
query = sales_writer.start()

In [0]:
# Let the stream run long enough to commit a few 5-second micro-batches before
# stopping it. Without this, "Run All" stops the query right after start(), and
# the parquet read below may find no data files to infer a schema from.
STREAM_RUN_SECONDS = 30
query.awaitTermination(STREAM_RUN_SECONDS)
stop_all_streams()

In [0]:
# Inspect what the streaming sink wrote into output_path.
dbutils.fs.ls(output_path)

In [0]:
# Batch-read everything the stream wrote, as a regular (non-streaming) DataFrame.
df = spark.read.parquet(output_path)

In [0]:
# Preview the first 25 rows without truncating column values.
df.show(n=25, truncate=False)

In [0]:
# Schema as read back from the parquet files; compare with the streaming schema above.
df.printSchema()

In [0]:
# Persist the batch as a Delta table named `sales` in the current database
# (selected by the USE statement at the top), replacing any previous contents.
df.write.saveAsTable("sales", format="delta", mode="overwrite")

In [0]:
%sql
-- First 10 sales in sale_id order
SELECT s.*
FROM sales AS s
ORDER BY s.sale_id
LIMIT 10

In [0]:
%sql
-- Total sale value per (region, manager) pair
SELECT
  region,
  manager,
  SUM(val) AS total_val
FROM sales
GROUP BY region, manager
ORDER BY region, manager