# CIS 4130 Semester Project

Our semester-long project was to build a machine learning pipeline for our Big Data Technologies class.
This pipeline incorporates cloud infrastructure; in our case, it uses AWS services such as EC2 instances and EMR clusters.

This project is divided into five milestones, each with its own components and requirements:

> Milestone 1: Project Proposal

> Milestone 2: Data Acquisition

> Milestone 3: Descriptive Statistics

> Milestone 4: Coding and Modeling

> Milestone 5: Visualizing Results

### Milestone 1: Project Proposal

The first milestone was to research and choose a dataset larger than 10 GB. We decided to use the Yelp Dataset, located [here](https://www.kaggle.com/datasets/yelp-dataset/yelp-dataset?resource=download&select=yelp_academic_dataset_business.json).

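This dataset consists of five JSON files. In the download, each file name carries the prefix `yelp_academic_dataset_` (for example, `yelp_academic_dataset_business.json`):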
- business.json
- checkin.json
- review.json
- tip.json
- user.json

### Milestone 2: Data Acquisition

The second milestone was to download this data directly into an S3 bucket.

First, install the Kaggle API client with `pip3 install kaggle`.

Next, create a new API token on Kaggle: log in at https://kaggle.com, go to the Account page, scroll down to the API section, and select Create New API Token.

A `kaggle.json` file will be downloaded. Open it in a text editor such as Notepad and copy its contents to your clipboard.

After this is done, follow the instructions in the `Steps for downloading data directly from Kaggle to AWS.docx` file to set up your Kaggle API credentials.
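The Kaggle CLI reads its credentials from `~/.kaggle/kaggle.json` and warns when that file is readable by other users. The sketch below places the token there from Python. It assumes you have the JSON contents copied from the downloaded file. The helper name `install_kaggle_token` is hypothetical and is not part of the Kaggle package.

```python
import json
import os
import stat
from pathlib import Path
from typing import Optional


def install_kaggle_token(token_json: str, home: Optional[Path] = None) -> Path:
    """Write Kaggle credentials to <home>/.kaggle/kaggle.json, readable only by the owner."""
    token = json.loads(token_json)  # fails fast if the pasted text is not valid JSON
    missing = {"username", "key"} - set(token)
    if missing:
        raise ValueError(f"kaggle.json is missing fields: {sorted(missing)}")

    config_dir = (home if home is not None else Path.home()) / ".kaggle"
    config_dir.mkdir(parents=True, exist_ok=True)
    token_path = config_dir / "kaggle.json"
    token_path.write_text(json.dumps(token))
    # Same effect as `chmod 600 ~/.kaggle/kaggle.json` (on Windows, os.chmod only toggles read-only)
    os.chmod(token_path, stat.S_IRUSR | stat.S_IWUSR)
    return token_path
```

For example, calling `install_kaggle_token('{"username": "<your-username>", "key": "<your-key>"}')` with your own values writes the file to your home directory. The `home` parameter exists only so the function can be tried against a scratch directory.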

```bash
# Create an S3 bucket for the project data
aws s3api create-bucket --bucket project-data-hkl --region us-east-2 \
    --create-bucket-configuration LocationConstraint=us-east-2

# Download the Yelp dataset from Kaggle and pipe it into the bucket as yelp.zip
kaggle datasets download --quiet -d yelp-dataset/yelp-dataset -p - | aws s3 cp - s3://project-data-hkl/yelp.zip

# Verify the download
aws s3 ls s3://project-data-hkl/
```

Once the archive is in the bucket, we unzip it and upload each extracted file back to the S3 bucket:

```python
import zipfile
from io import BytesIO

import boto3

bucket = "project-data-hkl"
zipfile_to_unzip = "yelp.zip"

s3_resource = boto3.resource("s3")
zip_obj = s3_resource.Object(bucket_name=bucket, key=zipfile_to_unzip)
# Reads the whole archive into memory, so the instance needs enough RAM to hold it
buffer = BytesIO(zip_obj.get()["Body"].read())

with zipfile.ZipFile(buffer) as z:
    # Loop through all of the files contained in the zip archive
    for filename in z.namelist():
        print("Working on " + filename)
        # Upload each member back to the bucket under its own name
        with z.open(filename) as member:
            s3_resource.meta.client.upload_fileobj(member, Bucket=bucket, Key=filename)
```
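You can try the unzip-and-upload loop locally before pointing it at S3. The sketch below uses only the standard library and assumes the archive bytes are already in memory. `unzip_and_upload` and its `upload` callback are hypothetical stand-ins for the `upload_fileobj` call. `ZipFile.open` decompresses each member incrementally as it is read, but the archive itself still sits in memory.

```python
import io
import zipfile
from typing import BinaryIO, Callable, List


def unzip_and_upload(zip_bytes: bytes, upload: Callable[[BinaryIO, str], None]) -> List[str]:
    """Pass every file in the archive to upload(fileobj, key); return the keys in archive order."""
    keys = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue  # directory entries have no content to upload
            with archive.open(info) as member:  # file-like object, decompressed on read
                upload(member, info.filename)
            keys.append(info.filename)
    return keys


# Build a small archive in memory to exercise the function
payload = io.BytesIO()
with zipfile.ZipFile(payload, "w") as archive:
    archive.writestr("yelp_academic_dataset_tip.json", '{"text": "Great tacos"}\n')
    archive.writestr("notes.txt", b"plain text")

uploaded = {}


def record(fileobj: BinaryIO, key: str) -> None:
    """Stand-in uploader that keeps the bytes in a dict instead of sending them to S3."""
    uploaded[key] = fileobj.read()


keys = unzip_and_upload(payload.getvalue(), record)
print(keys)
```

Against S3, the callback could be `lambda f, key: s3_resource.meta.client.upload_fileobj(f, Bucket=bucket, Key=key)`, reusing the names from the cell above.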

Once again, check that these files were unzipped and uploaded to the bucket:

```bash
aws s3 ls s3://project-data-hkl/
```

Here are some quick summary statistics to double-check that our data is all there. Reading `s3://` paths with pandas requires the `s3fs` package.

```python
import pandas as pd

df = pd.read_json("s3://project-data-hkl/yelp_academic_dataset_business.json", lines=True)
print(df.dtypes)

# Number of businesses and their average star rating per state
results = df.groupby("state").stars.agg(["count", "mean"])
print(results)
```
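Each Yelp file is in JSON Lines format (one JSON object per line), which is why `lines=True` is passed to `read_json`. The sketch below reproduces the per-state count and mean without pandas. It assumes records with `state` and `stars` fields, as in the business file, and reads the lines one at a time. `state_star_summary` is a hypothetical helper, and the sample rows are made up.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def state_star_summary(lines: Iterable[str]) -> Dict[str, Tuple[int, float]]:
    """Return {state: (count, mean stars)} from JSON Lines text, one business per line."""
    counts: Dict[str, int] = defaultdict(int)
    totals: Dict[str, float] = defaultdict(float)
    for line in lines:
        if not line.strip():
            continue  # tolerate blank lines
        record = json.loads(line)
        state, stars = record.get("state"), record.get("stars")
        if state is None or stars is None:
            continue  # pandas drops missing group keys and ignores missing values
        counts[state] += 1
        totals[state] += stars
    return {state: (counts[state], totals[state] / counts[state]) for state in sorted(counts)}


sample = [
    '{"business_id": "a", "state": "PA", "stars": 4.0}',
    '{"business_id": "b", "state": "PA", "stars": 3.0}',
    '{"business_id": "c", "state": "FL", "stars": 5.0}',
]
print(state_star_summary(sample))
```

With a local copy of the file, `with open("yelp_academic_dataset_business.json") as f: state_star_summary(f)` processes it line by line without loading the whole file at once.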

### Milestone 3: Descriptive Statistics

In this milestone, we're going to better understand our data by performing some simple statistics:

- `df.dtypes` gives the data type of each column, which tells us which fields are numeric
- `df.count()` gives the number of non-missing values in each column
- `df.isna().sum()` gives the number of missing values in each column
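For every column, `count()` and `isna().sum()` add up to the number of rows. The sketch below makes that concrete with the standard library. It uses a hypothetical `column_profile` helper over a few made-up records. A record that lacks a key is treated as missing for that column, which matches how pandas fills absent JSON keys with NaN.

```python
from typing import Any, Dict, List


def column_profile(records: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Per column: Python type names seen, non-missing count, and missing count."""
    columns = sorted({key for record in records for key in record})
    profile = {}
    for column in columns:
        values = [record.get(column) for record in records]
        present = [value for value in values if value is not None]
        profile[column] = {
            "types": sorted({type(value).__name__ for value in present}),  # like df.dtypes
            "count": len(present),  # like df.count()
            "missing": len(values) - len(present),  # like df.isna().sum()
        }
    return profile


rows = [
    {"business_id": "a", "stars": 4.5, "review_count": 10},
    {"business_id": "b", "stars": 3.0, "attributes": None},
    {"business_id": "c", "review_count": 7},
]
for column, stats in column_profile(rows).items():
    print(column, stats)
```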


```python
import pandas as pd

# Profile each file; reading s3:// paths with pandas requires the s3fs package
for name in ["business", "tip", "user", "review"]:
    df = pd.read_json(f"s3://project-data-hkl/yelp_academic_dataset_{name}.json", lines=True)
    print(f"===== {name} =====")
    print(df.dtypes)        # column data types (which fields are numeric)
    print(df.count())       # non-missing values per column
    print(df.isna().sum())  # missing values per column
```

### Milestone 4: Coding and Modeling

On the review dataset, we fit a linear regression that models a review's star rating (`stars`, the label) as a function of the number of "useful" votes other users gave that review (`useful`, the single feature).

To do this, I borrowed methods from [Sanjjushri Varshini R](https://medium.com/featurepreneur/linear-regression-with-pyspark-in-10-steps-c6b3263a2c4) and [Susan Li](https://towardsdatascience.com/building-a-linear-regression-with-pyspark-and-mllib-d065c3ba246a) to help me with my linear regressions in PySpark.

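```python
# `spark` is the SparkSession provided by the pyspark shell
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

df = spark.read.json("s3://project-data-hkl/yelp_academic_dataset_review.json")
df.printSchema()

# Drop rows with any missing values, then preview the result
df = df.na.drop("any")
df.show(truncate=False)

# Combine the predictor column(s) into a single vector column named "features"
vectorAssembler = VectorAssembler(inputCols=["useful"], outputCol="features")
vreview_df = vectorAssembler.transform(df).select(["features", "stars"])
vreview_df.show(3)

# 70/30 train/test split; the seed makes the split reproducible
train_df, test_df = vreview_df.randomSplit([0.7, 0.3], seed=42)

lr = LinearRegression(featuresCol="features", labelCol="stars")
lr_model = lr.fit(train_df)

print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

# evaluate() returns a summary object; its predictions are in .predictions
pred_results = lr_model.evaluate(test_df)
print("Test RMSE: " + str(pred_results.rootMeanSquaredError))
print("Test R^2: " + str(pred_results.r2))

# Write to a separate (hypothetical) prefix so the source data is not overwritten;
# the vector "features" column cannot be saved as CSV, so keep only plain columns
output_file_path = "s3://project-data-hkl/review_predictions"
pred_results.predictions.select("stars", "prediction").write.options(header="True", delimiter="\t").csv(output_file_path)
```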
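With its default settings (`regParam=0.0`, `fitIntercept=True`), `LinearRegression` fits ordinary least squares. With a single feature, its coefficient and intercept should therefore agree, up to numerical precision, with the closed-form formulas `slope = cov(x, y) / var(x)` and `intercept = mean(y) - slope * mean(x)`. The sketch below implements those formulas and the RMSE in the standard library, on made-up `(useful, stars)` pairs. It is meant for building intuition on small samples, not as a replacement for Spark. `fit_simple_ols` and `rmse` are hypothetical helpers.

```python
import math
from typing import List, Sequence, Tuple


def fit_simple_ols(xs: Sequence[float], ys: Sequence[float]) -> Tuple[float, float]:
    """Least-squares fit of y = slope * x + intercept for one predictor."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need at least two paired observations")
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    sxx = sum((x - mean_x) ** 2 for x in xs)
    if sxx == 0:
        raise ValueError("x has zero variance; the slope is undefined")
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


def rmse(xs: Sequence[float], ys: Sequence[float], slope: float, intercept: float) -> float:
    """Root mean squared error of the fitted line on (xs, ys)."""
    squared: List[float] = [(slope * x + intercept - y) ** 2 for x, y in zip(xs, ys)]
    return math.sqrt(sum(squared) / len(squared))


useful = [0, 1, 2, 3, 4]
stars = [4.0, 4.0, 3.0, 3.0, 2.0]
slope, intercept = fit_simple_ols(useful, stars)
print(f"slope={slope:.3f} intercept={intercept:.3f} rmse={rmse(useful, stars, slope, intercept):.3f}")
```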


### Milestone 5: Visualizing Results

In this milestone, we created visualizations in two ways: with PySpark, using the reviews and business datasets, and with plain Python (pandas and Matplotlib), using only the business dataset.
The PySpark code is meant to be run on an EMR Spark cluster with five nodes.

On the EMR cluster, we join the reviews and business datasets into one central DataFrame, `yelp_sdf`.
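Joining on `business_id` with Spark's default join type is an inner join. Each review row gains its business's columns, and rows without a match on the other side are dropped. The sketch below shows those semantics in plain Python on made-up rows, using a hypothetical `inner_join` helper. Spark applies the same idea across the cluster, typically by shuffling both sides on the key or broadcasting the smaller one.

```python
from typing import Any, Dict, List


def inner_join(left: List[Dict[str, Any]], right: List[Dict[str, Any]], key: str) -> List[Dict[str, Any]]:
    """Inner join two lists of row dicts on `key`, keeping the key column once."""
    # Index the right side by key (a hash join); several right rows may share a key
    index: Dict[Any, List[Dict[str, Any]]] = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    joined = []
    for row in left:
        for match in index.get(row[key], []):
            # On a column-name clash the right side wins here, whereas Spark would keep
            # both (ambiguous) columns, which is why "stars" is renamed on each side first
            joined.append({**row, **match})
    return joined


reviews = [
    {"business_id": "b1", "review_stars": 5.0},
    {"business_id": "b1", "review_stars": 2.0},
    {"business_id": "b9", "review_stars": 4.0},  # no matching business: dropped
]
businesses = [
    {"business_id": "b1", "name": "Cafe A", "business_stars": 4.0},
    {"business_id": "b2", "name": "Diner B", "business_stars": 3.5},  # no reviews: dropped
]
for row in inner_join(reviews, businesses, "business_id"):
    print(row)
```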

```bash
# First, install the packages that we're working with
pip install matplotlib pandas s3fs

# Start the PySpark shell
pyspark
```

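```python
# Import only the Spark SQL functions we use; a wildcard import would shadow built-ins such as sum and round
from pyspark.sql.functions import col, current_date, datediff, lower, regexp_replace, to_date, to_timestamp, when, year
import io
import pandas as pd
import s3fs
from matplotlib import pyplot as plt
```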

```python
# Yelp Reviews Dataset
reviews_filename = "s3://project-data-hkl/yelp_academic_dataset_review.json"
reviews_sdf = spark.read.json(reviews_filename)
# Rename "stars" to "review_stars" so it does not clash with the business "stars" after the join
reviews_sdf = reviews_sdf.withColumnRenamed('stars', 'review_stars')
# Rename the "date" column to "review_date"
reviews_sdf = reviews_sdf.withColumnRenamed('date', 'review_date')
# Convert review_date to an actual date data type
reviews_sdf = reviews_sdf.withColumn("review_date", to_date(to_timestamp(col("review_date"), "yyyy-MM-dd HH:mm:ss")))
# Get the age of the review in days and years, plus the year it was written
reviews_sdf = reviews_sdf.withColumn("review_age_days", datediff(current_date(), col("review_date")))
reviews_sdf = reviews_sdf.withColumn("review_age_years", col("review_age_days") / 365.0)
reviews_sdf = reviews_sdf.withColumn("review_year", year("review_date"))

# Likely don't need the review_id
reviews_sdf = reviews_sdf.drop('review_id')

# Create an indicator variable if the star rating is more than 3.0
reviews_sdf = reviews_sdf.withColumn("goodreview", when(col("review_stars") > 3.0, 1.0).otherwise(0.0))
```
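The review features come down to simple date arithmetic and a threshold. The sketch below performs the same calculations for a single review using the standard library. It assumes the `yyyy-MM-dd HH:mm:ss` date format passed to `to_timestamp`. `review_features` is a hypothetical helper. It takes `today` as a parameter, in place of `current_date()`, so results are reproducible.

```python
from datetime import date, datetime
from typing import Any, Dict, Optional


def review_features(review_date: str, review_stars: float, today: Optional[date] = None) -> Dict[str, Any]:
    """Mirror review_age_days, review_age_years, review_year and goodreview for one review."""
    today = today or date.today()
    # Same pattern as Spark's "yyyy-MM-dd HH:mm:ss", keeping only the date part (like to_date)
    day = datetime.strptime(review_date, "%Y-%m-%d %H:%M:%S").date()
    age_days = (today - day).days  # like datediff(current_date(), review_date)
    return {
        "review_age_days": age_days,
        "review_age_years": age_days / 365.0,
        "review_year": day.year,
        "goodreview": 1.0 if review_stars > 3.0 else 0.0,  # strictly more than 3 stars
    }


print(review_features("2018-07-07 22:09:11", 4.0, today=date(2023, 1, 1)))
```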

```python
# Yelp Businesses Dataset
business_filename = "s3://project-data-hkl/yelp_academic_dataset_business.json"
business_sdf = spark.read.json(business_filename)
business_sdf.printSchema()
# Rename the business "stars" to "business_stars"
business_sdf = business_sdf.withColumnRenamed('stars', 'business_stars')
business_sdf.select("business_id", "name", "categories", "attributes.Alcohol", "attributes.NoiseLevel").show()

# Grab some columns we likely want to explore
business_sdf = business_sdf.withColumn("alcohol", business_sdf.attributes.Alcohol)

# Drop some columns we likely don't need
business_sdf = business_sdf.drop('city', 'hours', 'is_open', 'latitude', 'longitude', 'postal_code', 'state', 'address', 'attributes')

# Clean the alcohol values: raw strings look like 'full_bar', u'full_bar', 'none', u'none' or None,
# so strip the optional u prefix and the quotes, then treat "none" as a missing value
alcohol_clean = regexp_replace(lower(col("alcohol")), "^u?'(.*)'$", "$1")
business_sdf = business_sdf.withColumn("alcohol", when(alcohol_clean == "none", None).otherwise(alcohol_clean))

# Join business with reviews (an inner join on business_id)
yelp_sdf = reviews_sdf.join(business_sdf, "business_id")
yelp_sdf.printSchema()
```
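The `alcohol` attribute arrives as Python-literal strings such as `u'full_bar'` or `'none'`, which is what the cleanup above handles. The sketch below applies the same normalization in plain Python, so values can be checked outside Spark. It uses one regular expression to strip an optional `u` prefix and the surrounding quotes, then maps `none` to a missing value. `normalize_alcohol` is a hypothetical helper. Note that Python's `re` writes the group reference as `\1`, where Spark's Java-based regex uses `$1`.

```python
import re
from typing import Optional

# Optional u prefix, then a single-quoted value, e.g. u'full_bar' or 'none'
_WRAPPED = re.compile(r"^u?'(.*)'$")


def normalize_alcohol(raw: Optional[str]) -> Optional[str]:
    """Return the cleaned value (e.g. 'full_bar'), or None for none/missing."""
    if raw is None:
        return None
    value = _WRAPPED.sub(r"\1", raw.lower())
    return None if value == "none" else value


for raw in ["u'full_bar'", "'beer_and_wine'", "u'none'", "None", None]:
    print(repr(raw), "->", normalize_alcohol(raw))
```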

The joined `yelp_sdf` DataFrame has the following schema (output of `yelp_sdf.printSchema()`):

```text
root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- review_date: date (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- review_age_days: integer (nullable = true)
 |-- review_age_years: double (nullable = true)
 |-- review_year: integer (nullable = true)
 |-- goodreview: double (nullable = false)
 |-- categories: string (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- business_stars: double (nullable = true)
 |-- alcohol: string (nullable = true)
```

```python
# Number of reviews per year, for years after 2012
year_count_df = yelp_sdf.where(col("review_year") > 2012).groupby('review_year').count().sort('review_year').toPandas()
fig = plt.figure()

plt.bar(year_count_df['review_year'], year_count_df['count'])
plt.xlabel("Year")
plt.ylabel("Number of Reviews")
plt.title("Number of Reviews after 2012 by Year")
plt.xticks(rotation=90, ha='right')
fig.tight_layout()

# Render the figure into an in-memory PNG buffer
review_count_by_year = io.BytesIO()
plt.savefig(review_count_by_year, format='png', bbox_inches='tight')
review_count_by_year.seek(0)
# Connect to the s3fs file system and upload the PNG
s3 = s3fs.S3FileSystem(anon=False)
with s3.open('s3://project-data-hkl/review_count_by_year.png', 'wb') as f:
    f.write(review_count_by_year.getbuffer())
```

```python
# Total number of reviews at each star rating
star_count_df = yelp_sdf.groupBy("review_stars").count().sort("review_stars").toPandas()
fig = plt.figure()
plt.ticklabel_format(useOffset=False, style='plain', axis='y')
plt.bar(star_count_df['review_stars'], star_count_df['count'])
plt.xlabel("Star Rating")
plt.ylabel("Number of Reviews")
plt.title("Review Count by Star Rating")
star_rating_count = io.BytesIO()
plt.savefig(star_rating_count, format='png', bbox_inches='tight')
star_rating_count.seek(0)
# Connect to the s3fs file system and upload the PNG
s3 = s3fs.S3FileSystem(anon=False)
with s3.open('s3://project-data-hkl/star_rating_count.png', 'wb') as f:
    f.write(star_rating_count.getbuffer())
```

```python
# Number of good reviews (more than 3 stars) by the business's alcohol service
alcohol_df = yelp_sdf.where(col("goodreview") == 1.0).groupby('alcohol').count().sort('alcohol').toPandas()
# Label businesses without alcohol information so they get their own bar
alcohol_df['alcohol'] = alcohol_df['alcohol'].fillna('None')
fig = plt.figure()
plt.ticklabel_format(useOffset=False, style='plain', axis='y')
plt.bar(alcohol_df['alcohol'], alcohol_df['count'])
plt.xlabel("Alcohol Service")
plt.ylabel("Number of Good Reviews")
plt.title("Number of Good Reviews (>3 Stars) by Alcohol Service")

good_review = io.BytesIO()
plt.savefig(good_review, format='png', bbox_inches='tight')
good_review.seek(0)
# Connect to the s3fs file system and upload the PNG
s3 = s3fs.S3FileSystem(anon=False)
with s3.open('s3://project-data-hkl/good_review_alcohol.png', 'wb') as f:
    f.write(good_review.getbuffer())
```
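Every chart here repeats the same steps: render the figure into an in-memory PNG buffer, open the destination, and write the bytes. The sketch below factors those steps into one helper. It assumes an `opener(path, mode)` callable, such as the built-in `open` or the `open` method of an `s3fs.S3FileSystem`. `save_figure` is a hypothetical name, and `fig` only needs a Matplotlib-style `savefig` method. Passing the figure explicitly, instead of calling `plt.savefig` on whatever figure is current, also avoids saving the wrong chart.

```python
import io
from typing import IO, Any, Callable


def save_figure(fig: Any, path: str, opener: Callable[[str, str], IO[bytes]]) -> int:
    """Render `fig` as a PNG and write it to `path` via `opener`; return the number of bytes."""
    buffer = io.BytesIO()
    fig.savefig(buffer, format="png", bbox_inches="tight")
    data = buffer.getvalue()  # no seek(0) needed when taking the whole buffer
    with opener(path, "wb") as f:
        f.write(data)
    return len(data)
```

On the cluster, this might be called as `save_figure(fig, "s3://project-data-hkl/star_rating_count.png", s3fs.S3FileSystem(anon=False).open)`.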

#### Python Visualization Code

```python
import io

import pandas as pd
import s3fs
from matplotlib import pyplot as plt

df = pd.read_json('yelp_academic_dataset_business.json', lines=True)

# Flag businesses whose categories mention "Restaurant" (case-insensitive; missing categories count as "No")
is_rest = df['categories'].str.contains('Restaurant', case=False, na=False)
df['is_restaurant'] = is_rest.map({True: 'Yes', False: 'No'})
restaurant_df = df[df['is_restaurant'] == 'Yes']
# Drop restaurants in states with just one restaurant
restaurant_df = restaurant_df[~restaurant_df['state'].isin(['XMS', 'MT', 'NC', 'HI', 'CO'])]

# Count restaurants vs. other businesses; rename_axis + reset_index(name=...)
# yields the same column names in pandas 1.x and 2.x
is_restaurant = df['is_restaurant'].value_counts().rename_axis('is_restaurant').reset_index(name='count')
print(is_restaurant)
fig1 = plt.figure()
plt.barh(is_restaurant['is_restaurant'], is_restaurant['count'])
plt.title("Number of Restaurants in Dataset")

# Create a buffer to hold the figure
img_data = io.BytesIO()
# Write the figure to the buffer
plt.savefig(img_data, format='png', bbox_inches='tight')
img_data.seek(0)
# Connect to the s3fs file system and upload the PNG
s3 = s3fs.S3FileSystem(anon=False)
with s3.open('s3://project-data-hkl/restaurant_count.png', 'wb') as f:
    f.write(img_data.getbuffer())
```
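The states dropped above are listed by hand. One alternative, assuming the goal is to drop any state with fewer than a minimum number of restaurants, is to derive that list from the data so it stays correct if the dataset changes. The sketch below does this with the standard library over made-up `(business_id, state)` pairs. `sparse_states` is a hypothetical helper, and `min_count=2` corresponds to "more than one restaurant".

```python
from collections import Counter
from typing import List, Set, Tuple


def sparse_states(rows: List[Tuple[str, str]], min_count: int = 2) -> Set[str]:
    """States with fewer than `min_count` restaurants, given (business_id, state) pairs."""
    per_state = Counter(state for _, state in rows)
    return {state for state, n in per_state.items() if n < min_count}


restaurants = [("r1", "PA"), ("r2", "PA"), ("r3", "MT"), ("r4", "FL"), ("r5", "FL"), ("r6", "HI")]
drop = sparse_states(restaurants)
kept = [(bid, state) for bid, state in restaurants if state not in drop]
print(sorted(drop), len(kept))
```

In pandas, the same filter could be written as `restaurant_df[restaurant_df['state'].map(restaurant_df['state'].value_counts()) >= 2]`.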

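```python
# Average restaurant star rating by state, highest first
avg_stars = restaurant_df.groupby('state')['stars'].mean().round(decimals=2).reset_index(name='average')
avg_stars = avg_stars.sort_values(by='average', ascending=False)
print(avg_stars)

# Plot the averages on a new figure so savefig does not re-save the previous chart
fig_avg = plt.figure(figsize=(9, 6))
plt.bar(avg_stars['state'], avg_stars['average'], width=.4)
plt.xlabel("State")
plt.ylabel("Average Stars")
plt.title("Average Restaurant Rating by State")

# Create a buffer to hold the figure
img_data = io.BytesIO()
# Write the figure to the buffer
plt.savefig(img_data, format='png', bbox_inches='tight')
img_data.seek(0)
# Connect to the s3fs file system and upload the PNG
s3 = s3fs.S3FileSystem(anon=False)
with s3.open('s3://project-data-hkl/average_restaurant_review_state.png', 'wb') as f:
    f.write(img_data.getbuffer())
```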

```python
# Number of California restaurants at each star rating
ca_rest_df = restaurant_df[restaurant_df['state'] == 'CA']
restaurant_review_ct = ca_rest_df.groupby(['state', 'stars'])['business_id'].count().reset_index(name='count')
print(restaurant_review_ct)

fig2 = plt.figure(figsize=(9, 6))
plt.bar(restaurant_review_ct['stars'], restaurant_review_ct['count'], width=.4)
plt.xlabel("Star Rating")
plt.ylabel("Number of Restaurants")
plt.title("Most Frequent Star Ratings in California")

# Create a buffer to hold the figure
img_data = io.BytesIO()
# Write the figure to the buffer
plt.savefig(img_data, format='png', bbox_inches='tight')
img_data.seek(0)
# Connect to the s3fs file system and upload the PNG
s3 = s3fs.S3FileSystem(anon=False)
with s3.open('s3://project-data-hkl/frequent_stars_rating_ca.png', 'wb') as f:
    f.write(img_data.getbuffer())
```

```python
# Treat 4.0 stars or more as a good restaurant: which California cities have the most of them?
four_star_ca_restaurants = ca_rest_df[ca_rest_df['stars'] >= 4.0]

four_star_ca_restaurants_cnt = four_star_ca_restaurants.groupby(['city'])['stars'].count().reset_index(name='count')
# Leave out cities with only one such restaurant
four_star_ca_restaurants_cnt = four_star_ca_restaurants_cnt[four_star_ca_restaurants_cnt['count'] != 1]
print(four_star_ca_restaurants_cnt)

fig3 = plt.figure(figsize=(9, 6))
plt.bar(four_star_ca_restaurants_cnt['city'], four_star_ca_restaurants_cnt['count'], width=.4)
plt.xticks(rotation=90)
plt.title("California Cities with Best Reviewed Restaurants")

# Create a buffer to hold the figure
img_data = io.BytesIO()
# Save before plt.show(): showing can close the figure and leave savefig with a blank canvas
plt.savefig(img_data, format='png', bbox_inches='tight')
img_data.seek(0)
# Connect to the s3fs file system and upload the PNG
s3 = s3fs.S3FileSystem(anon=False)
with s3.open('s3://project-data-hkl/ca_best_reviewed.png', 'wb') as f:
    f.write(img_data.getbuffer())
plt.show()
```
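The last chart combines three steps: a star-rating threshold filter, a count per city, and a minimum-count filter. The sketch below runs that pipeline with the standard library on made-up `(city, stars)` pairs. `good_restaurant_cities` is a hypothetical helper, and its defaults of 4.0 stars and at least two restaurants match the cutoffs used above. It also sorts cities by count, which can make the bar chart easier to read.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def good_restaurant_cities(rows: Iterable[Tuple[str, float]], min_stars: float = 4.0,
                           min_restaurants: int = 2) -> List[Tuple[str, int]]:
    """(city, count) of restaurants rated >= min_stars, for cities with >= min_restaurants, most first."""
    counts = Counter(city for city, stars in rows if stars >= min_stars)
    kept = [(city, n) for city, n in counts.items() if n >= min_restaurants]
    # Highest count first; ties broken alphabetically for a stable order
    return sorted(kept, key=lambda item: (-item[1], item[0]))


ca_rows = [("City B", 4.5), ("City B", 4.0), ("City B", 3.5),
           ("City A", 4.0), ("City A", 5.0), ("City C", 4.5)]
print(good_restaurant_cities(ca_rows))
```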
