In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("svdtest").getOrCreate()

# Load your data into a DataFrame
df = spark.read.csv("dataforFP.csv", header=True, inferSchema=True)

24/11/04 13:12:27 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## TF-IDF 

In [4]:
from pyspark.ml.feature import StringIndexer, Tokenizer, StopWordsRemover, HashingTF, IDF, PCA, Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import col

In [36]:
# Step 1: Convert LISTINGID to a unique integer index
indexer = StringIndexer(inputCol="LISTINGID", outputCol="listing_id_index")
df_indexed = indexer.fit(df).transform(df)

# Step 2: Tokenize the TITLE column
tokenizer = Tokenizer(inputCol="TITLE", outputCol="words")
words_data = tokenizer.transform(df_indexed)

                                                                                

In [39]:
words_data.show(10)

+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+
|           CREATEDAT|              USERID|           LISTINGID|               TITLE|listing_id_index|               words|
+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+
|2024-10-03 00:03:...|1027633c-bbf0-443...|00903e30-deb2-4d3...|       Funko Arlong |          1633.0|     [funko, arlong]|
|2024-10-03 00:55:...|b7589547-dace-456...|83fdbd98-78a9-439...|Funko SSGSS Veget...|           417.0|[funko, ssgss, ve...|
|2024-10-03 02:04:...|70b74c2f-9648-4ec...|82c2bb33-3bcf-4dd...|    Funko Darth Maul|          1680.0|[funko, darth, maul]|
|2024-10-03 00:42:...|9526ea4f-6281-462...|5cdae714-aa07-4f4...|Funko Tanjiro sig...|           934.0|[funko, tanjiro, ...|
|2024-10-03 02:10:...|1c3ec79c-d28f-4ba...|5cdae714-aa07-4f4...|Funko Tanjiro sig...|           934.0|[funko, tanjiro, ...|
|2024-10

In [40]:
# Step 3: Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_data = remover.transform(words_data)

In [41]:
# Step 4: Apply TF-IDF
hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures", numFeatures=2000)
featurized_data = hashingTF.transform(filtered_data)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
tfidf_data = idf_model.transform(featurized_data)


                                                                                

In [43]:
tfidf_data.printSchema()

root
 |-- CREATEDAT: timestamp (nullable = true)
 |-- USERID: string (nullable = true)
 |-- LISTINGID: string (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- listing_id_index: double (nullable = false)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)



## Applying PCA

In [31]:
# Step 5: Apply SVD (try pca first)
pca = PCA(k=50, inputCol="features", outputCol="pcaFeatures")  # Adjust k as needed
pca_model = pca.fit(tfidf_data)
pca_data = pca_model.transform(tfidf_data)

In [32]:
# Calculate total variance of the original features
total_variance = tfidf_data.select("features").rdd \
    .map(lambda row: Vectors.dense(row["features"].toArray()).var()) \
    .sum()

# Calculate variance explained by each principal component
explained_variance = []
k = 50
for i in range(k):
    component_variance = pca_data.select("pcaFeatures") \
        .rdd.map(lambda row: row["pcaFeatures"][i] ** 2).mean()
    explained_variance.append(component_variance / total_variance)

# Display the explained variance ratio for each component
for i, variance in enumerate(explained_variance):
    print(f"Explained variance by PC {i + 1}: {variance:.4f}")

                                                                                

Explained variance by PC 1: 0.0006
Explained variance by PC 2: 0.0003
Explained variance by PC 3: 0.0002
Explained variance by PC 4: 0.0002
Explained variance by PC 5: 0.0002
Explained variance by PC 6: 0.0002
Explained variance by PC 7: 0.0002
Explained variance by PC 8: 0.0002
Explained variance by PC 9: 0.0002
Explained variance by PC 10: 0.0002
Explained variance by PC 11: 0.0002
Explained variance by PC 12: 0.0002
Explained variance by PC 13: 0.0002
Explained variance by PC 14: 0.0002
Explained variance by PC 15: 0.0002
Explained variance by PC 16: 0.0002
Explained variance by PC 17: 0.0002
Explained variance by PC 18: 0.0002
Explained variance by PC 19: 0.0002
Explained variance by PC 20: 0.0002
Explained variance by PC 21: 0.0001
Explained variance by PC 22: 0.0001
Explained variance by PC 23: 0.0001
Explained variance by PC 24: 0.0001
Explained variance by PC 25: 0.0001
Explained variance by PC 26: 0.0001
Explained variance by PC 27: 0.0001
Explained variance by PC 28: 0.0001
E

                                                                                

In [52]:
# Step 6: Normalize data for KNN
normalizer = Normalizer(inputCol="pcaFeatures", outputCol="normFeatures")
normalized_data = normalizer.transform(pca_data)

# Step 7: Perform K-Means Clustering as an alternative to KNN
kmeans = KMeans(featuresCol="normFeatures", k=45)  # Adjust k based on data size
model = kmeans.fit(normalized_data)
predictions = model.transform(normalized_data)

                                                                                

In [53]:
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator(featuresCol="normFeatures", metricName="silhouette", distanceMeasure="squaredEuclidean")
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette}")



Silhouette Score: 0.40533884963072436


                                                                                

I experimented with various numFeatures values for TF-IDF (ranging from 500 to 2000) and tested different PCA dimensions (from 50 to 150). Despite these adjustments, the explained variance per principal component never exceeded 0.0006. When running KNN, I observed that the Silhouette score only improved when the number of neighbors (K) matched the number of PCA dimensions. This suggests that the variance is distributed thinly across many dimensions, which is typical in sparse text data where each word appears in only a small subset of documents. In this context, each PCA component captures only a small fraction of the total variance, leading to low explained variance in each dimension.

The nature of my data likely contributes to this challenge: the titles are short and often consist of character names from anime or movies, with limited symbolic or contextual overlap between them. As a result, text data may not be the most effective dimension for my analysis. Moving forward, I plan to focus on images to explore whether this modality yield better results.

## Images

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Images_associate").getOrCreate()

# Load your data into a DataFrame
df = spark.read.parquet("image_features_with_metadata.parquet")

24/11/04 16:32:30 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.ml.linalg import Vectors

# Step 1: Define the feature columns
feature_columns = [str(i) for i in range(2048)]  # All columns from '0' to '2047'

# Step 2: Fill null values with 0.0 in all feature columns
df_filled = df.fillna(0.0, subset=feature_columns)

# Step 3: Use VectorAssembler to combine these columns into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features_vector")
df_vectorized = assembler.transform(df_filled)

In [5]:
df_vectorized.select("features_vector").show(10)

24/11/04 16:32:46 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.




+--------------------+
|     features_vector|
+--------------------+
|[147.0,121.0,106....|
|[190.0,255.0,19.0...|
|[130.0,243.0,17.0...|
|[190.0,255.0,19.0...|
|[147.0,121.0,106....|
|[190.0,255.0,19.0...|
|[190.0,255.0,19.0...|
|[147.0,121.0,106....|
|[190.0,255.0,19.0...|
|[147.0,121.0,106....|
+--------------------+
only showing top 10 rows



                                                                                

In [7]:
df_vectorized = df_vectorized.na.drop(subset=["features_vector"])

In [8]:
# Step 4: Apply PCA on the combined vector column
pca = PCA(k=50, inputCol="features_vector", outputCol="pca_features")  # Adjust 'k' as needed
pca_model = pca.fit(df_vectorized)
df_with_pca = pca_model.transform(df_vectorized)

                                                                                

In [9]:
df_with_pca.select("pca_features").show(10)

24/11/04 16:36:54 WARN DAGScheduler: Broadcasting large task binary with size 1507.3 KiB


                                                                                

24/11/04 16:36:55 WARN DAGScheduler: Broadcasting large task binary with size 1507.3 KiB


[Stage 24:>                                                         (0 + 4) / 4]

+--------------------+
|        pca_features|
+--------------------+
|[-5715.4713731739...|
|[-6035.3505252281...|
|[-5935.7142382889...|
|[-6035.3505252281...|
|[-5715.4713731739...|
|[-6035.3505252281...|
|[-6035.3505252281...|
|[-5715.4713731739...|
|[-6035.3505252281...|
|[-5715.4713731739...|
+--------------------+
only showing top 10 rows



                                                                                

In [10]:
normalizer = Normalizer(inputCol="pca_features", outputCol="norm_features")
normalized_data = normalizer.transform(df_with_pca)

In [11]:
# Step 6: Perform K-Means Clustering as an alternative to KNN
kmeans = KMeans(featuresCol="norm_features", k=30)  # Adjust k based on data size
model = kmeans.fit(normalized_data)
predictions = model.transform(normalized_data)

24/11/04 16:37:03 WARN DAGScheduler: Broadcasting large task binary with size 1566.9 KiB


                                                                                

24/11/04 16:37:44 WARN DAGScheduler: Broadcasting large task binary with size 1567.6 KiB
24/11/04 16:37:44 WARN DAGScheduler: Broadcasting large task binary with size 1568.2 KiB
24/11/04 16:37:44 WARN DAGScheduler: Broadcasting large task binary with size 1568.6 KiB
24/11/04 16:37:44 WARN DAGScheduler: Broadcasting large task binary with size 1568.6 KiB
24/11/04 16:37:45 WARN DAGScheduler: Broadcasting large task binary with size 1569.0 KiB
24/11/04 16:37:45 WARN DAGScheduler: Broadcasting large task binary with size 1569.5 KiB
24/11/04 16:37:45 WARN DAGScheduler: Broadcasting large task binary with size 1568.8 KiB
24/11/04 16:37:46 WARN DAGScheduler: Broadcasting large task binary with size 1568.8 KiB
24/11/04 16:37:46 WARN DAGScheduler: Broadcasting large task binary with size 1568.8 KiB
24/11/04 16:37:46 WARN DAGScheduler: Broadcasting large task binary with size 1568.8 KiB
24/11/04 16:37:46 WARN DAGScheduler: Broadcasting large task binary with size 1568.8 KiB
24/11/04 16:37:46 WAR



24/11/04 16:38:10 WARN DAGScheduler: Broadcasting large task binary with size 1942.4 KiB


                                                                                

In [12]:
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator(featuresCol="norm_features", metricName="silhouette", distanceMeasure="squaredEuclidean")
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette}")

24/11/04 16:38:11 WARN DAGScheduler: Broadcasting large task binary with size 1597.8 KiB


                                                                                

24/11/04 16:38:32 WARN DAGScheduler: Broadcasting large task binary with size 1625.9 KiB




Silhouette Score: 0.038786311244633075


                                                                                

- after running this analysis, I am not sure if this is a good idea for my project because this method is really computational intensive and not sure, but the result isn't as good as I thought. Find the best k value can take forever becasue the data is too large to be processed. The prediction evaluation isn't showing this is a really promissing method either, so I am just gonna pause here for my exploration and move on for collaborative filtering. 