In [None]:
!pip install pyspark
from pyspark.sql import SparkSession, Window
from pyspark import SparkFiles
from pyspark.sql.functions import * 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 59 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 80.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=dff1df0bc0b7dc6ce20a826015abc00130cfac3146e623db25758aa5e5ee3984
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [None]:
# Build (or reuse, if one already exists) a local Spark session for this notebook.
spark = (
    SparkSession.builder
    .master('local')
    .appName('tde3SparkSQL')
    .getOrCreate()
)

In [None]:
# Load the semicolon-separated transactions CSV as a DataFrame, using the first
# row as the header and letting Spark infer the column types.
# The path is passed straight to the reader: SparkFiles.get() resolves files
# shipped with SparkContext.addFile, and only returned this absolute path by
# accident, so it is not needed here.
TRANSACTIONS_CSV = "/content/transactions_menor.csv"
df = spark.read.csv(TRANSACTIONS_CSV, header=True, inferSchema=True, sep=';')

In [None]:
# Preview the first 10 rows of the loaded DataFrame.
df.show(n=10)

+-------------------+----+---------+--------------------+---------+---------+----------+-------------------+----------+--------------------+
|    country_or_area|year|comm_code|           commodity|     flow|trade_usd| weight_kg|      quantity_name|  quantity|            category|
+-------------------+----+---------+--------------------+---------+---------+----------+-------------------+----------+--------------------+
|            Belgium|2016|   920510|Brass-wind instru...|   Export|   571297|    3966.0|    Number of items|    4135.0|92_musical_instru...|
|          Guatemala|2008|   660200|Walking-sticks, s...|   Export|    35022|    5575.0|    Number of items|   10089.0|66_umbrellas_walk...|
|           Barbados|2006|   220210|Beverage waters, ...|Re-Export|    81058|   44458.0|   Volume in litres|   24113.0|22_beverages_spir...|
|            Tunisia|2016|   780411|Lead foil of a th...|   Import|     4658|     121.0|Weight in kilograms|     121.0|78_lead_and_artic...|
|          Li

## Question 1

The number of transactions involving Brazil.

In [None]:
# Keep only the rows whose country_or_area is exactly 'Brazil'.
df_split_brazil = df.filter(col('country_or_area') == 'Brazil')

# Group the remaining rows by country and count them; this yields a single
# row holding Brazil's number of transactions.
brazil_counts = df_split_brazil.groupBy('country_or_area').count()
brazil_counts.show()

## Question 2
The number of transactions per year.

In [None]:
# Number of transactions per year, sorted chronologically.
# Keep the aggregated DataFrame in `df_ano` and call .show() separately:
# .show() prints and returns None, so chaining it into the assignment would
# leave `df_ano` unusable.
df_ano = df.groupBy("year").count().orderBy("year")

# One row per year, so the result is small; show every year instead of the
# default 20-row preview.
df_ano.show(df_ano.count())

## Question 3
The number of transactions per flow type and year.

In [None]:
# Number of transactions per (year, flow) pair, sorted by year and then flow.
# The DataFrame is stored before calling .show(), which returns None.
df_ano_fluxo = df.groupBy("year", "flow").count().orderBy("year", "flow")

# Show every (year, flow) combination, not just the first 10.
df_ano_fluxo.show(df_ano_fluxo.count())

## Question 4
The average commodity trade value (`trade_usd`) per year.

In [None]:
# Average commodity trade value (trade_usd) per year, sorted by year.
# Grouping only by "year" matches the question; additionally grouping by
# comm_code would give one average per commodity per year instead.
# The DataFrame is stored before calling .show(), which returns None.
df_media_ano = df.groupBy("year").avg("trade_usd").orderBy("year")

# One row per year; show them all.
df_media_ano.show(df_media_ano.count())

## Question 5
The average commodity price (`trade_usd`) per unit type, year, and category for Brazil's export flow.


In [None]:
# Restrict the Brazil-only rows to the export flow.
df_exportacao = df_split_brazil.filter("flow == 'Export'")

# Average trade value per unit type (quantity_name), year and category.
# `quantity` is a per-transaction amount, not a dimension, so it must not be a
# grouping key; `category` (not comm_code) is the requested grouping.
df_exportacao.groupBy('quantity_name', 'year', 'category') \
    .avg('trade_usd') \
    .orderBy('year', 'quantity_name', 'category') \
    .show(10)

## Question 6
The maximum, minimum, and mean transaction price (`trade_usd`) per unit type and year.

In [None]:
# Highest trade_usd for each (quantity_name, year) group; preview 5 rows.
df.groupBy('quantity_name', 'year').agg({'trade_usd': 'max'}).show(5)

In [None]:
# Lowest trade_usd for each (quantity_name, year) group; preview 5 rows.
df.groupBy('quantity_name', 'year').agg({'trade_usd': 'min'}).show(5)

In [None]:
# Mean trade_usd for each (quantity_name, year) group; preview 5 rows.
# Computed over the full dataset `df` (not the Brazil-export subset) with the
# same grouping order as the max/min cells, so all three statistics describe
# the same groups.
df.groupBy('quantity_name', 'year').avg('trade_usd').show(5)

## Question 7
The most traded commodity in 2016 (measured by summed quantity), per flow type.

In [None]:
# Window covering every row of the same flow type. No ordering is needed
# because only the maximum over the whole partition is taken.
partition = Window.partitionBy('flow')

# Keep only 2016 transactions; compare numerically instead of against the
# string '2016' so no implicit string/number cast is involved.
df_ano_filter = df.filter(col('year') == 2016)

# Total quantity per (commodity, flow) in 2016, stored in 'soma_quantidade'.
# `sum` and `max` below are pyspark.sql.functions (from the star import),
# not the Python builtins.
df_comm_fluxo = df_ano_filter.groupBy('commodity', 'flow').agg(sum('quantity').alias('soma_quantidade'))

# Attach to every row the largest summed quantity found within its flow.
df_comm_particionada = df_comm_fluxo.withColumn('quantidade_maxima', max('soma_quantidade').over(partition))

# Keep the commodity whose summed quantity equals its flow's maximum: one row
# per flow type (more than one only if there is a tie).
df_final = (
    df_comm_particionada
    .where(col('quantidade_maxima') == col('soma_quantidade'))
    .drop('soma_quantidade')
    .orderBy('flow')
)

# Commodity names are long; show them untruncated.
df_final.show(truncate=False)