# Carga de datos a PubSub

Este notebook se utilizará como una máquina que está cargando datos de forma continua a PubSub.

Para conseguir esos datos vamos a coger los datos que se han generado al inicio del curso (datos a nivel día, no datos históricos) y los traemos al directorio de colab:

```
            TXT_Simulación_TICKETS_YYYY-MM-DD.txt
```


Como en otras ocasiones, primero debemos activar Google Drive, donde guardaremos de forma segura nuestros credenciales que nos permitirán autentificarnos. Para ello, efectuamos y seguimos las instrucciones que se nos indiquen en el siguiente bloque:

In [None]:
from google.colab import drive
drive.mount('/content/drive')

A continuación vamos a coger las credenciales que hemos descargado y vamos a pegarlo en la misma carpeta en la que se encuentra este cuaderno:

```
'drive/
    My Drive/ 
          credentials.json                               
```


Para cerrar la puesta apunto del entorno de trabajo, activamos los credenciales e instalamos librerias.

In [None]:
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/content/drive/My Drive/credentials.json' 
!echo $GOOGLE_APPLICATION_CREDENTIALS

In [None]:
!pip install google-cloud-pubsub==0.42.1
!pip install google-api-core==1.13.0
!pip install APScheduler

Empecemos cargando los datos

 >- **Nota**: Los datos a introducir tienen que ser del día actual para que el algoritmo funcione. Recordar que esos datos los obtenemos en el paso
 `0. Pasos previos` y nos los descargamos

In [None]:
import pandas as pd
from datetime import datetime, timedelta
import json


# TODO: Copia el nombre del archivo copiado en esta carpeta
nombre_archivo = '___________________'

df = pd.read_csv(nombre_archivo, parse_dates=['ticketDate'])

Ahora, para lanzar nuestros datos a PubSub, tendremos que volver a especificar el tema y el proyecto que hemos creado en Google Cloud

In [None]:
from google.cloud import pubsub_v1

# TODO especificar nuestro 'project_id' y 'topic_name'
project_id = '_____________'
topic_name = '_____________'

# Configure the batch to publish as soon as there is one kilobyte
# of data or one second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1024,  # One kilobyte
    max_latency=1,   # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_name)

Una vez que hemos especificado los datos del día, las credenciales y los ID correspondientes, toca subir esos datos. Para ese cometido, vamos a hacerlo en 2 partes:

 - Para empezar vamos a cargar los datos hasta este instante (hasta el minuto actual en el que ejecutamos el código)
 - Una vez hemos cargado los datos iniciales, vamos a ir subiendo los datos minuto a minuto (simulando el tiempo real)

Vamos a cargar todos los tickets hasta el minuto actual:

In [None]:
def first_load():
    now = datetime.now()
    now -= timedelta(seconds=now.second)
    now -= timedelta(microseconds=now.microsecond)
    
    df_now = df[df['ticketDate'] <= now]
    for index, row in df_now.iterrows():
        json_values = {}
        json_values['amount'] = row['amount']
        json_values['ticketDate'] = row['ticketDate'].strftime('%Y-%m-%d %H:%M:%S')
        
        # Convertir diccionario en JSON
        json_values = json.dumps(json_values)
        
        # Codificar String
        json_values = json_values.encode('utf-8')
        future = publisher.publish(topic_path, data=json_values)
        
    print('Mensajes Publicados')

para después subirlos minuto a minuto

In [None]:
def run():
    now = datetime.now()
    now -= timedelta(seconds=now.second)
    now -= timedelta(microseconds=now.microsecond)
    
    df_now = df[df['ticketDate'] == now]
    for index, row in df_now.iterrows():
        json_values = {}
        json_values['amount'] = row['amount']
        json_values['ticketDate'] = row['ticketDate'].strftime('%Y-%m-%d %H:%M:%S')
        
        # Convertir diccionario en JSON
        json_values = json.dumps(json_values)
        
        # Codificar String
        json_values = json_values.encode('utf-8')
        future = publisher.publish(topic_path, data=json_values)
        
    print('Mensajes Publicados')

Para generar un bucle que se repita cada minuto, vamos a utilizar el módulo `APScheduler` de Python

In [None]:
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()
sched.add_job(run, 'interval', minutes=1)

Ejecutemos...

 >- **Nota**: el siguiente bloque va a hacer que Python se meta en un bucle infinito, por lo que si se quiere parar tendremos que darle al botón de `Stop` o reiniciar el `Kernel`

In [None]:
first_load()
sched.start()