# Dataset preparation

This notebook converts the raw JSON review data to Parquet and performs some minor cleanup along the way: it drops columns that are not kept and converts the Unix review time to a timestamp.

## S3 bucket

First, set `bucket` in the cell below to the name of your S3 bucket.

In [1]:
bucket = 'bucket'

Starting Spark application


ID,Kind,State,Spark UI,Driver log,User,Current session?
0,pyspark,idle,Link,,,✔


SparkSession available as 'spark'.


In [2]:
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()

## Data format

Example entry:

    {
        "image": ["https://images-na.ssl-images-amazon.com/images/I/71eG75FTJJL._SY88.jpg"],
        "overall": 5.0,
        "vote": "2",
        "verified": true,
        "reviewTime": "01 1, 2018",
        "reviewerID": "AUI6WTTT0QZYS",
        "asin": "5120053084",
        "style": {
            "Size:": "Large",
            "Color:": "Charcoal"
        },
        "reviewerName": "Abbey",
        "reviewText": "I now have 4 of the 5 available colors of this shirt... ",
        "summary": "Comfy, flattering, discreet--highly recommended!",
        "unixReviewTime": 1514764800
    }
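
The field types in the example entry are what the schema in the next cell has to describe. As a minimal sketch (plain Python, no Spark needed), the snippet below parses a shortened copy of the entry and prints the Python type of each field. The names `raw`, `entry`, and `field_types` are illustrative only, and `verified` is written as lowercase `true` because `json.loads` accepts only that spelling.

```python
import json

# Shortened copy of the example entry above (image URL and review text omitted).
raw = """
{
  "overall": 5.0,
  "vote": "2",
  "verified": true,
  "reviewTime": "01 1, 2018",
  "asin": "5120053084",
  "style": {"Size:": "Large", "Color:": "Charcoal"},
  "unixReviewTime": 1514764800
}
"""

entry = json.loads(raw)

# Map each field to the name of the Python type that the JSON parser produced.
field_types = {name: type(value).__name__ for name, value in entry.items()}

for name, type_name in field_types.items():
    print(f"{name}: {type_name}")
```

Here `vote` comes back as a string even though it holds a number, which is consistent with the schema declaring it as `StringType`, and `style` is a nested object with string keys and values, which corresponds to `MapType(StringType(), StringType())`.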

In [3]:
from pyspark.sql.types import (
    ArrayType, BooleanType, DoubleType, LongType, MapType,
    StringType, StructField, StructType,
)

# Explicit schema for the raw review JSON; every field is nullable.
schema = StructType([
    StructField("image", ArrayType(StringType()), True), 
    StructField("overall", DoubleType(), True),
    StructField("vote", StringType(), True),
    StructField("verified", BooleanType(), True),
    StructField("reviewTime", StringType(), True),
    StructField("reviewerID", StringType(), True), 
    StructField("asin", StringType(), True),
    StructField("style", MapType(StringType(), StringType()), True), 
    StructField("reviewerName", StringType(), True),
    StructField("reviewText", StringType(), True),
    StructField("summary", StringType(), True),
    StructField("unixReviewTime", LongType(), True)  
])

In [4]:
df = spark.read.schema(schema).json(f"s3://{bucket}/data/raw/All_Amazon_Review.json")

In [5]:
df.printSchema()

root
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- overall: double (nullable = true)
 |-- vote: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- style: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- reviewerName: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)

In [6]:
df.show(3)

+-----+-------+----+--------+-----------+--------------+----------+-----+----------------+--------------------+--------------------+--------------+
|image|overall|vote|verified| reviewTime|    reviewerID|      asin|style|    reviewerName|          reviewText|             summary|unixReviewTime|
+-----+-------+----+--------+-----------+--------------+----------+-----+----------------+--------------------+--------------------+--------------+
| null|    1.0|null|   false|12 11, 2015|A27BTSGLXK2C5K|B017O9P72A| null|Jacob M. Wessler|Alexa is not able...|VERY Buggy, doesn...|    1449792000|
| null|    4.0|   5|   false| 12 8, 2015|A27ZJ1NCBFP1HZ|B017O9P72A| null|            Greg|Alexa works great...|      So Far So Good|    1449532800|
| null|    1.0|  11|   false| 12 7, 2015| ACCQIOZMFN4UK|B017O9P72A| null|        Da-Gr8-1|Weak!!\n\nAlexa d...|         Time waster|    1449446400|
+-----+-------+----+--------+-----------+--------------+----------+-----+----------------+--------------------+--------------------+--------------+

In [7]:
# Drop the columns that are not kept in the Parquet output.
df = df.drop("image", "reviewTime", "reviewerID", "reviewerName")

In [8]:
df.show(3)

+-------+----+--------+----------+-----+--------------------+--------------------+--------------+
|overall|vote|verified|      asin|style|          reviewText|             summary|unixReviewTime|
+-------+----+--------+----------+-----+--------------------+--------------------+--------------+
|    1.0|null|   false|B017O9P72A| null|Alexa is not able...|VERY Buggy, doesn...|    1449792000|
|    4.0|   5|   false|B017O9P72A| null|Alexa works great...|      So Far So Good|    1449532800|
|    1.0|  11|   false|B017O9P72A| null|Weak!!\n\nAlexa d...|         Time waster|    1449446400|
+-------+----+--------+----------+-----+--------------------+--------------------+--------------+
only showing top 3 rows

In [9]:
from pyspark.sql import functions as F

# Convert the epoch seconds in unixReviewTime to a timestamp column.
df = df.withColumn("tsReviewTime", F.from_unixtime(df.unixReviewTime).cast("timestamp"))

In [10]:
df = df.drop("unixReviewTime")

In [11]:
df.show(3)

+-------+----+--------+----------+-----+--------------------+--------------------+-------------------+
|overall|vote|verified|      asin|style|          reviewText|             summary|       tsReviewTime|
+-------+----+--------+----------+-----+--------------------+--------------------+-------------------+
|    1.0|null|   false|B017O9P72A| null|Alexa is not able...|VERY Buggy, doesn...|2015-12-11 00:00:00|
|    4.0|   5|   false|B017O9P72A| null|Alexa works great...|      So Far So Good|2015-12-08 00:00:00|
|    1.0|  11|   false|B017O9P72A| null|Weak!!\n\nAlexa d...|         Time waster|2015-12-07 00:00:00|
+-------+----+--------+----------+-----+--------------------+--------------------+-------------------+
only showing top 3 rows
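
To sanity-check the conversion outside Spark, the following sketch converts the same epoch values with Python's standard library. It assumes the values are seconds since the Unix epoch and interprets them in UTC; Spark's `from_unixtime` formats in the session time zone, so a cluster configured with a different time zone could show shifted values. The helper name `epoch_to_utc` is hypothetical.

```python
from datetime import datetime, timezone


def epoch_to_utc(seconds: int) -> datetime:
    """Convert seconds since the Unix epoch to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


# unixReviewTime values taken from the first three rows shown earlier.
for seconds in (1449792000, 1449532800, 1449446400):
    print(seconds, "->", epoch_to_utc(seconds).strftime("%Y-%m-%d %H:%M:%S"))
```

The printed dates (2015-12-11, 2015-12-08, and 2015-12-07, each at 00:00:00) agree with the `tsReviewTime` column above, which suggests the session time zone in this run was UTC.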

In [12]:
df.filter(df["style"].isNotNull()).show()

+-------+----+--------+----------+--------------------+--------------------+--------------------+-------------------+
|overall|vote|verified|      asin|               style|          reviewText|             summary|       tsReviewTime|
+-------+----+--------+----------+--------------------+--------------------+--------------------+-------------------+
|    5.0|null|   false|B01ENR8GD8|   {Format: ->  App}|Love the gratitud...|Get Your Gratitud...|2017-03-04 00:00:00|
|    5.0|null|   false|B005V7U52A|  {Color: ->  Black}|This seems to be ...|Good band for the...|2012-09-10 00:00:00|
|    5.0|   7|   false|B005V7U52A|  {Color: ->  Black}|There are similar...|A watch with mult...|2012-09-08 00:00:00|
|    4.0|   2|   false|B005V7U52A|  {Color: ->  White}|I bought this ban...|  Use it for running|2012-07-06 00:00:00|
|    1.0|   5|    true|B005V7U52A|  {Color: ->  Black}|I wish I could su...|Awful...broken in...|2012-01-22 00:00:00|
|    5.0|   2|    true|B005V7U52A|  {Color: ->  Black}|I

In [13]:
df.write.parquet(f"s3://{bucket}/data/parquet")

In [14]:
parquet_path = f"s3://{bucket}/data/parquet"
# Register the Parquet files as an external table so they can be queried with SQL.
column_defs = "overall DOUBLE, vote STRING, verified BOOLEAN, asin STRING, style MAP<STRING, STRING>, reviewText STRING, summary STRING, tsReviewTime TIMESTAMP"
spark.sql(f"CREATE EXTERNAL TABLE reviews({column_defs}) STORED AS PARQUET LOCATION '{parquet_path}'")

DataFrame[]
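
The `CREATE EXTERNAL TABLE` statement above spells out the column list by hand. As a hedged sketch, the statement could instead be assembled from a list of (name, type) pairs so that the columns are maintained in one place. The names `columns` and `build_create_table` are hypothetical and not part of the notebook, and the bucket name is the same placeholder used in the first cell.

```python
# Column names and SQL types, in the order used by the notebook's table.
columns = [
    ("overall", "DOUBLE"),
    ("vote", "STRING"),
    ("verified", "BOOLEAN"),
    ("asin", "STRING"),
    ("style", "MAP<STRING, STRING>"),
    ("reviewText", "STRING"),
    ("summary", "STRING"),
    ("tsReviewTime", "TIMESTAMP"),
]


def build_create_table(table: str, columns: list, location: str) -> str:
    """Assemble a CREATE EXTERNAL TABLE statement for Parquet data at `location`."""
    column_list = ", ".join(f"{name} {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE EXTERNAL TABLE {table}({column_list}) "
        f"STORED AS PARQUET LOCATION '{location}'"
    )


bucket = "bucket"  # placeholder, as in the first cell
ddl = build_create_table("reviews", columns, f"s3://{bucket}/data/parquet")
print(ddl)
```

The resulting string could then be passed to `spark.sql(ddl)` in place of the hand-written statement.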