In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 25 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 47.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=8bc478136c1d44706f5e8019c4577a853e2cc100e78114522f9278d410fd20ab
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkFiles

spark = SparkSession.builder.master('local[*]').\
        appName('alex').getOrCreate()
        
sc = spark.sparkContext

sc.addFile('/content/drive/MyDrive/transactions_amostra.csv')

# carregando o arquivo
rdd = sc.textFile('file://' + SparkFiles.get('transactions_amostra.csv'))

# fazendo split de sentenças em palavras
palavras = rdd.map(lambda frase: frase.split(';')).filter(lambda l: l[1].isnumeric())

In [8]:
# Questão1 
# 1. (Easy) The number of transactions involving Brazil;

# primeiramente fazemos um filter pra filtrar todas as linhas em que o 'Brazil' se encontra passando x[0], depois fazemos um map para contar todas as ocorrencias, e por fim somamos tudo usando um reduceByKey.
questao1 = palavras.filter(lambda x: 'Brazil' in x[0]).map(lambda x: (x[0], 1)).reduceByKey(lambda x ,y: x + y)
questao1.take(5)

questao1.coalesce(1).saveAsTextFile('questao1.txt')


[('Brazil', 54762)]

In [9]:
# Questão2
# 2. (Easy) The number of transactions per year;

# primeiramente fazemos um map na coluna 'year' e passando 1 como ocorrencia de cada ano que é encontrado, por fim fazemos somamos tudo usando um reduceByKey para retornar a soma das ocorrencias de todos os anos.
questao2 = palavras.map(lambda x: (x[1], 1)).reduceByKey(lambda x ,y: x + y)
questao2.take(5)

questao2.coalesce(1).saveAsTextFile('questao2.txt')

[('2011', 105486),
 ('1991', 22652),
 ('2005', 105913),
 ('1999', 86191),
 ('2000', 97572)]

In [10]:
# Questão3 
# 3. (Easy) The number of transactions per flow type and year;

# primeiramente fazemos um map na coluna ano e na coluna de flow type com uma espaço no meio para separar as strings, e passando 1 como contagem de cada linha econtrada. Por fim fazemos um reduceByKey para realizar o somatório de todas as ocorrencias.
questao3 = palavras.map(lambda x: (x[1] + " " + x[4], 1)).reduceByKey(lambda x ,y: x + y)
questao3.take(5)

questao3.coalesce(1).saveAsTextFile('questao3.txt')

[('1997 Import', 47599),
 ('2000 Export', 33582),
 ('2014 Re-Export', 5573),
 ('2013 Export', 36090),
 ('1998 Re-Export', 3017)]

In [17]:
#Questão 4
# 4. (Easy) The average of commodity values per year;

# primeiramente fazemos um map na coluna ano e na coluna "trade_usd", depois fazemos um groupByKey para agrupar as chaves e os valores e por fim fazer novamente um map passando pelas chaves e realizando uma média entre os valores
questao4 = palavras.map(lambda x: (x[1], float(x[5]))).groupByKey()\
.map(lambda x: (x[0], sum(list(x[1])) / len(list(x[1]))))

questao4.take(5)
questao4.saveAsTextFile('questao4.txt')

[('2011', 37056771.709060915),
 ('1991', 12069165.192609925),
 ('2005', 17458172.20780263),
 ('1999', 9328194.404264946),
 ('2000', 12780250.522332227)]

In [21]:
# Questão 5
# (Easy) The average price of commodities per unit type, year, and category in the export flow in Brazil;

# primeiramente fazer um filter pra pegar apenas o pais "Brazil" e o flow type como "Export", depois fazemos um map pegando o unit, year, category, flowType e pais
# depois fazemos um groupByKey para agrupar as chaves e os valores e por fim um ultimpo map passando pelas chaves e realizando as medias dos valores.

questao5 = palavras.filter(lambda x: x[0] == "Brazil" and x[4] == "Export")
finalQT5 = questao5.map(lambda x: ((x[7] + "" + x[1] + " " + x[9] + " " + x[4] + " " + x[0]), float(x[5]))).groupByKey()\
.map(lambda x: (x[0], sum(list(x[1])) / len(list(x[1]))))

finalQT5.take(10)
finalQT5.saveAsTextFile('questao5.txt')

[('Weight in kilograms2016 62_articles_of_apparel_accessories_not_knit_or_crochet Export Brazil',
  244220.66666666666),
 ('Weight in kilograms1990 87_vehicles_other_than_railway_tramway Export Brazil',
  37357374.36363637),
 ('Weight in kilograms2002 78_lead_and_articles_thereof Export Brazil',
  312033.0),
 ('Weight in kilograms1992 08_edible_fruit_nuts_peel_of_citrus_fruit_melons Export Brazil',
  13613885.857142856),
 ('Weight in kilograms2014 18_cocoa_and_cocoa_preparations Export Brazil',
  76505492.0),
 ('Weight in kilograms2005 48_paper_paperboard_articles_of_pulp_paper_and_board Export Brazil',
  6539302.84),
 ('Weight in kilograms1992 72_iron_and_steel Export Brazil',
  29707629.604651164),
 ('Weight in kilograms1998 40_rubber_and_articles_thereof Export Brazil',
  15267148.933333334),
 ('Weight in kilograms2000 03_fish_crustaceans_molluscs_aquatic_invertebrates_ne Export Brazil',
  2449754.9545454546),
 ('Weight in kilograms2003 82_tools_implements_cutlery_etc_of_base_metal 

In [23]:
# Questao 6 
# (Medium) The maximum, minimum, and mean transaction price per unit type and year;

# primeiramente fazemos um map passando pela 'quantity_name', 'year' e passando como valor a 'trade_usd', depois fazemos um groupByKey para agrupar as chaves e os valores
# e por fim um novo map printando o valor minimo encontrado, maximo e a média.

questao6 = palavras.map(lambda x: ((x[7], x[1]), float(x[5]))).groupByKey().map(lambda x: (x[0], [min(x[1]), max(x[1]), sum(x[1]) / len(x[1])]))
questao6.take(5)

questao6.saveAsTextFile('questao6.txt')

[(('Weight in kilograms', '2016'), [1.0, 56865611547.0, 27962304.5069153]),
 (('Weight in kilograms', '2010'), [1.0, 105814257878.0, 28822456.939314935]),
 (('Weight in kilograms', '1990'), [1.0, 4480755200.0, 10720973.53031925]),
 (('Weight in kilograms', '2014'), [1.0, 359475936313.0, 41934737.83217836]),
 (('Weight in kilograms', '2006'), [1.0, 262924347374.0, 22895231.261773862])]

In [31]:
# Questao 7
# (Hard) The most commercialized commodity (summing the quantities) in 2016, per flow type.

# primeiramente filtramos as linhas que possuem 2016 como ano, depois fazemos uma map pela coluna commodity e por flowtype, passando como valor a coluna de quantity
# depois fazemos um groupByKey para agrupar as chaves e os valores
# depois fazemos um novo map pegando o flow type na posicao x[0][1], e a soma dos valores na quantity
# por fim fazemos um reduceByKey com um if para pegar os maiores valores de cada flow type.

questao7 = palavras.filter(lambda x: '2016' in x[1]).map(lambda x: ((x[3], x[4]), float(x[8]))).groupByKey().map(lambda x: (x[0][1], (sum(list(x[1])), x[0][0]))).reduceByKey(lambda x, y: x if x > y else y)
questao7.take(5)

questao7.saveAsTextFile('questao7.txt')

[('Re-Export',
  (1261968000.0, 'Safety razor blades, including blanks in strips')),
 ('Export',
  (699847369665.0, 'Ice, snow and potable water not sweetened or flavoure')),
 ('Re-Import',
  (38774873.0, 'Chem wood pulp, soda/sulphate, non-conifer, bleached')),
 ('Import',
  (1073469712878.0, 'Iron ore, concentrate, not iron pyrites,unagglomerate'))]