## TRANSFORMING NESTED JSON COLUMNS INTO TABULAR (DIMENSION AND FACT) TABLES

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType
from pyspark.sql.window import Window

In [0]:
%run "/Workspace/UPI Transactions/Databricks_Notebooks/ETL_Files/01_Config"

In [0]:
# Explicit schema for the raw transaction JSON documents. Every field is
# nullable. The two nested levels (user_details.demographic_details and
# transaction_details.receiver_details) are built inline with StructType.add.
txn_schema = (
    StructType()
    .add("user_details", StructType()
         .add("user_id", StringType(), True)
         .add("user_name", StringType(), True)
         .add("user_bank_name", StringType(), True)
         .add("user_bank_ifsc", StringType(), True)
         .add("demographic_details", StructType()
              .add("kyc", BooleanType(), True)
              .add("id", StringType(), True)))
    .add("transaction_details", StructType()
         .add("transaction_id", StringType(), True)
         .add("receiver_details", StructType()
              .add("receiver_bank_name", StringType(), True)
              .add("receiver_bank_ifsc", StringType(), True)
              .add("amount", DoubleType(), True)
              .add("category", StringType(), True)))
    .add("payment_gateway", StringType(), True)
    .add("state", StringType(), True)
    .add("status", StringType(), True)
    .add("timestamp", StringType(), True)
)

In [0]:
# Read the raw bronze JSON files (one multi-line JSON document per record).
# The explicit txn_schema is applied instead of letting Spark infer one. This
# avoids the extra inference pass over every file and pins the column types
# (e.g. amount as double, kyc as boolean) regardless of what a particular batch
# happens to contain.
df_json = (
    spark.read.format('json')
    .schema(txn_schema)
    .option('multiLine', True)
    .load(f"{s3_bucket}/{bronze_folder}/json_files/*.json")
)

In [0]:
# Debug aid: print the column tree of the raw bronze DataFrame, so the nested
# user_details / transaction_details structure can be checked before flattening.
df_json.printSchema()

In [0]:
# Flatten the nested transaction JSON into one top-level column per attribute.
# user_details supplies the sender-side columns and receiver_details the
# counterparty. The timestamp string is parsed with the 'yyyy-MM-dd HH:mm:ss'
# pattern and stays the last column.
_user = col('user_details')
_demographics = _user.getField('demographic_details')
_transaction = col('transaction_details')
_receiver = _transaction.getField('receiver_details')

_sender_columns = [
    _user.getField('user_id').alias('user_id'),
    _user.getField('user_name').alias('user_name'),
    _user.getField('user_bank_name').alias('sender_bank'),
    _user.getField('user_bank_ifsc').alias('sender_ifsc'),
    _demographics.getField('kyc').alias('kyc'),
    _demographics.getField('id').alias('id'),
]
_transaction_columns = [
    _transaction.getField('transaction_id').alias('transaction_id'),
    _receiver.getField('receiver_bank_name').alias('receiver_bank'),
    _receiver.getField('receiver_bank_ifsc').alias('receiver_ifsc'),
    _receiver.getField('amount').cast('double').alias('amount'),
    _receiver.getField('category').alias('category'),
]
_top_level_columns = [
    col('payment_gateway'),
    col('state'),
    col('status'),
    to_timestamp(col('timestamp'), 'yyyy-MM-dd HH:mm:ss').alias('timestamp'),
]

df_flatten = df_json.select(*_sender_columns, *_transaction_columns, *_top_level_columns)

In [0]:
# dim_user: one row per user_id taken from the flattened batch.
# NOTE(review): when a user_id appears with several user_name values, which name
# survives dropDuplicates is not specified. mode('append') also re-inserts every
# user on each run, so confirm whether an upsert/MERGE is intended.
df_user = (
    df_flatten
    .select('user_id', 'user_name')
    .dropDuplicates(subset=['user_id'])
)

(
    df_user.write
    .format('delta')
    .mode('append')
    .save(f"{s3_bucket}/{silver_folder}/dim_user")
)

In [0]:
# dim_bank: every bank name seen on either side of a transaction (sender or receiver).
sender_df = df_flatten.select(col('sender_bank').alias('bank_name')).distinct()
receiver_df = df_flatten.select(col('receiver_bank').alias('bank_name')).distinct()

# Surrogate keys come from row_number() over the sorted bank names, which gives
# dense, 1-based ids. Because bank_name is unique after distinct(), the ids are
# deterministic, so recomputing df_banks for the fact join reproduces the ids
# written here. monotonically_increasing_id() stores the partition id in its upper
# bits, so after .cast('int') ids from different partitions can collide.
# The un-partitioned window runs on a single partition, which is fine for a small
# dimension.
# NOTE(review): mode('append') writes a fresh id assignment on every run; confirm
# whether dim_bank should be merged/upserted instead.
df_banks = (
    sender_df.unionByName(receiver_df).distinct()
    .withColumn('bank_id', row_number().over(Window.orderBy('bank_name')))
    .select(col('bank_id').cast('int'), 'bank_name')
)

df_banks.write.format('delta').mode('append')\
        .save(f"{s3_bucket}/{silver_folder}/dim_bank")

In [0]:
# dim_gateway: one surrogate key per distinct payment_gateway.
# row_number() over the sorted gateway names yields dense, deterministic,
# 1-based ids. By contrast, monotonically_increasing_id() encodes the partition
# id in its upper bits, and those values collide once truncated by .cast('int').
df_gateway = (
    df_flatten.select('payment_gateway').distinct()
    .withColumn('gateway_id', row_number().over(Window.orderBy('payment_gateway')))
    .select(col('gateway_id').cast('int'), 'payment_gateway')
)

df_gateway.write.format('delta').mode('append')\
        .save(f"{s3_bucket}/{silver_folder}/dim_gateway")

In [0]:
# dim_category: one surrogate key per distinct transaction category.
# row_number() over the sorted category names yields dense, deterministic,
# 1-based ids. By contrast, monotonically_increasing_id() encodes the partition
# id in its upper bits, and those values collide once truncated by .cast('int').
df_category = (
    df_flatten.select('category').distinct()
    .withColumn('category_id', row_number().over(Window.orderBy('category')))
    .select(col('category_id').cast('int'), 'category')
)

df_category.write.format('delta').mode('append')\
        .save(f"{s3_bucket}/{silver_folder}/dim_category")

In [0]:
# dim_proof: one surrogate key per distinct, non-null demographic id.
# row_number() over the sorted ids yields dense, deterministic, 1-based keys.
# By contrast, monotonically_increasing_id() encodes the partition id in its
# upper bits, and those values collide once truncated by .cast('int').
df_proof = (
    df_flatten.select('id').dropna().distinct()
    .withColumn('proof_id', row_number().over(Window.orderBy('id')))
    .select(col('proof_id').cast('int'), 'id')
)

df_proof.write.format('delta').mode('append')\
        .save(f"{s3_bucket}/{silver_folder}/dim_proof")

In [0]:
# dim_state: one surrogate key per distinct state, numbered in alphabetical order.
# The window ordering itself assigns the ids (1 = first state by name), so no
# separate orderBy is needed. monotonically_increasing_id() after an orderBy
# still restarts per partition in the low bits, and its values collide once
# truncated by .cast('int').
df_state = (
    df_flatten.select('state').distinct()
    .withColumn('state_id', row_number().over(Window.orderBy('state')))
    .select(col('state_id').cast('int'), 'state')
)

df_state.write.format('delta').mode('append')\
        .save(f"{s3_bucket}/{silver_folder}/dim_state")

In [0]:
# fact_transactions: the flattened batch with every descriptive attribute replaced
# by its dimension surrogate key, partitioned by year/month of the timestamp.

# dim_bank plays two roles (sender and receiver). Each role gets its own copy with
# pre-renamed columns, so the two joins do not reference the same df_banks columns.
# Joining df_banks twice through df_banks.bank_name is an ambiguous self-join.
sender_banks = df_banks.select(
    col('bank_name').alias('sender_bank'),
    col('bank_id').alias('sender_bank_id'),
)
receiver_banks = df_banks.select(
    col('bank_name').alias('receiver_bank'),
    col('bank_id').alias('receiver_bank_id'),
)

# All lookups are left joins, so a transaction with an unknown/null attribute is
# kept with a null key. df_flatten already carries user_name, so no join to
# df_user is needed. The final select keeps only the fact columns.
df_fact = (
    df_flatten
    .join(df_proof, 'id', 'left')
    .join(df_state, 'state', 'left')
    .join(df_category, 'category', 'left')
    .join(df_gateway, 'payment_gateway', 'left')
    .join(sender_banks, 'sender_bank', 'left')
    .join(receiver_banks, 'receiver_bank', 'left')
    .withColumn('year', year(col('timestamp')))
    .withColumn('month', month(col('timestamp')))
    .select('user_id', 'sender_bank_id', 'sender_ifsc', 'receiver_bank_id', 'receiver_ifsc',
            'amount', 'gateway_id', 'category_id', 'state_id', 'transaction_id', 'status',
            'kyc', 'proof_id', 'timestamp', 'year', 'month')
)

df_fact.write.format('delta').mode('append')\
        .partitionBy('year', 'month')\
        .save(f"{s3_bucket}/{silver_folder}/fact_transactions")