In [None]:
!pip install pyspark

import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkFiles # permite baixar arquivos

# criar uma sessão
spark = SparkSession.builder.appName('TdeSparkSql').getOrCreate()

# criar um contexto
sc = spark.sparkContext

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 9.0 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 51.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=9a117ba11d141c6c2c2d613d8526ce416fa647e76410a5be5aca57e237620913
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [None]:
# obtencao dos dados
base_url = 'http://www.ppgia.pucpr.br/~jean.barddal/bigdata/'
arquivos = ['transactions_sample.csv']

urls = [base_url + x for x in arquivos]
for url in urls:
  # print(url)
  sc.addFile(url)

#1. The number of transactions involving Brazil;

In [None]:
from pyspark.sql.functions import col, max, avg, count, min, sum

df = spark.read.csv("file://" + SparkFiles.get("transactions_sample.csv"),
                    header=True,
                    inferSchema=True,
                    sep=';')

In [None]:
df.printSchema()

root
 |-- country_or_area: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- comm_code: integer (nullable = true)
 |-- commodity: string (nullable = true)
 |-- flow: string (nullable = true)
 |-- trade_usd: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- quantity_name: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- category: string (nullable = true)



In [None]:
df.createOrReplaceTempView("view_table")

dfSelect = df.select(col("country_or_area")).filter(col("country_or_area") == "Brazil")

dfSelect.show(5)

+---------------+
|country_or_area|
+---------------+
|         Brazil|
|         Brazil|
|         Brazil|
|         Brazil|
|         Brazil|
+---------------+
only showing top 5 rows



In [None]:
dfSelect.count()


550

#2. The number of transactions per year;

In [None]:
dfSelect = df.select(col("year"))

dfSelect.show(5)

+----+
|year|
+----+
|2016|
|2016|
|2008|
|2016|
|2016|
+----+
only showing top 5 rows



In [None]:
df.groupBy(col("year")).agg(count("*").alias("transactions")).show(10)

+----+------------+
|year|transactions|
+----+------------+
|1990|          91|
|2003|         411|
|2007|         447|
|2015|         506|
|2006|         401|
|2013|         510|
|1997|         342|
|1988|          23|
|1994|         199|
|2014|         499|
+----+------------+
only showing top 10 rows



#3. The most commercialized commodity (summing the quantities) in 2016, per flow type.

In [None]:
from pyspark.sql import functions as F

# Filtrando pelo ano de 2016
dfSelect = df.select(col("*")).filter(col("year") == "2016")

# Agrupando por fluxo e commodity e somando as colunas de quantidade no df filtrado por ano
somas = dfSelect.groupBy("flow", "commodity").agg(F.sum("quantity").alias('quantidade total')).orderBy(col('quantidade total').desc()).show(5)


+------+--------------------+----------------+
|  flow|           commodity|quantidade total|
+------+--------------------+----------------+
|Import|Animals, live, ex...|     15521160322|
|Export|Fowls, live domes...|       148808219|
|Import|Fowls, live domes...|       122898701|
|Import|Fowls, live domes...|        78076421|
|Export|Fowls, live domes...|        37596533|
+------+--------------------+----------------+
only showing top 5 rows



#4. The average of commodity values per year;

In [None]:
mediaPorAno = df.groupBy("year", "commodity").agg(avg("trade_usd").alias('média de commodities')).orderBy(col('year').desc()).show(5)


+----+--------------------+--------------------+
|year|           commodity|média de commodities|
+----+--------------------+--------------------+
|2016|Horses, live pure...|   6428837.258064516|
|2016|Swine, live excep...|         1.5910932E7|
|2016|Swine, live pure-...|          3172956.25|
|2016|         Sheep, live|   9138491.043478262|
|2016|Horses, live exce...|1.5337869030303031E7|
+----+--------------------+--------------------+
only showing top 5 rows



#5. The average price of commodities per unit type, year, and category in the export flow in Brazil;

In [None]:
# Filtrando pelo país e flow export
dfSelect = df.select(col("*")).filter(col("country_or_area") == "Brazil")
# Filtrando pelo flow export
dfSelect = dfSelect.select(col("*")).filter(col("flow") == "Export")

# Fazendo a média por unidade, ano e categoria
dfSelect.groupBy("quantity_name", "year", "category").agg(avg("trade_usd").alias('média')).orderBy(col('year').desc()).show(5)


+---------------+----+---------------+--------------------+
|  quantity_name|year|       category|               média|
+---------------+----+---------------+--------------------+
|Number of items|2016|01_live_animals|        2.83752834E7|
|Number of items|2015|01_live_animals|2.5286922545454547E7|
|Number of items|2014|01_live_animals|      9.2774914625E7|
|Number of items|2013|01_live_animals| 8.695532677777778E7|
|Number of items|2012|01_live_animals| 5.841521372727273E7|
+---------------+----+---------------+--------------------+
only showing top 5 rows



#6. The commodity with the highest price per unit type and year;

In [None]:
# Fazendo a média por unidade, ano e categoria
df.groupBy(col("quantity_name"), col("year")).agg(max("trade_usd").alias('maior valor')).show(5)

+-------------------+----+-----------+
|      quantity_name|year|maior valor|
+-------------------+----+-----------+
|        No Quantity|1997|   98024384|
|    Number of items|1990|  563872733|
|    Number of items|2002| 1154904975|
|Weight in kilograms|1998|  191161776|
|    Number of items|2011|  873639517|
+-------------------+----+-----------+
only showing top 5 rows



#7. The number of transactions per flow type and year.

In [None]:
df.groupBy(col('flow'), col('year')).agg(count('*').alias('Contagem')).orderBy(col('year').desc()).show(5)

+---------+----+--------+
|     flow|year|Contagem|
+---------+----+--------+
|Re-Export|2016|      13|
|   Import|2016|     191|
|Re-Import|2016|      10|
|   Export|2016|     140|
|Re-Import|2015|      15|
+---------+----+--------+
only showing top 5 rows

