# Setting up the experiment: which is faster, filter → join or join → filter?

In [1]:
import os
import random

# Dataset 1: list_len lines of the form "(key, value)".
# Keys are drawn with replacement from 0..99, so each key appears ~2,000 times;
# values are a random permutation of 0..list_len-1. The fixed seed makes the
# file reproducible (RNG call order is unchanged: keys first, then values).
random.seed(42)

list_len = 200000
key_list = [random.randrange(0, 100, 1) for _ in range(list_len)]
value_list = random.sample(range(list_len), list_len)

# open(..., 'w') does not create missing parent folders.
os.makedirs('text_files/filter_join_test', exist_ok=True)
with open('text_files/filter_join_test/data_1.txt', 'w', encoding='utf-8') as f:
    f.writelines(f'({key}, {value})\n' for key, value in zip(key_list, value_list))

In [2]:
import os
import random

# Dataset 2: same shape as dataset 1 (list_len "(key, value)" lines, keys in
# 0..99, values a permutation of 0..list_len-1) but with a different seed, so
# the key/value assignment differs from data_1.txt.
random.seed(1)

list_len = 200000
key_list = [random.randrange(0, 100, 1) for _ in range(list_len)]
value_list = random.sample(range(list_len), list_len)

# open(..., 'w') does not create missing parent folders.
os.makedirs('text_files/filter_join_test', exist_ok=True)
with open('text_files/filter_join_test/data_2.txt', 'w', encoding='utf-8') as f:
    f.writelines(f'({key}, {value})\n' for key, value in zip(key_list, value_list))

In [3]:
def convert_to_tuple(str_data):
    """Parse one data-file line such as ``'(86, 147676)'`` into ``('86', 147676)``.

    The surrounding parentheses are dropped by position (first and last
    character). The key is kept as a ``str``; the value is converted to ``int``.

    Raises:
        ValueError: if the text does not split into exactly two parts on
            ``', '``, or if the value part is not an integer literal.
    """
    inner = str_data[1:-1]
    key_text, value_text = inner.split(', ')
    return key_text, int(value_text)

# Join → Filter

In [4]:
import findspark
findspark.init()  # make the local Spark installation importable as `pyspark`

from pyspark import SparkContext, SparkConf

spark_conf = SparkConf().setMaster('local[*]').setAppName('Filter_Join experiment')
# getOrCreate is a classmethod: call it on the class so spark_conf is used.
# `SparkContext().getOrCreate(spark_conf)` would first build a context from
# default settings, and getOrCreate would then return that existing context,
# silently ignoring the master/app name configured above.
spark_context = SparkContext.getOrCreate(spark_conf)

In [5]:
# Read both generated files and parse every "(key, value)" line into a
# (str key, int value) pair; the RDDs are lazy until an action runs.
key_value_rdd_1, key_value_rdd_2 = [
    spark_context.textFile(path).map(convert_to_tuple)
    for path in (
        'text_files/filter_join_test/data_1.txt',
        'text_files/filter_join_test/data_2.txt',
    )
]

In [6]:
# Join first, filter afterwards. Keys take only 100 distinct values on each
# side, so the join is many-to-many (~2,000 x 2,000 pairs per key) and every
# pair is produced before the filter drops keys below 20. Keys are strings
# (see convert_to_tuple), hence the int() conversion.
result_rdd_1 = (
    key_value_rdd_1
    .join(key_value_rdd_2)
    .filter(lambda pair: int(pair[0]) >= 20)
)

In [7]:
# Redistribute the result into 12 partitions via a full shuffle
# (this is exactly what RDD.repartition(12) does internally).
result_rdd_1 = result_rdd_1.coalesce(12, shuffle=True)

In [8]:
# Action: forces evaluation of the lazy join -> filter -> repartition pipeline,
# so this cell's wall time is the measurement. Two runs: 2m 43.5s and 2m 39s.
result_rdd_1.count()

319531914

In [9]:
# Spot-check: first 10 joined records, shaped (key, (value_from_1, value_from_2)).
result_rdd_1.take(10)

[('86', (147676, 40112)),
 ('86', (147676, 75016)),
 ('86', (147676, 89082)),
 ('86', (147676, 39070)),
 ('86', (147676, 147901)),
 ('86', (147676, 141902)),
 ('86', (147676, 60019)),
 ('86', (147676, 31920)),
 ('86', (147676, 52481)),
 ('86', (147676, 13541))]

In [10]:
# Stop this context so the Filter -> Join run below starts from a freshly
# created SparkContext instead of reusing this one.
spark_context.stop()

# Filter → Join

In [11]:
import findspark
findspark.init()  # make the local Spark installation importable as `pyspark`

from pyspark import SparkContext, SparkConf

spark_conf = SparkConf().setMaster('local[*]').setAppName('Filter_Join experiment')
# getOrCreate is a classmethod: call it on the class so spark_conf is used.
# `SparkContext().getOrCreate(spark_conf)` would first build a context from
# default settings, and getOrCreate would then return that existing context,
# silently ignoring the master/app name configured above.
spark_context = SparkContext.getOrCreate(spark_conf)

In [12]:
# Re-read both files on the new context and parse each "(key, value)" line
# into a (str key, int value) pair; nothing executes until an action runs.
key_value_rdd_1, key_value_rdd_2 = [
    spark_context.textFile(path).map(convert_to_tuple)
    for path in (
        'text_files/filter_join_test/data_1.txt',
        'text_files/filter_join_test/data_2.txt',
    )
]

In [13]:
# Filter first, then join. Filtering only the left RDD is sufficient: join is
# an inner join, so right-side records with keys < 20 find no partner and are
# dropped. Fewer rows enter the shuffle and far fewer pairs are generated.
result_rdd_2 = (
    key_value_rdd_1
    .filter(lambda pair: int(pair[0]) >= 20)
    .join(key_value_rdd_2)
)

In [14]:
# Same post-processing as the first experiment: a full shuffle into 12
# partitions (RDD.repartition(12) delegates to exactly this call).
result_rdd_2 = result_rdd_2.coalesce(12, shuffle=True)

In [15]:
# Action: forces evaluation of the filter -> join -> repartition pipeline.
# Two runs: 47s and 46.8s. That is the same row count as the Join -> Filter
# run, in well under a third of the time.
result_rdd_2.count()

319531914

In [16]:
# Spot-check: first 10 records, shaped (key, (value_from_1, value_from_2)).
result_rdd_2.take(10)

[('86', (147676, 40112)),
 ('86', (147676, 75016)),
 ('86', (147676, 89082)),
 ('86', (147676, 39070)),
 ('86', (147676, 147901)),
 ('86', (147676, 141902)),
 ('86', (147676, 60019)),
 ('86', (147676, 31920)),
 ('86', (147676, 52481)),
 ('86', (147676, 13541))]

In [17]:
# Release the Spark context now that both experiments are done.
spark_context.stop()