In [None]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CustomerTransactionsPipeline")
    .config("spark.sql.catalogImplementation", "hive")  # Enable Hive support
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse/mysqltopgsql")  # HDFS location
    .enableHiveSupport()
    .getOrCreate()
)

spark

In [104]:
mysql_props = {
    "user": "root",
    "password": "Root",
    "driver": "com.mysql.cj.jdbc.Driver"
}

try:
    df_mysql = spark.read.jdbc(
        url="jdbc:mysql://localhost:3306/customer",
        table="customers_transaction",
        properties=mysql_props
    )
    print("Connection successful. Rows fetched:", df_mysql.count())
except Exception as e:
    print("Connection failed:", str(e))


Connection successful. Rows fetched: 50


In [105]:
df_mysql.write.mode("overwrite").parquet("hdfs:///data/landing/customers_transaction_raw")

In [106]:
df_mysql.show(10)

+----------+------------+--------------------+---------------+-------+--------+-------------+-------------------+---------+---------+
|CustomerID|        Name|               Email|TransactionDate| Amount|Currency|     Location|            Address|ProductID|  Remarks|
+----------+------------+--------------------+---------------+-------+--------+-------------+-------------------+---------+---------+
|      1001|    JOHN DOE|    john.doe@xyz.com|     2023/04/12|$120.50|     USD|     New York|123 Baker St,NY,USA|     P123|     NULL|
|      1002|    Jane doe|    jane.doe@xyz.com|     04-13-2023|   98.6|     usd|       Boston| 456 Elm Street, MA|     P124|delivered|
|      1003|Robert Smith|   rob.smith@xyz.com|     13-04-2023|   NULL|     USD|  Los Angeles| 789 Sunset Blvd,CA|     P125|     NULL|
|      1004|Mary Johnson|                NULL|     2023-04-15|$205.00|     USD|      Chicago|  321 Pine, IL, USA|     P123| refunded|
|      1002|    Jane doe|    jane.doe@xyz.com|     04-13-2023|

In [10]:
spark.sql("""

CREATE EXTERNAL TABLE customers_transaction_raw_tbl (
    CustomerID INT,
    Name STRING,
    Email STRING,
    TransactionDate STRING,
    Amount STRING,
    Currency STRING,
    Location STRING,
    Address STRING,
    ProductID STRING,
    Remarks STRING
)
STORED AS PARQUET
LOCATION 'hdfs://localhost:9000/raw_tbls/';
""")

spark.sql("""
LOAD DATA INPATH 'hdfs:///data/landing/customers_transaction_raw' INTO TABLE customers_transaction_raw_tbl;
""")

DataFrame[]

In [70]:
spark.sql("describe table customers_transaction_raw_tbl").show()

+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|     CustomerID|      int|   NULL|
|           Name|   string|   NULL|
|          Email|   string|   NULL|
|TransactionDate|   string|   NULL|
|         Amount|   string|   NULL|
|       Currency|   string|   NULL|
|       Location|   string|   NULL|
|        Address|   string|   NULL|
|      ProductID|   string|   NULL|
|        Remarks|   string|   NULL|
+---------------+---------+-------+



In [107]:
df = spark.sql("SELECT * FROM customers_transaction_raw_tbl")

In [108]:
df.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- TransactionDate: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Currency: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Remarks: string (nullable = true)



In [76]:
df.show(10)

+----------+------------+--------------------+---------------+-------+--------+-------------+-------------------+---------+---------+
|CustomerID|        Name|               Email|TransactionDate| Amount|Currency|     Location|            Address|ProductID|  Remarks|
+----------+------------+--------------------+---------------+-------+--------+-------------+-------------------+---------+---------+
|      1001|    JOHN DOE|    john.doe@xyz.com|     2023/04/12|$120.50|     USD|     New York|123 Baker St,NY,USA|     P123|     NULL|
|      1002|    Jane doe|    jane.doe@xyz.com|     04-13-2023|   98.6|     usd|       Boston| 456 Elm Street, MA|     P124|delivered|
|      1003|Robert Smith|   rob.smith@xyz.com|     13-04-2023|   NULL|     USD|  Los Angeles| 789 Sunset Blvd,CA|     P125|     NULL|
|      1004|Mary Johnson|                NULL|     2023-04-15|$205.00|     USD|      Chicago|  321 Pine, IL, USA|     P123| refunded|
|      1002|    Jane doe|    jane.doe@xyz.com|     04-13-2023|

In [109]:
from pyspark.sql.functions import regexp_replace, col, isnan, length, upper
from pyspark.sql.types import DecimalType

bad_records_df = df.filter(
    (col("CustomerID").cast("int").isNull()) |

    (col("Name").isNull() | (col("Name") != upper(col("Name")))) |

    (col("Email").isNull() | ~col("Email").rlike(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")) |

    (col("TransactionDate").isNull() | ~col("TransactionDate").rlike(r"^(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01])-\d{4}$")) |

    (col("Amount").isNull() | ~col("Amount").cast("string").rlike(r"^\$?\d+\.\d{2}$")) |

    (col("Currency").isNull() | (col("Currency") != upper(col("Currency")))) |

    (col("Location").isNull() | ~col("Location").rlike(r"^([A-Z][a-z]+)(\s[A-Z][a-z]+)*$")) |

    (col("Address").isNull()) |

    (col("ProductID").isNull() | ~col("ProductID").rlike(r"^P\d{3,}$")) |

    (col("Remarks").isNull() | (col("Remarks") != upper(col("Remarks"))))
)

bad_records_df.show(n=50, truncate=False)

+----------+---------------------+--------------------------------+---------------+--------+--------+-------------+-------------------------+---------+------------+
|CustomerID|Name                 |Email                           |TransactionDate|Amount  |Currency|Location     |Address                  |ProductID|Remarks     |
+----------+---------------------+--------------------------------+---------------+--------+--------+-------------+-------------------------+---------+------------+
|1001      |JOHN DOE             |john.doe@xyz.com                |2023/04/12     |$120.50 |USD     |New York     |123 Baker St,NY,USA      |P123     |NULL        |
|1002      |Jane doe             |jane.doe@xyz.com                |04-13-2023     |98.6    |usd     |Boston       |456 Elm Street, MA       |P124     |delivered   |
|1003      |Robert Smith         |rob.smith@xyz.com               |13-04-2023     |NULL    |USD     |Los Angeles  |789 Sunset Blvd,CA       |P125     |NULL        |
|1004     

In [110]:
from pyspark.sql.functions import col, regexp_replace, upper, trim, when, to_date
from pyspark.sql.types import DecimalType

# Step 1: Convert all string fields to trimmed and appropriate format
df_cleaned = df \
    .withColumn("Name", upper(trim(col("Name")))) \
    .withColumn("Email", trim(col("Email"))) \
    .withColumn("Currency", upper(trim(col("Currency")))) \
    .withColumn("Remarks", upper(trim(col("Remarks")))) \
    .withColumn("Location", regexp_replace(trim(col("Location")), r"\s+", " ")) \
    .withColumn("Amount", regexp_replace(col("Amount"), r"[^0-9.]", "").cast(DecimalType(10, 2))) \
    .withColumn("TransactionDate", regexp_replace(col("TransactionDate"), r"[./]", "-"))

df_cleaned.show(40, truncate=False)

+----------+------------------+------------------------------+---------------+------+--------+-------------+-------------------------+---------+------------+
|CustomerID|Name              |Email                         |TransactionDate|Amount|Currency|Location     |Address                  |ProductID|Remarks     |
+----------+------------------+------------------------------+---------------+------+--------+-------------+-------------------------+---------+------------+
|1001      |JOHN DOE          |john.doe@xyz.com              |2023-04-12     |120.50|USD     |New York     |123 Baker St,NY,USA      |P123     |NULL        |
|1002      |JANE DOE          |jane.doe@xyz.com              |04-13-2023     |98.60 |USD     |Boston       |456 Elm Street, MA       |P124     |DELIVERED   |
|1003      |ROBERT SMITH      |rob.smith@xyz.com             |13-04-2023     |NULL  |USD     |Los Angeles  |789 Sunset Blvd,CA       |P125     |NULL        |
|1004      |MARY JOHNSON      |NULL                 

In [79]:
from pyspark.sql.functions import col, length, trim

invalid_names_df = df_cleaned.filter(
    (col("Name") != trim(col("Name"))) &          # Not trimmed
    (col("Name") != upper(col("Name")))            # Contains lowercase letters
)

invalid_names_df.select("Name").show(truncate=False)

+----+
|Name|
+----+
+----+



In [10]:
from pyspark.sql.functions import col, when, to_date, date_format, substring

# Create a new column to store the day/month prefix as an int
df_cleaned = df_cleaned.withColumn("prefix_int", col("TransactionDate").substr(1, 2).cast("int"))

df_cleaned = df_cleaned.withColumn(
    "TransactionDateParsed",
    when(col("TransactionDate").rlike(r"^\d{4}-\d{1,2}-\d{1,2}$"), to_date(col("TransactionDate"), "yyyy-M-d"))
    .when(
        col("TransactionDate").rlike(r"^\d{1,2}-\d{1,2}-\d{4}$") & (col("prefix_int") <= 12),
        to_date(col("TransactionDate"), "MM-dd-yyyy")
    )
    .when(
        col("TransactionDate").rlike(r"^\d{1,2}-\d{1,2}-\d{4}$") & (col("prefix_int") > 12),
        to_date(col("TransactionDate"), "dd-MM-yyyy")
    )
)

df_cleaned = df_cleaned.withColumn(
    "TransactionDate", date_format(col("TransactionDateParsed"), "dd-MM-yyyy")
).drop("TransactionDateParsed", "prefix_int")

In [21]:
df_cleaned.show(50, truncate = True)

+----------+--------------------+--------------------+---------------+-------+--------+-------------+--------------------+---------+------------+
|CustomerID|                Name|               Email|TransactioNdate| Amount|Currency|     Location|             Address|ProductID|     Remarks|
+----------+--------------------+--------------------+---------------+-------+--------+-------------+--------------------+---------+------------+
|      1001|            JOHN DOE|    john.doe@xyz.com|     12-04-2023| 120.50|     USD|     New York| 123 Baker St,NY,USA|     P123|        NULL|
|      1002|            JANE DOE|    jane.doe@xyz.com|     13-04-2023|  98.60|     USD|       Boston|  456 Elm Street, MA|     P124|   DELIVERED|
|      1003|        ROBERT SMITH|   rob.smith@xyz.com|     13-04-2023|   NULL|     USD|  Los Angeles|  789 Sunset Blvd,CA|     P125|        NULL|
|      1004|        MARY JOHNSON|                NULL|     15-04-2023| 205.00|     USD|      Chicago|   321 Pine, IL, USA|  

In [20]:
df_cleaned.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- TransactioNdate: string (nullable = true)
 |-- Amount: decimal(10,2) (nullable = true)
 |-- Currency: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Remarks: string (nullable = true)



In [22]:
from pyspark.sql.functions import col, sum as _sum, when

null_counts = df_cleaned.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) 
    for c in df_cleaned.columns
])

null_counts.show()

+----------+----+-----+---------------+------+--------+--------+-------+---------+-------+
|CustomerID|Name|Email|TransactioNdate|Amount|Currency|Location|Address|ProductID|Remarks|
+----------+----+-----+---------------+------+--------+--------+-------+---------+-------+
|         0|   1|    1|              0|     5|       0|       0|      0|        1|     15|
+----------+----+-----+---------------+------+--------+--------+-------+---------+-------+



In [111]:
df_cleaned = df_cleaned.fillna({
    "CustomerID": 9999,  # A fallback integer ID
    "Name": "1001",
    "Email": "tcs@xyz.com",
    "TransactionDate": "2023-01-01",  # ISO format expected if it's a string
    "Amount": 0.0,
    "Currency": "USD",
    "Location": "New York",
    "Address": "Not Provided",
    "ProductID": "P000",
    "Remarks": "ITEM RETURED"
})

In [112]:
df_cleaned.show(50)

+----------+--------------------+--------------------+---------------+-------+--------+-------------+--------------------+---------+------------+
|CustomerID|                Name|               Email|TransactionDate| Amount|Currency|     Location|             Address|ProductID|     Remarks|
+----------+--------------------+--------------------+---------------+-------+--------+-------------+--------------------+---------+------------+
|      1001|            JOHN DOE|    john.doe@xyz.com|     2023-04-12| 120.50|     USD|     New York| 123 Baker St,NY,USA|     P123|ITEM RETURED|
|      1002|            JANE DOE|    jane.doe@xyz.com|     04-13-2023|  98.60|     USD|       Boston|  456 Elm Street, MA|     P124|   DELIVERED|
|      1003|        ROBERT SMITH|   rob.smith@xyz.com|     13-04-2023|   0.00|     USD|  Los Angeles|  789 Sunset Blvd,CA|     P125|ITEM RETURED|
|      1004|        MARY JOHNSON|         tcs@xyz.com|     2023-04-15| 205.00|     USD|      Chicago|   321 Pine, IL, USA|  

In [113]:
df_cleaned.select("Remarks").distinct().show()

+------------+
|     Remarks|
+------------+
|MISSING DATA|
|ITEM RETURED|
|    REFUNDED|
|    RETURNED|
|   DUPLICATE|
|   DELIVERED|
|   COMPLETED|
|     PENDING|
|          OK|
+------------+



In [114]:
from pyspark.sql.functions import when, col

df_cleaned = df_cleaned.withColumn("Remarks", when(col("Remarks") == "ITEM RETURED", "PENDING").otherwise(col("Remarks"))
)

In [115]:
from pyspark.sql.functions import col, when

# Define the allowed values
valid_values = ["REFUNDED", "RETURNED", "DELIVERED", "COMPLETED", "PENDING"]

# Clean the Remarks column
df_cleaned = df_cleaned.withColumn("Remarks", when(col("Remarks").isin(valid_values), col("Remarks")).otherwise("COMPLETED")
)

df_cleaned.show(50)

+----------+--------------------+--------------------+---------------+-------+--------+-------------+--------------------+---------+---------+
|CustomerID|                Name|               Email|TransactionDate| Amount|Currency|     Location|             Address|ProductID|  Remarks|
+----------+--------------------+--------------------+---------------+-------+--------+-------------+--------------------+---------+---------+
|      1001|            JOHN DOE|    john.doe@xyz.com|     2023-04-12| 120.50|     USD|     New York| 123 Baker St,NY,USA|     P123|  PENDING|
|      1002|            JANE DOE|    jane.doe@xyz.com|     04-13-2023|  98.60|     USD|       Boston|  456 Elm Street, MA|     P124|DELIVERED|
|      1003|        ROBERT SMITH|   rob.smith@xyz.com|     13-04-2023|   0.00|     USD|  Los Angeles|  789 Sunset Blvd,CA|     P125|  PENDING|
|      1004|        MARY JOHNSON|         tcs@xyz.com|     2023-04-15| 205.00|     USD|      Chicago|   321 Pine, IL, USA|     P123| REFUNDED|

In [None]:
"""
- Sales peaks by month/year → via TransactionMonth, TransactionYear
- High-value customer identification → from HighValueTransaction
- Customer segmentation → using CustomerTier for targeted marketing
- Fulfillment performance → via % of IsDelivered
- Revenue risk/opportunity zones → analyze combinations of Amount and Remarks
"""


In [116]:
from pyspark.sql.functions import col, month, year, upper, when, lit, to_date, lpad
from pyspark.sql.types import StringType

df_cleaned = df_cleaned.withColumn("TransactioNdate", to_date(col("TransactioNdate"), "dd-MM-yyyy"))

# 1. TransactionMonth - Helps analyze monthly trends
df_cleaned = df_cleaned.withColumn("TransactionMonth", lpad(month(to_date(col("TransactioNdate"), "dd-MM-yyyy")).cast(StringType()), 2, "0"))

# 2. TransactionYear - Helps observe year-over-year growth
df_cleaned = df_cleaned.withColumn("TransactioNdate", year(to_date(col("TransactioNdate"), "dd-MM-yyyy")).cast(StringType()))

# 3. HighValueTransaction - Flags transactions that are large (custom threshold)
df_cleaned = df_cleaned.withColumn("HighValueTransaction", when(col("Amount") > 500, "Yes").otherwise("No"))

# 4. CustomerTier - Classify customers based on Amount spent
df_cleaned = df_cleaned.withColumn(
    "CustomerTier",
    when(col("Amount") >= 500, "Platinum")
    .when(col("Amount") >= 200, "Gold")
    .when(col("Amount") >= 100, "Silver")
    .otherwise("Bronze")
)

# 5. IsDelivered - Convert Remarks to a delivery flag for fulfillment analysis
df_cleaned = df_cleaned.withColumn(
    "IsDelivered",
    when(upper(col("Remarks")) == "DELIVERED", 1).otherwise(0)
)

In [117]:
df_cleaned.show(50, truncate = True)

+----------+--------------------+--------------------+---------------+-------+--------+-------------+--------------------+---------+---------+----------------+--------------------+------------+-----------+
|CustomerID|                Name|               Email|TransactioNdate| Amount|Currency|     Location|             Address|ProductID|  Remarks|TransactionMonth|HighValueTransaction|CustomerTier|IsDelivered|
+----------+--------------------+--------------------+---------------+-------+--------+-------------+--------------------+---------+---------+----------------+--------------------+------------+-----------+
|      1001|            JOHN DOE|    john.doe@xyz.com|           NULL| 120.50|     USD|     New York| 123 Baker St,NY,USA|     P123|  PENDING|            NULL|                  No|      Silver|          0|
|      1002|            JANE DOE|    jane.doe@xyz.com|           NULL|  98.60|     USD|       Boston|  456 Elm Street, MA|     P124|DELIVERED|            NULL|                 

In [87]:
df_cleaned.printSchema()

root
 |-- CustomerID: integer (nullable = false)
 |-- Name: string (nullable = false)
 |-- Email: string (nullable = false)
 |-- TransactioNdate: string (nullable = true)
 |-- Amount: decimal(10,2) (nullable = true)
 |-- Currency: string (nullable = false)
 |-- Location: string (nullable = false)
 |-- Address: string (nullable = false)
 |-- ProductID: string (nullable = false)
 |-- Remarks: string (nullable = false)
 |-- TransactionMonth: string (nullable = true)
 |-- HighValueTransaction: string (nullable = false)
 |-- CustomerTier: string (nullable = false)
 |-- IsDelivered: integer (nullable = false)



In [47]:
spark.sql('''

CREATE EXTERNAL TABLE customers_transaction_stg_tbl (
    CustomerID INT,
    Name STRING,
    Email STRING,
    TransactionDate STRING,
    Amount DECIMAL(10,2),
    Currency STRING,
    Location STRING,
    Address STRING,
    ProductID STRING,
    Remarks STRING,
    TransactionMonth STRING,
    HighValueTransaction STRING,
    CustomerTier STRING,
    IsDelivered INT
)
STORED AS PARQUET
LOCATION 'hdfs://localhost:9000/stg_tbls/';
''')

# Save DataFrame to Hive table

df_cleaned.write.mode("overwrite").format("parquet").save("hdfs://localhost:9000/stg_tbls/")

# Print confirmation
print("Data successfully loaded into the staging table: customers_transaction_stg_tbl")


Data successfully loaded into the staging table: customers_transaction_stg_tbl


In [48]:
spark.sql("SELECT COUNT(*) FROM customers_transaction_stg_tbl").show()

+--------+
|count(1)|
+--------+
|      50|
+--------+



In [118]:
df_stg = spark.sql("SELECT * FROM customers_transaction_stg_tbl")

df_stg.show(50)

+----------+--------------------+--------------------+---------------+-------+--------+-------------+--------------------+---------+---------+----------------+--------------------+------------+-----------+
|CustomerID|                Name|               Email|TransactionDate| Amount|Currency|     Location|             Address|ProductID|  Remarks|TransactionMonth|HighValueTransaction|CustomerTier|IsDelivered|
+----------+--------------------+--------------------+---------------+-------+--------+-------------+--------------------+---------+---------+----------------+--------------------+------------+-----------+
|      1001|            JOHN DOE|    john.doe@xyz.com|           2023| 120.50|     USD|     New York| 123 Baker St,NY,USA|     P123|  PENDING|              04|                  No|      Silver|          0|
|      1002|            JANE DOE|    jane.doe@xyz.com|           2023|  98.60|     USD|       Boston|  456 Elm Street, MA|     P124|DELIVERED|              04|                 

In [89]:
from pyspark.sql.functions import col, when, sum

# Run the null check
null_summary = df_stg.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c + "_nulls") 
    for c in ["CustomerID", "Email", "TransactionDate", "Amount", "ProductID", "Remarks"]
])

# Collect the single row into a dictionary-like object
result = null_summary.collect()[0].asDict()

# Print only if nulls are found
nulls_found = {k: v for k, v in result.items() if v > 0}

if nulls_found:
    print("Nulls found in the following columns:")
    for k, v in nulls_found.items():
        print(f"{k}: {v}")
else:
    print("No nulls found in the selected columns.")

No nulls found in the selected columns.


In [90]:
from pyspark.sql.functions import regexp_extract, col

# Validate emails using regex
df_cleaned = df_stg.withColumn(
    "email_valid", regexp_extract(col("Email"), r"^[\w\.-]+@[\w\.-]+\.\w+$", 0)
)

# Filter out invalid ones
invalid_emails = df_cleaned.filter(col("email_valid") == "").select("Email")

# Show the invalid emails if any exist
if invalid_emails.count() > 0:
    print("Invalid email addresses found:")
    invalid_emails.show()
else:
    print("All email addresses are valid.")

All email addresses are valid.


In [92]:
from pyspark.sql.functions import col

valid_remarks = ["REFUNDED", "RETURNED", "DELIVERED", "COMPLETED", "PENDING"]

# Filter for unexpected remarks
invalid_remarks = df_stg.filter(~col("Remarks").isin(valid_remarks)).select("Remarks").distinct()

# Display results if any invalid values found
if invalid_remarks.count() > 0:
    print("Invalid 'Remarks' values found:")
    invalid_remarks.show()
else:
    print("All values in 'Remarks' column are valid.")

All values in 'Remarks' column are valid.


In [121]:
from pyspark.sql.functions import col

# Group by relevant fields and count duplicates
duplicates = df_stg.groupBy("CustomerID", "TransactionDate", "ProductID") \
    .count().filter("count > 1")

# Print result only if duplicates exist
if duplicates.count() > 0:
    print("Duplicate transactions found based on CustomerID, TransactionDate, and ProductID:")
    duplicates.show()
else:
    print("No duplicate transactions detected.")

No duplicate transactions detected.


In [120]:
df_stg = df_stg.dropDuplicates(["CustomerID", "TransactionDate", "ProductID"])

In [96]:
from pyspark.sql.functions import col

# Filter for invalid Amount values
invalid_amounts = df_stg.filter((col("Amount") < 0) | (col("Amount") > 10000)).select("Amount")

# Check and print result
if invalid_amounts.count() > 0:
    print("Invalid Amount values found (less than 0 or greater than 10,000):")
    invalid_amounts.show()
else:
    print("All Amount values are within the valid range.")

All Amount values are within the valid range.


In [97]:
df_stg.show(50)

+----------+--------------------+--------------------+---------------+-------+--------+-------------+--------------------+---------+---------+----------------+--------------------+------------+-----------+
|CustomerID|                Name|               Email|TransactionDate| Amount|Currency|     Location|             Address|ProductID|  Remarks|TransactionMonth|HighValueTransaction|CustomerTier|IsDelivered|
+----------+--------------------+--------------------+---------------+-------+--------+-------------+--------------------+---------+---------+----------------+--------------------+------------+-----------+
|      1001|            JOHN DOE|    john.doe@xyz.com|           2023| 120.50|     USD|     New York| 123 Baker St,NY,USA|     P123|  PENDING|              04|                  No|      Silver|          0|
|      1002|            JANE DOE|    jane.doe@xyz.com|           2023|  98.60|     USD|       Boston|  456 Elm Street, MA|     P124|DELIVERED|              04|                 

In [122]:
df_stg.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- TransactionDate: string (nullable = true)
 |-- Amount: decimal(10,2) (nullable = true)
 |-- Currency: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Remarks: string (nullable = true)
 |-- TransactionMonth: string (nullable = true)
 |-- HighValueTransaction: string (nullable = true)
 |-- CustomerTier: string (nullable = true)
 |-- IsDelivered: integer (nullable = true)



In [None]:
try:
    df_stg.write \
        .jdbc(
            url="jdbc:postgresql://localhost:5433/postgres",
            table="customers_transaction",
            mode="append", 
            properties={
                "user": "postgres",
                "password": "SURI@walmt1999",
                "driver": "org.postgresql.Driver"
            }
        )
    print("Data written successfully to 'customers_transaction' table in 'customer' database.")
except Exception as e:
    print("Failed to write data:", str(e))

Failed to write data: An error occurred while calling o1287.jdbc.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:254)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:258

In [126]:
from importlib import util
print(util.find_spec("org.postgresql.Driver"))

ModuleNotFoundError: No module named 'org'