# Phase 1: Data Loading & Initial Inspection

## Objectives

In this notebook, we will:

1. Load all 8 CSV files into Spark DataFrames
2. Validate schemas and row counts
3. Inspect data structure and types
4. Understand table relationships (foreign keys)
5. Perform initial data quality checks

## Expected Datasets

| Dataset | Rows | Columns | Purpose |
|---------|------|---------|---------|
| customers | 99,441 | 5 | Customer demographics |
| orders | 99,441 | 8 | Order lifecycle tracking |
| order_items | 112,650 | 7 | Products in each order |
| products | 32,951 | 9 | Product catalog |
| order_reviews | 99,224 | 7 | Customer satisfaction |
| order_payments | 103,886 | 5 | Payment information |
| sellers | 3,095 | 4 | Seller information |
| geolocation | 1,000,163 | 5 | Zip code coordinates |

## Key Concepts

**Why Explicit Schemas?**
- Faster loading (no extra pass over the data for type inference)
- Type safety (catch errors early)
- Production-ready (explicit contracts)
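Here is a concrete case for the type-safety point. The customers schema declares `customer_zip_code_prefix` as a string, and the customers sample rows later in this notebook include prefixes such as `09790` and `01151`. A type-inferring reader sees only digits, would typically pick an integer type, and silently drops the leading zeros. The plain-Python sketch below models that difference. `infer_type` is a deliberately naive, hypothetical stand-in for schema inference, not Spark's actual algorithm.

```python
# Plain-Python model of schema inference vs. an explicit schema.
# infer_type is a hypothetical, deliberately naive inference rule:
# "if every value parses as an integer, treat the column as an integer".

def infer_type(values):
    """Return int if every value parses as an integer, otherwise str."""
    try:
        for value in values:
            int(value)
    except ValueError:
        return str
    return int

# Zip code prefixes as they appear in the customers sample rows.
zip_prefixes = ["14409", "09790", "01151"]

# Inferred schema: the column looks numeric, so leading zeros are lost.
inferred_type = infer_type(zip_prefixes)
inferred_values = [inferred_type(v) for v in zip_prefixes]

# Explicit schema: the column is declared as a string and kept verbatim.
explicit_schema = {"customer_zip_code_prefix": str}
explicit_values = [explicit_schema["customer_zip_code_prefix"](v) for v in zip_prefixes]

print(inferred_type.__name__, inferred_values)  # int [14409, 9790, 1151]
print(explicit_values)                          # ['14409', '09790', '01151']
```

Zip prefixes are identifiers, not quantities, so declaring them as strings in the schema is the safer contract.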

**Why Validate Relationships?**
- Ensure data integrity
- Understand join keys
- Catch orphaned records
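A referential-integrity check is a set difference. Every foreign-key value should appear among the primary-key values of the referenced table. The following is a minimal plain-Python sketch with hypothetical IDs. `find_orphans` is an illustrative helper and is not part of `src.data_src.load_data`.

```python
# Referential-integrity check as a set difference (hypothetical data).

def find_orphans(fk_values, pk_values):
    """Return the distinct non-null FK values that have no matching PK value."""
    fk = {v for v in fk_values if v is not None}  # NULL FKs never match anything
    return fk - set(pk_values)

order_ids = ["o1", "o2", "o3"]                     # parent: orders.order_id
review_order_ids = ["o1", "o1", "o2", "o9", None]  # child: order_reviews.order_id

orphans = find_orphans(review_order_ids, order_ids)
print(f"Unique FK values: {len({v for v in review_order_ids if v is not None})}")  # 3
print(f"Orphaned FK values: {sorted(orphans)}")                                    # ['o9']
```

In PySpark the same check is usually written as a left anti join, for example `order_reviews.join(orders, on='order_id', how='left_anti')`. That join returns the child rows that have no matching parent.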

In [1]:
# Setup and imports

import sys
sys.path.append("..")  # make the project root (config/, src/) importable

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan, isnull

# Our custom modules
from config.spark_config import get_spark_session
from src.data_src.load_data import (
    load_all_datasets,
    display_dataframe_info,
    get_table_relationships,
    validate_relationships,
    get_data_summary,
)

import pandas as pd

In [2]:
# Start Spark session

spark = get_spark_session()

print("SPARK SESSION DETAILS")

print(f"spark Version: {spark.version}")
print(f"Master: {spark.sparkContext.master}")
print(f"App Name: {spark.sparkContext.appName}")
print(f"Parallelism: {spark.sparkContext.defaultParallelism}")


26/02/11 22:42:22 WARN Utils: Your hostname, pc-365M resolves to a loopback address: 127.0.1.1; using 192.168.220.3 instead (on interface enp7s0)
26/02/11 22:42:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/11 22:42:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


✅ Spark Session Created: 3.5.0
✅ Spark UI available at: http://localhost:4040
✅ Master: local[*]
✅ Cores: 8
SPARK SESSION DETAILS
spark Version: 3.5.0
Master: local[*]
App Name: EcommerceIntelligence
Parallelism: 8


In [3]:
# Load all datasets
# Loads all 8 CSV files into Spark DataFrames using our custom module and
# returns a dictionary: {'customers': DataFrame, 'orders': DataFrame, ...}
print("Starting data load...\n")

datasets = load_all_datasets(spark, validate=True)

# Extract individual DataFrames for convenience
customers = datasets['customers']
orders = datasets['orders']
order_items = datasets['order_items']
products = datasets['products']
order_reviews = datasets['order_reviews']
order_payments = datasets['order_payments']
sellers = datasets['sellers']
geolocation = datasets['geolocation']

print("\n All datasets loaded and assigned to variables")


Starting data load...

LOADING ALL DATASETS
 LOADED customers: 99,441 rows, 5 columns
 LOADED orders: 99,441 rows, 8 columns
 LOADED order_items: 112,650 rows, 7 columns
 LOADED products: 32,951 rows, 9 columns
 LOADED order_reviews: 104,162 rows, 7 columns
 LOADED order_payments: 103,886 rows, 5 columns
 LOADED sellers: 3,095 rows, 4 columns
 LOADED geolocation: 1,000,163 rows, 5 columns

 Successfully loaded 8 datasets!

 All datasets loaded and assigned to variables


In [4]:
# Overall data summary

get_data_summary(datasets)


DATA SUMMARY

Dataset                         Rows    Columns    Size (MB)
------------------------------------------------------------
customers                     99,441          5         3.79
orders                        99,441          8         6.07
order_items                  112,650          7         6.02
products                      32,951          9         2.26
order_reviews                104,162          7         5.56
order_payments               103,886          5         3.96
sellers                        3,095          4         0.09
geolocation                1,000,163          5        38.15
------------------------------------------------------------
TOTAL                      1,555,789
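The `order_reviews` count above (104,162) does not match the 99,224 rows listed under Expected Datasets. The surplus (104,162 - 99,224 = 4,938) equals the number of orphaned `order_reviews` records reported by the relationship-validation cell below. One likely explanation is that some review comments contain line breaks inside quoted fields. This is an assumption, not a confirmed diagnosis. Under it, a reader that treats every physical line as a record splits those reviews into extra, misaligned rows. The plain-Python sketch below shows the effect, using the standard library only and made-up sample rows.

```python
import csv
import io

# Hypothetical mini reviews file: review r2's comment contains a line break
# inside a quoted field.
raw = (
    "review_id,order_id,review_score,review_comment_message\n"
    'r1,o1,5,"Great product"\n'
    'r2,o2,1,"Arrived broken.\nWould not buy again"\n'
    'r3,o3,4,"OK"\n'
)

# Line-based reading: every physical line after the header becomes a record.
naive_records = raw.strip().split("\n")[1:]

# Quote-aware reading: csv.reader keeps the embedded newline inside one field.
reader = csv.reader(io.StringIO(raw))
header = next(reader)
parsed_records = list(reader)

print(f"Line-based record count:  {len(naive_records)}")   # 4
print(f"Quote-aware record count: {len(parsed_records)}")  # 3
```

If this is the cause, the Spark-side fix would be to read the reviews file with `.option("multiLine", True)` and `.option("escape", '"')`. This notebook does not show whether `load_all_datasets` exposes these options.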



## Deep Dive - Customers Dataset

Detailed inspection of the customers table.

WHAT WE'RE CHECKING:
--------------------
1. Schema (data types correct?)
2. Sample data (looks reasonable?)
3. Null counts (missing data?)

BUSINESS UNDERSTANDING:
-----------------------
- customer_id: Unique per ORDER (same customer, different orders = different IDs)
- customer_unique_id: Same across ALL orders (tracks repeat customers!)
- Geographic fields: For location-based analysis
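Each order gets a new `customer_id`, so repeat purchases only become visible when you group by `customer_unique_id`. Here is a plain-Python sketch with hypothetical IDs:

```python
from collections import Counter

# Hypothetical rows: (customer_id, customer_unique_id). Each order gets a new
# customer_id, while customer_unique_id identifies the person.
rows = [
    ("c1", "u1"),
    ("c2", "u2"),
    ("c3", "u1"),  # u1 ordered again under a new customer_id
    ("c4", "u3"),
    ("c5", "u1"),
]

orders_per_customer = Counter(unique_id for _, unique_id in rows)
repeat_customers = {u: n for u, n in orders_per_customer.items() if n > 1}
avg_orders = len(rows) / len(orders_per_customer)

print(f"Unique customers: {len(orders_per_customer)}")       # 3
print(f"Repeat customers: {repeat_customers}")               # {'u1': 3}
print(f"Average orders per customer: {avg_orders:.2f}")      # 1.67
```

The PySpark equivalent would be along the lines of `customers.groupBy('customer_unique_id').count().filter(col('count') > 1)`.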


In [5]:
display_dataframe_info(customers, 'customers', sample_rows=5)

# Additional exploration
print("\n ADDITIONAL EXPLORATION:")
print("\nUnique customers (by customer_unique_id):")
unique_customers = customers.select('customer_unique_id').distinct().count()
total_orders = customers.count()
print(f"  Total order records: {total_orders:,}")
print(f"  Unique customers: {unique_customers:,}")
print(f"  Average orders per customer: {total_orders/unique_customers:.2f}")



DATASET: CUSTOMERS

 Basic Info:
 Rows:99,441
 Columns:5

 Schema:
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)


 Sample Data (first 5 rows): 
+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                   09790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                   01151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259d

In [6]:
print("\n Top 10 states by customer count:")
customers.groupBy('customer_state') \
    .count() \
    .orderBy('count', ascending=False) \
    .show(10)


 Top 10 states by customer count:
+--------------+-----+
|customer_state|count|
+--------------+-----+
|            SP|41746|
|            RJ|12852|
|            MG|11635|
|            RS| 5466|
|            PR| 5045|
|            SC| 3637|
|            BA| 3380|
|            DF| 2140|
|            ES| 2033|
|            GO| 2020|
+--------------+-----+
only showing top 10 rows



## Deep Dive - Orders Dataset


Orders table is CRITICAL for churn prediction.
We need to understand order lifecycle and timestamps.

KEY INSIGHTS TO GAIN:
---------------------
1. What order statuses exist?
2. How many orders are 'delivered' (our target for ML)?
3. Are there missing timestamps?
4. What time range does the data cover?

BUSINESS QUESTIONS:
-------------------
- What % of orders are successfully delivered?
- How long between purchase and delivery?
- Are there canceled orders, and why?

In [9]:
display_dataframe_info(orders, 'orders', sample_rows=5)

DATASET: ORDERS

 Basic Info:
 Rows:99,441
 Columns:8

 Schema:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)


 Sample Data (first 5 rows): 
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------

In [8]:
print("\n ORDER STATUS DISTRIBUTION:")
orders.groupBy('order_status') \
    .count() \
    .orderBy('count', ascending=False) \
    .show()


 ORDER STATUS DISTRIBUTION:
+------------+-----+
|order_status|count|
+------------+-----+
|   delivered|96478|
|     shipped| 1107|
|    canceled|  625|
| unavailable|  609|
|    invoiced|  314|
|  processing|  301|
|     created|    5|
|    approved|    2|
+------------+-----+
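The first business question, what share of orders is delivered, follows directly from these counts. A small sketch using the numbers printed above:

```python
# Order status counts copied from the output above.
status_counts = {
    "delivered": 96478,
    "shipped": 1107,
    "canceled": 625,
    "unavailable": 609,
    "invoiced": 314,
    "processing": 301,
    "created": 5,
    "approved": 2,
}

total = sum(status_counts.values())
delivered_pct = 100 * status_counts["delivered"] / total
not_delivered_pct = 100 - delivered_pct

print(f"Total orders: {total:,}")                          # 99,441 (matches the row count)
print(f"Delivered: {delivered_pct:.2f}%")                  # 97.02%
print(f"Not (yet) delivered: {not_delivered_pct:.2f}%")    # 2.98%
```

In PySpark, with `from pyspark.sql import functions as F`, the same shares could be computed as `orders.groupBy('order_status').count().withColumn('pct', F.round(F.col('count') / orders.count() * 100, 2))`.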



In [10]:
print("\n DATE RANGE:")
# Import under an alias: a bare `from pyspark.sql.functions import min, max`
# would shadow Python's built-in min() and max() for the rest of the notebook.
from pyspark.sql import functions as F

date_range = orders.select(
    F.min('order_purchase_timestamp').alias('First Order'),
    F.max('order_purchase_timestamp').alias('Last Order')
).collect()[0]

print(f"  First order: {date_range['First Order']}")
print(f"  Last order: {date_range['Last Order']}")


 DATE RANGE:
  First order: 2016-09-04 21:15:19
  Last order: 2018-10-17 17:30:18


In [13]:
print("Timestamp Completeness")

timestamp_cols = [
    'order_purchase_timestamp',
    'order_approved_at',
    'order_delivered_carrier_date',
    'order_delivered_customer_date'
]

n_orders = orders.count()  # count once instead of re-running it for every column

for col_name in timestamp_cols:
    null_count = orders.filter(col(col_name).isNull()).count()
    null_pct = (null_count / n_orders) * 100
    print(f"{col_name}:{null_pct:.1f}% null")

Timestamp Completeness
order_purchase_timestamp:0.0% null
order_approved_at:0.2% null
order_delivered_carrier_date:1.8% null
order_delivered_customer_date:3.0% null
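This completeness check matters for the question of how long delivery takes. About 3% of orders have no `order_delivered_customer_date`. Any duration calculation must skip or separately flag those rows rather than treat them as zero. Below is a plain-Python sketch with hypothetical timestamps. It computes purchase-to-delivery days and a late-delivery flag against the estimated date.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# Hypothetical (purchase, delivered, estimated) timestamps; None marks an
# order that has no delivery date yet.
orders_sample = [
    ("2017-10-02 10:00:00", "2017-10-10 10:00:00", "2017-10-18 00:00:00"),
    ("2018-07-24 12:00:00", None, "2018-08-13 00:00:00"),
    ("2018-08-08 08:00:00", "2018-08-20 20:00:00", "2018-08-17 00:00:00"),
]

delivery_days = []  # purchase-to-delivery time in fractional days
late_flags = []     # True if delivered after the estimated date

for purchase, delivered, estimated in orders_sample:
    if delivered is None:
        continue  # no delivery yet: skip rather than impute a duration
    p = datetime.strptime(purchase, FMT)
    d = datetime.strptime(delivered, FMT)
    e = datetime.strptime(estimated, FMT)
    delivery_days.append((d - p).total_seconds() / 86400)
    late_flags.append(d > e)

print(delivery_days)  # [8.0, 12.5]
print(late_flags)     # [False, True]
```

In Spark, `F.datediff('order_delivered_customer_date', 'order_purchase_timestamp')` returns whole calendar days. It yields null when either timestamp is null, so undelivered orders drop out of an average automatically.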


## Deep Dive - Order Items



KEY INSIGHTS:
-------------
1. How many items per order on average?
2. Price distribution
3. Freight (shipping) costs

In [14]:
display_dataframe_info(order_items, 'order_items', sample_rows=5)

DATASET: ORDER_ITEMS

 Basic Info:
 Rows:112,650
 Columns:7

 Schema:
root
 |-- order_id: string (nullable = true)
 |-- order_item_di: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)


 Sample Data (first 5 rows): 
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_di|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        

26/02/11 22:58:45 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: order_id, order_item_id, product_id, seller_id, shipping_limit_date, price, freight_value
 Schema: order_id, order_item_di, product_id, seller_id, shipping_limit_date, price, freight_value
Expected: order_item_di but found: order_item_id
CSV file: file:///media/dk/Data/Projects/ecommerce-intelligence-spark/data/raw/olist_order_items_dataset.csv


+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_di|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
|0       |0            |0         |0        |0                  |0    |0            |
+--------+-------------+----------+---------+-------------------+-----+-------------+
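The `CSVHeaderChecker` warning above is worth acting on. With `header=True` and Spark's default `enforceSchema=true`, Spark names columns from the schema by position and only logs a warning. The DataFrame therefore exposes `order_item_di`, as the printed schema shows. This appears to be a typo for `order_item_id` in the project's schema definition. The similar warning for `products` looks different in kind: the raw file spells `product_name_lenght`, and the schema corrects it to `product_name_length`, which seems deliberate. Spark's CSV options also include `enforceSchema=False`, which validates the header against the schema instead of only warning. The plain-Python sketch below models that header comparison. `header_mismatches` is a hypothetical helper.

```python
import csv
import io

def header_mismatches(csv_text, expected_columns):
    """Compare a CSV header with the column names a schema expects.

    Returns (position, expected, found) tuples for names that differ and
    raises ValueError if the number of columns differs.
    """
    header = [name.strip() for name in next(csv.reader(io.StringIO(csv_text)))]
    if len(header) != len(expected_columns):
        raise ValueError(f"expected {len(expected_columns)} columns, found {len(header)}")
    return [
        (i, expected, found)
        for i, (expected, found) in enumerate(zip(expected_columns, header))
        if expected != found
    ]

# Header and schema names taken from the warning above.
order_items_header = (
    "order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value\n"
)
schema_columns = [
    "order_id", "order_item_di", "product_id", "seller_id",
    "shipping_limit_date", "price", "freight_value",
]

print(header_mismatches(order_items_header, schema_columns))
# [(1, 'order_item_di', 'order_item_id')]
```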



In [15]:
order_items.select('price', 'freight_value').summary().show()

+-------+------------------+------------------+
|summary|             price|     freight_value|
+-------+------------------+------------------+
|  count|            112650|            112650|
|   mean|120.65373901464174|19.990319928982977|
| stddev| 183.6339280502595|15.806405412297098|
|    min|              0.85|               0.0|
|    25%|              39.9|             13.08|
|    50%|             74.99|             16.26|
|    75%|             134.9|             21.15|
|    max|            6735.0|            409.68|
+-------+------------------+------------------+
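These statistics point to a right-skewed price distribution: the mean (120.65) sits well above the median (74.99). One common way, though not the only one, to flag unusually expensive items is Tukey's 1.5 × IQR fence. The worked sketch below uses the 25% and 75% values printed above. Note that Spark's `summary()` percentiles are approximate.

```python
# Price quartiles and mean copied from the summary() output above.
q1, median, q3 = 39.9, 74.99, 134.9
mean = 120.65373901464174

iqr = q3 - q1
lower_fence = max(0.0, q1 - 1.5 * iqr)  # prices cannot be negative
upper_fence = q3 + 1.5 * iqr

print(f"IQR: {iqr:.2f}")                          # 95.00
print(f"Upper fence: {upper_fence:.2f}")          # 277.40
print(f"Mean/median ratio: {mean / median:.2f}")  # a ratio above 1 suggests right skew
```

In PySpark, `order_items.approxQuantile('price', [0.25, 0.75], 0.01)` returns the two quartiles directly if you need to recompute the fence.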



In [16]:
items_per_order = order_items.groupBy('order_id').count()
items_per_order.select('count').summary('mean', 'min', 'max').show()

+-------+------------------+
|summary|             count|
+-------+------------------+
|   mean|1.1417306873695092|
|    min|                 1|
|    max|                21|
+-------+------------------+



## Deep Dive - Products


Product catalog - critical for recommendations.

KEY CHECKS:
-----------
1. How many product categories?
2. Missing product dimensions?
3. Category distribution

In [17]:
display_dataframe_info(products, 'products', sample_rows=5)

DATASET: PRODUCTS

 Basic Info:
 Rows:32,951
 Columns:9

 Schema:
root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_length: integer (nullable = true)
 |-- product_description_length: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)


 Sample Data (first 5 rows): 
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_length|product_description_length|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+----------------

26/02/11 23:14:39 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: product_id, product_category_name, product_name_lenght, product_description_lenght, product_photos_qty, product_weight_g, product_length_cm, product_height_cm, product_width_cm
 Schema: product_id, product_category_name, product_name_length, product_description_length, product_photos_qty, product_weight_g, product_length_cm, product_height_cm, product_width_cm
Expected: product_name_length but found: product_name_lenght
CSV file: file:///media/dk/Data/Projects/ecommerce-intelligence-spark/data/raw/olist_products_dataset.csv

+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id|product_category_name|product_name_length|product_description_length|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|0         |610                  |610                |610                       |610               |2               |2                |2                |2               |
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+



In [18]:
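# Note: distinct() treats NULL as a value of its own, so this total includes the
# 610 products with no category (73 named categories + NULL).
print(f"total unique categories : {products.select('product_category_name').distinct().count()}")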

total unique categories : 74


In [19]:
print("Top 10 Categories by product count:")
products.groupBy('product_category_name') \
    .count() \
    .orderBy('count', ascending=False) \
    .show(10, truncate=False)

Top 10 Categories by product count:
+----------------------+-----+
|product_category_name |count|
+----------------------+-----+
|cama_mesa_banho       |3029 |
|esporte_lazer         |2867 |
|moveis_decoracao      |2657 |
|beleza_saude          |2444 |
|utilidades_domesticas |2335 |
|automotivo            |1900 |
|informatica_acessorios|1639 |
|brinquedos            |1411 |
|relogios_presentes    |1329 |
|telefonia             |1134 |
+----------------------+-----+
only showing top 10 rows



## Deep Dive - Reviews


Reviews = customer satisfaction indicator.
Critical for churn prediction.

KEY QUESTIONS:
--------------
1. Review score distribution (1-5 stars)
2. How many customers leave comments?
3. Average rating

In [21]:
print(" REVIEW SCORE DISTRIBUTION:")
order_reviews.groupBy('review_score') \
    .count() \
    .orderBy('review_score') \
    .show()

 REVIEW SCORE DISTRIBUTION:


26/02/11 23:25:53 ERROR Executor: Exception in task 0.0 in stage 175.0 (TID 266)
org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [null].
Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. 
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1610)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:79)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:456)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon

Py4JJavaError: An error occurred while calling o576.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 175.0 failed 1 times, most recent failure: Lost task 1.0 in stage 175.0 (TID 267) (192.168.220.3 executor driver): org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [null].
Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. 
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1610)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:79)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:456)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.NumberFormatException: For input string: "2018-06-24 12:05:59"
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:365)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:307)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:452)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 25 more
Caused by: java.lang.NumberFormatException: For input string: "2018-06-24 12:05:59"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at scala.collection.immutable.StringLike.toInt(StringLike.scala:310)
	at scala.collection.immutable.StringLike.toInt$(StringLike.scala:310)
	at scala.collection.immutable.StringOps.toInt(StringOps.scala:33)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$6(UnivocityParser.scala:189)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$6$adapted(UnivocityParser.scala:189)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.nullSafeDatum(UnivocityParser.scala:291)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$5(UnivocityParser.scala:189)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:346)
	... 28 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

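The error above is informative. `review_score` is declared as an integer, yet the value being parsed is the timestamp `"2018-06-24 12:05:59"`, so columns in at least one record are shifted. This is consistent with review comments that span multiple lines, though it is a hypothesis not verified here. Under that hypothesis, the leftover fragment of a split comment pushes later date columns leftward into `review_score`. Because the loader uses `FAILFAST`, Spark aborts on the first such record instead of producing nulls. The sketch below is a plain-Python model of the two parse modes. The function `cast_review_scores` and the sample rows are hypothetical; only the timestamp value comes from the error.

```python
def cast_review_scores(rows, mode="FAILFAST"):
    """Cast column 2 (review_score) of each row to int.

    mode="FAILFAST": raise on the first value that is not an integer.
    mode="PERMISSIVE": set the score to None and collect the raw row
    separately (loosely mirroring Spark's CSV parse modes).
    """
    scores, corrupt = [], []
    for row in rows:
        try:
            scores.append(int(row[2]))
        except (ValueError, IndexError):
            if mode == "FAILFAST":
                raise ValueError(f"Malformed record: {row!r}")
            scores.append(None)
            corrupt.append(row)
    return scores, corrupt

rows = [
    ["r1", "o1", "5", "", "Great product", "2018-06-20 00:00:00", "2018-06-21 10:00:00"],
    # Fragment left over when a quoted comment is split at an embedded newline:
    # the date columns slide left, so a timestamp lands in review_score.
    ['rest of comment"', "2018-06-23 00:00:00", "2018-06-24 12:05:59"],
]

scores, corrupt = cast_review_scores(rows, mode="PERMISSIVE")
print(scores)        # [5, None]
print(len(corrupt))  # 1
```

Switching Spark to `PERMISSIVE` (optionally with `columnNameOfCorruptRecord`) would let the cell run, but it turns misaligned rows into nulls rather than repairing them. Restoring correct record boundaries at read time, through quote-aware, multi-line parsing, addresses the cause.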

In [22]:
validate_relationships(datasets)


VALIDATING TABLE RELATIONSHIPS

 ORDERS:
  good customer_id → customers.customer_id
     Unique FK values: 99,441
     Unique PK values: 99,441
     All FKs have matches (referential integrity OK)

 ORDER_ITEMS:
  good order_id → orders.order_id
     Unique FK values: 98,666
     Unique PK values: 99,441
     All FKs have matches (referential integrity OK)
  good product_id → products.product_id
     Unique FK values: 32,951
     Unique PK values: 32,951
     All FKs have matches (referential integrity OK)
  good seller_id → sellers.seller_id
     Unique FK values: 3,095
     Unique PK values: 3,095
     All FKs have matches (referential integrity OK)

 ORDER_REVIEWS:
  bad order_id → orders.order_id
     Unique FK values: 99,743
     Unique PK values: 99,441
       Orphaned records: 4,938
        (FK values with no match in orders)

 ORDER_PAYMENTS:
  good order_id → orders.order_id
     Unique FK values: 99,440
     Unique PK values: 99,441
     All FKs have matches (referential int