## Construcción de un Pipeline ETL

In [1]:
pip install pandas

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pandas as pd

In [3]:
# Ruta al archivo CSV
file_path = r'D:\Python\Proyectos\3. E - commerce\e-commerce.csv'

In [4]:
# Carga de datos con codificación alternativa
try:
    df = pd.read_csv(file_path, encoding='latin1')
    print("Datos cargados exitosamente")
except Exception as e:
    print(f"Error al cargar los datos: {e}")

Datos cargados exitosamente


In [5]:
# Vista previa de los datos
df.head()

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850.0,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850.0,United Kingdom
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850.0,United Kingdom


In [6]:
pip install chardet

Note: you may need to restart the kernel to use updated packages.


In [7]:
import chardet

In [8]:
# Detectar la codificación del archivo
file_path = r'D:\Python\Proyectos\3. E - commerce\e-commerce.csv'

with open(file_path, 'rb') as f:
    result = chardet.detect(f.read(100000))
    encoding = result['encoding']
    print(f"Codificación detectada: {encoding}")

Codificación detectada: ascii


In [9]:
# Cargar el archivo usando la codificación detectada
try:
    df = pd.read_csv(file_path, encoding='latin1')  
    print("Datos cargados exitosamente")
    print(df.head())
except Exception as e:
    print(f"Error al cargar los datos: {e}")

Datos cargados exitosamente
  InvoiceNo StockCode                          Description  Quantity  \
0    536365    85123A   WHITE HANGING HEART T-LIGHT HOLDER         6   
1    536365     71053                  WHITE METAL LANTERN         6   
2    536365    84406B       CREAM CUPID HEARTS COAT HANGER         8   
3    536365    84029G  KNITTED UNION FLAG HOT WATER BOTTLE         6   
4    536365    84029E       RED WOOLLY HOTTIE WHITE HEART.         6   

      InvoiceDate  UnitPrice  CustomerID         Country  
0  12/1/2010 8:26       2.55     17850.0  United Kingdom  
1  12/1/2010 8:26       3.39     17850.0  United Kingdom  
2  12/1/2010 8:26       2.75     17850.0  United Kingdom  
3  12/1/2010 8:26       3.39     17850.0  United Kingdom  
4  12/1/2010 8:26       3.39     17850.0  United Kingdom  


In [10]:
# Limpieza de datos
# Convertir InvoiceDate a formato datetime
df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')

# Verificar valores nulos
print(df.isnull().sum())

# Eliminar filas con datos faltantes (opcional, según el caso)
df.dropna(inplace=True)

InvoiceNo           0
StockCode           0
Description      1454
Quantity            0
InvoiceDate         0
UnitPrice           0
CustomerID     135080
Country             0
dtype: int64


In [11]:
# Identificar correctamente valores nulos y blancos
nulos_blancos = df['Description'].isnull() | (df['Description'].str.strip() == '')

# Contar la cantidad total de nulos y blancos
print(f"Total de valores nulos y blancos en 'Description': {nulos_blancos.sum()}")

Total de valores nulos y blancos en 'Description': 0


In [12]:
# Limpieza de datos después de identificar los blancos y nulos
# Reemplazar nulos por un valor predeterminado
df['Description'] = df['Description'].fillna('Descripción no disponible')

# Eliminar espacios en blanco
df['Description'] = df['Description'].str.strip()

# Reemplazar valores vacíos después de eliminar espacios en blanco
df.loc[df['Description'] == '', 'Description'] = 'Descripción no disponible'

# Confirmar que no quedan valores nulos o blancos
print(df['Description'].isnull().sum())
print((df['Description'] == '').sum())

0
0


In [13]:
# Eliminar las filas con valores nulos o blancos
# Filtrar filas que tengan valores válidos en 'Description'
df = df[~nulos_blancos]

In [14]:
# Limpieza de datos
# Convertir InvoiceDate a formato datetime
df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')

# Verificar valores nulos
print(df.isnull().sum())

# Eliminar filas con datos faltantes (opcional, según el caso)
df.dropna(inplace=True)

InvoiceNo      0
StockCode      0
Description    0
Quantity       0
InvoiceDate    0
UnitPrice      0
CustomerID     0
Country        0
dtype: int64


In [15]:
# Transformación de datos
# Crear columna de ingresos totales
df['TotalPrice'] = df['Quantity'] * df['UnitPrice']

# Filtrar ventas negativas
df = df[df['TotalPrice'] > 0]

In [16]:
# Guardar el archivo limpio
df.to_csv(r'D:\Python\Proyectos\3. E - commerce\e-commerce_clean.csv', index=False)
print("Archivo limpio guardado exitosamente")

Archivo limpio guardado exitosamente


In [17]:
# Análisis exploratorio inicial
print(df.describe())
print(df['Country'].value_counts())

            Quantity                    InvoiceDate      UnitPrice  \
count  397884.000000                         397884  397884.000000   
mean       12.988238  2011-07-10 23:41:23.511023360       3.116488   
min         1.000000            2010-12-01 08:26:00       0.001000   
25%         2.000000            2011-04-07 11:12:00       1.250000   
50%         6.000000            2011-07-31 14:39:00       1.950000   
75%        12.000000            2011-10-20 14:33:00       3.750000   
max     80995.000000            2011-12-09 12:50:00    8142.750000   
std       179.331775                            NaN      22.097877   

          CustomerID     TotalPrice  
count  397884.000000  397884.000000  
mean    15294.423453      22.397000  
min     12346.000000       0.001000  
25%     13969.000000       4.680000  
50%     15159.000000      11.800000  
75%     16795.000000      19.800000  
max     18287.000000  168469.600000  
std      1713.141560     309.071041  
Country
United Kingdom     

In [18]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 397884 entries, 0 to 541908
Data columns (total 9 columns):
 #   Column       Non-Null Count   Dtype         
---  ------       --------------   -----         
 0   InvoiceNo    397884 non-null  object        
 1   StockCode    397884 non-null  object        
 2   Description  397884 non-null  object        
 3   Quantity     397884 non-null  int64         
 4   InvoiceDate  397884 non-null  datetime64[ns]
 5   UnitPrice    397884 non-null  float64       
 6   CustomerID   397884 non-null  float64       
 7   Country      397884 non-null  object        
 8   TotalPrice   397884 non-null  float64       
dtypes: datetime64[ns](1), float64(3), int64(1), object(4)
memory usage: 30.4+ MB


In [19]:
df.head()

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,TotalPrice
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850.0,United Kingdom,15.3
1,536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom,20.34
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850.0,United Kingdom,22.0
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom,20.34
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom,20.34


In [20]:
# Análisis de ventas
# Ventas totales por país
sales_by_country = df.groupby('Country')['TotalPrice'].sum().sort_values(ascending=False)
print("Ventas totales por país")
print(sales_by_country)

Ventas totales por país
Country
United Kingdom          7308391.554
Netherlands              285446.340
EIRE                     265545.900
Germany                  228867.140
France                   209024.050
Australia                138521.310
Spain                     61577.110
Switzerland               56443.950
Belgium                   41196.340
Sweden                    38378.330
Japan                     37416.370
Norway                    36165.440
Portugal                  33439.890
Finland                   22546.080
Singapore                 21279.290
Channel Islands           20450.440
Denmark                   18955.340
Italy                     17483.240
Cyprus                    13590.380
Austria                   10198.680
Poland                     7334.650
Israel                     7221.690
Greece                     4760.520
Iceland                    4310.000
Canada                     3666.380
USA                        3580.390
Malta                      2725.

In [21]:
# Productos más vendidos
top_products = df.groupby('Description')['Quantity'].sum().sort_values(ascending=False).head(10)
print("Productos más vendidos:")
print(top_products)

Productos más vendidos:
Description
PAPER CRAFT , LITTLE BIRDIE           80995
MEDIUM CERAMIC TOP STORAGE JAR        77916
WORLD WAR 2 GLIDERS ASSTD DESIGNS     54415
JUMBO BAG RED RETROSPOT               46181
WHITE HANGING HEART T-LIGHT HOLDER    36725
ASSORTED COLOUR BIRD ORNAMENT         35362
PACK OF 72 RETROSPOT CAKE CASES       33693
POPCORN HOLDER                        30931
RABBIT NIGHT LIGHT                    27202
MINI PAINT SET VINTAGE                26076
Name: Quantity, dtype: int64


In [22]:
# Ventas mensuales
# Agregar columna de mes
df['Month'] = df['InvoiceDate'].dt.to_period('M')

# Ventas totales por mes
monthly_sales = df.groupby('Month')['TotalPrice'].sum()
print("Ventas totales mensuales:")
print(monthly_sales)

Ventas totales mensuales:
Month
2010-12     572713.890
2011-01     569445.040
2011-02     447137.350
2011-03     595500.760
2011-04     469200.361
2011-05     678594.560
2011-06     661213.690
2011-07     600091.011
2011-08     645343.900
2011-09     952838.382
2011-10    1039318.790
2011-11    1161817.380
2011-12     518192.790
Freq: M, Name: TotalPrice, dtype: float64


In [23]:
# Segmentación por país
# Categorizar países por ventas totales
sales_by_country = df.groupby('Country')['TotalPrice'].sum().reset_index()
sales_by_country['Segment'] = pd.qcut(sales_by_country['TotalPrice'], q=3, labels=['Bajo', 'Medio', 'Alto'])

print("Segmentación por país:")
print(sales_by_country)

Segmentación por país:
                 Country   TotalPrice Segment
0              Australia   138521.310    Alto
1                Austria    10198.680   Medio
2                Bahrain      548.400    Bajo
3                Belgium    41196.340    Alto
4                 Brazil     1143.600    Bajo
5                 Canada     3666.380   Medio
6        Channel Islands    20450.440   Medio
7                 Cyprus    13590.380   Medio
8         Czech Republic      826.740    Bajo
9                Denmark    18955.340   Medio
10                  EIRE   265545.900    Alto
11    European Community     1300.250    Bajo
12               Finland    22546.080   Medio
13                France   209024.050    Alto
14               Germany   228867.140    Alto
15                Greece     4760.520   Medio
16               Iceland     4310.000   Medio
17                Israel     7221.690   Medio
18                 Italy    17483.240   Medio
19                 Japan    37416.370    Alto
20         

In [24]:
pip install apache-airflow




In [25]:
pip show jupyter


Note: you may need to restart the kernel to use updated packages.




In [26]:
pip install notebook

Note: you may need to restart the kernel to use updated packages.


In [32]:
pip install --upgrade apache-airflow[google,azure]

Collecting apache-airflow-providers-google (from apache-airflow[azure,google])
  Downloading apache_airflow_providers_google-11.0.0-py3-none-any.whl.metadata (17 kB)
Collecting PyOpenSSL>=23.0.0 (from apache-airflow-providers-google->apache-airflow[azure,google])
  Downloading pyOpenSSL-24.3.0-py3-none-any.whl.metadata (15 kB)
Collecting gcloud-aio-auth>=5.2.0 (from apache-airflow-providers-google->apache-airflow[azure,google])
  Downloading gcloud_aio_auth-5.3.2-py3-none-any.whl.metadata (3.1 kB)
Collecting gcloud-aio-bigquery>=6.1.2 (from apache-airflow-providers-google->apache-airflow[azure,google])
  Downloading gcloud_aio_bigquery-7.1.0-py3-none-any.whl.metadata (1.9 kB)
Collecting gcloud-aio-storage>=9.0.0 (from apache-airflow-providers-google->apache-airflow[azure,google])
  Downloading gcloud_aio_storage-9.3.0-py3-none-any.whl.metadata (2.1 kB)
Collecting gcsfs>=2023.10.0 (from apache-airflow-providers-google->apache-airflow[azure,google])
  Downloading gcsfs-2024.12.0-py2.py3-

  You can safely remove it manually.
  You can safely remove it manually.
  You can safely remove it manually.
  You can safely remove it manually.


In [34]:
import pkg_resources

installed_packages = [pkg.key for pkg in pkg_resources.working_set]
if "apache-airflow" in installed_packages:
    print("Apache Airflow está instalado.")
else:
    print("Apache Airflow no está instalado.")

Apache Airflow está instalado.


In [35]:
import importlib.metadata

try:
    version = importlib.metadata.version("apache-airflow")
    print(f"Apache Airflow está instalado. Versión: {version}")
except importlib.metadata.PackageNotFoundError:
    print("Apache Airflow no está instalado.")

Apache Airflow está instalado. Versión: 2.10.4


In [36]:
# Construir y Ejecutar un Pipeline de ETL (Extract, Transform, Load) utilizando Apache Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd

# Funciones del ETL
def extract():
    file_path = r'D:\Python\Proyectos\3. E - commerce\e-commerce.csv'
    return pd.read_csv(file_path, encoding='latin1')

def transform():
    df = extract()
    df['TotalPrice'] = df['Quantity'] * df['UnitPrice']
    df['Description'].fillna('Descripción no disponible', inplace=True)
    df = df.dropna(subset=['CustomerID'])
    df['CustomerID'] = df['CustomerID'].astype(int)
    return df

def load():
    df = transform()
    output_path = r'D:\Python\Proyectos\3. E - commerce\e-commerce_clean.csv'
    df.to_csv(output_path, index=False)

# DAG Configuración
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 12, 1),
    'retries': 1,
}

with DAG('ecommerce_etl',
         default_args=default_args,
         schedule='@daily',  # Cambiado a `schedule`
         catchup=False) as dag:

    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract
    )

    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform
    )

    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load
    )

    extract_task >> transform_task >> load_task

In [38]:
def transform():
    df = extract()

    # Calcular el precio total
    df['TotalPrice'] = df['Quantity'] * df['UnitPrice']

    # Rellenar valores nulos en la columna 'Description'
    df['Description'] = df['Description'].fillna('Descripción no disponible')

    # Eliminar filas con valores nulos en 'CustomerID'
    df = df.dropna(subset=['CustomerID'])

    # Convertir 'CustomerID' a entero
    df['CustomerID'] = df['CustomerID'].astype(int)

    return df

In [39]:
df = extract()
print(df.head())

df_transformed = transform()
print(df_transformed.head())

load()
print("Archivo limpio guardado correctamente.")

  InvoiceNo StockCode                          Description  Quantity  \
0    536365    85123A   WHITE HANGING HEART T-LIGHT HOLDER         6   
1    536365     71053                  WHITE METAL LANTERN         6   
2    536365    84406B       CREAM CUPID HEARTS COAT HANGER         8   
3    536365    84029G  KNITTED UNION FLAG HOT WATER BOTTLE         6   
4    536365    84029E       RED WOOLLY HOTTIE WHITE HEART.         6   

      InvoiceDate  UnitPrice  CustomerID         Country  
0  12/1/2010 8:26       2.55     17850.0  United Kingdom  
1  12/1/2010 8:26       3.39     17850.0  United Kingdom  
2  12/1/2010 8:26       2.75     17850.0  United Kingdom  
3  12/1/2010 8:26       3.39     17850.0  United Kingdom  
4  12/1/2010 8:26       3.39     17850.0  United Kingdom  
  InvoiceNo StockCode                          Description  Quantity  \
0    536365    85123A   WHITE HANGING HEART T-LIGHT HOLDER         6   
1    536365     71053                  WHITE METAL LANTERN         6 

In [40]:
# Agregar Validaciones
def validate_data(df):
    required_columns = ['InvoiceNo', 'StockCode', 'Description', 'Quantity', 'InvoiceDate', 'UnitPrice', 'CustomerID', 'Country']
    for col in required_columns:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")
    if df.empty:
        raise ValueError("The DataFrame is empty.")
    return True

In [41]:
# Registrar Logs
import logging

logging.basicConfig(level=logging.INFO)

def extract():
    logging.info("Extrayendo datos del archivo CSV...")
    file_path = r'D:\Python\Proyectos\3. E - commerce\e-commerce.csv'
    df = pd.read_csv(file_path, encoding='latin1')
    logging.info(f"Datos extraídos: {len(df)} filas.")
    return df

def transform():
    logging.info("Transformando los datos...")
    df = extract()
    df['TotalPrice'] = df['Quantity'] * df['UnitPrice']
    df['Description'] = df['Description'].fillna('Descripción no disponible')
    df = df.dropna(subset=['CustomerID'])
    df['CustomerID'] = df['CustomerID'].astype(int)
    validate_data(df)
    logging.info("Transformación completa.")
    return df

def load():
    logging.info("Cargando datos transformados al archivo CSV limpio...")
    df = transform()
    output_path = r'D:\Python\Proyectos\3. E - commerce\e-commerce_clean.csv'
    df.to_csv(output_path, index=False)
    logging.info(f"Archivo guardado en: {output_path}")

In [45]:
# Obtener rutas desde las variables
from airflow.models import Variable

def extract():
    file_path = Variable.get("input_file_path")
    return pd.read_csv(file_path, encoding='latin1')

def load():
    df = transform()
    output_path = Variable.get("output_file_path")
    df.to_csv(output_path, index=False)

In [46]:
# Actualiza y verifica el DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import logging

# DAG Configuración
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 12, 1),
    'retries': 1,
}

with DAG('ecommerce_etl',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dag:

    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract
    )

    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform
    )

    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load
    )

    extract_task >> transform_task >> load_task

In [47]:
# Depuración Adicional
# Prueba las funciones directamente
df = extract()
print(df.head())

df_transformed = transform()
print(df_transformed.head())

load()
print("Proceso ETL completado manualmente.")

  InvoiceNo StockCode                          Description  Quantity  \
0    536365    85123A   WHITE HANGING HEART T-LIGHT HOLDER         6   
1    536365     71053                  WHITE METAL LANTERN         6   
2    536365    84406B       CREAM CUPID HEARTS COAT HANGER         8   
3    536365    84029G  KNITTED UNION FLAG HOT WATER BOTTLE         6   
4    536365    84029E       RED WOOLLY HOTTIE WHITE HEART.         6   

      InvoiceDate  UnitPrice  CustomerID         Country  
0  12/1/2010 8:26       2.55     17850.0  United Kingdom  
1  12/1/2010 8:26       3.39     17850.0  United Kingdom  
2  12/1/2010 8:26       2.75     17850.0  United Kingdom  
3  12/1/2010 8:26       3.39     17850.0  United Kingdom  
4  12/1/2010 8:26       3.39     17850.0  United Kingdom  
[[34m2024-12-22T17:48:40.096-0400[0m] {[34m376013929.py:[0m14} INFO[0m - Transformando los datos...[0m
[[34m2024-12-22T17:48:40.685-0400[0m] {[34m376013929.py:[0m21} INFO[0m - Transformación completa.[

KeyError: 'Variable output_file_path does not exist'

# Este fue mi primer trabajo configurando una tuberia y usando airflow, realmente me consto un monton y no se si lo hice bien, pero por lo menos lo intente. A partir de este momento sera mi objetivo aprender estas librerias