### IMPORTS

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### READ JSON DATA (BATCH)

In [0]:
# Batch (non-streaming) version of the flattening pipeline, kept commented out
# for reference. Each step removes one level of nesting from the order JSON.
# df = spark.read.format("json")\
#             .option("inferSchema", True)\
#             .option("multiline", True)\
#             .load("/Volumes/workspace/stream/streaming/jsonsource")
# #flatten order_id and timestamp
# df = df.select("order_id", "timestamp", "customer", "items", "payment", "metadata")

# #flatten customer
# df = df.select("order_id", "timestamp", "customer.customer_id", "customer.name", "customer.email", "customer.address.city", "customer.address.country", "customer.address.postal_code", "items", "payment", "metadata")

# #explode items list
# df = df.withColumn("items", explode_outer("items"))

# #flatten items
# df = df.select("items.item_id", "items.price", "items.product_name", "items.quantity", "order_id", "timestamp", "customer_id", "name", "email", "city", "country", "postal_code", "payment", "metadata")

# #flatten payment (order_id and timestamp are selected only once to avoid duplicate columns)
# df = df.select("order_id", "timestamp", "item_id", "price", "product_name", "quantity", "customer_id", "name", "email", "city", "country", "postal_code", "payment.method", "payment.transaction_id", "metadata")

# #explode metadata list
# df = df.withColumn("metadata", explode_outer("metadata"))

# #flatten metadata
# df = df.select("*", "metadata.key", "metadata.value").drop("metadata")

# display(df)


### READ STREAMING DATA

In [0]:
# DDL schema for one order document, written one DDL line per literal.
# Adjacent string literals are concatenated by Python, so the resulting value
# is the same multi-line DDL text that is handed to `.schema(...)`.
my_schema = (
    "order_id STRING,\n"
    "timestamp STRING,\n"
    # customer: nested struct with a nested address struct
    "customer STRUCT<\n"
    "  customer_id: INT,\n"
    "  name: STRING,\n"
    "  email: STRING,\n"
    "  address: STRUCT<\n"
    "    city: STRING,\n"
    "    postal_code: STRING,\n"
    "    country: STRING\n"
    "  >\n"
    ">,\n"
    # items: array of line-item structs
    "items ARRAY<STRUCT<\n"
    "  item_id: STRING,\n"
    "  product_name: STRING,\n"
    "  quantity: INT,\n"
    "  price: DOUBLE\n"
    ">>,\n"
    "payment STRUCT<\n"
    "  method: STRING,\n"
    "  transaction_id: STRING\n"
    ">,\n"
    # metadata: array of key/value structs
    "metadata ARRAY<STRUCT<\n"
    "  key: STRING,\n"
    "  value: STRING\n"
    ">>"
)

### FLATTEN JSON
Get rid of the hierarchy (nested structs) in the JSON to make the data easier to process.

### EXPLODE ARRAY
Convert every element of an array into its own row.
- `explode()` - drops the entire row when the array is NULL or empty.
- `explode_outer()` - keeps those rows and emits a single row with NULL instead. You want this in the real world so a record isn't dropped just because one array is missing.

In [0]:
from pyspark.sql.functions import explode_outer, col

# Streaming pipeline written as one chain: read the raw order JSON with the
# explicit schema, then peel off one level of nesting per step.
df = (
    spark.readStream.format("json")
    .option("multiline", True)
    .schema(my_schema)
    .load("/Volumes/workspace/stream/streaming/jsonsource")
    # Customer: lift the struct fields (including the nested address) to
    # top-level columns with a customer_ prefix.
    .select(
        "order_id",
        "timestamp",
        col("customer.customer_id").alias("customer_id"),
        col("customer.name").alias("customer_name"),
        col("customer.email").alias("customer_email"),
        col("customer.address.city").alias("customer_city"),
        col("customer.address.country").alias("customer_country"),
        col("customer.address.postal_code").alias("customer_postal_code"),
        "items",
        "payment",
        "metadata",
    )
    # Items: one row per array element, exposed as the struct column `item`.
    .withColumn("item", explode_outer("items"))
    # Items: replace the struct with item_-prefixed scalar columns.
    .select(
        "order_id",
        "timestamp",
        "customer_id",
        "customer_name",
        "customer_email",
        "customer_city",
        "customer_country",
        "customer_postal_code",
        col("item.item_id").alias("item_id"),
        col("item.price").alias("item_price"),
        col("item.product_name").alias("item_product_name"),
        col("item.quantity").alias("item_quantity"),
        "payment",
        "metadata",
    )
    # Payment: lift the struct fields with a payment_ prefix.
    .select(
        "order_id",
        "timestamp",
        "customer_id",
        "customer_name",
        "customer_email",
        "customer_city",
        "customer_country",
        "customer_postal_code",
        "item_id",
        "item_price",
        "item_product_name",
        "item_quantity",
        col("payment.method").alias("payment_method"),
        col("payment.transaction_id").alias("payment_transaction_id"),
        "metadata",
    )
    # Metadata: one row per key/value entry. This runs after the items were
    # exploded, so each item row is repeated for every metadata entry.
    .withColumn("metadata_item", explode_outer("metadata"))
    # Metadata: keep only the flat key/value columns.
    .select(
        "order_id",
        "timestamp",
        "customer_id",
        "customer_name",
        "customer_email",
        "customer_city",
        "customer_country",
        "customer_postal_code",
        "item_id",
        "item_price",
        "item_product_name",
        "item_quantity",
        "payment_method",
        "payment_transaction_id",
        col("metadata_item.key").alias("metadata_key"),
        col("metadata_item.value").alias("metadata_value"),
    )
)

In [0]:
# Write the flattened stream to a Delta table and wait for the run to finish.
#
# .start() only launches the query and returns immediately. With
# trigger(once=True) the query stops by itself after processing the data that
# is available, so blocking on awaitTermination() is bounded. It also
# guarantees that later cells (such as the SQL query on the sink path) do not
# run before the data has been committed when the notebook is executed with
# "Run All".
#
# NOTE(review): Trigger.Once is deprecated in recent Spark releases in favour
# of .trigger(availableNow=True) - consider switching after confirming the
# runtime version.
sink_query = (
    df.writeStream.format("delta")
    .outputMode("append")
    .trigger(once=True)
    .option("path", "/Volumes/workspace/stream/streaming/jsonsink/Data")
    .option("checkpointLocation", "/Volumes/workspace/stream/streaming/jsonsink/checkpoint")
    .start()
)
sink_query.awaitTermination()



In [0]:
# # Cleaner
# WARNING: resets the demo. Recursively deletes the sink data and its checkpoint.
# Once the checkpoint is gone, the record of what has already been processed is lost.
# dbutils.fs.rm("/Volumes/workspace/stream/streaming/jsonsink/Data", recurse=True)
# dbutils.fs.rm("/Volumes/workspace/stream/streaming/jsonsink/checkpoint", recurse=True)

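`.trigger(once=True)` processes all the data that is currently available one time and then stops the query.
Without it, a streaming query usually keeps running and triggers again periodically.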

In [0]:
%sql
-- Inspect the rows written to the Delta sink by querying the table directly by its path.
SELECT * FROM delta.`/Volumes/workspace/stream/streaming/jsonsink/Data/`

### Idempotency
Spark will only process new information.
Checkpoint saves what has been processed so we don't have to do it again.

### how does it work?
- each query is assigned an id
- when a query finishes, treat it as if it had been interrupted
- when the query is run again with the same checkpoint, it picks up where the previous run was interrupted

### commits folder in checkpoint
it tracks all the files
(doesn't work for delta files as a source)
- this folder remembers which files have been read
- it remembers names, so deleting and reuploading the same file or uploading a dupe will still be ignored when processing
- it only checks if the whole file is read
- **NEVER TOUCH THIS. managed by spark and if you touch it it will break**

### triggers
#### default mode
imagine a micro batch
the next micro batch will be processed once the current one is finished

#### processing time
```
.trigger(processingTime="10 seconds")
```
Micro-batches are started on a fixed interval; at each interval Spark checks whether anything new came in.

In this case, Spark checks the source for new data and processes it. If that finishes in less than 10 seconds, it waits for the rest of the 10-second interval before checking the source folder again; if it takes longer, the next check starts as soon as the current batch is done. Then the process repeats from the beginning.

#### Once
load all the data and process all together
basically batch processing
good when starting out and testing

#### Available Now
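Load all the data that is available when the query starts, then stop.
Unlike Once, it does not force everything into one MB (micro batch): it can divide the data into a lot of smaller micro-batches to stream it through the process.
Better than ONCE.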

#### Continuous !!
most recently added to spark
only supports append only.

process data row by row
```
.trigger(continuous='1 second')
```
The period of time is the checkpoint interval (how often progress is saved to the checkpoint location). It does not stand for processing time.

### archiving source files
To keep the source folder clean, processed files are moved to Archive directory.
Only files that are processed go into archives. So, duplicates will not end up there.



# Archiving example


In [0]:
from pyspark.sql.functions import explode_outer, col

# Archiving variant of the streaming pipeline, written as one chain. The
# reader is configured with cleanSource=archive and a sourceArchiveDir; the
# flattening steps then remove one level of nesting each.
df = (
    spark.readStream.format("json")
    .option("multiline", True)
    .schema(my_schema)
    .option("cleanSource", "archive")
    .option("sourceArchiveDir", "/Volumes/workspace/stream/streaming/jsonsourcearchive")
    .load("/Volumes/workspace/stream/streaming/jsonsourcenew")
    # Customer: lift the struct fields (including the nested address) to
    # top-level columns with a customer_ prefix.
    .select(
        "order_id",
        "timestamp",
        col("customer.customer_id").alias("customer_id"),
        col("customer.name").alias("customer_name"),
        col("customer.email").alias("customer_email"),
        col("customer.address.city").alias("customer_city"),
        col("customer.address.country").alias("customer_country"),
        col("customer.address.postal_code").alias("customer_postal_code"),
        "items",
        "payment",
        "metadata",
    )
    # Items: one row per array element, exposed as the struct column `item`.
    .withColumn("item", explode_outer("items"))
    # Items: replace the struct with item_-prefixed scalar columns.
    .select(
        "order_id",
        "timestamp",
        "customer_id",
        "customer_name",
        "customer_email",
        "customer_city",
        "customer_country",
        "customer_postal_code",
        col("item.item_id").alias("item_id"),
        col("item.price").alias("item_price"),
        col("item.product_name").alias("item_product_name"),
        col("item.quantity").alias("item_quantity"),
        "payment",
        "metadata",
    )
    # Payment: lift the struct fields with a payment_ prefix.
    .select(
        "order_id",
        "timestamp",
        "customer_id",
        "customer_name",
        "customer_email",
        "customer_city",
        "customer_country",
        "customer_postal_code",
        "item_id",
        "item_price",
        "item_product_name",
        "item_quantity",
        col("payment.method").alias("payment_method"),
        col("payment.transaction_id").alias("payment_transaction_id"),
        "metadata",
    )
    # Metadata: one row per key/value entry. This runs after the items were
    # exploded, so each item row is repeated for every metadata entry.
    .withColumn("metadata_item", explode_outer("metadata"))
    # Metadata: keep only the flat key/value columns.
    .select(
        "order_id",
        "timestamp",
        "customer_id",
        "customer_name",
        "customer_email",
        "customer_city",
        "customer_country",
        "customer_postal_code",
        "item_id",
        "item_price",
        "item_product_name",
        "item_quantity",
        "payment_method",
        "payment_transaction_id",
        col("metadata_item.key").alias("metadata_key"),
        col("metadata_item.value").alias("metadata_value"),
    )
)

In [0]:
# Write the flattened stream to the Delta sink and wait for the run to finish.
#
# .start() only launches the query and returns immediately. With
# trigger(once=True) the query stops by itself after processing the data that
# is available, so blocking on awaitTermination() is bounded and makes sure
# the notebook does not move on (or end) while the write is still in flight.
#
# NOTE(review): Trigger.Once is deprecated in recent Spark releases in favour
# of .trigger(availableNow=True) - consider switching after confirming the
# runtime version.
archive_sink_query = (
    df.writeStream.format("delta")
    .outputMode("append")
    .trigger(once=True)
    .option("path", "/Volumes/workspace/stream/streaming/jsonsinknew/Data")
    .option("checkpointLocation", "/Volumes/workspace/stream/streaming/jsonsinknew/checkpoint")
    .start()
)
archive_sink_query.awaitTermination()

