# DDoS dataset analysis

L'obiettivo di questo progetto è analizzare un _dataset_ contenente informazioni relative sia ad attacchi DDoS che a normale traffico di rete.
Questo viene fatto per poter realizzare un'applicazione capace di distinguere il traffico sospetto da quello legittimo e poter quindi tempestivamente bloccare i tentativi di attacco.

## Descrizione del dataset

Il _dataset_ è stato ottenuto da un _paper_ scritto dalla "University of New South Wales".
Il _paper_ è stato pubblicato su [Science Direct](https://www.sciencedirect.com/science/article/abs/pii/S0167739X18327687) ed è disponibile su [ResearchGate](https://www.researchgate.net/publication/328736466_Towards_the_Development_of_Realistic_Botnet_Dataset_in_the_Internet_of_Things_for_Network_Forensic_Analytics_Bot-IoT_Dataset) in _preprint_.
Come è scritto in esso, l'obiettivo del _paper_ era la realizzazione del _dataset_ stesso, rispettando le condizioni di massimo realismo possibile del traffico generato e della configurazione dell'ambiente in cui gli attacchi simulati sono stati svolti.

Il _paper_ indica anche il fatto che sono state generate diverse tipologie di attacco, ma quelle di interesse sono state solamente quelle inerenti agli attacchi di tipo "Distributed Denial of Service", o "DDoS" in breve.
[Argus](https://openargus.org/) è stato il _software_  utilizzato per effettuare le catture dei pacchetti. La sua documentazione, nonché i suoi [esempi d'uso](https://openargus.org/using-argus), indicano come sono costruiti i _record_ che l'applicazione salva nel momento nel quale viene fatta una cattura di rete.

Ogni _record_ è il risultato di un raggruppamento di più pacchetti che svolgono la stessa funzione all'interno di una specifica connessione, o _flow_.
Ad esempio, un _record_ può contenere i pacchetti utilizzati dal protocollo "TCP" per effettuare l'_handshake_ con un'altro nodo di rete, il corpo della trasmissione, oppure la chiusura finale.
Per questo motivo ogni _record_, oltre a contenere informazioni capaci di identificare sorgente e destinazione della connessione, contengono anche dati derivanti dall'aggregazione dalle informazioni di più pacchetti.
Infine, dacché è possibile risalire dai _record_ alle singole connessioni, così come esplicitato nel _paper_ stesso, sono presenti anche informazioni su alcuni parametri nati dall'aggregazionr di _record_ differenti, informazioni che ci aspettiamo siano replicate uguali tra tutti i _record_ coinvolti.

Il sito in cui il _dataset_ è stato pubblicato è [questo](https://research.unsw.edu.au/projects/bot-iot-dataset), mentre il _download_ dei file può essere fatto dalla [cartella](https://cloudstor.aarnet.edu.au/plus/s/umT99TnxvbpkkoE?path=%2FLabelling) di un servizio _cloud_ della UNSW.
I _file_ che sono stati utilizzati in questo progetto sono quelli denominati "DDoS_HTTP.csv", "DDoS_TCP.csv" e "DDoS_UDP.csv".
Purtroppo, non è possibile effettuare il _download_ diretto di questi.

### Descrizione dei file

I tre _file_ del _dataset_ che sono stati utilizzati contengono tre diverse sotto-categorie di attacchi, ovvero attacchi che inviano messaggi "HTTP", segmenti "TCP" e datagrammi "UDP".
Non siamo interessati a tenere conto di questa distinzione, tanto più che tutti e tre i _file_, essendo stati generati dallo stesso _tool_, possiedono lo stesso formato "CSV" e gli stessi campi.

I campi presenti in ciascun file sono i seguenti:

* "stime": la data e l'ora di ricezione del primo pacchetto del _record_
* "flgs": le _flag_ dello stato della connessione presenti nei pacchetti del _record_
* "proto": il protocollo di livello di trasporto utilizzato dai pacchetti del _record_
* "saddr": l'indirizzo IP dell'interfaccia sorgente dei pacchetti del _record_
* "sport": la porta dell'interfaccia sorgente dei pacchetti del _record_
* "dir": la direzione del flusso dati, da sorgente a destinazione o bidirezionale
* "daddr": l'indirizzo IP dell'interfaccia destinazione dei pacchetti del _record_
* "dport": la porta dell'interfaccia destinazione dei pacchetti del _record_
* "pkts": il numero di pacchetti aggregati dal _record_
* "bytes": la somma dei _byte_ dei pacchetti aggregati
* "state": lo stato della connessione per i pacchetti aggregati dal _record_
* "srcid": l'identificatore usato dal _tool_ "Argus" per identificare la sorgente dati
* "ltime": la data e l'ora di ricezione dell'ultimo pacchetto del _record_
* "seq": il numero di sequenza che il _tool_ "Argus" ha assegnato al _record_
* "dur": la durata temporale totale del campionamento associato al _record_ 
* "mean": la durata media dei _record_ aggregati
* "stddev": la deviazione standard dei _record_ aggregati
* "smac": l'indirizzo MAC della sorgente dei pacchetti del _record_
* "dmac": l'indirizzo MAC della destinazione dei pacchetti del _record_
* "sum": la somma delle durate dei _record_ aggregati
* "min": il minimo delle durate dei _record_ aggregati
* "max": il massimo delle durate dei _record_ aggregati
* "soui": lo "Organizationally Unique Identifier" dell'indirizzo MAC della sorgente dei pacchetti del _record_
* "doui": lo "Organizationally Unique Identifier" dell'indirizzo MAC della destinazione dei pacchetti del _record_
* "sco": il "Country Code" associato all'indirizzo IP della sorgente dei pacchetti nel _record_
* "dco": il "Country Code" associato all'indirizzo IP della destinazione dei pacchetti nel _record_
* "spkts": il numero di pacchetti inviati dalla sorgente alla destinazione in questo _record_
* "dpkts": il numero di pacchetti inviati dalla destinazione alla sorgente in questo _record_
* "sbytes": il numero di _byte_ inviati dalla sorgente alla destinazione in questo _record_
* "dbytes": il numero di _byte_ inviati dalla destinazione alla sorgente in questo _record_
* "rate": i pacchetti al secondo inviati in questo _record_
* "srate": i pacchetti al secondo inviati dalla sorgente alla destinazione in questo _record_
* "drate": i pacchetti al secondo inviati dalla destinazione alla sorgente in questo _record_
* "record": questa feature non è spiegata all'interno del _paper_ né tantomeno nella documentazione di "Argus"
* "attack: se il _record_ è parte di un attacco o meno
* "category": la categoria dell'attacco
* "subcategory": la specifica sotto-categoria dell'attacco

I campi che abbiamo utilizzato nell'analisi sono stati:

* stime
* proto
* saddr
* sport
* daddr
* dport
* ltime
* dur
* pkts
* bytes
* rate
* attack

Inoltre, è stato utilizzato un secondo _dataset_ che contiene le associazioni tra le "well-known ports" e la descrizione del servizio corrispondente.
I campi presenti in questo _dataset_ sono:

* port: la "well-known port" associata al servizio
* protocol: il protocollo associato al servizio
* description: la descrizione del servizio

Tutti i campi di questo _dataset_ sono stati utilizzati.

## Preparazione dei dati

Per effettuare la preparazione dei dati, innanzitutto configuriamo il _kernel_ Spark che utilizzeremo.

In [1]:
%%configure -f
{
    "executorMemory": "8G", 
    "numExecutors": 2, 
    "executorCores": 3, 
    "conf": {
        "spark.dynamicAllocation.enabled": "false"
    }
}

Dopodiché, definiamo i percorsi dei file che utilizzeremo così come sono stati salvati sul servizio "Amazon S3" ed avviamo una nuova applicazione Spark.

In [2]:
//val bucketName = "unibo-bd2122-nfarabegoli/ddos"
//val bucketName = "unibo-bd2122-mcastellucci/project"

val pathTCPDataset = s"s3a://$bucketName/DDoS_TCP.csv"
val pathUDPDataset = s"s3a://$bucketName/DDoS_UDP.csv"
val pathHTTPDataset = s"s3a://$bucketName/DDoS_HTTP.csv"
//val pathTCPDataset = s"s3a://$bucketName/DDoS_TCP-sampled.csv"
//val pathUDPDataset = s"s3a://$bucketName/DDoS_UDP-sampled.csv"
//val pathHTTPDataset = s"s3a://$bucketName/DDoS_HTTP-sampled.csv"
val pathPortsDataset = s"s3a://$bucketName/ports.csv"

"SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/" + sc.applicationId + "/"

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1657005714778_0003,spark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

bucketName: String = unibo-bd2122-nfarabegoli/ddos
pathTCPDataset: String = s3a://unibo-bd2122-nfarabegoli/ddos/DDoS_TCP.csv
pathUDPDataset: String = s3a://unibo-bd2122-nfarabegoli/ddos/DDoS_UDP.csv
pathHTTPDataset: String = s3a://unibo-bd2122-nfarabegoli/ddos/DDoS_HTTP.csv
pathPortsDataset: String = s3a://unibo-bd2122-nfarabegoli/ddos/ports.csv
res7: String = SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/application_1657005714778_0003/


A questo punto è possibile costruire l'RDD per intero, in modo tale che contenga i dati di tutti e tre i file che ci interessano.
Inoltre, è stato caricato il file contenente il _dataset_ delle "well-known ports".

In [3]:
val dataset = sc.textFile(s"$pathTCPDataset,$pathUDPDataset,$pathHTTPDataset")
val ports = sc.textFile(pathPortsDataset)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

dataset: org.apache.spark.rdd.RDD[String] = s3a://unibo-bd2122-nfarabegoli/ddos/DDoS_TCP.csv,s3a://unibo-bd2122-nfarabegoli/ddos/DDoS_UDP.csv,s3a://unibo-bd2122-nfarabegoli/ddos/DDoS_HTTP.csv MapPartitionsRDD[1] at textFile at <console>:31
ports: org.apache.spark.rdd.RDD[String] = s3a://unibo-bd2122-nfarabegoli/ddos/ports.csv MapPartitionsRDD[3] at textFile at <console>:27


A questo punto si tratta di fare _parsing_ dei _record_ del _dataset_.
Per effettuarlo correttamente, teniamo conto delle seguenti informazioni sui formati dei valori nelle singole colonne:

* stime, ltime: il valore è un _timestamp_ in secondi dall'epoca UNIX, anche se è espresso in formato decimale per poter avere la precisione dei millisecondi
* proto: il valore può essere uno tra "udp", "tcp", "arp", "ipv6-icmp", "icmp", "igmp", "rarp", ognuno dei quali è associato al corrispondente protocollo, sono però di interesse solamente i _record_ associati ai protocolli TCP e UDP
* saddr, dadd: il valore è un indirizzo IP in formato "dotted decimal notation", può perciò essere salvato come String
* sport, dport: il valore è un intero positivo che può arrivare ad un massimo di 65.536, perciò per poter essere rappresentato in linguaggio scala necessita di essere salvato in un Long
* pkts, bytes: il valore è un intero positivo di cui non è noto il massimo, per cui è logico pensare di salvare il valore in un Long
* dur, rate: il valore è un numero decimale, perciò per mantenere la precisione massima è stato utilizzato un Double
* attack: il valore può essere "1" nel caso il _record_ appartenga ad un attacco DDoS, "0" in caso contrario

Detto questo, sono state implementati il seguente Astract Data Type:

In [4]:
import java.time.format.DateTimeFormatter
import java.time.{ Instant, LocalDateTime, ZoneId }
import scala.util.{ Try, Success, Failure }

case class Record(
    startTime: LocalDateTime,
    protocol: String,
    sourceAddress: String,
    sourcePort: Long,
    destinationAddress: String,
    destinationPort: Long,
    packets: Long,
    bytes: Long,
    endTime: LocalDateTime,
    duration: Double,
    rate: Double,
    isDDoS: Boolean,
)

object Record {

  def apply(r: Seq[String]): Option[Record] =
    (for {
      startTime <- Try(
        Instant.ofEpochMilli((r.head.toDouble * 1000).toLong).atZone(ZoneId.systemDefault()).toLocalDateTime,
      )
      protocol <- if (r(2) == "tcp" || r(2) == "udp") Success(r(2)) else Failure(new IllegalStateException())
      sourceAddress = r(3)
      sourcePort <- Try(r(4).toLong)
      destinationAddress = r(6)
      destinationPort <- Try(r(7).toLong)
      packets <- Try(r(8).toLong)
      bytes <- Try(r(9).toLong)
      endTime <- Try(
        Instant.ofEpochMilli((r(12).toDouble * 1000).toLong).atZone(ZoneId.systemDefault()).toLocalDateTime,
      )
      duration <- Try(r(14).toDouble)
      rate <- Try(r(30).toDouble)
      isDDoS = r(34) == "1"
    } yield new Record(
      startTime,
      protocol,
      sourceAddress,
      sourcePort,
      destinationAddress,
      destinationPort,
      packets,
      bytes,
      endTime,
      duration,
      rate,
      isDDoS,
    )).toOption
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import java.time.format.DateTimeFormatter
import java.time.{Instant, LocalDateTime, ZoneId}
import scala.util.{Try, Success, Failure}
defined class Record
defined object Record
Companions must be defined together; you may wish to use :paste mode for this.


Per effettuare il _parsing_ del _dataset_ contenente le "well-known ports" è stato definito un ADT tenendo conto delle seguenti considerazioni sulle colonne:

* port: il valore è un intero positivo che può arrivare ad un massimo di 65.536, perciò per poter essere rappresentato in linguaggio scala necessita di essere salvato in un Long
* protocol: il valore può essere "UDP" o "TCP", per coerenza con l'altro _dataset_ questo valore è stato importato come String in formato _lowercase_
* description: per semplicità, è stata salvata come una String

In [5]:
case class PortDescription(
  port: Long,
  protocol: String,
  description: String
)

object PortDescription {

  def apply(r: Seq[String]): Option[PortDescription] = 
    (for {
       port <- Try(r.head.toLong)
       protocol <- Try(r(1).toLowerCase)
       description = r(2)
     } yield new PortDescription(port, protocol, description)
    ).toOption
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

defined class PortDescription
defined object PortDescription
Companions must be defined together; you may wish to use :paste mode for this.


Qui di seguito sono stati fatti degli _import_ di classi comuni a più _query_.

In [6]:
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import scala.math.Numeric.Implicits.infixNumericOps

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import scala.math.Numeric.Implicits.infixNumericOps


Alla definizione è seguito il parsing vero e proprio, che ha tenuto conto del fatto che i tre _file_, essendo in formato CSV, hanno le virgolette che circondano ogni valore di ogni colonna e sono separati dai punti e virgola.
Assieme ai _record_ "legittimi", per così dire, saranno presenti anche le intestazioni dei tre _file_.
Questo però non ci preoccupa perché sappiamo che il _parser_ eliminerà correttamente quelle righe dall'RDD che caricheremo, non avendo lo stesso formato delle altre.

In [7]:
val parsedRecordDataset = 
    dataset.
        map(_.replace("\"", "")).
        map(_.split(";")).
        map(Record(_)).
        filter(_.isDefined).
        map(_.get)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

parsedRecordDataset: org.apache.spark.rdd.RDD[Record] = MapPartitionsRDD[8] at map at <console>:41


Effettuiamo il _caching_ del _dataset_ per vedere la sua occupazione in formato non serializzato, a cui facciamo seguire la semplice operazione di conteggio per attivare il meccanismo di _caching_ e sapere anche la dimensione in righe del _dataset_.
La quantità di memoria occupata complessiva, tra RAM e disco, è di 10.6GB e il numero di _record_ è di 38532503.

In [8]:
val recordDataset = parsedRecordDataset.persist(StorageLevel.MEMORY_AND_DISK)
val recordDatasetSize = recordDataset.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

recordDataset: parsedRecordDataset.type = MapPartitionsRDD[8] at map at <console>:41
recordDatasetSize: Long = 38532503


Per gli stessi motivi di cui sopra, effettuiamo il _caching_ del _dataset_ delle "well-known port", a cui facciamo seguire la semplice operazione di conteggio delle righe.
La quantità di memoria occupata è di 8.5MB e il numero di _record_ è di 31178.

In [9]:
val portsDataset = 
    ports.
        map(_.replace("\"", "")).
        map(_.split(",")).
        map(PortDescription(_)).
        filter(_.isDefined).
        map(_.get)
val cachedPortsDataset = portsDataset.persist(StorageLevel.MEMORY_AND_DISK)
cachedPortsDataset.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

portsDataset: org.apache.spark.rdd.RDD[PortDescription] = MapPartitionsRDD[13] at map at <console>:41
cachedPortsDataset: portsDataset.type = MapPartitionsRDD[13] at map at <console>:41
res11: Long = 31178


Per quanto riguarda il _dataset_ principale, è stato deciso di mantenerlo "_cached_" in memoria con possibilità di _spill-over_ su disco perché tutte le _query_ lo utilizzano.
È stato deciso di utilizzare questa forma di _caching_ perché l'aggiunta della serializzazione aggiunge un _overhead_ notevole nei tempi di esecuzione anche delle _query_ più semplici, rendendolo inefficiente.
La possibilità di servirsi anche del disco è necessaria date le sue dimensioni troppo elevate per poter essere ospitato interamente in memoria.
Per quanto riguarda invece il _dataset_ delle "well-known port", si è deciso di rimuoverlo dalla _cache_ dato che sarà utilizzato in una sola _query_.

In [10]:
cachedPortsDataset.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res12: cachedPortsDataset.type = MapPartitionsRDD[13] at map at <console>:41


# Query

### Percentuale dei _record_ associati ad attacchi DDoS

Con questa query si vuole calcolare la percentuale di _record_ che appartengono ad attacchi DDoS nel _dataset_ e così derivare anche il numero di _record_ che __non__ appartengono ad attacchi DDoS, ma a traffico legittimo.

In [11]:
val ddosCount = 
    recordDataset.
        map(r => if (r.isDDoS) (1, 0) else (0, 1)).
        reduce{ case((ddosCount1, legitCount1), (ddosCount2, legitCount2)) => (ddosCount1 + ddosCount2, legitCount1 + legitCount2) }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ddosCount: (Int, Int) = (38531238,1265)


La _query_ prende in input l'intero _dataset_ e produce due Long: il primo è il numero di _record_ appartenenti ad attacchi DDoS, il secondo è il numero di _record_ appartenenti a traffico legittimo.

L'implementazione scelta non fa altro che trasformare ogni _record_ in una coppia di valori, dove il primo rappresenta il conteggio dei _record_ associati ad attacchi e il secondo quelli associati a traffico legittimo.
La coppia avrà il valore 1 nella posizione corrispondente a quale tipo di traffico il _record_ stesso appartiene, il valore 0 nell'altra.
Dopodiché, viene compiuta un'operazione di _reduce_ che somma i valori nelle corrispondenti posizioni.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Conteggio record DDoS e legittimi   |  10.6 GB  | 12 s |

In [12]:
val ddosCount = 
    recordDataset.
        coalesce(12).
        map(r => if (r.isDDoS) (1, 0) else (0, 1)).
        reduce{ case((ddosCount1, legitCount1), (ddosCount2, legitCount2)) => (ddosCount1 + ddosCount2, legitCount1 + legitCount2) }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ddosCount: (Int, Int) = (38531238,1265)


È stata tentata una variante ottimizzata dove si ridimensionano le partizioni dello RDD riducendo il loro numero.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Conteggio record DDoS e legittimi (**coalesce**) |  10.6 GB  | 13 s |

Si è osservato che il tempo rimane essenzialmente costante, se non direttamente peggiora.
Per questo motivo, si è giudicata la _query_ originale già ottimale.

In [13]:
val ddosPercentage = ddosCount._1 / recordDatasetSize.toDouble * 100
val legitPercentage = ddosCount._2 / recordDatasetSize.toDouble * 100

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ddosPercentage: Double = 99.99671705728538
legitPercentage: Double = 0.0032829427146219906


Quello che è possibile dedurre da questa _query_ è che il _dataset_ contiene quasi esclusivamente _record_ inerenti ad attacchi DDoS, il 99.997%. 
Per qusto motivo, occorrerà tenere in conto questa differenza di peso tra i due tipi di traffico nelle _query_ successive ad esempio analizzandoli separatamente.

<div align="center">
    <img src="images/total_pie.png" width="40%"/>
</div>

### Percentuale dei protocolli coinvolti negli attacchi DDoS

Con questa query si vuole calcolare la percentuale con cui ciascun protocollo, ovvero "TCP" e "UDP", compare nei _record_ che appartengono ad attacchi DDoS nel _dataset_.

In [14]:
val ddosByProtocol = 
    recordDataset.
        filter(_.isDDoS).
        map(r => if (r.protocol == "tcp") (1, 0) else (0, 1)).
        reduce{ case((tcpCount1, udpCount1), (tcpCount2, udpCount2)) => (tcpCount1 + tcpCount2, udpCount1 + udpCount2) }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ddosByProtocol: (Int, Int) = (19566842,18964396)


La _query_ prende in input l'intero _dataset_ e produce due Long: il primo è il numero di _record_ il cui protocollo di livello di trasporto è "TCP", il secondo è il numero di _record_ il cui protocollo è "UDP".

L'implementazione scelta filtra il _dataset_ per eliminare i _record_ che appartengono a traffico legittimo, per poi  trasformare ogni _record_ in una coppia di valori, dove il primo rappresenta il conteggio dei _record_ associati al protocollo "TCP" e il secondo quelli associati a quello "UDP".
La coppia avrà il valore 1 nella posizione corrispondente a quale tipo di protocollo il _record_ stesso è associato, il valore 0 nell'altra.
Dopodiché, viene compiuta un'operazione di _reduce_ che somma i valori nelle corrispondenti posizioni.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Calcolo percentuale di protocolli usati in attacchi DDoS   |  10.6 GB  | 13 s |

In [15]:
val ddosByProtocol = 
    recordDataset.
        coalesce(12).
        filter(_.isDDoS).
        map(r => if (r.protocol == "tcp") (1, 0) else (0, 1)).
        reduce{ case((tcpCount1, udpCount1), (tcpCount2, udpCount2)) => (tcpCount1 + tcpCount2, udpCount1 + udpCount2) }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ddosByProtocol: (Int, Int) = (19566842,18964396)


È stata tentata una variante ottimizzata dove si ridimensionano le partizioni dello RDD riducendo il loro numero.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Calcolo percentuale di protocolli usati in attacchi DDoS (**coalesce**) |  10.6 GB  | 14 s |

Si è osservato che il tempo rimane essenzialmente costante, se non direttamente peggiora.
Per questo motivo, si è giudicata la _query_ originale già ottimale.
Questo fatto non ci sorprende, dacché la _query_ per come è implementata è molto simile alla precedente.

In [16]:
val tcpPercentage = ddosByProtocol._1 / recordDatasetSize.toDouble * 100
val udpPercentage = ddosByProtocol._2 / recordDatasetSize.toDouble * 100

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

tcpPercentage: Double = 50.780095962102436
udpPercentage: Double = 49.216621095182944


Come si può notare, il peso che entrambi i protocolli hanno negli attacchi DDoS è all'incirca lo stesso.
Non è perciò possibile aspettarsi che un certo segmento sia potenzialmente più o meno pericoloso a seconda del tipo di protocollo di livello di trasporto utilizzato.

<div align="center">
    <img src="images/protocol-chart.png" width="40%"/>
</div>

### Percentuale dei servizi presi di mira dagli attacchi DDoS

Con questa query si vuole calcolare la percentuale con cui ciascun servizio è stato preso di mira dagli attacchi DDoS.
Per servizio intendiamo la combinazione di porta e protocollo di livello di trasporto, che di norma sono associati a servizi predefiniti.
Ad esempio, il protocollo TCP e la porta 80 sono di norma associati ad un server HTTP, anche se nulla toglie che è possibile che sia presente un altro servizio in ascolto.
Certamente, un attaccante può studiare prima il sistema per capire quali servizi sono attivi, ma in mancanza di altre informazioni disponibili ricade su ciò che è vero per default.

In [17]:
val servicesDataset = 
    recordDataset.
        filter(_.isDDoS).
        map(r => ((r.destinationPort, r.protocol), 1)).
        reduceByKey(_ + _).
        persist(StorageLevel.MEMORY_AND_DISK)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

servicesDataset: org.apache.spark.rdd.RDD[((Long, String), Int)] = ShuffledRDD[24] at reduceByKey at <console>:37


In [18]:
val portsCountJoin = 
    servicesDataset.
        join(portsDataset.map(d => ((d.port, d.protocol), d.description))).
        sortBy(_._2, ascending = false).
        collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

portsCountJoin: Array[((Long, String), (Int, String))] = Array(((80,tcp),(19529331,Hypertext Transfer Protocol (HTTP) uses TCP in versions 1.x and 2. HTTP/3 uses QUIC)), ((80,udp),(18964396,Hypertext Transfer Protocol (HTTP) uses TCP in versions 1.x and 2. HTTP/3 uses QUIC)), ((9030,tcp),(4,Tor often used)), ((4116,tcp),(4,Smartcard-TLS)), ((6389,tcp),(4,EMC CLARiiON)), ((53855,tcp),(4,Certificate Management over CMS)), ((50271,tcp),(4,Certificate Management over CMS)), ((53850,tcp),(4,Certificate Management over CMS)), ((53849,tcp),(4,Certificate Management over CMS)), ((6260,tcp),(3,planet M.U.L.E.)), ((6347,tcp),(3,gnutella-rtr)), ((4444,tcp),(3,Xvfb X server virtual frame buffer service)), ((9043,tcp),(3,WebSphere Application Server Administration Console secure)), ((6201,tcp),(3,Th...


La _query_ prende in input sia il _dataset_ principale che quello delle "well-known ports" e produce l'elenco dei servizi più colpiti.
Ogni riga del risultato è composta da porta e protocollo di trasporto del servizio assieme alla sua descrizione e al conteggio delle occorrenze del servizio nel _dataset_ principale.

L'implementazione scelta filtra il _dataset_ per eliminare i _record_ che appartengono a traffico legittimo, per poi trasformare i _record_ in un formato "chiave valore".
La chiave è fatta dalla coppia "porta-protocollo", mentre il valore è un semplice 1, in modo tale da poter sommare tutti questi valori per chiave e avere così il conteggio delle loro occorrenze.
La chiave è poi utile per effettuare l'operazione di _join_ con il _dataset_ contenente le descrizioni dei servizi, permettendoci di ottenere un risultato le cui righe hanno il formato di nostro interesse.
L'ultima operazione effettuata è un ordinamento decrescente per poter avere per primi i servizi con maggior numero di occorrenze.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Percentuale di servizi attaccati |  10.6 GB  | 2 s (20 s con scrittura _cache_) |

Utilizzando una strategia di _caching_ per i dati utilizzati prima del _join_, si deriva che le operazioni di _join_ e _sorting_ impiegano circa 2 secondi.

In [19]:
val portsBroadcast = sc.broadcast(portsDataset.map(d => ((d.port, d.protocol), d.description)).collectAsMap())

val portsCountBroadcast = servicesDataset.
    map { case (k, count) => portsBroadcast.value.get(k).map(desc => (k, count, desc)) }.
    filter(_.isDefined).
    map(_.get).
    sortBy(_._2, ascending = false).
    collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

portsBroadcast: org.apache.spark.broadcast.Broadcast[scala.collection.Map[(Long, String),String]] = Broadcast(14)
portsCountBroadcast: Array[((Long, String), Int, String)] = Array(((80,tcp),19529331,Hypertext Transfer Protocol (HTTP) uses TCP in versions 1.x and 2. HTTP/3 uses QUIC), ((80,udp),18964396,Hypertext Transfer Protocol (HTTP) uses TCP in versions 1.x and 2. HTTP/3 uses QUIC), ((9030,tcp),4,Tor often used), ((53855,tcp),4,Certificate Management over CMS), ((6389,tcp),4,EMC CLARiiON), ((4116,tcp),4,Smartcard-TLS), ((50271,tcp),4,Certificate Management over CMS), ((53850,tcp),4,Certificate Management over CMS), ((53849,tcp),4,Certificate Management over CMS), ((54089,tcp),3,Certificate Management over CMS), ((53852,tcp),3,Certificate Management over CMS), ((54151,tcp),3,Certificate Management over CMS), ((53847,tcp),3,Certificate Management over CMS), ((49774,tcp),3,Certificate Management o...


Una prima tecnica di ottimizzazione prevede l'utilizzo della "broadcast variable".
In particolare, abbiamo effettuato il _broadcast_ del dataset più piccolo, ovvero quello delle "well-known ports".

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Percentuale di servizi attaccati (**broadcast variable**) |  10.6 GB  | 1 s (0.3 s per la _broadcast variable_) |

È stato osservato che la query dimezza il tempo che prima era impiegato con la _join_.

In [20]:
val portsCountHashPartition = 
    servicesDataset.
        partitionBy(new HashPartitioner(12)).
        join(portsDataset.map(d => ((d.port, d.protocol), d.description))).
        sortBy(_._2, ascending = false).
        collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

portsCountHashPartition: Array[((Long, String), (Int, String))] = Array(((80,tcp),(19529331,Hypertext Transfer Protocol (HTTP) uses TCP in versions 1.x and 2. HTTP/3 uses QUIC)), ((80,udp),(18964396,Hypertext Transfer Protocol (HTTP) uses TCP in versions 1.x and 2. HTTP/3 uses QUIC)), ((9030,tcp),(4,Tor often used)), ((4116,tcp),(4,Smartcard-TLS)), ((6389,tcp),(4,EMC CLARiiON)), ((53850,tcp),(4,Certificate Management over CMS)), ((53849,tcp),(4,Certificate Management over CMS)), ((50271,tcp),(4,Certificate Management over CMS)), ((53855,tcp),(4,Certificate Management over CMS)), ((6260,tcp),(3,planet M.U.L.E.)), ((6347,tcp),(3,gnutella-rtr)), ((4444,tcp),(3,Xvfb X server virtual frame buffer service)), ((9043,tcp),(3,WebSphere Application Server Administration Console secure)), ((6201,t...


Come secondo metodo di ottimizzazione, è stato scelto di ripartizionare il dataset più grande sfruttando un "hash partitioner".
Sono stati fatti alcuni tentativi per trovare il valore ottimale di partizioni da assegnare all'"hash partitioner". 

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Percentuale di servizi attaccati (**hash partitioner (12 partizioni)**) |  10.6 GB  | 1.3 s |

Si è giunti alla conclusione che il valore che produce il risultato migliore è 12 partizioni, che corrispondono infatti a 2 macchine ognuna con 3 _core_ e per ogni _core_ 2 partizioni, ottenendo una durata simile a quella che si ha con l'uso della _broadcast_variable_.

In [21]:
val partitioner = new HashPartitioner(12)

val portsCountHashPartition = 
    servicesDataset.
        partitionBy(partitioner).
        join(portsDataset.map(d => ((d.port, d.protocol), d.description)).partitionBy(partitioner)).
        sortBy(_._2, ascending = false).
        collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

partitioner: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@c
portsCountHashPartition: Array[((Long, String), (Int, String))] = Array(((80,tcp),(19529331,Hypertext Transfer Protocol (HTTP) uses TCP in versions 1.x and 2. HTTP/3 uses QUIC)), ((80,udp),(18964396,Hypertext Transfer Protocol (HTTP) uses TCP in versions 1.x and 2. HTTP/3 uses QUIC)), ((9030,tcp),(4,Tor often used)), ((4116,tcp),(4,Smartcard-TLS)), ((6389,tcp),(4,EMC CLARiiON)), ((53850,tcp),(4,Certificate Management over CMS)), ((50271,tcp),(4,Certificate Management over CMS)), ((53855,tcp),(4,Certificate Management over CMS)), ((53849,tcp),(4,Certificate Management over CMS)), ((6260,tcp),(3,planet M.U.L.E.)), ((6347,tcp),(3,gnutella-rtr)), ((4444,tcp),(3,Xvfb X server virtual frame buffer service)), ((9043,tcp),(3,WebSphere Application Server Administration Console secure)), ((6201,t...


| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Percentuale di servizi attaccati (**hash partitioner su entrambi i dataset (12 partizioni)**) |  10.6 GB  | 1.1 s |

Partizionare entrambi i dataset con lo stesso _partitioner_ non porta ad alcun vantaggio significativo rispetto a partizionare solo il più grande.

In [22]:
val totalPortsCount = portsCountJoin.map(_._2._1).reduce(_ + _)

println("| Service  | Description | Total records | Percentage |")
println("|----------|-------------|---------------|------------|")
(portsCountJoin.
    take(2).
    map { case ((port, protocol), (count, desc)) => 
        (s"$port - $protocol", f"$desc%-11s", f"$count%-13d", f"${(count.toDouble / totalPortsCount * 100).toString.substring(0, 5) + "%" }%-10s")
    }.
    toSeq :+ 
    (
        "Other   ", 
        "N/A        ", 
        f"${totalPortsCount - portsCountJoin.take(2).map(_._2._1).sum}%-13d", 
        f"${((totalPortsCount - portsCountJoin.take(2).map(_._2._1).sum).toDouble / totalPortsCount * 100).toString.substring(0, 4) + "%"}%-10s")
    ).
    foreach(t => println(s"| ${t._1} | ${t._2} | ${t._3} | ${t._4} |"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

totalPortsCount: Int = 38504055
| Service  | Description | Total records | Percentage |
|----------|-------------|---------------|------------|
| 80 - tcp | Hypertext Transfer Protocol (HTTP) uses TCP in versions 1.x and 2. HTTP/3 uses QUIC | 19529331      | 50.72%     |
| 80 - udp | Hypertext Transfer Protocol (HTTP) uses TCP in versions 1.x and 2. HTTP/3 uses QUIC | 18964396      | 49.25%     |
| Other    | N/A         | 10328         | 0.02%      |


Questa query ci ha permesso di capire che il servizio esposto sulla porta 80, un server HTTP, è quello maggiormente attaccato. Per questo motivo, tutto il traffico che arriva a quella specifica porta sarà trattato con maggiore attenzione.

<div align="center">
    <img src="images/ports-chart.png" width="40%"/>
</div>

In [23]:
servicesDataset.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res19: servicesDataset.type = ShuffledRDD[24] at reduceByKey at <console>:37


### Quantità di traffico DDoS in byte rispetto al totale

Con questa _query_ si voglioni identificare quali sono gli indirizzi IP che ricevono più traffico legato ad attacchi DDoS, calcolato come numero totale di _bytes_ trasmessi nella connessione. Dopodiché si vuole controntare il traffico DDoS rispetto al traffico totale che quegli indirizzi IP hanno ricevuto.

In [24]:
val ddosTrafficByIP =
    recordDataset.
    filter(_.isDDoS).
    map(r => (r.destinationAddress, r.bytes)).
    reduceByKey(_ + _).
    map { case (ip, traffic) => (ip, traffic / 1024.0) }.
    sortBy(_._2, ascending = false).
    take(5).
    toMap

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ddosTrafficByIP: scala.collection.immutable.Map[String,Double] = Map(192.168.100.148 -> 7457.1875, 192.168.100.5 -> 5259153.908203125, 192.168.100.7 -> 3382470.841796875, 192.168.100.3 -> 8999801.200195312, 192.168.100.6 -> 2398495.455078125)


La _query_ prende in input l'intero _dataset_ e produce in output una mappa che associa ad ogni indirizzo IP la quantità di _bytes_ del traffico DDoS subito.

La _query_ trattiene i _record_ che appartengono ad attacchi DDoS e dopodiché trattiene le sole colonne che contengono l'IP della destinazione del traffico e la quantità di _bytes_ ricevuti. Per ogni IP vengono sommati i _bytes_ del traffico ricevuto determinando quello totale per ogni IP. Infine l'unità di misura del traffico viene trasformata in KB e poi quest'ultimo ordinato in senso decrescente. Per una migliore visualizzazione dei risultati vengono presi i primi 5 IP che hanno subito il maggior traffico.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Quantità di _bytes_ trasferiti per ogni IP negli attacchi DDoS |  10.6 GB  | 15 s |

In [25]:
val totalTrafficByIP =
    recordDataset.
    map(r => (r.destinationAddress, r.bytes)).
    reduceByKey(_ + _).
    map { case (ip, traffic) => (ip, traffic / 1024.0) }.
    collectAsMap()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

totalTrafficByIP: scala.collection.Map[String,Double] = Map(199.7.83.42 -> 5.2666015625, 35.162.156.78 -> 5.759765625, ff02::fb -> 0.52734375, 192.168.100.147 -> 4621.00390625, 192.168.100.255 -> 10.640625, 172.217.25.170 -> 13.1513671875, 27.124.125.251 -> 0.52734375, 52.64.239.193 -> 2313.1181640625, 192.33.14.30 -> 7.7900390625, 192.168.100.150 -> 6951.62109375, 224.0.0.251 -> 0.46875, 96.7.49.66 -> 0.3251953125, 192.168.100.7 -> 3382470.841796875, 116.206.83.243 -> 1.0546875, 192.168.100.1 -> 1.9775390625, 192.36.148.17 -> 4.90234375, 216.58.199.42 -> 8.5849609375, 216.239.36.10 -> 0.5888671875, 184.85.248.65 -> 0.3251953125, 192.168.100.149 -> 6829.556640625, 192.168.100.46 -> 275956.6396484375, 192.168.100.3 -> 1.0039371459960938E7, 192.168.100.55 -> 329540.4853515625, 116.206.80....


La _query_ prende in input l'intero _dataset_ e produce in output una mappa che associa ad ogni indirizzo IP la quantità di _bytes_ del traffico totale ricevuto.

La _query_ trattiene le sole colonne che contengono l'IP della destinazione del traffico e la quantità di _bytes_ ricevuti. Per ogni IP vengono sommati i _bytes_ del traffico ricevuto determinando quello totale per ogni IP. Infine l'unità di misura del traffico viene trasformata in KB e poi quest'ultimo ordinato in senso decrescente.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Quantità di _bytes_ totali trasferiti per ogni IP nella cattura di rete |  10.6 GB  | 15 s |

In [26]:
val trafficByIPCached =
    recordDataset.
    map(r => ((r.destinationAddress, r.isDDoS), r.bytes)).
    reduceByKey(_ + _).
    map { case ((ip, ddos), traffic) => (ip, ddos, traffic / 1024.0) }.
    persist(StorageLevel.MEMORY_AND_DISK)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

trafficByIPCached: org.apache.spark.rdd.RDD[(String, Boolean, Double)] = MapPartitionsRDD[78] at map at <console>:37


In [27]:
val ddosTrafficByIpCache = trafficByIPCached.
    filter(_._2).
    map(r => (r._1, r._3)).
    reduceByKey(_ + _).
    sortBy(_._2, ascending = false).
    take(5).
    toMap

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ddosTrafficByIpCache: scala.collection.immutable.Map[String,Double] = Map(192.168.100.148 -> 7457.1875, 192.168.100.5 -> 5259153.908203125, 192.168.100.7 -> 3382470.841796875, 192.168.100.3 -> 8999801.200195312, 192.168.100.6 -> 2398495.455078125)


In [28]:
val totalTrafficByIpCache = trafficByIPCached.
    map(r => (r._1, r._3)).
    reduceByKey(_ + _).
    collectAsMap()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

totalTrafficByIpCache: scala.collection.Map[String,Double] = Map(199.7.83.42 -> 5.2666015625, 35.162.156.78 -> 5.759765625, ff02::fb -> 0.52734375, 192.168.100.147 -> 4621.00390625, 192.168.100.255 -> 10.640625, 172.217.25.170 -> 13.1513671875, 27.124.125.251 -> 0.52734375, 52.64.239.193 -> 2313.1181640625, 192.33.14.30 -> 7.7900390625, 192.168.100.150 -> 6951.62109375, 224.0.0.251 -> 0.46875, 96.7.49.66 -> 0.3251953125, 192.168.100.7 -> 3382470.841796875, 116.206.83.243 -> 1.0546875, 192.168.100.1 -> 1.9775390625, 192.36.148.17 -> 4.90234375, 216.58.199.42 -> 8.5849609375, 216.239.36.10 -> 0.5888671875, 184.85.248.65 -> 0.3251953125, 192.168.100.149 -> 6829.556640625, 192.168.100.46 -> 275956.6396484375, 192.168.100.3 -> 1.0039371459960938E7, 192.168.100.55 -> 329540.4853515625, 116.20...


Per ottimizzare questa query, essendo composta da due sotto _query_ le cui operazioni iniziali sono molto simili, è stato deciso di impiegare il meccanismo di _caching_ sulla parte comune delle operazioni. Questo ha implicato riscrivere leggermente entrambe le _query_, aumentando il numero di operazioni che compiono complessivamente ma è stato giustificato dal risparmio di tempo ottenuto. 

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Quantità di _bytes_ trasferiti per ogni IP negli attacchi DDoS (**refactoring per cache**) |  10.6 GB  | 0.7 s (15 s con scrittura _cache_)|
| Quantità di _bytes_ totali trasferiti per ogni IP nella cattura di rete (**refactoring per cache**) |  10.6 GB  | 0.5 s |

Anche contando le operazioni di _caching_, con questa tecnica si dimezzano i tempi complessivi di esecuzione.
Escludendole dal conteggio, la quantità di tempo risparmiato è molto significativa.

In [29]:
val totalImportantTraffic = totalTrafficByIP.filterKeys(ddosTrafficByIP.keySet(_))
val totalImportantTrafficSorted = totalImportantTraffic.toSeq.sortBy(_._1)
val ddosTrafficSorted = ddosTrafficByIP.toSeq.sortBy(_._1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

totalImportantTraffic: scala.collection.Map[String,Double] = Map(192.168.100.7 -> 3382470.841796875, 192.168.100.3 -> 1.0039371459960938E7, 192.168.100.6 -> 2398495.455078125, 192.168.100.148 -> 7457.1875, 192.168.100.5 -> 7079893.10546875)
totalImportantTrafficSorted: Seq[(String, Double)] = Vector((192.168.100.148,7457.1875), (192.168.100.3,1.0039371459960938E7), (192.168.100.5,7079893.10546875), (192.168.100.6,2398495.455078125), (192.168.100.7,3382470.841796875))
ddosTrafficSorted: Seq[(String, Double)] = Vector((192.168.100.148,7457.1875), (192.168.100.3,8999801.200195312), (192.168.100.5,5259153.908203125), (192.168.100.6,2398495.455078125), (192.168.100.7,3382470.841796875))


La query ci ha permesso di osservare che gli IP `192.168.100.3`, `192.168.100.5`, `192.168.100.6` e `192.168.100.7` sono quelli soggetti al maggior traffico DDoS in relazione al traffico totale, quindi quelli maggiormente attaccati.

<div align="center">
    <img src="images/ddos-traffic.png" width="40%"/>
</div>

In [30]:
trafficByIPCached.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res20: trafficByIPCached.type = MapPartitionsRDD[78] at map at <console>:37


### Calcolo delle distribuzioni delle frequenze di invio di pacchetti e bytes per flusso

In questa _query_ si è voluta calcolare la distribuzione di due parametri non presenti nel dataset che sono: la frequenza media di invio dei pacchetti per un flusso e la frequenza media di invio di bytes per un flusso. Per flusso intendiamo un raggruppamento di record identificati da: IP e porta sorgente, IP e porta destinazione e protocollo.

In [31]:
val flowsDataset =
    recordDataset.
    filter(_.duration > 0.0).
    map(r =>
      (
        (r.sourceAddress, r.sourcePort, r.destinationAddress, r.destinationPort, r.protocol),
        (r.isDDoS, r.duration, r.packets, r.bytes),
      ),
    ).
    reduceByKey { case ((isDDoS1, duration1, packets1, bytes1), (isDDoS2, duration2, packets2, bytes2)) =>
      (isDDoS1 || isDDoS2, duration1 + duration2, packets1 + packets2, bytes1 + bytes2)
    }.
    map { case (_, (isDDoS, duration, packets, bytes)) => (isDDoS, packets / duration, bytes / duration) }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

flowsDataset: org.apache.spark.rdd.RDD[(Boolean, Double, Double)] = MapPartitionsRDD[92] at map at <console>:45


Entrambe le _query_ hanno una parte iniziale in comune dove vengolo eliminate tutte le righe dove la durata è pari a 0 e dopodiché vengono trasformate in una coppia chiave-valore dove la chiave è costituita dalle colonne che identificano un flusso e il valore è costituito invece dalle colonne che saranno utili successivamente, ovvero: se il record è un DDoS o meno, la durata del record, il numero di pacchetti scambiati e il numero di _byte_ trasferiti. 
Dopodiché vengono sommate per ciascun flusso i valori delle ultime tre colonne e uniti tramite OR logico i valori della prima colonna, che saranno uguali per ciascun flusso. 
Infine, vengono trasformati i valori in modo tale da ottenere un dataset composto da tre colonne dove la prima indica se la prima da parte di un attacco DDoS o meno, la seconda è la frequenza media di pacchetti e la terza è la frequenza di _byte_ media.

La dimensione di questo _dataset_ intermedio è di 776.4 MB.

In [32]:
val (ddosPacketsRateSum, ddosBytesRateSum, ddosCount, legitPacketsRateSum, legitBytesRateSum, legitCount) =
    flowsDataset.
    map { case (isDDoS, packetsRate, bytesRate) =>
      if (isDDoS) (packetsRate, bytesRate, 1, 0.0, 0.0, 0) else (0.0, 0.0, 0, packetsRate, bytesRate, 1)
    }.
    reduce { 
      case(
        (ddosPacketsRate1, ddosBytesRate1, ddosCount1, legitPacketsRate1, legitBytesRate1, legitCount1), 
        (ddosPacketsRate2, ddosBytesRate2, ddosCount2, legitPacketsRate2, legitBytesRate2, legitCount2)
      ) => (
        ddosPacketsRate1 + ddosPacketsRate2, 
        ddosBytesRate1 + ddosBytesRate2, 
        ddosCount1 + ddosCount2, 
        legitPacketsRate1 + legitPacketsRate2, 
        legitBytesRate1 + legitBytesRate2,
        legitCount1 + legitCount2
      )  
    }

val ddosPacketsRateMean = ddosPacketsRateSum / ddosCount
val ddosBytesRateMean = ddosBytesRateSum / ddosCount
val legitPacketsRateMean = legitPacketsRateSum / legitCount
val legitBytesRateMean = legitBytesRateSum / legitCount

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ddosPacketsRateSum: Double = 584305.5830716968
ddosBytesRateSum: Double = 5.646841374913515E7
ddosCount: Int = 2132236
legitPacketsRateSum: Double = 31059.85973989538
legitBytesRateSum: Double = 5124031.632252166
legitCount: Int = 89
ddosPacketsRateMean: Double = 0.2740341984056628
ddosBytesRateMean: Double = 26.483191236399325
legitPacketsRateMean: Double = 348.9871880887121
legitBytesRateMean: Double = 57573.38912642883


Questa _query_ prende in input il _dataset_ intermedio e ne produce un altro composto di sei colonne: le prime tre sono la frequenza media dei pacchetti per i flussi DDoS, la frequenza media di _byte_ per i flussi DDoS e il numero di flussi DDoS. Le ultime tre colonne rappresentano gli campi ma per il flussi legittimi.

Questa _query_ prende il dataset intermedio costruito in precedenza e lo trasforma in un tupla a sei valori dove se il flusso è DDoS i valori della riga saranno copiati nei primi due campi della tupla, il terzo posto a uno e gli ultimi tre posti a zero, viceversa se il flusso è legittimo. Infine viene effettuata una operazione di somma tra tutte le colonne. Questo permette di ottenere le somme e i conteggi per le due variabili di interesse sia per attacchi DDoS che traffico legittimo consentendo quindi di calcolare i valori medi.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Conteggio e somma delle frequenze di pacchetti e _bytes_ sia legittimi che DDoS |  10.6 GB  | 26 s (2.2 min compresa generazione dataset intermedio) |

In [33]:
val ddosPacketsRateMeanBroadcast = sc.broadcast(ddosPacketsRateMean)
val ddosBytesRateMeanBroadcast = sc.broadcast(ddosBytesRateMean)
val legitPacketsRateMeanBroadcast = sc.broadcast(legitPacketsRateMean)
val legitBytesRateMeanBroadcast = sc.broadcast(legitBytesRateMean)

val (ddosPacketsRateDiff, ddosBytesRateDiff, legitPacketsRateDiff, legitBytesRateDiff) =
    flowsDataset.
    map { case (isDDoS, packetsRate, bytesRate) =>
      if (isDDoS)
        (
         math.pow(packetsRate - ddosPacketsRateMeanBroadcast.value, 2),
         math.pow(bytesRate - ddosBytesRateMeanBroadcast.value, 2),
         0.0,
         0.0,
        )
      else
        (
         0.0,
         0.0,
         math.pow(packetsRate - legitPacketsRateMeanBroadcast.value, 2),
         math.pow(bytesRate - legitBytesRateMeanBroadcast.value, 2),
        )
    }.
    reduce {
      case(
        (ddosPacketsRateDiff1, ddosBytesRateDiff1, legitPacketsRateDiff1, legitBytesRateDiff1),
        (ddosPacketsRateDiff2, ddosBytesRateDiff2, legitPacketsRateDiff2, legitBytesRateDiff2)
      ) => (
        ddosPacketsRateDiff1 + ddosPacketsRateDiff2,
        ddosBytesRateDiff1 + ddosBytesRateDiff2,
        legitPacketsRateDiff1 + legitPacketsRateDiff2,
        legitBytesRateDiff1 + legitBytesRateDiff2
      )
    }

val ddosPacketsRateStdDev = math.sqrt(ddosPacketsRateDiff / ddosCount)
val ddosBytesRateStdDev = math.sqrt(ddosBytesRateDiff / ddosCount)
val legitPacketsRateStdDev = math.sqrt(legitPacketsRateDiff / legitCount)
val legitBytesRateStdDev = math.sqrt(legitBytesRateDiff / legitCount)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ddosPacketsRateMeanBroadcast: org.apache.spark.broadcast.Broadcast[Double] = Broadcast(45)
ddosBytesRateMeanBroadcast: org.apache.spark.broadcast.Broadcast[Double] = Broadcast(46)
legitPacketsRateMeanBroadcast: org.apache.spark.broadcast.Broadcast[Double] = Broadcast(47)
legitBytesRateMeanBroadcast: org.apache.spark.broadcast.Broadcast[Double] = Broadcast(48)
ddosPacketsRateDiff: Double = 51875.59647335777
ddosBytesRateDiff: Double = 4.5501268453884387E8
legitPacketsRateDiff: Double = 1.0095912342565912E8
legitBytesRateDiff: Double = 1.2827464729662332E12
ddosPacketsRateStdDev: Double = 0.15597820418003486
ddosBytesRateStdDev: Double = 14.608112880857709
legitPacketsRateStdDev: Double = 1065.0690932150894
legitBytesRateStdDev: Double = 120053.66182970731


Questa _query_ prende in input l'intero dataset e produce un dataset composto di quattro colonne: le prime due sono rispettivamente le somme delle differenze al quadrato tra la frequenza di pacchetti in un flusso DDoS e la frequenza dei byte in un flusso DDoS e le loro medie, mentre le seconde due sono gli stessi campi ma per i flussi legittimi.

Questa _query_ prende il dataset intermedio costruito in precedenza e lo trasforma in un tupla a quattro valori dove se il flusso è DDoS la differenza al quadrato tra i valori della riga e le loro medie saranno copiati nei primi due campi della tupla, viceversa se il flusso è legittimo. Infine viene effettuata una operazione di somma tra tutte le colonne. Questo permette di ottenere le somme per le due variabili di interesse sia per attacchi DDoS che traffico legittimo consentendo quindi di calcolare le deviazioni standard.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Calcolo deviazione standard delle frequenze di pacchetti e _bytes_ per traffico legittimo e DDoS |  10.6 GB  | 26 s |

In [34]:
val flowsDatasetCached =
    recordDataset.
    filter(_.duration > 0.0).
    map(r =>
      (
        (r.sourceAddress, r.sourcePort, r.destinationAddress, r.destinationPort, r.protocol),
        (r.isDDoS, r.duration, r.packets, r.bytes),
      ),
    ).
    reduceByKey { case ((isDDoS1, duration1, packets1, bytes1), (isDDoS2, duration2, packets2, bytes2)) =>
      (isDDoS1 || isDDoS2, duration1 + duration2, packets1 + packets2, bytes1 + bytes2)
    }.
    map { case (_, (isDDoS, duration, packets, bytes)) => (isDDoS, packets / duration, bytes / duration) }.
    cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

flowsDatasetCached: org.apache.spark.rdd.RDD[(Boolean, Double, Double)] = MapPartitionsRDD[98] at map at <console>:45


In [35]:
val (ddosPacketsRateSum, ddosBytesRateSum, ddosCount, legitPacketsRateSum, legitBytesRateSum, legitCount) =
    flowsDatasetCached.
    map { case (isDDoS, packetsRate, bytesRate) =>
      if (isDDoS) (packetsRate, bytesRate, 1, 0.0, 0.0, 0) else (0.0, 0.0, 0, packetsRate, bytesRate, 1)
    }.
    reduce { 
      case(
        (ddosPacketsRate1, ddosBytesRate1, ddosCount1, legitPacketsRate1, legitBytesRate1, legitCount1), 
        (ddosPacketsRate2, ddosBytesRate2, ddosCount2, legitPacketsRate2, legitBytesRate2, legitCount2)
      ) => (
        ddosPacketsRate1 + ddosPacketsRate2, 
        ddosBytesRate1 + ddosBytesRate2, 
        ddosCount1 + ddosCount2, 
        legitPacketsRate1 + legitPacketsRate2, 
        legitBytesRate1 + legitBytesRate2,
        legitCount1 + legitCount2
      )  
    }

val ddosPacketsRateMeanBroadcast = sc.broadcast(ddosPacketsRateSum / ddosCount)
val ddosBytesRateMeanBroadcast = sc.broadcast(ddosBytesRateSum / ddosCount)
val legitPacketsRateMeanBroadcast = sc.broadcast(legitPacketsRateSum / legitCount)
val legitBytesRateMeanBroadcast = sc.broadcast(legitBytesRateSum / legitCount)

val (ddosPacketsRateDiff, ddosBytesRateDiff, legitPacketsRateDiff, legitBytesRateDiff) =
    flowsDatasetCached.
    map { case (isDDoS, packetsRate, bytesRate) =>
      if (isDDoS)
        (
         math.pow(packetsRate - ddosPacketsRateMeanBroadcast.value, 2),
         math.pow(bytesRate - ddosBytesRateMeanBroadcast.value, 2),
         0.0,
         0.0,
        )
      else
        (
         0.0,
         0.0,
         math.pow(packetsRate - legitPacketsRateMeanBroadcast.value, 2),
         math.pow(bytesRate - legitBytesRateMeanBroadcast.value, 2),
        )
    }.
    reduce {
      case(
        (ddosPacketsRateDiff1, ddosBytesRateDiff1, legitPacketsRateDiff1, legitBytesRateDiff1),
        (ddosPacketsRateDiff2, ddosBytesRateDiff2, legitPacketsRateDiff2, legitBytesRateDiff2)
      ) => (
        ddosPacketsRateDiff1 + ddosPacketsRateDiff2,
        ddosBytesRateDiff1 + ddosBytesRateDiff2,
        legitPacketsRateDiff1 + legitPacketsRateDiff2,
        legitBytesRateDiff1 + legitBytesRateDiff2
      )
    }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ddosPacketsRateSum: Double = 584305.5830716968
ddosBytesRateSum: Double = 5.646841374913517E7
ddosCount: Int = 2132236
legitPacketsRateSum: Double = 31059.85973989538
legitBytesRateSum: Double = 5124031.632252166
legitCount: Int = 89
ddosPacketsRateMeanBroadcast: org.apache.spark.broadcast.Broadcast[Double] = Broadcast(52)
ddosBytesRateMeanBroadcast: org.apache.spark.broadcast.Broadcast[Double] = Broadcast(53)
legitPacketsRateMeanBroadcast: org.apache.spark.broadcast.Broadcast[Double] = Broadcast(54)
legitBytesRateMeanBroadcast: org.apache.spark.broadcast.Broadcast[Double] = Broadcast(55)
ddosPacketsRateDiff: Double = 51875.59647335778
ddosBytesRateDiff: Double = 4.5501268453884375E8
legitPacketsRateDiff: Double = 1.0095912342565912E8
legitBytesRateDiff: Double = 1.2827464729662332E12


È stato applicato il meccanismo di _caching_ per cercare di ottimizzare l'esecuzione delle due _sub-query_, suggerito dalla presenza del _dataset_ intermedio che entrambe sfruttano.
Benché Spark abbia un meccanismo di _caching_ "interno", capace di ottimizzare le _query_ che vengono fatte a partire da variabili che vengono riutilizzate, il _caching_ esplicito del _dataset_ dà risultati comunque superiori.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Conteggio e somma per pacchetti, _bytes_ sia legittimi che DDoS (**cache**) |  10.6 GB  | 0.3 s (2.6 min con scrittura _cache_) |
| Calcolo deviazione standard per packets rate e bytes rate per traffico legittimo e DDoS (**cache**) | 10.6 GB | 0.5 s |

Con i valori calcolati sono state costruite le distribuizioni gaussiane delle frequenze dei pacchetti per flusso e dei byte per flusso, sia per quelli DDoS che quelli legittimi.

Si può osservare che il traffico DDoS ha una frequenza di pacchetti relativamente bassa, attorno a 0.27 pacchetti al secondo, mentre invece il traffico legittimo ha una frequenza di pacchetti che si aggira mediamente attorno ai 350 pacchetti al secondo. Essendo la varianza per il traffico leggittimo molto più elevata, ciò indica che c'è maggiore incertezza sul valore che la media rappresenta, poiché il traffico legittimo è ragionevole che sia maggiormente eterogeneo.

Discorso analogo si può fare per quanto riguarda la frequenza dei _byte_, che è mediamente di 26 _byte_ al secondo per i flussi DDoS, mentre è mediamente di 57573 _bytes_ al secondo per i flussi legittimi.

Abbiamo tenuto conto di queste informazioni per calcolare un valore nella nostra metrica che indica un determinato flusso come DDoS tanto più quanto si avvicina ai valori medi qui trovati perché quelli inerenti al traffico legittimo sono troppo distribuiti per essere considerati significativi. 

<div align="center">
    <div>
        <img src="images/ddos-flow-packets-rate.png" width="40%"/>
        <img src="images/legit-flow-packets-rate.png" width="40%"/>
    </div>
    <div>
        <img src="images/ddos-flow-bytes-rate.png" width="40%"/>
        <img src="images/legit-flow-bytes-rate.png" width="40%"/>
    </div>
</div>

In [36]:
flowsDatasetCached.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res26: flowsDatasetCached.type = MapPartitionsRDD[98] at map at <console>:45


## Statistiche temporali su attacchi 

Questa _query_ si da come obiettivo quello di estrapolare l'arco temporale all'interno del quale sono stati condotti gli attacchi DDoS.

In [37]:
def minDate(date1: LocalDateTime, date2: LocalDateTime): LocalDateTime = {
    if (date1.isBefore(date2)) date1 else date2
}

def maxDate(date1: LocalDateTime, date2: LocalDateTime): LocalDateTime = {
    if (date1.isAfter(date2)) date1 else date2
}

val minMaxDate = recordDataset.filter(_.isDDoS).map(r => (r.startTime, r.startTime)).reduce {
    case ((accMax, accMin), (r1, r2)) => (maxDate(accMax, r1), minDate(accMin, r2))
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

minDate: (date1: java.time.LocalDateTime, date2: java.time.LocalDateTime)java.time.LocalDateTime
maxDate: (date1: java.time.LocalDateTime, date2: java.time.LocalDateTime)java.time.LocalDateTime
minMaxDate: (java.time.LocalDateTime, java.time.LocalDateTime) = (2018-06-04T09:22:59.512,2018-06-04T07:02:01.762)


Questa _query_ prende in input l'intero dataset e restituisce una coppia di valori che rappresentano il _timestamp_ più piccolo di inizio di un attacco e il _timestamp_ più grande di inizio di un attacco.

La _query_ trattiene solamente i _record_ che appartengono a un attacco DDoS e trasforma ciascuno di essi in una coppia di valori che rappresentano l'inizio dell'attacco. Infine viene calcolato il valore minimo e massimo di quei due valori attraverso una operazione di _reduce_.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Orario di inizio del primo attacco e orario di inizio dell'ultimo attacco |  10.6 GB  | 14 s |

In [38]:
val minMaxDate = recordDataset.
    filter(_.isDDoS).
    coalesce(12).
    map(r => (r.startTime, r.startTime)).
    reduce {
        case ((accMax, accMin), (r1, r2)) => (maxDate(accMax, r1), minDate(accMin, r2))
    }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

minMaxDate: (java.time.LocalDateTime, java.time.LocalDateTime) = (2018-06-04T09:22:59.512,2018-06-04T07:02:01.762)


Come ottimizzazione è stato tentata la riduzione del numero di partizioni a 12, ma non c'è stato un miglioramento delle prestazioni. Per questo motivo si è rinunciato ad ulteriori ottimizzazioni vista la semplicità della query.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Orario di inizio del primo attacco e orario di inizio dell'ultimo attacco (**coalesce**) |  10.6 GB  | 14 s |

In [39]:
val startTime = minMaxDate._2
val endTime = minMaxDate._1

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

startTime: java.time.LocalDateTime = 2018-06-04T07:02:01.762
endTime: java.time.LocalDateTime = 2018-06-04T09:22:59.512


Come si può osservare dai risultati precedenti, la distanza temporale tra il primo e l'ultimo attacco è di circa 2.30h. Da questo risultato è stato derivato il fatto che non è possibile estrapolare ulteriori informazioni utili dai _timestamp_.

### Quartili e distribuzioni delle colonne "packets", "bytes", "rate" e "byteRate"

In questa _query_ si sono voluti calcolare i quartili delle colonne del _dataset_ "packets", "bytes", "rate" e di una colonna aggiuntiva, non presente nel _dataset_, "byteRate", che rappresenta il numero di _byte_ che sono stati inviati al secondo durante una connessione.
I quartili sono stati poi utilizzati per ripulire il _dataset_ dai cosiddetti _outlier_ e poter calcolare media e deviazione standard di ciascuna colonna, ovvero la loro distribuzione.

In [40]:
case class Quartiles[T: Numeric](min: T, firstQuartile: T, secondQuartile: T, thirdQuartile: T, max: T)

def getStatistics[T: Numeric: Ordering: ClassTag](dataset: RDD[T], size: Long): Quartiles[T] = {
    dataset.
        sortBy(v => v).
        zipWithIndex().
        filter { case (_, index) =>
            index == 0 || index == size / 4 || index == size / 2 || index == size * 3 / 4 || index == size - 1
        }.
        collect().
        sortBy(_._2).
        map(_._1) match {
            case Array(min, first, second, third, max) => Quartiles(min, first, second, third, max)
        }
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

defined class Quartiles
getStatistics: [T](dataset: org.apache.spark.rdd.RDD[T], size: Long)(implicit evidence$1: Numeric[T], implicit evidence$2: Ordering[T], implicit evidence$3: scala.reflect.ClassTag[T])Quartiles[T]


In [41]:
val ddosSize = recordDataset.filter(_.isDDoS).count()
val legitSize = recordDataset.filter(!_.isDDoS).count()
val filteredDDoSSize = recordDataset.filter(r => r.isDDoS && r.duration > 0.0).count()
val filteredLegitSize = recordDataset.filter(r => !r.isDDoS && r.duration > 0.0).count()

val packetsDDoS = recordDataset.filter(_.isDDoS).map(_.packets)
val packetsLegit = recordDataset.filter(!_.isDDoS).map(_.packets)
val bytesDDoS = recordDataset.filter(_.isDDoS).map(_.bytes)
val bytesLegit = recordDataset.filter(!_.isDDoS).map(_.bytes)
val rateDDoS = recordDataset.filter(_.isDDoS).map(_.rate)
val rateLegit = recordDataset.filter(!_.isDDoS).map(_.rate)
val bytesRateDDoS = recordDataset.filter(r => r.isDDoS && r.duration > 0.0).map(r => r.bytes / r.duration)
val bytesRateLegit = recordDataset.filter(r => !r.isDDoS && r.duration > 0.0).map(r => r.bytes / r.duration)

val packetQuartileDDoS = getStatistics(packetsDDoS, ddosSize)
val packetQuartileLegit = getStatistics(packetsLegit, legitSize)
val bytesQuartileDDoS = getStatistics(bytesDDoS, ddosSize)
val bytesQuartileLegit = getStatistics(bytesLegit, legitSize)
val rateQuartileDDoS = getStatistics(rateDDoS, ddosSize)
val rateQuartileLegit = getStatistics(rateLegit, legitSize)
val bytesRateQuartileDDoS = getStatistics(bytesRateDDoS, filteredDDoSSize)
val bytesRateQuartileLegit = getStatistics(bytesRateLegit, filteredLegitSize)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ddosSize: Long = 38531238
legitSize: Long = 1265
filteredDDoSSize: Long = 36387979
filteredLegitSize: Long = 1246
packetsDDoS: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[111] at map at <console>:34
packetsLegit: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[113] at map at <console>:33
bytesDDoS: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[115] at map at <console>:33
bytesLegit: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[117] at map at <console>:33
rateDDoS: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[119] at map at <console>:33
rateLegit: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[121] at map at <console>:33
bytesRateDDoS: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[123] at map at <console>:33
bytesRateLegit: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[125] at map at <console>:33
packetQuartileDDoS: Quartiles[Long] = Quartiles(1,3,5,8,32)
packetQuartileLegit: Quartiles[Long] = Quartiles(1,270,951,2397,248848)
bytesQuartileDDoS: Q

Questa _query_ prende in input una serie di _dataset_ intermedi, ognuno dei quali ha ricevuto un'operazione di eliminazione delle righe che non appartengono al tipo di traffico di interesse e poi un'operazione di trasformazione che porta il _dataset_ ad avere solamente la colonna di interesse.

La _query_ per l'estrazione dei quartili da una colonna presuppone che il _dataset_ sia formato solamente da una colonna di tipo numerico e che sia già stata calcolata la dimensione del _dataset_.
La _query_ si preoccupa di ordinare le righe dalla più piccola alla più grande e di numerarle, dopodiché trattiene solamente quelle in corrispondenza dei quartili, ovvero il valore minimo in prima posizione, il valore ad un quarto del _dataset_, la mediana, il valore a tre quarti del _dataset_ e il massimo in ultima posizione.
Una volta fatto li si raccoglie e li si ordina per indice, nel caso in cui fossero stati disordinati, e vengono salvati in un'apposita struttura dati.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Numero di record appartenenti ad attacchi DDoS |  10.6 GB  | 15 s |
| Numero di record appartenenti al traffico legittimo |  10.6 GB  | 15 s |
| Numero di record appartenenti ad attacchi DDoS con durata non 0 |  10.6 GB  | 15 s |
| Numero di record appartenenti al traffico legittimo con durata non 0 |  10.6 GB  | 15 s |
| Calcolo quartili per pacchetti in attacchi DDoS |  10.6 GB  | 39 s |
| Calcolo quartili per pacchetti nel traffico legittimo |  10.6 GB  | 70 s |
| Calcolo quartili per _bytes_ in attacchi DDoS |  10.6 GB  | 41 s |
| Calcolo quartili per _bytes_ nel traffico legittimo |  10.6 GB  | 71 s |
| Calcolo quartili per _rate_ in attacchi DDoS |  10.6 GB  | 86 s |
| Calcolo quartili per _rate_ nel traffico legittimo |  10.6 GB  | 67 s |
| Calcolo quartili per _bytes rate_ in attacchi DDoS |  10.6 GB  | 99 s |
| Calcolo quartili per _bytes rate_ nel traffico legittimo |  10.6 GB  | 67 s |

In [42]:
case class Gaussian(mean: Double, stdDev: Double)

def getGaussian[T: Numeric](sc: SparkContext, rdd: RDD[T]): Gaussian = {
    val (sum, count) = rdd.map(r => (r, 1)).reduce{ case((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2) }
    val mean = sum.toDouble() / count
    val meanBroadcast = sc.broadcast(mean)

    val stdDev = math.sqrt(rdd.map(r => math.pow(r.toDouble() - meanBroadcast.value, 2)).sum() / count)
    Gaussian(mean, stdDev)
}

def cleanByIQR[T: Numeric: Ordering](sc: SparkContext, rdd: RDD[T], quartiles: Quartiles[T]): RDD[T] = {
    val rangeBroadcast = sc.broadcast(
        (
            quartiles.firstQuartile.toDouble() - 1.5 * (quartiles.thirdQuartile - quartiles.firstQuartile).toDouble(),
            quartiles.thirdQuartile.toDouble() + 1.5 * (quartiles.thirdQuartile - quartiles.firstQuartile).toDouble(),
        ),
    )
    rdd.filter(r => r.toDouble() > rangeBroadcast.value._1 && r.toDouble() < rangeBroadcast.value._2)
}

val packetDDoSGaussian = getGaussian(sc, cleanByIQR(sc, packetsDDoS, packetQuartileDDoS))
val packetLegitGaussian = getGaussian(sc, cleanByIQR(sc, packetsLegit, packetQuartileLegit))
val bytesDDoSGaussian = getGaussian(sc, cleanByIQR(sc, bytesDDoS, bytesQuartileDDoS))
val bytesLegitGaussian = getGaussian(sc, cleanByIQR(sc, bytesLegit, bytesQuartileLegit))
val rateDDoSGaussian = getGaussian(sc, cleanByIQR(sc, rateDDoS, rateQuartileDDoS))
val rateLegitGaussian = getGaussian(sc, cleanByIQR(sc, rateLegit, rateQuartileLegit))
val bytesRateDDoSGaussian = getGaussian(sc, cleanByIQR(sc, bytesRateDDoS, bytesRateQuartileDDoS))
val bytesRateLegitGaussian = getGaussian(sc, cleanByIQR(sc, bytesRateLegit, bytesRateQuartileLegit))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

defined class Gaussian
getGaussian: [T](sc: org.apache.spark.SparkContext, rdd: org.apache.spark.rdd.RDD[T])(implicit evidence$1: Numeric[T])Gaussian
cleanByIQR: [T](sc: org.apache.spark.SparkContext, rdd: org.apache.spark.rdd.RDD[T], quartiles: Quartiles[T])(implicit evidence$1: Numeric[T], implicit evidence$2: Ordering[T])org.apache.spark.rdd.RDD[T]
packetDDoSGaussian: Gaussian = Gaussian(5.909105626202674,3.278499336168519)
packetLegitGaussian: Gaussian = Gaussian(1226.423275862069,1281.2192816070487)
bytesDDoSGaussian: Gaussian = Gaussian(529.8851801659653,240.05394429652554)
bytesLegitGaussian: Gaussian = Gaussian(904257.2375979113,1127246.4062869572)
rateDDoSGaussian: Gaussian = Gaussian(0.27486262284753876,0.18712897621757338)
rateLegitGaussian: Gaussian = Gaussian(61.127205176800025,53.07578216633704)
bytesRateDDoSGaussian: Gaussian = Gaussian(33.865466800873364,17.01016488809738)
bytesRateLegitGaussian: Gaussian = Gaussian(52217.11108098465,55860.34186395787)


Questa _query_ prende in input una serie di _dataset_ intermedi, ognuno dei quali ha ricevuto un'operazione di eliminazione delle righe che non appartengono al tipo di traffico di interesse e poi un'operazione di trasformazione che porta il _dataset_ ad avere solamente la colonna di interesse.

La _query_ per ottenere la distribuzione gaussiana di ciascuna delle colonne è composta da due fasi.

La prima fase si preoccupa dell'eliminazione degli _outlier_ che andrebbero ad inficiare il calcolo dei valori della distribuzione.
Questa viene fatta sulla base dei quartili estratti in precedenza, calcolando per ogni valore il _range_ interquartile secondo la sua definizione.
Una volta calcolato questo, la _query_ prende in input il _dataset_ precedente ed applica una trasformazione per eliminare tutte quelle righe che non rientrano nel _range_.

La seconda fase effettua in prima istanza il calcolo della media, trasformando ogni riga del _dataset_ ripulito in una coppia fatta dal valore originale in essa contenuto e il valore 1 e poi effettuando la somma di tutti i valori in queste due colonne.
In questo modo si ottiene la somma dei valori e il loro conteggio e così, dividendoli tra loro, è possibile ottenere il valor medio.
A questo punto, per ottenere la deviazione standard, non si fa altro che prendere il _dataset_ ricevuto e trasformare il valore di ciascuna riga nel quadrato della differenza con la media e poi sommare tutti i valori.
Dividendo questo valore ottenuto per il numero complessivo di righe nel _dataset_ ottenuto in precedenza e poi calcolando la radice quadrata di quest'ultima si ottiene quindi la deviazione standard.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Calcolo gaussiana per pacchetti in attacchi DDoS |  10.6 GB  | 31 s |
| Calcolo gaussiana per pacchetti nel traffico legittimo |  10.6 GB  | 36 s |
| Calcolo gaussiana per _bytes_ in attacchi DDoS |  10.6 GB  | 28 s |
| Calcolo gaussiana per _bytes_ nel traffico legittimo |  10.6 GB  | 26 s |
| Calcolo gaussiana per _rate_ in attacchi DDoS |  10.6 GB  | 28 s |
| Calcolo gaussiana per _rate_ nel traffico legittimo |  10.6 GB  | 26 s |
| Calcolo gaussiana per _bytes rate_ in attacchi DDoS |  10.6 GB  | 29 s |
| Calcolo gaussiana per _bytes rate_ nel traffico legittimo |  10.6 GB  | 26 s |

In [43]:
val ddosDataset = recordDataset.filter(_.isDDoS).persist(StorageLevel.MEMORY_AND_DISK)
val legitDataset = recordDataset.filter(!_.isDDoS).persist(StorageLevel.MEMORY_AND_DISK)

val packetsDDoSCached = ddosDataset.map(_.packets).persist(StorageLevel.MEMORY_AND_DISK)
val packetsLegitCached = legitDataset.map(_.packets).persist(StorageLevel.MEMORY_AND_DISK)
val bytesDDoSCached = ddosDataset.map(_.bytes).persist(StorageLevel.MEMORY_AND_DISK)
val bytesLegitCached = legitDataset.map(_.bytes).persist(StorageLevel.MEMORY_AND_DISK)
val rateDDoSCached = ddosDataset.map(_.rate).persist(StorageLevel.MEMORY_AND_DISK)
val rateLegitCached = legitDataset.map(_.rate).persist(StorageLevel.MEMORY_AND_DISK)
val bytesRateDDoSCached = ddosDataset.filter(_.duration > 0.0).map(r => r.bytes / r.duration).persist(StorageLevel.MEMORY_AND_DISK)
val bytesRateLegitCached = legitDataset.filter(_.duration > 0.0).map(r => r.bytes / r.duration).persist(StorageLevel.MEMORY_AND_DISK)

val packetQuartileDDoSCached = getStatistics(packetsDDoSCached, ddosSize)
val packetQuartileLegitCached = getStatistics(packetsLegitCached, legitSize)
val bytesQuartileDDoSCached = getStatistics(bytesDDoSCached, ddosSize)
val bytesQuartileLegitCachedCached = getStatistics(bytesLegitCached, legitSize)
val rateQuartileDDoSCached = getStatistics(rateDDoSCached, ddosSize)
val rateQuartileLegitCached = getStatistics(rateLegitCached, legitSize)
val bytesRateQuartileDDoSCached = getStatistics(bytesRateDDoSCached, recordDataset.filter(r => r.isDDoS && r.duration > 0.0).count())
val bytesRateQuartileLegitCached = getStatistics(bytesRateLegitCached, recordDataset.filter(r => !r.isDDoS && r.duration > 0.0).count())

val packetDDoSGaussianCached = getGaussian(sc, cleanByIQR(sc, packetsDDoSCached, packetQuartileDDoS))
val packetLegitGaussianCached = getGaussian(sc, cleanByIQR(sc, packetsLegitCached, packetQuartileLegit))
val bytesDDoSGaussianCached = getGaussian(sc, cleanByIQR(sc, bytesDDoSCached, bytesQuartileDDoS))
val bytesLegitGaussianCached = getGaussian(sc, cleanByIQR(sc, bytesLegitCached, bytesQuartileLegit))
val rateDDoSGaussianCached = getGaussian(sc, cleanByIQR(sc, rateDDoSCached, rateQuartileDDoS))
val rateLegitGaussianCached = getGaussian(sc, cleanByIQR(sc, rateLegitCached, rateQuartileLegit))
val bytesRateDDoSGaussianCached = getGaussian(sc, cleanByIQR(sc, bytesRateDDoSCached, bytesRateQuartileDDoS))
val bytesRateLegitGaussianCached = getGaussian(sc, cleanByIQR(sc, bytesRateLegitCached, bytesRateQuartileLegit))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ddosDataset: org.apache.spark.rdd.RDD[Record] = MapPartitionsRDD[206] at filter at <console>:33
legitDataset: org.apache.spark.rdd.RDD[Record] = MapPartitionsRDD[207] at filter at <console>:33
packetsDDoSCached: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[208] at map at <console>:34
packetsLegitCached: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[209] at map at <console>:33
bytesDDoSCached: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[210] at map at <console>:33
bytesLegitCached: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[211] at map at <console>:33
rateDDoSCached: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[212] at map at <console>:33
rateLegitCached: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[213] at map at <console>:33
bytesRateDDoSCached: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[215] at map at <console>:33
bytesRateLegitCached: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[217] at map at <console>:33
packetQuartileDDoSCached:

Per ottimizzare l'esecuzione della _query_ è stato deciso di effettuare il _caching_ di tutti _dataset_ intermedi che sono stati utilizzati.

In questo modo, per quanto riguarda le _query_ sui quartili, i tempi di esecuzione si sono ridotti,  se però non si considerando anche le scritture dovute al _caching_.
Il vero guadagno si vede infatti nelle _query_ per il calcolo delle distribuzioni.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Calcolo quartili per pacchetti in attacchi DDoS (**cache**) |  10.6 GB  | 11 s (132 s con scrittura _cache_) |
| Calcolo quartili per pacchetti nel traffico legittimo (**cache**) |  10.6 GB  | 0.6 s (39 s con scrittura _cache_) |
| Calcolo quartili per _bytes_ in attacchi DDoS (**cache**) |  10.6 GB  | 15 s (75 s con scrittura _cache_) |
| Calcolo quartili per _bytes_ nel traffico legittimo (**cache**) |  10.6 GB  | 0.9 s (0.9 s con scrittura _cache_) |
| Calcolo quartili per _range_ in attacchi DDoS (**cache**) |  10.6 GB  | 16 s (65 s con scrittura _cache_) |
| Calcolo quartili per _range_ nel traffico legittimo (**cache**) |  10.6 GB  | 0.9 s (0.9 s con scrittura _cache_) |
| Calcolo quartili per _bytes rate_ in attacchi DDoS (**cache**) |  10.6 GB  | 17 s (81 s con scrittura _cache_) |
| Calcolo quartili per _bytes rate_ nel traffico legittimo (**cache**) |  10.6 GB  | 0.7 s (0.7 s con scrittura _cache_) |
| Calcolo gaussiana per pacchetti in attacchi DDoS (**cache**) |  10.6 GB  | 3 s |
| Calcolo gaussiana per pacchetti nel traffico legittimo (**cache**) |  10.6 GB  | 0.5 s |
| Calcolo gaussiana per _bytes_ in attacchi DDoS (**cache**) |  10.6 GB  | 4 s |
| Calcolo gaussiana per _bytes_ nel traffico legittimo (**cache**) |  10.6 GB  | 0.2 s |
| Calcolo gaussiana per _range_ in attacchi DDoS (**cache**) |  10.6 GB  | 4 s |
| Calcolo gaussiana per _range_ nel traffico legittimo (**cache**) |  10.6 GB  | 0.3 s |
| Calcolo gaussiana per _bytes rate_ in attacchi DDoS (**cache**) |  10.6 GB  |  4 s |
| Calcolo gaussiana per _bytes rate_ nel traffico legittimo (**cache**) |  10.6 GB  | 0.4 s |

Come è possibile osservare per tutti e quattro i campi, i _record_ associati ad attacchi DDoS hanno una distribuzione più "compatta" rispetto ai corrispettivi associati a traffico legittimo, cioè con una varianza più bassa, che ci garantisce una maggiore affidabilità del dato.
Questo perché, ancora una volta, il traffico legittimo risulta molto variegato e perciò difficile da descrivere con precisione.
Inoltre, il valore di ciascun campo per il traffico DDoS è più basso di quello che riguarda quello legittimo.
Il numero di pacchetti per _record_ in un attacco DDoS è circa 5.9 pacchetti contro i 1226 per quello legittimo, il numero di _byte_ è di 530 contro 904257, il numero di pacchetti al secondo è di 0.27 contro 61 e infine il numero di byte al secondo è di 34 contro 52217.
Utilizzeremo questi valori nella metrica per identificare potenziali flussi di traffico DDoS, sapendo la loro conformazione.

<div align="center">
    <div>
        <img src="images/packets-ddos.png" width="40%"/>
        <img src="images/packets-legit.png" width="40%"/>
    </div>
    <div>
        <img src="images/bytes-ddos.png" width="40%"/>
        <img src="images/bytes-legit.png" width="40%"/>
    </div>
    <div>
        <img src="images/rates-ddos.png" width="40%"/>
        <img src="images/rates-legit.png" width="40%"/>
    </div>
    <div>
        <img src="images/bytesRate-ddos.png" width="40%"/>
        <img src="images/bytesRate-legit.png" width="40%"/>
    </div>
</div>

In [44]:
ddosDataset.unpersist()
legitDataset.unpersist()
packetsDDoSCached.unpersist()
packetsLegitCached.unpersist()
bytesDDoSCached.unpersist()
bytesLegitCached.unpersist()
rateDDoSCached.unpersist()
rateLegitCached.unpersist()
bytesRateDDoSCached.unpersist()
bytesRateLegitCached.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res38: ddosDataset.type = MapPartitionsRDD[206] at filter at <console>:33
res39: legitDataset.type = MapPartitionsRDD[207] at filter at <console>:33
res40: packetsDDoSCached.type = MapPartitionsRDD[208] at map at <console>:34
res41: packetsLegitCached.type = MapPartitionsRDD[209] at map at <console>:33
res42: bytesDDoSCached.type = MapPartitionsRDD[210] at map at <console>:33
res43: bytesLegitCached.type = MapPartitionsRDD[211] at map at <console>:33
res44: rateDDoSCached.type = MapPartitionsRDD[212] at map at <console>:33
res45: rateLegitCached.type = MapPartitionsRDD[213] at map at <console>:33
res46: bytesRateDDoSCached.type = MapPartitionsRDD[215] at map at <console>:33
res47: bytesRateLegitCached.type = MapPartitionsRDD[217] at map at <console>:33


## Costruzione della metrica

La metrica è stata costruita sulla base delle analisi svolte in precedenza, assumendo che l'applicazione che dovrà svolgere le analisi riceverà in input _record_ nello stesso formato di quelli del _dataset_ analizzato.
Ricapitolando, le informazioni di cui si è tenuto conto sono:

* la porta 80 è fortemente soggetta ad attacchi di tipo DDoS
* le macchine associate agli indirizzi IP `192.168.100.3`, `192.168.100.6` e `192.168.100.7` sono fortemente soggette ad attacchi di tipo DDoS
* le distribuzioni dei campi "packets", "bytes", "rate" e "byteRate"
* le distribuzioni dei campi "rate" e "byteRate" per ciascun flusso

Queste hanno permesso di costruire la seguente funzione capace di assegnare un peso a ciascun _record_ in arrivo.
È bene tenere a mente che la metrica dovrà restituire un valore tra 0 e 1, che potrà essere interpretato come la probabilità che lo specifico _record_ appartenga a traffico DDoS.

In [45]:
def metric(
    destinationPort: Long, 
    destinationAddress: String, 
    packets: Long, 
    bytes: Long, 
    rate: Double, 
    byteRate: Double, 
    flowRate: Double, 
    flowByteRate: Double
): Double =
    ((if (destinationPort == 80L) 1 else 0) +
     (if (Set("192.168.100.3", "192.168.100.6", "192.168.100.7", "192.168.100.5")(destinationAddress)) 1 else 0) +
     (1 - math.min(math.abs(packets - 5.909105626202674) / (3 * 3.2784993361685184), 1.0)) +
     (1 - math.min(math.abs(bytes - 529.8851801659653) / (3 * 240.0539442965255), 1.0)) +
     (1 - math.min(math.abs(rate - 0.27486262284753876) / (3 * 0.18712897621757338), 1.0)) + 
     (1 - math.min(math.abs(byteRate - 33.86546680087338) / (3 * 17.010164888097375), 1.0)) +
     (1 - math.min(math.abs(flowRate - 0.27403419840566284) / (3 * 0.15597820418003489), 1.0)) +
     (1 - math.min(math.abs(flowByteRate - 26.483191236399332) / (3 * 14.608112880857705), 1.0))
    ) / 8

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

metric: (destinationPort: Long, destinationAddress: String, packets: Long, bytes: Long, rate: Double, byteRate: Double, flowRate: Double, flowByteRate: Double)Double


La metrica si basa su due operazioni fondamentali che permettono di trasformare i suoi parametri in pesi, ovvero in valori compresi tra 0 e 1.
La prima si preoccupa di trasformare le grandezze booleane in pesi semplicemente assegnando il peso 1 quando il valore è `true` e 0 quando è `false`.

La seconda, più complessa, coinvolge le grandezze numeriche e si basa sul fatto di avere a disposizione la distribuzione delle stesse, assumendo che la forma di queste sia gaussiana.
L'idea è quella di ottenere il peso attraverso un'operazione analoga a quella di standardizzazione, che sottrae al valore della misura il valore medio della distribuzione e lo divide per la deviazione standard.
Questo viene fatto perchè i valori su cui ci si è basati per la costruzione della gaussiana vengano trasformati in quelli appartenenti ad una distribuzione standard, cioè con media 0 e deviazione standard 1, cosicché tutti i valori che ricadono nel _range_ \[μ - σ, μ + σ\] vengono mappati nel _range_ \[0, 1\].
L'operazione costruita effettua questa operazione di standardizzazione, ma dividendo per tre volte la deviazione standard, in modo tale da far ricadere nel _range_ \[0, 1\] il 99,73% dei valori che seguono la distribuzione, evitando la "saturazione" di una maggiore quantità di valori.
Questa procedura infatti non garantisce che il valore ottenuto sia compreso tra 0 e 1 in generale, perciò viene applicata un'operazione di minimo per cui, qualora il valore della funzione dovesse eccedere 1, il valore restituito è 1.
Inoltre, sia per evitare che il valore restituito dalla standardizzazione sia negativo, sia perché ci interessa pesare la distanza dalla media e non tanto la distanza "sinistra" o "destra", al numeratore viene applicata un'operazione di valore assoluto.
In questo modo ci siamo garantiti che il valore restituito sia sempre compreso tra 0 e 1, solo che a questo punto tanto più si avvicina a 0 tanto più si allinea alla distribuzione del traffico DDoS, avvicinandosi al valor medio della stessa, tanto più si avvicina a 1 quanto più se ne discosta.
Per avere l'effetto contrario desiderato basta sottrarre da 1 il valore calcolato.

A questo punto, tutti i pesi sono valori tra 0 e 1 e la loro media aritmetica può essere solamente un valore compreso tra 0 e 1, ottenendo il _range_ di valori desiderato.

### Valutazione delle prestazioni

In questa query, si sono volute valutare le prestazioni della funzione metrica così definita.
Questo significa calcolare il numero di "true positive", "false positive", "true negative" e "false negative" prodotti dall'applicazione della metrica sul _dataset_ originale.
Per "true positive" e "false positive" si intendono i _record_ associati correttamente alle classi di appartenenza che sono quella degli attacchi DDoS e del traffico legittimo.
Invece, per "false positive" e "false negative" il viceversa, ovvero le istanze incorrettamente classificate.

In [46]:
val flowDataDataset =
    recordDataset.
        filter(_.duration > 0.0).
        map(r =>
          (
            (r.sourceAddress, r.sourcePort, r.destinationAddress, r.destinationPort, r.protocol),
            (r.duration, r.packets, r.bytes),
          ),
        ).
        reduceByKey { case ((duration1, packets1, bytes1), (duration2, packets2, bytes2)) =>
          (duration1 + duration2, packets1 + packets2, bytes1 + bytes2)
        }.
        map { case (k, (duration, packets, bytes)) => (k, (packets / duration, bytes / duration)) }.
        persist(StorageLevel.MEMORY_AND_DISK)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

flowDataDataset: org.apache.spark.rdd.RDD[((String, Long, String, Long, String), (Double, Double))] = MapPartitionsRDD[303] at map at <console>:45


In [47]:
val metricDataset =
    recordDataset.
    map(r => (
        (r.sourceAddress, r.sourcePort, r.destinationAddress, r.destinationPort, r.protocol),
        (r.packets, r.bytes, r.rate, if (r.duration > 0.0) r.bytes / r.duration else 0.0, r.isDDoS)
    )).
    join(flowDataDataset).
    map { 
        case(
            (_, _, destinationAddress, destinationPort, _), 
            ((packets, bytes, rate, byteRate, isDDoS), (flowRate, flowByteRate))
        ) => 
        (isDDoS, metric(destinationPort, destinationAddress, packets, bytes, rate, byteRate, flowRate, flowByteRate) >= 0.1)
    }.
    map { case(actual, predicted) => 
        if (actual == predicted) (if (actual) (1, 0) else (0, 1), (0, 0))
        else ((0, 0), if (!actual) (1, 0) else (0, 1))
    }.
    reduce { case(((truePositive1, trueNegative1), (falsePositive1, falseNegative1)), ((truePositive2, trueNegative2), (falsePositive2, falseNegative2))) =>
        ((truePositive1 + truePositive2, trueNegative1 + trueNegative2), (falsePositive1 + falsePositive2, falseNegative1 + falseNegative2))
    }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

metricDataset: ((Int, Int), (Int, Int)) = ((38530875,186),(1073,29))


La _query_ prende in input l'intero _dataset_ e un'aggregazione del _dataset_ che raggruppa i _flow_ in esso e produce in output una coppia di valori, ognuno dei quali è a sua volta una coppia, la prima sono i "true positive" e i "true negative", mentre la seconda è costituita dai "false positive" e dai "false negative".

La _query_ per prima cosa crea il _dataset_ aggregato rimuovendo i _record_ con durata inferiore a 0, che porterebbero a dei risultati erronei nei calcoli successivi.
Dopodiché, trasforma ciascun _record_ in una coppia chiave-valore dove la chiave rappresenta l'identificatore di un flusso, quindi l'insieme dei valori indirizzo IP sorgente, porta sorgente, indirizzo IP destinazione, porta destinazione e protocollo di trasporto.
Il valore è invece rappresentato dalle colonne che saranno necessarie in un secondo momento, cioè la durata, il numero di pacchetti e il numero di _byte_.
Viene quindi effettuata una somma di questi valori per ciascun flusso e poi un'ultima trasformazione per ottenere dalle tre colonne i due valori di interesse, cioè il numero di pacchetti al secondo e il numero di _byte_ al secondo per flusso.

La seconda parte della _query_ si preoccupa invece di trasformare il _dataset_ in modo che ogni _record_ sia nella forma chiave-valore.
La chiave è la stessa che per il _dataset_ aggregato precedentemente, mentre il valore è una tupla fatta dalle colonne mancanti per il calcolo della metrica più quella che ci dice se il _record_ appartiene a traffico DDoS o meno.
Il numero di _byte_ al secondo è forzato a 0 se la durata del _record_ stesso è 0, per semplicità.
In seguito viene effettuato il _join_ tra i due _dataset_ e poi trasformata ogni riga per ottenere una coppia di valori dove il primo indica se la riga appartiene a traffico DDoS o meno, mentre la seconda è la predizione effettuata dalla metrica.
Il valore di cut-off è stato manualmente impostato a 0.5 perché capace di dare le prestazioni migliori.

A questo punto alla _query_ non resta altro da fare che trasformare la coppia di valori in uno dei quattro casi possibili di predizione, ovvero "true positive", "true negative", "false positive" e "false negative".
Viene quindi costruita una coppia di coppie di valori che avrà il valore 1 in corrispondenza della posizione inerente allo specifico caso e infine verrà effettuata una somma dei valori di queste tuple così da poter ottenere il conteggio per ciascun caso.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Calcolo matrice di confusione |  11.2 GB  | 2.7 min (4.8 min con scrittura _cache_) |

In [48]:
val flowBroadcast = sc.broadcast(flowDataDataset.collectAsMap())

val metricDataset =
    recordDataset.
    map(r => (
        (r.sourceAddress, r.sourcePort, r.destinationAddress, r.destinationPort, r.protocol),
        (r.packets, r.bytes, r.rate, if (r.duration > 0.0) r.bytes / r.duration else 0.0, r.isDDoS)
    )).
    map {
        case (k, v) => flowBroadcast.value.get(k).map(r => (k, (v, r)))
    }.
    filter(_.isDefined).
    map(_.get).
    map { 
        case(
            (_, _, destinationAddress, destinationPort, _), 
            ((packets, bytes, rate, byteRate, isDDoS), (flowRate, flowByteRate))
        ) => 
        (isDDoS, metric(destinationPort, destinationAddress, packets, bytes, rate, byteRate, flowRate, flowByteRate) >= 0.5)
    }.
    map { case(actual, predicted) => 
        if (actual == predicted) (if (actual) (1, 0) else (0, 1), (0, 0))
        else ((0, 0), if (!actual) (1, 0) else (0, 1))
    }.
    reduce { case(((truePositive1, trueNegative1), (falsePositive1, falseNegative1)), ((truePositive2, trueNegative2), (falsePositive2, falseNegative2))) =>
        ((truePositive1 + truePositive2, trueNegative1 + trueNegative2), (falsePositive1 + falsePositive2, falseNegative1 + falseNegative2))
    }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

flowBroadcast: org.apache.spark.broadcast.Broadcast[scala.collection.Map[(String, Long, String, Long, String),(Double, Double)]] = Broadcast(198)
metricDataset: ((Int, Int), (Int, Int)) = ((38288961,1259),(0,241943))


Un primo tentativo di ottimizzazione ha coinvolto l'utilizzo delle _broadcast variables_.
L'idea è stata quella di effettuare il _broadcast_ a tutti gli _executor_ del _dataset_ più piccolo, ovvero quello aggregato.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Calcolo matrice di confusione (**broadcast variable**) |  11.2 GB  | 1.2 min (8 s per la generazione della _broadcast variable_) |

In [49]:
val metricDataset =
    recordDataset.
    map(r => (
        (r.sourceAddress, r.sourcePort, r.destinationAddress, r.destinationPort, r.protocol),
        (r.packets, r.bytes, r.rate, if (r.duration > 0.0) r.bytes / r.duration else 0.0, r.isDDoS)
    )).
    partitionBy(new HashPartitioner(140)).
    join(flowDataDataset).
    map { 
        case(
            (_, _, destinationAddress, destinationPort, _), 
            ((packets, bytes, rate, byteRate, isDDoS), (flowRate, flowByteRate))
        ) => 
        (isDDoS, metric(destinationPort, destinationAddress, packets, bytes, rate, byteRate, flowRate, flowByteRate) >= 0.5)
    }.
    map { case(actual, predicted) => 
        if (actual == predicted) (if (actual) (1, 0) else (0, 1), (0, 0))
        else ((0, 0), if (!actual) (1, 0) else (0, 1))
    }.
    reduce { case(((truePositive1, trueNegative1), (falsePositive1, falseNegative1)), ((truePositive2, trueNegative2), (falsePositive2, falseNegative2))) =>
        ((truePositive1 + truePositive2, trueNegative1 + trueNegative2), (falsePositive1 + falsePositive2, falseNegative1 + falseNegative2))
    }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

metricDataset: ((Int, Int), (Int, Int)) = ((38288961,1259),(0,241943))


Una seconda tecnica di ottimizzazione testata è stata quella di introdurre un "HashPartitioner" capace di ridurre le partizioni prima dell'esecuzione della _join_.
Il _partitioner_ è stato applicato sul _dataset_ più grande, in modo che fosse solo il più piccolo dei due a subire _shuffling_.
Questa tecnica ha migliorato il tempo impiegato dalla _query_ quando si dimezza il numero di partizioni a 140.
Aumentare il numero di partizioni peggiora il tempo di esecuzione, così come diminuirlo rispetto a questo valore.

| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Calcolo matrice di confusione (**hash partitioner (140 partizioni)**) |  11.2 GB  | 1.6 min |

In [50]:
val partitioner = new HashPartitioner(28)

val flowDataDataset =
    recordDataset.
        filter(_.duration > 0.0).
        map(r =>
          (
            (r.sourceAddress, r.sourcePort, r.destinationAddress, r.destinationPort, r.protocol),
            (r.duration, r.packets, r.bytes),
          ),
        ).
        reduceByKey { case ((duration1, packets1, bytes1), (duration2, packets2, bytes2)) =>
          (duration1 + duration2, packets1 + packets2, bytes1 + bytes2)
        }.
        map { case (k, (duration, packets, bytes)) => (k, (packets / duration, bytes / duration)) }.
        partitionBy(partitioner)

val metricDataset =
    recordDataset.
    map(r => (
        (r.sourceAddress, r.sourcePort, r.destinationAddress, r.destinationPort, r.protocol),
        (r.packets, r.bytes, r.rate, if (r.duration > 0.0) r.bytes / r.duration else 0.0, r.isDDoS)
    )).
    partitionBy(partitioner).
    join(flowDataDataset).
    map { 
        case(
            (_, _, destinationAddress, destinationPort, _), 
            ((packets, bytes, rate, byteRate, isDDoS), (flowRate, flowByteRate))
        ) => 
        (isDDoS, metric(destinationPort, destinationAddress, packets, bytes, rate, byteRate, flowRate, flowByteRate) >= 0.5)
    }.
    map { case(actual, predicted) => 
        if (actual == predicted) (if (actual) (1, 0) else (0, 1), (0, 0))
        else ((0, 0), if (!actual) (1, 0) else (0, 1))
    }.
    reduce { case(((truePositive1, trueNegative1), (falsePositive1, falseNegative1)), ((truePositive2, trueNegative2), (falsePositive2, falseNegative2))) =>
        ((truePositive1 + truePositive2, trueNegative1 + trueNegative2), (falsePositive1 + falsePositive2, falseNegative1 + falseNegative2))
    }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

partitioner: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@1c
flowDataDataset: org.apache.spark.rdd.RDD[((String, Long, String, Long, String), (Double, Double))] = ShuffledRDD[327] at partitionBy at <console>:49
metricDataset: ((Int, Int), (Int, Int)) = ((38288961,1259),(0,241943))


L'ultima tecnica di ottimizzazione impiegata è stata quella di applicare lo stesso _partitioner_ di tipo "HashPartitioner" su entrambi i _dataset_, così da fare lo _shuffling_ solamente all'inizio e la _join_ deve solamente occuparsi di accoppiare le partizioni nel modo giusto.
Questa tecnica peggiora le prestazioni della _query_ e il risparmio di tempo massimo si ha utilizzando solamente 28 partizioni.
Aumentarle ulteriormente non fa ridurre ulteriormente la durata della _query_.


| Query | Dati in input | Tempo |
| ----- | ----- | ----- |
| Calcolo matrice di confusione (**hash partitioner su entrambi i dataset (28 partizioni)**) |  11.2 GB  | 3.7 min |

In [51]:
println("|                    | Actually positive | Actually negative |")
println("|--------------------|-------------------|-------------------|")
println(f"| Predicted positive | ${metricDataset._1._1.toString}%-17s | ${metricDataset._2._1.toString}%-17s |")
println(f"| Predicted negative | ${metricDataset._2._2.toString}%-17s | ${metricDataset._1._2.toString}%-17s |")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

|                    | Actually positive | Actually negative |
|--------------------|-------------------|-------------------|
| Predicted positive | 38288961          | 0                 |
| Predicted negative | 241943            | 1259              |


Come si può osservare, la metrica è molto buona nell'individuare i _record_ che appartengono a traffico legittimo, meno nel individuare gli attacchi DDoS, essendo perciò più conservativa.
Gli errori che compie sono comunque relativamente bassi e così fatta non soffre di falsi positivi, segnalando traffico da bloccare che invece è legittimo.
Certamente, risultati più veritieri si sarebbero ottenuti basandosi su di un _test set_ differente dal _training set_ utilizzato, che avrebbe evidenziato potenziali problemi di _overfitting_.

In [52]:
flowDataDataset.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res55: flowDataDataset.type = ShuffledRDD[327] at partitionBy at <console>:49
