# Final Project: Troll Message Detection

In recent years Twitch has established itself as one of the main communication channels, especially among younger generations.

Because it is a participatory platform, users can post comments both during and after broadcasts, and, as always, some of those comments are offensive.

In this project we will build an AI model capable of classifying those troll messages.

During the project we will train the Deep Learning model and deploy it for batch inference, currently the most common mode in industry.

## Theory

1. What is Apache Beam?
2. What are the different ways of deploying a model?
3. What is the main difference between batch inference and streaming inference?

**Answers**

1.- It is an open source programming framework for processing data efficiently in batch and in streaming. It lets you build data processing pipelines that can run on different execution engines, such as Apache Flink, Apache Spark and Google Cloud Dataflow. Its main advantage is that it unifies batch and real-time processing under a single API, which simplifies developing and maintaining data processing applications.

2.- A model can be deployed in two ways: batch (predictions computed offline over stored data) and on demand (predictions served per request).

3.- Batch inference processes complete datasets, so predictions for all the data can be generated in a single run. Streaming inference, by contrast, processes and predicts as the data arrives, which is very useful for data that flows in real time.
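
The contrast in answer 3 can be sketched in a few lines of plain Python. This is only an illustration: `toy_model` is a hypothetical stand-in for a trained classifier, and the generator stands in for an unbounded source such as a message queue.

```python
from typing import Iterable, Iterator, List


def toy_model(text: str) -> int:
    """Hypothetical stand-in for a trained classifier: 1 = troll, 0 = not troll."""
    return 1 if "idiot" in text.lower() else 0


def predict_batch(messages: List[str]) -> List[int]:
    """Batch inference: the full dataset is available before scoring starts."""
    return [toy_model(m) for m in messages]


def predict_stream(messages: Iterable[str]) -> Iterator[int]:
    """Streaming inference: each message is scored as soon as it arrives."""
    for m in messages:
        yield toy_model(m)


stored_messages = ["nice stream today", "you are an idiot"]
batch_predictions = predict_batch(stored_messages)

# A generator stands in for an unbounded source such as a message queue.
incoming = (m for m in ["gg", "what an IDIOT"])
stream_predictions = list(predict_stream(incoming))

print(batch_predictions, stream_predictions)
```

The model call is identical in both cases; what changes is whether the input is a bounded collection or an open-ended sequence.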


# Configuring our GCP project


In [1]:
PROJECT_ID = "serious-studio-380614" #@param {type:"string"}
! gcloud config set project $PROJECT_ID

Updated property [core/project].


In [2]:
import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

if 'google.colab' in sys.modules:
  from google.colab import auth as google_auth
  google_auth.authenticate_user()

# If you are running this notebook locally, replace the string below with the
# path to your service account key and run this cell to authenticate your GCP
# account.
else:
  %env GOOGLE_APPLICATION_CREDENTIALS ''


We create the bucket from the console and, once it exists, set its name in the variable below:

In [3]:
BUCKET_NAME = "produccionkc" #@param {type:"string"}
REGION = "europe-west1" #@param {type:"string"}

In [4]:
! gsutil ls -al gs://$BUCKET_NAME

                                 gs://produccionkc/twitter-sentiment-batch/


# Training and batch inference

## Preparation

This first part uses the [Tweets Dataset for Detection of Cyber-Trolls](https://www.kaggle.com/dataturks/dataset-for-detection-of-cybertrolls). The goal is to build a binary classifier that detects whether an incoming message is troll (1) or not (0). **The metrics obtained from training and inference will not count towards the grade; what matters is the architecture of the solution.** In other words, the point is not that the model classifies troll tweets correctly, but that it runs end to end and can produce inferences.
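
As a quick orientation, the snippet below parses a single record the way the preprocessing code in this notebook does. The sample line is made up; only the field names (`content`, `annotation`, `label`) come from that code, and the one-JSON-object-per-line layout is an assumption based on the file being read line by line.

```python
import json

# Hypothetical record, shaped like the fields the preprocessing code reads.
sample_line = (
    '{"content": "Get lost, nobody likes you", '
    '"annotation": {"label": ["1"]}}'
)

record = json.loads(sample_line)
text = record["content"]
# The label arrives as a one-element list of strings: "1" = troll, "0" = not troll.
label = int(record["annotation"]["label"][0])

print(text, label)
```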


Next, upload the training data to the bucket you created for the project. **Important:** create the bucket in a single region. The dataset is available in a publicly accessible bucket:

In [5]:
# Upload data to your bucket
! wget https://storage.googleapis.com/datos-practica-compartidos/Dataset%20for%20Detection%20of%20Cyber-Trolls.json -O - | gsutil cp - gs://$BUCKET_NAME/data.json

--2023-03-19 20:36:31--  https://storage.googleapis.com/datos-practica-compartidos/Dataset%20for%20Detection%20of%20Cyber-Trolls.json
Resolving storage.googleapis.com (storage.googleapis.com)... 108.177.127.128, 142.250.145.128, 2a00:1450:4013:c14::80, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|108.177.127.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2756023 (2.6M) [application/json]
Saving to: ‘STDOUT’

-                     0%[                    ]       0  --.-KB/s               Copying from <STDIN>...

2023-03-19 20:36:34 (926 KB/s) - written to stdout [2756023/2756023]

/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              


Now create the directory in which you will develop this first part of the project.

In [6]:
%mkdir /content/batch

Set the newly created directory as the working directory.

In [7]:
import os

# Set the working directory to the sample code directory
%cd /content/batch

WORK_DIR = os.getcwd()

/content/batch


Now download the data into the Colab workspace to work locally.

In [8]:
! wget https://storage.googleapis.com/datos-practica-compartidos/Dataset%20for%20Detection%20of%20Cyber-Trolls.json

--2023-03-19 20:38:21--  https://storage.googleapis.com/datos-practica-compartidos/Dataset%20for%20Detection%20of%20Cyber-Trolls.json
Resolving storage.googleapis.com (storage.googleapis.com)... 142.250.153.128, 142.250.145.128, 108.177.119.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.250.153.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2756023 (2.6M) [application/json]
Saving to: ‘Dataset for Detection of Cyber-Trolls.json’


2023-03-19 20:38:22 (200 MB/s) - ‘Dataset for Detection of Cyber-Trolls.json’ saved [2756023/2756023]



Declare the dependencies used in the project. Add any that are missing and remove any that are not used.

In [9]:
%%writefile requirements.txt

apache-beam[gcp]
tensorflow
gensim
fsspec
gcsfs
numpy
nltk

Writing requirements.txt


Install the dependencies. **Remember to restart the runtime after installing, and to set the GCP variables and credentials again once it restarts.**

In [11]:
! pip install -r requirements.txt

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## First exercise

Develop a preprocessing pipeline with Apache Beam that generates train, eval and test data from the dataset provided above. Requirements:

- Provide two execution modes: `train` and `test`.
- Support local runs with `DirectRunner` and Dataflow runs with `DataflowRunner`.
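
The `train` mode has to produce both a training set and an evaluation set. One common way to do that is a random two-way partition. The sketch below shows the idea in plain Python rather than Beam; the function name, the 20% ratio and the fixed seed are assumptions made for illustration.

```python
import random


def split_train_eval(rows, eval_percent=20, seed=42):
    """Randomly route each row to train or eval, like a two-way partition."""
    assert 0 < eval_percent < 100, "eval_percent must be in the range (0-100)"
    rng = random.Random(seed)  # seeded only to make this sketch repeatable
    train, evaluation = [], []
    for row in rows:
        # Index 1 (eval) with probability eval_percent / 100, else index 0 (train).
        bucket = int(rng.uniform(0, 100) < eval_percent)
        (evaluation if bucket else train).append(row)
    return train, evaluation


rows = [f"message {i},NO_TROLL" for i in range(1000)]
train_rows, eval_rows = split_train_eval(rows)
print(len(train_rows), len(eval_rows))
```

Because each row is routed independently, the split is only approximately 80/20; the exact sizes vary from run to run unless the generator is seeded.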

In [12]:
%%writefile preprocess.py

"""Preprocessing pipeline that builds train, eval and test data for troll detection."""

import argparse
import logging
import re
import os
import json
import random

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, DirectOptions

import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer

nltk.download("stopwords")

# Text preprocessing constants
STOP_WORDS = stopwords.words("english")
STEMMER = SnowballStemmer("english")
TEXT_CLEANING_RE = r"@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+"


def extract_columns(element):
    element_json = json.loads(element)
    text = element_json["content"]
    sentiment = element_json["annotation"]["label"][0]
    return text, sentiment


def preprocess_columns_train(element):
    def process_sentiment(sentiment):
        sentiment = int(sentiment)
        return "TROLL" if sentiment == 1 else "NO_TROLL" if sentiment == 0 else "NEUTRAL"

    def process_text(text):
        text = re.sub(TEXT_CLEANING_RE, " ", str(text).lower()).strip()
        tokens = [token for token in text.split() if token not in STOP_WORDS]
        return " ".join(tokens)

    processed_text = process_text(element[0])
    processed_sentiment = process_sentiment(element[1])
    # No space after the comma, so the label column is read back without leading whitespace.
    return f"{processed_text},{processed_sentiment}"


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument("--work-dir", dest="work_dir", required=True, help="Working directory")
    parser.add_argument("--input", dest="input", required=True, help="Input dataset in work dir")
    parser.add_argument("--output", dest="output", required=True, help="Output path to store transformed data in work dir")
    parser.add_argument("--mode", dest="mode", required=True, choices=["train", "test"], help="Type of output to store transformed data")

    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    pipeline_options.view_as(DirectOptions).direct_num_workers = 0

    with beam.Pipeline(options=pipeline_options) as p:
        raw_data = p | "ReadTwitterData" >> ReadFromText(known_args.input)

        if known_args.mode == "train":
            transformed_data = (
                raw_data
                | "ExtractColumns" >> beam.Map(extract_columns)
                | "Preprocess" >> beam.Map(preprocess_columns_train)
            )

            eval_percent = 20
            assert 0 < eval_percent < 100, "eval_percent must be in the range (0-100)"
            train_dataset, eval_dataset = (
                transformed_data
                | "Split dataset" >> beam.Partition(lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2)
            )

            train_dataset | "TrainWriteToCSV" >> WriteToText(os.path.join(known_args.output, "train", "part"))
            eval_dataset | "EvalWriteToCSV" >> WriteToText(os.path.join(known_args.output, "eval", "part"))

        else:
            transformed_data = (
                raw_data
                | "ExtractColumns" >> beam.Map(extract_columns)
                | "Preprocess" >> beam.Map(lambda x: f'"{x[0]}"')
            )

            transformed_data | "TestWriteToCSV" >> WriteToText(
                os.path.join(known_args.output, "test", "part")
            )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()



Writing preprocess.py
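
To see what the text cleaning step does to a single message, here is a small standalone sketch. It reuses the same regular expression as `preprocess.py`, but swaps NLTK's English stop-word list for a tiny hypothetical set (`SAMPLE_STOP_WORDS`) so that it runs without downloads; the sample tweet is invented.

```python
import re

# Same pattern as TEXT_CLEANING_RE in preprocess.py.
TEXT_CLEANING_RE = r"@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+"

# Tiny hypothetical stop-word list; the pipeline uses NLTK's English list.
SAMPLE_STOP_WORDS = {"you", "are", "a", "the"}


def clean_text(text: str) -> str:
    """Lowercase, strip mentions/URLs/punctuation, then drop stop words."""
    text = re.sub(TEXT_CLEANING_RE, " ", text.lower()).strip()
    return " ".join(t for t in text.split() if t not in SAMPLE_STOP_WORDS)


cleaned = clean_text("@user You are a TROLL!!! go away https://example.com/x")
print(cleaned)  # troll go away
```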


A `setup.py` file is provided, which is required to run on Dataflow. Update the `REQUIRED_PACKAGES` variable with the dependencies listed in `requirements.txt`.

In [14]:
%%writefile setup.py

import setuptools

REQUIRED_PACKAGES = [
  "apache-beam[gcp]",
  "tensorflow",
  "gensim",
  "fsspec",
  "gcsfs",
  "numpy",
  "keras",
  "nltk"
]

setuptools.setup(
    name="twitchstreaming",
    version="0.0.1",
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True,
    description="Troll detection",
)


Overwriting setup.py
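
Keeping `requirements.txt` and `REQUIRED_PACKAGES` in sync by hand is error-prone. The following sketch compares the two lists as they appear in this notebook; `parse_requirements` is a hypothetical helper that only handles plain package lines (no pins, no options).

```python
def parse_requirements(text: str) -> set:
    """Return the non-empty, non-comment lines of a requirements file."""
    lines = (line.strip() for line in text.splitlines())
    return {line for line in lines if line and not line.startswith("#")}


# Contents copied from the requirements.txt cell above.
requirements_txt = """
apache-beam[gcp]
tensorflow
gensim
fsspec
gcsfs
numpy
nltk
"""

# Contents copied from REQUIRED_PACKAGES in setup.py.
required_packages = [
    "apache-beam[gcp]", "tensorflow", "gensim", "fsspec",
    "gcsfs", "numpy", "keras", "nltk",
]

only_in_setup = set(required_packages) - parse_requirements(requirements_txt)
only_in_requirements = parse_requirements(requirements_txt) - set(required_packages)

print("only in setup.py:", sorted(only_in_setup))
print("only in requirements.txt:", sorted(only_in_requirements))
```

With the two cells as written, the only difference is `keras`, which is listed in `setup.py` but not in `requirements.txt`.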


### Validating preprocess train locally
The command below validates that the training and evaluation data are generated correctly on the local machine.

In [16]:
WORK_DIR

'/content/batch'

In [17]:
! python3 preprocess.py \
  --work-dir $WORK_DIR \
  --runner DirectRunner \
  --input $WORK_DIR/data.json \
  --output $WORK_DIR/transformed_data \
  --mode train

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.46.0
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f594d24a580> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f594d24aee0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:root:Default Python SDK image for environ

### Validating preprocess test locally

The command below validates that the test data is generated correctly on the local machine.

In [18]:
! python3 preprocess.py \
  --work-dir $WORK_DIR \
  --runner DirectRunner \
  --input $WORK_DIR/data.json \
  --output $WORK_DIR/transformed_data \
  --mode test

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.46.0
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f196d1c3b20> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f196cdffaf0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:root:Default Python SDK image for environ

## Second exercise

Develop a training job for the preprocessed data. Requirements:

- Support local runs using the AI Platform SDK and runs on GCP with the same code.

Create the directory we will work in:

In [19]:
%mkdir /content/batch/trainer

In [20]:
%%writefile trainer/__init__.py

version = "0.1.0"

Writing trainer/__init__.py


In [23]:
%%writefile trainer/task.py

import os
import multiprocessing as mp
import logging
import tempfile
import pickle
import gensim
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Embedding, LSTM
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from sklearn.preprocessing import LabelEncoder


W2V_SIZE = 300
W2V_WINDOW = 7
W2V_EPOCH = 5
W2V_MIN_COUNT = 10
SEQUENCE_LENGTH = 300
TROLL = "TROLL"
NO_TROLL = "NO_TROLL"
NEUTRAL = "NEUTRAL"
SENTIMENT_THRESHOLDS = (0.5, 0.6)
KERAS_MODEL = "model.h5"
WORD2VEC_MODEL = "model.w2v"
TOKENIZER_MODEL = "tokenizer.pkl"
ENCODER_MODEL = "encoder.pkl"


def generate_word2vec(train_df):
    documents = [_text.split() for _text in train_df.text.values]
    w2v_model = gensim.models.word2vec.Word2Vec(
        size=W2V_SIZE,
        window=W2V_WINDOW,
        min_count=W2V_MIN_COUNT,
        workers=mp.cpu_count(),
    )
    w2v_model.build_vocab(documents)

    words = w2v_model.wv.vocab.keys()
    vocab_size = len(words)
    logging.info(f"Vocabulary size: {vocab_size}")
    w2v_model.train(documents, total_examples=len(documents), epochs=W2V_EPOCH)

    return w2v_model


def generate_tokenizer(train_df):
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(train_df.text)
    vocab_size = len(tokenizer.word_index) + 1
    logging.info(f"Total words: {vocab_size}")
    return tokenizer, vocab_size


def generate_label_encoder(train_df):
    encoder = LabelEncoder()
    encoder.fit(train_df.sentiment.tolist())
    return encoder


def generate_embedding(word2vec_model, vocab_size, tokenizer):
    embedding_matrix = np.zeros((vocab_size, W2V_SIZE))
    for word, i in tokenizer.word_index.items():
        if word in word2vec_model.wv:
            embedding_matrix[i] = word2vec_model.wv[word]
    return Embedding(
        vocab_size,
        W2V_SIZE,
        weights=[embedding_matrix],
        input_length=SEQUENCE_LENGTH,
        trainable=False,
    )


def train_and_evaluate(work_dir, train_df, eval_df, batch_size=1024, epochs=8, steps=1000):
    model_dir = os.path.join(work_dir, "model")
    if tf.io.gfile.exists(model_dir):
        tf.io.gfile.rmtree(model_dir)
    tf.io.gfile.mkdir(model_dir)

    run_config = tf.estimator.RunConfig()
    run_config = run_config.replace(model_dir=model_dir)
    run_config = run_config.replace(save_summary_steps=10)

    logging.info("---- Building word2vec model ----")
    word2vec_model = generate_word2vec(train_df)

    logging.info("---- Building tokenizer ----")
    tokenizer, vocab_size = generate_tokenizer(train_df)

    logging.info("---- Training data ----")
    x_train = pad_sequences(
        tokenizer.texts_to_sequences(train_df.text), maxlen=SEQUENCE_LENGTH
    )
    logging.info("---- Evaluation data ----")
    x_eval = pad_sequences(
        tokenizer.texts_to_sequences(eval_df.text), maxlen=SEQUENCE_LENGTH
    )

    logging.info("---- Building label encoder ----")
    label_encoder = generate_label_encoder(train_df)

    logging.info("---- Encoding training labels ----")
    y_train = label_encoder.transform(train_df.sentiment.tolist())
    logging.info("---- Encoding evaluation labels ----")
    y_eval = label_encoder.transform(eval_df.sentiment.tolist())

    y_train = y_train.reshape(-1, 1)
    y_eval = y_eval.reshape(-1, 1)

    logging.info("---- Building the embedding layer ----")
    embedding_layer = generate_embedding(word2vec_model, vocab_size, tokenizer)

    logging.info("---- Model ----")
    model = Sequential()
    model.add(embedding_layer)
    model.add(Dropout(0.5))
    model.add(LSTM(100, dropout=0.2, recurrent_dropout=0.2))
    model.add(Dense(1, activation="sigmoid"))

    model.summary()

    logging.info("---- Loss function ----")
    model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

    logging.info("---- Stopping callbacks ----")
    callbacks = [
        ReduceLROnPlateau(monitor="val_loss", patience=5, cooldown=0),
        EarlyStopping(monitor="val_accuracy", min_delta=1e-4, patience=5),
    ]

    logging.info("---- Training ----")
    model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        steps_per_epoch=steps,
        epochs=epochs,
        validation_split=0.1,
        verbose=1,
        callbacks=callbacks,
    )

    logging.info("---- Evaluating ----")
    score = model.evaluate(x_eval, y_eval, batch_size=batch_size)
    logging.info(f"ACCURACY: {score[1]}")
    logging.info(f"LOSS: {score[0]}")

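    logging.info("---- Saving models ----")
    # Use a context manager so the tokenizer file is flushed and closed.
    with tf.io.gfile.GFile(
        os.path.join(model_dir, TOKENIZER_MODEL), mode="wb"
    ) as tokenizer_file:
        pickle.dump(tokenizer, tokenizer_file, protocol=0)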
    with tempfile.NamedTemporaryFile(suffix=".h5") as local_file:
        with tf.io.gfile.GFile(
            os.path.join(model_dir, KERAS_MODEL), mode="wb"
        ) as gcs_file:
            model.save(local_file.name)
            gcs_file.write(local_file.read())

def main(work_dir, batch_size=1024, epochs=8, steps=1000):
    logging.getLogger().setLevel(logging.INFO)

    train_data_files = tf.io.gfile.glob(
        os.path.join(work_dir, "transformed_data/train/part-*")
    )
    eval_data_files = tf.io.gfile.glob(
        os.path.join(work_dir, "transformed_data/eval/part-*")
    )

    train_df = pd.concat(
        [
            pd.read_csv(
                f,
                names=["text", "sentiment"],
                dtype={"text": "string", "sentiment": "string"},
            )
            for f in train_data_files
        ]
    ).dropna()

    eval_df = pd.concat(
        [
            pd.read_csv(
                f,
                names=["text", "sentiment"],
                dtype={"text": "string", "sentiment": "string"},
            )
            for f in eval_data_files
        ]
       ).dropna()

    train_and_evaluate(
        work_dir,
        train_df=train_df,
        eval_df=eval_df,
        batch_size=batch_size,
        epochs=epochs,
        steps=steps,
    )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    parser.add_argument(
        "--work-dir",
        required=True,
        help="Directory for staging and working files. "
        "This can be a Google Cloud Storage path.",
    )

    parser.add_argument(
        "--batch-size",
        type=int,
        default=1024,
        help="Batch size for training and evaluation.",
    )

    parser.add_argument(
        "--epochs", type=int, default=8, help="Number of epochs to train the model",
    )

    parser.add_argument(
        "--steps",
        type=int,
        default=1000,
        help="Number of steps per epoch to train the model",
    )

    args = parser.parse_args()

    main(
        work_dir=args.work_dir,
        batch_size=args.batch_size,
        epochs=args.epochs,
        steps=args.steps,
    )



Overwriting trainer/task.py
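
Before the texts reach the LSTM, each one is turned into a list of token ids and forced to a fixed length. The sketch below reproduces that padding step in plain Python for a single sequence, following the default behaviour of Keras `pad_sequences` (pad and truncate at the front). `pad_pre` and the three-word vocabulary are hypothetical; in the trainer the fitted `Tokenizer` supplies `word_index` and the length is `SEQUENCE_LENGTH`.

```python
def pad_pre(sequence, maxlen, value=0):
    """Left-pad or left-truncate a list of token ids to exactly maxlen items.

    Mirrors the default behaviour of Keras pad_sequences
    (padding="pre", truncating="pre") for a single sequence.
    """
    trimmed = sequence[-maxlen:]  # keep the last maxlen ids
    return [value] * (maxlen - len(trimmed)) + trimmed


# Hypothetical vocabulary; a fitted Tokenizer would supply word_index.
word_index = {"troll": 1, "go": 2, "away": 3}
token_ids = [word_index[w] for w in "troll go away".split() if w in word_index]

padded = pad_pre(token_ids, maxlen=5)
truncated = pad_pre([1, 2, 3, 1, 2, 3, 1], maxlen=5)

print(padded)     # [0, 0, 1, 2, 3]
print(truncated)  # [3, 1, 2, 3, 1]
```

Id 0 is reserved for padding, which is why the trainer sizes the embedding matrix as `len(tokenizer.word_index) + 1`.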


### Validating training locally

The command below validates that the model trains correctly using the preprocessed data from the previous section.

In [24]:
# Explicitly tell `gcloud ai-platform local train` to use Python 3 
! gcloud config set ml_engine/local_python $(which python3)

# This is similar to `python -m trainer.task --job-dir local-training-output`
# but it better replicates the AI Platform environment, especially for
# distributed training (not applicable here).
! gcloud ai-platform local train \
  --package-path trainer \
  --module-name trainer.task \
  -- \
  --work-dir $WORK_DIR \
  --epochs 1

Updated property [ml_engine/local_python].
2023-03-19 21:08:08.648423: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-03-19 21:08:09.475167: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/lib64-nvidia
2023-03-19 21:08:09.475285: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/lib64-nvidia
INFO:tensorflow:TF_CONFIG environment varia

## Third exercise

Develop an inference pipeline with Apache Beam that generates predictions using the models produced in the previous section and the test data generated in the first exercise.


In [31]:
%%writefile predict.py

from __future__ import absolute_import, print_function

import argparse
import json
import os
import sys
import tempfile
import time

import apache_beam as beam
import pickle
import tensorflow as tf
from apache_beam.coders.coders import Coder
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import (
    DirectOptions,
    GoogleCloudOptions,
    PipelineOptions,
    SetupOptions,
)
from tensorflow.keras.preprocessing.sequence import pad_sequences

SEQUENCE_LENGTH = 300

TROLL = "TROLL"
NO_TROLL = "NO_TROLL"
NEUTRAL = "NEUTRAL"
SENTIMENT_THRESHOLDS = (0.5, 0.6)

KERAS_MODEL = "model.h5"
TOKENIZER_MODEL = "tokenizer.pkl"


class CustomCoder(Coder):
    def __init__(self, encoding: str):
        self.encoding = encoding

    def encode(self, value):
        return value.encode(self.encoding)

    def decode(self, value):
        return value.decode(self.encoding)

    def is_deterministic(self):
        return True


class Predict(beam.DoFn):
    def __init__(self, model_dir):
        self.model_dir = model_dir
        self.model = None
        self.tokenizer = None

    def setup(self):
        keras_model_path = os.path.join(self.model_dir, KERAS_MODEL)
        tokenizer_path = os.path.join(self.model_dir, TOKENIZER_MODEL)

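        with tempfile.NamedTemporaryFile(suffix=".h5") as local_file:
            with tf.io.gfile.GFile(keras_model_path, mode="rb") as gcs_file:
                local_file.write(gcs_file.read())
            # Flush buffered bytes to disk before Keras reopens the file by name.
            local_file.flush()
            self.model = tf.keras.models.load_model(local_file.name)

        # Use a context manager so the tokenizer file handle is closed.
        with tf.io.gfile.GFile(tokenizer_path, mode="rb") as tokenizer_file:
            self.tokenizer = pickle.load(tokenizer_file)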

    def decode_sentiment(self, score, include_neutral=True):
        if include_neutral:
            label = NEUTRAL
            if score <= SENTIMENT_THRESHOLDS[0]:
                label = NO_TROLL
            elif score >= SENTIMENT_THRESHOLDS[1]:
                label = TROLL
            return label
        else:
            return NO_TROLL if score < 0.5 else TROLL

    def process(self, element):
        start_at = time.time()
        x_test = pad_sequences(
            self.tokenizer.texts_to_sequences([element]), maxlen=SEQUENCE_LENGTH
        )
        # predict returns an array of shape (1, 1); take the scalar probability.
        score = float(self.model.predict([x_test])[0][0])
        label = self.decode_sentiment(score)

        yield {
            "text": element,
            "label": label,
            "score": float(score),
            "elapsed_time": time.time() - start_at,
        }


def run(model_dir, source, sink, beam_options=None):
    with beam.Pipeline(options=beam_options) as p:
        _ = (
            p
            | "Read data" >> source
            | "Predict" >> beam.ParDo(Predict(model_dir))
            | "Format as JSON" >> beam.Map(json.dumps)
            | "Write predictions" >> sink
        )


def main():
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    parser.add_argument("--work-dir", dest="work_dir", required=True)
    parser.add_argument("--model-dir", dest="model_dir", required=True)

    verbs = parser.add_subparsers(dest="verb")
    batch_verb = verbs.add_parser("batch", help="Batch prediction")
    batch_verb.add_argument("--inputs-dir", dest="inputs_dir", required=True)
    batch_verb.add_argument("--outputs-dir", dest="outputs_dir", required=True)

    args, pipeline_args = parser.parse_known_args()
    beam_options = PipelineOptions(pipeline_args)
    beam_options.view_as(SetupOptions).save_main_session = True
    beam_options.view_as(DirectOptions).direct_num_workers = 0

    project = beam_options.view_as(GoogleCloudOptions).project

    if args.verb == "batch":
        results_prefix = os.path.join(args.outputs_dir, "part")

        source = ReadFromText(args.inputs_dir, coder=CustomCoder("latin-1"))
        sink = WriteToText(results_prefix)

    else:
        parser.print_usage()
        sys.exit(1)

    run(args.model_dir, source, sink, beam_options)


if __name__ == "__main__":
    main()






Overwriting predict.py
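
The mapping from model score to label in `predict.py` is easy to check in isolation. The sketch below restates `decode_sentiment` as a free function with the same thresholds `(0.5, 0.6)` and runs it on three made-up scores.

```python
NO_TROLL, NEUTRAL, TROLL = "NO_TROLL", "NEUTRAL", "TROLL"
SENTIMENT_THRESHOLDS = (0.5, 0.6)


def decode_sentiment(score: float, include_neutral: bool = True) -> str:
    """Map a sigmoid score to a label, optionally with a neutral band."""
    if include_neutral:
        if score <= SENTIMENT_THRESHOLDS[0]:
            return NO_TROLL
        if score >= SENTIMENT_THRESHOLDS[1]:
            return TROLL
        return NEUTRAL
    return NO_TROLL if score < 0.5 else TROLL


labels = [decode_sentiment(s) for s in (0.10, 0.55, 0.90)]
binary_label = decode_sentiment(0.55, include_neutral=False)

print(labels)        # ['NO_TROLL', 'NEUTRAL', 'TROLL']
print(binary_label)  # TROLL
```

Scores strictly between 0.5 and 0.6 fall in the neutral band; with `include_neutral=False` the same score is forced into one of the two classes.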


Generate a timestamp for this prediction run.

In [29]:
from datetime import datetime

# current date and time
TIMESTAMP = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

### Validating predict locally

The command below validates that inference works correctly using the models and the test data generated earlier.

In [30]:
! python3 predict.py \
  --work-dir $WORK_DIR \
  --model-dir $WORK_DIR/model \
  batch \
  --inputs-dir $WORK_DIR/transformed_data/test/part* \
  --outputs-dir $WORK_DIR/predictions/$TIMESTAMP

Streaming output truncated to the last 5000 lines.


## Extra: checks in the cloud

This last part validates that the code works in a GCP project on Dataflow and AI Platform.

Set the GCP bucket and region we will work with:

In [32]:
GCP_WORK_DIR = 'gs://' + BUCKET_NAME + '/final'
GCP_REGION = REGION

In [33]:
GCP_WORK_DIR

'gs://produccionkc/final'

In [34]:
GCP_REGION

'europe-west1'

### Validating preprocess train on Dataflow

The command below validates that the training and evaluation data are generated correctly on GCP with the Dataflow service.

In [37]:
! python3 preprocess.py \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $GCP_WORK_DIR \
  --input $GCP_WORK_DIR/data.json \
  --output $GCP_WORK_DIR/transformed_data \
  --mode train

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmp94nyjqo4']



INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/tmp94nyjqo4', 'apache-beam==2.46.0', '--no-deps', '--no-binary', ':all:']
INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO:apache_beam.runners.portability.stager:Downloading binary distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', '-m', '

### Validating preprocess test on Dataflow

The command below validates that the test data is generated correctly on GCP with the Dataflow service.

In [38]:
! python3 preprocess.py \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $GCP_WORK_DIR \
  --input $GCP_WORK_DIR/data.json \
  --output $GCP_WORK_DIR/transformed_data \
  --mode test

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmp_1hkchlg']



INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/tmp_1hkchlg', 'apache-beam==2.46.0', '--no-deps', '--no-binary', ':all:']
INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO:apache_beam.runners.portability.stager:Downloading binary distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', '-m', '

We skip training in the cloud to avoid incurring unnecessary costs.

### Validating predict on Dataflow

The command below validates that the test data is predicted correctly using the models generated earlier.

Generate a timestamp for storing the inferences in Google Cloud Storage.

In [39]:
from datetime import datetime

# current date and time
TIMESTAMP = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

In [None]:
# For using sample models: --model-dir gs://$BUCKET_NAME/models/
! python3 predict.py \
  --work-dir $GCP_WORK_DIR \
  --model-dir $GCP_WORK_DIR/model/ \
  batch \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --inputs-dir $GCP_WORK_DIR/transformed_data/test/part* \
  --outputs-dir $GCP_WORK_DIR/predictions/$TIMESTAMP