# DSCI 632 Final

Your final is to use the dataset of your choice and the techniques in this class to do the following:

1. Define and describe the dataset (10 points)
1. Clean the data (10 points)
1. Transform the clean data (10 points)
1. Show your analysis of the data (10 points)

## Ground Rules

1. Explain everything you're doing with pyspark in the code cells using [markdown](https://www.markdownguide.org/cheat-sheet) in text cells. Help the reader understand why you're doing each step so they can re-create it. Remember, this is an assignment to show how you analyze data to a potential employer. Don't use code comments to explain things.
1. All of the sections are heading 1 in markdown, so use heading 2 to write your explanations, and heading 3 for any sub-headers. Check your table of contents view in Colab to make sure each point is listed before you turn this in.
1. Don't hesitate to use multiple code/text cells in each section, as long as they're all labeled and described.
1. Assume that the reader doesn't have access to the dataset on your local machine. Provide a link to the dataset you're using, or if able, include code to copy it from a public source. Don't rely on uploading from your local machine. (Importing from Google Drive is ok, as long as you provide the link to the data)
1. If using GCP services, include screenshots from your console if there's a step that you aren't able to reproduce in code. Using the SDK is always preferred, but you won't lose points for using screenshots and explaining them.
1. Import the data ONCE, then transform it to fit your analysis.
1. Don't overwrite data, make new columns for new transformations. You can always drop columns later, but you can't get overwritten values back.
1. Feel free to work with classmates, but all work submitted must be your own.
1. Make sure to disconnect the runtime and re-run the notebook at least once before turning it in. If you are getting certain Java runtime errors, this might also help.

## Extra Credit Opportunities!

- If you provide a link to this notebook on your public GitHub page instead of turning it in as an attachment, you will get 2 points extra credit.
- Extra credit will be given for using the [Google Cloud SDK](https://cloud.google.com/sdk/) to create/use/destroy any cloud resources, up to 2 points per section, 8 points total.
  - Don't be afraid to look at the GCP example Colab notebooks from the class notes, the GCP documentation, or GitHub for examples.
- If this notebook is self-contained, you will get 5 points extra credit. (Requirements below)
  - The data is imported without relying on Google Drive, as the paths to data in Google Drive are user-specific. Downloads from public storage buckets/services are fine. (1 point)
  - It can be run from start to finish without making any changes to paths, code, or variable names. (1 point)
  - All calls to external services, including GCP, are done programmatically, with no screenshots explaining how it worked in a browser console. (Using an API or SDK, 2 points)
  - All cloud resources are destroyed at the end of the notebook in a separate section. (Also via API or SDK, 1 point)

---

# Section 1: Define and describe the dataset

10 points

Import the dataset, and describe what you'll be analyzing in it. You can summarize a few columns or show more information on the relevant features, but help the reader understand what the dataset is, what is in it, and why you picked it.

Some questions that it might help you to answer:
1. Why are you choosing this dataset?
1. What variables will you use?
1. What analysis(es) will you run?
1. Do you have any hypotheses? What are they?

### This section should include one or more of the following:
- A histogram of several features relevant to your analysis
- The schema of the dataset, with the datatypes assigned correctly
- A text cell with explanations of the relevant features in [markdown](https://www.markdownguide.org/cheat-sheet)
- Use Spark SQL or built-in methods to show a range of values
- Most common words/n-grams found

## Prepare pyspark

In [1]:
# Install Spark 3.0.1
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz

### Set Environment Variables

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

### Install required libraries

In [None]:
# Keep the pip package in line with the Spark 3.0.1 distribution downloaded above
!python -m pip install --upgrade pyspark==3.0.1
!python -m pip install -q findspark
!python -m pip install -q ydata_profiling
!python -m pip install -q kaggle
!python -m pip install -q quantulum3
!python -m pip install -q nltk
!python -m pip install -q transformers

### Initialize Spark

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
APP_NAME = "dsci632final"
spark = SparkSession.builder.appName(APP_NAME).getOrCreate()
spark

In [None]:
ls

## Import packages for this project

In [None]:
from pyspark.sql.functions import col, udf, avg, split
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, Tokenizer, VectorAssembler, CountVectorizer
from pyspark.sql.types import DoubleType, StringType, ArrayType
from pyspark.ml.linalg import Vectors

import matplotlib.pyplot as plt
import seaborn as sns
from ydata_profiling import ProfileReport

from quantulum3 import parser as q_parser
import json
import ast
import re

from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor

## Download data from kaggle

In [None]:
!mkdir -p ~/.kaggle

In [None]:
kaggle_token = {
    "username":"bobmst",
    "key":"ecaebe713a35f46ed0e113710e60ff4d"
}
with open('kaggle.json', 'w') as f:
  f.write(json.dumps(kaggle_token))


In [None]:
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
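
Writing an API key directly into a notebook exposes the credential to anyone who can read it. As a hedged alternative, the sketch below builds the same `kaggle.json` payload from environment variables instead. The helper name `build_kaggle_token` and the placeholder values are hypothetical; `KAGGLE_USERNAME` and `KAGGLE_KEY` are assumed here because they are the variable names the Kaggle CLI documents for environment-based credentials.

```python
import json
import os


def build_kaggle_token(env=os.environ):
    """Build the kaggle.json payload from environment variables (hypothetical helper)."""
    missing = [name for name in ("KAGGLE_USERNAME", "KAGGLE_KEY") if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {"username": env["KAGGLE_USERNAME"], "key": env["KAGGLE_KEY"]}


# Placeholder values for illustration; real values would be set outside the notebook
example_env = {"KAGGLE_USERNAME": "example_user", "KAGGLE_KEY": "example_key"}
token_json = json.dumps(build_kaggle_token(example_env))
print(token_json)
```

With this approach the secret never appears in the notebook text, at the cost of requiring the reader to supply their own credentials before running the download cell.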

In [None]:
!kaggle datasets download -d nadyinky/sephora-products-and-skincare-reviews

In [None]:
!unzip sephora-products-and-skincare-reviews

In [None]:
ls

## Read Data

### Read the dataset into Spark DataFrame

In [None]:
df_product_info = spark.read.csv('product_info.csv', header=True, inferSchema=True)
df_reviews = spark.read.csv('reviews_0-250.csv', header=True, inferSchema=True) \
    .union(spark.read.csv('reviews_1250-end.csv', header=True, inferSchema=True)) \
    .union(spark.read.csv('reviews_250-500.csv', header=True, inferSchema=True)) \
    .union(spark.read.csv('reviews_500-750.csv', header=True, inferSchema=True)) \
    .union(spark.read.csv('reviews_750-1250.csv', header=True, inferSchema=True))

In [None]:
print("Product Info Dataset:")
df_product_info.show(5)
df_product_info.printSchema()

In [None]:
print("Reviews Dataset:")
df_reviews.show(5)
df_reviews.printSchema()

## General EDA

### Summary Statistics of Numerical Variables

In [None]:
df_product_info.describe().show()

### Examine the unique values and frequency counts of categorical variables

In [None]:
def explor_category_attr(df,attr):
  df_category = df.groupBy(attr).count().orderBy('count', ascending=False)
  df_category.show()

  category_counts = df_category.toPandas()
  plt.figure(figsize=(10, df_category.count()*0.2))
  sns.barplot(x='count', y=attr, data=category_counts)
  plt.title(f'Distribution of {attr}')
  plt.xticks(rotation=45)
  plt.show()

In [None]:
explor_category_attr(df_product_info,"brand_name")

In [None]:
explor_category_attr(df_product_info,"primary_category")

In [None]:
explor_category_attr(df_product_info,"variation_type")

### A full data report for all attributes

In [None]:
ProfileReport(df_product_info)

### Calculate the correlation matrix

In [None]:

correlation_matrix = df_product_info.select(
    [col(c).cast('float') for c in df_product_info.columns]
).toPandas().corr()

# Visualize the correlation matrix using a heatmap
plt.figure(figsize=(24, 16))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm')
plt.title('Correlation Matrix')
plt.show()

# Section 2: Clean the data

10 points

Not every dataset is going to be ready to use right away. Take steps to fix incorrect inputs, remove null values, and assign datatypes that fit each feature.

### This section should include one or more of the following:
- A count of rows before and after dropping data with null or incorrect values, including an explanation of why removing the rows was necessary in [markdown](https://www.markdownguide.org/cheat-sheet).
- Renaming a column, adding a new one, or casting an existing one as a different datatype.
- Remove punctuation marks, symbols, etc from the data, and convert all upper-case lettering to lower-case.
- Remove or fix rows that have bad values/data.

## Select the data we are going to use to train the model on

### Select the desired columns from df_product_info

The following information pertains to the columns I have selected for further analysis from the product information dataset:

- Where both an ID and a name exist for the same entity (for example `brand_id` and `brand_name`), I kept only the ID column. This avoids ambiguity from duplicated names.

- Since our objective is to predict the price in USD (`price_usd`), I have omitted the other column containing a price. This prevents data leakage.

In [None]:
df_product_info_sub = df_product_info.select("product_id", "brand_id", "loves_count", "rating", "reviews", "size", "ingredients", "price_usd", "limited_edition", "new", "online_only", "out_of_stock", "sephora_exclusive", "primary_category", "secondary_category", "tertiary_category")
df_product_info_sub.show(5)


## Select the desired columns from df_reviews

We only need the comment column, the original product id, and the helpfulness column from this dataframe. We will conduct a quick sentiment analysis on each comment, weigh each comment using the helpfulness score, and calculate the average comment attitude. Finally, we will merge the results back into the original dataframe using the product id.

In [None]:
df_reviews_sub = df_reviews.select("helpfulness", "review_text", "product_id")
df_reviews_sub.show(5)

## Remove or properly handle NA values

### Count the number of rows before dropping data with null or incorrect values

In [None]:
print(f"Number of rows in the product info dataframe before dropping na: \n{df_product_info_sub.count()}")
print(f"Number of rows in the reviews dataframe before dropping na: \n{df_reviews_sub.count()}")

By looking at the EDA report, I found that 7 columns contain NA values: `rating`, `reviews`, `size`, `ingredients`, `primary_category`, `secondary_category`, and `tertiary_category`. I decided to remove rows that have more than 3 NA values, which keeps only rows with at least 13 of the 16 selected columns populated.

In [None]:
num_na = 3
df_product_info_sub_clean = df_product_info_sub.dropna(thresh=(len(df_product_info_sub.columns)-num_na))
df_product_info_sub_clean.show()
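
The `thresh` argument counts non-null values per row, so `len(columns) - num_na` keeps rows with at most `num_na` nulls. The plain-Python sketch below mimics that rule on two made-up rows; `keep_row` is a hypothetical helper written for illustration, not part of the PySpark API.

```python
def keep_row(row, thresh):
    """Mimic the `thresh` rule: keep a row only if it has at least `thresh` non-null values."""
    non_null = sum(value is not None for value in row)
    return non_null >= thresh


num_columns = 16  # number of columns selected from the product info dataframe
num_na = 3        # maximum number of nulls tolerated per row
thresh = num_columns - num_na

# Made-up rows: one with exactly 3 nulls, one with 4 nulls
row_with_3_nulls = [1] * 13 + [None] * 3
row_with_4_nulls = [1] * 12 + [None] * 4

print(keep_row(row_with_3_nulls, thresh))
print(keep_row(row_with_4_nulls, thresh))
```

The row with 3 nulls still has 13 non-null values and is kept, while the row with 4 nulls falls below the threshold and is dropped.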

### Fill NA values with an empty string or 0, depending on the data type

In [None]:
df_product_info_sub_clean = df_product_info_sub_clean.na.fill(
    {'rating': 0,
     'reviews':0,
     'size':'',
     'ingredients':'',
     'primary_category':'',
     'secondary_category':'',
     'tertiary_category':''})

### Change the data type of certain columns to numeric

In [None]:
df_product_info_sub_clean = df_product_info_sub_clean.withColumn("brand_id", df_product_info_sub_clean["brand_id"].cast('int'))
df_product_info_sub_clean = df_product_info_sub_clean.withColumn("loves_count", df_product_info_sub_clean["loves_count"].cast('double'))
df_product_info_sub_clean = df_product_info_sub_clean.withColumn("rating", df_product_info_sub_clean["rating"].cast('double'))
df_product_info_sub_clean = df_product_info_sub_clean.withColumn("reviews", df_product_info_sub_clean["reviews"].cast('double'))

In [None]:
df_product_info_sub_clean.printSchema()

### Remove rows where review_text is NA or too short

A review is useless if it is empty or does not contain enough content. Therefore, I have decided to remove any rows that have an empty comment or a comment with fewer than 5 characters.

In [None]:
df_reviews_sub_clean = df_reviews_sub.filter(col("review_text").isNotNull() & (col("review_text").rlike(".{5,}")))
df_reviews_sub_clean.show(10)
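
The pattern `.{5,}` passes any text that contains a run of at least 5 characters. One subtlety worth knowing is that `.` does not match line breaks by default, so a review made only of very short lines can be rejected even if its total length is 5 or more. The sketch below illustrates this with Python's `re` module as a stand-in for the regex engine Spark uses; `is_long_enough` and the sample strings are hypothetical.

```python
import re

# Same pattern as in the filter above: a run of 5 or more characters
MIN_LENGTH_PATTERN = re.compile(r".{5,}")


def is_long_enough(text):
    """Return True if `text` contains a run of at least 5 characters without a line break."""
    return text is not None and MIN_LENGTH_PATTERN.search(text) is not None


# Made-up review texts for illustration
samples = ["Love it!", "ok", None, "ab\ncd\nef"]
for sample in samples:
    print(repr(sample), is_long_enough(sample))
```

If total length is what matters, comparing the string length against 5 directly would be an alternative to the regex.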

The helpfulness score is null when there is no vote on this review. Therefore, I will assign a general score that is neither good nor bad, which will be 0.5.

In [None]:
df_reviews_sub_clean = df_reviews_sub_clean.na.fill({'helpfulness': 0.5})
df_reviews_sub_clean.show(10)

### Count the number of rows after dropping data with null or incorrect values

In [None]:
print(f"Number of rows in the product info dataframe after dropping na: \n{df_product_info_sub_clean.count()}")
print(f"Number of rows in the reviews dataframe after dropping na: \n{df_reviews_sub_clean.count()}")

# Section 3: Transform the clean data

10 points

Once you have clean data, start to prepare it to fit your analysis tools. This might mean using custom code to normalize certain values, joining supplemental datasets, and/or preparing it for machine learning.

### This section should include one or more of the following:
- Write a UDF to perform a function, then use it to add a new column to your data. Explain why in [markdown](https://www.markdownguide.org/cheat-sheet)
- Join an outside data source. (It can be one you've prepared alongside the primary source you're using, as long as you link it)
- Split the data into train/test sets
- Create vectors for relevant features
- One-hot encode categorical variables

## Transform data

### Transform the size column

In [None]:
df_product_info_sub_clean.select('size').distinct().collect()

I noticed that most of the sizes mentioned in this column are measured in ounces, so I will extract only the numerical part of the sizes labeled with ounces and disregard the rest.

In [None]:
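def extract_oz(text):
  """Return the first quantity measured in ounces found in `text`, or 0.0 if there is none."""
  try:
    if text is None:
      return 0.0
    quants = q_parser.parse(text)

    for q in quants:
      if q.unit.name == "ounce":
        # Cast to float so the value matches the DoubleType declared for the UDF
        return float(q.value)

    # Return 0.0 if no quantity in ounces was found
    return 0.0
  except Exception:
    # quantulum3 sometimes raises an error when calling its stemmer;
    # treat such values as having no usable size
    return 0.0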


In [None]:
udf_extract_oz = udf(lambda x: extract_oz(x), DoubleType())
df_product_info_sub_clean = df_product_info_sub_clean.withColumn("size_numeric", udf_extract_oz(df_product_info_sub_clean["size"]))
df_product_info_sub_clean.show()
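
For comparison, the same idea can be sketched with a regular expression instead of quantulum3. This is a minimal alternative under the assumption that sizes are written like `1.7 oz/ 50 mL` or `0.5 fl oz`; the function name `extract_oz_regex` and the sample strings are hypothetical, and values in other formats simply fall back to 0.0.

```python
import re

# Matches a number followed by "oz", optionally preceded by "fl" or "fl." (case-insensitive)
OZ_PATTERN = re.compile(r"(\d*\.?\d+)\s*(?:fl\.?\s*)?oz\b", re.IGNORECASE)


def extract_oz_regex(text):
    """Return the first ounce quantity found in `text`, or 0.0 if none is found."""
    if not text:
        return 0.0
    match = OZ_PATTERN.search(text)
    return float(match.group(1)) if match else 0.0


# Made-up size strings for illustration
for size in ["1.7 oz/ 50 mL", "0.5 fl oz", ".25 fl. oz", "30 mL", None]:
    print(repr(size), extract_oz_regex(size))
```

A regex is faster inside a UDF than a full quantity parser, but it only recognizes the spellings it was written for.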

### Transform the tag column `ingredients`

In [None]:
df_product_info_sub_clean.show(5)

In [None]:
def str_to_array(x):
  if x != '' :
    ls_in = ast.literal_eval(x)
    ls_out = []
    for obj in ls_in:
      ls_out.append(re.sub('[^a-z0-9 ]+', '', obj.lower()))
    return ls_out
  else:
     return ['']


udf_array = udf(lambda x: str_to_array(x), ArrayType(StringType()))

df_product_info_sub_clean = df_product_info_sub_clean.withColumn("ingredients_tokens", udf_array(df_product_info_sub_clean["ingredients"]))
df_product_info_sub_clean.show(5)
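
To make the parsing step concrete, here is a standalone sketch of the same cleaning logic with extra handling for malformed input. It assumes, as the code above does, that each value is a stringified Python list; `parse_ingredient_list` and the sample strings are hypothetical and not taken from the dataset.

```python
import ast
import re


def parse_ingredient_list(raw):
    """Parse a stringified Python list into lower-cased, punctuation-free tokens."""
    if not raw:
        return [""]
    try:
        items = ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        # Not a valid Python literal: treat the value as missing
        return [""]
    if not isinstance(items, (list, tuple)):
        return [""]
    # Lower-case each item and keep only letters, digits, and spaces
    return [re.sub(r"[^a-z0-9 ]+", "", str(item).lower()) for item in items]


# Made-up inputs for illustration
print(parse_ingredient_list("['Aqua (Water)', 'Glycerin']"))
print(parse_ingredient_list("Water, Glycerin"))
print(parse_ingredient_list(""))
```

Returning `['']` for unusable values keeps the column type consistent, so the later vectorizer always receives a list of strings.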

In [None]:
cv_ingredients = CountVectorizer(inputCol="ingredients_tokens", outputCol="ingredients_feature", vocabSize=1000000).fit(df_product_info_sub_clean)
df_product_info_sub_clean = cv_ingredients.transform(df_product_info_sub_clean)

df_product_info_sub_clean.show()

## One hot encoding the categorical columns

In [None]:
df_product_info_sub_clean.show()

### Create dummy variables using OneHotEncoder

In [None]:
udf_transform_empty = udf(lambda s: "NA" if s == "" else s, StringType())
for category in ["primary_category", "secondary_category", "tertiary_category"]:
    df_product_info_sub_clean = df_product_info_sub_clean.withColumn(category, udf_transform_empty(category))
indexers = StringIndexer(inputCols=["primary_category", "secondary_category", "tertiary_category"], outputCols=["primary_category_index", "secondary_category_index", "tertiary_category_index"])
df_product_info_sub_clean = indexers.fit(df_product_info_sub_clean).transform(df_product_info_sub_clean)

encoder = OneHotEncoder(inputCols=["primary_category_index", "secondary_category_index", "tertiary_category_index"], outputCols=["primary_category_encoded", "secondary_category_encoded", "tertiary_category_encoded"])
df_product_info_sub_clean = encoder.fit(df_product_info_sub_clean).transform(df_product_info_sub_clean)
df_product_info_sub_clean.show(5)
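
The two-step encoding can be illustrated in plain Python. The sketch below assumes the default behavior of the two Spark stages as I understand it: labels are indexed from most to least frequent, and the one-hot vector drops the last category so that it is represented by all zeros. The helper names and category values are hypothetical.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent label first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def one_hot(index, num_labels, drop_last=True):
    """Encode an index as a 0/1 list; with drop_last the final label becomes all zeros."""
    size = num_labels - 1 if drop_last else num_labels
    return [1.0 if position == index else 0.0 for position in range(size)]


# Made-up category values for illustration
categories = ["Skincare", "Makeup", "Skincare", "Hair", "Skincare", "Makeup"]
label_to_index = fit_string_index(categories)
encoded = {label: one_hot(i, len(label_to_index)) for label, i in label_to_index.items()}
print(label_to_index)
print(encoded)
```

The index alone imposes an artificial order on the categories, which is why a linear model is usually given the one-hot vector rather than the raw index.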


## Sentiment analysis on the comments

### Calculate sentiment score with transformer

In [None]:
from transformers import pipeline
sentiment_pipeline = pipeline("sentiment-analysis")

In [None]:
df_reviews_sub_clean.show(5)

In [None]:
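def quick_sentiment(x):
  # truncation=True keeps long reviews within the model's maximum input length
  sentiment = sentiment_pipeline(x, truncation=True)[0]
  # Negate the confidence score for negative labels so the sign carries the polarity
  if sentiment.get('label') == 'NEGATIVE':
    return sentiment.get('score') * -1
  else:
    return sentiment.get('score')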


The resulting score is signed. Its magnitude is the model's confidence in the predicted label, so a strongly negative review is close to -1 and a strongly positive review is close to 1.

In [None]:
udf_sentiment = udf(lambda x: quick_sentiment(x), DoubleType())
df_reviews_sub_clean = df_reviews_sub_clean.withColumn("review_sentiment", udf_sentiment(df_reviews_sub_clean["review_text"]))
df_reviews_sub_clean.show(5)

### Multiply the sentiment score by the helpfulness rate as a weight

In [None]:
df_reviews_sub_clean.printSchema()

In [None]:
df_reviews_sub_clean = df_reviews_sub_clean.withColumn("review_sentiment_weighted", col("helpfulness") * col("review_sentiment"))
df_reviews_sub_clean.show(5)

In [None]:
df_sentiment = df_reviews_sub_clean.select("product_id","review_sentiment_weighted")
df_sentiment.show(5)
df_sentiment.printSchema()

### Group the sentiment scores by product ID

In [None]:
df_avg_sentiment = df_sentiment.groupBy("product_id").agg(avg("review_sentiment_weighted").alias("avg_sentiment"))
df_avg_sentiment.show()
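
The aggregation above takes the mean of `helpfulness * sentiment` per product. The sketch below reproduces that calculation in plain Python on made-up values, next to a normalized weighted mean (dividing by the sum of the weights) for comparison. The function names and sample rows are hypothetical, and the normalized variant is shown only as an alternative, not as what the notebook computes.

```python
from collections import defaultdict


def signed_score(label, score):
    """Turn a (label, confidence) pair into a signed score: negative labels become negative."""
    return -score if label == "NEGATIVE" else score


def mean_of_products(rows):
    """Per-product mean of helpfulness * sentiment, as computed by avg on the weighted column."""
    sums, counts = defaultdict(float), defaultdict(int)
    for product_id, helpfulness, sentiment in rows:
        sums[product_id] += helpfulness * sentiment
        counts[product_id] += 1
    return {pid: sums[pid] / counts[pid] for pid in sums}


def normalized_weighted_mean(rows):
    """Per-product weighted mean: sum(w * s) / sum(w). Shown for comparison only."""
    numerators, denominators = defaultdict(float), defaultdict(float)
    for product_id, helpfulness, sentiment in rows:
        numerators[product_id] += helpfulness * sentiment
        denominators[product_id] += helpfulness
    return {pid: numerators[pid] / denominators[pid]
            for pid in numerators if denominators[pid] > 0}


# Made-up reviews: (product_id, helpfulness, signed sentiment)
rows = [
    ("P1", 1.0, signed_score("POSITIVE", 0.9)),
    ("P1", 0.5, signed_score("NEGATIVE", 0.8)),
]
print(mean_of_products(rows))
print(normalized_weighted_mean(rows))
```

The two results differ because the plain mean divides by the number of reviews, so reviews with low helpfulness pull the average toward 0 instead of just counting for less.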

### Merge the weighted average sentiment score with the product information dataframe

In [None]:
df_product_info_sub_clean = df_product_info_sub_clean.join(df_avg_sentiment, "product_id", "left")
df_product_info_sub_clean = df_product_info_sub_clean.na.fill({'avg_sentiment':0})
df_product_info_sub_clean.show()

In [None]:
df_product_info_sub_clean.printSchema()

## Assemble feature vector

In [None]:
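# Each feature is listed once, and the one-hot encoded category columns are used
# instead of the raw indices so the model does not treat categories as ordered numbers
feature_cols = ["brand_id", "loves_count", "rating", "reviews", "limited_edition", "new", "online_only", "out_of_stock", "sephora_exclusive", "size_numeric", "ingredients_feature", "primary_category_encoded", "secondary_category_encoded", "tertiary_category_encoded", "avg_sentiment"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")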
df_assembled = assembler.transform(df_product_info_sub_clean)
df_assembled.show(5)

# Section 4: Show your analysis of the data

10 points

This is where the science happens. Use your data to show some kind of insight, and how you got there. Make the reader understand why it's important, and how they can get the same conclusion, and/or what would need to change to reach a different one.

### This section should include one or more of the following:
- Fit the data to a model
- Show the outcome of clustering, regression, and/or classification algorithms.
  - We used several in class, but you can use whatever fits your needs for this assignment
- Recommend a product/item
- Use a SQL query to filter results


## Select relevant columns for modeling

In [None]:
df_final = df_assembled.select("features", "price_usd")
df_final.show(5)

## Train test split
Split the data into train and test sets, with 30% of the data in the test sample.

In [None]:
train_data, test_data = df_final.randomSplit([0.7, 0.3], seed=42)

print("Train data count:", train_data.count())
print("Test data count:", test_data.count())
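
The printed counts will only be close to a 70/30 split, not exact, because each row is assigned to a split at random. The plain-Python sketch below illustrates that idea; it is not Spark's implementation and will not reproduce the same rows, and `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to a split by drawing a uniform number against cumulative weights."""
    total = sum(weights)
    cumulative = []
    running = 0.0
    for weight in weights:
        running += weight / total  # normalize so the weights sum to 1
        cumulative.append(running)

    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(cumulative):
            # The last split catches any draw not claimed by an earlier bound
            if draw < bound or i == len(cumulative) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Running it again with the same seed gives the same split, which is the reason for passing `seed=42` in the cell above.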

## Train models

Since the data is already complicated enough, I will use a simple linear regression to test the performance.

### Linear regression

Fit the data to a linear regression model

In [None]:
lr = LinearRegression(featuresCol='features', labelCol='price_usd')
lr_model = lr.fit(train_data)

Evaluate the linear regression model

In [None]:
lr_predictions = lr_model.transform(train_data)
lr_evaluator = RegressionEvaluator(labelCol="price_usd", predictionCol="prediction", metricName="rmse")
lr_rmse = lr_evaluator.evaluate(lr_predictions)
print("Linear Regression Train RMSE: ", lr_rmse)

In [None]:
lr_predictions = lr_model.transform(test_data)
lr_evaluator = RegressionEvaluator(labelCol="price_usd", predictionCol="prediction", metricName="rmse")
lr_rmse = lr_evaluator.evaluate(lr_predictions)
print("Linear Regression Test RMSE: ", lr_rmse)

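The root mean square error (RMSE) is expressed in the same unit as the target, US dollars, so it measures the typical size of a prediction error. Some error is expected from such a simple model, and comparing the train and test RMSE shows whether the model generalizes or overfits. Considering how simple the model is, I find this level of error acceptable.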
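
For readers who want to check the metric by hand, RMSE is the square root of the mean squared difference between labels and predictions. The sketch below computes it in plain Python on made-up prices; the `rmse` function and the numbers are illustrative only and are not results from this notebook.

```python
import math


def rmse(labels, predictions):
    """Root mean square error between two equally long, non-empty sequences."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up prices and predictions in USD
prices = [10.0, 20.0, 30.0]
predicted = [12.0, 18.0, 33.0]
print(rmse(prices, predicted))
```

Because the errors are squared before averaging, a few large misses raise the RMSE more than many small ones.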