<a href="https://colab.research.google.com/github/MarianaDuartee/ProjetoFinal/blob/main/1_Apache_Beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### INSTALANDO DEPENDENCIAS

In [None]:
!pip install google-cloud-pubsub
!pip install fsspec
!pip install gcsfs
!pip install apache-beam[interactive]
!pip install apache_beam[gcp]
!pip install google-cloud-bigquery

### IMPORTANDO BIBLIOTECAS

In [181]:
import csv
import time
import os
import json
import argparse
import logging

import fsspec
import gcsfs
import pandas as pd

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions

from google.cloud import storage
from google.cloud import bigquery
from google.colab import drive
from google.oauth2 import service_account

# Configurando conta de serviço
service_account_key = r"/content/soulcode-projeto-final-4b88bea6e07a.json"
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_key

# PIPELINE BATCH

### PIPELINE LOCAL PARA GCS

In [None]:
# Criando pipeline
p1 = beam.Pipeline()

# Modelo de Pipeline. Ingerindo arquivos locais

path_file = r'/content/drive/MyDrive/Projeto Final/OcorrenciasUF.json'

rows = (
    p1
    |'Extrair_Dados' >> beam.io.Read(path_file, skip_header_lines=0, coder=coders.StrUtf8Coder())
    |'Ler_Elementos' >> beam.Map(lambda element: element)
    # |'Separar_Elementos' >> beam.Map(lambda element: element.split(','))
    |'Gravar_resultado' >> beam.io.WriteToText('gs://data_lake_ingest_data/1_input/converter/OcorrenciasUF', file_name_suffix='.json')
   )

p1.run()

df_test = pd.read_json('gs://data_lake_ingest_data/1_input/converter/OcorrenciasUF-00000-of-00001.json')
print(df_test)
print(df_test.dtypes)

### PIPELINE GCS PARA BIGQUERY (BATCH)

In [190]:
def create_table(element):
    dict_ = {} 
    dict_['UF'] = str(element[1])
    dict_['populacao_estimada'] = float(element[2])
    dict_['Ano'] = int(element[3])
    return(dict_)

def run(argv=None):
    """Run the workflow."""
    parser = argparse.ArgumentParser()

    parser.add_argument('--output', required=True,
    help=(
          'Output BigQuery table for results specified as: '
          'PROJECT:DATASET.TABLE or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    with beam.Pipeline(argv=pipeline_args) as p1:
        table_schema = bigquery.TableSchema()

        UF = bigquery.TableFieldSchema()
        UF.name = 'UF'
        UF.type = 'string'
        UF.mode = 'nullable'
        table_schema.fields.append(UF)

        populacao_estimada = bigquery.TableFieldSchema()
        populacao_estimada.name = 'populacao_estimada'
        populacao_estimada.type = 'float'
        populacao_estimada.mode = 'nullable'
        table_schema.fields.append(populacao_estimada)

        Ano = bigquery.TableFieldSchema()
        Ano.name = 'populacao_estimada'
        Ano.type = 'integer'
        Ano.mode = 'nullable'
        table_schema.fields.append(Ano)

pipeline_args={'runner':'DataflowRunner',
               'job_name':'bq-load',
               'project':'soulcode-projeto-final',
               'region':'southamerica-east1',
               'temp_location':'gs://data_lake_ingest_data/temp_process',
               'staging_location':'gs://data_lake_ingest_data/temp_process',
               'template_location':'gs://data_lake_ingest_data/4_templates/template_model_batch',
               'save_main_session':True,
               }

options = PipelineOptions.from_dictionary(pipeline_args)
p1 = beam.Pipeline(options=options)

p1 = beam.Pipeline()

path_file_input = 'gs://data_lake_ingest_data/2_temp/temp_pandas_total_pop_ano_uf.csv'
path_file_output = 'gs://data_lake_ingest_data/temp/temp_pandas_total_pop_ano_uf'

table_schema =[
                {
                    "mode": "NULLABLE",
                    "name": "UF",
                    "type": "STRING"
                },
                {
                    "mode": "NULLABLE",
                    "name": "populacao_estimada",
                    "type": "STRING"
                },
                {
                    "mode": "NULLABLE",
                    "name": "Ano",
                    "type": "INTEGER"
                }
            ]

schema = json.dumps(table_schema)

pass_to_table = (
        p1 

        |'Extraindo_Dados' >> beam.io.ReadFromText(path_file_input, skip_header_lines=1)
        |'Separar_Elementos' >> beam.Map(lambda element: element.split(','))
        |'Criar_Tabela' >> beam.Map(lambda element: create_table(element))
        |'Print' >> beam.Map(lambda element: print(element))
        |'Gravar_resultado' >> beam.io.WriteToText(path_file_output, file_name_suffix='.csv', header=True)
        |'Gravar_Resultado' >> beam.io.WriteToBigQuery(
                                    table='TesteBeamApache',
                                    dataset='Teste',
                                    project='soulcode-projeto-final',
                                    custom_gcs_temp_location='gs://data_lake_ingest_data/temp_process',
                                    schema=schema,
                                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
        )

p1.run().wait_until_finish()



{'UF': 'AC', 'populacao_estimada': 816687.0, 'Ano': 2016}
{'UF': 'AL', 'populacao_estimada': 3358963.0, 'Ano': 2016}
{'UF': 'AM', 'populacao_estimada': 4001667.0, 'Ano': 2016}
{'UF': 'AP', 'populacao_estimada': 782295.0, 'Ano': 2016}
{'UF': 'BA', 'populacao_estimada': 15276566.0, 'Ano': 2016}
{'UF': 'CE', 'populacao_estimada': 8963663.0, 'Ano': 2016}
{'UF': 'DF', 'populacao_estimada': 2977216.0, 'Ano': 2016}
{'UF': 'ES', 'populacao_estimada': 3973697.0, 'Ano': 2016}
{'UF': 'GO', 'populacao_estimada': 6695855.0, 'Ano': 2016}
{'UF': 'MA', 'populacao_estimada': 6954036.0, 'Ano': 2016}
{'UF': 'MG', 'populacao_estimada': 20997560.0, 'Ano': 2016}
{'UF': 'MS', 'populacao_estimada': 2682386.0, 'Ano': 2016}
{'UF': 'MT', 'populacao_estimada': 3305531.0, 'Ano': 2016}
{'UF': 'PA', 'populacao_estimada': 8305359.0, 'Ano': 2016}
{'UF': 'PB', 'populacao_estimada': 3999415.0, 'Ano': 2016}
{'UF': 'PE', 'populacao_estimada': 9410336.0, 'Ano': 2016}
{'UF': 'PI', 'populacao_estimada': 3212180.0, 'Ano': 201



RuntimeError: ignored