# DDoS and Intrusion detection


In this exercise, we'll use Spark Structured Streaming to detect DDoS attacks and attempts to access the admin panel of the website.

First, we'll add the same `test_query` function from the cleanup notebook.

In [None]:

from time import sleep
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'

from IPython.display import display, clear_output

import pyspark 
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *


def test_query(sdf, mode="append", rows=None, wait=2, sort=None):
    # If a query with the same name exists, stop it.
    query_name = "test_query"
    query = None
    for q in spark.streams.active:
        if (q.name == query_name):
            query = q
    if query is not None:
        query.stop()

    try:
        tq = (
            # Create an output stream
            sdf.writeStream               
            # Only write new rows to the output
            .outputMode(mode)           
            # Write output stream to an in-memory Spark table (a DataFrame)
            .format("memory")               
            # The name of the output table will be the same as the name of the query
            .queryName(query_name)
            # Submit the query to Spark and execute it
            .start()
        )

        tq.processAllAvailable()

        sleep(wait)
        while(tq.status.get("isTriggerActive") == True):
            print(f"DataAvailable: {tq.status['isDataAvailable']},\tTriggerActive: {tq.status['isTriggerActive']}\t{tq.status['message']}")
            sleep(wait)

        # When the status says "Waiting for data to arrive", that means the query
        # has finished its current iteration and is waiting for new messages from
        # Kafka.
        print(f"DataAvailable: {tq.status['isDataAvailable']},\tTriggerActive: {tq.status['isTriggerActive']}\t{tq.status['message']}")

        memory_sink = spark.table(query_name)

        if sort:
            memory_sink = memory_sink.sort(*sort)

        # Show the result table in the notebook. With `rows` set, show only the first
        # `rows` rows; otherwise convert the Spark DataFrame to pandas, which Jupyter
        # renders natively.
        if rows:
            display(memory_sink)
            display(memory_sink.take(rows))
        else:
            display(memory_sink)
            display(memory_sink.toPandas())

    finally:
        # Always try to stop the query but it doesn't matter if it fails.
        try:
            tq.stop()
        except Exception:
            pass


In [None]:
%%bash
# Install the required Python 3 dependencies
python3 -m pip install kafka-python pyarrow

Create a Spark session. The Spark-Kafka connector package was already requested through `PYSPARK_SUBMIT_ARGS` in the first cell.

In [None]:
# Create a local Spark session with two worker threads (if one doesn't already exist)
spark = SparkSession.builder.master('local[2]').getOrCreate()

Create a streaming DataFrame that represents the events received from the Kafka topic `clicks-cleaned`.

In [None]:
input = (
    
)    

Cast the JSON to columns in the DataFrame. Make sure to use `TimestampType` for `ts_ingest`, since it was already converted in the `clean` notebook.

In [None]:
decoded_json_stream = (
    
)

test_query(decoded_json_stream, mode="append")

Create a [user-defined function (`udf`)](https://docs.databricks.com/spark/latest/spark-sql/udf-python.html) `forbidden_clicks` which takes a URL as input and returns `True` if the URL points to the admin part of the website (when it ends with `/admin`).

As an example, the following code creates a UDF which squares each value of a column. It is used on the "id" column and the resulting column's name is changed to "id_squared".

```python
from pyspark.sql.functions import udf

@udf("long")
def squared_udf(s):
  return s * s

df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
```

Use the UDF to create the DataFrame `df_forbidden`, which contains the column `forbidden` that specifies whether the URL is an admin URL.
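
Before wrapping the check in a UDF, it can help to prototype the predicate as a plain Python function. The sketch below is one possible version, not the reference solution: the name `is_admin_url` and the choice to return `False` for missing URLs are assumptions made for illustration.

```python
def is_admin_url(url):
    """Return True if the URL ends with '/admin' (hypothetical helper)."""
    # Spark passes null column values to a Python UDF as None.
    if url is None:
        return False
    return url.endswith("/admin")


print(is_admin_url("https://example.com/admin"))     # True
print(is_admin_url("https://example.com/products"))  # False
print(is_admin_url(None))                            # False
```

Decorating such a function with `@udf("boolean")` would turn it into a column function, in the same way as `squared_udf` above.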

In [None]:
df_forbidden = (
    
)

test_query(df_forbidden, mode="append")

We'll do the same for detecting DDoS attacks. First, we want to flag whether an individual event is suspicious, i.e. whether the page_timer and page_height are both `0`. This time, however, we'll use a `pandas_udf`.

[Regular Python UDFs have the disadvantage that they operate on one row at a time](https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html), causing them to suffer from high serialization and invocation overhead. Pandas UDFs are built on top of Apache Arrow to support high-performance UDFs in Python.

This is `squared_udf` converted to a pandas UDF.

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

# Spark 3 infers the scalar UDF type from the Python type hints.
@pandas_udf('long')
def squared_pandas_udf(s: pd.Series) -> pd.Series:
    return s * s

df = spark.table("test")
display(df.select("id", squared_pandas_udf("id").alias("id_squared")))
```

The regular UDF version works one row at a time: the user-defined function takes a long `s` and returns the result of `s*s` as a long. In the Pandas version, the user-defined function takes a pandas.Series `s` and returns the result of `s*s` as a pandas.Series. Because `s*s` is vectorized on `pandas.Series`, the Pandas version is much faster than the row-at-a-time version.

Note that there are two important requirements when using scalar pandas UDFs:

* The input and output series must have the same size.
* How a column is split into multiple pandas.Series is internal to Spark, and therefore the result of the user-defined function must be independent of the splitting.
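
The second requirement can be illustrated without Spark. The sketch below is a plain-Python stand-in, with lists playing the role of `pandas.Series` batches; the helper names and the sample rows are made up for illustration. An element-wise flag gives the same answer however the rows are chunked, while a function that uses a per-batch statistic does not.

```python
def flag_elementwise(batch):
    """Flag each (page_timer, page_height) pair on its own."""
    return [timer == 0 and height == 0 for timer, height in batch]


def flag_below_batch_mean(batch):
    """Flag timers below the mean of the *current batch* (depends on the split)."""
    mean = sum(timer for timer, _ in batch) / len(batch)
    return [timer < mean for timer, _ in batch]


def apply_in_batches(func, rows, batch_size):
    """Mimic a column being fed to a scalar pandas UDF in chunks."""
    result = []
    for start in range(0, len(rows), batch_size):
        result.extend(func(rows[start:start + batch_size]))
    return result


# Hypothetical (page_timer, page_height) values.
rows = [(0, 0), (5, 300), (20, 0), (12, 800)]

# Same output for both batch sizes: safe as a scalar pandas UDF.
print(apply_in_batches(flag_elementwise, rows, 2) ==
      apply_in_batches(flag_elementwise, rows, 4))       # True

# Output changes with the batch size: not safe.
print(apply_in_batches(flag_below_batch_mean, rows, 2) ==
      apply_in_batches(flag_below_batch_mean, rows, 4))  # False
```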


In [None]:
df_ddos = (

)

test_query(df_ddos, mode="append")

In the cell above, we highlight the use of high-performance user-defined functions (UDFs) with pandas. For simple use cases such as this one, we could also avoid UDFs altogether and write the following instead:

```python
df_ddos = df_data.withColumn('flagged', when((col('visitor_page_timer') == 0) & (col('visitor_page_height') == 0), True).otherwise(False))
```

The second step in detecting a DDoS attack is counting how many suspicious events happen within a certain time frame. For this, we'll combine `groupBy` with a 30-second `window` based on the `ts_ingest` timestamp.
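
To build some intuition for what the windowed count computes, here is a plain-Python sketch of tumbling-window bucketing. It assumes windows aligned to multiples of 30 seconds with an inclusive start and exclusive end, which is how Spark's `window` behaves without a slide or start offset; the event values are made up for illustration.

```python
from collections import Counter

WINDOW_SECONDS = 30


def window_start(ts_seconds):
    """Start of the 30-second tumbling window that contains the timestamp."""
    return ts_seconds - (ts_seconds % WINDOW_SECONDS)


# (event time in seconds, flagged) pairs; hypothetical sample data.
events = [(3, True), (17, True), (29, False), (31, True), (65, True)]

# Count only the flagged events, grouped by the window they fall into.
flagged_per_window = Counter(
    window_start(ts) for ts, flagged in events if flagged
)

for start in sorted(flagged_per_window):
    print(f"[{start}, {start + WINDOW_SECONDS}): {flagged_per_window[start]}")
# [0, 30): 2
# [30, 60): 1
# [60, 90): 1
```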

In [None]:
df_ddos_window = (
    
)

test_query(df_ddos_window, mode="complete")

Notice that this query uses output mode `complete` instead of `append`. In `append` mode, a row in the result table can never change once it has been written. However, Spark does not know when all events of a certain window have been seen: by default it assumes that data can be "late", meaning an earlier event can enter the stream _after_ a later event has entered. In `complete` mode, rows are written to the result table as soon as they become available and are updated when new data arrives.

Although this solution in `complete` mode works, it will consume a lot of RAM over time because the intermediate results for all windows are kept, even if those windows ended years ago.

In order to solve this memory issue, you need to define when Spark can assume that it will not receive events from a certain window anymore. This is done using a [`watermark`](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking). With a watermark, you specify how "late" data can be.

For this exercise, you can assume data will not arrive more than 10 seconds late.

* Use [`withWatermark`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withWatermark.html) to add a watermark to the query with a threshold of 10 seconds, using the `ts_ingest` timestamp as the event time.
* Run the query using append mode.
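
The effect of a watermark can be sketched with a small plain-Python simulation. This is a simplified model, not Spark's implementation: it updates the watermark after every event, whereas Spark advances it between micro-batches, and all names and event times are hypothetical. The watermark trails the largest event time seen by 10 seconds; a window is finalized and emitted once (as in `append` mode) when the watermark reaches its end, and events for already finalized windows are dropped.

```python
WINDOW_SECONDS = 30
LATE_THRESHOLD_SECONDS = 10


def simulate(event_times):
    """Process event times in arrival order; return (emitted, dropped)."""
    open_windows = {}  # window start -> running count (state kept in memory)
    emitted = []       # finalized (window start, count) rows
    dropped = []       # events that arrived too late to be counted
    max_seen = 0

    for ts in event_times:
        start = ts - (ts % WINDOW_SECONDS)
        max_seen = max(max_seen, ts)
        watermark = max_seen - LATE_THRESHOLD_SECONDS

        if start + WINDOW_SECONDS <= watermark:
            # The window was already finalized: ignore the event.
            dropped.append(ts)
        else:
            open_windows[start] = open_windows.get(start, 0) + 1

        # Finalize every window that ends at or before the watermark,
        # which also frees its state.
        for s in sorted(open_windows):
            if s + WINDOW_SECONDS <= watermark:
                emitted.append((s, open_windows.pop(s)))

    return emitted, dropped


# The event at 8 is late but within the threshold; the event at 3 is too late.
emitted, dropped = simulate([5, 12, 28, 35, 8, 41, 3, 70])
print(emitted)  # [(0, 4), (30, 2)]
print(dropped)  # [3]
```

The window starting at 60 is still open at the end of the run, which mirrors why `append` mode only shows a window after later events have pushed the watermark past it.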


In [None]:
df_ddos_window_watermark = (
    
)

test_query(df_ddos_window_watermark, mode="append")

Now run these queries and write the output to `clicks-calculated-forbidden` and `clicks-calculated-ddos`. Use a trigger with `processingTime = "30 seconds"` for the DDoS query so that the next interval is only calculated 30 seconds after the previous interval started.

In [None]:
import shutil

# Remove the old checkpoint dir and its contents, if present.
# (os.rmdir only removes empty directories, and errno 39 means
# "directory not empty", so the old code left the checkpoints in place.)
shutil.rmtree("checkpoints-forbidden", ignore_errors=True)

query_forbidden = (
    
)

# Sleep two seconds
sleep(2)

# Show the status of the query
display(query_forbidden.status)

In [None]:
import shutil

# Remove the old checkpoint dir and its contents, if present.
# (os.rmdir only removes empty directories, and errno 39 means
# "directory not empty", so the old code left the checkpoints in place.)
shutil.rmtree("checkpoints-ddos", ignore_errors=True)

query_ddos = (
    
)

# Sleep two seconds
sleep(2)

# Show the status of the query
display(query_ddos.status)

## Spark helpers

The following code stops all running queries.

In [None]:
sleep(2)

for q in spark.streams.active:
    print("Stopping query '{}' with name '{}'".format(q.id, q.name))
    q.stop()
