# **TDE 2 - Spark**

## **Download Spark**

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 42.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=a1659a118ed27c6d359a7bc9b866be848bdd373574256ead86a631673d2cf372
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


## **IMPORTS**

In [None]:
import re
from itertools import islice

import pyspark
from pyspark import SparkFiles
from pyspark.sql import SparkSession

## **Download da Base**

In [None]:
# Start (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('TDE')
    .getOrCreate()
)

# The exercises below use the RDD API, which is reached through the SparkContext.
sc = spark.sparkContext

In [None]:
# Register the sample dataset with Spark; it is looked up later by file name
# through SparkFiles.get.
DATASET_URL = 'https://www.dropbox.com/s/cjztq44o5k7imwu/transactions_amostra.csv?dl=1'
sc.addFile(DATASET_URL)

In [None]:
# Resolve the local copy of the file registered with sc.addFile and read it
# as an RDD of raw text lines.
local_csv_path = SparkFiles.get('transactions_amostra.csv')
rdd = sc.textFile('file://' + local_csv_path)

# Preview the first records; the first one is the ';'-separated header line.
rdd.take(5)

['country_or_area;year;comm_code;commodity;flow;trade_usd;weight_kg;quantity_name;quantity;category',
 'Belgium;2016;920510;Brass-wind instruments;Export;571297;3966.0;Number of items;4135.0;92_musical_instruments_parts_and_accessories',
 'Guatemala;2008;660200;Walking-sticks, seat-sticks, whips, etc.;Export;35022;5575.0;Number of items;10089.0;66_umbrellas_walking_sticks_seat_sticks_whips_etc',
 'Barbados;2006;220210;Beverage waters, sweetened or flavoured;Re-Export;81058;44458.0;Volume in litres;24113.0;22_beverages_spirits_and_vinegar',
 'Tunisia;2016;780411;Lead foil of a thickness <2mm;Import;4658;121.0;Weight in kilograms;121.0;78_lead_and_articles_thereof']

In [None]:
# Positional index of every column, in the order given by the CSV header.
(
    COUNTRY,         # country_or_area
    YEAR,            # year
    COMMODITY_CODE,  # comm_code
    COMMODITY,       # commodity
    FLOW,            # flow
    PRICE,           # trade_usd
    WEIGHT,          # weight_kg
    UNIT,            # quantity_name
    AMOUNT,          # quantity
    CATEGORY,        # category
) = range(10)

# Turn each raw text line into a list of string fields.
# NOTE: this rebinds `rdd`; re-running this cell without first re-running the
# load cell would try to split already-split lists and fail.
rdd = rdd.map(lambda line: line.split(";"))

## **Exercícios:**

### **1)** Número de transações envolvendo o Brasil

In [None]:
# Rows whose country column is exactly "Brazil" (the header row never matches).
brazil_rows = rdd.filter(lambda row: row[COUNTRY] == "Brazil")

# Despite its name this is a plain int: count() is an action, not a transformation.
bra_transac_rdd = brazil_rows.count()

# Persist the answer as text in the working directory.
with open("EX1.txt", "w") as out_file:
    out_file.write(str(bra_transac_rdd))

### **2)** Número de transações por ano


In [None]:
# Skip the CSV header (the first record of partition 0). Without this the
# column-title row is counted as a transaction under the key 'year'.
header_drop_rdd = rdd.mapPartitionsWithIndex(
    lambda idx, rows: islice(rows, 1, None) if idx == 0 else rows
)

# Count one per row, summed per year, ordered by year.
year_transac_rdd = (
    header_drop_rdd
    .map(lambda row: (row[YEAR], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortByKey()
)

# coalesce(1) collapses the result to one partition so a single part file is
# written. saveAsTextFile creates a directory called "EX2.txt".
year_transac_rdd.coalesce(1).saveAsTextFile("EX2.txt")

### **3)** Número de transações por tipo de fluxo e ano

In [None]:
# Skip the CSV header (the first record of partition 0). Without this the
# column-title row is counted as a transaction under the key ('year', 'flow').
header_drop_rdd = rdd.mapPartitionsWithIndex(
    lambda idx, rows: islice(rows, 1, None) if idx == 0 else rows
)

# Count one per row, summed per (year, flow) pair, ordered by that pair.
year_flow_transac_rdd = (
    header_drop_rdd
    .map(lambda row: ((row[YEAR], row[FLOW]), 1))
    .reduceByKey(lambda a, b: a + b)
    .sortByKey()
)

# coalesce(1) collapses the result to one partition so a single part file is
# written. saveAsTextFile creates a directory called "EX3.txt".
year_flow_transac_rdd.coalesce(1).saveAsTextFile("EX3.txt")

### **4)** Média de valores de commodities por ano

In [None]:
# Drop the CSV header so float() below never sees the column titles. The header
# is the first record of partition 0; islice skips it lazily instead of copying
# the whole partition into a list.
header_drop_rdd = rdd.mapPartitionsWithIndex(
    lambda idx, rows: islice(rows, 1, None) if idx == 0 else rows
)

# Per year, accumulate (total quantity, total trade value in USD).
total_amount_price_year = (
    header_drop_rdd
    .map(lambda row: (row[YEAR], (float(row[AMOUNT]), float(row[PRICE]))))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .sortByKey()
)

# Yearly figure = total trade value / total quantity.
# NOTE(review): this is a quantity-weighted unit price, not the mean value per
# transaction (which would divide by the row count) - confirm which one the
# exercise asks for. A year whose quantities sum to 0 would raise
# ZeroDivisionError here.
comm_mean_year = total_amount_price_year.mapValues(lambda totals: totals[1] / totals[0])

# coalesce(1) collapses the result to one partition so a single part file is written.
comm_mean_year.coalesce(1).saveAsTextFile("EX4.txt")

### **5)** Média de preços de commodities por tipo de unidade, ano e categoria no fluxo de exportação do Brasil

In [None]:
# Keep only Brazil's export rows (the header row cannot match, so it is excluded too).
bra_ex_flow = rdd.filter(
    lambda row: (row[COUNTRY], row[FLOW]) == ("Brazil", "Export")
)

# Per (year, category, unit), accumulate (total quantity, total trade value in USD).
key_rdd = (
    bra_ex_flow
    .map(lambda row: (
        (row[YEAR], row[CATEGORY], row[UNIT]),
        (float(row[AMOUNT]), float(row[PRICE])),
    ))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .sortByKey()
)

# Average = total trade value divided by total quantity for each key.
mean_calc = key_rdd.mapValues(lambda sums: sums[1] / sums[0])

# Write the result as a single part file.
mean_calc.coalesce(1).saveAsTextFile("EX5.txt")

### **6)** O máximo, mínimo e médio preço de transação por tipo de unidade e ano

In [None]:
# Drop the CSV header so int() below never sees the column titles. The header
# is the first record of partition 0; islice skips it lazily instead of copying
# the whole partition into a list.
header_drop_rdd = rdd.mapPartitionsWithIndex(
    lambda idx, rows: islice(rows, 1, None) if idx == 0 else rows
)

# Key every transaction by (year, unit type) with its integer price (trade_usd).
unit_price_year = header_drop_rdd.map(lambda row: ((row[YEAR], row[UNIT]), int(row[PRICE])))

# Single aggregation pass: carry (min, max, total, count) per key instead of
# running four separate reduceByKey jobs and joining their results.
price_stats = (
    unit_price_year
    .mapValues(lambda price: (price, price, price, 1))
    .reduceByKey(lambda a, b: (
        min(a[0], b[0]),
        max(a[1], b[1]),
        a[2] + b[2],
        a[3] + b[3],
    ))
)

# Output shape is (key, ((min, max), mean)); the mean is truncated to an int.
result = (
    price_stats
    .mapValues(lambda s: ((s[0], s[1]), int(s[2] / s[3])))
    .sortByKey()
)

# coalesce(1) collapses the result to one partition so a single part file is written.
result.coalesce(1).saveAsTextFile("EX6.txt")

### **7)** A commodity mais comercializada (somando as quantidades) em 2016, por tipo de fluxo

In [None]:
# Restrict to 2016 (the header row cannot match, so it is excluded too).
year_rdd = rdd.filter(lambda row: row[YEAR] == "2016")

# Total quantity traded per (commodity, flow) pair.
sum_key_rdd = (
    year_rdd
    .map(lambda row: ((row[COMMODITY], row[FLOW]), float(row[AMOUNT])))
    .reduceByKey(lambda a, b: a + b)
)

# Re-key by flow and keep, for each flow, the (commodity, total) pair with the
# largest total; on a tie the right-hand operand of the reduce is kept.
MAX_comm_flow_type = (
    sum_key_rdd
    .map(lambda pair: (pair[0][1], (pair[0][0], pair[1])))
    .reduceByKey(lambda best, other: best if best[1] > other[1] else other)
)

# Write the result as a single part file.
MAX_comm_flow_type.coalesce(1).saveAsTextFile("EX7.txt")