# Basic ETL
---

## Goal
Consolidate the data from the two `json` files into a single more manageable storage type.

# Imports and setup
---

In [1]:
# autoreload extension: re-import modified modules before executing each cell
%load_ext autoreload
%autoreload 2

# render matplotlib figures inline, below the cell that produced them
%matplotlib inline

# use the higher-resolution 'retina' figure format for sharper plots
%config InlineBackend.figure_format='retina'

In [2]:
# standard library
import html
import os
import re
import sys
import numpy as np
import pandas as pd

# third parties
import pandas as pd
import pyspark as ps

from pprint import pprint
from pyspark.sql.types import BooleanType
from pyspark.sql import functions as psf
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import MapType, StringType, IntegerType, DataType, StructType, StructField

In [3]:
# start (or reuse) a local Spark session named 'json_etl' on all local cores
spark = (
    ps.sql.SparkSession.builder
    .master('local[*]')
    .appName('json_etl')
    .getOrCreate()
)

# Metadata

Let's take a look at the metadata's schema.

In [None]:
# load the gzipped metadata JSON into a Spark data frame
meta = spark.read.json('../data/metadata.json.gz')

In [6]:
# list the top-level column names of the metadata
meta.columns

['_corrupt_record',
 'asin',
 'brand',
 'categories',
 'description',
 'imUrl',
 'price',
 'related',
 'salesRank',
 'title']

In [7]:
# print the full schema, including the nested fields
meta.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- description: string (nullable = true)
 |-- imUrl: string (nullable = true)
 |-- price: double (nullable = true)
 |-- related: struct (nullable = true)
 |    |-- also_bought: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- also_viewed: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- bought_together: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- buy_after_viewing: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- salesRank: struct (nullable = true)
 |    |-- Appliances: long (nullable = true)
 |    |-- Arts, Crafts & Sewing: long (nullable = true)
 |    |-- Automotive: long

In [15]:
%%time
# time how long it takes to fetch a single row from the JSON-backed frame
meta.take(1)

KeyboardInterrupt: 

The first thing that I want to do is to convert the zipped `json` file into something with better IO speed. A `parquet` format might be a good option, but we can see that under the sales rank field there are a few illegal characters. This could be solved manually, but why not create an over-engineered solution for a simple problem?

In [10]:
# Build a cast target for `salesRank` whose field names are safe for parquet.
# 1. take the column's type string and keep only what sits inside `struct<salesRank:...>`
schema_str = re.findall(
    'struct<salesRank:(.*)>',
    meta.select('salesRank').schema.simpleString(),
)[0]
# 2. decode HTML entities, then turn ' & ', ', ' and any remaining space into '_'
pattern = re.compile(r' & |, ')
new_schema = pattern.sub('_', html.unescape(schema_str)).replace(' ', '_')

In [11]:
# cast `salesRank` to the sanitized struct type, so that its field names no
# longer contain white spaces, `&` or commas
meta = meta.withColumn('salesRank', meta['salesRank'].cast(new_schema))

In [12]:
# check the field names of `salesRank` after the cast
meta.select('salesRank').printSchema()

root
 |-- salesRank: struct (nullable = true)
 |    |-- Appliances: long (nullable = true)
 |    |-- Arts_Crafts_Sewing: long (nullable = true)
 |    |-- Automotive: long (nullable = true)
 |    |-- Baby: long (nullable = true)
 |    |-- Beauty: long (nullable = true)
 |    |-- Books: long (nullable = true)
 |    |-- Camera_Photo: long (nullable = true)
 |    |-- Cell_Phones_Accessories: long (nullable = true)
 |    |-- Clothing: long (nullable = true)
 |    |-- Computers_Accessories: long (nullable = true)
 |    |-- Electronics: long (nullable = true)
 |    |-- Gift_Cards_Store: long (nullable = true)
 |    |-- Grocery_Gourmet_Food: long (nullable = true)
 |    |-- Health_Personal_Care: long (nullable = true)
 |    |-- Home_Kitchen: long (nullable = true)
 |    |-- Home_Improvement: long (nullable = true)
 |    |-- Industrial_Scientific: long (nullable = true)
 |    |-- Jewelry: long (nullable = true)
 |    |-- Kitchen_Dining: long (nullable = true)
 |    |-- Magazines: long (nullable

The categories' names got a bit more confusing, but now we will not run into problems when trying to convert the data frame to parquet.

In [13]:
# persist the metadata as parquet, replacing any previous copy
meta.write.mode('overwrite').parquet('../data/metadata.parquet')

In [5]:
# rebind `meta` to the parquet copy written above
meta = spark.read.parquet('../data/metadata.parquet')

Let's take a look at this `_corrupt_record`.

In [8]:
# show the distinct values of the corrupt-record column; this relies on `meta`
# having been reloaded from parquet, since Spark >= 2.3 rejects queries that
# reference only `_corrupt_record` on a frame read straight from raw JSON
meta.select('_corrupt_record').distinct().show(truncate=False)

Py4JJavaError: An error occurred while calling o69.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(_corrupt_record#6, 200)
+- *(1) HashAggregate(keys=[_corrupt_record#6], functions=[], output=[_corrupt_record#6])
   +- *(1) FileScan json [_corrupt_record#6] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/Users/josehjblanco/DataScience/Projects/Fivestars-Structured/data/metadat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_corrupt_record:string>

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
	at org.apache.spark.sql.execution.BaseLimitExec$class.inputRDDs(limit.scala:62)
	at org.apache.spark.sql.execution.LocalLimitExec.inputRDDs(limit.scala:97)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3273)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).json(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).json(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().;
	at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.buildReader(JsonFileFormat.scala:118)
	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:160)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:295)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:293)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:313)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 41 more


It seems to be quite useless.

What about the categories? How many are available? What do they look like?

In [None]:
# NOTE(review): `parquet()` is called without a destination path, which it appears
# to require; this looks like a leftover scratch cell. Supply a path or drop the cell.
meta.write.parquet()

In [9]:
# peek at the nested `categories` lists
meta.select('categories').show()

KeyboardInterrupt: 

The categories seem to be strings separated by commas stored inside embedded lists. Let's try to solve this problem.

In [None]:
# unnest the outer level of `categories`: one row per inner list
exploded_meta = meta.withColumn(
    'categories_exploded',
    psf.explode(meta['categories']),
)
exploded_meta.select('categories_exploded').show(5)

We need to go deeper

In [None]:
# confirm the new `categories_exploded` column is present
exploded_meta.columns

In [None]:
# explode a second time: after the first pass each row still holds an inner
# list, so this leaves one string per row
# NOTE(review): overwrites `categories_exploded` in place, so the cell is
# presumably not safe to run twice; confirm
categories = psf.explode(exploded_meta['categories_exploded'])
exploded_meta = exploded_meta.withColumn('categories_exploded', categories)
exploded_meta.select('categories_exploded').show(5)

Aha! Now it is only a matter of splitting the categories.

In [None]:
# an entry may itself hold several comma-separated names: split on ', ' and
# explode once more so that every row carries a single category
# (the unused `categories` expression, an explode over a string column, was removed)
exploded_meta = exploded_meta.withColumn(
    'category',
    psf.explode(psf.split('categories_exploded', ', ')),
)
exploded_meta.select('category').show(5)

In [None]:
# sample of the distinct category names
exploded_meta.select('category').distinct().show()

In [None]:
# number of distinct category names
exploded_meta.select('category').distinct().count()

Hmmm... using 18k categories doesn't sound very promising...

In [None]:
# release any cached data held for `exploded_meta` (trailing `;` hides the output)
# NOTE(review): no persist()/cache() call on this frame is visible above; confirm this is needed
exploded_meta.unpersist();

Let's take a look at `salesRank` instead.

In [None]:
# collect the `salesRank` column of the first 20 rows for local inspection
salesrank_samples = meta.select('salesRank').take(20)

In [None]:
# turn the sampled structs into plain dicts, skipping rows without a `salesRank`
salesrank_rows = (sample['salesRank'] for sample in salesrank_samples)
salesrank_dicts = [row.asDict() for row in salesrank_rows if row]
salesrank_df = pd.DataFrame(salesrank_dicts)
# per row, how many of the category columns hold a value
salesrank_df.count(axis=1)

It looks like each `salesRank` contains only one non-null value, but just to be sure...

In [None]:
@udf(IntegerType())
def cat_counts(struct):
    '''
    Count the number of non-null values in a `salesRank` struct.

    Parameters
    ----------
    struct : Row or None
        The `salesRank` value of one record.

    Returns
    -------
    int or None
        How many fields of `struct` are not null, or None when the
        struct itself is null.
    '''
    if struct is None:
        return None
    # compare against None explicitly so that a rank of 0 still counts as a value
    return sum(value is not None for value in struct.asDict().values())

In [None]:
# how many records have 0, 1, 2, ... non-null ranks
n_cats = cat_counts('salesRank').alias('n_cats')
meta.select(n_cats).groupby('n_cats').count().show()

Yeah, each observation has at most one category. Good.

In [None]:
# `struct_sample` is never defined in this notebook; read the category names
# from the `salesRank` struct schema instead of from a sampled row
cats = meta.schema['salesRank'].dataType.fieldNames()
print(f'Number of distinct categories: {len(cats)}')
cats

Aahhh, much more manageable. From now on, these will be our categories.

In [None]:
# return type of `get_catrank`: a category name together with its rank
# NOTE(review): the `salesRank` fields are printed as `long` while `rank` is
# declared as IntegerType; confirm that every rank fits in 32 bits
schema = StructType([
    StructField('category', StringType()),
    StructField('rank', IntegerType())
])

@udf(schema)
def get_catrank(struct):
    '''
    Return the first category of a `salesRank` struct that has a non-null rank.

    Parameters
    ----------
    struct : Row or None
        The `salesRank` value of one record.

    Returns
    -------
    dict or None
        `{'category': <field name>, 'rank': <value>}` for the first field
        whose value is not null; None when the struct is null or all of
        its fields are null.
    '''
    if struct is None:
        return None
    for cat, rank in struct.asDict().items():
        # explicit None check so that a rank of 0 is not mistaken for "missing"
        if rank is not None:
            return {'category': cat, 'rank': rank}
    return None

In [None]:
# build the UDF column expression, then expose its two fields as flat columns
cat_ranks = get_catrank(meta['salesRank'])
category = cat_ranks.getField('category').alias('category')
sales_rank = cat_ranks.getField('rank').alias('sales_rank')

meta = (
    meta
    .withColumn('category', category)
    .withColumn('sales_rank', sales_rank)
)
meta.columns

Now it would be a good idea to remove unnecessary columns such as `imUrl` and `brand`. I will leave in `price` just out of curiosity, and `asin` is necessary to join this data frame with the reviews one later.

In [None]:
# columns to keep, in the order they should appear in the output
# (a list rather than a set, so the column order is the same on every run)
# NOTE(review): the original `salesRank` struct is kept while the derived
# `sales_rank` column is not; confirm this is intended
keep_cols = [
    'asin',
    'category',
    'price',
    'salesRank',
]

meta = meta.select(*keep_cols)

In [None]:
# save the trimmed metadata, replacing any previous copy
meta.write.parquet('../data/metadata_clean.parquet', mode='overwrite')

In [131]:
# spot-check the `salesRank` structs of the first 10 rows
meta.select(meta['salesRank']).take(10)

[Row(salesRank=Row(Appliances=None, Arts_Crafts_Sewing=None, Automotive=None, Baby=None, Beauty=None, Books=6334800, Camera_Photo=None, Cell_Phones_Accessories=None, Clothing=None, Computers_Accessories=None, Electronics=None, Gift_Cards_Store=None, Grocery_Gourmet_Food=None, Health_Personal_Care=None, Home_Kitchen=None, Home_Improvement=None, Industrial_Scientific=None, Jewelry=None, Kitchen_Dining=None, Magazines=None, Movies_TV=None, Music=None, Musical_Instruments=None, Office_Products=None, Patio_Lawn_Garden=None, Pet_Supplies=None, Prime_Pantry=None, Shoes=None, Software=None, Sports_Outdoors=None, Toys_Games=None, Video_Games=None, Watches=None)),
 Row(salesRank=Row(Appliances=None, Arts_Crafts_Sewing=None, Automotive=None, Baby=None, Beauty=None, Books=None, Camera_Photo=None, Cell_Phones_Accessories=None, Clothing=None, Computers_Accessories=None, Electronics=None, Gift_Cards_Store=None, Grocery_Gourmet_Food=None, Health_Personal_Care=None, Home_Kitchen=None, Home_Improvement=

In [43]:
# print the current schema of `meta`
meta.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- description: string (nullable = true)
 |-- imUrl: string (nullable = true)
 |-- price: double (nullable = true)
 |-- related: struct (nullable = true)
 |    |-- also_bought: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- also_viewed: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- bought_together: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- buy_after_viewing: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- salesRank: struct (nullable = true)
 |    |-- Appliances: long (nullable = true)
 |    |-- Arts, Crafts & Sewing: long (nullable = true)
 |    |-- Automotive: long

In [33]:
# show the `salesRank` value of the first row
meta.select('salesRank').show(1)

+--------------------+
|           salesRank|
+--------------------+
|[,,,,, 6334800,,,...|
+--------------------+
only showing top 1 row



In [40]:
# most expensive products first
meta.sort(meta['price'].desc()).show()

+---------------+----------+---------+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+
|_corrupt_record|      asin|    brand|          categories|         description|               imUrl| price|             related|           salesRank|               title|
+---------------+----------+---------+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+
|           null|B000HLT5OE|     null|[[Software, Busin...|ACT! by Sage  200...|http://ecx.images...|999.99|                null|[,,,,,,,,,,,,,,,,...|                null|
|           null|B000GACWVE|     null|[[Musical Instrum...|The Deluxe Power ...|http://ecx.images...|999.99|[, [B000GAP0FO, B...|[,,,,,,,,,,,,,,,,...|Fender Deluxe Pow...|
|           null|B000FFCZU8|     null|[[Health & Person...|Beneath this luxu...|http://ecx.images...|999.99|[, [B007XN16VY, B...|[,,,,,,,,,,

In [156]:
# descending sort on `price` again, this time through the `ascending` keyword
meta.sort('price', ascending=False).show()

+---------------+----------+---------+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+
|_corrupt_record|      asin|    brand|          categories|         description|               imUrl| price|             related|           salesRank|               title|
+---------------+----------+---------+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+
|           null|B000HLT5OE|     null|[[Software, Busin...|ACT! by Sage  200...|http://ecx.images...|999.99|                null|[,,,,,,,,,,,,,,,,...|                null|
|           null|B000FFCZU8|     null|[[Health & Person...|Beneath this luxu...|http://ecx.images...|999.99|[, [B007XN16VY, B...|[,,,,,,,,,,,,, 12...|HT-104 Bone Human...|
|           null|B000E7K28Y|     null|                null|                null|http://ecx.images...|999.99|[[B00005AV8O, B00...|[,,,,,,,,,,

# Reviews

In [6]:
# load the gzipped reviews JSON into a Spark data frame
reviews = spark.read.json('../data/amazon_reviews.json.gz')

In [7]:
# print the schema of the reviews
reviews.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [9]:
# persist the reviews as parquet, replacing any previous copy
reviews.write.mode('overwrite').parquet('../data/reviews.parquet')

In [56]:
# rebind `reviews` to the parquet copy written above
reviews = spark.read.parquet('../data/reviews.parquet')

In [32]:
# preview the reviews without the `asin` and `unixReviewTime` columns
reviews.drop('asin', 'unixReviewTime').show(5)

+-------+-------+--------------------+-----------+--------------+-------------------+--------------------+
|helpful|overall|          reviewText| reviewTime|    reviewerID|       reviewerName|             summary|
+-------+-------+--------------------+-----------+--------------+-------------------+--------------------+
| [0, 0]|    4.0|We use this type ...| 12 3, 2013| ACNGUPJ3A3TM9|                GCM|         Nice Hymnal|
| [2, 3]|    5.0|I bought this for...|09 13, 2009|A2SUAM1J3GNN3B|        J. McDonald|Heavenly Highway ...|
| [0, 0]|    5.0|This is a large s...| 03 9, 2013| APOZ15IEYQRRR|          maewest64|   Awesome Hymn Book|
| [0, 0]|    5.0|We use this hymn ...| 01 2, 2012| AYEDW3BFK53XK|              Missb|Hand Clapping Toe...|
| [0, 0]|    3.0|One review advise...|08 10, 2013|A1KLCGLCXYP1U1|Paul L "Paul Lytle"|          Misleading|
+-------+-------+--------------------+-----------+--------------+-------------------+--------------------+
only showing top 5 rows



In [28]:
# NOTE(review): bare attribute access with no call; looks like a leftover
# scratch cell. Confirm and remove.
reviews.drop

TypeError: split() missing 1 required positional argument: 'pattern'

In [34]:
# list the column names of the reviews
reviews.columns

['asin',
 'helpful',
 'overall',
 'reviewText',
 'reviewTime',
 'reviewerID',
 'reviewerName',
 'summary',
 'unixReviewTime']

In [36]:
# expose the reviews to Spark SQL as the `reviews` view
# (`registerTempTable` is deprecated in favour of `createOrReplaceTempView`)
reviews.createOrReplaceTempView('reviews')

In [37]:
# expose the metadata to Spark SQL as the `meta` view
# (`registerTempTable` is deprecated in favour of `createOrReplaceTempView`)
meta.createOrReplaceTempView('meta')

In [None]:
# NOTE(review): unfinished cell. The query text stops right after SELECT and
# `spark.sql()` is called without passing `query`; complete it or remove the cell.
query = '''
        SELECT 
        '''
spark.sql()

In [45]:
# column expression for the `helpful` array
helpful = reviews['helpful']
# display its first element; the recorded output of this cell shows `helpful[0]`,
# but the cell source no longer contained the statement that produced it
reviews.select(helpful[0]).show()

+----------+
|helpful[0]|
+----------+
|         0|
|         2|
|         0|
|         0|
|         0|
|         3|
|         4|
|         3|
|         3|
|         5|
|         1|
|         1|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows

