# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.2 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Session ID: 90c6ae4c-d33a-4cb6-b426-b5a66bbae391
Applying the following default arguments:
--glue_kernel_version 1.0.2
--enable-glue-datacatalog true
Waiting for session 90c6ae4c-d33a-4cb6-b426-b5a66bbae391 to get into ready status...
Session 90c6ae4c-d33a-4cb6-b426-b5a66bbae391 has been created.



#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [2]:
dyf = glueContext.create_dynamic_frame.from_catalog(database='proyectoint20232', table_name='all_files_e_parquet')
dyf.printSchema()

root
|-- serial: string
|-- fecha: string
|-- stator_kv: string
|-- mfr: string
|-- group_number: string
|-- requested_test_kv: string
|-- test_kv: string
|-- ma: string
|-- watts: string
|-- measured_cap: string
|-- pfm: string


#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [4]:
DF = dyf.toDF()
DF.show()

+------+-------------------+---------+---+------------+-----------------+------------------+------------------+-------------------+------------------+-------------------+
|serial|              fecha|stator_kv|mfr|group_number|requested_test_kv|           test_kv|                ma|              watts|      measured_cap|                pfm|
+------+-------------------+---------+---+------------+-----------------+------------------+------------------+-------------------+------------------+-------------------+
|715015|2018-11-27 16:07:37|     13.8|TOS|         GST|              2.0|1.9947197437286377|3572.8927850723267|  388.1911926269531| 948070.8058617892|  1.086489900421389|
|715015|2018-11-27 16:07:37|     13.8|TOS|         GST|              4.0| 4.015733480453491| 3583.967685699463| 458.96038818359375| 950296.2541318992| 1.2805929864125465|
|715015|2018-11-27 16:07:37|     13.8|TOS|         GST|              6.0| 5.990353345870972| 3568.703293800354|  517.6515197753905| 947210.963886

In [5]:
# Convertir el DataFrame de Spark en un DataFrame de pandas
df = DF.toPandas()
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15604 entries, 0 to 15603
Data columns (total 11 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   serial             15604 non-null  object
 1   fecha              15604 non-null  object
 2   stator_kv          15604 non-null  object
 3   mfr                15604 non-null  object
 4   group_number       15604 non-null  object
 5   requested_test_kv  15604 non-null  object
 6   test_kv            15604 non-null  object
 7   ma                 15604 non-null  object
 8   watts              15604 non-null  object
 9   measured_cap       15604 non-null  object
 10  pfm                15604 non-null  object
dtypes: object(11)
memory usage: 1.3+ MB


In [6]:
import os
import pandas as pd
import numpy as np
import seaborn as sns
import plotly.express as px
from itertools import product
from tqdm import tqdm
from matplotlib import pyplot as plt
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.metrics import silhouette_score
from sklearn.cluster import DBSCAN
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from mpl_toolkits.mplot3d import Axes3D
from sklearn.metrics import silhouette_score, davies_bouldin_score
from sklearn.decomposition import PCA
from numpy.linalg import norm, svd, det, cond
import plotly.graph_objs as go

from sklearn.model_selection import train_test_split, GridSearchCV
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches




In [21]:
# Lista de columnas que quieres convertir a float
columnas_para_convertir = ['stator_kv', 'requested_test_kv', 'test_kv', 'ma', 'watts', 'measured_cap', 'pfm']


# Usando pd.to_numeric() para manejar valores no convertibles
for columna in columnas_para_convertir:
    df[columna] = pd.to_numeric(df[columna], errors='coerce')

# Mostrar las primeras filas para verificar la conversión
print(df.head())

   serial                fecha  stator_kv  ...       watts   measured_cap       pfm
0  715015  2018-11-27 16:07:37       13.8  ...  388.191193  948070.805862  1.086490
1  715015  2018-11-27 16:07:37       13.8  ...  458.960388  950296.254132  1.280593
2  715015  2018-11-27 16:07:37       13.8  ...  517.651520  947210.963886  1.450531
3  715015  2018-11-27 16:07:37       13.8  ...  585.986572  954618.258220  1.627991
4  715015  2018-11-27 16:07:37       13.8  ...  385.123230  945508.958239  1.081192

[5 rows x 11 columns]


In [22]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15604 entries, 0 to 15603
Data columns (total 11 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   serial             15604 non-null  object 
 1   fecha              15604 non-null  object 
 2   stator_kv          15604 non-null  float64
 3   mfr                15604 non-null  object 
 4   group_number       15604 non-null  object 
 5   requested_test_kv  15604 non-null  float64
 6   test_kv            15604 non-null  float64
 7   ma                 15604 non-null  float64
 8   watts              15604 non-null  float64
 9   measured_cap       15602 non-null  float64
 10  pfm                15604 non-null  float64
dtypes: float64(7), object(4)
memory usage: 1.3+ MB


In [7]:
## Diccionario de mapeo de valores originales a valores deseados
mapeo = {
  'Ansal' : 'ANS',
  'General Electric' : 'GE',
  'Brown Boveri Company' : 'GMX',
  'Mitsubishi' : 'MIT',
  'ENERGOMEX' : 'ENG',
  'Rade-Koncar' : 'RK',
  'Toshiba' : 'TOS',
  'AEG Power Tool Corp.' : 'GMX',
  'Alstom' : 'ALS',
  'ASEA' : 'GMX',
  'RADE KONCAR' : 'RK',
  'Westinghouse Electric' : 'OTH',
  'TIBB' : 'GMX',
  'ABB (ASEA-Brown Boveri)' : 'GMX',
  'Koch & Sterzel' : 'OTH',
  'Hitachi' : 'HIT',
  'ASEA Inc.' : 'GMX',
  'Cenemesa' : 'OTH',
  'HITACHI' : 'HIT',
  'Raychem' : 'OTH',
  'Brush Ltd. (H-S Group)' : 'OTH',
  'HARBIN ELECTRIC MACHINERY' : 'OTH',
  'Magnetek' : 'OTH',
  'GAMESA' : 'OTH'
}




In [8]:
# Obtener el array de los valores sin repetidos
array_valores_no_repetidos = list(set(mapeo.values()))

# Imprimir el array de los valores
print(array_valores_no_repetidos)

['MIT', 'ANS', 'HIT', 'GMX', 'OTH', 'GE', 'TOS', 'ENG', 'RK', 'ALS']


In [9]:
# Definir los arrays
array_a = ['GST', 'UST']
array_b = [6.9, 13.8, 18]
array_c = array_valores_no_repetidos
array_d = [1,2,3,4,6,8,10]

# Obtener todas las combinaciones posibles
combinaciones = list(product(array_a, array_b, array_c,array_d))

 # Mostrar el tamaño de la lista
print("Tamaño de la lista de combinaciones:", len(combinaciones))
combinaciones

Tamaño de la lista de combinaciones: 420
[('GST', 6.9, 'MIT', 1), ('GST', 6.9, 'MIT', 2), ('GST', 6.9, 'MIT', 3), ('GST', 6.9, 'MIT', 4), ('GST', 6.9, 'MIT', 6), ('GST', 6.9, 'MIT', 8), ('GST', 6.9, 'MIT', 10), ('GST', 6.9, 'ANS', 1), ('GST', 6.9, 'ANS', 2), ('GST', 6.9, 'ANS', 3), ('GST', 6.9, 'ANS', 4), ('GST', 6.9, 'ANS', 6), ('GST', 6.9, 'ANS', 8), ('GST', 6.9, 'ANS', 10), ('GST', 6.9, 'HIT', 1), ('GST', 6.9, 'HIT', 2), ('GST', 6.9, 'HIT', 3), ('GST', 6.9, 'HIT', 4), ('GST', 6.9, 'HIT', 6), ('GST', 6.9, 'HIT', 8), ('GST', 6.9, 'HIT', 10), ('GST', 6.9, 'GMX', 1), ('GST', 6.9, 'GMX', 2), ('GST', 6.9, 'GMX', 3), ('GST', 6.9, 'GMX', 4), ('GST', 6.9, 'GMX', 6), ('GST', 6.9, 'GMX', 8), ('GST', 6.9, 'GMX', 10), ('GST', 6.9, 'OTH', 1), ('GST', 6.9, 'OTH', 2), ('GST', 6.9, 'OTH', 3), ('GST', 6.9, 'OTH', 4), ('GST', 6.9, 'OTH', 6), ('GST', 6.9, 'OTH', 8), ('GST', 6.9, 'OTH', 10), ('GST', 6.9, 'GE', 1), ('GST', 6.9, 'GE', 2), ('GST', 6.9, 'GE', 3), ('GST', 6.9, 'GE', 4), ('GST', 6.9, 'GE', 6)

In [15]:
combinaciones[0][1]

6.9


In [10]:
def obtener_datos_por_combinacion(_df, combinacion):
    # Filtrar el DataFrame según la combinación actual
    filtro = (_df['group_number'] == combinacion[0]) & \
             (_df['stator_kv'] == combinacion[1]) & \
             (_df['mfr'] == combinacion[2])

    datos_filtrados = _df[filtro]

    # Crear un objeto con la estructura deseada
    objeto = {
        "family": f"{combinacion[0]}_{combinacion[1]}_{combinacion[2]}_{combinacion[3]}",
        "df": datos_filtrados.to_dict(orient='records')  # Convertir DataFrame a lista de diccionarios
    }

    return objeto




In [11]:
def filtrar_objetos_con_datos(array_objetos):
    # Array para guardar los objetos filtrados
    objetos_filtrados = []

    # Iterar a través de cada objeto en el array
    for objeto in array_objetos:
        # Verificar si el campo 'df' del objeto tiene longitud mayor a 0
        if len(objeto['df']) > 0:
            # Agregar el objeto al array de objetos filtrados
            objetos_filtrados.append(objeto)

    # Retornar el array de objetos filtrados
    return objetos_filtrados




In [25]:
df.head()

   serial                fecha  stator_kv  ...       watts   measured_cap       pfm
0  715015  2018-11-27 16:07:37       13.8  ...  388.191193  948070.805862  1.086490
1  715015  2018-11-27 16:07:37       13.8  ...  458.960388  950296.254132  1.280593
2  715015  2018-11-27 16:07:37       13.8  ...  517.651520  947210.963886  1.450531
3  715015  2018-11-27 16:07:37       13.8  ...  585.986572  954618.258220  1.627991
4  715015  2018-11-27 16:07:37       13.8  ...  385.123230  945508.958239  1.081192

[5 rows x 11 columns]


In [26]:
# Crear una lista de objetos con los datos filtrados
objetos_datos = [obtener_datos_por_combinacion(df, combinacion) for combinacion in combinaciones]

print("Tamaño de la lista de objetos_datos:", len(objetos_datos))

Tamaño de la lista de objetos_datos: 420


In [27]:
#
resultados_filtrados = filtrar_objetos_con_datos(objetos_datos)

print("Tamaño de la lista de objetos_datos:", len(resultados_filtrados))

Tamaño de la lista de objetos_datos: 182


In [28]:
def realizar_clustering(Y, random_state=42):
    # Convertir la lista de diccionarios en un DataFrame de Pandas
    Y = pd.DataFrame(Y)
    # Seleccionar columnas
    Y = Y[['test_kv', 'ma']]

    # Dividir los datos en conjuntos de entrenamiento, validación y prueba
    Y_train, Y_temp, _, _ = train_test_split(Y, Y, test_size=0.3, random_state=random_state)
    Y_val, Y_test, _, _ = train_test_split(Y_temp, Y_temp, test_size=0.5, random_state=random_state)

    # Normalizar los datos
    scaler = StandardScaler()
    Y_train_scaled = scaler.fit_transform(Y_train)
    Y_val_scaled = scaler.transform(Y_val)
    Y_test_scaled = scaler.transform(Y_test)

    # Definir el rango de hiperparámetros a ensayar
    param_grid = {
        'n_clusters': [4, 5, 6],
        'init': ['k-means++', 'random'],
        'max_iter': [100, 200, 300],
        'n_init': [10, 15, 20],
        'tol': [1e-4, 1e-5, 1e-6]
    }

    # Inicialización del modelo de KMeans
    kmeans_model = KMeans(random_state=random_state)

    # Configuración de búsqueda de hiperparámetros con validación cruzada
    grid_search = GridSearchCV(kmeans_model, param_grid, cv=5, scoring='neg_mean_squared_error')

    # Ajuste del modelo a los datos
    grid_search.fit(Y)

    return grid_search




In [29]:
def entrenar_kmeans_y_visualizar(_X, num_clusters=4, init='k-means++', n_init=10, random_state=42, tol=0.0001):
    # Inicializar y ajustar el modelo k-Means
    kmeans_model = KMeans(n_clusters=num_clusters, init=init, n_init=n_init, random_state=random_state, tol=tol)
    _Xa = _X.copy()  # Crear una copia para evitar modificar el DataFrame original
    _Xa['qual'] = kmeans_model.fit_predict(_Xa)

    # Ordenar los centroides por la columna 'ma' (suponiendo que es la segunda columna de características)
    ordered_indices = kmeans_model.cluster_centers_[:, 1].argsort()

    # Crear un mapeo de las etiquetas originales a las etiquetas ordenadas
    label_mapping = {old_label: new_label for new_label, old_label in enumerate(ordered_indices)}

    # Reasignar las etiquetas en el DataFrame según el nuevo mapeo
    _Xa['qual'] = _Xa['qual'].map(label_mapping)

    return _Xa  # Devolver el DataFrame con la columna 'qual' reasignada




In [30]:
def visualizar_kmeans(df):
    # Asegurarse de que el DataFrame tiene suficientes columnas para la visualización
    if df.shape[1] < 3:
        print("Se necesitan al menos dos columnas numéricas y una columna 'qual' para la visualización.")
        return

    # Seleccionar las dos primeras columnas numéricas para el gráfico
    columnas_para_grafico = df.select_dtypes(include='number').columns[:2]

    # Crear un gráfico de dispersión
    plt.figure(figsize=(10, 6))
    sns.scatterplot(data=df, x=columnas_para_grafico[0], y=columnas_para_grafico[1], hue='qual', palette='viridis')

    # Mostrar el gráfico
    plt.title('Visualización de Clusters de K-Means')
    plt.xlabel(columnas_para_grafico[0])
    plt.ylabel(columnas_para_grafico[1])
    plt.show()




In [32]:
def procesar_lista_objetos_D(lista_objetos, max_df_a_procesar=1):
    # Crear una lista vacía para almacenar los resultados
    resultados = []

    for objeto in lista_objetos[:max_df_a_procesar]:
        family = objeto['family']
        df_o = pd.DataFrame(objeto['df'])
        print(f"family::::::: {family}, for realizar_clustering.")

        # Continuar si el DataFrame está vacío
        if df_o.empty:
            print(f"DataFrame vacío para la familia {family}, se omite.")
            continue

        # Procesar el DataFrame
        result_clustering = realizar_clustering(df_o)
        best_params_o = result_clustering.best_params_
        print(f"Mejores hiperparámetros para {family}:", best_params_o['n_clusters'])

        df_o= df_o[['test_kv', 'ma']]

        df_modificado = entrenar_kmeans_y_visualizar(df_o, best_params_o['n_clusters'])

        # Guardar los resultados en la lista
        resultados.append({'family': family, 'df': df_modificado})

    return resultados




In [33]:
max_df_procesar = 3
procesar_lista_objetos_test = procesar_lista_objetos_D(resultados_filtrados,max_df_procesar)

family::::::: GST_6.9_ANS_1, for realizar_clustering.
Mejores hiperparámetros para GST_6.9_ANS_1: 4
family::::::: GST_6.9_ANS_2, for realizar_clustering.
Mejores hiperparámetros para GST_6.9_ANS_2: 4
family::::::: GST_6.9_ANS_3, for realizar_clustering.
Mejores hiperparámetros para GST_6.9_ANS_3: 4
Traceback (most recent call last):
  File "/home/spark/.local/lib/python3.10/site-packages/sklearn/model_selection/_validation.py", line 765, in _score
    scores = scorer(estimator, X_test)
TypeError: _BaseScorer.__call__() missing 1 required positional argument: 'y_true'

Traceback (most recent call last):
  File "/home/spark/.local/lib/python3.10/site-packages/sklearn/model_selection/_validation.py", line 765, in _score
    scores = scorer(estimator, X_test)
TypeError: _BaseScorer.__call__() missing 1 required positional argument: 'y_true'

Traceback (most recent call last):
  File "/home/spark/.local/lib/python3.10/site-packages/sklearn/model_selection/_validation.py", line 765, in _scor

In [34]:
DF_qual = pd.DataFrame(procesar_lista_objetos_test)
procesar_lista_objetos_test[0]

{'family': 'GST_6.9_ANS_1', 'df':       test_kv           ma  qual
0    0.999955  1015.419602     0
1    2.001143  1016.756535     0
2    2.999540  1017.821908     0
3    4.000157  1019.098878     0
4    0.999480  1012.587667     0
..        ...          ...   ...
195  4.000000  1034.100000     2
196  1.001000  1026.900000     1
197  1.999000  1028.300000     1
198  3.001000  1031.800000     2
199  4.002000  1041.600000     2

[200 rows x 3 columns]}


In [35]:
#
def crear_tabla_con_familias(resultados):
    # Crear una lista para almacenar los DataFrames con la nueva columna 'family'
    dfs_con_family = []

    # Iterar sobre cada resultado y agregar la columna 'family'
    for resultado in resultados:
        df = resultado['df'].copy()  # Hacer una copia para no modificar el original
        df['family'] = resultado['family']  # Agregar la columna 'family'
        dfs_con_family.append(df)

    # Concatenar todos los DataFrames en uno solo
    df_final = pd.concat(dfs_con_family, ignore_index=True)

    return df_final




In [36]:
# Usar la función y guardar el DataFrame resultante
df_con_familias = crear_tabla_con_familias(procesar_lista_objetos_test)  # 'resultados' debe ser tu lista de objetos
df_con_familias

      test_kv           ma  qual         family
0    0.999955  1015.419602     0  GST_6.9_ANS_1
1    2.001143  1016.756535     0  GST_6.9_ANS_1
2    2.999540  1017.821908     0  GST_6.9_ANS_1
3    4.000157  1019.098878     0  GST_6.9_ANS_1
4    0.999480  1012.587667     0  GST_6.9_ANS_1
..        ...          ...   ...            ...
595  4.000000  1034.100000     2  GST_6.9_ANS_3
596  1.001000  1026.900000     1  GST_6.9_ANS_3
597  1.999000  1028.300000     1  GST_6.9_ANS_3
598  3.001000  1031.800000     2  GST_6.9_ANS_3
599  4.002000  1041.600000     2  GST_6.9_ANS_3

[600 rows x 4 columns]


In [37]:
from pyspark.sql import SparkSession
from awsglue.dynamicframe import DynamicFrame

# Inicializa la sesión de Spark
spark = SparkSession.builder.appName("pandasToSpark").getOrCreate()

# Suponiendo que 'pandas_df' es tu DataFrame de pandas
spark_df = spark.createDataFrame(df_con_familias)

# Convertir el DataFrame de Spark en un DynamicFrame
glueContext = GlueContext(spark.sparkContext)
dyf = DynamicFrame.fromDF(spark_df, glueContext, "dyf")

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [38]:
dyf.printSchema()

root
|-- test_kv: double
|-- ma: double
|-- qual: long
|-- family: string


In [39]:
# Suponiendo que 'dyf' es tu DynamicFrame
num_registros = dyf.count()

# Imprimir el número de registros
print("Número de registros en el DynamicFrame:", num_registros)

Número de registros en el DynamicFrame: 600


In [40]:
# Ahora puedes usar el dyf con el código para escribir en S3
s3output_parquet = glueContext.getSink(
    path="s3://proyectointegrador20232/refined/df_family/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    compression="snappy",
    enableUpdateCatalog=True,
    transformation_ctx="s3output_parquet",
)
s3output_parquet.setCatalogInfo(
    catalogDatabase="proyectoint20232", catalogTableName="all_files_e_parquet",
)
s3output_parquet.setFormat("glueparquet")
s3output_parquet.writeFrame(dyf)

<awsglue.dynamicframe.DynamicFrame object at 0x7f765a80da50>
