In [1]:
from pyspark.sql import SparkSession
import numpy as np
import random
import matplotlib.pyplot as plt
# Attach to the already-running Spark session if there is one; otherwise build
# a new one from the builder's default configuration.
spark = (SparkSession
         .builder
         .getOrCreate())
# The rest of the notebook works with the low-level RDD API (textFile / map),
# which hangs off the SparkContext rather than the SparkSession.
sc = spark.sparkContext

In [2]:
# Input file, resolved relative to the notebook's working directory
# (presumably the KDD Cup 1999 10% subset -- confirm the copy in use).
DATA_PATH = 'kddcup.data_10_percent_corrected'

# cache(): the next cell maps over `rdd` again after this count(), so keep the
# lines in memory instead of re-reading the file for every Spark action.
rdd = sc.textFile(DATA_PATH).cache()
record_num = rdd.count()

In [3]:
# Seed the stdlib RNG before any random draw (this shuffle and the reservoir
# samplers in the next cell) so Restart & Run All reproduces the same figure.
SEED = 42
random.seed(SEED)

# The class label is the last comma-separated field with its trailing '.'
# removed, e.g. "...,normal." -> "normal".
label_rdd = rdd.map(lambda line: line.rsplit(',', 1)[-1].rstrip('.'))

# Collect the labels once (a single Spark job) and derive the distinct set
# locally. sorted() gives label_set a deterministic order, which
# distinct().collect() does not guarantee.
labels = label_rdd.collect()
label_set = sorted(set(labels))
label_num = len(label_set)

# The evaluation cell indexes `labels` by stream position 0 .. record_num-1.
assert len(labels) == record_num, 'label count does not match record count'

# Shuffle so the stream order seen by the samplers is random rather than file order.
random.shuffle(labels)

In [4]:
# Draw two samples of stream indices (0 .. record_num-1) of at most
# ReservoirSize (1000) entries each, in a single pass:
#   r_b -- "biased" reservoir: while it is full, the insertion probability p_in
#          decays and one random member is evicted every step, so older
#          indices face more eviction rounds than newer ones.
#   r_u -- uniform reservoir (classic Algorithm R): the first ReservoirSize
#          indices fill it, later index i replaces a random slot w.p.
#          ReservoirSize / (i + 1).
# NOTE(review): the biased scheme looks modelled on Aggarwal's biased reservoir
# sampling -- TODO confirm the eviction/insertion rule against the paper.
# NOTE(review): `random` is not seeded in this cell; samples differ between
# runs unless a seed was set before this cell executes.
# Stored as a float so `1 / ReservoirSize` and `l / ReservoirSize` are float
# divisions even under Python 2 integer-division semantics.
ReservoirSize = 1000.0
p_in = 1  # current probability that incoming index i is offered to r_b
q = 1 / ReservoirSize  # multiplicative decay applied to p_in each step r_b is full
r_u = []  # indices held by the uniform reservoir
r_b = []  # indices held by the biased reservoir
for i in range(record_num):
    # --- biased reservoir r_b ---
    l = len(r_b)
    # One uniform draw reused by BOTH samplers' decisions below, so the
    # biased and uniform reservoirs' choices are correlated.
    p_base = random.random()
    if l == ReservoirSize:
        # Full: decay p_in and evict one random member unconditionally -- even
        # if index i is not inserted below, so r_b can drop below capacity.
        p_in *= (1 - q)
        index = random.randint(0, l - 1)
        del r_b[index]
        l -= 1
    if p_in > random.random():
        # Accepted with probability p_in. With probability equal to the fill
        # fraction l / ReservoirSize overwrite a random slot, otherwise append.
        # (When l == 0, p == 0 so the randint branch cannot be reached.)
        p = l / ReservoirSize
        if p > p_base:
            index = random.randint(0, l - 1)
            r_b[index] = i
        else:
            r_b.append(i)
    # --- uniform reservoir r_u (Algorithm R) ---
    l = len(r_u)
    if l < ReservoirSize:
        # Fill phase: keep the first ReservoirSize indices outright.
        r_u.append(i)
    else:
        # Keep index i with probability ReservoirSize / (i + 1), replacing a
        # uniformly chosen slot.
        p = ReservoirSize / (i + 1)
        if p > p_base:
            index = random.randint(0, l - 1)
            r_u[index] = i

In [5]:
# Walk backwards from the newest record over the last HORIZON records. Every
# `step` records, record how well each reservoir's class mix matches the true
# class mix of the window seen so far.
HORIZON = 100000  # number of most recent records to evaluate over
step = 10000      # checkpoint spacing (one error value per `step` records)


def _mean_abs_freq_error(sample_count, sample_total, true_count, true_total, classes):
    """Mean over `classes` of |sample class frequency - true class frequency|.

    sample_count / true_count map class label -> count (floats) inside the
    current window; sample_total / true_total are the corresponding totals.
    Returns NaN instead of raising ZeroDivisionError when the sample holds no
    points from the window yet.
    """
    if sample_total == 0:
        return float('nan')
    err = sum(abs(sample_count[c] / sample_total - true_count[c] / true_total)
              for c in classes)
    return err / len(classes)


# Sets make the per-record membership test O(1) instead of scanning a list.
biased_set = set(r_b)
unbiased_set = set(r_u)

# Float counters keep the frequency divisions in floating point under Python 2.
total_count = {c: 0.0 for c in label_set}
unbiased_count = {c: 0.0 for c in label_set}
biased_count = {c: 0.0 for c in label_set}

r_b_count = 0
r_u_count = 0
biased_error = []
unbiased_error = []

# Never run past the start of the stream (negative indices would wrap around).
horizon = min(HORIZON, record_num)
for i in range(record_num - 1, record_num - horizon - 1, -1):
    label = labels[i]
    total_count[label] += 1
    if i in biased_set:
        biased_count[label] += 1
        r_b_count += 1
    if i in unbiased_set:
        unbiased_count[label] += 1
        r_u_count += 1
    seen = record_num - i  # records in the current window
    if seen % step == 0:
        # Iterate over the distinct class labels (label_set), not over
        # per-record labels.
        biased_error.append(
            _mean_abs_freq_error(biased_count, r_b_count, total_count, seen, label_set))
        unbiased_error.append(
            _mean_abs_freq_error(unbiased_count, r_u_count, total_count, seen, label_set))

In [6]:
# Checkpoint k (1-based) covers the most recent k * step records. Derive the
# axis from the data so it stays in sync if the horizon or step changes.
axis = np.arange(1, len(biased_error) + 1)
fig, ax = plt.subplots()
ax.plot(axis, biased_error, label='Biased Error')
ax.plot(axis, unbiased_error, label='Unbiased Error')
ax.set_title('Count Query Estimation')
ax.set_xlabel('USER SPECIFIED HORIZON (* {})'.format(step))
ax.set_ylabel('ABSOLUTE ERROR')
ax.legend()
plt.show()

<matplotlib.legend.Legend at 0x110a5a350>