<a href="https://colab.research.google.com/github/dgambone3/Big_Data_Project/blob/main/Big_Data_Project_colab_version.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=52eb92fb8b222ea750c1eff9ea89bfb0a1db80cff7789c06a18e80779b22b346
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
# Mount Google Drive into the Colab filesystem; the dataset paths read later
# in this notebook are located under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import pyspark.sql.functions as sf
from pyspark.sql.types import *
from pyspark.ml.feature import StopWordsRemover
import matplotlib.pyplot as plt
import pandas as pd
import pyspark
from pyspark.sql.functions import explode, sum


In [4]:
# from pyspark.sql import SparkSession

# spark = (
#     SparkSession.builder
#     .master("spark://127.0.0.1:7077")
#     # the number of executors this job needs
#     .config("spark.executor.instances", 2)
#     # the number of CPU cores memory this needs from the executor,
#     # it would be reserved on the worker
#     .config("spark.executor.cores", "2")
#     .config("spark.executor.memory", "4G")
#     .getOrCreate()
# )
# sc = spark.sparkContext

from pyspark.sql import SparkSession

# Obtain the Spark session used throughout the notebook
# (getOrCreate returns the existing session if one is already active).
spark = (
    SparkSession.builder
    .appName("example")
    .getOrCreate()
)

# Pet Supplies Product Data

In [5]:
# Read the Pet Supplies review records from Drive into a Spark DataFrame
data_df = spark.read.json('/content/drive/MyDrive/Big_Data/Data/Pet_Supplies.json')
# data_df = pyspark.read.json("data/Pet_Supplies.json")

 # Data Stuff

In [9]:
# Total number of reviewerID rows, followed by the number of distinct reviewers
reviewer_ids = data_df.select("reviewerID")
print(reviewer_ids.count())
print(reviewer_ids.distinct().count())

Add primary key to dataset

In [10]:
# Add a surrogate key column "id" to every row of data_df
add_keys = data_df.withColumn("id", sf.monotonically_increasing_id())

# Sanity check: count the distinct ids.
# The count is computed outside the f-string: reusing double quotes inside a
# double-quoted f-string is a SyntaxError on Python versions before 3.12.
distinct_key_count = add_keys.select('id').distinct().count()
print(f"Distinct keys: {distinct_key_count}")

## Process Review Text
#### Preprocessing: remove null values for review text

In [None]:
# Keep only the columns needed for the review-text analysis
review_df = add_keys.select('id', 'overall', 'reviewText')

# Report how many rows are missing the review text or the rating
text_is_null = sf.col('reviewText').isNull()
rating_is_null = sf.col('overall').isNull()
print(f"Null reviewText: {review_df.filter(text_is_null).count()}")
print(f"Null overall: {review_df.filter(rating_is_null).count()}")

# Discard rows that have no review text
review_df = review_df.filter(sf.col('reviewText').isNotNull())

# Confirm that no null review text remains
print(f"Check null reviewText removed: {review_df.filter(text_is_null).count()}")

In [None]:
# Summary statistics of the rating column
print("Statistics of overall rating: ")
rating_stats = review_df.select('overall').describe()
rating_stats.show()

#### Downsample the dataset for development, keeping the same rating distribution (the full dataset is large)

In [None]:
# One row per distinct rating value present in the data
distribution = review_df.groupBy('overall').count().collect()

# Use the same 10% fraction for every rating, so the sample keeps
# (approximately) the original rating proportions
sampling_ratios = dict.fromkeys((row['overall'] for row in distribution), 0.1)

# Stratified downsample by rating, with a fixed seed
review_df = review_df.sampleBy('overall', fractions=sampling_ratios, seed=42)

# Preview the downsampled DataFrame
review_df.show()

+---+-------+--------------------+
| id|overall|          reviewText|
+---+-------+--------------------+
|  7|    3.0|Maybe it's just m...|
| 16|    1.0|Horrible just a w...|
| 18|    5.0|           its great|
| 50|    1.0|seriously, buy a ...|
| 58|    5.0|This is the best ...|
| 63|    1.0|When I bought thi...|
| 66|    3.0|They came to me r...|
| 72|    5.0|It doesn't make s...|
| 79|    2.0|Purchased this ca...|
| 88|    4.0|Two of three of t...|
| 90|    5.0|My cat loves the ...|
|103|    5.0|powerful protecti...|
|105|    5.0|Power - Mune Chic...|
|106|    5.0|power mune by vet...|
|109|    5.0|Liver Support" Su...|
|123|    5.0|Excellent-so info...|
|129|    1.0|Bad science. Bad ...|
|139|    5.0|I Love Love Love ...|
|142|    5.0|            Awesome!|
|176|    5.0|Fast shipping.. L...|
+---+-------+--------------------+
only showing top 20 rows



## Analysis of Review Text

### Visualize count distribution of overall rating

In [None]:
# Bring only the rating column of the sampled reviews into pandas
sampled_df_pd = review_df.select('overall').toPandas()
# Cast the rating column to int
sampled_df_pd['overall'] = sampled_df_pd['overall'].astype(int)

# Tally the reviews per rating once and reuse the result for both bar arguments
rating_counts = sampled_df_pd['overall'].value_counts()
plt.bar(rating_counts.index, rating_counts.values, color='darkseagreen')
plt.xlabel('Overall Rating')
plt.ylabel('Count')
plt.title('Count of Overall Rating')
plt.show()

#### Process and clean reviewText
* Make all words lowercase and remove punctuation
* Use StopWordsRemover to remove words like [the, with, so, and...], as these common words carry little information about the text

In [None]:
# Strip everything except letters, digits and whitespace, then lowercase the text
normalized_text = sf.lower(sf.regexp_replace(sf.col("reviewText"), "[^a-zA-Z0-9\\s]", ""))

# Replace reviewText with the normalized text and split it on single spaces into a word list
clean_txt = (
    review_df
    .withColumn("reviewText", normalized_text)
    .withColumn("splitText", sf.split(sf.col("reviewText"), " "))
)

# Remove stop words [the, with, etc.].
# Note: despite its name, this variable holds the transformed DataFrame,
# not the StopWordsRemover instance.
stop_words_remover = StopWordsRemover(
    inputCol="splitText",
    outputCol="filteredWords",
).transform(clean_txt)

# Keep the key, the rating, the cleaned text and the filtered word list
filtered_df = stop_words_remover.select('id', 'overall', 'reviewText', 'filteredWords')
# print(filtered_df.show(5, truncate=False))

In [None]:
# DataFrame.show() prints the table itself and returns None, so it is called
# directly; wrapping it in print() would emit an extra "None" line.
filtered_df.show(5, truncate=False)

### Count word frequency and add to dictionary of {word: count} using rdd's

In [None]:
# Convert DataFrame to RDD and perform mapping and reduceByKey to create dict of {word : count}
word_counts_rdd = (
    filtered_df.select("id", "filteredWords")
    .rdd  # Convert to RDD for map-reduce
    .flatMap(lambda row: [(word, 1) for word in row["filteredWords"]])
    .reduceByKey(lambda a, b: a + b)
)

# sorted word_counts_rdd by count
word_counts_rdd = word_counts_rdd.sortBy(lambda x: x[1], ascending=False)

Add values to dictionary from sorted list, and sort by keys

In [None]:
# convert word_counts_rdd to dict
word_counts_dict = dict(word_counts_rdd.collect())

# remove all keys '' from word_counts_dict
word_counts_dict = {k: v for k, v in word_counts_dict.items() if k != ''}

top_20_words = dict(sorted(word_counts_dict.items(), key=lambda x: x[1], reverse=True)[:20])

In [None]:
# Display the {word: count} dict of the 20 highest-count words
print(top_20_words)

#### Add dictionary for each review and add to RDD

In [None]:
def count_words(words):
    """
    Count the number of times each word appears in a list of words, for one review.

    Newline characters are removed from every word first; words that are empty
    or whitespace-only after that are ignored.

    Args:
        words (list): list of words (strings)
    Returns:
        dict: {word : count}
    """
    # Local import keeps the function self-contained
    from collections import Counter

    # remove newline characters
    cleaned = (word.replace('\n', '') for word in words)
    # Counter tallies in a single pass; calling list.count() once per distinct
    # word would rescan the whole list each time (quadratic for long reviews)
    return dict(Counter(word for word in cleaned if word.strip()))


def add_word_counts(row):
    """
    Build the (id, overall, word_counts) record for one review row.

    Args:
        row: row exposing the fields "id", "overall" and "filteredWords"
    Returns:
        tuple: (id, overall, {word : count})
    """
    return (row["id"], row["overall"], count_words(row["filteredWords"]))

# Map every row of the filtered DataFrame to its record
filtered_df_with_counts_rdd = filtered_df.rdd.map(add_word_counts)
# print(filtered_df_with_counts_rdd.take(5))
# Convert the RDD of tuples back to a DataFrame with named columns
filtered_df_with_counts = filtered_df_with_counts_rdd.toDF(["id", "overall", "word_counts"])

filtered_df_with_counts.show(truncate=False)

#### Calculate average rating per review with each of the top 20 most common words

In [None]:
# Explode the per-review {word: count} map: one row per (review, word)
df_exploded = filtered_df_with_counts.select("id", "overall", sf.explode("word_counts").alias("word", "count"))
# print(df_exploded.show(5, truncate=False))

# Keep only the rows whose word is one of the top 20 words
top_20_list = list(top_20_words.keys())
df_filtered = df_exploded.filter(df_exploded.word.isin(top_20_list))

# Per word: sum of ratings, total occurrences, and the average rating of the
# reviews containing the word. Each exploded row is one review, so
# avg("overall") is the per-review average. Dividing sum(overall) by sum(count)
# would divide by word occurrences instead of reviews and understate the
# rating for any word repeated within a review.
df_avg_rating = (
    df_filtered.groupBy("word")
    .agg(sf.sum("overall"), sf.sum("count"), sf.avg("overall").alias("avg_rating"))
    # most frequent words first
    .sort("sum(count)", ascending=False)
)

# Show the result
df_avg_rating.show(20, truncate=False)

#### Plot results of top 20 words for whole dataset along with average rating per word

In [None]:
# plot histogram of word
df_plot = df_avg_rating.toPandas()
fig, ax1 = plt.subplots()

# axis 1 -> bar plot of word counts
ax1.bar(df_plot['word'],
        df_plot['sum(count)'],
        color='darkseagreen')
ax1.set_xlabel('Word')
ax1.set_ylabel('Count')

# ax2 -> line plot of average rating
ax2 = ax1.twinx()
ax2.plot(df_plot['word'],
         df_plot['avg_rating'],
         color='royalblue',
         marker='o',
         label='Average Rating')
ax2.set_ylabel('Average Rating')

# Title for the plot
plt.title('Word Counts and Average Rating')

ticks = range(len(df_plot['word']))
ax1.set_xticks(ticks)
ax2.set_xticks(ticks)
ax1.set_xticklabels(df_plot['word'], rotation=45, ha='right')
ax2.set_xticklabels(df_plot['word'], rotation=45, ha='right')
plt.show()


### Plot most common words for each overall rating group

In [None]:
# One row per (review, word) with that word's count inside the review
df_exploded = filtered_df_with_counts.select("id", "overall", sf.explode("word_counts").alias("word", "count"))

# Per-word totals over the rating-1 reviews only, most frequent words first
rating_1_totals = (
    df_exploded
    .filter(df_exploded["overall"] == 1.0)
    .groupBy("word")
    .agg(sf.sum("overall"), sf.sum("count"))
    .sort("sum(count)", ascending=False)
)

# Add the ratio column and keep the 10 most frequent words
avg_rating_1 = (
    rating_1_totals
    .withColumn("avg_rating", rating_1_totals["sum(overall)"] / rating_1_totals["sum(count)"])
    .limit(10)
)

# Bar chart of the word counts
overall_1 = avg_rating_1.toPandas()
fig, ax = plt.subplots()
ax.bar(overall_1['word'], overall_1['sum(count)'], color='darkseagreen')
ax.set_xlabel('Word')
ax.set_ylabel('Count')
ax.xaxis.set_tick_params(rotation=45)
# fixed y-axis range of 0 to 120000
ax.set_ylim([0, 120000])
plt.title('Highest Word Counts for Overall Rating 1')
plt.show()


In [None]:
# Per-word totals over the rating-2 reviews only, most frequent words first
rating_2_totals = (
    df_exploded
    .filter(df_exploded["overall"] == 2.0)
    .groupBy("word")
    .agg(sf.sum("overall"), sf.sum("count"))
    .sort("sum(count)", ascending=False)
)

# Add the ratio column and keep the 10 most frequent words
avg_rating_2 = (
    rating_2_totals
    .withColumn("avg_rating", rating_2_totals["sum(overall)"] / rating_2_totals["sum(count)"])
    .limit(10)
)

# Bar chart of the word counts
overall_2 = avg_rating_2.toPandas()
fig, ax = plt.subplots()
ax.bar(overall_2['word'], overall_2['sum(count)'], color='darkseagreen')
ax.set_xlabel('Word')
ax.set_ylabel('Count')
ax.xaxis.set_tick_params(rotation=45)
# fixed y-axis range of 0 to 120000
ax.set_ylim([0, 120000])
plt.title('Highest Word Counts for Overall Rating 2')
plt.show()


In [None]:
# Per-word totals over the rating-3 reviews only, most frequent words first
rating_3_totals = (
    df_exploded
    .filter(df_exploded["overall"] == 3.0)
    .groupBy("word")
    .agg(sf.sum("overall"), sf.sum("count"))
    .sort("sum(count)", ascending=False)
)

# Add the ratio column and keep the 10 most frequent words
avg_rating_3 = (
    rating_3_totals
    .withColumn("avg_rating", rating_3_totals["sum(overall)"] / rating_3_totals["sum(count)"])
    .limit(10)
)

# Bar chart of the word counts
overall_3 = avg_rating_3.toPandas()
fig, ax = plt.subplots()
ax.bar(overall_3['word'], overall_3['sum(count)'], color='darkseagreen')
ax.set_xlabel('Word')
ax.set_ylabel('Count')
ax.xaxis.set_tick_params(rotation=45)
# fixed y-axis range of 0 to 120000
ax.set_ylim([0, 120000])
plt.title('Highest Word Counts for Overall Rating 3')
plt.show()


In [None]:
# Per-word totals over the rating-4 reviews only, most frequent words first
rating_4_totals = (
    df_exploded
    .filter(df_exploded["overall"] == 4.0)
    .groupBy("word")
    .agg(sf.sum("overall"), sf.sum("count"))
    .sort("sum(count)", ascending=False)
)

# Add the ratio column and keep the 10 most frequent words
avg_rating_4 = (
    rating_4_totals
    .withColumn("avg_rating", rating_4_totals["sum(overall)"] / rating_4_totals["sum(count)"])
    .limit(10)
)

# Bar chart of the word counts
overall_4 = avg_rating_4.toPandas()
fig, ax = plt.subplots()
ax.bar(overall_4['word'], overall_4['sum(count)'], color='darkseagreen')
ax.set_xlabel('Word')
ax.set_ylabel('Count')
ax.xaxis.set_tick_params(rotation=45)
# fixed y-axis range of 0 to 120000
ax.set_ylim([0, 120000])
plt.title('Highest Word Counts for Overall Rating 4')
plt.show()


In [None]:
# Per-word totals over the rating-5 reviews only, most frequent words first
rating_5_totals = (
    df_exploded
    .filter(df_exploded["overall"] == 5.0)
    .groupBy("word")
    .agg(sf.sum("overall"), sf.sum("count"))
    .sort("sum(count)", ascending=False)
)

# Add the ratio column and keep the 10 most frequent words
avg_rating_5 = (
    rating_5_totals
    .withColumn("avg_rating", rating_5_totals["sum(overall)"] / rating_5_totals["sum(count)"])
    .limit(10)
)

# Bar chart of the word counts
overall_5 = avg_rating_5.toPandas()
fig, ax = plt.subplots()
ax.bar(overall_5['word'], overall_5['sum(count)'], color='darkseagreen')
ax.set_xlabel('Word')
ax.set_ylabel('Count')
ax.xaxis.set_tick_params(rotation=45)
# the fixed y-axis range is disabled for this plot; matplotlib picks the range
# ax.set_ylim([0, 120000])
plt.title('Highest Word Counts for Overall Rating 5')
plt.show()


# Metadata for Pet Supplies Products

In [None]:
# Read the Pet Supplies product metadata from Drive into a Spark DataFrame
meta_data = spark.read.json(path="/content/drive/MyDrive/Big_Data/Data/meta_Pet_Supplies.json")
# meta_data = spark.read.json(path='/content/drive/MyDrive/Big_Data/Data/Pet_Supplies.json', schema=meta_schema)

Check for null values

In [None]:
# check null values
print(f"Null asin: {meta_data.filter(sf.col('asin').isNull()).count()}")
print(f"Null also_buy: {meta_data.filter(sf.col('also_buy').isNull()).count()}")
print(f"Null also_view: {meta_data.filter(sf.col('also_view').isNull()).count()}")

In [None]:
# select asin, also_buy, also_view from meta_df where also_view and also_buy are not null
meta = meta_data.select('asin', 'also_buy', 'also_view')

In [10]:
# # randomly sample 5000 rows from meta_data for testing
# meta = meta.sample(False, 0.5, seed=0) # df
# print(f"Shape of meta: ({meta.count()}, {len(meta.columns)})") # (5,000, 3)

### RDD of meta data and identify duplicates

In [None]:
# create rdd of view_data using the spark context and parallelize
view_rdd = sc.parallelize(meta.collect())

# print rows with count asin > 1 (duplicates)
dupes = view_rdd.map(lambda x: (x[0], 1)) \
                .reduceByKey(lambda x, y: x + y) \
                .filter(lambda x: x[1] > 1).collect()
# get only the asin
dupes = [x[0] for x in dupes]
print(dupes) # 6
# remove duplicate asin from view_rdd
view_rdd = view_rdd.filter(lambda x: x[0] not in dupes)
# print(f"Shape of view_rdd after removing {len(dupes)} duplicate values: ({view_rdd.count()}, {len(view_rdd.take(1)[0])})") # (4,994, 3)

In [None]:
# check amount of unique asin
print(f"Count unique asin: {view_rdd.map(lambda x: x[0]).distinct().count()}")

### Joining Product Metadata and Review Data

In [30]:
# Reviews (asin, reviewerID) joined to the product's also_buy / also_view lists
reviews_by_product = data_df.select('asin', 'reviewerID')
product_links = meta.select('asin', 'also_buy', 'also_view')

joined_df = (
    reviews_by_product
    .join(product_links, on='asin', how='inner')
    # drop rows with no reviewer
    .filter(sf.col('reviewerID').isNotNull())
)
joined_df.show()

+----------+--------------+--------------------+--------------------+
|      asin|    reviewerID|            also_buy|           also_view|
+----------+--------------+--------------------+--------------------+
|0615553605| AL933I7VQRKBZ|                  []|[B075DYQ1PH, 1604...|
|0615553605| AWTI98YZPFQTH|                  []|[B075DYQ1PH, 1604...|
|0615553605|A1NW1C0J0VYG0A|                  []|[B075DYQ1PH, 1604...|
|0615553605|A3OMJBE0MF1721|                  []|[B075DYQ1PH, 1604...|
|0615553605| A1SDED4QSDX4I|                  []|[B075DYQ1PH, 1604...|
|0615553605| ALLFL71KWYWWP|                  []|[B075DYQ1PH, 1604...|
|0615553605|  ABZ8CQXD42H4|                  []|[B075DYQ1PH, 1604...|
|0615553605|A363P047LR5XI6|                  []|[B075DYQ1PH, 1604...|
|0615553605|A3PG0KS1YE8MR4|                  []|[B075DYQ1PH, 1604...|
|0793816793| AD7AYZY8UPDTG|[1911142186, 1514...|                  []|
|0793816793|A2I21DHTEPWWMU|[1911142186, 1514...|                  []|
|0793816793|A1L0VY8E

In [None]:
# count distinct also_buy and asin -> check if also_buy corresponds exactly to asin
print(f"Count distinct also_buy: {joined_df.select('also_buy').distinct().count()}")
print(f"Count distinct asin: {joined_df.select('asin').distinct().count()}")

## What percent of similar purchased items are purchased?
## What percent of viewed items are purchased?

In [32]:
# find where also_buy and also_view are not empty
filter_buy = joined_df.filter(sf.size(sf.col('also_buy')) > 0) \
                        .filter(sf.size(sf.col('also_view')) > 0)

Create lists of what each reviewer bought and viewed
 1. group by reviewerID
 2. create list of all asin with that reviewerID

In [34]:
# group by reviewerID and create new column with list of asin
grouped_reviewer = filter_buy.groupBy('reviewerID') \
                        .agg(sf.collect_list('asin').alias('reviewer_bought'))

grouped_reviewer = filter_buy.join(grouped_reviewer, on='reviewerID', how='outer').select('reviewerID', 'asin', 'reviewer_bought', 'also_buy', 'also_view')
grouped_reviewer.show()

+--------------------+----------+--------------------+--------------------+--------------------+
|          reviewerID|      asin|     reviewer_bought|            also_buy|           also_view|
+--------------------+----------+--------------------+--------------------+--------------------+
|A0020320YVA26ZNLOUX3|B0009YYSBE|[B01DMTPS8E, B000...|[B004ZM65LC, B000...|[B000HCI5CG, B01B...|
|A0020320YVA26ZNLOUX3|B01DMTPS8E|[B01DMTPS8E, B000...|[B01M4OPTQD, B071...|[B007P54O0A, B00F...|
|A0084055RM28SR8A0DQM|B004ZM7JB2|        [B004ZM7JB2]|[B003JHLY1U, B004...|[B00ODRO7W8, B00F...|
|A0207285Q798IKKBY9TI|B011AY4JWO|        [B011AY4JWO]|[B002B8WJPI, B01E...|[B07DGZJX6B, B07H...|
|A0231053UDFCC75M0RI3|B00VE78ZPU|        [B00VE78ZPU]|[B0050ICKN2, B005...|[B075Q57QGL, B07B...|
|A02430977ZM6HTS5PYW2|B00AQU8F2O|        [B00AQU8F2O]|[B00AQU8HAO, B00A...|[B00AQU8HAO, B00A...|
|A02626447PIKHLBV4CJY|B005F5DHW8|        [B005F5DHW8]|[B002H3BLCY, B003...|[B0002DJ4P2, B001...|
|A0314454CJYQE4NT3D8H|B013Q7FG

In [None]:
# sample group_reviewer
# sample_grouped_reviewer = grouped_reviewer.sample(False, 0.005, seed=0)
# # combine also_buy and also_view into their own columns for that reviewerID
# combined_df = sample_grouped_reviewer.agg(sf.flatten(sf.collect_list("also_buy")),
#                                                 sf.flatten(sf.collect_list("also_view")))
# combined_df.show()

In [35]:
# calculating how many items in also_buy are in also_view (intersection)
buy_also_bought = grouped_reviewer.withColumn("intersect_bought_count", sf.size(sf.array_intersect("also_buy", "reviewer_bought"))) \
                                        .sort(sf.desc("intersect_bought_count"))


In [36]:
# Number of the product's also_view items that appear in the reviewer's
# reviewer_bought list (size of the intersection), largest overlap first
viewed_overlap = sf.size(sf.array_intersect("also_view", "reviewer_bought"))
buy_viewed = (
    buy_also_bought
    .withColumn("intersect_view_count", viewed_overlap)
    .sort(sf.desc("intersect_view_count"))
)


In [37]:
# calculate percentages of bought items that are actually bought, and viewed items that are actually bought
prob_buy = buy_viewed.withColumn("percent_buy", sf.round(sf.col("intersect_bought_count") / sf.size(sf.col("reviewer_bought")) * 100,2)) \
                        .withColumn("percent_view", sf.round(sf.col("intersect_view_count") / sf.size(sf.col("reviewer_bought")) * 100,2))

prob_buy.show()

+--------------+----------+--------------------+--------------------+--------------------+----------------------+--------------------+-----------+------------+
|    reviewerID|      asin|     reviewer_bought|            also_buy|           also_view|intersect_bought_count|intersect_view_count|percent_buy|percent_view|
+--------------+----------+--------------------+--------------------+--------------------+----------------------+--------------------+-----------+------------+
|A3V0D97QKXDN5R|B002CJCEQA|[B000WTEJC4, B001...|[B002CJG1HI, B001...|[B002CJARSW, B001...|                    10|                   9|      55.56|        50.0|
|A3F22SXQNXWV3K|B007CLZ3HA|[B000084F1Z, B000...|[B007CLZRWG, B00C...|[B007CLZRWG, B00C...|                     7|                   8|       20.0|       22.86|
|A3V0D97QKXDN5R|B002CJG1YQ|[B000WTEJC4, B001...|[B002CJAS86, B002...|[B002CJARSW, B002...|                    10|                   8|      55.56|       44.44|
|A3F22SXQNXWV3K|B007CLZRWG|[B000084F1Z, 

In [39]:
# Average both percentage columns in a single aggregation: each separate
# .collect() would re-run the whole join/sort lineage behind prob_buy.
avg_row = prob_buy.agg(sf.avg("percent_buy"), sf.avg("percent_view")).collect()[0]
avg_prob_buy = avg_row[0]   # mean of percent_buy
avg_prob_view = avg_row[1]  # mean of percent_view

In [40]:
# Print the mean percent_buy followed by the mean percent_view
# (both values are percentages, since the columns were multiplied by 100)
print(avg_prob_buy)
print(avg_prob_view)

0.9374184898625909
0.8264738793246708
