In [1]:
import os
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# SparkSession
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql.functions import to_date

# JDBC
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar


Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:6 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:7 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:8 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease [15.9 kB]
Ign:9 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:10 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:11 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:13 http://ppa.lau

In [13]:
# Spark
# Start (or reuse) a session with the PostgreSQL JDBC driver on the driver classpath.
spark = (
    SparkSession.builder
    .appName("AWS-Amazon-Reviews")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

# Get files
# The previous URL ("https://s3://...") mixed two schemes and is not a valid
# HTTPS address. Use the virtual-hosted-style S3 endpoint instead.
# NOTE(review): assumes the bucket is publicly readable through the global
# endpoint - confirm, or switch to the region-specific host if required.
file_name = "amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz"
url = f"https://amazongamereview.s3.amazonaws.com/{file_name}"
spark.sparkContext.addFile(url)

# addFile stores the download under its base name; read it back as a
# tab-separated file with a header row and inferred column types.
df = spark.read.csv(SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   41409413|R2MTG1GCZLR2DK|B00428R89M|     112201306|yoomall 5M Antenn...|     Electronics|          5|            0|          0|   N|                Y|          Five Stars|       As described.| 2015-08-31|
|         US|   49668221|R2HBOEM8LE9928|B000068O48|     734576678|Hosa GPM-103 3.5m...|     Electronics|          5|    

In [14]:
# Narrow the review data down to the columns used by the Vine analysis
vine_columns = ["star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"]
vine_df = df.select(vine_columns)
vine_df.show(10)

# Keep reviews with at least 20 total votes ...
vine_total_votes = vine_df.where(vine_df.total_votes >= 20)

# ... of which at least half were marked helpful
helpful_ratio = vine_total_votes.helpful_votes / vine_total_votes.total_votes
vine_helpful = vine_total_votes.where(helpful_ratio >= 0.5)

+-----------+-------------+-----------+----+-----------------+
|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+-------------+-----------+----+-----------------+
|          5|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          5|            1|          1|   N|                Y|
|          1|            0|          0|   N|                Y|
|          5|            1|          1|   N|                Y|
|          5|            1|          1|   N|                Y|
|          5|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          4|            0|          0|   N|                Y|
|          4|            0|          0|   N|                Y|
+-----------+-------------+-----------+----+-----------------+
only showing top 10 rows



In [18]:
# Split the helpful reviews by the Vine flag: 'Y' = paid, 'N' = unpaid
paid_vine = vine_helpful.where(vine_helpful["vine"] == 'Y')
unpaid_vine = vine_helpful.where(vine_helpful["vine"] == 'N')

# Summary statistics for each group, paid first
for review_group in (paid_vine, unpaid_vine):
    review_group.describe().show()

+-------+------------------+------------------+-----------------+----+-----------------+
|summary|       star_rating|     helpful_votes|      total_votes|vine|verified_purchase|
+-------+------------------+------------------+-----------------+----+-----------------+
|  count|              1080|              1080|             1080|1080|             1080|
|   mean| 4.093518518518518| 69.19259259259259|77.24259259259259|null|             null|
| stddev|0.9834377437155114|140.20986501388205|148.5781746930746|null|             null|
|    min|                 1|                11|               20|   Y|                N|
|    max|                 5|              2561|             2688|   Y|                Y|
+-------+------------------+------------------+-----------------+----+-----------------+

+-------+------------------+------------------+------------------+-----+-----------------+
|summary|       star_rating|     helpful_votes|       total_votes| vine|verified_purchase|
+-------+-------

In [16]:
# Analyzing paid vine
# Count the 5-star paid reviews and all paid reviews, then derive the share.
paid_five_star = paid_vine.filter(paid_vine['star_rating'] == 5).count()
paid_count = paid_vine.count()
# Guard against an empty paid group, which would otherwise raise
# ZeroDivisionError; the share is undefined (NaN) in that case.
five_star_vine = paid_five_star / paid_count if paid_count else float("nan")
print(paid_count)
print(paid_five_star)
print(five_star_vine)

1080
454
0.4203703703703704


In [17]:
# Analyzing unpaid vine
# Count the 5-star unpaid reviews and all unpaid reviews, then derive the share.
unpaid_five_star = unpaid_vine.filter(unpaid_vine['star_rating'] == 5).count()
unpaid_count = unpaid_vine.count()
# Guard against an empty unpaid group, which would otherwise raise
# ZeroDivisionError; the share is undefined (NaN) in that case.
five_star_non_vine = unpaid_five_star / unpaid_count if unpaid_count else float("nan")
print(unpaid_count)
print(unpaid_five_star)
# Print the stored share instead of recomputing the same expression.
print(five_star_non_vine)

49673
23043
0.463893865882874
