In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col,sum,count,mean

# Starting the pyspark session

In [5]:
scSpark = SparkSession.builder.appName("pyspark_assignment_5.1").getOrCreate()

23/05/16 11:45:03 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# Loading the 3 transaction csv files for the 3 stores

In [23]:
t1 = scSpark.read.csv("./store_transactions/transactions_1.csv", header=True)
t2 = scSpark.read.csv("./store_transactions/transactions_2.csv", header=True)
t3 = scSpark.read.csv("./store_transactions/transactions_3.csv", header=True)

# Loading the 'customers' and 'products' csv files 
### We then changed the 'Name' column in both dataframes to avoid complexity

In [132]:
cust = scSpark.read.csv("./customers.csv", header=True)
cust = cust.withColumnRenamed('Name', 'customer_name')
prod = scSpark.read.csv("./products.csv", header=True)
prod = prod.withColumnRenamed('Name', 'product_name')

### Inspecting the first transaction dataframe

In [25]:
t1.show()

+-------+-------------+----------+---------+--------+-------------------+
|StoreId|TransactionId|CustomerId|ProductId|Quantity|    TransactionTime|
+-------+-------------+----------+---------+--------+-------------------+
|      1|          971|        13|        2|      10|2022-12-23 04:13:05|
|      1|          605|         7|       10|       5|2022-12-23 09:36:22|
|      1|          567|        37|        2|       8|2022-12-23 19:44:43|
|      1|          607|        38|        5|       4|2022-12-23 04:36:41|
|      1|          141|        17|        9|       7|2022-12-23 19:11:29|
|      1|          248|        17|       11|      12|2022-12-23 06:27:58|
|      1|          726|        45|        4|      13|2022-12-23 14:12:34|
|      1|          725|         4|        9|       1|2022-12-23 12:15:47|
|      1|          232|        30|       10|       9|2022-12-23 01:26:10|
|      1|          954|        47|        6|      14|2022-12-23 06:45:59|
|      1|           38|         2|    

# Calculating total sales for store 1

### Joining t1 with the products dataframe

In [187]:
joined_t1 = t1.join(prod, t1["ProductId"] == prod["ProductId"])
joined_t1.show(5)

+-------+-------------+----------+---------+--------+-------------------+---------+--------------+--------+---------+
|StoreId|TransactionId|CustomerId|ProductId|Quantity|    TransactionTime|ProductId|  product_name|Category|UnitPrice|
+-------+-------------+----------+---------+--------+-------------------+---------+--------------+--------+---------+
|      1|          971|        13|        2|      10|2022-12-23 04:13:05|        2|  White Shorts|  Shorts|    89.27|
|      1|          605|         7|       10|       5|2022-12-23 09:36:22|       10|Black Sneakers|   Shoes|   146.41|
|      1|          567|        37|        2|       8|2022-12-23 19:44:43|        2|  White Shorts|  Shorts|    89.27|
|      1|          607|        38|        5|       4|2022-12-23 04:36:41|        5|  Black Shorts|  Shorts|    74.58|
|      1|          141|        17|        9|       7|2022-12-23 19:11:29|        9| Green Sandals|   Shoes|   137.53|
+-------+-------------+----------+---------+--------+---

### Created a new column 'cost' by multiplying the 'Quantity' and 'UnitPrice' columns

In [188]:
joined_t1 = joined_t1.withColumn("cost", joined_t1["Quantity"] * joined_t1["UnitPrice"])
joined_t1.show(5)

+-------+-------------+----------+---------+--------+-------------------+---------+--------------+--------+---------+-----------------+
|StoreId|TransactionId|CustomerId|ProductId|Quantity|    TransactionTime|ProductId|  product_name|Category|UnitPrice|             cost|
+-------+-------------+----------+---------+--------+-------------------+---------+--------------+--------+---------+-----------------+
|      1|          971|        13|        2|      10|2022-12-23 04:13:05|        2|  White Shorts|  Shorts|    89.27|892.6999999999999|
|      1|          605|         7|       10|       5|2022-12-23 09:36:22|       10|Black Sneakers|   Shoes|   146.41|           732.05|
|      1|          567|        37|        2|       8|2022-12-23 19:44:43|        2|  White Shorts|  Shorts|    89.27|           714.16|
|      1|          607|        38|        5|       4|2022-12-23 04:36:41|        5|  Black Shorts|  Shorts|    74.58|           298.32|
|      1|          141|        17|        9|    

In [137]:
joined_t1.printSchema()

root
 |-- StoreId: string (nullable = true)
 |-- TransactionId: string (nullable = true)
 |-- CustomerId: string (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- TransactionTime: string (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- cost: double (nullable = true)



### Changing cost datatype to integer

In [None]:
joined_t1 = joined_t1.withColumn(
    "cost", joined_t1["cost"].cast(IntegerType())
)

## Now summing the cost column and getting the total sales for store 1

In [139]:
store_1_total_sales = joined_t1.agg({"cost": "sum"}).collect()[0][0]
store_1_total_sales

41239

# Calculating mean sales for store 2

### Joining with 'prod' dataframe

In [190]:
joined_t2 = t2.join(prod, t2["ProductId"] == prod["ProductId"])
joined_t2.show(5)

+-------+-------------+----------+---------+--------+-------------------+---------+------------+--------+---------+
|StoreId|TransactionId|CustomerId|ProductId|Quantity|    TransactionTime|ProductId|product_name|Category|UnitPrice|
+-------+-------------+----------+---------+--------+-------------------+---------+------------+--------+---------+
|      2|            2|         2|        2|       2|2022-12-23 18:49:45|        2|White Shorts|  Shorts|    89.27|
|      2|            2|         2|        2|       2|2022-12-23 13:19:51|        2|White Shorts|  Shorts|    89.27|
|      2|            2|         2|        2|       2|2022-12-23 22:39:21|        2|White Shorts|  Shorts|    89.27|
|      2|          514|        14|       21|       5|2022-12-23 00:24:15|       21|  Red Chinos|   Pants|   134.42|
|      2|          363|        44|       16|       2|2022-12-23 10:46:04|       16|Blue t-shirt|T-Shirts|   140.68|
+-------+-------------+----------+---------+--------+-------------------

In [192]:
joined_t2 = joined_t2.withColumn("cost", joined_t2["Quantity"] * joined_t2["UnitPrice"])
joined_t2.show(5)

+-------+-------------+----------+---------+--------+-------------------+---------+------------+--------+---------+-----------------+
|StoreId|TransactionId|CustomerId|ProductId|Quantity|    TransactionTime|ProductId|product_name|Category|UnitPrice|             cost|
+-------+-------------+----------+---------+--------+-------------------+---------+------------+--------+---------+-----------------+
|      2|            2|         2|        2|       2|2022-12-23 18:49:45|        2|White Shorts|  Shorts|    89.27|           178.54|
|      2|            2|         2|        2|       2|2022-12-23 13:19:51|        2|White Shorts|  Shorts|    89.27|           178.54|
|      2|            2|         2|        2|       2|2022-12-23 22:39:21|        2|White Shorts|  Shorts|    89.27|           178.54|
|      2|          514|        14|       21|       5|2022-12-23 00:24:15|       21|  Red Chinos|   Pants|   134.42|672.0999999999999|
|      2|          363|        44|       16|       2|2022-12-2

### Changing cost datatype to integer

In [193]:
joined_t2 = joined_t2.withColumn(
    "cost", joined_t2["cost"].cast(IntegerType())
)

## Finally calculating the mean for the total sales for store 2

In [195]:
store_2_mean_sales = joined_t2.agg({"cost": "mean"}).collect()[0][0]
store_2_mean_sales

512.9803921568628

# Summing up purchases from all stores

## Joining all the transaction csv files in one dataframe

In [196]:
all_transactions = scSpark.read.csv("./store_transactions/transactions*.csv", header=True)
all_transactions.show(5)

+-------+-------------+----------+---------+--------+-------------------+
|StoreId|TransactionId|CustomerId|ProductId|Quantity|    TransactionTime|
+-------+-------------+----------+---------+--------+-------------------+
|      3|          454|        35|        3|       3|2022-12-23 17:36:11|
|      3|          524|        37|        9|      11|2022-12-23 22:02:51|
|      3|          562|         4|        3|       4|2022-12-23 02:51:50|
|      3|          581|        35|       14|      56|2022-12-23 17:05:54|
|      3|          200|        34|       15|      24|2022-12-23 07:15:01|
+-------+-------------+----------+---------+--------+-------------------+
only showing top 5 rows



In [198]:
# first joining the transactions df with customer df
joined_trans_cust = all_transactions.join(cust, all_transactions["CustomerId"] == cust["CustomerId"])

# now joining the above df with products df to make a combined df containing all information
joined_all = joined_trans_cust.join(prod, joined_trans_cust["ProductId"] == prod["ProductId"])
joined_all.show(5)

+-------+-------------+----------+---------+--------+-------------------+----------+--------------+--------------------+---------+-------------+--------+---------+
|StoreId|TransactionId|CustomerId|ProductId|Quantity|    TransactionTime|CustomerId| customer_name|               Email|ProductId| product_name|Category|UnitPrice|
+-------+-------------+----------+---------+--------+-------------------+----------+--------------+--------------------+---------+-------------+--------+---------+
|      3|          454|        35|        3|       3|2022-12-23 17:36:11|        35|Dwayne Johnson|dwayne.johnson@gm...|        3|  Blue Shorts|  Shorts|   118.88|
|      3|          524|        37|        9|      11|2022-12-23 22:02:51|        37| Brittany Holt|brittany.holt@exa...|        9|Green Sandals|   Shoes|   137.53|
|      3|          562|         4|        3|       4|2022-12-23 02:51:50|         4| Alevtin Paska|alevtin.paska@exa...|        3|  Blue Shorts|  Shorts|   118.88|
|      3|       

In [148]:
joined_all.printSchema()

root
 |-- StoreId: string (nullable = true)
 |-- TransactionId: string (nullable = true)
 |-- CustomerId: string (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- TransactionTime: string (nullable = true)
 |-- CustomerId: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- UnitPrice: string (nullable = true)



## Changing datatypes 

In [149]:
joined_all = joined_all.withColumn("Quantity", joined_all["Quantity"].cast(IntegerType()))
joined_all = joined_all.withColumn("UnitPrice", joined_all["UnitPrice"].cast(IntegerType()))
joined_all.printSchema()

root
 |-- StoreId: string (nullable = true)
 |-- TransactionId: string (nullable = true)
 |-- CustomerId: string (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- TransactionTime: string (nullable = true)
 |-- CustomerId: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- UnitPrice: integer (nullable = true)



## Adding a cost column

In [216]:
joined_all = joined_all.withColumn("cost", joined_all["Quantity"] * joined_all["UnitPrice"])
joined_all.show(5)

+-------+-------------+----------+---------+--------+-------------------+----------+--------------+--------------------+---------+-------------+--------+---------+-------+
|StoreId|TransactionId|CustomerId|ProductId|Quantity|    TransactionTime|CustomerId| customer_name|               Email|ProductId| product_name|Category|UnitPrice|   cost|
+-------+-------------+----------+---------+--------+-------------------+----------+--------------+--------------------+---------+-------------+--------+---------+-------+
|      3|          454|        35|        3|       3|2022-12-23 17:36:11|        35|Dwayne Johnson|dwayne.johnson@gm...|        3|  Blue Shorts|  Shorts|   118.88| 356.64|
|      3|          524|        37|        9|      11|2022-12-23 22:02:51|        37| Brittany Holt|brittany.holt@exa...|        9|Green Sandals|   Shoes|   137.53|1512.83|
|      3|          562|         4|        3|       4|2022-12-23 02:51:50|         4| Alevtin Paska|alevtin.paska@exa...|        3|  Blue Sho

## Grouping by customer email to get the total spending of each customer

In [218]:
cust_activity = joined_all.groupBy("Email").sum("cost")

In [219]:
# sorting in descending order
cust_activity.sort("sum(cost)", ascending=False).show()

+--------------------+------------------+
|               Email|         sum(cost)|
+--------------------+------------------+
|dwayne.johnson@gm...|          10653.08|
|sevastiana.nester...|           8440.65|
|thies.blumel@exam...|           6799.25|
|gladis.dasneves@e...| 5954.469999999999|
|emilia.pedraza@ex...| 5633.580000000001|
|avi.shet@example.com|           5579.95|
|angelique.vennix@...|            5317.7|
|alice.morin@examp...|            5060.1|
|dominic.lo@exampl...|           4944.65|
|suzy.gibson@examp...|            4352.0|
|dobrik.svida@exam...|3962.9799999999996|
|jonathan.carrasco...|           3837.05|
|claudia.gutierrez...|           3824.06|
|william.nielsen@e...|           3582.17|
|leah.green@exampl...|           3579.04|
|kyn.aalyzdh@examp...|3480.7799999999997|
|stella.masson@exa...|           3335.17|
|flenn.henderson@e...|           3279.46|
|signe.petersen@ex...|3269.7800000000007|
|an.jansen@example...|3237.5400000000004|
+--------------------+------------

## Email of the customer who spent the most money

In [206]:
# now we sort in descending order and get the first row
most_active_customer = cust_activity.sort("sum(cost)", ascending=False).collect()[0][0]
most_active_customer

'dwayne.johnson@gmail.com'

## Getting the count of each product in the product_name column
### sorting in descending order
### and showing the top 5 results

In [185]:
frequently_bought_products = joined_all.groupBy('product_name').count()\
                                        .sort('count', ascending=False)\
                                        .show(5)
frequently_bought_products

+-------------+-----+
| product_name|count|
+-------------+-----+
| White Shorts|   20|
| Green jacket|    9|
| Black Shorts|    9|
|White t-shirt|    8|
|   Blue Jeans|    7|
+-------------+-----+
only showing top 5 rows

