In [1]:
pip install pandas numpy scikit-learn joblib

Note: you may need to restart the kernel to use updated packages.


# 1) Pipeline pandas: clusterización y preparación de datos para entrenamiento

In [1]:
# Clustering + training-data preparation pipeline.
#
# Reads the cleaned CSV, clusters clients with KMeans on their mean
# Presion/Temperatura/Volumen, persists the fitted scaler and model, tags every
# record with its client's cluster and a percentile-based anomaly proxy, and
# writes the result to `training_path`.
#
# The pipeline is executed exactly once in this cell.

# ------------------ Imports ------------------
import os

import joblib
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.impute import SimpleImputer
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler

# ------------------ Parameters ------------------
file_path     = "data_cleaned.csv"    # input CSV (output of the previous pipeline)
output_dir    = "refined_models"      # where kmeans_model.joblib and scaler.joblib are written
training_path = "training_data.csv"   # output CSV with cluster and anomaly-proxy columns
num_clusters  = 5                     # number of KMeans clusters
random_state  = 42                    # seed for reproducibility
feature_cols  = ['Presion', 'Temperatura', 'Volumen']   # numeric features used throughout

# ------------------ 1. Data ingestion ------------------
data = pd.read_csv(file_path)
required = feature_cols + ['Cliente']
missing  = [c for c in required if c not in data.columns]
if missing:
    raise ValueError(f"Faltan columnas esenciales: {missing}")

# Coerce the feature columns to numeric (unparseable values become NaN) and
# drop the rows left without a complete set of features.
for c in feature_cols:
    data[c] = pd.to_numeric(data[c], errors='coerce')
before = len(data)
data = data.dropna(subset=feature_cols)
after = len(data)
print(f"[1] Ingesta: {before} → {after} registros válidos")

# ------------------ 2. Client clustering ------------------
# 2.1 One row per client: the mean of each feature.
summary = data.groupby('Cliente')[feature_cols].mean()

# KMeans needs at least `num_clusters` samples and the silhouette score needs
# fewer labels than samples, so fail early with an explicit message.
n_clients = len(summary)
if n_clients <= num_clusters:
    raise ValueError(
        f"Se necesitan más de {num_clusters} clientes para formar y evaluar "
        f"{num_clusters} clusters; se encontraron {n_clients}"
    )

# 2.2 Standardise the per-client means.
scaler   = StandardScaler()
X_scaled = scaler.fit_transform(summary)

# 2.3 Mean-impute any NaN left after scaling.
# NOTE(review): the imputer is not persisted alongside the scaler and model.
if np.isnan(X_scaled).any():
    imputer  = SimpleImputer(strategy='mean')
    X_scaled = imputer.fit_transform(X_scaled)

# 2.4 KMeans on the scaled client summary.
kmeans = KMeans(n_clusters=num_clusters, random_state=random_state, n_init=10)
labels = kmeans.fit_predict(X_scaled)
summary['cluster'] = labels

# 2.5 Silhouette score of the resulting partition.
sil = silhouette_score(X_scaled, labels)
print(f"[2] Puntuación Silhouette: {sil:.3f}")

# 2.6 Persist the fitted model and scaler.
os.makedirs(output_dir, exist_ok=True)
joblib.dump(kmeans, os.path.join(output_dir, 'kmeans_model.joblib'))
joblib.dump(scaler, os.path.join(output_dir, 'scaler.joblib'))
print(f"[2] Modelos guardados en '{output_dir}'")

# ------------------ 3. Training-data preparation ------------------
# 3.1 Attach each client's cluster to every one of its records.
df_clusters = summary.reset_index()[['Cliente', 'cluster']]
data = data.merge(df_clusters, on='Cliente', how='left')

# 3.2 Drop records whose client received no cluster.
b = len(data)
data = data.dropna(subset=['cluster'])
a = len(data)
print(f"[3] Eliminados {b-a} registros sin cluster")
data['cluster'] = data['cluster'].astype(int)

# 3.3 Anomalia_Proxy_Cluster: 1 when any feature of the record falls below the
# 10th or above the 90th percentile of its own cluster, else 0.
data['Anomalia_Proxy_Cluster'] = 0
for cid in range(num_clusters):
    mask = data['cluster'] == cid
    if not mask.any():
        continue
    sub = data.loc[mask, feature_cols]
    p10 = sub.quantile(0.10)   # per-feature 10th percentile within the cluster
    p90 = sub.quantile(0.90)   # per-feature 90th percentile within the cluster
    outside = sub.lt(p10, axis=1) | sub.gt(p90, axis=1)
    data.loc[mask, 'Anomalia_Proxy_Cluster'] = outside.any(axis=1).astype(int)
print("[3] Columna 'Anomalia_Proxy_Cluster' calculada")

# 3.4 Rename the source columns to the final lower-case format. Columns that
# are absent from the frame (e.g. 'Fecha') are silently ignored by rename.
data = data.rename(columns={
    'Cliente': 'cliente',
    'Fecha': 'fecha',
    'Presion': 'presion',
    'Temperatura': 'temperatura',
    'Volumen': 'volumen',
    'Mes': 'mes',
    'Ano': 'año',
    'Presion_Suavizada': 'presion_suavizada',
    'Es_Feriado': 'es_feriado',
    'Anomalia': 'anomalia',
    'Anomalia_bin': 'anomalia_bin'
})
# NOTE: 'cluster' is already lower-case. 'Anomalia_Proxy_Cluster' is written to
# the CSV with this capitalisation; the Spark validation cell lower-cases it
# when it loads the file.

# ------------------ 4. Final save ------------------
data.to_csv(training_path, index=False)
print(f"[4] Datos de entrenamiento guardados en '{training_path}'")

[1] Ingestão: 847960 → 847960 registros válidos
[2] Silhouette score: 0.690
[2] Modelos salvos em 'refined_models'
[3] Eliminados 0 registros sem cluster
[3] Coluna 'Anomalia_Proxy_Cluster' calculada
[4] Dados de treinamento salvos em 'training_data.csv'
[1] Ingesta: 847960 → 847960 registros válidos
[2] Puntuación Silhouette: 0.690
[2] Modelos guardados en 'refined_models'
[3] Eliminados 0 registros sin cluster
[3] Columna 'Anomalia_Proxy_Cluster' calculada
[4] Datos de entrenamiento guardados en 'training_data.csv'


# 2) Validación de datos generados con pandas por medio de Spark SQL (PySpark)

In [3]:
from pyspark.sql import SparkSession

# Step 1: obtain (or reuse) the Spark session used for the validation queries.
session_builder = SparkSession.builder.appName("ValidacaoTrainingData")
spark = session_builder.getOrCreate()

# Step 2: load the pandas-generated CSV with a header row and inferred types,
# then normalise the two column names the SQL queries refer to.
csv_reader = spark.read.option("header", "true").option("inferSchema", "true")
df_train = (
    csv_reader
    .csv("training_data.csv")
    .withColumnRenamed("año", "ano")
    .withColumnRenamed("Anomalia_Proxy_Cluster", "anomalia_proxy_cluster")
)

# Step 3: expose the DataFrame to Spark SQL under the name `training_data`.
df_train.createOrReplaceTempView("training_data")

In [28]:
# Print the column names and inferred types of the training DataFrame.
df_train.printSchema()

root
 |-- presion: double (nullable = true)
 |-- temperatura: double (nullable = true)
 |-- volumen: double (nullable = true)
 |-- cliente: string (nullable = true)
 |-- mes: integer (nullable = true)
 |-- ano: integer (nullable = true)
 |-- es_feriado: boolean (nullable = true)
 |-- anomalia: string (nullable = true)
 |-- anomalia_bin: integer (nullable = true)
 |-- cluster: integer (nullable = true)
 |-- anomalia_proxy_cluster: integer (nullable = true)



In [4]:
# Query A: display a 10-row sample of the view, without truncating values.
sample_sql = """
    SELECT *
    FROM training_data
    LIMIT 10
"""
spark.sql(sample_sql).show(truncate=False)

+-----------------+-----------------+-----------------+--------+---+----+----------+--------+------------+-------+----------------------+
|presion          |temperatura      |volumen          |cliente |mes|ano |es_feriado|anomalia|anomalia_bin|cluster|anomalia_proxy_cluster|
+-----------------+-----------------+-----------------+--------+---+----+----------+--------+------------+-------+----------------------+
|17.7325634924889 |28.2093536541928 |20.96975076975659|CLIENTE1|1  |2019|false     |Normal  |0           |4      |0                     |
|17.74777603806793|28.51861421312152|17.84573913758869|CLIENTE1|1  |2019|false     |Normal  |0           |4      |0                     |
|17.75891638774564|28.23019056507057|20.97591383522386|CLIENTE1|1  |2019|false     |Normal  |0           |4      |0                     |
|17.72794022684193|27.8115085858999 |20.59229909002912|CLIENTE1|1  |2019|false     |Normal  |0           |4      |0                     |
|17.74648374408114|27.795293433213

In [5]:
# Query B: total number of rows in the view.
row_count_sql = """
    SELECT
      COUNT(*) AS total_filas
    FROM training_data
"""
spark.sql(row_count_sql).show()

+-----------+
|total_filas|
+-----------+
|     847960|
+-----------+



In [6]:
# Query C: number of NULL values in each column of the view.
null_counts_sql = """
    SELECT
      SUM(CASE WHEN presion                IS NULL THEN 1 ELSE 0 END) AS null_presion,
      SUM(CASE WHEN temperatura            IS NULL THEN 1 ELSE 0 END) AS null_temperatura,
      SUM(CASE WHEN volumen                IS NULL THEN 1 ELSE 0 END) AS null_volumen,
      SUM(CASE WHEN cliente                IS NULL THEN 1 ELSE 0 END) AS null_cliente,
      SUM(CASE WHEN mes                    IS NULL THEN 1 ELSE 0 END) AS null_mes,
      SUM(CASE WHEN ano                    IS NULL THEN 1 ELSE 0 END) AS null_ano,
      SUM(CASE WHEN es_feriado             IS NULL THEN 1 ELSE 0 END) AS null_es_feriado,
      SUM(CASE WHEN anomalia               IS NULL THEN 1 ELSE 0 END) AS null_anomalia,
      SUM(CASE WHEN anomalia_bin           IS NULL THEN 1 ELSE 0 END) AS null_anomalia_bin,
      SUM(CASE WHEN cluster                IS NULL THEN 1 ELSE 0 END) AS null_cluster,
      SUM(CASE WHEN anomalia_proxy_cluster IS NULL THEN 1 ELSE 0 END) AS null_anomalia_proxy_cluster
    FROM training_data
"""
spark.sql(null_counts_sql).show()

+------------+----------------+------------+------------+--------+--------+---------------+-------------+-----------------+------------+---------------------------+
|null_presion|null_temperatura|null_volumen|null_cliente|null_mes|null_ano|null_es_feriado|null_anomalia|null_anomalia_bin|null_cluster|null_anomalia_proxy_cluster|
+------------+----------------+------------+------------+--------+--------+---------------+-------------+-----------------+------------+---------------------------+
|           0|               0|           0|           0|       0|       0|              0|            0|                0|           0|                          0|
+------------+----------------+------------+------------+--------+--------+---------------+-------------+-----------------+------------+---------------------------+



In [7]:
# Query D: number of rows in each cluster.
cluster_dist_sql = """
    SELECT
      cluster,
      COUNT(*) AS total
    FROM training_data
    GROUP BY cluster
    ORDER BY cluster
"""
spark.sql(cluster_dist_sql).show()

+-------+------+
|cluster| total|
+-------+------+
|      0|171910|
|      1|336775|
|      2| 84496|
|      3| 82764|
|      4|172015|
+-------+------+



In [8]:
# Query E: number of rows for each value of anomalia_proxy_cluster.
proxy_dist_sql = """
    SELECT
      anomalia_proxy_cluster,
      COUNT(*) AS total
    FROM training_data
    GROUP BY anomalia_proxy_cluster
    ORDER BY anomalia_proxy_cluster
"""
spark.sql(proxy_dist_sql).show()

+----------------------+------+
|anomalia_proxy_cluster| total|
+----------------------+------+
|                     0|496454|
|                     1|351506|
+----------------------+------+



In [9]:
# Query F: cross-tabulation of anomalia_bin against anomalia_proxy_cluster.
consistency_sql = """
    SELECT
      anomalia_bin,
      anomalia_proxy_cluster,
      COUNT(*) AS total
    FROM training_data
    GROUP BY anomalia_bin, anomalia_proxy_cluster
    ORDER BY anomalia_bin, anomalia_proxy_cluster
"""
spark.sql(consistency_sql).show()

+------------+----------------------+------+
|anomalia_bin|anomalia_proxy_cluster| total|
+------------+----------------------+------+
|           0|                     0|462350|
|           0|                     1|253860|
|           1|                     0| 34104|
|           1|                     1| 97646|
+------------+----------------------+------+



In [10]:
# Query G: average, minimum and maximum of each numeric feature, rounded to 2 decimals.
basic_stats_sql = """
    SELECT
      ROUND(AVG(presion),2)     AS avg_presion,
      ROUND(MIN(presion),2)     AS min_presion,
      ROUND(MAX(presion),2)     AS max_presion,
      ROUND(AVG(temperatura),2) AS avg_temperatura,
      ROUND(MIN(temperatura),2) AS min_temperatura,
      ROUND(MAX(temperatura),2) AS max_temperatura,
      ROUND(AVG(volumen),2)     AS avg_volumen,
      ROUND(MIN(volumen),2)     AS min_volumen,
      ROUND(MAX(volumen),2)     AS max_volumen
    FROM training_data
"""
spark.sql(basic_stats_sql).show(truncate=False)

+-----------+-----------+-----------+---------------+---------------+---------------+-----------+-----------+-----------+
|avg_presion|min_presion|max_presion|avg_temperatura|min_temperatura|max_temperatura|avg_volumen|min_volumen|max_volumen|
+-----------+-----------+-----------+---------------+---------------+---------------+-----------+-----------+-----------+
|16.07      |2.93       |20.31      |25.2           |-5.26          |50.02          |62.33      |0.0        |577.41     |
+-----------+-----------+-----------+---------------+---------------+---------------+-----------+-----------+-----------+



In [11]:
# Query H: percentage of rows flagged as proxy anomalies within each cluster.
proxy_pct_sql = """
    SELECT
      cluster,
      ROUND(100.0 * SUM(anomalia_proxy_cluster) / COUNT(*),2) AS pct_proxy_anomalia
    FROM training_data
    GROUP BY cluster
    ORDER BY cluster
"""
spark.sql(proxy_pct_sql).show()

+-------+------------------+
|cluster|pct_proxy_anomalia|
+-------+------------------+
|      0|             43.59|
|      1|             38.76|
|      2|             46.12|
|      3|             50.00|
|      4|             38.19|
+-------+------------------+



In [12]:
# Query I: record count per client, year and month (first 10 groups shown).
coverage_sql = """
    SELECT
      cliente,
      ano,
      mes,
      COUNT(*) AS qtde_registros
    FROM training_data
    GROUP BY cliente, ano, mes
    ORDER BY cliente, ano, mes
"""
spark.sql(coverage_sql).show(10, truncate=False)

+--------+----+---+--------------+
|cliente |ano |mes|qtde_registros|
+--------+----+---+--------------+
|CLIENTE1|2019|1  |431           |
|CLIENTE1|2019|2  |672           |
|CLIENTE1|2019|3  |743           |
|CLIENTE1|2019|4  |720           |
|CLIENTE1|2019|5  |744           |
|CLIENTE1|2019|6  |720           |
|CLIENTE1|2019|7  |744           |
|CLIENTE1|2019|8  |744           |
|CLIENTE1|2019|9  |720           |
|CLIENTE1|2019|10 |744           |
+--------+----+---+--------------+
only showing top 10 rows



# 3) Validación de datos generados con AWS Glue/PySpark por medio de Spark SQL (PySpark)

Estos archivos que se leerán a continuación fueron extraídos del bucket de la capa refined del data lake para validar los datos, es decir, para comprobar si lo que se generó con Glue y PySpark coincide con lo que se había hecho inicialmente con pandas.

In [15]:
# Load the refined-layer Parquet part files, then normalise the two column
# names the SQL queries refer to.
refined_parts = spark.read.parquet("Parquet_Refined_Files/part-*.snappy.parquet")
df_refined = refined_parts.withColumnRenamed("año", "ano").withColumnRenamed(
    "Anomalia_Proxy_Cluster", "anomalia_proxy_cluster"
)

# Expose the refined DataFrame to Spark SQL under its own view name.
df_refined.createOrReplaceTempView("contugas_refined_dato_procesado")

In [16]:
# Query A: display a 10-row sample of the refined view, without truncating values.
refined_sample_sql = """
    SELECT *
    FROM contugas_refined_dato_procesado
    LIMIT 10
"""
spark.sql(refined_sample_sql).show(truncate=False)

+-------+---------+-------------------+-----------------+-----------------+-----------------+---+----+------------------+----------+-----------------+------------------+--------+------------+----------------------+
|cluster|cliente  |fecha              |presion          |temperatura      |volumen          |mes|ano |presion_suavizada |es_feriado|p10              |p90               |anomalia|anomalia_bin|anomalia_proxy_cluster|
+-------+---------+-------------------+-----------------+-----------------+-----------------+---+----+------------------+----------+-----------------+------------------+--------+------------+----------------------+
|2      |CLIENTE11|2021-01-01 16:00:00|3.555825750286133|27.47967341667485|110.8325498819195|1  |2021|15.608226364489315|true      |84.15590463899385|171.35187757400683|Normal  |0           |0                     |
|2      |CLIENTE11|2021-04-03 03:00:00|3.645878675455474|29.92690024787693|105.7093616999356|4  |2021|16.059413956351662|false     |84.15590

In [17]:
# Query B: total number of rows in the refined view.
refined_row_count_sql = """
    SELECT
      COUNT(*) AS total_filas
    FROM contugas_refined_dato_procesado
"""
spark.sql(refined_row_count_sql).show()

+-----------+
|total_filas|
+-----------+
|     847960|
+-----------+



In [18]:
# Query C: number of NULL values in each validated column of the refined view.
refined_null_counts_sql = """
    SELECT
      SUM(CASE WHEN presion                IS NULL THEN 1 ELSE 0 END) AS null_presion,
      SUM(CASE WHEN temperatura            IS NULL THEN 1 ELSE 0 END) AS null_temperatura,
      SUM(CASE WHEN volumen                IS NULL THEN 1 ELSE 0 END) AS null_volumen,
      SUM(CASE WHEN cliente                IS NULL THEN 1 ELSE 0 END) AS null_cliente,
      SUM(CASE WHEN mes                    IS NULL THEN 1 ELSE 0 END) AS null_mes,
      SUM(CASE WHEN ano                    IS NULL THEN 1 ELSE 0 END) AS null_ano,
      SUM(CASE WHEN es_feriado             IS NULL THEN 1 ELSE 0 END) AS null_es_feriado,
      SUM(CASE WHEN anomalia               IS NULL THEN 1 ELSE 0 END) AS null_anomalia,
      SUM(CASE WHEN anomalia_bin           IS NULL THEN 1 ELSE 0 END) AS null_anomalia_bin,
      SUM(CASE WHEN cluster                IS NULL THEN 1 ELSE 0 END) AS null_cluster,
      SUM(CASE WHEN anomalia_proxy_cluster IS NULL THEN 1 ELSE 0 END) AS null_anomalia_proxy_cluster
    FROM contugas_refined_dato_procesado
"""
spark.sql(refined_null_counts_sql).show()

+------------+----------------+------------+------------+--------+--------+---------------+-------------+-----------------+------------+---------------------------+
|null_presion|null_temperatura|null_volumen|null_cliente|null_mes|null_ano|null_es_feriado|null_anomalia|null_anomalia_bin|null_cluster|null_anomalia_proxy_cluster|
+------------+----------------+------------+------------+--------+--------+---------------+-------------+-----------------+------------+---------------------------+
|           0|               0|           0|           0|       0|       0|              0|            0|                0|           0|                          0|
+------------+----------------+------------+------------+--------+--------+---------------+-------------+-----------------+------------+---------------------------+



In [20]:
# Query D: number of rows in each cluster of the refined view.
refined_cluster_dist_sql = """
    SELECT
      cluster,
      COUNT(*) AS total
    FROM contugas_refined_dato_procesado
    GROUP BY cluster
    ORDER BY cluster
"""
spark.sql(refined_cluster_dist_sql).show()

+-------+------+
|cluster| total|
+-------+------+
|      0|336775|
|      1|171910|
|      2| 84496|
|      3| 82764|
|      4|172015|
+-------+------+



In [21]:
# Query E: number of rows for each value of anomalia_proxy_cluster in the refined view.
refined_proxy_dist_sql = """
    SELECT
      anomalia_proxy_cluster,
      COUNT(*) AS total
    FROM contugas_refined_dato_procesado
    GROUP BY anomalia_proxy_cluster
    ORDER BY anomalia_proxy_cluster
"""
spark.sql(refined_proxy_dist_sql).show()

+----------------------+------+
|anomalia_proxy_cluster| total|
+----------------------+------+
|                     0|496413|
|                     1|351547|
+----------------------+------+



In [22]:
# Query F: cross-tabulation of anomalia_bin against anomalia_proxy_cluster in the refined view.
refined_consistency_sql = """
    SELECT
      anomalia_bin,
      anomalia_proxy_cluster,
      COUNT(*) AS total
    FROM contugas_refined_dato_procesado
    GROUP BY anomalia_bin, anomalia_proxy_cluster
    ORDER BY anomalia_bin, anomalia_proxy_cluster
"""
spark.sql(refined_consistency_sql).show()

+------------+----------------------+------+
|anomalia_bin|anomalia_proxy_cluster| total|
+------------+----------------------+------+
|           0|                     0|462330|
|           0|                     1|253880|
|           1|                     0| 34083|
|           1|                     1| 97667|
+------------+----------------------+------+



In [23]:
# Query G: average, minimum and maximum of each numeric feature, rounded to 2 decimals.
refined_basic_stats_sql = """
    SELECT
      ROUND(AVG(presion),2)       AS avg_presion,
      ROUND(MIN(presion),2)       AS min_presion,
      ROUND(MAX(presion),2)       AS max_presion,
      ROUND(AVG(temperatura),2)   AS avg_temperatura,
      ROUND(MIN(temperatura),2)   AS min_temperatura,
      ROUND(MAX(temperatura),2)   AS max_temperatura,
      ROUND(AVG(volumen),2)       AS avg_volumen,
      ROUND(MIN(volumen),2)       AS min_volumen,
      ROUND(MAX(volumen),2)       AS max_volumen
    FROM contugas_refined_dato_procesado
"""
spark.sql(refined_basic_stats_sql).show(truncate=False)

+-----------+-----------+-----------+---------------+---------------+---------------+-----------+-----------+-----------+
|avg_presion|min_presion|max_presion|avg_temperatura|min_temperatura|max_temperatura|avg_volumen|min_volumen|max_volumen|
+-----------+-----------+-----------+---------------+---------------+---------------+-----------+-----------+-----------+
|16.07      |2.93       |20.31      |25.2           |-5.26          |50.02          |62.33      |0.0        |577.41     |
+-----------+-----------+-----------+---------------+---------------+---------------+-----------+-----------+-----------+



In [24]:
# Query H: percentage of rows flagged as proxy anomalies within each cluster of the refined view.
refined_proxy_pct_sql = """
    SELECT
      cluster,
      ROUND(100.0 * SUM(anomalia_proxy_cluster) / COUNT(*),2) AS pct_proxy_anomalia
    FROM contugas_refined_dato_procesado
    GROUP BY cluster
    ORDER BY cluster
"""
spark.sql(refined_proxy_pct_sql).show()

+-------+------------------+
|cluster|pct_proxy_anomalia|
+-------+------------------+
|      0|             38.76|
|      1|             43.60|
|      2|             46.13|
|      3|             50.00|
|      4|             38.19|
+-------+------------------+



In [25]:
# Query I: record count per client, year and month in the refined view (first 10 groups shown).
refined_coverage_sql = """
    SELECT
      cliente,
      ano,
      mes,
      COUNT(*) AS qtde_registros
    FROM contugas_refined_dato_procesado
    GROUP BY cliente, ano, mes
    ORDER BY cliente, ano, mes
"""
spark.sql(refined_coverage_sql).show(10, truncate=False)

+--------+----+---+--------------+
|cliente |ano |mes|qtde_registros|
+--------+----+---+--------------+
|CLIENTE1|2019|1  |431           |
|CLIENTE1|2019|2  |672           |
|CLIENTE1|2019|3  |743           |
|CLIENTE1|2019|4  |720           |
|CLIENTE1|2019|5  |744           |
|CLIENTE1|2019|6  |720           |
|CLIENTE1|2019|7  |744           |
|CLIENTE1|2019|8  |744           |
|CLIENTE1|2019|9  |720           |
|CLIENTE1|2019|10 |744           |
+--------+----+---+--------------+
only showing top 10 rows

