In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

!pip install pyspark


update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 44 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 54.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=f0fe948a1c79265139b3575a8768313f7cb33bc1bcb89fe0e2ceeddf09a6c16b
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-12-17 20:37:04--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-12-17 20:37:04 (6.19 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Amazon_Vine_Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [4]:
from pyspark import SparkFiles
url = "https://loganknutsons-bucket.s3.us-east-2.amazonaws.com/vine_table.csv"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("vine_table.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RTIS3L2M1F5SM|          5|            0|          0|   N|                Y|
| R1ZV7R40OLHKD|          5|            0|          0|   N|                Y|
|R3BH071QLH8QMC|          1|            0|          1|   N|                Y|
|R127K9NTSXA2YH|          3|            0|          0|   N|                Y|
|R32ZWUXDJPW27Q|          4|            0|          0|   N|                Y|
|R3AQQ4YUKJWBA6|          1|            0|          0|   N|                Y|
|R2F0POU5K6F73F|          5|            0|          0|   N|                Y|
|R3VNR804HYSMR6|          5|            0|          0|   N|                Y|
| R3GZTM72WA2QH|          5|            0|          0|   N|                Y|
| RNQOY62705W1K|          4|            0|          0|   N|     

In [5]:
# Create the vine_table. DataFrame
df_reviews = df.select(["star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])
df_reviews.printSchema()
df_reviews.show()

root
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)

+-----------+-------------+-----------+----+-----------------+
|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+-------------+-----------+----+-----------------+
|          5|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          1|            0|          1|   N|                Y|
|          3|            0|          0|   N|                Y|
|          4|            0|          0|   N|                Y|
|          1|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          4|         

In [6]:
# filter reviews with >=20 votes
df_reviews_20 = df_reviews.where(df_reviews["total_votes"] >= 20)
df_reviews_20.show(10)

+-----------+-------------+-----------+----+-----------------+
|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+-------------+-----------+----+-----------------+
|          1|           21|         34|   N|                N|
|          1|           21|         35|   N|                Y|
|          1|          147|        175|   N|                Y|
|          1|           14|         31|   N|                Y|
|          2|           55|         60|   N|                N|
|          1|           51|         65|   N|                Y|
|          4|           31|         36|   N|                N|
|          2|           19|         34|   N|                Y|
|          5|           28|         31|   N|                N|
|          1|            4|         32|   N|                N|
+-----------+-------------+-----------+----+-----------------+
only showing top 10 rows



In [7]:
# filter df_reviews_20 for all rows where helpful_votes / Total_votes >=0.5
df_reviews_50 = df_reviews_20.filter(df_reviews_20["helpful_votes"]/df_reviews_20["total_votes"] >= 0.50)

df_reviews_50.show(10)

+-----------+-------------+-----------+----+-----------------+
|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+-------------+-----------+----+-----------------+
|          1|           21|         34|   N|                N|
|          1|           21|         35|   N|                Y|
|          1|          147|        175|   N|                Y|
|          2|           55|         60|   N|                N|
|          1|           51|         65|   N|                Y|
|          4|           31|         36|   N|                N|
|          2|           19|         34|   N|                Y|
|          5|           28|         31|   N|                N|
|          2|          151|        198|   N|                N|
|          1|           49|         51|   N|                Y|
+-----------+-------------+-----------+----+-----------------+
only showing top 10 rows



In [8]:
# filter df_reviews_50 for all rows where vine = "Y"
paid_reviews = df_reviews_50.filter(df_reviews_50["vine"] == "Y")

paid_reviews.show(10)

+-----------+-------------+-----------+----+-----------------+
|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+-------------+-----------+----+-----------------+
|          5|           56|         63|   Y|                N|
|          3|           23|         23|   Y|                N|
|          5|           19|         24|   Y|                N|
|          4|           22|         26|   Y|                N|
|          3|           20|         26|   Y|                N|
|          5|           46|         51|   Y|                N|
|          5|           22|         28|   Y|                N|
|          5|           21|         24|   Y|                N|
|          4|           21|         28|   Y|                N|
|          5|           17|         20|   Y|                N|
+-----------+-------------+-----------+----+-----------------+
only showing top 10 rows



In [9]:
# explore summary stats using describe
paid_reviews.describe().show()

+-------+------------------+-----------------+------------------+----+-----------------+
|summary|       star_rating|    helpful_votes|       total_votes|vine|verified_purchase|
+-------+------------------+-----------------+------------------+----+-----------------+
|  count|                94|               94|                94|  94|               94|
|   mean| 4.202127659574468|54.59574468085106|61.787234042553195|null|             null|
| stddev|0.9791348741656415|65.26098459822536| 68.90976994895392|null|             null|
|    min|                 1|              111|               102|   Y|                N|
|    max|                 5|               97|                88|   Y|                N|
+-------+------------------+-----------------+------------------+----+-----------------+



In [10]:
# filter df_reviews_50 for all rows where vine = "N"
unpaid_reviews = df_reviews_50.filter(df_reviews_50["vine"] == "N")

unpaid_reviews.show(10)

+-----------+-------------+-----------+----+-----------------+
|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+-------------+-----------+----+-----------------+
|          1|           21|         34|   N|                N|
|          1|           21|         35|   N|                Y|
|          1|          147|        175|   N|                Y|
|          2|           55|         60|   N|                N|
|          1|           51|         65|   N|                Y|
|          4|           31|         36|   N|                N|
|          2|           19|         34|   N|                Y|
|          5|           28|         31|   N|                N|
|          2|          151|        198|   N|                N|
|          1|           49|         51|   N|                Y|
+-----------+-------------+-----------+----+-----------------+
only showing top 10 rows



In [11]:
# explore summary stats using describe
unpaid_reviews.describe().show()

+-------+------------------+------------------+------------------+-----+-----------------+
|summary|       star_rating|     helpful_votes|       total_votes| vine|verified_purchase|
+-------+------------------+------------------+------------------+-----+-----------------+
|  count|             40471|             40471|             40471|40471|            40471|
|   mean|  3.34765634651973|47.428405524943784|55.891057794470115| null|             null|
| stddev|1.6418850112078072|117.53763370687082|127.40280622961966| null|             null|
|    min|                 1|                10|               100|    N|                N|
|    max|                 5|               999|               999|    N|                Y|
+-------+------------------+------------------+------------------+-----+-----------------+



In [12]:
# determine count of paid 5 star reviews
paid_5star_reviews = paid_reviews[paid_reviews['star_rating'] == 5].count()
paid_5star_reviews

48

In [13]:
# determine count of paid reviews
paid_reviews_df = paid_reviews.count()
paid_reviews_df

94

In [14]:
# determine % of Paid 5 star reviews
percentage_paid_5star_reviews = paid_5star_reviews / paid_reviews_df * 100
percentage_paid_5star_reviews

51.06382978723404

In [15]:
# determine count of unpaid 5 star reviews
unpaid_5star_reviews = unpaid_reviews[unpaid_reviews['star_rating'] == 5].count()
unpaid_5star_reviews

15663

In [16]:
# determine count of unpaid reviews
unpaid_reviews_df = unpaid_reviews.count()
unpaid_reviews_df

40471

In [17]:
# determine % of unPaid 5 star reviews
percentage_unpaid_5star_reviews = unpaid_5star_reviews / unpaid_reviews_df * 100
percentage_unpaid_5star_reviews

38.701786464381904