## Exercise 1
### - Given a text file of 10,000 lines, each line contains a pair of (key,value)
### - Calculate the average value for each key
### - Apply groupByKey() and reduceByKey() functions
Compare the two approaches: check that they give the same averages and measure how long each one takes.

### Initialize Spark

In [1]:
# findspark.init() runs before any pyspark import below; it is presumably what
# makes the local Spark installation importable from this kernel (see findspark docs).
import findspark
findspark.init()

### Generate text file with 10,000 lines of (key,value)

In [38]:
# Generate a text file of 10,000 lines, each holding one "key,value" pair:
#   - key:   a random integer between 1 and 1000 (inclusive)
#   - value: a random integer between 1 and 10000 (inclusive)
# Key and value are separated by a comma.
import os
import random

num_lines = 10000
file_name = "data/keyValue.txt"

# Dedicated, seeded generator: re-running the cell reproduces the same file
# and the global `random` state is left untouched.
rng = random.Random(42)

# Create the output directory if it is missing, instead of failing inside open().
os.makedirs(os.path.dirname(file_name), exist_ok=True)

# The file is only written here, so plain "w" (truncate, then write) is enough.
with open(file_name, "w") as f:
    for _ in range(num_lines):
        key = rng.randint(1, 1000)
        value = rng.randint(1, 10000)
        f.write(f"{key},{value}\n")

print(f"Generated {num_lines} lines of key-value pairs in {file_name}")

Generated 10000 lines of key-value pairs in data/keyValue.txt


In [None]:
# Generate a text file of 10,000 lines, each holding one "key,value" pair:
#   - key:   a random uppercase letter 'A'..'Z'
#   - value: a random integer between 1 and 10000 (inclusive)
# Key and value are separated by a comma.
import os
import random

num_lines = 10000
file_name = "data/keyValuePairs.txt"

# Dedicated, seeded generator: re-running the cell reproduces the same file
# and the global `random` state is left untouched.
rng = random.Random(42)

# Create the output directory if it is missing, instead of failing inside open().
os.makedirs(os.path.dirname(file_name), exist_ok=True)

# The file is only written here, so plain "w" (truncate, then write) is enough.
with open(file_name, "w") as f:
    for _ in range(num_lines):
        # Code points 65..90 are the uppercase ASCII letters 'A'..'Z'.
        key = chr(rng.randint(65, 90))
        value = rng.randint(1, 10000)
        f.write(f"{key},{value}\n")

print(f"Generated {num_lines} lines of key-value pairs in {file_name}")

**Create a Spark Session**

In [2]:
from pyspark.sql import SparkSession

# Get the active session, or create one named "KeyValuePairs" if none exists yet.
spark = (
    SparkSession.builder
    .appName("KeyValuePairs")
    .getOrCreate()
)

**Load the dataset**

In [3]:
# Read the generated file as an RDD of text lines (one "key,value" string per element).
sc = spark.sparkContext
keyValue_rdd = sc.textFile("data/keyValuePairs.txt")

In [4]:
# Peek at the first five raw text lines to confirm the file was loaded.
keyValue_rdd.take(5)

['E,453', 'T,3649', 'N,7835', 'O,3204', 'C,1891']

**Parse the pairs**

In [5]:
def parse_pair(line):
    """Turn one "key,value" text line into a (key, int(value)) tuple."""
    fields = line.split(',')
    return fields[0], int(fields[1])


# Keys stay strings; values become integers so they can be summed later.
pairs = keyValue_rdd.map(parse_pair)

In [6]:
# Show the first ten parsed (key, value) tuples.
pairs.take(10)

[('E', 453),
 ('T', 3649),
 ('N', 7835),
 ('O', 3204),
 ('C', 1891),
 ('D', 2149),
 ('V', 6004),
 ('K', 4950),
 ('A', 2381),
 ('I', 8167)]

#### Calculate the average value for each key using groupByKey() and reduceByKey()

In [7]:
# Group by key: every key is paired with an iterable holding all of its values.
grouped = pairs.groupByKey()
# Peek at five groups (the values show up as ResultIterable objects, not lists).
grouped.take(5)

[('N', <pyspark.resultiterable.ResultIterable at 0x25f1bbc36a0>),
 ('O', <pyspark.resultiterable.ResultIterable at 0x25f0421ec40>),
 ('C', <pyspark.resultiterable.ResultIterable at 0x25f0421ea30>),
 ('K', <pyspark.resultiterable.ResultIterable at 0x25f0421edf0>),
 ('J', <pyspark.resultiterable.ResultIterable at 0x25f0421e2e0>)]

In [8]:
def mean_of(values):
    """Arithmetic mean of the values grouped under a single key."""
    return sum(values) / len(values)


# Average each key's group of values.
averages_groupByKey = grouped.mapValues(mean_of)

# Bring the per-key averages back to the driver and show them.
averages_groupByKey_result = averages_groupByKey.collect()
print("Averages using groupByKey():", averages_groupByKey_result)

Averages using groupByKey(): [('N', 5023.275462962963), ('O', 4956.304785894206), ('C', 4970.28493150685), ('K', 4884.73631840796), ('J', 5220.904891304348), ('S', 4663.287206266318), ('R', 5062.941025641026), ('L', 5016.019073569482), ('W', 4851.543080939948), ('E', 4717.466666666666), ('T', 5141.010610079576), ('D', 5068.779527559055), ('V', 4841.910447761194), ('A', 4693.275675675675), ('I', 5074.010075566751), ('B', 5099.991666666667), ('U', 5219.643835616438), ('F', 5018.796650717703), ('H', 4941.757746478873), ('Q', 4816.017676767677), ('Z', 4847.907928388747), ('M', 4901.10752688172), ('X', 4993.679045092838), ('Y', 5304.022842639594), ('G', 4902.5610972568575), ('P', 5159.91280653951)]


In [9]:
import time

In [10]:
# Time how long counting the occurrences of each key takes with groupByKey().
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# elapsed time; time.time() is wall-clock time and can jump if the system
# clock is adjusted while the job runs.
start_time_groupByKey = time.perf_counter()

# Group by key and count occurrences
grouped_pairs = pairs.groupByKey()
counts_groupByKey = grouped_pairs.mapValues(len)

# collect() is the action that actually runs the job, so it has to sit inside
# the timed region; the two lines above only describe the computation.
counts_groupByKey_result = counts_groupByKey.collect()
end_time_groupByKey = time.perf_counter()

print("Counts using groupByKey():", counts_groupByKey_result)
print("Time taken by groupByKey():", end_time_groupByKey - start_time_groupByKey, "seconds")


Counts using groupByKey(): [('N', 432), ('O', 397), ('C', 365), ('K', 402), ('J', 368), ('S', 383), ('R', 390), ('L', 367), ('W', 383), ('E', 390), ('T', 377), ('D', 381), ('V', 402), ('A', 370), ('I', 397), ('B', 360), ('U', 365), ('F', 418), ('H', 355), ('Q', 396), ('Z', 391), ('M', 372), ('X', 377), ('Y', 394), ('G', 401), ('P', 367)]
Time taken by groupByKey(): 2.881922721862793 seconds


In [11]:
def add_sum_and_count(left, right):
    """Combine two (sum, count) pairs element-wise."""
    return left[0] + right[0], left[1] + right[1]


# Turn every value into (value, 1), then fold the pairs per key into (sum, count).
sum_counts = (
    pairs
    .mapValues(lambda value: (value, 1))
    .reduceByKey(add_sum_and_count)
)

# average = sum / count for each key
averages_reduceByKey = sum_counts.mapValues(lambda sum_count: sum_count[0] / sum_count[1])

averages_reduceByKey_result = averages_reduceByKey.collect()
print("Averages using reduceByKey():", averages_reduceByKey_result)

Averages using reduceByKey(): [('N', 5023.275462962963), ('O', 4956.304785894206), ('C', 4970.28493150685), ('K', 4884.73631840796), ('J', 5220.904891304348), ('S', 4663.287206266318), ('R', 5062.941025641026), ('L', 5016.019073569482), ('W', 4851.543080939948), ('E', 4717.466666666666), ('T', 5141.010610079576), ('D', 5068.779527559055), ('V', 4841.910447761194), ('A', 4693.275675675675), ('I', 5074.010075566751), ('B', 5099.991666666667), ('U', 5219.643835616438), ('F', 5018.796650717703), ('H', 4941.757746478873), ('Q', 4816.017676767677), ('Z', 4847.907928388747), ('M', 4901.10752688172), ('X', 4993.679045092838), ('Y', 5304.022842639594), ('G', 4902.5610972568575), ('P', 5159.91280653951)]


In [12]:
# Time how long counting the occurrences of each key takes with reduceByKey().
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# elapsed time; time.time() is wall-clock time and can jump if the system
# clock is adjusted while the job runs.
start_time_reduceByKey = time.perf_counter()

# Count occurrences using reduceByKey: map every value to 1, then add per key
counts_reduceByKey = pairs.mapValues(lambda value: 1).reduceByKey(lambda x, y: x + y)

# collect() is the action that actually runs the job, so it has to sit inside
# the timed region; the line above only describes the computation.
counts_reduceByKey_result = counts_reduceByKey.collect()
end_time_reduceByKey = time.perf_counter()

print("Counts using reduceByKey():", counts_reduceByKey_result)
print("Time taken by reduceByKey():", end_time_reduceByKey - start_time_reduceByKey, "seconds")


Counts using reduceByKey(): [('N', 432), ('O', 397), ('C', 365), ('K', 402), ('J', 368), ('S', 383), ('R', 390), ('L', 367), ('W', 383), ('E', 390), ('T', 377), ('D', 381), ('V', 402), ('A', 370), ('I', 397), ('B', 360), ('U', 365), ('F', 418), ('H', 355), ('Q', 396), ('Z', 391), ('M', 372), ('X', 377), ('Y', 394), ('G', 401), ('P', 367)]
Time taken by reduceByKey(): 2.674659013748169 seconds


In [14]:
# Compare the per-key averages produced by groupByKey() and reduceByKey().
groupByKey_dict = dict(averages_groupByKey_result)
reduceByKey_dict = dict(averages_reduceByKey_result)

# Check the union of both key sets. Iterating only over the groupByKey keys
# would miss a key that exists solely in the reduceByKey result, and would
# raise KeyError for a key that is missing from it.
all_keys = list(groupByKey_dict) + [
    key for key in reduceByKey_dict if key not in groupByKey_dict
]

# A key absent from one side shows up as None there (an average is never None),
# so it is always reported as a difference.
comparison = [
    (key, groupByKey_dict.get(key), reduceByKey_dict.get(key))
    for key in all_keys
    if groupByKey_dict.get(key) != reduceByKey_dict.get(key)
]

if not comparison:
    print("The results are the same.")
else:
    print("Differences found:", comparison)

The results are the same.
