### Amazon Electronics Analysis

In [1]:
import pandas as pd
import numpy as np
import json
import gzip
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
import re



In [2]:
# Create Spark session 
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

sc = spark.sparkContext

23/06/06 18:29:43 WARN Utils: Your hostname, MBP.local resolves to a loopback address: 127.0.0.1; using 192.168.0.29 instead (on interface en0)
23/06/06 18:29:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/06 18:29:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Meta electronics file for price data

View first few rows of JSON:

In [3]:
N = 3
with open("meta_Electronics.json") as f:
    for i in range(0,N):
        print(f.readline(), end = '')

{"category": ["Electronics", "Camera &amp; Photo", "Video Surveillance", "Surveillance Systems", "Surveillance DVR Kits"], "tech1": "", "description": ["The following camera brands and models have been tested for compatibility with GV-Software.\nGeoVision \tACTi \tArecont Vision \tAXIS \tBosch \tCanon\nCNB \tD-Link \tEtroVision \tHikVision \tHUNT \tIQEye\nJVC \tLG \tMOBOTIX \tPanasonic \tPelco \tSamsung\nSanyo \tSony \tUDP \tVerint \tVIVOTEK \t \n \nCompatible Standard and Protocol\nGV-System also allows for integration with all other IP video devices compatible with ONVIF(V2.0), PSIA (V1.1) standards, or RTSP protocol.\nONVIF \tPSIA \tRTSP \t  \t  \t \nNote: Specifications are subject to change without notice. Every effort has been made to ensure that the information on this Web site is accurate. No liability is assumed for incidental or consequential damages arising from the use of the information or products contained herein."], "fit": "", "title": "Genuine Geovision 1 Channel 3rd P

[Generate a schema](https://preetranjan.github.io/pyspark-schema-generator/) and load the file into a Spark DataFrame:

In [4]:
schema = StructType([
    StructField('category', ArrayType(StringType()), True),
    StructField('tech1', StringType(), True),
    StructField('description', ArrayType(StringType()), True),
    StructField('fit', StringType(), True),
    StructField('title', StringType(), True),
    StructField('also_buy', ArrayType(StringType()), True),
    StructField('tech2', StringType(), True),
    StructField('brand', StringType(), True),
    StructField('feature', ArrayType(StringType()), True),
    StructField('rank', ArrayType(StringType()), True),
    StructField('also_view', ArrayType(StringType()), True),
    StructField('main_cat', StringType(), True),
    StructField('similar_item', StringType(), True),
    StructField('date', StringType(), True),
    StructField('price', StringType(), True),
    StructField('asin', StringType(), True),
    StructField('imageURL', ArrayType(StringType()), True),
    StructField('imageURLHighRes', ArrayType(StringType()), True),
])

The generated schema needed some editing: `StringType()` was added as the element type of the `ArrayType()` fields where the generator had produced `null`.

In [5]:
meta_elect_df = spark \
    .read \
    .format("json") \
    .load("meta_Electronics.json", schema = schema)

meta_elect_df.printSchema()

root
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tech1: string (nullable = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fit: string (nullable = true)
 |-- title: string (nullable = true)
 |-- also_buy: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tech2: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- feature: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rank: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- also_view: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- main_cat: string (nullable = true)
 |-- similar_item: string (nullable = true)
 |-- date: string (nullable = true)
 |-- price: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- imageURL: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- imageU

Check partitions:

In [6]:
meta_elect_df.rdd.getNumPartitions()

82

Check size:

In [7]:
print((meta_elect_df.count(), len(meta_elect_df.columns)))



(786445, 18)

A few hundred rows appear to be missing: 786,445 were loaded, but the dataset website lists 786,868 products. That is a gap of 423 rows, about 0.05%.

Look at data:

In [8]:
meta_elect_df.show()

+--------------------+-----+--------------------+---+--------------------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+----------+--------------------+--------------------+
|            category|tech1|         description|fit|               title|            also_buy|tech2|               brand|             feature|                rank|           also_view|            main_cat|        similar_item|              date|               price|      asin|            imageURL|     imageURLHighRes|
+--------------------+-----+--------------------+---+--------------------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+----------+--------------------+--------------------+
|[Electronics, Cam...|     |[The foll

Pick the columns we want:

In [9]:
cols_to_use = ['asin', 'brand', 'main_cat', 'price', 'title']
meta_elect_df.select(cols_to_use).show()

+----------+--------------------+--------------------+--------------------+--------------------+
|      asin|               brand|            main_cat|               price|               title|
+----------+--------------------+--------------------+--------------------+--------------------+
|0011300000|           GeoVision|  Camera &amp; Photo|              $65.00|Genuine Geovision...|
|0043396828|        33 Books Co.|  Camera &amp; Photo|                    |Books "Handbook o...|
|0060009810|Visit Amazon's Ca...|               Books|              $11.49|      One Hot Summer|
|0060219602|Visit Amazon's Di...|               Books|.a-section.a-spac...|Hurray for Hattie...|
|0060786817|Visit Amazon's Lo...|               Books|              $13.95|sex.lies.murder.f...|
|0070524076|Visit Amazon's Al...|               Books|                    |     College Physics|
|0091912407|            ABBY LEE|               Books|               $4.76|Girl with a One-t...|
|0101635370|          Crazy Ca

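Keep only rows that have a price. The `price` column also contains empty strings and scraped page fragments such as `.a-section.a-spac...`: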

In [10]:
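# Regex for values containing a dollar sign ($). rlike() matches anywhere in the string,
# so use r'^\$' instead if the '$' must be the first character.
expr = r'\$.*'

# Filter with regex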
meta_elect_df = meta_elect_df.filter(meta_elect_df.price.rlike(expr)).select(cols_to_use)
meta_elect_df.show()

+----------+--------------------+--------------------+---------+--------------------+
|      asin|               brand|            main_cat|    price|               title|
+----------+--------------------+--------------------+---------+--------------------+
|0011300000|           GeoVision|  Camera &amp; Photo|   $65.00|Genuine Geovision...|
|0060009810|Visit Amazon's Ca...|               Books|   $11.49|      One Hot Summer|
|0060786817|Visit Amazon's Lo...|               Books|   $13.95|sex.lies.murder.f...|
|0091912407|            ABBY LEE|               Books|    $4.76|Girl with a One-t...|
|0132492776|     Enter The Arena|Home Audio & Theater|    $7.99|Wireless Bluetoot...|
|0151004714|Visit Amazon's Cl...|               Books|   $13.81|The Last Life: A ...|
|0151014841|Visit Amazon's An...|               Books|    $5.79|        Lady Lazarus|
|0303532572|TDK Electronics Corp|Home Audio &amp; ...|   $48.99|TDK Hi8 MP120 Pre...|
|0312171048|Visit Amazon's Je...|               Books|

In [11]:
# Check dimensions
print((meta_elect_df.count(), len(meta_elect_df.columns)))



(304323, 5)

About 39% of the meta dataset has a price (304,323 of 786,445 products).

### Electronics data subset (5-core)

Read first few lines of JSON: 

In [12]:
# Loading the entire 4 GB file into 8 GB of available RAM caused major slowdowns, so only print the first few lines

N = 3
with open("Electronics_5.json") as f:
    for i in range(0,N):
        print(f.readline(), end = '')


{"overall": 5.0, "vote": "67", "verified": true, "reviewTime": "09 18, 1999", "reviewerID": "AAP7PPBU72QFM", "asin": "0151004714", "style": {"Format:": " Hardcover"}, "reviewerName": "D. C. Carrad", "reviewText": "This is the best novel I have read in 2 or 3 years.  It is everything that fiction should be -- beautifully written, engaging, well-plotted and structured.  It has several layers of meanings -- historical, family,  philosophical and more -- and blends them all skillfully and interestingly.  It makes the American grad student/writers' workshop \"my parents were  mean to me and then my professors were mean to me\" trivia look  childish and silly by comparison, as they are.\nAnyone who says this is an  adolescent girl's coming of age story is trivializing it.  Ignore them.  Read this book if you love literature.\nI was particularly impressed with  this young author's grasp of the meaning and texture of the lost world of  French Algeria in the 1950's and '60's...particularly poig

Copy the first row of the JSON output above into the [Schema Generator](https://preetranjan.github.io/pyspark-schema-generator/) to create the schema below:

In [13]:
schema = StructType([StructField('overall',FloatType(),True),  # Changed to FloatType from StringType
    StructField('vote',StringType(),True),  
    StructField('verified',BooleanType(),True),  
    StructField('reviewTime',StringType(),True),  
    StructField('reviewerID',StringType(),True),  
    StructField('asin',StringType(),True),  
    StructField('style',StructType([StructField('Format:',StringType(),True)]),True),  
    StructField('reviewerName',StringType(),True),  
    StructField('reviewText',StringType(),True),  
    StructField('summary',StringType(),True),  
    StructField('unixReviewTime',IntegerType(),True)])

Pass the schema explicitly so Spark does not have to infer it. Inference requires an extra pass over the file:

In [14]:
# 5 core electronics data
e5_core_df = spark \
    .read \
    .json("Electronics_5.json", schema = schema)

e5_core_df.printSchema()

root
 |-- overall: float (nullable = true)
 |-- vote: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- style: struct (nullable = true)
 |    |-- Format:: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: integer (nullable = true)



Show data:

In [15]:
e5_core_df.show()

+-------+----+--------+-----------+--------------+----------+-----------------+--------------------+--------------------+--------------------+--------------+
|overall|vote|verified| reviewTime|    reviewerID|      asin|            style|        reviewerName|          reviewText|             summary|unixReviewTime|
+-------+----+--------+-----------+--------------+----------+-----------------+--------------------+--------------------+--------------------+--------------+
|    5.0|  67|    true|09 18, 1999| AAP7PPBU72QFM|0151004714|     { Hardcover}|        D. C. Carrad|This is the best ...|      A star is born|     937612800|
|    3.0|   5|    true|10 23, 2013|A2E168DTVGE6SV|0151004714|{ Kindle Edition}|                 Evy|Pages and pages o...|A stream of consc...|    1382486400|
|    5.0|   4|   false| 09 2, 2008|A1ER5AYS3FQ9O3|0151004714|     { Paperback}|               Kcorn|This is the kind ...|I'm a huge fan of...|    1220313600|
|    5.0|  13|   false| 09 4, 2000|A1T17LMQABMBN5|01

Check partitions:

In [16]:
e5_core_df.rdd.getNumPartitions()

32

Check shape:

In [17]:
print((e5_core_df.count(), len(e5_core_df.columns)))



(6739590, 11)

The row count (6,739,590) matches the figure on the dataset website. Next, select the columns we want:

In [18]:
cols_to_use = ['asin', 'overall', 'reviewTime', 'reviewerID', 'reviewText', 'summary']
e5_core_df = e5_core_df.select(cols_to_use)
e5_core_df.show()

+----------+-------+-----------+--------------+--------------------+--------------------+
|      asin|overall| reviewTime|    reviewerID|          reviewText|             summary|
+----------+-------+-----------+--------------+--------------------+--------------------+
|0151004714|    5.0|09 18, 1999| AAP7PPBU72QFM|This is the best ...|      A star is born|
|0151004714|    3.0|10 23, 2013|A2E168DTVGE6SV|Pages and pages o...|A stream of consc...|
|0151004714|    5.0| 09 2, 2008|A1ER5AYS3FQ9O3|This is the kind ...|I'm a huge fan of...|
|0151004714|    5.0| 09 4, 2000|A1T17LMQABMBN5|What gorgeous lan...|The most beautifu...|
|0151004714|    3.0| 02 4, 2000|A3QHJ0FXK33OBE|I was taken in by...|A dissenting view...|
|0380709473|    4.0| 06 5, 2013|A3IYSOTP3HA77N|I read this proba...|Above average mys...|
|0380709473|    5.0|06 27, 2016|A11SXV34PZUQ5E|I read every Perr...|        Lam is cool!|
|0380709473|    5.0|07 30, 2015|A2AUQM1HT2D5T8|I love this serie...|          Five Stars|
|038070947


Next, merge the price metadata into the review subset on `asin`.

### Merge data (primary key = asin)

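Left join to attach the metadata. Reviews whose `asin` has no priced metadata row keep their review columns and get nulls for `brand`, `main_cat`, `price`, and `title`: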

In [19]:
elect_df = e5_core_df.join(meta_elect_df, on = 'asin', how = 'left')
elect_df.show()
# print('5 core dataset: ', (e5_core_df.count(), len(e5_core_df.columns)))
# print('Joined dataset: ', (elect_df.count(), len(elect_df.columns)))



+----------+-------+-----------+--------------+--------------------+--------------------+--------------------+------------------+-------+--------------------+
|      asin|overall| reviewTime|    reviewerID|          reviewText|             summary|               brand|          main_cat|  price|               title|
+----------+-------+-----------+--------------+--------------------+--------------------+--------------------+------------------+-------+--------------------+
|0446697192|    5.0|07 14, 2009|A3LXXYBYUHZWS5|Fresh from Connec...|    A quick fun read|Visit Amazon's Zo...|             Books| $17.99|Hollywood Is like...|
|0446697192|    5.0|07 10, 2009|A1X4L7AO1BXMHK|I don't know abou...|  A Great Beach Read|Visit Amazon's Zo...|             Books| $17.99|Hollywood Is like...|
|0446697192|    3.0| 09 2, 2009|A1Y9RUTH5GG3MU|Obviously the pre...|Quick & fun summe...|Visit Amazon's Zo...|             Books| $17.99|Hollywood Is like...|
|0446697192|    4.0|08 30, 2009| AAR8E3JF9K93P

### Clean dates (reviewTime):

In [20]:
# Replace the space after the month and the ", " before the year with dashes,
# e.g. "09 18, 1999" -> "09-18-1999" and "09 2, 2009" -> "09-2-2009"
elect_df = elect_df \
    .withColumn('reviewTime', regexp_replace(col('reviewTime'), r'(\s|, )', '-')) \
    .withColumn('reviewTime', trim(col('reviewTime')))  # Strips edge whitespace; none should remain after the replace


elect_df.show()



+----------+-------+----------+--------------+--------------------+--------------------+--------------------+------------------+-------+--------------------+
|      asin|overall|reviewTime|    reviewerID|          reviewText|             summary|               brand|          main_cat|  price|               title|
+----------+-------+----------+--------------+--------------------+--------------------+--------------------+------------------+-------+--------------------+
|0446697192|    5.0|07-14-2009|A3LXXYBYUHZWS5|Fresh from Connec...|    A quick fun read|Visit Amazon's Zo...|             Books| $17.99|Hollywood Is like...|
|0446697192|    5.0|07-10-2009|A1X4L7AO1BXMHK|I don't know abou...|  A Great Beach Read|Visit Amazon's Zo...|             Books| $17.99|Hollywood Is like...|
|0446697192|    3.0| 09-2-2009|A1Y9RUTH5GG3MU|Obviously the pre...|Quick & fun summe...|Visit Amazon's Zo...|             Books| $17.99|Hollywood Is like...|
|0446697192|    4.0|08-30-2009| AAR8E3JF9K93P|I am v
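
To confirm that `trim()` has nothing to remove, the same regex can be applied in plain Python. For this pattern and ASCII input, Python's `re` and the Java regex engine behind `regexp_replace` behave the same. The sample values come from the rows shown above:

```python
import re

# Raw reviewTime values as they appear in Electronics_5.json
raw = ["09 18, 1999", "10 23, 2013", "09 2, 2009"]

# Same pattern as the regexp_replace call above
cleaned = [re.sub(r"(\s|, )", "-", s) for s in raw]
print(cleaned)  # ['09-18-1999', '10-23-2013', '09-2-2009']

# No surrounding whitespace remains, so trim()/strip() changes nothing
print(all(s == s.strip() for s in cleaned))  # True

# show() right-aligns cells to the column width, which produces the apparent leading space
print(repr("09-2-2009".rjust(len("reviewTime"))))  # ' 09-2-2009'
```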

Convert `reviewTime` from string to date:

In [21]:
dt_format = "MM-dd-yyyy"
elect_df.withColumn('reviewTime', to_date(elect_df.reviewTime, format = dt_format)).show()


23/06/06 18:31:19 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 548)]
org.apache.spark.SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] You may get a different result due to the upgrading to Spark >= 3.0:
Fail to parse '09-2-2009' in the new parser. You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0, or set to "CORRECTED" and treat it as an invalid datetime string.

Py4JJavaError: An error occurred while calling o98.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 548) (192.168.0.29 executor driver): org.apache.spark.SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] You may get a different result due to the upgrading to Spark >= 3.0:
Fail to parse '09-2-2009' in the new parser. You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0, or set to "CORRECTED" and treat it as an invalid datetime string.
Caused by: java.time.format.DateTimeParseException: Text '09-2-2009' could not be parsed at index 3


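Without a format argument, `to_date` cannot parse these strings and returns nulls. With `dt_format`, it raises the error above.

The trace points to the cause: `'09-2-2009'` fails at index 3. In the Spark 3 parser, `MM` and `dd` require exactly two digits, but some days in the data have a single digit (`2`). The values themselves are clean. The leading space in ` 09-2-2009` in the `show()` output is column padding, which is why stripping whitespace with `trim()` had no effect.

The error message also answers the question of returning nulls for improper conversions: set `spark.sql.legacy.timeParserPolicy` to `CORRECTED`. Clean dates are needed for customer lifetime value.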

### A bit of exploration

How many unique items are there?

In [22]:
elect_df.select('asin').distinct().count()

160052

About 160K unique items.

How many unique customers?

In [23]:
elect_df.select('reviewerID').distinct().count()

728719

About 729K unique customers.

In [24]:
print('Columns overview')
pd.DataFrame(elect_df.dtypes, columns = ['Column Name','Data type'])

Columns overview


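|   | Column Name | Data type |
|---|-------------|-----------|
| 0 | asin | string |
| 1 | overall | float |
| 2 | reviewTime | string |
| 3 | reviewerID | string |
| 4 | reviewText | string |
| 5 | summary | string |
| 6 | brand | string |
| 7 | main_cat | string |
| 8 | price | string |
| 9 | title | string |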


How many reviews fall into each main category? (A possible customer segment?)

In [25]:
elect_df.groupBy('main_cat').count().orderBy('count', ascending = False).toPandas()

23/06/06 18:32:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

|   | main_cat | count |
|---|----------|-------|
| 0 |  | 2389218 |
| 1 | `Computers` | 1502536 |
| 2 | `All Electronics` | 824547 |
| 3 | `Home Audio & Theater` | 687968 |
| 4 | `Camera & Photo` | 675708 |
| 5 | `Cell Phones & Accessories` | 339949 |
| 6 | `Car Electronics` | 95838 |
| 7 | `Camera &amp; Photo` | 72954 |
| 8 | `Home Audio &amp; Theater` | 70595 |
| 9 | `Tools & Home Improvement` | 52912 |
