# TDE 3 - Big Data Processing with Apache Spark

## Imports & Configs

In [None]:
!pip install pyspark



In [None]:
import pyspark
from pyspark import SparkFiles
from pyspark.sql import SparkSession

# SparkSQL
import pyspark.sql.functions as f

## Coletando os dados

In [None]:
# Download the dataset archive; -nc skips the download when the file is already present.
!wget -nc 'http://www.ppgia.pucpr.br/~jean.barddal/bigdata/transactions.csv.zip'
# Extract the archive; -n never overwrites files that already exist.
!unzip -n transactions.csv.zip

File ‘transactions.csv.zip’ already there; not retrieving.

Archive:  transactions.csv.zip


## Iniciando a sessão `spark`

In [None]:
# Reuse the active Spark session if there is one, otherwise create it.
spark = (
    SparkSession.builder
    .appName('Transactions')
    .getOrCreate()
)
# Low-level entry point used by the RDD-based exercises.
sc = spark.sparkContext

## Formato `RDD`

In [None]:
# Parse the file by hand: split every line on ';', then drop the header row
# and the rows whose third field (comm_code) is 'TOTAL'.
rdd = (
    sc.textFile("transactions.csv")
      .map(lambda line: line.split(";"))
      .filter(lambda fields: fields[0] != "country_or_area")
      .filter(lambda fields: fields[2] != 'TOTAL')
)

In [None]:
# Visualizando as 5 primeiras instâncias
#rdd.take(5)

## Formato `csv` para SparkSQL

In [None]:
# Load the same file as a DataFrame: ';'-separated, first line is the header,
# column types inferred by Spark.
df = spark.read.csv(
    "transactions.csv",
    sep=";",
    header=True,
    inferSchema=True,
)

In [None]:
# Register the DataFrame as a temporary view named 'table' so it can be queried with SQL
df.createOrReplaceTempView('table')

In [None]:
# Show the first 5 rows
df.show(5)

+---------------+----+---------+--------------------+------+---------+---------+---------------+--------+---------------+
|country_or_area|year|comm_code|           commodity|  flow|trade_usd|weight_kg|  quantity_name|quantity|       category|
+---------------+----+---------+--------------------+------+---------+---------+---------------+--------+---------------+
|    Afghanistan|2016|   010410|         Sheep, live|Export|     6088|     2339|Number of items|    51.0|01_live_animals|
|    Afghanistan|2016|   010420|         Goats, live|Export|     3958|      984|Number of items|    53.0|01_live_animals|
|    Afghanistan|2008|   010210|Bovine animals, l...|Import|  1026804|      272|Number of items|  3769.0|01_live_animals|
|        Albania|2016|   010290|Bovine animals, l...|Import|  2414533|  1114023|Number of items|  6853.0|01_live_animals|
|        Albania|2016|   010392|Swine, live excep...|Import| 14265937|  9484953|Number of items| 96040.0|01_live_animals|
+---------------+----+--

## Library Functions

In [None]:
from pathlib import Path
from shutil import rmtree

def save_rdd_to_file(rdd, filename: str):
    """Save ``rdd`` as text under ``output_dir/<filename>``, replacing a previous save.

    Args:
        rdd: Object exposing ``coalesce(n)`` whose result exposes
            ``saveAsTextFile(path)`` (a Spark RDD in this notebook).
        filename: Name of the entry created inside ``output_dir``.

    Only the entry for ``filename`` is removed before writing; outputs that
    other exercises saved in ``output_dir`` are left untouched.
    """
    target = Path("output_dir") / filename
    if target.is_dir():
        # Delete just this output, not the whole output_dir: removing the
        # parent would silently discard every other saved exercise.
        rmtree(target)
    elif target.exists():
        # A plain file is in the way; rmtree only handles directories.
        target.unlink()
    # coalesce(1) collapses the data into a single partition before writing.
    rdd.coalesce(1).saveAsTextFile(str(target))


def save_rdd(fn):
    """Decorator that saves the RDD returned by ``fn`` and then returns it.

    Each call to the decorated function runs ``fn`` with the given arguments,
    passes the result to ``save_rdd_to_file`` using ``fn.__name__`` as the
    file name, and returns the result to the caller.

    Args:
        fn: Function returning the RDD to save.

    Returns:
        The wrapping function; it keeps ``fn``'s name and docstring.
    """
    from functools import wraps

    @wraps(fn)
    def wrapper(*args, **kwargs):
        # Forward the caller's arguments instead of discarding them.
        rdd = fn(*args, **kwargs)
        save_rdd_to_file(rdd, fn.__name__)
        return rdd

    return wrapper

---

# Using `RDD`

## **1. The number of transactions involving Brazil**;

In [None]:
# Map: keep only Brazil's rows and emit one ("Brazil", 1) pair per transaction
brazil_transactions = (
    rdd.filter(lambda row: row[0] == "Brazil")
       .map(lambda row: (row[0], 1))
)

# Reduce: add up the ones to get the number of transactions
brazil_transactions.reduceByKey(lambda left, right: left + right).take(1)

[('Brazil', 184678)]

**Answer:** The number of transactions involving Brazil is 184678.

## **2. The number of transactions per year**;

In [None]:
@save_rdd
def ex2():
    """Count the transactions of each year, ordered by year.

    Returns an RDD of (year, count) pairs; the year is the raw string taken
    from column 1.
    """
    # Map: one (year, 1) pair per transaction
    year_ones = rdd.map(lambda row: (row[1], 1))

    # Reduce: total per year, then order by the year key
    yearly_counts = year_ones.reduceByKey(lambda left, right: left + right)
    return yearly_counts.sortBy(lambda pair: pair[0])


ex2().take(5)

[('1988', 30970),
 ('1989', 63865),
 ('1990', 72190),
 ('1991', 83843),
 ('1992', 121059)]

## **3. The most commercialized commodity (summing the quantities) in 2016, per flow type**;

In [None]:
def sumQuantitiesCom(data):
    """Turn one parsed row into a ((commodity, flow), weight) pair.

    Column 3 is the commodity, column 4 the flow and column 6 the weight,
    which is converted to ``int``; an empty weight counts as 0.

    NOTE(review): the exercise asks to sum the quantities, while this reads
    weight_kg (column 6) rather than quantity (column 8) - confirm intent.
    """
    raw_weight = data[6]
    if raw_weight:
        amount = int(raw_weight)
    else:
        amount = 0

    key = (data[3], data[4])
    return key, amount

@save_rdd
def ex3():
    """For each flow type, pick the 2016 commodity with the largest summed amount.

    The amount per row comes from ``sumQuantitiesCom``. Returns an RDD of
    (flow, (commodity, total)) pairs sorted by ascending total.
    """
    rows_2016 = rdd.filter(lambda row: row[1] == "2016")

    # (commodity, flow) -> summed amount
    totals = rows_2016.map(sumQuantitiesCom) \
                      .reduceByKey(lambda left, right: left + right)

    # Re-key by flow so the commodities of each flow compete with each other
    by_flow = totals.map(lambda item: (item[0][1], (item[0][0], item[1])))

    # Keep the entry with the larger total; on a tie the right-hand one wins
    winners = by_flow.reduceByKey(
        lambda best, candidate: best if candidate[1] < best[1] else candidate
    )

    return winners.sortBy(lambda item: item[1][1])


ex3().take(5)

[('Re-Import', ('Lumber, coniferous (softwood) thickness < 6 mm', 51951285)),
 ('Re-Export',
  ('Oils petroleum, bituminous, distillates, except crude', 1452933784)),
 ('Export',
  ('Iron ore, concentrate, not iron pyrites,unagglomerate', 1343444789618)),
 ('Import',
  ('Iron ore, concentrate, not iron pyrites,unagglomerate', 1362436716054))]

## **4. The average of commodity values per year**;

In [None]:
@save_rdd
def ex4():
    """Average of column 5 (trade_usd) per year, ordered by year.

    Returns an RDD of (year, mean) pairs.
    """
    # (year, (value, 1)) so sum and count can be accumulated together
    value_and_one = rdd.map(lambda row: (row[1], (float(row[5]), 1)))

    # (year, (sum of values, number of rows))
    sums = value_and_one.reduceByKey(
        lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1])
    )

    averages = sums.map(lambda item: (item[0], item[1][0] / item[1][1]))
    return averages.sortBy(lambda item: item[0])


ex4().take(5)

[('1988', 14906524.119341299),
 ('1989', 12658458.36696156),
 ('1990', 12931120.22652722),
 ('1991', 12950854.07263576),
 ('1992', 11579386.262822261)]

## **5. The average price of commodities per unit type, year, and category in the export flow in Brazil**;

In [None]:
@save_rdd
def ex5():
    """Average trade_usd of Brazil's exports per (year, unit type, category).

    Returns an RDD of ((year, quantity_name, category), mean) pairs sorted by
    year and then category.
    """
    # Country and flow are compared case-insensitively; rows whose trade_usd
    # is not purely numeric (str.isnumeric) are dropped before float().
    brazil_exports = rdd.filter(
        lambda row: row[0].upper() == 'BRAZIL'
        and row[4].upper() == 'EXPORT'
        and row[5].isnumeric()
    )

    # ((year, unit type, category), (value, 1))
    keyed = brazil_exports.map(
        lambda row: ((row[1], row[7], row[9]), (float(row[5]), 1))
    )

    # ((year, unit type, category), (sum of values, number of rows))
    sums = keyed.reduceByKey(lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1]))

    averages = sums.map(lambda item: (item[0], item[1][0] / item[1][1]))
    return averages.sortBy(lambda item: (item[0][0], item[0][2]))


ex5().take(5)

[(('1989', 'Weight in kilograms', '01_live_animals'), 351970.2),
 (('1989', 'Weight in kilograms', '02_meat_and_edible_meat_offal'),
  14544145.46875),
 (('1989',
   'Weight in kilograms',
   '03_fish_crustaceans_molluscs_aquatic_invertebrates_ne'),
  3150434.7),
 (('1989',
   'Volume in litres',
   '04_dairy_products_eggs_honey_edible_animal_product_nes'),
  210515.16666666666),
 (('1989',
   'Number of items',
   '04_dairy_products_eggs_honey_edible_animal_product_nes'),
  760340.3333333334)]

## **6. The commodity with the highest price per unit type and year**;

In [None]:
@save_rdd
def ex6_the_commodity_with_the_highest_price_per_unit_type_and_year():
    """
    Map:
        map: project every row to ((quantity_name, year), trade_usd)
        filter: drop the pairs whose trade_usd is zero (falsy)

    Reduce:
        reduceByKey: keep the highest trade_usd of each key
        sortBy: order the results by key

    NOTE(review): only the highest price is kept per key; the commodity name
    is not part of the result - confirm whether it should be included.
    """

    # Map
    priced = rdd.map(lambda row: ((row[7], row[1]), float(row[5])))
    non_zero = priced.filter(lambda pair: pair[1])

    # Reduce
    top_price = non_zero.reduceByKey(
        lambda left, right: left if left > right else right
    )
    return top_price.sortBy(lambda pair: pair[0])


# Call the function and show its first 5 results
ex6_the_commodity_with_the_highest_price_per_unit_type_and_year().take(5)

[(('Area in square metres', '1988'), 403386005.0),
 (('Area in square metres', '1989'), 367370724.0),
 (('Area in square metres', '1990'), 347716597.0),
 (('Area in square metres', '1991'), 403965643.0),
 (('Area in square metres', '1992'), 323730149.0)]

[(('Area in square metres', '1988'), 403386005.0),
 (('Area in square metres', '1989'), 367370724.0),
 (('Area in square metres', '1990'), 347716597.0),
 (('Area in square metres', '1991'), 403965643.0),
 (('Area in square metres', '1992'), 323730149.0)]

## **7. The number of transactions per flow type and year**;

In [None]:
@save_rdd
def ex7_the_number_of_transactions_per_flow_type_and_year():
    """
    Count the transactions of each (flow, year) pair.

    Map:
        map: project every row to ((flow, year), 1)

    Reduce:
        reduceByKey: add up the ones to get the number of transactions
        sortBy: order the results by key

    Returns an RDD of ((flow, year), count) pairs.
    """

    # Map
    # No filter is needed here: the mapped value is the literal 1, so a
    # truthiness filter on it would keep every row.
    flow_year_ones = rdd.map(lambda row: ((row[4], row[1]), 1))

    # Reduce
    return flow_year_ones.reduceByKey(lambda left, right: left + right) \
                         .sortBy(lambda pair: pair[0])


# Call the function and show its first 5 results
ex7_the_number_of_transactions_per_flow_type_and_year().take(5)

[(('Export', '1988'), 12499),
 (('Export', '1989'), 26140),
 (('Export', '1990'), 29139),
 (('Export', '1991'), 32808),
 (('Export', '1992'), 45755)]