In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=760df90317b1ce610236386d69b01d02523789152c91dcb520804833b9fa84f9
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
!wget https://jpbarddal.github.io/assets/data/bigdata/transactions_amostra.csv.zip
!unzip transactions_amostra.csv.zip

--2023-05-20 03:24:41--  https://jpbarddal.github.io/assets/data/bigdata/transactions_amostra.csv.zip
Resolving jpbarddal.github.io (jpbarddal.github.io)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jpbarddal.github.io (jpbarddal.github.io)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 47513871 (45M) [application/zip]
Saving to: ‘transactions_amostra.csv.zip’


2023-05-20 03:24:42 (201 MB/s) - ‘transactions_amostra.csv.zip’ saved [47513871/47513871]

Archive:  transactions_amostra.csv.zip
  inflating: transactions_amostra.csv  
  inflating: __MACOSX/._transactions_amostra.csv  


In [55]:
from pyspark.sql import SparkSession

# Inicializa o Spark
spark = SparkSession.builder\
    .master('local[*]')\
    .appName("")\
    .getOrCreate()
    
sc = spark.sparkContext

# Carrega o conjunto de dados a partir do arquivo CSV
dataset = sc.textFile("transactions_amostra.csv")

In [6]:
# Printando as colunas

columns = dataset.take(1)[0].split(";")
print("Colunas do dataset:")
for column in columns:
    print(column)


Colunas do dataset:
country_or_area
year
comm_code
commodity
flow
trade_usd
weight_kg
quantity_name
quantity
category


#Problema 1: O número de transações envolvendo o Brasil


In [7]:
# Filtra as transações que envolvem o Brasil
transacoes_brasil = dataset.filter(lambda linha: "Brazil" in linha)

# Converte para um PairRDD com uma chave constante
transacoes_brasil_com_chave = transacoes_brasil.map(lambda linha: ("brasil", 1))

# Calcula o total de transações envolvendo o Brasil
total_transacoes_brasil = transacoes_brasil_com_chave.reduceByKey(lambda a, b: a + b)

# Recupera a contagem
contagem = total_transacoes_brasil.collect()[0][1]

print("Total de transações envolvendo o Brasil:", contagem)

Total de transações envolvendo o Brasil: 27693


#Problema 2: O número de transações por tipo de fluxo e ano


In [8]:
# Mapeia as transações para o par ((tipo de fluxo, ano), 1)
transacoes_por_fluxo_ano = dataset.map(lambda linha: ((linha.split(";")[4], linha.split(";")[1]), 1)) \
    .reduceByKey(lambda a, b: a + b)

# Itera sobre os resultados e imprime o tipo de fluxo, ano e contagem de transações
for fluxo_ano, contagem in transacoes_por_fluxo_ano.collect():
    print("Tipo de fluxo e ano:", fluxo_ano, contagem)

Tipo de fluxo e ano: ('Import', '2013') 30689
Tipo de fluxo e ano: ('Import', '2010') 31435
Tipo de fluxo e ano: ('Import', '2003') 29997
Tipo de fluxo e ano: ('Export', '2015') 17756
Tipo de fluxo e ano: ('Export', '2014') 17984
Tipo de fluxo e ano: ('Import', '2012') 29987
Tipo de fluxo e ano: ('Import', '2014') 29723
Tipo de fluxo e ano: ('Import', '1995') 19375
Tipo de fluxo e ano: ('Export', '2012') 17863
Tipo de fluxo e ano: ('Export', '2013') 18063
Tipo de fluxo e ano: ('Re-Export', '2003') 1706
Tipo de fluxo e ano: ('Re-Export', '2013') 2225
Tipo de fluxo e ano: ('Re-Import', '2005') 1323
Tipo de fluxo e ano: ('Re-Export', '1995') 1132
Tipo de fluxo e ano: ('Re-Export', '1990') 518
Tipo de fluxo e ano: ('flow', 'year') 1
Tipo de fluxo e ano: ('Export', '2008') 17445
Tipo de fluxo e ano: ('Import', '2008') 29883
Tipo de fluxo e ano: ('Import', '2005') 31124
Tipo de fluxo e ano: ('Export', '2009') 17825
Tipo de fluxo e ano: ('Export', '1989') 3547
Tipo de fluxo e ano: ('Re-Import

# Problema 3: A média dos valores das commodities por ano



In [14]:
# Filtra as linhas que possuem um valor numérico válido na coluna de valores de commodities
valores_commodities = dataset.filter(lambda linha: linha.split(";")[5].replace(".", "").isdigit()) \
    .map(lambda linha: (linha.split(";")[1], float(linha.split(";")[5])))

# Calcula a soma e contagem dos valores das commodities por ano usando a função aggregateByKey
valores_commodities_por_ano = valores_commodities.aggregateByKey((0.0, 0),
                                                                  lambda acumulador, valor: (acumulador[0] + valor, acumulador[1] + 1),
                                                                  lambda acumulador1, acumulador2: (acumulador1[0] + acumulador2[0], acumulador1[1] + acumulador2[1]))

# Calcula a média dos valores das commodities por ano usando a função mapValues
media_valores_por_ano = valores_commodities_por_ano.mapValues(lambda acumulador: acumulador[0] / acumulador[1])

# Itera sobre os resultados e imprime o valor médio da commodity para cada ano
for ano, media in media_valores_por_ano.collect():
    print("Valor médio da commodity para o ano", ano, ":", media)

Valor médio da commodity para o ano 2016 : 29418327.57526777
Valor médio da commodity para o ano 2011 : 33559943.50890797
Valor médio da commodity para o ano 2003 : 13028917.611334749
Valor médio da commodity para o ano 1993 : 10353959.855309162
Valor médio da commodity para o ano 2007 : 23710673.174875777
Valor médio da commodity para o ano 2014 : 46120404.41345007
Valor médio da commodity para o ano 2012 : 39028921.881444596
Valor médio da commodity para o ano 1995 : 12286454.103356835
Valor médio da commodity para o ano 2006 : 21175872.541099638
Valor médio da commodity para o ano 1997 : 9549881.214776853
Valor médio da commodity para o ano 2005 : 18673099.052178193
Valor médio da commodity para o ano 1999 : 9561516.927263891
Valor médio da commodity para o ano 1992 : 9402960.863025468
Valor médio da commodity para o ano 1994 : 11350325.049077941
Valor médio da commodity para o ano 2004 : 15388487.793083541
Valor médio da commodity para o ano 1991 : 13069223.85515173
Valor médio da 

# Problema 4: O preço médio das commodities por tipo de unidade, ano e categoria no fluxo de exportação no Brasil


In [15]:
# Filtra as transações que envolvem o Brasil e estão no fluxo de exportação
export_flow_brazil = dataset.filter(lambda line: "Brazil" in line and line.split(";")[4] == "Export")

# Mapeia as linhas para obter o tipo de unidade, ano, categoria e preço da commodity
commodity_price_per_unit_type = export_flow_brazil.map(lambda line: ((line.split(";")[7], line.split(";")[1], line.split(";")[8]), float(line.split(";")[5])))

# Calcula a soma e contagem dos preços das commodities por tipo de unidade, ano e categoria usando a função aggregateByKey
commodity_price_per_unit_type_category = commodity_price_per_unit_type.aggregateByKey((0.0, 0),
                                                                                      lambda acumulador, valor: (acumulador[0] + valor, acumulador[1] + 1),
                                                                                      lambda acumulador1, acumulador2: (acumulador1[0] + acumulador2[0], acumulador1[1] + acumulador2[1]))

# Calcula o preço médio das commodities por tipo de unidade, ano e categoria usando a função mapValues
average_price_per_unit_type_category = commodity_price_per_unit_type_category.mapValues(lambda acumulador: acumulador[0] / acumulador[1])

print("Preço médio das commodities por tipo de unidade, ano e categoria no fluxo de exportação no Brasil:")

# Coleta os resultados e imprime cada elemento
result = average_price_per_unit_type_category.collect()
for element in result:
    print(element)

[1;30;43mA saída de streaming foi truncada nas últimas 5000 linhas.[0m
(('Weight in kilograms', '1993', '105817.0'), 1053372.0)
(('Weight in kilograms', '2006', '24211.0'), 137098.0)
(('Weight in kilograms', '1995', '26000.0'), 9949.0)
(('Weight in kilograms', '2004', '23984005.0'), 84859819.0)
(('Weight in kilograms', '2005', '7269.0'), 45357.0)
(('Weight in kilograms', '1997', '669.0'), 10931.0)
(('Weight in kilograms', '2005', '11682.0'), 7031.0)
(('Weight in kilograms', '1992', '105000.0'), 6461.0)
(('Weight in kilograms', '2006', '9612.0'), 62827.0)
(('Number of items', '2012', '201.0'), 11594.0)
(('Weight in kilograms', '1997', '57824.0'), 653383.0)
(('Weight in kilograms', '1991', '201.0'), 12320.0)
(('Weight in kilograms', '1999', '27223.0'), 150872.0)
(('Weight in kilograms', '2000', '8441773.0'), 506553.0)
(('Weight in kilograms', '2014', '1341250.0'), 1183625.0)
(('Weight in kilograms', '1998', '2321912.0'), 29202302.0)
(('Number of items', '2015', '1880.0'), 95673.0)
(('W

# Problema 5: O preço máximo, mínimo e médio por tipo de unidade e ano


In [60]:
# Processar o conjunto de dados
transacoes = dataset.map(lambda linha: ((linha.split(";")[7], linha.split(";")[1]), float(linha.split(";")[5].replace(",", ".")) if linha.split(";")[5].replace(",", ".").replace(".", "").isdigit() else 0))

# Filtrar transações com preços inválidos
transacoes_validas = transacoes.filter(lambda transacao: transacao[1] != 0)

# Calcular preço máximo, mínimo e médio por tipo de unidade e ano
estatisticas_preco_por_unidade_ano = transacoes_validas.aggregateByKey(
    (float('-inf'), float('inf'), 0.0, 0),
    lambda a, b: (max(a[0], b), min(a[1], b), a[2] + b, a[3] + 1),
    lambda a, b: (max(a[0], b[0]), min(a[1], b[1]), a[2] + b[2], a[3] + b[3])
)

# Calcular preço médio da transação
preco_medio_por_unidade_ano = estatisticas_preco_por_unidade_ano.mapValues(lambda v: (v[0], v[1], v[2] / v[3]))

# Iterar sobre os resultados e imprimir preço máximo, mínimo e médio por tipo de unidade e ano
for (unidade_ano, (preco_maximo, preco_minimo, preco_medio)) in preco_medio_por_unidade_ano.collect():
    tipo_unidade, ano = unidade_ano
    print("Tipo de Unidade:", tipo_unidade)
    print("Ano:", ano)
    print("Preço Máximo:", preco_maximo)
    print("Preço Mínimo:", preco_minimo)
    print("Preço Médio:", preco_medio)
    print()


Tipo de Unidade: Weight in kilograms
Ano: 1996
Preço Máximo: 9884802991.0
Preço Mínimo: 1.0
Preço Médio: 10828237.657696381

Tipo de Unidade: Weight in kilograms
Ano: 1997
Preço Máximo: 3896247558.0
Preço Mínimo: 1.0
Preço Médio: 8108849.113137338

Tipo de Unidade: Weight in kilograms
Ano: 2001
Preço Máximo: 10391536933.0
Preço Mínimo: 1.0
Preço Médio: 8992010.407064678

Tipo de Unidade: Weight in kilograms
Ano: 2005
Preço Máximo: 45281302495.0
Preço Mínimo: 1.0
Preço Médio: 18390801.82820919

Tipo de Unidade: Weight in kilograms
Ano: 2006
Preço Máximo: 66411903451.0
Preço Mínimo: 1.0
Preço Médio: 20259168.047239587

Tipo de Unidade: Area in square metres
Ano: 2004
Preço Máximo: 2702287262.0
Preço Mínimo: 10.0
Preço Médio: 9700523.179864254

Tipo de Unidade: Weight in kilograms
Ano: 1988
Preço Máximo: 3667587072.0
Preço Mínimo: 7.0
Preço Médio: 14612962.890881147

Tipo de Unidade: Number of items
Ano: 1996
Preço Máximo: 5099083250.0
Preço Mínimo: 1.0
Preço Médio: 26330183.199369583

Ti

# Problema 6: O país com o maior preço médio de commodities na categoria Exportação


In [39]:
# Filtra o conjunto de dados para a categoria de exportação
exportacao = dataset.filter(lambda line: line.split(";")[4] == "Export")

# Mapeia cada linha para pares chave-valor (país, preço)
preco_por_pais = exportacao.map(lambda line: (line.split(";")[0], float(line.split(";")[5])))

# Calcula a soma e contagem dos preços para cada país
soma_contagem_por_pais = preco_por_pais.aggregateByKey(
    (0.0, 0),
    lambda acc, price: (acc[0] + price, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
)

# Calcula o preço médio para cada país
preco_medio_por_pais = soma_contagem_por_pais.mapValues(lambda acc: acc[0] / acc[1])

# Encontra o país com o maior preço médio
pais_com_maior_preco_medio = preco_medio_por_pais.max(lambda x: x[1])

# Imprime o resultado
print("País com o maior preço médio de commodities na categoria Exportação:",
      pais_com_maior_preco_medio[0], pais_com_maior_preco_medio[1])

País com o maior preço médio de commodities na categoria Exportação: Angola 16369666068.142857


# Problema 7: A commodity mais comercializada (somando as quantidades) em 2016 por tipo de fluxo


In [40]:
# Filtra o conjunto de dados para o ano de 2016
commodities_2016 = dataset.filter(lambda line: line.split(";")[1] == "2016")

# Mapeia cada linha para pares chave-valor ((commodity, tipo de fluxo), quantidade)
quantidades_commodities_2016 = commodities_2016.map(lambda line: ((line.split(";")[3], line.split(";")[4]), float(line.split(";")[8])))

# Calcula a soma das quantidades para cada commodity e tipo de fluxo
quantidades_commodities_fluxo = quantidades_commodities_2016.reduceByKey(lambda a, b: a + b)

# Encontra a commodity mais comercializada em 2016
commodity_mais_comercializada = quantidades_commodities_fluxo.max(key=lambda x: x[1])

# Imprime o resultado
print("Commodity mais comercializada em 2016:", commodity_mais_comercializada[0])

Commodity mais comercializada em 2016: ('Iron ore, concentrate, not iron pyrites,unagglomerate', 'Export')
