In [1]:
%pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp311-cp311-win_amd64.whl (1.2 MB)
     ---------------------------------------- 1.2/1.2 MB 1.0 MB/s eta 0:00:00
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip available: 22.3.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
from pyspark import SparkConf, pandas as ps
from pyspark.sql import SparkSession, functions as sf, Window
from pyspark.sql.functions import col, collect_list
from pyspark.sql.types import *
import psycopg2
import time
import os
import glob



In [2]:
conf = SparkConf().set("spark.driver.memory", "8g")

spark_session = SparkSession\
    .builder\
    .master("local")\
    .config(conf=conf)\
    .appName("ETL") \
    .getOrCreate()

In [3]:
oltp_params = {
    'user': 'admin',
    'password': 'password',
    "driver": "org.postgresql_postgresql-42.7.3.jar"
}

oltp_url = "jdbc:postgresql://rfm-segmentation-oltp-db-1:5432/trans_oltp"

In [4]:
# print(spark_session.sparkContext._jvm.java.sql.DriverManager.getDrivers())
print(spark_session.sparkContext._jsc.sc().listJars())

Vector(spark://5ab2b1b817ac:43457/jars/org.checkerframework_checker-qual-3.42.0.jar, spark://5ab2b1b817ac:43457/jars/org.postgresql_postgresql-42.7.3.jar)


In [6]:
df = spark_session.read.format('jdbc').option('url', oltp_url).option('dbtable', "trans").option('user', 'admin').option('password', 'password').option('driver', "org.postgresql.Driver").load()

df.show()


+----------+----------+--------------------+--------------+-------------+-----------------+----------------+--------------------+-------+
|  trans_id|trans_date|          total_cost|payment_method|         city|       store_type|discount_applied|           promotion|cust_id|
+----------+----------+--------------------+--------------+-------------+-----------------+----------------+--------------------+-------+
|1000299020|2021-07-03|78.20000000000000...|Mobile Payment|        Miami| Department Store|           false|Discount on Selec...|  33877|
|1000342674|2021-10-25|11.60000000000000...|Mobile Payment|San Francisco|      Supermarket|            true|BOGO (Buy One Get...|  33877|
|1000806142|2020-05-25|90.06000000000000...|    Debit Card|      Chicago|   Warehouse Club|            true|                None| 629479|
|1000008294|2020-07-27|90.77000000000000...|          Cash|San Francisco| Department Store|            true|BOGO (Buy One Get...| 143811|
|1000641795|2020-05-22|68.82000000

In [11]:
product_dim = spark_session.read.format('jdbc').option('url', oltp_url).option('dbtable', "product").option('user', 'admin').option('password', 'password').option('driver', "org.postgresql.Driver").load()
customer_dim = spark_session.read.format('jdbc').option('url', oltp_url).option('dbtable', "cust").option('user', 'admin').option('password', 'password').option('driver', "org.postgresql.Driver").load()
tran_prod_fact = spark_session.read.format('jdbc').option('url', oltp_url).option('dbtable', "trans_prod").option('user', 'admin').option('password', 'password').option('driver', "org.postgresql.Driver").load()
tran_dim = spark_session.read.format('jdbc').option('url', oltp_url).option('dbtable', "trans").option('user', 'admin').option('password', 'password').option('driver', "org.postgresql.Driver").load()


In [16]:
tran_prod_fact.describe()


DataFrame[summary: string, product: string, trans_id: string, product_id: string]

In [17]:
customer_dim.describe()


DataFrame[summary: string, customer_name: string, customer_category: string, cust_id: string]

In [18]:
product_dim.describe()


DataFrame[summary: string, product: string, product_id: string]

In [19]:
tran_dim.describe()

DataFrame[summary: string, trans_id: string, total_cost: string, payment_method: string, city: string, store_type: string, promotion: string, cust_id: string]

In [22]:
tran_prod_fact2 = tran_prod_fact.join(other=tran_dim, on='trans_id', how='left').select(['trans_id', 'product_id', 'trans_date', 'cust_id'])
tran_prod_fact2.show()

+----------+----------+----------+-------+
|  trans_id|product_id|trans_date|cust_id|
+----------+----------+----------+-------+
|1000001491|         2|2022-10-29| 315863|
|1000010824|         2|2022-01-06| 423272|
|1000025357|         2|2023-11-30| 189014|
|1000027766|         1|2020-11-11| 398088|
|1000028663|         1|2023-08-22| 513663|
|1000032837|         1|2023-03-24| 333029|
|1000036123|         1|2020-11-26|  27619|
|1000038805|         1|2021-09-23| 378306|
|1000040863|         2|2022-04-28| 434641|
|1000043591|         1|2021-07-26|  65245|
|1000050636|         2|2023-08-20|  38189|
|1000050860|         2|2020-08-30|  30304|
|1000055410|         1|2021-07-31|  89984|
|1000055491|         1|2024-03-09| 431868|
|1000059460|         1|2021-09-05| 318968|
|1000060350|         1|2021-05-22| 228746|
|1000062227|         2|2022-12-03|  98458|
|1000074421|         1|2020-09-18|  54856|
|1000091287|         2|2023-02-11| 107278|
|1000099710|         2|2020-06-10| 245685|
+----------

In [23]:
tran_dim2= tran_dim.drop('cust_id')
tran_dim2.show(15)

+----------+----------+--------------------+--------------+-------------+-----------------+----------------+--------------------+
|  trans_id|trans_date|          total_cost|payment_method|         city|       store_type|discount_applied|           promotion|
+----------+----------+--------------------+--------------+-------------+-----------------+----------------+--------------------+
|1000299020|2021-07-03|78.20000000000000...|Mobile Payment|        Miami| Department Store|           false|Discount on Selec...|
|1000342674|2021-10-25|11.60000000000000...|Mobile Payment|San Francisco|      Supermarket|            true|BOGO (Buy One Get...|
|1000806142|2020-05-25|90.06000000000000...|    Debit Card|      Chicago|   Warehouse Club|            true|                None|
|1000008294|2020-07-27|90.77000000000000...|          Cash|San Francisco| Department Store|            true|BOGO (Buy One Get...|
|1000641795|2020-05-22|68.82000000000000...|Mobile Payment|       Boston|  Specialty Store

In [25]:
olap_params = {
    'host': 'rfm-segmentation-olap-db-1',
    'database': 'trans_olap',
    'user': 'admin',
    'password': 'password',
    'port': '5432'
}



In [30]:
def insert_from_df(df, table, conn=psycopg2.connect(**olap_params)):
    tmp = './tmp'
    df.write.csv(tmp,mode='overwrite')
    tmp_files = glob.glob('./tmp/*.csv')
    print(tmp_files)
    cursor = conn.cursor()

    try:
        for file in tmp_files:
            with open(file, 'r') as f:
                cursor.copy_from(f, table, sep=",", )
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as e:
        [os.remove(file) for file in tmp_files]
        print("Error: %s" % e)
        conn.rollback()
        cursor.close()
        return 1
    print("Table %s copied from dataframe" % table)
    cursor.execute(F"Select * FROM {table} LIMIT 5;")
    cursor.fetchall()
    cursor.close()
    [os.remove(file) for file in tmp_files]


In [31]:
insert_from_df(df=tran_dim2, table="trans_dim")
insert_from_df(df=customer_dim, table="cust_dim")
insert_from_df(df=product_dim.select(["prod_id", "product"]), table="prod_dim")
insert_from_df(df=tran_prod_fact, table="trans_prod_fact")


['./tmp/part-00000-6aaba813-94f9-45bb-8f0c-153e8fb025d6-c000.csv']
Error: relation "trans_dim" does not exist

['./tmp/part-00000-bb1aa290-3e5f-4db7-b453-3f5ff796ea5c-c000.csv']
Error: duplicate key value violates unique constraint "cust_dim_pkey"
DETAIL:  Key (cust_id)=(David Melton) already exists.
CONTEXT:  COPY cust_dim, line 151



AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `prod_id` cannot be resolved. Did you mean one of the following? [`product`, `product_id`].;
'Project ['prod_id, product#82]
+- Relation [product#82,product_id#83] JDBCRelation(product) [numPartitions=1]
