# DATA EXPLORATION AND CLEANING WITH PYSPARK

## Notes

In [None]:
from IPython.display import display, Image 
display(Image("spark_architecture1.png"))
display(Image("spark_architecture.png"))

- **Advantages of Spark:** Spark is known for handling large-scale data processing, and it has an efficient architecture (Driver, Executors, Cluster Manager).
>- Speed: Spark can run up to 100 times faster than Hadoop MapReduce on some in-memory workloads. It also splits data into partitions whose number and layout you can control (for example with `repartition()` and `coalesce()`).
>- Powerful Caching: A simple programming layer (`cache()` / `persist()`) provides in-memory caching and disk persistence.
>- Deployment: Spark can be deployed on Mesos, on Hadoop via YARN, or with its own standalone cluster manager.
>- Real-Time: In-memory processing keeps latency low, and Spark's streaming APIs support near-real-time computation.
>- Polyglot: Spark provides APIs in Java, Scala, Python, and R, so you can write Spark code in any of these languages.
>- Spark also provides interactive shells for Scala (`spark-shell`) and Python (`pyspark`).

-  **Cleaning Techniques in Spark**
>- Filtering data.
>- Aggregations and GroupBy.
>- Joining DataFrames

## Case Study

### Case Study Summary: Data Exploration & Cleaning with PySpark

**1. Overview**
This case study highlights how **Nuga Bank**, a prominent financial institution, adopted **PySpark** to enhance its data exploration and cleaning workflows. The initiative focused on automating data preparation to support better analysis and decision-making.

---

**2. Key Challenges**
- Manual, time-consuming data cleaning processes.
- Poor scalability with increasing data volume.
- Inconsistent data quality impacting analysis.
- Difficulty transforming raw data into structured formats.

---

**3. Objectives**
- Implement automated data exploration and cleaning using **PySpark**.
- Normalize datasets to **2NF or 3NF** for better integrity.
- Load structured data into a **PostgreSQL** database for reporting and analysis.

---

**4. Implementation Steps**

1. **Data Extraction**
>- Set up a **SparkSession** for distributed computing.
>- Load CSV data into a **PySpark DataFrame**.

2. **Data Transformation**
>- Handle missing values, duplicates, and inconsistent formats.
>- Normalize data into a relational schema.

3. **Data Loading**
>- Store the cleaned and normalized data in a **PostgreSQL** database.

---

**5. Benefits**
>- **Efficiency**: Reduced manual tasks via automation.
>- **Scalability**: Able to process large datasets with ease.
>- **Data Quality**: Standardized and consistent data cleaning.
>- **Structured Database**: Easier data management and querying.
>- **Team Collaboration**: Shared workflows improved communication and efficiency.

---

**6. Tech Stack**
>- **Programming Languages**: Python, SQL  
>- **Processing Framework**: PySpark  
>- **Database**: PostgreSQL  

---

**7. 🔗 Data Source**
>- [Google Drive - Dataset](https://drive.google.com/file/d/1WvnxUWIUQcRSXB5so7-PoGPxU9ekSkYb/view?usp=sharing)

---

> *The project automates and scales data exploration and cleaning, giving Nuga Bank a robust and efficient data foundation for analytics.*

## Case Study Solution

### Libraries and Dependencies

In [1]:
# Import necessary dependencies
import os
import shutil
import tempfile
from urllib.parse import urlparse

import gdown
import pandas as pd  # only needed for the optional toPandas() conversions
import psycopg2
from psycopg2 import sql
from dotenv import load_dotenv
from IPython.display import Image, display
from pyspark.sql import SparkSession
# monotonically_increasing_id: unique, increasing (but not consecutive) IDs, useful for
# joining, tracking, or indexing rows when no natural unique key exists
from pyspark.sql.functions import col, monotonically_increasing_id
from sqlalchemy import (
    create_engine, Column, String, Float, DateTime,
    BigInteger, Text, ForeignKey, MetaData
)
from sqlalchemy.orm import declarative_base, relationship

# Load environment variables (e.g. the NUGA_BANK database URL) from a .env file
load_dotenv()

In [2]:
# Initialise the Spark session
spark = SparkSession.builder.appName("NugaBankEtl").getOrCreate()
spark

### Extraction Layer

In [None]:
# Download the CSV from Google Drive into a temporary file, then keep a permanent copy
with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
    temp_path = tmp.name

file_id = "1WvnxUWIUQcRSXB5so7-PoGPxU9ekSkYb"
gdown.download(f"https://drive.google.com/uc?id={file_id}", temp_path, quiet=False)

local_csv = "nuga_bank_transactions.csv"
shutil.copy(temp_path, local_csv)  # Save a permanent copy of the downloaded CSV
os.remove(temp_path)               # Safe to delete: Spark reads the permanent copy below

# Load into Spark from the permanent copy, so a recomputation never points at a deleted file
nuga_bank_df = spark.read.csv(local_csv, header=True, inferSchema=True)
nuga_bank_df.cache()              # Keep the data in memory once it has been read
row_count = nuga_bank_df.count()  # Action: triggers the read and fills the cache

# Show details
print(f"Data Shape: {row_count} rows × {len(nuga_bank_df.columns)} columns")
nuga_bank_df.show(5)
nuga_bank_df.printSchema()

### Transformation Layer

In [3]:
nuga_bank_df = spark.read.csv(r'nuga_bank_transactions.csv', header=True, inferSchema=True)

In [4]:
print(f"Data Shape: {nuga_bank_df.count()} rows × {len(nuga_bank_df.columns)} columns")
nuga_bank_df.printSchema()

Data Shape: 1000000 rows × 23 columns
root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Ma

#### Checking for null values

In [None]:
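# Checking for null values
# (the loop variable is not called `col`, so pyspark's col() function is not shadowed)
for column_name in nuga_bank_df.columns:
    null_count = nuga_bank_df.filter(nuga_bank_df[column_name].isNull()).count()
    if null_count > 0:
        print(f"{column_name}: {null_count} nulls")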

**NULL HANDLING STRATEGY**

| Column Category | Columns | Strategy |
| --- | --- | --- |
| ✅ Mandatory & Critical | `Transaction_Date`, `Amount`, `Transaction_Type` | Already non-null: **no action** needed. |
| 📌 High-Value Optional | `Currency_Code`, `Category`, `Group`, `Is_Active`, `Random_Number`, `Last_Updated` | - Fill strings with `"N/A"` (`Is_Active`: `"Unknown"`)<br>- Fill `Random_Number` with the flag value `-1.0`<br>- Fill `Last_Updated` with the sentinel timestamp `1900-01-01` |
| Customer Profile | `Customer_Name`, `Customer_Address`, `Customer_City`, `Customer_State`, `Customer_Country`, `Email`, `Phone_Number`, `Gender`, `Marital_Status` | - Fill strings with `"Unknown"` (`Email`: `unknown@example.com`; `Gender`, `Marital_Status`: `"Unspecified"`)<br>- Optionally drop rows with **≥5 nulls** across these fields |
| Financial Identifiers | `Credit_Card_Number`, `IBAN` | - Drop these **if not needed downstream**, or mask them: `Credit_Card_Number` is a `long` column, so it is filled with `0`; `IBAN` is filled with `"Unknown"` |
| Employment Data | `Company`, `Job_Title` | - Fill missing values with `"Undisclosed"` |
| Description | `Description` | - Fill with the placeholder `"No description provided"` |


In [5]:
# 1. Fill common categorical string fields
string_fill = {
    "Customer_Name": "Unknown",
    "Customer_Address": "Unknown",
    "Customer_City": "Unknown",
    "Customer_State": "Unknown",
    "Customer_Country": "Unknown",
    "Email": "unknown@example.com",
    "Phone_Number": "Unknown",
    "Company": "Undisclosed",
    "Job_Title": "Undisclosed",  # a missing title does not mean the customer is unemployed
    "Currency_Code": "N/A",
    "Category": "N/A",
    "Group": "N/A",
    "Is_Active": "Unknown",
    "Description": "No description provided",
    "Gender": "Unspecified",
    "Marital_Status": "Unspecified",
    "IBAN": "Unknown"
}
nuga_bank_df_clean = nuga_bank_df.fillna(string_fill)

In [7]:
# 2. Fill numerical fields
nuga_bank_df_clean = nuga_bank_df_clean.fillna({
    "Random_Number": -1.0,
    "Credit_Card_Number": 0 
})

In [8]:
# 3. Fill timestamp
nuga_bank_df_clean = nuga_bank_df_clean.fillna({"Last_Updated": "1900-01-01"})

In [9]:
#  Convert all column names to lowercase
from pyspark.sql.functions import col
nuga_bank_df_clean = nuga_bank_df_clean.select([col(c).alias(c.lower()) for c in nuga_bank_df_clean.columns])

In [None]:
# Checking for null values again
for column_name in nuga_bank_df_clean.columns:
    null_count = nuga_bank_df_clean.filter(nuga_bank_df_clean[column_name].isNull()).count()
    if null_count > 0:
        print(f"{column_name}: {null_count} nulls")

nuga_bank_df_clean.columns

In [10]:
print(nuga_bank_df_clean.count())
nuga_bank_df_clean.printSchema()

1000000
root
 |-- transaction_date: timestamp (nullable = true)
 |-- amount: double (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- customer_name: string (nullable = false)
 |-- customer_address: string (nullable = false)
 |-- customer_city: string (nullable = false)
 |-- customer_state: string (nullable = false)
 |-- customer_country: string (nullable = false)
 |-- company: string (nullable = false)
 |-- job_title: string (nullable = false)
 |-- email: string (nullable = false)
 |-- phone_number: string (nullable = false)
 |-- credit_card_number: long (nullable = false)
 |-- iban: string (nullable = false)
 |-- currency_code: string (nullable = false)
 |-- random_number: double (nullable = false)
 |-- category: string (nullable = false)
 |-- group: string (nullable = false)
 |-- is_active: string (nullable = false)
 |-- last_updated: timestamp (nullable = true)
 |-- description: string (nullable = false)
 |-- gender: string (nullable = false)
 |-- marital_status

#### Normalise and Split into Dimension and Fact Tables

In [11]:
dim_customer = nuga_bank_df_clean \
    .select('customer_name', 'customer_address', 'customer_city', 'customer_state', 'customer_country') \
    .distinct() \
    .withColumn("customer_id", monotonically_increasing_id()).cache()


dim_customer = dim_customer.select('customer_id', 'customer_name', 'customer_address', 'customer_city', 'customer_state', 'customer_country')
dim_customer.printSchema()  # printSchema() prints directly and returns None
dim_customer.show()

root
 |-- customer_id: long (nullable = false)
 |-- customer_name: string (nullable = false)
 |-- customer_address: string (nullable = false)
 |-- customer_city: string (nullable = false)
 |-- customer_state: string (nullable = false)
 |-- customer_country: string (nullable = false)

+-----------+--------------------+--------------------+------------------+--------------+-------------------+
|customer_id|       customer_name|    customer_address|     customer_city|customer_state|   customer_country|
+-----------+--------------------+--------------------+------------------+--------------+-------------------+
|          0| Dr. Steven Sandoval|71969 Casey Mountain|North Belindaville|     Wisconsin|            Uruguay|
|          1|         Jamie Dixon|0146 Veronica Mou...|         Jonesland|      Delaware|       Saint Martin|
|          2|         Amber Jones|37115 Peterson Vi...|    West Kellyside|      Delaware|            Unknown|
|          3|     Michael Hawkins|             Unk

In [None]:
dim_employee = nuga_bank_df_clean \
    .select('company', 'job_title', 'email', 'phone_number','gender', 'marital_status') \
    .distinct() \
    .withColumn("employee_id", monotonically_increasing_id()).cache()

dim_employee = dim_employee.select('employee_id','company', 'job_title', 'email', 'phone_number','gender', 'marital_status')
dim_employee.printSchema()  # printSchema() prints directly and returns None
dim_employee.show()

root
 |-- employee_id: long (nullable = false)
 |-- company: string (nullable = false)
 |-- job_title: string (nullable = false)
 |-- email: string (nullable = false)
 |-- phone_number: string (nullable = false)
 |-- gender: string (nullable = false)
 |-- marital_status: string (nullable = false)

+-----------+--------------------+--------------------+--------------------+--------------------+-----------+--------------+
|employee_id|             company|           job_title|               email|        phone_number|     gender|marital_status|
+-----------+--------------------+--------------------+--------------------+--------------------+-----------+--------------+
|          0|         Undisclosed|Sport and exercis...| unknown@example.com|    001-817-207-5116|       Male|        Single|
|          1|       Park and Sons|Race relations of...|   joy43@example.org|          4942627537|     Female|       Married|
|          2|       Walker-Hanson|      Hydrogeologist|mendozabrett@exa

In [13]:
dim_transaction = nuga_bank_df_clean \
    .select('transaction_date', 'transaction_type','description') \
    .distinct() \
    .withColumn("transaction_id", monotonically_increasing_id()).cache()
dim_transaction = dim_transaction.select('transaction_id','transaction_date', 'transaction_type','description')
dim_transaction.printSchema()  # printSchema() prints directly and returns None
dim_transaction.show()

root
 |-- transaction_id: long (nullable = false)
 |-- transaction_date: timestamp (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- description: string (nullable = false)

+--------------+--------------------+----------------+--------------------+
|transaction_id|    transaction_date|transaction_type|         description|
+--------------+--------------------+----------------+--------------------+
|             0|2024-04-11 09:51:...|        Transfer|Determine program...|
|             1|2024-03-15 02:27:...|        Transfer|Body this laugh s...|
|             2|2024-02-22 06:24:...|         Deposit|Congress more foo...|
|             3|2024-01-15 11:54:...|      Withdrawal|Near magazine oil...|
|             4|2024-03-20 02:18:...|         Deposit|Fire thing pull g...|
|             5|2024-01-10 07:20:...|        Transfer|Dark station offi...|
|             6|2024-01-13 10:47:...|        Transfer|No description pr...|
|             7|2024-03-25 00:46:...|    

In [14]:
# Build fact_transaction with LEFT JOINs to preserve as many rows as possible
fact_transaction = nuga_bank_df_clean \
    .join(dim_customer, on=['customer_name', 'customer_address', 'customer_city', 'customer_state', 'customer_country'], how='left') \
    .join(dim_employee, on=['company', 'job_title', 'email', 'phone_number', 'gender', 'marital_status'], how='left') \
    .join(dim_transaction, on=['transaction_date', 'transaction_type', 'description'], how='left') \
    .withColumn("fact_transaction_sk", monotonically_increasing_id()) \
    .select('fact_transaction_sk', 'transaction_id', 'customer_id', 'employee_id', 'amount',
            'credit_card_number', 'iban', 'currency_code', 'random_number',
            'category', 'group', 'is_active', 'last_updated') \
    .cache()  # cache the final projection, which later cells reuse
fact_transaction.printSchema()
fact_transaction.show()

root
 |-- fact_transaction_sk: long (nullable = false)
 |-- transaction_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- employee_id: long (nullable = true)
 |-- amount: double (nullable = true)
 |-- credit_card_number: long (nullable = false)
 |-- iban: string (nullable = false)
 |-- currency_code: string (nullable = false)
 |-- random_number: double (nullable = false)
 |-- category: string (nullable = false)
 |-- group: string (nullable = false)
 |-- is_active: string (nullable = false)
 |-- last_updated: timestamp (nullable = true)

+-------------------+--------------+-------------+-------------+------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+
|fact_transaction_sk|transaction_id|  customer_id|  employee_id|amount| credit_card_number|                iban|currency_code|random_number|category|group|is_active|        last_updated|
+-------------------+--------------+-------------+-----

In [15]:
# Identify unmatched joins (rows whose dimension key is NULL after the LEFT JOIN)
missing_customers = fact_transaction.filter(fact_transaction.customer_id.isNull())
print("Missing customers:", missing_customers.count())

missing_employees = fact_transaction.filter(fact_transaction.employee_id.isNull())
print("Missing employees:", missing_employees.count())

missing_transactions = fact_transaction.filter(fact_transaction.transaction_id.isNull())
print("Missing transactions:", missing_transactions.count())

Missing customers: 0
Missing employees: 0
Missing transactions: 0


####  Save the Cleaned Dimension and Fact Tables

In [16]:
# Save as Parquet Files (Recommended for DWH / Spark Pipelines)
# Parquet is columnar, efficient for query engines, supports schema, and is splittable.

dim_customer.write.mode("overwrite").parquet("output/dim_customer")
dim_employee.write.mode("overwrite").parquet("output/dim_employee")
dim_transaction.write.mode("overwrite").parquet("output/dim_transaction")
fact_transaction.write.mode("overwrite").parquet("output/fact_transaction")


In [None]:
# Optional alternatives (not run here); uncomment the lines you need.

# Example: write to HDFS instead of the local filesystem
# dim_customer.write.mode("overwrite").parquet("hdfs:///output/dim_customer")
# dim_employee.write.mode("overwrite").parquet("hdfs:///output/dim_employee")
# dim_transaction.write.mode("overwrite").parquet("hdfs:///output/dim_transaction_context")
# fact_transaction.write.mode("overwrite").parquet("hdfs:///output/fact_transactions")

# Output the transformed data as CSV (repartition(1) produces a single part file per table)
# dim_transaction.repartition(1).write.mode('overwrite').option('header', 'true').csv(r'dataset/transformeddata/csv/transaction')
# dim_customer.repartition(1).write.mode('overwrite').option('header', 'true').csv(r'dataset/transformeddata/csv/customer')
# dim_employee.repartition(1).write.mode('overwrite').option('header', 'true').csv(r'dataset/transformeddata/csv/employee')
# fact_transaction.repartition(1).write.mode('overwrite').option('header', 'true').csv(r'dataset/transformeddata/csv/fact_table')

# Convert to pandas DataFrames (for lightweight BI or local analysis, e.g. Power BI/Excel).
# toPandas() collects every row to the driver, so it only suits data that fits in driver memory.
# df_customer = dim_customer.toPandas()
# df_employee = dim_employee.toPandas()
# df_transaction = dim_transaction.toPandas()
# df_fact = fact_transaction.toPandas()

### Loading Layer

- Load to a Target Destination (optional based on use-case)
>- To a Database: Load into PostgreSQL, BigQuery, Redshift, etc.
>- To a Data Lake: Save to cloud storage like AWS S3, Azure Blob, GCS.

####  Load into PostgreSQL or Upload to Cloud Storage (e.g., Amazon S3)

##### Connect to db

In [17]:
try:
    # Load the DB URL (from .env) and the target schema name
    db_url = os.getenv("NUGA_BANK")
    if not db_url:
        raise ValueError("Environment variable NUGA_BANK is not set")
    target_schema = "olap"

    # Initialize the SQLAlchemy engine (ORM/DDL) and a psycopg2 connection (raw SQL)
    engine = create_engine(db_url)
    conn = psycopg2.connect(db_url)
    conn.autocommit = True
    cursor = conn.cursor()

    print("✅ Database engine created successfully.")

    # Create the schema if it does not exist (sql.Identifier quotes the name safely)
    cursor.execute(
        sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(sql.Identifier(target_schema))
    )

    print(f"✅ Schema '{target_schema}' created or already exists.")

except Exception as e:
    print("❌ Failed to create database engine or schema:", e)
    raise  # later cells depend on engine, cursor and target_schema



✅ Database engine created successfully.
✅ Schema 'olap' created or already exists.


In [20]:
# Confirm the final schemas before creating the database tables
dim_customer.printSchema()
dim_employee.printSchema()
dim_transaction.printSchema()
fact_transaction.printSchema()

root
 |-- customer_id: long (nullable = false)
 |-- customer_name: string (nullable = false)
 |-- customer_address: string (nullable = false)
 |-- customer_city: string (nullable = false)
 |-- customer_state: string (nullable = false)
 |-- customer_country: string (nullable = false)

root
 |-- employee_id: long (nullable = false)
 |-- company: string (nullable = false)
 |-- job_title: string (nullable = false)
 |-- email: string (nullable = false)
 |-- phone_number: string (nullable = false)
 |-- gender: string (nullable = false)
 |-- marital_status: string (nullable = false)

root
 |-- transaction_id: long (nullable = false)
 |-- transaction_date: timestamp (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- description: string (nullable = false)

root
 |-- fact_transaction_sk: long (nullable = false)
 |-- transaction_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- employee_id: long (nullable = true)
 |-- amount: double (nullable = true)
 |-- credit_card_number: long (nullable = false)
 |-- iban: string (nullable = false)
 |-- currency_code: string (nullable = false)
 |-- random_number: double (nullable = false)
 |-- category: string (nullable = false)
 |-- group: string (nullable = false)
 |-- is_active: string (nullable = false)
 |-- last_updated: timestamp (nullable = true)

##### Create Tables with Primary and Foreign Keys (SQLAlchemy ORM)

In [21]:
# --- Base Setup ---
metadata = MetaData(schema=target_schema)
Base = declarative_base(metadata=metadata)

# --- Tables ---

# === Customers Table ===
class Customer(Base):
    __tablename__ = 'customers'

    customer_id = Column(BigInteger, primary_key=True, autoincrement=True)
    customer_name = Column(Text, nullable=False)
    customer_address = Column(Text, nullable=False)
    customer_city = Column(Text, nullable=False)
    customer_state = Column(Text, nullable=False)
    customer_country = Column(Text, nullable=False)

    # Reverse relationship
    transactions = relationship("TransactionFact", back_populates="customer", cascade="all, delete")


# === Employees Table ===
class Employee(Base):
    __tablename__ = 'employees'

    employee_id = Column(BigInteger, primary_key=True, autoincrement=True)
    company = Column(Text, nullable=False)
    job_title = Column(Text, nullable=False)
    email = Column(Text, nullable=False)
    phone_number = Column(Text, nullable=False)
    gender = Column(Text, nullable=False)
    marital_status = Column(Text, nullable=False)

    # Reverse relationship
    transactions = relationship("TransactionFact", back_populates="employee", cascade="all, delete")


# === Transactions Table ===
class Transaction(Base):
    __tablename__ = 'transactions'

    transaction_id = Column(BigInteger, primary_key=True, autoincrement=True)
    transaction_date = Column(DateTime, nullable=True)
    transaction_type = Column(String(50), nullable=True)
    description = Column(Text, nullable=False)

    # Reverse relationship
    facts = relationship("TransactionFact", back_populates="transaction", cascade="all, delete")


# === Transaction Fact Table ===
class TransactionFact(Base):
    __tablename__ = 'transaction_facts'

    fact_transaction_sk = Column(BigInteger, primary_key=True, autoincrement=True)

    transaction_id = Column(
        BigInteger,
        ForeignKey(f'{target_schema}.transactions.transaction_id', ondelete='CASCADE'),
        nullable=False  # foreign key only; fact_transaction_sk alone is the primary key
    )
    customer_id = Column(
        BigInteger,
        ForeignKey(f'{target_schema}.customers.customer_id', ondelete='CASCADE'),
        nullable=False
    )
    employee_id = Column(
        BigInteger,
        ForeignKey(f'{target_schema}.employees.employee_id', ondelete='CASCADE'),
        nullable=False
    )

    amount = Column(Float, nullable=True)
    credit_card_number = Column(BigInteger, nullable=False)
    iban = Column(Text, nullable=False)
    currency_code = Column(String(10), nullable=False)
    random_number = Column(Float, nullable=False)
    category = Column(String(50), nullable=False)
    group = Column(String(50), nullable=False)
    is_active = Column(String(10), nullable=False)
    last_updated = Column(DateTime, nullable=True)

    # Relationships (for ORM joins)
    customer = relationship("Customer", back_populates="transactions")
    employee = relationship("Employee", back_populates="transactions")
    transaction = relationship("Transaction", back_populates="facts")

Base.metadata.create_all(engine)



##### ERD

In [None]:
display(Image("datamodel.png"))

##### Load Tables: Efficiently load large PySpark DataFrames (from Parquet files) into PostgreSQL tables using `DataFrame.write.jdbc()`.

In [22]:
# 1. Load Parquet file
customers_df = spark.read.parquet("output/dim_customer")
employees_df = spark.read.parquet("output/dim_employee")
transaction_df = spark.read.parquet("output/dim_transaction")
transaction_fact_df = spark.read.parquet("output/fact_transaction")

In [23]:
# 2. Define JDBC settings from the same database URL
parsed = urlparse(db_url)
jdbc_url = f"jdbc:postgresql://{parsed.hostname}:{parsed.port or 5432}{parsed.path}"  # 5432 = PostgreSQL default port
jdbc_props = {"user": parsed.username, "password": parsed.password, "driver": "org.postgresql.Driver"}

#print(f"{jdbc_url }+ {jdbc_props}")

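If `NUGA_BANK` may omit the port or contain percent-encoded characters in the password (for example `@` written as `%40`), the conversion can be wrapped in a small, testable helper. The sketch below is an assumption about the URL's shape (a standard `postgresql://user:password@host[:port]/dbname` URI); the `to_jdbc` name and the sample URLs are hypothetical, and only the standard library is used.

```python
from urllib.parse import urlparse, unquote

def to_jdbc(db_url: str, default_port: int = 5432):
    """Split a postgresql:// URI into a Spark JDBC URL and a properties dict."""
    parsed = urlparse(db_url)
    if parsed.hostname is None or not parsed.path.strip("/"):
        raise ValueError("expected postgresql://user:password@host[:port]/dbname")
    port = parsed.port or default_port  # urlparse returns None when no port is given
    jdbc_url = f"jdbc:postgresql://{parsed.hostname}:{port}{parsed.path}"
    props = {
        # urlparse does not decode credentials, so undo percent-encoding here
        "user": unquote(parsed.username or ""),
        "password": unquote(parsed.password or ""),
        "driver": "org.postgresql.Driver",
    }
    return jdbc_url, props

url, props = to_jdbc("postgresql://etl_user:p%40ss@db.example.com/nuga_bank")
print(url)
```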
In [24]:
# 3. Create a Write Function
def write_to_postgres(df, table_name, mode="append"):
    """Write a Spark DataFrame to target_schema.table_name over JDBC."""
    try:
        print(f"Writing table: {target_schema}.{table_name} ({df.count()} rows)...")
        df.write \
          .mode(mode) \
          .jdbc(
              url=jdbc_url,
              table=f"{target_schema}.{table_name}",
              properties=jdbc_props
          )
        print(f"✅ Successfully wrote: {target_schema}.{table_name}")

    except Exception as e:
        print(f"❌ Failed to write table: {target_schema}.{table_name}")
        print(f"Error: {str(e)}")
        raise  # stop here so the facts are never loaded after a failed dimension load

In [25]:
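# 4. Write all DataFrames to the DB: dimensions first, so the fact table's foreign keys resolve
# "append" keeps the ORM-created tables and keys; "overwrite" would drop and recreate them by default
write_to_postgres(customers_df, "customers", mode="append")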
write_to_postgres(employees_df, "employees", mode="append")
write_to_postgres(transaction_df, "transactions", mode="append")

# Then write facts
write_to_postgres(transaction_fact_df, "transaction_facts")


Writing table: olap.customers (999884 rows)...
✅ Successfully wrote: olap.customers
Writing table: olap.employees (999856 rows)...
✅ Successfully wrote: olap.employees
Writing table: olap.transactions (1000000 rows)...
✅ Successfully wrote: olap.transactions
Writing table: olap.transaction_facts (1000000 rows)...
✅ Successfully wrote: olap.transaction_facts

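Two practical points for loading around a million rows per table, offered as assumptions to verify against your Spark and driver versions. First, the PostgreSQL JDBC driver must be on Spark's classpath, for example by setting the `spark.jars.packages` config to the `org.postgresql:postgresql:<version>` Maven coordinates before the session is created. Second, write throughput usually depends on batch size and parallelism: Spark treats entries of the `properties` dict as JDBC options (`batchsize`, default 1000 rows; `numPartitions`, which caps concurrent connections by coalescing to that many partitions) and passes unrecognised entries to the driver, where pgjdbc's `reWriteBatchedInserts` turns batched `INSERT`s into multi-row statements. The helper name and values below are hypothetical and untuned; `java.util.Properties` needs string values, hence the `str()` calls.

```python
def tuned_jdbc_props(base_props: dict, batch_size: int = 10_000, partitions: int = 8) -> dict:
    """Return a copy of the JDBC properties with write-tuning options added (illustrative values)."""
    if batch_size <= 0 or partitions <= 0:
        raise ValueError("batch_size and partitions must be positive")
    return {
        **base_props,
        "batchsize": str(batch_size),      # rows per JDBC batch (Spark default: 1000)
        "numPartitions": str(partitions),  # max concurrent connections for the write
        "reWriteBatchedInserts": "true",   # pgjdbc: send batches as multi-row INSERTs
    }

base = {"user": "etl_user", "password": "secret", "driver": "org.postgresql.Driver"}
props = tuned_jdbc_props(base)

# Usage with the notebook's writer (needs a live database, so shown as a comment):
# transaction_fact_df.write.mode("append").jdbc(url=jdbc_url, table="olap.transaction_facts", properties=props)
```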

In [26]:
# Release the database connections and stop the Spark session
cursor.close()
conn.close()
engine.dispose()
spark.stop()