In [17]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz
!tar xf spark-3.4.3-bin-hadoop3.tgz
!pip install -q findspark boto3 pyspark


In [19]:
import pandas as pd
import boto3
import os
from io import BytesIO
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

#  Spark configuration and context
spark_con = SparkSession.builder.appName("Olist-ETL").getOrCreate()
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark_con)
spark_con.conf.set("spark.sql.execution.arrow.enabled", "true")



In [20]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.3-bin-hadoop3"


In [22]:

# Personal AWS credentials
AWS_ACCESS_KEY_ID = 'XXXX'
AWS_SECRET_ACCESS_KEY = 'YYYY+Zmcu/b/ZZZZ/e6mByRkw1x'
AWS_REGION = 'us-east-1'

# S3 resource setup
s3_files = boto3.resource(
    service_name='s3',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_REGION
)
print("Objects in S3 bucket:")

bucket_nm = 'ecommerce-company-olist'

# List objects in bucket
bucket = s3_files.Bucket(bucket_nm)

for obj in bucket.objects.all():
    print(obj.key)

file_key = 'new/'


Objects in S3 bucket:
new/
new/olist_customers_dataset.csv
new/olist_geolocation_dataset.csv
new/olist_order_items_dataset.csv
new/olist_order_payments_dataset.csv
new/olist_order_reviews_dataset.csv
new/olist_orders_dataset.csv
new/olist_products_dataset.csv
new/olist_sellers_dataset.csv
new/product_category_name_translation.csv


In [23]:
obj = s3_files.Bucket('ecommerce-company-olist').Object('new/olist_order_items_dataset.csv').get()
items_csv = pd.read_csv(obj[ 'Body'])

In [24]:
items_csv.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 112650 entries, 0 to 112649
Data columns (total 7 columns):
 #   Column               Non-Null Count   Dtype  
---  ------               --------------   -----  
 0   order_id             112650 non-null  object 
 1   order_item_id        112650 non-null  int64  
 2   product_id           112650 non-null  object 
 3   seller_id            112650 non-null  object 
 4   shipping_limit_date  112650 non-null  object 
 5   price                112650 non-null  float64
 6   freight_value        112650 non-null  float64
dtypes: float64(2), int64(1), object(4)
memory usage: 6.0+ MB


In [25]:
obj = s3_files.Bucket('ecommerce-company-olist').Object('new/olist_orders_dataset.csv').get()
csv_orders = pd.read_csv(obj[ 'Body'])
csv_orders.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 99441 entries, 0 to 99440
Data columns (total 8 columns):
 #   Column                         Non-Null Count  Dtype 
---  ------                         --------------  ----- 
 0   order_id                       99441 non-null  object
 1   customer_id                    99441 non-null  object
 2   order_status                   99441 non-null  object
 3   order_purchase_timestamp       99441 non-null  object
 4   order_approved_at              99281 non-null  object
 5   order_delivered_carrier_date   97658 non-null  object
 6   order_delivered_customer_date  96476 non-null  object
 7   order_estimated_delivery_date  99441 non-null  object
dtypes: object(8)
memory usage: 6.1+ MB


In [26]:
obj = s3_files.Bucket('ecommerce-company-olist').Object('new/olist_products_dataset.csv').get()
df_products = pd.read_csv(obj['Body'])
df_products.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 32951 entries, 0 to 32950
Data columns (total 9 columns):
 #   Column                      Non-Null Count  Dtype  
---  ------                      --------------  -----  
 0   product_id                  32951 non-null  object 
 1   product_category_name       32341 non-null  object 
 2   product_name_lenght         32341 non-null  float64
 3   product_description_lenght  32341 non-null  float64
 4   product_photos_qty          32341 non-null  float64
 5   product_weight_g            32949 non-null  float64
 6   product_length_cm           32949 non-null  float64
 7   product_height_cm           32949 non-null  float64
 8   product_width_cm            32949 non-null  float64
dtypes: float64(7), object(2)
memory usage: 2.3+ MB


In [27]:
# Merge the seller id/shipping limit, order information, and product information
delay_df = items_csv.merge(csv_orders, on='order_id').merge(df_products[['product_id', 'product_category_name']],
                                                    on='product_id')

In [28]:
delay_df.head()

Unnamed: 0,order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,product_category_name
0,00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19 09:45:35,58.9,13.29,3ce436f183e68e07877b285a838db11a,delivered,2017-09-13 08:59:02,2017-09-13 09:45:35,2017-09-19 18:34:16,2017-09-20 23:43:48,2017-09-29 00:00:00,cool_stuff
1,130898c0987d1801452a8ed92a670612,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-07-05 02:44:11,55.9,17.96,e6eecc5a77de221464d1c4eaff0a9b64,delivered,2017-06-28 11:52:20,2017-06-29 02:44:11,2017-07-05 12:00:33,2017-07-13 20:39:29,2017-07-26 00:00:00,cool_stuff
2,532ed5e14e24ae1f0d735b91524b98b9,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2018-05-23 10:56:25,64.9,18.33,4ef55bf80f711b372afebcb7c715344a,delivered,2018-05-18 10:25:53,2018-05-18 12:31:43,2018-05-23 14:05:00,2018-06-04 18:34:26,2018-06-07 00:00:00,cool_stuff
3,6f8c31653edb8c83e1a739408b5ff750,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-08-07 18:55:08,58.9,16.17,30407a72ad8b3f4df4d15369126b20c9,delivered,2017-08-01 18:38:42,2017-08-01 18:55:08,2017-08-02 19:07:36,2017-08-09 21:26:33,2017-08-25 00:00:00,cool_stuff
4,7d19f4ef4d04461989632411b7e588b9,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-08-16 22:05:11,58.9,13.29,91a792fef70ecd8cc69d3c7feb3d12da,delivered,2017-08-10 21:48:40,2017-08-10 22:05:11,2017-08-11 19:43:07,2017-08-24 20:04:21,2017-09-01 00:00:00,cool_stuff


In [29]:
# Orders having shipping deadline to the carrier was missed
delay_df[delay_df['shipping_limit_date'] < delay_df['order_delivered_carrier_date']].head()


Unnamed: 0,order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,product_category_name
0,00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19 09:45:35,58.9,13.29,3ce436f183e68e07877b285a838db11a,delivered,2017-09-13 08:59:02,2017-09-13 09:45:35,2017-09-19 18:34:16,2017-09-20 23:43:48,2017-09-29 00:00:00,cool_stuff
1,130898c0987d1801452a8ed92a670612,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-07-05 02:44:11,55.9,17.96,e6eecc5a77de221464d1c4eaff0a9b64,delivered,2017-06-28 11:52:20,2017-06-29 02:44:11,2017-07-05 12:00:33,2017-07-13 20:39:29,2017-07-26 00:00:00,cool_stuff
2,532ed5e14e24ae1f0d735b91524b98b9,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2018-05-23 10:56:25,64.9,18.33,4ef55bf80f711b372afebcb7c715344a,delivered,2018-05-18 10:25:53,2018-05-18 12:31:43,2018-05-23 14:05:00,2018-06-04 18:34:26,2018-06-07 00:00:00,cool_stuff
9,00018f77f2f0320c557190d7a144bdd3,1,e5f2d52b802189ee658865ca93d83a8f,dd7ddc04e1b6c2c614352b383efe2d36,2017-05-03 11:05:13,239.9,19.93,f6dd3ec061db4e3987629fe6b26e5cce,delivered,2017-04-26 10:53:06,2017-04-26 11:05:13,2017-05-04 14:35:00,2017-05-12 16:04:24,2017-05-15 00:00:00,pet_shop
15,00042b26cf59d7ce69dfabb4e55b4fd9,1,ac6c3623068f30de03045865e4e10089,df560393f3a51e74553ab94004ba5c87,2017-02-13 13:57:51,199.9,18.14,58dbd0b2d70206bf40e62cd34e84d795,delivered,2017-02-04 13:57:51,2017-02-04 14:10:13,2017-02-16 09:46:09,2017-03-01 16:42:31,2017-03-17 00:00:00,ferramentas_jardim


In [30]:
delay_df.groupby('order_status').count()

Unnamed: 0_level_0,order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value,customer_id,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,product_category_name
order_status,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
approved,3,3,3,3,3,3,3,3,3,3,0,0,3,3
canceled,542,542,542,542,542,542,542,542,542,542,76,7,542,528
delivered,110197,110197,110197,110197,110197,110197,110197,110197,110197,110182,110195,110189,110197,108660
invoiced,359,359,359,359,359,359,359,359,359,359,0,0,359,347
processing,357,357,357,357,357,357,357,357,357,357,0,0,357,344
shipped,1185,1185,1185,1185,1185,1185,1185,1185,1185,1185,1185,0,1185,1158
unavailable,7,7,7,7,7,7,7,7,7,7,0,0,7,7


In [50]:
hadoop_conf = spark_con._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY_ID )
hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")


In [51]:
dff_items = spark_con.read.csv("s3a://ecommerce-company-olist/new/olist_order_items_dataset.csv",header=True,inferSchema=True)
dff_orders = spark_con.read.csv("s3a://ecommerce-company-olist/new/olist_orders_dataset.csv",header=True,inferSchema=True)
dff_products = spark_con.read.csv("s3a://ecommerce-company-olist/new/olist_products_dataset.csv",header=True,inferSchema=True)


In [35]:
dff_items.head()

Row(order_id='00010242fe8c5a6d1ba2dd792cb16214', order_item_id=1, product_id='4244733e06e7ecb4970a6e2683c13e61', seller_id='48436dade18ac8b2bce089ec2a041202', shipping_limit_date=datetime.datetime(2017, 9, 19, 9, 45, 35), price=58.9, freight_value=13.29)

In [36]:
# Create SQL Tables from dfs
dff_items.createOrReplaceTempView('items')
dff_orders.createOrReplaceTempView('orders')
dff_products.createOrReplaceTempView('products')


In [37]:
spark_con.sql('SELECT * FROM items').columns

['order_id',
 'order_item_id',
 'product_id',
 'seller_id',
 'shipping_limit_date',
 'price',
 'freight_value']

In [38]:
spark_con.sql('SELECT * FROM orders').columns

['order_id',
 'customer_id',
 'order_status',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'order_estimated_delivery_date']

In [39]:
spark_con.sql('SELECT * FROM products').columns

['product_id',
 'product_category_name',
 'product_name_lenght',
 'product_description_lenght',
 'product_photos_qty',
 'product_weight_g',
 'product_length_cm',
 'product_height_cm',
 'product_width_cm']

In [40]:
delayed_orders = spark_con.sql("""
SELECT
    i.order_id,
    i.seller_id,
    i.shipping_limit_date,
    i.price,
    i.freight_value,
    p.product_id,
    p.product_category_name,
    o.customer_id,
    o.order_status,
    o.order_purchase_timestamp,
    o.order_delivered_carrier_date,
    o.order_delivered_customer_date,
    o.order_estimated_delivery_date
FROM
    items AS i
JOIN
    orders AS o
ON
    i.order_id = o.order_id
JOIN
    products AS p
ON
    i.product_id = p.product_id
WHERE
    i.shipping_limit_date < o.order_delivered_carrier_date
""")


In [41]:
delayed_orders.show()

+--------------------+--------------------+-------------------+------+-------------+--------------------+---------------------+--------------------+------------+------------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|           seller_id|shipping_limit_date| price|freight_value|          product_id|product_category_name|         customer_id|order_status|order_purchase_timestamp|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+-------------------+------+-------------+--------------------+---------------------+--------------------+------------+------------------------+----------------------------+-----------------------------+-----------------------------+
|00018f77f2f0320c5...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|e5f2d52b802189ee6...|             pet_shop|f6dd3ec061db4e398...|   delivered|     2017-0

In [53]:
# Export Delayed orders table as csv
delayed_orders.coalesce(1).write.option("header", "true").csv("Order_exceeding_shipping_limit")
