<a href="https://colab.research.google.com/github/gabrielfjm/TDE3-SPARK/blob/main/TDE3_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark



In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('tde3').getOrCreate()

sc = spark.sparkContext

# Carregar o dataset, remover o cabeçalho e tratar dados faltantes
url = '/content/drive/MyDrive/FACULDADE/operacoes_comerciais_inteira.csv'

rdd = sc.textFile(url)
rdd = rdd.filter(lambda line: "Country" not in line).filter(lambda line: line.strip() != "")

In [None]:
# 1 - Contar transações envolvendo o Brasil:
brazil_transactions = rdd.filter(lambda line: "Brazil" in line.split(";")[0]).count()
print(brazil_transactions)

184748


In [None]:
# 2 - Número de transações envolvendo o Brasil com PairRDD:
brazil_pair_rdd = rdd.filter(lambda line: "Brazil" in line.split(";")[0]).map(lambda line: ("Brazil", 1)).reduceByKey(lambda x, y: x + y)
print(brazil_pair_rdd.collect())

[('Brazil', 184748)]


In [None]:
# 3 - Número de transações envolvendo o Brasil em 2016 com RDD:
brazil_2016 = rdd.filter(lambda line: "Brazil" in line.split(";")[0] and "2016" in line.split(";")[1]).count()
print(brazil_2016)

6210


In [None]:
# 4 - Número de transações envolvendo o Brasil em 2016 com PairRDD:
brazil2016_pair_rdd = rdd.filter(lambda line: "Brazil" in line.split(";")[0] and "2016" in line.split(";")[1]).map(lambda line: ("Brazil", 1)).reduceByKey(lambda x, y: x + y)
print(brazil2016_pair_rdd.collect())

[('Brazil', 6210)]


In [None]:
# 5 - Transações por flow e ano a partir de 2010:
# Filtrar dados a partir de 2010, ignorando linhas com formato inesperado
rdd_filtered = rdd.filter(lambda line: len(line.split(";")) > 4 and line.split(";")[1].isdigit() and int(line.split(";")[1]) >= 2010)

# Transformar Flow em maiúsculas e mapear (ano, flow) como chave, com valor 1 para contagem
flow_year_rdd = rdd_filtered.map(lambda line: ((line.split(";")[1], line.split(";")[4].upper()), 1))

# Contar o número de transações por ano e flow
flow_year_count = flow_year_rdd.reduceByKey(lambda x, y: x + y)

# Ordenar por ano
flow_year_count_sorted = flow_year_count.sortByKey()

# Exibir o resultado
for record in flow_year_count_sorted.collect():
    print(record)

(('2010', 'EXPORT'), 125013)
(('2010', 'IMPORT'), 222447)
(('2010', 'RE-EXPORT'), 17058)
(('2010', 'RE-IMPORT'), 9273)
(('2011', 'EXPORT'), 125909)
(('2011', 'IMPORT'), 220850)
(('2011', 'RE-EXPORT'), 17564)
(('2011', 'RE-IMPORT'), 10273)
(('2012', 'EXPORT'), 128072)
(('2012', 'IMPORT'), 222034)
(('2012', 'RE-EXPORT'), 16900)
(('2012', 'RE-IMPORT'), 10337)
(('2013', 'EXPORT'), 127904)
(('2013', 'IMPORT'), 216282)
(('2013', 'RE-EXPORT'), 16758)
(('2013', 'RE-IMPORT'), 9992)
(('2014', 'EXPORT'), 125142)
(('2014', 'IMPORT'), 208749)
(('2014', 'RE-EXPORT'), 20139)
(('2014', 'RE-IMPORT'), 10424)
(('2015', 'EXPORT'), 125878)
(('2015', 'IMPORT'), 203920)
(('2015', 'RE-EXPORT'), 18966)
(('2015', 'RE-IMPORT'), 10288)
(('2016', 'EXPORT'), 106104)
(('2016', 'IMPORT'), 160235)
(('2016', 'RE-EXPORT'), 16147)
(('2016', 'RE-IMPORT'), 7024)


In [None]:
# 6 - Média do Price em 2016 com RDD:
# Filtrar as linhas para o ano de 2016, ignorando linhas com dados faltantes
prices_2016_rdd = rdd.filter(lambda line: len(line.split(";")) > 5 and line.split(";")[1] == "2016")

# Extrair a coluna Price, convertendo para float
prices_2016 = prices_2016_rdd.map(lambda line: float(line.split(";")[5]))

# Calcular a média dos preços
total_price = prices_2016.sum()
count_price = prices_2016.count()
average_price_usd = total_price / count_price if count_price > 0 else 0

# Conversão de USD para BRL (taxa de conversão fixa para este exemplo)
conversion_rate = 5.0  # Exemplo: 1 USD = 5 BRL
average_price_brl = average_price_usd * conversion_rate

# Formatar o valor em reais
average_price_brl_formatted = f"R$ {average_price_brl:,.2f}"
print(average_price_brl_formatted)

R$ 689,923,030.98


In [None]:
# 7 - Média do Price em 2016 com PairRDD:
# Taxa de conversão de USD para BRL
conversion_rate = 5.0  # Exemplo: 1 USD = 5 BRL

# Filtrar dados para o ano de 2016 e garantir que a coluna Price é um valor numérico
price_2016_rdd = rdd.filter(lambda line: len(line.split(";")) > 5 and line.split(";")[1] == "2016" and line.split(";")[5].replace('.', '', 1).isdigit())

# Mapear para chave (2016) e valor (Price, 1) para facilitar a média
price_pair_rdd = price_2016_rdd.map(lambda line: ("2016", (float(line.split(";")[5]), 1)))

# Reduzir somando os valores de Price e as contagens
price_sum_count = price_pair_rdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Calcular a média, aplicar a taxa de conversão e formatar em reais
price_avg = price_sum_count.mapValues(lambda x: f"R$ {(x[0] / x[1]) * conversion_rate:,.2f}")

# Exibir o resultado
for record in price_avg.collect():
    print(record)

('2016', 'R$ 689,923,030.98')


In [None]:
# 8 - O preço máximo e mínimo por categoria e por ano, ordenado por país com PairRDD:
# Definir a taxa de conversão de USD para BRL
conversion_rate = 5.0  # Exemplo: 1 USD = 5 BRL

# Filtrar o RDD para garantir que o ano, categoria e price são válidos
filtered_rdd = rdd.filter(lambda line: len(line.split(";")) > 6 and line.split(";")[5].replace('.', '', 1).isdigit())

# Mapear para (chave, valor) onde chave = (ano, categoria em maiúsculas) e valor = (price)
# Extrair ano, categoria, e price e garantir que categoria esteja em letras maiúsculas
pair_rdd = filtered_rdd.map(lambda line: (
    (line.split(";")[1], line.split(";")[9].upper()),  # Chave: (ano, categoria em maiúsculas)
    float(line.split(";")[5])  # Valor: preço convertido para float
))

# Calcular o máximo e mínimo para cada (ano, categoria)
# Usar reduceByKey para encontrar o máximo e mínimo de cada chave
min_max_rdd = pair_rdd.combineByKey(
    lambda price: (price, price),  # Inicializar o par (max, min) com o primeiro preço
    lambda acc, price: (max(acc[0], price), min(acc[1], price)),  # Atualizar máximo e mínimo
    lambda acc1, acc2: (max(acc1[0], acc2[0]), min(acc1[1], acc2[1]))  # Combinar para o máximo e mínimo final
)

# Converter os preços de USD para BRL e formatar o resultado
min_max_rdd_brl = min_max_rdd.mapValues(lambda x: (f"R$ {x[0] * conversion_rate:,.2f}", f"R$ {x[1] * conversion_rate:,.2f}"))

# Exibir o resultado ordenado por ano e categoria
for record in min_max_rdd_brl.sortByKey().collect():
    print(record)

(('1988', '01_LIVE_ANIMALS'), ('R$ 904,041,860.00', 'R$ 545.00'))
(('1988', '02_MEAT_AND_EDIBLE_MEAT_OFFAL'), ('R$ 7,479,678,285.00', 'R$ 115.00'))
(('1988', '03_FISH_CRUSTACEANS_MOLLUSCS_AQUATIC_INVERTEBRATES_NE'), ('R$ 788,415,965.00', 'R$ 90.00'))
(('1988', '04_DAIRY_PRODUCTS_EGGS_HONEY_EDIBLE_ANIMAL_PRODUCT_NES'), ('R$ 7,373,920,000.00', 'R$ 60.00'))
(('1988', '05_PRODUCTS_OF_ANIMAL_ORIGIN_NES'), ('R$ 1,111,410,000.00', 'R$ 35.00'))
(('1988', '06_LIVE_TREES_PLANTS_BULBS_ROOTS_CUT_FLOWERS_ETC'), ('R$ 4,336,074,880.00', 'R$ 125.00'))
(('1988', '07_EDIBLE_VEGETABLES_AND_CERTAIN_ROOTS_AND_TUBERS'), ('R$ 2,216,529,920.00', 'R$ 230.00'))
(('1988', '08_EDIBLE_FRUIT_NUTS_PEEL_OF_CITRUS_FRUIT_MELONS'), ('R$ 2,276,124,960.00', 'R$ 10.00'))
(('1988', '09_COFFEE_TEA_MATE_AND_SPICES'), ('R$ 9,269,400,320.00', 'R$ 40.00'))
(('1988', '10_CEREALS'), ('R$ 10,434,202,650.00', 'R$ 35.00'))
(('1988', '11_MILLING_PRODUCTS_MALT_STARCHES_INULIN_WHEAT_GLUTE'), ('R$ 1,093,093,515.00', 'R$ 130.00'))
(('1988

In [None]:
# 9 - Retornar um único valor para PairRDD contendo o país com a maior exportação (Flow=Export):
# Filtrar o RDD para transações de exportação e garantir que o preço é numérico
export_rdd = rdd.filter(lambda line: len(line.split(";")) > 5 and line.split(";")[4] == "Export" and line.split(";")[5].replace('.', '', 1).isdigit())

# Mapear para (chave, valor) onde chave = (país, flow) e valor = preço convertido para float
export_pair_rdd = export_rdd.map(lambda line: (line.split(";")[0], float(line.split(";")[5])))

# Somar os preços de exportação por país
export_total_per_country = export_pair_rdd.reduceByKey(lambda x, y: x + y)

# Encontrar o país com a maior exportação
country_with_max_export = export_total_per_country.takeOrdered(1, key=lambda x: -x[1])[0]  # Ordena e pega o maior valor

# Formatando a saída final para mostrar como esperado
resultado = ((country_with_max_export[0], "Export"), f"R$ {country_with_max_export[1] * 5.0:,.2f}")  # Convertendo para BRL e formatando
print(resultado)

(('EU-28', 'Export'), 'R$ 185,677,734,421,835.00')


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Inicializar SparkSession (necessário para usar SparkSQL)
spark = SparkSession.builder.appName("TDE3").getOrCreate()

In [None]:
# 10 - Retornar o preço máximo por país e por ano utilizando as funções do SparkSQL:
# Transformar RDD em DataFrame
# Separar as colunas e garantir que ano e preço são numéricos
data_rows = rdd.map(lambda line: line.split(";")) \
               .filter(lambda fields: len(fields) > 5 and fields[1].isdigit() and fields[5].replace('.', '', 1).isdigit()) \
               .map(lambda fields: Row(country=fields[0], year=int(fields[1]), price=float(fields[5])))

# Criar DataFrame a partir do RDD
df = spark.createDataFrame(data_rows)

# Registrar o DataFrame como uma tabela temporária
df.createOrReplaceTempView("transactions")

# Executar consulta SQL para encontrar o preço máximo por país e ano
query = """
SELECT country, year, MAX(price) AS max_price
FROM transactions
GROUP BY country, year
ORDER BY country, year
"""

max_price_by_country_and_year = spark.sql(query)

# Exibir o resultado formatado, convertendo os preços para reais
conversion_rate = 5.0  # Exemplo: 1 USD = 5 BRL
result = max_price_by_country_and_year.withColumn("max_price_brl", max_price_by_country_and_year["max_price"] * conversion_rate) \
                                      .select("country", "year", "max_price_brl")

# Mostrar os resultados
result.show(truncate=False)

+-----------+----+---------------+
|country    |year|max_price_brl  |
+-----------+----+---------------+
|Afghanistan|2008|1.5099300645E10|
|Afghanistan|2009|1.6682173905E10|
|Afghanistan|2010|2.5771249335E10|
|Afghanistan|2011|3.1951554735E10|
|Afghanistan|2012|3.1024920505E10|
|Afghanistan|2013|4.2772068745E10|
|Afghanistan|2014|3.848589085E10 |
|Afghanistan|2015|3.8614325245E10|
|Afghanistan|2016|3.2670702065E10|
|Albania    |1996|4.69240896E9   |
|Albania    |1997|3.14513152E9   |
|Albania    |1998|4.20628092E9   |
|Albania    |1999|5.771755845E9  |
|Albania    |2000|5.447339395E9  |
|Albania    |2001|6.65315533E9   |
|Albania    |2002|7.51842603E9   |
|Albania    |2003|9.3216443E9    |
|Albania    |2004|1.1500841705E10|
|Albania    |2005|1.307173506E10 |
|Albania    |2006|1.528704213E10 |
+-----------+----+---------------+
only showing top 20 rows



In [None]:
# 11 - Buscar o preço mínimo por país e por ano, filtrado de forma crescente por ano e país (ordem alfabética):
# Definir a taxa de conversão de USD para BRL
conversion_rate = 5.0  # Exemplo: 1 USD = 5 BRL

# Transformar RDD em DataFrame
data_rows = rdd.map(lambda line: line.split(";")) \
               .filter(lambda fields: len(fields) > 5 and fields[1].isdigit() and fields[5].replace('.', '', 1).isdigit()) \
               .map(lambda fields: Row(country=fields[0], year=int(fields[1]), price=float(fields[5])))

# Criar DataFrame a partir do RDD
df = spark.createDataFrame(data_rows)

# Registrar o DataFrame como uma tabela temporária
df.createOrReplaceTempView("transactions")

# Executar consulta SQL para encontrar o preço mínimo por país e ano
query = """
SELECT country, year, MIN(price) AS min_price
FROM transactions
GROUP BY country, year
ORDER BY year ASC, country ASC
"""

min_price_by_country_and_year = spark.sql(query)

# Converter o preço mínimo para reais e formatar
min_price_in_brl = min_price_by_country_and_year.withColumn("min_price_brl", min_price_by_country_and_year["min_price"] * conversion_rate) \
                                                .select("country", "year", "min_price_brl")

# Exibir o resultado
min_price_in_brl.show(truncate=False)

+------------------------+----+-------------+
|country                 |year|min_price_brl|
+------------------------+----+-------------+
|Australia               |1988|35.0         |
|Finland                 |1988|10.0         |
|Fmr Fed. Rep. of Germany|1988|5000.0       |
|Greece                  |1988|10.0         |
|Haiti                   |1988|505.0        |
|Iceland                 |1988|40.0         |
|India                   |1988|30.0         |
|Japan                   |1988|8035.0       |
|Portugal                |1988|25.0         |
|Rep. of Korea           |1988|5.0          |
|Switzerland             |1988|35.0         |
|Thailand                |1988|20.0         |
|Australia               |1989|5.0          |
|Bangladesh              |1989|5.0          |
|Brazil                  |1989|5.0          |
|Canada                  |1989|20.0         |
|Cyprus                  |1989|10.0         |
|Denmark                 |1989|475.0        |
|Finland                 |1989|35.

In [None]:
# 12 - Transação que teve o maior preço por kg do tipo export. Buscar essa informação e indicar em qual ano, país e categoria, ela aconteceu:
# Definir a taxa de conversão de USD para BRL
conversion_rate = 5.0  # Exemplo: 1 USD = 5 BRL

# Transformar o RDD em DataFrame e garantir que os campos necessários são válidos
data_rows = rdd.map(lambda line: line.split(";")) \
               .filter(lambda fields: len(fields) > 7 and fields[4] == "Export" and
                       fields[5].replace('.', '', 1).isdigit() and
                       fields[6].replace('.', '', 1).isdigit()) \
               .map(lambda fields: Row(country=fields[0], year=int(fields[1]), category=fields[9],
                                       price=float(fields[5]), weight=float(fields[6])))

# Criar DataFrame a partir do RDD
df = spark.createDataFrame(data_rows)

# Registrar o DataFrame como uma tabela temporária
df.createOrReplaceTempView("transactions")

# Executar consulta SQL para calcular o preço por kg e encontrar o valor máximo para exportações
query = f"""
SELECT country, year, category, (price / weight) * {conversion_rate} AS price_per_kg_brl
FROM transactions
WHERE weight > 0
ORDER BY price_per_kg_brl DESC
LIMIT 1
"""

max_price_per_kg_export = spark.sql(query)

# Exibir o resultado
max_price_per_kg_export.show(truncate=False)

+-------+----+--------------------------------------------+----------------+
|country|year|category                                    |price_per_kg_brl|
+-------+----+--------------------------------------------+----------------+
|Norway |1999|89_ships_boats_and_other_floating_structures|1.463344E9      |
+-------+----+--------------------------------------------+----------------+

