# Data Preparation

## Necessary imports

In [1]:
import os
import sys
import warnings
warnings.filterwarnings("ignore")

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [3]:
! pwd
! ls -la
! head -n 3 amazon_item_ratings.csv
! tail -n 3 amazon_item_ratings.csv

/home/big/Desktop/Aulas
total 331320
drwxrwxr-x 5 big big      4096 mai 23 03:11  .
drwxr-xr-x 3 big big      4096 mai 18 22:55  ..
drwxrwxr-x 3 big big      4096 mai 21 14:16  aula_08-20210517T134909Z-001
-rwxrw-rw- 1 big big     15277 mai 23 03:11 'Data Preparation.ipynb'
drwxrwxr-x 2 big big      4096 mai 23 02:57  .ipynb_checkpoints
drwxrwxr-x 7 big big      4096 mai 23 03:04  Projeto
-rw-rw-r-- 1 big big 339230587 mai 18 20:18  Projeto.zip
head: cannot open 'amazon_item_ratings.csv' for reading: No such file or directory
tail: cannot open 'amazon_item_ratings.csv' for reading: No such file or directory


## Read dataset from file

In [4]:
df_items = spark.read.csv("amazon_item_ratings.csv", header=False, inferSchema=True, sep=",") 

Get a fraction of the sample

In [5]:
df_items = df_items.sample(fraction=0.05) #0.05 works

## Multiple checks on structure

Check dataset schema and column datatypes

In [6]:
df_items.printSchema()
df_items.count()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: integer (nullable = true)



826913

Display 10 rows

In [None]:
df_items.show(n=10, truncate=False)

Change column names to improve readabilty

In [None]:
df_items = df_items.withColumnRenamed("_c0","Reviewer") \
    .withColumnRenamed("_c1","Item") \
    .withColumnRenamed("_c2","Rating") \
    .withColumnRenamed("_c3","Timestamp")

In [None]:
df_items.show(n=10, truncate=False)

Check for Null or NaN values

In [7]:
df_items.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_items.columns]).show()

+---+---+---+---+
|_c0|_c1|_c2|_c3|
+---+---+---+---+
|  0|  0|  0|  0|
+---+---+---+---+



Since there are no Null or NaN values we can safely convert "Rating" column to an Integer since Item ratings can only be natural numbers ranging from 1 to 5

In [None]:
df_items = df_items.withColumn("Rating", df_items["Rating"].cast(IntegerType()))

Check if .cast() was successful and look for values out of place

In [9]:
df_items.describe("Rating").show()

+-------+------------------+
|summary|               _c2|
+-------+------------------+
|  count|            826913|
|   mean| 4.162625330597052|
| stddev|1.2614705476479164|
|    min|               1.0|
|    max|               5.0|
+-------+------------------+



## Indexing

Two methods were tried since we were never able to run StringIndexer on more than 10% of the dataset

### PySpark Pipeline

In [None]:
indexers = [StringIndexer(inputCol="Reviewer", outputCol="ReviewerID", handleInvalid="skip") , StringIndexer(inputCol="Item", outputCol="ItemID", handleInvalid="skip")]

pipeline = Pipeline(stages=indexers)
df_items_indexed = pipeline.fit(df_items).transform(df_items)

df_items_indexed.show()

### One column at a time

For some reason that allows StringIndexer to fit a greater sample than Pipeline before eventually Java runs out of memory or KryoSerializer throws a BufferOverflowing error

In [10]:
indexerItem = StringIndexer(inputCol="Item", outputCol="ItemID")
df_items_index_users = indexerItem.fit(df_items).transform(df_items)
df_items_index_users.select("_c1","Item_Index").show(truncate=False)

del df_items

indexerUsers = StringIndexer(inputCol="Reviewer", outputCol="ReviewerID")
df_items_indexed = indexerUsers.fit(df_items_index_users).transform(df_items_index_users)
df_items_indexed.select("_c0","User_Index").show(truncate=False)

del df_items_index_users

+----------+----------+
|_c1       |Item_Index|
+----------+----------+
|0000000078|109757.0  |
|B007PQTYIG|442985.0  |
|0735623872|145094.0  |
|030740515X|7460.0    |
|B003HAL5ZO|360858.0  |
|0399536957|129965.0  |
|B004HO58UW|41982.0   |
|B003JQLG4Q|362137.0  |
|B0041MUB52|373762.0  |
|B009XZ9Q1C|46018.0   |
|B007ZJ1M9C|447558.0  |
|B003A845OQ|356169.0  |
|0972973052|167851.0  |
|B002P3YRAY|84633.0   |
|0888550081|162628.0  |
|0521697522|136584.0  |
|B0049LUI9O|523.0     |
|B00ATSSQT0|483915.0  |
|B000HDK0DC|8264.0    |
|B00DNUF7KW|5786.0    |
+----------+----------+
only showing top 20 rows



## Cleaning

Double check schema

In [12]:
df_items_indexed.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- Item_Index: double (nullable = false)
 |-- User_Index: double (nullable = false)



Drop "TimeStamp" column since we are not going to use it

In [13]:
df_items_indexed = df_items_indexed.drop("TimeStamp")

## Store data

Display 10 rules to make sure the DataFrame is as we want it

In [None]:
df_items_indexed.show(10, truncate=False)

Save Dataframe do parquet format to use with SparkSQL

In [15]:
output_items = "data.parquet"
df_items_indexed.write.mode("overwrite").parquet(output_items)

In [18]:
df_items_indexed.write.mode("overwrite").saveAsTable("DataTable")

AnalysisException: Can not create the managed table('`ItemsTable`'). The associated location('file:/home/big/Desktop/Aulas/Projeto/spark-warehouse/itemstable') already exists.;