In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
import os
import findspark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

findspark.init()

In [3]:
from google.colab import drive
drive.mount("/content/gdrive")

Mounted at /content/gdrive


In [4]:
from pyspark import SparkConf
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

config = SparkConf().setMaster('local').setAppName('lab53')
spark = SparkSession.builder.config(conf=config).config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.streaming.schemaInference", "true").getOrCreate()
sc = spark.sparkContext

DATASET_PATH = '/content/gdrive/MyDrive/'

In [5]:
  # Đọc dữ liệu từ file
  # Bạn cần config để Spark chỉ đọc nhiều nhất là 1 file mỗi batch
  raw_df = spark.readStream \
      .format("json") \
      .option("path", DATASET_PATH) \
      .option("maxFilesPerTrigger", "1")\
      .option("cleanSource", "delete")\
      .load()

  raw_df.printSchema()

  # Làm phẳng các phần tử của mảng InvoiceLineItems, bạn có thể sử dụng explode.
  explode_df = raw_df.selectExpr("InvoiceNumber", "CreatedTime", "StoreID", "PosID",
                                  "CustomerType", "PaymentMethod", "DeliveryType",
                                  "DeliveryAddress.City", "DeliveryAddress.State",
                                  "DeliveryAddress.Pincode", "explode(InvoiceLineItems) as LineItems")
  explode_df.printSchema()


  # Lấy các dữ liệu từ Dataframe
  flattened_df = explode_df \
      .withColumn("ItemCode", expr("LineItems.ItemCode")) \
      .withColumn("ItemDescription", expr("LineItems.ItemDescription")) \
      .withColumn("ItemPrice", expr("LineItems.ItemPrice")) \
      .withColumn("ItemQty", expr("LineItems.ItemQty")) \
      .withColumn("TotalValue", expr("LineItems.TotalValue")) \
      .drop("LineItem")

  # Lưu dữ liệu và tạo các checkpoint
  # Bạn hãy thiết lập để mỗi lần trigger cách nhau 1 phút và Output mode là Append
  invoiceWriterQuery = flattened_df.writeStream \
      .format("json") \
      .queryName("Flattened Invoice Writer") \
      .outputMode("append") \
      .option("path", "output/result") \
      .option("checkpointLocation", "chk-point-dir") \
      .trigger(processingTime="1 minute")\
      .start()

  print("Flattened Invoice Writer started")
  invoiceWriterQuery.awaitTermination()

root
 |-- CESS: double (nullable = true)
 |-- CGST: double (nullable = true)
 |-- CashierID: string (nullable = true)
 |-- CreatedTime: long (nullable = true)
 |-- CustomerCardNo: string (nullable = true)
 |-- CustomerType: string (nullable = true)
 |-- DeliveryAddress: struct (nullable = true)
 |    |-- AddressLine: string (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- ContactNumber: string (nullable = true)
 |    |-- PinCode: string (nullable = true)
 |    |-- State: string (nullable = true)
 |-- DeliveryType: string (nullable = true)
 |-- InvoiceLineItems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ItemCode: string (nullable = true)
 |    |    |-- ItemDescription: string (nullable = true)
 |    |    |-- ItemPrice: double (nullable = true)
 |    |    |-- ItemQty: long (nullable = true)
 |    |    |-- TotalValue: double (nullable = true)
 |-- InvoiceNumber: string (nullable = true)
 |-- NumberOfItems: long (nullable = t

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/content/spark-3.4.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/content/spark-3.4.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored