In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, to_timestamp, from_utc_timestamp, initcap, when, concat_ws, split, sum, min as pyspark_min, stddev, mean
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, LongType, TimestampType
from pyspark.sql.window import Window
from pyspark.sql import functions as F



#-----------------------Cleansing and Transformation pipeline----------------------# 

def load_data_spark(spark, file_path, encoding='utf-8'):
    """Read the raw transactions CSV into a DataFrame using a fixed schema.

    Args:
        spark: SparkSession used to perform the read.
        file_path: Location of the CSV file.
        encoding: Character encoding handed to the CSV reader.

    Returns:
        A DataFrame with the 20 columns listed below, in this order.
    """
    # The reader is called with header=False, so columns are assigned purely
    # by position: the order here has to match the field order in the file.
    column_types = [
        ('Transaction_id', StringType()),
        ('Transaction_time', StringType()),   # date/time fields are kept as text here
        ('Transaction_type', StringType()),
        ('Amount', FloatType()),
        ('Category', StringType()),
        ('Mode', StringType()),
        ('Narratives', StringType()),
        ('Customer_id', IntegerType()),
        ('Customer_firstname', StringType()),
        ('Customer_lastName', StringType()),
        ('Customer_gender', StringType()),
        ('Date_of_birth', StringType()),
        ('Salary', FloatType()),
        ('Account_Number', LongType()),
        ('Status', StringType()),
        ('Date_of_Creation', StringType()),
        ('Address_zipcode', IntegerType()),   # read as an integer, not as text
        ('Address_state_address_City_address_Street', StringType()),
        ('Recipient_id', StringType()),
        ('Duplicate', StringType()),
    ]
    schema = StructType([StructField(name, dtype) for name, dtype in column_types])

    return spark.read.csv(file_path, header=False, schema=schema, encoding=encoding)


def remove_duplicates(df):
    """Return *df* with repeated rows collapsed into one.

    No column subset is passed, so two rows are duplicates only when every
    column matches.
    """
    return df.dropDuplicates()


def clean_address(df):
    """Strip unwanted characters from the combined address column.

    Returns a new DataFrame in which
    'Address_state_address_City_address_Street' has been cleaned in two passes.
    """
    address_col = 'Address_state_address_City_address_Street'

    # Pass 1: drop everything except letters, digits, commas, whitespace and hyphens.
    stripped = regexp_replace(col(address_col), r'[^a-zA-Z0-9,\s-]', '')

    # Pass 2: a run of hyphens at the very start collapses to exactly one hyphen.
    normalised = regexp_replace(stripped, r'^(-)+', '-')

    return df.withColumn(address_col, normalised)


def transform_data(df):
    """Repair the recipient/address columns and normalise null markers.

    The function:
      * moves the non-digit part of 'Recipient_id' onto the end of the
        combined address column and keeps only the digits in 'Recipient_id';
      * falls back to 'Duplicate' when 'Recipient_id' is null or empty, then
        casts 'Recipient_id' to LongType and drops 'Duplicate';
      * splits the combined address on '-' into 'State', 'City' and 'Street';
      * turns the tokens 'null', 'Null', 'na' and '' into real nulls in every
        string column.

    Args:
        df: DataFrame containing 'Recipient_id', 'Duplicate' and
            'Address_state_address_City_address_Street'.

    Returns:
        The transformed DataFrame. The combined address column and
        'Duplicate' are removed; 'State', 'City' and 'Street' are appended.
    """
    address_col = 'Address_state_address_City_address_Street'

    # Step 1: the non-digit remainder of 'Recipient_id'.
    df = df.withColumn('Recipient_id_str', regexp_replace(col('Recipient_id'), '\\d+', ''))

    # Step 2: append that remainder to the address (concat_ws skips nulls).
    df = df.withColumn('New_address', concat_ws('', col(address_col), col('Recipient_id_str')))

    # Step 3: the helper column and the original address are no longer needed.
    df = df.drop('Recipient_id_str', address_col)

    # Step 4: keep only the digits in 'Recipient_id'.
    df = df.withColumn('Recipient_id', regexp_replace(col('Recipient_id'), '\\D+', ''))

    # Step 5: give 'Duplicate' the same type as 'Recipient_id' so they can be mixed.
    df = df.withColumn('Duplicate', col('Duplicate').cast(df.schema['Recipient_id'].dataType))

    # Step 6: a missing or empty 'Recipient_id' falls back to 'Duplicate'.
    recipient_missing = col('Recipient_id').isNull() | (col('Recipient_id') == '')
    df = df.withColumn(
        'Recipient_id',
        when(recipient_missing, col('Duplicate')).otherwise(col('Recipient_id')),
    )

    # Steps 7-8: final numeric type, then drop the fallback column.
    df = df.withColumn('Recipient_id', col('Recipient_id').cast(LongType()))
    df = df.drop('Duplicate')

    # Steps 9-10: split the address once and reuse the expression for all parts.
    address_parts = split(col('New_address'), '-')
    df = (
        df.withColumn('State', address_parts.getItem(0))
          .withColumn('City', address_parts.getItem(1))
          .withColumn('Street', address_parts.getItem(2))
          .drop('New_address')
    )

    # Steps 11-12: replace textual null markers with real nulls.
    # Only string columns can hold these tokens; comparing a numeric column
    # with them relies on an implicit cast and can never match (and may be
    # rejected under stricter SQL modes), so other columns pass through as-is.
    # A single select is used instead of two withColumn calls per column,
    # which keeps the query plan small.
    non_standard_nulls = ['null', 'Null', 'na', '']
    string_columns = {name for name, dtype in df.dtypes if dtype == 'string'}
    df = df.select([
        when(col(name).isin(non_standard_nulls), None).otherwise(col(name)).alias(name)
        if name in string_columns
        else col(name)
        for name in df.columns
    ])

    return df


def data_type(df):
    """Parse the three textual date columns into timestamps.

    Each column is parsed with its own pattern and the result is passed
    through from_utc_timestamp with the 'Asia/Kolkata' zone, i.e. the parsed
    value is treated as UTC and shifted to that zone.
    """
    # (column, pattern) pairs, processed in this order.
    timestamp_formats = (
        ('Date_of_birth', 'dd-MM-yyyy HH:mm:ss'),
        ('Transaction_time', 'dd-MM-yyyy HH:mm:ss'),
        ('Date_of_Creation', 'yyyy-MM-dd\'T\'HH:mm:ss'),
    )

    for column_name, pattern in timestamp_formats:
        parsed = to_timestamp(col(column_name), pattern)
        df = df.withColumn(column_name, from_utc_timestamp(parsed, 'Asia/Kolkata'))

    return df


def encoding(df):
    """Expand the one-letter gender codes in 'Customer_gender'.

    'F' becomes 'Female' and 'M' becomes 'Male'; any other value (including
    null) is left exactly as it was.
    """
    gender = col('Customer_gender')
    expanded = (
        when(gender == 'F', 'Female')
        .when(gender == 'M', 'Male')
        .otherwise(gender)
    )
    return df.withColumn('Customer_gender', expanded)


def capitalize_string_columns(df, excluded_columns=('Transaction_id',)):
    """Apply initcap to every string column except the excluded ones.

    Args:
        df: Input DataFrame.
        excluded_columns: Column names to leave untouched. The default is an
            immutable tuple rather than a list so the default object cannot be
            modified between calls; any container supporting ``in`` works.

    Returns:
        A DataFrame with the same columns in the same order, where each
        non-excluded string column has been passed through initcap.
    """
    targets = {
        name
        for name, dtype in df.dtypes
        if dtype == 'string' and name not in excluded_columns
    }

    # One projection for all columns instead of a withColumn call per column,
    # which would add a projection to the plan on every iteration.
    return df.select([
        initcap(col(name)).alias(name) if name in targets else col(name)
        for name in df.columns
    ])


def fill_missing_values(df):
    """Fill nulls: most frequent value for strings, mean for numeric columns.

    String columns are filled with their most frequent *non-null* value.
    Numeric columns ('int', 'bigint', 'float', 'double' as reported by
    ``df.dtypes``) are filled with the column mean, cast back to the column's
    own type so the schema does not change; for integer columns the mean is
    rounded to the nearest whole number first.

    Columns that contain no non-null value at all are left unchanged.

    Args:
        df: Input DataFrame.

    Returns:
        DataFrame with the same columns and types, nulls filled where a fill
        value could be computed.
    """
    # --- string columns: mode ---------------------------------------------
    string_columns = [name for name, dtype in df.dtypes if dtype == 'string']
    for col_name in string_columns:
        # Nulls are excluded before counting: otherwise, when null is the most
        # common "value", the mode would be null and nothing would be filled.
        # The secondary sort key makes the choice deterministic on ties.
        top_row = (
            df.filter(col(col_name).isNotNull())
              .groupBy(col_name)
              .count()
              .orderBy(col('count').desc(), col(col_name).asc())
              .first()
        )
        if top_row is None:
            continue  # column is entirely null; there is nothing to fill with
        df = df.withColumn(col_name, F.coalesce(col(col_name), F.lit(top_row[0])))

    # --- numeric columns: mean --------------------------------------------
    # df.dtypes reports LongType as 'bigint' (never 'long') and DoubleType as
    # 'double'; this list matches the one used by the outlier step.
    integral_types = ('int', 'bigint')
    numeric_types = integral_types + ('float', 'double')
    numeric_columns = [(name, dtype) for name, dtype in df.dtypes if dtype in numeric_types]
    if not numeric_columns:
        return df

    # All means are computed in a single aggregation.
    means = df.select([mean(col(name)).alias(name) for name, _ in numeric_columns]).first()

    for name, dtype in numeric_columns:
        mean_value = means[name]
        if mean_value is None:
            continue  # no non-null values, so no mean exists
        fill_value = round(mean_value) if dtype in integral_types else mean_value
        # Casting the literal keeps the column in its original type; without
        # it an integer column would be widened to double by the fill.
        df = df.withColumn(name, F.coalesce(col(name), F.lit(fill_value).cast(dtype)))

    return df


def fill_missing_timestamps_with_oldest_value(df):
    """Replace nulls in timestamp columns with that column's oldest value.

    For every column whose dtype is 'timestamp' and which contains at least
    one null, the nulls are replaced by the minimum non-null timestamp of the
    same column. Columns without nulls are not touched.

    Args:
        df: Input DataFrame.

    Returns:
        The DataFrame with timestamp nulls filled, or *df* itself when no
        timestamp column has missing values.
    """
    timestamp_columns = [name for name, dtype in df.dtypes if dtype == 'timestamp']
    if not timestamp_columns:
        return df

    # One aggregation yields, per column, the null count followed by the
    # minimum, instead of running a separate count job for every column.
    null_counts = [F.count(F.when(col(name).isNull(), 1)) for name in timestamp_columns]
    minimums = [F.min(col(name)) for name in timestamp_columns]
    stats = df.select(null_counts + minimums).first()
    offset = len(timestamp_columns)

    for position, timestamp_column in enumerate(timestamp_columns):
        if stats[position] == 0:
            continue  # nothing missing in this column

        oldest_timestamp = stats[offset + position]

        # The message text is kept as it was; note that no interpolation
        # takes place, the fill value is simply the column minimum.
        print(f"Interpolating missing timestamps in column '{timestamp_column}'...")

        if oldest_timestamp is None:
            continue  # column is entirely null; there is no value to fill with

        df = df.withColumn(
            timestamp_column,
            F.coalesce(col(timestamp_column), F.lit(oldest_timestamp)),
        )

    return df




def identify_and_treat_outliers(df):
    """Cap values lying more than three standard deviations from the mean.

    Every column whose dtype is 'int', 'bigint', 'float' or 'double' is
    checked. Values below ``mean - 3 * stddev`` are raised to that bound and
    values above ``mean + 3 * stddev`` are lowered to it. A line is printed
    for each column saying whether outliers were found.

    Columns for which mean or standard deviation cannot be computed (no
    non-null values, or too few values for a standard deviation) are reported
    as having no outliers and left unchanged.

    Args:
        df: Input DataFrame.

    Returns:
        DataFrame with outliers capped. The bounds are Python floats, so a
        capped integer column is presumably widened to double -- verify if
        the column type matters downstream.
    """
    import math

    numeric_columns = [
        name for name, dtype in df.dtypes
        if dtype in ('int', 'bigint', 'float', 'double')
    ]
    if not numeric_columns:
        return df

    # Mean and standard deviation of every column in a single aggregation.
    # Capping one column does not affect another column's statistics, so
    # computing them all up front gives the same numbers.
    means = [mean(col(name)) for name in numeric_columns]
    deviations = [stddev(col(name)) for name in numeric_columns]
    stats = df.select(means + deviations).first()
    offset = len(numeric_columns)

    for position, column in enumerate(numeric_columns):
        column_mean = stats[position]
        column_stddev = stats[offset + position]

        # Without this guard `3 * None` raises TypeError, and a NaN bound
        # would make the comparisons below meaningless.
        statistics_missing = (
            column_mean is None
            or column_stddev is None
            or math.isnan(column_stddev)
        )
        if statistics_missing:
            print(f"No outliers found in the '{column}' column.")
            continue

        lower_bound = column_mean - 3 * column_stddev
        upper_bound = column_mean + 3 * column_stddev

        outliers_count = df.filter(
            (col(column) < lower_bound) | (col(column) > upper_bound)
        ).count()

        if outliers_count > 0:
            print(f"There are outliers in the '{column}' column: {outliers_count} outliers.")

            # Cap at whichever bound was crossed; in-range values are kept.
            capped = (
                when(col(column) < lower_bound, lower_bound)
                .when(col(column) > upper_bound, upper_bound)
                .otherwise(col(column))
            )
            df = df.withColumn(column, capped)
            print(f"Outliers have been treated in the '{column}' column.")
        else:
            print(f"No outliers found in the '{column}' column.")

    return df

def save_to_hadoop(df, hdfs_path):
    """Write *df* as CSV to *hdfs_path*, replacing whatever is already there.

    The DataFrame is written as-is: no repartitioning or coalescing is done
    here, so the output is not forced into a single file.
    """
    writer = df.write.mode("overwrite")
    writer.csv(hdfs_path)
    
    


    
def main_spark(file_path='file:///home/susan1232/savingtransactions 3.csv',
               hdfs_path="hdfs://nameservice1/user/susan1232/Livestream/data.csv"):
    """Run the cleansing pipeline end to end and write the result to HDFS.

    Steps, in order: load, de-duplicate, clean address, transform, parse
    timestamps, expand gender codes, capitalise strings, fill missing values,
    fill missing timestamps, cap outliers, save, then print a short
    validation report (sample rows, schema, remaining null counts).

    Args:
        file_path: Location of the raw input CSV. Defaults to the path that
            was previously hard-coded.
        hdfs_path: Destination for the cleaned CSV output. Defaults to the
            path that was previously hard-coded.
    """
    spark = SparkSession.builder.appName("DataPreprocessing").getOrCreate()

    # try/finally guarantees the session is stopped even when a step fails.
    try:
        data = load_data_spark(spark, file_path, encoding='utf-8')

        # Duplicate check: compare the row count before and after de-duplication.
        deduplicated = remove_duplicates(data)
        if data.count() == deduplicated.count():
            print("No duplicates found.")
        else:
            print("Duplicates are present.")
            data = deduplicated
            print("Duplicates have been removed.")

        # Cleansing and transformation steps.
        data = clean_address(data)
        data = transform_data(data)
        data = data_type(data)
        data = encoding(data)
        data = capitalize_string_columns(data)
        data = fill_missing_values(data)
        data = fill_missing_timestamps_with_oldest_value(data)
        data = identify_and_treat_outliers(data)

        # Persist the cleaned data.
        save_to_hadoop(data, hdfs_path)

        # Validation output.
        data.show(5)
        data.printSchema()

        # Remaining nulls per column, computed in one aggregation.
        missing_counts = data.select([
            F.sum(col(column).isNull().cast('int')).alias(column)
            for column in data.columns
        ])
        missing_value_counts = missing_counts.collect()[0].asDict()

        # A sum over zero rows is null, hence the `or 0`.
        columns_with_missing_values = [
            column
            for column, count in missing_value_counts.items()
            if (count or 0) > 0
        ]

        if columns_with_missing_values:
            for column in columns_with_missing_values:
                print(f"Column '{column}' has {missing_value_counts[column]} missing values.")
        else:
            print("No missing values in the dataset.")
    finally:
        spark.stop()

# Run the cleansing pipeline only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main_spark()
    
    
#-----------------------Hadoop to Hive Tables----------------------# 

def create_and_load_tables(spark, input_path, database_name):
    """Split the cleaned CSV into three tables inside *database_name*.

    The CSV at *input_path* is read with a fixed schema and projected into
    'Customers', 'Accounts' and 'Transactions' (each de-duplicated). Every
    table is created with CREATE TABLE IF NOT EXISTS ... AS SELECT, so a table
    that already exists is left as it is. After each statement the row count
    is queried and a status line is printed.

    Args:
        spark: SparkSession used for reading and for running SQL.
        input_path: Location of the header-less CSV to load.
        database_name: Database to create if missing and then switch to.
    """
    # Column layout of the CSV (there is no header row, so order matters).
    schema = "Transaction_id STRING,Transaction_time TIMESTAMP,Transaction_type STRING,Amount DOUBLE,Category STRING,Mode STRING,Narratives STRING,Customer_id DOUBLE,Customer_firstname STRING,Customer_lastName STRING,Customer_gender STRING,Date_of_birth TIMESTAMP,Salary DOUBLE,Account_Number BIGINT,Status STRING,Date_of_Creation TIMESTAMP,Address_zipcode STRING,Recipient_id BIGINT,State STRING,City STRING,Street STRING"

    source_df = spark.read.csv(input_path, header=False, schema=schema)

    customer_fields = [
        "Customer_id", "Customer_firstname", "Customer_lastName", "Customer_gender",
        "Date_of_birth", "Salary", "State", "City", "Street", "Address_zipcode",
    ]
    account_fields = ["Customer_id", "Account_Number", "Status", "Date_of_Creation"]
    # NOTE(review): 'Amount' is not selected into any of the three tables --
    # confirm whether leaving it out of 'Transactions' is intentional.
    transaction_fields = [
        "Account_Number", "Transaction_id", "Transaction_time", "Transaction_type",
        "Mode", "Category", "Narratives", "Recipient_id",
    ]

    # (table name, de-duplicated projection) in creation order.
    tables = [
        ("Customers", source_df.select(*customer_fields).dropDuplicates()),
        ("Accounts", source_df.select(*account_fields).dropDuplicates()),
        ("Transactions", source_df.select(*transaction_fields).dropDuplicates()),
    ]

    # Make sure the database exists and make it the current one.
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
    spark.sql(f"USE {database_name}")

    for table_name, table_df in tables:
        # Expose the projection to SQL through a temporary view, then
        # materialise it as a table.
        table_df.createOrReplaceTempView(f"tmp_{table_name}")
        spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM tmp_{table_name}")

        # Report how many rows the table now holds.
        count_row = spark.sql(f"SELECT COUNT(*) AS total_records FROM {table_name}").first()
        total_records = count_row.total_records

        if total_records > 0:
            print(f"Table '{table_name}' is successfully created in Hive.")
            print(f"The '{table_name}' table contains {total_records} records.")
        else:
            print(f"The '{table_name}' table is empty.")
        
    
    
    
if __name__ == "__main__":
    # Hive-enabled Spark session pointing at the cluster metastore and warehouse.
    spark = SparkSession.builder \
        .appName("create_and_load_tables_pipeline") \
        .config("hive.metastore.uris", "thrift://ip-10-1-2-24.ap-south-1.compute.internal:9083") \
        .config("spark.sql.warehouse.dir", "hdfs://nameservice1/user/hive/warehouse") \
        .enableHiveSupport() \
        .getOrCreate()

    # HDFS location of the cleaned CSV output.
    hadoop_path = "hdfs://nameservice1/user/susan1232/Livestream/data.csv"

    # Target database for the tables.
    database_name = "RetailBanking"

    # try/finally ensures the session is stopped even if table creation fails.
    try:
        create_and_load_tables(spark, hadoop_path, database_name)
    finally:
        spark.stop()


No duplicates found.
Interpolating missing timestamps in column 'Date_of_Creation'...
There are outliers in the 'Amount' column: 1104 outliers.
Outliers have been treated in the 'Amount' column.
No outliers found in the 'Customer_id' column.
No outliers found in the 'Salary' column.
No outliers found in the 'Account_Number' column.
No outliers found in the 'Address_zipcode' column.
No outliers found in the 'Recipient_id' column.
+----------------+-------------------+----------------+------------------+-------------+-----------+--------------------+-----------+------------------+-----------------+---------------+-------------------+-------+--------------+------+-------------------+---------------+------------+-----------+---------+-------------+
|  Transaction_id|   Transaction_time|Transaction_type|            Amount|     Category|       Mode|          Narratives|Customer_id|Customer_firstname|Customer_lastName|Customer_gender|      Date_of_birth| Salary|Account_Number|Status|   Date_o