In [205]:
import yaml
from pymongo import MongoClient
import os
from pyspark.sql import SparkSession
import pandas as pd
import glob

In [206]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [207]:
folder_path = "/content/drive/My Drive/02 Scaler Casestudy/E-Commerce ETL pipeline/Data/Raw"
config_file = "/content/drive/My Drive/02 Scaler Casestudy/E-Commerce ETL pipeline/config.yaml"

csv_files = glob.glob(os.path.join(folder_path, "*.csv"))
print("Found CSVs:", csv_files)

Found CSVs: ['/content/drive/My Drive/02 Scaler Casestudy/E-Commerce ETL pipeline/Data/Raw/olist_customers_dataset.csv', '/content/drive/My Drive/02 Scaler Casestudy/E-Commerce ETL pipeline/Data/Raw/olist_geolocation_dataset.csv', '/content/drive/My Drive/02 Scaler Casestudy/E-Commerce ETL pipeline/Data/Raw/olist_order_items_dataset.csv', '/content/drive/My Drive/02 Scaler Casestudy/E-Commerce ETL pipeline/Data/Raw/olist_order_reviews_dataset.csv', '/content/drive/My Drive/02 Scaler Casestudy/E-Commerce ETL pipeline/Data/Raw/olist_orders_dataset.csv', '/content/drive/My Drive/02 Scaler Casestudy/E-Commerce ETL pipeline/Data/Raw/olist_products_dataset.csv', '/content/drive/My Drive/02 Scaler Casestudy/E-Commerce ETL pipeline/Data/Raw/olist_sellers_dataset.csv', '/content/drive/My Drive/02 Scaler Casestudy/E-Commerce ETL pipeline/Data/Raw/product_category_name_translation.csv', '/content/drive/My Drive/02 Scaler Casestudy/E-Commerce ETL pipeline/Data/Raw/olist_order_payments_dataset.csv'

In [208]:
try:
    with open(config_file , "r") as f:
        config = yaml.full_load(f)
        print("Found config file")
except Exception as e:
    print(e)

Found config file


In [209]:
spark = SparkSession.builder.appName("Dataamodelling").getOrCreate()

In [210]:
dfs = {}

for file in csv_files:
    file_name = os.path.basename(file)
    dfs[file_name[6:-4]] = spark.read.csv(file ,inferSchema= True , header=True )

In [211]:
customers_dataset = dfs['customers_dataset'] #SparkSession.cs(config['spark_data_bricks']['olist_customers_dataset'])
geolocation_dataset = dfs['geolocation_dataset'] #pd.read_csv(config['spark_data_bricks']['olist_geolocation_dataset'])
orders_dataset =  dfs['orders_dataset'] #pd.read_csv(config['spark_data_bricks']['olist_orders_dataset'])
order_items_dataset = dfs['order_items_dataset'] #pd.read_csv(config['spark_data_bricks']['olist_order_items_dataset'])
order_payments_dataset = dfs['order_payments_dataset'] # pd.read_csv(config['spark_data_bricks']['olist_order_payments_dataset'])
order_reviews_dataset = dfs['order_reviews_dataset'] #pd.read_csv(config['spark_data_bricks']['olist_order_reviews_dataset'])
products_dataset = dfs['products_dataset'] # pd.read_csv(config['spark_data_bricks']['olist_products_dataset'])
sellers_dataset = dfs['sellers_dataset'] #pd.read_csv(config['spark_data_bricks']['olist_sellers_dataset'])

# **Connect & Read the data form mongo**

In [212]:
hostname = config['momgodb_datababse']['hostname']
database = config['momgodb_datababse']['database']
port = config['momgodb_datababse']['port']
username = config['momgodb_datababse']['username']
password = config['momgodb_datababse']['password']

try:
    uri = f"mongodb://{username}:{password}@{hostname}:{port}/{database}"
    client = MongoClient(uri)
    if client._connect:
        print("Successfully connected to the mongo server.")
except ConnectionError as cerror:
    print(cerror)

# Access database
mydatabase = client[database]

Successfully connected to the mongo server.


In [213]:
for db_name in mydatabase.list_collection_names():
    print(f"Database: {db_name}")


Database: test
Database: ecomerce


In [214]:
collection = mydatabase.ecomerce

In [215]:
mongo_df = pd.DataFrame(collection.find()).iloc[: , 1:]

In [216]:
mongo_df_spark = spark.createDataFrame(mongo_df)

In [217]:
mongo_df.head()

Unnamed: 0,product_category_name,product_category_name_english
0,beleza_saude,health_beauty
1,informatica_acessorios,computers_accessories
2,automotivo,auto
3,cama_mesa_banho,bed_bath_table
4,moveis_decoracao,furniture_decor


In [218]:
collection.find_one()

{'_id': ObjectId('68d7d28d2e6643ee3a407ab9'),
 'product_category_name': 'beleza_saude',
 'product_category_name_english': 'health_beauty'}

In [219]:
mydatabase['ecomerce'].find_one()

{'_id': ObjectId('68d7d28d2e6643ee3a407ab9'),
 'product_category_name': 'beleza_saude',
 'product_category_name_english': 'health_beauty'}

## **Cleanign The Data**

In [220]:
from pyspark.sql.functions import col, to_date, date_diff , current_date

In [221]:
%who

MongoClient	 SparkSession	 clean_data	 clean_dataFrame	 client	 col	 col_name	 col_names	 collection	 
config	 config_file	 count	 csv_files	 current_date	 customers_dataset	 database	 date_diff	 db	 
db_name	 dfs	 drive	 f	 file	 file_name	 final_df	 folder_path	 geolocation_dataset	 
glob	 hostname	 i	 mongo_df	 mongo_df_spark	 month	 months	 mydatabase	 oi	 
op	 order_customer_df	 order_items_dataset	 order_items_df	 order_payment	 order_payments_dataset	 order_reviews_dataset	 orders_dataset	 os	 
password	 pd	 port	 products_dataset	 sellers_dataset	 spark	 to_date	 uri	 username	 
when	 yaml	 


In [222]:
def clean_dataFrame(df, name):
    print("Cleanig" + name)
    return df.dropDuplicates().na.drop("all")

orders_dataset = clean_dataFrame(orders_dataset, "orders")
orders_dataset.show(3)

Cleanigorders
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|acce194856392f074...|7e20bf5ca92da6820...|   delivered|     2018-06-04 00:00:13|2018-06-05 00:35:10|         2018-06-05 13:24:00|          2018-06-16 15:20:55|          2018-07-18 00:00:00|
|1d067305b599c1e0d...|0489975a325480c9e...|   delivered|     2018-02-14 13:05:17|2018-02-14 13:15:38|         2018-02-20 20:12:57|          2018-03-09 21:52:36|          2018-03-09 00:00:00|
|6f841dde94727854e...|a9c953206

In [223]:
orders_dataset.columns


['order_id',
 'customer_id',
 'order_status',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'order_estimated_delivery_date']

In [224]:
orders_dataset.dtypes

[('order_id', 'string'),
 ('customer_id', 'string'),
 ('order_status', 'string'),
 ('order_purchase_timestamp', 'timestamp'),
 ('order_approved_at', 'timestamp'),
 ('order_delivered_carrier_date', 'timestamp'),
 ('order_delivered_customer_date', 'timestamp'),
 ('order_estimated_delivery_date', 'timestamp')]

In [225]:
# Convert the date
orders_dataset  = orders_dataset.withColumn("order_purchase_timestamp" , to_date(col("order_purchase_timestamp")))

In [226]:
from pyspark.sql.functions import month, col, when

orders_dataset = orders_dataset.withColumn(
    "order_month",
    month(col("order_purchase_timestamp"))
)

orders_dataset.show(5)


+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-----------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_month|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-----------+
|acce194856392f074...|7e20bf5ca92da6820...|   delivered|              2018-06-04|2018-06-05 00:35:10|         2018-06-05 13:24:00|          2018-06-16 15:20:55|          2018-07-18 00:00:00|          6|
|1d067305b599c1e0d...|0489975a325480c9e...|   delivered|              2018-02-14|2018-02-14 13:15:38|         2018-02-20 20:12:57|          2018-03-09 21:52:36|          2018-03-09 00:00:0

In [227]:
# Calculate delivery & time delay

orders_dataset = orders_dataset.withColumn("delivery time" , date_diff(col('order_delivered_customer_date') , col('order_purchase_timestamp')))
orders_dataset = orders_dataset.withColumn("Estimated delivery time" , date_diff(col('order_estimated_delivery_date') , col('order_purchase_timestamp')))
orders_dataset = orders_dataset.withColumn("delays", col("delivery time") - col("Estimated delivery time"))


In [228]:
orders_dataset.sort(col("delays")).show(3)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-----------+-------------+-----------------------+------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_month|delivery time|Estimated delivery time|delays|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-----------+-------------+-----------------------+------+
|dfee867c0e46410af...|e06b5eac4efd08334...|  processing|              2017-05-20|2017-05-21 09:31:55|                        NULL|                         NULL|          2017-06-13 00:00:00|          5|         NULL|                     24|  NULL|
|2eb5ba2

# Joining

In [229]:
order_items_dataset.count()

112650

In [230]:
order_customer_df = orders_dataset.join(customers_dataset ,
                                        orders_dataset.customer_id == customers_dataset.customer_id , "left")

print(order_customer_df.count())

99441


In [231]:
order_payment  = order_customer_df.join(
                                order_payments_dataset,
                                order_customer_df.order_id == order_payments_dataset.order_id,
                                "left"
)
order_payment.count()

103887

In [232]:
order_items_df = order_payment.join(
    order_items_dataset,
    "order_id",  # natural join on column name
    "left"
)


In [234]:
final_df = order_items_df.join(
                    sellers_dataset ,
                    order_items_df.seller_id == sellers_dataset.seller_id , 'left'
)

In [235]:

final_df.show(5)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-----------+-------------+-----------------------+------+--------------------+--------------------+------------------------+-------------+--------------+--------------------+------------------+------------+--------------------+-------------+-------------+--------------------+--------------------+-------------------+------+-------------+--------------------+----------------------+-------------+------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_month|delivery time|Estimated delivery time|delays|         customer_id|  customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|            order_id|payment_sequential|payment_type|paym

In [237]:
final_df.columns

['order_id',
 'customer_id',
 'order_status',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'order_estimated_delivery_date',
 'order_month',
 'delivery time',
 'Estimated delivery time',
 'delays',
 'customer_id',
 'customer_unique_id',
 'customer_zip_code_prefix',
 'customer_city',
 'customer_state',
 'order_id',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value',
 'order_item_id',
 'product_id',
 'seller_id',
 'shipping_limit_date',
 'price',
 'freight_value',
 'seller_id',
 'seller_zip_code_prefix',
 'seller_city',
 'seller_state']