### Criação request para baixar os arquivos CSV

In [2]:
import requests

In [3]:
url = 'https://balanca.economia.gov.br/balanca/bd/comexstat-bd/ncm/EXP_2021.csv'
url1 = 'https://balanca.economia.gov.br/balanca/bd/comexstat-bd/ncm/EXP_2022.csv'

url2 = 'https://balanca.economia.gov.br/balanca/bd/comexstat-bd/ncm/IMP_2021.csv'
url3 = 'https://balanca.economia.gov.br/balanca/bd/comexstat-bd/ncm/IMP_2022.csv'

exportacao2021 = requests.get(url, verify=False)
exportacao2022 = requests.get(url1, verify=False)

importacao2021 = requests.get(url2, verify=False)
importacao2022 = requests.get(url3, verify=False)




In [4]:
with open("exportacao2021.csv", "wb") as f:
  f.write(exportacao2021.content)
with open("exportacao2022.csv", "wb") as f:
  f.write(exportacao2022.content)
with open("importacao2021.csv", "wb") as f:
  f.write(importacao2021.content)
with open("importacao2022.csv", "wb") as f:
  f.write(importacao2022.content)

### Criação do SparkSession

In [5]:
from pyspark.sql import SparkSession

# spark = SparkSession.builder.appName("A3Data").getOrCreate()
spark = SparkSession \
        .builder \
        .config("spark.sql.repl.eagerEval.enabled", "True") \
        .config("spark.sql.repl.eagerEval.maxNumRows", "30") \
        .getOrCreate()

### Lendo os arquivos CSV e colocando as colunas com letras minúsculas

In [6]:
df = spark.read.csv("exportacao2021.csv", header=True, inferSchema=True, sep=";")
df1 = spark.read.csv("exportacao2022.csv", header=True, inferSchema=True, sep=";")
df2 = spark.read.csv("importacao2021.csv", header=True, inferSchema=True, sep=";")
df3 = spark.read.csv("importacao2022.csv", header=True, inferSchema=True, sep=";")

In [7]:
for col in df.columns:
    df = df.withColumnRenamed(col, col.lower())

for col in df1.columns:
    df1 = df1.withColumnRenamed(col, col.lower())

for col in df2.columns:
    df2 = df2.withColumnRenamed(col, col.lower())

for col in df3.columns:
    df3 = df3.withColumnRenamed(col, col.lower())

In [8]:
df2

co_ano,co_mes,co_ncm,co_unid,co_pais,sg_uf_ncm,co_via,co_urf,qt_estat,kg_liquido,vl_fob,vl_frete,vl_seguro
2021,12,85065010,11,160,SP,1,817800,1830130,8096,125561,10186,212
2021,2,85013110,11,573,PR,1,917800,6,20,4478,36,0
2021,7,73071920,10,160,SP,1,817800,86604,86604,480783,21677,22
2021,11,73182900,10,386,RS,1,817800,859,859,13332,860,8
2021,8,82032010,10,386,RS,1,817800,7,7,504,3,1
2021,7,40119090,11,249,RJ,4,717700,11,228,3007,2005,0
2021,11,40169300,10,399,RJ,4,717700,39,39,46459,1900,55
2021,4,85322200,11,160,AM,4,227700,7772342,7242,356729,90115,226
2021,2,82089000,10,386,MG,4,817600,3,3,2593,267,4
2021,6,85365090,11,160,SP,1,817800,3536381,79728,1692916,40686,1273


### Passando o arquivo CSV para PARQUET e lendo ele

In [9]:
df.write.parquet("exportacao2021PARQUET.parquet")
df1.write.parquet("exportacao2022PARQUET.parquet")
df2.write.parquet("importacao2021PARQUET.parquet")
df3.write.parquet("importacao2022PARQUET.parquet")

In [10]:
parquet_df = spark.read.parquet("exportacao2021PARQUET.parquet")
parquet_df1 = spark.read.parquet("exportacao2022PARQUET.parquet")
parquet_df2 = spark.read.parquet("importacao2021PARQUET.parquet")
parquet_df3 = spark.read.parquet("importacao2022PARQUET.parquet")

### Criação da TempView dos arquivos já em PARQUET

In [11]:
parquet_df.createOrReplaceTempView("exportacao2021")
parquet_df1.createOrReplaceTempView("exportacao2022")

parquet_df2.createOrReplaceTempView("importacao2021")
parquet_df3.createOrReplaceTempView("importacao2022")

### CONSULTAS QUESTÕES

#### Qual foi o valor total das exportações marítimas do Brasil em 2021?

In [12]:
spark.sql("""
  select co_mes, sum(co_mes) from exportacao2021
  where co_pais = 105
  group by co_mes
""")

co_mes,sum(co_mes)
12,2172
3,126
8,720
10,700
11,2464
1,2
6,6
5,10
4,4
9,9


#### Quais são os 5 principais produtos exportados em *2022* do Brasil

In [13]:
spark.sql("""
  select CO_NCM as produto, SUM(VL_FOB) soma_do_valor
  from exportacao2022
  group by CO_NCM
  order by soma_do_valor DESC
  LIMIT (5)
""")

produto,soma_do_valor
12019000,46553259740
27090010,42553764089
26011100,25734247774
10059010,12072359607
2023000,10916695699


#### Quais estados tiveram maior gasto com frete?

In [14]:
spark.sql("""
  select SG_UF_NCM as estado, MAX(VL_FRETE) as valor_frete from importacao2021
  group by SG_UF_NCM 
  order by valor_frete desc
  LIMIT (5)
""")

estado,valor_frete
RJ,17763780
SP,11893975
CE,11535803
AM,9265771
PE,8375929


#### Quais estados brasileiros mais recebem importações?

In [15]:
spark.sql("""
  select distinct SG_UF_NCM as estado, sum(KG_LIQUIDO) importados from importacao2022
  group by SG_UF_NCM
  order by importados desc
  LIMIT (5)
""")

estado,importados
SP,25838853966
PR,16052279399
RJ,14718260814
RS,14197374416
SC,14016648426


#### Diferença de frete do díesel entre 2021 e 2022

In [16]:
spark.sql("""
    SELECT importacao2021.co_mes, importacao2021.sg_uf_ncm, importacao2021.co_ncm, importacao2021.kg_liquido, importacao2021.vl_frete, importacao2022.co_mes, importacao2022.sg_uf_ncm, importacao2022.co_ncm, importacao2022.vl_frete, importacao2022.kg_liquido
     FROM importacao2021
     INNER JOIN importacao2022
     ON importacao2021.co_ncm = importacao2022.co_ncm and importacao2021.co_mes = importacao2022.co_mes and importacao2021.sg_uf_ncm = importacao2022.sg_uf_ncm 
     WHERE importacao2021.sg_uf_ncm = 'PE' and importacao2021.co_ncm = 27101921
     ORDER BY importacao2021.co_mes 
     limit(30)
 """)

co_mes,sg_uf_ncm,co_ncm,kg_liquido,vl_frete,co_mes.1,sg_uf_ncm.1,co_ncm.1,vl_frete.1,kg_liquido.1
2,PE,27101921,9627990,156927,2,PE,27101921,192260,6129032
2,PE,27101921,9627990,156927,2,PE,27101921,122190,2480019
3,PE,27101921,1724322,39273,3,PE,27101921,279165,7809287
4,PE,27101921,1623374,36727,4,PE,27101921,2033341,57052904
6,PE,27101921,12038774,343227,6,PE,27101921,930820,14387200
6,PE,27101921,12038774,343227,6,PE,27101921,265750,10424101
6,PE,27101921,12038774,343227,6,PE,27101921,581716,16293476
6,PE,27101921,98251905,685141,6,PE,27101921,930820,14387200
6,PE,27101921,98251905,685141,6,PE,27101921,265750,10424101
6,PE,27101921,98251905,685141,6,PE,27101921,581716,16293476


#### Qual via de importação foi mais utilizada no ano de 2022?


In [19]:
spark.sql("""
select co_via, count (CO_VIA) as total_utilizado 
from importacao2022
group by CO_VIA
order by total_utilizado desc
limit (6)
""")

co_via,total_utilizado
1,1002928
4,966851
7,44341
10,8728
9,250
5,66
