## Summary of the actions in this file: "Yelp_review_Part1_NK.ipynb"  

#### PLEASE UPDATE THE read_path / write_path IN THE THIRD CODE CELL (THE IMPORTS CELL) IF YOU NEED TO!

The code below loads the Yelp review data (about 5.4 million rows) into a PySpark DataFrame and performs the following actions, in order:  
1) creates an additional column, "label", that holds the star rating as a double.  
2) creates another DataFrame that only includes the user_id, text and label columns.  
3) processes (cleans and lemmatizes) the text column and stores the result in a new "words" column.  
4) eliminates all rows without a usable user_id (empty, or containing no letters).  
5) eliminates all rows whose rating is not 1 or 5.  
6) counts the users that remain (~1.1 million) and the number of 1- and 5-star reviews each of them has written.  
7) filters the DataFrame to keep only the users with more than 20 such reviews, which leaves 12,356 users in df_shorter.  
8) saves df_shorter as yelp-cleaned.  
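The filtering logic of the steps above can be sketched in plain Python on a few in-memory records, which may help when checking the rules before running them on the full dataset. This is only an illustration: `run_pipeline`, `min_reviews` and the sample records are hypothetical, and the text cleaning of step 3 is left out.

```python
import re
from collections import Counter

def run_pipeline(rows, min_reviews=20):
    """Plain-Python mirror of steps 1, 2 and 4 to 7 (text cleaning, step 3, is skipped)."""
    kept = []
    for row in rows:
        # Step 1: string rating -> float label; unparsable values become None,
        # much like a Spark cast that yields null (with ANSI mode off).
        try:
            label = float(row.get("stars"))
        except (TypeError, ValueError):
            label = None
        user_id = row.get("user_id") or ""
        # Step 4: drop empty user_ids and user_ids without any letter.
        if not re.search(r"[a-zA-Z]", user_id):
            continue
        # Step 5: keep only 1- and 5-star reviews (a None label is dropped too).
        if label not in (1.0, 5.0):
            continue
        # Step 2: keep only the columns used later on.
        kept.append({"user_id": user_id, "text": row.get("text"), "label": label})
    # Step 6: number of remaining reviews per user.
    counts = Counter(r["user_id"] for r in kept)
    # Step 7: keep only the reviews of users with more than min_reviews reviews.
    frequent = {user for user, n in counts.items() if n > min_reviews}
    return [r for r in kept if r["user_id"] in frequent], counts

# Hypothetical sample records (not taken from the dataset).
sample = [
    {"user_id": "userA", "stars": "5", "text": "great"},
    {"user_id": "userA", "stars": "1", "text": "awful"},
    {"user_id": "userA", "stars": "3", "text": "fine"},
    {"user_id": "12345", "stars": "5", "text": "numeric id"},
    {"user_id": "", "stars": "5", "text": "no id"},
    {"user_id": "userB", "stars": "oops", "text": "unparsable rating"},
]
rows, counts = run_pipeline(sample, min_reviews=1)
print(counts)  # Counter({'userA': 2})
```

Because the filters run before the per-user count, a user's total only includes the 1- and 5-star reviews that survived, which matches how the notebook counts users.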

In [11]:
# MOUNT GOOGLE DRIVE SO WE CAN READ THE FILE FROM THERE

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [12]:
!pip install pyspark
!pip install nltk



In [13]:
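from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col  # avoid importing `sum` here: it would shadow Python's built-in sum()
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import nltk
import re

# Update these paths to match your Drive. Note that read_path uses "My Drive" while
# write_path uses "MyDrive"; make sure both point at folders that exist in your mount.
read_path = "/content/drive/My Drive/Colab Notebooks/yelp-dataset/yelp_review.csv"
write_path = "/content/drive/MyDrive/Colab Notebooks/yelp-cleaned"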

In [15]:
# getOrCreate() reuses the running session when the cell is executed again,
# instead of raising "Cannot run multiple SparkContexts at once".
spark = (SparkSession.builder
         .master("local[*]")
         .appName("yelpPart1")
         .getOrCreate())
sc = spark.sparkContext

In [16]:
df = spark.read.format("csv").option("header", "true").option("multiline","true").load(read_path) #Reading the loaded csv file
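Review text can contain commas, line breaks and quotation marks, so a record only parses correctly if the reader honours CSV quoting. The sketch below uses Python's standard `csv` module on a made-up sample (hypothetical, trimmed to three columns) to show what a correct parse looks like, next to a naive line split that breaks the record apart. If text fragments show up under `user_id` (as in the per-user counts further down), that may point to quoting problems; a commonly used Spark setting for files that escape quotes by doubling them is `.option("escape", '"')` together with `multiline`, although whether this particular file needs it is an assumption to check.

```python
import csv
import io

# Hypothetical sample: the second record has a quoted text field containing a comma,
# doubled quotes ("" means one literal quote) and an embedded newline.
raw = (
    'review_id,user_id,text\n'
    'r1,userA,"Great place, ""really"" good.\nWould return."\n'
    'r2,userB,Plain text\n'
)

# Naive parsing: splitting on newlines and commas tears the quoted field apart,
# so part of the review text lands in a row of its own.
naive = [line.split(",") for line in raw.splitlines()]

# csv.reader follows the quoting rules: doubled quotes become a single quote and
# the embedded newline stays inside the text field.
parsed = list(csv.reader(io.StringIO(raw)))

print(len(naive), len(parsed))  # 4 3
```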

In [17]:
df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: string (nullable = true)
 |-- funny: string (nullable = true)
 |-- cool: string (nullable = true)



In [18]:
df.show()

+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+
|           review_id|             user_id|         business_id|stars|      date|                text|useful|funny|cool|
+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+
|vkVSCC7xljjrAI4UG...|bv2nCi5Qv5vroFiqK...|AEx2SYEUJmTxVVB18...|    5|2016-05-28|Super simple plac...|     0|    0|   0|
|n6QzIUObkYshz4dz2...|bv2nCi5Qv5vroFiqK...|VR6GpWIda3SfvPC-l...|    5|2016-05-28|Small unassuming ...|     0|    0|   0|
|MV3CcKScW05u5LVfF...|bv2nCi5Qv5vroFiqK...|CKC0-MOWMqoeWf6s-...|    5|2016-05-28|Lester's is locat...|     0|    0|   0|
|IXvOzsEMYtiJI0CAR...|bv2nCi5Qv5vroFiqK...|ACFtxLv8pGrrxMm6E...|    4|2016-05-28|Love coming here....|     0|    0|   0|
|L_9BTb55X0GDtThi6...|bv2nCi5Qv5vroFiqK...|s2I_Ni76bjJNK9yG6...|    4|2016-05-28|Had their chocola...|     0|    0|   0|
|HRPm3vEZ_F-33TYVT...|_4iMDXbXZ1

In [19]:
df = df.withColumn('label', df["stars"].cast("double"))

In [24]:
df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: string (nullable = true)
 |-- funny: string (nullable = true)
 |-- cool: string (nullable = true)
 |-- label: double (nullable = true)



In [25]:
nltk.download('stopwords') # if needed
nltk.download('punkt') # if needed
nltk.download('wordnet') # if needed
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from pyspark.sql.functions import udf # udf takes a function and returns a function that can be applied to a column
from pyspark.sql.types import StringType, ArrayType
import string

stopword_list = set(stopwords.words("english"))

punkt_list = set(string.punctuation)  # not used below: isalpha() already drops punctuation tokens

# create the lemmatizer once instead of once per word
lemmatizer = WordNetLemmatizer()

def lemmatized(word):
    return lemmatizer.lemmatize(word)

def ProcessText(text):
    # a null text cell would make nltk.word_tokenize raise and fail the Spark job
    if text is None:
        return []
    tokens = nltk.word_tokenize(text)
    # keep alphabetic tokens only, lowercase them and drop English stopwords
    processed_tokens = [word.lower() for word in tokens if word.isalpha() and word.lower() not in stopword_list]
    processed_tokens = [lemmatized(word) for word in processed_tokens]
    return processed_tokens

# udf wraps ProcessText so that Spark calls it once for every cell of the column it is applied to.
# ProcessText returns a Python list, but the declared return type is StringType(), so Spark stores
# the list as its string form (e.g. "[super, simple, ...]"). This keeps the final write.csv working,
# because CSV cannot hold array columns; use ArrayType(StringType()) if you switch to e.g. Parquet.
process_text_udf = udf(ProcessText, StringType())


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
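For quick checks outside Spark, the same cleaning steps can be exercised as a plain function. This sketch is hypothetical: it swaps `nltk.word_tokenize` for a simple regex tokenizer and uses a tiny stand-in stopword set with a pluggable lemmatizer (identity by default), so its tokens will not always match NLTK's (contractions, for instance, are split differently). It does keep the order of operations used above and the guard for missing text.

```python
import re

# Hypothetical stand-ins: a handful of English stopwords and a no-op lemmatizer.
STOPWORDS = {"the", "a", "is", "and", "was", "we"}

def identity_lemmatize(word):
    return word

def process_text(text, stopwords=STOPWORDS, lemmatize=identity_lemmatize):
    """Keep alphabetic tokens, lowercase them, drop stopwords, then lemmatize."""
    if text is None:  # missing text: return an empty list instead of raising
        return []
    tokens = re.findall(r"[A-Za-z]+", text)  # regex tokenizer in place of nltk.word_tokenize
    words = [t.lower() for t in tokens if t.lower() not in stopwords]
    return [lemmatize(w) for w in words]

print(process_text("The pizza was GREAT and we loved it!"))  # ['pizza', 'great', 'loved', 'it']
```

Passing a different `lemmatize` callable (for example `WordNetLemmatizer().lemmatize` once the NLTK data is available) changes only the last step.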


In [26]:
df_2 = df.select('user_id', 'text', 'label')

# we apply the mapping function ProcessText to every value in the text column;
# the result is stored in a new column called "words".
# the UDF passes the values to the function one by one, because the function expects a string, not a column

df_3 = df_2.withColumn("words", process_text_udf(col("text")))

df_short = df_3.select('user_id', 'words', 'label')

# next, we group by user_id and count each user's reviews

In [27]:
df_short.show()

+--------------------+--------------------+-----+
|             user_id|               words|label|
+--------------------+--------------------+-----+
|bv2nCi5Qv5vroFiqK...|[super, simple, p...|  5.0|
|bv2nCi5Qv5vroFiqK...|[small, unassumin...|  5.0|
|bv2nCi5Qv5vroFiqK...|[lester, located,...|  5.0|
|bv2nCi5Qv5vroFiqK...|[love, coming, ye...|  4.0|
|bv2nCi5Qv5vroFiqK...|[chocolate, almon...|  4.0|
|_4iMDXbXZ1p1ONG29...|[cycle, pub, la, ...|  5.0|
|u0LXt3Uea_GidxRW1...|[would, guess, wo...|  4.0|
|u0LXt3Uea_GidxRW1...|[always, drove, p...|  4.0|
|u0LXt3Uea_GidxRW1...|[bad, love, vegan...|  3.0|
|u0LXt3Uea_GidxRW1...|[love, place, peg...|  5.0|
|u0LXt3Uea_GidxRW1...|[currently, paren...|  4.0|
|u0LXt3Uea_GidxRW1...|[server, little, ...|  3.0|
|u0LXt3Uea_GidxRW1...|[thought, tidy, f...|  1.0|
|u0LXt3Uea_GidxRW1...|[wanted, check, p...|  3.0|
|u0LXt3Uea_GidxRW1...|[place, awesome, ...|  5.0|
|u0LXt3Uea_GidxRW1...|[must, stop, mont...|  4.0|
|u0LXt3Uea_GidxRW1...|[trying, book, ap...|  1.0|


In [28]:
# df.count() # returns 5427013

In [29]:
# We count the number of rows (reviews) for each user_id.
counts_df = df_short.groupBy("user_id").count()
counts_df.sort("count").show(10)

+--------------------+-----+
|             user_id|count|
+--------------------+-----+
| on a beautiful P...|    1|
|Q87V0vOAtbpxdkrF6...|    1|
|1yGnVPN0ORgvLr_pM...|    1|
|PqUQyGApS2pho7aox...|    1|
|LqdHGAYxwICIXuqfU...|    1|
|32vx6QPtlUvMFurvb...|    1|
| given the swathe...|    1|
| go to another lo...|    1|
|wpHRE_R8rOWp1JQur...|    1|
|bSA6m2r1k67uaEPCc...|    1|
+--------------------+-----+
only showing top 10 rows



In [30]:
# The counts show that some rows have no usable user ID (in the output above, for instance,
# fragments of review text appear in the user_id column). We will remove these rows from the dataset.
# We also noticed that subscribed users' user_ids include letters, while reviews posted without
# a subscription have shorter, purely numeric user_ids.
# To filter for these, we also exclude any user_id that does not contain a letter.

df_short = df_short.filter(df_short.user_id != '')  # also drops null user_ids, since null != '' evaluates to null
df_short = df_short.filter(df_short.user_id.rlike('[a-zA-Z]'))  # rlike is a regex match: keeps rows whose user_id contains at least one letter
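Note that `rlike('[a-zA-Z]')` keeps any value containing at least one letter, so text fragments such as the ` on a beautiful P...` entry in the counts above still pass. A stricter check is sketched below in plain Python. It assumes, without verification against this file, that genuine Yelp IDs are exactly 22 characters drawn from letters, digits, `_` and `-`; under that assumption the same anchored pattern could be passed to Spark as `df_short.user_id.rlike('^[A-Za-z0-9_-]{22}$')`. The helper name and the sample values are hypothetical.

```python
import re

# Assumption: a valid Yelp ID is exactly 22 characters from [A-Za-z0-9_-].
USER_ID_PATTERN = re.compile(r"^[A-Za-z0-9_-]{22}$")

def looks_like_user_id(value):
    """Return True if value matches the assumed 22-character ID format."""
    return value is not None and USER_ID_PATTERN.match(value) is not None

candidates = [
    "abcDEF0123456789_-abcd",  # 22 characters, hypothetical ID
    " on a beautiful",         # text fragment, like the one in the counts output
    "12345",                   # short numeric value
    "",
    None,
]
print([looks_like_user_id(c) for c in candidates])  # [True, False, False, False, False]
```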

In [31]:
# We keep only the reviews rated 1 or 5, to see what turns a customer off and what really pleases them

df_short = df_short.filter(col("label").isin(1.0, 5.0))  # col("label") refers to df_short's own column rather than the original df

In [32]:
counts_df = df_short.groupBy("user_id").count()
counts_df.persist()
counts_df.sort("count", ascending = False).show(10)

+--------------------+-----+
|             user_id|count|
+--------------------+-----+
|dt9IHwfuZs9D9LOH7...|  387|
|RBZ_kMjowV0t6_nv2...|  326|
|rCWrxuRC8_pfagpch...|  307|
|JLv2Dmfj73-I0d9N4...|  302|
|ELcQDlf69kb-ihJfx...|  301|
|U4INQZOPSUaj8hMjL...|  289|
|7sNE58P4AvsX6QHE8...|  288|
|dIIKEfOgo0KqUfGQv...|  276|
|cMEtAiW60I5wE_vLf...|  272|
|G-_KF_Ul4d3WGEa-G...|  268|
+--------------------+-----+
only showing top 10 rows



In [33]:
counts_df.count() # returns 1,127,806 users
# counts_df stays cached here because the next cell filters it again

# The cell below calls collect(), so it takes a long time to run!

There are 1,127,806 users in our df, and the majority of them have written only a few (1- or 5-star) reviews.  
We will create a list of the user IDs in df_short that have posted more than 20 such reviews.

In [None]:
r = 20  # a user needs more than r reviews (rated 1 or 5) to be kept
# collect() brings the matching user_ids (12,356 of them) back to the driver as a Python list
user_list = counts_df.filter(counts_df["count"] > r).select("user_id").rdd.flatMap(lambda x: x).collect()
counts_df.unpersist()  # the per-user counts are no longer needed

# We filter df_short to only include users who have posted more than r reviews
df_shorter = df_short.filter(df_short.user_id.isin(user_list))
df_shorter.persist() # caches the DataFrame, which is reused below both for counting and for writing to a file

counts_df = df_shorter.groupBy("user_id").count()

counts_df.count() # returns 12,356 users

The code below could have been a good alternative to the cell above, since it does not collect.  
We can work on it later, when things run faster on the cloud.

In [None]:
# # this version, had it worked, would have been more efficient than the previous cell, as it does not collect, but so far it does not work.

# # we join the filtered user_ids with the original dataframe to get the reviews of the users who have posted more than 20 reviews
# # we do not want a count column in our final dataframe, so we drop it

# df_filtered = df_short.groupBy("user_id").count().filter("count > 20")
# df_shorter = df_filtered.join(df_short, "user_id", "inner").drop("count") # something is wrong with this line; joining on the name "user_id" keeps a single user_id column, so ambiguity is an unlikely cause
# # untested idea: a left-semi join keeps only df_short's columns, so there is no count column to drop:
# # df_shorter = df_short.join(df_filtered.select("user_id"), on="user_id", how="left_semi")
# df_shorter.persist()

# counts_df = df_shorter.groupBy("user_id").count()
# counts_df.count() # if it works properly, it should return 12356

In [None]:
df_shorter.count() # returns X rows (the length of df_shorter); avoid running it, as it takes a long time

In [None]:
df_shorter.write.csv(write_path) # so we can use this cleaned data in the future without having to re-run the whole code