In [32]:
# Download the Spark 3.5.0 (Hadoop 3 build) archive and unpack it in the
# current working directory.
!wget -q https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
# Install the Python-side packages. None of them is version-pinned.
# NOTE(review): consider pinning pyspark to the same 3.5.0 release as the
# archive above, and confirm which of the two Spark copies is used at runtime.
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import shutil
import sys

# Disabled environment overrides, kept for reference. Because both lines are
# commented out, this notebook never sets JAVA_HOME or SPARK_HOME itself.
# NOTE(review): the SPARK_HOME path names spark-3.2.1-bin-hadoop3.2, not the
# spark-3.5.0-bin-hadoop3 archive unpacked earlier in this cell — fix the path
# before re-enabling.
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

import findspark
# Called with no arguments, so findspark has to locate a Spark installation on
# its own. NOTE(review): presumably it ends up using the pip-installed pyspark
# rather than the unpacked archive — confirm.
findspark.init()



In [247]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lag, coalesce, lit, isnan
from pyspark.sql.window import Window
from google.colab import files, drive
from pyspark.sql.types import IntegerType

In [248]:
# Mount Google Drive at /content/drive; the input CSV and the final CSV export
# are both addressed through /content/drive/MyDrive.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [249]:
# Obtain the Spark session for this notebook under the application name
# "SparkSQL"; getOrCreate() hands back an already-running session if there is one.
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [250]:
# Base name (without extension) of the input CSV, and its full path inside the
# mounted Drive folder.
csv_file_name = "amz_ca_total_products_data_processed"
csv_file_path = f'/content/drive/MyDrive/{csv_file_name}.csv'

In [251]:
# Relative path of the intermediate Parquet copy of the data; it is deleted,
# written and read back further down.
parquet_file_path = "amz_ca_total_products_data_processed.parquet"

In [252]:
# Clear out the Parquet output of a previous run, if any, before it is written
# again. shutil.rmtree removes the whole directory tree at that path.
if os.path.exists(parquet_file_path):
    print(f"Deleting existing Parquet directory at {parquet_file_path}")
    shutil.rmtree(parquet_file_path)

Deleting existing Parquet directory at amz_ca_total_products_data_processed.parquet


In [253]:
# Load the raw CSV, taking the column names from its first line. No schema is
# supplied, so every column is read as a string (see the printed schema).
# NOTE(review): the reader runs with default quote/escape/multiLine settings.
# The number of rows whose `stars` fails to cast later on suggests some rows
# may be split incorrectly (e.g. titles containing quotes or commas) — verify
# against the file and set the CSV options accordingly if so.
df = spark.read.csv(csv_file_path, header=True)

In [254]:
# Inspect the freshly loaded data: column types first, then the first 5 rows.
df.printSchema()
df.show(5)

root
 |-- asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- imgUrl: string (nullable = true)
 |-- productURL: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- reviews: string (nullable = true)
 |-- price: string (nullable = true)
 |-- listPrice: string (nullable = true)
 |-- categoryName: string (nullable = true)
 |-- isBestSeller: string (nullable = true)
 |-- boughtInLastMonth: string (nullable = true)

+----------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+---------+--------------------+------------+-----------------+
|      asin|               title|              imgUrl|          productURL|             stars|             reviews|               price|listPrice|        categoryName|isBestSeller|boughtInLastMonth|
+----------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+---------+------

In [255]:
# Target Spark SQL type for each CSV column, in the file's column order.
# Text-like fields stay strings; the numeric fields are all cast to integer.
# NOTE(review): an integer cast discards any fractional part — confirm that
# `stars`, `price` and `listPrice` are meant to be whole numbers.
schema_transformations = dict(
    [
        ("asin", "string"),
        ("title", "string"),
        ("imgUrl", "string"),
        ("productURL", "string"),
        ("stars", "integer"),
        ("reviews", "integer"),
        ("price", "integer"),
        ("listPrice", "integer"),
        ("categoryName", "string"),
        ("isBestSeller", "string"),
        ("boughtInLastMonth", "integer"),
    ]
)

In [256]:
# Cast the columns one at a time. Passing an existing column name to
# withColumn replaces that column, so the column order is unchanged.
for column_name, data_type in schema_transformations.items():
    typed_column = col(column_name).cast(data_type)
    df = df.withColumn(column_name, typed_column)

In [257]:
# Re-check the schema after casting and preview the first 5 rows.
df.printSchema()
df.show(5)

root
 |-- asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- imgUrl: string (nullable = true)
 |-- productURL: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- reviews: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- listPrice: integer (nullable = true)
 |-- categoryName: string (nullable = true)
 |-- isBestSeller: string (nullable = true)
 |-- boughtInLastMonth: integer (nullable = true)

+----------+--------------------+--------------------+--------------------+-----+-------+-----+---------+--------------------+------------+-----------------+
|      asin|               title|              imgUrl|          productURL|stars|reviews|price|listPrice|        categoryName|isBestSeller|boughtInLastMonth|
+----------+--------------------+--------------------+--------------------+-----+-------+-----+---------+--------------------+------------+-----------------+
|B07CV4L6HX|Green Leaf WW3D W...|https://m.media-a...|https://www.amazo

In [258]:
# Persist the typed DataFrame as Parquet. The writer's default save mode
# raises if the target path already exists, which made this cell fail whenever
# it was re-run without first re-running the cleanup cell. "overwrite"
# replaces any previous output, so the cell is safe to run repeatedly and
# matches the mode used by the CSV export at the end of the notebook.
df.write.mode("overwrite").parquet(parquet_file_path)

In [259]:
# Read the data back from the Parquet copy written above; all later steps work
# on this DataFrame rather than on the CSV-backed `df`.
df_parquet = spark.read.parquet(parquet_file_path)

In [260]:
# Confirm the column types survived the Parquet round trip and preview rows
# (show() with no argument prints up to 20 rows).
df_parquet.printSchema()
df_parquet.show()

root
 |-- asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- imgUrl: string (nullable = true)
 |-- productURL: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- reviews: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- listPrice: integer (nullable = true)
 |-- categoryName: string (nullable = true)
 |-- isBestSeller: string (nullable = true)
 |-- boughtInLastMonth: integer (nullable = true)

+----------+--------------------+--------------------+--------------------+-----+-------+-----+---------+------------------+------------------+-----------------+
|      asin|               title|              imgUrl|          productURL|stars|reviews|price|listPrice|      categoryName|      isBestSeller|boughtInLastMonth|
+----------+--------------------+--------------------+--------------------+-----+-------+-----+---------+------------------+------------------+-----------------+
|B0BNHZJ1SS|Kids Baby Doll St...|https://m.media-a...|https

In [261]:
# Discard the identifier, title and URL columns, then preview the remaining
# ones.
columns_to_drop = [
    "asin",
    "title",
    "imgUrl",
    "productURL",
]
df_parquet = df_parquet.drop(*columns_to_drop)
df_parquet.show()

+-----+-------+-----+---------+------------------+------------------+-----------------+
|stars|reviews|price|listPrice|      categoryName|      isBestSeller|boughtInLastMonth|
+-----+-------+-----+---------+------------------+------------------+-----------------+
|    2|      2|   16|        0|Dolls  Accessories|             False|                0|
|    2|      4|   22|        0|Dolls  Accessories|             False|                0|
|    4|     23|   37|        0|Dolls  Accessories|             False|                0|
|    3|      6|    0|        0|Dolls  Accessories|             False|                0|
|    5|      1|   18|        0|Dolls  Accessories|             False|                0|
|    0|      0|   10|        0|Dolls  Accessories|             False|                0|
|    5|      4|   20|        0|Dolls  Accessories|             False|                0|
|    0|      0|   85|        0|Dolls  Accessories|             False|                0|
|    0|      0|   99|        0|D

In [262]:
# Row count before null filtering; used later to report how many rows were
# removed.
row_count_before = df_parquet.count()

In [263]:
# Keep only rows that have a value in `stars`; nulls in any other column are
# left alone. The result is a new DataFrame, `df_parquet` is not modified.
df_parquet_cleaned = df_parquet.dropna(subset=["stars"])

In [264]:
# Row count after dropping the rows with a null `stars`.
row_count_after = df_parquet_cleaned.count()


In [270]:
# Number of rows discarded by the null filter. Only the `stars` column was
# checked when df_parquet_cleaned was built, so this counts rows with a null
# `stars`, not rows with a null in any column.
removed_rows_count = row_count_before - row_count_after

print(f"Removed {removed_rows_count} rows with null values")

Removed 52582 rows with null values


In [269]:
# Preview the cleaned data (up to 20 rows).
df_parquet_cleaned.show()

+-----+-------+-----+---------+------------------+------------+-----------------+
|stars|reviews|price|listPrice|      categoryName|isBestSeller|boughtInLastMonth|
+-----+-------+-----+---------+------------------+------------+-----------------+
|    2|      2|   16|        0|Dolls  Accessories|       False|                0|
|    2|      4|   22|        0|Dolls  Accessories|       False|                0|
|    4|     23|   37|        0|Dolls  Accessories|       False|                0|
|    3|      6|    0|        0|Dolls  Accessories|       False|                0|
|    5|      1|   18|        0|Dolls  Accessories|       False|                0|
|    0|      0|   10|        0|Dolls  Accessories|       False|                0|
|    5|      4|   20|        0|Dolls  Accessories|       False|                0|
|    0|      0|   85|        0|Dolls  Accessories|       False|                0|
|    0|      0|   99|        0|Dolls  Accessories|       False|                0|
|    5|      1| 

In [267]:
# Export the cleaned result back to Drive as CSV.
#
# The export previously wrote `df_parquet`, i.e. the data *before* the rows
# with a null `stars` were removed, so the cleaning step computed and reported
# above never reached the output. `df_parquet_cleaned` is the DataFrame that
# step produced, and is what gets written now.
#
# coalesce(1) reduces the data to a single partition so the output contains a
# single part file. Note that Spark creates a directory at `csv_output_path`
# holding that part file, not one plain .csv file.
csv_output_path = "/content/drive/MyDrive/amz_ca_total_products_data_processed_optimized.csv"
(
    df_parquet_cleaned
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .option("sep", ",")
    .csv(csv_output_path)
)