# Project - Product Analysis in Spark

## Abstract
The objective of this project is to use PySpark to analyse several datasets of commercial
products and their ratings from the e-commerce website NewChic.com.<br>

## Overview of the datasets:
This project uses three CSV files, one for each product type:
1. Shoe products 
2. Jewelry products
3. Accessories <br>

#### Features
- Category (men, accessory, jewelry, etc.)
- Subcategory : type of product within the category
- Name : name of the product
- Current price : price listed on the website
- Raw price : original price of the product before any discounts
- Discount : discount offered on the product
- Currency : currency in which the price is listed
- Likes count : popularity of the product
- is_new : tells whether the product is new (unused) or not (boolean value, true or false)

## Tasks:
- ### Task 1 : Extract Data<br>
        
    1. Import the necessary packages and start your Spark session
    2. Load the 3 datasets into 1 dataframe.
    3. Print the schema
    
<br>

- ### Task 2 : Data Transformation

    Build more features to help analyse each product. For these tasks, I will
    create more columns to get a better understanding of each product.
    First, create functions that take the dataframe as input and return a new dataframe containing the following columns:
    1. Average price of each category
    2. Average price of each subcategory
    3. Average discount of each category 
    4. Average discount of each subcategory 
    5. Average likes of each category
    6. Average likes of each subcategory
    7. Rank of the likes of each subcategory
    8. Dense rank of the likes of each subcategory
    9. Rank of the likes of each category
    10. Dense rank of the likes of each category
    11. Rank of the discount of each category
    12. Dense rank of the discounts of each category
    13. Sum of the unused products in each category
    14. Sum of the unused products in subcategory.
    
  Call these functions to create the new columns, then print the schema of the dataframe to check the columns created.
    
<br>

- ### Task 3- Analysis

    Utilising the newly created features to answer some questions. For this task, use the
    dataframe API to answer the following questions.<br>
    
    1. Which type of product is the least popular with the users? (tip - find the feature which
       demonstrates the popularity of a product)
    2. Which type of product is the most popular with the users?
    3. Which subcategory product is the most popular with the users?
    4. Which subcategory product is the least popular with the users?
    5. In the accessory products, which subcategory was the most expensive?
    6. Which type of product had the highest discount offerings?
    7. Which type of product had the lowest discount offerings?
    8. In the shoes products, which subcategory was the most common?
    9. Which type of product had the most unused products listed for sale?
    10. Which subcategory product had the most unused products listed for sale?
    
<br>
    
- ### Task 4 : Load Data to PostgreSQL database<br>
Use a Docker image to set up the PostgreSQL DBMS environment for loading the data.

    1. Create a new database.
    2. Then mount a new folder.
    3. Load your dataframes as 3 different tables in the database.

# Tasks
## Task 1 : Extract Data
1. Import libraries

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql import Window
from datetime import date

2. Start Spark Session

In [2]:
# configure the Spark session (the jar path points to the PostgreSQL JDBC driver)
spark = (
    SparkSession.builder
    .appName("project")
    .config("spark.jars", "/opt/polynote/notebooks/jdbc files/postgresql-42.5.1.jar")
    .master("local[*]")
    .getOrCreate()
)
# get the Spark context from the active session
sc = spark.sparkContext

3. Load Datasets into DataFrames and print schemas

In [3]:
df_shoe = spark.read.csv("./Datasets/shoes.csv", header = True, inferSchema = True)

In [4]:
df_shoe.printSchema()

root
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)



In [5]:
df_jewel = spark.read.csv("./Datasets/jewelry.csv", header = True, inferSchema = True)

In [6]:
df_jewel.printSchema()

root
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)



In [7]:
df_accessories = spark.read.csv("./Datasets/accessories.csv", header = True, inferSchema = True)

In [8]:
df_accessories.printSchema()

root
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)



4. Load All Data Into Main DataFrame

In [9]:
# union all dfs into one df
df = df_shoe.union(df_jewel)
df = df.union(df_accessories)

In [10]:
df.printSchema()

root
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)



In [11]:
# register the df as a temporary view in the Spark session
df.createOrReplaceTempView("products")

In [12]:
spark.catalog.listTables()

[Table(name='products', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

##  Task 2 : Data Transformations

`Notes and Questions`
1. Q: Can I create multi-level grouping using window functions?

In [13]:
# get the df
products = spark.table("products")

In [14]:
products.printSchema()

root
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)



In [15]:
# create window 
cat_window = Window.partitionBy("category")

subcat_window = Window.partitionBy("subcategory")

In [16]:
# functions definition

# 1 - avg agg function
def avg_agg(df, window, df_col, new_col_name):
    # calculate the avg
    new_col = fn.avg(df_col).over(window)
    
    # round to the nearest 2 decimals
    new_col = fn.round(new_col, 2)
    
    # add the new column to df
    df = df.withColumn(new_col_name, new_col)
    
    # return new df
    return df

# 2 - rank function
def rank_agg(df, window, df_col, new_col_name):
    # order the window to rank
    new_window = window.orderBy(df[df_col].desc())
    
    # rank the values
    ranked_col = fn.rank().over(new_window)
    
    # add the ranked column to df
    df = df.withColumn(new_col_name, ranked_col)
    
    return df

# 3 - dense rank function
def dense_rank_agg(df, window, df_col, new_col_name):
    # order the window to rank
    new_window = window.orderBy(df[df_col].desc())
    
    # dense rank the values
    ranked_col = fn.dense_rank().over(new_window)
    
    # add the ranked column to df
    df = df.withColumn(new_col_name, ranked_col)
    
    return df

# 4 - sum new products function
def sum_new(df, window, df_col, new_col_name):
    # sum the new products
    sum_col = fn.sum(fn.col(df_col).cast("integer")).over(window)
    
    # add the sum column to df
    df = df.withColumn(new_col_name, sum_col)
    
    return df
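
`fn.rank` and `fn.dense_rank` differ only in how they number rows after a tie: rank leaves gaps, dense rank does not. The plain-Python sketch below mimics that numbering for a descending order on made-up likes values. It only illustrates the semantics and is not how Spark computes them; the helper name `rank_and_dense_rank` is hypothetical.

```python
def rank_and_dense_rank(values):
    """Return (ordered values, ranks, dense ranks) for a descending order."""
    ordered = sorted(values, reverse=True)
    ranks, dense_ranks = [], []
    for position, value in enumerate(ordered, start=1):
        if position > 1 and value == ordered[position - 2]:
            # Tie with the previous row: both functions repeat the previous number.
            ranks.append(ranks[-1])
            dense_ranks.append(dense_ranks[-1])
        else:
            # rank jumps to the row position, dense rank just increments by one.
            ranks.append(position)
            dense_ranks.append(dense_ranks[-1] + 1 if dense_ranks else 1)
    return ordered, ranks, dense_ranks


likes = [50, 30, 50, 10]  # hypothetical likes counts within one partition
ordered, ranks, dense_ranks = rank_and_dense_rank(likes)
print(ordered)      # [50, 50, 30, 10]
print(ranks)        # [1, 1, 3, 4]
print(dense_ranks)  # [1, 1, 2, 3]
```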

1. Average price of each category
    

In [17]:
products = avg_agg(products, cat_window, "raw_price", "category_avg_price")

2. Average price of each subcategory
    

In [18]:
products = avg_agg(products, subcat_window, "raw_price", "subcategory_avg_price")

3. Average discount of each category 
   

In [19]:
products = avg_agg(products, cat_window, "discount", "category_avg_discount")

4. Average discount of each subcategory 
    

In [20]:
products = avg_agg(products, subcat_window, "discount", "subcategory_avg_discount")

5. Average likes of each category
    

In [21]:
products = avg_agg(products, cat_window, "likes_count", "category_avg_likes_count")

6. Average likes of each subcategory
    

In [22]:
products = avg_agg(products, subcat_window, "likes_count", "subcategory_avg_likes_count")

7. Rank of the likes of each subcategory
    

In [23]:
products = rank_agg(products, subcat_window, "likes_count", "subcategory_likes_count_rank")

8. Dense rank of the likes of each subcategory
    


In [24]:
products = dense_rank_agg(products, subcat_window, "likes_count", "subcategory_likes_count_dense_rank")

9. Rank of the likes of each category
    

In [25]:
products = rank_agg(products, cat_window, "likes_count", "category_likes_count_rank")

10. Dense rank of the likes of each category
    

In [26]:
products = dense_rank_agg(products, cat_window, "likes_count", "category_likes_count_dense_rank")

11. Rank of the discount of each category
    



In [27]:
products = rank_agg(products, cat_window, "discount", "category_discount_rank")

12. Dense rank of the discounts of each category
    

In [28]:
products = dense_rank_agg(products, cat_window, "discount", "category_discount_dense_rank")

13. Sum of the unused products in each category
    

In [29]:
products = sum_new(products, cat_window, "is_new", "new_product_per_category")

14. Sum of the unused products in subcategory.

In [30]:
products = sum_new(products, subcat_window, "is_new", "new_product_per_subcategory")

### Print the schema of the dataframe to check the new columns created 

In [31]:
products.printSchema()

root
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)
 |-- category_avg_price: double (nullable = true)
 |-- s

## Task 3: Analysis - Utilising the newly created features to answer some questions<br>
Use the dataframe API to answer the following questions.

### Which type of product is the least popular with the users 
Tip: find the feature which demonstrates the popularity of a product (here, `likes_count`).

In [32]:
products.groupBy("category").\
    agg(fn.sum("likes_count").\
    alias("total_likes_count")).\
    orderBy(fn.col("total_likes_count")).\
    limit(1).\
    show()

+-----------+-----------------+
|   category|total_likes_count|
+-----------+-----------------+
|accessories|           613522|
+-----------+-----------------+



`Accessories is the least popular category, based on total likes count`

### Which type of product is the most popular with the users 

In [33]:
products.groupBy("category").\
    agg(fn.sum("likes_count").\
    alias("total_likes_count")).\
    orderBy(fn.col("total_likes_count").desc()).\
    limit(1).\
    show()

+--------+-----------------+
|category|total_likes_count|
+--------+-----------------+
|   shoes|          3358818|
+--------+-----------------+



`Shoes is the most popular category, based on total likes count`

### Which subcategory product is the most popular with the users

In [34]:
products.groupBy("subcategory").\
    agg(fn.sum("likes_count").\
    alias("total_likes_count")).\
    orderBy(fn.col("total_likes_count").desc()).\
    limit(1).\
    show()

+-------------------+-----------------+
|        subcategory|total_likes_count|
+-------------------+-----------------+
|Derbies & Mocassins|           794351|
+-------------------+-----------------+



`Derbies & Mocassins is the most popular subcategory, based on total likes count`

###  Which subcategory product is the least popular with the users

In [35]:
products.groupBy("subcategory").\
    agg(fn.sum("likes_count").\
    alias("total_likes_count")).\
    orderBy(fn.col("total_likes_count")).\
    limit(1).\
    show()

+-----------+-----------------+
|subcategory|total_likes_count|
+-----------+-----------------+
|     Berets|               12|
+-----------+-----------------+



`Berets is the least popular subcategory, based on total likes count`

### In the accessory products, which subcategory was the most expensive

In [36]:
# filter on the products dataframe itself (not the original df) before selecting
products.where(fn.col("category") == "accessories").\
    select("subcategory", "subcategory_avg_price").\
    orderBy(fn.col("subcategory_avg_price").desc()).\
    limit(1).\
    toPandas()

             subcategory  subcategory_avg_price
0  Sun Protection Sleeve                  40.48


Based on the subcategory_avg_price feature, `Sun Protection Sleeve is the most expensive subcategory in the accessories category`

### Which type of product had the highest discount offerings

In [42]:
products.groupBy("category").\
    agg(fn.round(fn.max("category_avg_discount"), 2).alias("Max Discount")).\
    orderBy(fn.col("Max Discount").desc()).\
    limit(1).\
    show()

+--------+------------+
|category|Max Discount|
+--------+------------+
| jewelry|       53.13|
+--------+------------+



`Jewelry had the highest discount offerings, based on the category average discount feature`

### Which type of product had the lowest discount offerings

In [43]:
products.groupBy("category").\
    agg(fn.round(fn.min("category_avg_discount"), 2).alias("Minimum Discount")).\
    orderBy(fn.col("Minimum Discount")).\
    limit(1).\
    show()

+-----------+----------------+
|   category|Minimum Discount|
+-----------+----------------+
|accessories|           50.76|
+-----------+----------------+



`Accessories had the lowest discount offerings, based on the category average discount feature`

### In the shoes products, which subcategory was the most common.

In [44]:
products.select("subcategory", "subcategory_avg_likes_count").\
    where(products.category == "shoes").\
    orderBy(fn.col("subcategory_avg_likes_count").desc()).\
    limit(1).\
    show()

+-----------+---------------------------+
|subcategory|subcategory_avg_likes_count|
+-----------+---------------------------+
|  Escarpins|                     519.88|
+-----------+---------------------------+



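`Escarpins is the top subcategory in the shoes category by average likes count`. Note that this query ranks subcategories by popularity; counting the products listed in each subcategory would measure how common a subcategory is.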

### Which type of product had the most unused products listed for sale

In [45]:
products.select("category", "new_product_per_category").\
    orderBy(fn.col("new_product_per_category").desc()).\
    limit(1).\
    show()

+--------+------------------------+
|category|new_product_per_category|
+--------+------------------------+
|   shoes|                     271|
+--------+------------------------+



Unused means new, so the category with the most unused products is the one with the most new items: `the shoes category has the most new items listed for sale`

### Which subcategory product had the most unused products listed for sale

In [46]:
products.select("subcategory", "new_product_per_subcategory").\
    orderBy(fn.col("new_product_per_subcategory").desc()).\
    limit(1).\
    show()

+-----------+---------------------------+
|subcategory|new_product_per_subcategory|
+-----------+---------------------------+
|    Baskets|                         68|
+-----------+---------------------------+



The most unused products means the most new items: `the Baskets subcategory has the most new items listed for sale`

## Task 4: Load into postgres database.

### Load To Postgres DB

#### DB connection and load configurations

In [47]:
# configurations
url = "jdbc:postgresql://postgres_db:5432/products_db"
mode = "overwrite"

properties = {
    "user": "user",
    "password": "passward",
    "driver": "org.postgresql.Driver"
}
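
Hard-coded credentials are easy to leak when a notebook is shared. As a minimal sketch, the same settings could be built from environment variables. The variable names (`PG_HOST`, `PG_PORT`, `PG_DATABASE`, `PG_USER`, `PG_PASSWORD`) and the helper `build_jdbc_settings` are assumptions for illustration and are not part of the original setup; the defaults mirror the host, port and database used above.

```python
import os


def build_jdbc_settings(env=None):
    """Build the JDBC url and properties from a mapping of environment variables."""
    env = os.environ if env is None else env
    jdbc_url = "jdbc:postgresql://{host}:{port}/{db}".format(
        host=env.get("PG_HOST", "postgres_db"),
        port=env.get("PG_PORT", "5432"),
        db=env.get("PG_DATABASE", "products_db"),
    )
    jdbc_properties = {
        "user": env.get("PG_USER", "user"),
        # No default password: it should come from the environment.
        "password": env.get("PG_PASSWORD", ""),
        "driver": "org.postgresql.Driver",
    }
    return jdbc_url, jdbc_properties


# Example with an explicit mapping instead of the real environment.
example_url, example_properties = build_jdbc_settings({"PG_HOST": "localhost"})
print(example_url)
```

The returned pair could then be passed as `url=` and `properties=` to the `write.jdbc` and `read.jdbc` calls in the same way as the hard-coded values.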

#### load shoes products data

In [48]:
table_shoes = "shoes_products"
df_shoe.write.jdbc(url=url, table=table_shoes, mode=mode, properties=properties)

#### load jewelry products data

In [49]:
table_jewelry = "jewelry_products"
df_jewel.write.jdbc(url=url, table=table_jewelry, mode=mode, properties=properties)

#### load accessories products data

In [50]:
table_accessories = "accessories_products"
df_accessories.write.jdbc(url=url, table=table_accessories, mode=mode, properties=properties)

#### Load products analytical table

In [51]:
table_products = "products_analytics"
products.write.jdbc(url=url, table=table_products, mode=mode, properties=properties)

### Read each table as a dataframe and just print the schema of each one.

#### Read shoes products data from postgresql DB

In [52]:
# load data
shoes_loaded = spark.read.jdbc(url=url, table=table_shoes, properties=properties)

In [53]:
# print Schema
shoes_loaded.printSchema()

root
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)



#### Read jewelry products data from postgresql DB

In [54]:
# load data
jewel_loaded = spark.read.jdbc(url=url, table=table_jewelry, properties=properties)

In [55]:
# print Schema
jewel_loaded.printSchema()

root
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)



#### Read accessories products data from postgresql DB

In [56]:
# load data
accessories_loaded = spark.read.jdbc(url=url, table=table_accessories, properties=properties)

In [57]:
#print Schema
accessories_loaded.printSchema()

root
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)



#### Read products analytics data from postgresql DB

In [58]:
# load data
products_loaded = spark.read.jdbc(url=url, table=table_products, properties=properties)

In [59]:
#print Schema
products_loaded.printSchema()

root
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- raw_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- likes_count: integer (nullable = true)
 |-- is_new: boolean (nullable = true)
 |-- brand: string (nullable = true)
 |-- brand_url: string (nullable = true)
 |-- codCountry: string (nullable = true)
 |-- variation_0_color: string (nullable = true)
 |-- variation_1_color: string (nullable = true)
 |-- variation_0_thumbnail: string (nullable = true)
 |-- variation_0_image: string (nullable = true)
 |-- variation_1_thumbnail: string (nullable = true)
 |-- variation_1_image: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- model: string (nullable = true)
 |-- category_avg_price: double (nullable = true)
 |-- s