<a href="https://colab.research.google.com/github/clarissa-souza/Pipeline-ApacheBeam/blob/main/Pipeline_PySpark_Apache_beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Instalando e importando as bibliotecas

In [None]:
pip install pyspark

In [None]:
pip install gcsfs

In [None]:
pip --version 

pip 22.2.2 from /usr/local/lib/python3.7/dist-packages/pip (python 3.7)


In [None]:
pip install --upgrade pip

In [None]:
pip install apache_beam[interactive]

In [None]:
pip install apache_beam[gcp]

In [None]:
import apache_beam as beam

In [None]:
# Importando o pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf
import pyspark.sql.functions as F

# Importando para o google cloud
from google.cloud import storage
import os

## Configuração para acessar google cloud storage 

In [None]:
# Mount Google Drive under /content/drive so the files generated by PySpark
# can be reached for local testing.
from google.colab import drive
drive.mount ('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Point the Google client libraries at the service-account key file.
# The key can live in any directory; only this path has to match.
serviceAccount='/content/bc23-aulas-clarissa-83f5074aa293.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS']=serviceAccount

# Instantiate the storage client (picks up the credentials set above).
client=storage.Client()

# Fetch the bucket handle.
bucket=client.get_bucket('bucket-proj-individual')

# The original cell also called bucket.blob('flights.csv') and discarded the
# result; that statement had no effect and named a different object key than
# the file read below, so it was removed.

# gs:// path of the raw CSV that Spark will read.
path='gs://bucket-proj-individual/dados/original/flights.csv'

## Configurando o SparkSession e importando para o dataframe spark

In [None]:
# Configure the SparkSession step by step, then create (or reuse) it.
session_builder = SparkSession.builder
session_builder = session_builder.master('local[4]')
session_builder = session_builder.appName('proj-individual-BC23')
session_builder = session_builder.config('spark.ui.port','4050')
# GCS connector jar, passed through spark.jars.
session_builder = session_builder.config('spark.jars','https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar')
spark = session_builder.getOrCreate()

In [None]:
# Display the session object created in the previous cell.
spark

In [None]:
# Load the comma-separated file at `path`, using its first line as the header
# and letting Spark infer the column types.
df_spark = spark.read.csv(
    path,
    header=True,
    inferSchema=True,
    sep=',',
)



## Analisando e tratando o Dataframe pyspark

In [None]:
# Preview the first rows of the loaded DataFrame.
df_spark.show()

In [None]:
# Print the column names and types of the loaded DataFrame.
df_spark.printSchema()

In [None]:
# Show 10 untruncated rows, highest CANCELLED values first.
df_spark.sort(F.desc('CANCELLED')).show(n=10, truncate=False)



+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

                                                                                

In [None]:
# Count the rows whose flight was cancelled (CANCELLED == 1).
df_spark.where(F.col('CANCELLED') == 1).count()



89884

In [None]:
# Keep only the rows whose flight was not cancelled (CANCELLED == 0).
df_spark = df_spark.where(F.col('CANCELLED') == 0)

In [None]:
# Check how many rows remain after removing the cancelled flights.
df_spark.count()



5729195

In [None]:
# Drop the calendar, airline, flight-number and tail-number columns.
df_spark = df_spark.drop(
    'YEAR', 'MONTH', 'DAY', 'DAY_OF_WEEK',
    'AIRLINE', 'FLIGHT_NUMBER', 'TAIL_NUMBER',
)

In [None]:
# Drop the destination, departure, taxi, wheels, timing, distance and
# arrival-time columns.
df_spark = df_spark.drop(
    'DESTINATION_AIRPORT', 'SCHEDULED_DEPARTURE', 'DEPARTURE_TIME',
    'DEPARTURE_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'SCHEDULED_TIME',
    'ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'WHEELS_ON', 'TAXI_IN',
    'SCHEDULED_ARRIVAL', 'ARRIVAL_TIME',
)

In [None]:
# Drop the diverted/cancelled flags, the cancellation reason and the
# per-cause delay columns.
df_spark = df_spark.drop(
    'DIVERTED', 'CANCELLED', 'CANCELLATION_REASON',
    'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY',
    'LATE_AIRCRAFT_DELAY', 'WEATHER_DELAY',
)

In [None]:
# Preview 5 rows of the DataFrame after the column drops.
df_spark.show(5)

In [None]:
# Sort by ARRIVAL_DELAY ascending to inspect the smallest values.
# The original passed the `.asc()` sort expression to filter(); that is not a
# boolean predicate, so the cell failed. orderBy() is the intended call.
df_spark.orderBy(F.col('ARRIVAL_DELAY').asc()).show()

In [None]:
# Sort by ARRIVAL_DELAY descending to inspect the largest delays.
df_spark.sort(F.desc('ARRIVAL_DELAY')).show()



+--------------+-------------+
|ORIGIN_AIRPORT|ARRIVAL_DELAY|
+--------------+-------------+
|           BHM|         1971|
|           RIC|         1898|
|           SAN|         1665|
|           DTW|         1638|
|           ABQ|         1636|
|           IND|         1636|
|           STL|         1627|
|           OMA|         1598|
|           LAS|         1593|
|           HNL|         1576|
|           HNL|         1574|
|           MSP|         1557|
|           MCO|         1556|
|         14747|         1555|
|           SAT|         1554|
|           SAN|         1554|
|           FAT|         1546|
|         11612|         1528|
|           SMF|         1514|
|           RIC|         1508|
+--------------+-------------+
only showing top 20 rows



                                                                                

In [None]:
# Replace nulls in ARRIVAL_DELAY with 0; per the author's note this makes no
# mathematical difference to the analysis. The fill is restricted to that
# column so no other numeric column is ever touched.
df_spark=df_spark.fillna(0, subset=['ARRIVAL_DELAY'])

In [None]:
# Sort by ORIGIN_AIRPORT ascending to inspect the lowest-sorting codes.
df_spark.sort(F.asc('ORIGIN_AIRPORT')).show()

In [None]:
# Count the rows whose origin code starts with "1".
# Author's note: converting these numeric codes to airport abbreviations
# requires a lookup table (the original note cites "AFA" as its source).
# Because this is a study exercise, those rows are excluded; the goal here is
# to test PySpark together with Apache Beam.
df_spark.where(F.col('ORIGIN_AIRPORT').startswith('1')).count()



483711

In [None]:
# Store in a new DataFrame every row whose origin does not start with "1".
df_spark1 = df_spark.where(~F.col('ORIGIN_AIRPORT').startswith('1'))

In [None]:
# Print up to 600 rows, ordered by ORIGIN_AIRPORT ascending.
df_spark1.sort(F.asc('ORIGIN_AIRPORT')).show(600)

In [None]:
# Print up to 600 rows, ordered by ORIGIN_AIRPORT descending.
df_spark1.sort(F.desc('ORIGIN_AIRPORT')).show(600)

In [None]:
# Print the column names and types of the filtered DataFrame.
df_spark1.printSchema()

root
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- ARRIVAL_DELAY: integer (nullable = true)



In [None]:
# Show any rows whose ORIGIN_AIRPORT is null.
df_spark1.where(F.col('ORIGIN_AIRPORT').isNull()).show()



+--------------+-------------+
|ORIGIN_AIRPORT|ARRIVAL_DELAY|
+--------------+-------------+
+--------------+-------------+



                                                                                

In [None]:
# Cross-check for the pipeline job's grouping: number of distinct origin
# airports that have at least one positive ARRIVAL_DELAY.
df_spark1.where('ARRIVAL_DELAY > 0').groupBy('ORIGIN_AIRPORT').sum().count()



322

## Gravando o arquivo tratado pelo Spark no bucket

In [None]:
# Spark writes the result as a directory holding several smaller part files.
# header=True puts a header row at the top of every part file, so a reader
# that skips one header line per file does not throw away a data row.
# mode('overwrite') lets the cell be re-run without failing because the
# output path already exists.
df_spark1.write.mode('overwrite').csv('gs://bucket-proj-individual/dados/tratado/flights_tratado', header=True)

## Criando o pipeline

In [None]:
# Build the Beam pipeline from the treated files in the bucket: per origin
# airport, total the delayed minutes and count the delayed flights.
import apache_beam as beam
import os
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options ={
    'project':'bc23-aulas-clarissa',
    'runner': 'DataflowRunner',
    'region': 'southamerica-east1',
    'staging_location': 'gs://bucket-proj-individual/staging/', 
    'temp_location': 'gs://bucket-proj-individual/staging/', 
    'template_location': 'gs://bucket-proj-individual/models/modelobatch' 
}

pipeline_options=PipelineOptions.from_dictionary(pipeline_options)

serviceAccount='/content/bc23-aulas-clarissa-83f5074aa293.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS']=serviceAccount

p1 = beam.Pipeline(options=pipeline_options)

# Read and parse the input once; both aggregations branch from this shared
# PCollection of (origin_airport, delay_minutes) pairs.
# A part file may or may not begin with a header row. The header is dropped
# with an explicit filter instead of skip_header_lines, which would discard
# the first data row of every file that has no header.
atrasos = (
    p1
    |'1 Extrair do CSV' >> beam.io.ReadFromText('gs://bucket-proj-individual/dados/tratado/flights_tratado/')
    |'Descartar cabecalho' >> beam.Filter(lambda line: not line.startswith('ORIGIN_AIRPORT,'))
    |'1 Separador do CSV' >> beam.Map(lambda record: record.split(','))
    # Keep only rows with a positive arrival delay.
    |'1 Filtrar aero origem' >> beam.Filter(lambda record: float(record[1]) > 0)
    |'1 Agregar colunas' >> beam.Map(lambda record: (record[0],float(record[1])))
)
# Total delayed minutes per origin airport.
atraso_minuto = atrasos |'1 Contruir nova tabela' >> beam.CombinePerKey(sum)
# Number of delayed flights per origin airport.
quantidade = atrasos |'2 Contruir nova tabela' >> beam.combiners.Count.PerKey()

# Join the two aggregations by airport and write the result to the bucket.
tabela = (
    {'Quantidade_minutos':atraso_minuto,'Numero_de_atrasos':quantidade}
    |'Agrupar as pernas' >> beam.CoGroupByKey()
    |'Gravar o resultado'>> beam.io.WriteToText('gs://bucket-proj-individual/dados/final/agregado',file_name_suffix='.csv')
)
p1.run() 

<DataflowPipelineResult None at 0x7ff5c72a7fd0>