<a href="https://colab.research.google.com/github/clarissa-souza/Desafio_raizen_PySpark/blob/main/PySpark_Raizen.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Instalando PySpark
pip install pyspark

In [10]:
# Importando as bibliotecas 
import os
import json
import requests
from datetime import datetime, date
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from functools import reduce
from pyspark.sql.types import StringType

In [2]:
#configurar a sparksession
spark = (SparkSession.builder
          .master('local[4]') #aqui eu defino a maquina principal nesse caso vou usar a maquina do colab 'local' se não fosse deveria colocar o endereço url dela obs.: 'local[4]' 4 é a quantidade de nucleos ou cores
          .appName('pyspark-airflow') # o nome do app
          .config('spark.ui.port','4050') # a porta onde vou rodar
          .getOrCreate() #pegue ou crie
          )

In [3]:
#Download dos arquivos
def dados_diesel():
  os.makedirs ('dadosOriginais',exist_ok = True)
  url='https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/vdpb/vct/vendas-oleo-diesel-tipo-m3-2013-2022.csv'
  response = requests.get(url).content
  open('dadosOriginais/vendas-oleo-diesel-tipo-m3-2013-2022.csv','wb').write(response)

def dados_petroleo():
  os.makedirs ('dadosOriginais',exist_ok = True)
  url='https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/vdpb/vendas-derivados-petroleo-e-etanol/vendas-derivados-petroleo-etanol-m3-1990-2022.csv'
  response = requests.get(url).content
  open('dadosOriginais/vendas-derivados-petroleo-etanol-m3-1990-2022.csv','wb').write(response)

In [4]:
#Une os arquivos 
def unionAll(*dfs):
  return reduce(DataFrame.unionAll, dfs)

In [5]:
#Extracao dos dados
def extracaoDados():
  dfDiesel = (spark
          .read
          .format ('csv') 
          .option ('header','true') # para incluir o cabeçalho
          .option('inferschema','true') #para ele dizer o esquema
          .option('delimiter',';')
          .load('/content/dadosOriginais/vendas-oleo-diesel-tipo-m3-2013-2022.csv')
          )

  dfPetroleo = (spark
          .read
          .format ('csv') 
          .option ('header','true') # para incluir o cabeçalho
          .option('inferschema','true') #para ele dizer o esquema
          .option('delimiter',';')
          .load('/content/dadosOriginais/vendas-derivados-petroleo-etanol-m3-1990-2022.csv')
          )
  dfDadosExtraido = unionAll(*[dfDiesel, dfPetroleo])
  return dfDadosExtraido

In [6]:
def trataDados(dfGeral):
  #Bloquear a coluna que não é necessária
  dfGeral=dfGeral.drop(F.col('GRANDE REGIÃO'))
  #Alterar o nome das colunas
  dfGeral=dfGeral.withColumnRenamed('ANO','year_month') \
               .withColumnRenamed('MÊS','mes') \
               .withColumnRenamed('UNIDADE DA FEDERAÇÃO','uf') \
               .withColumnRenamed('PRODUTO','product') \
               .withColumnRenamed('VENDAS','volume')  
  #Alterando a coluna mes de nome para número ainda em string
  mesDict={'JAN':'01','FEV':'02','MAR':'03','ABR':'04','MAI':'05','JUN':'06','JUL':'07','AGO':'08','SET':'09','OUT':'10','NOV':'11','DEZ':'12'}
  dfGeral=dfGeral.rdd.map(lambda x: (x.year_month,x.uf,x.product,x.volume,mesDict[x.mes])).toDF(['year_month','uf','product','volume','mes'])
  #Alterando a coluna ano para string
  dfGeral = dfGeral.withColumn('year_month',dfGeral['year_month'].cast(StringType()))
  # Juntando year_month com mes
  dfGeral=dfGeral.withColumn('year_month',F.concat(F.col('year_month'),F.lit('-'),F.col('mes'))).drop(F.col('mes'))
  #Alterando a coluna uf de nome estado para uf
  ufDict={'ACRE':'AC','ALAGOAS':'AL','AMAPÁ':'AP','AMAZONAS':'AM','BAHIA':'BA','CEARÁ':'CE','DISTRITO FEDERAL':'DF','ESPÍRITO SANTO':'ES','GOIÁS':'GO','MARANHÃO':'MA','MATO GROSSO':'MT','MATO GROSSO DO SUL':'MS','MINAS GERAIS':'MG','PARANÁ':'PR','PARAÍBA':'PB','PARÁ':'PA','PERNAMBUCO':'PE','PIAUÍ':'PI','RIO DE JANEIRO':'RJ','RIO GRANDE DO NORTE':'RN','RIO GRANDE DO SUL':'RS','RONDÔNIA':'RO','RORAIMA':'RR','SANTA CATARINA':'SC','SERGIPE':'SE','SÃO PAULO':'SP','TOCANTINS':'TO'}
  dfGeral=dfGeral.rdd.map(lambda x: (x.year_month,ufDict[x.uf],x.product,x.volume)).toDF(['year_month','uf','product','volume'])
  #Inserindo a coluna unit
  dfGeral = dfGeral.withColumn('unit',F.lit('m3'))
  # Inserindo coluna created_at
  dfGeral=dfGeral.withColumn('created_at',current_timestamp())
  #alterando o volume de string para double 
  dfGeral = dfGeral.withColumn('volume',dfGeral['volume'].cast('double'))
  #alterando o volume de string para double 
  dfGeral = dfGeral.withColumn('year_month', to_date(F.col('year_month'),'yyyy-MM'))
  return dfGeral


In [15]:
def gravaDados(dfGravaDados):
  os.makedirs ('dadosTratados',exist_ok = True)
  dfGravaDados.write.format('json').save('dadosTratados/DadosDerivadosPrtroleo.json')

In [7]:
#Execucao
dados_diesel()
dados_petroleo()

In [8]:
dfDados=extracaoDados()

In [11]:
dfPetroleo=trataDados(dfDados)

In [17]:
dfPetroleo.printSchema()

root
 |-- year_month: date (nullable = true)
 |-- uf: string (nullable = true)
 |-- product: string (nullable = true)
 |-- volume: double (nullable = true)
 |-- unit: string (nullable = false)
 |-- created_at: timestamp (nullable = false)



In [16]:
gravaDados(dfPetroleo)