# Exploration and Feature Engineering using Amazon Dataset in Apache Spark

**Apache Spark** is a distributed computing system known for its performance in big data processing. Its in-memory computation significantly speeds up data processing tasks, particularly the iterative operations common in machine learning.

Spark offers high-level APIs in Java, Scala, Python, and R, which makes it accessible to a broad audience of developers and data scientists. The framework simplifies the development of complex parallel applications and includes built-in modules for SQL, streaming data, machine learning, and graph processing.

In [1]:
# Boilerplate
# Set up the cluster on DSMLP (Data Science and Machine Learning Platform)
PID = 'A17017372' 
%load_ext autoreload
%autoreload 2
import os
import getpass

from pyspark.sql import SparkSession
from utilities import SEED
from utilities import PA2Test
from utilities import PA2Data
from utilities import data_cat
from pa2_main import PA2Executor
import time

import pyspark.ml as M
import pyspark.sql.functions as F
import pyspark.sql.types as T



**SparkSession** is the entry point for working with DataFrame and SQL functionality in Apache Spark.

In [4]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--py-files utilities.py,assignment2.py \
--deploy-mode client \
pyspark-shell'

INPUT_FORMAT = 'dataframe'

class args:
    review_filename = data_cat.review_filename
    product_filename = data_cat.product_filename
    product_processed_filename = data_cat.product_processed_filename
    ml_features_train_filename = data_cat.ml_features_train_filename
    ml_features_test_filename = data_cat.ml_features_test_filename
    output_root = '/home/{}/{}-pa2/test_results'.format(getpass.getuser(), PID)
    test_results_root = data_cat.test_results_root
    pid = PID

pa2 = PA2Executor(args, input_format=INPUT_FORMAT)
data_io = pa2.data_io
data_dict = pa2.data_dict

# Bring the part_1 datasets to memory and de-cache part_2 datasets. 
# Execute this once before you start working on this Part
data_dict, _ = data_io.cache_switch(data_dict, 'part_1')

23/12/28 01:26:57 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext should be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServ

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:89)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:650)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


## DataFrame Library 

#### Python API for Apache Spark

`pyspark.sql.functions` (imported as `F`) contains functions for manipulating data in DataFrames: string manipulation, date arithmetic, common mathematical operations, and more complex operations such as window functions.

`pyspark.ml` (imported as `M`) is Spark's machine learning library. It provides machine learning algorithms, feature transformers, and utilities for constructing ML pipelines, training models, and making predictions.

`pyspark.sql.types` (imported as `T`) provides classes that describe the types of data in DataFrames, such as string, integer, float, and array, as well as more complex types like `StructType`. These types are used to define DataFrame schemas.

In [4]:
import pyspark.ml as M
import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline

from pyspark.sql.functions import col, avg, count, udf, when, size, lit, explode
from pyspark.sql.functions import map_keys, map_values, lower, split

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, FloatType

## Dataset Schema

In [5]:
product_data = data_dict['product']
product_data.printSchema()

root
 |-- asin: string (nullable = true)
 |-- salesRank: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)
 |-- categories: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)
 |-- price: float (nullable = true)
 |-- related: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = true)



In [11]:
print("Total number of products in dataset: " + str(product_data.count()))

Total number of products in dataset: 9430000


In [6]:
review_data = data_dict['review']
review_data.printSchema()

root
 |-- reviewerID: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- overall: float (nullable = true)



In [12]:
print("Total number of reviews in dataset: " + str(review_data.count()))

Total number of reviews in dataset: 57873997


In [50]:
product_processed_data = data_dict['product_processed']
product_processed_data.printSchema()

root
 |-- asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- category: string (nullable = true)



In [51]:
print("Total number of products in processed dataset: " + str(product_processed_data.count()))

Total number of products in processed dataset: 9430000


## Merging Reviews with Product Details

In [7]:
joined_df = product_data.join(review_data, "asin", "left")
    
ratings_df = joined_df.groupBy("asin").agg(
    avg("overall").alias("meanRating"),
    count("overall").alias("countRating")
)

summary_stats = ratings_df.describe("meanRating", "countRating").toPandas()

numZeros_countRating = ratings_df.filter(col("countRating") == 0).count()

filtered_df = ratings_df.filter(col("countRating") != 0)

In [10]:
summary_stats

  summary         meanRating         countRating
0   count          7456220.0           9430000.0
1    mean  4.151314482625535   5.885972852598091
2  stddev  1.062890526084599  45.849015609871294
3     min                1.0                 0.0
4     max                5.0             17817.0


In [8]:
ratings_df.show()

+----------+------------------+-----------+
|      asin|        meanRating|countRating|
+----------+------------------+-----------+
|0000032034|3.6666666666666665|          3|
|0000095699|               3.0|          1|
|0000143502|              null|          0|
|0000143529|              null|          0|
|0001048775|              null|          0|
|0001064487| 4.666666666666667|          3|
|0001203088|               5.0|          1|
|0001380877|               5.0|          1|
|0001384163|               5.0|          1|
|0001384198|               5.0|          1|
|0001720252|               2.0|          1|
|0001837192|              null|          0|
|0001845357|               4.0|          1|
|0001850164|              3.75|          4|
|0001856871|               3.0|          1|
|0001857169|               5.0|          1|
|0001923331|               5.0|          1|
|0001956752|               5.0|          1|
|000195783X|              null|          0|
|0001982796|               5.0| 

In [15]:
print(str(numZeros_countRating) + " products do not have any reviews.")

1973780 products do not have any reviews.
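
The numbers above are consistent with each other: 9,430,000 products minus 1,973,780 unreviewed products leaves 7,456,220, which is exactly the `count` reported for `meanRating`. The plain-Python sketch below mirrors what the left join followed by `avg` and `count` computes, without needing a cluster. The ASINs and ratings are hypothetical toy data, and `rate_products` is an illustrative helper, not part of the notebook.

```python
from collections import defaultdict

# Hypothetical toy data; the ASINs and ratings are made up for illustration.
products = ["A1", "A2", "A3"]
reviews = [("A1", 5.0), ("A1", 4.0), ("A3", 3.0)]  # (asin, overall)


def rate_products(products, reviews):
    """Mimic a left join on asin followed by avg/count per product."""
    ratings_by_asin = defaultdict(list)
    for asin, overall in reviews:
        ratings_by_asin[asin].append(overall)

    result = {}
    for asin in products:
        ratings = ratings_by_asin.get(asin, [])
        # count skips missing ratings, so unreviewed products get 0;
        # an average over no values is undefined, shown here as None (null in Spark).
        mean = sum(ratings) / len(ratings) if ratings else None
        result[asin] = (mean, len(ratings))
    return result


ratings = rate_products(products, reviews)
num_unreviewed = sum(1 for _, n in ratings.values() if n == 0)
```

Because the join is a left join from the product table, every product keeps a row even when it has no reviews, which is why `countRating` has a minimum of 0 while `meanRating` has fewer non-null entries.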


## Flattening Categories [Array of Arrays] into Columns

In [16]:
df_with_category = product_data.withColumn(
    "category",
    F.when(F.size(F.col("categories")) > 0, F.col("categories")[0][0])
    .otherwise(F.lit(None))
)

# Treat empty-string categories as missing
df_with_category = df_with_category.withColumn(
    "category",
    F.when(F.col("category") != "", F.col("category")).otherwise(F.lit(None))
)

df_with_cat_sales = df_with_category.withColumn(
    "bestSalesCategory",
    when(col("salesRank").isNull(), lit(None))
    .otherwise(map_keys(col("salesRank"))[0])
).withColumn(
    "bestSalesRank",
    when(col("salesRank").isNull(), lit(None))
    .otherwise(map_values(col("salesRank"))[0])
)

In [25]:
df_with_category.select(col("asin"), col("category")).show()

+----------+--------------------+
|      asin|            category|
+----------+--------------------+
|B00I8HVV6E|      Home & Kitchen|
|B00I8KEOTM|    Apps for Android|
|B00I8KCW4G|Clothing, Shoes &...|
|B00I8JKCQW|Clothing, Shoes &...|
|B00I8JKI8E|Clothing, Shoes &...|
|B00I8JEVO6|   Sports & Outdoors|
|B00I8K312I|              Beauty|
|B00I8K9A3M|               Books|
|B00I8KDKHO|          Automotive|
|B00I8JGBIU|        Toys & Games|
|B00I8H5FQG|Amazon Instant Video|
|B00I8KGXLY|Clothing, Shoes &...|
|B00I8JPBB8|      Home & Kitchen|
|B00I8KFRL6|Health & Personal...|
|B00I8IU9QQ|    Apps for Android|
|B00I8KF7ZW|Clothing, Shoes &...|
|B00I8JMBKM|Clothing, Shoes &...|
|B00I8I50EM|               Books|
|B00I8JEHJ0|Cell Phones & Acc...|
|B00I8JIDJ0|Clothing, Shoes &...|
+----------+--------------------+
only showing top 20 rows



In [27]:
# Filter out rows where 'category' is null
df_filtered = df_with_cat_sales.filter(F.col("category").isNotNull())

# Now count distinct values in the 'category' column
countDistinct_category = df_filtered.select("category").distinct().count()

# Count nulls in bestSalesCategory
numNulls_bestSalesCategory = df_with_cat_sales.filter(F.col("bestSalesCategory").isNull()).count()

df_filtered_sales_cat = df_with_cat_sales.filter(F.col("bestSalesCategory").isNotNull())

# Count distinct values in bestSalesCategory, excluding nulls
countDistinct_bestSalesCategory = df_filtered_sales_cat.select("bestSalesCategory").distinct().count()

In [29]:
print(str(countDistinct_category) + " distinct categories in products dataset.")

82 distinct categories in products dataset.
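
To make the flattening rules concrete, here is a small plain-Python sketch of the same per-row logic: take `categories[0][0]`, treat empty strings as missing, and take the first key and value of the `salesRank` map. The helper names `first_category` and `best_sales` and the sample values are hypothetical, and a Python `dict` keeps insertion order, which only approximates how Spark orders map entries.

```python
def first_category(categories):
    """Return categories[0][0], or None when it is missing or empty."""
    if not categories or not categories[0]:
        return None
    first = categories[0][0]
    return first if first else None


def best_sales(sales_rank):
    """Return the first (category, rank) entry of the map, or (None, None)."""
    if not sales_rank:
        return (None, None)
    category, rank = next(iter(sales_rank.items()))
    return (category, rank)


# Hypothetical rows for illustration.
print(first_category([["Books", "Fiction"], ["Kindle Store"]]))
print(first_category([[""]]))
print(best_sales({"Music": 12}))
print(best_sales(None))
```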


## Flattening Related key-value Pair into Columns

Each entry of the `related` column is a map with four keys/attributes: `also_bought`, `also_viewed`, `bought_together`, and `buy_after_viewing`. The value of each key is an array of product IDs. We call these attribute arrays.

Goal: Calculate the mean price of all products in the `also_viewed` attribute array and place it into a new column `meanPriceAlsoViewed`. The number of products in the array goes into a second column, `countAlsoViewed`.

In [34]:
# Inputs:
asin_column = 'asin'
price_column = 'price'
attribute = 'also_viewed'
related_column = 'related'
# Outputs:
meanPriceAlsoViewed_column = 'meanPriceAlsoViewed'
countAlsoViewed_column = 'countAlsoViewed'

# Define column references from the DataFrame
asin_col = F.col(asin_column)  # Reference to the 'asin' column
price_col = F.col(price_column)  # Reference to the 'price' column
attribute_col = F.col(attribute)  # Reference to a column named 'also_viewed' (unused below; the array is read via related_col.getItem)
related_col = F.col(related_column)  # Reference to the 'related' column

# Explode the 'also_viewed' array inside 'related' so each referenced product ID gets its own row
data_exploded = product_data.select(
    asin_col,
    F.explode(related_col.getItem(attribute)).alias('asinSK')
).alias('data_exploded')

# Join the exploded data with the original product data on matching ASIN values
data_joined = data_exploded.join(
    product_data.alias('product_data'),
    F.col('data_exploded.asinSK') == F.col('product_data.asin'),
    how='left'
).drop(F.col('product_data.asin'))  # Drop the duplicate 'asin' column from the join result

# Aggregate data by ASIN to calculate mean price and count of also viewed products
data_agg = data_joined.groupBy(asin_column).agg(
    F.mean(F.when(price_col.isNotNull(), price_col)).alias(meanPriceAlsoViewed_column),
    F.count(F.col('asinSK')).alias(countAlsoViewed_column)
)

# Join the aggregated data back to the original product data
product_data_output = product_data.join(data_agg, [asin_column], how='left')

# Prepare functions for further aggregation - mean, variance, and count of nulls
fns = []
for column in [meanPriceAlsoViewed_column, countAlsoViewed_column]:
    fns.append(F.mean(F.when(F.col(column).isNotNull(), F.col(column))).alias(f'mean_{column}'))
    fns.append(F.variance(F.when(F.col(column).isNotNull(), F.col(column))).alias(f'variance_{column}'))
    fns.append(F.sum(F.col(column).isNull().cast('integer')).alias(f'numNulls_{column}'))

# Apply the prepared functions and collect the first row of the result
rr = product_data_output.select(fns).collect()[0]

In [39]:
product_data_output.select(col("asin"),col("meanPriceAlsoViewed"),col("countAlsoViewed")).show()

+----------+-------------------+---------------+
|      asin|meanPriceAlsoViewed|countAlsoViewed|
+----------+-------------------+---------------+
|0195338103|               null|           null|
|0465014984|               null|           null|
|0465015638|               null|           null|
|0465016073|               null|           null|
|0465016537|               null|           null|
|0806525312|  12.92333337995741|             11|
|0883686341|               null|           null|
|0883686600|               null|           null|
|1590950305|               null|           null|
|1844487229|               null|           null|
|3540041303|               null|           null|
|3540211322|               null|           null|
|3540221891|               null|              1|
|B00000DC10|               null|           null|
|B00000DC4R| 13.861428601401192|             10|
|B00005U14C|               null|           null|
|B000A4HAFE|               null|           null|
|B000E7DMU4| 25.9820
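
The output above contains a row with a null `meanPriceAlsoViewed` but a `countAlsoViewed` of 1. That happens when a product has an `also_viewed` entry whose price is unknown. The plain-Python sketch below reproduces the explode, self-join, and aggregate steps on a hypothetical toy catalogue; `also_viewed_stats` and the product IDs are illustrative names, not part of the notebook.

```python
# Hypothetical toy catalogue: asin -> (price, also_viewed list or None).
catalogue = {
    "P1": (10.0, ["P2", "P3", "P9"]),  # P9 is not in the catalogue
    "P2": (20.0, ["P3"]),
    "P3": (None, None),                # no price, no also_viewed array
}


def also_viewed_stats(catalogue):
    """Return asin -> (mean price of also-viewed products, count of them)."""
    stats = {}
    for asin, (_, also_viewed) in catalogue.items():
        if not also_viewed:
            # Nothing to explode, so the final left join leaves both values null.
            stats[asin] = (None, None)
            continue
        # Look up each referenced product; unknown or unpriced ones give None.
        prices = [catalogue.get(ref, (None, None))[0] for ref in also_viewed]
        known = [p for p in prices if p is not None]
        mean_price = sum(known) / len(known) if known else None
        # The count covers every referenced ID, priced or not.
        stats[asin] = (mean_price, len(also_viewed))
    return stats


stats = also_viewed_stats(catalogue)
```

Here `P2` ends up with `(None, 1)`: one also-viewed product was referenced, but its price is missing, so the mean has nothing to average.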

## Mean/Median Data Imputation

The product table contains many nulls, as the outputs above show. We impute them with meaningful values so that the columns can be used to train machine learning models.

In [43]:
meanImputedPrice_column = 'meanImputedPrice'
medianImputedPrice_column = 'medianImputedPrice'
unknownImputedTitle_column = 'unknownImputedTitle'


# Cast the 'price' column to float type
product_data = product_data.withColumn("price", product_data["price"].cast("float"))

#### Impute Means
# Calculate the mean of non-null values in the 'price' column
mean_price = product_data.select(avg(col("price")).alias("mean")).collect()[0]["mean"]

# Impute null values in the 'price' column with the mean value
product_data = product_data.withColumn(meanImputedPrice_column, F.when(F.col(price_column).isNull(), mean_price).otherwise(F.col(price_column)))


#### Impute Medians
# Calculate median
median_price = product_data.approxQuantile(price_column, [0.5], 0.001)[0]
# Impute null values with median
product_data = product_data.withColumn(medianImputedPrice_column, F.when(F.col(price_column).isNull(), median_price).otherwise(F.col(price_column)))


# Impute nulls and empty strings in the 'title' column
product_data = product_data.withColumn(
    unknownImputedTitle_column,
    F.when(
        (F.col("title").isNull()) | (F.col("title") == ''), F.lit('unknown')
    ).otherwise(F.col("title"))
)

In [46]:
product_data.select(col("asin"), col("meanImputedPrice"), col("medianImputedPrice")).show()

+----------+------------------+------------------+
|      asin|  meanImputedPrice|medianImputedPrice|
+----------+------------------+------------------+
|B00I8HVV6E|27.989999771118164|27.989999771118164|
|B00I8KEOTM| 34.93735609456491|14.989999771118164|
|B00I8KCW4G| 41.95000076293945| 41.95000076293945|
|B00I8JKCQW| 34.93735609456491|14.989999771118164|
|B00I8JKI8E|24.989999771118164|24.989999771118164|
|B00I8JEVO6| 34.93735609456491|14.989999771118164|
|B00I8K312I|              58.0|              58.0|
|B00I8K9A3M| 34.93735609456491|14.989999771118164|
|B00I8KDKHO| 34.93735609456491|14.989999771118164|
|B00I8JGBIU| 5.489999771118164| 5.489999771118164|
|B00I8H5FQG| 34.93735609456491|14.989999771118164|
|B00I8KGXLY|              30.0|              30.0|
|B00I8JPBB8| 34.93735609456491|14.989999771118164|
|B00I8KFRL6|15.989999771118164|15.989999771118164|
|B00I8IU9QQ| 34.93735609456491|14.989999771118164|
|B00I8KF7ZW| 34.93735609456491|14.989999771118164|
|B00I8JMBKM| 34.93735609456491|
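
In the output above the imputed mean (about 34.94) is more than twice the imputed median (about 14.99), which suggests the price distribution is skewed by a small number of expensive products. The plain-Python sketch below shows the same effect on hypothetical prices. Note that `statistics.median` computes an exact median, whereas `approxQuantile` in the notebook is approximate within the requested relative error; `impute` is an illustrative helper.

```python
from statistics import mean, median

# Hypothetical prices; None marks a missing value.
prices = [5.0, 10.0, None, 15.0, None, 170.0]


def impute(values, fill):
    """Replace missing entries with the given fill value."""
    return [fill if v is None else v for v in values]


# Both statistics are computed over the observed (non-null) prices only.
observed = [p for p in prices if p is not None]
mean_price = mean(observed)
median_price = median(observed)

mean_imputed = impute(prices, mean_price)
median_imputed = impute(prices, median_price)
```

With one outlier at 170.0, the mean is pulled up to 50.0 while the median stays at 12.5, so the two imputed columns differ substantially for rows that were missing a price.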

## Embed `title` with word2vec

I transform `title` into a fixed-length vector via word2vec, which captures semantic relationships between words by mapping each word to a numerical vector. Words with similar meanings have vectors that are close to each other in the vector space.
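
As a rough illustration of how per-word vectors can become one fixed-length vector per title, the sketch below averages the vectors of the tokens in a title. Spark's documentation describes the Word2Vec transform as averaging the word vectors of a document; this sketch averages only the tokens it knows, which is a simplifying assumption and may differ from Spark's exact handling of out-of-vocabulary tokens. The two-dimensional `embeddings` and the `title_vector` helper are hypothetical.

```python
# Hypothetical 2-dimensional embeddings; real ones come from a fitted model.
embeddings = {"piano": [1.0, 0.0], "sonata": [0.0, 1.0]}


def title_vector(tokens, embeddings, dim):
    """Average the vectors of known tokens; return a zero vector if none are known."""
    known = [embeddings[t] for t in tokens if t in embeddings]
    if not known:
        return [0.0] * dim
    # zip(*known) groups the i-th component of every vector together.
    return [sum(component) / len(known) for component in zip(*known)]


vec = title_vector(["piano", "sonata", "xyz"], embeddings, dim=2)
```

Whatever the title length, the result always has `dim` components, which is what makes it usable as a feature column for downstream models.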

In [52]:
# Lowercase each title and split it on spaces into an array of tokens
product_processed_data = product_processed_data.withColumn(
    "titleArray", split(lower(col("title")), " ")
)

# Configure the word2vec model

word2Vec = Word2Vec(
    vectorSize=16,  # Dimension of the word embedding
    minCount=100,   # Minimum number of times a token must appear
    numPartitions=4,  # Number of partitions
    seed=102,       # Random seed
    inputCol="titleArray",  # Input column
    outputCol="wordVectors"  # Output column
)

model = word2Vec.fit(product_processed_data)


[Row(word='solos', similarity=0.9661991596221924), Row(word='instrumental', similarity=0.9660047888755798), Row(word='orchestra,', similarity=0.9640913605690002), Row(word='suites', similarity=0.9631103873252869), Row(word='quintet', similarity=0.9624511003494263), Row(word='transcriptions', similarity=0.95783931016922), Row(word='organ', similarity=0.9576882719993591), Row(word='accompaniment', similarity=0.9571981430053711), Row(word='mozart,', similarity=0.9547368288040161), Row(word='chopin:', similarity=0.9536173939704895)]
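
Several tokens in the output above carry punctuation (`orchestra,`, `mozart,`, `chopin:`). That is a consequence of splitting lowercased titles on a single space without stripping punctuation. The plain-Python sketch below mimics that tokenization and the effect of a minimum-count threshold on the vocabulary; the titles, the `tokenize` and `vocabulary` helpers, and the threshold of 2 are hypothetical.

```python
from collections import Counter

# Hypothetical titles for illustration.
titles = ["Mozart, Piano Sonatas", "Piano Concertos", "mozart piano"]


def tokenize(title):
    """Lowercase and split on single spaces, keeping punctuation attached."""
    return title.lower().split(" ")


def vocabulary(titles, min_count):
    """Keep only tokens that appear at least min_count times across all titles."""
    counts = Counter(tok for t in titles for tok in tokenize(t))
    return {tok for tok, n in counts.items() if n >= min_count}


tokens = tokenize("Mozart, Piano Sonatas")
vocab = vocabulary(titles, min_count=2)
```

In this toy data `mozart,` and `mozart` are counted as different tokens, so neither reaches the threshold. Stripping punctuation before splitting would merge them, at the cost of changing the vocabulary the model is trained on.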


In [57]:
model.findSynonymsArray('mad', 10)

[('bigfoot', 0.9636884927749634),
 ('ghost', 0.9486820697784424),
 ('dead', 0.9330147504806519),
 ('mccoy', 0.9303552508354187),
 ('man', 0.9302895665168762),
 ('bighorn', 0.9282985329627991),
 ('masked', 0.9274529218673706),
 ('soldier', 0.9248751997947693),
 ('chase', 0.9230467677116394),
 ('rising', 0.9200738668441772)]
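
The similarity scores returned with each synonym are cosine similarities between word vectors. The sketch below shows how such a ranking can be computed in plain Python. The toy `embeddings` and the `nearest` helper are hypothetical and are not a reproduction of Spark's internal implementation.

```python
import math


def cosine_similarity(u, v):
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    if norm_u == 0.0 or norm_v == 0.0:
        return 0.0
    return dot / (norm_u * norm_v)


def nearest(word, embeddings, k):
    """Rank all other words by cosine similarity to the given word."""
    target = embeddings[word]
    scored = [
        (other, cosine_similarity(target, vec))
        for other, vec in embeddings.items()
        if other != word
    ]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Hypothetical 2-dimensional embeddings for illustration.
embeddings = {"a": [1.0, 0.0], "b": [2.0, 0.0], "c": [0.0, 1.0]}
top = nearest("a", embeddings, k=2)
```

Cosine similarity depends only on direction, not magnitude, so `a` and `b` score 1.0 here even though their lengths differ. Scores near 1, like those in the output above, mean the vectors point in almost the same direction.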