In [1]:
# Locate the local Spark installation before the pyspark imports in the next cell.
# NOTE(review): spark_home=None is findspark's default, i.e. let findspark discover
# the install location itself (presumably via SPARK_HOME) — confirm on a new machine.
import findspark

findspark.init(spark_home=None)

In [2]:
# importing dependencies (pandas, pyspark, datetime, pgeocode)
import pandas
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
from datetime import datetime
from pyspark.sql.functions import *
import pgeocode

In [3]:
# Build (or reuse) the notebook's SparkSession.
# NOTE: despite the name, `sparkContext` is a SparkSession (it is used as
# `sparkContext.read` below); the name is kept because later cells depend on it.
# NOTE(review): executor memory/cores only matter when running against a cluster
# master; if this runs in local mode they are probably ignored — confirm.
sparkContext = (
    SparkSession.builder
    .appName("Final")
    .config("spark.executor.memory", "10g")
    .config("spark.executor.cores", "8")
    .getOrCreate()
)

In [4]:
# reading data
# All five source files live in one directory, so its location is set once here.
# NOTE(review): absolute, user-specific path — change DATA_DIR to run elsewhere.
DATA_DIR = "file:///C:/Users/sarth/OneDrive/Desktop/Datasets/Final_CSV/"


def read_source_csv(file_name):
    """Read ``file_name`` from DATA_DIR as a Spark DataFrame.

    The first row is used as the header and column types are inferred by Spark
    (``inferSchema=True``), exactly as every load in this notebook does.
    """
    return sparkContext.read.csv(DATA_DIR + file_name, inferSchema=True, header=True)


customerProduct_df = read_source_csv("Customer_Product_modified.csv")
customerChannel_df = read_source_csv("Customer_Channel_Activity_modified.csv")
customerDemographic_df = read_source_csv("customer_demographic_modified.csv")
customerTransaction_df = read_source_csv("Customer_Transaction_history_modified.csv")
productLookup_df = read_source_csv("Product_Lookup_modified.csv")

In [5]:
# Row count of every loaded DataFrame; each .count() runs a full Spark job.
for label, frame in [
    ("customer Product", customerProduct_df),
    ("customer Channel", customerChannel_df),
    ("customer Demographic", customerDemographic_df),
    ("customer Transaction", customerTransaction_df),
    ("product Lookup", productLookup_df),
]:
    print(label + " count : ", frame.count())

customer Product count :  200000
customer Channel count :  575986
customer Demographic count :  200000
customer Transaction count :  760982
product Lookup count :  41


In [6]:
from datetime import datetime

# strptime patterns tried in order. Day-first layouts come before month-first ones,
# so an ambiguous value like "02/11/1998" is read as 2 November 1998.
# NOTE: "%B" matches full month names in the current locale.
_DEFAULT_DATE_FORMATS = (
    "%d/%m/%Y",
    "%m/%d/%Y",
    "%Y/%m/%d",
    "%d %B %Y",
    "%Y-%m-%d",
    "%d-%m-%Y",
    "%m-%d-%Y",
)


def convert_to_standard_date(date_str, date_formats=None):
    """Normalise a date written in one of several layouts to ``"YYYY-MM-DD"``.

    Each pattern is tried in order and the first one that parses wins. A
    month-first value such as ``"12/17/2021"`` is therefore only accepted after
    the day-first pattern has failed.

    Args:
        date_str: Raw date text. ``None`` (how Spark passes a null cell to a
            Python UDF) or a blank string is treated as missing. Surrounding
            whitespace is ignored.
        date_formats: Optional iterable of ``strptime`` patterns to use instead
            of ``_DEFAULT_DATE_FORMATS``.

    Returns:
        The date formatted as ``"%Y-%m-%d"``, or ``None`` for a missing value.

    Raises:
        ValueError: if no pattern matches ``date_str``.
    """
    # Missing values propagate as None (-> null in the Spark column) instead of
    # letting strptime raise TypeError and abort the whole job.
    if date_str is None:
        return None
    text = date_str.strip()
    if not text:
        return None

    formats = _DEFAULT_DATE_FORMATS if date_formats is None else date_formats
    for date_format in formats:
        try:
            parsed_date = datetime.strptime(text, date_format)
        except ValueError:
            continue
        return parsed_date.strftime("%Y-%m-%d")

    raise ValueError(f"Date format not recognized: {date_str}")

# Sample inputs covering each supported layout, including a month-first date.
date_examples = [
    "2/11/1998",
    "10 September 2010",
    "2024-01-25",
    "2012/01/25",
    "12/17/2021",  # month-first: only parses after the day-first pattern fails
]

# Run the examples so the cell output shows each input next to its normalised form.
{example: convert_to_standard_date(example) for example in date_examples}

In [7]:
# Spark UDF wrapper around convert_to_standard_date; the new column is a string.
convert_to_standard_date_udf = udf(
    lambda raw_value: convert_to_standard_date(raw_value),
    returnType=StringType(),
)
customerProduct_df.printSchema()

root
 |-- CSID: long (nullable = true)
 |-- Prod code: integer (nullable = true)
 |-- start date: string (nullable = true)
 |-- End date: string (nullable = true)



In [8]:
def _standardize_date_columns(frame, column_names):
    """Return ``frame`` with each named column rewritten by convert_to_standard_date_udf.

    Columns are processed in the given order; ``withColumn`` replaces the column
    under the name passed here.
    """
    for column_name in column_names:
        frame = frame.withColumn(column_name, convert_to_standard_date_udf(col(column_name)))
    return frame


customerProduct_df = _standardize_date_columns(customerProduct_df, ["start date", "end date"])
# NOTE(review): the original cell converted productLookup "end date" twice (copy-paste);
# the duplicate is dropped. If Product_Lookup also has a "start date" column it is not
# normalised here — TODO confirm against productLookup_df.printSchema().
productLookup_df = _standardize_date_columns(productLookup_df, ["end date"])
customerTransaction_df = _standardize_date_columns(customerTransaction_df, ["Trans Date"])
customerDemographic_df = _standardize_date_columns(
    customerDemographic_df, ["Birth date", "Relationship Start date"]
)
customerChannel_df = _standardize_date_columns(customerChannel_df, ["ActivityDate"])


In [9]:
# Preview the demographic table after date normalisation (20 rows is show()'s default).
customerDemographic_df.show(20)

+----------+------+----------+------+----------+-----------------------+--------------+--------+
|      CSID|Income|dependants|Gender|Birth date|Relationship Start date|Marital_Status|Zip_Code|
+----------+------+----------+------+----------+-----------------------+--------------+--------+
|2345450601|200000|         3|     ?|1976-08-03|             2017-06-08|      Divorced|CH42 0HS|
|2345450602|100000|         2|     ?|1963-11-28|             1983-03-08|      Divorced|ME20 6PR|
|2345450603|280000|         2|     ?|1979-06-05|             2008-09-01|       Married|OX17 1EQ|
|2345450604|300000|         1|     ?|1999-05-09|             2020-12-10|      Divorced| RH7 6LT|
|2345450605|150000|         1|  Male|2004-03-31|             2011-03-06|       Married| NW2 7RJ|
|2345450606|100000|         3|     ?|1987-02-18|             2002-12-17|        Single| TQ1 3PZ|
|2345450607|130000|         2|     M|1961-12-09|             1983-11-27|      Divorced| LU6 3NP|
|2345450608|130000|         1|

In [10]:
# Which raw Gender codes occur? Drives the mapping in the next cell.
gender_values = customerDemographic_df.select("Gender").distinct()
gender_values.collect()

[Row(Gender='F'),
 Row(Gender='Female'),
 Row(Gender='M'),
 Row(Gender='Male'),
 Row(Gender='?')]

In [11]:
# Expand the single-letter codes and the "?" placeholder to full labels; any other
# value (e.g. the already spelled-out "Male"/"Female") and nulls are left unchanged.
gender_labels = {"M": "Male", "F": "Female", "?": "Unknown"}
customerDemographic_df = customerDemographic_df.replace(gender_labels, subset=["Gender"])


In [12]:
# Re-check the table now that Gender has been normalised.
customerDemographic_df.show(n=20, truncate=True)

+----------+------+----------+-------+----------+-----------------------+--------------+--------+
|      CSID|Income|dependants| Gender|Birth date|Relationship Start date|Marital_Status|Zip_Code|
+----------+------+----------+-------+----------+-----------------------+--------------+--------+
|2345450601|200000|         3|Unknown|1976-08-03|             2017-06-08|      Divorced|CH42 0HS|
|2345450602|100000|         2|Unknown|1963-11-28|             1983-03-08|      Divorced|ME20 6PR|
|2345450603|280000|         2|Unknown|1979-06-05|             2008-09-01|       Married|OX17 1EQ|
|2345450604|300000|         1|Unknown|1999-05-09|             2020-12-10|      Divorced| RH7 6LT|
|2345450605|150000|         1|   Male|2004-03-31|             2011-03-06|       Married| NW2 7RJ|
|2345450606|100000|         3|Unknown|1987-02-18|             2002-12-17|        Single| TQ1 3PZ|
|2345450607|130000|         2|   Male|1961-12-09|             1983-11-27|      Divorced| LU6 3NP|
|2345450608|130000| 

In [13]:
# Inspect the set of Marital_Status values present in the data.
marital_status_values = customerDemographic_df.select("Marital_Status").distinct()
marital_status_values.collect()

[Row(Marital_Status='Married'),
 Row(Marital_Status='Divorced'),
 Row(Marital_Status='Widowed'),
 Row(Marital_Status='Single')]

In [14]:
# customerDemographic_df_panda = customerDemographic_df.toPandas()
# productLookup_df_panda = productLookup_df.toPandas()
# customerTransaction_df_panda = customerTransaction_df.toPandas()
# customerProduct_df_panda = customerProduct_df.toPandas()
# customerChannel_df_panda = customerChannel_df.toPandas()

In [15]:
# DataFrame.to_csv(path) returns None, so its result is not assigned back to the variables.
# customerDemographic_df_panda.to_csv('customerDemographic.csv', index = True) 
# productLookup_df_panda.to_csv('productLookup.csv', index = True) 
# customerTransaction_df_panda.to_csv('customerTransaction.csv', index = True) 
# customerProduct_df_panda.to_csv('customerProduct.csv', index = True) 
# customerChannel_df_panda.to_csv('customerChannel.csv', index = True) 

In [16]:
# Lazily-built pgeocode lookup for Great Britain, reused across calls instead of being
# re-created for every row.
# NOTE(review): under Spark each worker deserialises its own copy of this function and
# its globals, so the cache is per worker copy, not shared cluster-wide.
_gb_nominatim = None


def get_state_from_postal_code(postal_code):
    """Return pgeocode's ``state_name`` for a GB postcode, or ``None``.

    Args:
        postal_code: Postcode text such as ``"CH42 0HS"``; ``None`` (a null Spark
            cell) is returned as ``None`` without a lookup.

    Returns:
        The region name as a string, or ``None`` when the lookup yields nothing
        or a missing (NaN) ``state_name``.
    """
    global _gb_nominatim
    if postal_code is None:
        return None
    if _gb_nominatim is None:
        _gb_nominatim = pgeocode.Nominatim('GB')
    location = _gb_nominatim.query_postal_code(postal_code)
    if location is None or location.empty:
        return None
    state = location.state_name
    # Presumably pgeocode fills unmatched postcodes with NaN rather than returning an
    # empty result; a float NaN would otherwise reach the StringType UDF instead of null.
    if pandas.isna(state):
        return None
    return state


# Register the function as a UDF
get_state_udf = udf(lambda z: get_state_from_postal_code(z), StringType())


In [17]:
# Derive a "State" column from each postcode via the pgeocode UDF, then preview 100 rows.
zip_code_column = col("Zip_Code")
customerDemographic_df = customerDemographic_df.withColumn(
    "State", get_state_udf(zip_code_column)
)
customerDemographic_df.show(n=100)

+----------+------+----------+-------+----------+-----------------------+--------------+--------+----------------+
|      CSID|Income|dependants| Gender|Birth date|Relationship Start date|Marital_Status|Zip_Code|           State|
+----------+------+----------+-------+----------+-----------------------+--------------+--------+----------------+
|2345450601|200000|         3|Unknown|1976-08-03|             2017-06-08|      Divorced|CH42 0HS|         England|
|2345450602|100000|         2|Unknown|1963-11-28|             1983-03-08|      Divorced|ME20 6PR|         England|
|2345450603|280000|         2|Unknown|1979-06-05|             2008-09-01|       Married|OX17 1EQ|         England|
|2345450604|300000|         1|Unknown|1999-05-09|             2020-12-10|      Divorced| RH7 6LT|         England|
|2345450605|150000|         1|   Male|2004-03-31|             2011-03-06|       Married| NW2 7RJ|         England|
|2345450606|100000|         3|Unknown|1987-02-18|             2002-12-17|       

In [18]:
# customerDemographic_df.select("Country").distinct().show(100)

In [None]:
# Export the cleaned demographic table through pandas.
# The pandas copy is built here: the cell that created it earlier is commented out.
# The copy is stored under its own name and to_csv's return value (None when given
# a path) is not assigned, so the Spark DataFrame stays intact.
# NOTE: toPandas() collects every row to the driver.
# TODO(review): index=True adds the pandas row index as an extra first column —
# confirm that is wanted.
customerDemographic_df_panda = customerDemographic_df.toPandas()
customerDemographic_df_panda.to_csv('customerDemographic.csv', index = True)