
# 🧩 Integrating your modules into the pipeline (Metaflow + MLflow)

This notebook uses **your files** (`data_ingestion.py`, `data_preprocessing.py`, `train.py`, `collision_flow.py`) within the project structure.  
You can run it **step by step** (ingestion → preprocessing → training) or launch the **complete Metaflow flow**.



## 0) Structure and dependency check


In [9]:

from pathlib import Path

# Project root, relative to the notebook's working directory
BASE = Path(r"collision-model-project")

# Create the expected folders if they do not exist yet
for p in [BASE/'src', BASE/'flows', BASE/'data'/'raw', BASE/'data'/'processed', BASE/'mlruns']:
    p.mkdir(parents=True, exist_ok=True)

print("Base:", BASE)
print("Contents of src/:", list((BASE/'src').iterdir()))
print("Contents of flows/:", list((BASE/'flows').iterdir()))


Base: collision-model-project
Contents of src/: [WindowsPath('collision-model-project/src/data_ingestion.py'), WindowsPath('collision-model-project/src/data_preprocessing.py'), WindowsPath('collision-model-project/src/train.py'), WindowsPath('collision-model-project/src/__init__.py')]
Contents of flows/: [WindowsPath('collision-model-project/flows/collision_flow.py')]
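
The cell above creates folders and lists files, but it does not fail when a module is missing or a package is not installed. The following is a minimal sketch of an explicit check, assuming the four files named in this notebook and the packages it imports. `missing_files` and `missing_packages` are hypothetical helper names, not part of the project.

```python
from importlib.util import find_spec
from pathlib import Path
from typing import Iterable, List

# Files this notebook expects (taken from the listing above)
EXPECTED_FILES = [
    "src/data_ingestion.py",
    "src/data_preprocessing.py",
    "src/train.py",
    "flows/collision_flow.py",
]

# Top-level import names used in this notebook
REQUIRED_PACKAGES = ["pandas", "numpy", "sklearn", "mlflow", "metaflow"]


def missing_files(base, expected: Iterable[str] = EXPECTED_FILES) -> List[str]:
    """Return the expected relative paths that are not files under `base`."""
    return [rel for rel in expected if not (Path(base) / rel).is_file()]


def missing_packages(names: Iterable[str] = REQUIRED_PACKAGES) -> List[str]:
    """Return the package names that cannot be located for import."""
    return [name for name in names if find_spec(name) is None]
```

For example, `missing_files(BASE)` returns an empty list when all four files are in place, and `missing_packages()` lists anything that still needs a `pip install`.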



## 1) Ingestion with your module (`src/data_ingestion.py`)

This cell imports and runs `ingest_collision_data()` to download the raw data and save it to `data/raw/la_collisions_raw.csv`.


In [11]:

from importlib import reload
import sys
sys.path.append(str(BASE))

from src import data_ingestion
reload(data_ingestion)

df_raw = data_ingestion.ingest_collision_data()
print(df_raw.shape)
df_raw.head()


Starting data retrieval...
✅ Partially ingested 50000 rows (Total: 50000)
✅ Partially ingested 50000 rows (Total: 100000)
✅ Partially ingested 50000 rows (Total: 150000)
✅ Partially ingested 50000 rows (Total: 200000)
✅ Partially ingested 50000 rows (Total: 250000)
✅ Partially ingested 50000 rows (Total: 300000)
✅ Partially ingested 50000 rows (Total: 350000)
✅ Partially ingested 50000 rows (Total: 400000)
✅ Partially ingested 50000 rows (Total: 450000)
✅ Partially ingested 50000 rows (Total: 500000)
✅ Partially ingested 50000 rows (Total: 550000)
✅ Partially ingested 50000 rows (Total: 600000)
✅ Partially ingested 21677 rows (Total: 621677)
(621677, 26)


| | dr_no | date_rptd | date_occ | time_occ | area | area_name | rpt_dist_no | crm_cd | crm_cd_desc | mocodes | ... | cross_street | :@computed_region_qz3q_ghft | :@computed_region_k96s_3jcv | :@computed_region_tatf_ua23 | :@computed_region_ur2y_g4cx | :@computed_region_kqwf_mjcx | :@computed_region_2dna_qi2s | location_1.latitude | location_1.longitude | location_1.human_address |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 212013850 | 2021-09-03T00:00:00.000 | 2021-09-02T00:00:00.000 | 2335 | 20 | Olympic | 2021 | 997 | TRAFFIC COLLISION | 3004 3027 3034 4027 3036 3101 3401 3701 | ... | 6TH ST | 22722 | 588 | 875 | 36.0 | 7 | 86 | 34.063 | -118.3141 | {"address": "", "city": "", "state": "", "zip"... |
| 1 | 221417787 | 2022-10-17T00:00:00.000 | 2022-10-17T00:00:00.000 | 1620 | 14 | Pacific | 1406 | 997 | TRAFFIC COLLISION | 4027 3011 3028 3034 3037 3101 3401 3701 | ... | MOTOR AV | 23451 | 881 | 1358 | 9.0 | 6 | 74 | 34.029 | -118.4113 | {"address": "", "city": "", "state": "", "zip"... |
| 2 | 221418141 | 2022-10-26T00:00:00.000 | 2022-10-26T00:00:00.000 | 1135 | 14 | Pacific | 1434 | 997 | TRAFFIC COLLISION | 4027 3011 3025 3034 3037 3101 3401 3701 | ... | ROSEWOOD AV | 24031 | 891 | 855 | 10.0 | 10 | 27 | 34.0052 | -118.4478 | {"address": "", "city": "", "state": "", "zip"... |
| 3 | 222017859 | 2022-12-01T00:00:00.000 | 2022-12-01T00:00:00.000 | 230 | 20 | Olympic | 2044 | 997 | TRAFFIC COLLISION | 3003 0913 3026 3035 3037 3101 3401 3701 4020 | ... | SAN MARINO ST | 22723 | 607 | 647 | | 12 | 89 | 34.0545 | -118.3009 | {"address": "", "city": "", "state": "", "zip"... |
| 4 | 190319651 | 2019-08-24T00:00:00.000 | 2019-08-24T00:00:00.000 | 450 | 3 | Southwest | 356 | 997 | TRAFFIC COLLISION | 3036 3004 3026 3101 4003 | ... | NORMANDIE AV | 22724 | 691 | 916 | 7.0 | 14 | 32 | 34.0255 | -118.3002 | {"address": "", "city": "", "state": "", "zip"... |
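
The log above shows the data arriving in pages of 50,000 rows until a shorter final page. The source of `data_ingestion.py` is not shown in this notebook, so the following is only a sketch of that paging pattern. It assumes a Socrata-style JSON endpoint that accepts `$limit`, `$offset`, and `$order` (the `:@computed_region_*` columns suggest one, but that is a guess). `fetch_all_rows`, `socrata_fetcher`, and `endpoint_url` are hypothetical names.

```python
import json
from typing import Callable, Dict, List
from urllib.parse import urlencode
from urllib.request import urlopen

Row = Dict[str, object]
PageFetcher = Callable[[int, int], List[Row]]  # (offset, limit) -> rows


def fetch_all_rows(fetch_page: PageFetcher, page_size: int = 50_000) -> List[Row]:
    """Request pages of `page_size` rows until a short or empty page arrives."""
    if page_size < 1:
        raise ValueError("page_size must be at least 1")
    rows: List[Row] = []
    while True:
        page = fetch_page(len(rows), page_size)  # offset = rows collected so far
        rows.extend(page)
        if len(page) < page_size:  # short or empty page: nothing left to fetch
            return rows


def socrata_fetcher(endpoint_url: str) -> PageFetcher:
    """Build a page fetcher for a Socrata-style JSON endpoint (assumed API)."""

    def fetch_page(offset: int, limit: int) -> List[Row]:
        # A fixed $order keeps the pages stable between requests
        query = urlencode({"$limit": limit, "$offset": offset, "$order": ":id"})
        with urlopen(f"{endpoint_url}?{query}") as response:
            return json.load(response)

    return fetch_page
```

Passing the fetcher as an argument keeps the paging loop testable without network access: any function that takes `(offset, limit)` and returns a list of rows can stand in for the real endpoint.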



## 2) Preprocessing with your module (`src/data_preprocessing.py`)

Takes the raw DataFrame and generates `data/processed/processed_data.csv` with the temporal features (lags, rolling means, etc.).


In [15]:

from src import data_preprocessing
reload(data_preprocessing)

df_processed = data_preprocessing.preprocess_data(df_raw)
print(df_processed.shape)
df_processed.head()


(5271, 8)


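| | date_occ | collision_count | dayofyear | weekday | year | lag_1 | lag_2 | rolling_mean_3 |
|---|---|---|---|---|---|---|---|---|
| 0 | 2010-01-03 | 11 | 3 | 6 | 2010 | 5.0 | 9.0 | 8.333333 |
| 1 | 2010-01-04 | 7 | 4 | 0 | 2010 | 11.0 | 5.0 | 7.666667 |
| 2 | 2010-01-05 | 3 | 5 | 1 | 2010 | 7.0 | 11.0 | 7.0 |
| 3 | 2010-01-06 | 5 | 6 | 2 | 2010 | 3.0 | 7.0 | 5.0 |
| 4 | 2010-01-08 | 7 | 8 | 4 | 2010 | 5.0 | 3.0 | 5.0 |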
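
`src/data_preprocessing.py` is not shown here. The following is a minimal sketch, assuming daily counts per `date_occ`, row-based lags, and a 3-row rolling mean that includes the current row. Those assumptions are consistent with the first rows of the output above (for example, (11 + 5 + 9) / 3 ≈ 8.33). `build_daily_features` is a hypothetical name, not the module's function.

```python
import pandas as pd


def build_daily_features(df_raw: pd.DataFrame) -> pd.DataFrame:
    """Aggregate raw collision rows into one row per day with lag features."""
    # Count collisions per calendar day (groupby sorts the dates ascending)
    daily = (
        df_raw.assign(date_occ=pd.to_datetime(df_raw["date_occ"]).dt.normalize())
        .groupby("date_occ")
        .size()
        .reset_index(name="collision_count")
    )

    # Calendar features derived from the date
    daily["dayofyear"] = daily["date_occ"].dt.dayofyear
    daily["weekday"] = daily["date_occ"].dt.weekday  # Monday = 0, Sunday = 6
    daily["year"] = daily["date_occ"].dt.year

    # Lags and rolling mean are computed over rows, not calendar days
    daily["lag_1"] = daily["collision_count"].shift(1)
    daily["lag_2"] = daily["collision_count"].shift(2)
    daily["rolling_mean_3"] = daily["collision_count"].rolling(3).mean()

    # The first two rows have no complete lag history
    return daily.dropna().reset_index(drop=True)
```

Because the lags are row-based, a day with no records (2010-01-07 is absent from the output above) makes `lag_1` refer to the previous recorded day rather than to yesterday. If `collision_count` is the prediction target, a rolling window that includes the current row also includes the target; computing it on `collision_count.shift(1)` would avoid that.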



## 3) Training + MLflow with your module (`src/train.py`)

Trains a **RandomForestRegressor** and logs metrics/artifacts to **MLflow** (experiment `Collision_Prediction`).  
The tracking path is configured locally in `mlruns/`.


In [31]:
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, mean_squared_error, r2_score

# 📦 pandas
import pandas as pd

# 📦 numpy
import numpy as np

# 📦 matplotlib
import matplotlib.pyplot as plt

In [33]:
!pip install mlflow metaflow



In [35]:
import os
os.environ["METAFLOW_DEFAULT_STEP_DECORATORS"] = "[]"
os.environ["METAFLOW_DEFAULT_METADATA"] = "local"

In [37]:
import mlflow
import metaflow
print("MLflow version:", mlflow.__version__)
print("Metaflow version:", metaflow.__version__)

ValueError: Cannot locate step_decorator plugin 'batch' at 'metaflow.plugins.aws.batch.batch_decorator'

In [39]:

from src import train
from importlib import reload
reload(train)

model = train.train_model()
model


2025/08/24 21:10:59 INFO mlflow.tracking.fluent: Experiment with name 'Collision_Prediction' does not exist. Creating a new experiment.


Model trained with MSE: 0.17528229023004277, R2: 0.8914582620268914



## 4) Full orchestration with Metaflow (`flows/collision_flow.py`)

To run the **complete pipeline** from the terminal, from the project's base folder:

```bash
python flows/collision_flow.py run
```

> If you prefer to launch it from here, run the following cell.


In [45]:

# OPTIONAL: run the flow from the notebook (ingestion requires internet access)
import subprocess, shlex
cmd = "python flows/collision_flow.py run"
print("Running:", cmd)
subprocess.run(shlex.split(cmd), cwd=str(BASE))
print("Ready to run the flow from the terminal.")


Running: python flows/collision_flow.py run
Ready to run the flow from the terminal.
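
Note that the cell above prints its final message whether or not the flow succeeded: `subprocess.run` does not raise on a non-zero exit code unless asked to, and the child's output is not captured. A small sketch of a stricter launcher follows, assuming the same script path and `run` argument. `run_flow` is a hypothetical helper, and `sys.executable` is used so the flow runs with the same interpreter as the notebook kernel.

```python
import subprocess
import sys
from pathlib import Path
from typing import Sequence, Union


def run_flow(
    base_dir: Union[str, Path],
    script: str = "flows/collision_flow.py",
    args: Sequence[str] = ("run",),
) -> str:
    """Run the flow script from `base_dir` and return its standard output."""
    cmd = [sys.executable, script, *args]
    result = subprocess.run(cmd, cwd=str(base_dir), capture_output=True, text=True)

    # Surface failures instead of silently continuing
    if result.returncode != 0:
        raise RuntimeError(
            f"Flow failed with exit code {result.returncode}:\n{result.stderr}"
        )
    return result.stdout
```

With this helper, `print(run_flow(BASE))` shows the flow's output in the notebook, and a failing flow stops the cell with the captured error text.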


In [49]:
import os
print(os.getcwd())  # current working directory

C:\Users\User\CEIA\MLOps



## 5) Local MLflow UI

Open the MLflow UI to inspect runs, metrics, and artifacts:

```bash
mlflow ui --backend-store-uri mlruns/
```
Then go to `http://localhost:5000`.
