In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.streaming import *
import json
import requests
import time
from urllib.parse import quote
import pandas as pd
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from pyspark.sql.functions import expr

In [None]:
# Define schema for Customers table
# Column layout of the Customers table, listed as (name, type) pairs in
# column order. Every field is declared nullable.
customer_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("customer_id", StringType()),
            ("name", StringType()),
            ("email", StringType()),
            ("phone", StringType()),
            ("address", StringType()),
            ("credit_score", IntegerType()),
            ("join_date", DateType()),
            ("last_update", TimestampType()),
            ("customer_type", StringType()),
        )
    ]
)


In [None]:
# Define schema for Branch table
# Column layout of the Branch table. All five columns are nullable strings,
# so the schema is generated from the ordered list of column names.
branches_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            "branch_id",
            "branch_name",
            "location",
            "timezone",
            "currency",
        )
    ]
)


In [None]:
# Define schema for transaction data
# Column layout of the transaction data, listed as (name, type) pairs in
# column order. Every field is nullable; note that `timestamp` is declared
# as a string here, not as a timestamp type.
transactions_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("transaction_id", StringType()),
            ("customer_id", StringType()),
            ("branch_id", StringType()),
            ("channel", StringType()),
            ("transaction_type", StringType()),
            ("amount", DoubleType()),
            ("currency", StringType()),
            ("timestamp", StringType()),
            ("status", StringType()),
        )
    ]
)

In [None]:
## configuration for bronze file

# Settings for the bronze layer, grouped by concern. The result is an
# ordinary nested dict with the sections in this order:
# adls_connection, paths, cloudFiles, delta.
config = dict(
    # Azure Data Lake Storage connection details.
    # NOTE: access_key holds a placeholder string, not a usable credential;
    # the real key has to be supplied from outside the notebook.
    adls_connection=dict(
        storage_account="mavericstoragecapstone",
        container_name="global-bank",
        access_key=" NOT_allowed_on_GIThub ",
        storage_account_conf_key="fs.azure.account.key.mavericstoragecapstone.dfs.core.windows.net",
        ingestion_location="abfss://global-bank@mavericstoragecapstone.dfs.core.windows.net/",
    ),
    # Source locations for each dataset.
    paths=dict(
        customers="/Transactions/csv/Customers.csv",
        branch="/Transactions/csv/Branches.csv",
        transactions="/Transactions/Transactions/",
    ),
    # Reader options for the streaming CSV source (values are kept as strings).
    cloudFiles=dict(
        format="csv",
        header="true",
        timestampFormat="yyyy-MM-dd",
        schemaLocation="dbfs:/FileStore/Streaming_Schema/Bronze/Transactions",
        inferColumnTypes="true",
    ),
    # Writer options for the Delta sink.
    delta=dict(
        checkpointLocation="dbfs:/FileStore/Checkpoints/Bronze/Transactions",
        mergeSchema="true",
        outputMode="append",
        processingTime="30 seconds",
        table="hive_metastore.gb_bronze_schema.transactions_Streaming",
    ),
)

In [None]:
## Cleaning Functions

# 1. Remove duplicates
def remove_duplicates(df: DataFrame, primary_keys: list) -> DataFrame:
    """
    Collapse rows that share the same primary-key values into a single row.

    Args:
        df: Input DataFrame.
        primary_keys: Column names that together identify a row.

    Returns:
        The result of ``df.dropDuplicates(primary_keys)``.
    """
    deduplicated = df.dropDuplicates(primary_keys)
    return deduplicated

# 2. Handling Missing Values
def handle_missing_values(df: DataFrame, fill_values: dict) -> DataFrame:
    """
    Fill nulls using ``fill_values``, then drop rows that still contain a null.

    Args:
        df: Input DataFrame.
        fill_values: Replacement values, passed straight to ``df.na.fill``.

    Returns:
        The filled DataFrame after ``dropna()`` has been applied to it.
    """
    # The fill runs first, so dropna() only removes rows the fill left incomplete.
    return df.na.fill(fill_values).dropna()

# 3. Validate Data Type using schema
def validate_data_types(df: DataFrame, schema: dict) -> DataFrame:
    """
    Cast every column named in ``schema`` to the type mapped to it.

    Args:
        df: Input DataFrame.
        schema: Mapping of column name -> target type, as accepted by
            ``Column.cast``.

    Returns:
        A DataFrame in which each listed column has been replaced by its
        cast version; columns not listed are left as they are.
    """
    converted = df
    for target_column in schema:
        casted_column = col(target_column).cast(schema[target_column])
        converted = converted.withColumn(target_column, casted_column)
    return converted

# 4. Trimming White spaces
def trim_whitespaces(df: DataFrame) -> DataFrame:
    """
    Apply ``trim`` to every string-typed column of the DataFrame.

    Args:
        df: Input DataFrame.

    Returns:
        A DataFrame whose string columns have been passed through ``trim``;
        columns of other types are untouched.
    """
    trimmed = df
    # The schema of the *input* frame decides which columns are processed.
    for field in df.schema.fields:
        if not isinstance(field.dataType, StringType):
            continue
        trimmed = trimmed.withColumn(field.name, trim(col(field.name)))
    return trimmed

# 5. Remove Invalid Values
def remove_invalid_values(df: DataFrame, invalid_values: dict) -> DataFrame:
    """
    Keep only rows whose values fall inside the allowed range of each column.

    Args:
        df: Input DataFrame.
        invalid_values: Mapping of column name -> sequence whose first two
            items are the inclusive lower and upper bounds.

    Returns:
        The DataFrame filtered once per listed column; both bounds are inclusive.
    """
    kept = df
    for bounded_column in invalid_values:
        bounds = invalid_values[bounded_column]
        # between() is inclusive at both ends: (c >= low) & (c <= high).
        kept = kept.filter(col(bounded_column).between(bounds[0], bounds[1]))
    return kept

# 6. Enforce Uniqueness on the primary key columns
def enforce_uniqueness(df: DataFrame, unique_cols: list) -> DataFrame:
    """
    Make the given key columns unique by dropping duplicate rows.

    Args:
        df: Input DataFrame.
        unique_cols: Column names whose combined value must be unique.

    Returns:
        The result of ``df.dropDuplicates(unique_cols)``.
    """
    unique_rows = df.dropDuplicates(unique_cols)
    return unique_rows

# 7. Capitalize string data to make data homogenous
def capitalize_string_data(df: DataFrame) -> DataFrame:
    """
    Convert the values of every string-typed column to upper case.

    Args:
        df: Input DataFrame.

    Returns:
        A DataFrame whose string columns have been passed through ``upper``;
        columns of other types are untouched.
    """
    uppercased = df
    # The schema of the *input* frame decides which columns are processed.
    text_columns = [
        field.name
        for field in df.schema.fields
        if isinstance(field.dataType, StringType)
    ]
    for text_column in text_columns:
        uppercased = uppercased.withColumn(text_column, upper(col(text_column)))
    return uppercased
    
# 8. Flag anomalies using conditions 
def flag_anomalies(df: DataFrame, anomaly_conditions: dict) -> DataFrame:
    """
    Flag rows with anomalies for further inspection.

    A boolean ``anomaly_flag`` column is set to True for every row that
    satisfies at least one of the supplied conditions. If the input already
    has an ``anomaly_flag`` column its existing values are kept for rows that
    match no condition; otherwise those rows get False.

    Args:
        df: Input DataFrame.
        anomaly_conditions: Mapping of label -> boolean Column expression.
            Only the expressions are used; the labels are for the caller.

    Returns:
        The DataFrame with ``anomaly_flag`` added or updated. When
        ``anomaly_conditions`` is empty the input is returned unchanged.
    """
    # otherwise(col("anomaly_flag")) needs the column to exist already, so seed
    # it when absent. The comparison is lower-cased so that an existing flag
    # column with different casing is not overwritten by the seed.
    existing_columns = {name.lower() for name in df.columns}
    if anomaly_conditions and "anomaly_flag" not in existing_columns:
        df = df.withColumn("anomaly_flag", lit(False))

    for condition in anomaly_conditions.values():
        df = df.withColumn("anomaly_flag", when(condition, True).otherwise(col("anomaly_flag")))
    return df

In [None]:
# FN: Data Summary Functions

def data_summary(df: DataFrame):
    """
    Print a per-column profile of a (batch) DataFrame.

    For every column the report shows the non-null count, the null count and
    the number of distinct values; numeric columns additionally get mean,
    standard deviation, minimum and maximum.

    Args:
        df: DataFrame to profile.

    Returns:
        None. The report is written with ``print``.
    """
    # Count of total rows
    total_count = df.count()

    # Per-column statistics live in their own dict, so a column that happens
    # to be called "total_count" cannot collide with the row total.
    column_stats = {}

    for col_name in df.columns:
        # count(column) skips nulls, so it is the non-null count.
        aggregates = [
            count(col(col_name)).alias("non_null_count"),
            countDistinct(col(col_name)).alias("unique_count"),
        ]

        # If the column is numeric, get additional statistics.
        # NOTE: min/max here are the Spark column functions brought in by the
        # notebook's star import, not the Python builtins.
        if isinstance(df.schema[col_name].dataType, NumericType):
            aggregates += [
                mean(col(col_name)).alias("mean"),
                stddev(col(col_name)).alias("stddev"),
                min(col(col_name)).alias("min"),
                max(col(col_name)).alias("max"),
            ]

        # One aggregation job per column instead of one job per statistic.
        column_stats[col_name] = df.agg(*aggregates).collect()[0].asDict()

    # Print summary in a readable format
    print(f"Total Rows: {total_count}\n")
    for col_name, stats in column_stats.items():
        print("==o"*10)
        print(f"Column: {col_name}")
        print(f"  Non-Null Count: {stats['non_null_count']}")
        print(f"  Null count:  {total_count - (stats['non_null_count'])}")
        print(f"  Unique Count: {stats['unique_count']}")
        if "mean" in stats:
            print(f"  Mean: {stats['mean']}")
            print(f"  StdDev: {stats['stddev']}")
            print(f"  Min: {stats['min']}")
            print(f"  Max: {stats['max']}")
        print("")

In [None]:
def clean_duplicate_transactions(df: DataFrame, primary_keys: list, order_by_col: str) -> DataFrame:
    """
    Deduplicate on ``primary_keys``, keeping the row with the highest ``order_by_col``.

    Args:
        df: Input DataFrame.
        primary_keys: Columns that identify a logical record.
        order_by_col: Column used to rank duplicates; the row that sorts first
            in descending order is the one kept.

    Returns:
        A DataFrame with one row per primary-key combination. The temporary
        ``row_num`` ranking column is removed before returning.
    """
    # Rank the rows inside each key group, newest (largest order value) first.
    newest_first = Window.partitionBy(*primary_keys).orderBy(col(order_by_col).desc())
    ranked = df.withColumn("row_num", row_number().over(newest_first))

    # Rank 1 is the survivor of each group.
    return ranked.where(col("row_num") == 1).drop("row_num")


In [None]:
# FN: Format Dates and Timestamps
def format_dates(df: DataFrame, date_columns: list = [], timestamp_columns: list = []) -> DataFrame:
    """
    Normalise the listed date and timestamp columns.

    Args:
        df: Input DataFrame.
        date_columns: Columns to convert with ``to_date``.
        timestamp_columns: Columns to parse with ``to_timestamp`` and then
            render with ``date_format`` using the pattern
            "yyyy-MM-dd HH:mm:ss", so they end up as formatted text.

    Returns:
        The DataFrame with the conversions applied. Columns that are missing,
        or whose conversion raises, are reported with ``print`` and skipped.
    """
    result = df

    for col_name in date_columns:
        if col_name not in result.columns:
            print(f"Column {col_name} does not exist in DataFrame.")
            continue
        try:
            result = result.withColumn(col_name, to_date(col(col_name)))
        except Exception as e:
            print(f"Error formatting column {col_name} to date: {e}")

    for col_name in timestamp_columns:
        if col_name not in result.columns:
            print(f"Column {col_name} does not exist in DataFrame.")
            continue
        try:
            # Parse first, then render in one fixed textual layout.
            result = result.withColumn(col_name, to_timestamp(col(col_name)))
            result = result.withColumn(col_name, date_format(col(col_name), "yyyy-MM-dd HH:mm:ss"))
        except Exception as e:
            print(f"Error formatting column {col_name} to timestamp: {e}")

    return result


In [None]:
# FN: Extracting dates functions

def extract_date_parts(df: DataFrame, date_column: str, Year: bool = True, Month: bool = False, Day: bool = False) -> DataFrame:
    """
    Add ``year`` / ``month`` / ``day`` columns derived from ``date_column``.

    Args:
        df: Input DataFrame.
        date_column: Name of the column the parts are taken from.
        Year: Add a ``year`` column when true (default).
        Month: Add a ``month`` column when true.
        Day: Add a ``day`` column (day of month) when true.

    Returns:
        The DataFrame with the requested columns added. A part whose
        extraction raises is reported with ``print`` and left out.

    Raises:
        ValueError: If ``date_column`` is not a column of ``df``.
    """
    if date_column not in df.columns:
        raise ValueError(f"Column {date_column} does not exist in DataFrame.")

    result = df
    source = col(date_column)

    if Year:
        try:
            result = result.withColumn("year", year(source))
        except Exception as e:
            print(f"Error extracting year from column {date_column}: {e}")

    if Month:
        try:
            result = result.withColumn("month", month(source))
        except Exception as e:
            print(f"Error extracting month from column {date_column}: {e}")

    if Day:
        try:
            result = result.withColumn("day", dayofmonth(source))
        except Exception as e:
            print(f"Error extracting day from column {date_column}: {e}")

    return result


In [None]:
# FN: streaming_data_summary

def streaming_data_summary(df: DataFrame):
    """
    Stream running summary statistics of a streaming DataFrame to the console.

    One console query is started for the total row count, one per column for
    the non-null count and approximate distinct count, and one more per
    numeric column for mean / stddev / min / max. All queries are started
    first and only then awaited, so every summary runs concurrently; the call
    blocks until the queries terminate.

    Args:
        df: A streaming DataFrame. A non-streaming frame is reported with
            ``print`` and nothing is started.

    Returns:
        None. Errors are printed rather than raised. Queries that are still
        active when the function exits (error or interruption) are stopped.
    """
    started_queries = []

    def _start_console_summary(*aggregates):
        # Global aggregation (no grouping keys) written to the console sink.
        query = df.groupBy().agg(*aggregates) \
            .writeStream \
            .outputMode("complete") \
            .format("console") \
            .start()
        started_queries.append(query)
        return query

    try:
        # Check if DataFrame is streaming
        if not df.isStreaming:
            raise ValueError("The provided DataFrame is not a streaming DataFrame.")

        # Count of total rows (processed in streaming batches)
        _start_console_summary(count("*").alias("total_count"))

        for col_name in df.columns:
            # Non-null count and approximate unique count for each column
            _start_console_summary(
                count(col(col_name)).alias(f"{col_name}_non_null_count"),
                approx_count_distinct(col(col_name)).alias(f"{col_name}_unique_count")
            )

            # If the column is numeric, calculate additional statistics
            if isinstance(df.schema[col_name].dataType, NumericType):
                _start_console_summary(
                    mean(col(col_name)).alias(f"{col_name}_mean"),
                    stddev(col(col_name)).alias(f"{col_name}_stddev"),
                    min(col(col_name)).alias(f"{col_name}_min"),
                    max(col(col_name)).alias(f"{col_name}_max")
                )

        # Await only after everything has been started: awaiting inside the
        # loop would block on the first query and never start the rest.
        for query in started_queries:
            query.awaitTermination()

    # NOTE(review): StreamingQueryException is expected to be in scope through
    # the notebook's star imports — confirm for the PySpark version in use.
    except StreamingQueryException as e:
        print(f"Error in streaming query: {e}")
    except ValueError as e:
        print(f"ValueError: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        # Do not leave background queries running once this call is over.
        for query in started_queries:
            if query.isActive:
                query.stop()

In [None]:
# FN: Transformation Functions for Customers dataframe

# 1. Formatting phone number
def format_phone_numbers(df: DataFrame, phone_column: str) -> DataFrame:
    """
    Turn a phone-number column into a numeric column.

    Every character that is not a digit 0-9 is stripped and the remaining
    text is cast to ``LongType``.

    Args:
        df: Input DataFrame.
        phone_column: Name of the column holding the phone numbers.

    Returns:
        The DataFrame with ``phone_column`` replaced by the numeric version.
    """
    digits_only = regexp_replace(col(phone_column), "[^0-9]", "")
    return df.withColumn(phone_column, digits_only.cast(LongType()))

# 2. Domain extraction
def domain_extract(df : DataFrame, col_name: str) -> DataFrame:
    """
    Add a ``domain`` column holding the part of an e-mail address after "@".

    Args:
        df: Input DataFrame.
        col_name: Name of the column holding the e-mail addresses.

    Returns:
        The DataFrame with a ``domain`` column: the element at index 1 of the
        value split on "@".
    """
    address_parts = split(col(col_name), "@")
    return df.withColumn("domain", address_parts.getItem(1))


# 3. Postal code extraction
def extract_pincode(df: DataFrame, address_column: str, pincode_column: str) -> DataFrame:
    """
    Add a postal-code column taken from the trailing digits of an address.

    Args:
        df: Input DataFrame.
        address_column: Name of the column holding the address text.
        pincode_column: Name of the column to create.

    Returns:
        The DataFrame with ``pincode_column`` set to the run of digits at the
        very end of the address, cast to ``IntegerType``.
    """
    # Capture group 1 is the digit run anchored at the end of the string.
    trailing_digits = regexp_extract(col(address_column), r'(\d+)$', 1)
    return df.withColumn(pincode_column, trailing_digits.cast(IntegerType()))


In [None]:
# FN: Conversion Rate function
def fetch_conversion_rates(base_currency='USD', timeout=10):
    """
    Fetches conversion rates for a given base currency from ExchangeRate-API.

    Args:
        base_currency: Currency code placed in the request URL.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot block the notebook indefinitely.

    Returns:
        The ``rates`` mapping from the response. An empty dict is returned
        (after printing the error) when the request fails, the body is not
        valid JSON, or the payload has no usable ``rates`` mapping.
    """
    url = f'https://api.exchangerate-api.com/v4/latest/{base_currency}'

    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Raise an exception for HTTP errors
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        print(f"Error fetching conversion rates: {e}")
        return {}

    # Guard against a non-object payload or a null "rates" entry so callers
    # can always treat the result as a dict.
    rates = data.get('rates') if isinstance(data, dict) else None
    return rates if isinstance(rates, dict) else {}


In [None]:
# FN: Converting all amount to USD
def convert_currency_to_usd(df: DataFrame) -> DataFrame:
    """
    Converts currency amounts in a DataFrame to USD.

    The rates are fetched once through ``fetch_conversion_rates`` and each
    row's ``amount`` is divided by the rate of its ``currency``.

    Args:
        df: DataFrame with ``amount`` and ``currency`` columns.

    Returns:
        The DataFrame with an added ``Amount_USD`` column (double). A null
        amount yields null. A currency without a known rate uses rate 1,
        i.e. the amount is copied unchanged.
    """
    # Conversion rates, expressed as units of each currency per one USD.
    conversion_rates = fetch_conversion_rates()
    if not conversion_rates:
        # Make the fallback visible: with no rates every amount is copied as-is.
        print("Warning: no conversion rates available; Amount_USD will equal amount for every row.")

    # The rates dict is small, so it is captured by the UDF closure directly.
    # This avoids going through DataFrame.sql_ctx / SparkContext.broadcast.
    def convert_currency(amount, currency):
        if amount is None:
            return None  # Return None if amount is None
        rate = conversion_rates.get(currency, 1)  # Default to 1 if currency not found
        return amount / rate

    convert_currency_udf = udf(convert_currency, DoubleType())

    # Apply the UDF to create the 'Amount_USD' column
    return df.withColumn("Amount_USD", convert_currency_udf(col("amount"), col("currency")))


In [None]:
# FN: Getting Coordinate for locations
def get_coordinates(location: str, timeout: float = 10, user_agent: str = "global-bank-capstone-notebook") -> list:
    """
    Fetches coordinates for a given location using the Nominatim API.

    Args:
        location: Free-text place name. ``None`` or an empty string returns
            ``[None, None]`` without calling the service.
        timeout: Seconds to wait for each HTTP request.
        user_agent: Value sent in the ``User-Agent`` header so the service can
            identify the calling application.

    Returns:
        ``[latitude, longitude]`` as floats taken from the first search hit,
        or ``[None, None]`` when there is no hit, the payload cannot be
        interpreted, or every attempt failed.
    """
    if not location:
        return [None, None]

    # Fixed pause before every lookup to throttle the request rate.
    time.sleep(2)

    # handling spaces in the location names
    # NOTE(review): spaces become underscores before URL-encoding; quote()
    # would encode spaces by itself — confirm the underscore form is intended.
    location = location.replace(" ", "_")

    encoded_location = quote(location)  # URL encode the location
    url = f'https://nominatim.openstreetmap.org/search?q={encoded_location}&format=json&addressdetails=1'
    headers = {"User-Agent": user_agent}
    max_retries = 3

    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()  # Raise an HTTPError for bad responses (4xx and 5xx)
            data = response.json()
        except requests.RequestException as e:
            print(f"Request failed: {e}")
            # Back off 2s, then 3s; there is nothing to wait for after the last try.
            if attempt < max_retries - 1:
                time.sleep((2 ** attempt) + 1)
            continue
        except ValueError as e:
            # The body was not valid JSON; retrying the same URL is unlikely to help.
            print(f"Request failed: {e}")
            return [None, None]

        if not data:
            return [None, None]
        try:
            first_hit = data[0]
            return [float(first_hit['lat']), float(first_hit['lon'])]
        except (KeyError, IndexError, TypeError, ValueError) as e:
            # Payload did not have the expected list-of-hits shape.
            print(f"Request failed: {e}")
            return [None, None]

    return [None, None]


In [None]:
# FN: Joining all tables
def join_customer_branch_transaction(customers_df, branches_df, transactions_df):
    """
    Inner-join transactions with customers and branches.

    Args:
        customers_df: Customers DataFrame, joined on ``customer_id``.
        branches_df: Branches DataFrame, joined on ``branch_id``.
        transactions_df: Transactions DataFrame (left side of both joins).

    Returns:
        The joined DataFrame after ``drop`` has been called with the
        alias-qualified names of every customer and branch column except the
        two join keys.
    """
    # Aliases give same-named columns a distinguishing qualifier.
    cust = customers_df.alias("cust")
    branch = branches_df.alias("branch")
    txn = transactions_df.alias("txn")

    # transactions -> customers on customer_id, then -> branches on branch_id.
    joined = txn.join(cust, on="customer_id", how="inner") \
                .join(branch, on="branch_id", how="inner")

    # Alias-qualified names of the non-key customer and branch columns.
    droppable = []
    for name in cust.columns:
        if name != "customer_id":
            droppable.append(f"cust.{name}")
    for name in branch.columns:
        if name != "branch_id":
            droppable.append(f"branch.{name}")

    # NOTE(review): these are plain strings such as "cust.name". Confirm that
    # DataFrame.drop resolves alias-qualified names; if it only matches bare
    # column names this call removes nothing and same-named columns from
    # different inputs stay duplicated in the result.
    return joined.drop(*droppable)



In [None]:
# FN: Aggregating customer data
def aggregate_customers(customers_df, transactions_df, active_window_days=180, churn_after_days=180):
    """
    Segment customers by type and count active versus churned customers.

    Recency is measured against the latest ``timestamp`` in
    ``transactions_df`` (not against today's date).

    Args:
        customers_df: Customers DataFrame with ``customer_id`` and
            ``customer_type``.
        transactions_df: Transactions DataFrame with ``customer_id``,
            ``amount`` and ``timestamp``.
        active_window_days: A customer is active when their last transaction
            is at most this many days before the latest transaction.
        churn_after_days: A customer is churned when their last transaction
            is more than this many days before the latest transaction.

    Returns:
        Tuple ``(customer_segmentation, active_customers, churned_customers)``:
        a DataFrame with one row per ``customer_type`` (distinct customers,
        average amount, total amount) and two integer counts.
    """
    # Join customers_df with transactions_df on customer_id
    joined_df = customers_df.join(transactions_df, "customer_id")

    # Customer Segmentation
    customer_segmentation = joined_df.groupBy("customer_type") \
        .agg(
            F.countDistinct("customer_id").alias("total_customers"),
            F.avg("amount").alias("avg_transaction_amount"),
            F.sum("amount").alias("total_spend")
        )

    # Reference point for recency: the latest transaction timestamp overall.
    last_transaction_date = transactions_df.select(F.max("timestamp")).collect()[0][0]

    # One row per customer holding their most recent transaction.
    last_per_customer = transactions_df.groupBy("customer_id") \
        .agg(F.max("timestamp").alias("last_transaction_date"))

    days_since_last = F.datediff(F.lit(last_transaction_date), "last_transaction_date")

    # Both counts in a single pass; count() ignores the nulls produced by
    # when() for customers that do not meet the condition.
    counts = last_per_customer.agg(
        # Active: last transaction within `active_window_days`.
        F.count(F.when(days_since_last <= active_window_days, 1)).alias("active_customers"),
        # Churned: no transaction for more than `churn_after_days`.
        F.count(F.when(days_since_last > churn_after_days, 1)).alias("churned_customers")
    ).collect()[0]

    active_customers = counts["active_customers"]
    churned_customers = counts["churned_customers"]

    return customer_segmentation, active_customers, churned_customers


In [None]:
# FN: Currency Analysis (totals and averages per currency)
def Currency_Analysis(transactions_df):
    """
    Summarise the USD amounts of transactions per original currency.

    Args:
        transactions_df: DataFrame with ``currency`` and ``Amount_USD`` columns.

    Returns:
        A DataFrame with one row per ``currency`` holding
        ``total_amount_by_currency`` (sum of ``Amount_USD``) and
        ``avg_amount_by_currency`` (average of ``Amount_USD``).
    """
    total_usd = F.sum("Amount_USD").alias("total_amount_by_currency")
    average_usd = F.avg("Amount_USD").alias("avg_amount_by_currency")

    per_currency = transactions_df.groupBy("currency")
    return per_currency.agg(total_usd, average_usd)
