In [58]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import initcap
from pyspark.sql import functions as F
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import sum, col

spark = SparkSession.builder.getOrCreate()

In [59]:
df_sales = (spark.read.option("delimiter", ',').csv('/Users/denis/Desktop/Andersen/1-task:spark/Raw_data/sales.csv',
                                                    header=True, inferSchema=True))
df_customers = (spark.read.option("delimiter", ',').csv('/Users/denis/Desktop/Andersen/1-task:spark/Raw_data/customers.csv',
                                                    header=True, inferSchema=True))

In [60]:
df_sales.show()
df_customers.show(truncate=False)

+----------+-----------+----------+--------+-----------+
|  Txn_Date|Customer_Id|Product_Id|Quantity|Total_Sales|
+----------+-----------+----------+--------+-----------+
|07-03-2021|       6279|         4|       0|       80.0|
|05-03-2021|       1234|         8|       0|      100.0|
|19-02-2021|       1698|         3|       0|       30.0|
|13-03-2021|        400|         7|       0|       60.0|
|13-02-2021|       2270|         1|       2|        0.0|
|27-02-2021|       7940|         4|       2|        0.0|
|02-03-2021|       9235|        10|       1|        0.0|
|30-01-2021|       3382|         9|       2|        0.0|
|12-03-2021|       4124|         5|       1|        0.0|
|02-01-2021|       2103|         1|       2|       20.0|
|04-03-2021|       3965|         5|       1|       10.0|
|13-03-2021|       9459|         3|       2|       60.0|
|21-01-2021|       8486|         6|       1|       50.0|
|01-03-2021|       5523|        10|       1|       25.0|
|16-02-2021|       7154|       

In [62]:
# Data cleansing
df_sales = df_sales.dropDuplicates()


df_customers = df_customers.dropDuplicates()
df_customers = df_customers.drop('Middle_Name', 'Suffix')
df_customers = df_customers.withColumn('First_Name', initcap(df_customers['First_Name'])) \
            .withColumn('Last_Name', initcap(df_customers['Last_Name'])) \
            .withColumn('Prefix', initcap(df_customers['Prefix']))
df_customers = df_customers.withColumnRenamed('DoB', 'DateOfBirth')
df_customers = df_customers.withColumn('DateOfBirth', F.to_date(df_customers['DateOfBirth'], 'M/d/y'))

df_customers.show(truncate=False)
df_customers.dtypes

+-----------+------+----------+---------+------+-----------+-----------------------------------------------------------+
|Customer_ID|Prefix|First_Name|Last_Name|Gender|DateOfBirth|Address                                                    |
+-----------+------+----------+---------+------+-----------+-----------------------------------------------------------+
|1763       |Dr.   |Anthony   |Parsons  |M     |1960-12-07 |86; Stokes Island Forge; Abilene&; <30004>; US             |
|1845       |Dr.   |Timothy   |Ibarra   |M     |1944-10-24 |758; Farrell Springs Lock; Troutdale&; <30096>; US         |
|2231       |Mr.   |Ryan      |Garcia   |M     |1932-05-17 |655; Paul Manor Pike; Fitchburg&; <30013>; US              |
|2522       |Ms.   |Julie     |Taylor   |F     |1994-02-14 |720; Shannon Shore Viaduct; Schererville&; <30092>; US     |
|2585       |Mrs.  |Stefanie  |Lawson   |F     |1983-07-13 |361; Megan Lights Ridge; Harrisburg&; <30098>; US          |
|2617       |Dr.   |Alex      |Y

[('Customer_ID', 'int'),
 ('Prefix', 'string'),
 ('First_Name', 'string'),
 ('Last_Name', 'string'),
 ('Gender', 'string'),
 ('DateOfBirth', 'date'),
 ('Address', 'string')]

In [63]:
df_customers = df_customers.withColumn('Address', regexp_replace(df_customers['Address'], '[&<>]',''))
df_customers = df_customers.withColumn('Street', F.concat_ws(' ', F.split(df_customers['Address'], ';').getItem(0), 
                                                    F.split(df_customers['Address'], ';').getItem(1))) \
                            .withColumn('City', F.split(df_customers['Address'], ';').getItem(2)) \
                            .withColumn('Zip', F.split(df_customers['Address'], ';').getItem(3)) \
                            .withColumn('Country', F.split(df_customers['Address'], ';').getItem(4))
df_customers = df_customers.drop('Address')
df_customers.show(truncate=False)

+-----------+------+----------+---------+------+-----------+---------------------------------+-------------+------+-------+
|Customer_ID|Prefix|First_Name|Last_Name|Gender|DateOfBirth|Street                           |City         |Zip   |Country|
+-----------+------+----------+---------+------+-----------+---------------------------------+-------------+------+-------+
|1763       |Dr.   |Anthony   |Parsons  |M     |1960-12-07 |86  Stokes Island Forge          | Abilene     | 30004| US    |
|1845       |Dr.   |Timothy   |Ibarra   |M     |1944-10-24 |758  Farrell Springs Lock        | Troutdale   | 30096| US    |
|2231       |Mr.   |Ryan      |Garcia   |M     |1932-05-17 |655  Paul Manor Pike             | Fitchburg   | 30013| US    |
|2522       |Ms.   |Julie     |Taylor   |F     |1994-02-14 |720  Shannon Shore Viaduct       | Schererville| 30092| US    |
|2585       |Mrs.  |Stefanie  |Lawson   |F     |1983-07-13 |361  Megan Lights Ridge          | Harrisburg  | 30098| US    |
|2617   

In [64]:
df_joined = df_customers.join(df_sales, 'Customer_ID')
df_joined.show(truncate=False)

+-----------+------+----------+---------+------+-----------+---------------------------------+------------+------+-------+----------+----------+--------+-----------+
|Customer_ID|Prefix|First_Name|Last_Name|Gender|DateOfBirth|Street                           |City        |Zip   |Country|Txn_Date  |Product_Id|Quantity|Total_Sales|
+-----------+------+----------+---------+------+-----------+---------------------------------+------------+------+-------+----------+----------+--------+-----------+
|2231       |Mr.   |Ryan      |Garcia   |M     |1932-05-17 |655  Paul Manor Pike             | Fitchburg  | 30013| US    |01-02-2021|16        |2       |20.0       |
|2585       |Mrs.  |Stefanie  |Lawson   |F     |1983-07-13 |361  Megan Lights Ridge          | Harrisburg | 30098| US    |10-03-2021|11        |2       |50.0       |
|2585       |Mrs.  |Stefanie  |Lawson   |F     |1983-07-13 |361  Megan Lights Ridge          | Harrisburg | 30098| US    |19-02-2021|10        |2       |50.0       |
|272

In [65]:
result = df_joined.groupBy('Customer_ID','Prefix','First_Name', 'Last_Name', 'DateOfBirth').agg(sum('Total_Sales').alias('Total_Sales_Sum'))
result.show(truncate=False)

+-----------+------+-----------+---------+-----------+---------------+
|Customer_ID|Prefix|First_Name |Last_Name|DateOfBirth|Total_Sales_Sum|
+-----------+------+-----------+---------+-----------+---------------+
|6164       |Mr.   |Thomas     |Dunlap   |1983-04-11 |50.0           |
|5551       |Dr.   |Edward     |Hill     |1974-03-05 |60.0           |
|5307       |Mr.   |Bryan      |Hamilton |1993-03-01 |20.0           |
|3810       |Mr.   |Kyle       |Martin   |1930-06-28 |120.0          |
|9593       |Mr.   |Timothy    |Payne    |2018-06-24 |200.0          |
|6540       |Dr.   |Jordan     |Adams    |1989-08-11 |90.0           |
|3317       |Dr.   |Amanda     |Ross     |1913-03-12 |60.0           |
|7487       |Mr.   |Trevor     |Edwards  |1998-11-07 |120.0          |
|209        |Mr.   |Gilbert    |Hurley   |1941-03-11 |370.0          |
|1018       |Mrs.  |Crystal    |Huffman  |1920-04-23 |50.0           |
|5157       |Mrs.  |Savannah   |Ashley   |1973-04-02 |150.0          |
|6276 