# Final Assignment: real-time detection of troll messages in Twitch chat

Over the past year, the video streaming platform Twitch has grown considerably in popularity because of the COVID-19 situation. As a result, people of all ages have started using the platform daily.

As a consequence, not only has the number of people watching content on Twitch increased, but so has the number of so-called *trolls*: people who post offensive comments in streamers' chats.

In this assignment you will build an autonomous AI-based system, deployed on GCP, that detects in real time whether the messages sent to a Twitch channel come from a *troll*. The assignment has three main parts, each of which is graded:
1. Batch training and inference of a model using Dataflow and AI Platform. **(3.5 points)**.
2. Deployment and online inference of the model in a microservice. **(3.5 points)**.
3. Streaming inference on a Twitch channel using the microservice above. **(3 points)**.

# Configuring our GCP project


In [None]:
! pip install apache-beam

Collecting apache-beam
  Downloading https://files.pythonhosted.org/packages/ac/c9/395a9759dfbf9e87203a69c33b2e94f10d566d9391bddb6f99facafe64c3/apache_beam-2.30.0-cp37-cp37m-manylinux2010_x86_64.whl (9.6MB)
Collecting avro-python3!=1.9.2,<1.10.0,>=1.8.1
  Downloading https://files.pythonhosted.org/packages/5a/80/acd1455bea0a9fcdc60a748a97dcbb3ff624726fb90987a0fc1c19e7a5a5/avro-python3-1.9.2.1.tar.gz
Collecting dill<0.3.2,>=0.3.1.1
  Downloading https://files.pythonhosted.org/packages/c7/11/345f3173809cea7f1a193bfbf02403fff250a3360e0e118a1630985e547d/dill-0.3.1.1.tar.gz (151kB)
Collecting future<1.0.0,>=0.18.2
  Downloading https://files.pythonhosted.org/packages/45/0b/38b06fd9b92dc2b68d58b75f900e97884c45bedd2ff83203d933cf5851c9/future-0.18.2.tar.gz (829kB)
Collecting fastavro<2,>=0.21.4


In [None]:
PROJECT_ID = "twitch-practiceeva" #@param {type:"string"}
! gcloud config set project $PROJECT_ID

Updated property [core/project].


In [None]:
import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

if 'google.colab' in sys.modules:
  from google.colab import auth as google_auth
  google_auth.authenticate_user()

# If you are running this notebook locally, replace the string below with the
# path to your service account key and run this cell to authenticate your GCP
# account.
else:
  %env GOOGLE_APPLICATION_CREDENTIALS ''


In [None]:
BUCKET_NAME = "twitch-practiceeva" #@param {type:"string"}
REGION = "europe-west1" #@param {type:"string"}

In [None]:
! gsutil mb -l $REGION gs://$BUCKET_NAME # only needed the first time

Creating gs://twitch-practiceeva/...


In [None]:
! gsutil ls -al gs://$BUCKET_NAME

   2756023  2021-07-02T15:52:43Z  gs://twitch-practiceeva/data.json#1625241163220787  metageneration=1
                                 gs://twitch-practiceeva/beam-temp/
                                 gs://twitch-practiceeva/model/
                                 gs://twitch-practiceeva/predictions/
                                 gs://twitch-practiceeva/trainer/
                                 gs://twitch-practiceeva/transformed_data/
TOTAL: 1 objects, 2756023 bytes (2.63 MiB)


# Batch training and inference

For this first part we will use the [Tweets Dataset for Detection of Cyber-Trolls](https://www.kaggle.com/dataturks/dataset-for-detection-of-cybertrolls). The goal is to build a binary classifier that detects whether a received message is troll (1) or not (0). **The metrics obtained from training and inference will not count towards the grade; what matters is the architecture of the solution.**

The diagram below shows the architecture that will be built:

![batch_diagram](https://drive.google.com/uc?export=view&id=1h1BkIunyKSkJYFRbXKNWpHOZ_rDUyGAT)

Next, the training data is uploaded to the bucket created for the project. **Important:** create the bucket in a single region to avoid problems later on.

In [None]:
# Upload data to your bucket
! wget https://storage.googleapis.com/twitch-practice-keepcoding/data.json -O - | gsutil cp - gs://$BUCKET_NAME/data.json

--2021-07-02 15:52:40--  https://storage.googleapis.com/twitch-practice-keepcoding/data.json
Resolving storage.googleapis.com (storage.googleapis.com)... 172.217.13.80, 172.217.13.240, 172.217.15.80, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|172.217.13.80|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2756023 (2.6M) [application/json]
Saving to: ‘STDOUT’

-                     0%[                    ]       0  --.-KB/s               Copying from <STDIN>...

2021-07-02 15:52:42 (1.41 MB/s) - written to stdout [2756023/2756023]

/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              


Now create the directory where you will develop this first part of the assignment.

In [None]:
%mkdir /content/batch

Set the working directory to the one just created.

In [None]:
import os

# Set the working directory to the sample code directory
%cd /content/batch

WORK_DIR = os.getcwd()

/content/batch


Now download the data into the Colab workspace to work locally.

In [None]:
! wget https://storage.googleapis.com/twitch-practice-keepcoding/data.json

--2021-07-03 14:54:46--  https://storage.googleapis.com/twitch-practice-keepcoding/data.json
Resolving storage.googleapis.com (storage.googleapis.com)... 142.250.65.80, 142.250.188.208, 142.251.33.208, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.250.65.80|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2756023 (2.6M) [application/json]
Saving to: ‘data.json’


2021-07-03 14:54:46 (129 MB/s) - ‘data.json’ saved [2756023/2756023]
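
Before writing the pipeline it helps to know what one input line looks like. The sketch below assumes each line of `data.json` is a JSON object with a `content` string and an `annotation.label` list, which are the fields the preprocessing code in this notebook reads. The sample record and the helper name `extract_text_and_label` are illustrative and not taken from the dataset.

```python
import json

# Hypothetical record in the shape the preprocessing step expects:
# a "content" string and an "annotation" object holding a list of labels.
sample_line = '{"content": "you are so dumb", "annotation": {"label": ["1"]}}'


def extract_text_and_label(line):
    """Return (text, label) from one JSON line; label is 1 for troll, 0 otherwise."""
    record = json.loads(line)
    text = record["content"]
    label = int(record["annotation"]["label"][0])
    return text, label


print(extract_text_and_label(sample_line))
```

Running the same helper over the first few lines of the downloaded file is a quick way to confirm that the assumed structure holds.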



The dependencies used in the assignment are declared below. Feel free to add any you need and remove those you do not use.

In [None]:
%%writefile requirements.txt

apache-beam[gcp]==2.24.0
tensorflow-transform==0.24.1
tensorflow==2.3.0
tfx==0.24.1
gensim==3.6.0
fsspec==0.8.4
gcsfs==0.7.1

Writing requirements.txt


Install the dependencies. **Remember to restart the runtime after installing, and to set the GCP variables and credentials again once it restarts.**

In [None]:
! pip install -r requirements.txt

Collecting apache-beam[gcp]==2.24.0
  Downloading https://files.pythonhosted.org/packages/97/1b/10ab8380b6d56700b3eba1a7bbf980878ea9d0457303e0934760fc7882d7/apache_beam-2.24.0-cp37-cp37m-manylinux2010_x86_64.whl (8.5MB)
Collecting tensorflow-transform==0.24.1
  Downloading https://files.pythonhosted.org/packages/5d/9f/acad7dab38ba19f4c574de2b50ec13343fce6ac51c291b5fc81e59bc4466/tensorflow_transform-0.24.1-py3-none-any.whl (373kB)
Collecting tensorflow==2.3.0
  Downloading https://files.pythonhosted.org/packages/16/89/f2d29c2eafc2eeafb17d5634340e06366af904d332341200a49d954bce85/tensorflow-2.3.0-cp37-cp37m-manylinux2010_x86_64.whl (320.4MB)
Collecting tfx==0.24.1
  Downloading https://files.pythonhosted.org/packages/34/28/d283681f9e7bc489b59712654f65229c75b84d4450137092a18

## Deliverable 1 (0.5 points)

Develop a preprocessing pipeline with Apache Beam that generates train, eval and test data from the dataset provided above. Requirements:

- Provide two execution modes: `train` and `test`.
- Support local runs with `DirectRunner` and runs on Dataflow with `DataflowRunner`.
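
One way to produce the train/test split is a partition function that returns 0 or 1 for each element. The sketch below is a minimal illustration that assumes a hash-based bucket instead of a random draw, so that reruns give the same split; `partition_index` is a hypothetical helper and the 20% default is only an example value.

```python
import hashlib


def partition_index(element, eval_percent=20):
    """Return 1 (held-out) for roughly eval_percent% of elements, else 0.

    The bucket depends only on the element's content, so repeated runs
    assign the same element to the same partition.
    """
    digest = hashlib.md5(element.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 100  # stable bucket in the range 0..99
    return int(bucket < eval_percent)


lines = [f"message {i}" for i in range(1000)]
held_out = sum(partition_index(line) for line in lines)
print(f"{held_out} of {len(lines)} lines go to the held-out partition")
```

In a Beam pipeline a function like this could be passed to `beam.Partition` as `lambda elem, _: partition_index(elem)` with two partitions. The trade-off is that identical messages always land in the same partition.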

In [None]:
# Check that our variables have not been lost after restarting the kernel

print(f"Project: {PROJECT_ID}")
print(f"Region: {REGION}")
print(f"Bucket: {BUCKET_NAME}")

Project: twitch-practiceeva
Region: europe-west1
Bucket: twitch-practiceeva


In [None]:
%%writefile preprocess.py

from __future__ import absolute_import

import argparse  # defines the work dir, runner, input and output arguments
import logging  # logging traces
import re  # regular expressions
import os  # interacting with the local file system
import json  # reading JSON data
import random  # random draws for the dataset split (no seed is set, so splits vary between runs)

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText  # I/O transform: read
from apache_beam.io import WriteToText  # I/O transform: write
from apache_beam.coders.coders import Coder  # base class for encoding/decoding input records
from apache_beam.options.pipeline_options import PipelineOptions  # pipeline options
from apache_beam.options.pipeline_options import SetupOptions, DirectOptions  # setup and DirectRunner options

import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer

nltk.download("stopwords")

# CLEANING
STOP_WORDS = stopwords.words("english")
STEMMER = SnowballStemmer("english")
# Raw string so that \S is not treated as an (invalid) string escape.
TEXT_CLEANING_RE = r"@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+"

#raw_data = [json.loads(line) for line in open("data.json", 'r')]

class ExtractColumnsDoFn(beam.DoFn):
    def process(self, element):
        # Parse the JSON line into a dict
        element_split = json.loads(element)
        # Yield (text, label); the label is the first entry of annotation.label
        yield element_split.get('content'), element_split.get('annotation').get('label')[0]


class PreprocessColumnsTrainFn(beam.DoFn):
    def process_sentiment(self, sentiment):
        sentiment = int(sentiment)
        if sentiment == 1:
            return "TROLL"
        else:
            return "NOTROLL"

    def process_text(self, text):
        # Remove link,user and special characters
        stem = False
        text = re.sub(TEXT_CLEANING_RE, " ", str(text).lower()).strip()
        tokens = []
        for token in text.split():
            if token not in STOP_WORDS:
                if stem:
                    tokens.append(STEMMER.stem(token))
                else:
                    tokens.append(token)
        return " ".join(tokens)

    def process(self, element):
        processed_text = self.process_text(element[0])
        processed_sentiment = self.process_sentiment(element[1])
        yield f"{processed_text}, {processed_sentiment}"


class CustomCoder(Coder):
    """A custom coder used for reading and writing strings"""

    def __init__(self, encoding: str):
        # latin-1
        # iso-8859-1
        self.encoding = encoding

    def encode(self, value):
        return value.encode(self.encoding)

    def decode(self, value):
        return value.decode(self.encoding)

    def is_deterministic(self):
        return True


def run(argv=None, save_main_session=True):

    """Main entry point; defines and runs the preprocessing pipeline."""

    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--work-dir", dest="work_dir", required=True, help="Working directory",
    )

    parser.add_argument(
        "--input", dest="input", required=True, help="Input dataset in work dir",
    )
    parser.add_argument(
        "--output",
        dest="output",
        required=True,
        help="Output path to store transformed data in work dir",
    )
    parser.add_argument(
        "--mode",
        dest="mode",
        required=True,
        choices=["train", "test"],
        help="Type of output to store transformed data",
    )

    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    pipeline_options.view_as(DirectOptions).direct_num_workers = 0

    # The pipeline will be run on exiting the with block.
    with beam.Pipeline(options=pipeline_options) as p:

        # Read the text file[pattern] into a PCollection.
        raw_data = p | "ReadTwitchData" >> ReadFromText(
            known_args.input, coder=CustomCoder("latin-1")
            )
        eval_percent = 20
        assert 0 < eval_percent < 100, "eval_percent must be in the range (0-100)"
        train_data, test_data = (
            raw_data
            | "Split train-test dataset"
            >> beam.Partition(
                    lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2
            )
        )
        if known_args.mode == "train":

            transformed_data = (
                train_data
                | "ExtractColumns" >> beam.ParDo(ExtractColumnsDoFn())
                | "Preprocess" >> beam.ParDo(PreprocessColumnsTrainFn())
            )

            new_eval_percent = 20
            assert 0 < new_eval_percent < 100, "new_eval_percent must be in the range (0-100)"
            train_dataset, eval_dataset = (
                transformed_data
                | "Split dataset"
                >> beam.Partition(
                    lambda elem, _: int(random.uniform(0, 100) < new_eval_percent), 2
                )
            )

            train_dataset | "TrainWriteToCSV" >> WriteToText(
                os.path.join(known_args.output, "train", "part")
            )
            eval_dataset | "EvalWriteToCSV" >> WriteToText(
                os.path.join(known_args.output, "eval", "part")
            )

        else:
            transformed_data = (
                test_data
                | "ExtractColumns" >> beam.ParDo(ExtractColumnsDoFn())
                | "Preprocess" >> beam.Map(lambda x: f'"{x[0]}"')
            )

            transformed_data | "TestWriteToCSV" >> WriteToText(
                os.path.join(known_args.output, "test", "part")
            )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()



Writing preprocess.py
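
To see what the cleaning step in `process_text` does to a message, the sketch below applies the same regular expression with only the standard library. The stop-word set here is a tiny illustrative assumption (the pipeline uses NLTK's English list), and `clean_text` is a hypothetical helper name.

```python
import re

# Same pattern as TEXT_CLEANING_RE in preprocess.py, written as a raw string.
TEXT_CLEANING_RE = r"@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+"

# Tiny illustrative stop-word set (assumption); the pipeline uses NLTK's list.
STOP_WORDS = {"you", "are", "so", "a", "the", "this"}


def clean_text(text):
    """Lowercase, strip mentions, links and punctuation, then drop stop words."""
    text = re.sub(TEXT_CLEANING_RE, " ", text.lower()).strip()
    return " ".join(token for token in text.split() if token not in STOP_WORDS)


print(clean_text("@troll123 You are SO dumb!!! see https://example.com/x"))
```

The mention and the link are removed as whole tokens, punctuation collapses into spaces, and only the words outside the stop-word set remain.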


A `setup.py` file is provided, which is required to run on Dataflow. Update the `REQUIRED_PACKAGES` variable with the dependencies listed in `requirements.txt`.

In [None]:
%%writefile setup.py

import setuptools

REQUIRED_PACKAGES = [
    "apache-beam[gcp]==2.24.0",
    "tensorflow-transform==0.24.1",
    "tensorflow==2.3.0",
    "tfx==0.24.1",
    "gensim==3.6.0",
    "fsspec==0.8.4",
    "gcsfs==0.7.1",
]

setuptools.setup(
    name="twitchstreaming",
    version="0.0.1",
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True,
    description="Troll detection",
)


Writing setup.py
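
Keeping `REQUIRED_PACKAGES` and `requirements.txt` in sync by hand is easy to get wrong. As a hedged sketch, the list could be derived from the text of the requirements file; `parse_requirements` is a hypothetical helper, and the sample text below is only an excerpt.

```python
def parse_requirements(text):
    """Return the non-empty, non-comment lines of a requirements file."""
    packages = []
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments and whitespace
        if line:
            packages.append(line)
    return packages


requirements_text = """
apache-beam[gcp]==2.24.0
tensorflow==2.3.0
# comment lines are skipped
gensim==3.6.0
"""

print(parse_requirements(requirements_text))
```

This only works inside `setup.py` if `requirements.txt` is available wherever `setup.py` runs, which is an assumption to check before relying on it for remote workers.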


### Validating preprocess train locally (0.25 points)

The command below checks that the training and validation data are generated correctly on the local machine.

In [None]:
! python3 preprocess.py \
  --work-dir $WORK_DIR \
  --runner DirectRunner \
  --input $WORK_DIR/data.json \
  --output $WORK_DIR/transformed_data \
  --mode train

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f421480fcd0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f421480f950> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((ref_AppliedPTransform_TrainWriteToCSV/Write/WriteImpl/DoOnce/Impulse_19)+(ref_A
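
After the run, a quick sanity check is to count how many examples of each class were written. The sketch below assumes the `text, LABEL` line format produced by the train mode; the sample lines and the helper name `count_labels` are illustrative.

```python
from collections import Counter


def count_labels(lines):
    """Count the label that follows the last comma of each 'text, LABEL' line."""
    counts = Counter()
    for line in lines:
        _, _, label = line.rpartition(",")
        counts[label.strip()] += 1
    return counts


# Hypothetical lines in the format written by the train mode.
sample = ["get lost idiot, TROLL", "nice stream today, NOTROLL", "gg, NOTROLL"]
print(count_labels(sample))
```

To use it on real output, the lines of every file matching `transformed_data/train/part-*` can be passed in instead of `sample`.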

### Validating preprocess test locally (0.25 points)

The command below checks that the test data are generated correctly on the local machine.

In [None]:
! python3 preprocess.py \
  --work-dir $WORK_DIR \
  --runner DirectRunner \
  --input $WORK_DIR/data.json \
  --output $WORK_DIR/transformed_data \
  --mode test

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f51683eefd0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f51683eeb90> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((ref_AppliedPTransform_TestWriteToCSV/Write/WriteImpl/DoOnce/Impulse_16)

## Deliverable 2 (1.25 points)

Develop a training task for the preprocessed data. Requirements:

- Support local runs using the AI Platform SDK and runs on GCP with the same code.

Create the directory that will hold this deliverable.

In [None]:
%mkdir /content/batch/trainer

In [None]:
%%writefile trainer/__init__.py

version = "0.1.0"

Writing trainer/__init__.py


In [None]:
%%writefile trainer/task.py
from __future__ import absolute_import

import argparse
import multiprocessing as mp
import logging
import tempfile
import os

import pickle
import gensim
import pandas as pd
import numpy as np
import tensorflow as tf

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (
    Dense,
    Dropout,
    Embedding,
    LSTM,
)
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from sklearn.preprocessing import LabelEncoder


# WORD2VEC
W2V_SIZE = 300
W2V_WINDOW = 7
# 32
W2V_EPOCH = 5
W2V_MIN_COUNT = 10

# KERAS
SEQUENCE_LENGTH = 300

# SENTIMENT
TROLL = "TROLL"
NOTROLL = "NOTROLL"
SENTIMENT_THRESHOLDS = (0.5)

# EXPORT
KERAS_MODEL = "model.h5"
WORD2VEC_MODEL = "model.w2v"
TOKENIZER_MODEL = "tokenizer.pkl"
ENCODER_MODEL = "encoder.pkl"


def generate_word2vec(train_df):
    documents = [_text.split() for _text in train_df.text.values]
    w2v_model = gensim.models.word2vec.Word2Vec(
        size=W2V_SIZE,
        window=W2V_WINDOW,
        min_count=W2V_MIN_COUNT,
        workers=mp.cpu_count(),
    )
    w2v_model.build_vocab(documents)

    words = w2v_model.wv.vocab.keys()
    vocab_size = len(words)
    logging.info(f"Vocab size: {vocab_size}")
    w2v_model.train(documents, total_examples=len(documents), epochs=W2V_EPOCH)

    return w2v_model


def generate_tokenizer(train_df):
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(train_df.text)
    vocab_size = len(tokenizer.word_index) + 1
    logging.info(f"Total words: {vocab_size}")
    return tokenizer, vocab_size


def generate_label_encoder(train_df):
    encoder = LabelEncoder()
    encoder.fit(train_df.sentiment.tolist())
    return encoder


def generate_embedding(word2vec_model, vocab_size, tokenizer):
    embedding_matrix = np.zeros((vocab_size, W2V_SIZE))
    for word, i in tokenizer.word_index.items():
        if word in word2vec_model.wv:
            embedding_matrix[i] = word2vec_model.wv[word]
    return Embedding(
        vocab_size,
        W2V_SIZE,
        weights=[embedding_matrix],
        input_length=SEQUENCE_LENGTH,
        trainable=False,
    )


def train_and_evaluate(
    work_dir, train_df, eval_df, batch_size=1024, epochs=8, steps=100
):

    """
    Trains and evaluates the estimator given.
    The input functions are generated by the preprocessing function.
    """

    model_dir = os.path.join(work_dir, "model")
    if tf.io.gfile.exists(model_dir):
        tf.io.gfile.rmtree(model_dir)
    tf.io.gfile.mkdir(model_dir)

    # Specify where to store our model
    run_config = tf.estimator.RunConfig()
    run_config = run_config.replace(model_dir=model_dir)

    # This will give us a more granular visualization of the training
    run_config = run_config.replace(save_summary_steps=10)

    # Create Word2vec of training data
    logging.info("---- Generating word2vec model ----")
    word2vec_model = generate_word2vec(train_df)

    # Tokenize training data
    logging.info("---- Generating tokenizer ----")
    tokenizer, vocab_size = generate_tokenizer(train_df)

    logging.info("---- Tokenizing train data ----")
    x_train = pad_sequences(
        tokenizer.texts_to_sequences(train_df.text), maxlen=SEQUENCE_LENGTH
    )
    logging.info("---- Tokenizing eval data ----")
    x_eval = pad_sequences(
        tokenizer.texts_to_sequences(eval_df.text), maxlen=SEQUENCE_LENGTH
    )

    # Label Encoder
    logging.info("---- Generating label encoder ----")
    label_encoder = generate_label_encoder(train_df)

    logging.info("---- Encoding train target ----")
    y_train = label_encoder.transform(train_df.sentiment.tolist())
    logging.info("---- Encoding eval target ----")
    y_eval = label_encoder.transform(eval_df.sentiment.tolist())

    y_train = y_train.reshape(-1, 1)
    y_eval = y_eval.reshape(-1, 1)

    # Create Embedding Layer
    logging.info("---- Generating embedding layer ----")
    embedding_layer = generate_embedding(word2vec_model, vocab_size, tokenizer)

    logging.info("---- Generating Sequential model ----")
    model = Sequential()
    model.add(embedding_layer)
    model.add(Dropout(0.5))
    model.add(LSTM(100, dropout=0.2, recurrent_dropout=0.2))
    model.add(Dense(1, activation="sigmoid"))

    model.summary()

    logging.info("---- Adding loss function to model ----")
    model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

    logging.info("---- Adding callbacks to model ----")
    callbacks = [
        ReduceLROnPlateau(monitor="val_loss", patience=5, cooldown=0),
        EarlyStopping(monitor="val_accuracy", min_delta=1e-4, patience=5),
    ]

    logging.info("---- Training model ----")
    model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        steps_per_epoch=steps,
        epochs=epochs,
        validation_split=0.1,
        verbose=1,
        callbacks=callbacks,
    )

    logging.info("---- Evaluating model ----")
    score = model.evaluate(x_eval, y_eval, batch_size=batch_size)
    logging.info(f"ACCURACY: {score[1]}")
    logging.info(f"LOSS: {score[0]}")

    logging.info("---- Saving models ----")
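    # Use a context manager so the tokenizer file is flushed and closed.
    with tf.io.gfile.GFile(
        os.path.join(model_dir, TOKENIZER_MODEL), mode="wb"
    ) as tokenizer_file:
        pickle.dump(tokenizer, tokenizer_file, protocol=0)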
    with tempfile.NamedTemporaryFile(suffix=".h5") as local_file:
        with tf.io.gfile.GFile(
            os.path.join(model_dir, KERAS_MODEL), mode="wb"
        ) as gcs_file:
            model.save(local_file.name)
            gcs_file.write(local_file.read())

    # word2vec_model.save(os.path.join(model_dir, WORD2VEC_MODEL))

    # pickle.dump(
    #     label_encoder, open(os.path.join(model_dir, ENCODER_MODEL), "wb"), protocol=0
    # )


if __name__ == "__main__":

    """Main function called by AI Platform."""

    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    parser.add_argument(
        "--job-dir",
        help="Directory for staging trainer files. "
        "This can be a Google Cloud Storage path.",
    )

    parser.add_argument(
        "--work-dir",
        required=True,
        help="Directory for staging and working files. "
        "This can be a Google Cloud Storage path.",
    )

    parser.add_argument(
        "--batch-size",
        type=int,
        default=1024,
        help="Batch size for training and evaluation.",
    )

    parser.add_argument(
        "--epochs", type=int, default=8, help="Number of epochs to train the model",
    )

    parser.add_argument(
        "--steps",
        type=int,
        default=100,
        help="Number of steps per epoch to train the model",
    )

    args = parser.parse_args()

    train_data_files = tf.io.gfile.glob(
        os.path.join(args.work_dir, "transformed_data/train/part-*")
    )
    eval_data_files = tf.io.gfile.glob(
        os.path.join(args.work_dir, "transformed_data/eval/part-*")
    )

    train_df = pd.concat(
        [
            pd.read_csv(
                f,
                names=["text", "sentiment"],
                dtype={"text": "string", "sentiment": "string"},
            )
            for f in train_data_files
        ]
    ).dropna()

    eval_df = pd.concat(
        [
            pd.read_csv(
                f,
                names=["text", "sentiment"],
                dtype={"text": "string", "sentiment": "string"},
            )
            for f in eval_data_files
        ]
    ).dropna()

    train_and_evaluate(
        args.work_dir,
        train_df=train_df,
        eval_df=eval_df,
        batch_size=args.batch_size,
        epochs=args.epochs,
        steps=args.steps,
    )


Writing trainer/task.py


### Validating training locally

The command below checks that the model trains correctly on the preprocessed data from the previous section.

In [None]:
# Explicitly tell `gcloud ai-platform local train` to use Python 3 
! gcloud config set ml_engine/local_python $(which python3)

# This is similar to `python -m trainer.task --job-dir local-training-output`
# but it better replicates the AI Platform environment, especially for
# distributed training (not applicable here).
! gcloud ai-platform local train \
  --package-path trainer \
  --module-name trainer.task \
  -- \
  --work-dir $WORK_DIR \
  --epochs 1

Updated property [ml_engine/local_python].
2021-07-03 15:00:56.440423: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
INFO:tensorflow:TF_CONFIG environment variable: {'job': {'job_name': 'trainer.task', 'args': ['--work-dir', '/content/batch', '--epochs', '1']}, 'task': {}, 'cluster': {}, 'environment': 'cloud'}
INFO:root:---- Generating word2vec model ----
INFO:gensim.models.word2vec:collecting all words and their counts
INFO:gensim.models.word2vec:PROGRESS: at sentence #0, processed 0 words, keeping 0 word types
INFO:gensim.models.word2vec:PROGRESS: at sentence #10000, processed 77308 words, keeping 11687 word types
INFO:gensim.models.word2vec:collected 13227 word types from a corpus of 91618 raw words and 12750 sentences
INFO:gensim.models.word2vec:Loading a fresh vocabulary
INFO:gensim.models.word2vec:effective_min_count=10 retains 1331 unique words (10% of original 13227, drops 11896)
INFO:gensim.models.word2ve
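
The training task pads every tokenized message to `SEQUENCE_LENGTH` ids before it reaches the LSTM. The sketch below imitates in plain Python what Keras `pad_sequences` does with its default settings (pad and truncate at the front, fill with 0). It is an illustration of the behaviour, not the Keras implementation, and `pad_pre` is a hypothetical name.

```python
def pad_pre(sequence, maxlen, value=0):
    """Left-pad or left-truncate a list of token ids to exactly maxlen items.

    Assumes maxlen > 0.
    """
    trimmed = sequence[-maxlen:]  # keep only the last maxlen ids
    return [value] * (maxlen - len(trimmed)) + trimmed


print(pad_pre([4, 9, 2], maxlen=5))
print(pad_pre([1, 2, 3, 4, 5, 6, 7], maxlen=5))
```

Short messages gain leading zeros and long ones lose their earliest tokens, so every row of the model input has the same length.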

## Deliverable 3 (0.5 points)

Develop an inference pipeline with Apache Beam that generates predictions using the models produced in the previous section and the test data generated in the first deliverable.


In [None]:
%%writefile predict.py

from __future__ import absolute_import
from __future__ import print_function

import argparse
import tempfile
import json
import os
import sys
import time

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.coders.coders import Coder

import pickle
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences

# KERAS
SEQUENCE_LENGTH = 300

# SENTIMENT
TROLL = "TROLL"
NOTROLL = "NOTROLL"
#NEUTRAL = "NEUTRAL"
SENTIMENT_THRESHOLDS = (0.5)

# EXPORT
KERAS_MODEL = "model.h5"
TOKENIZER_MODEL = "tokenizer.pkl"


class Predict(beam.DoFn):
    def __init__(
        self, model_dir,
    ):
        self.model_dir = model_dir
        self.model = None
        self.tokenizer = None

    def setup(self):
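        # Copy the Keras model to a local temp file, since load_model needs a file path.
        keras_model_path = os.path.join(self.model_dir, KERAS_MODEL)
        with tempfile.NamedTemporaryFile(suffix=".h5") as local_file:
            with tf.io.gfile.GFile(keras_model_path, mode="rb") as gcs_file:
                local_file.write(gcs_file.read())
            # Flush buffered bytes so the file is complete when it is read by name.
            local_file.flush()
            self.model = tf.keras.models.load_model(local_file.name)

        tokenizer_path = os.path.join(self.model_dir, TOKENIZER_MODEL)
        with tf.io.gfile.GFile(tokenizer_path, mode="rb") as tokenizer_file:
            self.tokenizer = pickle.load(tokenizer_file)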

    def decode_sentiment(self, score):
        # Binary decision: scores below the threshold are NOTROLL, the rest TROLL.
        return NOTROLL if score < SENTIMENT_THRESHOLDS else TROLL

    def process(self, element):
        start_at = time.time()
        # Tokenize text
        x_test = pad_sequences(
            self.tokenizer.texts_to_sequences([element]), maxlen=SEQUENCE_LENGTH
        )
        # Predict
        score = self.model.predict([x_test])[0]
        # Decode sentiment
        label = self.decode_sentiment(score)

        yield {
            "text": element,
            "label": label,
            "score": float(score),
            "elapsed_time": time.time() - start_at,
        }


class CustomCoder(Coder):
    """A custom coder used for reading and writing strings"""

    def __init__(self, encoding: str):
        # latin-1
        # iso-8859-1
        self.encoding = encoding

    def encode(self, value):
        return value.encode(self.encoding)

    def decode(self, value):
        return value.decode(self.encoding)

    def is_deterministic(self):
        return True


def run(model_dir, source, sink, beam_options=None):
    with beam.Pipeline(options=beam_options) as p:
        _ = (
            p
            | "Read data" >> source
            # | "Preprocess" >> beam.ParDo(PreprocessTextFn(model_dir, "ID"))
            | "Predict" >> beam.ParDo(Predict(model_dir))
            | "Format as JSON" >> beam.Map(json.dumps)
            | "Write predictions" >> sink
        )


if __name__ == "__main__":
    """Main function"""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    parser.add_argument(
        "--work-dir",
        dest="work_dir",
        required=True,
        help="Directory for temporary files and preprocessed datasets to. "
        "This can be a Google Cloud Storage path.",
    )

    parser.add_argument(
        "--model-dir",
        dest="model_dir",
        required=True,
        help="Path to the exported TensorFlow model. "
        "This can be a Google Cloud Storage path.",
    )

    verbs = parser.add_subparsers(dest="verb")
    batch_verb = verbs.add_parser("batch", help="Batch prediction")
    batch_verb.add_argument(
        "--inputs-dir",
        dest="inputs_dir",
        required=True,
        help="Input directory where CSV data files are read from. "
        "This can be a Google Cloud Storage path.",
    )
    batch_verb.add_argument(
        "--outputs-dir",
        dest="outputs_dir",
        required=True,
        help="Directory to store prediction results. "
        "This can be a Google Cloud Storage path.",
    )

    args, pipeline_args = parser.parse_known_args()
    print(args)
    beam_options = PipelineOptions(pipeline_args)
    beam_options.view_as(SetupOptions).save_main_session = True
    # beam_options.view_as(DirectOptions).direct_num_workers = 0

    project = beam_options.view_as(GoogleCloudOptions).project

    if args.verb == "batch":
        results_prefix = os.path.join(args.outputs_dir, "part")

        source = ReadFromText(args.inputs_dir, coder=CustomCoder("latin-1"))
        sink = WriteToText(results_prefix)

    else:
        parser.print_usage()
        sys.exit(1)

    run(args.model_dir, source, sink, beam_options)


Writing predict.py
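
To see why the batch source is read with `CustomCoder("latin-1")`, it can help to run the encode/decode round trip outside Beam. The sketch below uses a stand-in class, `Latin1RoundTrip` (a hypothetical name, not part of `predict.py`), that mirrors the two methods without subclassing Beam's `Coder`. The sample bytes are made up for illustration.

```python
class Latin1RoundTrip:
    """Stand-in for CustomCoder that needs no Beam install (hypothetical helper)."""

    def __init__(self, encoding: str):
        self.encoding = encoding

    def encode(self, value: str) -> bytes:
        return value.encode(self.encoding)

    def decode(self, value: bytes) -> str:
        return value.decode(self.encoding)


coder = Latin1RoundTrip("latin-1")

# 0xF1 is "ñ" in latin-1 but is not a valid standalone byte in UTF-8.
raw_line = b"ma\xf1ana"
decoded = coder.decode(raw_line)
round_trip = coder.encode(decoded)

try:
    raw_line.decode("utf-8")
    utf8_ok = True
except UnicodeDecodeError:
    utf8_ok = False

print(decoded, round_trip == raw_line, utf8_ok)
```

Every byte value maps to a character in latin-1, so decoding never fails on arbitrary input, which is presumably why it was chosen for chat text of unknown origin.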


We generate a timestamp for the prediction run.

In [None]:
from datetime import datetime

# current date and time
TIMESTAMP = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

### Local validation of predict

The command below checks that inference works correctly using the model trained earlier and the test data generated earlier.

In [None]:
! python3 predict.py \
  --work-dir $WORK_DIR \
  --model-dir $WORK_DIR/model \
  batch \
  --inputs-dir $WORK_DIR/transformed_data/test/part* \
  --outputs-dir $WORK_DIR/predictions/$TIMESTAMP

2021-07-03 15:03:56.572555: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
Namespace(inputs_dir='/content/batch/transformed_data/test/part-00000-of-00002', model_dir='/content/batch/model', outputs_dir='/content/batch/predictions/2021-07-03_15-03-50', verb='batch', work_dir='/content/batch')
2021-07-03 15:03:59.340629: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcuda.so.1
2021-07-03 15:03:59.352429: E tensorflow/stream_executor/cuda/cuda_driver.cc:314] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-07-03 15:03:59.352494: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (ed70ec286c9a): /proc/driver/nvidia/version does not exist
2021-07-03 15:03:59.352916: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI D
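
Once the local run finishes, the `Predict` step has written one JSON object per line with the keys `text`, `label`, `score` and `elapsed_time`. A minimal sketch for inspecting those results follows. `summarize_predictions` is a hypothetical helper, and it assumes a local predictions directory (a `gs://` path would need `gsutil` or `gcsfs` instead of `glob`).

```python
import glob
import json
import os
from collections import Counter


def summarize_predictions(predictions_dir: str) -> Counter:
    """Count predicted labels across all part files in a predictions directory."""
    counts = Counter()
    # WriteToText receives the prefix "<outputs-dir>/part", so shard names start with "part"
    for path in sorted(glob.glob(os.path.join(predictions_dir, "part*"))):
        with open(path, encoding="utf-8") as handle:
            for line in handle:
                line = line.strip()
                if line:
                    counts[json.loads(line)["label"]] += 1
    return counts
```

Calling it on the directory passed as `--outputs-dir` gives a quick sanity check of how the labels are distributed.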

## Deliverable 4 (1.25 points)

This deliverable validates that the code runs in a GCP project on Dataflow and AI Platform.

We set the GCP bucket and region we will work with:

In [None]:
GCP_WORK_DIR = 'gs://twitch-practiceeva'
GCP_REGION = 'europe-west1'



### Preprocess train validation on Dataflow (0.25 points)

The command below checks that the training and validation data are generated correctly on GCP with the Dataflow service.

In [None]:
! python3 preprocess.py \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $GCP_WORK_DIR \
  --input $GCP_WORK_DIR/data.json \
  --output $GCP_WORK_DIR/transformed_data \
  --mode train

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpaw76h72_']



INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/tmpaw76h72_', 'apache-beam==2.24.0', '--no-deps', '--no-binary', ':all:']
INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO:apache_beam.runners.portability.stager:Downloading binar

### Preprocess test validation on Dataflow (0.25 points)

The command below checks that the test data are generated correctly on GCP with the Dataflow service.

In [None]:
! python3 preprocess.py \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $GCP_WORK_DIR \
  --input $GCP_WORK_DIR/data.json \
  --output $GCP_WORK_DIR/transformed_data \
  --mode test

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpyxz5n33p']



INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/tmpyxz5n33p', 'apache-beam==2.24.0', '--no-deps', '--no-binary', ':all:']
INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO:apache_beam.runners.portability.stager:Downloading binar

### Train validation on AI Platform (0.5 points)

The command below checks that the model trains correctly on GCP, using the data produced by the previous runs and stored in Google Cloud Storage.

We generate a name for the training job and the location where its metadata will be stored.

In [None]:
JOB = "troll_detection_batch_$(date +%Y%m%d_%H%M%S)"
JOB_DIR = GCP_WORK_DIR + "/trainer"
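
The `JOB` string above relies on the shell expanding `$(date ...)` when the `gcloud` cell runs. An equivalent name can be built directly in Python, which makes the value visible in the notebook before submitting. This is only a sketch, and `make_job_name` is a hypothetical helper. As far as I know, AI Platform job names must start with a letter and contain only letters, digits and underscores, which the check below reflects.

```python
import re
from datetime import datetime


def make_job_name(prefix: str = "troll_detection_batch") -> str:
    """Build a job name such as troll_detection_batch_20210702_163306."""
    return f"{prefix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"


JOB = make_job_name()
# Sanity check on the shape of the name: letters, digits and underscores only.
assert re.fullmatch(r"[A-Za-z][A-Za-z0-9_]*", JOB)
print(JOB)
```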

In [None]:
! gcloud ai-platform jobs submit training $JOB \
  --module-name trainer.task \
  --package-path trainer \
  --scale-tier basic_gpu \
  --python-version 3.7 \
  --runtime-version 2.1 \
  --region $GCP_REGION \
  --job-dir $JOB_DIR \
  --stream-logs \
  -- \
  --work-dir $GCP_WORK_DIR \
  --epochs 1

Job [troll_detection_batch_20210702_163306] submitted successfully.
INFO	2021-07-02 16:33:08 +0000	service		Validating job requirements...
INFO	2021-07-02 16:33:20 +0000	service		Job creation request has been successfully validated.
INFO	2021-07-02 16:33:20 +0000	service		Job troll_detection_batch_20210702_163306 is queued.
INFO	2021-07-02 16:33:22 +0000	service		Waiting for job to be provisioned.
INFO	2021-07-02 16:33:27 +0000	service		Waiting for training program to start.
INFO	2021-07-02 16:34:57 +0000	master-replica-0		Using mount point: /gcs
NOTICE	2021-07-02 16:34:57 +0000	master-replica-0		Opening GCS connection...
INFO	2021-07-02 16:34:57 +0000	master-replica-0		Set up root directory for all accessible buckets
NOTICE	2021-07-02 16:34:57 +0000	master-replica-0		Mounting file system "gcsfuse"...
NOTICE	2021-07-02 16:34:57 +0000	master-replica-0		File system has been successfully mounted.
INFO	2021-07-02 16:35:03 +0000	master-replica-0		Running task with arguments: --cluster={"chi

### Predict validation on Dataflow (0.25 points)

The command below checks that the test data are predicted correctly using the models generated by the previous command.

We generate a timestamp used to store the inferences in Google Cloud Storage.

In [None]:
from datetime import datetime

# current date and time
TIMESTAMP = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

In [None]:
# For using sample models: --model-dir gs://$BUCKET_NAME/models/
! python3 predict.py \
  --work-dir $GCP_WORK_DIR \
  --model-dir $GCP_WORK_DIR/model/ \
  batch \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --inputs-dir $GCP_WORK_DIR/transformed_data/test/part* \
  --outputs-dir $GCP_WORK_DIR/predictions/$TIMESTAMP

2021-07-02 16:46:39.519989: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
Namespace(inputs_dir='gs://twitch-practiceeva/transformed_data/test/part*', model_dir='gs://twitch-practiceeva/model/', outputs_dir='gs://twitch-practiceeva/predictions/2021-07-02_16-46-38', verb='batch', work_dir='gs://twitch-practiceeva')





# Online inference

In this second part of the assignment you will build an online inference microservice using the models generated in the first part. The code of your microservice must be pushed to a repository. Set the variable below to the URL of your repository, since its contents are what you will be graded on.

**Important:** make sure the repository is public so that it can be cloned.

Below is a diagram of the architecture that will be built:

![online_diagram](https://drive.google.com/uc?export=view&id=1zR7Cwp0Vq1QeTxwLoJ8YJNRM9G5KVh2S)

In [None]:
REPOSITORIO = "https://github.com/eesquivias/eva_practica_implementacion_algoritmos.git"

We create the directory we will work in.

In [None]:
%mkdir /content/online
%cd /content/online

mkdir: cannot create directory ‘/content/online’: File exists
/content/online


In [None]:
# Clone the repository
! git clone $REPOSITORIO

# Set the working directory to the sample code directory
%cd ./eva_practica_implementacion_algoritmos

# Change to develop
! git checkout develop

Cloning into 'eva_practica_implementacion_algoritmos'...
remote: Enumerating objects: 72, done.
remote: Counting objects: 100% (72/72), done.
remote: Compressing objects: 100% (69/69), done.
remote: Total 72 (delta 31), reused 17 (delta 0), pack-reused 0
Unpacking objects: 100% (72/72), done.
/content/online/eva_practica_implementacion_algoritmos
error: pathspec 'develop' did not match any file(s) known to git.


In [None]:
# run this after the runtime restart triggered by installing the requirements
%cd ./eva_practica_implementacion_algoritmos 

/content/online/eva_practica_implementacion_algoritmos


In [None]:
#! git pull $REPOSITORIO  (I needed this once after a change; kept here in case it is useful later)

fatal: not a git repository (or any of the parent directories): .git


In [None]:
! pip install -r requirements.txt

Collecting requests==2.25.0
  Downloading https://files.pythonhosted.org/packages/39/fc/f91eac5a39a65f75a7adb58eac7fa78871ea9872283fb9c44e6545998134/requests-2.25.0-py2.py3-none-any.whl (61kB)
Collecting uvicorn==0.12.2
  Downloading https://files.pythonhosted.org/packages/30/cc/01cc4cb980dfcf04eb283b6497c7f280928a0b02c68c0f85b6901e7716ae/uvicorn-0.12.2-py3-none-any.whl (45kB)
Collecting fastapi==0.61.2
  Downloading https://files.pythonhosted.org/packages/4c/0b/5df17eaadb7fe39dad349f484e551e802ce0581be672822f010c530d5e75/fastapi-0.61.2-py3-none-any.whl (48kB)
Collecting scikit-learn==0.23.2
  Downloading https://files.pythonhosted.org/packages/f4/cb/64623369f348e9bfb29ff898a57ac7c91ed4921f228e9726546614d63ccb/scikit_learn-0.23.2-cp37-cp37m-manylinux1_x86_64.whl (6.8MB)

In [None]:
! pip install pyngrok

Collecting pyngrok
  Downloading https://files.pythonhosted.org/packages/6b/4e/a2fe095bbe17cf26424c4abcd22a0490e22d01cc628f25af5e220ddbf6f0/pyngrok-5.0.5.tar.gz (745kB)

While you are modifying, testing and running files, the cells below contain the git commands needed to interact with your repository, should you want to use them:

In [None]:
! git status

fatal: not a git repository (or any of the parent directories): .git


In [None]:
! git add <files>
! git commit -m "Nuevos cambios"
! git push origin master

You need to set the `DEFAULT_MODEL_PATH` environment variable to tell the service where the models used for inference are stored.

In [None]:
import os

os.environ["DEFAULT_MODEL_PATH"] = "/content/batch/model/"
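
How the microservice consumes this variable is not shown in the notebook. The sketch below is one plausible way a service could resolve its artifacts from it. `resolve_model_files` is a hypothetical helper, and the file names `model.h5` and `tokenizer.pkl` are taken from the `gsutil` copy further down.

```python
import os


def resolve_model_files(default_dir: str = "/content/batch/model/") -> dict:
    """Return the paths of the model artifacts under DEFAULT_MODEL_PATH."""
    # Fall back to default_dir when the variable is not set (an assumption of this sketch)
    model_dir = os.environ.get("DEFAULT_MODEL_PATH", default_dir)
    return {
        "model": os.path.join(model_dir, "model.h5"),
        "tokenizer": os.path.join(model_dir, "tokenizer.pkl"),
    }


os.environ["DEFAULT_MODEL_PATH"] = "/content/batch/model/"
print(resolve_model_files())
```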

In [None]:
os.getcwd()

'/content/online/eva_practica_implementacion_algoritmos'

In [None]:
# if the models are not available locally (Colab keeps hanging on me), copy them from Cloud Storage
! gsutil -m cp \
  "gs://$BUCKET_NAME/model/model.h5" \
  "gs://$BUCKET_NAME/model/tokenizer.pkl" \
  .

Copying gs://twitch-practiceeva/model/model.h5...
Copying gs://twitch-practiceeva/model/tokenizer.pkl...
/ [2/2 files][ 17.7 MiB/ 17.7 MiB] 100% Done                                    
Operation completed over 2 objects/17.7 MiB.                                     


### Local validation of online inference (1.75 points)

The microservice's inference will be validated locally using Swagger. To run it locally, just execute the commands below. Then open the URL provided by ngrok, `<ngrok_url>/docs`, to access Swagger and try out inference as we saw in class.

In [None]:
# For testing purposes
import nest_asyncio
from pyngrok import ngrok

ngrok_tunnel = ngrok.connect(8000)
print('Public URL:', ngrok_tunnel.public_url)
nest_asyncio.apply()

Public URL: http://8a319555ab9f.ngrok.io


In [None]:
! uvicorn app.main:app --port 8000

2021-07-03 15:56:33.974553: W tensorflow/stream_executor/platform/default/dso_loader.cc:55] Could not load dynamic library 'libnvinfer.so.6'; dlerror: libnvinfer.so.6: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/nvidia/lib:/usr/local/nvidia/lib64
2021-07-03 15:56:33.974798: W tensorflow/stream_executor/platform/default/dso_loader.cc:55] Could not load dynamic library 'libnvinfer_plugin.so.6'; dlerror: libnvinfer_plugin.so.6: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/nvidia/lib:/usr/local/nvidia/lib64
2021-07-03 15:56:33.974826: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:30] Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
INFO:     Started server process [6358]
INFO:     Waiting for application startup.
2021-07-03 15:56:35.281 | INFO

### Validation of online inference on GCP (1.75 points)

The microservice will be validated by sending an inference POST request with curl to the microservice deployed on GCP.

First, we will build a Docker image with the microservice and push it to the Container Registry on GCP through Cloud Build.

In [None]:
! gcloud builds submit --tag gcr.io/$PROJECT_ID/troll-detection-online-service

Creating temporary tarball archive of 40 file(s) totalling 16.5 KiB before compression.
Uploading tarball of [.] to [gs://twitch-practiceeva_cloudbuild/source/1625306139.335005-30fcf16ba0a040b99f43e23c01ddd104.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/twitch-practiceeva/locations/global/builds/dc288610-b9fd-45e9-8a2a-b03625dff04c].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/dc288610-b9fd-45e9-8a2a-b03625dff04c?project=178865046960].
 REMOTE BUILD OUTPUT
starting build "dc288610-b9fd-45e9-8a2a-b03625dff04c"

FETCHSOURCE
Fetching storage object: gs://twitch-practiceeva_cloudbuild/source/1625306139.335005-30fcf16ba0a040b99f43e23c01ddd104.tgz#1625306140586611
Copying gs://twitch-practiceeva_cloudbuild/source/1625306139.335005-30fcf16ba0a040b99f43e23c01ddd104.tgz#1625306140586611...
/ [1 files][  7.1 KiB/  7.1 KiB]                                                
Operation completed over 1 objects/7.1 KiB.
BUILD
Already have image (with diges

We will deploy the Docker image stored in the Container Registry to Cloud Run. Then we will check that inference works on GCP using the command below:

In [None]:
! curl -X POST "https://troll-detection-service-g2v7skqfeq-uc.a.run.app/api/model/predict" -H  "accept: application/json" -H  "Content-Type: application/json" -d "{\"text\":\"i hate you\"}"

{"label":"NO_TROLL","score":0.40702152252197266,"elapsed_time":0.7307009696960449}

In [None]:
# my application does not flag the sentence as troll either, although it gives it a slightly higher score.
! curl -X POST "https://troll-detection-6ff2xxxgaq-ew.a.run.app/api/model/predict" -H  "accept: application/json" -H  "Content-Type: application/json" -d "{\"text\":\"i hate you\"}"

{"label":"NOTROLL","score":0.41722559928894043,"elapsed_time":0.7551183700561523}

In [None]:
! curl -X POST "https://troll-detection-6ff2xxxgaq-ew.a.run.app/api/model/predict" -H  "accept: application/json" -H  "Content-Type: application/json" -d "{\"text\":\"shit\"}"

{"label":"NOTROLL","score":0.41826510429382324,"elapsed_time":0.45891594886779785}
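
The curl calls above can also be issued from Python. The sketch below uses only the standard library and splits the work in two, so that the request can be inspected without network access. `build_predict_request` and `predict` are hypothetical helpers, the 30-second timeout is an arbitrary choice, and the endpoint and payload shape are copied from the curl commands.

```python
import json
import urllib.request


def build_predict_request(endpoint: str, text: str) -> urllib.request.Request:
    """Build the same POST request that the curl commands send."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        headers={"accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def predict(endpoint: str, text: str, timeout: float = 30.0) -> dict:
    """Send the request and decode the JSON response (needs network access)."""
    request = build_predict_request(endpoint, text)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


request = build_predict_request(
    "https://troll-detection-6ff2xxxgaq-ew.a.run.app/api/model/predict", "i hate you"
)
print(request.get_method(), request.full_url, request.data)
```

Calling `predict(...)` with the same arguments should return a dictionary with the `label`, `score` and `elapsed_time` keys seen in the responses above, provided the service is still deployed.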

# Troll detection on Twitch in streaming

In this last part of the assignment you will build a real-time inference pipeline over a Twitch chat that is currently live. To do so, we will use my Twitch channel `https://www.twitch.tv/franalgaba`, where I have a bot deployed that posts troll and non-troll messages picked at random from the dataset we used in the first part.

To access the Twitch chat I provide the corresponding connector, which will be deployed as a Cloud Function as we did in class; using my credentials, it collects the chat messages and sends them to a Pub/Sub topic on GCP. You will then develop a Dataflow streaming job that reads those messages from Pub/Sub, sends them to your inference microservice to get predictions, and publishes the results to a new Pub/Sub topic.

Below is a diagram of the architecture that will be built:

![streaming_diagram](https://drive.google.com/uc?export=view&id=1TEBPPc9ZF09IM5iGq9FwGAx9PVzAYNPg)

First, we create the publisher, which is in charge of collecting the Twitch messages and sending them to Pub/Sub. I provide this already implemented; you only have to deploy it as a Cloud Function.

In [None]:
%mkdir -p /content/streaming/publisher

In [None]:
# Execute after restart
%cd /content/streaming/publisher

/content/streaming/publisher


In [None]:
%%writefile requirements.txt

twitchio==1.2.3
loguru==0.5.3
google-cloud-pubsub==2.1.0

Writing requirements.txt


In [None]:
!pip install -r requirements.txt


Collecting twitchio==1.2.3
  Downloading https://files.pythonhosted.org/packages/4c/01/9e360ff53b4a9f538b258b81bf889973fedbb019a5ee6feba8d288708a7f/twitchio-1.2.3-py3-none-any.whl (48kB)
Collecting google-cloud-pubsub==2.1.0
  Downloading https://files.pythonhosted.org/packages/41/e4/acb1b0b23d46c1b67976f218a263873d4d1aa2ccf077f73aa3af84b24339/google_cloud_pubsub-2.1.0-py2.py3-none-any.whl (177kB)
Collecting websockets>=6.0
  Downloading https://files.pythonhosted.org/packages/84/64/78c2b3fe37730b30dca3c93d1f7f4a4286767f86e7c04cf3571b39bc2fb7/websockets-9.1-cp37-cp37m-manylinux2010_

In [None]:
%%writefile main.py

import os  # for importing env vars for the bot to use
import sys
import json
import time

from twitchio.ext import commands
from google.cloud import pubsub_v1
from loguru import logger

PROJECT_ID = os.getenv("PROJECT_ID")
TOPIC_NAME = os.getenv("TOPIC_NAME")

TOPIC_PATH = f"projects/{PROJECT_ID}/topics/{TOPIC_NAME}"

publisher = pubsub_v1.PublisherClient()


class Bot(commands.Bot):

    def __init__(self, irc_token='...', client_id='...', nick='...', prefix="!", initial_channels=['...'], debug=True):
        super().__init__(irc_token=irc_token, client_id=client_id, nick=nick, prefix=prefix,
                         initial_channels=initial_channels)
        self.debug = debug

    # Events don't need decorators when subclassed
    async def event_ready(self):
        logger.info('Ready')

    async def event_message(self, message):
        logger.info(message.content)
        publisher.publish(TOPIC_PATH, str.encode(message.content))


def main(request):

    topic_name = f"projects/{PROJECT_ID}/topics/{TOPIC_NAME}"
    # publisher.create_topic(topic_name)

    # get_json(silent=True) returns None for a missing or invalid JSON body
    request_json = request.get_json(silent=True) or {}

    logger.info("Starting listener...")
    if "debug" in request_json and isinstance(request_json["debug"], bool):
        logger.info(f"Debug mode: {request_json['debug']}")
        bot = Bot(
          # set up the bot
          irc_token="oauth:xl5cpf8qe8tl1d03dppymchi6r04iz",
          client_id="ciliqxi534iwg4pfqj7swl1jmkt23y",
          nick="franalgaba",
          prefix="!",
          initial_channels=["franalgaba"],
          debug=request_json['debug'])
    else:
        bot = Bot(
          # set up the bot
          irc_token="oauth:xl5cpf8qe8tl1d03dppymchi6r04iz",
          client_id="ciliqxi534iwg4pfqj7swl1jmkt23y",
          nick="franalgaba",
          prefix="!",
          initial_channels=["franalgaba"])

    bot.run()

Writing main.py


In [None]:
# In case user service error...
! gcloud iam service-accounts add-iam-policy-binding <project_id>@appspot.gserviceaccount.com --member=user:<mail> --role=roles/iam.serviceAccountUser

To start your Cloud Function so that it collects and forwards messages, you only need to run the following command (follow the steps seen in class to deploy the service):

In [None]:
! curl -X POST https://europe-west1-twitch-practiceeva.cloudfunctions.net/twitch-pub -H "Content-Type:application/json"  -d '{"debug": false}'

# the messages do reach Pub/Sub and the Cloud Function logs, although after a while an error appears (could not handle the request) and the messages stop. presumably because a limit was hit? Pub/Sub reports rate exceeded.

Error: could not handle the request


## Deliverable 1 (3 points)

In this deliverable you will develop a streaming inference pipeline with Apache Beam, run on Dataflow as a streaming job, that calls your microservice to make predictions.

In [None]:
%mkdir /content/streaming/subscriber

In [None]:
%cd /content/streaming/subscriber

/content/streaming/subscriber


In [None]:
%%writefile requirements.txt

apache-beam[gcp]==2.24.0
fsspec==0.8.4
gcsfs==0.7.1
loguru==0.5.3

Writing requirements.txt


In [None]:
! pip install -r requirements.txt

Collecting google-cloud-pubsub<2,>=0.39.0; extra == "gcp"
  Using cached https://files.pythonhosted.org/packages/1f/b3/dd83eca4cd1019d592e82595ea45d53f11e39db4ee99daa66ceb8a1b2d89/google_cloud_pubsub-1.7.0-py2.py3-none-any.whl
Installing collected packages: google-cloud-pubsub
  Found existing installation: google-cloud-pubsub 2.1.0
    Uninstalling google-cloud-pubsub-2.1.0:
      Successfully uninstalled google-cloud-pubsub-2.1.0
Successfully installed google-cloud-pubsub-1.7.0


In [None]:
%%writefile predict.py

from __future__ import absolute_import
from __future__ import print_function

import argparse
import requests
import json
import sys

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions,
    StandardOptions,
    PipelineOptions,
    SetupOptions,
)
from loguru import logger


class Predict(beam.DoFn):
    def __init__(self, predict_server) -> None:
        self.url = predict_server

    def _predict(self, text) -> dict:
        payload = {"text": text}
        headers = {"accept": "application/json", "Content-Type": "application/json"}
        try:
            # A timeout keeps a stalled request from blocking the worker indefinitely
            response = requests.post(
                self.url, data=json.dumps(payload), headers=headers, timeout=30
            )
            response = json.loads(response.text)
        except Exception:
            # Fall back to a placeholder so one failed call does not stop the stream
            response = {"label": "undefined", "score": 0, "elapsed_time": 0}

        return response

    def process(self, element, window=beam.DoFn.WindowParam):
        logger.info(f"Text to predict: {element}")
        result = self._predict(element)
        result["text"] = element
        yield json.dumps(result)


def run(predict_server, source, sink, beam_options=None):
    with beam.Pipeline(options=beam_options) as p:
        _ = (
            p
            | "Read data from PubSub" >> source
            | "decode" >> beam.Map(lambda x: x.decode("utf-8"))
            | "window" >> beam.WindowInto(window.FixedWindows(15))
            | "Predict" >> beam.ParDo(Predict(predict_server))
            | "encode" >> beam.Map(lambda x: x.encode("utf-8")).with_output_types(bytes)
            | "Write predictions" >> sink
        )


if __name__ == "__main__":
    """Main function"""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    parser.add_argument(
        "--inputs_topic",
        dest="inputs_topic",
        required=True,
        help="Name of the Pub/Sub topic to read chat messages from.",
    )

    parser.add_argument(
        "--outputs_topic",
        dest="outputs_topic",
        required=True,
        help="Name of the Pub/Sub topic to write predictions to.",
    )

    parser.add_argument(
        "--predict_server",
        dest="predict_server",
        required=True,
        help="URL of the prediction endpoint of the inference microservice.",
    )

    args, pipeline_args = parser.parse_known_args()
    logger.info(args)
    beam_options = PipelineOptions(pipeline_args)
    beam_options.view_as(SetupOptions).save_main_session = True
    # beam_options.view_as(DirectOptions).direct_num_workers = 0

    project = beam_options.view_as(GoogleCloudOptions).project

    if not project:
        parser.print_usage()
        print("error: argument --project is required for streaming")
        sys.exit(1)

    beam_options.view_as(StandardOptions).streaming = True

    source = beam.io.ReadFromPubSub(
        topic="projects/{}/topics/{}".format(project, args.inputs_topic)
    ).with_output_types(bytes)

    sink = beam.io.WriteToPubSub(
        topic="projects/{}/topics/{}".format(project, args.outputs_topic)
    )

    run(args.predict_server, source, sink, beam_options)


Writing predict.py
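
The `Predict` DoFn never lets a failed HTTP call break the stream: any exception is replaced by a placeholder result. The sketch below isolates that behaviour so it can be tried without a running service. `predict_with_fallback`, `healthy_post` and `broken_post` are hypothetical names, and the stubbed response values are made up.

```python
import json

FALLBACK = {"label": "undefined", "score": 0, "elapsed_time": 0}


def predict_with_fallback(post, url: str, text: str) -> str:
    """Mirror Predict._predict plus process, with an injectable `post` callable."""
    try:
        result = json.loads(post(url, json.dumps({"text": text})))
    except Exception:
        # Same idea as in the DoFn: swallow the error and emit a placeholder
        result = dict(FALLBACK)
    result["text"] = text
    return json.dumps(result)


def healthy_post(url, body):
    # Stub standing in for the microservice; the values are made up
    return json.dumps({"label": "NO_TROLL", "score": 0.4, "elapsed_time": 0.1})


def broken_post(url, body):
    raise ConnectionError("service unreachable")


ok = json.loads(predict_with_fallback(healthy_post, "http://example.invalid", "hello"))
failed = json.loads(predict_with_fallback(broken_post, "http://example.invalid", "hello"))
print(ok["label"], failed["label"])
```

A consumer of the predictions topic can therefore treat the label `undefined` as a signal that the microservice was unreachable or returned something that was not JSON.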


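In `run`, `FixedWindows(15)` assigns each message to a consecutive 15-second window based on its timestamp (for Pub/Sub sources this is, to my understanding, the publish time unless a timestamp attribute is configured). The arithmetic can be sketched in plain Python; `fixed_window` is a hypothetical helper that assumes a window offset of zero.

```python
def fixed_window(timestamp: float, size: int = 15) -> tuple:
    """Return the [start, end) bounds of the fixed window containing timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


for ts in (0, 14.9, 15, 47):
    print(ts, fixed_window(ts))
```

Because this pipeline has no grouping or aggregation step, the window assignment does not change the individual predictions; it would start to matter if results were later combined per window.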
In [None]:
%%writefile setup.py

import setuptools

REQUIRED_PACKAGES = [
    "apache-beam[gcp]==2.24.0",
    "fsspec==0.8.4",
    "gcsfs==0.7.1",
    "loguru==0.5.3",
]

setuptools.setup(
    name="twitchstreaming",
    version="0.0.1",
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True,
    description="Twitch Troll Detection",
)

Writing setup.py


### Streaming inference validation

The command below launches a Dataflow streaming job. Before running it, you must create two Pub/Sub topics: `twitch-chat`, which receives the Twitch messages, and `twitch-chat-predictions`, which receives the predictions generated by your microservice.

**Important**: do not forget to change the URL to that of your own inference microservice.

In [None]:
GCP_WORK_DIR = 'gs://twitch-practiceeva'
GCP_REGION = 'europe-west1'

In [None]:
! python3 predict.py \
--project $PROJECT_ID \
--region $GCP_REGION \
--runner DataflowRunner \
--temp_location $GCP_WORK_DIR/beam-temp \
--setup_file ./setup.py \
--inputs_topic twitch-chat \
--outputs_topic twitch-chat-predictions \
--predict_server https://troll-detection-6ff2xxxgaq-ew.a.run.app/api/model/predict

2021-07-03 16:18:51.992 | INFO     | __main__:<module>:89 - Namespace(inputs_topic='twitch-chat', outputs_topic='twitch-chat-predictions', predict_server='https://troll-detection-6ff2xxxgaq-ew.a.run.app/api/model/predict')



Traceback (most recent call last):
  File "/usr/lib/python3.7/subprocess.py", line 490, in run
    stdout, stderr = process.communicate(input, timeout=timeout)
  File "/usr/lib/python3.7/subprocess.py", line 951, in communicate
    stdout = self.stdout.read()
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "predict.py", line 111, in <module>
    run(args.predict_server, source, sink, beam_options)
  File "predict.py", line 54, in run
    | "Write predictions" >> sink
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 555, in __exit__
    self.result = self.run()
  File "/usr/local/lib/python3.7/dist-packag