In [2]:
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession

# Configure a local-mode session: `local[*]` master, 8 shuffle partitions,
# and a 4g driver-memory setting.
session_builder = SparkSession.builder.master("local[*]").appName("Fashion-Recommender-Engineer")
session_builder = session_builder.config("spark.driver.memory", "4g")
session_builder = session_builder.config("spark.sql.shuffle.partitions", "8")

# Start the session, or pick up one that is already running in this kernel.
spark = session_builder.getOrCreate()

26/02/10 19:46:50 WARN Utils: Your hostname, MacBook-Air-M4.local resolves to a loopback address: 127.0.0.1; using 192.168.100.12 instead (on interface en0)
26/02/10 19:46:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


26/02/10 19:46:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Sanity check: evaluate the session object as the cell's last expression so
# the notebook displays it, confirming that `spark` was created.
spark

## Load CSV dataset into Spark DataFrame

In [5]:
# Location of the raw catalogue export, relative to the notebook's directory.
CSV_PATH = "../data/fashion_datasets.csv"

# Treat the first row as column names and let Spark infer the column types
# from the data instead of reading everything as strings.
df = spark.read.csv(CSV_PATH, header=True, inferSchema=True)

# Print the inferred schema so the column names and types can be checked.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- item_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- sleeve_type: string (nullable = true)
 |-- season: string (nullable = true)
 |-- fabric: string (nullable = true)
 |-- occasion: string (nullable = true)
 |-- formality_level: string (nullable = true)
 |-- size_range: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- click_count: integer (nullable = true)
 |-- purchase_count: integer (nullable = true)
 |-- length_cm: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- stocks: integer (nullable = true)



## Select columns and convert Spark DataFrame → pandas DataFrame

In [6]:
# Columns to keep, in the order they should appear downstream:
# identifier, categorical attributes, engagement counters, then numeric details.
columns = [
    "item_id",
    "category", "subcategory", "sleeve_type", "season", "fabric",
    "occasion", "formality_level", "size_range", "brand",
    "view_count", "click_count", "purchase_count",
    "length_cm", "price", "stocks",
]

# Drop the auto-named `_c0` column (the first column in the inferred schema),
# then project onto the curated column list.
df = df.drop("_c0").select(*columns)

In [7]:
# Collect the Spark DataFrame to the driver as an in-memory pandas DataFrame.
df_cleaned = df.toPandas()

# Confirm the resulting type, then let the notebook render the frame.
print(type(df_cleaned))
df_cleaned

<class 'pandas.core.frame.DataFrame'>


Unnamed: 0,item_id,category,subcategory,sleeve_type,season,fabric,occasion,formality_level,size_range,brand,view_count,click_count,purchase_count,length_cm,price,stocks
0,TNC_000003,tops,printed_tshirts,short,summer,polyester,casual,low,L,Nike,2411,787,42,65,157055,146
1,TNC_000004,tops,formal_shirts,long,summer,denim,party,low,M,ZARA,1219,10,21,68,282021,115
2,TNC_000006,tops,formal_shirts,long,summer,cotton,casual,medium,S,Nike,1209,21,27,71,315906,0
3,TNC_000007,tops,solid_tshirts,short,all-season,fleece,casual,high,XS,Tommy Hilfiger,257,146,8,73,105185,151
4,TNC_000008,tops,casual_shirts,long,winter,linen,office,medium,S,Polo,401,322,23,78,209024,112
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
550270,TNC_1099992,tops,printed_tshirts,short,all-season,polyester,casual,low,M,Polo,55,9,17,70,160868,95
550271,TNC_1099993,tops,formal_shirts,long,summer,linen,office,low,L,Polo,136,261,6,73,302436,196
550272,TNC_1099995,tops,casual_shirts,long,winter,fleece,casual,medium,XL,HnM,512,328,21,78,276939,100
550273,TNC_1099996,tops,printed_tshirts,short,summer,cotton,office,low,M,HnM,165,242,5,72,122545,73


## Insert Spark DataFrame into a Table

In [9]:
# A SparkSession is already running at this point in the notebook, so building
# a second one with `.enableHiveSupport()` has no effect: `getOrCreate()` hands
# back the existing session and only runtime SQL settings are applied (Spark
# logs "Using an existing Spark session; only runtime SQL configurations will
# take effect"). Make that explicit by aliasing the live session instead of
# appearing to create a Hive-enabled one.
#
# NOTE(review): if a Hive metastore is actually required, `.enableHiveSupport()`
# must be set on the first builder, before any session has been created.
spark_sql = spark

# Save the curated DataFrame as the table `fashion_sql` in the session catalog,
# replacing any previous contents so the cell can be re-run safely.
df.write.mode("overwrite").saveAsTable("fashion_sql")

26/02/10 19:47:08 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


                                                                                

In [11]:
# Preview the saved table: first 20 rows, long cell values truncated
# (these are the defaults of `show`, spelled out here).
spark.sql("SELECT * FROM fashion_sql").show(n=20, truncate=True)

+----------+--------+---------------+-----------+----------+---------+--------+---------------+----------+--------------+----------+-----------+--------------+---------+------+------+
|   item_id|category|    subcategory|sleeve_type|    season|   fabric|occasion|formality_level|size_range|         brand|view_count|click_count|purchase_count|length_cm| price|stocks|
+----------+--------+---------------+-----------+----------+---------+--------+---------------+----------+--------------+----------+-----------+--------------+---------+------+------+
|TNC_000003|    tops|printed_tshirts|      short|    summer|polyester|  casual|            low|         L|          Nike|      2411|        787|            42|       65|157055|   146|
|TNC_000004|    tops|  formal_shirts|       long|    summer|    denim|   party|            low|         M|          ZARA|      1219|         10|            21|       68|282021|   115|
|TNC_000006|    tops|  formal_shirts|       long|    summer|   cotton|  casual| 

In [12]:
# Export the curated DataFrame as Parquet, replacing any previous export.
# NOTE(review): the path has no `hdfs://` scheme, so it resolves against
# Spark's default filesystem. On a plain local setup that is presumably a
# directory named `hdfs/` under the working directory, not HDFS. Confirm intent.
df.write.parquet("hdfs/fashion_items_parquet", mode="overwrite")