# Análisis y Estimación de Costos (Gold)  
## Dataknow – Estimación de Costos para Equipo de Construcción

---

Este notebook realiza el **análisis estadístico descriptivo** de los precios de las materias primas **X, Y y Z**, así como la **estimación de costos** de los equipos del proyecto, aplicando las **reglas de negocio definidas**
1. **Equipo 1:** Composición del Precio: 20% la materia prima **X** y el resto por la
materia prima **Y**
2. **Equipo 2:** El precio del equipo 2 está conformado por partes iguales de las
materias primas **X**, **Y** y **Z**.
---

### Alcance del análisis

- Estadística descriptiva:
  - Promedios, medianas, rangos y desviaciones estándar por materia prima.
- Análisis temporal:
  - Identificación de tendencias generales y posibles patrones estacionales.
- Visualización:
  - Series temporales de precios.
  - Histogramas de distribución por insumo.
- Estimación de costos:
  - Cálculo del costo del **Equipo 1** y **Equipo 2** según las reglas de negocio.

---

### Información del proyecto

- **Equipo:** Dataknow  
- **Data Scientist:** Juan Pablo Guerra Osorio


In [0]:
%run ../configs/utils/params

In [0]:
# Cambiamos el directorio de trabajo
os.chdir(directorio_proceso["workspace"])

# Añadimos el directorio al PYTHONPATH
sys.path.append(directorio_proceso["workspace"])

In [0]:
from configs.utils.functions import *

In [0]:
# Carga de las tablas traidas desde el UC 
gold_x = spark.table("dataknow_dll.construccion.silver_x")
gold_y = spark.table("dataknow_dll.construccion.silver_y")
gold_z = spark.table("dataknow_dll.construccion.silver_z")

## Analisis descriptivo basicio para cada tabla
---

In [0]:
# Vamos a hacer un loop para mostrar el .describe() de cada tabla
for df, materia in zip([gold_x, gold_y, gold_z], ['X', 'Y', 'Z']):
    print(f'Describe para materia prima: {materia}:')
    display(df.describe())

## Análisis Estadístico de Materias Primas (X, Y, Z)

### **1. Materia Prima X**
- **Promedio:** 51.32 | **Desviación estándar:** 32.99
- **Mínimo - Máximo:** 9.64 - 146.08
- **Observaciones:** 9144
- **Insight:** Precios bajos y alta variabilidad. (20% en el Equipo 1).

---

### **2. Materia Prima Y**
- **Promedio:** 565.46 | **Desviación estándar:** 145.15
- **Mínimo - Máximo:** 257.5 - 1062.37
- **Observaciones:** 4485
- **Insight:** Materia prima clave en el costo total del Equipo 1 (80%). Su precio, sigue mostrando picos importantes que podrían impactar presupuestos.

---

### **3. Materia Prima Z**
- **Promedio:** 2037.08 | **Desviación estándar:** 372.97
- **Mínimo - Máximo:** 1421.5 - 3984.0
- **Observaciones:** 3565
- **Insight:** Es el insumo más costoso con precios extremos. Su influencia es directa en el Equipo 2, aumentando la sensibilidad de costos.

---

### **Comparativa de Materias Primas**

| **Materia** | **Promedio (mean)** | **Desviación estándar (stddev)** | **Mínimo (min)** | **Máximo (max)** | **Observaciones (count)** |
|-------------|---------------------|----------------------------------|------------------|------------------|---------------------------|
| **X**       | 51.32              | 32.99                           | 9.64            | 146.08          | 9144                      |
| **Y**       | 565.46            | 145.15                          | 257.5           | 1062.37         | 4485                      |
| **Z**       | 2037.08           | 372.97                          | 1421.5          | 3984.0          | 3565                      |

---

### **Conclusiones**
1. **Equipo 1:** Altamente dependiente de Y; se debe estabilizar su suministro para evitar sobrecostos.
2. **Equipo 2:** Z presenta riesgos altos por su elevado costo promedio y rango máximo.

In [0]:
# Calcularemos costos promedios actualos de los equipos con los datos proporcionados en los archivos .csv
equipo_1_inner = (gold_x
    .alias("tabla_x")
    .join(gold_y.alias("tabla_y"), on="Date", how="inner").select(F.col("Date"),(F.col("tabla_x.Price") * 0.2 + F.col("tabla_y.Price") * 0.8).alias("Costo_equipo_1"))
)
equipo_2_inner = (
    gold_x.alias("tabla_x")
    .join(gold_y.alias("tabla_y"), on="Date", how="inner")
    .join(gold_z.alias("tabla_z"), on="Date", how="inner")
    .select(
        F.col("Date"),
        (
            (F.col("tabla_x.Price") * 0.3333) +
            (F.col("tabla_y.Price") * 0.3333) +
            (F.col("tabla_z.Price") * 0.3333)
        ).alias("Costo_equipo_2")
    )
)

In [0]:
display(equipo_2_inner)

In [0]:
%pip install prophet

In [0]:
import mlflow
import mlflow.pyfunc
from mlflow.models.signature import infer_signature
from prophet import Prophet
import pandas as pd

class ProphetWrapper(mlflow.pyfunc.PythonModel):
    def __init__(self, model):
        self.model = model

    def predict(self, context, model_input):
        return self.model.predict(model_input)

df = equipo_1_inner.toPandas().rename(
    columns={"Date": "ds", "Costo_equipo_1": "y"}
)

model = Prophet()
with mlflow.start_run(run_name="Equipo_1_36_meses") as run:
    model.fit(df)
    future = model.make_future_dataframe(periods=1095, freq='D')
    forecast = model.predict(future)

    
    signature = infer_signature(df, model.predict(df))

    mlflow.log_metric("rmse", model.params["sigma_obs"][0])
    mlflow.log_metric("mae", model.params["sigma_obs"][0])

    fig = model.plot(forecast)
    fig.savefig("forecast.png")
    mlflow.log_artifact("forecast.png")

    mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=ProphetWrapper(model),
        signature=signature,
        registered_model_name="dataknow_dll.construccion.ProphetEquipo1"
    )
    print(f"Run con MLflow finalizado. ID de ejecución: {run.info.run_id}")

In [0]:
display(forecast)

In [0]:
df_inner = equipo_1_inner.toPandas().rename(columns={"Date": "ds", "Costo_equipo_1": "y"})
df_inner_2 = equipo_2_inner.toPandas().rename(columns={"Date": "ds", "Costo_equipo_2": "y"})

# Entrenar el modelo Prophet con los datos históricos
model_inner = Prophet()
model_inner.fit(df_inner)

# Crear hasta 1095 períodos diarios después de la última fecha
future = model_inner.make_future_dataframe(periods=1095, freq='D') 
forecast_inner = model_inner.predict(future)

# Mostrar resultados
forecast_inner = forecast_inner[["ds", "trend", "yhat", "yhat_lower", "yhat_upper"]]

#--------------------------------------------------------------------------------------
# Entrenar el modelo Prophet con los datos del Equipo 2
model_inner_2 = Prophet()
model_inner_2.fit(df_inner_2)

# Generar fechas futuras para los próximos 36 meses (con granularidad diaria)
future_2 = model_inner_2.make_future_dataframe(periods=1095, freq="D")

# Generar predicciones
forecast_inner_2 = model_inner_2.predict(future_2)

# Mostrar las predicciones
forecast_inner_2_resultados = forecast_inner_2[["ds", "trend", "yhat", "yhat_lower", "yhat_upper"]]

In [0]:
# Limpiamos nuestra tabla de resultados
forecast_equipo_1 = spark.createDataFrame(forecast_inner[["ds", "trend","yhat", "yhat_lower", "yhat_upper"]])
forecast_equipo_2 = spark.createDataFrame(forecast_inner_2_resultados[["ds", "trend", "yhat", "yhat_lower", "yhat_upper"]])
    

In [0]:
forecast_equipo_1 = (
    forecast_equipo_1
    .withColumn("ds", F.to_date(F.col("ds"), "MM-dd-yyyy"))
    .withColumn("trend", F.col("trend").cast("double"))
    .withColumn("yhat_lower", F.col("yhat_lower").cast("double"))
    .withColumn("yhat_upper", F.col("yhat_upper").cast("double"))
    .withColumn("yhat", F.col("yhat").cast("double"))
    .withColumnRenamed("ds", "Date")
)
forecast_equipo_2 = (
    forecast_equipo_2
    .withColumn("ds", F.to_date(F.col("ds"), "MM-dd-yyyy"))
    .withColumn("trend", F.col("trend").cast("double"))
    .withColumn("yhat_lower", F.col("yhat_lower").cast("double"))
    .withColumn("yhat_upper", F.col("yhat_upper").cast("double"))
    .withColumn("yhat", F.col("yhat").cast("double"))
    .withColumnRenamed("ds", "Date")
)

In [0]:
# Obtener la fecha más reciente del historial (desde Spark)
fecha_inicio_prediccion_1 = equipo_1_inner.agg(F.max("Date")).collect()[0][0]
fecha_inicio_prediccion_2 = equipo_2_inner.agg(F.max("Date")).collect()[0][0]

In [0]:
display(forecast_equipo_1.filter(F.col("Date") >= fecha_inicio_prediccion_1).orderBy(F.col("Date")))
display(forecast_equipo_2.filter(F.col("Date") >= fecha_inicio_prediccion_2).orderBy(F.col("Date")))

In [0]:
# Vamos a ver las metricas de evaluacion de nuestro modelo
y_true = df_inner.y.values
y_pred = model_inner.predict(df_inner).yhat.values

mae = sk.metrics.mean_absolute_error(y_true, y_pred)
rmse = np.sqrt(sk.metrics.mean_squared_error(y_true, y_pred))
mape = sk.metrics.mean_absolute_percentage_error(y_true, y_pred)
print(f"MAE equipo 1: {mae}")
print(f"RMSE  equipo 1: {rmse}")
print(f"MAPE  equipo 1: {mape}")

#--------------------------------------------------------------------------------------
# Vamos a ver las metricas de evaluacion de nuestro modelo
y_true = df_inner_2.y.values
y_pred = model_inner_2.predict(df_inner_2).yhat.values

mae = sk.metrics.mean_absolute_error(y_true, y_pred)
rmse = np.sqrt(sk.metrics.mean_squared_error(y_true, y_pred))
mape = sk.metrics.mean_absolute_percentage_error(y_true, y_pred)
print('--'* 30)
print(f"MAE equipo 2: {mae}")
print(f"RMSE  equipo 2: {rmse}")
print(f"MAPE  equipo 2: {mape}")

