#**Operaciones de Machine Learning**
#**Proyecto 1**: Crear un ambiente de desarrollo de machine learning en el cual sea posible la ingesta, validaci´on y transformación de datos.
#**Grupo: 5**


Importar librerias

In [None]:
import os
import requests
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.feature_selection import SelectKBest, f_classif
import numpy as np
import tfx
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.components import CsvExampleGen,StatisticsGen,SchemaGen,ExampleValidator
from tfx.v1.components import ImportSchemaGen
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import schema_pb2,anomalies_pb2
from google.protobuf import text_format
print('TF version:', tf.__version__)
print('TFDV version:', tfdv.version.__version__)

#Cargar Dataset

In [None]:
 ## download the dataset
 # Directory of the raw data files
 _data_root = '/data/covertype'
 # Path to the raw training data
 _data_filepath = os.path.join(_data_root, 'covertype_test.csv')
 # Download data
 os.makedirs(_data_root, exist_ok=True)
 if not os.path.isfile(_data_filepath):
     #https://archive.ics.uci.edu/ml/machine-learning-databases/covtype/
     url = 'https://docs.google.com/uc?export= \
     download&confirm={{VALUE}}&id=1lVF1BCWLH4eXXV_YOJzjR7xZjj-wAGj9'
     r = requests.get(url, allow_redirects=True, stream=True)
     open(_data_filepath, 'wb').write(r.content)
# Cargar el dataset
df = pd.read_csv(_data_filepath)
df.head()

#Selección de Caracteristicas

In [None]:
# Separar la variable objetivo
target = 'Cover_Type'
numerical_features = df.select_dtypes(include=['int64', 'float64']).columns.tolist()

# Eliminar la variable objetivo de la lista de características
numerical_features.remove(target)

# Crear el subconjunto solo con las características numéricas
df_numerical = df[numerical_features + [target]]

print("Características numéricas:", numerical_features)

In [None]:

# Definir variables de entrada (X) y salida (y)
X = df_numerical.drop(columns=[target])
y = df_numerical[target]

k = 8  # Número de características a seleccionar
selector = SelectKBest(score_func=f_classif, k=k)
X_new = selector.fit_transform(X, y)

# Obtener las características seleccionadas
selected_features = np.array(numerical_features)[selector.get_support()]
print("Características seleccionadas:", selected_features.tolist())


#Data Pipeline

In [None]:
#4.1 Configurar contexto interactivo

# Definir directorio de metadatos
_pipeline_root = '/data/tfx_pipeline'
_metadata_path = os.path.join(_pipeline_root, 'metadata.db')

# Crear contexto interactivo
from tfx.orchestration.metadata import sqlite_metadata_connection_config

context = InteractiveContext(
    pipeline_root=_pipeline_root,
    metadata_connection_config=sqlite_metadata_connection_config(os.path.join(_pipeline_root, 'metadata.db'))
)



In [None]:
#4.2 ExampleGen

# Crear componente ExampleGen
example_gen = CsvExampleGen(input_base=_data_root)

# Ejecutar componente
context.run(example_gen)



In [None]:
#4.3 Estadisticas

# Crear y ejecutar el componente StatisticsGen
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
context.run(statistics_gen)

print(" StatisticsGen ejecutado correctamente.")


# Obtener la ruta del directorio de estadísticas
statistics_uri = statistics_gen.outputs['statistics'].get()[0].uri

# Rutas de los archivos FeatureStats.pb en Split-train y Split-eval
train_stats_path = os.path.join(statistics_uri, 'Split-train', 'FeatureStats.pb')
eval_stats_path = os.path.join(statistics_uri, 'Split-eval', 'FeatureStats.pb')

# Verificar si los archivos existen
if not os.path.exists(train_stats_path) or not os.path.exists(eval_stats_path):
    raise FileNotFoundError(f" No se encontraron los archivos FeatureStats.pb en {statistics_uri}")

print(f" Archivo de estadísticas de entrenamiento: {train_stats_path}")
print(f" Archivo de estadísticas de evaluación: {eval_stats_path}")

# Cargar estadísticas desde los archivos .pb
train_stats = tfdv.load_stats_binary(train_stats_path)
eval_stats = tfdv.load_stats_binary(eval_stats_path)

# Visualizar estadísticas comparando entrenamiento y evaluación
tfdv.visualize_statistics(lhs_statistics=train_stats, rhs_statistics=eval_stats, lhs_name="Train", rhs_name="Eval")


In [None]:
#4.4 Inferir el esquema


# Ejecutar SchemaGen
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])
context.run(schema_gen)

# Obtener la ruta del esquema generado
schema_uri = schema_gen.outputs['schema'].get()[0].uri
print(f" Esquema generado en: {schema_uri}")

# Verificar archivos generados
print(f" Contenido de {schema_uri}: {os.listdir(schema_uri)}")

# Cargar y visualizar el esquema
schema = tfdv.load_schema_text(os.path.join(schema_uri, 'schema.pbtxt'))
tfdv.display_schema(schema)

# Inspeccionar características inferidas
for feature in schema.feature:
    print(f" Feature: {feature.name}")
    print(f"   Tipo: {feature.type}")
    print(f"   Presencia: {feature.presence.min_fraction if feature.HasField('presence') else 'N/A'}")
    print("-" * 40)


In [None]:
#4.5 Curando el esquema


# Ejecutar SchemaGen
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])
context.run(schema_gen)

# Cargar el esquema inferido
schema_uri = schema_gen.outputs['schema'].get()[0].uri
schema_path = os.path.join(schema_uri, "schema.pbtxt")
schema = tfdv.load_schema_text(schema_path)

# Definir rangos permitidos para variables numéricas
tfdv.set_domain(schema, "Hillshade_9am", schema_pb2.IntDomain(min=0, max=255))
tfdv.set_domain(schema, "Hillshade_Noon", schema_pb2.IntDomain(min=0, max=255))
tfdv.set_domain(schema, "Slope", schema_pb2.IntDomain(min=0, max=90))

# Definir Cover Type como categórica
cover_type_domain = schema_pb2.IntDomain(min=0, max=6)
cover_type_feature = next(f for f in schema.feature if f.name == "Cover_Type")
cover_type_feature.int_domain.CopyFrom(cover_type_domain)
cover_type_feature.type = schema_pb2.FeatureType.INT  # Mantener como INT
cover_type_feature.annotation.tag.append("categorical")  # Marcar como categórica

# Guardar el esquema actualizado
schema_output_path = os.path.join(schema_uri, "schema.pbtxt")
tfdv.write_schema_text(schema, schema_output_path)

# Verificar cambios
schema_updated = tfdv.load_schema_text(schema_output_path)
tfdv.display_schema(schema_updated)

print(f" Esquema actualizado guardado en: {schema_output_path}")


In [None]:
# 4.6 Entornos de Esquema

#  Directorios de datos
train_data_path = "/data/covertype/train"
inference_data_path = "/data/covertype/inference"

#  Crear directorios si no existen
os.makedirs(train_data_path, exist_ok=True)
os.makedirs(inference_data_path, exist_ok=True)

#  Cargar datos de entrenamiento y generar datos de inferencia
train_file = "/data/covertype/covertype_test.csv"
df_train = pd.read_csv(train_file)

#  Crear datos de inferencia (eliminando Cover_Type)
df_inference = df_train.drop(columns=["Cover_Type"])
df_inference.to_csv(f"{inference_data_path}/covertype_inference.csv", index=False)

#  Mover el archivo de entrenamiento a su carpeta correspondiente
df_train.to_csv(f"{train_data_path}/covertype_test.csv", index=False)

#  Cargar el esquema generado en 4.4
schema_uri = schema_gen.outputs['schema'].get()[0].uri
schema_path = os.path.join(schema_uri, "schema.pbtxt")
schema = tfdv.load_schema_text(schema_path)

#  Evitar duplicados en los entornos
if "TRAINING" not in schema.default_environment:
    schema.default_environment.append("TRAINING")
if "SERVING" not in schema.default_environment:
    schema.default_environment.append("SERVING")

# Excluir Cover_Type en SERVING sin duplicados
for feature in schema.feature:
    if feature.name == "Cover_Type" and "SERVING" not in feature.not_in_environment:
        feature.not_in_environment.append("SERVING")

# Guardar el esquema actualizado
schema_env_path = os.path.join(schema_uri, "schema.pbtxt")
tfdv.write_schema_text(schema, schema_env_path)

# Verificar configuración de entornos
print(" Entornos en el esquema:", schema.default_environment)
for feature in schema.feature:
    if feature.name == "Cover_Type":
        print(f" Feature: {feature.name}")
        print(f"   Excluido del entorno: {feature.not_in_environment}")

#  Crear ExampleGen para los datos de entrenamiento e inferencia
train_example_gen = CsvExampleGen(input_base=train_data_path)
context.run(train_example_gen)

inference_example_gen = CsvExampleGen(input_base=inference_data_path)
context.run(inference_example_gen)

#  Generar estadísticas para datos de inferencia
inference_stats_gen = StatisticsGen(examples=inference_example_gen.outputs['examples'])
context.run(inference_stats_gen)

# Validar datos de inferencia usando el esquema actualizado
example_validator = ExampleValidator(
    statistics=inference_stats_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema']
)

context.run(example_validator)
print(" Validación de datos de inferencia completada.")


In [None]:
#Verificación entornos


#  Cargar el esquema guardado
schema_env_path = os.path.join(schema_gen.outputs['schema'].get()[0].uri, "schema.pbtxt")
schema = tfdv.load_schema_text(schema_env_path)

#  Verificar entornos disponibles
print(" Entornos en el esquema:", schema.default_environment)

#  Verificar que Cover_Type está excluido en SERVING
for feature in schema.feature:
    if feature.name == "Cover_Type":
        print(f" Feature: {feature.name}")
        print(f"   Excluido del entorno: {feature.not_in_environment}")

#  Mostrar rangos de las columnas en Domain
print("\n Rango de valores permitidos en las características numéricas:")
for feature in schema.feature:
    if feature.HasField("int_domain"):  # Verificar si la columna tiene un dominio de valores
        print(f" {feature.name}: {feature.int_domain.min} - {feature.int_domain.max}")


In [None]:
#4.7 Nuevas Estadisticas

#  Ruta del esquema actualizado
schema_env_path = os.path.join(schema_gen.outputs['schema'].get()[0].uri, "schema.pbtxt")

#  Crear componente ImportSchemaGen para importar el esquema al pipeline
schema_importer = ImportSchemaGen(schema_file=schema_env_path)

#  Ejecutar ImportSchemaGen
context.run(schema_importer)
print(" Esquema actualizado importado con ImportSchemaGen.")

#  Generar nuevas estadísticas con el esquema curado
updated_stats_gen = StatisticsGen(
    examples=example_gen.outputs['examples'],  # Usar los datos originales
    schema=schema_importer.outputs['schema']  # Pasar el esquema actualizado
)

#  Ejecutar StatisticsGen con el nuevo esquema
context.run(updated_stats_gen)
print(" Nuevas estadísticas generadas con el esquema actualizado.")

#  Ruta al archivo de estadísticas dentro de Split-train
stats_file = os.path.join(updated_stats_gen.outputs['statistics'].get()[0].uri, "Split-train", "FeatureStats.pb")

#  Cargar las estadísticas desde el archivo binario
updated_stats = tfdv.load_stats_binary(stats_file)

#  Visualizar estadísticas actualizadas
tfdv.visualize_statistics(updated_stats)


In [None]:
#4.8 Comprobar anomalias 

# Ejecutar ExampleValidator con las estadísticas y el esquema actualizados
example_validator = ExampleValidator(
    statistics=updated_stats_gen.outputs['statistics'],  # Estadísticas generadas en 4.7
    schema=schema_importer.outputs['schema']  # Esquema importado con ImportSchemaGen
)

# Ejecutar el componente para detectar anomalías
context.run(example_validator)
print("Comprobación de anomalías completada.")

# Ruta del directorio de anomalías generadas
anomalies_dir = example_validator.outputs['anomalies'].get()[0].uri
print("Ruta de salida de anomalías:", anomalies_dir)

# Verificar archivos dentro del directorio de anomalías
print("Contenido en Split-train:", os.listdir(os.path.join(anomalies_dir, "Split-train")))
print("Contenido en Split-eval:", os.listdir(os.path.join(anomalies_dir, "Split-eval")))

# Ruta al archivo de diferencias detectadas dentro de Split-train
anomalies_file = os.path.join(anomalies_dir, "Split-train", "SchemaDiff.pb")

# Verificar si el archivo de anomalías existe
if os.path.exists(anomalies_file):
    # Cargar el archivo SchemaDiff.pb como Protobuf
    anomalies_proto = anomalies_pb2.Anomalies()
    with open(anomalies_file, "rb") as f:
        anomalies_proto.ParseFromString(f.read())

    # Inspeccionar los campos del objeto anomalies_proto
    print("Estructura de anomalías (campos disponibles):")
    print(anomalies_proto)  # Esto muestra los campos disponibles en el objeto

    # Intentar ver la representación en texto del objeto completo
    print("Representación de las anomalías en formato de texto:")
    print(text_format.MessageToString(anomalies_proto))
else:
    print(f"El archivo {anomalies_file} no existe.")
