In [None]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.2'
spark_version = 'spark-3.0.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
#https://downloads.apache.org/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz 

!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Build (or reuse, if one is already running) the SparkSession used by every later cell.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("DataFrameBasics")
    .getOrCreate()
)

In [None]:
# Import struct fields that we can use
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DateType

In [None]:
# Column layout of the reviews TSV, in file order, as (name, Spark type) pairs.
# Every field is declared nullable; the list is displayed as the cell output.
schema = [
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("marketplace", StringType()),
        ("customer_id", StringType()),
        ("review_id", StringType()),
        ("product_id", StringType()),
        ("product_parent", StringType()),
        ("product_title", StringType()),
        ("product_category", StringType()),
        ("star_rating", IntegerType()),
        ("helpful_votes", IntegerType()),
        ("total_votes", IntegerType()),
        ("vine", StringType()),
        ("verified_purchase", StringType()),
        ("review_headline", StringType()),
        ("review_body", StringType()),
        ("review_date", DateType()),
    ]
]
schema

In [None]:
from pyspark import SparkFiles

# Download the tab-separated Amazon reviews file to every node via SparkContext.addFile.
url = "https://paulviet-bucket.s3.us-east-2.amazonaws.com/amazon_reviews_us_Video_Games_v1_00.tsv"
spark.sparkContext.addFile(url)

# `schema` is a plain list of StructFields; the reader needs a StructType.
# With an explicit schema, inferSchema is meaningless, so it is not passed.
df = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(
        SparkFiles.get("amazon_reviews_us_Video_Games_v1_00.tsv"),
        schema=StructType(schema),
        sep="\t",
        header=True,
    )
)
df.count()

In [None]:
# Remove rows missing any field the analysis below relies on. na.drop returns a
# new DataFrame, so the result must be kept for later cells to see it.
df = df.na.drop(subset=["star_rating", "helpful_votes", "total_votes", "vine"])
df.show()


In [None]:
# Load in a sql function to use columns
from pyspark.sql.functions import col

In [None]:
# Step 1: keep only reviews that received 20 or more total votes.
votedformore_df = df.where(df["total_votes"] >= 20)
votedformore_df.show()

In [None]:
# Number of reviews with 20 or more total votes (shown as the cell output).
votedformore_count = votedformore_df.count()
votedformore_count

In [None]:
# Step 2: keep reviews where helpful_votes / total_votes is at least 50%.
# Every row here has total_votes >= 20, so the ratio is never a division by zero.
helpful_ratio = col("helpful_votes") / col("total_votes")
over50_df = votedformore_df.filter(helpful_ratio >= 0.5)
over50_df.show()

In [None]:
# Step 3: reviews written as part of the (paid) Vine program, i.e. vine == 'Y'.
vine_df = over50_df.where(over50_df["vine"] == "Y")
vine_df.show()

In [None]:
# Report how many helpful reviews came from the Vine program.
print(f"How many Vine reviews? {vine_df.count()}")

In [None]:
# Step 4: reviews written outside the Vine program (unpaid), i.e. vine == 'N'.
notvine_df = over50_df.where(over50_df["vine"] == "N")
notvine_df.show()

In [None]:
# Report how many helpful reviews were written outside the Vine program.
print(f"How many Non-Vine reviews? {notvine_df.count()}")

In [None]:
# 5a: total number of reviews that passed the vote filters (paid and unpaid together).
print(f"Total number of Reviews: {over50_df.count()}")

In [None]:
# 5b: how many of the filtered reviews gave 5 stars.
print(f"Total number of 5 star Reviews: {over50_df.where(over50_df['star_rating'] == 5).count()}")

In [None]:
# 5c: percentage of 5-star reviews for the two types of review (paid vs unpaid).
# Each .count() launches a Spark job, so every count is computed once and reused.
paid_five_star = vine_df.filter(col("star_rating") == 5).count()
unpaid_five_star = notvine_df.filter(col("star_rating") == 5).count()
all_five_star = over50_df.filter(col("star_rating") == 5).count()

print("Paid 5 star reviews  : " + str(paid_five_star))
print("UnPaid 5 star reviews: " + str(unpaid_five_star))

# Share of all 5-star reviews that are paid vs. unpaid; report 0 when there are
# no 5-star reviews instead of raising ZeroDivisionError.
paid_five_star_pct = paid_five_star / all_five_star * 100 if all_five_star else 0.0
unpaid_five_star_pct = unpaid_five_star / all_five_star * 100 if all_five_star else 0.0
print(f"Percentage of 5 star reviews Paid vs. Unpaid: {paid_five_star_pct:.2f}% vs. {unpaid_five_star_pct:.2f}%")

In [None]:
# Reviews regardless of rating: paid vs. unpaid share of all filtered reviews.
# Each .count() launches a Spark job, so every count is computed once and reused.
paid_reviews = vine_df.count()
unpaid_reviews = notvine_df.count()
all_reviews = over50_df.count()

print("Paid reviews  : " + str(paid_reviews))
print("UnPaid reviews: " + str(unpaid_reviews))

# Report 0 when no reviews passed the filters instead of raising ZeroDivisionError.
paid_reviews_pct = paid_reviews / all_reviews * 100 if all_reviews else 0.0
unpaid_reviews_pct = unpaid_reviews / all_reviews * 100 if all_reviews else 0.0
print(f"Percentage of reviews Paid vs. Unpaid: {paid_reviews_pct:.2f}% vs. {unpaid_reviews_pct:.2f}%")