In [None]:
!pip install -q findspark
!pip install -q pyspark

In [None]:
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext, SparkFiles
conf = SparkConf().setAppName("TDE02 - BigData").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf = conf)

#importando o csv
rdd = sc.textFile(SparkFiles.get("/content/transactions_amostra.csv"))

In [None]:
#codigo para colocar a primeira linha do CSV em uma variavel
primeira_linha = rdd.first()

In [None]:
#definindo o RDD sem a primeira linha para evitar conflitos
rdd  = rdd.filter(lambda c: c != primeira_linha)

**<h2>Question 1</h2>**

The number of transactions involving Brazil.

In [None]:
#filtro para ter apenas linhas com a palavra 'Brazil'
rdd_split_brazil = rdd.filter(lambda x: x.split(';')[0] == 'Brazil')

print('Número de transações envolvendo o Brasil: \n')
#count para contar a occorencia de linhas
print(rdd_split_brazil.count())

**<h2>Question 2</h2>**
The number of transactions per year.

In [None]:
#map para pegar o valor do ano e a quantidade
rdd_ano = rdd.map(lambda x: ((x.split(";")[1]), 1))

#reduce a partir da chave (ano), somando a quantidade
rdd_reduce_2 = rdd_ano.reduceByKey(lambda a, b: a + b)

print('Numero de transações por ano (ano, quantidade):')
#sort by para deixar o ano de forma decrescente
rdd_reduce_2.sortBy(lambda x: x[0], ascending = False).collect()

**<h2>Question 3</h2>**
The number of transactions per flow type and year.

In [None]:
#map para pegar os valores de tipo, ano e consequentemente a quantidade
rdd_fluxo_ano = rdd.map(lambda x: ((x.split(";")[4], x.split(";")[1]), 1))

#reduce a partir da chave (tipo, ano)) para somar a quantidade de  transações
rdd_reduce_3 = rdd_fluxo_ano.reduceByKey(lambda a, b: a + b)

print('Numero de transações por fluxo e ano ((tipo, ano), quantidade):')
#sort by para deixar ordenado o tipo
rdd_reduce_3.sortBy(lambda x: x[0], ascending = False).collect()

**<h2>Question 4</h2>**
The average of commodity values per year.

In [None]:
#map para pegar os valores de ano e valor
rdd_media_ano = rdd.map(lambda x: ((x.split(';')[1]), (float(x.split(';')[5]))))

#aqui calculamos o numero total com o count
numero_total = rdd_media_ano.count()

#reduce a partir da chave (ano)) para ter a media do valor das comodities por ano
rdd_reduce_4 = rdd_media_ano.reduceByKey(lambda a, b: ((a + b) / numero_total))

print('A média do valor das comodities por ano (ano, preco):')
rdd_reduce_4.sortBy(lambda x: x[0], ascending = False).collect()

**<h2>Question 5</h2>**
The average price of commodities per unit type, year, and category in the export flow in Brazil.


In [None]:
#filter para ter apenas Export no RDD
rdd_exportacao = rdd_split_brazil.filter(lambda x: x.split(';')[4] == 'Export')

#map para pegar os valores de pais, codigo, numero de itens, quantidade e preco
rdd_preco_medio_exportacao = rdd_exportacao.map(lambda x: ((x.split(';')[0], x.split(';')[2], x.split(';')[7], x.split(';')[8]), (float(x.split(';')[5]), 1)))

#reduce a partir da chave (pais, codigo, numero de itens, quantidade) para ter a soma do preço
rdd_reduce_sum = rdd_preco_medio_exportacao.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

#mapValues para somar o valor do preco sem alterar as chaves
preço_medio_commodities = rdd_reduce_sum.mapValues(lambda a: (a[0] / a[1]))
#reduce para fazer a soma de tudo feito anteriormente

rdd_reduce_5 = preço_medio_commodities.reduceByKey(lambda a, b: a + b)
print('O preço medio das comodities, de exportação brasileira ((pais, codigo, numero de itens, quantidade), preco):')

#sort by para ordenar apartir do preco
rdd_reduce_5.sortBy(lambda x: x[1], ascending = False).collect()

**<h2>Question 6</h2>**
The maximum, minimum, and mean transaction price per unit type and year.

In [None]:
#calculando o preço maximo de exportação
rdd_ano_fluxo_export = rdd_exportacao.map(lambda x: ((x.split(';')[1], (x.split(';')[4])), (float(x.split(';')[5]))))

#reduce a partir da chave (ano, tipo) para somar o valor total de Exportacao
rdd_reduce_export = rdd_ano_fluxo_export.reduceByKey(lambda a, b: a + b)

print('O PREÇO MÁXIMO DE EXPORTAÇÃO POR TIPO E ANO (ano, tipo, preco): ')
#sortby para ordenar de forma decrescente e pegar apenas um valor
rdd_reduce_export.sortBy(lambda x: x[1], ascending = False).take(1)

In [None]:
#calculando o preço minimo de exportação

print('O PREÇO MINIMO DE EXPORTAÇÃO POR TIPO E ANO (ano, tipo, preco): ')
#sortby para ordenar de forma crecente e pegar apenas um valor
rdd_reduce_export.sortBy(lambda x: x[1], ascending = True).take(1)

In [None]:
#calculando o preço medio de exportação
#colocando em uma variavel o numero total de exportaçoes
numero_total_export = (rdd_ano_fluxo_export.count())

#map para pegar apenas o preco das exportaçoes
rdd_map_media_export = rdd_ano_fluxo_export.map((lambda x: ((x[1]))))

#reduce para somar o preco das exportacoes
rdd_map_media__export_reduce = rdd_map_media_export.reduce(lambda a, b: a + b)

print('O PREÇO MEDIO DE EXPORTAÇÃO POR TIPO E ANO:')
#calculando a media do preco
print(rdd_map_media__export_reduce / numero_total_export)

In [None]:
#calculando o preço maximo de importação
#filter para pegar apenas as linhas que sejam import
rdd_importacao = rdd.filter(lambda x: x.split(';')[4] == 'Import')

#map para pegar os valores de ano, tipo e preco
rdd_ano_fluxo_import = rdd_importacao.map(lambda x: ((x.split(';')[1], (x.split(';')[4])), (float(x.split(';')[5]))))

#reduce a partir da chave (ano, tipo) para somar o valor total de Importacao
rdd_reduce_import = rdd_ano_fluxo_import.reduceByKey(lambda a, b: a + b)

print('O PREÇO MÁXIMO DE IMPORTAÇÃO POR TIPO E ANO ((ano, tipo), preco): ')
#sortby para ordenar de forma decrescente e pegar apenas um valor
rdd_reduce_import.sortBy(lambda x: x[1], ascending = False).take(1)

In [None]:
#calculando o preço minimo de exportação

print('O PREÇO MINIMO DE IMPORTAÇÃO POR TIPO E ANO (ano, tipo, preco):')
#sortby para ordenar de forma crescente e pegar apenas um valor
rdd_reduce_import.sortBy(lambda x: x[1], ascending = True).take(1)

In [None]:
#calculando o preço medio de exportação

#colocando em uma variavel o numero total de importacoes
numero_total_import = (rdd_ano_fluxo_import.count())

#map para pegar apenas o preco das importacoes
rdd_map_media_import = rdd_reduce_import.map((lambda x: ((x[1]))))

#reduce para somar o preco das importacoes
rdd_map_media__import_reduce = rdd_map_media_import.reduce(lambda a, b: a + b)

print('O PREÇO MEDIO DE IMPORTAÇÃO POR TIPO E ANO:')
#calculando a media do preco
print(rdd_map_media__import_reduce / numero_total_import)

**<h2>Question 7</h2>**
The most commercialized commodity (summing the quantities) in 2016, per flow type.

In [None]:
#EXPORT:

#filter para pegar as linha com a valor de ano igual a 2016
rdd_ano_2016 = rdd.filter(lambda x: x.split(';')[1] == '2016')

#filter para pegar as linha com a valor de tipo igual a Export
rdd_ano_2016_export = rdd_ano_2016.filter(lambda x: x.split(';')[4] == 'Export')

#map para pegar os valores de ano e quantidade
rdd_ano_2016_export_qnt = rdd_ano_2016_export.map(lambda x: ((x.split(';')[2]), (float(x.split(';')[8]))))

#reduce a partir de uma chave para somar o valor das quantidades
rdd_export_reduce = rdd_ano_2016_export_qnt.reduceByKey(lambda a, b: a + b)

print('A commoditie mais EXPORTADA em 2016 (codigo, quantidade):')
#sortby para ordenar de forma decrescente e pegar apenas um valor
rdd_export_reduce.sortBy(lambda x: x[1], ascending = False).take(1)

In [None]:
#IMPORT:
#filter para pegar as linha com a valor de tipo igual a Export
rdd_ano_2016_import = rdd_ano_2016.filter(lambda x: x.split(';')[4] == 'Import')

#map para pegar os valores de ano e quantidade
rdd_ano_2016_import_qnt = rdd_ano_2016_import.map(lambda x: ((x.split(';')[2]), (float(x.split(';')[8]))))

#reduce a partir de uma chave para somar o valor das quantidades
rdd_import_reduce = rdd_ano_2016_import_qnt.reduceByKey(lambda a, b: a + b)

print('A commoditie mais IMPORTADA em 2016 (codigo, quantidade):')
#sortby para ordenar de forma decrescente e pegar apenas um valor
rdd_import_reduce.sortBy(lambda x: x[1], ascending = False).take(1)