# Transformation of the Goosecreek Dataset Using PySpark

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=a15540b1951c984dcf0b9738725b7d98f88c4cea3c0f507dd28e87e433fa0964
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [11]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace
import argparse

In [5]:
# Build a SparkSession on the "local" master, or reuse the active one if a
# session already exists in this kernel.
spark = (
    SparkSession.builder
    .master("local")
    .appName("project-pipeline")
    .getOrCreate()
)

In [45]:
# Load the scraped JSON export into a DataFrame (schema is inferred by Spark).
raw_json_path = '/content/sample_data/GoosecreekCandles_scrapped_goosescrape_2024-04-20_04-21-56.json'
df = spark.read.json(path=raw_json_path)

In [46]:
# Cleaning pass, expressed as a single chain:
#   1. Drop the "_corrupt_record" column. This removes the column only, not
#      rows; rows Spark could not parse are presumably removed by dropna()
#      below because their remaining fields are null — TODO confirm.
#   2. Strip the literal "$" from both price columns and cast them to double.
#   3. Drop every row that still contains a null in any column.
clean_df = (
    df
    .drop(col("_corrupt_record"))
    .withColumn(
        "price_earlier",
        regexp_replace(col("price_earlier"), "\\$", "").cast("double"),
    )
    .withColumn(
        "price_now",
        regexp_replace(col("price_now"), "\\$", "").cast("double"),
    )
    .dropna()
)



In [47]:
# Target Spark SQL type for each column that needs an explicit type.
column_types = {
    "numReviews": "integer",
    "rating": "double",
    "Scrapped_On": "date",
    "1_star_Rating": "integer",
    "2_star_Rating": "integer",
    "3_star_Rating": "integer",
    "4_star_Rating": "integer",
    "5_star_Rating": "integer",
}

# Apply the casts one column at a time, in the order listed above. Each
# withColumn call replaces the existing column of the same name.
for column_name, target_type in column_types.items():
    clean_df = clean_df.withColumn(column_name, col(column_name).cast(target_type))

# Print the resulting schema to confirm the new types
clean_df.printSchema()

root
 |-- 1_star_Rating: integer (nullable = true)
 |-- 2_star_Rating: integer (nullable = true)
 |-- 3_star_Rating: integer (nullable = true)
 |-- 4_star_Rating: integer (nullable = true)
 |-- 5_star_Rating: integer (nullable = true)
 |-- Details: string (nullable = true)
 |-- Product Main Category: string (nullable = true)
 |-- Product Sub-Category: string (nullable = true)
 |-- Scrapped_On: date (nullable = true)
 |-- numReviews: integer (nullable = true)
 |-- price_earlier: double (nullable = true)
 |-- price_now: double (nullable = true)
 |-- rating: double (nullable = true)
 |-- title: string (nullable = true)



In [48]:
# Discount as a percentage of the earlier price:
#   (price_earlier - price_now) / price_earlier * 100
price_drop = col("price_earlier") - col("price_now")
discount_ratio = price_drop / col("price_earlier")
clean_df = clean_df.withColumn("discount_percentage", discount_ratio * 100)

In [49]:
# Example: average current price (price_now) per Product Sub-Category
avg_price_by_category = (
    clean_df
    .groupBy("Product Sub-Category")
    .avg("price_now")
)
avg_price_by_category.show()

+--------------------+------------------+
|Product Sub-Category|    avg(price_now)|
+--------------------+------------------+
|                Cozy|13.989999999999998|
|Signature Collection|             13.99|
|              Floral|13.990000000000004|
|Scented Plug-ins ...|4.9483333333333315|
|All Body Care Pro...| 5.490000000000001|
|Fresh & Clean Fra...|13.990000000000004|
|           Wax Melts|               4.0|
|      3 Wick Candles|13.990000000000004|
|                Sale| 5.989999999999999|
|         Room Sprays|3.9899999999999998|
| Single Wick Candles| 7.590000000000004|
|     Candles for Men|11.445454545454545|
|Orange Blossom Fr...|             13.99|
|              Fruity|13.990000000000006|
|Vanilla Bean Frag...| 7.241666666666667|
|         All Candles|12.657379032257994|
|Little Debbie® x ...|             13.99|
|Aromatherapy Candles|             9.392|
|      Spring Candles|12.101111111111118|
|               Woody|13.989999999999998|
+--------------------+------------

In [50]:
# Preview the first 20 rows of the cleaned DataFrame
clean_df.show(n=20)

+-------------+-------------+-------------+-------------+-------------+--------------------+---------------------+--------------------+-----------+----------+-------------+---------+------+--------------------+-------------------+
|1_star_Rating|2_star_Rating|3_star_Rating|4_star_Rating|5_star_Rating|             Details|Product Main Category|Product Sub-Category|Scrapped_On|numReviews|price_earlier|price_now|rating|               title|discount_percentage|
+-------------+-------------+-------------+-------------+-------------+--------------------+---------------------+--------------------+-----------+----------+-------------+---------+------+--------------------+-------------------+
|            2|            0|            5|           25|          492|Sicilian Lemon, M...| Candles & Home Fr...|World Traveler Ca...| 2024-04-20|       524|         25.5|    13.99|   4.9|Limoncello Large ...|  45.13725490196078|
|            0|            0|            2|           10|          100|Mahog

In [51]:
# Presentation order for the columns: category and identification fields,
# then prices, then review statistics, then the free-text details.
desired_column_order = [
    "Product Main Category", "Product Sub-Category", "Scrapped_On", "title",
    "price_earlier", "price_now", "discount_percentage",
    "rating", "numReviews",
    "1_star_Rating", "2_star_Rating", "3_star_Rating", "4_star_Rating", "5_star_Rating",
    "Details",
]

# Project the cleaned data into that order and preview the first 20 rows
ordered_df = clean_df.select(*desired_column_order)
ordered_df.show(n=20)

+---------------------+--------------------+-----------+--------------------+-------------+---------+-------------------+------+----------+-------------+-------------+-------------+-------------+-------------+--------------------+
|Product Main Category|Product Sub-Category|Scrapped_On|               title|price_earlier|price_now|discount_percentage|rating|numReviews|1_star_Rating|2_star_Rating|3_star_Rating|4_star_Rating|5_star_Rating|             Details|
+---------------------+--------------------+-----------+--------------------+-------------+---------+-------------------+------+----------+-------------+-------------+-------------+-------------+-------------+--------------------+
| Candles & Home Fr...|World Traveler Ca...| 2024-04-20|Limoncello Large ...|         25.5|    13.99|  45.13725490196078|   4.9|       524|            2|            0|            5|           25|          492|Sicilian Lemon, M...|
| Candles & Home Fr...|               Woody| 2024-04-20|Teakwood Large 3-...

In [52]:
# Mean discount percentage within each Product Sub-Category
(
    ordered_df
    .groupBy("Product Sub-Category")
    .avg("discount_percentage")
    .show()
)

+--------------------+------------------------+
|Product Sub-Category|avg(discount_percentage)|
+--------------------+------------------------+
|                Cozy|       45.13725490196078|
|Signature Collection|       45.13725490196078|
|              Floral|       45.13725490196078|
|Scented Plug-ins ...|      38.068418856904465|
|All Body Care Pro...|      31.289111389236545|
|Fresh & Clean Fra...|       45.13725490196078|
|           Wax Melts|      20.295857988165686|
|      3 Wick Candles|      45.137254901960716|
|                Sale|       61.35483870967743|
|         Room Sprays|       50.06257822277848|
| Single Wick Candles|       51.03225806451609|
|     Candles for Men|      41.556376066368394|
|Orange Blossom Fr...|       45.13725490196078|
|              Fruity|       45.13725490196082|
|Vanilla Bean Frag...|       40.56536172007123|
|         All Candles|       45.74231805104994|
|Little Debbie® x ...|       45.13725490196078|
|Aromatherapy Candles|      43.845853015

# Text Transformation

In [53]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, split, lower, avg, expr, udf
from pyspark.sql.types import IntegerType, DoubleType
from textblob import TextBlob

In [54]:
# Text processing on the Details column:
#   cleaned_details - Details converted to lower case
#   ingredients     - cleaned_details split into an array on commas
#                     (a comma followed by any amount of whitespace)
ordered_df = (
    ordered_df
    .withColumn("cleaned_details", lower(col("Details")))
    .withColumn("ingredients", split(col("cleaned_details"), ",\\s*"))
)

In [55]:
# Sentiment analysis: TextBlob is a plain Python library, so it is applied to
# each row through a Python UDF.
def get_sentiment(text):
    """Return the TextBlob sentiment polarity of ``text``.

    Args:
        text: The string to score, or None when the value is missing.

    Returns:
        The polarity reported by TextBlob, or None when ``text`` is None so
        that a null input produces a null output instead of raising inside
        the UDF.
    """
    # Spark passes SQL NULL to a Python UDF as None; TextBlob only accepts str.
    if text is None:
        return None
    return TextBlob(text).sentiment.polarity

sentiment_udf = udf(get_sentiment, DoubleType())

# Score the original (not lower-cased) Details text
ordered_df = ordered_df.withColumn("details_sentiment", sentiment_udf(col("Details")))

In [56]:
# Save the final DataFrame as JSON. Spark writes a directory of part files at
# this path rather than a single .json file. Overwrite mode keeps the notebook
# re-runnable: the default save mode raises an error when the path already
# exists, which happens on every run after the first.
ordered_df.write.mode("overwrite").json("clean_data_April_Final.json")

In [57]:
# Stop the SparkSession now that the output has been written; cells that use
# `spark` or the DataFrames above must not be re-run after this without
# recreating the session first.
spark.stop()