In [5]:
!pip install pyspark
from pyspark import SparkContext
import random
from collections import Counter

# Local Spark context used by the AMS second-moment trials in this cell.
sc = SparkContext("local", "AMS_SecondMoment")

# Example stream from assignment (Example 4.7), written as whitespace-separated tokens.
stream = "a b c b d a c d a b d c a a b".split()
n = len(stream)
print("Length of the stream: ", n)

# Distribute the stream as (1-based position, element) pairs.
stream_rdd = sc.parallelize(
    [(pos, elem) for pos, elem in enumerate(stream, start=1)]
)

def ams_single_trial(i, ei):
    """Run one AMS second-moment trial anchored at stream position ``i``.

    Counts the records of the global ``stream_rdd`` whose position is at
    least ``i`` and whose element equals ``ei``, then scales that count
    with the global stream length ``n``.

    Args:
        i: 1-based position in the stream where counting starts.
        ei: Element to count from position ``i`` onwards.

    Returns:
        Tuple ``(i, ei, ci, estimate)`` where ``ci`` is the number of
        matching records and ``estimate`` is ``n * (2 * ci - 1)``.
    """
    # Records are (position, element) pairs; keep the ones at or after i that equal ei.
    matching = stream_rdd.filter(lambda rec: rec[0] >= i and rec[1] == ei)
    occurrences = matching.count()
    return (i, ei, occurrences, n * (2 * occurrences - 1))

# Number of independent AMS trials to average.
num_trials = 3

# Dedicated, seeded generator: a Restart & Run All samples the same positions
# every time, and the global `random` state is left untouched.
TRIAL_SEED = 42
rng = random.Random(TRIAL_SEED)

results = []
try:
    for _ in range(num_trials):
        i = rng.randint(1, n)   # uniformly chosen 1-based stream position
        ei = stream[i - 1]      # element stored at that position
        trial_result = ams_single_trial(i, ei)
        results.append(trial_result)
finally:
    # Always release the context: if a trial raises and the context stays
    # alive, the next cell's SparkContext(...) call fails because only one
    # context may be active at a time.
    sc.stop()

# Everything below works on plain Python tuples, so Spark is no longer needed.
for res in results:
    i, ei, ci, estimate = res
    print(f"Trial i={i}, e(i)='{ei}', c(i)={ci}, Estimate={estimate}")

f2_estimate = sum(r[3] for r in results) / num_trials
print(f"\nFinal AMS F2 Estimate (averaged over {num_trials} trials): {f2_estimate}")

# Exact second moment (sum of squared frequencies) for comparison.
freq = Counter(stream)
actual_f2 = sum(v ** 2 for v in freq.values())
print(f"Actual F2 (Ground Truth): {actual_f2}")


Trial i=1, e(i)='a', c(i)=5, Estimate=135
Trial i=7, e(i)='c', c(i)=2, Estimate=45
Trial i=8, e(i)='d', c(i)=2, Estimate=45

Final AMS F2 Estimate (averaged over 3 trials): 75.0
Actual F2 (Ground Truth): 59


In [6]:
# Fresh local Spark context for the fixed-position AMS run.
sc = SparkContext("local", "AMS_SecondMoment")

# Same 15-element example stream, one character per element.
stream = list("abcbdacdabdcaab")
n = len(stream)
print("Length of the stream: ", n)

# (position, element) pairs with positions counted from 1.
stream_rdd = sc.parallelize(list(zip(range(1, n + 1), stream)))

def ams_fixed_trial(i, ei):
    """Compute the AMS estimate for a caller-chosen stream position.

    Args:
        i: 1-based stream position from which occurrences are counted.
        ei: Element whose occurrences at positions ``>= i`` are counted.

    Returns:
        Tuple ``(i, ei, ci, estimate)``: ``ci`` is the count of matching
        records in the global ``stream_rdd`` and ``estimate`` equals
        ``n * (2 * ci - 1)`` with ``n`` the global stream length.
    """
    def is_later_occurrence(record):
        # record is a (position, element) pair.
        position, element = record
        if position < i:
            return False
        return element == ei

    later_occurrences = stream_rdd.filter(is_later_occurrence).count()
    ams_value = n * (2 * later_occurrences - 1)
    return (i, ei, later_occurrences, ams_value)

# Fixed 1-based sample positions 3, 8 and 13, so this run is fully deterministic.
fixed_indices = [3, 8, 13]
results = []
try:
    for i in fixed_indices:
        ei = stream[i - 1]   # element stored at position i
        trial_result = ams_fixed_trial(i, ei)
        results.append(trial_result)
finally:
    # Always release the context: if a trial raises and the context stays
    # alive, the next cell's SparkContext(...) call fails because only one
    # context may be active at a time.
    sc.stop()

# Everything below works on plain Python tuples, so Spark is no longer needed.
for res in results:
    i, ei, ci, estimate = res
    print(f"AMS Trial for i={i}: e(i)='{ei}', c(i)={ci}, AMS Estimate={estimate}")

f2_estimate = sum(r[3] for r in results) / len(results)
# Report the real number of trials so the message stays correct if
# fixed_indices is edited.
print(f"\nFinal AMS F2 Estimate (averaged over {len(results)} trials): {f2_estimate}")

# Exact second moment (sum of squared frequencies) for comparison.
freq = Counter(stream)
actual_f2 = sum(v ** 2 for v in freq.values())
print(f"Actual F2 (Ground Truth): {actual_f2}")


Length of the stream:  15
AMS Trial for i=3: e(i)='c', c(i)=3, AMS Estimate=75
AMS Trial for i=8: e(i)='d', c(i)=2, AMS Estimate=45
AMS Trial for i=13: e(i)='a', c(i)=2, AMS Estimate=45

Final AMS F2 Estimate (averaged over 3 trials): 55.0
Actual F2 (Ground Truth): 59


Network Traffic Monitoring – Detect Heavy IPs using AMS

In [7]:
from pyspark import SparkContext
from collections import Counter
import random
# Local Spark context for the network-traffic example.
sc = SparkContext("local", "AMS_Network_Traffic")

# Observed source IPs, all in 192.168.1.x; only the last octet varies.
stream = [
    f"192.168.1.{last_octet}"
    for last_octet in (
        1, 2, 1, 3,
        2, 1, 4, 5,
        1, 5, 5, 5,
        1, 2, 2, 2,
        1, 3, 5, 5,
    )
]
n = len(stream)

# (position, IP) pairs with positions counted from 1.
rdd = sc.parallelize(list(zip(range(1, n + 1), stream)))

def ams_trial(i, ip):
    """Run one AMS trial for the IP observed at stream position ``i``.

    Args:
        i: 1-based position in the stream where counting starts.
        ip: IP address to count from position ``i`` onwards.

    Returns:
        Tuple ``(i, ip, ci, estimate)`` where ``ci`` is the number of
        records in the global ``rdd`` at positions ``>= i`` equal to ``ip``
        and ``estimate`` is ``n * (2 * ci - 1)`` for the global length ``n``.
    """
    # Narrow to this IP first, then to the suffix of the stream starting at i.
    same_ip = rdd.filter(lambda rec: rec[1] == ip)
    hits = same_ip.filter(lambda rec: rec[0] >= i).count()
    scaled = n * (2 * hits - 1)
    return (i, ip, hits, scaled)

# Number of independent AMS trials to average.
num_trials = 10

# Dedicated, seeded generator: a Restart & Run All samples the same positions
# every time, and the global `random` state is left untouched.
TRIAL_SEED = 42
rng = random.Random(TRIAL_SEED)

# Largest relative error at which the estimate is still reported as "good".
GOOD_APPROX_REL_ERROR = 0.10

results = []
try:
    for _ in range(num_trials):
        i = rng.randint(1, n)   # uniformly chosen 1-based stream position
        ip = stream[i - 1]      # IP observed at that position
        trial_result = ams_trial(i, ip)
        results.append(trial_result)
finally:
    # Always release the context: if a trial raises and the context stays
    # alive, a later SparkContext(...) call fails because only one context
    # may be active at a time.
    sc.stop()

# Everything below works on plain Python tuples, so Spark is no longer needed.
for res in results:
    i, ip, ci, estimate = res
    print(f"AMS Trial - i={i}: IP='{ip}', c(i)={ci}, AMS Estimate={estimate}")

f2_estimate = sum(r[3] for r in results) / num_trials
print(f"\nFinal AMS F2 Estimate (averaged over {num_trials} trials): {f2_estimate}")

# Exact second moment (sum of squared frequencies) for comparison.
freq = Counter(stream)
actual_f2 = sum(v ** 2 for v in freq.values())
print(f"Actual F2 (Ground Truth): {actual_f2}")

print("\n----- Interpretation -----")
# NOTE(review): the skew buckets are driven by the exact F2, not the AMS
# estimate; a real streaming setting would only have the estimate - confirm intent.
if actual_f2 <= n * 2:
    print("Traffic is relatively uniform. No heavy hitters detected.")
elif actual_f2 <= n * 4:
    # Reaching this branch already implies actual_f2 > n * 2.
    print("Mild skew detected: Some IPs are appearing more frequently than others.")
else:
    print("Heavy skew detected! Some IPs dominate the traffic and may need investigation.")

# Only call the estimate good when it is actually close to the ground truth.
# actual_f2 is a sum of squared positive counts, so it is > 0 for a non-empty stream.
rel_error = abs(f2_estimate - actual_f2) / actual_f2
if rel_error <= GOOD_APPROX_REL_ERROR:
    print(f"AMS Estimate is {f2_estimate:.2f}, and True F2 is {actual_f2}. AMS is a good approximation here!")
else:
    print(
        f"AMS Estimate is {f2_estimate:.2f}, and True F2 is {actual_f2}. "
        f"Relative error is {rel_error:.1%}; more trials are needed for a reliable estimate."
    )


AMS Trial - i=2: IP='192.168.1.2', c(i)=5, AMS Estimate=180
AMS Trial - i=12: IP='192.168.1.5', c(i)=3, AMS Estimate=100
AMS Trial - i=1: IP='192.168.1.1', c(i)=6, AMS Estimate=220
AMS Trial - i=3: IP='192.168.1.1', c(i)=5, AMS Estimate=180
AMS Trial - i=4: IP='192.168.1.3', c(i)=2, AMS Estimate=60
AMS Trial - i=16: IP='192.168.1.2', c(i)=1, AMS Estimate=20
AMS Trial - i=11: IP='192.168.1.5', c(i)=4, AMS Estimate=140
AMS Trial - i=16: IP='192.168.1.2', c(i)=1, AMS Estimate=20
AMS Trial - i=16: IP='192.168.1.2', c(i)=1, AMS Estimate=20
AMS Trial - i=9: IP='192.168.1.1', c(i)=3, AMS Estimate=100

Final AMS F2 Estimate (averaged over 10 trials): 104.0
Actual F2 (Ground Truth): 102

----- Interpretation -----
Heavy skew detected! Some IPs dominate the traffic and may need investigation.
AMS Estimate is 104.00, and True F2 is 102. AMS is a good approximation here!
