### Pipeline que busca arquivos zipados na GCS, concatena e salva novamente na GCS

#### Importa bibliotecas

In [1]:
import zipfile
from zipfile import ZipFile
import google.auth
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import PipelineOptions
import argparse
import os

#### Autorizações para acesso à GCS e criação do bucket para arquivos temporários

In [2]:
# Default GCP project id, taken from the application-default credentials.
project = google.auth.default()[1]

# Options used when the pipeline is run locally.
options = PipelineOptions(project=project, streaming=True)

# Bucket that holds the staging / temporary files of the pipeline.
bucket = 'temporariospj'

In [4]:
#cria bucket para envio dos aquivos temporários
!gsutil mb  gs://temporariosacidente

Creating gs://temporariosacidente/...
ServiceException: 409 A Cloud Storage bucket named 'temporariosacidente' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


#### Define a função run que executa todos os passos da pipeline
##### Usa `options` para rodar localmente e `argv` para o `DataflowRunner`


In [5]:
def run():
    """Build and submit the pipeline that merges the zipped CSV files on GCS.

    Every ``datatran20*.zip`` object under the raw-data prefix is matched, the
    first member of each archive is extracted, its header line is dropped and
    the remaining rows are written to ``acidentes_ocorrencias*.csv`` under the
    same prefix.

    Reads the notebook-level variables ``project`` and ``bucket`` to build the
    Dataflow arguments. Returns nothing; the result of ``p.run()`` is not
    awaited.
    """
    def unzip(readable_file):
        """Yield the data rows of a zipped CSV, one ``bytes`` object per row.

        Args:
            readable_file: object exposing ``open()`` that returns a seekable
                binary file (the elements produced by ``fileio.ReadMatches``).

        Yields:
            Each line of the first archive member after the header line,
            without its line terminator. Bytes are passed through untouched,
            so the rows keep the encoding of the source file.
        """
        # Imported locally so the function does not depend on notebook globals
        # (presumably needed when it is serialized for remote workers).
        import zipfile

        # Context managers close both the remote file handle and the archive.
        with readable_file.open() as handle, zipfile.ZipFile(handle) as arq_zip:
            nomes = arq_zip.namelist()
            if not nomes:
                # Empty archive: nothing to emit.
                return
            arquivo = arq_zip.read(nomes[0])

        # Drop the header by skipping the first line instead of slicing a
        # fixed number of bytes, so files with a different header still work.
        linhas = arquivo.splitlines()
        yield from linhas[1:]

    # Arguments required to run the pipeline on Dataflow.
    argv = [
        '--project={0}'.format(project),
        '--region=us-central1',
        '--staging_location=gs://{0}/staging/'.format(bucket),
        '--temp_location=gs://{0}/staging/'.format(bucket),
        '--runner=DataflowRunner',
        # Flags must be written as --name=value; with ':' the option is not
        # recognised and is silently dropped.
        # NOTE(review): template_location normally names the template file
        # itself - confirm that a path ending in '/' is intended.
        '--template_location=gs://{0}/model/'.format(bucket)]

    p = beam.Pipeline(argv=argv)
    (p
      | 'Procura arquivo' >> beam.io.fileio.MatchFiles('gs://projetofinalscacademy/dadosbrutos/ocorrencias/datatran20*.zip')
      | 'Encontra os targets' >> beam.io.fileio.ReadMatches()
      | 'Unzipa' >> beam.FlatMap(unzip)
      | 'Escreve arquivo' >> beam.io.WriteToText('gs://projetofinalscacademy/dadosbrutos/ocorrencias/acidentes_ocorrencias', file_name_suffix='.csv'))

    p.run()

In [None]:
# Build the pipeline and submit it with the DataflowRunner arguments set in run().
run()



#### Visualização com Interactive Runner

In [10]:
from apache_beam.runners.interactive import interactive_runner
import apache_beam.runners.interactive.interactive_beam as ib

In [6]:
# Replace the earlier `options` with default pipeline options for the interactive run.
options = pipeline_options.PipelineOptions()

In [11]:
def unzip(readable_file):
    """Yield the data rows of a zipped CSV, one ``bytes`` object per row.

    Args:
        readable_file: object exposing ``open()`` that returns a seekable
            binary file (the elements produced by ``fileio.ReadMatches``).

    Yields:
        Each line of the first archive member after the header line, without
        its line terminator. Bytes are passed through untouched, so the rows
        keep the encoding of the source file.
    """
    # Context managers close both the file handle and the archive.
    with readable_file.open() as handle, zipfile.ZipFile(handle) as input_zip:
        members = input_zip.namelist()
        if not members:
            # Empty archive: nothing to emit.
            return
        payload = input_zip.read(members[0])

    # Drop the header by skipping the first line instead of slicing a fixed
    # number of bytes, so files with a different header still work.
    rows = payload.splitlines()
    yield from rows[1:]

    
# Pipeline bound to the interactive runner; it is only constructed here.
p = beam.Pipeline(runner=interactive_runner.InteractiveRunner(), options=options)

# Same stages as the Dataflow pipeline, reading from and writing to local paths.
(
    p
    | 'Procura arquivo' >> fileio.MatchFiles('Documents/Projeto_Final/DataSet/Ocorrências/datatran20*.zip')
    | 'Encontra os targets' >> fileio.ReadMatches()
    | 'Unzipa' >> beam.FlatMap(unzip)
    | 'Escreve arquivo' >> beam.io.WriteToText('ocorrencias', file_name_suffix='.csv')
)
    



<PCollection[[11]: Escreve arquivo/Write/WriteImpl/FinalizeWrite.None] at 0x7f0f2905f220>

In [12]:
# Display the graph of the interactive pipeline `p`.
ib.show_graph(p)

/opt/conda/miniconda3/bin/dot
