# PYSPARK

In [1]:
%pip install pyspark


Note: you may need to restart the kernel to use updated packages.


## Create a Spark Session


In [2]:
from pyspark.sql import SparkSession

# Create a SparkSession named "Spark"
spark = SparkSession.builder \
    .appName("Spark") \
    .getOrCreate()

### Load CSV Data


In [3]:
# Load CSV data
sold_products_path = '../datasets/products_sold_data.csv'
sold_products_df = spark.read.csv(sold_products_path, header=True, inferSchema=True)

# Show the first few rows of the DataFrame
sold_products_df.show()

+------------------+--------------------+--------------------+-------------+---------+-----------+
|          category|                link|                name|current_price|avg_stars|num_reviews|
+------------------+--------------------+--------------------+-------------+---------+-----------+
|telephone-tablette|https://www.jumia...|Écouteurs Bluetoo...|         96.0|      3.3|       13.0|
|telephone-tablette|https://www.jumia...|Support Trépied r...|        129.0|      5.0|        1.0|
|telephone-tablette|https://www.jumia...|Écouteurs Bluetoo...|         66.0|      3.7|       70.0|
|telephone-tablette|https://www.jumia...|XIAOMI Redmi Buds...|        188.0|      4.4|       20.0|
|telephone-tablette|https://www.jumia...|M10 TWS True Wire...|         49.0|      3.7|      164.0|
|telephone-tablette|https://www.jumia...|"XIAOMI REDMI NOT...|       2049.0|      4.3|        9.0|
|telephone-tablette|https://www.jumia...|écouteurs Bluetoo...|         99.0|      3.9|       24.0|
|telephone

### Check the Schema of the DataFrame


In [4]:
sold_products_df.printSchema()


root
 |-- category: string (nullable = true)
 |-- link: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: string (nullable = true)
 |-- avg_stars: double (nullable = true)
 |-- num_reviews: double (nullable = true)



### Delete the "link" Column


In [5]:
sold_products_df = sold_products_df.drop("link")

sold_products_df.show()


+------------------+--------------------+-------------+---------+-----------+
|          category|                name|current_price|avg_stars|num_reviews|
+------------------+--------------------+-------------+---------+-----------+
|telephone-tablette|Écouteurs Bluetoo...|         96.0|      3.3|       13.0|
|telephone-tablette|Support Trépied r...|        129.0|      5.0|        1.0|
|telephone-tablette|Écouteurs Bluetoo...|         66.0|      3.7|       70.0|
|telephone-tablette|XIAOMI Redmi Buds...|        188.0|      4.4|       20.0|
|telephone-tablette|M10 TWS True Wire...|         49.0|      3.7|      164.0|
|telephone-tablette|"XIAOMI REDMI NOT...|       2049.0|      4.3|        9.0|
|telephone-tablette|écouteurs Bluetoo...|         99.0|      3.9|       24.0|
|telephone-tablette|XIAOMI Redmi Buds...|        188.0|      4.1|       15.0|
|telephone-tablette|P9 Casque Bluetoo...|         65.6|      3.6|      179.0|
|telephone-tablette|bracelet pour app...|         49.0|      3.5

### Rename the Columns


In [6]:
sold_products_df = sold_products_df \
    .withColumnRenamed("name", "product_name") \
    .withColumnRenamed("current_price", "product_price") \
    .withColumnRenamed("avg_stars", "stars_avg") \
    .withColumnRenamed("num_reviews", "reviews_number") 

sold_products_df.show()


+------------------+--------------------+-------------+---------+--------------+
|          category|        product_name|product_price|stars_avg|reviews_number|
+------------------+--------------------+-------------+---------+--------------+
|telephone-tablette|Écouteurs Bluetoo...|         96.0|      3.3|          13.0|
|telephone-tablette|Support Trépied r...|        129.0|      5.0|           1.0|
|telephone-tablette|Écouteurs Bluetoo...|         66.0|      3.7|          70.0|
|telephone-tablette|XIAOMI Redmi Buds...|        188.0|      4.4|          20.0|
|telephone-tablette|M10 TWS True Wire...|         49.0|      3.7|         164.0|
|telephone-tablette|"XIAOMI REDMI NOT...|       2049.0|      4.3|           9.0|
|telephone-tablette|écouteurs Bluetoo...|         99.0|      3.9|          24.0|
|telephone-tablette|XIAOMI Redmi Buds...|        188.0|      4.1|          15.0|
|telephone-tablette|P9 Casque Bluetoo...|         65.6|      3.6|         179.0|
|telephone-tablette|bracelet

### Replace "-" with Space in the "category" Column


In [7]:
from pyspark.sql.functions import regexp_replace

sold_products_df = sold_products_df.withColumn("category", regexp_replace("category", "-", " "))

sold_products_df.show()


+------------------+--------------------+-------------+---------+--------------+
|          category|        product_name|product_price|stars_avg|reviews_number|
+------------------+--------------------+-------------+---------+--------------+
|telephone tablette|Écouteurs Bluetoo...|         96.0|      3.3|          13.0|
|telephone tablette|Support Trépied r...|        129.0|      5.0|           1.0|
|telephone tablette|Écouteurs Bluetoo...|         66.0|      3.7|          70.0|
|telephone tablette|XIAOMI Redmi Buds...|        188.0|      4.4|          20.0|
|telephone tablette|M10 TWS True Wire...|         49.0|      3.7|         164.0|
|telephone tablette|"XIAOMI REDMI NOT...|       2049.0|      4.3|           9.0|
|telephone tablette|écouteurs Bluetoo...|         99.0|      3.9|          24.0|
|telephone tablette|XIAOMI Redmi Buds...|        188.0|      4.1|          15.0|
|telephone tablette|P9 Casque Bluetoo...|         65.6|      3.6|         179.0|
|telephone tablette|bracelet

### Clean the "product_name" Column


In [8]:
from pyspark.sql.functions import trim, col

# Remove special characters and extra spaces from the "product_name" column
sold_products_df = sold_products_df.withColumn("product_name", regexp_replace(col("product_name"), "[^a-zA-Z0-9\s]+", ""))
sold_products_df = sold_products_df.withColumn("product_name", trim(col("product_name")))

sold_products_df.show()


+------------------+--------------------+-------------+---------+--------------+
|          category|        product_name|product_price|stars_avg|reviews_number|
+------------------+--------------------+-------------+---------+--------------+
|telephone tablette|couteurs Bluetoot...|         96.0|      3.3|          13.0|
|telephone tablette|Support Trpied rg...|        129.0|      5.0|           1.0|
|telephone tablette|couteurs Bluetoot...|         66.0|      3.7|          70.0|
|telephone tablette|XIAOMI Redmi Buds...|        188.0|      4.4|          20.0|
|telephone tablette|M10 TWS True Wire...|         49.0|      3.7|         164.0|
|telephone tablette|XIAOMI REDMI NOTE...|       2049.0|      4.3|           9.0|
|telephone tablette|couteurs Bluetoot...|         99.0|      3.9|          24.0|
|telephone tablette|XIAOMI Redmi Buds...|        188.0|      4.1|          15.0|
|telephone tablette|P9 Casque Bluetoo...|         65.6|      3.6|         179.0|
|telephone tablette|bracelet

### Drop Rows with Null Values


In [9]:
# Drop rows with any null values
sold_products_df = sold_products_df.dropna()

row_count = sold_products_df.count()
print("Number of rows after dropping nulls:", row_count)


Number of rows after dropping nulls: 18880


### Print the Schema of the DataFrame


In [10]:
sold_products_df.printSchema()


root
 |-- category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_price: string (nullable = true)
 |-- stars_avg: double (nullable = true)
 |-- reviews_number: double (nullable = true)



### Convert Columns to Appropriate Data Types


In [11]:
from pyspark.sql.functions import col

# Convert "reviews_number" to integer
sold_products_df = sold_products_df.withColumn("reviews_number", col("reviews_number").cast("int"))

# Convert "product_price" to double
sold_products_df = sold_products_df.withColumn("product_price", col("product_price").cast("double"))

sold_products_df.printSchema()



root
 |-- category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_price: double (nullable = true)
 |-- stars_avg: double (nullable = true)
 |-- reviews_number: integer (nullable = true)



### Show the Final Cleaned DataFrame


In [12]:
sold_products_df.show()


+------------------+--------------------+-------------+---------+--------------+
|          category|        product_name|product_price|stars_avg|reviews_number|
+------------------+--------------------+-------------+---------+--------------+
|telephone tablette|couteurs Bluetoot...|         96.0|      3.3|            13|
|telephone tablette|Support Trpied rg...|        129.0|      5.0|             1|
|telephone tablette|couteurs Bluetoot...|         66.0|      3.7|            70|
|telephone tablette|XIAOMI Redmi Buds...|        188.0|      4.4|            20|
|telephone tablette|M10 TWS True Wire...|         49.0|      3.7|           164|
|telephone tablette|XIAOMI REDMI NOTE...|       2049.0|      4.3|             9|
|telephone tablette|couteurs Bluetoot...|         99.0|      3.9|            24|
|telephone tablette|XIAOMI Redmi Buds...|        188.0|      4.1|            15|
|telephone tablette|P9 Casque Bluetoo...|         65.6|      3.6|           179|
|telephone tablette|bracelet

## Define product quality 

In [13]:
from pyspark.sql.functions import col, when

# Define sentiment based on stars_avg
sold_products_df = sold_products_df.withColumn(
    "quality",
    when(col("stars_avg") >= 4.0, "Good")
    .when((col("stars_avg") >= 2.5) & (col("stars_avg") < 4.0), "Average")
    .otherwise("Bad")
)

# Show the DataFrame with the new column
sold_products_df.show()


+------------------+--------------------+-------------+---------+--------------+-------+
|          category|        product_name|product_price|stars_avg|reviews_number|quality|
+------------------+--------------------+-------------+---------+--------------+-------+
|telephone tablette|couteurs Bluetoot...|         96.0|      3.3|            13|Average|
|telephone tablette|Support Trpied rg...|        129.0|      5.0|             1|   Good|
|telephone tablette|couteurs Bluetoot...|         66.0|      3.7|            70|Average|
|telephone tablette|XIAOMI Redmi Buds...|        188.0|      4.4|            20|   Good|
|telephone tablette|M10 TWS True Wire...|         49.0|      3.7|           164|Average|
|telephone tablette|XIAOMI REDMI NOTE...|       2049.0|      4.3|             9|   Good|
|telephone tablette|couteurs Bluetoot...|         99.0|      3.9|            24|Average|
|telephone tablette|XIAOMI Redmi Buds...|        188.0|      4.1|            15|   Good|
|telephone tablette|P

### DF save

In [14]:
%pip install pandas openpyxl


Note: you may need to restart the kernel to use updated packages.


In [15]:
import pandas as pd

sold_products_pd_df = sold_products_df.toPandas()

# Save the Pandas DataFrame to an Excel file
excel_file_path = 'products_sold_data.xlsx'
sold_products_pd_df.to_excel(excel_file_path, index=False)
