# Relazione finale

**Gruppo:** Bug Data

**Componenti del gruppo:** Jerin George Mathew, Luca Pasquini

## 1. Indice
La relazione è articolata nella seguente maniera:

- #### Analisi del dataset
- #### Specifiche hardware e software
- #### Primo job
    - *Map reduce*
    - *Hive*
    - *Spark*
    - *Risultati*
    - *Grafici*
- #### Secondo job
    - *Map reduce*
    - *Hive*
    - *Spark*
    - *Risultati*
    - *Grafici*
- #### Terzo job
    - *Map reduce*
    - *Hive*
    - *Spark*
    - *Risultati*
    - *Grafici*
- #### Conclusioni

Verrà dunque dapprima analizzato e descritto il dataset a disposizione per poi discutere l'implementazione dei job richiesti dal progetto nelle varie tecnologie richieste dalle specifiche.

## 2. Analisi del dataset
Verranno analizzati in questa sezione i due dataset a disposizione.

In [1]:
%matplotlib inline
# Importing libraries
import matplotlib.pyplot as plt
import pandas as pd

HISTORICAL_STOCK_PRICES_FILEPATH = 'dataset/historical_stock_prices.csv'
HISTORICAL_STOCKS_FILEPATH = 'dataset/historical_stocks.csv'

hsp = pd.read_csv(HISTORICAL_STOCK_PRICES_FILEPATH)
hs = pd.read_csv(HISTORICAL_STOCKS_FILEPATH)

### 2.1 `historical_stock_prices.csv`

In [5]:
print("No. of rows and columns")
print("-----------------------")
hsp.shape

No. of rows and columns
-----------------------


(20973889, 8)

Dunque il file `historical_stock_prices.csv` è composto da 209738889 righe e 8 colonne. Stampiamo ora un sottoinsieme delle righe del file `historical_stock_prices.csv`. 

In [26]:
hsp.head()

Unnamed: 0,ticker,open,close,adj_close,low,high,volume,date
0,AHH,11.5,11.58,8.493155,11.25,11.68,4633900,2013-05-08
1,AHH,11.66,11.55,8.471151,11.5,11.66,275800,2013-05-09
2,AHH,11.55,11.6,8.507822,11.5,11.6,277100,2013-05-10
3,AHH,11.63,11.65,8.544494,11.55,11.65,147400,2013-05-13
4,AHH,11.6,11.53,8.456484,11.5,11.6,184100,2013-05-14


Come già detto nelle specifiche, il file `historical_stock_prices.csv` è composto dai seguenti campi:
- ticker: simbolo dell’azione
- open: prezzo di apertura
- close: prezzo di chiusura
- adj_close: prezzo di chiusura “modificato”
- lowThe: prezzo minimo
- highThe: prezzo massimo
- volume: numero di transazioni
- date: data nel formato aaaa-mm-gg

Una attività preliminare all'implementazione vera e propria dei job è quella di data cleaning in cui si va a verificare la presenza di eventuali record non corretti (ad esempio contenenti valori nulli) che potrebbero essere presenti nel dataset. Verifichiamo in particolare che non siano presenti valori nulli nel file `historical_stock_prices.csv`.

In [6]:
print("Check null values")
print("-----------------")
hsp.isnull().any().any()

Check null values
-----------------


False

Dunque non sono presenti valori nulli. Una altra attività che possiamo effettuare è andare a verificare che vi sia un record per ogni coppia `(ticker, data)`, ovvero che non siano presenti record duplicati per un certo `ticker` in una certa data.

In [2]:
print("Check duplicate values")
print("----------------------")
len(hsp.groupby(['ticker', 'date'])) != hsp.shape[0]

Check duplicate values
----------------------


False

Passiamo ora ad analizzare il secondo dataset, `historical_stocks.csv`.

### 2.2 `historical_stocks.csv`

In [23]:
print("No. of rows and columns")
print("-----------------------")
hs.shape

No. of rows and columns
-----------------------


(6460, 5)

In [25]:
hs.head()

Unnamed: 0,ticker,exchange,name,sector,industry
0,PIH,NASDAQ,"1347 PROPERTY INSURANCE HOLDINGS, INC.",FINANCE,PROPERTY-CASUALTY INSURERS
1,PIHPP,NASDAQ,"1347 PROPERTY INSURANCE HOLDINGS, INC.",FINANCE,PROPERTY-CASUALTY INSURERS
2,TURN,NASDAQ,180 DEGREE CAPITAL CORP.,FINANCE,FINANCE/INVESTORS SERVICES
3,FLWS,NASDAQ,"1-800 FLOWERS.COM, INC.",CONSUMER SERVICES,OTHER SPECIALTY STORES
4,FCCY,NASDAQ,1ST CONSTITUTION BANCORP (NJ),FINANCE,SAVINGS INSTITUTIONS


Dunque il secondo dataset contiene 6460 righe e 5 campi:
- ticker: simbolo dell’azione
- exchange: NYSE o NASDAQ
- name: nome dell’azienda
- sector: settore dell’azienda
- industry: industria di riferimento per l’azienda

Vediamo anche per questo secondo dataset se sono presenti valori nulli e/o duplicati e stampiamo alcune statistiche.

In [31]:
print("Check null values")
print("-----------------")
hs.isnull().any().any()

Check null values
-----------------


True

In [34]:
hs.isnull().any()

ticker      False
exchange    False
name        False
sector       True
industry     True
dtype: bool

Notiamo dunque come siano presenti dei campi, in particolare `sector` e `industry`  nel secondo dataset che presentano valori nulli. Stampiamo ora qualche record contenente valori nulli.

In [35]:
hs[hs['sector'].isnull() | hs['industry'].isnull()].head()

Unnamed: 0,ticker,exchange,name,sector,industry
19,ABP,NASDAQ,ABPRO CORPORATION,,
42,SQZZ,NASDAQ,ACTIVE ALTS CONTRARIAN ETF,,
62,ACT,NASDAQ,ADVISORSHARES VICE ETF,,
100,ABDC,NASDAQ,ALCENTRA CAPITAL CORP.,,
124,SMCP,NASDAQ,ALPHAMARK ACTIVELY MANAGED SMALL CAP ETF,,


In fase di implementazione dei job occorrerà dunque tenere a mente di ignorare i record contenenti valori nulli di `sector` e `industry`

In [32]:
print("Check duplicate values")
print("----------------------")
len(hs['ticker']) != hs.shape[0]

Check duplicate values
----------------------


False

Dunque non sono presenti valori duplicati per il file `historical_stocks.csv`.

### 2.3 `historical_stocks.csv` + `historical_stock_prices.csv`

Facciamo ora il join tra i due dataset per effettuare poi ulteriori attività di analisi.

In [36]:
df = pd.merge(hs, hsp, on = 'ticker')

In [37]:
df.shape

(20973889, 12)

In [38]:
df.head()

Unnamed: 0,ticker,exchange,name,sector,industry,open,close,adj_close,low,high,volume,date
0,PIH,NASDAQ,"1347 PROPERTY INSURANCE HOLDINGS, INC.",FINANCE,PROPERTY-CASUALTY INSURERS,8.0,7.95,7.95,7.9,8.5,642900,2014-04-01
1,PIH,NASDAQ,"1347 PROPERTY INSURANCE HOLDINGS, INC.",FINANCE,PROPERTY-CASUALTY INSURERS,7.94,8.16,8.16,7.9,8.29,228400,2014-04-02
2,PIH,NASDAQ,"1347 PROPERTY INSURANCE HOLDINGS, INC.",FINANCE,PROPERTY-CASUALTY INSURERS,8.29,8.39,8.39,8.05,8.4,105000,2014-04-03
3,PIH,NASDAQ,"1347 PROPERTY INSURANCE HOLDINGS, INC.",FINANCE,PROPERTY-CASUALTY INSURERS,8.5,8.69,8.69,8.32,8.7,113600,2014-04-04
4,PIH,NASDAQ,"1347 PROPERTY INSURANCE HOLDINGS, INC.",FINANCE,PROPERTY-CASUALTY INSURERS,9.0,8.94,8.94,8.55,9.0,60500,2014-04-07


Ad esempio possiamo verificare se una certa compagnia dispone di più di una azione (ovvero la presenza di un un nome (`name`) di compagnia associato a più di un `ticker`).

In [48]:
df.groupby('name')['ticker'].nunique().max()

12

In [50]:
df.groupby('name')['ticker'].nunique().idxmax()

'BARCLAYS PLC'

In [54]:
list(df[df['name'] == 'BARCLAYS PLC'].groupby(['name', 'ticker']).groups.keys())

[('BARCLAYS PLC', 'BCS'),
 ('BARCLAYS PLC', 'DFVL'),
 ('BARCLAYS PLC', 'DFVS'),
 ('BARCLAYS PLC', 'DLBL'),
 ('BARCLAYS PLC', 'DLBS'),
 ('BARCLAYS PLC', 'DTUL'),
 ('BARCLAYS PLC', 'DTUS'),
 ('BARCLAYS PLC', 'DTYL'),
 ('BARCLAYS PLC', 'DTYS'),
 ('BARCLAYS PLC', 'FLAT'),
 ('BARCLAYS PLC', 'STPP'),
 ('BARCLAYS PLC', 'TAPR')]

Possiamo notare ad esempio come **_BARCLAYS PLC_** abbia ben 12 `ticker`.

Verifichiamo ora se vi sono aziende (nomi di aziende) associate a più di un settore

In [55]:
df.groupby('name')['sector'].nunique().max()

2

In [56]:
df.groupby('name')['sector'].nunique().idxmax()

'ENERGIZER HOLDINGS, INC.'

In [57]:
list(df[df['name'] == 'ENERGIZER HOLDINGS, INC.'].groupby(['name', 'sector']).groups.keys())

[('ENERGIZER HOLDINGS, INC.', 'CONSUMER NON-DURABLES'),
 ('ENERGIZER HOLDINGS, INC.', 'MISCELLANEOUS')]

Possiamo notare ad esempio come **_ENERGIZER HOLDINGS, INC._** sia associato a due settori.

Un'ulteriore controllo che possiamo effettuare sta nel verificare se sono presenti compagnie che sono quotate sia nel NYSE che nel NASDAQ

In [58]:
df.groupby('name')['exchange'].nunique().max()

2

In [59]:
df.groupby('name')['exchange'].nunique().idxmax()

'AMTRUST FINANCIAL SERVICES, INC.'

In [60]:
list(df[df['name'] == 'AMTRUST FINANCIAL SERVICES, INC.'].groupby(['name', 'exchange']).groups.keys())

[('AMTRUST FINANCIAL SERVICES, INC.', 'NASDAQ'),
 ('AMTRUST FINANCIAL SERVICES, INC.', 'NYSE')]

Notiamo come ad esempio **_AMTRUST FINANCIAL SERVICES, INC._** sia quotata in entrambe le borse.

## 3 Primo Job

### 3.1 Specifiche
Implementare un job che sia in grado di generare, in ordine, le dieci azioni la cui quotazione (prezzo di chiusura) è cresciuta maggiormente dal 1998 al 2018, indicando, per ogni azione: (a) il simbolo, (b) l’incremento percentuale, (c) il prezzo minimo raggiunto, (e) quello massimo e (f) il volume medio giornaliero in quell’intervallo temporale.

### 3.2 Implementazione in MapReduce

**Mapper**

In fase di mapping estrapoliamo innanzitutto da ciascun record i campi `ticker`, `close`, `low`, `high`, `volume` e `date`.

Successivamente verifichiamo che il record sia relativo ad un anno che ricada nell'intervallo 1998-2018, per poi stampare la chiave i valori associati alla chiave. 
In particolare la chiave è composta da due campi: `ticker`, usata come chiave "primaria", e `date` usata come chiave "secondaria".

In questo modo valori che verranno ricevuti dal reducer saranno aggregati per il solo campo `ticker` e ordinati in base al campo `date` (si parla in questo caso di _secondary sort_). Questo consentirà nella fase di reduce di evitare comparazioni tra date ai fini del calcolo della differenza percentuale.


I valori sono nello specifico i campi `close`, `low`, `high` e `volume`.

```python
class mapper:
    
    map(key, record):
        ticker, _, close, _, low, high, volume, date = data
        year = getYear(date)
        if 1998 <= year <= 2018:
            key = ticker, date
            value = close, low, high, volume
            Emit(key, value)
```

**Reducer**

Durante la fase di reduce definiamo una variabile globale `result` contenente una lista di strutture dati che chiameremo di seguito `item`, ciascuno dei quali contiene i seguenti campi:

- ticker
- differenza percentuale
- volume medio
- prezzo minimo
- prezzo massimo

In particolare ciascun `item` viene calcolato a partire da un data coppia chiave-valore in fase di reduce.

Si descrive ora come vengono calcolati gli ultimi 4 campi di questa struttura dati.

In particolare, per calcolare la differenza percentuale estraiamo dalla lista `values` (si veda lo pseudocodice) il campo `close` del primo e dell'ultimo elemento della lista, essendo i valori associati ai ticker ordinati per data. Si procede poi al calcolo della differenza percentuale. 

Per quanto riguarda il volume medio, si estraggono i valori di `volume` associati al `ticker` corrente, si sommano tali valori e si divide il risultato per il numero di `volume`. 

Infine per poter calcolare il prezzo massimo e minimo, si estraggono i campi `low` e `high` da ciascun elemento della lista per individuare poi il prezzo minimo e massimo. 

I valori così calcolati vengono poi salvati nella struttura dati `item`. A questo punto si verifica che la data meno recente del ticker corrente sia il 1998 come pure che la data più recente associata al `ticker` processato sia nell'anno 2018 (ai fini di filtrare le aziende che esistono tutt'ora oggi) e, in caso di esito positivo, la struttura dati `item` così computata viene aggiunta alla lista `result`. 

Infine, una volta computati tutti i ticker, la lista `result` viene ordinata in base al campo `percentChange` dei suoi elementi (in ordine decrescente) e vengono stampati i primi 10 di tale lista ordinata.

```python
class reducer:
    
    setup():
        result = empty list

    reduce(key, records):

        # get percent change
        startingClosePrice = values.getFirstElement().getClose()
        endingClosePrice = values.getLastElement.getClose()
        percentChange = (endingClosePrice - startingClosePrice)/startingClosePrice

        # get volume
        volumeValues = values.getVolumes()
        totalVolume = 0
        count = 0
        for each volume in volumeValues:
            totalVolume += volume
            count += 1
        averageVolume = totalVolume/count

        # get minimum low price
        lowValues = values.getLowValues()
        minLow = infinity
        for low in lowValues:
            minLow = min(minLow, low)

        # get maximum high price
        highValues = values.getHighValues()
        maxHigh = - infinity
        for high in highValues:
            maxHigh = max(maxHigh, high)

        # add this item to result list
        startingDate = values.getFirstElement().getYear()
        endingDate = values.getLastElement().getYear()
        if startingDate == 1998 and endingDate == 2018:
            obj = {ticker, percentChange, minLow, maxHigh, averageVolume}
            result.append(obj)
    
    cleanup()
        sortedResult = sortByPercentChange(results, reverse=True)
        for i in range(10):
            Emit(sortedResult.getItem(i))
```

### 3.3 Implementazione in Hive

Vengono definite complessivamente 6 viste:

- `ticker_min_max_avg`, contenente il minimo valore di `low` e il massimo valore di `high` per ogni `ticker`
- `ticker_min_data` e `ticker_close_min_data`, contenenti la data meno recente e il corrispondente valore di `close` per ciascun `ticker` rispettivamente
- `ticker_max_data` e `ticker_close_max_data`, contenenti la data più recente e il corrispondente valore di `close` per ciascun `ticker` rispettivamente
- `ticker_percentuale`, che calcola l'incremento percentuale per ciascun `ticker` a partire dalle viste precedenti

Infine viene effettuato un join tra le viste `ticker_min_max_avg` e `ticker_percentuale` per ottenere il risultato richiesto.

```SQL

CREATE VIEW IF NOT EXISTS ticker_min_max_avg AS 
SELECT ticker, min(low) AS min_price, max(high) AS max_price, avg(volume) AS avg_volume 
FROM historical_stock_prices 
WHERE YEAR(data)>=1998 AND YEAR(data)<=2018 GROUP BY ticker;

CREATE VIEW IF NOT EXISTS ticker_min_data AS 
SELECT ticker, min(TO_DATE(data)) AS min_data 
FROM historical_stock_prices 
WHERE YEAR(data)==1998 
GROUP BY ticker;

CREATE VIEW IF NOT EXISTS ticker_max_data AS 
SELECT ticker, max(TO_DATE(data)) AS max_data 
FROM historical_stock_prices 
WHERE YEAR(data)==2018 
GROUP BY ticker;

CREATE VIEW IF NOT EXISTS ticker_close_min_data AS 
SELECT h.ticker, h.data, h.close 
FROM ticker_min_data AS t, historical_stock_prices AS h 
WHERE h.ticker=t.ticker AND h.data=t.min_data;

CREATE VIEW IF NOT EXISTS ticker_close_max_data AS 
SELECT h.ticker, h.data, h.close 
FROM ticker_max_data AS t, historical_stock_prices AS h 
WHERE h.ticker=t.ticker AND h.data=t.max_data;

CREATE VIEW IF NOT EXISTS ticker_percentuale AS 
SELECT mi.ticker, ((ma.close-mi.close)/mi.close) AS inc_perc 
FROM ticker_close_max_data AS ma join ticker_close_min_data AS mi on ma.ticker=mi.ticker;

INSERT OVERWRITE LOCAL DIRECTORY 'output/'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t' 
SELECT a.ticker, b.inc_perc, a.min_price, a.max_price, a.avg_volume 
FROM ticker_min_max_avg AS a join ticker_percentuale AS b on a.ticker=b.ticker 
ORDER BY b.inc_perc DESC limit 10;
```

### 3.4 Implementazione in Spark

Si descrive ora l'implementazione del job in Spark. Vengono principalmente definiti i seguenti `RDD`:

- `min_ticker_low`, contenente per ciasun ticker il prezzo di chiusura minimo
- `max_ticker_high`, contenente per ciasun ticker il prezzo di chiusura massimo
- `avg_ticker_volume`, contenente per ciascun ticker il volume medio giornaliero
- `min_data_close`, che associa ad un dato ticker il prezzo di chiusura relativo alla data meno recente
- `max_data_close`, che associa ad un dato ticker il prezzo di chiusura relativo alla data più recente

Si effettua poi il join tra `min_data_close` e `max_data_close` per ottenere `join_inc_perc`. 

A partire da `join_inc_perc` viene calcolato l'incremento percentuale per ciascun ticker (`inc_perc`). 

Infine viene fatto il join tra `inc_perc`,`avg_ticker_volume`, `min_ticker_low` e `max_ticker_high`, per poi ordinare il risultato in base all'incremento percentuale (in ordine decrescente) estraendo infine i primi 10 elementi di tale `RDD` ordinato.

```python
input = leggi tutte le righe del file historical_stock_prices.csv
        con data compresa tra il 1998 e il 2018

min_ticker_low = input.map(riga → (ticker, low))
		              .reduceByKey(min(low1, low2))

max_ticker_high = input.map(riga → (ticker, high))
			           .reduceByKey(max(high1, high2))

avg_ticker_volume = input.map(riga → (ticker, (volume,1)))
                         .reduceByKey((volume1+volume2, count+1))
                         .map(riga→(ticker, TotVolume/count))

min_data_close = input.map(riga → (ticker, (close, data)))
                      .reduceByKey(minimo((data1,close1), (data2,close2)))
                      .filter(data.year == "1998")	

max_data_close = input.map(riga → (ticker, (close, data)))
                      .reduceByKey(massimo((data1,close1), (data2,close2)))
                      .filter(data.year == "2018")

join_inc_perc = min_data_close
                    .join(max_data_close)
    
inc_perc = join_inc_perc
                .map(riga → (ticker, (maxclose-minclose)/minclose))

result = max_ticker_high
            .join(min_ticker_low)
            .join(inc_perc)
            .join(avg_ticker_volume)
            .sortBy(incremento percentuale decrescente)
            .take(10)
```

### 3.5 Risultati

Si mostra ora l'output restituito per il primo job:

```
MNST  163340.387616%  0.0305979158729  70.2200012207  7347898.8208
AMZN  38328.032677%   4.14583349228    1925.0         7868702.73287
AAPL  37146.0319467%  0.482142865658   219.179992676  121398558.199
CTSH  36312.8011611%  0.145833328366   85.0999984741  6272137.93307
CELG  24924.0000849%  0.171875         147.169998169  8002695.57352
WP    24012.4988778%  0.0500000007451  96.5100021362  1270066.16934
MED   13733.2303561%  0.0936999991536  229.199996948  223768.309139
NVR   11786.7001488%  21.625           3700.0         56463.7413395
ANSS  10077.1432059%  1.375            184.949996948  482841.405197
TSCO  9508.67816472%  0.40625          97.25          1592298.53705
```




### 3.6 Grafici

## 4. Job 2

### 4.1 Specifiche
Realizzare job che sia in grado di generare, per ciascun settore, il relativo “trend” nel periodo 2004-2018 ovvero un elenco contenete, per ciascun anno nell’intervallo: (a) il volume complessivo del settore, (b) la percentuale di variazione annuale (differenza percentuale arrotondata tra la quotazione di fine anno e quella di inizio anno) e (c) la quotazione giornaliera media. N.B.: volume e quotazione di un settore si ottengono sommando i relativi valori di tutte le azioni del settore.

### 4.2 Implementazione in MapReduce

**Mapper**:

Leggiamo dapprima, utilizzando la _Distributed Cache_ (https://hadoop.apache.org/docs/r3.0.0/api/org/apache/hadoop/filecache/DistributedCache.html) il file `historical_stocks.csv`, per poi definire, a partire dallo stesso file, una struttura dati (`tickerToSectorMap`) che associa a ciascun `ticker` il corrispondente settore (escludendo i `ticker` privi di un corrispondente settore).

La struttura dati così creata verrà usata per poter poi effettuare in seguito il "join" con i record del file `historical_stock_prices.csv` associando a ciascun record il settore del `ticker` corrispondente.

In fase di mapping estrapoliamo da ciascun record i campi `ticker`, `close`, `volume` e `date`, verifichiamo che il record sia relativo ad un anno che ricada nell'intervallo 2004-2018 per poi stampare i campi estratti da quest'ultimo. In particolare la chiave è composta da due campi: `ticker`, usata come chiave primaria, e `date` usata come chiave secondaria. I valori sono invece i campi `close`, `low`, `high` e `volume`. In questo modo, in fase di reduce, i valori associati a ciascun `ticker` saranno ordinati per data, in maniera tale da evitare comparazioni tra date ai fini del calcolo della differenza percentuale.