# AUTHOR: T1 TIDE

## Create Spark

### Import Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import functions as func

### Spark Conf

In [2]:
conf = SparkConf().setAppName("Assignment 25072024")\
                    .setMaster("local[*]") \
                    .set("spark.executor.memory", "4g") \
                    .set("spark.driver.memory", "4g") \
                    .set("spark.executor.cores", "4") \
                    .set("spark.driver.cores","5")

### SparkSession

In [3]:
spark = SparkSession.builder \
                    .config(conf=conf) \
                    .getOrCreate()

### SparkContext

In [4]:
sc = spark.sparkContext

## Exercise 1
- Given a dataset of 1000 arbitrary numbers in a text file
- Find all prime numbers in the given dataset
- Save result under a new text file

### Check Prime number function

In [5]:
from math import sqrt,ceil
def is_prime(n):
    if n <= 1:
        return False
    for i in range(2,ceil(sqrt(n))+1):
        if n%i == 0:
            return False
    return True

### No random

#### Generate 10.000.000 number list sorted

In [6]:
numbers_file = "input_data/numbers.txt"

In [7]:
# import random
# numbers = [random.randint(1, 100000000) for _ in range(10000000)]
numbers = list(range(1,10000001))
# numbers = [number for number in range(1,100000001)]
with open(numbers_file,'w+') as file:
    for number in numbers:
        file.write(f"{number}\n")
    file.close()

In [8]:
len(numbers)

10000000

#### RDD

In [9]:
prime_numbers_rdd_output_path = "output_data/prime_numbers_rdd.txt"

In [10]:
numbers_rdd = sc.textFile(numbers_file)

In [11]:
numbers_rdd = numbers_rdd.map(lambda x: int(x))

In [12]:
prime_numbers_rdd = numbers_rdd.filter(is_prime)

In [13]:
prime_numbers_rdd = prime_numbers_rdd.repartition(12)

In [14]:
%%time
prime_numbers_rdd.take(10)

CPU times: total: 15.6 ms
Wall time: 34.4 s


[131, 137, 139, 149, 151, 157, 163, 167, 173, 179]

In [15]:
%%time
prime_numbers_rdd.saveAsTextFile(path=prime_numbers_rdd_output_path)

CPU times: total: 31.2 ms
Wall time: 7.18 s


### Random

In [16]:
numbers_random_file = "input_data/numbers_random.txt"

In [17]:
import random

numbers_random = [random.randint(1,10000000) for _ in range(1,10000000)]

with open(numbers_random_file,'w+') as file:
    for number in numbers_random:
        file.write(f"{number}\n")

In [18]:
numbers_random_rdd_output_path = "output_data/prime_numbers_rdd_random.txt"

In [19]:
numbers_rdd_random = sc.textFile(numbers_random_file)

In [20]:
numbers_rdd_random = numbers_rdd_random.map(lambda x: int(x))

In [21]:
prime_numbers_rdd_random = numbers_rdd_random.filter(is_prime)

In [22]:
prime_numbers_rdd_random = prime_numbers_rdd_random.repartition(numPartitions=12)

In [23]:
%%time
prime_numbers_rdd_random.take(10)

CPU times: total: 0 ns
Wall time: 31.6 s


[8805487,
 363533,
 9349091,
 9176003,
 7034653,
 3358151,
 4037767,
 6173003,
 1918363,
 1395481]

In [24]:
%%time
prime_numbers_rdd_random.saveAsTextFile(numbers_random_rdd_output_path)

CPU times: total: 62.5 ms
Wall time: 6.97 s


## Exercise 2
- Generate a text file of 10,000 lines, each line contains a pair of (key,value)
- Calculate the average value for each key
- Apply GroupByKey() and ReduceByKey() Functions

### Generate text file with 10000

In [25]:
import random

textFile_path = "input_data/textLine.txt"

with open(textFile_path, 'w+') as file:
    for i in range(1, 10001):
        key = random.randint(1, 100)
        line = f"{key}, this is line number {i}\n"
        file.write(line)

In [26]:
rdd = sc.textFile(textFile_path)

In [27]:
rdd.take(5)

['49, this is line number 1',
 '10, this is line number 2',
 '39, this is line number 3',
 '86, this is line number 4',
 '69, this is line number 5']

In [28]:
def convert_to_key_value(value):
    value = (int(value.split(',')[0].strip()), value.split(',')[1].strip())
    return value

def convert_to_key_lenValue(value):
    value = (int(value.split(',')[0].strip()), len(value.split(',')[1].strip()))
    return value

In [29]:
key_value_rdd = rdd.map(convert_to_key_value)

In [30]:
key_lenValue_rdd = rdd.map(convert_to_key_lenValue)

In [31]:
key_lenValue_rdd.take(5)

[(49, 21), (10, 21), (39, 21), (86, 21), (69, 21)]

### Group by key

In [32]:
def sum_count(value):
    value = (sum(value) , len(value))
    return value

def avg_cal_groupByKey(value):
    value = value[0] / value[1]
    return value

In [33]:
%%time

grouped_rdd = key_lenValue_rdd.groupByKey()

sum_count_rdd = grouped_rdd.mapValues(sum_count)

avg_rdd_groupByKey = sum_count_rdd.mapValues(avg_cal_groupByKey)

avg_rdd_groupByKey.take(5)

CPU times: total: 31.2 ms
Wall time: 1.9 s


[(10, 23.86868686868687),
 (86, 23.846153846153847),
 (18, 23.8018018018018),
 (82, 23.852631578947367),
 (50, 23.855555555555554)]

In [34]:
%%time
sum_count_rdd.take(5)

CPU times: total: 0 ns
Wall time: 625 ms


[(10, (2363, 99)),
 (86, (2480, 104)),
 (18, (2642, 111)),
 (82, (2266, 95)),
 (50, (2147, 90))]

In [35]:
%%time
avg_rdd_groupByKey.take(5)

CPU times: total: 0 ns
Wall time: 631 ms


[(10, 23.86868686868687),
 (86, 23.846153846153847),
 (18, 23.8018018018018),
 (82, 23.852631578947367),
 (50, 23.855555555555554)]

In [36]:
avg_groupByKey_output_path = "output_data/avg_groupByKey.txt"

In [37]:
avg_rdd_groupByKey.saveAsTextFile(path=avg_groupByKey_output_path)

### Reduce by key

In [38]:
def value_and_count(value):
    value = (value,1)
    return value

def sum_lenValue(value_1,value_2):
    value_1,value_2  =(value_1[0] + value_2[0], value_1[1] + value_2[1])
    return value_1,value_2

def avg_cal_reduceByKey(value):
    value = value[0] / value[1]
    return value

In [39]:
%%time
sum_count_rdd = key_lenValue_rdd.mapValues(value_and_count) \
                                .reduceByKey(sum_lenValue)

avg_rdd_reduceByKey = sum_count_rdd.mapValues(avg_cal_reduceByKey)

avg_rdd_reduceByKey.take(5)

CPU times: total: 0 ns
Wall time: 1.88 s


[(10, 23.86868686868687),
 (86, 23.846153846153847),
 (18, 23.8018018018018),
 (82, 23.852631578947367),
 (50, 23.855555555555554)]

In [40]:
%%time
sum_count_rdd.take(5)

CPU times: total: 0 ns
Wall time: 672 ms


[(10, (2363, 99)),
 (86, (2480, 104)),
 (18, (2642, 111)),
 (82, (2266, 95)),
 (50, (2147, 90))]

In [41]:
%%time
avg_rdd_reduceByKey.take(5)

CPU times: total: 15.6 ms
Wall time: 643 ms


[(10, 23.86868686868687),
 (86, 23.846153846153847),
 (18, 23.8018018018018),
 (82, 23.852631578947367),
 (50, 23.855555555555554)]

In [42]:
avg_reduceByKey_output_path = "output_data/avg_reduceByKey.txt"

In [43]:
avg_rdd_reduceByKey.saveAsTextFile(path=avg_reduceByKey_output_path)