# Inicio do curso Engenharia de Dados com Dataflow e Apache Beam na GCP

## Foi instalado no ambiente o apache beam porém antes foi necessário instalar o pacote
##  pip install wheel  e depois  pip install apache-beam[interactive] 
https://beam.apache.org/get-started/quickstart-py/

### Docs
https://beam.apache.org/releases/pydoc/2.31.0/

In [2]:
import apache_beam as beam

In [3]:
# definir Pipeline
p1 = beam.Pipeline()

#Criando uma Pcollection
Pcollection = (
    p1 
    | "Tupla" >> beam.Create( [ ("Cassio",32) , ("Vics",21) ] ) 
    | "print Tupla" >> beam.Map(print), #tupla

    p1
    | "Lista" >> beam.Create ( [ 1,2,3 ] )
    |  "print Lista" >> beam.Map(print) #lista
)

p1.run()

# beam.Create ler a entrada e aceita diversos tipos de arquivos(txt, csv,...Bigquery e por ai vai)



1
2
3
('Cassio', 32)
('Vics', 21)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7ff8d8a945b0>

In [4]:
p1 = beam.Pipeline()

voos = (

    #ler arquivo, e excluir cabeçalho
    # as pipes(I) signigicam que um comando é usado como input de outro
    p1              
    | "Importar Dados" >> beam.io.ReadFromText("data/voos_sample.csv", skip_header_lines = 1)
    | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
    #| 'Mostrar Resultados' >> beam.Map(print) #Para exibir na tela o resultado(foi colocado como exemplo)
    | "Gravar Resultado" >> beam.io.WriteToText("data/voos.txt")
)

#Comando para executar
p1.run()





<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7ff8d8243eb0>

## beam.Map() A função do map é aplicar uma transformação.  lambda record, diz que para cada gravação/registro ele irá dividir utilizando o metodo split(). Posso usar o record para outras transformações

## beam.Flatmap() Realiza também uma transformação, porém separa ou criar novas colunas a partir de uma coleção de elementos(pode ser espaço, virgula, ponto e virgula)

https://beam.apache.org/documentation/transforms/python/elementwise/flatmap/

In [5]:
p2 =beam.Pipeline()

Collection = (
    p2
    | 'Importando dados' >> beam.io.ReadFromText('data/poema.txt')
    | 'Transformando' >> beam.FlatMap(lambda record: record.split()) 
    | 'Gravando resultado' >> beam.io.WriteToText('data/resultado_poema.txt')
)

p2.run()

#Resultado: Usando o flatmap, o texto foi foi separado por cada espaço



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7ff8e4aa2400>

# Criando filtros

https://beam.apache.org/documentation/transforms/python/elementwise/filter/

In [6]:

p3 = beam.Pipeline()

voos3 = (

    #Utilziando o Filter será filtrado para retorna apenas as linhas onde o LAX for verdadeiro, 
    #no caso o record[3] está referenciando a 4 coluna da tabela, que é o aeroporto
    
    p3              
    | "Importar Dados" >> beam.io.ReadFromText("data/voos_sample.csv")
    | "Separar por Vírgulas" >> beam.FlatMap(lambda record: record.split())
    | 'Pegar voos de Los Angeles' >> beam.Filter(lambda record: record[3] == 'LAX')
    | "Gravar Resultado" >> beam.io.WriteToText("data/filtro_voos.txt")

)

p3.run()





<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7ff8d82432e0>

In [7]:
palavras = ['quatro', 'três']

def encontraPalavras(i):
    if i in palavras:
        return True

p4 = beam.Pipeline()

Collection = (
    p4
    | 'Importando dados' >> beam.io.ReadFromText('data/poema.txt')
    | 'Transformando' >> beam.FlatMap(lambda record: record.split()) 
    | 'Filtrando'   >> beam.Filter(encontraPalavras)
    | 'Gravando resultado' >> beam.io.WriteToText('data/filtro_poema.txt')
)

p4.run()




<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7ff8d8243160>

# Flatten - Ela agrupa dados do mesmo tipo em uma unica PCollection, basicamente igual um UNION

In [8]:
p5 = beam.Pipeline()

negros = ('Adão', 'Jesus', 'Fernando')
brancos = ('Tulio', 'Grace', 'Joca')
indios = ('Vic', 'Marta', 'Tom')

negros_pc = p5 | 'Criando PCollection negros' >> beam.Create(negros)
brancos_pc = p5 | 'Criando PCollection brancos' >> beam.Create(brancos)
indis_pc = p5 | 'Criando PCollection indios' >> beam.Create(indios)

pessoas = ((negros_pc, brancos_pc, indis_pc) | beam.Flatten() | beam.Map(print))

p5.run()

#Resultado: Foi unido 3 tuplas em uma unica usando o Flatten()




Tulio
Grace
Joca
Adão
Jesus
Fernando
Vic
Marta
Tom


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7ff8d80a3250>

# CombinePerKey - Função de agregação, baseada chave em uma lista de valores

In [9]:
p6 = beam.Pipeline()

Tempo_Atrasos = (
    p6
    | "Importar Dados" >> beam.io.ReadFromText("data/voos_sample.csv")
    | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
    | "Pegar voos de Los Angeles" >> beam.Filter(lambda record: int(record[8]) > 0 )
    | "Criar par" >> beam.Map(lambda record: (record[4],int(record[8])))
    | "Somar por key" >> beam.CombinePerKey(sum)
    | "Mostrar Resultados" >> beam.Map(print)
)

p6.run()



('LAX', 94)
('HNL', 15)
('DFW', 95)
('OGG', 138)
('JFK', 220)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7ff8d8132a90>

# Combiners Count Perkey - Realiza contas com base nas combinações, realiza um GroupBy com Count

In [10]:
p7 = beam.Pipeline()

Qtd_Atrasos = (
    p7
    | "Importar Dados" >> beam.io.ReadFromText("data/voos_sample.csv", skip_header_lines = 1)
    | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
    | "Pegar voos de Los Angeles" >> beam.Filter(lambda record: int(record[8]) > 0 )
    | "Criar par" >> beam.Map(lambda record: (record[4],int(record[8])))
    | "Contar por key" >> beam.combiners.Count.PerKey()
    | "Mostrar Resultados" >> beam.Map(print)
)

p7.run()



('LAX', 4)
('HNL', 1)
('DFW', 1)
('OGG', 1)
('JFK', 4)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7ff8d8144820>

# Combiners Count PerElement - Realiza a contagem de quantos elementos são iguais

In [15]:
p7 = beam.Pipeline()

Qtd_Atrasos = (
    p7
    | "Importar Dados" >> beam.io.ReadFromText("data/voos_sample.csv", skip_header_lines = 1)
    | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
    | "Pegar voos de Los Angeles" >> beam.Filter(lambda record: int(record[8]) > 0 )
    | "Criar par" >> beam.Map(lambda record: (record[4],int(record[8])))
    | "Contar por key" >> beam.combiners.Count.PerElement()
    | "Mostrar Resultados" >> beam.Map(print)
)

p7.run()



(('LAX', 39), 1)
(('HNL', 15), 1)
(('DFW', 95), 1)
(('OGG', 138), 1)
(('LAX', 19), 1)
(('JFK', 1), 1)
(('JFK', 88), 1)
(('LAX', 18), 1)
(('JFK', 120), 1)
(('LAX', 16), 1)
(('JFK', 11), 1)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7ff8d1314af0>

# Combiners Count Globally - Contabiliza os elementos que estão na agregação

In [16]:
p7 = beam.Pipeline()

Qtd_Atrasos = (
    p7
    | "Importar Dados" >> beam.io.ReadFromText("data/voos_sample.csv", skip_header_lines = 1)
    | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
    | "Pegar voos de Los Angeles" >> beam.Filter(lambda record: int(record[8]) > 0 )
    | "Criar par" >> beam.Map(lambda record: (record[4],int(record[8])))
    | "Contar por key" >> beam.combiners.Count.Globally()
    | "Mostrar Resultados" >> beam.Map(print)
)

p7.run()



11


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7ff8d135bee0>

# COGroupByKey - Realiza um Join atraves de chaves de uma PCollection separada

In [12]:
p8 = beam.Pipeline()

Tempo_Atrasos = (
    p8
    | "Importar Dados Atraso" >> beam.io.ReadFromText("data/voos_sample.csv", skip_header_lines = 1)
    | "Separar por Vírgulas Atraso" >> beam.Map(lambda record: record.split(','))
    | "Pegar voos com atraso" >> beam.Filter(lambda record: int(record[8]) > 0 )
    | "Criar par atraso" >> beam.Map(lambda record: (record[4],int(record[8])))
    | "Somar por key" >> beam.CombinePerKey(sum)

)

Qtd_Atrasos = (
    p8
    | "Importar Dados" >> beam.io.ReadFromText("data/voos_sample.csv", skip_header_lines = 1)
    | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
    | "Pegar voos com atraso qtd" >> beam.Filter(lambda record: int(record[8]) > 0 )
    | "Criar par qtd" >> beam.Map(lambda record: (record[4],int(record[8])))
    | "Contar por key" >> beam.combiners.Count.PerKey()

)

tabela_atrasos = (
    {'Qtd_Atrasos':Qtd_Atrasos,'Tempo_Atrasos':Tempo_Atrasos} 
    | "Group By" >> beam.CoGroupByKey()
    | beam.Map(print)
)

p8.run()

# Resultado: Cria um dicionario com a quantidade de atrasos e tempos de atrasos de cada voo



('LAX', {'Qtd_Atrasos': [4], 'Tempo_Atrasos': [92]})
('HNL', {'Qtd_Atrasos': [1], 'Tempo_Atrasos': [15]})
('DFW', {'Qtd_Atrasos': [1], 'Tempo_Atrasos': [95]})
('OGG', {'Qtd_Atrasos': [1], 'Tempo_Atrasos': [138]})
('JFK', {'Qtd_Atrasos': [4], 'Tempo_Atrasos': [220]})


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7ff8d3743340>

# ParDo - Permite criar funções personalizadas e rodar dentro do framework

In [13]:
p9 = beam.Pipeline()

#Criando uma classe e passando como parâmetro a função DO(PArDO), para realizar um filtro que antes era realiza por outros metodos
class filtro(beam.DoFn):
    def process(self,record):
        if int(record[8]) > 0:
            return [record]

Tempo_Atrasos = (
    p9
    | "Importar Dados Atraso" >> beam.io.ReadFromText('data/voos_sample.csv', skip_header_lines = 1)
    | "Separar por Vírgulas Atraso" >> beam.Map(lambda record: record.split(','))
    | "Pegar voos com atraso" >> beam.ParDo(filtro())
    | "Criar par atraso" >> beam.Map(lambda record: (record[4],int(record[8])))
    | "Somar por key" >> beam.CombinePerKey(sum)

)

Qtd_Atrasos = (
    p9
    | "Importar Dados" >> beam.io.ReadFromText('data/voos_sample.csv', skip_header_lines = 1)
    | "Separar por Vírgulas Qtd" >> beam.Map(lambda record: record.split(','))
    | "Pegar voos com Qtd" >> beam.ParDo(filtro())
    | "Criar par Qtd" >> beam.Map(lambda record: (record[4],int(record[8])))
    | "Contar por key" >> beam.combiners.Count.PerKey()

)

tabela_atrasos = (
    {'Qtd_Atrasos':Qtd_Atrasos,'Tempo_Atrasos':Tempo_Atrasos} 
    | "Group By" >> beam.CoGroupByKey()
    | beam.Map(print)
)

p9.run()



('LAX', {'Qtd_Atrasos': [4], 'Tempo_Atrasos': [92]})
('HNL', {'Qtd_Atrasos': [1], 'Tempo_Atrasos': [15]})
('DFW', {'Qtd_Atrasos': [1], 'Tempo_Atrasos': [95]})
('OGG', {'Qtd_Atrasos': [1], 'Tempo_Atrasos': [138]})
('JFK', {'Qtd_Atrasos': [4], 'Tempo_Atrasos': [220]})


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7ff8d36e6250>