<a href="https://colab.research.google.com/github/giobritos/soul_code_projeto_final/blob/main/Custom_Template_GCS_para_BigQuery.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Criando um custom template do Cloud Dataflow para enviar aquivos da GCS para o BigQuery

## Instalações (reiniciar a cada instalação)

In [None]:
pip install --upgrade pip

In [None]:
pip install apache_beam[interactive]

In [None]:
pip install apache_beam[gcp]

## Conexão a GCP

In [None]:
import os

# Configuração da chave de segurança
serviceAccount = '/content/projeto-final-373521-25961e56ca37.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

## Criando o template

In [None]:
# Arquivo base:
# ETL Processing on Google Cloud Using Dataflow and BigQuery
# https://www.cloudskillsboost.google/focuses/3460?parent=catalog

import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Classe com os passos para transformar um arquivo CSV 
# em um formato aceito pelo BigQuery
class DataIngestion:
    # Método que pega uma linha de dados separados por ',' 
    # e transforma em dicionário para ser carregado no BigQuery
    def parse_method(self, string_input):
        # Separando a linha por ',' e removendo '"' , '\n' e '\r'
        values = re.split(",", re.sub('\r\n','',re.sub('"','',string_input)))
        # Criando um dicionário com os dados
        row = dict(
            zip(('cidade', 'tipo_crime', 'distrito_zona', 'data_ocorrencia',
                 'periodo_ocorrencia', 'faixa_etaria_suspeito', 
                 'cor_pele_suspeito', 'sexo_suspeito', 'faixa_etaria_vitima', 
                 'cor_pele_vitima', 'sexo_vitima', 'latitude_longitude', 
                 'desc_foro', 'n_vitimas', 'tipo_pessoa', 'dt_nasc_pessoa', 
                 'idade_pessoa', 'profissao', 'tipo_local_fato', 
                 'logradouro', 'n_logradouro', 'corporacao_policial', 
                 'situacao_policial', 'dp_num_nome', 'bairro'),
                values))
        return row

# Função para criar o custom template
def run(argv=None):

  # Instanciando a classe DataIngestion
  data_ingestion = DataIngestion()

  # Argumentos da pipeline
  pipeline_options_dict = {
    'runner':'DataflowRunner',
    'project':'projeto-final-373521',
    'staging_location':'gs://pipeline-apachebeam/staging',
    'temp_location':'gs://pipeline-apachebeam/temp',
    'template_location':'gs://pipeline-apachebeam/models/nyspGCStoBQ',
    'region':'southamerica-east1',
    'input':'gs://projeto-final-agsw/tratados/ny_sp_tratado.csv',
    'output':'dataset_projeto_final_pipeline.ny_sp_tratado_base'
  }

  pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)

  # Iniciando a pipeline
  p = beam.Pipeline(options=pipeline_options)

  (p
    # Lendo o arquivo, pulando a primeira linha 
    # que tem as informações das colunas
    | 'Lendo o CSV' >> beam.io.ReadFromText(pipeline_options_dict.get('input'),\
                                            skip_header_lines=1)
    # Usando o método parse_method da classe DataIngestion 
    # para preparar os dados
    | 'Preparando os dados' >> beam.Map(lambda s:data_ingestion.parse_method(s))
    # Escrevendo no BigQuery
    | 'Escrevendo no BigQuery' >> beam.io.Write(
          beam.io.BigQuerySink(
          # Nome da tabela
          pipeline_options_dict.get('output'),
          # Esquema da tabela
          schema="cidade:STRING, tipo_crime:STRING, distrito_zona:STRING,"\
                 "data_ocorrencia:TIMESTAMP, periodo_ocorrencia:STRING,"\ 
                 "faixa_etaria_suspeito:STRING, cor_pele_suspeito:STRING,"\
                 "sexo_suspeito:STRING, faixa_etaria_vitima:STRING,"\ 
                 "cor_pele_vitima:STRING, sexo_vitima:STRING,"\ 
                 "latitude_longitude:STRING, desc_foro:STRING,"\
                 "n_vitimas:STRING, tipo_pessoa:STRING,"\
                 "dt_nasc_pessoa:STRING, idade_pessoa:STRING,"\
                 "profissao:STRING, tipo_local_fato:STRING, logradouro:STRING,"\ 
                 "n_logradouro:STRING, corporacao_policial:STRING, "\
                 "situacao_policial:STRING, dp_num_nome:STRING, bairro:STRING",
          # Cria a tabela se ela não existir
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          # Deleta os dados da tabela existente antes de escrever
          write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
  )
  p.run().wait_until_finish()
run()

  beam.io.BigQuerySink(
  self.table_reference.projectId = pcoll.pipeline.options.view_as(
  self.project = self.project or p.options.view_as(GoogleCloudOptions).project
