In [1]:
from pyspark.sql import SparkSession
import re
import random
import matplotlib.pyplot as plt
# Obtain a SparkSession (getOrCreate returns an existing one if there is one)
# and keep a handle on its SparkContext, which the next cell uses to read the
# log file as an RDD.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
# Load the log as an RDD of text lines. The path is relative, so the file must
# be reachable from wherever Spark resolves relative paths.
rdd = sc.textFile('epa-http.txt')

In [3]:
# One IPv4 octet (0-255). Written as raw strings so that `\.` reaches the regex
# engine as an escaped dot instead of relying on Python passing an invalid
# string escape through unchanged (which emits a warning on modern Python).
_OCTET = r'([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])'

# Matches lines that begin with a dotted-quad IPv4 address. The assembled
# pattern text is identical to the previous single-literal version.
url_pat = re.compile(r'(' + _OCTET + r'\.){3}' + _OCTET + r'.*')

# Keep only lines that start with an IPv4 address and whose last character is
# not '-' (presumably a missing trailing size field -- confirm against the log
# format), then split each kept line on single spaces into a list of fields.
# NOTE: this rebinds `rdd` from raw lines to field lists, so re-running this
# cell without re-running the load cell first will fail.
rdd = rdd.filter(lambda x: url_pat.match(x) and x[-1] != '-').map(lambda x: x.split(' '))

# Number of records that survived the filter (triggers a Spark job).
l = rdd.count()

In [4]:
def time_period(x):
    """Convert a record's timestamp into seconds relative to 29:23:53:25.

    Args:
        x: An indexable record whose element at index 1 is a string of the
            form ``<c>DD:HH:MM:SS<c>``; the first and last characters are
            discarded (e.g. the brackets in ``[29:23:53:25]``).

    Returns:
        int: Seconds elapsed since day 29, 23:53:25. Negative for earlier
        timestamps.
    """
    # Strip the surrounding delimiter characters and split into components.
    fields = x[1][1:-1].split(':')

    day = int(fields[0])
    hour = int(fields[1])
    minute = int(fields[2])
    second = int(fields[3])

    # Time-of-day of the reference instant (23:53:25) expressed in seconds.
    reference_offset = 23 * 3600 + 53 * 60 + 25

    return (day - 29) * 86400 + hour * 3600 + minute * 60 + second - reference_offset

In [5]:
# Convert every record to its time offset in seconds and collect the whole
# result to the driver as a plain Python list; the simulation loop below
# indexes into this list directly.
data = rdd.map(time_period).collect()

In [6]:
def A(x, sa):
    """Random filter: accept with probability ``sa``, ignoring the record.

    Args:
        x: The record being filtered (not inspected).
        sa: Target selectivity; compared against one uniform draw from
            ``random.random()``.

    Returns:
        bool: True when the draw is strictly below ``sa``.
    """
    draw = random.random()
    return draw < sa

In [7]:
def B(x, sb):
    """Time-window filter: accept values below the fraction ``sb`` of one day.

    Args:
        x: A time offset in seconds.
        sb: Fraction of a 24-hour day (86400 s) used as the cut-off.

    Returns:
        bool: True when ``x`` is strictly less than ``86400 * sb``.
    """
    seconds_per_day = 24 * 3600
    return x < seconds_per_day * sb

In [8]:
# Selection-reordering experiment.
# Filter A has a fixed selectivity `sa`; filter B's selectivity `sb` is swept
# from 0.00 to 1.00 in steps of 0.01. For each `sb`, the cost of running the
# two filters in order A->B and in order B->A is counted, and a throughput
# figure is recorded for each ordering.
sa = 0.5
l = len(data)
Tp1_record = []
Tp2_record = []

# A draws from the global `random` generator; seed it so a re-run of this cell
# reproduces the same curves.
random.seed(0)


def _pipeline_throughput(records, first, second):
    """Run a two-stage filter pipeline and return its throughput figure.

    Every record is evaluated by ``first``; only records accepted by ``first``
    are evaluated by ``second``. The cost is the total number of filter
    evaluations.

    Args:
        records: Sequence of records to push through the pipeline.
        first: One-argument predicate applied to every record.
        second: One-argument predicate applied to records ``first`` accepts.

    Returns:
        float: ``len(records) * 3.0 / (evaluations) / 2``. The ``3.0 / 2``
        normalisation is carried over unchanged; for a first stage that
        accepts about half the records it yields a value close to 1.0.

    Raises:
        ZeroDivisionError: If ``records`` is empty (no evaluations happen).
    """
    first_evals = 0
    second_evals = 0
    for record in records:
        first_evals += 1
        if first(record):
            second_evals += 1
            # Result is intentionally unused: the call only models the work
            # done by the second operator.
            second(record)
    return len(records) * 3.0 / (first_evals + second_evals) / 2


for i in range(101):
    sb = i / 100.0

    # Execution order A -> B.
    tp1 = _pipeline_throughput(data, lambda x: A(x, sa), lambda x: B(x, sb))
    Tp1_record.append(tp1)

    # Operator reordering B -> A. Both orderings are now costed by the same
    # helper; the previous version added one extra evaluation to this ordering
    # before its loop, which skewed the comparison.
    tp2 = _pipeline_throughput(data, lambda x: B(x, sb), lambda x: A(x, sa))
    Tp2_record.append(tp2)

In [9]:
# Plot both throughput curves against the swept selectivity of B
# (0.00 .. 1.00 in steps of 0.01, matching the simulation loop).
plt.figure(figsize=(6, 6))
axis = [step / 100.0 for step in range(101)]

# Solid line: original A->B order; dashed line: reordered B->A.
plt.plot(axis, Tp1_record, '-', label='Not reordered')
plt.plot(axis, Tp2_record, '--', label='Reordered')

plt.title('Selection Reordering')
plt.xlabel('Selectivity of B')
plt.ylabel('Throughput')
plt.legend()
plt.show()