In [1]:
from pyspark.sql import SparkSession
import os
from dotenv import load_dotenv
load_dotenv()


# Smoke-test cell: start a local Spark session with the MongoDB connector and
# the PostgreSQL JDBC driver on the classpath, then stop it again right away.
#
# NOTE(review): PySpark launches one JVM per Python process, when the first
# session is created, and jar/package settings are resolved at that point. If
# this cell runs first, it presumably decides which connector/driver jars the
# rest of the notebook uses (the next cell's `spark.jars.packages` may not be
# re-resolved). Restart the kernel and skip this cell to test the next one alone.
#
# The jar directory defaults to the author's local folder; set SPARK_JARS_DIR
# (e.g. in .env) to point somewhere else.
spark_jars_dir = os.getenv("SPARK_JARS_DIR", r"C:\Users\chopp")
spark_jar_files = [
    # The separate mongodb-driver-core / bson "-javadoc" jars were dropped:
    # javadoc jars contain generated API docs only, no classes. The "-all"
    # connector jar is expected to bundle the MongoDB Java driver — confirm.
    "mongo-spark-connector_2.12-10.4.0-all.jar",
    "postgresql-42.2.20.jar",
]
smoke_mongo_uri = os.getenv("MONGO_URI", "mongodb://localhost:27017/mydb")

spark = (
    SparkSession.builder
    .appName("MongoDBIntegration")
    .master("local")
    .config(
        "spark.jars",
        ",".join(os.path.join(spark_jars_dir, jar) for jar in spark_jar_files),
    )
    .config("spark.mongodb.read.connection.uri", smoke_mongo_uri)
    .config("spark.mongodb.write.connection.uri", smoke_mongo_uri)
    .getOrCreate()
)

spark.stop()

In [2]:
from pyspark.sql import SparkSession


# Load the sales documents from MongoDB into a Spark DataFrame. Connection
# details come from the environment (the .env file loaded by load_dotenv()
# at the top of the notebook).
mongo_uri = os.getenv("MONGO_URI")
mongo_database = os.getenv("MONGO_DATABASE")
mongo_collection = os.getenv("MONGO_COLLECTION")

# Fail fast with a readable message. Otherwise a missing value would be
# handed to Spark as None (or the string "None", depending on the PySpark
# version) and only fail later inside the connector with a far less obvious error.
missing_mongo_vars = [
    name
    for name, value in (
        ("MONGO_URI", mongo_uri),
        ("MONGO_DATABASE", mongo_database),
        ("MONGO_COLLECTION", mongo_collection),
    )
    if not value
]
if missing_mongo_vars:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(missing_mongo_vars)
    )

# Connector version kept in step with the local jar used in the first cell
# (mongo-spark-connector_2.12-10.4.0).
# NOTE(review): packages are resolved when PySpark first launches the JVM for
# this process; if an earlier cell already started Spark in this kernel, these
# are presumably not re-resolved — restart the kernel before relying on them.
spark_packages = ",".join([
    "org.mongodb.spark:mongo-spark-connector_2.12:10.4.0",
    "org.postgresql:postgresql:42.3.1",
])

spark = (
    SparkSession.builder
    .appName("MongoDBIntegration")
    .master("local[*]")
    .config("spark.jars.packages", spark_packages)
    .config("spark.mongodb.read.connection.uri", mongo_uri)
    .config("spark.mongodb.write.connection.uri", mongo_uri)
    .getOrCreate()
)

df = (
    spark.read
    .format("mongodb")
    .option("database", mongo_database)
    .option("collection", mongo_collection)
    .load()
)

df.show()



+--------------------+------------+------------+--------------------+----------+
|                 _id|customername|phone_number|             product| sale_date|
+--------------------+------------+------------+--------------------+----------+
|67023f84e36a1dfed...|    chaninat|  0812588636|[{SAMSUNG, 1, 200...|2024-10-05|
|67024ba390f757aa4...|   pongpatin|  0814523682|[{SAMSUNG, 14, 52...|2024-10-06|
|6702504b939259ddd...|    kongthai|  0971456202|[{SAMSUNG, 1, 300...|2024-10-31|
|670251b0939259ddd...|   kongthai2|  0945603473|[{SAMSUNG, 1, 500...|2024-10-06|
|670252cd939259ddd...|   kongthai3|  0823654869| [{OPPO, 1, 1000.0}]|2024-10-06|
|6703fe6e9e66d7833...|  thanathorn|  0945603471|[{SAMSUNG, 20, 50...|2024-10-07|
|670402c84ed411e46...|       payut|  0821478255| [{ASUS, 3, 4500.0}]|2024-10-07|
|670403ab9e5a6e7d9...|    chaniant|  0812588636| [{IPHONE, 3, 15.0}]|2024-10-07|
|670404eed31f93443...|       tommy|  0921473250|[{RAZER, 1, 5000....|2024-10-07|
|670405d7195d992aa...|     c

In [3]:
from pyspark.sql.functions import explode, split, col
# Line-item view: one row per product inside each sale. `explode` emits one
# row per element of the `product` array (a null/empty array yields no rows);
# the struct fields are then flattened into top-level columns.
# NOTE(review): "band" looks like a typo for "brand", but it is the column
# name written to PostgreSQL and used by the duplicate-check trigger SQL, so
# it is intentionally kept unchanged here.
line_item_fields = [("product", "band"), ("unit", "unit"), ("price", "price")]
df_product = (
    df.withColumn("product", explode(col("product")))
    .select(
        "customername",
        "sale_date",
        *[col(f"product.{source}").alias(target) for source, target in line_item_fields],
    )
)
df_product.show(truncate=False)

# Sale-level view: one row per sale document with the customer's contact number.
df_information = df.select("sale_date", "customername", "phone_number")
df_information.show(truncate=False)

+------------+----------+-------+----+-------+
|customername|sale_date |band   |unit|price  |
+------------+----------+-------+----+-------+
|chaninat    |2024-10-05|SAMSUNG|1   |2000.0 |
|pongpatin   |2024-10-06|SAMSUNG|14  |52120.0|
|kongthai    |2024-10-31|SAMSUNG|1   |3000.0 |
|kongthai    |2024-10-31|RAZER  |5   |20000.0|
|kongthai    |2024-10-31|IPHONE |1   |29000.0|
|kongthai2   |2024-10-06|SAMSUNG|1   |5000.0 |
|kongthai3   |2024-10-06|OPPO   |1   |1000.0 |
|thanathorn  |2024-10-07|SAMSUNG|20  |50000.0|
|payut       |2024-10-07|ASUS   |3   |4500.0 |
|chaniant    |2024-10-07|IPHONE |3   |15.0   |
|tommy       |2024-10-07|RAZER  |1   |5000.0 |
|tommy       |2024-10-07|REALME |2   |7000.0 |
|chopper     |2024-10-07|REALME |1   |2000.0 |
|ja          |2024-10-07|VIVO   |1   |3500.0 |
|ืnarin      |2024-10-07|OPPO   |1   |1000.0 |
|hotjang     |2024-09-30|IPHONE |1   |25000.0|
|hotjang     |2024-09-30|VIVO   |2   |5000.0 |
|pangkung    |2024-10-07|IPHONE |1   |22000.0|
|ying        

In [4]:
def write_to_postgresql(df, table_name, *, url=None, user=None, password=None, mode="append"):
    """Write a Spark DataFrame to a PostgreSQL table over JDBC.

    Connection settings come from the keyword arguments when given, otherwise
    from the environment (``POSTGRES_URL``, ``POSTGRES_USER``,
    ``POSTGRES_PASSWORD`` — e.g. via the ``.env`` file loaded at the top of the
    notebook). URL and user fall back to the previous local defaults. The
    password has no fallback, so the credential never has to live in the notebook.

    Args:
        df: Spark DataFrame to write.
        table_name: Target table, passed as the JDBC ``dbtable`` option.
        url: JDBC URL; defaults to ``$POSTGRES_URL`` or
            ``jdbc:postgresql://localhost:5432/projectmongodb``.
        user: Database user; defaults to ``$POSTGRES_USER`` or ``postgres``.
        password: Database password; defaults to ``$POSTGRES_PASSWORD``.
        mode: Spark save mode, ``"append"`` by default. With append,
            re-running the notebook inserts the same rows again unless the
            database itself filters duplicates.

    Raises:
        RuntimeError: if no password is given and ``POSTGRES_PASSWORD`` is unset.
    """
    url = url or os.getenv("POSTGRES_URL") or "jdbc:postgresql://localhost:5432/projectmongodb"
    user = user or os.getenv("POSTGRES_USER") or "postgres"
    if password is None:
        password = os.getenv("POSTGRES_PASSWORD")
    if password is None:
        raise RuntimeError(
            "No PostgreSQL password: pass password=... or set POSTGRES_PASSWORD"
        )

    (
        df.write
        .format("jdbc")
        .option("url", url)
        .option("dbtable", table_name)
        .option("user", user)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .mode(mode)
        .save()
    )
    
# Persist both projections, line items first, then the per-sale contact rows.
for target_table, frame in (("product", df_product), ("information", df_information)):
    write_to_postgresql(frame, target_table)



In [None]:
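# -- PostgreSQL DDL, to be run once against the target database (kept here as comments; not executed by this notebook):
# CREATE OR REPLACE FUNCTION prevent_duplicate_product()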
# RETURNS TRIGGER AS $$
# BEGIN
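#     -- Check whether an identical row already exists in the table.
#     -- Note: `=` never matches NULL, so rows with a NULL in any of these
#     -- columns are never treated as duplicates (IS NOT DISTINCT FROM would be).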
#     IF EXISTS (
#         SELECT 1 FROM product
#         WHERE customername = NEW.customername
#         AND sale_date = NEW.sale_date
#         AND band = NEW.band
#         AND unit = NEW.unit
#         AND price = NEW.price
#     ) THEN
#         -- Duplicate found: returning NULL from a BEFORE row trigger skips inserting this row.
#         RETURN NULL; 
#     END IF;
    
#     -- No duplicate: return NEW so the row is inserted as usual.
#     RETURN NEW; 
# END;
# $$ LANGUAGE plpgsql;



# CREATE TRIGGER trigger_prevent_duplicate_product
# BEFORE INSERT ON product
# FOR EACH ROW EXECUTE FUNCTION prevent_duplicate_product();
