In [1]:
import sys
sys.path.append("./work")

from operators.streaming import SparkStreaming
import pyspark.sql.functions as F

In [2]:
spark = SparkStreaming.get_instance(app_name="Streamify Spark Streaming")

In [3]:
spark

In [11]:
rental_df = (spark.read
                    .format("delta")
                    .load("s3a://lakehouse/streaming/streamify/rental")
                    .dropDuplicates())
print(rental_df.count())
rental_df.printSchema()

66
root
 |-- rental_id: integer (nullable = true)
 |-- rental_date: string (nullable = true)
 |-- inventory_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- return_date: string (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- last_update: string (nullable = true)
 |-- rental_year: integer (nullable = true)
 |-- rental_month: integer (nullable = true)



In [12]:
rental_df.show(5)

+---------+-------------------+------------+-----------+-------------------+--------+-------------------+-----------+------------+
|rental_id|        rental_date|inventory_id|customer_id|        return_date|staff_id|        last_update|rental_year|rental_month|
+---------+-------------------+------------+-----------+-------------------+--------+-------------------+-----------+------------+
|   281500|2010-01-03 09:27:25|        2586|        446|2010-01-05 07:42:24|      64|2010-01-02 08:33:44|       2010|           1|
|   281483|2010-01-02 07:26:19|        3135|        591|2010-01-07 05:53:43|      75|2010-01-01 10:18:55|       2010|           1|
|   281536|2010-01-09 00:38:36|           3|        255|2010-01-13 17:26:34|     115|2010-01-04 10:29:16|       2010|           1|
|   281494|2010-01-03 01:29:12|        1229|        312|2010-01-04 00:13:16|      71|2010-01-02 03:04:21|       2010|           1|
|   281489|2010-01-02 15:15:09|        2387|        472|2010-01-06 13:17:17|      1

In [5]:
payment_df = (spark.read
                    .format("delta")
                    .load("s3a://lakehouse/streamify.db/payment")
                    .drop("last_update")
                    .dropDuplicates())
print(payment_df.count())
payment_df.printSchema()

144
root
 |-- rental_id: integer (nullable = true)
 |-- payment_date: timestamp (nullable = true)
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- payment_month: integer (nullable = true)
 |-- payment_year: integer (nullable = true)



In [14]:
join_df = payment_df.join(rental_df, on=["rental_id"], how="inner")
print(join_df.count())
join_df.show(5)

64
+---------+-------------------+----------+-----------+------+--------+------------+-------------+-------------------+------------+-----------+-------------------+--------+-------------------+-----------+------------+
|rental_id|       payment_date|payment_id|customer_id|amount|staff_id|payment_year|payment_month|        rental_date|inventory_id|customer_id|        return_date|staff_id|        last_update|rental_year|rental_month|
+---------+-------------------+----------+-----------+------+--------+------------+-------------+-------------------+------------+-----------+-------------------+--------+-------------------+-----------+------------+
|   281500|2010-01-05 07:42:24|     40261|        446|  39.7|      64|        2010|            1|2010-01-03 09:27:25|        2586|        446|2010-01-05 07:42:24|      64|2010-01-02 08:33:44|       2010|           1|
|   281483|2010-01-07 05:53:43|     40244|        591|  39.7|      75|        2010|            1|2010-01-02 07:26:19|        3135

In [6]:
customer_df = spark.read.csv("s3a://lakehouse/csv/customer.csv", header=True, inferSchema=True, samplingRatio=0.1)
address_df = spark.read.csv("s3a://lakehouse/csv/address.csv", header=True, inferSchema=True, samplingRatio=0.1)
city_df = spark.read.csv("s3a://lakehouse/csv/city.csv", header=True, inferSchema=True, samplingRatio=0.1)
country_df = spark.read.csv("s3a://lakehouse/csv/country.csv", header=True, inferSchema=True, samplingRatio=0.1)

In [10]:
spark.sql("CREATE DATABASE streamify").show()

++
||
++
++



In [11]:
customer_df.write.format("delta").mode("overwrite").saveAsTable("streamify.customer")
address_df.write.format("delta").mode("overwrite").saveAsTable("streamify.address")
city_df.write.format("delta").mode("overwrite").saveAsTable("streamify.city")
country_df.write.format("delta").mode("overwrite").saveAsTable("streamify.country")

# Synchronize data between MinIO and Trino

In [10]:
# spark.sql("DROP DATABASE test_db CASCADE")
spark.sql("USE streamify")
# spark.sql("DROP TABLE IF EXISTS payment")
spark.sql("DELETE FROM rental")
spark.sql("SHOW TABLES").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|streamify|  address|      false|
|streamify|     city|      false|
|streamify|  country|      false|
|streamify| customer|      false|
|streamify|  payment|      false|
|streamify|   rental|      false|
+---------+---------+-----------+



In [6]:
temp_df = (payment_df.drop("payment_month")
                    .withColumn("amount", F.round(F.col("amount")*0.5, 2)))

In [6]:
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS streamify.payment (
        rental_id integer,
        payment_date timestamp,
        payment_id integer,
        customer_id integer,
        amount double,
        staff_id integer,
        payment_month integer
    )
    USING DELTA
    PARTITIONED BY(payment_year integer)
    LOCATION 's3a://lakehouse/streamify.db/payment'
""")

DataFrame[]

In [16]:
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS streamify.rental (
        rental_id integer,
        rental_date timestamp,
        inventory_id integer,
        customer_id integer,
        return_date timestamp,
        staff_id integer,
        last_update timestamp,
        rental_month integer
    )
    USING DELTA
    PARTITIONED BY(rental_year integer)
    LOCATION 's3a://lakehouse/streamify.db/rental'
""")

DataFrame[]

In [11]:
(spark.read.format("delta").load("s3a://lakehouse/streamify.db/rental").count())

0

In [11]:
# spark.sql("SELECT * FROM test.payment02").count()
spark.sql("DESCRIBE EXTENDED test.payment03").show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|           rental_id|                 int|       |
|        payment_date|           timestamp|       |
|          payment_id|                 int|       |
|         customer_id|                 int|       |
|              amount|              double|       |
|            staff_id|                 int|       |
|        payment_year|                 int|       |
|                    |                    |       |
|      # Partitioning|                    |       |
|              Part 0|        payment_year|       |
|                    |                    |       |
|# Detailed Table ...|                    |       |
|                Name|      test.payment03|       |
|            Location|s3a://lakehouse/t...|       |
|            Provider|               delta|       |
|               Owner|              jovyan|       |
|           

## Register Delta table to Hive Metastore

- Register Delta table to Hive Metastore

In [10]:
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS test.payment03 (
        rental_id integer,
        payment_date timestamp,
        payment_id integer,
        customer_id integer,
        amount double,
        staff_id integer
    )
    USING DELTA
    PARTITIONED BY(payment_year integer)
    LOCATION 's3a://lakehouse/test.db/payment03'
""")

spark.sql("SELECT * FROM test.payment03").show()
# spark.sql("DESCRIBE EXTENDED streamify.delta_payment").show()

+---------+-------------------+----------+-----------+------+--------+------------+
|rental_id|       payment_date|payment_id|customer_id|amount|staff_id|payment_year|
+---------+-------------------+----------+-----------+------+--------+------------+
|   281634|2010-01-22 22:58:32|     40380|        437|   3.0|      68|        2010|
|   281487|2010-01-05 11:33:41|     40248|        468|  37.7|      68|        2010|
|   281624|2010-01-20 09:18:39|     40371|        317|  38.4|      71|        2010|
|   281597|2010-01-18 01:41:08|     40346|        366| 37.25|      63|        2010|
|   281523|2010-01-11 08:52:02|     40280|        411|  2.25|     123|        2010|
|   281619|2010-01-22 03:53:38|     40368|        139| 30.95|      65|        2010|
|   281511|2010-01-04 23:24:08|     40271|        581| 28.35|      15|        2010|
|   281496|2010-01-08 02:53:12|     40257|        309|  2.65|      62|        2010|
|   281541|2010-01-10 05:09:41|     40297|        169|  6.05|      17|      

In [7]:
(temp_df.write
        .format("delta")
        .partitionBy("payment_year")
        .option("path", "s3a://lakehouse/test.db/payment03")
        .mode("append")
        .save())

**How to write streaming data to Delta table**

```python
# Cấu hình streaming query
write_stream = (payment_df.writeStream
                        .format("delta")
                        .partitionBy(partitioned_col)
                        .option("checkpointLocation", checkpoint_path)
                        .option("path", storage_path)
                        .trigger(processingTime=trigger)
                        .outputMode(output_mode)
                        .start())
```

- Register Hive table to Hive Metastore

In [None]:
(temp_df.write
        .mode("append")
        .insertInto("streamify.hive_payment"))

In [None]:
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS streamify.hive_payment (
        rental_id integer,
        payment_date string,
        payment_id integer,
        customer_id integer,
        amount double,
        staff_id integer
    )
    PARTITIONED BY(payment_year integer)
    LOCATION 's3a://lakehouse/streamify.db/hive_payment'
""")

spark.sql("SELECT * FROM streamify.hive_payment").show(5)
spark.sql("DESCRIBE EXTENDED streamify.hive_payment").show()

**How to write streaming data to Hive table**

```python
def write_to_hive_table(batch_df, batch_id):
    # Ghi dữ liệu vào Hive table (append mode)
    (batch_df.write
            .mode("append")
            .insertInto("streamify.hive_payment"))

# Cấu hình streaming query
write_stream = (payment_df.writeStream
                        .foreachBatch(write_to_hive_table)
                        .option("checkpointLocation", checkpoint_path)
                        .trigger(processingTime=trigger)
                        .outputMode(output_mode)
                        .start())
```

In [None]:
spark.sql("SHOW SCHEMAS FROM hive")

In [5]:
spark.stop()