# Brazilian E-Commerce: Olist

## Imports

In [18]:
import re
import json
from pathlib import Path

import pyspark.sql.functions as F
from pyspark.sql import SparkSession, DataFrame

### Create SparkSession

In [19]:
spark = SparkSession.builder.config("spark.sql.repl.eagerEval.enabled", True).getOrCreate()

25/07/05 19:45:39 WARN Utils: Your hostname, TestBoi resolves to a loopback address: 127.0.1.1; using 192.168.1.79 instead (on interface eno1)
25/07/05 19:45:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/05 19:45:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Utility functions

In [20]:
def schema_to_dict(schema) -> None:
    print(json.dumps(json.loads(schema), indent=4, default=str))

## Entity Analysis

In [21]:
def normalize_entity(entity: str) -> str:
    if entity.endswith("s"):
        return entity[:-1]
    else:
        return entity

def generate_file_mapping(input_path: Path) -> dict[str, Path]:
    csv_files: tuple[Path] = tuple(input_path.glob("olist*"))
    pattern = re.compile(r"(?<=olist_).\w+(?=_dataset)")

    # Generate mapping between table name and its system filepath
    file_mapping: dict[str, Path] = {
        normalize_entity(next(pattern.finditer(csv_file.stem)).group()): csv_file
        for csv_file in csv_files
    }

    return file_mapping

In [22]:
input_dir = Path(r"/home/elianther/Documents/Projects/Remix/interview/code_challenge/data/input")

In [23]:
table_mapping = generate_file_mapping(input_dir)

In [7]:
print(table_mapping)

{'order': PosixPath('/home/elianther/Documents/Projects/Remix/interview/code_challenge/data/input/olist_orders_dataset.csv'), 'product': PosixPath('/home/elianther/Documents/Projects/Remix/interview/code_challenge/data/input/olist_products_dataset.csv'), 'seller': PosixPath('/home/elianther/Documents/Projects/Remix/interview/code_challenge/data/input/olist_sellers_dataset.csv'), 'customer': PosixPath('/home/elianther/Documents/Projects/Remix/interview/code_challenge/data/input/olist_customers_dataset.csv'), 'order_payment': PosixPath('/home/elianther/Documents/Projects/Remix/interview/code_challenge/data/input/olist_order_payments_dataset.csv'), 'geolocation': PosixPath('/home/elianther/Documents/Projects/Remix/interview/code_challenge/data/input/olist_geolocation_dataset.csv'), 'order_review': PosixPath('/home/elianther/Documents/Projects/Remix/interview/code_challenge/data/input/olist_order_reviews_dataset.csv'), 'order_item': PosixPath('/home/elianther/Documents/Projects/Remix/inter

In [2]:
code_path = Path(r"/home/elianther/Documents/Projects/Remix/interview/code_challenge")

In [15]:
table_relative_path = {table: table_mapping['order'].relative_to(code_path) for table, filepath in table_mapping.items()}
table_relative_path = dict(sorted(table_relative_path.items()))
print(json.dumps(table_relative_path, indent=4, default=str))

{
    "customer": "data/input/olist_orders_dataset.csv",
    "geolocation": "data/input/olist_orders_dataset.csv",
    "order": "data/input/olist_orders_dataset.csv",
    "order_item": "data/input/olist_orders_dataset.csv",
    "order_payment": "data/input/olist_orders_dataset.csv",
    "order_review": "data/input/olist_orders_dataset.csv",
    "product": "data/input/olist_orders_dataset.csv",
    "seller": "data/input/olist_orders_dataset.csv"
}


In [17]:
print(" ".join(f"{table}.json" for table in table_relative_path.keys()))

customer.json geolocation.json order.json order_item.json order_payment.json order_review.json product.json seller.json


### Orders Dataset

**Assumptions:**
1. `order_id` PK
2. `customer_id` FK {unique}

In [49]:
order_df: DataFrame = (
    spark.read
    .format("csv")
    .options(header=True,
             delimiter=",")
    .load(table_mapping['order'].as_posix())
)

In [48]:
order_df.summary()

The history saving thread hit an unexpected error (OperationalError('attempt to write a readonly database')).History will not be written to the database.


                                                                                

summary,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
count,99441,99441,99441,99441,99281,97658,96476,99441
mean,,,,,,,,
stddev,,,,,,,,
min,00010242fe8c5a6d1...,00012a2ce6f8dcda2...,approved,2016-09-04 21:15:19,2016-09-15 12:16:38,2016-10-08 10:34:01,2016-10-11 13:46:32,2016-09-30 00:00:00
25%,,,,,,,,
50%,,,,,,,,
75%,,,,,,,,
max,fffe41c64501cc87c...,ffffe8b65bbe3087b...,unavailable,2018-10-17 17:30:18,2018-09-03 17:40:06,2018-09-11 19:48:28,2018-10-17 13:22:46,2018-11-12 00:00:00


In [47]:
schema_to_dict(order_df.schema.json())

{
    "fields": [
        {
            "metadata": {},
            "name": "order_id",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "customer_id",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "order_status",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "order_purchase_timestamp",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "order_approved_at",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "order_delivered_carrier_date",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "order_delivered_customer_date",

In [52]:
order_df.select('order_status').distinct()

order_status
shipped
canceled
invoiced
created
delivered
unavailable
processing
approved


### Order Payments

**Assumptions:**
1. `order_id` PK
2. The field `payment_sequential` contains information about various payment types

In [53]:
order_payment_df: DataFrame = (
    spark.read
    .format("csv")
    .options(header=True,
             delimiter=",")
    .load(table_mapping['order_payment'].as_posix())
)

In [56]:
order_payment_df

order_id,payment_sequential,payment_type,payment_installments,payment_value
b81ef226f3fe1789b...,1,credit_card,8,99.33
a9810da82917af2d9...,1,credit_card,1,24.39
25e8ea4e93396b6fa...,1,credit_card,1,65.71
ba78997921bbcdc13...,1,credit_card,8,107.78
42fdf880ba16b47b5...,1,credit_card,2,128.45
298fcdf1f73eb413e...,1,credit_card,2,96.12
771ee386b001f0620...,1,credit_card,1,81.16
3d7239c394a212faa...,1,credit_card,3,51.84
1f78449c87a54faf9...,1,credit_card,6,341.09
0573b5e23cbd79800...,1,boleto,1,51.95


In [54]:
order_payment_df.summary()

                                                                                

summary,order_id,payment_sequential,payment_type,payment_installments,payment_value
count,103886,103886.0,103886,103886.0,103886.0
mean,,1.0926785129853878,,2.853348863176944,154.10038041698792
stddev,,0.7065837791949958,,2.687050673856492,217.494063864724
min,00010242fe8c5a6d1...,1.0,boleto,0.0,0.0
25%,,1.0,,1.0,56.79
50%,,1.0,,1.0,100.0
75%,,1.0,,4.0,171.79
max,fffe41c64501cc87c...,9.0,voucher,9.0,999.68


In [55]:
schema_to_dict(order_payment_df.schema.json())

{
    "fields": [
        {
            "metadata": {},
            "name": "order_id",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "payment_sequential",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "payment_type",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "payment_installments",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "payment_value",
            "nullable": true,
            "type": "string"
        }
    ],
    "type": "struct"
}


In [63]:
order_payment_df.select('payment_sequential', 'payment_type').distinct()

payment_sequential,payment_type
1,voucher
2,debit_card
22,voucher
19,voucher
1,not_defined
6,voucher
7,voucher
11,voucher
16,voucher
4,voucher


In [70]:
orders_with_more_than_one_payment_type = (
    order_payment_df
    .join(order_df, 'order_id', how='inner')
    .groupBy('order_id', 'payment_type')
    .count()
    .where('count > 1')
)
order_payment_df.join(orders_with_more_than_one_payment_type, 'order_id', how='inner')

order_id,payment_sequential,payment_type,payment_installments,payment_value,payment_type.1,count
cc204c830046f86bf...,2,credit_card,1,100.92,credit_card,2
cc204c830046f86bf...,1,credit_card,8,236.0,credit_card,2
098fa8e236952567d...,2,voucher,1,78.24,voucher,4
098fa8e236952567d...,3,voucher,1,11.85,voucher,4
098fa8e236952567d...,5,voucher,1,22.35,voucher,4
098fa8e236952567d...,4,voucher,1,17.2,voucher,4
098fa8e236952567d...,1,credit_card,1,1.26,voucher,4
642d9648ee8c3cd7d...,1,credit_card,2,63.57,credit_card,2
642d9648ee8c3cd7d...,2,credit_card,8,114.87,credit_card,2
a3a97bc6e236d5351...,4,voucher,1,6.67,voucher,5


### Customer

**Assumptions**
- `customer_id`: PK
- `customer_zip_code_prefix`: FK

Interestingly, the table has another field called `customer_id` which makes it unclear if the `customer_id` in the orders table is unique or not... or what is the actual purpose of it.

In [74]:
customer_df: DataFrame = (
    spark.read
    .format("csv")
    .options(header=True,
             delimiter=",")
    .load(table_mapping['customer'].as_posix())
)

In [75]:
customer_df.summary()

                                                                                

summary,customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
count,99441,99441,99441.0,99441,99441
mean,,,35137.47458291851,,
stddev,,,29797.93899620613,,
min,00012a2ce6f8dcda2...,0000366f3b9a7992b...,1003.0,abadia dos dourados,AC
25%,,,11346.0,,
50%,,,24415.0,,
75%,,,58884.0,,
max,ffffe8b65bbe3087b...,ffffd2657e2aad290...,99990.0,zortea,TO


In [76]:
schema_to_dict(customer_df.schema.json())

{
    "fields": [
        {
            "metadata": {},
            "name": "customer_id",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "customer_unique_id",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "customer_zip_code_prefix",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "customer_city",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "customer_state",
            "nullable": true,
            "type": "string"
        }
    ],
    "type": "struct"
}


In [82]:
customer_df.createOrReplaceTempView('customer')

In [88]:
spark.sql("""
WITH repeated_customers AS
(
    SELECT 
        customer_unique_id,
        COUNT(DISTINCT customer_id) AS customer_id_count
    FROM customer
    GROUP BY customer_unique_id
    HAVING COUNT(DISTINCT customer_id) > 1
)
SELECT
    customer.*,
    rp.customer_id_count
FROM customer
    INNER JOIN repeated_customers AS rp ON
        rp.customer_unique_id = customer.customer_unique_id
ORDER BY customer.customer_unique_id
;
"""
)

customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,customer_id_count
1afe8a9c67eec3516...,00172711b30d52eea...,45200,jequie,BA,2
24b0e2bd287e47d54...,00172711b30d52eea...,45200,jequie,BA,2
1b4a75b3478138e99...,004288347e5e88a27...,26220,nova iguacu,RJ,2
f6efe5d5c7b85e123...,004288347e5e88a27...,26220,nova iguacu,RJ,2
49cf243e0d353cd41...,004b45ec5c6418746...,57055,maceio,AL,2
d95f60d70d9ea9a7f...,004b45ec5c6418746...,57035,maceio,AL,2
8ac44e9c15d396b8c...,0058f300f57d7b93c...,41370,salvador,BA,2
f530197ea86ced948...,0058f300f57d7b93c...,40731,salvador,BA,2
876356df457f95245...,00a39521eb40f7012...,72595,brasilia,DF,2
cbb68c721ba9ddb30...,00a39521eb40f7012...,72595,brasilia,DF,2


### Order Item

**Assumptions**

It seems like an intermediate table among [Product, Order and Seller] in order to show its interaction.

- `order_id` FK
- `product_id` FK
- `seller_id` FK

In [92]:
order_item_df: DataFrame = (
    spark.read
    .format("csv")
    .options(header=True,
             delimiter=",")
    .load(table_mapping['order_item'].as_posix())
)customer

In [93]:
order_item_df.summary()

                                                                                

summary,order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
count,112650,112650.0,112650,112650,112650,112650.0,112650.0
mean,,1.1978339991122948,,,,120.65373901464174,19.990319928982977
stddev,,0.7051240313951721,,,,183.6339280502595,15.806405412297098
min,00010242fe8c5a6d1...,1.0,00066f42aeeb9f300...,0015a82c2db000af6...,2016-09-19 00:15:34,0.85,0.0
25%,,1.0,,,,39.9,13.08
50%,,1.0,,,,74.99,16.26
75%,,1.0,,,,134.9,21.15
max,fffe41c64501cc87c...,9.0,fffe9eeff12fcbd74...,ffff564a4f9085cd2...,2020-04-09 22:35:08,999.99,99.97


In [94]:
schema_to_dict(order_item_df.schema.json())

{
    "fields": [
        {
            "metadata": {},
            "name": "order_id",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "order_item_id",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "product_id",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "seller_id",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "shipping_limit_date",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "price",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "freight_value",
            "nullable": true,
            "type": "

### Product

**Assumptions**
`product_id`: PK

In [97]:
product_df: DataFrame = (
    spark.read
    .format("csv")
    .options(header=True,
             delimiter=",")
    .load(table_mapping['product'].as_posix())
)

In [98]:
product_df.summary()

summary,product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
count,32951,32341,32341.0,32341.0,32341.0,32949.0,32949.0,32949.0,32949.0
mean,,,48.47694876472589,771.4952846232337,2.18898611669398,2276.472487784152,30.81507784758263,16.937661234028347,23.196728277034204
stddev,,,10.245740725237289,635.1152246349538,1.7367656379315437,4282.038730977024,16.914458054065953,13.637554061749569,12.079047453227794
min,00066f42aeeb9f300...,agro_industria_e_...,10.0,100.0,1.0,0.0,10.0,10.0,10.0
25%,,,42.0,339.0,1.0,300.0,18.0,8.0,15.0
50%,,,51.0,595.0,1.0,700.0,25.0,13.0,20.0
75%,,,57.0,972.0,3.0,1900.0,38.0,21.0,30.0
max,fffe9eeff12fcbd74...,utilidades_domest...,9.0,999.0,9.0,998.0,99.0,99.0,98.0


In [99]:
schema_to_dict(product_df.schema.json())

{
    "fields": [
        {
            "metadata": {},
            "name": "product_id",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "product_category_name",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "product_name_lenght",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "product_description_lenght",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "product_photos_qty",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "product_weight_g",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "product_length_cm",
 

In [102]:
product_df.select('product_category_name').distinct().count()

74

### Seller

**Assumptions**
- `seller_id`: PK
- `seller_zip_code_prefix`: FK

In [108]:
seller_df: DataFrame = (
    spark.read
    .format("csv")
    .options(header=True,
             delimiter=",")
    .load(table_mapping['seller'].as_posix())
)

In [109]:
seller_df.summary()

summary,seller_id,seller_zip_code_prefix,seller_city,seller_state
count,3095,3095.0,3095,3095
mean,,32291.059450726974,4482255.0,
stddev,,32713.45382950901,,
min,0015a82c2db000af6...,1001.0,04482255,AC
25%,,7093.0,4482255.0,
50%,,14940.0,4482255.0,
75%,,65072.0,4482255.0,
max,ffff564a4f9085cd2...,99730.0,xaxim,SP


In [110]:
schema_to_dict(seller_df.schema.json())

{
    "fields": [
        {
            "metadata": {},
            "name": "seller_id",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "seller_zip_code_prefix",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "seller_city",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "seller_state",
            "nullable": true,
            "type": "string"
        }
    ],
    "type": "struct"
}


### Geolocation

**Assumptions**
- `geolocation_zip_code_prefix`: PK

In [105]:
geolocation_df: DataFrame = (
    spark.read
    .format("csv")
    .options(header=True,
             delimiter=",")
    .load(table_mapping['geolocation'].as_posix())
)

In [106]:
geolocation_df.summary()

                                                                                

summary,geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state
count,1000163.0,1000163,1000163.0,1000163,1000163
mean,36574.16646586607,-21.17615291038391,-46.39054132093593,,
stddev,30549.335710319585,5.7158663088228705,4.2697483066195705,,
min,1001.0,-0.00004367379244...,-101.46676644931476,* cidade,AC
25%,11075.0,-23.603593766864766,-48.57374836733223,,
50%,26530.0,-22.91938310945101,-46.63791575103784,,
75%,63500.0,-19.979783996158545,-43.76747036551028,,
max,99990.0,45.06593318269697,9.341527629906514,óleo,TO


In [107]:
schema_to_dict(geolocation_df.schema.json())

{
    "fields": [
        {
            "metadata": {},
            "name": "geolocation_zip_code_prefix",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "geolocation_lat",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "geolocation_lng",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "geolocation_city",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "geolocation_state",
            "nullable": true,
            "type": "string"
        }
    ],
    "type": "struct"
}


### Order Review

**Assumptions**
- `review_id`: PK
- `order_id`: FK

In [25]:
order_review_df: DataFrame = (
    spark.read
    .format("csv")
    .options(header=True,
             delimiter=",")
    .load(table_mapping['order_review'].as_posix())
)

In [26]:
order_review_df.summary()

25/07/05 19:48:21 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

summary,review_id,order_id,review_score,review_comment_title,review_comment_message,review_creation_date,review_answer_timestamp
count,104161,101926,101782,12005,41083,95398,95377
mean,4.5,0.0,4.087204087597126,3.165434995880696E10,8.172413793103448,,
stddev,0.7071067811865476,0.0,1.3750046423971858,5.625434750908947E11,3.1175650470615843,,
min,"""",,"""",,,FOI A MINHA PRIM...,POIS NÃO CUMPREM...
25%,4.0,0.0,4.0,5.0,8.0,,
50%,4.0,0.0,5.0,10.0,10.0,,
75%,5.0,0.0,5.0,10.0,10.0,,
max,"🤙🏼👏🏼👏🏼""",visando sempre o ...,seria mais coeren...,🔟,😡😡😡😡😡👎👎👎👎👎,veio bem embalada...,70 + R$15


In [27]:
schema_to_dict(order_review_df.schema.json())

{
    "fields": [
        {
            "metadata": {},
            "name": "review_id",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "order_id",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "review_score",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "review_comment_title",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "review_comment_message",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "review_creation_date",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "review_answer_timestamp",
            "n