## Amazon Review Data : Data Exploration

### Setup spark

In [1]:
# Import libraries
import os, pickle, glob
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.cm as cm

from pyspark.sql import SparkSession, SQLContext, DataFrame
from pyspark.sql.types import StringType
import pyspark.sql.functions as F

from pyspark.ml import Pipeline
from pyspark.ml.stat import Summarizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import Word2Vec, Tokenizer, StringIndexer, OneHotEncoder, PCA, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.sql.functions import col, split, lower
from pyspark.ml.feature import StopWordsRemover, Word2Vec
from pyspark.sql.functions import explode, col

from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

Matplotlib created a temporary cache directory at /tmp/matplotlib-kasx3r5b because the default path (/home/jovyan/.cache/matplotlib) is not a writable directory; it is highly recommended to set the MPLCONFIGDIR environment variable to a writable directory, in particular to speed up the import of Matplotlib and to better support multiprocessing.


In [2]:
sc = SparkSession.builder \
    .config("spark.driver.memory", "128g") \
	.config("spark.executor.memory", "64g") \
    .config('spark.executor.instances', 64) \
	.appName("Amazon Reviews") \
	.getOrCreate()
#.config("spark.kryoserializer.buffer.max", "64g") \

In [3]:
sqlContext = SQLContext(sc)



## Read Data

### Get files

In [4]:
#######################################################################################
###################### change path to: "../clin6/amazon_data" #########################
#######################################################################################
# path = "../clin6/amazon_data"
path = "amazon_data"

In [5]:
def read(path):
    """
    Method that loads data file as df
    Takes in 1 parameter: path
    """
    return sc.read.csv(path, sep = "\t", header = True, inferSchema = True)

def get_path(file):
    """
    Method to create path
    Takes in 1 parameter: file name
    """
    return "amazon_data/%s" % file

In [6]:
dir = os.listdir(path)
files = [f for f in dir if os.path.isfile(os.path.join(path, f))]

# Data Preprocessing
* Finish major preprocessing, this includes scaling and/or transforming your data, imputing your data, encoding your data, feature expansion, Feature expansion (example is taking features and generating new features by transforming via polynomial, log multiplication of features).

In [7]:
product_category_column = 'product_category'
review_body_column = 'review_body'
review_date_column = 'review_date'
title_column = 'product_title'
category_column = 'product_category'
product_parent_column = 'product_parent'
review_body_column = 'review_body'
verified_purchase_column = 'verified_purchase'

product_category_col = F.col(product_category_column)
review_body_col = F.col(review_body_column)
review_date_col = F.col(review_date_column)
title_col = F.col(title_column)
category_col = F.col(category_column)
product_parent_col = F.col(product_parent_column)
review_body_col = F.col(review_body_column)
verified_purchase_col = F.col(verified_purchase_column)

## Load Data & Take care of missing categories

In [8]:
def get_imputed_df(files, category = True):
    """
    Method that combines files into 1 big df
    Takes in 1 parameter: list of file names
    """
    df = None
    n = len(files)
    categories = {}
    for i in range(n):
        data = read(get_path(files[i]))
        
        # Fill in null categories
        if category:
            cat = files[i][18:-10]
            categories[cat] = i
            data = data.withColumn(product_category_column,
                                   product_category_col).fillna(cat)
        
        if df is None:
            df = data
        else:
            df = df.union(data)
    return df, categories

## Get df & Remove Columns

In [9]:
df, categories = get_imputed_df(files)
df = df.drop('marketplace', 'vine')#.cache()

In [10]:
sqlContext.registerDataFrameAsTable(df, "df")

In [11]:
columns = df.columns
num_cols = len(columns)

## Filter out rows with missing body and date and verified purchase

In [12]:
df = df.filter(review_body_col.isNotNull() & review_date_col.isNotNull())
df = df.filter(verified_purchase_col == True)

In [13]:
df.show(1)

+-----------+-------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----------------+---------------+--------------------+-----------+
|customer_id|    review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|verified_purchase|review_headline|         review_body|review_date|
+-----------+-------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----------------+---------------+--------------------+-----------+
|   36075342|RAB23OVFNCXZQ|B00LPRXQ4Y|     339193102|17" 2003-2006 For...|      Automotive|          1|            0|          0|                Y|As it was used,|As it was used, t...| 2015-08-31|
+-----------+-------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----------------+---------------+--------------------+-----------+
only showing to

In [14]:
# Select the review_date column
review_date_col = F.col('review_date')

# Find the minimum and maximum dates
min_max_dates = df.agg(F.min(review_date_col).alias('min_date'), 
                       F.max(review_date_col).alias('max_date'))

min_max_dates.show()

+----------+----------+
|  min_date|  max_date|
+----------+----------+
|1995-11-09|2015-08-31|
+----------+----------+



## Filter out old data

In [15]:
# Data before 2010 is more irrelevent to forcaste future trends. 
df = df.filter(F.year(review_date_col) >= 2010)
df.show(1)

+-----------+-------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----------------+---------------+--------------------+-----------+
|customer_id|    review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|verified_purchase|review_headline|         review_body|review_date|
+-----------+-------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----------------+---------------+--------------------+-----------+
|   36075342|RAB23OVFNCXZQ|B00LPRXQ4Y|     339193102|17" 2003-2006 For...|      Automotive|          1|            0|          0|                Y|As it was used,|As it was used, t...| 2015-08-31|
+-----------+-------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----------------+---------------+--------------------+-----------+
only showing to

* No more missing values

## Extract month and year

In [16]:
month_column = 'month'
year_column = 'year'

In [17]:
df = df.withColumn(month_column, F.month(review_date_col)).withColumn(year_column, F.year(review_date_col))
df.show(1)

+-----------+-------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----------------+---------------+--------------------+-----------+-----+----+
|customer_id|    review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|verified_purchase|review_headline|         review_body|review_date|month|year|
+-----------+-------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----------------+---------------+--------------------+-----------+-----+----+
|   36075342|RAB23OVFNCXZQ|B00LPRXQ4Y|     339193102|17" 2003-2006 For...|      Automotive|          1|            0|          0|                Y|As it was used,|As it was used, t...| 2015-08-31|    8|2015|
+-----------+-------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----------------+---------------+-----

## Add Sub-category Column

In [18]:
subcategory_mapping = {
    'Major_Appliances': 'Home Essentials',
    'Kitchen': 'Home Essentials',
    'Home_Improvement': 'Home Essentials',
    'Tools': 'Home Essentials',
    'Home': 'Home Essentials',
    'Furniture': 'Home Essentials',
    'Office_Products':'Home Essentials',
    
    'Outdoors': 'Outdoor Living',
    'Home_Entertainment': 'Outdoor Living',
    'Lawn_and_Garden': 'Outdoor Living',
    
    'Video': 'Media',
    'Video_DVD': 'Media',
    'Music': 'Media',
    'Books': 'Media',
    'Video_Games': 'Media',
    'Software': 'Media',
    
    'Digital_Music_Purchase': 'Digital Media',
    'Digital_Video_Download': 'Digital Media',
    'Digital_Ebook_Purchase': 'Digital Media',
    'Digital_Video_Games': 'Digital Media',
    'Digital_Software':'Digital Media',

    'Mobile_Apps': 'Electronics',
    'Wireless': 'Electronics',
    'PC': 'Electronics',
    'Electronics': 'Electronics',
    'Mobile_Electronics': 'Electronics',
    'Camera': 'Electronics',
    
    'Shoes': 'Apparel and Accessories',
    'Watches': 'Apparel and Accessories',
    'Luggage': 'Apparel and Accessories',
    'Apparel': 'Apparel and Accessories',
    
    'Health_Personal_Care': 'Personal Care',
    'Beauty': 'Personal Care',
    'Personal_Care_Appliances': 'Personal Care',
    
    'Sports': 'Entertainment and Leisure',
    'Musical_Instruments': 'Entertainment and Leisure',
    'Toys': 'Entertainment and Leisure',
    'Gift_Card': 'Entertainment and Leisure',
    
    'Baby': 'Family and Living',
    'Automotive': 'Family and Living',
    'Grocery': 'Family and Living',
    'Pet_Products': 'Family and Living',
}

In [19]:
df.columns

['customer_id',
 'review_id',
 'product_id',
 'product_parent',
 'product_title',
 'product_category',
 'star_rating',
 'helpful_votes',
 'total_votes',
 'verified_purchase',
 'review_headline',
 'review_body',
 'review_date',
 'month',
 'year']

# Logistic Regression

In [20]:
# Create binary target variable for star ratings
df = df.withColumn("high_rating", F.when(col("star_rating") >= 4, 1).otherwise(0))

# Define categorical columns and numerical columns
categorical_cols = ['product_category', 'product_title']
numerical_cols = ['helpful_votes', 'total_votes']

In [21]:
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=col+"_index", outputCol=col+"_vec") for col in categorical_cols]

# Assemble features
assembler_inputs = [col+"_vec" for col in categorical_cols] + numerical_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

# Define logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='high_rating')

# Build pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, lr])

In [22]:
# Split data into training and testing sets
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Train the model
model = pipeline.fit(train_df)

# Make predictions
predictions = model.transform(test_df)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol='high_rating')
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

# Show classification report
predictions.select("high_rating", "prediction", "probability").show()

Py4JJavaError: An error occurred while calling o486.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 85.0 failed 1 times, most recent failure: Lost task 0.0 in stage 85.0 (TID 12482) (exp-1-18.expanse.sdsc.edu executor driver): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 67108864
Serialization trace:
_values$mcJ$sp (org.apache.spark.util.collection.OpenHashMap$mcJ$sp). To avoid this, increase spark.kryoserializer.buffer.max value.
	at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:446)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.eval(TypedAggregateExpression.scala:260)
	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.eval(interfaces.scala:594)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:257)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:97)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:33)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:389)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 67108864
Serialization trace:
_values$mcJ$sp (org.apache.spark.util.collection.OpenHashMap$mcJ$sp)
	at com.esotericsoftware.kryo.io.Output.require(Output.java:167)
	at com.esotericsoftware.kryo.io.UnsafeOutput.writeBytes(UnsafeOutput.java:384)
	at com.esotericsoftware.kryo.io.UnsafeOutput.writeLongs(UnsafeOutput.java:331)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$LongArraySerializer.write(DefaultArraySerializers.java:130)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$LongArraySerializer.write(DefaultArraySerializers.java:119)
	at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:442)
	... 23 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1045)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3585)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:3585)
	at org.apache.spark.ml.feature.StringIndexer.countByValue(StringIndexer.scala:204)
	at org.apache.spark.ml.feature.StringIndexer.sortByFreq(StringIndexer.scala:212)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:242)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:145)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 67108864
Serialization trace:
_values$mcJ$sp (org.apache.spark.util.collection.OpenHashMap$mcJ$sp). To avoid this, increase spark.kryoserializer.buffer.max value.
	at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:446)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.eval(TypedAggregateExpression.scala:260)
	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.eval(interfaces.scala:594)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:257)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:97)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:33)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:389)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 67108864
Serialization trace:
_values$mcJ$sp (org.apache.spark.util.collection.OpenHashMap$mcJ$sp)
	at com.esotericsoftware.kryo.io.Output.require(Output.java:167)
	at com.esotericsoftware.kryo.io.UnsafeOutput.writeBytes(UnsafeOutput.java:384)
	at com.esotericsoftware.kryo.io.UnsafeOutput.writeLongs(UnsafeOutput.java:331)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$LongArraySerializer.write(DefaultArraySerializers.java:130)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$LongArraySerializer.write(DefaultArraySerializers.java:119)
	at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:442)
	... 23 more


## Encode Categorical Columns

In [None]:
# titleArray_column = 'titleArray'
# titleVector_column = 'titleVector'

In [None]:
# df = df.withColumn(titleArray_column, F.split(F.lower(F.col(title_column)), ' '))

In [None]:
# word2vec = Word2Vec(inputCol = titleArray_column, outputCol = titleVector_column,
#                     minCount = 100, vectorSize = 16, numPartitions = 4)
# model = word2vec.fit(df)
# df = model.transform(df)

### Change text into vectors / Model 2

In [None]:
review_body_column = 'review_body'
review_date_column = 'review_date'
product_title_column = 'product_title'

reviewArray_column = 'reviewArray'
reviewVector_column = 'reviewVector'

In [None]:
review_body_col = F.col(review_body_column)
review_date_col = F.col(review_date_column)
product_title_col = F.col(product_title_column)

reviewArray_col = F.col(reviewArray_column)
reviewVector_col = F.col(reviewVector_column)

In [None]:
# lowercase and remove stopwords
df = df.withColumn(reviewArray_column, split(lower(product_title_col), ' '))
remover = StopWordsRemover(inputCol=reviewArray_column, outputCol="filtered_review")
df = remover.transform(df)
df.cache()

In [None]:
word2vec = Word2Vec(inputCol = "filtered_review", outputCol = reviewVector_column,
                    minCount = 0, vectorSize = 50)
model = word2vec.fit(df)
df = model.transform(df)

## Dataset Splitting
* Use last year as test and the rest as train

In [None]:
train = df.filter((year(col(review_date_col)) >= 2010) & (year(col(review_date_col)) < 2015))
test = df.filter(year(col(review_date_col)) == 2015)

In [None]:
print(train.count())
print(test.count())

In [None]:

word_freq_df = df.withColumn("word", explode(col("filtered_review")))
word_freq_df = word_freq_df.groupBy("word").count()
popular_words_df = word_freq_df.orderBy(col("count").desc())
popular_words = popular_words_df.select("word").rdd.flatMap(lambda x: x).collect()
word_vectors = model.getVectors().rdd.map(lambda row: (row.word, row.vector)).collectAsMap()


In [None]:
word_vectors_array = []
popularity_data_array = []

for word in popular_words:
    if word in word_vectors:
        word_vectors_array.append(word_vectors[word])
        popularity_data_array.append(word_freq_df.filter(col("word") == word).select("count").first()[0])

# lists to numpy arrays
word_vectors = np.array(word_vectors_array)
popularity_data = np.array(popularity_data_array)

#normalization
scaler = MinMaxScaler(feature_range=(0, 1))
popularity_data_scaled = scaler.fit_transform(popularity_data)

In [None]:
# Prepare the dataset for LSTM
X, y = [], []
time_steps = popularity_data.shape[1]
for i in range(time_steps-1):
    X.append(word_vectors[:, i:i+1])
    y.append(popularity_data_scaled[:, i+1])
X, y = np.array(X), np.array(y)

# Reshape X for LSTM input
X = np.reshape(X, (X.shape[0], X.shape[1], word_vectors.shape[1]))

# Define the LSTM model
model = Sequential()
model.add(LSTM(50, return_sequences=True, input_shape=(X.shape[1], X.shape[2])))
model.add(LSTM(50))
model.add(Dense(1))
model.compile(optimizer='adam', loss='mean_squared_error')

# Train the model
model.fit(X, y, epochs=100, batch_size=32)

# Forecast future popularity
forecast = model.predict(X)
forecast = scaler.inverse_transform(forecast)
print(forecast)


In [None]:
df.first(1)

In [None]:
train_df = train.select(review_date_col, product_category_col)\
                .withColumn("review_date", F.date_format(review_date_col, "yyyy-MM"))\
                .na.drop()

train_df.show(10)

## Get distinct categories

In [None]:
distinct_product_category_num = train_df.select("product_category").distinct()
distinct_product_category_num.count()

## Group by `year` and `month` then count

In [None]:
train_df = train_df.withColumn("year", F.year("review_date"))\
                   .withColumn("month", F.month("review_date"))

## Map to subcategories

In [None]:
broadcast_mapping = sc.sparkContext.broadcast(subcategory_mapping)

def get_subcategory(product_category):
    return broadcast_mapping.value.get(product_category, 'unknown')

subcategory_udf = F.udf(get_subcategory, StringType())

train_df = train_df.withColumn('subcategory', subcategory_udf(product_category_col))

## Get counts per month

In [None]:
count_per_month = train_df.groupBy("year", "month", "subcategory").agg(F.count("*").alias("count"))
df_pd = count_per_month.toPandas()

## Plot Trends
### Set plot styles

In [None]:
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_palette("tab10")

### Plot all sub-category trends

In [None]:
def plot_all_sub(df):
    plt.figure(figsize = (12, 6))
    
    for subcategory in df['subcategory'].unique():
        df_subcat = df[df['subcategory'] == subcategory]
        df_subcat = df_subcat.sort_values(by='year')
        df_subcat = df_subcat.groupby('year')['count'].sum().reset_index()
        plt.plot(df_subcat['year'], df_subcat['count'], label = f'Subcategory {subcategory}')
    
    plt.xlabel('Year')
    plt.ylabel('Count')
    plt.title('Sales count of Subcategories per Year (2010-2014)')
    plt.legend(bbox_to_anchor = (1.05, 1), loc = 'upper left')
    plt.xticks(np.arange(2010, 2015, 1))
    plt.tight_layout()
    return plt

In [None]:
plt = plot_all_sub(df_pd)
#plt.savefig('1.png')

### Plot single sub-category trend

In [None]:
def plot_category(category):
    '''
    Method to plot single category
    Takes in one string parameter: category
    '''
    plt.figure(figsize = (12, 6))
    df = df_pd[df_pd['subcategory'] == category]
    
    df_grouped = df.groupby('year')['count'].sum().reset_index()
    df_grouped = df_grouped.sort_values(by='year')
    plt.figure(figsize=(12, 6))
    plt.plot(df_grouped['year'], df_grouped['count'], label = 'Subcategory Media')
    plt.xlabel('Year')
    plt.ylabel('Count')
    plt.title('Sales count of %s Subcategory per Year' % category)
    plt.legend(bbox_to_anchor = (1.05, 1), loc = 'upper left')
    plt.xticks(np.arange(2010, 2015, 1))
    plt.tight_layout()
    return plt

In [None]:
subcategory_list = list(set(subcategory_mapping.values()))
subcategory_list

In [None]:
##################### SET CATEGORY ###########################
subcategory = 'Media'

plt = plot_category(subcategory)
#plt.savefig('2.png')

### Plot category trends in one sub-category

In [None]:
count_per_month_ = train_df.groupBy("year", "month", "subcategory", "product_category").agg(F.count("*").alias("count"))
df_pd_ = count_per_month_.toPandas()
df_pd_.head()

In [None]:
def plot_categories(subcategory):
    '''
    Plots category trends in one category
    Takes in one string parameter: subcategory
    '''
    df_subcat = df_pd_[df_pd_['subcategory'] == subcategory]
    
    plt.figure(figsize = (12, 6))
    
    for product_category in df_subcat['product_category'].unique():
        df_product_cat = df_subcat[df_subcat['product_category'] == product_category]
        df_product_cat = df_product_cat.sort_values(by='year')
        df_product_cat = df_product_cat.groupby('year')['count'].sum().reset_index()
        plt.plot(df_product_cat['year'], df_product_cat['count'], label=f'Category {product_category}')
        
    plt.xlabel('Year')
    plt.ylabel('Count')
    plt.title('Sales Count of %s Categories per Year (2010-2014)' % subcategory)
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.xticks(np.arange(2010, 2015, 1))
    plt.tight_layout()
    return plt

In [None]:
##################### SET SUBCATEGORY ###########################
subcategory = 'Media'

plt = plot_categories(subcategory)
#plt.savefig('3.png')

### Plot Seasonal Decomposition

In [None]:
def seasonal_decomp(df):
    count_per_month2 = df.groupBy("review_date", "subcategory").agg(F.count("*").alias("count"))
    df_pd2 = count_per_month2.toPandas()
    
    df_pd2 = df_pd2[df_pd2['subcategory'] == 'Media']
    df_pd2.set_index('review_date', inplace=True)
    count_series = df_pd2['count']
    
    result = seasonal_decompose(count_series, model='additive', period=30)
    
    plt.figure(figsize=(12, 8))
    plt.subplot(411)
    plt.plot(result.observed, label='Original', color='blue')
    plt.legend(loc='upper left')
    plt.xticks(rotation=90)
    plt.subplot(412)
    plt.plot(result.trend, label='Trend', color='blue')
    plt.legend(loc='upper left')
    plt.xticks(rotation=90)
    plt.subplot(413)
    plt.plot(result.seasonal, label='Seasonality', color='blue')
    plt.legend(loc='upper left')
    plt.xticks(rotation=90)
    plt.subplot(414)
    plt.plot(result.resid, label='Residuals', color='blue')
    plt.legend(loc='upper left')
    plt.xticks(rotation=90)
    plt.tight_layout()
    return plt

In [None]:
plt = seasonal_decomp(train_df)
#plt.savefig('4.png')

# Answer the questions
* Where does your model fit in the fitting graph? and What are the next models you are thinking of and why?

# Conclusion section
* What is the conclusion of your 1st model? What can be done to possibly improve it?