In [27]:
# Proyecto de Análisis Predictivo - Olist E-commerce
##Este notebook prepara los datos de Olist para análisis predictivo con PySpark.

## 1. Importar Librerías

In [28]:
import pandas as pd
import os
from datetime import datetime

# Verificar la estructura de archivos
data_path = 'Data'
files = os.listdir(data_path)
print("Archivos disponibles:")
for file in files:
    print(f"  - {file}")

Archivos disponibles:
  - .ipynb_checkpoints
  - olist_customers_dataset.csv
  - olist_geolocation_dataset.csv
  - olist_orders_dataset.csv
  - olist_order_items_dataset.csv
  - olist_order_payments_dataset.csv
  - olist_order_reviews_dataset.csv
  - olist_products_dataset.csv
  - olist_sellers_dataset.csv
  - olist_unified_dataset.parquet
  - product_category_name_translation.csv


## 2. Cargar Datasets Individuales

In [29]:
print("Cargando datasets...")

# Cargar todos los datasets
customers = pd.read_csv('Data/olist_customers_dataset.csv')
geolocation = pd.read_csv('Data/olist_geolocation_dataset.csv')
order_items = pd.read_csv('Data/olist_order_items_dataset.csv')
order_payments = pd.read_csv('Data/olist_order_payments_dataset.csv')
order_reviews = pd.read_csv('Data/olist_order_reviews_dataset.csv')
orders = pd.read_csv('Data/olist_orders_dataset.csv')
products = pd.read_csv('Data/olist_products_dataset.csv')
sellers = pd.read_csv('Data/olist_sellers_dataset.csv')
category_translation = pd.read_csv('Data/product_category_name_translation.csv')

print("✓ Datasets cargados exitosamente")
print(f"\nResumen de registros:")
print(f"  - Clientes: {len(customers):,}")
print(f"  - Órdenes: {len(orders):,}")
print(f"  - Items de órdenes: {len(order_items):,}")
print(f"  - Pagos: {len(order_payments):,}")
print(f"  - Reviews: {len(order_reviews):,}")
print(f"  - Productos: {len(products):,}")
print(f"  - Vendedores: {len(sellers):,}")
print(f"  - Geolocalización: {len(geolocation):,}")

Cargando datasets...
✓ Datasets cargados exitosamente

Resumen de registros:
  - Clientes: 99,441
  - Órdenes: 99,441
  - Items de órdenes: 112,650
  - Pagos: 103,886
  - Reviews: 99,224
  - Productos: 32,951
  - Vendedores: 3,095
  - Geolocalización: 1,000,163


## 3. Unificar Datasets (Joins)

Vamos a crear un dataset maestro unificando todas las tablas mediante las claves foráneas correspondientes.

In [30]:
# Paso 1: Traducir categorías de productos al inglés
products_translated = products.merge(
    category_translation, 
    on='product_category_name', 
    how='left'
)

# Paso 2: Comenzar desde orders (tabla central)
df = orders.copy()

# Paso 3: Unir con customers usando customer_id
df = df.merge(customers, on='customer_id', how='left')

# Paso 4: Unir con order_items usando order_id
df = df.merge(order_items, on='order_id', how='left')

# Paso 5: Unir con products usando product_id
df = df.merge(
    products_translated, 
    on='product_id', 
    how='left'
)

# Paso 6: Unir con sellers usando seller_id
df = df.merge(
    sellers, 
    on='seller_id', 
    how='left',
    suffixes=('_customer', '_seller')
)

# Paso 7: Unir con order_payments usando order_id
# Agregamos los pagos por orden (una orden puede tener múltiples pagos)
payments_agg = order_payments.groupby('order_id').agg({
    'payment_sequential': 'max',
    'payment_type': lambda x: x.mode()[0] if len(x.mode()) > 0 else x.iloc[0],  # tipo de pago más común
    'payment_installments': 'max',
    'payment_value': 'sum'  # suma total de pagos
}).reset_index()

df = df.merge(payments_agg, on='order_id', how='left')

# Paso 8: Unir con order_reviews usando order_id
# Agregamos las reviews por orden
reviews_agg = order_reviews.groupby('order_id').agg({
    'review_score': 'mean',
    'review_comment_title': 'count',
    'review_comment_message': lambda x: x.notna().sum()
}).reset_index()
reviews_agg.columns = ['order_id', 'avg_review_score', 'review_title_count', 'review_message_count']

df = df.merge(reviews_agg, on='order_id', how='left')

# Paso 9: Unir con geolocation para CUSTOMERS usando customer_zip_code_prefix
# Agregamos geolocation por zip_code_prefix (promedio de coordenadas)
geo_customers = geolocation.groupby('geolocation_zip_code_prefix').agg({
    'geolocation_lat': 'mean',
    'geolocation_lng': 'mean',
    'geolocation_city': lambda x: x.mode()[0] if len(x.mode()) > 0 else x.iloc[0],
    'geolocation_state': lambda x: x.mode()[0] if len(x.mode()) > 0 else x.iloc[0]
}).reset_index()
geo_customers.columns = ['zip_code_prefix', 'customer_geo_lat', 'customer_geo_lng', 'customer_geo_city', 'customer_geo_state']

df = df.merge(
    geo_customers, 
    left_on='customer_zip_code_prefix', 
    right_on='zip_code_prefix', 
    how='left'
).drop('zip_code_prefix', axis=1)

# Paso 10: Unir con geolocation para SELLERS usando seller_zip_code_prefix
geo_sellers = geolocation.groupby('geolocation_zip_code_prefix').agg({
    'geolocation_lat': 'mean',
    'geolocation_lng': 'mean',
    'geolocation_city': lambda x: x.mode()[0] if len(x.mode()) > 0 else x.iloc[0],
    'geolocation_state': lambda x: x.mode()[0] if len(x.mode()) > 0 else x.iloc[0]
}).reset_index()
geo_sellers.columns = ['zip_code_prefix', 'seller_geo_lat', 'seller_geo_lng', 'seller_geo_city', 'seller_geo_state']

df = df.merge(
    geo_sellers, 
    left_on='seller_zip_code_prefix', 
    right_on='zip_code_prefix', 
    how='left'
).drop('zip_code_prefix', axis=1)

print(f"✓ Dataset unificado creado según diagrama de relaciones")
print(f"  - Dimensiones: {df.shape}")
print(f"  - Columnas: {df.shape[1]}")
print(f"  - Registros: {df.shape[0]:,}")
print(f"\n✓ Relaciones aplicadas:")
print(f"  1. Orders → Customers (customer_id)")
print(f"  2. Orders → Order_Items (order_id)")
print(f"  3. Order_Items → Products (product_id)")
print(f"  4. Order_Items → Sellers (seller_id)")
print(f"  5. Orders → Order_Payments (order_id)")
print(f"  6. Orders → Order_Reviews (order_id)")
print(f"  7. Customers → Geolocation (zip_code_prefix)")
print(f"  8. Sellers → Geolocation (zip_code_prefix)")

✓ Dataset unificado creado según diagrama de relaciones
  - Dimensiones: (113425, 45)
  - Columnas: 45
  - Registros: 113,425

✓ Relaciones aplicadas:
  1. Orders → Customers (customer_id)
  2. Orders → Order_Items (order_id)
  3. Order_Items → Products (product_id)
  4. Order_Items → Sellers (seller_id)
  5. Orders → Order_Payments (order_id)
  6. Orders → Order_Reviews (order_id)
  7. Customers → Geolocation (zip_code_prefix)
  8. Sellers → Geolocation (zip_code_prefix)


## 4. Vista previa del dataset unificado

In [31]:
# Ver las primeras filas
print("Primeras 5 filas del dataset unificado:")
display(df.head())

print("\nInformación general del dataset:")
print(df.info())

print("\nColumnas disponibles:")
for i, col in enumerate(df.columns, 1):
    print(f"{i:2d}. {col}")

Primeras 5 filas del dataset unificado:


Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,customer_unique_id,customer_zip_code_prefix,...,review_title_count,review_message_count,customer_geo_lat,customer_geo_lng,customer_geo_city,customer_geo_state,seller_geo_lat,seller_geo_lng,seller_geo_city,seller_geo_state
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,7c396fd4830fd04220f754e42b4e5bff,3149,...,0.0,1.0,-23.576983,-46.587161,sao paulo,SP,-23.680729,-46.444238,maua,SP
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00,af07308b275d755c9edb36a90c618231,47813,...,1.0,1.0,-12.177924,-44.660711,barreiras,BA,-19.807681,-43.980427,belo horizonte,MG
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00,3a653a41f6f9fc3d2a113cf8398680e8,75265,...,0.0,0.0,-16.74515,-48.514783,vianopolis,GO,-21.363502,-48.229601,guariba,SP
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00,7c142cf63193a1473d2e66489a9ae977,59296,...,0.0,1.0,-5.77419,-35.271143,sao goncalo do amarante,RN,-19.837682,-43.924053,belo horizonte,MG
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00,72632f0f9dd73dfee390c9b22eb56dd6,9195,...,0.0,0.0,-23.67637,-46.514627,santo andre,SP,-23.543395,-46.262086,mogi das cruzes,SP



Información general del dataset:
<class 'pandas.core.frame.DataFrame'>
Int64Index: 113425 entries, 0 to 113424
Data columns (total 45 columns):
 #   Column                         Non-Null Count   Dtype  
---  ------                         --------------   -----  
 0   order_id                       113425 non-null  object 
 1   customer_id                    113425 non-null  object 
 2   order_status                   113425 non-null  object 
 3   order_purchase_timestamp       113425 non-null  object 
 4   order_approved_at              113264 non-null  object 
 5   order_delivered_carrier_date   111457 non-null  object 
 6   order_delivered_customer_date  110196 non-null  object 
 7   order_estimated_delivery_date  113425 non-null  object 
 8   customer_unique_id             113425 non-null  object 
 9   customer_zip_code_prefix       113425 non-null  int64  
 10  customer_city                  113425 non-null  object 
 11  customer_state                 113425 non-null  object 
 

## 5. Análisis de Calidad de Datos

Antes de guardar el Parquet, analizamos la calidad de los datos para identificar problemas.

In [32]:
print("=" * 70)
print("ANÁLISIS DE CALIDAD DE DATOS")
print("=" * 70)

# 1. Verificar duplicados
print("\n1. DUPLICADOS")
print("-" * 70)
duplicates = df.duplicated().sum()
print(f"Total de filas duplicadas: {duplicates:,}")

if duplicates > 0:
    print(f"Porcentaje de duplicados: {(duplicates/len(df)*100):.2f}%")
    # Mostrar algunas filas duplicadas
    print("\nEjemplo de duplicados:")
    display(df[df.duplicated(keep=False)].head(10))

# 2. Análisis de valores nulos
print("\n2. VALORES NULOS")
print("-" * 70)
null_counts = df.isnull().sum()
null_percentages = (null_counts / len(df) * 100).round(2)
null_report = pd.DataFrame({
    'Columna': null_counts.index,
    'Nulos': null_counts.values,
    'Porcentaje': null_percentages.values
})
null_report = null_report[null_report['Nulos'] > 0].sort_values('Nulos', ascending=False)

if len(null_report) > 0:
    print(f"\nColumnas con valores nulos ({len(null_report)} columnas):")
    display(null_report.head(20))
else:
    print("✓ No hay valores nulos en el dataset")

# 3. Verificar tipos de datos
print("\n3. TIPOS DE DATOS")
print("-" * 70)
print(df.dtypes.value_counts())

# 4. Estadísticas básicas de columnas numéricas
print("\n4. ESTADÍSTICAS NUMÉRICAS")
print("-" * 70)
numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns
print(f"Columnas numéricas: {len(numeric_cols)}")
display(df[numeric_cols].describe().T)

# 5. Resumen de cardinalidad (columnas categóricas)
print("\n5. CARDINALIDAD DE COLUMNAS CATEGÓRICAS")
print("-" * 70)
categorical_cols = df.select_dtypes(include=['object']).columns
cardinality = pd.DataFrame({
    'Columna': categorical_cols,
    'Valores únicos': [df[col].nunique() for col in categorical_cols],
    'Valores nulos': [df[col].isnull().sum() for col in categorical_cols]
})
cardinality = cardinality.sort_values('Valores únicos', ascending=False)
display(cardinality.head(15))

ANÁLISIS DE CALIDAD DE DATOS

1. DUPLICADOS
----------------------------------------------------------------------
Total de filas duplicadas: 0

2. VALORES NULOS
----------------------------------------------------------------------

Columnas con valores nulos (36 columnas):


Unnamed: 0,Columna,Nulos,Porcentaje
6,order_delivered_customer_date,3229,2.85
26,product_category_name_english,2402,2.12
18,product_category_name,2378,2.1
19,product_name_lenght,2378,2.1
21,product_photos_qty,2378,2.1
20,product_description_lenght,2378,2.1
5,order_delivered_carrier_date,1968,1.74
41,seller_geo_lat,1028,0.91
42,seller_geo_lng,1028,0.91
43,seller_geo_city,1028,0.91



3. TIPOS DE DATOS
----------------------------------------------------------------------
object     23
float64    21
int64       1
dtype: int64

4. ESTADÍSTICAS NUMÉRICAS
----------------------------------------------------------------------
Columnas numéricas: 22


Unnamed: 0,count,mean,std,min,25%,50%,75%,max
customer_zip_code_prefix,113425.0,35102.472965,29864.919733,1003.0,11250.0,24320.0,59020.0,99990.0
order_item_id,112650.0,1.197834,0.705124,1.0,1.0,1.0,1.0,21.0
price,112650.0,120.653739,183.633928,0.85,39.9,74.99,134.9,6735.0
freight_value,112650.0,19.99032,15.806405,0.0,13.08,16.26,21.15,409.68
product_name_lenght,111047.0,48.775978,10.025581,5.0,42.0,52.0,57.0,76.0
product_description_lenght,111047.0,787.867029,652.135608,4.0,348.0,603.0,987.0,3992.0
product_photos_qty,111047.0,2.209713,1.721438,1.0,1.0,1.0,3.0,20.0
product_weight_g,112632.0,2093.672047,3751.596884,0.0,300.0,700.0,1800.0,40425.0
product_length_cm,112632.0,30.153669,16.153449,7.0,18.0,25.0,38.0,105.0
product_height_cm,112632.0,16.593766,13.443483,2.0,8.0,13.0,20.0,105.0



5. CARDINALIDAD DE COLUMNAS CATEGÓRICAS
----------------------------------------------------------------------


Unnamed: 0,Columna,Valores únicos,Valores nulos
0,order_id,99441,0
1,customer_id,99441,0
3,order_purchase_timestamp,98875,0
8,customer_unique_id,96096,0
6,order_delivered_customer_date,95664,3229
13,shipping_limit_date,93318,775
4,order_approved_at,90733,161
5,order_delivered_carrier_date,81018,1968
11,product_id,32951,775
9,customer_city,4119,0


## 6. Limpieza de Datos

Aplicamos las transformaciones necesarias para limpiar el dataset.

In [33]:
print("=" * 70)
print("LIMPIEZA DE DATOS")
print("=" * 70)

# Guardar tamaño original
original_rows = len(df)
print(f"\nRegistros originales: {original_rows:,}")

# 1. Eliminar duplicados completos (si existen)
print("\n1. ELIMINANDO DUPLICADOS")
print("-" * 70)
df_cleaned = df.drop_duplicates()
duplicates_removed = original_rows - len(df_cleaned)
print(f"Duplicados eliminados: {duplicates_removed:,}")
print(f"Registros después de eliminar duplicados: {len(df_cleaned):,}")

# 2. Convertir columnas de fecha a datetime
print("\n2. CONVIRTIENDO FECHAS A DATETIME")
print("-" * 70)
date_columns = [
    'order_purchase_timestamp',
    'order_approved_at',
    'order_delivered_carrier_date',
    'order_delivered_customer_date',
    'order_estimated_delivery_date'
]

for col in date_columns:
    if col in df_cleaned.columns:
        df_cleaned[col] = pd.to_datetime(df_cleaned[col], errors='coerce')
        print(f"✓ {col} convertido a datetime")

# 3. Crear features temporales útiles
print("\n3. CREANDO FEATURES TEMPORALES")
print("-" * 70)

# Calcular tiempo de entrega real vs estimado
if 'order_delivered_customer_date' in df_cleaned.columns and 'order_estimated_delivery_date' in df_cleaned.columns:
    df_cleaned['delivery_delay_days'] = (
        df_cleaned['order_delivered_customer_date'] - df_cleaned['order_estimated_delivery_date']
    ).dt.days
    print("✓ delivery_delay_days: diferencia entre entrega real y estimada")

# Tiempo desde compra hasta entrega
if 'order_purchase_timestamp' in df_cleaned.columns and 'order_delivered_customer_date' in df_cleaned.columns:
    df_cleaned['total_delivery_time_days'] = (
        df_cleaned['order_delivered_customer_date'] - df_cleaned['order_purchase_timestamp']
    ).dt.days
    print("✓ total_delivery_time_days: tiempo total de entrega")

# Extraer componentes de fecha
if 'order_purchase_timestamp' in df_cleaned.columns:
    df_cleaned['order_year'] = df_cleaned['order_purchase_timestamp'].dt.year
    df_cleaned['order_month'] = df_cleaned['order_purchase_timestamp'].dt.month
    df_cleaned['order_day_of_week'] = df_cleaned['order_purchase_timestamp'].dt.dayofweek
    df_cleaned['order_quarter'] = df_cleaned['order_purchase_timestamp'].dt.quarter
    print("✓ Componentes temporales extraídos (year, month, day_of_week, quarter)")

# 4. Calcular distancia geográfica entre customer y seller
print("\n4. CALCULANDO DISTANCIA GEOGRÁFICA")
print("-" * 70)
from math import radians, sin, cos, sqrt, atan2

def haversine_distance(lat1, lon1, lat2, lon2):
    """Calcula la distancia en km entre dos puntos geográficos"""
    if pd.isna(lat1) or pd.isna(lon1) or pd.isna(lat2) or pd.isna(lon2):
        return None
    
    R = 6371  # Radio de la Tierra en km
    
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    
    return R * c

# Aplicar cálculo de distancia
if all(col in df_cleaned.columns for col in ['customer_geo_lat', 'customer_geo_lng', 'seller_geo_lat', 'seller_geo_lng']):
    df_cleaned['distance_km'] = df_cleaned.apply(
        lambda row: haversine_distance(
            row['customer_geo_lat'], 
            row['customer_geo_lng'],
            row['seller_geo_lat'], 
            row['seller_geo_lng']
        ), 
        axis=1
    )
    print(f"✓ distance_km calculada (distancia entre customer y seller)")
    print(f"  - Promedio: {df_cleaned['distance_km'].mean():.2f} km")
    print(f"  - Mediana: {df_cleaned['distance_km'].median():.2f} km")

# 5. Tratamiento de valores nulos estratégico
print("\n5. TRATAMIENTO DE VALORES NULOS")
print("-" * 70)

# Rellenar reviews nulos con valores neutros
if 'avg_review_score' in df_cleaned.columns:
    null_reviews_before = df_cleaned['avg_review_score'].isnull().sum()
    # No rellenamos - los nulos indican que no hubo review
    print(f"✓ avg_review_score: {null_reviews_before:,} nulos mantenidos (sin review)")

# Eliminar filas sin información crítica de negocio
print("\nEliminando registros sin información crítica:")
critical_columns = ['order_id', 'customer_id', 'product_id', 'seller_id']
rows_before = len(df_cleaned)
for col in critical_columns:
    if col in df_cleaned.columns:
        df_cleaned = df_cleaned[df_cleaned[col].notna()]
df_cleaned = df_cleaned.dropna(subset=critical_columns, how='any')
rows_removed = rows_before - len(df_cleaned)
print(f"  - Filas sin IDs críticos eliminadas: {rows_removed:,}")

# 6. Resumen final de limpieza
print("\n" + "=" * 70)
print("RESUMEN DE LIMPIEZA")
print("=" * 70)
print(f"Registros originales: {original_rows:,}")
print(f"Registros finales: {len(df_cleaned):,}")
print(f"Registros eliminados: {original_rows - len(df_cleaned):,} ({((original_rows - len(df_cleaned))/original_rows*100):.2f}%)")
print(f"Columnas: {len(df_cleaned.columns)}")
print(f"\n✓ Dataset limpio y listo para guardar")

LIMPIEZA DE DATOS

Registros originales: 113,425

1. ELIMINANDO DUPLICADOS
----------------------------------------------------------------------
Duplicados eliminados: 0
Registros después de eliminar duplicados: 113,425

2. CONVIRTIENDO FECHAS A DATETIME
----------------------------------------------------------------------
✓ order_purchase_timestamp convertido a datetime
✓ order_approved_at convertido a datetime
✓ order_delivered_carrier_date convertido a datetime
✓ order_delivered_customer_date convertido a datetime
✓ order_estimated_delivery_date convertido a datetime

3. CREANDO FEATURES TEMPORALES
----------------------------------------------------------------------
✓ delivery_delay_days: diferencia entre entrega real y estimada
✓ total_delivery_time_days: tiempo total de entrega
✓ Componentes temporales extraídos (year, month, day_of_week, quarter)

4. CALCULANDO DISTANCIA GEOGRÁFICA
----------------------------------------------------------------------
✓ distance_km calculada 

## 7. Vista Previa del Dataset Limpio

In [34]:
# Vista previa del dataset limpio
print("Dataset limpio:")
display(df_cleaned.head(10))

print("\nInformación del dataset limpio:")
print(df_cleaned.info())

print("\nNuevas columnas creadas:")
new_features = [
    'delivery_delay_days',
    'total_delivery_time_days', 
    'order_year',
    'order_month',
    'order_day_of_week',
    'order_quarter',
    'distance_km'
]
for feature in new_features:
    if feature in df_cleaned.columns:
        print(f"  ✓ {feature}")

Dataset limpio:


Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,customer_unique_id,customer_zip_code_prefix,...,seller_geo_lng,seller_geo_city,seller_geo_state,delivery_delay_days,total_delivery_time_days,order_year,order_month,order_day_of_week,order_quarter,distance_km
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18,7c396fd4830fd04220f754e42b4e5bff,3149,...,-46.444238,maua,SP,-8.0,8.0,2017,10,0,4,18.57611
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13,af07308b275d755c9edb36a90c618231,47813,...,-43.980427,belo horizonte,MG,-6.0,13.0,2018,7,1,3,851.495069
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04,3a653a41f6f9fc3d2a113cf8398680e8,75265,...,-48.229601,guariba,SP,-18.0,9.0,2018,8,2,3,514.410666
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15,7c142cf63193a1473d2e66489a9ae977,59296,...,-43.924053,belo horizonte,MG,-13.0,13.0,2017,11,5,4,1822.226336
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26,72632f0f9dd73dfee390c9b22eb56dd6,9195,...,-46.262086,mogi das cruzes,SP,-10.0,2.0,2018,2,1,1,29.676625
5,a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,delivered,2017-07-09 21:57:05,2017-07-09 22:10:13,2017-07-11 14:58:04,2017-07-26 10:57:55,2017-08-01,80bb27c7c16e8f973207a5086ab329e2,86320,...,-46.516142,guarulhos,SP,-6.0,16.0,2017,7,6,3,411.394362
6,136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,NaT,NaT,2017-05-09,36edbb3fb164b1f16485364b6fb04c73,98900,...,-46.711854,sao paulo,SP,,,2017,4,1,2,913.640074
7,6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,delivered,2017-05-16 13:10:30,2017-05-16 13:22:11,2017-05-22 10:07:46,2017-05-26 12:55:51,2017-06-07,932afa1e708222e5821dac9cd5db4cae,26525,...,-46.552881,atibaia,SP,-12.0,9.0,2017,5,1,2,322.277082
8,76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,delivered,2017-01-23 18:29:09,2017-01-25 02:50:47,2017-01-26 14:16:31,2017-02-02 14:08:10,2017-03-06,39382392765b6dc74812866ee5ee92a7,99655,...,-46.947759,sao jose do rio pardo,SP,-32.0,9.0,2017,1,0,1,869.946044
9,e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,delivered,2017-07-29 11:55:02,2017-07-29 12:05:32,2017-08-10 19:45:24,2017-08-16 17:14:30,2017-08-23,299905e3934e9e181bfb2e164dd4b4f8,18075,...,-46.366721,itaquaquecetuba,SP,-7.0,18.0,2017,7,5,3,112.263374



Información del dataset limpio:
<class 'pandas.core.frame.DataFrame'>
Int64Index: 112650 entries, 0 to 113424
Data columns (total 52 columns):
 #   Column                         Non-Null Count   Dtype         
---  ------                         --------------   -----         
 0   order_id                       112650 non-null  object        
 1   customer_id                    112650 non-null  object        
 2   order_status                   112650 non-null  object        
 3   order_purchase_timestamp       112650 non-null  datetime64[ns]
 4   order_approved_at              112635 non-null  datetime64[ns]
 5   order_delivered_carrier_date   111456 non-null  datetime64[ns]
 6   order_delivered_customer_date  110196 non-null  datetime64[ns]
 7   order_estimated_delivery_date  112650 non-null  datetime64[ns]
 8   customer_unique_id             112650 non-null  object        
 9   customer_zip_code_prefix       112650 non-null  int64         
 10  customer_city                  1126

## 8. Guardar Dataset Limpio como Parquet

Guardamos el dataset limpio y enriquecido en formato Parquet optimizado para PySpark.

In [39]:
# Crear directorio para el archivo parquet si no existe
output_dir = 'Data'
output_file = os.path.join(output_dir, 'olist_unified_dataset.parquet')

print(f"Guardando dataset limpio en formato Parquet...")
print("Convirtiendo timestamps a formato compatible con Spark...")

# IMPORTANTE: Convertir columnas datetime de nanosegundos a microsegundos ANTES de guardar
# Spark solo soporta datetime64[us], no datetime64[ns]
date_columns_to_convert = df_cleaned.select_dtypes(include=['datetime64[ns]']).columns
for col in date_columns_to_convert:
    df_cleaned[col] = df_cleaned[col].astype('datetime64[us]')
    print(f"  ✓ {col} convertido a datetime64[us]")

# Guardar como parquet con parámetros compatibles con Spark
df_cleaned.to_parquet(
    output_file, 
    index=False, 
    engine='pyarrow', 
    compression='snappy',
    coerce_timestamps='us',  # Forzar microsegundos
    allow_truncated_timestamps=True  # Permitir truncamiento si es necesario
)

# Verificar tamaño del archivo
file_size_mb = os.path.getsize(output_file) / (1024 * 1024)

print("\n" + "=" * 70)
print("ARCHIVO PARQUET GENERADO - COMPATIBLE CON SPARK")
print("=" * 70)
print(f"✓ Archivo guardado exitosamente: {output_file}")
print(f"\nDetalles del archivo:")
print(f"  - Tamaño: {file_size_mb:.2f} MB")
print(f"  - Compresión: Snappy")
print(f"  - Registros: {len(df_cleaned):,}")
print(f"  - Columnas: {len(df_cleaned.columns)}")
print(f"  - Timestamps: datetime64[us] (compatible con Spark)")
print(f"\nCaracterísticas del dataset:")
print(f"  ✓ Datos unificados de 8 tablas")
print(f"  ✓ Duplicados eliminados")
print(f"  ✓ Fechas convertidas a datetime64[us]")
print(f"  ✓ Features temporales creados")
print(f"  ✓ Distancia geográfica calculada")
print(f"  ✓ IDs críticos validados")
print(f"\n✓ Dataset listo para análisis predictivo con PySpark!")

Guardando dataset limpio en formato Parquet...
Convirtiendo timestamps a formato compatible con Spark...
  ✓ order_purchase_timestamp convertido a datetime64[us]
  ✓ order_approved_at convertido a datetime64[us]
  ✓ order_delivered_carrier_date convertido a datetime64[us]
  ✓ order_delivered_customer_date convertido a datetime64[us]
  ✓ order_estimated_delivery_date convertido a datetime64[us]

ARCHIVO PARQUET GENERADO - COMPATIBLE CON SPARK
✓ Archivo guardado exitosamente: Data/olist_unified_dataset.parquet

Detalles del archivo:
  - Tamaño: 21.59 MB
  - Compresión: Snappy
  - Registros: 112,650
  - Columnas: 52
  - Timestamps: datetime64[us] (compatible con Spark)

Características del dataset:
  ✓ Datos unificados de 8 tablas
  ✓ Duplicados eliminados
  ✓ Fechas convertidas a datetime64[us]
  ✓ Features temporales creados
  ✓ Distancia geográfica calculada
  ✓ IDs críticos validados

✓ Dataset listo para análisis predictivo con PySpark!


## 9. Lectura con PySpark

Este paso verifica que el archivo Parquet se puede leer correctamente con PySpark.

In [41]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Crear sesión de Spark con configuración optimizada
spark = SparkSession.builder \
    .appName("Olist Predictive Analysis") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Cargar el archivo parquet unificado
df_spark = spark.read.parquet("Data/olist_unified_dataset.parquet")

# Información básica del dataset
print("=" * 60)
print("Dataset cargado en PySpark")
print("=" * 60)
print(f"Registros: {df_spark.count():,}")
print(f"Columnas: {len(df_spark.columns)}")
print(f"Particiones: {df_spark.rdd.getNumPartitions()}")

# Ver esquema
print("\nEsquema del dataset:")
df_spark.printSchema()

# Primeras filas
print("\nPrimeras filas:")
df_spark.show(5, truncate=True, vertical=False)

# Estadísticas básicas de columnas numéricas
print("\nEstadísticas descriptivas:")
df_spark.describe().show()

# Cache del dataframe si lo vas a usar múltiples veces
df_spark.cache()
print("\n✓ Dataset cacheado en memoria para análisis rápido")


Dataset cargado en PySpark
Registros: 112,650
Columnas: 52
Particiones: 6

Esquema del dataset:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp_ntz (nullable = true)
 |-- order_approved_at: timestamp_ntz (nullable = true)
 |-- order_delivered_carrier_date: timestamp_ntz (nullable = true)
 |-- order_delivered_customer_date: timestamp_ntz (nullable = true)
 |-- order_estimated_delivery_date: timestamp_ntz (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: long (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- order_item_id: double (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: string (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = 