In [15]:
#!pip install pyspark
#!pip install google-cloud-storage google-cloud-bigquery google-cloud-functions google-api-python-client
#!pip install google-cloud-pubsub

In [16]:
#!wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz
#!tar -xvzf mysql-connector-java-8.0.27.tar.gz
#!ls mysql-connector-java-8.0.27

In [3]:
from pyspark import SparkConf, SparkContext, SQLContext
from time import sleep
import os
from google.cloud import pubsub_v1
from google.cloud import functions_v1
from googleapiclient.discovery import build
from google.api_core.exceptions import NotFound
import threading
import json
import base64
from google.cloud import storage
from google.cloud import bigquery
from googleapiclient import discovery

In [4]:
# Point the Google client libraries at the service-account key file.
# NOTE(review): the path is hard-coded to a Google Drive mount — confirm it exists in your runtime.
os.environ.update({'GOOGLE_APPLICATION_CREDENTIALS': r'/content/drive/MyDrive/Colab Notebooks/PEA/service-account-admin.json'})

In [5]:
# --- Project ---
project_id = 'ingenieriadatos-392001'

# --- Pub/Sub topics and pull subscriptions ---
# customers ("clientes")
topic_id = 'clientes-topic'
subscription_name = "clientes-subs"
# transactions ("transacciones")
topic_id_tr = 'transactions-topic'
subscription_name_tr = "transactions-topic-subs"

# --- Google APIs enabled through the Service Usage API ---
api_names = [
    'pubsub.googleapis.com',
    'cloudfunctions.googleapis.com',
    'cloudbuild.googleapis.com',
    'eventarc.googleapis.com',
]

# --- Cloud Storage: bucket that holds the zipped Cloud Functions source ---
bucket_name = "bk_code_message"
# customers
local_file_path = "/content/process_pubsub_message_clientes.zip"
remote_file_name = "process_pubsub_message_clientes.zip"
# transactions
local_file_path_tr = "/content/process_pubsub_message_transacciones.zip"
remote_file_name_tr = "process_pubsub_message_transacciones.zip"

# --- Cloud Functions ---
# The source URLs are derived from the bucket/object names above so the
# uploaded archive and the deployed archive cannot drift apart.
# customers
function_name = "process_pubsub_message_clientes"
source_url = f"gs://{bucket_name}/{remote_file_name}"
trigger_topic = f"projects/{project_id}/topics/{topic_id}"
# transactions
function_name_tr = "process_pubsub_message_transacciones"
source_url_tr = f"gs://{bucket_name}/{remote_file_name_tr}"
trigger_topic_tr = f"projects/{project_id}/topics/{topic_id_tr}"

# --- BigQuery: dataset shared by both tables ---
dataset_id = 'db_banco'

# Customers table: each (column name, BigQuery type) pair becomes a SchemaField.
table_id = 'clientes'
schema_cliente = [
    bigquery.SchemaField(column_name, column_type)
    for column_name, column_type in (
        ("id", "INTEGER"),
        ("sexo", "STRING"),
        ("fecha_alta", "DATE"),
        ("año_alta", "INTEGER"),
        ("saldo", "FLOAT"),
        ("ingreso", "FLOAT"),
        ("rango_ingresos", "STRING"),
        ("tipo_cliente", "STRING"),
        ("edad", "INTEGER"),
        ("rango_edad", "STRING"),
        ("nivel_educativo", "STRING"),
        ("antiguedad", "STRING"),
    )
]

# Transactions table, built the same way.
table_id_tr = 'transacciones'
schema_transacciones = [
    bigquery.SchemaField(column_name, column_type)
    for column_name, column_type in (
        ("id_deposito_client", "INTEGER"),
        ("id_recibe_cliente", "INTEGER"),
        ("monto", "FLOAT"),
        ("tipo_transaccion", "STRING"),
        ("agencia", "STRING"),
        ("medio_transaccion", "STRING"),
        ("fallo", "INTEGER"),
        ("fecha_transaccion", "DATETIME"),
        ("motivo", "STRING"),
        ("estado", "STRING"),
    )
]

In [6]:
# Funciones

# Función habilitar apis
def habilitar_apis_gcp(project_id, api_names):
    """Enable a list of Google Cloud APIs on a project via the Service Usage API.

    Args:
        project_id: ID of the project on which the services are enabled.
        api_names: Iterable of service names, e.g. ``'pubsub.googleapis.com'``.

    Returns:
        None. One confirmation line is printed per service.

    Note:
        The response of each ``enable`` request is not inspected or polled, so
        the printed message only means the request was accepted without error.
    """
    if not api_names:
        return

    # The discovery client does not depend on the service being enabled, so it
    # is built once instead of once per loop iteration.
    service = build('serviceusage', 'v1')
    for api_name in api_names:
        request = service.services().enable(name=f'projects/{project_id}/services/{api_name}')
        request.execute()
        print(f"API '{api_name}' ha sido habilitada con éxito en el proyecto '{project_id}'.")

# Función para cargar un archivo .zip del codigo a Cloud Storage que utilizará Cloud Functions
def upload_to_bucket(bucket_name, local_file_path, remote_file_name):
    """Upload a local file to a Cloud Storage bucket, creating the bucket if missing.

    Args:
        bucket_name: Name of the destination bucket.
        local_file_path: Path of the local file to upload.
        remote_file_name: Object name to store the file under in the bucket.

    Raises:
        Any error other than ``NotFound`` raised while looking up the bucket
        (for example permission or network errors) is propagated unchanged.
    """
    storage_client = storage.Client()
    try:
        bucket = storage_client.get_bucket(bucket_name)
    except NotFound:
        # Only a missing bucket triggers creation. The previous bare `except:`
        # also swallowed auth/permission/network failures and then tried to
        # create a bucket, hiding the real error.
        bucket = storage_client.create_bucket(bucket_name)

    blob = bucket.blob(remote_file_name)
    blob.upload_from_filename(local_file_path)
    print(f"Archivo {remote_file_name} subido a {bucket_name}.")

# Función para crear un dataset y tabla en BigQuery
def create_dataset_table(schema, dataset_id, table_id, location="US"):
    """Create a BigQuery dataset and table if they do not already exist.

    Args:
        schema: List of ``bigquery.SchemaField`` describing the table columns.
        dataset_id: ID of the dataset, resolved in the client's default project.
        table_id: ID of the table inside the dataset.
        location: Location assigned to the dataset when it has to be created.
            Defaults to ``"US"``, the value that was previously hard-coded.

    Returns:
        None. A line is printed for each resource that gets created; existing
        resources are left untouched.
    """
    client = bigquery.Client()

    # Client.dataset() is deprecated; build the reference explicitly against
    # the client's project, which is what Client.dataset() defaulted to.
    dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
    try:
        client.get_dataset(dataset_ref)
    except NotFound:
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = location
        client.create_dataset(dataset)
        print(f"Dataset {dataset_id} creado.")

    table_ref = dataset_ref.table(table_id)
    try:
        client.get_table(table_ref)
    except NotFound:
        client.create_table(bigquery.Table(table_ref, schema=schema))
        print(f"Tabla {table_id} creada.")

# Función para que el suscriptor reconozca el mensaje
def callback(message):
    """Subscriber callback: log the received payload and acknowledge it.

    Args:
        message: Received message; only its ``data`` attribute and ``ack()``
            method are used here.
    """
    # Show the raw payload exactly as delivered.
    print(f"Mensaje recibido: {message.data}")

    # Acknowledge the message, then report the acknowledgement.
    message.ack()
    print("Mensaje confirmado.")

# Crear publisher, topic, suscription and subscriber
def create_pubsub_topic_and_publish(project_id, topic_id, subscription_name, df=None, batch_size=10, listen_timeout=None):
    """Ensure a Pub/Sub topic and subscription exist, start acknowledging
    messages, and optionally publish the rows of a DataFrame in batches.

    Args:
        project_id: Google Cloud project ID.
        topic_id: ID of the topic to look up or create.
        subscription_name: ID of the subscription to look up or create on the topic.
        df: Optional DataFrame-like object exposing ``collect()`` whose rows
            expose ``asDict()`` (presumably a PySpark DataFrame — confirm with
            callers). When ``None`` nothing is published.
        batch_size: Number of rows packed into one Pub/Sub message. Rows are
            JSON-encoded and joined with newlines.
        listen_timeout: Seconds to keep the subscriber listening before it is
            cancelled and closed. ``None`` (default) keeps the previous
            behaviour: the streaming pull is left running in the background
            after this function returns.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
        Exception: Any publish failure reported by a publish future.
    """
    from concurrent.futures import TimeoutError as FutureTimeoutError

    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()

    # Create the topic only when it does not exist yet.
    topic_path = publisher.topic_path(project_id, topic_id)
    try:
        publisher.get_topic(request={"topic": topic_path})
    except NotFound:
        topic = publisher.create_topic(request={"name": topic_path})
        print(f"Tema '{topic.name}' creado con éxito.")

    # Create the subscription only when it does not exist yet.
    subscription_path = subscriber.subscription_path(project_id, subscription_name)
    try:
        subscriber.get_subscription(request={"subscription": subscription_path})
    except NotFound:
        subscription = subscriber.create_subscription(request={"name": subscription_path, "topic": topic_path})
        print(f"Suscripción '{subscription.name}' creada con éxito.")

    # subscribe() does not block: it starts the streaming pull in the
    # background and returns a future. Wrapping it in a thread and joining
    # that thread (as before) returned immediately and discarded the future.
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

    if df is not None:
        records = [row.asDict() for row in df.collect()]
        publish_futures = []
        for start in range(0, len(records), batch_size):
            if publish_futures:
                sleep(1)  # throttle: pause between consecutive batches
            batch = records[start:start + batch_size]
            # default=str keeps rows with date/datetime/Decimal values from
            # raising TypeError; JSON-native values are encoded as before.
            payload = "\n".join(json.dumps(record, default=str) for record in batch)
            publish_futures.append(publisher.publish(topic_path, payload.encode("utf-8")))

        # Block until every publish is confirmed so failures surface here
        # instead of being silently dropped.
        for publish_future in publish_futures:
            publish_future.result()

    if listen_timeout is not None:
        try:
            streaming_pull_future.result(timeout=listen_timeout)
        except FutureTimeoutError:
            streaming_pull_future.cancel()  # request shutdown of the streaming pull
            streaming_pull_future.result()  # wait for the shutdown to complete
        finally:
            subscriber.close()

# Crea un Cloud Functions que se dispara cuando Pub/Sub recibe mensaje mediante el tema
def create_cloud_function(project_id, function_name, source_url, trigger_topic,
                          location='us-central1', runtime='python37'):
    """Create a Pub/Sub-triggered Cloud Function unless it already exists.

    Args:
        project_id: Google Cloud project ID.
        function_name: Short name of the function; also used as its entry point.
        source_url: ``gs://`` URL of the zipped source archive.
        trigger_topic: Full topic resource name
            (``projects/<project>/topics/<topic>``) that triggers the function.
        location: Region of the function. Defaults to ``'us-central1'``, the
            value that was previously hard-coded.
        runtime: Runtime identifier. Defaults to ``'python37'``, the value that
            was previously hard-coded.
            NOTE(review): python37 appears to be past end of support on
            Google's runtime schedule — confirm and pass a newer runtime.

    Returns:
        None. Prints whether the function already existed or was requested.
    """
    service = discovery.build('cloudfunctions', 'v1')
    functions_api = service.projects().locations().functions()

    parent = f'projects/{project_id}/locations/{location}'
    full_name = f'{parent}/functions/{function_name}'

    # functions.list is paginated; walk every page so a function that is not
    # on the first page is still detected as existing.
    existing_functions = set()
    request = functions_api.list(parent=parent)
    while request is not None:
        page = request.execute()
        existing_functions.update(f['name'] for f in page.get('functions', []))
        request = functions_api.list_next(request, page)

    if full_name in existing_functions:
        print(f"La función '{function_name}' ya existe.")
        return

    function = {
        'name': full_name,
        'entryPoint': function_name,
        'runtime': runtime,
        'sourceArchiveUrl': source_url,
        'eventTrigger': {
            'eventType': 'google.pubsub.topic.publish',
            'resource': trigger_topic
        },
        'environmentVariables': {
            'GOOGLE_FUNCTION_SOURCE': f'main.{function_name}'
        }
    }
    response = functions_api.create(location=parent, body=function).execute()

    # The response's name is an operation name ("operations/..."), not the
    # function name; the message only confirms the request was accepted.
    print(f"Función creada: {response['name']}")

In [7]:
# Enable the Google Cloud APIs required by the rest of the notebook
habilitar_apis_gcp(project_id=project_id, api_names=api_names)

API 'pubsub.googleapis.com' ha sido habilitada con éxito en el proyecto 'ingenieriadatos-392001'.
API 'cloudfunctions.googleapis.com' ha sido habilitada con éxito en el proyecto 'ingenieriadatos-392001'.
API 'cloudbuild.googleapis.com' ha sido habilitada con éxito en el proyecto 'ingenieriadatos-392001'.
API 'eventarc.googleapis.com' ha sido habilitada con éxito en el proyecto 'ingenieriadatos-392001'.


In [8]:
# O en Cloud Shell
#gcloud services enable pubsub.googleapis.com
#gcloud services enable dataflow.googleapis.com
#gcloud services enable bigquery.googleapis.com
#gcloud services enable notebooks.googleapis.com  -> para un entorno de jupyter en cloud

In [9]:
# Upload the zipped message-processing code for each pipeline to Cloud Storage
# customers
upload_to_bucket(
    bucket_name=bucket_name,
    local_file_path=local_file_path,
    remote_file_name=remote_file_name,
)
# transactions
upload_to_bucket(
    bucket_name=bucket_name,
    local_file_path=local_file_path_tr,
    remote_file_name=remote_file_name_tr,
)

Archivo process_pubsub_message_clientes.zip subido a bk_code_message.
Archivo process_pubsub_message_transacciones.zip subido a bk_code_message.


In [10]:
# Create the dataset (once) and one table per schema
# customers
create_dataset_table(schema=schema_cliente, dataset_id=dataset_id, table_id=table_id)
# transactions
create_dataset_table(schema=schema_transacciones, dataset_id=dataset_id, table_id=table_id_tr)

Dataset db_banco creado.
Tabla clientes creada.
Tabla transacciones creada.


In [12]:
# Create the topic and subscription for each pipeline (no data is published here)
# customers
create_pubsub_topic_and_publish(
    project_id=project_id,
    topic_id=topic_id,
    subscription_name=subscription_name,
    df=None,
)
# transactions
create_pubsub_topic_and_publish(
    project_id=project_id,
    topic_id=topic_id_tr,
    subscription_name=subscription_name_tr,
    df=None,
)

Tema 'projects/ingenieriadatos-392001/topics/clientes-topic' creado con éxito.
Suscripción 'projects/ingenieriadatos-392001/subscriptions/clientes-subs' creada con éxito.
Tema 'projects/ingenieriadatos-392001/topics/transactions-topic' creado con éxito.
Suscripción 'projects/ingenieriadatos-392001/subscriptions/transactions-topic-subs' creada con éxito.


In [13]:
# Create one Pub/Sub-triggered Cloud Function per pipeline
# customers
create_cloud_function(
    project_id=project_id,
    function_name=function_name,
    source_url=source_url,
    trigger_topic=trigger_topic,
)
# transactions
create_cloud_function(
    project_id=project_id,
    function_name=function_name_tr,
    source_url=source_url_tr,
    trigger_topic=trigger_topic_tr,
)

Función creada: operations/aW5nZW5pZXJpYWRhdG9zLTM5MjAwMS91cy1jZW50cmFsMS9wcm9jZXNzX3B1YnN1Yl9tZXNzYWdlX2NsaWVudGVzL0ZpVk55Qmd4LW5F
Función creada: operations/aW5nZW5pZXJpYWRhdG9zLTM5MjAwMS91cy1jZW50cmFsMS9wcm9jZXNzX3B1YnN1Yl9tZXNzYWdlX3RyYW5zYWNjaW9uZXMvVHFzNFJENEdoU1E


In [14]:
# Ejecutar en Cloud Shell
#gcloud functions deploy process_pubsub_message_clientes \
#    --runtime python37 \
#    --trigger-topic clientes-topic \
#    --source gs://bk_code_message/process_pubsub_message_clientes.zip \
#    --set-env-vars GOOGLE_FUNCTION_SOURCE=main.process_pubsub_message_clientes