In [2]:
import os

from src.config.spark import get_spark_session
from src.extract.extract_src_data import extract_src_db, extract_src_csv

# Resolve the bundled Hadoop install and PostgreSQL JDBC driver relative to the kernel's
# working directory; these paths assume the notebook is started from the project root.
PARENT_DIR = os.getcwd()
HADOOP_PATH = os.path.join(PARENT_DIR, "library", "hadoop")
POSTGRES_DRIVER_PATH = os.path.join(PARENT_DIR, "library", "postgre", "postgresql-42.7.5.jar")
HADOOP_BIN_PATH = os.path.join(HADOOP_PATH, "bin")

# Expose the bundled Hadoop install via HADOOP_HOME before the Spark session is created.
os.environ["HADOOP_HOME"] = HADOOP_PATH

# Add Hadoop's bin directory to PATH only if it is not already present, so re-running
# this cell is idempotent and an unset PATH does not raise KeyError.
_current_path = os.environ.get("PATH", "")
if HADOOP_BIN_PATH not in _current_path.split(os.pathsep):
    os.environ["PATH"] = os.pathsep.join(p for p in (_current_path, HADOOP_BIN_PATH) if p)

In [3]:
# Build the pipeline's Spark session through the project helper. The JDBC driver jar path
# is passed in, presumably so later database extracts can load it (confirm in helper).
APP_NAME = "week6-warehouse-pipeline"
spark_session = get_spark_session(APP_NAME, POSTGRES_DRIVER_PATH)

# Switch to the legacy datetime parser: later `to_date(..., "d/M/yy")` calls rely on its
# two-digit-year handling (e.g. "65" -> 1965 rather than 2065).
TIME_PARSER_POLICY = "LEGACY"
spark_session.conf.set("spark.sql.legacy.timeParserPolicy", TIME_PARSER_POLICY)

In [4]:
# Echo the session object as the cell's last expression so Jupyter renders its summary,
# a quick sanity check that the session started.
spark_session

In [5]:
# Lookup table of marital statuses; presumably referenced by `marital_id` in
# `marketing_campaign_deposit`. Kept under its own name instead of a reused `df`.
marital_status_df = extract_src_db(spark_session, "marital_status")
marital_status_df.show(10)

+----------+--------+--------------------+--------------------+
|marital_id|   value|          created_at|          updated_at|
+----------+--------+--------------------+--------------------+
|         1| married|2025-02-28 15:31:...|2025-02-28 15:31:...|
|         2|  single|2025-02-28 15:31:...|2025-02-28 15:31:...|
|         3|divorced|2025-02-28 15:31:...|2025-02-28 15:31:...|
+----------+--------+--------------------+--------------------+



In [6]:
# Lookup table of education levels; presumably referenced by `education_id` in
# `marketing_campaign_deposit`. Kept under its own name instead of a reused `df`.
education_status_df = extract_src_db(spark_session, "education_status")
education_status_df.show(10)

+------------+---------+--------------------+--------------------+
|education_id|    value|          created_at|          updated_at|
+------------+---------+--------------------+--------------------+
|           1| tertiary|2025-02-28 15:31:...|2025-02-28 15:31:...|
|           2|secondary|2025-02-28 15:31:...|2025-02-28 15:31:...|
|           3|  unknown|2025-02-28 15:31:...|2025-02-28 15:31:...|
|           4|  primary|2025-02-28 15:31:...|2025-02-28 15:31:...|
+------------+---------+--------------------+--------------------+



In [7]:
# Main marketing-campaign table; kept under its own name instead of a reused `df`.
# NOTE(review): `balance` arrives as currency-formatted text (e.g. "$2143") and will need
# cleaning before any numeric use.
marketing_campaign_deposit_df = extract_src_db(spark_session, "marketing_campaign_deposit")
marketing_campaign_deposit_df.show(10)

+------------+---+------------+----------+------------+-------+-------+-------+-----+-------+---+-----+--------+--------+-----+--------+--------+------------------+--------------------+--------------------+
|loan_data_id|age|         job|marital_id|education_id|default|balance|housing| loan|contact|day|month|duration|campaign|pdays|previous|poutcome|subscribed_deposit|          created_at|          updated_at|
+------------+---+------------+----------+------------+-------+-------+-------+-----+-------+---+-----+--------+--------+-----+--------+--------+------------------+--------------------+--------------------+
|           1| 58|  management|         1|           1|  false|  $2143|   true|false|unknown|  5|  may|     261|       1|   -1|       0| unknown|             false|2025-02-28 15:59:...|2025-02-28 15:59:...|
|           2| 44|  technician|         2|           2|  false|    $29|   true|false|unknown|  5|  may|     151|       1|   -1|       0| unknown|             false|2025-02-

In [8]:
# Raw bank-transaction CSV extract. The generic name `df` is kept because the following
# cells (customer count, customer projection, TransactionTime check) read from it.
TRANSACTIONS_CSV_DIR = "data/new_bank_transaction_csv/"
df = extract_src_csv(spark_session, TRANSACTIONS_CSV_DIR)
df.show(10)

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|      T401396|  C1010024|    21/6/65|         M|     KOLKATA|          87058.65|        18/8/16|         141103|                 5000.0|
|      T303294|  C1010068|    14/7/76|         M|     GURGAON|          46741.73|        10/8/16|         101617|                  546.0|
|      T347496|  C1010081|     1/5/89|         M|   GHAZIABAD|           1584.18|        14/8/16|         144742|                  429.0|
|      T329017|C1010081_2|     2/9/77|         F|   PANCHKULA|          23319.04|        15/8/16|         172658|                 1699.0|
|      T113706|C1010081_3|    11/2

In [9]:
# Number of distinct CustomerID values in the raw transaction extract. This is an int,
# not a DataFrame. Suffixed IDs such as "C1010081_2" count as separate customers.
distinct_customer_count = df.select("CustomerID").distinct().count()
distinct_customer_count

1048567


In [10]:
from pyspark.sql.functions import to_date

# Customer attributes taken from the transaction rows. CustomerDOB arrives as
# day/month/two-digit-year text (e.g. "21/6/65"). "d/M/yy" parses it, and the session's
# LEGACY timeParserPolicy is what maps "65" to 1965 instead of 2065.
# NOTE(review): what look like placeholder DOBs (1800-01-01) pass through unchanged.
dob_as_date = to_date("CustomerDOB", "d/M/yy").alias("CustomerDOB")
customers_df = df.select(
    "CustomerID",
    dob_as_date,
    "CustGender",
    "CustLocation",
    "CustAccountBalance",
)

customers_df.show(10)

+----------+-----------+----------+------------+------------------+
|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|
+----------+-----------+----------+------------+------------------+
|  C1010024| 1965-06-21|         M|     KOLKATA|          87058.65|
|  C1010068| 1976-07-14|         M|     GURGAON|          46741.73|
|  C1010081| 1989-05-01|         M|   GHAZIABAD|           1584.18|
|C1010081_2| 1977-09-02|         F|   PANCHKULA|          23319.04|
|C1010081_3| 1984-02-11|         F|      HOWRAH|             17.71|
|  C1010085| 1800-01-01|         M|      KHARAR|          319080.2|
|  C1010087| 1990-04-20|         M|      MUMBAI|           8346.99|
|  C1010116| 1994-01-30|         M|   HYDERABAD|          12299.04|
|  C1010122| 1988-09-05|         F|       NOIDA|            312.39|
|  C1010241| 1985-10-09|         F|        PUNE|          27952.37|
+----------+-----------+----------+------------+------------------+
only showing top 10 rows



In [11]:
from pyspark.sql.functions import length, col

# Rows whose TransactionTime has fewer than 6 characters (e.g. 95537, 2619).
# NOTE(review): these look like HHMMSS values with leading zeros dropped
# (95537 -> 09:55:37). Confirm against the source before left-padding them.
EXPECTED_TIME_LENGTH = 6

short_transaction_time_df = df.filter(length(col("TransactionTime")) < EXPECTED_TIME_LENGTH)
short_transaction_time_df.show(10)

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|      T454284|C1010414_2|    22/4/88|         M|       DELHI|          21864.53|        21/8/16|          95537|                 2200.0|
|       T37993|  C1010616|    26/7/79|         M|       DELHI|            9101.3|        27/9/16|           2619|                  439.0|
|     T1044818|  C1010726|    25/7/91|         M|      VALSAD|          12735.85|        18/9/16|          93237|                  220.0|
|      T296101|  C1010749|    15/8/98|         M|        VAPI|          56432.62|        10/8/16|          61818|                  210.0|
|      T676234|C1010820_2|    20/1

In [12]:
# Optional teardown: set to True to stop the Spark session (and its underlying
# SparkContext) at the end of a full run. It defaults to False so "Run All" leaves the
# session available for interactive exploration.
STOP_SPARK_SESSION = False
if STOP_SPARK_SESSION:
    spark_session.stop()