In [12]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by every DataFrame in this notebook
spark = (
    SparkSession.builder
    .appName("CategoriesPrediction")
    .getOrCreate()
)


In [13]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, BooleanType

# Explicit schema for the cleaned product CSV; every column is declared nullable
schema_prod = (
    StructType()
    .add("title", StringType(), True)
    .add("stars", FloatType(), True)
    .add("price", FloatType(), True)
    .add("listPrice", FloatType(), True)
    .add("category_id", IntegerType(), True)
    .add("boughtInLastMonth", IntegerType(), True)
    .add("isBestSeller", BooleanType(), True)
    .add("Discount", FloatType(), True)
)

# Read the products file with '"' configured as both the quote and the escape character
df_prod = (
    spark.read
    .option("quote", '"')
    .option("escape", '"')
    .csv("../cleaned_data/amazon_product_cleaned.csv", header=True, schema=schema_prod)
)

# Explicit schema for the category lookup CSV (numeric id -> category name)
schema_cat = (
    StructType()
    .add("id", IntegerType(), True)
    .add("category_name", StringType(), True)
)

# Read the category lookup with the reader's default CSV options
df_cat = spark.read.csv("../cleaned_data/amazon_categories.csv", header=True, schema=schema_cat)

# Preview both tables (default: first 20 rows each)
df_cat.show()
df_prod.show()


+---+--------------------+
| id|       category_name|
+---+--------------------+
|  1|Beading & Jewelry...|
|  2|   Fabric Decorating|
|  3|Knitting & Croche...|
|  4|Printmaking Supplies|
|  5|Scrapbooking & St...|
|  6|     Sewing Products|
|  7|Craft & Hobby Fabric|
|  8| Needlework Supplies|
|  9|Arts, Crafts & Se...|
| 10|Painting, Drawing...|
| 11|Craft Supplies & ...|
| 12|Gift Wrapping Sup...|
| 13|   Party Decorations|
| 14|Automotive Paint ...|
| 15|Heavy Duty & Comm...|
| 16|Automotive Tires ...|
| 17|Automotive Tools ...|
| 18|Automotive Perfor...|
| 19|            Car Care|
| 20|       Oils & Fluids|
+---+--------------------+
only showing top 20 rows

+--------------------+-----+------+---------+-----------+-----------------+------------+----------+
|               title|stars| price|listPrice|category_id|boughtInLastMonth|isBestSeller|  Discount|
+--------------------+-----+------+---------+-----------+-----------------+------------+----------+
|Sion Softside Exp...|  4.

In [14]:
# Align the lookup table's key with the products table: 'id' -> 'category_id'
data_cat = df_cat.withColumnRenamed('id', 'category_id')

# Attach the category name to every product (left join keeps products whose
# category_id has no match), then drop the numeric key and expose the title
# under the name 'product'
data = (
    df_prod
    .join(data_cat, on='category_id', how='left')
    .drop('category_id')
    .withColumnRenamed('title', 'product')
)

# Preview the first 3 rows of the combined DataFrame
data.show(3)

+--------------------+-----+------+---------+-----------------+------------+----------+-------------+
|             product|stars| price|listPrice|boughtInLastMonth|isBestSeller|  Discount|category_name|
+--------------------+-----+------+---------+-----------------+------------+----------+-------------+
|Sion Softside Exp...|  4.5|139.99|   139.99|             2000|       false|       1.0|    Suitcases|
|Luggage Sets Expa...|  4.5|169.99|   209.99|             1000|       false|0.80951476|    Suitcases|
|Platinum Elite So...|  4.6|365.49|   429.99|              300|       false| 0.8499965|    Suitcases|
+--------------------+-----+------+---------+-----------------+------------+----------+-------------+
only showing top 3 rows



In [15]:
from pyspark.sql import functions as F

# Keep only the product title and its category label
df = data.select('product', 'category_name')

# Rank categories by number of products and keep the five largest.
# - Rows with a null category_name (products whose category_id had no match in
#   the earlier left join) are excluded before counting: a null group could
#   otherwise take one of the five slots and then be silently dropped by the
#   inner join below, because null never equals null in an equi-join.
# - The secondary sort on category_name makes the selection deterministic when
#   several categories have the same count.
top_categories = (
    df.filter(F.col('category_name').isNotNull())
    .groupBy('category_name')
    .count()
    .orderBy(F.desc('count'), F.asc('category_name'))
    .limit(5)
    .select('category_name')
)

# Restrict the products to those belonging to one of the top five categories
df = df.join(top_categories, on='category_name', how='inner')

# Preview the filtered DataFrame (up to 100 rows)
df.show(100)

# Collect the filtered rows to the driver and persist them as a single CSV file.
# NOTE(review): toPandas() loads every filtered row into driver memory — confirm
# the top-five subset is small enough for that.
pandas_df = df.toPandas()
output_path = '../cleaned_data/category_rating.csv'
pandas_df.to_csv(output_path, index=False, encoding='utf-8')


+-------------+--------------------+
|category_name|             product|
+-------------+--------------------+
|  Men's Shoes|American Heritage...|
|  Men's Shoes|Men's Speedcross ...|
|  Men's Shoes|Women's Terrex AX...|
|  Men's Shoes|            mens Hvc|
|  Men's Shoes|Unisex-Adult Adil...|
|  Men's Shoes|Men's Braver-Rayl...|
|  Men's Shoes|Steel Toe Sneaker...|
|  Men's Shoes|Steel Toe Shoes f...|
|  Men's Shoes|unisex-adult Clas...|
|  Men's Shoes|   Men's Neumel Boot|
|  Men's Shoes|Mens Recovery San...|
|  Men's Shoes|Originals Men's S...|
|  Men's Shoes|Men's Lite Racer ...|
|  Men's Shoes|Men's AL0A7R6F To...|
|  Men's Shoes|Men's Leather Lin...|
|  Men's Shoes|Men's Mio Li Sneaker|
|  Men's Shoes|Men's Moab 2 Mid ...|
|  Men's Shoes|Rubber Hunting Bo...|
|  Men's Shoes|Men's Low Neck Ru...|
|  Men's Shoes|Men's Sneaker,Run...|
|  Men's Shoes|Mens Slip On Walk...|
|  Men's Shoes|Men's Ultraboost ...|
|  Men's Shoes|American Heritage...|
|  Men's Shoes|Men's Casual Dres...|
|

In [16]:
from transformers import pipeline

# Build a zero-shot classification pipeline backed by the BART-large MNLI checkpoint
classifier = pipeline(
    "zero-shot-classification",
    model="facebook/bart-large-mnli",
)

# Smoke test: classify one sentence against three candidate labels
sequence_to_classify = "I love Spark for big data processing!"
candidate_labels = ["technology", "education", "entertainment"]

# The result holds the input sequence plus the labels and their scores
result = classifier(sequence_to_classify, candidate_labels)
print(result)



  from .autonotebook import tqdm as notebook_tqdm


{'sequence': 'I love Spark for big data processing!', 'labels': ['technology', 'entertainment', 'education'], 'scores': [0.9908714294433594, 0.005852681118994951, 0.003275853581726551]}


In [17]:
from pyspark.sql import functions as F

# Target number of products to classify
size = 100

# DataFrame.sample() takes a fraction rather than a row count, so derive the
# fraction from the total. Guard the two inputs that would otherwise fail:
# an empty DataFrame (division by zero) and a DataFrame with fewer than `size`
# rows (fraction > 1.0 is rejected when sampling without replacement).
total_rows = df.count()
if total_rows == 0:
    raise ValueError("Cannot sample products: `df` contains no rows.")
sample_fraction = min(1.0, size / total_rows)

# Sample a subset of the DataFrame. The sample size is approximate: sample()
# does not guarantee exactly `size` rows. The result is cached so that the
# actions below (and later conversions of `small_df`) reuse one materialised
# sample instead of re-running the sampling; caching is best-effort.
small_df = df.sample(withReplacement=False, fraction=sample_fraction, seed=42).cache()

# Convert the 'product' column to a list of titles to classify
new_products = [row['product'] for row in small_df.select('product').collect()]

# Candidate labels are the distinct category names present in the filtered data
candidate_labels = [row['category_name'] for row in df.select('category_name').distinct().collect()]

# Show the first 20 rows of the sampled DataFrame
small_df.show(20)



+---------------+--------------------+
|  category_name|             product|
+---------------+--------------------+
|    Men's Shoes|Taylor Waterproof...|
|    Men's Shoes|Air Jordan 1 Mid ...|
|    Men's Shoes|Men's Excursion T...|
|    Men's Shoes|Zone LV Climbing ...|
|    Men's Shoes|You Got My Back S...|
|    Men's Shoes|Men's Dundee Slipper|
|    Men's Shoes|Mens Black Backle...|
|    Men's Shoes|    Invader Mid Vent|
|    Men's Shoes|Happy Feet Animal...|
|    Men's Shoes|AltamaVengeance S...|
|    Men's Shoes| Men's 113352 Loafer|
|    Men's Shoes|Motion Moc Toe Sl...|
|    Men's Shoes|   Men's Not Boaring|
|    Men's Shoes|Men's Crestwood M...|
|Girls' Clothing|Girls Thermal Und...|
|Girls' Clothing|Nickelodeon Girls...|
|Girls' Clothing|Girls and Toddler...|
|Girls' Clothing|Frozen Elsa and A...|
|Girls' Clothing|Girls Wide-Leg Pa...|
|Girls' Clothing|Boys Girls Polar ...|
+---------------+--------------------+
only showing top 20 rows



In [18]:
import pandas as pd

# Every sampled title is classified, in the order it was collected
classified_products = list(new_products)

# Zero-shot classify each title on its own and keep the first label of each
# result as the predicted category
predicted_categories = [
    classifier(title, candidate_labels)['labels'][0]
    for title in new_products
]


In [19]:
# Pair each classified product title with its predicted category
result_df = pd.DataFrame({
    'product': classified_products,
    'predicted_category': predicted_categories
})

# Convert the sampled Spark DataFrame to pandas. The name `small_df` is kept
# because the accuracy cell reads it; the guard makes this cell safe to re-run
# (a second run would otherwise call .toPandas() on a pandas DataFrame and fail).
if not isinstance(small_df, pd.DataFrame):
    small_df = small_df.toPandas()

In [20]:
# Keep one prediction per distinct title. If the same title occurs more than
# once in the sample, merging on 'product' would otherwise be many-to-many and
# multiply rows, inflating the match count relative to the sample size.
unique_predictions = result_df.drop_duplicates(subset='product')

# Attach the prediction to every sampled row; `validate` makes pandas raise if
# the right-hand side still contains duplicate titles.
new_df = pd.merge(
    small_df,
    unique_predictions,
    on='product',
    how='left',
    validate='many_to_one',
)

# Identify rows where 'category_name' matches 'predicted_category'
# (rows without a prediction compare as not equal)
identical_rows = new_df['category_name'] == new_df['predicted_category']

# Accuracy = matching rows / evaluated rows; report NaN for an empty sample
# instead of dividing by zero
evaluated_rows = len(new_df)
acc_rate = identical_rows.sum() / evaluated_rows if evaluated_rows else float('nan')
print("Accuracy Rate:", acc_rate)

Accuracy Rate: 0.8421052631578947


In [21]:
from transformers import pipeline
# Re-create the zero-shot classification pipeline (same BART-large MNLI checkpoint as before)
classifier = pipeline(
    "zero-shot-classification",
    model="facebook/bart-large-mnli",
)
# Optional: persist the pipeline locally by uncommenting the line below
# classifier.save_pretrained("category_model")
