In [1]:
!pip install faker
!pip install pandas



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, rand, when, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import *
from faker import Faker
import random
from datetime import datetime, timedelta
import pandas as pd

In [3]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("RazorpayDataModeling").getOrCreate()

fake = Faker()

In [4]:
""" raw_merchant_profiles """
# Define the number of rows (merchants)
NUM_MERCHANTS = 100

# Lists to hold our data
merchant_ids = []
merchant_names = []
signup_dates = []
statuses = []

# Loop to generate data
for i in range(1, NUM_MERCHANTS + 1):
    # 1. Generate a unique, consistent Merchant ID (our Business Key)
    merchant_ids.append(f'MERCH_{i:04d}')
    
    # 2. Use Faker to create a realistic business name
    merchant_names.append(fake.company())
    
    # 3. Create a realistic signup date
    signup_dates.append(fake.date_time_between(start_date='-2y', end_date=datetime.now()))
    
    # 4. Assign a status (mostly Active, some Suspended/Inactive)
    status_choices = ['Active'] * 80 + ['Suspended'] * 15 + ['Inactive'] * 5
    statuses.append(random.choice(status_choices))

# Create the DataFrame
df_merchants = pd.DataFrame({
    'merchant_id': merchant_ids,
    'merchant_name': merchant_names,
    'signup_date': signup_dates,
    'status': statuses,
})

In [5]:
""" raw_merchant_subscriptions """
# Assuming df_merchants from the previous step is available

subscription_data = []
subscription_id_counter = 1

# Loop through each merchant to ensure every merchant has at least one plan
for merchant_id in df_merchants['merchant_id']:
    # Ensure every merchant has a starting plan
    num_plans = random.randint(1, 3) # Merchants will have 1 to 3 plan changes
    
    # Set the initial plan date slightly after their signup date
    signup_date = df_merchants[df_merchants['merchant_id'] == merchant_id]['signup_date'].iloc[0]
    
    # Generate the historical plan records for this merchant
    current_start_date = signup_date
    
    for _ in range(num_plans):
        # Define the pricing tier and associated fee percentage
        tier_options = {'Starter': 2.9, 'Pro': 1.5, 'Enterprise': 1.0}
        
        # Pick a random tier
        tier, fee = random.choice(list(tier_options.items()))
        
        # Determine the effective date of this plan
        # We need to ensure the end date is later than the start date
        start_date = current_start_date
        
        # Set the end date for the plan. Use a future date for the last plan, 
        # or a past date for an historical plan.
        if _ == num_plans - 1:
             # The current, active plan
            end_date = None
        else:
             # A historical, expired plan
            end_date = fake.date_between(start_date=start_date, end_date= datetime.now())

        subscription_data.append({
            'subscription_id': f'SUB_{subscription_id_counter:04d}',
            'merchant_id': merchant_id,
            'pricing_tier': tier,
            'fee_percentage': fee,
            'start_date': start_date,
            'end_date': end_date
        })
        subscription_id_counter += 1
        
        # Prepare the start date for the next plan change (if any)
        if end_date:
             current_start_date = pd.to_datetime(end_date) + pd.Timedelta(days=1)

# Create the DataFrame
df_subscriptions = pd.DataFrame(subscription_data)
df_subscriptions['start_date'] = pd.to_datetime(df_subscriptions['start_date']).dt.date
df_subscriptions['end_date'] = pd.to_datetime(df_subscriptions['end_date']).dt.date
df_subscriptions['end_date'] = df_subscriptions['end_date'].apply(lambda x: None if pd.isna(x) else x)


In [7]:
""" raw_merchant_transaction  """

# Assuming df_merchants is available from the first step
# We only want to generate transactions for 'Active' or 'Suspended' merchants
active_merchant_ids = df_merchants[df_merchants['status'].isin(['Active', 'Suspended'])]['merchant_id'].tolist()

NUM_TRANSACTIONS = 10000
transaction_data = []
transaction_id_counter = 1

for _ in range(NUM_TRANSACTIONS):
    # 1. Generate unique ID
    transaction_id = f'TXN_{transaction_id_counter:05d}'
    
    # 2. Pick a random active merchant
    merchant_id = random.choice(active_merchant_ids)
    
    # 3. Generate a timestamp in the last 3 months (July, Aug, Sep 2025)
    # The start date is 2025-07-01 and the end date is 2025-09-30
    transaction_timestamp = fake.date_time_between(start_date='-3M', end_date=datetime.now()) 
    
    # 4. Generate a random transaction amount (between $1 and $1000)
    gross_amount_usd = round(random.uniform(1.0, 1000.0), 2)
    
    # 5. Payment method
    payment_method = random.choice(['Card', 'UPI', 'Wallet', 'Net Banking'])
    
    transaction_data.append({
        'transaction_id': transaction_id,
        'merchant_id': merchant_id,
        'transaction_timestamp': transaction_timestamp,
        'gross_amount_usd': gross_amount_usd,
        'payment_method': payment_method
    })
    transaction_id_counter += 1

# Create the final Bronze DataFrame
df_transactions = pd.DataFrame(transaction_data)

In [8]:
# Convert df_merchants (Pandas) to spark_df_merchants (PySpark)
spark_df_merchants = spark.createDataFrame(df_merchants) # raw_merchant_profiles
spark_df_subscriptions = spark.createDataFrame(df_subscriptions) # raw_merchant_subscriptions
spark_df_transactions = spark.createDataFrame(df_transactions) # raw_merchant_transaction

In [9]:
print(f" merchants: {spark_df_merchants.count()}, subscriptions: {spark_df_subscriptions.count()}, transactions: {spark_df_transactions.count()} ")

 merchants: 100, subscriptions: 212, transactions: 10000 


# Silver - Data Vault 2.0

In [10]:
from pyspark.sql.functions import current_timestamp, sha2, lit, col

HUB_MERCHANT = (
    spark_df_merchants 
    .select("merchant_id")                                      # 1. Select the Business Key
    .distinct()                                                 # 2. Only keep unique Business Keys
    .withColumnRenamed("merchant_id", "MERCHANT_BK")            # 3. Rename BK for clarity
    .withColumn("HUB_MERCHANT_HK", sha2(col("MERCHANT_BK"), 256)) # 4. Generate the Hash Key (PK)
    .withColumn("LOAD_DATETIME", current_timestamp())          # 5. Generate the Load Timestamp
    .withColumn("RECORD_SOURCE", lit("raw_merchant_profiles"))  # 6. Define the Source
    .select("HUB_MERCHANT_HK", "MERCHANT_BK", "LOAD_DATETIME", "RECORD_SOURCE") # Final order
)

In [11]:
HUB_TRANSACTION = (
    spark_df_transactions
    .select("transaction_id")                                       # 1. Select the Business Key
    .distinct()                                        # 2. Only keep unique Business Keys
    .withColumnRenamed("transaction_id", "TRANSACTION_BK")          # 3. Rename BK for clarity
    .withColumn("HUB_TRANSACTION_HK", sha2(col("TRANSACTION_BK"), 256)) # 4. Generate the Hash Key (PK)
    .withColumn("LOAD_DATETIME", current_timestamp())   # 5. Generate the Load Timestamp
    .withColumn("RECORD_SOURCE", lit("raw_merchant_transaction"))             # 6. Define the Source
    .select("HUB_TRANSACTION_HK", "TRANSACTION_BK", "LOAD_DATETIME", "RECORD_SOURCE") 
)

In [12]:
HUB_SUBSCRIPTION = (
    spark_df_subscriptions
    .select("subscription_id")                                       # 1. Select the Business Key
    .distinct()                                        # 2. Only keep unique Business Keys
    .withColumnRenamed("subscription_id", "SUBSCRIPTION_BK")          # 3. Rename BK for clarity
    .withColumn("HUB_SUBSCRIPTION_HK", sha2(col("SUBSCRIPTION_BK"), 256)) # 4. Generate the Hash Key (PK)
    .withColumn("LOAD_DATETIME", current_timestamp())   # 5. Generate the Load Timestamp
    .withColumn("RECORD_SOURCE", lit("raw_merchant_subscriptions"))             # 6. Define the Source
    .select("HUB_SUBSCRIPTION_HK", "SUBSCRIPTION_BK", "LOAD_DATETIME", "RECORD_SOURCE") 
)

In [13]:
from pyspark.sql.functions import concat_ws

SAT_MERCHANT_PROFILE = (
    spark_df_merchants 
    .withColumn("HUB_MERCHANT_HK", sha2(col("merchant_id"), 256)) # 1. Create the Hub Hash Key
    .withColumn("HASH_DIFF", sha2(concat_ws("|", col("merchant_name"), col("status"), col("signup_date").cast("string") ), 256) )  # 2. Create the Hash Difference (HD)
    .withColumn("LOAD_DATETIME", current_timestamp())     # 3. Add Audit Columns
    .withColumn("RECORD_SOURCE", lit("raw_merchant_profiles"))
    .select(
        "HUB_MERCHANT_HK",
        "LOAD_DATETIME",
        "HASH_DIFF",
        "merchant_name",
        "status",
        "signup_date",
        "RECORD_SOURCE"
    )
)

In [14]:
from pyspark.sql.functions import concat_ws
SAT_SUBSCRIPTION_DETAILS = (
    spark_df_subscriptions 
    .withColumn("HUB_SUBSCRIPTION_HK", sha2(col("subscription_id"), 256))  # 1. Create the Hub Hash Key (Recalculated for decoupling)
    .withColumn("HASH_DIFF", sha2(concat_ws("|", col("pricing_tier"), col("fee_percentage").cast("string"), col("start_date").cast("string"), col("end_date").cast("string") ), 256))  # 2. Create the Hash Difference (HD)
    .withColumn("LOAD_DATETIME", current_timestamp())     # 3. Add Audit Columns
    .withColumn("RECORD_SOURCE", lit("raw_merchant_subscriptions"))
    .select(
        "HUB_SUBSCRIPTION_HK",
        "LOAD_DATETIME",
        "HASH_DIFF",
        "pricing_tier",
        "fee_percentage",
        "start_date",
        "end_date",
        "RECORD_SOURCE"
    ) # 4. Select the final Satellite columns 
)

In [15]:
from pyspark.sql.functions import current_timestamp, sha2, lit, col, concat_ws

LNK_MERCHANT_SUBSCRIPTION = (
    spark_df_subscriptions
    .select("merchant_id", "subscription_id") 
    .distinct()  # Links are unique relationships, not unique rows of the source
    .withColumn("HUB_MERCHANT_HK", sha2(col("merchant_id"), 256))  # 1. Recalculate the two Hub Hash Keys (Foreign Keys)
    .withColumn("HUB_SUBSCRIPTION_HK", sha2(col("subscription_id"), 256)) # 1.2 Recalculate the two Hub Hash Keys (Foreign Keys)
    .withColumn("LNK_MERCHANT_SUBSCRIPTION_HK", sha2(concat_ws("|", col("HUB_MERCHANT_HK"), col("HUB_SUBSCRIPTION_HK")), 256))  # 2. Create the Link Hash Key (Primary Key)
    .withColumn("LOAD_DATETIME", current_timestamp()) # 3. Add Audit Columns
    .withColumn("RECORD_SOURCE", lit("raw_merchant_subscriptions"))
    
    # 4. Select the final Link columns (LHK, HK1, HK2, Audit)
    .select(
        "LNK_MERCHANT_SUBSCRIPTION_HK",
        "HUB_MERCHANT_HK",
        "HUB_SUBSCRIPTION_HK",
        "LOAD_DATETIME",
        "RECORD_SOURCE"
    )
)

In [None]:
from pyspark.sql.functions import current_timestamp, sha2, lit, col, concat_ws

LNK_MERCHANT_SUBSCRIPTION = (
    spark_df_transactions
    .select("merchant_id", "subscription_id") 
    .distinct()  # Links are unique relationships, not unique rows of the source
    .withColumn("HUB_MERCHANT_HK", sha2(col("merchant_id"), 256))  # 1. Recalculate the two Hub Hash Keys (Foreign Keys)
    .withColumn("HUB_SUBSCRIPTION_HK", sha2(col("subscription_id"), 256)) # 1.2 Recalculate the two Hub Hash Keys (Foreign Keys)
    .withColumn("LNK_MERCHANT_SUBSCRIPTION_HK", sha2(concat_ws("|", col("HUB_MERCHANT_HK"), col("HUB_SUBSCRIPTION_HK")), 256))  # 2. Create the Link Hash Key (Primary Key)
    .withColumn("LOAD_DATETIME", current_timestamp()) # 3. Add Audit Columns
    .withColumn("RECORD_SOURCE", lit("raw_merchant_subscriptions"))
    
    # 4. Select the final Link columns (LHK, HK1, HK2, Audit)
    .select(
        "LNK_MERCHANT_SUBSCRIPTION_HK",
        "HUB_MERCHANT_HK",
        "HUB_SUBSCRIPTION_HK",
        "LOAD_DATETIME",
        "RECORD_SOURCE"
    )
)

In [25]:
from pyspark.sql.functions import concat_ws

LNK_MERCHANT_TRANSACTION = (
    spark_df_transactions
    .select("merchant_id", "transaction_id")  # Select the two required Business Keys (IDs) from the raw source
    .distinct()  # Not strictly needed for a transaction event, but good practice
    .withColumn("HUB_MERCHANT_HK", sha2(col("merchant_id"), 256))  # 1. Recalculate the two Hub Hash Keys (Foreign Keys)
    .withColumn("HUB_TRANSACTION_HK", sha2(col("transaction_id"), 256))  # 1.1 Recalculate the two Hub Hash Keys (Foreign Keys)
    .withColumn("LNK_MERCHANT_TRANSACTION_HK", sha2(concat_ws("|", col("HUB_MERCHANT_HK"), col("HUB_TRANSACTION_HK")), 256))  # 2. Create the Link Hash Key (LHK) (The Primary Key)
    .withColumn("LOAD_DATETIME", current_timestamp())  # 3. Add Audit Columns
    .withColumn("RECORD_SOURCE", lit("raw_payment_transactions"))
    .select(
        "LNK_MERCHANT_TRANSACTION_HK",
        "HUB_MERCHANT_HK",
        "HUB_TRANSACTION_HK",
        "LOAD_DATETIME",
        "RECORD_SOURCE"
    )# 4. Select the final Link columns (LHK, HK1, HK2, Audit)
)

# Gold Layer - Kimball (Dimensional Data Model)

In [28]:
DIM_MERCHANT = (
    SAT_MERCHANT_PROFILE
    .join(
        HUB_MERCHANT.select("HUB_MERCHANT_HK", "MERCHANT_BK"), 
        on=["HUB_MERCHANT_HK"], 
        how="inner"
    )
    # Select the columns for the final dimension table:
    .select(
        col("HUB_MERCHANT_HK"),
        col("MERCHANT_BK"),
        col("merchant_name"),
        col("status"),
        col("signup_date"),

    )
)

In [29]:
from pyspark.sql.functions import col

DIM_SUBSCRIPTION = (
    SAT_SUBSCRIPTION_DETAILS 
    .join(
        HUB_SUBSCRIPTION.select("HUB_SUBSCRIPTION_HK", "SUBSCRIPTION_BK"), 
        on=["HUB_SUBSCRIPTION_HK"], 
        how="inner"
    )
    .select(
        col("HUB_SUBSCRIPTION_HK"),
        col("SUBSCRIPTION_BK"),
        col("pricing_tier"),
        col("fee_percentage"),
        col("start_date"),
        col("end_date")
    )
)

In [34]:
from pyspark.sql.functions import current_timestamp, sha2, lit, col, concat_ws, to_date, when

fact_prep = (
    spark_df_transactions 
    # Recalculate the two necessary Hub Hash Keys
    .withColumn("HUB_MERCHANT_HK", sha2(col("merchant_id"), 256))
    .withColumn("HUB_TRANSACTION_HK", sha2(col("transaction_id"), 256))
    .withColumnRenamed("transaction_timestamp", "TRANSACTION_DATETIME")
    .select(
        "HUB_MERCHANT_HK", 
        "HUB_TRANSACTION_HK", 
        "TRANSACTION_DATETIME", 
        "gross_amount_usd"
    )
)

In [35]:
# The Link table contains HUB_MERCHANT_HK and HUB_SUBSCRIPTION_HK
# We are only selecting the two HKs here, as we don't need the LNK_HK for the Fact table.

bridged_fact = (
    fact_prep
    .join(
        LNK_MERCHANT_SUBSCRIPTION.select("HUB_MERCHANT_HK", "HUB_SUBSCRIPTION_HK"),
        on=["HUB_MERCHANT_HK"],
        how="inner"
    )
)

In [72]:
FACT_PAYMENT_REVENUE = (
bridged_fact.alias("bf")
    .join(
        SAT_SUBSCRIPTION_DETAILS.alias("ssd"), 
        on=( 
            (col("ssd.HUB_SUBSCRIPTION_HK") == col("bf.HUB_SUBSCRIPTION_HK")) 
            & (col("bf.TRANSACTION_DATETIME") >= to_date(col("ssd.start_date")))
            & (col("bf.TRANSACTION_DATETIME") < to_date(when(col("ssd.start_date").isNull(), lit("2099-12-31")).otherwise(col("ssd.end_date"))))
        ), how="inner"
    )
    .withColumn("PLATFORM_REVENUE_USD", round((col("bf.gross_amount_usd") * col("ssd.fee_percentage")) / 100, 2)) 
    .select("bf.HUB_MERCHANT_HK", "bf.HUB_TRANSACTION_HK", "bf.HUB_SUBSCRIPTION_HK", "bf.TRANSACTION_DATETIME", "bf.gross_amount_usd", "PLATFORM_REVENUE_USD")
    .withColumnRenamed("HUB_SUBSCRIPTION_HK", "HK_SUBSCRIPTION_FK")

)

## Questions 

In [102]:
"""1. Find the total platform revenue from Pro plans and the average fee percentage paid by Active merchants """

FACT_PAYMENT_REVENUE.alias("fpr") \
    .join(DIM_MERCHANT.alias("dm"), on=(col("dm.HUB_MERCHANT_HK")==col("fpr.HUB_MERCHANT_HK")), how= "inner") \
    .join(DIM_SUBSCRIPTION.alias('ds'), on=(col("ds.HUB_SUBSCRIPTION_HK")==col("fpr.HK_SUBSCRIPTION_FK")), how="inner") \
    .select(col("dm.status"), col("dm.merchant_name"),col("ds.pricing_tier"),col("ds.fee_percentage"), col("fpr.PLATFORM_REVENUE_USD"), col('fpr.TRANSACTION_DATETIME'))\
    .filter((col('status')=='Active') & (col('pricing_tier')== 'Pro'))\
    .agg(sum(col("PLATFORM_REVENUE_USD")).alias('total_revenue_pro_active'), avg(col("fee_percentage")).alias("Avg_Fee_Percentage")  )\
    .show()

+------------------------+------------------+
|total_revenue_pro_active|Avg_Fee_Percentage|
+------------------------+------------------+
|      3280.1600000000003|               1.5|
+------------------------+------------------+



In [155]:
""" 
For every unique Merchant and Subscription Plan Tier combination, what is the total sales volume processed, 
the effective average fee charged ?
"""

merchant_subscription_fee_query = (
    FACT_PAYMENT_REVENUE.alias("fpr")
    .join(
        DIM_MERCHANT.alias("dm"), 
        on=(col("dm.HUB_MERCHANT_HK") == col("fpr.HUB_MERCHANT_HK")), 
        how="inner"
    )
    .join(
        DIM_SUBSCRIPTION.alias('ds'), 
        on=(col("ds.HUB_SUBSCRIPTION_HK") == col("fpr.HK_SUBSCRIPTION_FK")), 
        how="inner"
    )    
    .groupBy(
        col("dm.merchant_name"), 
        col("ds.pricing_tier")
    )
    .agg(
        count(col("ds.SUBSCRIPTION_BK")).alias('No_of_times_Subscription_taken'),
        round(sum(col("fpr.gross_amount_usd")),2).alias("Total_Gross_Revenue_Processed"),
        round(avg(col("ds.fee_percentage")),2).alias("Average_Fee_Percentage")
    )
    .orderBy(col('dm.merchant_name'), col("Total_Gross_Revenue_Processed").desc())
)

merchant_subscription_fee_query.show()


+--------------------+------------+------------------------------+-----------------------------+----------------------+
|       merchant_name|pricing_tier|No_of_times_Subscription_taken|Total_Gross_Revenue_Processed|Average_Fee_Percentage|
+--------------------+------------+------------------------------+-----------------------------+----------------------+
|     Alvarez-Sanchez|  Enterprise|                             7|                      2516.98|                   1.0|
|Baird, Wright and...|         Pro|                            74|                     41985.97|                   1.5|
|Collins, Levine a...|  Enterprise|                            45|                     19229.86|                   1.0|
|       Conway-Martin|     Starter|                            35|                     15611.27|                   2.9|
|       Conway-Martin|  Enterprise|                            10|                      4194.37|                   1.0|
|Deleon, Reed and ...|     Starter|     