In [1]:
import os
# Find the latest version of spark 3.2 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.3.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu focal InRelease
Get:4 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Get:5 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Hit:7 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Get:8 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
Hit:9 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
Get:10 http://archive.ubuntu.com/ubuntu focal-updates/universe amd64 Packages [1,323 kB]
Hit:11 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu focal InRelease
Get:12 http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages [3,069 kB]
Hit:13 http://ppa.launchpad.net/ubuntugis/ppa/ubuntu focal InRelease
Fetched 4,728 kB in 3s (1,67

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2023-04-03 16:39:30--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.1’


2023-04-03 16:39:32 (1.17 MB/s) - ‘postgresql-42.2.16.jar.1’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M17-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [4]:
from pyspark import SparkFiles
# First attempt but all Vine data in 'N'
#url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Gift_Card_v1_00.tsv.gz"
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Camera_v1_00.tsv.gz"

spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    2975964|R1NBG94582SJE2|B00I01JQJM|     860486164|GoPro Rechargeabl...|          Camera|          5|            0|          0|   N|                Y|          Five Stars|                  ok|2015-08-31 00:00:00|
|         US|   23526356|R273DCA6Y0H9V7|B00TCO0ZAA|     292641483|Professional 58mm...| 

In [5]:
# retrieve all the rows where the total_votes count is equal to or greater than 20
sample_df = df.filter('total_votes>20')
sample_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    3200070|R3FJ319XA6ZAUQ|B00ZI7IH1E|     450586100|Funlux? 8CH Full ...|          Camera|          2|           21|         25|   N|                Y|            Mediocre|The product is gr...|2015-08-31 00:00:00|
|         US|   40768960| R6HRF25HUMIIE|B00TAG8F52|     482310914|LimoStudio Digita...| 

In [6]:
sample_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [7]:
# Import struct fields that we can use
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, TimestampType, FloatType


In [8]:
# Next we need to create the list of struct fields
schema = [StructField("marketplace", StringType(), True), StructField("customer_id", IntegerType(), True), StructField("review_id", StringType(), True), StructField("product_id", StringType(),True),  StructField("product_parent", IntegerType(),True), StructField("product_title", StringType(),True), StructField("product_category", StringType(),True), StructField("star_rating", IntegerType(),True), StructField("helpful_votes", FloatType(),True), StructField("total_votes", FloatType(),True), StructField("vine", StringType(),True), StructField("verified_purchase", StringType(),True), StructField("review_headline", StringType(),True), StructField("review_body", StringType(),True), StructField("review_date", TimestampType(),True) ]
schema


[StructField('marketplace', StringType(), True),
 StructField('customer_id', IntegerType(), True),
 StructField('review_id', StringType(), True),
 StructField('product_id', StringType(), True),
 StructField('product_parent', IntegerType(), True),
 StructField('product_title', StringType(), True),
 StructField('product_category', StringType(), True),
 StructField('star_rating', IntegerType(), True),
 StructField('helpful_votes', FloatType(), True),
 StructField('total_votes', FloatType(), True),
 StructField('vine', StringType(), True),
 StructField('verified_purchase', StringType(), True),
 StructField('review_headline', StringType(), True),
 StructField('review_body', StringType(), True),
 StructField('review_date', TimestampType(), True)]

In [9]:
# Pass in our fields
final = StructType(fields=schema)
final


StructType([StructField('marketplace', StringType(), True), StructField('customer_id', IntegerType(), True), StructField('review_id', StringType(), True), StructField('product_id', StringType(), True), StructField('product_parent', IntegerType(), True), StructField('product_title', StringType(), True), StructField('product_category', StringType(), True), StructField('star_rating', IntegerType(), True), StructField('helpful_votes', FloatType(), True), StructField('total_votes', FloatType(), True), StructField('vine', StringType(), True), StructField('verified_purchase', StringType(), True), StructField('review_headline', StringType(), True), StructField('review_body', StringType(), True), StructField('review_date', TimestampType(), True)])

In [10]:
from pyspark import SparkFiles
# Read our data with our new schema
#dataframe = spark.read.csv(SparkFiles.get("food.csv"), schema=final, sep=",", header=True)
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Camera_v1_00.tsv.gz"

spark.sparkContext.addFile(url)
vine_df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", schema=final, header=True)
vine_df.show()
vine_df.printSchema()
#df.show()dataframe.printSchema()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    2975964|R1NBG94582SJE2|B00I01JQJM|     860486164|GoPro Rechargeabl...|          Camera|          5|          0.0|        0.0|   N|                Y|          Five Stars|                  ok|2015-08-31 00:00:00|
|         US|   23526356|R273DCA6Y0H9V7|B00TCO0ZAA|     292641483|Professional 58mm...| 

In [11]:
from pyspark.sql.functions import count

In [20]:
from pyspark.sql.functions import col, udf

In [None]:
sample_df.'WHERE CAST(helpful_votes AS FLOAT)/CAST(total_votes AS FLOAT) >=0.5').show()

SyntaxError: ignored

In [16]:
vine_df.filter('(helpful_votes as decimal / total_votes9 as decimal)')

ParseException: ignored

In [18]:
filtered_df = vine_df.filter('(helpful_votes / total_votes)' >= .5)

#vine_df.filter('(helpful_votes / sample_df.total_votes)')
#temp_df = sample_df.withColumn('division', sample_df.helpful_votes/sample_df.total_votes).filter('division' >= .5).show()

#filtered_df = sample_df.withColumn('division', sample_df.helpful_votes/sample_df.total_votes).filter(division >=.5).show()
#sample_df.filter(helpful_votes AS FLOAT)/CAST(total_votes AS FLOAT) >=0.5).show()

TypeError: ignored

In [26]:
filtered_df = vine_df.filter('(helpful_votes.cast(FloatType) / total_votes.cast(FloatType))' >= .5)

TypeError: ignored

In [25]:
filtered_df = vine_df.withColumn(helpful_votes, round(col(total_votes))).filter(division >=.5).show()

NameError: ignored

In [None]:
filtered_df.describe()

AttributeError: ignored