##### Functions:
##### 1. remove_non_alphabet(df, column_name): removes non-alphabetic characters from the given column
##### 2. remove_spaces_hyphen(df): replaces spaces and hyphens (-) in the column names with underscores (_)
##### 3. enrich_name(df, name, email): in case the name is empty, the part of the email before '@' is used to fill it, and a new column is created to flag the rows that were filled
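##### 4. clean_phone_number(df, column_name): cleans the phone number into the format +1(xxx)xxx-xxxx (optionally followed by an 'x' extension), or 'Invalid phone number' when the field value cannot be formatted; the result is written to a new '_cleaned_<column_name>' column.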
##### 5. aggregate_dataframe(df, col1, col2, col3, col4, col5): groups the dataframe by col1, col2, col3 and col4 and computes the aggregated sum of col5

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import regexp_replace,col, udf,when,regexp_extract,sum
from pyspark.sql.types import StringType
import re

In [0]:
def remove_non_alphabet(df, column_name):
    """
    Input arguments: dataframe, column_name
    Returns: dataframe
    Description: transforms a single column in the input dataframe, overwriting it in place.
        Every character that is not an ASCII letter (including whitespace) is removed.
        A space is inserted before each uppercase letter except one at the very start.
        Each resulting word is then title-cased (first letter upper, rest lower).
        Examples: "john_Doe99" -> "John Doe", "JOHN" -> "J O H N".
        Null cells stay null.
        If building the transformation fails, the error is printed and the original
        dataframe is returned unchanged.
    """

    def clean_text(text):
        # Spark hands null cells to a Python UDF as None; re.sub would raise
        # TypeError on the executor, far away from the try/except below.
        if text is None:
            return None
        # Keep ASCII letters only (this also drops whitespace, digits, punctuation)
        letters_only = re.sub(r'[^a-zA-Z]', '', text)
        # Insert a space before every uppercase letter that is not at position 0
        spaced = re.sub(r'(?<!^)(?=[A-Z])', ' ', letters_only)
        # Title-case each space-separated word
        return spaced.title()

    try:
        # Wrap the Python function as a Spark UDF returning strings
        clean_text_udf = udf(clean_text, StringType())

        # Apply the UDF to the specified column and overwrite the original column
        return df.withColumn(column_name, clean_text_udf(col(column_name)))
    except Exception as e:
        print("Error occurred while removing non-alphabet characters:", str(e))
        return df  # Return original DataFrame in case of error

In [0]:
def remove_spaces_hyphen(df) -> DataFrame:
    """
    Input arguments: dataframe
    Returns: dataframe
    Description: Transforms the column names in the given dataframe. Every space (' ') and
        every hyphen ('-') in a column name is replaced with '_'. Column order and data
        are left untouched.
        If the rename fails, the error is printed and the original dataframe is returned.
    """
    try:
        # One translation table handles both characters in a single pass per name
        to_underscore = str.maketrans(' -', '__')
        renamed_columns = [name.translate(to_underscore) for name in df.columns]
        # toDF renames positionally, so column names containing dots or backticks
        # (which col() would try to parse as nested-field access) are handled too.
        return df.toDF(*renamed_columns)
    except Exception as e:
        print("Error occurred while removing spaces and hyphens:", str(e))
        return df  # Return original DataFrame in case of error

In [0]:
def clean_product_name(df, column_name):
    """
    Input arguments: dataframe, column_name
    Returns: dataframe
    Description: transforms a single column in the input dataframe, overwriting it in place.
        Removes non-ASCII characters from the given column. Whitespace is always kept;
        because the pattern excludes '\\s', this includes non-ASCII whitespace such as a
        non-breaking space.
        Null cells stay null.
        If building the transformation fails, the error is printed and the original
        dataframe is returned unchanged.
    """

    def remove_non_ascii(s):
        # Spark hands null cells to a Python UDF as None; re.sub would raise
        # TypeError on the executor, where the try/except below cannot catch it.
        if s is None:
            return None
        # Drop every run of characters that is neither ASCII nor whitespace
        return re.sub(r'[^\x00-\x7F\s]+', '', s)

    try:
        # Wrap the Python function as a Spark UDF returning strings
        clean_text_udf = udf(remove_non_ascii, StringType())

        # Apply the UDF to the specified column and overwrite the original column
        return df.withColumn(column_name, clean_text_udf(col(column_name)))
    except Exception as e:
        print("Error occurred while cleaning product name:", str(e))
        return df  # Return original DataFrame in case of error

In [0]:
def enrich_name(df, name, email) -> DataFrame:
    """
    Input arguments: dataframe, name column, email column
    Returns: dataframe
    Description: fills missing names from the email address.
        A name is treated as missing when it is NULL or the empty string.
        For missing names, the part of the email before the first '@' is used (the whole
        email value when it contains no '@'), with every character that is not an ASCII
        letter removed. Existing names are left unchanged.
        A new boolean column "name_filled" is added: True for rows whose name was missing,
        False otherwise.
        If building the transformation fails, the error is printed and the original
        dataframe is returned unchanged.
    """
    try:
        # NULL must be tested explicitly: `name == ""` evaluates to NULL for a NULL
        # name, so on its own it would never select those rows for filling.
        name_is_missing = col(name).isNull() | (col(name) == "")

        # Text before '@' when there is one, otherwise the raw email value
        email_local_part = when(
            col(email).contains("@"),
            regexp_extract(col(email), "([^@]+)@", 1),
        ).otherwise(col(email))

        # Keep letters only so digits, dots, etc. do not end up in the name
        derived_name = regexp_replace(email_local_part, "[^a-zA-Z]", "")

        # The flag is computed first, from the original name values, and then
        # drives the fill so both columns always agree.
        customers_enriched = df.withColumn(
            "name_filled", name_is_missing
        ).withColumn(
            name,
            when(col("name_filled"), derived_name).otherwise(col(name)),
        )
        return customers_enriched
    except Exception as e:
        print("Error occurred while enriching name:", str(e))
        return df  # Return original DataFrame in case of error

In [0]:
def clean_phone_number(df, column_name):
    """
    Input arguments: dataframe, column_name
    Returns: dataframe
    Description: normalises the phone numbers in column_name and writes the result to a
        NEW column named '_cleaned_' + column_name (the original column is kept).
        The text after the last lowercase 'x' is treated as an extension. All non-digit
        characters are dropped from both parts. A number is accepted when it has exactly
        10 digits, or more than 10 digits where the digits in front of the last 10 have
        the integer value 1 (e.g. "1", "01", "001"). Accepted numbers are formatted as
        +1(xxx)xxx-xxxx, followed by 'x' + extension digits when an extension is present.
        Anything else becomes 'Invalid phone number'.
        Null cells stay null.
        If building the transformation fails, the error is printed and the original
        dataframe is returned unchanged.
    """

    def clean_phone_number_func(phone_number):
        # Spark hands null cells to a Python UDF as None; calling .rsplit on it
        # would raise on the executor, where the try/except below cannot catch it.
        if phone_number is None:
            return None

        # Split on the last 'x': left side is the number, right side the extension
        parts = phone_number.rsplit('x', 1)
        number_part = parts[0]
        extension_part = parts[1] if len(parts) == 2 else ''

        # Keep digits only in both pieces
        digits = re.sub(r'\D', '', number_part)
        extension = re.sub(r'\D', '', extension_part)

        if len(digits) < 10:
            return 'Invalid phone number'

        # Anything in front of the last 10 digits must be the country code 1
        # (compared as an integer, so leading zeros are tolerated).
        prefix, national = digits[:-10], digits[-10:]
        if prefix and int(prefix) != 1:
            return 'Invalid phone number'

        formatted_number = f'+1({national[:3]}){national[3:6]}-{national[6:]}'

        # Add extension (if not empty)
        if extension:
            formatted_number += 'x' + extension

        return formatted_number

    try:
        # Wrap the Python function as a Spark UDF returning strings
        clean_text_udf = udf(clean_phone_number_func, StringType())

        # Apply the UDF and store the result in a new '_cleaned_<column_name>' column
        return df.withColumn('_cleaned_' + column_name, clean_text_udf(col(column_name)))
    except Exception as e:
        print("Error occurred while cleaning phone number:", str(e))
        return df  # Return original DataFrame in case of error


In [0]:
def aggregate_dataframe(df, Order_year, Customer_ID, Category, Sub_Category, Profit) -> DataFrame:
    """
    Input arguments: dataframe, four grouping column names, one value column name
    Returns: dataframe
    Description: groups the dataframe by the Order_year, Customer_ID, Category and
        Sub_Category columns and sums the Profit column within each group. The sum is
        exposed as 'Total_Profit'.
        If the aggregation cannot be built, the error is printed and the original
        dataframe is returned unchanged.
    """
    try:
        grouping_columns = (Order_year, Customer_ID, Category, Sub_Category)
        grouped = df.groupBy(*grouping_columns)
        # `sum` here is the Spark column aggregate imported at the top of the file,
        # not Python's builtin.
        total_profit = sum(col(Profit)).alias('Total_Profit')
        return grouped.agg(total_profit)
    except Exception as e:
        print("Error occurred while aggregating DataFrame:", str(e))
        return df  # Return original DataFrame in case of error
