In [6]:
# Intialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: Whichever package you want mention here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [7]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("create_merchant_recommendation_dataMart").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [8]:
# CRITICAL FIXES: Must be run BEFORE loading data to prevent ClassCastException
# 1. Disable Vectorized Reader (Avoids low-level ORC data reading crash)
spark.conf.set("spark.sql.orc.enableVectorizedReader", "false")
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")
# 2. Disable Broadcast Join (Avoids memory/shuffle crash on join)
#spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) 
# 3. Disable Spark's optimizing components (Forces safer execution path)
#spark.conf.set("spark.sql.cbo.enabled", "false") 
#spark.conf.set("spark.sql.codegen.wholeStage", "false")
# 4. Force Hive SerDe (Ultimate attempt to bypass native Spark reader)
spark.conf.set("spark.sql.hive.convertMetastore", "false") 

print("Spark Config Updated for maximum stability.")


Spark Config Updated for maximum stability.


In [9]:
import pandas as pd
import numpy as np
import pyspark.sql.functions as F 
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType, DecimalType, TimestampType

In [10]:
#Load the tables directly from the Database
#We use the standard "database_name.table_name" format

print("Loading Silver Layer table from Hive..")

#A. Transaction Table
df_trans=spark.table("financial_db.transactions_silver")

#B. Users Table
df_users=spark.table("financial_db.users_silver")

#C.Cards Table
df_cards=spark.table("financial_db.cards_silver")

#3.Verification
print(f"Transaction Count:{df_trans.count()}")
print(f"Users Count: {df_users.count()}")
print(f"Cards Count: {df_cards.count()}")

#4.Preview Schema to ensure types are correct
df_trans.printSchema()
df_users.printSchema()
df_cards.printSchema()

Loading Silver Layer table from Hive..
Transaction Count:24386900
Users Count: 2000
Cards Count: 6146
root
 |-- user_id: integer (nullable = true)
 |-- card_id: integer (nullable = true)
 |-- amount: decimal(10,2) (nullable = true)
 |-- use_chip: string (nullable = true)
 |-- merchant_name: string (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- merchant_state: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- errors: string (nullable = true)
 |-- is_fraud: string (nullable = true)
 |-- transaction_timestamp: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

root
 |-- person_id: string (nullable = true)
 |-- current_age: integer (nullable = true)
 |-- retirement_age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- birth_month: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- apart

In [11]:
df_trans.show()

+-------+-------+------+--------+--------------------+-------------+--------------+-----+----+------+--------+---------------------+----+-----+
|user_id|card_id|amount|use_chip|       merchant_name|merchant_city|merchant_state|  zip| mcc|errors|is_fraud|transaction_timestamp|year|month|
+-------+-------+------+--------+--------------------+-------------+--------------+-----+----+------+--------+---------------------+----+-----+
|    897|      0| 52.65|   Swipe|   97032797689821735|  Sioux Falls|            SD|57108|5411|   N/A|      No|  1993-08-01 10:18:00|1993|    8|
|    897|      0| 40.00|   Swipe|-4282466774399734331|   Woonsocket|            SD|57385|4829|   N/A|      No|  1993-08-01 13:13:00|1993|    8|
|    897|      0| 13.14|   Swipe|-5581123930363301609|       Peever|            SD|57257|5311|   N/A|      No|  1993-08-02 11:25:00|1993|    8|
|    897|      0|  6.53|   Swipe|  112925206871091074|  Sioux Falls|            SD|57107|5541|   N/A|      No|  1993-08-02 12:20:00|1993

In [12]:
print("----2.Fixing Users Table----")

#1. Generate 'user_id' using Row Number(0,1,2)
rdd_with_id=df_users.rdd.zipWithIndex().map(lambda x:(x[1],)+tuple(x[0]))

#2.Create DataFrame with "user_id" as the first column 
new_column_names=["user_id"]+df_users.columns

df_users_indexed=spark.createDataFrame(rdd_with_id,new_column_names)

----2.Fixing Users Table----


In [13]:
df_users_indexed.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- person_id: string (nullable = true)
 |-- current_age: long (nullable = true)
 |-- retirement_age: long (nullable = true)
 |-- birth_year: long (nullable = true)
 |-- birth_month: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- apartment: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- per_capita_income_zipcode: decimal(38,18) (nullable = true)
 |-- yearly_income_person: decimal(38,18) (nullable = true)
 |-- total_debt: decimal(38,18) (nullable = true)
 |-- fico_score: long (nullable = true)
 |-- num_credit_cards: long (nullable = true)



In [14]:
df_cards.select("card_number","card_type").show(10)

+----------------+---------------+
|     card_number|      card_type|
+----------------+---------------+
|4344676511950444|          Debit|
|4956965974959986|          Debit|
|4582313478255491|          Debit|
|4879494103069057|         Credit|
|5722874738736011|Debit (Prepaid)|
|4404898874682993|         Credit|
|4001482973848631|          Debit|
|5627220683410948|          Debit|
|5711382187309326|Debit (Prepaid)|
|5766121508358701|Debit (Prepaid)|
+----------------+---------------+
only showing top 10 rows



In [15]:
print("--- 1. Prep Users (Golden Layer Standardization) ---")
# Preparing Users table with standardized column names and clean datatypes
df_users_prep = df_users.select(
    # Primary user identifier (cast to INT for join compatibility)
    F.col("person_id").cast("int").alias("user_id"),

    # Demographic attributes
    F.col("current_age").alias("age"),
    F.col("gender"),

    # Location attributes
    F.col("city"),
    F.col("state"),
    F.col("zipcode").alias("user_zip"),

    # Income & financial attributes
    F.coalesce(F.col("per_capita_income_zipcode").cast("double"), F.lit(0.0)).alias("per_capita_income"),
    F.coalesce(F.col("yearly_income_person").cast("double"), F.lit(0.0)).alias("yearly_income"),
    F.coalesce(F.col("total_debt").cast("double"), F.lit(0.0)).alias("total_debt"),

    # Credit profile attributes
    F.col("fico_score"),
    F.col("num_credit_cards")
)

print("Users Prepped (Schema Updated).")


--- 1. Prep Users (Golden Layer Standardization) ---
Users Prepped (Schema Updated).


In [16]:
print("--- 2. Prep Transactions (Golden Layer Standardization) ---")
# Preparing Transactions table with cleaned columns
df_trans_prep = df_trans.select(
    # Primary join keys
    F.col("user_id"),     
    F.col("card_id"),     

    # Transaction timestamp
    F.col("transaction_timestamp"),

    # Merchant attributes
    F.col("merchant_name"),
    F.col("merchant_state"),
    F.col("zip").alias("merchant_zip"),

    # Merchant category code (MCC)
    F.col("mcc").alias("merchant_category"), 

    # Error and chip usage indicators
    F.col("errors").alias("error_code"), 
    F.col("use_chip"),

    # Transaction amount (null-safe & numeric)
    F.coalesce(F.col("amount").cast("double"), F.lit(0.0)).alias("amount"),

    # Fraud label (binary for ML)
    F.when(F.col("is_fraud") == "Yes", 1).otherwise(0).alias("label_is_fraud")

).withColumn(
    # Extract hour of transaction for behavioral patterns
    "hour_of_day",
    F.hour(F.col("transaction_timestamp"))
)

print("Transactions Prepped (Schema Updated).")


--- 2. Prep Transactions (Golden Layer Standardization) ---
Transactions Prepped (Schema Updated).


In [17]:
print("--- 3. Prep Cards (Golden Layer Standardization) ---")
# Prepare Cards table with clean identifiers and financial attributes
df_cards_prep = df_cards.select(
    # Primary join keys
    F.col("user").alias("user_id"),
    F.col("card_index").alias("card_id"),

    # Card attributes
    F.col("card_brand"),
    F.col("card_type"),

    # Financial capacity of the card
    F.coalesce(F.col("credit_limit").cast("double"), F.lit(0.0)).alias("credit_limit"),

    # Security & risk attributes
    F.col("has_chip").alias("card_has_chip"),
    F.col("card_on_dark_web").alias("dark_web_exposure"),

    # Account lifecycle attributes
    F.col("acct_opened_year"),
    F.col("year_pin_last_changed")
)

print("Cards Prepped (Schema Updated).")


--- 3. Prep Cards (Golden Layer Standardization) ---
Cards Prepped (Schema Updated).


In [18]:
print("--- 4. Join Transactions with Users ---")
df_tx_user = df_trans_prep.join(
    df_users_prep,
    on="user_id",
    how="left"
)

print("Transactions + Users Joined.")

--- 4. Join Transactions with Users ---
Transactions + Users Joined.


In [19]:
print("--- 5. Join Cards with Transactions + Users ---")
df_golden = df_tx_user.join(
    df_cards_prep,
    on=["user_id", "card_id"],
    how="left"
)

print("Cards Joined. Golden Base Dataset Created.")

--- 5. Join Cards with Transactions + Users ---
Cards Joined. Golden Base Dataset Created.


In [20]:
print("--- 6. Feature Engineering for Spend Analysis ---")

CURRENT_YEAR = 2026

df_golden = (
    df_golden
    # Flag indicating whether chip was used (binary behavioral feature)
    .withColumn(
        "is_chip_txn",
        F.when(F.col("use_chip") == "Yes", 1).otherwise(0)
    )

    # Flag for cross-state transactions (travel / anomaly indicator)
    .withColumn(
        "is_out_of_state_txn",
        F.when(F.col("merchant_state") != F.col("state"), 1).otherwise(0)
    )

    # Ratio of debt to income (financial stress indicator)
    .withColumn(
        "debt_to_income_ratio",
        F.when(
            F.col("yearly_income") > 0,
            F.col("total_debt") / F.col("yearly_income")
        ).otherwise(0.0)
    )

    # Spend to income ratio (affordability & overspend detection)
    .withColumn(
        "spend_to_income_ratio",
        F.when(
            F.col("yearly_income") > 0,
            F.col("amount") / F.col("yearly_income")
        ).otherwise(0.0)
    )

    # Credit utilization proxy (single-txn utilization signal)
    .withColumn(
        "credit_utilization_ratio",
        F.when(
            F.col("credit_limit") > 0,
            F.col("amount") / F.col("credit_limit")
        ).otherwise(0.0)
    )

    # Card account age in years (tenure & trust indicator)
    .withColumn(
        "card_account_age_years",
        F.when(
            F.col("acct_opened_year").isNotNull(),
            F.lit(CURRENT_YEAR) - F.col("acct_opened_year")
        ).otherwise(None)
    )

    # PIN age in years (security hygiene indicator)
    .withColumn(
        "pin_age_years",
        F.when(
            F.col("year_pin_last_changed").isNotNull(),
            F.lit(CURRENT_YEAR) - F.col("year_pin_last_changed")
        ).otherwise(None)
    )

    # High value transaction flag based on credit limit threshold
    .withColumn(
        "is_high_value_txn",
        F.when(
            (F.col("credit_limit") > 0) &
            (F.col("amount") > (0.2 * F.col("credit_limit"))),
            1
        ).otherwise(0)
    )
)

print("Feature Engineering Completed for Spend Analysis.")


--- 6. Feature Engineering for Spend Analysis ---
Feature Engineering Completed for Spend Analysis.


In [21]:
print("--- 7. Final Validation ---")

# Print schema to validate final Golden Layer structure
df_golden.printSchema()


--- 7. Final Validation ---
root
 |-- user_id: integer (nullable = true)
 |-- card_id: integer (nullable = true)
 |-- transaction_timestamp: timestamp (nullable = true)
 |-- merchant_name: string (nullable = true)
 |-- merchant_state: string (nullable = true)
 |-- merchant_zip: string (nullable = true)
 |-- merchant_category: integer (nullable = true)
 |-- error_code: string (nullable = true)
 |-- use_chip: string (nullable = true)
 |-- amount: double (nullable = false)
 |-- label_is_fraud: integer (nullable = false)
 |-- hour_of_day: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- user_zip: string (nullable = true)
 |-- per_capita_income: double (nullable = true)
 |-- yearly_income: double (nullable = true)
 |-- total_debt: double (nullable = true)
 |-- fico_score: integer (nullable = true)
 |-- num_credit_cards: integer (nullable = true)
 |-- card_brand: s

In [22]:
print("--- Step 6: Saving as External Hive Table ---")
# Path for the External Table
gold_path = "/user/talentum/projectMaster/warehouseDir/gold/spend_analysis"

--- Step 6: Saving as External Hive Table ---


In [23]:
spark.conf.set("spark.sql.orc.enableVectorizedReader", "false")
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [24]:
spark.sql("DROP TABLE IF EXISTS financial_db.spend_analysis_gold")

# Write with explicit partitioning and Hive serde override
df_golden.write \
    .mode("overwrite") \
    .format("orc") \
    .option("path", "/user/talentum/projectMaster/warehouseDir/gold/spend_analysis") \
    .option("compression", "snappy") \
    .saveAsTable("financial_db.spend_analysis_gold")

print("SUCCESS: Table saved!")

SUCCESS: Table saved!


In [25]:

# Show sample records to visually inspect joins and engineered features
df_golden.show(10, truncate=False)

print("Golden Layer Spend Analysis Dataset Ready.")

+-------+-------+---------------------+--------------------+--------------+------------+-----------------+----------+--------+------+--------------+-----------+----+------+----+-----+--------+-----------------+-------------+----------+----------+----------------+----------+---------+------------+-------------+-----------------+----------------+---------------------+-----------+-------------------+--------------------+---------------------+------------------------+----------------------+-------------+-----------------+
|user_id|card_id|transaction_timestamp|merchant_name       |merchant_state|merchant_zip|merchant_category|error_code|use_chip|amount|label_is_fraud|hour_of_day|age |gender|city|state|user_zip|per_capita_income|yearly_income|total_debt|fico_score|num_credit_cards|card_brand|card_type|credit_limit|card_has_chip|dark_web_exposure|acct_opened_year|year_pin_last_changed|is_chip_txn|is_out_of_state_txn|debt_to_income_ratio|spend_to_income_ratio|credit_utilization_ratio|card_acc

In [26]:
print("--- Step 6: Saving as External Hive Table ---")
# Path for the External Table
gold_path = "/user/talentum/projectMaster/warehouseDir/gold/spend_analysis"

--- Step 6: Saving as External Hive Table ---


In [27]:
spark.conf.set("spark.sql.orc.enableVectorizedReader", "false")
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [28]:
spark.sql("DROP TABLE IF EXISTS financial_db.spend_analysis_gold")

# Write with explicit partitioning and Hive serde override
df_golden.write \
    .mode("overwrite") \
    .format("orc") \
    .option("path", "/user/talentum/projectMaster/warehouseDir/gold/spend_analysis") \
    .option("compression", "snappy") \
    .saveAsTable("financial_db.spend_analysis_gold")

print("SUCCESS: Table saved!")

SUCCESS: Table saved!
