In [None]:
import numpy as np
from numpy import array
import os
import requests
import pandas_profiling
import pandas as pd
import sklearn
from sklearn.feature_selection import SelectKBest, f_classif
import tensorflow as tf
import tfx
from tfx.components import CsvExampleGen
from tfx.components import ExampleValidator
from tfx.components import SchemaGen
from tfx.v1.components import ImportSchemaGen
from tfx.components import StatisticsGen
from tfx.components import Transform
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.dsl.components.common.importer import Importer
import tensorflow_data_validation as tfdv
from google.protobuf.json_format import MessageToDict
import pprint

# 2. Conexion y carga de datos

In [None]:
# Ubicación pipeline
_pipeline_root = './pipeline/'

# Ubicación data en bruto
_data_root = './data/covertype'

# Ruta entrenamiento
_data_filepath = os.path.join(_data_root, 'covertype_train.csv')

# Descarga de datos
os.makedirs(_data_root, exist_ok=True)
if not os.path.isfile(_data_filepath):
  url = 'https://docs.google.com/uc?export= \
  download&confirm={{VALUE}}&id=1lVF1BCWLH4eXXV_YOJzjR7xZjj-wAGj9'
  r = requests.get(url, allow_redirects=True, stream=True)
  open(_data_filepath, 'wb').write(r.content) 

## 2.1 Exploración conjunto de entrenamiento

In [None]:
# se crea un dataframe con la data de entrenamiento para su manipulación
df = pd.DataFrame(pd.read_csv('data/covertype//covertype_train.csv'))

In [None]:
# obtener la forma (número de filas y columnas)
print(df.shape)

In [None]:
# obtener la lista de columnas
print(df.columns)

In [None]:
# mostrar información sobre el DataFrame
print(df.info())

In [None]:
# mostrar estadísticas descriptivas
print(df.describe())

# 3. Seleccion de caracteristicas

In [None]:
#Selección de las variables numericas
num = df.select_dtypes(include = 'number')

Seleccion de subconjunto que tenga un gran valor predictivo para la etiqueta 'Cover Type':

In [None]:
# Extracción de todas las filas y todas las columnas de la variable num, excepto la última columna.
# features_num es un DataFrame que contiene solo las características numéricas del conjunto de datos original,
# sin la variable objetivo.

features_num = num.iloc[:, 0:-1] 

#Extracción de la columna "Cover_Type" del DataFrame num para asignarla a la variable label_num.
#Se crea una serie que contiene los valores de la variable objetivo para el conjunto de datos original.

label_num = num['Cover_Type'] 

In [None]:
# Se crea un objeto SelectKBest que se utiliza para realizar la selección de características univariante.
# El parámetro score_func especifica la función de puntuación que se utilizará para evaluar la importancia
# de cada característica individual.

#En este caso, se están seleccionando las 8 características con la puntuación más alta según la función 
#f_classif.

select = SelectKBest(score_func=f_classif,k = 8)
# El método fit_transform se utiliza para ajustar el objeto SelectKBest a las características de 
# entrada features_num y la variable objetivo label_num, 
# y para transformar las características originales en un nuevo conjunto de datos que contiene solo 
# las características seleccionadas.
z = select.fit_transform(features_num,label_num)

In [None]:
# Se obtiene una lista de booleanos que indican si cada característica en features_num ha sido
# seleccionada o no por el objeto SelectKBest.

filter = list(select.get_support())
filter

In [None]:
# feature_names contiene la lista de nombres de características, que se utiliza para etiquetar 
# las características en los resultados de la selección de características.

feature_names = list(features_num.columns)
feature_names

In [None]:
# Crea un objeto DataFrame de pandas que contiene los nombres de las características seleccionadas 
# después de la selección de características univariada.
numss = pd.DataFrame(feature_names,filter)
numss

In [None]:
# Una vez se han identificado las 2 columnas numericas con menor relevancia, se proceda a 
# eliminarlas del dataset original

df_2 = df.drop(['Hillshade_3pm','Aspect'], axis = 1)

In [None]:
# Se define el nombre del archivo que se guardará como csv
outname = 'subconjunto_train.csv'

# se crea una ruta para almacenar el csv producto de la eliminación de las columnas menos relevantes
_subconjunto_data_root = './data/covertype/subconjunto'

# se crea el directorio donde se almacenará el csv
if not os.path.exists(_subconjunto_data_root):
    os.mkdir(_subconjunto_data_root)

fullname = os.path.join(_subconjunto_data_root, outname)    

df.to_csv(fullname)


# 4. Data pipeline

## 4.1 Configurar el contexto interactivo

In [None]:
# se inicializa contexto interactivo
context = InteractiveContext(pipeline_root=_pipeline_root)

In [None]:
pp = pprint.PrettyPrinter()

## 4.2 Generando Ejemplos

In [None]:
# Se crea un componente de generación de ejemplos (example gen) utilizando la clase CsvExampleGen de TFX
# El componente se configura para leer los datos de los archivos CSV en la carpeta especificada en _subconjunto_data_root.

example_gen = CsvExampleGen(input_base=_subconjunto_data_root)

In [None]:
# Se ejecuta el componente example_gen dentro del contexto interactivo de TFX especificado en la variable context.
# La ejecución del componente example_gen lee los datos de los archivos CSV especificados en el argumento 
# input_base del objeto CsvExampleGen creado anteriormente, y los convierte en ejemplos para entrenar el modelo.
context.run(example_gen)

In [None]:
# Se obriene el artefacto
artifact = example_gen.outputs['examples'].get()[0]

# se imprimen los split names & uri
print(f'split names: {artifact.split_names}')
print(f'artifact uri: {artifact.uri}')

In [None]:
# Obtiene el URI del artefacto de salida que representa los ejemplos de entrenamiento
train_uri = os.path.join(artifact.uri, 'Split-train')

# se consulta el contenido de la carpeta train
!ls {train_uri}

In [None]:
# se obtiene la lista de archivos de este directorio (todos los archivos TFRecord comprimidos)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Se crea `TFRecordDataset` para leer estos archivos
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")



In [None]:
# Se define una función para obtener ejemplos individuales en tfrecord
def get_records(dataset, num_records):

    records = []
    for tfrecord in dataset.take(num_records):
        serialized_example = tfrecord.numpy()
        
        example = tf.train.Example()
        
        example.ParseFromString(serialized_example)
        
        example_dict = (MessageToDict(example))
        
        records.append(example_dict)
        
    return records


In [None]:
# Se obtienen los primeros 5 ejemplos para entrenamiento
sample_records = get_records(dataset, 5)
pp.pprint(sample_records)

## 4.3 Estadísticas

In [None]:
# Instantiate StatisticsGen with the ExampleGen ingested dataset
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])

# Execute the component
context.run(statistics_gen)

In [None]:
# Show the output statistics
context.show(statistics_gen.outputs['statistics'])

## 4.4 Inferir el esquema

In [None]:
# Instantiate SchemaGen with the StatisticsGen ingested dataset
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'],    )

# Run the component
context.run(schema_gen)

In [None]:
# Se visualiza el esquema
context.show(schema_gen.outputs['schema'])

## 4.5 Curando el esquema

In [None]:
# se guarda el esquema creado en 4.4
## codigo de referencia tomado de: https://notebook.community/GoogleCloudPlatform/mlops-on-gcp/workshops/tfx-caip-tf21/lab-01-tfx-walkthrough/lab-01

schema_proto_path = '{}/{}'.format(schema_gen.outputs['schema'].get()[0].uri, 'schema.pbtxt')
schema = tfdv.load_schema_text(schema_proto_path)


In [None]:
# se ajusta el dominio para las variables mostradas
tfdv.set_domain(schema, 'Hillshade_9am', tfdv.utils.schema_util.schema_pb2.IntDomain(min=0, max=255))
tfdv.set_domain(schema, 'Hillshade_Noon', tfdv.utils.schema_util.schema_pb2.IntDomain(min=0, max=255))
tfdv.set_domain(schema, 'Slope', tfdv.utils.schema_util.schema_pb2.IntDomain(min=0, max=90))
tfdv.set_domain(schema, 'Cover_Type', tfdv.utils.schema_util.schema_pb2.IntDomain(min=0, max=6,is_categorical=True))

tfdv.display_schema(schema=schema)

In [None]:
# Se guarda y actualiza el esquema
# ref codigo: https://notebook.community/GoogleCloudPlatform/mlops-on-gcp/workshops/tfx-caip-tf21/lab-01-tfx-walkthrough/lab-01
schema_dir = os.path.join(schema_gen.outputs['schema'].get()[0].uri, 'schema')
tf.io.gfile.makedirs(schema_gen.outputs['schema'].get()[0].uri)
schema_file = os.path.join(schema_gen.outputs['schema'].get()[0].uri, 'schema.pbtxt')

tfdv.write_schema_text(schema, schema_file)

In [None]:
!cat {schema_file}

In [None]:
# se importa el esquema actualizado 
schema_importer = Importer(
    source_uri=schema_gen.outputs['schema'].get()[0].uri,
    artifact_type = tfx.types.standard_artifacts.Schema,
    reimport=False)

In [None]:
#se ejecuta el nuevo esquema
context.run(schema_importer)

In [None]:
# se verifica que el esquema contiene los cambios en el domminio
context.show(schema_importer.outputs['result'])

## 4.6  Entornos de esquema

In [None]:
# se recrean los datos de entrada 
servicio = df_2.drop(['Cover_Type'], axis = 1)
servicio.shape

In [None]:
# Se define el nombre del archivo que se guardará como csv
outname = 'servicio_train.csv'

# se crea una ruta para almacenar el csv producto de la eliminación de las columnas menos relevantes
_servicio_data_root = './data/covertype/subconjunto/servicio'

# se crea el directorio donde se almacenará el csv
if not os.path.exists(_servicio_data_root):
    os.mkdir(_servicio_data_root)

fullname = os.path.join(_servicio_data_root, outname)    

df.to_csv(fullname)

In [None]:
#se define la ruta del archivo de servicio
archivo_servicio = fullname

In [None]:
# Se generan estadisticas a partir del archivo de servicio
estadisticas_de_servicio = tfdv.generate_statistics_from_csv(archivo_servicio)

In [None]:
# se validan las anomalias en el archivo de servicio, teniendo como referencia el esquema inicial
anomalias_en_servicio = tfdv.validate_statistics(estadisticas_de_servicio, schema)
tfdv.display_anomalies(anomalias_en_servicio)

In [None]:
#### deberia aparcer como anomalia que no encuentra a columna de cover_type

In [None]:
tfdv.display_schema(schema=schema)

## 4.7. Genere nuevas estad´ısticas usando el esquema actualizado

In [None]:
updated_scheme = ImportSchemaGen(schema_file= schema_file)

In [None]:
context.run(updated_scheme)

In [None]:
# Se generan estadisticas basadas en el esquema actualizado
updated_statistics = StatisticsGen(examples = example_gen.outputs['examples'],schema=updated_scheme.outputs['schema'])
context.run(updated_statistics)


In [None]:
#se visualizan las estadisticas
context.show(updated_statistics.outputs['statistics'])

## 4.8. Comprobar anomalías

In [None]:
example_validator = ExampleValidator(
    statistics = updated_statistics.outputs['statistics'],
    schema = updated_scheme.outputs['schema'])

context.run(example_validator)

In [None]:
# se valida si existe alguna anomalia con el esquema y estadisticas actualizadas
context.show(example_validator.outputs['anomalies'])

## 4.9. Ingeniería de características

## 4.10 Función de preprocesamiento

## 4.11 Transformar

# 5. Metadatos de aprendizaje automático

In [None]:
import tensorflow as tf
import pandas as pd
from google.protobuf.json_format import MessageToDict

def get_records(dataset, num_records):
    '''Extracts records from the given dataset.
    Args:
        dataset (TFRecordDataset): dataset saved by ExampleGen
        num_records (int): number of records to preview
    '''
    
    # initialize an empty list
    records = []
    
    # Use the `take()` method to specify how many records to get
    for tfrecord in dataset.take(num_records):
        
        # Get the numpy property of the tensor
        serialized_example = tfrecord.numpy()
        
        # Initialize a `tf.train.Example()` to read the serialized data
        example = tf.train.Example()
        
        # Read the example data (output is a protocol buffer message)
        example.ParseFromString(serialized_example)
        
        # convert the protocol bufffer message to a Python dictionary
        example_dict = (MessageToDict(example))
        
        # append to the records list
        records.append(example_dict)
        
    return records

def display_types(types):
    # Helper function to render dataframes for the artifact and execution types
    table = {'id': [], 'name': []}
    for a_type in types:
        table['id'].append(a_type.id)
        table['name'].append(a_type.name)
    return pd.DataFrame(data=table)

def display_artifacts(store, artifacts, base_dir):
    # Helper function to render dataframes for the input artifacts
    table = {'artifact id': [], 'type': [], 'uri': []}
    for a in artifacts:
        table['artifact id'].append(a.id)
        artifact_type = store.get_artifact_types_by_id([a.type_id])[0]
        table['type'].append(artifact_type.name)
        table['uri'].append(a.uri.replace(base_dir, './'))
    return pd.DataFrame(data=table)

def display_properties(store, node):
    # Helper function to render dataframes for artifact and execution properties
    table = {'property': [], 'value': []}
    
    for k, v in node.properties.items():
        table['property'].append(k)
        table['value'].append(
            v.string_value if v.HasField('string_value') else v.int_value)
    
    for k, v in node.custom_properties.items():
        table['property'].append(k)
        table['value'].append(
            v.string_value if v.HasField('string_value') else v.int_value)
    
    return pd.DataFrame(data=table)

In [None]:
import ml_metadata as mlmd
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = 'pipeline/metadata.sqlite'
connection_config.sqlite.connection_mode = 3 # READWRITE_OPENCREATE
store = metadata_store.MetadataStore(connection_config)

## 5.1. Acceso a artefactos almacenados

In [None]:
# codigo tomado de https://www.tensorflow.org/tfx/guide/mlmd
artifact_types = store.get_artifact_types()

In [None]:
display_types(artifact_types)

## 5.2. Seguimiento de artefactos

## 5.3 Obtener artefactos principales

In [None]:
artifact_transform = store.get_artifacts_by_type('TransformGraph')
_id = artifact_transform[0].id
artifacts_p = get_main_artifacts(store, _id)
get_records( display_artifacts(store, artifacts_p, '.')