<font color="darkblue" size=5>**Práctica Final Despliegue de Algoritmos, José Carlos Amo Pérez.**</font>

# Práctica Final: detección de mensajes troll en chat de Twitch en tiempo real

En los últimos años Twitch se ha consolidad como uno de los principales medios de comunicación especialmente para las generaciones más jóvenes.

Al tratarse de una plataforma participativa en la que los usuarios pueden poner comentarios durante y posteriormente a las emisiones. Entre estos comentarios han aparecido como siempre comentarios ofensisvos.

En esta práctica construiremos una Inteligencia Artificial capaz de clasificar esos mensajes troll.

Durante la práctica entrenaremos el modelo de Deep Learning y lo desplegaremos para inferencia en batch, la más habitual actualmente dentro de la industria:

## Teoría

1. **¿Qué es Apache Beam?** Es una plataforma que permiter escribir pipelines de código. La ventaja es que una vez diseñado un modelo de ML, DL, NLP etc se puede automatizar todo el código para su uso en prodcucción. Los pipelines son basicamente concatenaciones de 3 elementos: Un PCollection con los datos de entrada, un PTransform que son las operaciones que transforman esos datos y un nuevo PCollection que contendrá los datos de salida.

2. **¿Cuáles son las diferentes formas de desplegar un modelo?** Si por desplegar un modelo entendemos pasar un modelo que ha sido entrenado en un entorno "de disño" a un entorno de producción "para uso real", existen 4 formas de desplegarlo: "en batch" (batch offliune) "en streaming" (batch pero online), mediante "Microservicios/API" (on demand offline) y "Machine Learning Automatizado" (on demand online). La más frecuente hoy en día es en batch y la deseable sería ML Automatizado.

3. **¿Cuál es la principal diferencia entre la inferencia en batch y la inferencia en streaming?** La inferencia en batch es offline en el sentido de que los datos se almacenan y tiempo después se procesan. La inferencia en streaming es online o en tiempo real (o casi real) en el sentido de que a medida que llegan datos se procesan.

# Configuración de nuestro proyecto en GCP


In [1]:
PROJECT_ID = "proyecto-jcao-da-kc-2023" #@param {type:"string"}
! gcloud config set project $PROJECT_ID

Updated property [core/project].


In [2]:
import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

if 'google.colab' in sys.modules:
  from google.colab import auth as google_auth
  google_auth.authenticate_user()

# If you are running this notebook locally, replace the string below with the
# path to your service account key and run this cell to authenticate your GCP
# account.
else:
  %env GOOGLE_APPLICATION_CREDENTIALS ''


Creamos el bucket mediante la consola y una vez creado lo indicamos en la variable:

In [3]:
BUCKET_NAME = "bucket-da-2023-jcap" #@param {type:"string"}
REGION = "europe-west1" #@param {type:"string"}

In [4]:
! gsutil mb -l $REGION gs://$BUCKET_NAME   # Esto crea el bucket

Creating gs://bucket-da-2023-jcap/...
ServiceException: 409 A Cloud Storage bucket named 'bucket-da-2023-jcap' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


In [5]:
! gsutil ls -al gs://$BUCKET_NAME          # Esto confirma que el bucket está creado ok

   2756023  2023-03-19T19:02:08Z  gs://bucket-da-2023-jcap/data.json#1679252528447416  metageneration=1
TOTAL: 1 objects, 2756023 bytes (2.63 MiB)


# Entrenamiento e inferencia en Batch

## Preparación

Para esta primera parte se va a utilizar [Tweets Dataset for Detection of Cyber-Trolls](https://www.kaggle.com/dataturks/dataset-for-detection-of-cybertrolls). El objetivo es desarrollar un clasificador binario para detectar si el mensaje recibido es troll (1) o no (0). **Las métricas obtenidas del entrenamiento y la inferencia no se tendrán en cuenta para la evaluación de la práctica, la importancia está en la arquitectura de la solución**, es decir, lo importante no es que nuestro modelo detecte correctamente los tweets de trolls si no que funcione y sea capaz de hacer inferencias.


A continuación, se van a subir los datos de entrenamiento al bucket del proyecto que se haya creado. **Importante:** crea el bucket en una única región. Os dejo disponibilizado el dataset en un bucket de acceso público:

In [6]:
# Upload data to your bucket
! wget https://storage.googleapis.com/datos-practica-compartidos/Dataset%20for%20Detection%20of%20Cyber-Trolls.json -O - | gsutil cp - gs://$BUCKET_NAME/data.json

--2023-03-19 20:07:25--  https://storage.googleapis.com/datos-practica-compartidos/Dataset%20for%20Detection%20of%20Cyber-Trolls.json
Resolving storage.googleapis.com (storage.googleapis.com)... 108.177.13.128, 74.125.26.128, 172.217.193.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|108.177.13.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2756023 (2.6M) [application/json]
Saving to: ‘STDOUT’

-                     0%[                    ]       0  --.-KB/s               Copying from <STDIN>...

2023-03-19 20:07:28 (1.05 MB/s) - written to stdout [2756023/2756023]

/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              


Ahora se crea el directorio dónde vas a desarrollar esta primera parte de la práctica.

In [7]:
%mkdir /content/batch

mkdir: cannot create directory ‘/content/batch’: File exists


Se establece el directorio de trabajo que hemos creado.

In [8]:
import os

# Set the working directory to the sample code directory
%cd /content/batch

WORK_DIR = os.getcwd()

/content/batch


Ahora se descargarán los datos en el workspace de Colab para trabajar en local.

In [9]:
! wget https://storage.googleapis.com/datos-practica-compartidos/Dataset%20for%20Detection%20of%20Cyber-Trolls.json

--2023-03-19 20:07:30--  https://storage.googleapis.com/datos-practica-compartidos/Dataset%20for%20Detection%20of%20Cyber-Trolls.json
Resolving storage.googleapis.com (storage.googleapis.com)... 108.177.13.128, 74.125.26.128, 172.217.193.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|108.177.13.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2756023 (2.6M) [application/json]
Saving to: ‘Dataset for Detection of Cyber-Trolls.json.3’


2023-03-19 20:07:30 (174 MB/s) - ‘Dataset for Detection of Cyber-Trolls.json.3’ saved [2756023/2756023]



Se establecen las dependencias que se usarán en la práctica. Se pueden añadir y quitar las dependencias que no se usen o viceversa.

In [10]:
#apache-beam[gcp]==2.24.0  Lo comento y lo saco del %%write para resolver error. Cargo beam en el siguiente script a ver si eludo el error
%%writefile requirements.txt
tensorflow==2.8.0
gensim==3.6.0
fsspec==0.8.4
gcsfs==0.7.1
numpy==1.24.2

Overwriting requirements.txt


In [11]:
!pip install apache-beam

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


<font color="red"> Resueltos los errores al crear requirements.txt</font>

Instalamos las dependencias. **No olvides reiniciar el entorno al instalar y establecer las variables y credenciales de GCP al arrancar.**

In [12]:
! pip install -r requirements.txt

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## Primer ejercicio

Desarrollar un pipeline de preprocesamiento utilizando Apache Beam para generar datos de train, eval y test para los datos proporcionados anteriormente. Requisitos:

- Proporcionar dos modos de ejecución: `train` y `test`
- Soportar ejecuciones en local con `DirectRunner` y ejecuciones en Dataflow usando `DataFlowRunner`.

In [13]:
! pip install contractions  # Lo instalo porque en el preprocesado uso esta funcion para expandir las formas comprimidas del inglés
! pip install num2words     # Lo instalo porque en el preprocesado uso esta funcion para pasar numeros a su expresion literal
! pip install --upgrade numpy

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [14]:
%%writefile preprocess.py

from __future__ import absolute_import

import argparse
import logging
import re
import os
import csv
import random
import json
import contractions

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.coders.coders import Coder
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions, DirectOptions

import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer

nltk.download("stopwords")

from num2words import num2words


# CLEANING
STOP_WORDS = stopwords.words("english")
STEMMER = SnowballStemmer("english")
TEXT_CLEANING_RE = "@\S+|https?:\S+|http?:\S|[^A-Za-z]+"  #Elimino también los números


class ExtractColumnsDoFn(beam.DoFn):
    def process(self, element):
        # Parse JSON
        data = json.loads(element)
        content = data["content"]
        label = data["annotation"]["label"][0]
        result = [content, label]   
        yield result


class PreprocessColumnsTrainFn(beam.DoFn):
    def process_sentiment(self, sentiment):
        sentiment = int(sentiment)
        if sentiment == 1:
            return "SI_TROLL"
        else:
            return "NO_TROLL"
       
    def process_text(self, text):
        #Expando las formas contraidas del inglés
        text = contractions.fix(text)
        # Paso expresiones numericas a su correspondiente literal (por ejemplo 222 -> two hundred and twenty-two)
        numbers = re.findall(r'\d+', text)
        for number in numbers:
            text = text.replace(number, num2words(int(number)))
        # Remove link,user and special characters
        stem = False
        text = re.sub(TEXT_CLEANING_RE, " ", str(text).lower()).strip()
        # Elimino espacios en blanco duplicados, es decir: "estoy    en    casa" -> "estoy en casa"
        text = re.sub(r'\s{2,}', ' ', text)
        tokens = []
        for token in text.split():
            if token not in STOP_WORDS:
                if stem:
                    tokens.append(STEMMER.stem(token))
                else:
                    tokens.append(token)
        return " ".join(tokens)

    def process(self, element):
        processed_text = self.process_text(element[0])
        processed_sentiment = self.process_sentiment(element[1])
        yield f"{processed_text}, {processed_sentiment}"


class CustomCoder(Coder):
    """A custom coder used for reading and writing strings"""

    def __init__(self, encoding: str):
        # latin-1
        # iso-8859-1
        self.enconding = encoding

    def encode(self, value):
        return value.encode(self.enconding)

    def decode(self, value):
        return value.decode(self.enconding)

    def is_deterministic(self):
        return True


def run(argv=None, save_main_session=True):

    """Main entry point; defines and runs the wordcount pipeline."""

    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--work-dir", dest="work_dir", required=True, help="Working directory",
    )

    parser.add_argument(
        "--input", dest="input", required=True, help="Input dataset in work dir",
    )

    parser.add_argument(
        "--output",
        dest="output",
        required=True,
        help="Output path to store transformed data in work dir",
    )

    parser.add_argument(
        "--mode",
        dest="mode",
        required=True,
        choices=["train", "test"],
        help="Type of output to store transformed data",
    )

    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    pipeline_options.view_as(DirectOptions).direct_num_workers = 0

   # The pipeline will be run on exiting the with block.
    with beam.Pipeline(options=pipeline_options) as p:

        # Read the text file[pattern] into a PCollection.
        raw_data = p | "ReadTwitterData" >> ReadFromText(
            known_args.input, coder=CustomCoder("latin-1")
        )

        if known_args.mode == "train":

            transformed_data = (
                raw_data
                | "ExtractColumns" >> beam.ParDo(ExtractColumnsDoFn())
                | "Preprocess" >> beam.ParDo(PreprocessColumnsTrainFn())
            )

            eval_percent = 20
            assert 0 < eval_percent < 100, "eval_percent must in the range (0-100)"
            train_dataset, eval_dataset = (
                transformed_data
                | "Split dataset"
                >> beam.Partition(
                    lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2
                )
            )

            train_dataset | "TrainWriteToCSV" >> WriteToText(
                os.path.join(known_args.output, "train", "part")
            )
            eval_dataset | "EvalWriteToCSV" >> WriteToText(
                os.path.join(known_args.output, "eval", "part")
            )

        else:
            transformed_data = (
                raw_data
                | "ExtractColumns" >> beam.ParDo(ExtractColumnsDoFn())
                | "Preprocess" >> beam.Map(lambda x: f'"{x[0]}"')
            )

            transformed_data | "TestWriteToCSV" >> WriteToText(
                os.path.join(known_args.output, "test", "part")
            )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

Overwriting preprocess.py


### Validación preprocess train en local
Con el comando mostrado a continuación se valida la correcta generación de los datos de entrenamiento y validación en local.

In [15]:
# Run este script para el preprocesado en local con los datos de entrenamiento
! python3 preprocess.py \
  --work-dir $WORK_DIR \
  --runner DirectRunner \
  --input $WORK_DIR/Dataset*.json \
  --output $WORK_DIR/transformed_data \
  --mode train

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.46.0
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f6c9561f220> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f6c9561fe50> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:root:Default Python SDK image for environ

### Validación preprocess test en local 

Con el comando mostrado a continuación se valida la correcta generación de los datos de test en local.

In [16]:
# Run este script para el preprocesado en local con los datos de test
! python3 preprocess.py \
  --work-dir $WORK_DIR \
  --runner DirectRunner \
  --input $WORK_DIR/Dataset*.json\
  --output $WORK_DIR/transformed_data \
  --mode test

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.46.0
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f35e64419a0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f35e6429ac0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:root:Default Python SDK image for environ

## Segundo ejercicio

Desarrollar una tarea de entrenamiento para los datos preprocesados. Requisitos:

- Soportar ejecuciones en local usando el SDK de AI-Platform y ejecuciones en GCP con el mismo código.

Se crea el directorio donde trabajaremos:

In [17]:
%mkdir /content/batch/trainer

mkdir: cannot create directory ‘/content/batch/trainer’: File exists


In [18]:
%%writefile trainer/__init__.py

version = "0.1.0"

Overwriting trainer/__init__.py


In [19]:
%%writefile trainer/task.py

from __future__ import absolute_import

import argparse
import multiprocessing as mp
import logging
import tempfile
import os

import pickle
import gensim
import pandas as pd
import numpy as np
import tensorflow as tf

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (
    Dense,
    Dropout,
    Embedding,
    LSTM,
)
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from sklearn.preprocessing import LabelEncoder


# WORD2VEC
W2V_SIZE = 300 #tamaño del embedding
W2V_WINDOW = 7 #tamaño de la ventana
# 32
W2V_EPOCH = 5 #epochs para entrenar
W2V_MIN_COUNT = 10 #se eliminan palabras que aparecen menos de 10 veces

# KERAS
SEQUENCE_LENGTH = 300 #longitud de secuencia para el padding

# SENTIMENT definimos las etiquetas
NEGATIVE = "NO_TROLL"
POSITIVE = "SI_TROLL"
SENTIMENT_THRESHOLDS = [0.5, 0.5]

# EXPORT
KERAS_MODEL = "model.h5" #nombre del archivo del modelo
WORD2VEC_MODEL = "model.w2v" #nombre del archivo del embedding
TOKENIZER_MODEL = "tokenizer.pkl" #nombre del tokenizador
ENCODER_MODEL = "encoder.pkl" #nombre del encoder

# TRAIN AND EVALUATE
STEPS = 10   # Pongo un valor muy bajo porque me he quedado sin GPUs y si subo es eterno

def generate_word2vec(train_df):
  ### Genera el embedding y lo entrena
    documents = [_text.split() for _text in train_df.text.values]
    w2v_model = gensim.models.word2vec.Word2Vec(
        size=W2V_SIZE,
        window=W2V_WINDOW,
        min_count=W2V_MIN_COUNT,
        workers=mp.cpu_count(),
    )
    w2v_model.build_vocab(documents)

    words = w2v_model.wv.vocab.keys()
    vocab_size = len(words)
    logging.info(f"Vocab size: {vocab_size}")
    w2v_model.train(documents, total_examples=len(documents), epochs=W2V_EPOCH)

    return w2v_model


def generate_tokenizer(train_df):
  ### Genera nuestro tokenizador
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(train_df.text)
    vocab_size = len(tokenizer.word_index) + 1
    logging.info(f"Total words: {vocab_size}")
    return tokenizer, vocab_size


def generate_label_encoder(train_df):
  ### COdifica las etiquetas (de texto a número)
    encoder = LabelEncoder()
    encoder.fit(train_df.sentiment.tolist())
    return encoder


def generate_embedding(word2vec_model, vocab_size, tokenizer):
  ### Genera el embedding en base al Word2Vec
    embedding_matrix = np.zeros((vocab_size, W2V_SIZE))
    for word, i in tokenizer.word_index.items():
        if word in word2vec_model.wv:
            embedding_matrix[i] = word2vec_model.wv[word]
    return Embedding(
        vocab_size,
        W2V_SIZE,
        weights=[embedding_matrix],
        input_length=SEQUENCE_LENGTH,
        trainable=False,
    )


def train_and_evaluate(
    work_dir, train_df, eval_df, batch_size=1024, epochs=8, steps=STEPS
):

    """
    Trains and evaluates the estimator given.
    The input functions are generated by the preprocessing function.
    """

    model_dir = os.path.join(work_dir, "model") # Comprobamos si ya existe un modelo
    if tf.io.gfile.exists(model_dir):
        tf.io.gfile.rmtree(model_dir) #si existe lo eliminamos
    tf.io.gfile.mkdir(model_dir) #creamos un directorio de modelo

    # Configuramos donde guardar el modelo
    run_config = tf.estimator.RunConfig() 
    run_config = run_config.replace(model_dir=model_dir)

    # Nos permite seguir el entrenamiento cada 10 steps
    run_config = run_config.replace(save_summary_steps=10)

    # Generamos el Word2Vec para entrenamiento
    logging.info("---- Generating word2vec model ----")
    word2vec_model = generate_word2vec(train_df) 

    # Generamos el tokenizador con el dato de entrenamiento e inferimos en el de entrenamiento y evaluación
    logging.info("---- Generating tokenizer ----")
    train_df = pd.concat([train_df] * 7, ignore_index=True)   # Para asegurarme de que el modelo vea suficientes ejemplos de entrenamiento
    tokenizer, vocab_size = generate_tokenizer(train_df)

    logging.info("---- Tokenizing train data ----")
    x_train = pad_sequences(
        tokenizer.texts_to_sequences(train_df.text), maxlen=SEQUENCE_LENGTH
    ) #Añadimos el padding
    logging.info("---- Tokenizing eval data ----")
    x_eval = pad_sequences(
        tokenizer.texts_to_sequences(eval_df.text), maxlen=SEQUENCE_LENGTH
    )

    # Generamos los encodings de las etiquetas tanto para entrenamiento como para evaluación
    logging.info("---- Generating label encoder ----")
    label_encoder = generate_label_encoder(train_df)

    logging.info("---- Encoding train target ----")
    y_train = label_encoder.transform(train_df.sentiment.tolist())
    logging.info("---- Encoding eval target ----")
    y_eval = label_encoder.transform(eval_df.sentiment.tolist())

    y_train = y_train.reshape(-1, 1) #adaptamos la forma del tensor
    y_eval = y_eval.reshape(-1, 1)

    # Create Embedding Layer
    logging.info("---- Generating embedding layer ----")
    embedding_layer = generate_embedding(word2vec_model, vocab_size, tokenizer) #capade embedding
    # Construimos nuestra red LSTM
    logging.info("---- Generating Sequential model ----")
    model = Sequential()
    model.add(embedding_layer)
    model.add(Dropout(0.5))
    model.add(LSTM(100, dropout=0.2, recurrent_dropout=0.2))
    model.add(Dense(1, activation="sigmoid"))

    model.summary()

    logging.info("---- Adding loss function to model ----")
    model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"]) #problema binario y medimos según el accuracy

    logging.info("---- Adding callbacks to model ----")
    callbacks = [
        ReduceLROnPlateau(monitor="val_loss", patience=5, cooldown=0), 
        EarlyStopping(monitor="val_accuracy", min_delta=1e-4, patience=5), #early stopping para agilizar
    ]

    logging.info("---- Training model ----")
    model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        steps_per_epoch=steps,
        epochs=epochs,
        validation_split=0.1,
        verbose=1,
        callbacks=callbacks,
    ) #entrenamos nuestro modelo

    logging.info("---- Evaluating model ----")
    score = model.evaluate(x_eval, y_eval, batch_size=batch_size) #evaluamos el modelo
    logging.info(f"ACCURACY: {score[1]}")
    logging.info(f"LOSS: {score[0]}")

    logging.info("---- Saving models ----")
    pickle.dump(
        tokenizer,
        tf.io.gfile.GFile(os.path.join(model_dir, TOKENIZER_MODEL), mode="wb"),
        protocol=0,
    ) #guardamos el tokenizador
    with tempfile.NamedTemporaryFile(suffix=".h5") as local_file:
        with tf.io.gfile.GFile(
            os.path.join(model_dir, KERAS_MODEL), mode="wb" #guardamos el archivo
        ) as gcs_file:
            model.save(local_file.name)
            gcs_file.write(local_file.read())

    # word2vec_model.save(os.path.join(model_dir, WORD2VEC_MODEL))

    # pickle.dump(
    #     label_encoder, open(os.path.join(model_dir, ENCODER_MODEL), "wb"), protocol=0
    # )


if __name__ == "__main__":

    """Main function called by AI Platform."""

    logging.getLogger().setLevel(logging.INFO) # Activamos el logger

    parser = argparse.ArgumentParser( #Implementamos el parseador de argumentos. Ver script de preprocesamiento para más detalle
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    parser.add_argument(
        "--job-dir",
        help="Directory for staging trainer files. "
        "This can be a Google Cloud Storage path.",
    )

    parser.add_argument(
        "--work-dir",
        required=True,
        help="Directory for staging and working files. "
        "This can be a Google Cloud Storage path.",
    )

    parser.add_argument(
        "--batch-size",
        type=int,
        default=1024,
        help="Batch size for training and evaluation.",
    )

    parser.add_argument(
        "--epochs", type=int, default=8, help="Number of epochs to train the model",
    )

    parser.add_argument(
        "--steps",
        type=int,
        default=STEPS,
        help="Number of steps per epoch to train the model",
    )

    args = parser.parse_args() #cargamos todos los argumentos antes mencionados

    train_data_files = tf.io.gfile.glob(
        os.path.join(args.work_dir, "/content/batch/transformed_data/train/part-*") # el * permite que lea todas las particiones
    ) #archivos de entrenamiento
    eval_data_files = tf.io.gfile.glob(
        os.path.join(args.work_dir, "/content/batch/transformed_data/eval/part-*")
    ) #archivos de test

    train_df = pd.concat(
        [
            pd.read_csv(
                f,
                names=["text", "sentiment"],
                dtype={"text": "string", "sentiment": "string"},
            ) # leemos cada csv 
            for f in train_data_files
        ]
    ).dropna() #generamos un dataframe con todas las particiones

    eval_df = pd.concat(
        [
            pd.read_csv(
                f,
                names=["text", "sentiment"],
                dtype={"text": "string", "sentiment": "string"},
            )
            for f in eval_data_files
        ]
    ).dropna()

    train_and_evaluate(
        args.work_dir,
        train_df=train_df,
        eval_df=eval_df,
        batch_size=args.batch_size,
        epochs=args.epochs,
        steps=args.steps,
    )


Overwriting trainer/task.py


### Validación Train en local

Con el comando mostrado a continuación se valida el correcto entrenamiento del modelo usando los datos preprocesados del apartado anterior.

In [20]:
# Explicitly tell `gcloud ai-platform local train` to use Python 3 
! gcloud config set ml_engine/local_python $(which python3)

# This is similar to `python -m trainer.task --job-dir local-training-output`
# but it better replicates the AI Platform environment, especially for
# distributed training (not applicable here).
! gcloud ai-platform local train \
  --package-path trainer \
  --module-name trainer.task \
  -- \
  --work-dir $WORK_DIR \
  --epochs 1

Updated property [ml_engine/local_python].
INFO:tensorflow:TF_CONFIG environment variable: {'job': {'job_name': 'trainer.task', 'args': ['--work-dir', '/content/batch', '--epochs', '1']}, 'task': {}, 'cluster': {}, 'environment': 'cloud'}
INFO:root:---- Generating word2vec model ----
INFO:gensim.models.word2vec:collecting all words and their counts
INFO:gensim.models.word2vec:PROGRESS: at sentence #0, processed 0 words, keeping 0 word types
INFO:gensim.models.word2vec:PROGRESS: at sentence #10000, processed 66901 words, keeping 11467 word types
INFO:gensim.models.word2vec:collected 14439 word types from a corpus of 113404 raw words and 15990 sentences
INFO:gensim.models.word2vec:Loading a fresh vocabulary
INFO:gensim.models.word2vec:effective_min_count=10 retains 1559 unique words (10% of original 14439, drops 12880)
INFO:gensim.models.word2vec:effective_min_count=10 leaves 84461 word corpus (74% of original 113404, drops 28943)
INFO:gensim.models.word2vec:deleting the raw counts dicti

## Tercer ejercicio

Desarrollar un pipeline de inferencia utilizando Apache Beam para generar predicciones usando los modelos generados en el apartado anterior así como los datos de test generados en el primer ejercicio.


In [21]:
%%writefile predict.py

from __future__ import absolute_import
from __future__ import print_function

import argparse
import tempfile
import json
import os
import sys
import time

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.coders.coders import Coder

import pickle
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences

# KERAS
SEQUENCE_LENGTH = 300

# SENTIMENT
NEGATIVE = "NO_TROLL"  # Esta es una posible clasificacion
POSITIVE = "SI_TROLL"  # Esta es la otra
SENTIMENT_THRESHOLDS = [0.5, 0.5] # Lo que salga por debajo de 0.5 se clasifica como negativo (NO_TROLL)
                                  # y lo que salga por encima como positivo (SI_TROLL)

# EXPORT
KERAS_MODEL = "model.h5"
TOKENIZER_MODEL = "tokenizer.pkl"


class Predict(beam.DoFn): #realiza predicciones y hereda de Beam
    def __init__(
        self, model_dir,
    ):
        self.model_dir = model_dir
        self.model = None
        self.tokenizer = None

    def setup(self): #es un método de apache Beam que se ejecuta la primera vez que se ejecute e inicializa el modelo de Keras y el tokenizador cargándolos
        keras_model_path = os.path.join(self.model_dir, KERAS_MODEL)
        with tempfile.NamedTemporaryFile(suffix=".h5") as local_file:
            with tf.io.gfile.GFile(keras_model_path, mode="rb") as gcs_file:
                local_file.write(gcs_file.read())
                self.model = tf.keras.models.load_model(local_file.name)

        tokenizer_path = os.path.join(self.model_dir, TOKENIZER_MODEL)
        self.tokenizer = pickle.load(tf.io.gfile.GFile(tokenizer_path, mode="rb"))

    def decode_sentiment(self, score, include_neutral=True):
#        if include_neutral:
#           label = NEUTRAL
#            if score <= SENTIMENT_THRESHOLDS[0]:
#                label = NEGATIVE
#            elif score >= SENTIMENT_THRESHOLDS[1]:
#                label = POSITIVE
#
#            return label
#        else:
            return NEGATIVE if score < 0.5 else POSITIVE

    def process(self, element): #recoge elt exto a predecir, tokeniza 
        start_at = time.time()
        # Tokenize text
        x_test = pad_sequences(
            self.tokenizer.texts_to_sequences([element]), maxlen=SEQUENCE_LENGTH 
        ) #tokenizado y padding
        # Predict
        score = self.model.predict([x_test])[0] #realiza la inferencia
        # Decode sentiment
        label = self.decode_sentiment(score) #trasnforma la predicción en sentimiento

        yield {
            "text": element,
            "label": label,
            "score": float(score),
            "elapsed_time": time.time() - start_at,
        } #devuelve un diccionario con la información clave


class CustomCoder(Coder):
    """A custom coder used for reading and writing strings"""

    def __init__(self, encoding: str):
        # latin-1
        # iso-8859-1
        self.enconding = encoding

    def encode(self, value):
        return value.encode(self.enconding)

    def decode(self, value):
        return value.decode(self.enconding)

    def is_deterministic(self):
        return True


def run(model_dir, source, sink, beam_options=None):
    with beam.Pipeline(options=beam_options) as p:
        _ = (
            p #generamos el pipeline
            | "Read data" >> source #lectura de datos
            | "Predict" >> beam.ParDo(Predict(model_dir)) #hacemos la predicción
            | "Format as JSON" >> beam.Map(json.dumps) #convertimos el diccionario en string
            | "Write predictions" >> sink #guardamos la información
        )


if __name__ == "__main__":
    """Main function"""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter # parseador de argumentos
    )

    parser.add_argument( #directorio de trabajo
        "--work-dir",
        dest="work_dir",
        required=True,
        help="Directory for temporary files and preprocessed datasets to. "
        "This can be a Google Cloud Storage path.",
    )

    parser.add_argument( #ubicación del modelo para inferencia
        "--model-dir",
        dest="model_dir",
        required=True,
        help="Path to the exported TensorFlow model. "
        "This can be a Google Cloud Storage path.",
    )

    verbs = parser.add_subparsers(dest="verb")
    batch_verb = verbs.add_parser("batch", help="Batch prediction")
    batch_verb.add_argument( #datos de entrada para la predicción
        "--inputs-dir",
        dest="inputs_dir",
        required=True,
        help="Input directory where CSV data files are read from. "
        "This can be a Google Cloud Storage path.",
    )
    batch_verb.add_argument( #datos de salida para la predicción
        "--outputs-dir",
        dest="outputs_dir",
        required=True,
        help="Directory to store prediction results. "
        "This can be a Google Cloud Storage path.",
    )

    args, pipeline_args = parser.parse_known_args()
    print(args)
    beam_options = PipelineOptions(pipeline_args)
    beam_options.view_as(SetupOptions).save_main_session = True
    # beam_options.view_as(DirectOptions).direct_num_workers = 0

    project = beam_options.view_as(GoogleCloudOptions).project

    if args.verb == "batch": #si es batch genera transformadores iniciales, la fuente y la salida de datos
        results_prefix = os.path.join(args.outputs_dir, "part")

        source = ReadFromText(args.inputs_dir, coder=CustomCoder("latin-1"))
        sink = WriteToText(results_prefix)

    else: # de momento no funciona si no es batch
        parser.print_usage()
        sys.exit(1)

    run(args.model_dir, source, sink, beam_options)

Overwriting predict.py


Generamos un timestamp para la ejecución de las predicciones

In [22]:
from datetime import datetime

# current date and time
TIMESTAMP = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

### Validación Predict en local

Con el comando mostrado a continuación se valida la correcta inferencia usando los modelos anteriores y los datos de test generados anteriormente.

In [23]:
! python3 predict.py \
  --work-dir $WORK_DIR \
  --model-dir $WORK_DIR/model \
  batch \
  --inputs-dir $WORK_DIR/transformed_data/test/part* \
  --outputs-dir $WORK_DIR/predictions/$TIMESTAMP

Namespace(work_dir='/content/batch', model_dir='/content/batch/model', verb='batch', inputs_dir='/content/batch/transformed_data/test/part-00000-of-00002', outputs_dir='/content/batch/predictions/2023-03-19_20-11-07')
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1466, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
  File "apache_beam/runners/common.py", line 552, in apache_beam.runners.common.DoFnInvoker.invoke_setup
  File "/content/batch/predict.py", line 50, in setup
    local_file.write(gcs_file.read())
  File "/usr/local/lib/python3.9/dist-packages/tensorflow/python/lib/io/file_io.py", line 114, in read
    self._preread_check()
  File "/usr/local/lib/python3.9/dist-packages/tensorflow/python/lib/io/file_io.py", line 76, in _preread_check
    self._read_buf = _pywrap_file_io.BufferedInputStream(
tensorflow.python.framework.errors_impl.NotFoundError: /content/batch/model/model.h5; No such file or directory

During handling of th

## Extra. Comprobaciones en la nube

En esta última parte se validará el funcionamiento del código en un proyecto de GCP sobre DataFlow y AI Platform

Establecemos el bucket y region de GCP sobre el que trabajaremos:

In [112]:
# GCP_WORK_DIR = ''
# GCP_REGION = ''

BUCKET_NAME = "bucket-da-2023-jcap" #@param {type:"string"}
REGION = "europe-west1" #@param {type:"string"}

Se proporciona un fichero `setup.py` necesario para ejecutar en DataFlow. Modificar la variable `REQUIRED_PACKAGES` con las dependencias que se hayan usado en el `requirements.txt`

In [113]:
%%writefile setup.py

import setuptools

REQUIRED_PACKAGES = [
  "apache-beam[gcp]==2.24.0",
  "tensorflow==2.8.0"
  "gensim==3.6.0"
  "fsspec==0.8.4"
  "gcsfs==0.7.1"
  "numpy==1.24.2"
]

setuptools.setup(
    name="twitchstreaming",
    version="0.0.1",
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True,
    description="Troll detection",
)


Overwriting setup.py


Habría que parsear lo siguiente:

  --project $PROJECT_ID \
  --region $REGION \
  --runner DataflowRunner \
  --temp_location $WORK_DIR/beam-temp \
  --setup_file ./setup.py \

In [116]:
# Run este script para el preprocesado en ****Google Cloud***** con los datos de entrenamiento
! python3 preprocess.py \
  --project $PROJECT_ID \
  --region $REGION \
  --runner DataflowRunner \
  --temp_location $WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $WORK_DIR \
  --input $WORK_DIR/Dataset*.json \
  --output $WORK_DIR/transformed_data \
  --mode train

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
Traceback (most recent call last):
  File "/content/batch/preprocess.py", line 182, in <module>
    run()
  File "/content/batch/preprocess.py", line 136, in run
    with beam.Pipeline(options=pipeline_options) as p:
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/pipeline.py", line 203, in __init__
    raise ValueError(
ValueError: Pipeline has validations errors: 
Invalid GCS path (/content/batch/beam-temp), given for the option: temp_location.


In [None]:
# Run este script para el preprocesado en *******google Cloud*********** con los datos de test
! python3 preprocess.py \
  --project $PROJECT_ID \
  --region $REGION \
  --runner DataflowRunner \
  --temp_location $WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $WORK_DIR \
  --input $WORK_DIR/data/Dataset*.json \
  --output $WORK_DIR/data/transformed_data \
  --mode test

### Validación preprocess train en Dataflow

Con el comando mostrado a continuación se valida la correcta generación de los datos de entrenamiento y validación en GCP con el servicio DataFlow.

In [None]:
! python3 preprocess.py \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $GCP_WORK_DIR \
  --input $GCP_WORK_DIR/data.json \
  --output $GCP_WORK_DIR/transformed_data \
  --mode train

Traceback (most recent call last):
  File "/content/batch/preprocess.py", line 3, in <module>
    import apache_beam as beam
ModuleNotFoundError: No module named 'apache_beam'


### Validación preprocess test en Dataflow

Con el comando mostrado a continuación se valida la correcta generación de los datos de test en GCP con el servicio DataFlow.

In [None]:
! python3 preprocess.py \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $GCP_WORK_DIR \
  --input $GCP_WORK_DIR/data.json \
  --output $GCP_WORK_DIR/transformed_data \
  --mode test

Traceback (most recent call last):
  File "/content/batch/preprocess.py", line 3, in <module>
    import apache_beam as beam
ModuleNotFoundError: No module named 'apache_beam'


Omitimos el entrenamiento en la nube para no incurrir en costes innecesarios.

### Validación predict en Dataflow

Con el comando mostrado a continuación se valida la predicción correcta de los datos de test usando los modelos generados en el comando anterior.

Generamos un timestamp para el almacenamiento de las inferencias en Google Cloud Storage.

In [None]:
from datetime import datetime

# current date and time
TIMESTAMP = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

In [None]:
# For using sample models: --model-dir gs://$BUCKET_NAME/models/
! python3 predict.py \
  --work-dir $GCP_WORK_DIR \
  --model-dir $GCP_WORK_DIR/model/ \
  batch \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --inputs-dir $GCP_WORK_DIR/transformed_data/test/part* \
  --outputs-dir $GCP_WORK_DIR/predictions/$TIMESTAMP