# Setup

In [1]:
# imports

import findspark
import pyspark
from pyspark.streaming import StreamingContext

findspark.init()
findspark.find()

'/home/joaooliv/.local/lib/python3.10/site-packages/pyspark'

In [12]:
# dropping context

if 'sc' in locals():
    sc.stop()
if 'ssc' in locals() :
    ssc.stop()

# setting up context

sc = pyspark.SparkContext("local[*]")
sc.setLogLevel("FATAL")
ssc = StreamingContext(sc, 1) # 1 sec mini-batches

ssc.checkpoint(directory="spark_checkpoints")

---
# Exercises

## Exercise 1

In a denial-of-service event it is important to identify the IP sources that might be attacking the system, by issuing a large number of requests.

Write a program to find the IP sources that have done more than 50 requests in the last 10 seconds -- dump this information every 5 seconds. 


In [None]:
lines = ssc.socketTextStream("localhost", 7777)

lines.filter(lambda line: len(line) > 0)\
    .map(lambda line: line.split(" ")[1])\
    .countByValueAndWindow(10, 5)\
    .filter(lambda count: count[1] > 50)\
    .pprint()

ssc.start()
ssc.awaitTermination(60)
ssc.stop()

## Exercise 2

#### a)
Write a program to dump the number of requests, minimum processing time, maximum processing time for request in the last 10 seconds, **for all** source IPs that performed more than 100 requests -- dump this information every 5 second.  

In [None]:
lines = ssc.socketTextStream("localhost", 7777)

more_than_100 = lines.filter(lambda line: len(line) > 0)\
    .map(lambda line: line.split(" ")[1])\
    .countByValueAndWindow(10, 5)\
    .filter(lambda count: count[1] > 100)

metrics = lines.filter(lambda line: len(line) > 0)\
    .map(lambda line: line.split(" "))\
    .map(lambda values: (values[1], (1, float(values[5]), float(values[5]))))\
    .reduceByKeyAndWindow(lambda rt1, rt2: (rt1[0] + rt2[0], max(rt1[1], rt2[1]), min(rt1[2], rt2[2])), None, 10, 5)

more_than_100.leftOuterJoin(metrics).pprint()

ssc.start()
ssc.awaitTermination(50)
ssc.stop()

#### b)

Write a program to dump the number of requests, minimum processing time, maximum processing time for request in the last 10 seconds, **only if at least one** source IP has performed more than 100 requests -- dump this information every 5 second.

In [11]:
lines = ssc.socketTextStream("localhost", 7777)

ip_count = lines.filter(lambda line: len(line) > 0)\
    .map(lambda line: line.split(" ")[1])\
    .countByValueAndWindow(10, 5)

metrics = lines.filter(lambda line: len(line) > 0)\
    .map(lambda line: line.split(" "))\
    .map(lambda values: (values[1], (1, float(values[5]), float(values[5]))))\
    .reduceByKeyAndWindow(lambda rt1, rt2: (rt1[0] + rt2[0], max(rt1[1], rt2[1]), min(rt1[2], rt2[2])), None, 10, 5)

ip_count.map(lambda t: (None, t[1] > 100)).reduceByKey(lambda b1, b2: b1 or b2).join(metrics.map(lambda t: (None, t)))\
    .filter(lambda t: t[1][0] == True).map(lambda t: t[1][1]).pprint()
    #.filter(lambda t: ).map(lambda t: t[1][1])

ssc.start()
ssc.awaitTermination(50)
ssc.stop()

                                                                                

-------------------------------------------
Time: 2023-03-20 17:57:20
-------------------------------------------



                                                                                

-------------------------------------------
Time: 2023-03-20 17:57:25
-------------------------------------------
('120.52.73.98', (58.278, 0.126))
('185.28.193.95', (42.547, 0.013))
('192.241.151.220', (64.08, 0.055))
('2a01:488:66:1000:5c33:8503:0:1', (56.966, 0.028))
('2a02:c207:2008:5757::1', (42.167, 2.674))
('97.77.104.22', (59.513, 0.065))
('211.140.26.58', (57.435, 0.126))
('2602:ff62:104:7c9:8000::', (48.317, 0.159))
('202.106.16.36', (57.905, 0.17))
('2001:41d0:8:e7b5::1', (46.603, 46.603))
...



                                                                                

-------------------------------------------
Time: 2023-03-20 17:57:30
-------------------------------------------
('120.52.73.98', (66.036, 0.126))
('192.241.151.220', (64.08, 0.078))
('185.28.193.95', (69.654, 0.014))
('31.14.134.193', (0.274, 0.274))
('2a01:488:66:1000:5c33:8503:0:1', (56.966, 0.028))
('2a02:c207:2008:5757::1', (64.871, 0.132))
('97.77.104.22', (65.789, 0.065))
('202.106.16.36', (66.13, 0.17))
('2602:ff62:104:7c9:8000::', (63.304, 0.121))
('211.140.26.58', (65.395, 0.223))
...



                                                                                

-------------------------------------------
Time: 2023-03-20 17:57:35
-------------------------------------------
('120.52.73.98', (70.609, 0.127))
('82.146.37.33', (68.41, 0.102))
('192.241.151.220', (70.172, 0.078))
('185.28.193.95', (75.759, 0.014))
('31.14.134.193', (0.274, 0.274))
('2a01:488:66:1000:5c33:8503:0:1', (69.566, 0.113))
('2a02:c207:2008:5757::1', (64.871, 0.132))
('61.132.241.109', (53.475, 53.475))
('97.77.104.22', (70.108, 0.066))
('202.106.16.36', (69.813, 0.176))
...



                                                                                

-------------------------------------------
Time: 2023-03-20 17:57:40
-------------------------------------------
('120.52.73.98', (72.538, 0.127))
('82.146.37.33', (71.582, 0.102))
('180.234.223.91', (56.729, 56.729))
('185.28.193.95', (80.846, 0.017))
('192.241.151.220', (71.891, 0.091))
('2a01:488:66:1000:5c33:8503:0:1', (71.428, 0.074))
('61.132.241.109', (53.475, 53.475))
('198.50.206.0', (71.29, 0.237))
('97.77.104.22', (72.583, 0.087))
('202.106.16.36', (72.502, 0.128))
...



                                                                                

-------------------------------------------
Time: 2023-03-20 17:57:45
-------------------------------------------
('120.52.73.98', (73.801, 0.127))
('82.146.37.33', (73.495, 0.15))
('180.234.223.91', (73.598, 56.729))
('192.241.151.220', (71.891, 0.091))
('185.28.193.95', (80.846, 0.017))
('2a01:488:66:1000:5c33:8503:0:1', (72.477, 0.074))
('198.50.206.0', (73.712, 0.146))
('202.106.16.36', (73.779, 0.128))
('97.77.104.22', (73.6, 0.106))
('2001:41d0:8:e7b5::1', (72.566, 0.114))
...



Exception in thread "receiver-supervisor-future-0" java.lang.InterruptedException: sleep interrupted
	at java.base/java.lang.Thread.sleep(Native Method)
	at org.apache.spark.streaming.receiver.ReceiverSupervisor.$anonfun$restartReceiver$1(ReceiverSupervisor.scala:196)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.ja

-------------------------------------------
Time: 2023-03-20 17:57:50
-------------------------------------------
('120.52.73.98', (76.098, 0.126))
('82.146.37.33', (73.495, 72.396))
('180.234.223.91', (73.598, 73.598))
('185.15.43.51', (75.487, 75.487))
('185.28.193.95', (75.967, 0.035))
('192.241.151.220', (75.823, 0.394))
('31.14.134.193', (75.83, 58.503))
('198.50.206.0', (75.798, 0.146))
('2a01:488:66:1000:5c33:8503:0:1', (75.62, 0.135))
('2a02:c207:2008:5757::1', (75.395, 0.299))
...



[Stage 230:>                                                      (0 + 12) / 12]

## Exercise 3
Write a program to dump the IP sources that deviate most from the average in terms of the number of requests made in the last 30 seconds - dump this information every 5 seconds.

In [None]:
lines = ssc.socketTextStream("localhost", 7777)


ip_request_count = lines.filter(lambda line: len(line) > 0)\
    .map(lambda line: line.split(" ")[1])\
    .countByValueAndWindow(10, 5)

request_per_ip = ip_request_count\
    .transform(lambda rdd: rdd.zipWithIndex().map(lambda t: (t[0][1], t[1])))\
    .reduce(lambda t1, t2: (t1[0] + t2[0], max(t1[1], t2[1])))\
    .map(lambda t: t[0] / t[1])

ip_request_count\
    .map(lambda t: (None, t))\
    .leftOuterJoin(request_per_ip.map(lambda t: (None, t)))\
    .map(lambda t: (t[1][0][0], (t[1][0][1], t[1][1])))\
    .mapValues(lambda t: ((t[0] - t[1])/t[1]) if (t[1] not in [0, None]) else 0)\
    .filter(lambda t: t[1] > 1)\
    .transform(lambda rdd: rdd.sortBy(lambda t: t[1], ascending=False))\
    .pprint()

# here we can map ip_request_count and request_per_ip to have None as key and join them and then map back to normal

ssc.start()
ssc.awaitTermination(50)
ssc.stop()

## Exercise 4

Run additional logsender servers for subsets of the logs (IPv4 and IPv6 logs), using the following commands.

```
!nohup python logsender/server.py logsender/webipv4.log 7778 > /dev/null 2> /dev/null &
!nohup python logsender/server.py logsender/webipv6.log 7779 > /dev/null 2> /dev/null &
```

Write a program that combines the two streams, dumping the number of requests made in the last 15 seconds - dump this information every 5 seconds.

In [None]:
lines_ipv4 = ssc.socketTextStream("localhost", 7778)
lines_ipv6 = ssc.socketTextStream("localhost", 7779)

lines_ipv4.countByWindow(15, 5).map(lambda count: (None, count))\
    .join(lines_ipv6.countByWindow(15, 5).map(lambda count: (None, count)))\
    .map(lambda t: sum(t[1]))\
    .map(lambda count: f"Total count is: {count}").pprint()

ssc.start()
ssc.awaitTermination(20)
ssc.stop()

## Exercise 5

Write a program that combines the two streams from the previous exercise and dumps the proportion of IPv4 vs IPv6 requests in the last 20 seconds - dump this information every 5 seconds.


In [None]:
lines_ipv4 = ssc.socketTextStream("localhost", 7778)
lines_ipv6 = ssc.socketTextStream("localhost", 7779)

lines_ipv4.countByWindow(15, 5).map(lambda count: (None, count))\
    .join(lines_ipv6.countByWindow(20, 5).map(lambda count: (None, count)))\
    .map(lambda t: t[1][0]/t[1][1])\
    .map(lambda count: f"For each ipv6 address there are {count} ipv4 addreses").pprint()

ssc.start()
ssc.awaitTermination(20)
ssc.stop()