<h1 align=center>Módulo de Implementación de Algoritmos</h1>

<h2 align=center><i>Alumno: David Jiménez Vicente</i></h2>

# Práctica Final 1: Clasificación de Documentos con Scikit-learn y MLflow

En esta práctica, utilizarás un conjunto de datos de Scikit-learn (podeís usar el mismo que en el notebook de Intro MLFlow) para entrenar un modelo de clasificación de documentos. El objetivo es construir un modelo capaz de clasificar automáticamente documentos en categorías predefinidas.

Pasos a seguir: 

    Exploración de Datos: Analiza el conjunto de datos proporcionado para comprender su estructura y contenido.

    Preprocesamiento de Texto: Realiza tareas de preprocesamiento de texto, como tokenización y vectorización, para preparar los datos para el modelado.

    Entrenamiento del Modelo: Utiliza algoritmos de clasificación de Scikit-learn para entrenar un modelo con los datos preprocesados.

    Evaluación del Modelo: Evalúa el rendimiento del modelo utilizando métricas de evaluación estándar como precisión y recall.

    Registro de Métricas con MLflow: Utiliza MLflow para registrar métricas y hiperparámetros durante el entrenamiento, facilitando la gestión y comparación de experimentos.


Nota: Dado que no voy a poder tener acceso a vuestros logs de MLFlow añadirme las imagenes de la interfaz de MLFlow en el notebook

### 1.- Carga de librerías

In [1]:
import pandas as pd
import numpy as np
import mlflow
import mlflow.sklearn

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

### 2.- Cargamos el dataset de ejemplo desde sklearn

In [2]:
from sklearn.datasets import load_breast_cancer
cancer = load_breast_cancer()

In [3]:
df = pd.DataFrame(cancer['data'], columns=cancer['feature_names'])
df['target'] = cancer['target']
df.head()

Unnamed: 0,mean radius,mean texture,mean perimeter,mean area,mean smoothness,mean compactness,mean concavity,mean concave points,mean symmetry,mean fractal dimension,...,worst texture,worst perimeter,worst area,worst smoothness,worst compactness,worst concavity,worst concave points,worst symmetry,worst fractal dimension,target
0,17.99,10.38,122.8,1001.0,0.1184,0.2776,0.3001,0.1471,0.2419,0.07871,...,17.33,184.6,2019.0,0.1622,0.6656,0.7119,0.2654,0.4601,0.1189,0
1,20.57,17.77,132.9,1326.0,0.08474,0.07864,0.0869,0.07017,0.1812,0.05667,...,23.41,158.8,1956.0,0.1238,0.1866,0.2416,0.186,0.275,0.08902,0
2,19.69,21.25,130.0,1203.0,0.1096,0.1599,0.1974,0.1279,0.2069,0.05999,...,25.53,152.5,1709.0,0.1444,0.4245,0.4504,0.243,0.3613,0.08758,0
3,11.42,20.38,77.58,386.1,0.1425,0.2839,0.2414,0.1052,0.2597,0.09744,...,26.5,98.87,567.7,0.2098,0.8663,0.6869,0.2575,0.6638,0.173,0
4,20.29,14.34,135.1,1297.0,0.1003,0.1328,0.198,0.1043,0.1809,0.05883,...,16.67,152.2,1575.0,0.1374,0.205,0.4,0.1625,0.2364,0.07678,0


In [4]:
df.shape

(569, 31)

In [5]:
# Hacemos un describe con el resultado traspuesto para poder verlo completamente
df.describe().T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
mean radius,569.0,14.127292,3.524049,6.981,11.7,13.37,15.78,28.11
mean texture,569.0,19.289649,4.301036,9.71,16.17,18.84,21.8,39.28
mean perimeter,569.0,91.969033,24.298981,43.79,75.17,86.24,104.1,188.5
mean area,569.0,654.889104,351.914129,143.5,420.3,551.1,782.7,2501.0
mean smoothness,569.0,0.09636,0.014064,0.05263,0.08637,0.09587,0.1053,0.1634
mean compactness,569.0,0.104341,0.052813,0.01938,0.06492,0.09263,0.1304,0.3454
mean concavity,569.0,0.088799,0.07972,0.0,0.02956,0.06154,0.1307,0.4268
mean concave points,569.0,0.048919,0.038803,0.0,0.02031,0.0335,0.074,0.2012
mean symmetry,569.0,0.181162,0.027414,0.106,0.1619,0.1792,0.1957,0.304
mean fractal dimension,569.0,0.062798,0.00706,0.04996,0.0577,0.06154,0.06612,0.09744


In [6]:
df.columns

Index(['mean radius', 'mean texture', 'mean perimeter', 'mean area',
       'mean smoothness', 'mean compactness', 'mean concavity',
       'mean concave points', 'mean symmetry', 'mean fractal dimension',
       'radius error', 'texture error', 'perimeter error', 'area error',
       'smoothness error', 'compactness error', 'concavity error',
       'concave points error', 'symmetry error', 'fractal dimension error',
       'worst radius', 'worst texture', 'worst perimeter', 'worst area',
       'worst smoothness', 'worst compactness', 'worst concavity',
       'worst concave points', 'worst symmetry', 'worst fractal dimension',
       'target'],
      dtype='object')

### 3.- Separamos los datos, suponiendo que los datos ya están preprocesados al ser un dataset de ejemplo para ML.

In [7]:
X_raw = df.drop("target", axis=1)
y_raw = df.target

Sólo contamos con menos de 600 datos, con lo que reservaremos una pequeña parte para test, y otra para validación.

In [8]:
# Empezamos con la partición de validation
X, X_val, y, y_val = train_test_split(X_raw, y_raw, train_size = 0.85, random_state=42, stratify = y_raw)
print(X.shape, X_val.shape, y.shape, y_val.shape)

(483, 30) (86, 30) (483,) (86,)


In [9]:
# Y ahora la de test
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.8, random_state=42, stratify = y)

In [10]:
print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

(386, 30) (97, 30) (386,) (97,)


### 4.- Creamos un pipeline con el modelo y el preproceso:

In [11]:

clf = RandomForestClassifier(n_estimators=10,
                            min_samples_leaf=3,
                            class_weight='balanced',
                            random_state=123)

preprocessor = Pipeline(steps=[('scaler', StandardScaler())])

model = Pipeline(steps=[('preprocessor', preprocessor),
                           ('randomforestclassifier', clf)])


### 5.- Entrenamos el modelo:

In [12]:
model.fit(X_train, y_train)

### 6.- Medimos el rendimiento del modelo:

Con el split de train

In [13]:
score_train = model.score(X_train, y_train)

Con el split de test

In [14]:
score_test = model.score(X_test, y_test)

Y obtenemos los parámetros:

In [15]:
model.get_params()

{'memory': None,
 'steps': [('preprocessor', Pipeline(steps=[('scaler', StandardScaler())])),
  ('randomforestclassifier',
   RandomForestClassifier(class_weight='balanced', min_samples_leaf=3,
                          n_estimators=10, random_state=123))],
 'verbose': False,
 'preprocessor': Pipeline(steps=[('scaler', StandardScaler())]),
 'randomforestclassifier': RandomForestClassifier(class_weight='balanced', min_samples_leaf=3,
                        n_estimators=10, random_state=123),
 'preprocessor__memory': None,
 'preprocessor__steps': [('scaler', StandardScaler())],
 'preprocessor__verbose': False,
 'preprocessor__scaler': StandardScaler(),
 'preprocessor__scaler__copy': True,
 'preprocessor__scaler__with_mean': True,
 'preprocessor__scaler__with_std': True,
 'randomforestclassifier__bootstrap': True,
 'randomforestclassifier__ccp_alpha': 0.0,
 'randomforestclassifier__class_weight': 'balanced',
 'randomforestclassifier__criterion': 'gini',
 'randomforestclassifier__max_dept

### Configuramos MLFlow y probamos con distintos parámetros el logging de los resultados del modelo cambiados a mano uno a uno

In [16]:
NGROK_AUTH_TOKEN = "2eh6azGOZ3i2yxZ9B5n8QtXJYo2_6XuqCpN1gNZPp5Pq7WZbu"

In [17]:
mlflow.set_experiment("Prueba Classifier")

with mlflow.start_run(run_name="Practica"):
    mlflow.log_metric("Train score", score_train),
    mlflow.log_metric("Test score", score_test),
    mlflow.log_param("n_stimators", 100),
    mlflow.log_param("min_samples_leaf", 3)
    
get_ipython().system_raw("mlflow ui --port 5000 &")

from pyngrok import ngrok

ngrok.kill()

ngrok.set_auth_token(NGROK_AUTH_TOKEN)

ngrok_tunnel = ngrok.connect(addr="5000", proto="http", bind_tls=True)
print("El tracking UI es:", ngrok_tunnel.public_url)

2024/04/07 23:06:56 INFO mlflow.tracking.fluent: Experiment with name 'Prueba Classifier' does not exist. Creating a new experiment.


El tracking UI es: https://cc3a-82-159-98-51.ngrok-free.app


In [18]:
ngrok.kill()

* 'schema_extra' has been renamed to 'json_schema_extra'
[2024-04-07 23:06:57 +0200] [1422] [INFO] Starting gunicorn 21.2.0
[2024-04-07 23:06:57 +0200] [1422] [INFO] Listening at: http://127.0.0.1:5000 (1422)
[2024-04-07 23:06:57 +0200] [1422] [INFO] Using worker: sync
[2024-04-07 23:06:57 +0200] [1423] [INFO] Booting worker with pid: 1423
[2024-04-07 23:06:57 +0200] [1424] [INFO] Booting worker with pid: 1424
[2024-04-07 23:06:57 +0200] [1425] [INFO] Booting worker with pid: 1425
[2024-04-07 23:06:57 +0200] [1426] [INFO] Booting worker with pid: 1426


![chart1.png](attachment:e1b8c221-467c-439d-877c-e837459e5993.png)

![chart2.png](attachment:e63db66b-5dd1-448c-9eb8-f1de5cb196a0.png)

### Ahora configuramos el registro con los artefactos del modelo con diferentes parámetros.

In [19]:
# Me aseguro de que está levantado el servidor MLFlow
!mlflow ui
def train_random_forest(parameters, X_train, y_train, X_test, y_test):
    with mlflow.start_run():
        # Entreno el modelo
        model = RandomForestClassifier(**parameters)
        model.fit(X_train, y_train)

        # Hago la predicción
        y_pred = model.predict(X_test)
        
        # Métrica
        accuracy = accuracy_score(y_test, y_pred)

        # Registramos los parámetros y métricas en MLFlow
        mlflow.log_params(parameters)
        mlflow.log_metric("accuracy", accuracy)

        # Guardamos modelo en MLFlow
        mlflow.sklearn.log_model(model, "random_forest_model")

# Cargo librerías necesarias para la prueba en grid de diferentes parámetros
from sklearn.model_selection import ParameterGrid
from pyngrok import ngrok

if __name__ == "__main__":
    
    # Definimos los parámetros a probar
    param_grid = {
        "n_estimators": [10, 50],
        "max_depth": [5, 10, 20],
        "min_samples_leaf": [3, 5, 10]
    }

    # Probamos las diferentes combinaciones de parámetros
    for params in ParameterGrid(param_grid):
        train_random_forest(params, X_train, y_train, X_test, y_test)

    # Iniciamos ngrok para exponer el servidor de MLFlow
    ngrok.set_auth_token(NGROK_AUTH_TOKEN)
    public_url = ngrok.connect(addr="5000", proto="http", bind_tls=True)
    print("MLFlow UI está disponible en:", public_url)

* 'schema_extra' has been renamed to 'json_schema_extra'
[2024-04-07 23:06:59 +0200] [1428] [INFO] Starting gunicorn 21.2.0
[2024-04-07 23:06:59 +0200] [1428] [ERROR] Connection in use: ('127.0.0.1', 5000)
[2024-04-07 23:06:59 +0200] [1428] [ERROR] Retrying in 1 second.
[2024-04-07 23:07:00 +0200] [1428] [ERROR] Connection in use: ('127.0.0.1', 5000)
[2024-04-07 23:07:00 +0200] [1428] [ERROR] Retrying in 1 second.
[2024-04-07 23:07:01 +0200] [1428] [ERROR] Connection in use: ('127.0.0.1', 5000)
[2024-04-07 23:07:01 +0200] [1428] [ERROR] Retrying in 1 second.
[2024-04-07 23:07:02 +0200] [1428] [ERROR] Connection in use: ('127.0.0.1', 5000)
[2024-04-07 23:07:02 +0200] [1428] [ERROR] Retrying in 1 second.
[2024-04-07 23:07:03 +0200] [1428] [ERROR] Connection in use: ('127.0.0.1', 5000)
[2024-04-07 23:07:03 +0200] [1428] [ERROR] Retrying in 1 second.
[2024-04-07 23:07:04 +0200] [1428] [ERROR] Can't connect to ('127.0.0.1', 5000)
Running the mlflow server failed. Please see the logs above f



MLFlow UI está disponible en: NgrokTunnel: "https://9cc7-82-159-98-51.ngrok-free.app" -> "http://localhost:5000"


#### Nota: He tenido que usar el loop 127.0.0.1:5000, porque no me funcionan el localhost ni la dirección del tunnel

![chart3.png](attachment:3a455143-c63b-40a3-aacb-e295ea516770.png)

![chart4.png](attachment:c1af696a-1a54-41b2-8186-01b376c0b338.png)

![chart5.png](attachment:af3c375d-3702-4171-931f-64e98738b9ed.png)

# Práctica Final 2: detección de mensajes troll



En los últimos años Twitch se ha consolidad como uno de los principales medios de comunicación especialmente para las generaciones más jóvenes.

Al tratarse de una plataforma participativa en la que los usuarios pueden poner comentarios durante y posteriormente a las emisiones. Entre estos comentarios han aparecido como siempre comentarios ofensisvos.

En esta práctica construiremos una Inteligencia Artificial capaz de clasificar esos mensajes troll.

Durante la práctica entrenaremos el modelo de Deep Learning y lo desplegaremos para inferencia en batch, la más habitual actualmente dentro de la industria:

## Teoría

**1. ¿Qué es Apache Beam?**

Apache Beam es un framework de trabajo que permite definir pipelines de procesado de datos enfocadas a la paralelización de procesos.

Beam es una abstracción por lo que los detalles de implementación son gestionados por los motores de ejecución (Flink, DataFlow...)

Es un framework compatible con distintos lenguajes de programación como Java, Python o Go.

**2. ¿Cuáles son las diferentes formas de desplegar un modelo?**

Distinguimos cuatro opciones:

* Despligue en Batch. Desplegamos el modelo para hacer predicciones de grandes cantidades de datos en un momento puntual.

* Despligue online. Permite la disponibilización del modelo a demanda por el ususario de manera que se pueden realizar predicciones mediante llamdas a APIs.

* Despliegue en streaming Despligue pensando en la recepción continua de datos de manera que nuestro servidor es capaz de procesar un flujo continuo e ininterrumpido de datos en tiempo real.

* ML Automatizado. Es el estado más avanzado en el desarrollo de MLOPS en el que el modelo es monitoeado y reentrenado de manera automática.

**3. ¿Cuál es la principal diferencia entre la inferencia en batch y la inferencia en streaming?**

La inferencia en batch procesa grandes cantidades de datos en momentos puntuales y normalmente programados de antemano frente al procesamiento en streming cuya infrastructura está diseñada para estar constantemente en funcionamiento emitiendo predicciones de manera continua y en tiempo real (o casi).



# Configuración de nuestro proyecto en GCP


In [1]:
import pandas as pd
import numpy as np
import mlflow
import mlflow.sklearn


from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


from sklearn.model_selection import train_test_split


from sklearn.metrics import accuracy_score

In [2]:
PROJECT_ID = "bootcamp-12"
!gcloud config set project $PROJECT_ID

Updated property [core/project].


Creamos el bucket mediante la consola y una vez creado lo indicamos en la variable:

In [3]:
BUCKET_NAME = "practica_algo" #@param {type:"string"}
REGION = "europe-west1" #@param {type:"string"}

In [4]:
!gsutil mb -l $REGION gs://$BUCKET_NAME

Creating gs://practica_algo/...
ServiceException: 409 A Cloud Storage bucket named 'practica_algo' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


In [5]:
# Comprobamos el acceso:
!gsutil ls -al gs://$BUCKET_NAME

# Entrenamiento e inferencia en Batch

## Preparación

Para esta primera parte se va a utilizar [Tweets Dataset for Detection of Cyber-Trolls](https://www.kaggle.com/dataturks/dataset-for-detection-of-cybertrolls). El objetivo es desarrollar un clasificador binario para detectar si el mensaje recibido es troll (1) o no (0). **Las métricas obtenidas del entrenamiento y la inferencia no se tendrán en cuenta para la evaluación de la práctica, la importancia está en la arquitectura de la solución**, es decir, lo importante no es que nuestro modelo detecte correctamente los tweets de trolls si no que funcione y sea capaz de hacer inferencias.


A continuación, se van a subir los datos de entrenamiento al bucket del proyecto que se haya creado. **Importante:** crea el bucket en una única región. Os dejo disponibilizado el dataset en un bucket de acceso público:

In [6]:
# Revisamos nuestro directorio de trabajo
!pwd

/Users/dj/Documents/GitHub/KeepCoding_Algorithm_Deployment


Ahora se crea el directorio dónde vas a desarrollar esta primera parte de la práctica.

In [35]:
# Creamos carpetas dentro de manera recurrente
!mkdir -p ./content/trolls-data

In [7]:
!mkdir -p ./content/transformed_data

Se establece el directorio de trabajo que hemos creado.

In [8]:
# Cambiamos el directorio de trabajo
import os
os.chdir("./content/trolls-data")

In [9]:
# Comprobamos el resultado
!pwd

/Users/dj/Documents/GitHub/KeepCoding_Algorithm_Deployment/content/trolls-data


Ahora se descargarán los datos en el workspace de Colab para trabajar en local.

In [10]:
%pip install gdown
!gdown "1dTaKofC9ZcMWa5cVtGLDFkEnbc4hiPJr"

Note: you may need to restart the kernel to use updated packages.
Downloading...
From: https://drive.google.com/uc?id=1dTaKofC9ZcMWa5cVtGLDFkEnbc4hiPJr
To: /Users/dj/Documents/GitHub/KeepCoding_Algorithm_Deployment/content/trolls-data/dataset-cybertrolls.json
100%|██████████████████████████████████████| 2.76M/2.76M [00:00<00:00, 27.9MB/s]


Se establecen las dependencias que se usarán en la práctica. Se pueden añadir y quitar las dependencias que no se usen o viceversa.

In [None]:
%%writefile requirements.txt

apache-beam[gcp]
tensorflow
gensim==3.6.0
fsspec==0.8.4
gcsfs==0.7.1
numpy==1.20.0

Instalamos las dependencias. **No olvides reiniciar el entorno al instalar y establecer las variables y credenciales de GCP al arrancar.**

(No las ejecuto porque ya las tengo instaladas)

In [None]:
! pip install -r requirements.txt

In [None]:
! pip install apache-beam[gcp]

In [11]:
# Nos aseguramos que nuestras variables de entorno no hayan desaparecido al reiniciar el kernel

PROJECT_ID = "bootcamp-12" #@param {type:"string"}
! gcloud config set project $PROJECT_ID
BUCKET_NAME = "bootcamp-12" #@param {type:"string"}
REGION = "europe-west1" #@param {type:"string"}

print(f"Project: {PROJECT_ID}")
print(f"Region: {REGION}")
print(f"Bucket: {BUCKET_NAME}")

Updated property [core/project].
Project: bootcamp-12
Region: europe-west1
Bucket: bootcamp-12


## Primer ejercicio

Desarrollar un pipeline de preprocesamiento utilizando Apache Beam para generar datos de train, eval y test para los datos proporcionados anteriormente. Requisitos:

- Proporcionar dos modos de ejecución: `train` y `test`
- Soportar ejecuciones en local con `DirectRunner` y ejecuciones en Dataflow usando `DataFlowRunner`.

In [12]:
import csv
import json

# Para evitar errores convertimos a 

input_json = "dataset-cybertrolls.json"
output_csv = "cybertrolls.csv"

new_data = []

with open(input_json, "r") as input_file:
    for line in input_file:
        item = json.loads(line)
        content = item.get("content", "")
        label = item.get("annotation", {}).get("label", [""])[0]
        new_data.append((content, label))


with open(output_csv, "w", newline="") as output_file:
    csv_writer = csv.writer(output_file)
    csv_writer.writerow(["content", "label"])
    csv_writer.writerows(new_data)

# Ante problemas de datat types que he tenido, lo transformo a integer
data = pd.read_csv("cybertrolls.csv")
data['label'] = data['label'].astype(int)
data.to_csv("cybertrolls-fixed.csv", index=False)

del(data)

Se proporciona un fichero `setup.py` necesario para ejecutar en DataFlow. Modificar la variable `REQUIRED_PACKAGES` con las dependencias que se hayan usado en el `requirements.txt`

In [13]:
%%writefile setup.py

import setuptools

REQUIRED_PACKAGES = [
  "apache-beam[gcp]==2.24.0",
  "tensorflow==2.8.0",
  "gensim==3.6.0",
  "fsspec==0.8.4",
  "gcsfs==0.7.1",
  "numpy==1.20.0"
]

setuptools.setup(
    name="twitchstreaming",
    version="0.0.1",
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True,
    description="Troll detection",
)

Writing setup.py


Me creo una copia por si hubiera algún error en el procesamiento (buena práctica):

In [14]:
! gdown "1dTaKofC9ZcMWa5cVtGLDFkEnbc4hiPJr"
! gsutil cp dataset-cybertrolls.json //$WORK_DIR/data.json

Downloading...
From: https://drive.google.com/uc?id=1dTaKofC9ZcMWa5cVtGLDFkEnbc4hiPJr
To: /Users/dj/Documents/GitHub/KeepCoding_Algorithm_Deployment/content/trolls-data/dataset-cybertrolls.json
100%|██████████████████████████████████████| 2.76M/2.76M [00:00<00:00, 24.2MB/s]
Copying file://dataset-cybertrolls.json...
OSError: Read-only file system.]                                                


In [15]:
!pwd

/Users/dj/Documents/GitHub/KeepCoding_Algorithm_Deployment/content/trolls-data


In [16]:
%%writefile pipeline_split.py

import argparse
import logging
import re
import os
import csv
import random

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.coders.coders import Coder
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions, DirectOptions

import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer

nltk.download("stopwords")

# CLEANING
STOP_WORDS = set(stopwords.words("english"))
STEMMER = SnowballStemmer("english")
TEXT_CLEANING_RE = "@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+"

class CustomCoder(Coder):
    """Custom coder utilizado para ller y escribir strings. Realiza una serie de tranformaciones entre codificaciones"""

    def __init__(self, encoding: str):
        self.enconding = encoding

    def encode(self, value):
        return value.encode(self.enconding)

    def decode(self, value):
        return value.decode(self.enconding)

    def is_deterministic(self):
        return True

class PreprocessColumnsTrainFn(beam.DoFn):
    """Realiza el preprocesamiento propio de NLP"""

    def process_text(self, text):
        text = re.sub(TEXT_CLEANING_RE, " ", str(text).lower()).strip()
        tokens = []
        for token in text.split():
            if token not in STOP_WORDS:
                # Si el token es un número, conviértelo a cadena de texto
                if token.replace('.', '', 1).isdigit():  # Verifica si el token es un número (incluyendo números decimales)
                    tokens.append("number")
                else:
                    tokens.append(STEMMER.stem(token))
        return " ".join(tokens)

    def process(self, element):
        if isinstance(element, str):
            try:
                text, sentiment = element.split(",")
            except ValueError:
                logging.warning("No se pudo dividir la línea por la coma. Sin modificar.")
                text = element.strip()
                sentiment = "1"  # Asignamos un valor por defecto
            processed_text = self.process_text(text)
            processed_sentiment = sentiment  # No necesitas procesar el sentimiento aquí
            yield f"{processed_text}, {processed_sentiment}"


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument("--work-dir", dest="work_dir", required=True, help="Working Directory")
    parser.add_argument("--input", dest="input", required=True, help="Input")
    parser.add_argument("--output", dest="output", required=True, help="Salida de la transformación")
    parser.add_argument("--mode", dest="mode", required=True, choices=["train", "test"], help="Tipo de salida de la transformación")

    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    pipeline_options.view_as(DirectOptions).direct_num_workers = 0

    with beam.Pipeline(options=pipeline_options) as p:
        raw_data = p | "ReadTrollData" >> ReadFromText(known_args.input, coder=CustomCoder("latin-1"))

        if known_args.mode == "train":
            transformed_data = (raw_data
                                | "Preprocess" >> beam.ParDo(PreprocessColumnsTrainFn()))

            eval_percent = 20
            assert 0 < eval_percent < 100, "eval_percent must be in the range (0-100)"
            train_dataset, eval_dataset = (transformed_data
                                           | "Split dataset"
                                           >> beam.Partition(lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2))

            train_dataset | "TrainWriteToCSV" >> WriteToText(os.path.join(known_args.output, "train", "part"), file_name_suffix=".csv")
            eval_dataset | "ValWriteToCSV" >> WriteToText(os.path.join(known_args.output, "val", "part"), file_name_suffix=".csv")

        else:
            transformed_data = (raw_data
                                | "Preprocess" >> beam.ParDo(PreprocessColumnsTrainFn()))

            transformed_data | "TestWriteToCSV" >> WriteToText(os.path.join(known_args.output, "test", "part"), file_name_suffix=".csv")

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()


Overwriting pipeline_split.py


In [17]:
!pwd

/Users/dj/Documents/GitHub/KeepCoding_Algorithm_Deployment/content/trolls-data


### Validación preprocess train en local
Con el comando mostrado a continuación se valida la correcta generación de los datos de entrenamiento y validación en local.

In [18]:
# Ejecutar el pipeline para generar datos de train/val
WORK_DIR = !pwd
! python3 pipeline_split.py \
    --work-dir $WORK_DIR \
    --runner DirectRunner \
    --input $WORK_DIR/cybertrolls-fixed.csv \
    --output $WORK_DIR/transformed_data \
    --mode train

[nltk_data] Downloading package stopwords to /Users/dj/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
Traceback (most recent call last):
  File "/Users/dj/Documents/GitHub/KeepCoding_Algorithm_Deployment/content/trolls-data/pipeline_split.py", line 106, in <module>
    run()
  File "/Users/dj/Documents/GitHub/KeepCoding_Algorithm_Deployment/content/trolls-data/pipeline_split.py", line 83, in run
    raw_data = p | "ReadTrollData" >> ReadFromText(known_args.input, coder=CustomCoder("latin-1"))
                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/dj/anaconda3/lib/python3.11/site-packages/apache_beam/io/textio.py", line 781, in __init__
    self._source = self._source_class(
                   ^^^^^^^^^^^^^^^^^^^
  File "/Users/dj/anaconda3/lib/python3.11/site-packages/apache_beam/io/textio.py", line 140, in __init__
    super().__init__(
  File "/Users/dj/anaconda3/lib/python3.11/site-packages/apache_beam/i

### Validación preprocess test en local

Con el comando mostrado a continuación se valida la correcta generación de los datos de test en local.

## Segundo ejercicio

Desarrollar una tarea de entrenamiento para los datos preprocesados. Requisitos:

- Soportar ejecuciones en local usando el SDK de AI-Platform y ejecuciones en GCP con el mismo código.

Se crea el directorio donde trabajaremos:

In [None]:
%mkdir /content/batch/trainer

In [None]:
%%writefile trainer/__init__.py

version = "0.1.0"

### Validación Train en local

Con el comando mostrado a continuación se valida el correcto entrenamiento del modelo usando los datos preprocesados del apartado anterior.

## Tercer ejercicio

Desarrollar un pipeline de inferencia utilizando Apache Beam para generar predicciones usando los modelos generados en el apartado anterior así como los datos de test generados en el primer ejercicio.


Generamos un timestamp para la ejecución de las predicciones

### Validación Predict en local

Con el comando mostrado a continuación se valida la correcta inferencia usando los modelos anteriores y los datos de test generados anteriormente.

# Mensaje final

¡Muchas gracias por participar en este curso, espero que tanto las sesiones teóricas como la práctica te hayan resultado útiles. A lo largo de esta semmana iréis recibiendo feedback personalizado sobre vuestras prácticas.

¡Muchas gracias y ánimo con el proyecto final!