In [None]:
def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
    """Clean the customer call list stored in S3 and return the callable customers.

    The CSV is read straight from a hard-coded S3 bucket/key with pandas,
    cleaned, filtered, and converted to a Glue DynamicFrame.

    Cleaning steps:
      * drop duplicate rows and the ``Not_Useful_Column`` column;
      * strip every non-letter character from ``Last_Name``;
      * normalise ``Paying Customer`` / ``Do_Not_Contact`` to ``'Y'``/``'N'``
        (anything that is not y/yes becomes ``'N'``);
      * format ``Phone_Number`` as ``XXX-XXX-XXXX``; values that do not contain
        exactly 10 digits become an empty string;
      * split ``Address`` into ``Street_Address``, ``State`` and ``Zip``;
      * replace remaining missing values with ``''``;
      * drop rows whose ``Do_Not_Contact`` is ``'Y'``.

    Args:
        glueContext: The Glue context; supplies the Spark session and is
            attached to the resulting DynamicFrame/collection.
        dfc: The incoming DynamicFrameCollection. It is not used; the data
            comes from S3 directly.

    Returns:
        A DynamicFrameCollection with a single frame under the key
        ``"default"``.
    """
    import pandas as pd
    import re
    import boto3
    from awsglue.dynamicframe import DynamicFrame

    # Read the CSV file from S3
    bucket_name = 'ch-portfolio-project-data-engineering'
    file_key = 'customer_call_list.csv'
    s3 = boto3.client('s3')
    file_obj = s3.get_object(Bucket=bucket_name, Key=file_key)
    df = pd.read_csv(file_obj['Body'])

    # Drop Duplicates and Bad Column Data
    df = df.drop_duplicates()
    df = df.drop(columns="Not_Useful_Column")

    # Remove Special Characters from Last Name
    def clean_last_name(last_name):
        return re.sub(r'[^a-zA-Z]', '', str(last_name))

    df['Last_Name'] = df['Last_Name'].fillna('').apply(clean_last_name)

    # Clean and Format Y/N Responses
    columns_to_check = ['Paying Customer', 'Do_Not_Contact']

    def convert_y_n(response):
        # Anything other than an explicit yes (including missing values)
        # is treated as 'N'.
        if str(response).lower() in ('y', 'yes'):
            return 'Y'
        return 'N'

    # Per-column apply instead of DataFrame.applymap, which is deprecated
    # in recent pandas releases.
    for column in columns_to_check:
        df[column] = df[column].apply(convert_y_n)

    # Clean and Format Phone Numbers
    def clean_phone(phone):
        # A numeric column containing gaps is loaded as float; convert whole
        # floats to int so the trailing '.0' is not counted as a digit.
        if isinstance(phone, float) and phone.is_integer():
            phone = int(phone)
        digits = re.sub(r'[^0-9]', '', str(phone))
        # Only a full 10-digit number can be formatted; shorter or longer
        # values would produce a malformed result, so they are discarded
        # (None becomes '' in the fillna below).
        if len(digits) != 10:
            return None
        return '-'.join([digits[:3], digits[3:6], digits[6:]])

    df['Phone_Number'] = df['Phone_Number'].apply(clean_phone)

    # Split Address by Street, State, Zip
    # n=2 caps the split at three parts (extra commas stay in the last part)
    # and the reindex pads missing parts, so the three-column assignment
    # cannot fail on addresses with an unexpected number of commas.
    address_parts = df['Address'].str.split(',', n=2, expand=True)
    address_parts = address_parts.reindex(columns=range(3))
    df[['Street_Address', 'State', 'Zip']] = address_parts
    df = df.drop(columns='Address')

    df = df.fillna('')

    # Only Return Customers We Can Call
    df = df[df['Do_Not_Contact'] != 'Y']

    # Convert pandas DataFrame to Spark DynamicFrame
    spark_dff = glueContext.spark_session.createDataFrame(df)
    dynamic_frame = DynamicFrame.fromDF(spark_dff, glueContext, "dynamic_frame")

    return DynamicFrameCollection({"default": dynamic_frame}, glueContext)