In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Connecting to security.ubu                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to security.ubuntu.com (185.125.190.3                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Connecting to security.ubu                                                                               Hit:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Connecting to security.ubu                             

In [2]:
# Create the Spark session for this analysis (an already-running one is reused)
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("Vine_Review_Analysis")
spark = session_builder.getOrCreate()

In [3]:
# Fetch the vine table CSV from the S3 bucket and load it as a DataFrame
from pyspark import SparkFiles

url = "https://celine-bucket.s3.amazonaws.com/vine_table.csv"
spark.sparkContext.addFile(url)

# Path of the copy that addFile() made available to Spark
local_csv_path = SparkFiles.get("vine_table.csv")

# Comma-separated file whose first line holds the column names;
# no schema inference is requested here.
csv_reader = spark.read.option("sep", ",").option("header", True)
df = csv_reader.csv(local_csv_path)

# Preview the first rows
df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R18RVCKGH1SSI9|          5|            0|          0|   N|                Y|
|R3L4L6LW1PUOFY|          5|            0|          1|   N|                Y|
|R2J8AWXWTDX2TF|          5|            0|          0|   N|                Y|
|R1PR37BR7G3M6A|          1|            2|          3|   N|                Y|
|R3BDDDZMZBZDPU|          4|            0|          0|   N|                Y|
| R8T6MO75ND212|          5|            0|          0|   N|                Y|
|R2YWMQT2V11XYZ|          5|            0|          0|   N|                N|
|R1V2HYL6OI9V39|          5|            6|          6|   N|                Y|
|R3BLQBKUNXGFS4|          5|            0|          0|   N|                Y|
|R17MOWJCAR9Y8Q|          5|            0|          0|   N|     

In [4]:
# Total number of rows in the loaded table, before any filtering.
all_review = df.count()
# Bare name as the last expression so the notebook displays the value.
all_review

2642434

In [5]:
# 1. Keep only the reviews that received at least 20 votes in total
valid_df = df.where(df["total_votes"] >= 20)
valid_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R243UXMMSSZVS6|          2|          254|        254|   N|                Y|
|R2P92EHR0S5GBB|          1|          110|        159|   N|                Y|
| RN93Z4XPNAANY|          5|           31|         31|   N|                Y|
|R1O74T9XO3OPQ4|          5|           38|         38|   N|                Y|
|R3DMIHE5BMUT0R|          3|           21|         21|   N|                Y|
|R1QWY17NLHQSB5|          5|           36|         36|   N|                Y|
|R3LANI678T2YQS|          5|           39|         47|   N|                N|
| RLV4XQ49EBYYU|          1|           56|         65|   N|                Y|
| RPBPAJKFD0T26|          5|           63|         63|   N|                Y|
|R1IHO8C56KBN6C|          1|           61|         64|   N|     

In [6]:
# Number of rows currently held in valid_df.
valid_review = valid_df.count()
# Bare name as the last expression so the notebook displays the value.
valid_review

48163

In [7]:
# Convert both vote-count columns to float, then confirm the resulting schema
from pyspark.sql.functions import col

for vote_column in ("helpful_votes", "total_votes"):
    valid_df = valid_df.withColumn(vote_column, col(vote_column).cast("float"))

valid_df.printSchema()


root
 |-- review_id: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: float (nullable = true)
 |-- total_votes: float (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)



In [8]:
# 2. Retrieve the rows where helpful_votes make up at least 50% of total_votes.
#    The comparison is inclusive (>=), so a ratio of exactly 0.5 is kept.
helpful_df = valid_df.filter("helpful_votes/total_votes >= 0.5")
helpful_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R243UXMMSSZVS6|          2|        254.0|      254.0|   N|                Y|
|R2P92EHR0S5GBB|          1|        110.0|      159.0|   N|                Y|
| RN93Z4XPNAANY|          5|         31.0|       31.0|   N|                Y|
|R1O74T9XO3OPQ4|          5|         38.0|       38.0|   N|                Y|
|R3DMIHE5BMUT0R|          3|         21.0|       21.0|   N|                Y|
|R1QWY17NLHQSB5|          5|         36.0|       36.0|   N|                Y|
|R3LANI678T2YQS|          5|         39.0|       47.0|   N|                N|
| RLV4XQ49EBYYU|          1|         56.0|       65.0|   N|                Y|
| RPBPAJKFD0T26|          5|         63.0|       63.0|   N|                Y|
|R1IHO8C56KBN6C|          1|         61.0|       64.0|   N|     

In [9]:
# Number of rows in helpful_df.
helpful_review = helpful_df.count()
# Bare name as the last expression so the notebook displays the value.
helpful_review

44714

In [10]:
# 3. Paid reviews: the helpful reviews whose vine flag is 'Y'
paid_df = helpful_df.where(helpful_df["vine"] == "Y")
paid_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3JIGR122X50ZV|          5|       2280.0|     2314.0|   Y|                N|
|R3KV8P8WLD2KKC|          5|        263.0|      286.0|   Y|                N|
|R2LFY2ZPJ3J7RH|          4|         36.0|       38.0|   Y|                N|
|R3P01OXMA0U0CA|          5|        114.0|      117.0|   Y|                N|
|R2SCMXBD9OQP8Z|          2|         22.0|       26.0|   Y|                N|
|R2FGS9ECOK5IQN|          5|         22.0|       23.0|   Y|                N|
| R1V9CO41XS6QY|          2|         56.0|       58.0|   Y|                N|
|R1LEP1JGXRY2YT|          2|         18.0|       20.0|   Y|                N|
|R1ZWQJ3ZBSIZ1K|          4|         19.0|       21.0|   Y|                Y|
| RX1G4FUE5HZGN|          3|         52.0|       58.0|   Y|     

In [11]:
# Number of rows in paid_df; used as the denominator of the paid 5-star percentage.
total_paid_review = paid_df.count()
# Bare name as the last expression so the notebook displays the value.
total_paid_review

969

In [12]:
# 4. Unpaid reviews: the helpful reviews whose vine flag is 'N'
unpaid_df = helpful_df.where(helpful_df["vine"] == "N")
unpaid_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R243UXMMSSZVS6|          2|        254.0|      254.0|   N|                Y|
|R2P92EHR0S5GBB|          1|        110.0|      159.0|   N|                Y|
| RN93Z4XPNAANY|          5|         31.0|       31.0|   N|                Y|
|R1O74T9XO3OPQ4|          5|         38.0|       38.0|   N|                Y|
|R3DMIHE5BMUT0R|          3|         21.0|       21.0|   N|                Y|
|R1QWY17NLHQSB5|          5|         36.0|       36.0|   N|                Y|
|R3LANI678T2YQS|          5|         39.0|       47.0|   N|                N|
| RLV4XQ49EBYYU|          1|         56.0|       65.0|   N|                Y|
| RPBPAJKFD0T26|          5|         63.0|       63.0|   N|                Y|
|R1IHO8C56KBN6C|          1|         61.0|       64.0|   N|     

In [13]:
# Number of rows in unpaid_df; used as the denominator of the unpaid 5-star percentage.
total_unpaid_review = unpaid_df.count()
# Bare name as the last expression so the notebook displays the value.
total_unpaid_review

43745

In [14]:
# Count the paid reviews that carry a 5-star rating
paid_five_star_df = paid_df.where(paid_df["star_rating"] == 5)
paid_five_star_review = paid_five_star_df.count()
paid_five_star_review

430

In [15]:
# Share of paid reviews that are 5-star, expressed as a percentage
paid_five_star_ratio = paid_five_star_review / total_paid_review
percentage_of_paid_five_star = paid_five_star_ratio * 100
percentage_of_paid_five_star

44.375644994840044

In [16]:
# Count the unpaid reviews that carry a 5-star rating
unpaid_five_star_df = unpaid_df.where(unpaid_df["star_rating"] == 5)
unpaid_five_star_review = unpaid_five_star_df.count()
unpaid_five_star_review

19233

In [17]:
# Share of unpaid reviews that are 5-star, expressed as a percentage
unpaid_five_star_ratio = unpaid_five_star_review / total_unpaid_review
percentage_of_unpaid_five_star = unpaid_five_star_ratio * 100
percentage_of_unpaid_five_star

43.96616756200709

In [18]:
# 5. Create a new DataFrame that summarises every metric computed above
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# One entry per metric: (metric name, filter that produced it, value).
# The "Valid Reviews" description names the column actually filtered on
# (total_votes), not a generic "review count".
summary_rows = [
    ("All Reviews", "No Filter", all_review),
    ("Valid Reviews", "Total Votes >= 20", valid_review),
    ("Helpful Reviews", "Helpful Votes/Total Votes >= 50% in Valid Reviews", helpful_review),
    ("Total Paid Reviews", "Get Paid in Helpful Reviews", total_paid_review),
    ("Total Unpaid Reviews", "Not Paid in Helpful Reviews", total_unpaid_review),
    ("Paid Five Star Reviews", "5-star in Total paid Reviews ", paid_five_star_review),
    ("Unpaid Five Star Reviews", "5-star in Total Unpaid Reviews ", unpaid_five_star_review),
    ("Percentage of Paid 5-star Reviews", "Paid 5-star Reviews to %", round(percentage_of_paid_five_star, 2)),
    ("Percentage of Unpaid 5-star Reviews", "Unpaid 5-star Reviews to %", round(percentage_of_unpaid_five_star, 2)),
]

# The Result column is declared as a string because it mixes counts and
# percentages; convert each value explicitly instead of relying on Spark to
# coerce ints and floats into a StringType column.
data = [(metric, criteria, str(value)) for metric, criteria, value in summary_rows]

schema = StructType([
    StructField("Summary", StringType(), True),
    StructField("Filter", StringType(), True),
    StructField("Result", StringType(), True),
])

Summary_df = spark.createDataFrame(data=data, schema=schema)
Summary_df.printSchema()
# truncate=False keeps the long filter descriptions fully visible
Summary_df.show(truncate=False)

root
 |-- Summary: string (nullable = true)
 |-- Filter: string (nullable = true)
 |-- Result: string (nullable = true)

+-----------------------------------+-------------------------------------------------+-------+
|Summary                            |Filter                                           |Result |
+-----------------------------------+-------------------------------------------------+-------+
|All Reviews                        |No Filter                                        |2642434|
|Valid Reviews                      |Review Count >= 20                               |48163  |
|Helpful Reviews                    |Helpful Votes/Total Votes >= 50% in Valid Reviews|44714  |
|Total Paid Reviews                 |Get Paid in Helpful Reviews                      |969    |
|Total Unpaid Reviews               |Not Paid in Helpful Reviews                      |43745  |
|Paid Five Star Reviews             |5-star in Total paid Reviews                     |430    |
|Unpaid Five St