# Prueba Técnica Data Engineer - Global Mobility Apex

**Origen:** Este challenge fue enviado por Sebastián Rincon Agredo (jrincon@apexglobal.co) a Ronald Castillo Capino el 6 de marzo de 2025.

**Contexto:**

- Se trata de la prueba técnica para la vacante de Data Engineer en Global Mobility Apex.
- Los entregables deberán ser enviados por Get on Board.

Esta prueba consiste en desarrollar un dashboard de ventas que incluya:

- Procesamiento y limpieza de datos de ventas a partir de un CSV con algunas irregularidades.
- Transformaciones y cálculos de métricas agregadas (precio promedio, ingreso total, día de mayor venta, etc.).
- Detección de outliers basados en la cantidad de cada transacción.
- Almacenamiento de los datos procesados en una base de datos SQLite con tablas separadas para transacciones, métricas agregadas y outliers.
- Un API RESTful (utilizando FastAPI) que exponga endpoints para consultar:
  - Ventas totales por producto (con filtros opcionales por nombre de producto o categoría).
  - Ventas totales por día (con opción de filtrar por rango de fechas).
  - Métricas agregadas por categoría.
  - Transacciones marcadas como outliers.

A continuación se presenta el código completo que cumple con estos requerimientos.

In [None]:
# Importar librerías necesarias
import pandas as pd
import numpy as np
import sqlite3
from datetime import datetime
import os

# Para la parte del API
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse
import uvicorn

from pydantic import BaseModel
from typing import Optional, List

print('Librerías importadas correctamente.')

## 1. Procesamiento y Limpieza de Datos

En esta sección se realiza la ingesta y limpieza del archivo CSV que contiene los datos de ventas. Se deben considerar los siguientes puntos:

- **Valores faltantes en `quantity`**: Se reemplazan por 0.
- **Valores inválidos en `price`**: Los valores que no se puedan convertir a número (por ejemplo, "not_a_number") se reemplazarán por la mediana del precio de la misma categoría.
- Se calcula la columna `total_sales` (multiplicando `quantity` y `price`).
- Se crea la columna `day_of_week` a partir de la fecha, y un flag `high_volume` que marca como `True` aquellas transacciones con `quantity` mayor a 10.

Estos pasos aseguran que la información esté limpia y lista para las transformaciones posteriores.

In [None]:
from io import StringIO

# Simular el CSV usando un string (en un caso real, se leería de un archivo, por ejemplo, pd.read_csv('sales_data.csv'))
csv_data = '''transaction_id,date,category,product,quantity,price
1,2024-07-01,Widget,Widget-A,10,9.99
2,2024-07-01,Gadget,Gadget-X,5,19.99
3,2024-07-02,Widget,Widget-B,7,9.99
4,2024-07-02,Doodad,Doodad-1,,4.99
5,2024-07-03,Widget,Widget-C,3,9.99
6,2024-07-03,Gadget,Gadget-Y,8,19.99
7,2024-07-04,Widget,Widget-A,2,9.99
8,2024-07-04,Doodad,Doodad-2,4,not_a_number
9,2024-07-05,Widget,Widget-B,6,9.99
10,2024-07-05,Gadget,Gadget-X,3,19.99
11,2024-07-06,Gadget,,5,19.99
12,2024-07-06,Doodad,Doodad-3,1,4.99
13,2024-07-07,Widget,Widget-C,8,9.99
14,2024-07-07,Gadget,Gadget-Y,4,19.99
15,2024-07-08,Widget,Widget-A,3,9.99
16,2024-07-08,Doodad,Doodad-1,2,4.99
17,2024-07-09,Gadget,Gadget-X,,19.99
18,2024-07-09,Widget,Widget-B,5,9.99
19,2024-07-10,Doodad,Doodad-2,4,4.99
20,2024-07-10,Gadget,Gadget-Y,7,19.99
21,2024-07-11,Widget,Widget-C,6,9.99
22,2024-07-11,Doodad,Doodad-3,3,4.99
23,2024-07-12,Gadget,Gadget-X,9,19.99
24,2024-07-12,Widget,Widget-A,1,9.99
25,2024-07-13,Doodad,Doodad-1,2,4.99
26,2024-07-13,Gadget,Gadget-Y,5,19.99
27,2024-07-14,Widget,Widget-B,,9.99
28,2024-07-14,Doodad,Doodad-2,4,4.99
29,2024-07-15,Gadget,Gadget-X,7,19.99
30,2024-07-15,Widget,Widget-C,3,9.99
31,2024-07-16,Doodad,Doodad-3,1,4.99
32,2024-07-16,Gadget,Gadget-Y,5,not_a_number
33,2024-07-17,Widget,Widget-A,8,9.99
34,2024-07-17,Doodad,Doodad-1,2,4.99
35,2024-07-18,Gadget,Gadget-X,6,19.99
36,2024-07-18,Widget,Widget-B,4,9.99
37,2024-07-19,Doodad,Doodad-2,3,4.99
38,2024-07-19,Gadget,Gadget-Y,2,19.99
39,2024-07-20,Widget,Widget-C,5,9.99
40,2024-07-20,Doodad,Doodad-3,,4.99
41,2024-07-21,Gadget,Gadget-X,7,19.99
42,2024-07-21,Widget,Widget-A,3,9.99
43,2024-07-22,Doodad,Doodad-1,2,4.99
44,2024-07-22,Gadget,Gadget-Y,6,19.99
45,2024-07-23,Widget,Widget-B,7,9.99
46,2024-07-23,Doodad,Doodad-2,3,4.99
47,2024-07-24,Gadget,Gadget-X,,19.99
48,2024-07-24,Widget,Widget-C,5,9.99
49,2024-07-25,Doodad,Doodad-3,4,4.99
50,2024-07-25,Gadget,Gadget-Y,8,19.99
''' 

# Leer el CSV
df = pd.read_csv(StringIO(csv_data))
print('Datos originales:')
print(df.head(10))

# Reemplazar valores faltantes en 'quantity' por 0
df['quantity'] = df['quantity'].fillna(0)

# Convertir 'quantity' a numérico
df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce').fillna(0).astype(int)

# Convertir 'price' a numérico; los errores se convierten a NaN
df['price'] = pd.to_numeric(df['price'], errors='coerce')

# Función para reemplazar precios inválidos por la mediana del grupo
def replace_invalid_prices(group):
    median_price = group['price'].median()
    group['price'] = group['price'].fillna(median_price)
    return group

df = df.groupby('category', group_keys=False).apply(replace_invalid_prices)

# Calcular la columna total_sales
df['total_sales'] = df['quantity'] * df['price']

# Convertir la columna 'date' a tipo datetime y extraer el día de la semana
df['date'] = pd.to_datetime(df['date'])
df['day_of_week'] = df['date'].dt.day_name()

# Flag high_volume: True si quantity > 10
df['high_volume'] = df['quantity'] > 10

print('\nDatos procesados:')
print(df.head(10))

## 2. Transformaciones Complejas y Cálculo de Métricas Agregadas

Se realizan las siguientes transformaciones:

- **Métricas Agregadas:**
  - Precio promedio por producto (media de `price`) en cada categoría.
  - Ingreso total (suma de `total_sales`) por categoría.
  - Día de la semana con mayor total de ventas por categoría.

- **Detección de Outliers:**
  - Se identifican aquellas transacciones cuyo `quantity` es mayor a 2 desviaciones estándar respecto de la media de la categoría. Se marca con un flag `outlier`.

Esto permite obtener una visión más completa del desempeño de cada categoría.

In [None]:
# Agrupación por categoría para obtener métricas
category_group = df.groupby('category')

agg_metrics = category_group.agg(
    avg_price=('price', 'mean'),
    total_revenue=('total_sales', 'sum')
).reset_index()

# Para el día con mayor ventas por categoría
day_sales = df.groupby(['category', 'day_of_week'])['total_sales'].sum().reset_index()

# Encontrar el día con el máximo total de ventas por categoría
max_day = day_sales.loc[day_sales.groupby('category')['total_sales'].idxmax()].rename(columns={'day_of_week': 'top_day'})

agg_metrics = pd.merge(agg_metrics, max_day[['category', 'top_day']], on='category')

print('Métricas Agregadas por Categoría:')
print(agg_metrics)

# Detección de outliers en 'quantity' por categoría
def detect_outliers(group):
    mean_q = group['quantity'].mean()
    std_q = group['quantity'].std()
    group['outlier'] = (group['quantity'] > (mean_q + 2 * std_q))
    return group

df = df.groupby('category', group_keys=False).apply(detect_outliers)

outliers = df[df['outlier']]

print('\nOutliers detectados:')
print(outliers)

## 3. Almacenamiento en SQLite

Se crean tres tablas en la base de datos SQLite:

- **transactions:** Datos de las transacciones procesadas.
- **aggregated_metrics:** Métricas agregadas por categoría.
- **outliers:** Transacciones marcadas como outliers.

Esto permite almacenar la información para consultas eficientes y servirla a través del API.

In [None]:
# Crear/conectar a la base de datos SQLite
db_name = 'sales_dashboard.db'
conn = sqlite3.connect(db_name)

# Guardar las tablas en la base de datos
df.to_sql('transactions', conn, if_exists='replace', index=False)
agg_metrics.to_sql('aggregated_metrics', conn, if_exists='replace', index=False)
outliers.to_sql('outliers', conn, if_exists='replace', index=False)

print(f'Datos guardados en la base de datos {db_name}')

# Cerrar la conexión (se reabrirá en el API si es necesario)
conn.close()

## 4. Backend: API RESTful con FastAPI

En esta sección se implementa un API RESTful que expone los siguientes endpoints:

1. **GET /sales/product**: Retorna el total de ventas por producto. Se puede filtrar opcionalmente por nombre de producto o categoría.
2. **GET /sales/day**: Retorna el total de ventas por día, con opción de filtrar por un rango de fechas.
3. **GET /sales/category**: Retorna las métricas agregadas por categoría (total revenue, average price, día con mayor ventas).
4. **GET /sales/outliers**: Retorna las transacciones marcadas como outliers.

El API se conecta a la base de datos SQLite creada previamente para consultar la información.

In [None]:
app = FastAPI(title='Sales Dashboard API')

DATABASE = 'sales_dashboard.db'

def get_db_connection():
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    return conn

@app.get('/sales/product')
def get_sales_by_product(product: Optional[str] = None, category: Optional[str] = None):
    conn = get_db_connection()
    query = "SELECT product, SUM(total_sales) as total_sales FROM transactions"
    conditions = []
    params = []
    if product:
        conditions.append("product = ?")
        params.append(product)
    if category:
        conditions.append("category = ?")
        params.append(category)
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
    query += " GROUP BY product"
    
    cur = conn.execute(query, params)
    rows = cur.fetchall()
    conn.close()
    
    results = [dict(row) for row in rows]
    return JSONResponse(content=results)


@app.get('/sales/day')
def get_sales_by_day(start_date: Optional[str] = None, end_date: Optional[str] = None):
    conn = get_db_connection()
    query = "SELECT date(date) as date, SUM(total_sales) as total_sales FROM transactions"
    conditions = []
    params = []
    if start_date:
        conditions.append("date >= ?")
        params.append(start_date)
    if end_date:
        conditions.append("date <= ?")
        params.append(end_date)
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
    query += " GROUP BY date"
    
    cur = conn.execute(query, params)
    rows = cur.fetchall()
    conn.close()
    results = [dict(row) for row in rows]
    return JSONResponse(content=results)


@app.get('/sales/category')
def get_sales_by_category():
    conn = get_db_connection()
    query = "SELECT * FROM aggregated_metrics"
    cur = conn.execute(query)
    rows = cur.fetchall()
    conn.close()
    results = [dict(row) for row in rows]
    return JSONResponse(content=results)


@app.get('/sales/outliers')
def get_outliers():
    conn = get_db_connection()
    query = "SELECT * FROM outliers"
    cur = conn.execute(query)
    rows = cur.fetchall()
    conn.close()
    results = [dict(row) for row in rows]
    return JSONResponse(content=results)


### Ejecución y Prueba del API

Para ejecutar el API desde el notebook sin provocar errores de asyncio, se utiliza la librería `nest_asyncio`.

1. Asegúrate de instalar `nest_asyncio`:
   ```bash
   pip install nest_asyncio
   ```
2. Ejecuta la celda siguiente para iniciar el servidor en `http://localhost:8000`.

También puedes ejecutar el script fuera del notebook en modo terminal si lo prefieres.

In [None]:
# Aplicar nest_asyncio para evitar conflictos con el loop de eventos en Jupyter
import nest_asyncio
nest_asyncio.apply()

print('Iniciando el servidor FastAPI en http://localhost:8000')
uvicorn.run(app, host='0.0.0.0', port=8000)