<a href="https://colab.research.google.com/github/felipecampelo/PipelineStreaming/blob/main/PipelineStreaming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### ✅ Pipeline de Streaming de Dados ✅

`Objetivo`: Gerar dados streaming para o tópico 01 (Pub/Sub 01), executar a pipeline de dados streaming com o Dataflow (com o modelo gerado pelo Colab) que vai transferir os dados do tópico 01 para o tópico 02 (Pub/Sub 02), consumir os dados do tópico 02.

### ❗ Apache Beam ❗

`Passo a passo para instalação do Apache Beam no Colab:`

`1)` pip install --upgrade pip

`2)` pip install apache-beam[interactive]

`3)` Reiniciar ambiente de execução

`4)` pip install apache-beam[gcp]

`5)` Reiniciar ambiente de execução

`6)` import apache_beam as beam

`OBS: Nunca usar funções de exibição e gravação na mesma pipeline`

In [None]:
# Necessário atualizar o pip para instalação do Beam
!pip install --upgrade pip

In [None]:
# Instalando o apache-beam[interactive]
!pip install apache-beam[interactive]

In [None]:
# Instalando o apache-beam[gcp]
!pip install apache-beam[gcp]

In [None]:
# Instalando a biblioteca do Pub/Sub do GCP
!pip install google-cloud-pubsub

###⚡ Importando bibliotecas necessárias ⚡


In [1]:
# Importando as bibliotecas necessárias
import csv
import time
from google.cloud import pubsub_v1
import os

# Bibliotecas para Pipeline
import apache_beam as beam
import os
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam import window # Para trabalhar a pipeline com streaming

###⚡ Simulador de Streaming de Dados e Processamento via Pub/Sub ⚡

`Pub: Enche o tópico de dados`

`Sub: Lê e envia os dados do tópico`

In [2]:
# Conectando com o service account do GCP
serviceAccount = '/content/drive/MyDrive/KeysGCP/aulas-soulcode-felipe-1ab7e143ccf1.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

#### ⏩ Producer ou Criador de Dados (PUB) ⏪

In [None]:
# Definindo o topico e o publisher
topico = 'projects/aulas-soulcode-felipe/topics/igorconsumer01'
publisher = pubsub_v1.PublisherClient()

# Definindo os dados de entrada
entrada = '/content/drive/MyDrive/Datasets/voos.csv'

# Enviando os dados do CSV em linha a linha (simulando Streaming)
with open(entrada, 'rb') as file:
  next(file) # Pulando a linha das colunas
  for row in file:
    print('Linha enviada')
    publisher.publish(topico, row)

#### ⏩ Consumer ou Extrator de Dados (SUB) ⏪

In [8]:
# Definindo a subscription e o subscriber
subscription = 'projects/aulas-soulcode-felipe/subscriptions/igorsub02'
subscriber = pubsub_v1.SubscriberClient()

# Para ler as mensagens e assim poder apagar do topico
def visualizarMsg(mensagem):
  print(('mensagem: {}'.format(mensagem)))
  mensagem.ack()

subscriber.subscribe(subscription, callback = visualizarMsg)

<StreamingPullFuture at 0x7f61676fec50 state=pending>

###⚡ Modelo de Pipeline para Streaming ⚡

In [None]:
# Criando as configurações da Pipeline para conexão com o GCP
pipeline_options = {
    'project': 'aulas-soulcode-felipe', # ID do projeto do GCP
    'runner': 'DataflowRunner', # Aplicação que irá rodar (Dataflow)
    'region': 'southamerica-east1', # Região de preferência
    'staging_location': 'gs://atividade-beam-dataflow/staging/', # Localização para arquivos em staging
    'temp_location': 'gs://atividade-beam-dataflow/temp/', # Localização para arquivos temporários
    'template_location': 'gs://atividade-beam-dataflow/models/modelo_streaming', # Localização do modelo de Pipeline
    'save_main_session': True,
    'streaming': True
}

# Transformando o dicionário para o tipo de PipelineOption
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)

# Criando a Pipeline com as opções estabelecidas
p1 = beam.Pipeline(options = pipeline_options)

# Definindo a entrada e saída do Pipeline de acordo com o objetivo
entrada = 'projects/aulas-soulcode-felipe/subscriptions/igorsub01' # Subscription do Pub/Sub 01
saida = 'projects/aulas-soulcode-felipe/topics/igorconsumer02' # Tópico do Pub/Sub 02

# Classe com uma função para que cada registro seja decodificado para utf-8 e separado por vírgula
# Motivo: Pub/Sub entende os dados bit a bit
# Alternativa: Podemos também apenas colocar o return dessa função no beam.Map (não usando ParDo)
class separador(beam.DoFn):
  def process(self, record):
    return [record.decode('utf-8').split(',')]

# Perna para coletar os dados de entrada
pcollection_entrada = (
    p1 
    |'Ler do topico' >> beam.io.ReadFromPubSub(subscription = entrada)
)

tempoAtraso = (
    pcollection_entrada
    |'Separador 1 do Tópico - CSV' >> beam.ParDo(separador()) # Definindo o separador do arquivo
    |'Filtrar atrasos maior que zero (1)' >> beam.Filter(lambda record: float(record[8]) > 0) # Filtrando os que tem atraso
    |'Agregar as colunas (1)' >> beam.Map(lambda record: (record[0], float(record[1]))) # Pegando apenas as colunas 4 e 8
    |'Janela de sliding (1)' >> beam.WindowInto(window.SlidingWindows(15, 10)) # Definindo a window -> 15 é o tempo total da janela, 10 é o momento que abre outra
    |'Construir uma nova tabela por minuto (1)' >> beam.CombinePerKey(sum) # Agrupando por chave e fazendo a soma
)

qtdAtraso = (
    pcollection_entrada
    |'Separador 2 do Tópico - CSV' >> beam.ParDo(separador()) # Definindo o separador do arquivo
    |'Filtrar atrasos maior que zero (2)' >> beam.Filter(lambda record: float(record[8]) > 0) # Filtrando os que tem atraso
    |'Agregar as colunas (2)' >> beam.Map(lambda record: (record[0], float(record[1]))) # Pegando apenas as colunas 4 e 8
    |'Janela de sliding (2)' >> beam.WindowInto(window.SlidingWindows(15, 10)) # Definindo a window -> 15 é o tempo total da janela, 10 é o momento que abre outra
    |'Construir uma nova tabela por contagem (2)' >> beam.combiners.Count.PerKey() # Fazendo uma contagem de ocorrências
)

# A última pipeline não precisa do p1 (pernas) já que estamos juntando elas
tabela = (
    {'Quantidade_minutos': tempoAtraso, 'Numero_de_atrasos': qtdAtraso}
    |'Agrupar as pernas' >> beam.CoGroupByKey()
    |'Converter para PubSub' >> beam.Map(lambda row: (''.join(str(row)).encode('utf-8')))
    |'Gravar o resultado' >> beam.io.WriteToPubSub(saida) # Enviando para o Pub/Sub
)

# Executando as Pipelines
execucao = p1.run()
execucao.wait_until_finish() # Para esperar até finalizar (não sabemos o fim do streaming)