# Lab 8.1: FileStreamDemo

## Tổng quan bài tập
**Đề bài**: Hãy hoàn thiện các phần `[...]` để hoàn thiện đoạn code và giải quyết bài toán theo yêu cầu.

## Tài nguyên tham khảo

Bạn có thể tải tập Dataset tại [link sau](https://drive.google.com/drive/folders/1cjoBjN2TyJ99A1NrZPNgqEkrAkPIaWDy?usp=sharing). Sau đó đưa lên Google Drive và kết nối với Colab là có thể sử dụng được. Tập dữ liệu là 3 file .json với cấu trúc như sau:
```
root
 |-- CESS: double (nullable = true)
 |-- CGST: double (nullable = true)
 |-- CashierID: string (nullable = true)
 |-- CreatedTime: long (nullable = true)
 |-- CustomerCardNo: string (nullable = true)
 |-- CustomerType: string (nullable = true)
 |-- DeliveryAddress: struct (nullable = true)
 |    |-- AddressLine: string (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- ContactNumber: string (nullable = true)
 |    |-- PinCode: string (nullable = true)
 |    |-- State: string (nullable = true)
 |-- DeliveryType: string (nullable = true)
 |-- InvoiceLineItems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ItemCode: string (nullable = true)
 |    |    |-- ItemDescription: string (nullable = true)
 |    |    |-- ItemPrice: double (nullable = true)
 |    |    |-- ItemQty: long (nullable = true)
 |    |    |-- TotalValue: double (nullable = true)
 |-- InvoiceNumber: string (nullable = true)
 |-- NumberOfItems: long (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- PosID: string (nullable = true)
 |-- SGST: double (nullable = true)
 |-- StoreID: string (nullable = true)
 |-- TaxableAmount: double (nullable = true)
 |-- TotalAmount: double (nullable = true)

```


## Chuẩn bị

Cài đặt miniconda3 để tạo kernel cho jupyter notebook.

Tạo môi trường .conda để chạy jupyter ( conda đang chạy Python 3.12.1, pyspark 3.5.0).

# FileStreamDemo

Bạn sẽ cần khởi tạo 1 SparkSesson để có thể bắt đầu Spark.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession \
    .builder \
    .appName("File Streaming Demo") \
    .master("local[3]") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

sc = spark.sparkContext

Bạn hãy viết một chương trình Spark Streamming để đọc dữ liệu từ các file đó và trích xuất dữ liệu từ các trường `InvoiceLineItems` về dạng dữ liệu có cấu trúc như sau:
```
root
 |-- ItemCode: string (nullable = true)
 |-- ItemDescription: string (nullable = true)
 |-- ItemPrice: double (nullable = true)
 |-- ItemQty: long (nullable = true)
 |-- TotalValue: double (nullable = true)
```
Ví dụ:
```
+--------+-----------------+---------+-------+----------+
|ItemCode|  ItemDescription|ItemPrice|ItemQty|TotalValue|
+--------+-----------------+---------+-------+----------+
|     258|           Closet|   1687.0|      2|    3374.0|
|     538|Grandmother clock|   1301.0|      1|    1301.0|
|     528| Projection clock|   2365.0|      1|    2365.0|
|     673|    Dough scraper|    980.0|      2|    1960.0|
|     593| Hanging curtains|   1896.0|      2|    3792.0|
+--------+-----------------+---------+-------+----------+
```

In [4]:
# Đọc dữ liệu từ file
# Bạn cần config để Spark chỉ đọc nhiều nhất là 1 file mỗi batch
raw_df = spark.readStream \
	.option("maxFilesPerTrigger", 1) \
	.json("lab8_input")

# Làm phẳng các phần tử của mảng InvoiceLineItems, bạn có thể sử dụng explode.
explode_df = raw_df.selectExpr("explode(InvoiceLineItems) as LineItem")

# Lấy các dữ liệu từ Dataframe
flattened_df = explode_df \
	.withColumn("ItemCode", expr("LineItem.ItemCode")) \
	.withColumn("ItemDescription", expr("LineItem.ItemDescription")) \
	.withColumn("ItemPrice", expr("LineItem.ItemPrice")) \
	.withColumn("ItemQty", expr("LineItem.ItemQty")) \
	.withColumn("TotalValue", expr("LineItem.TotalValue")) \
	.drop("LineItem")

# Lưu dữ liệu và tạo các checkpoint
# Bạn hãy thiết lập để mỗi lần trigger cách nhau 1 phút và Output mode là Append
invoiceWriterQuery = flattened_df.writeStream \
	.format("json") \
	.queryName("Flattened Invoice Writer") \
	.outputMode("append") \
	.option("path", "lab8_output") \
	.option("checkpointLocation", "lab8_chk-point-dir") \
	.trigger(processingTime="1 minute") \
	.start()

print("Flattened Invoice Writer started")
invoiceWriterQuery.awaitTermination()

Flattened Invoice Writer started
