In [None]:
# Consolidate: get_valid_state() and get_transaction_type()
# json_to_psql.ipynb: check datatype of TIMEID

In [3]:
from pyspark.sql import SparkSession, DataFrame

from pyspark.sql.functions import count, sum, col, when

from typing import Union

In [4]:
spark = SparkSession.builder \
    .appName('Query MySQL database: creditcard_capstone') \
    .getOrCreate()

spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # to see the entire contents of the DataFrame in the console output.


In [5]:
mysql_database_name = 'creditcard_capstone'
mysql_driver = 'com.mysql.jdbc.Driver'
mysql_url = f'jdbc:mysql://localhost:3306/{mysql_database_name}'
mysql_properties = {
                'user': 'root',
                'password': 'password'
}

In [6]:
customer_df = spark.read.jdbc(url=mysql_url, table='customer', properties=mysql_properties)
credit_df = spark.read.jdbc(url=mysql_url, table='credit', properties=mysql_properties)
branch_df = spark.read.jdbc(url=mysql_url, table='branch', properties=mysql_properties)

---

## 2.1 Transaction Details Module

1)    Used to display the transactions made by customers living in a given zip code for a given month and year. Order by day in descending order.

In [7]:
def get_input_zip_month_year() -> tuple[str, int, int]:
    """
    Prompt the user to enter a zipcode, month, and year, and validate the inputs.

    Returns:
        A tuple containing the validated inputs: (zipcode, month, year)

    Raises:
        ValueError: If any of the inputs are invalid.

    """

    def validate_zipcode(zipcode: str) -> None:
        if len(zipcode) != 5 or not zipcode.isdigit():
            raise ValueError("Invalid zipcode. Please enter a 5-digit numeric value.")

    def validate_month(month: int) -> None:
        if not 1 <= month <= 12:
            raise ValueError("Invalid month. Please enter a numeric value between 1 and 12.")

    def validate_year(year: int) -> None:
        if len(str(year)) != 4 or not str(year).isdigit():
            raise ValueError("Invalid year. Please enter a 4-digit numeric value.")

    while True:
        try:
            zipcode = input("Zipcode: ")
            validate_zipcode(zipcode)
            break
        except ValueError as e:
            print(str(e))

    while True:
        try:
            month = int(input("Month: "))
            validate_month(month)
            break
        except ValueError as e:
            print(str(e))

    while True:
        try:
            year = int(input("Year: "))
            validate_year(year)
            break
        except ValueError as e:
            print(str(e))

    return zipcode, month, year

In [10]:
def transactions_by_zip_month_year() -> None:
    """
    Query credit data by joining with customer data and applying filters.

    Args:
        credit_df (DataFrame): The DataFrame containing the credit data.
        customer_df (DataFrame): The DataFrame containing the customer data.

    Returns:
        DataFrame: The resulting DataFrame after joining and applying filters.

    Raises:
        ValueError: If the required input parameters are not provided.

    """

    input_zip, input_month, input_year = get_input_zip_month_year()
    
    result_df = credit_df.join(customer_df, credit_df.CUST_SSN == customer_df.SSN, 'left') \
                         .where((customer_df.CUST_ZIP == input_zip) & (credit_df.MONTH == input_month) & (credit_df.YEAR == input_year)) \
                         .orderBy(credit_df.DAY)

    result_df.show()

---

2)    Used to display the number and total values of transactions for a given type.

In [11]:
def distinct_options(df: DataFrame, column_name: str) -> list[str]:
    """
    Get the distinct options from a specific column in the DataFrame.

    Args:
        df (DataFrame): The DataFrame containing the data.
        column_name (str): The name of the column to fetch distinct options from.

    Returns:
        list[str]: The list of distinct options from the specified column.

    """
    distinct_options = df.select(column_name).distinct().rdd.flatMap(lambda x: x).collect()
    return distinct_options


In [12]:
def get_transaction_type() -> str:
    """
    Prompt the user to enter a valid transaction type and validate the input.

    Returns:
        str: The validated transaction type entered by the user.

    Raises:
        ValueError: If the entered transaction type is not valid.

    """

    valid_transaction_types = distinct_options(df = credit_df, column_name = 'TRANSACTION_TYPE') 
    valid_options = ", ".join(valid_transaction_types)

    while True:
        transaction_type = input(f"Transaction Type ({valid_options}): ")
        if transaction_type in valid_transaction_types:
            break
        print(f"Invalid transaction type. Please enter a valid type from the given options: {valid_options}.")
    
    return transaction_type

In [13]:
def transaction_total_and_no_by_type() -> None:
    """
    Query transaction data based on the user-provided transaction type.

    Returns:
        None

    Raises:
        ValueError: If the input transaction type is not valid.

    """

    # get a valid input from the user
    transaction_type = get_transaction_type() 

    result_df = credit_df.filter(credit_df.TRANSACTION_TYPE == transaction_type) \
                             .select(count('TRANSACTION_ID').alias('Total no of transactions'), \
                                     sum('TRANSACTION_VALUE').alias('Sum of Transaction Values'))

    result_df.show()

---

3)    Used to display the total number and total values of transactions for branches in a given state.

In [14]:
def get_valid_state() -> str:
    """
    Prompt the user to enter a valid state and validate the input.

    Returns:
        str: The validated state entered by the user.

    Raises:
        ValueError: If the entered state is not valid.

    """

    valid_states = distinct_options(df = branch_df, column_name = 'BRANCH_STATE')
    valid_options = ", ".join(valid_states)

    while True:
        input_state = input(f"State ({valid_options}): ")
        if input_state in valid_states:
            break
        print(f"Invalid state. Please enter a valid state from the given options: {valid_options}.")
    
    return input_state

In [15]:
def transaction_total_and_no_by_branch_on_state ()-> None:
    """
    Process transaction data by querying based on user input of state.

    Args:
        credit_df (DataFrame): The DataFrame containing the credit data.
        customer_df (DataFrame): The DataFrame containing the customer data.

    """

    state = get_valid_state()

    result_df = credit_df.join(branch_df, credit_df.BRANCH_CODE == branch_df.BRANCH_CODE, 'left') \
                         .where(branch_df.BRANCH_STATE == 'MN') \
                         .groupBy(branch_df.BRANCH_CODE) \
                         .agg(count(credit_df.TRANSACTION_ID).alias('Transaction_Count'), sum(credit_df.TRANSACTION_VALUE).alias('Transaction_Sum'))

    result_df.show()

---

## 2.2 Customer Details Module

1) Used to check the existing account details of a customer.


In [16]:
def get_valid_column_name(filter_options: list, msg: str = "Filter Option") -> str:
    """
    Prompt the user to enter a valid filter option and validate the input.

    Args:
        filter_options (list): The list of available filter options.

    Returns:
        str: The validated filter option entered by the user.

    Raises:
        ValueError: If the entered filter option is not valid.

    """
    while True:
        filter_option = input(f"{msg}({', '.join(filter_options)}): ")
        if filter_option in filter_options:
            break
        print(f"Invalid filter option. Please enter a valid option from the given list: {', '.join(filter_options)}")

    return filter_option

In [17]:
def get_valid_value(msg: str = "Filter Value: ") -> Union[str, int]:
    """
    Prompt the user to enter a valid filter value and validate the input.

    Returns:
        Union[str, int]: The validated filter value entered by the user.

    Raises:
        ValueError: If the entered filter value is not valid.

    """
    input_value = input(msg)

    try:
        input_value = int(input_value)
    except ValueError:
        pass

    return input_value


In [18]:
def account_details():
    """
    Perform account details lookup based on user input.

    """
    valid_column_names = customer_df.columns

    filter_column = get_valid_column_name(valid_column_names)
    filter_value = get_valid_value()

    result_df = customer_df.filter(customer_df[filter_column] == filter_value)
    result_df.show()

2) Used to modify the existing account details of a customer.

In [19]:
def update_dataframe() -> DataFrame:
    """
    Update values in a PySpark DataFrame based on a filter condition.

    Returns:
        DataFrame: The updated PySpark DataFrame.

    """
    valid_column_names = customer_df.columns

    filter_column = get_valid_column_name(valid_column_names)
    filter_value = get_valid_value()

    column_to_update = get_valid_column_name(valid_column_names, msg = 'Column to update')
    new_value = get_valid_value(msg = 'New Value')

    updated_df = customer_df.withColumn(
        column_to_update, 
        when(col(filter_column) == filter_value, new_value).otherwise(col(column_to_update)))
    
    return updated_df

3) Used to generate a monthly bill for a credit card number for a given month and year.
4) Used to display the transactions made by a customer between two dates. Order by year, month, and day in descending order.

In [33]:
input_cc_no = 4210653366110577	
input_year = 2018
input_month = 12

result_df = credit_df.filter((credit_df['CREDIT_CARD_NO'] == input_cc_no) &
                             (credit_df['MONTH'] == input_month) &
                             (credit_df['YEAR'] == input_year))

In [35]:
len(input_)

In [38]:
result_df.select(['TRANSACTION_ID', 'DAY', 'MONTH', 'YEAR', 'TRANSACTION_TYPE', 'TRANSACTION_VALUE'])

TRANSACTION_ID,DAY,MONTH,YEAR,TRANSACTION_TYPE,TRANSACTION_VALUE
34637,20,12,2018,Test,83.54
34642,3,12,2018,Gas,65.94
34644,17,12,2018,Education,45.11
34647,23,12,2018,Bills,76.65
34651,11,12,2018,Test,92.65
34570,1,12,2018,Grocery,61.14
34597,27,12,2018,Test,34.98
34603,22,12,2018,Education,90.82
34615,20,12,2018,Test,62.88
34621,5,12,2018,Gas,13.15


In [32]:
total_monthly_bill = result_df.agg({"TRANSACTION_VALUE": "sum"}).collect()[0][0]
total_monthly_bill

201251.07999999946

In [40]:
def sum(a, b):
    return (a + b)

In [41]:
sum(1, 2)

3

In [42]:
def log_function_call(func):
    def wrapper(*args, **kwargs):
        print(f"Calling function: {func.__name__}")
        print(f"Arguments: {args}")
        print(f"Keyword arguments: {kwargs}")
        result = func(*args, **kwargs)
        print(f"Function {func.__name__} completed.")
        return result
    return wrapper



In [45]:
@log_function_call
def add_numbers(a, b):
    print('hi')
    return a + b