In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.1.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [1 InRelease 0 B/88.7 kB                                                                                Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelea

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Tokens").getOrCreate()

In [3]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://dpro23451-bucket.s3.amazonaws.com/vine_table.csv"
spark.sparkContext.addFile(url)


In [4]:
df = spark.read.csv(SparkFiles.get("vine_table.csv"), sep=",", header=True)

In [29]:
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import col

df2 = df.selectExpr("cast (vine as string) vine", "cast (verified_purchase as string) verified_purchase","cast (review_id as string) review_id","cast(total_votes as int) total_votes","cast(star_rating as int) star_rating","cast(helpful_votes as int) helpful_votes")
df2.printSchema()



root
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)

+----+-----------------+--------------+-----------+-----------+-------------+
|vine|verified_purchase|     review_id|total_votes|star_rating|helpful_votes|
+----+-----------------+--------------+-----------+-----------+-------------+
|   N|                Y| RTIS3L2M1F5SM|          0|          5|            0|
|   N|                Y| R1ZV7R40OLHKD|          0|          5|            0|
|   N|                Y|R3BH071QLH8QMC|          1|          1|            0|
|   N|                Y|R127K9NTSXA2YH|          0|          3|            0|
|   N|                Y|R32ZWUXDJPW27Q|          0|          4|            0|
|   N|                Y|R3AQQ4YUKJWBA6|          0|          1|            0|
|   N|                Y|R2F0POU5

In [58]:

# retrieve rows with vote total > 20
df_filtered = df2.filter("total_votes>20")
# filter out non helpful votes
df_clean = df_filtered.filter("helpful_votes/total_votes >= 0.5")
#filter paid reviews
df_paid = df_clean.filter("vine == 'Y'")
#filter unpaid reviews
df_unpaid = df_clean.filter("vine == 'N'")
#filter 5 star paid reviews
df_paidFive = df_paid.groupby("star_rating").count()
#filter 5 star unpaid reviews
df_unpaidFive = df_unpaid.groupby("star_rating").count()

#get review counts for each star rating
df_reviewCount = df_clean.groupby("star_rating").count()
#get total review counts for paid and unpaid
df_paidUnpaid = df_clean.groupby("vine").count()





In [54]:
df_reviewCount.show()

+-----------+-----+
|star_rating|count|
+-----------+-----+
|          1| 9563|
|          3| 4130|
|          5|14748|
|          4| 6333|
|          2| 3147|
+-----------+-----+



In [59]:
df_paidFive.show()

+-----------+-----+
|star_rating|count|
+-----------+-----+
|          1|    1|
|          3|   16|
|          5|   44|
|          4|   24|
|          2|    5|
+-----------+-----+



In [61]:
df_unpaidFive.show()

+-----------+-----+
|star_rating|count|
+-----------+-----+
|          1| 9562|
|          3| 4114|
|          5|14704|
|          4| 6309|
|          2| 3142|
+-----------+-----+



In [62]:
df_paidUnpaid.show()

+----+-----+
|vine|count|
+----+-----+
|   Y|   90|
|   N|37831|
+----+-----+

