**Progetto di Technologies for Advanced Programming "_TAP US30_"**

**BATCH**

La fase di batch è considerata la fase iniziale del progetto. Essa pone la base con la quale sarà possibile visualizzare i dati storici di ciascun settore, insieme ad una finestra temporale entro la quale saranno collocate le previsioni calcolate in questa fase.

CLIENT PHP

Il primo passo della pipeline è quello di creare un client che s'interfacci
con l'API di Polygon e faccia richiesta dei dati, considerando la sola giornata 
precedente. Quest'ultimo sarà mandato in esecuzione su un container.Di seguito, 
vediamo come ottenere l'opportuno Dockerfile, da cui potremo creare l'immagine 
che, nel nostro "docker-compose", chiameremo "producer".

In [None]:
FROM alpine
RUN apk update && apk add php composer php-fileinfo
WORKDIR /app
RUN composer require polygon-io/api -W

L'immagine base è quella della versione "alpine" di Linux, scelta in quanto
tra le più "leggere" in termini di spazio sul disco. Il gestore di pacchetti
si chiama "apk", da cui installiamo le librerie php e composer. Eseguiamo il comando "composer" per richiedere
la libreria da utilizzare nel nostro codice php così da accedere all'API.
Dopodichè, una volta partito il container, verrà eseguto da docker-compose il comando che
innesca l'esecuzione del suddetto codice.

*batch_extract.py*

In [None]:
$tickers = array(
    "AXP" => "financial",
    "AMGN" => "health",
    "AAPL" => "tech",
    "BA" => "industrial",
    ...
    ...
    ...
);

$currentDate = new DateTime();
$startingDate = new DateTime();
$interval = new DateInterval('P5Y');
$startingDate->sub($interval);

$rest = new Rest($api_key);

function write_batch($data_array, $ticker, $category, $tickers)
{
    $path = "/data/raw/$category";
    $filename = "$ticker.txt";
    if(!file_exists($path))
        mkdir($path);
    $file = fopen($path."/".$filename, "w") or die("Unable to open file !");
    $results = $data_array["results"];
    foreach ($results as $result) {
        $result["tickerSymbol"] = $ticker;
        fwrite($file, json_encode($result) . "\n");
    }
    fclose($file);
}

foreach ($tickers as $ticker => $category) {
    sleep(1);
    $data = $rest->stocks->aggregates->get(
        $ticker,
        1,
        $startingDate->format('Y-m-d'),
        $currentDate->format('Y-m-d'),
        'day'
    );
    write_batch($data, $ticker, $category, $tickers);
}


Creiamo un array associativo con come chiave il nome dell'azienda e come valore il settore in cui
opera. Dichiariamo poi un oggetto di tipo "Rest" sul quale chiameremo una serie di
funzioni per prelevare i dati che ci occorrono. Il più importante è il metodo "get"
che richiede come parametri il nome dell'azienda, un intero, la data di inizio e 
quella di fine entro cui prelevare le informazioni e l'unità di tempo, che indichiamo
come il singolo giorno. In un ciclo for avanzato, scorriamo l'array associativo. ogni
12 secondi, inviamo una richiesta attraverso l'API. Salviamo queste informazioni su una
variabile che viene passata alla nostra funzione "write_batch", insieme al nome 
dell'azienda e alla categoria a cui appartiene, aggiungendo poi anche l'array
associativo stesso.

Dopo aver proceduto, alla creazione della cartella e del file, si salva in una variabile
l'array alla posizione "results" dell'array ottenuto dalla get. Si effettua un ciclo su
tutte le n-uple che caratterizzano i dati della specifica azienda, a partire dall'array
salvato nella variabile e, dopo aver aggiunto il ticker symbol in questione (dato che 
altrimenti non sarebbe presente per ogni n-upla, il che creerebbe dei problemi per il 
successivo processing), si scrive sul file appena creato il contenuto dell'array, 
sottoforma di JSON. Fuori dal ciclo, si procede alla chiusura del file.
Quello che otterremo è quindi la presenza di 6 cartelle, una per ogni settore a cui le
30 aziende appartengono, e ognuna di queste cartelle avrà dei file con estensione "txt" 
composti da n JSON, uno per ogni giorno in cui ogni azienda opera nel mercato azionario.

DATA PROCESSING

*process.py*

In [None]:
spark = SparkSession.builder.appName("tapus30").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")

names=["cgoods","financial","energy","health","industrial","tech"]
indexes=[Elasticsearch("http://es_cgoods:9200"),Elasticsearch("http://es_financial:9200"),Elasticsearch("http://es_energy:9200"),Elasticsearch("http://es_health:9200"),Elasticsearch("http://es_industrial:9200"),Elasticsearch("http://es_tech:9200")]
prediction_data=[]
historical_data=[]
day_in_ms = 86400000
window_size = 12

Innanzitutto, apriamo una sessione di Spark, scegliendo come master il nostro stesso container. Specifichiamo il nome del nostro driver e salviamo le impostazioni sull'oggetto "spark". Su questo verra richiamato "sparkContext" per l'apertura effettiva del context e salviamo ciò che ci viene restituito sulla variabile "sc", su cui chiameremo poi il metodo "setLogLevel", per decidere il tipo di logs che vogliamo siano prodotti all'esecuzione del codice.
Creiamo poi un array con i nomi dei settori, che saranno anche i nomi degli indici su elasticsearch. Dichiariamo un array di oggetti Elasticsearch attraverso cui, successivamente, potremo effettivamente inviare i dati alle sue sei diverse istanze, ancora una volta una per settore. 
Dichiariamo poi due array vuoti, che saranno popolati dai dati restituitici dal modello di machine learning che useremo (prediction_data) e dai dati che preleveremo dal nostro file system, relativi ai 2 anni di operazioni in borsa (historical_data). 
Impostiamo un offset fondamentale per operare con i timestamp (ovvero le date espresse in millisecondi, a partire dall'omonimo campo all'interno dei JSON), cioè quello che ci consente di aggiungere un giorno alla data da cui partiamo. Altra importante variabile è quella "window_size", che ci permetterà di eseguire la funzione ricorsiva che vedremo per implementare una sliding window di 12 giorni, base su cui il modello via via restituirà l'ultima previsione. 

In [None]:
for name in names:
    df = spark.read.JSON("/data/"+name)
    assembler = VectorAssembler(inputCols=['open','high','low'],outputCol='features')
    output = assembler.transform(df).select('features','close','tickerSymbol','timestamp')
    lr = LinearRegression (featuresCol='features',labelCol='close',maxIter=10,regParam=0.3,elasticNetParam=0.7)
    trained_model = lr.fit(output)
    print("processing data for "+name+"...")
    # Performs the recursive prediction based on the trained_model that we have just created
    predictions = recursive_prediction(df, trained_model)
    print("...done")
    # If we have previously executed the batch process (this file), 
    # we have to overwrite the model with fresh data
    lr.write().overwrite().save("/models/"+name)
    historical_data.append(format_data(output,"close").toPandas().to_dict(orient="records"))
    prediction_data.append(format_data(predictions.withColumnRenamed("close","prediction"),"prediction").toPandas().to_dict(orient="records"))

Eseguiamo un ciclo for sull'array di nomi dei settori. In esso, carichiamo su "df" i files contenuti nella cartella del settore su cui ci troviamo. Costruiamo un oggetto di tipo "VectorAssembler" che non restituirà altro che uno schema, il quale condenserà i dati contenuti nei campi "open", "high" e "low" in un unico vettore, chiamato features, che si passa legittimamente al modello per il machine learning.
Passiamo questo assembler al modello, chiamando su di esso il metodo "transform" e passando il dataframe appena caricato come argomento. Selezioniamo, sull'oggetto risultante, le colonne "features", "close", "tickerSymbol" e "timestamp" e carichiamo questo oggetto sulla variabile "output".
Applichiamo l'algoritmo di linear regression a questo dafaframe "output", dapprima ottenendo "lr", che non sarà altro che un oggetto contenente le impostazioni dei parametri di cui ha bisogno la linear regression, (nello specifico, il contenuto del vettore appena creato sarà alla base della previsione, il cui valore sarà riportato nella colonna "close", ed eseguiremo un massimo di 10 iterazioni nell'allenamento). Poi salveremo il modello a seguito del suo training in "trained_model", ottenuto dalla chiamata del metodo "fit" su "lr", passando come parametro "output".
Abbiamo tutto ciò che ci occorre per chiamare la funzione "rucursive_prediction", a partire dal dataframe iniziale e dal modello appena allenato. La funzione ci restituirà il dataframe "prediction", che conterrà il valore "close" per gli 11 giorni previsti a partire dalla sliding window, che inzia dagli ultimi 12 giorni di dati storici. 
Per salvare il modello "lr" nell'apposita cartella contenente il model dello specifico settore. chiamiamo sulla sua variabile il metodo "write", poi "overwrite" e poi "save" con il path su cui salvarlo.
Popoliamo adesso quei due array vuoti specificati all'inizio del codice, che conterranno oggetti di tipo dictionary (i quali elasticsearch potrà accettare), che nel primo caso saranno costituiti dalle n_uple del settore specifico corrispondenti alla colonna "close", nell'altro saranno corrispondenti alla colonna "prediction". 

In [None]:
for i in range(len(indexes)):
    print("sending index "+names[i]+" to elasticsearch...")
    save_and_send_data("prediction",prediction_data,names[i],indexes[i])
    save_and_send_data("historical",historical_data,names[i],indexes[i])

Per ognuno degli oggetti di tipo Elasticsearch nell'array indexes, inviamo la specifica di quale dei due indici, due per settore, vogliamo popolare, dell'oggetto di tipo dictionary, del nome del settore a cui è associato e del nome dell'oggetto elasticsearch a cui vogliamo inviare i dati.

In [None]:
def format_data(dataframe, close_name):
    return dataframe.groupBy("timestamp").avg(close_name).sort("timestamp").withColumn("timestamp",predictions.timestamp / 1000).withColumn("timestamp", col("timestamp").cast(TimestampType())).\
    withColumn("timestamp",date_format(col("timestamp"),"yyyy-MM-dd")).withColumn("avg("+close_name+")",round(col("avg("+close_name+")"),2))

Lo scopo di questa funzione, chiamata nel momento in cui aggiungiamo agli array di oggetti dictionary, è di restituire un dataframe ben formattato da trasformare nel suddetto tipo. 
Raggruppiamo le sue n-uple per timestamp, eseguiamo una media aritmetica dei valori del campo close per l'uno e prediction per l'altro, ordiniamo gli ormai singoli valori (a seconda del settore) in modo decrescente, sostituiamo il timestamp (dovrebbe essere chiamato su dataframe, no?) ottenendolo in secondi e castiamo questi valori al formato "anno, mese e giorno". Dall'altra parte, rinominiamo la colonna che contiene i valori di close o di prediction in modo da contestualizzare il fatto che si tratti ora di una media di quei valori, dopodichè arrotondiamo il valore della media alla seconda cifra decimale.

In [None]:
def epoch_ms_to_weekday(epoch_time_in_ms):
    dt_object = datetime.datetime.fromtimestamp(epoch_time_in_ms/1000)
    # Get the weekday as an integer (Monday is 0 and Sunday is 6)
    weekday_number = dt_object.weekday()
    return weekday_number

Lo scopo di questa funzione, come è apparente dal suo nome, è di trasformare il nostro timestamp (espresso in millisecondi) in un giorno preciso della settimana. Per fare questo, usiamo il metodo "fromtimestamp" della classe "Datetime", inserendo il timestamp in secondi come parametro. Salviamo l'oggetto restituito in "dt_object", poi chiamiamo su quest'ultimo oggetto il metodo "weekday" e sulla variabile "weekday_number" avremo il formato della data che volevamo, la quale restituiremo.

In [None]:
def recursive_prediction(dataframe, model, prediction=None, nIter=window_size):
    max_t = int(dataframe.select(F.max("timestamp")).first()[0])
    curr_weekday = epoch_ms_to_weekday(max_t+day_in_ms)
    day_offset = day_in_ms
    if curr_weekday > 4:
        day_offset = day_offset*(8-curr_weekday)
    
    window_spec = Window.partitionBy("tickerSymbol").orderBy(col("timestamp").desc())
    df_with_row_number = dataframe.withColumn("row_num", row_number().over(window_spec)).select("tickerSymbol","open","high","low","timestamp","close")
    result_df = df_with_row_number.filter(col("row_num") < window_size).drop("row_num")
    temp_df = result_df.groupBy("tickerSymbol").agg(F.avg("open").alias("open"),F.avg("high").alias("high"),F.avg("low").alias("low"),F.any_value("timestamp").alias("timestamp"),F.avg("close").alias("close"))
    assembler = VectorAssembler(inputCols=['open','high','low'],outputCol='features')
    temp_df = assembler.transform(temp_df)

    if(prediction != None):
        print("Completion "+str(int(((window_size-nIter+1)/window_size)*100))+"%")
        prediction = prediction.withColumn("timestamp",lit(max_t+day_offset))
        result_df = result_df.union(prediction.withColumnRenamed("prediction","close").select("tickerSymbol","open","high","low","timestamp"\
        ,"close")).distinct()
        nIter = nIter - 1
    prediction = trained_model.transform(temp_df).select("tickerSymbol","open","high","low","timestamp","prediction","features")
    if nIter == 0 and result_df is not None:
        return result_df
    else:
        return recursive_prediction(dataframe=result_df,model=trained_model,prediction=prediction,nIter=nIter)


Passiamo ora al cuore di questo codice, ovvero alla funzione al termine della quale avremo a disposizione il nostro dataframe *prediction* con tutti i valori della *close* predetti fino a un giorno in meno della sliding windows di valori storici e non su cui ci basiamo (importante ricordare che questi valori non siano quelli dei singoli giorni, ma le loro medie, in base al settore).

Innanzitutto, la prima volta in cui la funzione ricorsiva sarà chiamata, ad essere passato sarà il dataframe originale, quello che viene fuori da tutti i file "txt" presenti nelle cartelle che indicano i settori. Come secondo parametro, passeremo il modello già allenato. Poi abbiamo due parametri di default, l'uno *prediction* che si riferisce al dataframe delle predizioni mano a mano risultante, l'altro *nIter*, inizialmente impostato al valore di *window_size*, ovvero l'arco di tempo della nostra sliding window.

Il primo passo consiste nell'estrarre il timestamp con il valore più alto, dunque il valore del giorno più recente, e salvarlo in *max_t*. Per ottenere da questo il valore del giorno della settimana successivo, passiamo quel timestramp, sommato ai millisecondi che caratterizzano il trascorrere di un giorno, alla funzione *epoch_ms_to_weekday* già discussa. 
Dopo aver salvato un'unità di tempo (il giorno in millisecondi) su *day_offset*, verifichiamo se il giorno più recente sia maggiore del quarto giorno della settimana, ovvero oltre giovedì. Se è così, il *day_offset*, piuttosto che rimanere a un giorno, andrà a rappresentare il numero di giorni che servono per mappare opportunemente i giorni di fine settimana mancanti dai dati ai primi due giorni della settimana successiva.

Su *window_spec* carichiamo un oggetto di tipo *Window* ovvero un oggetto recante le caratteristiche che una finestra dovrebbe assumere. Nello specifico, noi vogliamo che esista una finestra per ogni ticker, che contenga tutte le n-uple relative a quel ticker, ordinate per timestamp. Il nostro obiettivo iniziale è avere una numerazione delle n-uple, quindi dei giorni, in ordine decrescente di data. Infatti in "df_with_row_numnber", salviamo il dataframe che, a partire da quello originale, abbia anche la colonna *row_num*, basata sullo schema presente nell'oggetto *window_spec*, e per di più ad esclusione di tutti i campi non necessari alla nostra analisi. Infatti, comprenderemo solo *tickerSymbol*,*open*,*high*,*low*,*timestamp* e *close*.
Su *result_df* avremo il dataframe che, sulla base del "row_num" del precedente, conterrà solo le n-uple relative agli ultimi tot giorni, in base alla window size, e che saranno prive del campo "row_num" stesso, ormai non utile per noi. 

A questo punto, creiamo un *temp_df* che, per ogni ticker, a partire delle tot n-uple, avrà una sola n-upla, indicante la media dei valori di ogni campo presente su *result_df*.
Per procedere con l'applicazione del modello di machine learning, abbiamo bisogno che tutti questi valori in *temp_df* confluiscano all'interno di un vettore, corrispondente alla colonna "features". Dunque ora *temp_df* conterrà quella colonna con il vettore.
A questo punto, se abbiamo passato alla funzione un dataframe "prediction" non nullo, salviamo sulla n-upla che lo caratterizza il timestamp opportuno in base al timestamp più recente presente in *max_t*, in modo che il valore sia considerato una stringa,
Dopo ciò, aggiungiamo questa n-upla al nostro *result_df*, rinominando prima la colonna *prediction* (in prediction) in *close*, in modo che si possa operare una union tra *result_df* e *prediction* (quest'ultimo in quanto dataframe). Decrementiamo poi il numero di iterazioni della funzione.

A prescindere dall'esistenza o meno di *prediction*, creiamolo o reinizializziamolo perchè contenga la predizione sulla giornata immediatamente successiva, basata sulle medie presenti in *temp_df*. Lo facciamo chiamando in causa il modello già allenato e passandogli, appunto, *temp_df*.

Data la ricorsività della funzione, ci occorre identificare un caso base e prevedere che operazioni compiere nell'ambito di esso e poi del passo ricorsivo. 
Il nostro caso base occorre quando il numero di iterazioni è pari a 0 e *result_df* non è nullo. Dunque, restituiamo *result_df*, che non sarà altro che il dataframe finale, contenente, oltre i dati storici, un giorno in meno di predizioni rispetto alla sliding window prevista.
Il passo ricorsivo viene eseguito altrimenti, richiamando recursive_prediction passando l'attuale *result_df* quale *dataframe*, il *trained_model* quale *model*, il dataframe *prediction* quale *prediction* e l'attuale *nIter* quale *nIter*.

**STREAMING**

La fase streaming è la seconda ed ultima fase del progetto. Essa consente la visualizzazione dei dati prodotti dalla prima fase, che saranno sovrapposti ad un flusso di dati in streaming relativi alla giornata di oggi. 

CLIENT PHP

Similarmente alla fase batch, ci avvaliamo di un client PHP per la richiesta dei dati all'API. Le differenze principali saranno discusse di seguito;

In [None]:
$tickers = array(
    "AXP" => "financial",
    "AMGN" => "health",
    "AAPL" => "tech",
    "BA" => "industrial",
    ...
    ...
    ...
);

$currentDate = new DateTime();
$startingDate = new DateTime();
$interval = new DateInterval('P2D');
$startingDate->sub($interval);

$rest = new Rest($api_key);

function write_batch($data_array, $ticker, $category, $tickers)
{
    $path = "/data/raw/$category";
    $filename = "$ticker.txt";
    if (!file_exists($path))
        mkdir($path);
    $file = fopen($path . "/" . $filename, "w") or die("Unable to open file !");
    $results = $data_array["results"];
    foreach ($results as $result) {
        usleep(250000);
        $result["tickerSymbol"] = $ticker;
        fwrite($file, json_encode($result) . "\n");
    }
    fclose($file);
}

while (true) {
    foreach ($tickers as $ticker => $category) {
        $data = $rest->stocks->aggregates->get(
            $ticker,
            15,
            $startingDate->format('Y-m-d'),
            $currentDate->format('Y-m-d'),
            'minute'
        );
        sleep(1);
        // var_dump($data);
        write_batch($data, $ticker, $category, $tickers);
    }
    sleep(60*15);
}

La richiesta dei dati viene fatta su un arco temporale molto più ristretto, al più di due giorni. Tale richiesta verrà riproposta ogni 15 minuti, ragione per il quale effettuiamo una sleep esattamente di tale valore.

Nel momento dell'apertura della borsa, l'API sarà in grado di fornirci i dati del giorno attuale, che ci consentiranno di visualizzare in tempo quasi reale l'andamento del mercato in relazione al settore scelto. Per tale motivo la granularità dei dati arrivatici è di 15 minuti.

*DATA INGESTION*

Abbiamo scelto di utilizzare Fluentd come framework di DI. Esso permette 
con facilità di accedere a files salvati nel proprio file system e usarli 
quali source.

*Dockerfile*

In [None]:
FROM fluentd
USER root
RUN apk add ruby-dev
RUN gem install fluent-plugin-kafka --no-doc

Partiamo dall'immagine "vergine" di Fluentd, scaricata direttamente dalla
repository pubblica, per poi installare ruby e le sue relative librerie di header per la compilazione di pacchetti relativi a ruby, e tramite il gestore di pacchetti di Ruby installiamo il plugin che permette lo scambio di dati tra fluentd e kafka. L'immagine risultante sarà chiamata "fluentkafka".

*fluent.conf*

La tappa più importante, dopo aver creato opportunemente il
Dockerfile e dunque aver avuto a disposizione l'immagine per il container,
è quella di scrivere correttamente il file di configurazione. 
Esistono due tag fondamentali che lo caratterizzano, ovvero "\<source\>" e 
"\<match\>", rispettivamente a indicare l'input e l'output della data 
ingestion. 
Source.

L'annotazione "@type tail" si utilizza per indicare che si intenda aprire
un file. Questi vengono letti, come suggerisce il nome stesso, dall'ultima
riga alla prima a meno che, come abbiamo fatto, non si setti la variabile
read_from_head a "true".  
I files da cui stiamo leggendo, come già spiegato nella sezione CLIENT PHP,
sono caratterizzati da un JSON per ogni riga. Dunque, indicheremo "format
JSON" affinchè vengano riconosciuti da fluentd come tali.
Visto che abbiamo scelto di suddividere le aziende in base al settore,
continuiamo a mantenere questi gruppi specificando una source per ognuno
di essi. Qualora non li taggassimo opportunemente, non riusciremmo ad 
utilizzarli in modo specifico all'interno del match. Per cui, per ognuno
di essi, scriveremo "tag" seguito dal nome del settore.
Infine, sarà fondamentale indicare il path da cui prelevarli, così come
un path temporaneo, su "pos_file", che non indicherà altro che la cartella
temporanea in cui si immagazzineranno i dati prima del loro invio in output.
Match.

L'annotazione "@type kafka2" indicherà che l'output verrà reindirizzato a 
kafka. Ciò è possibile in virtù dell'installazione del plugin che connette
fluentd a kafka, da Dockerfile. 
Il suddetto plugin ci permetterà di specificare due unità fondamentali per
kafka, ovvero i brokers e i topics. Abbiamo scelto, per leggibilità, di 
chiamare ogni broker come "k-[nomebroker]" e di bindarlo con la porta
"9092", la quale sarà mappata a una porta effettiva al di fuori del container
dell'istanza di kafka, che coincide con il broker stesso 
(si veda il docker_compose). Come nome del topic, avremo lo stesso nome
del tag specificato in "\<source\>". Avremo quindi un "\<match\>" per
ogni "\<source\>". Per specificare quali dati in input inoltreremo, il tag
sarà "\<match [nometag]\>".
Risulta necessario specificare anche il tipo di file in output, attraverso il
tag "@type JSON" all'interno del tag "\<format\>", rigorosamente annidato dentro
il match.

*DATA DISTRIBUTION*

Al fine di smistare opportunemente i dati acquisiti, abbiamo usato Apache Kafka. Le unità fondamentali tipiche di questo servizio sono i brokers, i topics, i producers e i consumers. La nostra scelta è stata di creare, come detto, 6 topic, forniti da altrettanti brokers. Ai fini della pipeline, non è necessario istruire producers e consumers (per debug, così da accertarci che i dati effettivamente fossero inviati da fluentd, abbiamo passato argomenti al file "sh" del consumer affinchè stampasse su console il contenuto di questi JSON)

*Dockerfile*

In [None]:
FROM bitnami/kafka
RUN rm -f /bitnami/kafka/data/.lock
COPY init.sh /opt/bitnami/scripts/kafka
COPY init /opt/bitnami/scripts/kafka
CMD ["/opt/bitnami/scripts/kafka/init"]

Ognuna della 6 istanze di kafka, identificate da un broker per ciascuna, si basa su questa immagine. Dopo aver scaricato l'immagine ufficiale di kafka, si aggiunge lo script "init.sh" all'interno del file system del container, così come "init" (il file compilato basato sul codice in c che ci permette un'opportuna temporizzazione tra il deployment del server e la creazione dei topics, rigorosamente l'una dopo l'altra, eseguendo prima lo script run.sh e poi init.sh) e si esegue "init".
L'immagine risultante sarà chiamata "kafka_init".

*Zookeeper e ZooNavigator*

Per navigare agilmente tra l'ampia offerta di servizi di Kafka, è sempre necessario che con i server kafka si attivi anche un server zookeeper e un altro server, nel nostro caso "zoonavigator", che consenta l'accesso via browser dell'interfaccia grafica di questo. 

*Dockerfile*

In [None]:
FROM zookeeper
RUN /apache-zookeeper-3.9.1-bin/bin/zkCli.sh << "delete /brokers/ids/*"

Questo Dockerfile parte dall'immagine ufficiale di zookeeper. L'immagine che costituisce sarà chiamata "zookeeper_clean"

Lo scopo di zookeeper_clean è di assicurarsi che non vi siano broker già presenti all'avvio, richiamando così la funzione di cancellazione offerta dall'apposito script integrato.

DATA PROCESSING

process.py

In [None]:
topics = {
    "cgoods":0,
    "financial":1,
    "energy":2,
    "health":3,
    "industrial":4,
    "tech":5
}

number_of_items = {
    "cgoods":5,
    "financial":5,
    "energy":1,
    "health":6,
    "industrial":5,
    "tech":8
}

# Definire lo schema dei dati
schema = StructType([
    StructField("v", LongType(), True),
    StructField("vw", DoubleType(), True),
    StructField("o", DoubleType(), True),
    StructField("c", DoubleType(), True),
    StructField("h", DoubleType(), True),
    StructField("l", DoubleType(), True),
    StructField("t", LongType(), True),
    StructField("n", LongType(), True),
    StructField("tickerSymbol", StringType(), True),
    StructField("volume", LongType(), True),
    StructField("open", DoubleType(), True),
    StructField("close", DoubleType(), True),
    StructField("high", DoubleType(), True),
    StructField("low", DoubleType(), True),
    StructField("timestamp", LongType(), True),
    StructField("numberOfItems", LongType(), True)
])

# Definire i topic di Kafka da cui leggere i dati
names=["cgoods","financial","energy","health","industrial","tech"]

# Creare un DataFrame per ogni topic
dataframes = [0,1,2,3,4,5]
averages = [1,2,3,4,5,6]
threads = []
historical_data = []

E' importante inizializzare una serie di strutture dati. Si parte con un array associativo "topics", che correla la chiave (il nome del topic) con un valore da 0 a 5. Ci servirà per indirizzare opportunemente operazioni compiute su altre strutture che vedremo, sulla base del nome del topic. L'altro array associativo "number_of_items" correla i topics con il numero di aziende che ne fanno parte. Lo "schema" contiene le istruzioni per strutturare opportunamente la singola n-upla. Invece "dataframes" è un array che per ora inizializziamo con gli interi e successivamente re-inizializziamo, come vedremo. Stesso varrà per "averages", mentre "threads" contiene i riferimenti ai 6 threads che creeremo, sempre uno per settore. Infine "historical_data" conterrà oggetti di tipo dictionary che potremo inviare ad elasticsearch.

In [None]:
for i in range(6):
    thread = threading.Thread(target=writeKafkaStreamingData, args=(dataframes,names[i]))
    threads.append(thread)

for thread in threads:
    thread.start()

time.sleep(15) # Diamo tempo ai container di avviarsi

load_indexes()

for i in range(6):
    averages[i] = dataframes[i].select(col("data.close"),col("data.timestamp"),col("topic"))
    # averages[i].printSchema()
    averages[i] = averages[i].groupBy("timestamp").avg("close").sort("timestamp").withColumn("timestamp",averages[i].timestamp / 1000).withColumn("timestamp", date_format(col("timestamp").cast(TimestampType()), "yyyy-MM-dd'T'HH:mm:ss.SSSZ")).withColumn("avg(close)",round(col("avg(close)"),2))
    averages[i] = averages[i].writeStream.outputMode("complete").queryName(names[i]).format("memory").start()
while(True):
    for i in range(6):
        temp_sdf = spark.sql("SELECT * FROM "+names[i])
        if temp_sdf.count() > 0:
            historical_data.append(temp_sdf.toPandas().to_dict(orient="records"))
            save_and_send_data(historical_data,names[i],indexes[i])

Il primo ciclo for, come detto, origina i threads, specificando per ognuuno quale sia la funzione che debbano eseguire e gli argomenti di cui quella funzione necessiti. Ogni nuova istanza si aggiunge all'array "threads". 
Il secondo ciclo for, per ogni "thread", lo starta. Si esegue poi una sleep che consenta a tutti gli altri containers, utili all'esecuzione dei passi precedenti della pipeline, di essere eseguiti.
Dopo aver chiamato la funzione "load_indexes" si esegue un altro ciclo for, alla fine del quale ognuna delle posizioni dell'array "averages" viene inizializzata con i dataframes che conterranno i dati pervenutici dallo streaming (non sono dei veri e propri dataframes, ma degli oggetti di tipo "streamingQuery", atti a mantenere lo streaming attivo in modo da scrivere su delle tabelle contenute nell'oggetto "spark", poi accessibili attraverso codice sql). Questi contengono due colonne, l'una quella del timestamp (la cui data sarà visualizzata nel formato standard, così come l'ora) e l'altra, "avg(close", quella del valore medio a chiusura (risultato della media aritmetica di tutti i prezzi a chiusura delle aziende dello specifico settore) rispetto al timestamp.
Nell'ultimo ciclo infinito, scelto proprio perchè è previsto che i dati continuino ad arrivare, si esegue un ciclo for in cui, per ogni topic, si inizializza un dataframe chiamato "temp_sdf" con lo specifico dataframe delle medie (come abbiamo detto, accedendo all'oggetto "spark" attraverso la funzione sql e referenziando la tabella del topic). Se questo dataframe contiene almeno una n-upla, lo si aggiunge all'array "historical_data" attraverso la funzione "append", come oggetto di tipo "dictionary". Dopodichè si utilizza la solita funzione "save_and_send_data" per inviare questi oggetti a elasticsearch.

In [None]:
def writeKafkaStreamingData(dataframes,topic):
    print("Executing "+topic+" thread...")
    dataframes[topics[topic]] = subscribeToTopic(dataframes,topic)

Questa funzione, che verrà eseguita dal singolo thread, inizializza l'array "dataframes" con i dataframes ottenuti dopo le sottoscrizioni ai topics di kafka.

In [None]:
def subscribeToTopic(dataframes,topic):
    df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "k-cgoods:9092,k-energy:9092,k-financial:9092,k-health:9092,k-industrial:9092,k-tech:9092") \
    .option("subscribe", topic) \
    .load().select(from_json(col("value").cast("string"), schema).alias("data"),col("topic").cast("string"))
    return df

Quì ci iscriviamo (inizializzando un nuovo dataframe, che poi restituiremo) usufruendo del plugin che collega kafka a spark, specificando da un lato quali siano i brokers per i quali ci terremo in ascolto, dall'altro il singolo topic a cui uno di questi broker si riferisce. Il dataframe sarà ricavato sulla base del json costituente i dati presenti nel topic, il quale li contiene nell'attributo "value". Dopo aver effettuato il cast a "string" e applicato su di essi lo "schema", dunque dopo averli predisposti a rappresentare un dataframe vero e proprio, rinominiamo la colonna "value" con il nome "data". Inoltre, castiamo la colonna "topic" a "string" e selezioniamo sia questa che la precedente in modo che il dataframe finale contenga solo il contenuto di entrambe.

In [None]:
def load_indexes():
    for name in names: # Ricostituiamo i dati storici + previsioni fornite dal lato batch
        historical = open("/indexes/"+name+"_historical.txt")
        prediction = open("/indexes/"+name+"_prediction.txt")
        for line in historical:
            indexes[topics[name]].index(index=name+"_historical",document=json.loads(line),timeout="30s") 
        for line in prediction:
            indexes[topics[name]].index(index=name+"_prediction",document=json.loads(line),timeout="30s") 

Questa funzione ha il solo scopo di riportare a elasticsearch tutti gli indici già ottenuti con l'esecuzione del batch, in modo da non dover rieseguire quest'ultimo contestualmente. Gli oggetti "historical" e "prediction", a partire da file di tipo "txt" caratterizzanti gli indici (presenti nel nostro file system), si possono scorrere linea per linea, così da ridiventare oggetti di tipo dictionary, inviabili ad elasticsearch.

In [None]:
def save_and_send_data(data, name, es_index):
    for i in range(len(data)):
        for j in range(len(data[i])):
            json_dump = json.dumps(data[i][j])
            es_index.index(index=name+"_streaming", id=j,document=json_dump)


In questo doppio ciclo for, a partire da "data", quale array di oggetti di tipo dictionary (ovvero "historical_data"), dal "name" (che sarà il nome del topic) e dall'oggetto di tipo elasticsearch (che ci permette la creazione di un indice dentro la specifica istanza, come abbiamo già visto nella parte del batch), si scorrono tutti i dataframes dello streaming (il loro numero è la lunghezza di "data") e, per ognuno di loro, si crea un oggetto json, il quale conterrà, per ogni settore, le n-uple che caratterizzano le medie dei prezzi a chiusura in base al timestamp (infatti la lunghezza di "data[i]" non è altro che il numero di n-uple che caratterizza il singolo dataframe a cui si accede).