We run a webshop that sells all kinds of products, where customers can write reviews of the items they buy. We want to monitor the number of customer reviews over time, so that we can offer incentives to write reviews if these numbers drop sharply.

The goal of this project is to compute summary statistics from tab-separated files that contain reviews of Amazon products. The summary statistics should be stored in a database and displayed on a dashboard that shows how the number of reviews per month evolves.


In [9]:
from pyspark.sql import SparkSession
import mysql.connector

In [67]:

'''
Yeah, don't mind this. It is just for myself to check if I have the correct java version activated.

source ~/.bashrc
source ~/.bash_profile

uncomment the following line in /.bash_profile
 - export JAVA_HOME=$(/usr/libexec/java_home -v 1.8) pyspark
'''
import os
os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home'

import subprocess
subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT)


b'java version "1.8.0_241"\nJava(TM) SE Runtime Environment (build 1.8.0_241-b07)\nJava HotSpot(TM) 64-Bit Server VM (build 25.241-b07, mixed mode)\n'

In [68]:
spark = SparkSession.builder.appName("Basics").getOrCreate()


In [69]:
amazon = spark.read.csv('../amazon_reviews_multilingual_US_v1_00.tsv', sep='\t', header=True, inferSchema=True)

In [70]:
# amazon.show()

In [71]:
# We know from the project description that we only need the 
# customer_id, review_date, product_category and star_rating columns, 
# so we'll drop all the other columns.
amazon = amazon.select('customer_id', 'review_date', 'product_category', 'star_rating')

In [72]:
# amazon.printSchema()

'''
So there is no problem with our datatypes: Spark doesn't do weird things like
saying that a column full of numbers is of type string or something like that.
If it does, you can use this code to tell spark, for example, that the 
customer_id-column has the type 'int'

amazon = amazon.withColumn('customer_id', amazon['customer_id'].cast('int'))

'''

In [73]:
# amazon.select('review_date').show()


DataFrame[review_date: timestamp]

In [74]:
'''
Extracting the month from the 'review_date' column, creating a new column
called "month" and deleting the "review_date" column. 
Please note that this month column is in numerical format (e.g. 1 = "January")
'''
from pyspark.sql.functions import month
amazon = amazon.withColumn("month", month(amazon['review_date']))
amazon = amazon.drop('review_date')
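
To show what the numeric month looks like without starting Spark, here is a minimal plain-Python sketch. It uses the standard library's `datetime` and `calendar` instead of `pyspark.sql.functions.month`, and the helper names `month_number` and `month_label` are made up for this illustration. It also shows that January of two different years ends up with the same value 1, which is worth keeping in mind if the data spans several years.

```python
import calendar
from datetime import datetime


def month_number(timestamp):
    """Return the month of a timestamp as a number (1 = January)."""
    return timestamp.month


def month_label(number):
    """Translate a month number back to its English name."""
    return calendar.month_name[number]


# Two made-up review dates in different years
first = datetime(2014, 1, 5)
second = datetime(2015, 1, 31)

print(month_number(first), month_label(month_number(first)))
print(month_number(first) == month_number(second))
```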

In [75]:
# amazon.show()


In [76]:
'''
Ok, here it gets messy. It is also coded quite dirty but YOLO.

What we do here is important, however. In the first paragraph we make a 
new dataframe called 'amazon_grouped' with one row for each unique 
combination of 'month', 'product_category' and 'star_rating', plus a new 
column called 'count'. This column stores how many times that combination 
appears in amazon. 

So, as you can see when you run amazon_grouped.show(), "month = 1,
product_category = Apparel and star_rating = 1" appears only one
time in amazon, while "month = 1, product_category = Apparel and 
star_rating = 5" appears no less than 9 times! 
(Yay, people are happy with the clothes they buy at amazon in January. 
Probably because they just got new money thanks to selling the sweaters 
and the Mamma Mia Blu-ray they got as a present from grandma, the 
ungrateful lot.)

In the second paragraph we do the same, but now only for the unique 
combinations of 'month' and 'product_category'. Which, as you might
have guessed, gives us far fewer rows and higher values in the count column,
which I renamed here to "count_per_month".

In the last paragraph we join both dataframes on 'month' and 
'product_category' to make our final_dataframe, so that every row carries 
both its own 'count' and the 'count_per_month' of its group.

If you don't understand this part completely, no worries: I'll explain it
to you in the call and you'll see that is actually quite easy, pinky promise! 

'''

feature_group = ['month','product_category','star_rating']
amazon_grouped = amazon.groupBy(feature_group).count()
# print('amazon_grouped')
# amazon_grouped.sort('product_category','month','star_rating').show()

per_month = amazon.groupBy('month','product_category').count()
per_month = per_month.withColumnRenamed('count', "count_per_month")
# print('per_month')
# per_month.sort('product_category','month').show()

final_dataframe = amazon_grouped.join(per_month, on= ['month','product_category'], how='inner')
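
If the two groupings and the join feel abstract, the same logic can be sketched in plain Python on a handful of made-up rows. This is only an illustration of the idea, not the Spark code used above: `collections.Counter` stands in for `groupBy(...).count()`, a dictionary lookup stands in for the join, and the function name `summarize` is hypothetical.

```python
from collections import Counter

# Made-up rows in the shape (month, product_category, star_rating)
rows = [
    (1, "Apparel", 5),
    (1, "Apparel", 5),
    (1, "Apparel", 1),
    (2, "Video", 4),
]


def summarize(rows):
    """Return (month, category, rating, count, count_per_month) tuples."""
    # Count every unique (month, category, rating) combination
    per_rating = Counter(rows)
    # Count every unique (month, category) combination
    per_month = Counter((month, category) for month, category, _ in rows)
    # "Join" both counts on (month, category)
    return sorted(
        (month, category, rating, count, per_month[(month, category)])
        for (month, category, rating), count in per_rating.items()
    )


for record in summarize(rows):
    print(record)
```

Each output row carries its own count next to the total for its month and category, which is what the dashboard needs to show a rating's share of the monthly reviews.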

In [77]:
# print('final_dataframe')
# final_dataframe.sort('product_category','month','star_rating').show()

final_dataframe


KeyboardInterrupt: 

In [None]:
'''
Comment out these lines if you don't want to get Rick rolled and learn the danger of running 
code without understanding what it does or reading the documentation.
'''

import webbrowser
webbrowser.open('https://www.youtube.com/watch?v=dQw4w9WgXcQ') 

In [13]:
# It is bad practice to hardcode your passwords into your programs. 
# That's why we prompt the user to type it in every time the program runs.
# getpass() hides the typed characters, unlike input().
from getpass import getpass
database_password = getpass("Can I have your password please? ")

In [14]:
# Setting up a connection with our local database and creating a cursor

cnx = mysql.connector.connect(
    host = '127.0.0.1',
    user = 'root',
    passwd = database_password,
    auth_plugin='mysql_native_password',
    buffered=True
)

my_cursor = cnx.cursor()


In [21]:
# Here we run the "amazon.sql" file, in which we set up our database and
# create the 'scores' table in it.
path = "amazon.sql"
with open(path) as sql_file:
    statements = sql_file.read().split(';')
for line in statements:
    my_cursor.execute(line)
    print(line)

CREATE DATABASE IF NOT EXISTS amazon


use amazon


drop table IF EXISTS scores


create table IF NOT EXISTS scores(
    month int,
    product_category varchar(200),
    star_rating int,
    count int,
    count_per_month int

)




['CREATE DATABASE IF NOT EXISTS amazon', '\n\nuse amazon', '\n\ndrop table IF EXISTS scores', '\n\ncreate table IF NOT EXISTS scores(\n    month int,\n    product_category varchar(200),\n    star_rating int,\n    count int,\n    count_per_month int\n\n)\n\n\n\n']
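
Splitting the script on ';' works for a file like the one above, but it would produce an empty last fragment if the file ended with a ';'. A minimal sketch of a slightly more defensive split is shown below. The helper name `split_statements` is made up, and the approach is still naive: it would break on a ';' inside a string literal or a comment.

```python
def split_statements(script):
    """Split a SQL script on ';' and drop empty or whitespace-only fragments."""
    return [part.strip() for part in script.split(";") if part.strip()]


# A shortened script that ends with a ';', unlike the file above
script = """
CREATE DATABASE IF NOT EXISTS amazon;

use amazon;
"""

statements = split_statements(script)
for statement in statements:
    print(statement)
```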


In [22]:
# Read the documentation to find out what this method does!

collect = final_dataframe.collect()

'''
Ok, I'll help because who in the world reads documentation: 
collect() returns all the records as a list of Row objects. 

So we'll get a list of all the records in this shape: 
Row(month=8, product_category='Video', star_rating=1, count=286, count_per_month=5074)
'''

NameError: name 'final_dataframe' is not defined

In [82]:
# 'collect' already holds every row, so len() avoids running a second Spark job
total = len(collect)

In [83]:
'''
Ok, this is another hard part (but not really once you get it). I could again
explain here every line and show what it does, but that would result in a 
wall of text and also I'm getting bored. So let's give this a didactic spin 
(which has nothing to do with the fact that the sun is shining and I want to 
go outside) and say that you can try and figure it out yourself as an exercise.
Yes, let's say that: it's an exercise... 

'''
for x in range(total):
    
    # Extracting the month of the x'th row and saving it to a variable called 
    # 'month' (and the same for the other variables)
    month = collect[x][0]
    product_category = collect[x][1]
    star_rating = collect[x][2]
    count = collect[x][3]
    count_per_month = collect[x][4]
    
    # Saving these newly created variables into the 'scores' table 
    # (cf. amazon.sql) into the similarly named columns
    my_cursor.execute("INSERT INTO "
                      "scores(month,product_category,star_rating,count, count_per_month) "
                      "values(%s, %s,%s, %s, %s)", 
                      (month,product_category,star_rating,count,count_per_month))


    # Committing the transaction after every 1000th row, so that the rows 
    # inserted so far are stored permanently and a crash doesn't cost us 
    # all our progress. Also: printing our progress in percent, 
    # because it's pretty to see that number go up.
    if x % 1000 == 0:
        cnx.commit()
        
        progress = round(100 * x/total, 2)

        print("Progress: %s%%" % progress)
        
cnx.commit()

print("DONE!") 


Progress: 0.0%
Progress: 52.0%
DONE!
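
The loop above sends one INSERT per row. As an alternative, the rows could be inserted in batches, with one commit per batch. The sketch below shows that idea under loud assumptions: it uses the standard library's `sqlite3` with an in-memory database as a stand-in for MySQL, so that it runs anywhere, and `insert_in_batches` is a made-up helper. With mysql.connector the placeholders would be `%s` instead of `?`. Apart from the first row, which is the example Row shown earlier, the data is invented.

```python
import sqlite3


def insert_in_batches(connection, rows, batch_size=1000):
    """Insert rows into 'scores' batch by batch, committing after each batch."""
    cursor = connection.cursor()
    inserted = 0
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        cursor.executemany(
            "INSERT INTO scores"
            "(month, product_category, star_rating, count, count_per_month) "
            "VALUES (?, ?, ?, ?, ?)",  # sqlite3 placeholders; MySQL uses %s
            batch,
        )
        connection.commit()
        inserted += len(batch)
    return inserted


# In-memory stand-in for the MySQL 'scores' table from amazon.sql
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE scores(month int, product_category varchar(200), "
    "star_rating int, count int, count_per_month int)"
)

rows = [
    (8, "Video", 1, 286, 5074),
    (8, "Video", 5, 2100, 5074),  # invented
    (9, "Apparel", 4, 12, 40),    # invented
]

inserted = insert_in_batches(connection, rows, batch_size=2)
stored = connection.execute("SELECT COUNT(*) FROM scores").fetchone()[0]
print(inserted, stored)
```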


In [None]:
# Congratulations on making it through!! 

import webbrowser
webbrowser.open('https://media.giphy.com/media/fxsqOYnIMEefC/giphy.gif')
webbrowser.open('https://media.giphy.com/media/gFccuw5vFkc9trBiQ1/giphy.gif')

