In [0]:
customer_df=spark.read.table("bronze.customers")
branch_df=spark.read.table("bronze.branches")
# transaction_df=spark.read.table("bronze.transactions")

In [0]:
transaction_df = (spark.readStream
    .format('delta')
    .option("checkpointLocation", "dbfs:/FileStore/capstone/checkpoints/transactionsLoad/")
    .table("bronze.transactions"))

In [0]:
transaction_df.display()

transaction_id,customer_id,branch_id,channel,transaction_type,amount,currency,timestamp,status,_rescued_data
T6050,C1917,B0004,web,transfer,43.51,GBP,2023-01-22T07:39:00Z,completed,
T6051,C1988,B0009,mobile,payment,8.08,EUR,2023-01-22T08:32:00Z,completed,
T6052,C1186,B0007,mobile,deposit,14.01,USD,2023-01-22T09:04:00Z,completed,
T6053,C1098,B0007,branch,deposit,67.58,EUR,2023-01-22T09:24:00Z,completed,
T6054,C1804,B0008,ATM,deposit,24.75,USD,2023-01-22T09:45:00Z,completed,
T6055,C1546,B0003,web,payment,28.34,EUR,2023-01-22T10:27:00Z,completed,
T6056,C1416,B0009,branch,payment,35.45,GBP,2023-01-22T11:01:00Z,pending,
T6057,C1859,B0008,mobile,transfer,25.13,GBP,2023-01-22T11:54:00Z,completed,
T6058,C1388,B0009,branch,deposit,8.45,USD,2023-01-22T12:16:00Z,completed,
T6059,C1332,B0000,mobile,deposit,52.64,GBP,2023-01-22T13:11:00Z,completed,


In [0]:
%sql
use bronze;

## basic oprations on all 3 DF

### ROW Count

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
def get_row_count(df: DataFrame, df_name: str) -> str:
    """Returns the number of rows in the DataFrame with a label."""
    row_count = df.groupBy().count()
    return f"{df_name} = {row_count}"
#get_row_count(customer_df, "customer_df")
#get_row_count(branch_df,"branch_df")
get_row_count(transaction_df,"transaction_df")

'transaction_df = DataFrame[count: bigint]'

In [0]:
row_count = transaction_df.groupBy().count()

In [0]:
row_count.display()

count
3000


### Column Count

In [0]:
def get_column_count(df: DataFrame, df_name: str) -> str:
    """Returns the number of columns in the DataFrame with a label."""
    column_count = len(df.columns)
    return f"{df_name} = {column_count} columns"
get_column_count(customer_df, "customer_df")
#get_column_count(branch_df,"branch_df")
#get_column_count(transaction_df,"transaction_df")

'customer_df = 8 columns'

### Column Names

In [0]:
def get_column_names(df: DataFrame, df_name: str) -> str:
    """Returns the list of column names in the DataFrame with a label."""
    column_names = df.columns
    return f"{df_name} columns: {column_names}"
get_column_names(customer_df, "customer_df")
#get_column_names(branch_df,"branch_df")
#get_column_names(transaction_df,"transaction_df")

"customer_df columns: ['customer_id', 'name', 'email', 'phone', 'address', 'credit_score', 'join_date', 'last_update']"

### Distinct_value

In [0]:
def get_distinct_counts(df: DataFrame, df_name: str) -> str:
    """Returns the distinct count for each column in the DataFrame with a label."""
    distinct_counts = {col: df.select(col).distinct().count() for col in df.columns}
    return f"{df_name} distinct counts: {distinct_counts}"
get_distinct_counts(customer_df, "customer_df")
#get_distinct_counts(branch_df,"branch_df")
#get_distinct_counts(transaction_df,"transaction_df")

"customer_df distinct counts: {'customer_id': 1000, 'name': 979, 'email': 991, 'phone': 1000, 'address': 1000, 'credit_score': 331, 'join_date': 794, 'last_update': 340}"

### DataTYpes

In [0]:
def get_data_types(df: DataFrame, df_name: str) -> str:
    """Returns the data types for each column in the DataFrame with a label."""
    data_types = {col: df.schema[col].dataType for col in df.columns}
    return f"{df_name} data types: {data_types}"
get_data_types(customer_df, "customer_df")
#get_data_types(branch_df,"branch_df")
#get_data_types(transaction_df,"transaction_df")

"customer_df data types: {'customer_id': StringType(), 'name': StringType(), 'email': StringType(), 'phone': StringType(), 'address': StringType(), 'credit_score': IntegerType(), 'join_date': DateType(), 'last_update': TimestampType()}"

### NULL Count

In [0]:
def get_null_counts(df: DataFrame, df_name: str) -> str:
    """Returns the count and percentage of null values for each column in the DataFrame with a label."""
    row_count = df.count()
    null_info = {
        column: (df.filter(col(column).isNull()).count(), (df.filter(col(column).isNull()).count() / row_count) * 100)
        for column in df.columns
    }
    return f"{df_name} null counts: {null_info}"
get_null_counts(customer_df, "customer_df")
#get_null_counts(branch_df,"branch_df")
#get_null_counts(transaction_df,"transaction_df")

"customer_df null counts: {'customer_id': (0, 0.0), 'name': (0, 0.0), 'email': (0, 0.0), 'phone': (0, 0.0), 'address': (0, 0.0), 'credit_score': (0, 0.0), 'join_date': (0, 0.0), 'last_update': (0, 0.0)}"

### Duplicate_count

In [0]:
def get_duplicate_count(df: DataFrame, df_name: str) -> str:
    """Returns the count of duplicate rows in the DataFrame with a label."""
    duplicate_count = df.groupBy(df.columns).count().filter(col('count') > 1).count()
    return f"{df_name} duplicate rows count: {duplicate_count}"
get_duplicate_count(customer_df, "customer_df")
#get_duplicate_count(branch_df,"branch_df")
#get_duplicate_count(transaction_df,"transaction_df")

'customer_df duplicate rows count: 0'

## TypeCasting

In [0]:
def cast_column_to_double(df: DataFrame, column_name: str, df_name: str) -> DataFrame:
    """Casts the specified column in the DataFrame to DoubleType and returns the updated DataFrame with a label."""
    casted_df = df.withColumn(column_name, col(column_name).cast(DoubleType()))
    print(f"Column '{column_name}' in {df_name} has been casted to DoubleType.")
    return casted_df
cast_column_to_double(customer_df, "credit_score", "customer_df")

Column 'credit_score' in customer_df has been casted to DoubleType.


DataFrame[customer_id: string, name: string, email: string, phone: string, address: string, credit_score: double, join_date: date, last_update: timestamp]

## cleaning

### Normalization phone number

In [0]:
def format_phone_number(phone_number: str) -> str:
    """Format phone number to the format +1XXXXXXXXXX."""
    phone_number = phone_number.replace('(', '').replace(')', '').replace('-', '').replace(' ', '')
    if len(phone_number) == 10:
        return f"{phone_number}"
    return phone_number  

format_phone_number_udf = udf(format_phone_number, StringType())

def standardize_phone_numbers(df: DataFrame, column_name: str, df_name: str) -> DataFrame:
    standardized_df = df.withColumn(
        column_name,
        format_phone_number_udf(trim(col(column_name)))  
    )
    print(f"Phone numbers in '{column_name}' column of {df_name} have been standardized.")
    return standardized_df
customer_df = spark.read.table("bronze.customers")
customer_df_standardized = standardize_phone_numbers(customer_df, "phone", "customer_df")

customer_df_standardized.display()


Phone numbers in 'phone' column of customer_df have been standardized.


customer_id,name,email,phone,address,credit_score,join_date,last_update
C1000,Courtney Jones,courtney.jones@gmail.com,2104212602,"9345 Melissa Manors Suite 869, Ibarraview, PR 08116",525,2017-01-01,2023-02-25T00:00:00Z
C1001,Joshua Edwards,joshua.edwards@yahoo.com,6183149154,"8886 Katelyn Stravenue Apt. 089, Randyview, VI 49164",528,2017-01-02,2023-10-04T00:00:00Z
C1002,Brandon Hernandez,brandon.hernandez@aol.com,8813875395,"72170 Christensen Parkways Apt. 696, New Thomasfort, AK 76566",508,2017-01-02,2023-04-28T00:00:00Z
C1003,Mary Salazar,mary.salazar@aol.com,1409313508,"Unit 2356 Box 0826, DPO AE 82138",637,2017-01-02,2023-05-23T00:00:00Z
C1004,Thomas Gibson,thomas.gibson@outlook.com,4798745147,"1445 Smith Station, Port Joel, SD 89362",594,2017-01-03,2023-09-22T00:00:00Z
C1005,Cody Alvarez,cody.alvarez@hotmail.com,8894044576,"480 Catherine Forge, Williamview, ND 08510",565,2017-01-03,2023-07-09T00:00:00Z
C1006,Christopher Davidson,christopher.davidson@gmail.com,3093723043,"345 Mark Loop Apt. 530, Thomaston, OR 26035",793,2017-01-06,2023-04-24T00:00:00Z
C1007,Stephanie Hobbs,stephanie.hobbs@hotmail.com,1541203921,"8177 Linda Dam, North David, LA 62211",555,2017-01-08,2023-05-09T00:00:00Z
C1008,Victor Roth,victor.roth@aol.com,8516247767,"3751 Ortega Gateway, Dannyport, NH 90858",577,2017-01-10,2023-10-27T00:00:00Z
C1009,Emma Lewis,emma.lewis@hotmail.com,4153099045,"5112 Kenneth Cove Apt. 963, Lake Gary, AL 11933",509,2017-01-11,2023-01-01T00:00:00Z


###new_column Zip code- Trasnfromation

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, regexp_extract, trim

def add_zip_code_column(df: DataFrame, address_column: str) -> DataFrame:
    zip_code_pattern = r'(\d{5})$'
    df_with_zip_code = df.withColumn(
        "Zip_Code",
        regexp_extract(trim(col(address_column)), zip_code_pattern, 1)  # Extract ZIP code
    )
    return df_with_zip_code

# Example usage:
customer_df = spark.read.table("bronze.customers")
customer_df_with_zip_code = add_zip_code_column(customer_df, "address")

customer_df_with_zip_code.display()


customer_id,name,email,phone,address,credit_score,join_date,last_update,Zip_Code
C1000,Courtney Jones,courtney.jones@gmail.com,(210) 421-2602,"9345 Melissa Manors Suite 869, Ibarraview, PR 08116",525,2017-01-01,2023-02-25T00:00:00Z,8116
C1001,Joshua Edwards,joshua.edwards@yahoo.com,(618) 314-9154,"8886 Katelyn Stravenue Apt. 089, Randyview, VI 49164",528,2017-01-02,2023-10-04T00:00:00Z,49164
C1002,Brandon Hernandez,brandon.hernandez@aol.com,(881) 387-5395,"72170 Christensen Parkways Apt. 696, New Thomasfort, AK 76566",508,2017-01-02,2023-04-28T00:00:00Z,76566
C1003,Mary Salazar,mary.salazar@aol.com,(140) 931-3508,"Unit 2356 Box 0826, DPO AE 82138",637,2017-01-02,2023-05-23T00:00:00Z,82138
C1004,Thomas Gibson,thomas.gibson@outlook.com,(479) 874-5147,"1445 Smith Station, Port Joel, SD 89362",594,2017-01-03,2023-09-22T00:00:00Z,89362
C1005,Cody Alvarez,cody.alvarez@hotmail.com,(889) 404-4576,"480 Catherine Forge, Williamview, ND 08510",565,2017-01-03,2023-07-09T00:00:00Z,8510
C1006,Christopher Davidson,christopher.davidson@gmail.com,(309) 372-3043,"345 Mark Loop Apt. 530, Thomaston, OR 26035",793,2017-01-06,2023-04-24T00:00:00Z,26035
C1007,Stephanie Hobbs,stephanie.hobbs@hotmail.com,(154) 120-3921,"8177 Linda Dam, North David, LA 62211",555,2017-01-08,2023-05-09T00:00:00Z,62211
C1008,Victor Roth,victor.roth@aol.com,(851) 624-7767,"3751 Ortega Gateway, Dannyport, NH 90858",577,2017-01-10,2023-10-27T00:00:00Z,90858
C1009,Emma Lewis,emma.lewis@hotmail.com,(415) 309-9045,"5112 Kenneth Cove Apt. 963, Lake Gary, AL 11933",509,2017-01-11,2023-01-01T00:00:00Z,11933


## transformation on phone column and created new column ZIP code

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import udf, col, trim, regexp_extract
from pyspark.sql.types import StringType

def format_phone_number(phone_number: str) -> str:
    """Format phone number to the format +1XXXXXXXXXX."""
    phone_number = phone_number.replace('(', '').replace(')', '').replace('-', '').replace(' ', '')
    if len(phone_number) == 10:
        return f"{phone_number}"
    return phone_number  

def add_zip_code_column(df: DataFrame, address_column: str) -> DataFrame:
    """Add a ZIP code column extracted from the address column."""
    zip_code_pattern = r'(\d{5})$'
    df_with_zip_code = df.withColumn(
        "Zip_Code",
        regexp_extract(trim(col(address_column)), zip_code_pattern, 1)  # Extract ZIP code
    )
    return df_with_zip_code

def standardize_phone_numbers(df: DataFrame, column_name: str, df_name: str) -> DataFrame:
    """Standardize phone numbers and add a ZIP code column."""
    format_phone_number_udf = udf(format_phone_number, StringType())
    standardized_df = df.withColumn(
        column_name,
        format_phone_number_udf(trim(col(column_name)))  
    )
    standardized_df_with_zip_code = add_zip_code_column(standardized_df, "address")
    
    print(f"Phone numbers in '{column_name}' column of {df_name} have been standardized and ZIP codes added.")
    return standardized_df_with_zip_code
customer_df = spark.read.table("bronze.customers")
customer_df_standardized = standardize_phone_numbers(customer_df, "phone", "customer_df")
customer_df_standardized.display()


Phone numbers in 'phone' column of customer_df have been standardized and ZIP codes added.


customer_id,name,email,phone,address,credit_score,join_date,last_update,Zip_Code
C1000,Courtney Jones,courtney.jones@gmail.com,2104212602,"9345 Melissa Manors Suite 869, Ibarraview, PR 08116",525,2017-01-01,2023-02-25T00:00:00Z,8116
C1001,Joshua Edwards,joshua.edwards@yahoo.com,6183149154,"8886 Katelyn Stravenue Apt. 089, Randyview, VI 49164",528,2017-01-02,2023-10-04T00:00:00Z,49164
C1002,Brandon Hernandez,brandon.hernandez@aol.com,8813875395,"72170 Christensen Parkways Apt. 696, New Thomasfort, AK 76566",508,2017-01-02,2023-04-28T00:00:00Z,76566
C1003,Mary Salazar,mary.salazar@aol.com,1409313508,"Unit 2356 Box 0826, DPO AE 82138",637,2017-01-02,2023-05-23T00:00:00Z,82138
C1004,Thomas Gibson,thomas.gibson@outlook.com,4798745147,"1445 Smith Station, Port Joel, SD 89362",594,2017-01-03,2023-09-22T00:00:00Z,89362
C1005,Cody Alvarez,cody.alvarez@hotmail.com,8894044576,"480 Catherine Forge, Williamview, ND 08510",565,2017-01-03,2023-07-09T00:00:00Z,8510
C1006,Christopher Davidson,christopher.davidson@gmail.com,3093723043,"345 Mark Loop Apt. 530, Thomaston, OR 26035",793,2017-01-06,2023-04-24T00:00:00Z,26035
C1007,Stephanie Hobbs,stephanie.hobbs@hotmail.com,1541203921,"8177 Linda Dam, North David, LA 62211",555,2017-01-08,2023-05-09T00:00:00Z,62211
C1008,Victor Roth,victor.roth@aol.com,8516247767,"3751 Ortega Gateway, Dannyport, NH 90858",577,2017-01-10,2023-10-27T00:00:00Z,90858
C1009,Emma Lewis,emma.lewis@hotmail.com,4153099045,"5112 Kenneth Cove Apt. 963, Lake Gary, AL 11933",509,2017-01-11,2023-01-01T00:00:00Z,11933


In [0]:
customer_df_standardized.write.format("delta").mode("overwrite").saveAsTable("hive_metastore.silver.cleaned_customer")

## convert data in uppercase- Trasnformation

In [0]:
from pyspark.sql.functions import upper

transaction_df_cleaned=transaction_df_cleaned.withColumn('channel', upper(col('channel'))) \
                               .withColumn('transaction_type', upper(col('transaction_type')))


In [0]:
transaction_df_cleaned.display()

transaction_id,customer_id,branch_id,channel,transaction_type,amount,currency,timestamp,status
T6050,C1917,B0004,WEB,TRANSFER,43.51,GBP,2023-01-22T07:39:00Z,completed
T6051,C1988,B0009,MOBILE,PAYMENT,8.08,EUR,2023-01-22T08:32:00Z,completed
T6052,C1186,B0007,MOBILE,DEPOSIT,14.01,USD,2023-01-22T09:04:00Z,completed
T6053,C1098,B0007,BRANCH,DEPOSIT,67.58,EUR,2023-01-22T09:24:00Z,completed
T6054,C1804,B0008,ATM,DEPOSIT,24.75,USD,2023-01-22T09:45:00Z,completed
T6055,C1546,B0003,WEB,PAYMENT,28.34,EUR,2023-01-22T10:27:00Z,completed
T6056,C1416,B0009,BRANCH,PAYMENT,35.45,GBP,2023-01-22T11:01:00Z,pending
T6057,C1859,B0008,MOBILE,TRANSFER,25.13,GBP,2023-01-22T11:54:00Z,completed
T6058,C1388,B0009,BRANCH,DEPOSIT,8.45,USD,2023-01-22T12:16:00Z,completed
T6059,C1332,B0000,MOBILE,DEPOSIT,52.64,GBP,2023-01-22T13:11:00Z,completed


### Transformations on amount and currency

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
conversion_rates = {
    'GBP': 1.30,  
    'EUR': 1.10,  
    'USD': 1.00   
}
def convert_to_usd(amount, currency):
    return amount * conversion_rates.get(currency, 1)

convert_to_usd_udf = F.udf(convert_to_usd, FloatType())
transaction_df_cleaned = transaction_df_cleaned.withColumn("amount", convert_to_usd_udf(F.col("amount"), F.col("currency")))
transaction_df_cleaned = transaction_df_cleaned.withColumn("currency", F.lit("USD"))
transaction_df_cleaned.display()




transaction_id,customer_id,branch_id,channel,transaction_type,amount,currency,timestamp,status
T6050,C1917,B0004,WEB,TRANSFER,56.563,USD,2023-01-22T07:39:00Z,completed
T6051,C1988,B0009,MOBILE,PAYMENT,8.888,USD,2023-01-22T08:32:00Z,completed
T6052,C1186,B0007,MOBILE,DEPOSIT,14.01,USD,2023-01-22T09:04:00Z,completed
T6053,C1098,B0007,BRANCH,DEPOSIT,74.338,USD,2023-01-22T09:24:00Z,completed
T6054,C1804,B0008,ATM,DEPOSIT,24.75,USD,2023-01-22T09:45:00Z,completed
T6055,C1546,B0003,WEB,PAYMENT,31.174,USD,2023-01-22T10:27:00Z,completed
T6056,C1416,B0009,BRANCH,PAYMENT,46.085,USD,2023-01-22T11:01:00Z,pending
T6057,C1859,B0008,MOBILE,TRANSFER,32.669,USD,2023-01-22T11:54:00Z,completed
T6058,C1388,B0009,BRANCH,DEPOSIT,8.45,USD,2023-01-22T12:16:00Z,completed
T6059,C1332,B0000,MOBILE,DEPOSIT,68.432,USD,2023-01-22T13:11:00Z,completed


In [0]:
transaction_df_cleaned.write.format("delta").mode("overwrite").saveAsTable("hive_metastore.silver.cleaned_transaction")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-529542004852558>, line 1[0m
[0;32m----> 1[0m [43mcustomer_df_standardized[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mdelta[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43mmode[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43moverwrite[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43msaveAsTable[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mhive_metastore.silver.cleaned_customer[39;49m[38;5;124;43m"[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46

In [0]:

transaction_df_cleaned = transaction_df.drop('_rescued_data')

In [0]:
joined_df = customer_df_standardized.join(transaction_df_cleaned, on='customer_id', how='inner')

In [0]:
joined_df.display()

customer_id,name,email,phone,address,credit_score,join_date,last_update,Zip_Code,transaction_id,branch_id,channel,transaction_type,amount,currency,timestamp,status
C1917,Kevin Freeman,kevin.freeman@aol.com,1291829143,"3010 Bryan Hill, South Kimberly, MP 60899",529,2022-06-13,2023-09-29T00:00:00Z,60899,T6050,B0004,web,transfer,43.51,GBP,2023-01-22T07:39:00Z,completed
C1988,Jamie Anderson,jamie.anderson@gmail.com,6766655994,"53503 Mathew Cliff Apt. 784, Bryantmouth, CO 87048",803,2022-12-01,2023-04-03T00:00:00Z,87048,T6051,B0009,mobile,payment,8.08,EUR,2023-01-22T08:32:00Z,completed
C1186,Thomas Glover,thomas.glover@hotmail.com,6056785178,"56058 Christina Ferry Apt. 169, New Williammouth, DE 80066",655,2018-01-03,2023-11-18T00:00:00Z,80066,T6052,B0007,mobile,deposit,14.01,USD,2023-01-22T09:04:00Z,completed
C1098,Daisy Clark,daisy.clark@yahoo.com,9659071171,"0332 Andrea Overpass Apt. 089, Jeffreyborough, ME 22589",560,2017-07-16,2023-02-24T00:00:00Z,22589,T6053,B0007,branch,deposit,67.58,EUR,2023-01-22T09:24:00Z,completed
C1804,Christopher Kelly,christopher.kelly@outlook.com,3016315795,"6273 Rodriguez Turnpike Suite 390, West Cynthiafort, NC 47663",647,2021-08-30,2023-03-19T00:00:00Z,47663,T6054,B0008,ATM,deposit,24.75,USD,2023-01-22T09:45:00Z,completed
C1546,Charlene Bowen,charlene.bowen@gmail.com,5418700380,"215 Williamson Crossroad Suite 238, South Johnfurt, SC 86151",682,2020-04-07,2023-11-16T00:00:00Z,86151,T6055,B0003,web,payment,28.34,EUR,2023-01-22T10:27:00Z,completed
C1416,Gary Marshall,gary.marshall@hotmail.com,5956031304,"150 Glenn Keys, South Michele, WY 56689",624,2019-06-08,2023-03-26T00:00:00Z,56689,T6056,B0009,branch,payment,35.45,GBP,2023-01-22T11:01:00Z,pending
C1859,Jerry Peterson,jerry.peterson@yahoo.com,8786120407,"USNS Miller, FPO AP 02840",554,2022-01-27,2023-05-08T00:00:00Z,2840,T6057,B0008,mobile,transfer,25.13,GBP,2023-01-22T11:54:00Z,completed
C1388,Kyle Mccall,kyle.mccall@aol.com,4379357391,"70744 Katelyn Throughway, Kaitlynborough, UT 42811",511,2019-04-09,2023-04-11T00:00:00Z,42811,T6058,B0009,branch,deposit,8.45,USD,2023-01-22T12:16:00Z,completed
C1332,Haley Reeves PhD,haley.phd@outlook.com,4887917988,"605 Taylor Estate Apt. 750, Maryview, MS 51164",793,2018-11-06,2023-07-31T00:00:00Z,51164,T6059,B0000,mobile,deposit,52.64,GBP,2023-01-22T13:11:00Z,completed


In [0]:
final_df=joined_df.join(branch_df,on="branch_id",how="inner")

In [0]:
final_df.display()

branch_id,customer_id,name,email,phone,address,credit_score,join_date,last_update,Zip_Code,transaction_id,channel,transaction_type,amount,currency,timestamp,status,branch_name,location,timezone
B0004,C1917,Kevin Freeman,kevin.freeman@aol.com,1291829143,"3010 Bryan Hill, South Kimberly, MP 60899",529,2022-06-13,2023-09-29T00:00:00Z,60899,T6050,web,transfer,43.51,GBP,2023-01-22T07:39:00Z,completed,West Branch,Port Joseph,AEST
B0009,C1988,Jamie Anderson,jamie.anderson@gmail.com,6766655994,"53503 Mathew Cliff Apt. 784, Bryantmouth, CO 87048",803,2022-12-01,2023-04-03T00:00:00Z,87048,T6051,mobile,payment,8.08,EUR,2023-01-22T08:32:00Z,completed,North Branch,Port Henryshire,PST
B0007,C1186,Thomas Glover,thomas.glover@hotmail.com,6056785178,"56058 Christina Ferry Apt. 169, New Williammouth, DE 80066",655,2018-01-03,2023-11-18T00:00:00Z,80066,T6052,mobile,deposit,14.01,USD,2023-01-22T09:04:00Z,completed,West Branch,Catherinechester,PST
B0007,C1098,Daisy Clark,daisy.clark@yahoo.com,9659071171,"0332 Andrea Overpass Apt. 089, Jeffreyborough, ME 22589",560,2017-07-16,2023-02-24T00:00:00Z,22589,T6053,branch,deposit,67.58,EUR,2023-01-22T09:24:00Z,completed,West Branch,Catherinechester,PST
B0008,C1804,Christopher Kelly,christopher.kelly@outlook.com,3016315795,"6273 Rodriguez Turnpike Suite 390, West Cynthiafort, NC 47663",647,2021-08-30,2023-03-19T00:00:00Z,47663,T6054,ATM,deposit,24.75,USD,2023-01-22T09:45:00Z,completed,West Branch,Brownfort,GMT
B0003,C1546,Charlene Bowen,charlene.bowen@gmail.com,5418700380,"215 Williamson Crossroad Suite 238, South Johnfurt, SC 86151",682,2020-04-07,2023-11-16T00:00:00Z,86151,T6055,web,payment,28.34,EUR,2023-01-22T10:27:00Z,completed,East Branch,Andersonmouth,EST
B0009,C1416,Gary Marshall,gary.marshall@hotmail.com,5956031304,"150 Glenn Keys, South Michele, WY 56689",624,2019-06-08,2023-03-26T00:00:00Z,56689,T6056,branch,payment,35.45,GBP,2023-01-22T11:01:00Z,pending,North Branch,Port Henryshire,PST
B0008,C1859,Jerry Peterson,jerry.peterson@yahoo.com,8786120407,"USNS Miller, FPO AP 02840",554,2022-01-27,2023-05-08T00:00:00Z,2840,T6057,mobile,transfer,25.13,GBP,2023-01-22T11:54:00Z,completed,West Branch,Brownfort,GMT
B0009,C1388,Kyle Mccall,kyle.mccall@aol.com,4379357391,"70744 Katelyn Throughway, Kaitlynborough, UT 42811",511,2019-04-09,2023-04-11T00:00:00Z,42811,T6058,branch,deposit,8.45,USD,2023-01-22T12:16:00Z,completed,North Branch,Port Henryshire,PST
B0000,C1332,Haley Reeves PhD,haley.phd@outlook.com,4887917988,"605 Taylor Estate Apt. 750, Maryview, MS 51164",793,2018-11-06,2023-07-31T00:00:00Z,51164,T6059,mobile,deposit,52.64,GBP,2023-01-22T13:11:00Z,completed,North Branch,Michaelshire,PST


### Branch_Table

In [0]:
branch_df.display()

branch_id,branch_name,location,timezone
B0000,North Branch,Michaelshire,PST
B0001,East Branch,Trantown,GMT
B0002,Central Branch,Burtonburgh,EST
B0003,East Branch,Andersonmouth,EST
B0004,West Branch,Port Joseph,AEST
B0005,Downtown Branch,West Nathanland,GMT
B0006,West Branch,New Jameschester,GMT
B0007,West Branch,Catherinechester,PST
B0008,West Branch,Brownfort,GMT
B0009,North Branch,Port Henryshire,PST


In [0]:
branch_df.write.format("delta").mode("overwrite").saveAsTable("hive_metastore.silver.cleaned_branch")

## Fraud_Flags_table

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, FloatType, StructType, StructField


transaction_counts = joined_df.groupBy("customer_id").agg(F.count("transaction_id").alias("transaction_count"))
joined_with_counts_df = joined_df.join(transaction_counts, on="customer_id", how="left")

def classify_transaction(amount, zipcode, transaction_count):
    results = []
    if amount > 10000:
        results.append(("unusual_amount", 0.90))
    if amount < 5:
        results.append(("low_value_transaction", 0.60))
    if zipcode.startswith('1') or zipcode.startswith('0'):
        results.append(("new_geolocation", 0.80))
    if transaction_count > 10:
        results.append(("watchlist_match", 0.95))
    return results

classify_transaction_udf = F.udf(classify_transaction, ArrayType(StructType([
    StructField("flag_type", StringType(), True),
    StructField("confidence_score", FloatType(), True)
])))
classified_df = joined_with_counts_df.withColumn(
    "flags", F.explode(classify_transaction_udf(F.col("amount"), F.col("Zip_Code"), F.col("transaction_count")))
)

fraud_flag_df = classified_df.select(
    F.monotonically_increasing_id().alias("flag_id"),
    F.col("transaction_id"),
    F.col("flags.flag_type").alias("flag_type"),
    F.round(F.col("flags.confidence_score"), 2).alias("confidence_score"),
    F.col("timestamp")
)

fraud_flag_df = fraud_flag_df.withColumn("flag_id", F.concat(F.lit("F00"), F.col("flag_id").cast("string")))
fraud_flag_df.display(truncate=False)



flag_id,transaction_id,flag_type,confidence_score,timestamp
F000,T6112,new_geolocation,0.8,2023-01-23T15:18:00Z
F001,T5033,low_value_transaction,0.6,2023-01-01T16:45:00Z
F002,T5318,new_geolocation,0.8,2023-01-07T16:31:00Z
F003,T5419,new_geolocation,0.8,2023-01-09T19:47:00Z
F004,T5589,low_value_transaction,0.6,2023-01-13T02:13:00Z
F005,T6467,low_value_transaction,0.6,2023-01-31T01:31:00Z
F006,T6299,low_value_transaction,0.6,2023-01-27T16:04:00Z
F007,T6102,unusual_amount,0.9,2023-01-23T10:30:00Z
F008,T5712,new_geolocation,0.8,2023-01-15T10:02:00Z
F009,T6211,new_geolocation,0.8,2023-01-25T16:08:00Z


In [0]:
flag_type_counts = fraud_flag_df.groupBy("flag_type").count()
flag_type_counts.show()


+--------------------+-----+
|           flag_type|count|
+--------------------+-----+
|low_value_transac...|  274|
|     new_geolocation|  561|
|      unusual_amount|  144|
|     watchlist_match|   11|
+--------------------+-----+



In [0]:
%sql
CREATE SCHEMA silver;

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-529542004852558>, line 1[0m
[0;32m----> 1[0m [43mcustomer_df_standardized[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mdelta[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43mmode[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43moverwrite[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43msaveAsTable[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mhive_metastore.silver.cleaned_customer[39;49m[38;5;124;43m"[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46

In [0]:
%sql
use schema silver;

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-529542004852558>, line 1[0m
[0;32m----> 1[0m [43mcustomer_df_standardized[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mdelta[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43mmode[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43moverwrite[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43msaveAsTable[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mhive_metastore.silver.cleaned_customer[39;49m[38;5;124;43m"[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46

In [0]:
fraud_flag_df.write.format("delta").mode("overwrite").saveAsTable("hive_metastore.silver.Fraud_Flag")

In [0]:
%sql

DROP TABLE IF EXISTS silver.Fraud_Flag;

### Customer_segments_Table

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window
from datetime import datetime, timedelta
import pandas as pd

current_date = pd.Timestamp("2023-01-01")
high_value_threshold = 100000  
high_value_customers = final_df.groupBy("customer_id").agg(F.sum("amount").alias("total_amount"))
high_value_customers = high_value_customers.filter(F.col("total_amount") > high_value_threshold).select(
    "customer_id"
).withColumn("segment_name", F.lit("High_Value")).withColumn(
    "segment_description", F.lit("Customers with high transaction volume")
).withColumn(
    "last_update", F.lit(current_date)
)

# New_User: Customers who joined in the last 30 days
new_user_customers = final_df.filter(
    F.col("join_date") > current_date - timedelta(days=30)
).select("customer_id").withColumn(
    "segment_name", F.lit("New_User")
).withColumn(
    "segment_description", F.lit("Customers who joined in last 30 days")
).withColumn(
    "last_update", F.lit(current_date)
)

# Inactive: Customers with no transactions in last 90 days
recent_transactions = final_df.filter(
    F.col("timestamp") > current_date - timedelta(days=90)
).select("customer_id").distinct()
inactive_customers = final_df.join(recent_transactions, on="customer_id", how="left_anti").select(
    "customer_id"
).withColumn(
    "segment_name", F.lit("Inactive")
).withColumn(
    "segment_description", F.lit("No transactions in last 90 days")
).withColumn(
    "last_update", F.lit(current_date)
)

# Credit_Risk: Customers with low credit scores (assuming credit_score < 600)
credit_risk_customers = final_df.filter(F.col("credit_score") < 600).select(
    "customer_id"
).withColumn(
    "segment_name", F.lit("Credit_Risk")
).withColumn(
    "segment_description", F.lit("Customers with low credit scores")
).withColumn(
    "last_update", F.lit(current_date)
)

# Loyal: Customers with consistent activity for over 5 years
loyal_customers = final_df.filter(
    F.col("join_date") < current_date - timedelta(days=5*365)
).select("customer_id").withColumn(
    "segment_name", F.lit("Loyal")
).withColumn(
    "segment_description", F.lit("Consistent activity for over 5 years")
).withColumn(
    "last_update", F.lit(current_date)
)

# Combine all segments
customer_segmentation_df = high_value_customers.union(new_user_customers).union(inactive_customers).union(credit_risk_customers).union(loyal_customers)

# Add segment_id
window_spec = Window.orderBy("customer_id")
customer_segmentation_df = customer_segmentation_df.withColumn("segment_id", F.concat(F.lit("S00"), F.row_number().over(window_spec).cast("string")))


# Show the result
customer_segmentation_df = customer_segmentation_df.select("segment_id", "customer_id", "segment_name", "segment_description", "last_update")

customer_segmentation_df.display()


segment_id,customer_id,segment_name,segment_description,last_update
S001,C1000,Credit_Risk,Customers with low credit scores,2023-01-01T00:00:00Z
S002,C1000,Credit_Risk,Customers with low credit scores,2023-01-01T00:00:00Z
S003,C1000,Credit_Risk,Customers with low credit scores,2023-01-01T00:00:00Z
S004,C1000,Loyal,Consistent activity for over 5 years,2023-01-01T00:00:00Z
S005,C1000,Loyal,Consistent activity for over 5 years,2023-01-01T00:00:00Z
S006,C1000,Loyal,Consistent activity for over 5 years,2023-01-01T00:00:00Z
S007,C1001,Credit_Risk,Customers with low credit scores,2023-01-01T00:00:00Z
S008,C1001,Loyal,Consistent activity for over 5 years,2023-01-01T00:00:00Z
S009,C1002,Credit_Risk,Customers with low credit scores,2023-01-01T00:00:00Z
S0010,C1002,Credit_Risk,Customers with low credit scores,2023-01-01T00:00:00Z


In [0]:
customer_segmentation_df.write.format("delta").mode("overwrite").saveAsTable("hive_metastore.silver.Customer_segments")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-529542004852558>, line 1[0m
[0;32m----> 1[0m [43mcustomer_df_standardized[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mdelta[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43mmode[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43moverwrite[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43msaveAsTable[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mhive_metastore.silver.cleaned_customer[39;49m[38;5;124;43m"[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46

In [0]:
%sql
SELECT DISTINCT segment_name FROM hive_metastore.silver.Customer_segments;


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-529542004852558>, line 1[0m
[0;32m----> 1[0m [43mcustomer_df_standardized[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mdelta[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43mmode[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43moverwrite[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43msaveAsTable[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mhive_metastore.silver.cleaned_customer[39;49m[38;5;124;43m"[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46

In [0]:
%sql
SELECT segment_name, COUNT(customer_id) AS total_count
FROM hive_metastore.silver.Customer_segments
GROUP BY segment_name;



segment_name,total_count
High_Value,13
Loyal,581
Credit_Risk,933
New_User,26


In [0]:
%sql
SELECT DISTINCT flag_type FROM hive_metastore.silver.Fraud_Flag;

flag_type
low_value_transaction
new_geolocation
unusual_amount
watchlist_match


In [0]:
%sql
SELECT flag_type, COUNT(flag_id) AS total_count
FROM hive_metastore.silver.Fraud_Flag
GROUP BY flag_type;


flag_type,total_count
low_value_transaction,274
new_geolocation,561
unusual_amount,3000
watchlist_match,727
