In [1]:
import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.1.2'
# spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [697 B]
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:12 http://security.ubuntu.com/ubuntu bionic-securi

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigData-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [3]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Office_Products_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
office_df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
office_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   43081963|R18RVCKGH1SSI9|B001BM2MAC|     307809868|Scotch Cushion Wr...| Office Products|          5|            0|          0|   N|                Y|          Five Stars|      Great product.| 2015-08-31|
|         US|   10951564|R3L4L6LW1PUOFY|B00DZYEXPQ|      75004341|Dust-Off Compress...| Office Products|          5|    

In [14]:
from pyspark.sql.functions import to_date
# Read in the Review dataset as a DataFrame

In [15]:
# Create the customers_table DataFrame
customers_df = office_df.groupby("customer_id").agg({"customer_id":"count"}).withColumnRenamed("count(customer_id)", "customer_count")
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   43622307|             1|
|   33972406|             1|
|     108460|             1|
|   43515569|             1|
|     132406|             1|
|   42560427|             1|
|    1673863|             2|
|   14552054|             1|
|   45632184|             1|
|   14703850|             2|
|   35768925|             3|
|   45640757|            28|
|   33194403|             1|
|   39383689|             1|
|   10645124|             1|
|   52512151|            10|
|   14646714|             1|
|   38659596|             1|
|   15634680|             1|
|   44178035|             1|
+-----------+--------------+
only showing top 20 rows



In [16]:
# Create the products_table DataFrame and drop duplicates. 
products_df = office_df.select(["product_id","product_title"]).drop_duplicates()
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00PITWBPA|Teacher Lesson Pl...|
|B0038ZTDGQ|        LINKYO TN650|
|B0028N6PSI|Mead Composition ...|
|B001IZX8AA|Generic DR400 and...|
|B010B1ROR4|Viz-pro Double-si...|
|B00WT9IDPE|AIM Compatible Re...|
|B003ZZUKPC|HTS 262B1 1,000 P...|
|B00NQ3UB34|Plush Cute Genuin...|
|B00JFOJ7PG|BIC Kids Pencil -...|
|B000AN1Q64|Pendaflex Hanging...|
|B00X5G94G2|Elmer's Freestyle...|
|B000BV0AYY|Canon PGI-5 BK 06...|
|B00SKO1E8A|No Trespassing We...|
|B00JXQR1ZY|KeySmart Extended...|
|B002OJN3VI|1 Pack. Refurbish...|
|B004OA6X6C|VTech LS6405 Acce...|
|B002FPQBEM|ENGLISH US LARGE ...|
|B002PIWE18|3 Mil 4x6 Photo L...|
|B000A2BJR6|Canon CLI-8 Ink Tank|
|B0006HVJ3Y|Highland Notes, Y...|
+----------+--------------------+
only showing top 20 rows



In [17]:
# Create the review_id_table DataFrame. 
# Convert the 'review_date' column to a date datatype with to_date("review_date", 'yyyy-MM-dd').alias("review_date")
review_id_df = office_df.select(["review_id","customer_id","product_id","product_parent", to_date("review_date", 'yyyy-MM-dd').alias("review_date")])
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R18RVCKGH1SSI9|   43081963|B001BM2MAC|     307809868| 2015-08-31|
|R3L4L6LW1PUOFY|   10951564|B00DZYEXPQ|      75004341| 2015-08-31|
|R2J8AWXWTDX2TF|   21143145|B00RTMUHDW|     529689027| 2015-08-31|
|R1PR37BR7G3M6A|   52782374|B00D7H8XB6|     868449945| 2015-08-31|
|R3BDDDZMZBZDPU|   24045652|B001XCWP34|      33521401| 2015-08-31|
| R8T6MO75ND212|   21751234|B004J2NBCO|     214932869| 2015-08-31|
|R2YWMQT2V11XYZ|    9109358|B00MOPAG8K|     863351797| 2015-08-31|
|R1V2HYL6OI9V39|    9967215|B003AHIK7U|     383470576| 2015-08-31|
|R3BLQBKUNXGFS4|   11234247|B006TKH2RO|     999128878| 2015-08-31|
|R17MOWJCAR9Y8Q|   12731488|B00W61M9K0|     622066861| 2015-08-31|
|R11EPG1GHOXMGB|   49861762|B00BXOGI3A|     688569009| 2015-08-31|
|R2797NKTEX5THN|    1541556|B0018RHWGE|     513803406| 2015-08

In [18]:
# Create the vine_table. DataFrame
from pyspark.sql.types import IntegerType
vine_df = office_df.select(["review_id","star_rating","helpful_votes","total_votes","vine","verified_purchase"])
vine_df = vine_df.withColumn("star_rating",vine_df["star_rating"].cast(IntegerType()))
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R18RVCKGH1SSI9|          5|            0|          0|   N|                Y|
|R3L4L6LW1PUOFY|          5|            0|          1|   N|                Y|
|R2J8AWXWTDX2TF|          5|            0|          0|   N|                Y|
|R1PR37BR7G3M6A|          1|            2|          3|   N|                Y|
|R3BDDDZMZBZDPU|          4|            0|          0|   N|                Y|
| R8T6MO75ND212|          5|            0|          0|   N|                Y|
|R2YWMQT2V11XYZ|          5|            0|          0|   N|                N|
|R1V2HYL6OI9V39|          5|            6|          6|   N|                Y|
|R3BLQBKUNXGFS4|          5|            0|          0|   N|                Y|
|R17MOWJCAR9Y8Q|          5|            0|          0|   N|     

**Deliverable 2: Determine Bias of Vine Reviews** 

In [21]:
# Filter the dataset for total_votes being greater than or equal to 20
vine_df = office_df.select(["review_id","star_rating","helpful_votes","total_votes","vine","verified_purchase"])
vote_count = office_df.filter("total_votes >= 20")

In [22]:
# Create a new DataFrame or table to retrieve all the rows where the number of helpful_votes divided by total_votes is equal to or greater than 50%.
new_table = vote_count.filter(vote_count["helpful_votes"]/vote_count["total_votes"]>=0.5)
new_table.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   11212836|R243UXMMSSZVS6|B00C2MBMNS|     934393421|Lion 2-Pocket Pla...| Office Products|          2|          254|        254|   N|                Y|      Disappointing!|We ordered about ...| 2015-08-31|
|         US|   23384746|R2P92EHR0S5GBB|B00C47U5OI|     712795023|24pcs Vitamin pil...| Office Products|          1|    

In [23]:
# Create a new DataFrame or table that retrieves all the rows where a review was written as part of the Vine program (paid), vine == 'Y'
new_table.filter(new_table["vine"] == "Y").show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   35360512|R3JIGR122X50ZV|B00ZI7MI96|     592756103|Scotch Thermal La...| Office Products|          5|         2280|       2314|   Y|                N|Here’s what’s REA...|You’re reading th...| 2015-08-21|
|         US|   50120605|R3KV8P8WLD2KKC|B00ZI7MI96|     592756103|Scotch Thermal La...| Office Products|          5|    

In [24]:
# Create a new DataFrame or table that retrieves all the rows where a review was written as part of the Vine program (paid), vine == 'N'
new_table.filter(new_table["vine"] == "N").show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   11212836|R243UXMMSSZVS6|B00C2MBMNS|     934393421|Lion 2-Pocket Pla...| Office Products|          2|          254|        254|   N|                Y|      Disappointing!|We ordered about ...| 2015-08-31|
|         US|   23384746|R2P92EHR0S5GBB|B00C47U5OI|     712795023|24pcs Vitamin pil...| Office Products|          1|    

In [25]:
# Number of five star reviews
five_star = new_table.filter(new_table["star_rating"]== 5)
five_star.cache()

DataFrame[marketplace: string, customer_id: int, review_id: string, product_id: string, product_parent: int, product_title: string, product_category: string, star_rating: string, helpful_votes: int, total_votes: int, vine: string, verified_purchase: string, review_headline: string, review_body: string, review_date: string]

In [26]:
five_star.count()

19663

In [27]:
# Total reviews
new_table.count()

44714

In [28]:
# Percentage of five star reviews
five_star.count() / new_table.count()

0.43975041374066287

In [29]:
# Percentage of paid reviews
five_star.filter(five_star["verified_purchase"] == "Y").count()/new_table.filter(new_table["verified_purchase"]== "Y").count()

0.4695371077088852

In [30]:
# Percentage of unpaid reviews
five_star.filter(five_star["verified_purchase"] == "N").count()/new_table.filter(new_table["verified_purchase"]== "N").count()

0.4101867118221113