# Simple Spark

Have you ever wondered how Apache Spark and Beam work under the hood? Both Spark and Beam are high-performance, data-parallel processing frameworks that generally run on a compute cluster. They are typically used for ETL workloads where maximizing throughput matters.  
<br>
Parallelism can occur at the **chip** level: single instruction, multiple data (SIMD) instructions can use the full width of the registers on a CPU (e.g., NumPy) or a GPU (e.g., TensorFlow).  
Parallelism can also occur at the **process** level using libraries such as `multiprocessing`, `ipyparallel`, and `MRJob`. You can take a look at my comparison of ipyparallel and MRJob: https://github.com/eugeneh101/ipyparallel-vs-MRJob  
Parallelism can also occur at the **machine** level, where multiple machines work cooperatively in a cluster; that's what Spark and Beam do.  
<br>
I'm deeply curious about how Spark is implemented: you chain a series of transformations to build a Directed Acyclic Graph (DAG). The graph is defined lazily, so the actual computation is deferred until an action is called. In this repo, I focus on building the transformation DAG (i.e., how do you perform a `groupByKey()` lazily?). The annotations below explain the pros and cons of each of my implementations, which are ordered from simplest to most sophisticated and complete. My notebook requires Python 3.3+ (for `yield from`). Enjoy!  
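One way to make a `groupByKey()` lazy, which is the approach the later attempts take by returning `RDD(wrapper)`, is to wrap the grouping in a function that only runs when an action iterates it. A minimal sketch; `lazy_group_by_key` and `pairs` are hypothetical names, and the source is assumed to be a zero-argument callable so it can be re-run on every action:

```python
from collections import defaultdict


def lazy_group_by_key(source):
    """Defer grouping: return a generator function instead of a result.

    `source` is assumed to be a zero-argument callable returning an
    iterable of (key, value) pairs, so it can be re-run on every action.
    """
    def run():
        groups = defaultdict(list)
        for key, value in source():  # the whole input is consumed only here
            groups[key].append(value)
        yield from groups.items()
    return run


def pairs():
    return (("odd" if x % 2 else "even", x) for x in range(10))


grouped = lazy_group_by_key(pairs)  # nothing has been computed yet
print(list(grouped()))  # the "action": grouping happens now
print(list(grouped()))  # a second action re-runs the pipeline from the source
```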

Disclaimer: This repo does not focus on the parallelized nature of computation across machines.
<br><br>


#### 1st Attempt
Pros: 
* simple generator style to create transforms
  
Cons: 
* only the first action works: the transforms are stored as a single generator, so once an action has consumed it, later actions return an empty list
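The root cause is that a generator expression is exhausted after one pass, and attempt 1 keeps a single generator in `self.rdd`. A minimal illustration (the name `doubled` is illustrative):

```python
# A generator expression can only be consumed once; attempt 1 stores one in self.rdd.
doubled = (x * 2 for x in range(5))

first = list(doubled)   # consumes the generator
second = list(doubled)  # already exhausted, so nothing is left

print(first)   # [0, 2, 4, 6, 8]
print(second)  # []
```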

#### 2nd Attempt
Pros:
* 2nd implementation using generator style
* slight improvement: calling an action a second time returns a non-empty list

Cons: 
* still suffers from a similar problem: only one action returns the correct result
* `map()` mutates and returns the same object, so every transformation is applied to every RDD: once a second transformation exists, an action on the first transformation no longer returns the correct result
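Both cons come from `map()` mutating `self` and returning it, so `rdd`, `rdd2`, and `rdd3` end up as one object. The hypothetical `MutatingRDD` below reduces this to a few lines (it stores a list of functions rather than nesting generator functions, purely for brevity):

```python
class MutatingRDD:
    """Hypothetical stripped-down version of attempt 2: map() mutates and returns self."""

    def __init__(self, data):
        self.data = data
        self.funcs = []

    def map(self, func):
        self.funcs.append(func)  # mutates shared state
        return self              # every "new" RDD is the same object

    def collect(self):
        out = []
        for x in self.data:
            for f in self.funcs:
                x = f(x)
            out.append(x)
        return out


rdd = MutatingRDD(range(3))
rdd2 = rdd.map(lambda x: x * 2)
rdd3 = rdd.map(lambda x: x * 3)
print(rdd2 is rdd3)    # True
print(rdd2.collect())  # [0, 6, 12]: rdd2 also picked up the * 3
```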

#### 3rd Attempt
Pros:
* decorator style: composing higher-order functions to emulate the sequence of transformations
* added `reduce()` action
* can create multiple RDDs where actions will get you the correct result
* decoupled the RDDs, so each RDD is immutable

Cons:
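* cannot do `filter()` or `groupByKey()` due to the inherent limitation of composing functions: the composed function maps each element to exactly one element, so it can neither drop elements nor combine them across the dataset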
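The limitation is structural: a composed function maps one element to exactly one element. A sketch with a hypothetical `compose` helper (it composes left to right instead of nesting calls like attempt 3's `__compose__`, but the effect is the same):

```python
from functools import reduce


def compose(*funcs):
    """Compose element-wise functions left to right: compose(f, g)(x) == g(f(x))."""
    return lambda x: reduce(lambda acc, f: f(acc), funcs, x)


pipeline = compose(lambda x: x * 2, lambda x: x + 1)
data = range(5)
results = [pipeline(x) for x in data]
print(results)  # [1, 3, 5, 7, 9]

# The composed function returns exactly one value per input, so the output
# always has len(data) elements: there is no way to express "drop this element"
# (filter) or "combine elements across the dataset" (groupByKey).
assert len(results) == len(data)
```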

#### 4th Attempt
Pros:
* instead of generator or decorator, implemented RDD transformations as a list of map functions

Cons:
* `reduceByKey()`, `filter()`, and `groupByKey()` not implemented
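Storing the chain as a list makes each RDD's chain an independent value: a child RDD gets `parent_chain + [func]`, which is a new list, and an action folds the list over each element with `functools.reduce`, the same way attempt 4's `__results_generator__()` does. A minimal sketch; `apply_chain`, `chain`, and `child` are illustrative names:

```python
from functools import reduce

# The transformation chain is just a list; applying it is a left fold over the list.
chain = [lambda x: x * 2, lambda x: x + 1]


def apply_chain(element, chain):
    return reduce(lambda value, f: f(value), chain, element)


base = list(range(4))
child = chain + [lambda x: x * 10]  # "+" builds a new list, so `chain` is untouched

print([apply_chain(x, chain) for x in base])  # [1, 3, 5, 7]
print([apply_chain(x, child) for x in base])  # [10, 30, 50, 70]
```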

#### 5th Attempt
Pros:
* 2nd implementation using a list of functions and a generator function
* `reduceByKey()` transformation implemented
* `filter()` transformation implemented weakly
* attempted to implement `flatMap()`

Cons:
* `reduceByKey()` uses `groupByKey()`, which creates a dictionary mapping each key to a list of all its values. Hence, memory usage can spike if a key has a very large number of values
* `filter()` doesn't work if transformed element is falsy (False, 0, "")
* `flatMap()` doesn't work correctly due to a limitation of how `__results_generator__()` iterates through elements in the RDD
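The memory difference between this `reduceByKey()` and the one in the 6th attempt can be seen side by side. Both helpers below are hypothetical and work on a plain list instead of an RDD: the first keeps a list of every value per key, the second keeps only one running value per key:

```python
from collections import defaultdict
from functools import reduce
from operator import add

pairs = [("even", 0), ("odd", 1), ("even", 2), ("odd", 3), ("even", 4)]


def reduce_by_key_via_group(pairs, func):
    # Attempt 5 style: materialize every value for a key, then reduce the list.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)  # memory grows with the number of values per key
    return {key: reduce(func, values) for key, values in groups.items()}


def reduce_by_key_incremental(pairs, func):
    # Attempt 6 style: fold each new value into a single running value per key.
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc


print(reduce_by_key_via_group(pairs, add))    # {'even': 6, 'odd': 4}
print(reduce_by_key_incremental(pairs, add))  # {'even': 6, 'odd': 4}
```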

#### 6th Attempt (Best Implementation)
Pros:
* 3rd implementation using a list of functions (and a recursive generator function)
* superior implementation of `reduceByKey()`: two values for a given key are immediately reduced into one value, so each key only ever holds one value
* superior implementation of `filter()` transformation using the recursive generator function `__results_generator__()`
* `flatMap()` implementation fixed using the recursive generator function `__results_generator__()`

Cons:
* an input generator can only be reified (have an action called on it) once: if you `parallelize()` a generator, the second action returns an empty list. This is only a weakness for generators, since they can only be iterated through once; if you `parallelize()` a list or tuple, the second action returns the correct answer.
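The key idea is that every transformation returns an iterable (one item for `map()`, zero or one for `filter()`, any number for `flatMap()`), and a recursive generator walks each result through the rest of the chain. A simplified sketch of that idea; `as_map`, `as_filter`, `as_flat_map`, and `run_chain` are hypothetical names, and lists are used where attempt 6 uses `yield from [...]` and the built-in `filter()`:

```python
# Each transformation is modelled as element -> iterable of zero or more outputs.
def as_map(func):
    return lambda element: [func(element)]  # exactly one output


def as_filter(pred):
    return lambda element: [element] if pred(element) else []  # zero or one output


def as_flat_map(func):
    return lambda element: func(element)  # any number of outputs


def run_chain(element, chain):
    """Recursively push one element through the remaining transformations."""
    if not chain:
        yield element
        return
    for sub in chain[0](element):
        yield from run_chain(sub, chain[1:])


chain = [as_flat_map(list), as_filter(lambda x: x % 2), as_map(lambda x: x * 10)]
source = [range(3), range(3, 6)]
print([out for element in source for out in run_chain(element, chain)])  # [10, 30, 50]
```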

#### 7th Attempt
Pros:
* 4th implementation using a list of functions (and a recursive generator function)
* even if you `parallelize()` a generator, a second action on the RDD returns the correct result: after the first reification, the generator passed to `parallelize()` is memoized as a list
* moved the type checking (whether the current RDD's `__sequence__` is a (generator) function, a generator, or a container) from `__results_generator__()` to `__sequence_generator__()`

Cons:
* despite being more complete than the 6th implementation, it is harder to read
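The memoization trick can be isolated from the rest of the RDD: record each element while the generator is consumed the first time, then swap the generator for the recorded list. A sketch under that assumption; `ReplayableSource` is a hypothetical name, not part of the notebook:

```python
import types


class ReplayableSource:
    """Hypothetical sketch of attempt 7's idea: record a one-shot generator while it is
    first consumed, then replace it with the recorded list."""

    def __init__(self, source):
        self.source = source

    def __iter__(self):
        if isinstance(self.source, types.GeneratorType):
            recorded = []
            for element in self.source:
                recorded.append(element)
                yield element
            self.source = recorded  # later passes read the list
        else:
            yield from self.source


src = ReplayableSource(x * x for x in range(4))
print(list(src))  # [0, 1, 4, 9]
print(list(src))  # [0, 1, 4, 9] again, instead of []
```

One caveat this sketch shares with attempt 7's `__sequence_generator__()`: the list is only stored if the first pass runs to completion. If the first action stops early (for example an `in` check that finds its element), the already-consumed prefix is lost and later actions only see the remaining elements.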

# 1st Attempt

In [1]:
class Simple_Spark:
    def __init__(self):
        self.sequence = None
        self.rdd = None
    
    def parallelize(self, sequence):
        self.sequence = sequence
        return self
    
    def map(self, func):
        if self.rdd is None:
            self.rdd = (func(element) for element in self.sequence)
        else:
            self.rdd = (func(element) for element in self.rdd)
        return self
    
    def collect(self):
        results = list(self.rdd)
        return results
    
    def __iter__(self):
        yield from self.rdd
        
    def __contains__(self, element):
        return element in iter(self)

In [2]:
def multi_2(x): return x * 2
def multi_3(x): return x * 3

sc = Simple_Spark()
rdd = sc.parallelize(range(10))
rdd2 = rdd.map(multi_2)
rdd3 = rdd2.map(multi_3)

print(rdd3.collect())
print(rdd3.collect()) # action only works once

[0, 6, 12, 18, 24, 30, 36, 42, 48, 54]
[]


In [3]:
sc = Simple_Spark()
rdd = sc.parallelize(range(10))
rdd2 = rdd.map(multi_2)
rdd3 = rdd2.map(multi_3)
print(54 in rdd3)

True


# 2nd Attempt

In [1]:
import types


class Simple_Spark:
    def __init__(self):
        self.rdd = None
    
    def parallelize(self, sequence):
        self.rdd = sequence
        return self
    
    def map(self, func):
        def wrapper(rdd=self.rdd):
            if isinstance(rdd, types.FunctionType):
                for element in rdd():
                    yield func(element)
            else:
                for element in rdd:
                    yield func(element)
                    
        self.rdd = wrapper
        return self
        
    def collect(self):
        if isinstance(self.rdd, types.FunctionType):
            results = list(self.rdd())
        else:
            results = list(self.rdd)
        return results
    
    def __iter__(self):
        if isinstance(self.rdd, types.FunctionType):
            yield from self.rdd()
        else:
            yield from self.rdd

    def __contains__(self, element):
        return element in iter(self)    

In [2]:
def multi_2(x): return x * 2
def multi_3(x): return x * 3

sc = Simple_Spark()
rdd = sc.parallelize(range(10))

In [3]:
rdd2 = rdd.map(multi_2)
rdd3 = rdd.map(multi_3)

print(rdd2.collect()) # incorrect: rdd, rdd2, and rdd3 are the same object, so multi_3 was applied too
print(rdd3.collect()) # every map() applied so far affects the one shared object

[0, 6, 12, 18, 24, 30, 36, 42, 48, 54]
[0, 6, 12, 18, 24, 30, 36, 42, 48, 54]


In [4]:
rdd = sc.parallelize(range(10))
rdd2 = rdd.map(multi_2)
print(8 in rdd2)

True


# 3rd Attempt

In [1]:
from functools import reduce
import copy


class Simple_Spark:
    def parallelize(self, sequence):
        return RDD(sequence)


class RDD:
    def __init__(self, __sequence__, __transformation_chain__=None):
        self.__sequence__ = __sequence__
        self.__transformation_chain__ = __transformation_chain__

    @staticmethod
    def __compose__(next_function, previous_function): # decorator
        def wrapper(element):
            return next_function(previous_function(element))
        return wrapper
    
    def map(self, func): # immutability
        if self.__transformation_chain__ is None:
            __transformation_chain__ = func
        else:
            # copy.deepcopy() returns function objects unchanged, so the composed wrapper is used directly
            __transformation_chain__ = self.__compose__(func, self.__transformation_chain__)
        return RDD(self.__sequence__, __transformation_chain__)
    
    def reduce(self, func):
        return reduce(lambda x, y: func(x, y), self.__results_generator__())

    def __results_generator__(self):
        if self.__transformation_chain__ is None:
            return (element for element in self.__sequence__)
        else:
            return (self.__transformation_chain__(element) for element in self.__sequence__)

    def collect(self):
        return list(self.__results_generator__())
  
    def __contains__(self, element):
        return element in self.__results_generator__()

In [2]:
def multi_2(x): return x * 2

sc = Simple_Spark()
rdd = sc.parallelize(range(10))
rdd2 = rdd.map(multi_2)

print(rdd.collect())
print(rdd2.collect())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


In [3]:
print(rdd2.reduce(lambda x, y: x + y))
print(rdd.collect())
print(7 in rdd)

90
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
True


In [4]:
print(rdd
    .map(lambda x: x * 2)
    .map(lambda x: x + 1)
    .map(lambda x: x / 2)
    .collect())

[0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5]


# 4th Attempt

In [1]:
from functools import reduce
import copy


class Simple_Spark:
    def parallelize(self, sequence):
        return RDD(sequence)


class RDD:
    def __init__(self, __sequence__, __transformation_chain__=None):
        self.__sequence__ = __sequence__
        self.__transformation_chain__ = [] if __transformation_chain__ is None else __transformation_chain__
        
    def map(self, func):
        return RDD(self.__sequence__, copy.copy(self.__transformation_chain__ + [func]))
    
    def reduce(self, func):
        return reduce(lambda x, y: func(x, y), self.__results_generator__())

    def __results_generator__(self):
        return (reduce(lambda value, f: f(value), self.__transformation_chain__, element) for element in self.__sequence__)
    
    def collect(self):
        return list(self.__results_generator__())
            
    def __contains__(self, element):
        return element in self.__results_generator__()   

In [2]:
def multi_2(x): return x * 2

sc = Simple_Spark()
rdd = sc.parallelize(range(10))
rdd2 = rdd.map(multi_2)

In [3]:
print(rdd.collect())
print(rdd2.collect())
print(rdd2.reduce(lambda x, y: x + y))
print(rdd.collect())
print(7 in rdd)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
90
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
True


# 5th Attempt

In [1]:
from functools import reduce
import copy
from collections import defaultdict
import types



class Simple_Spark:
    def parallelize(self, sequence):
        return RDD(sequence)


class RDD:
    class FilteredElement:
        pass    
    
    def __init__(self, __sequence__, __transformation_chain__=None):
        self.__sequence__ = __sequence__
        self.__transformation_chain__ = [] if __transformation_chain__ is None else __transformation_chain__
        
    def map(self, func):
        return RDD(self.__sequence__, copy.copy(self.__transformation_chain__ + [func]))    
    
    def flatMap(self, func):
        def wrapper(element):
            yield from func(element)
            #for element in self.__results_generator__():
            #    print("element {}".format(element))
            #    for sub_element in func(element):
            #        yield sub_element
        return RDD(self.__sequence__, __transformation_chain__=copy.copy(self.__transformation_chain__ + [func]))
    
    def filter(self, func):
        def wrapper(element):
            temp_element = func(element)
            if temp_element:
                return element
            else:
                return RDD.FilteredElement()        
        return RDD(self.__sequence__, copy.copy(self.__transformation_chain__ + [wrapper]))

    def groupByKey(self):
        def wrapper():
            key_values_dict = defaultdict(list)
            for key_value in self.__results_generator__():
                if len(key_value) != 2:
                    raise Exception("Element does not have length 2")
                key_values_dict[key_value[0]].append(key_value[1])
            for key_values in key_values_dict.items():
                yield key_values
        return RDD(wrapper, __transformation_chain__=None)
    
    def reduceByKey(self, func): # transformation, not action
        def wrapper():
            for key, values in self.groupByKey().__results_generator__():
                yield key, reduce(func, values)
        return RDD(wrapper, __transformation_chain__=None)
    
    def reduce(self, func):
        return reduce(lambda x, y: func(x, y), self.__results_generator__())

    def __results_generator__(self):
        if isinstance(self.__sequence__, types.FunctionType):
            __sequence__ = self.__sequence__()
        else:
            __sequence__ = self.__sequence__
        for element in __sequence__:
            for func in self.__transformation_chain__:
                element = func(element)
                if isinstance(element, RDD.FilteredElement):
                    break
            else:
                yield element
    
    def collect(self):
        return list(self.__results_generator__())
            
    def __contains__(self, element):
        return element in self.__results_generator__()

In [2]:
sc = Simple_Spark()

rdd = sc.parallelize([range(10), range(10, 20)])

print(rdd.map(list).collect())
print(rdd.flatMap(list).collect()) # flatMap doesn't work correctly

[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]]
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]]


In [3]:
rdd = sc.parallelize(range(10))
rdd1 = rdd.filter(lambda x: x % 2)
print(rdd1.collect())
rdd2 = rdd1.map(lambda x: x * 2)
print(rdd2.collect())

[1, 3, 5, 7, 9]
[2, 6, 10, 14, 18]


In [4]:
rdd3 = rdd.map(lambda x: ("odd" if x % 2 else "even", x))
print(rdd3.collect())
rdd4 = rdd3.groupByKey()
print(rdd4.collect())

[('even', 0), ('odd', 1), ('even', 2), ('odd', 3), ('even', 4), ('odd', 5), ('even', 6), ('odd', 7), ('even', 8), ('odd', 9)]
[('even', [0, 2, 4, 6, 8]), ('odd', [1, 3, 5, 7, 9])]


In [5]:
rdd5 = rdd3.reduceByKey(lambda x, y: x + y)
print(rdd5.collect())
print(rdd5.map(lambda x: x[1]).reduce(lambda x, y: x + y))

[('even', 20), ('odd', 25)]
45


# 6th Attempt

In [1]:
from functools import reduce
import copy
from collections import defaultdict
import types


class Simple_Spark:
    def parallelize(self, sequence):
        return RDD(sequence)


class RDD:    
    def __init__(self, __sequence__, __transformation_chain__=None):
        self.__sequence__ = __sequence__
        self.__transformation_chain__ = [] if __transformation_chain__ is None else __transformation_chain__
        
    def map(self, func):
        def wrapper(element):
            yield from [func(element)]
        return RDD(self.__sequence__, copy.copy(self.__transformation_chain__) + [wrapper])

    def flatMap(self, func):
        def wrapper(element):
            yield from func(element)
        return RDD(self.__sequence__, __transformation_chain__=copy.copy(self.__transformation_chain__) + [wrapper])

    def filter(self, func):
        def wrapper(element):
            return filter(func, [element])
        return RDD(self.__sequence__, copy.copy(self.__transformation_chain__) + [wrapper])

    def groupByKey(self):
        def wrapper():
            key_values_dict = defaultdict(list)
            for key_value in self.__results_generator__():
                if len(key_value) != 2:
                    raise Exception("Element does not have length 2")
                key_values_dict[key_value[0]].append(key_value[1])
            yield from key_values_dict.items()
        return RDD(wrapper, __transformation_chain__=None)
    
    def reduceByKey(self, func): # transformation, not action
        def wrapper():
            reducebykey_dict = {}
            for key, value in self.__results_generator__():
                if key in reducebykey_dict:
                    reducebykey_dict[key] = func(reducebykey_dict[key], value)
                else:
                    reducebykey_dict[key] = value
            yield from reducebykey_dict.items()
        return RDD(wrapper, __transformation_chain__=None)

    def reduce(self, func):
        return reduce(lambda x, y: func(x, y), self.__results_generator__())

    def __results_generator__(self):
        def __recursive_generator__(element, __transformation_chain__):
            if len(__transformation_chain__):
                for subelement in __transformation_chain__[0](element):
                    yield from __recursive_generator__(subelement, __transformation_chain__[1:])
            else:
                yield element
        
        if isinstance(self.__sequence__, types.FunctionType):
            __sequence__ = self.__sequence__()
        else:
            __sequence__ = self.__sequence__
        for element in __sequence__:
            yield from __recursive_generator__(element, self.__transformation_chain__)

    def collect(self):
        return list(self.__results_generator__())
            
    def __contains__(self, element):
        return element in self.__results_generator__()

In [2]:
sc = Simple_Spark()

rdd = sc.parallelize([range(10), range(10, 20)])

print(rdd.map(list).collect())
print(rdd.flatMap(list).collect()) # flatMap works correctly

[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]


In [3]:
rdd1 = rdd.flatMap(list).map(lambda x: ("odd" if x % 2 else "even", x))
print(rdd1.collect())

[('even', 0), ('odd', 1), ('even', 2), ('odd', 3), ('even', 4), ('odd', 5), ('even', 6), ('odd', 7), ('even', 8), ('odd', 9), ('even', 10), ('odd', 11), ('even', 12), ('odd', 13), ('even', 14), ('odd', 15), ('even', 16), ('odd', 17), ('even', 18), ('odd', 19)]


In [4]:
rdd2 = rdd1.groupByKey()
print(rdd2.collect())
print(('odd', [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]) in rdd2)
print('odd' in rdd2.map(lambda kv: kv[0]))
print(rdd1.reduceByKey(lambda x, y: x + y).collect())

[('even', [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]), ('odd', [1, 3, 5, 7, 9, 11, 13, 15, 17, 19])]
True
True
[('even', 90), ('odd', 100)]


# 7th Attempt

In [1]:
from functools import reduce
import copy
from collections import defaultdict
import types


class Simple_Spark:
    def parallelize(self, sequence):
        return RDD(sequence)


class RDD:    
    def __init__(self, __sequence__, __transformation_chain__=None):
        self.__sequence__ = __sequence__
        self.__transformation_chain__ = [] if __transformation_chain__ is None else __transformation_chain__
        
    @property
    def __sequence_generator__(self):
        print(type(self.__sequence__))
        if isinstance(self.__sequence__, types.FunctionType):
            yield from self.__sequence__()
        elif isinstance(self.__sequence__, types.GeneratorType):
            print("run 1 time")
            placeholder = []
            for element in self.__sequence__:
                placeholder.append(element)
                yield element
            self.__sequence__ = placeholder
        else:
            yield from self.__sequence__

    def map(self, func):
        def wrapper(element):
            yield from [func(element)]
        return RDD(lambda: self.__sequence_generator__, copy.copy(self.__transformation_chain__) + [wrapper])
    
    def flatMap(self, func):
        def wrapper(element):
            yield from func(element)
        return RDD(lambda: self.__sequence_generator__, __transformation_chain__=copy.copy(self.__transformation_chain__) + [wrapper])
    
    def filter(self, func):
        def wrapper(element):
            return filter(func, [element])
        return RDD(lambda: self.__sequence_generator__, copy.copy(self.__transformation_chain__) + [wrapper])

    def groupByKey(self):
        def wrapper():
            key_values_dict = defaultdict(list)
            for key_value in self.__results_generator__():
                if len(key_value) != 2:
                    raise Exception("Element does not have length 2")
                key_values_dict[key_value[0]].append(key_value[1])
            yield from key_values_dict.items()
        return RDD(wrapper, __transformation_chain__=None)
    
    def reduceByKey(self, func): # transformation, not action
        def wrapper():
            reducebykey_dict = {}
            for key, value in self.__results_generator__():
                if key in reducebykey_dict:
                    reducebykey_dict[key] = func(reducebykey_dict[key], value)
                else:
                    reducebykey_dict[key] = value
            yield from reducebykey_dict.items()
        return RDD(wrapper, __transformation_chain__=None)
    
    def reduce(self, func):
        return reduce(lambda x, y: func(x, y), self.__results_generator__())

    def __results_generator__(self):
        def __recursive_generator__(element, __transformation_chain__):
            if len(__transformation_chain__):
                for subelement in __transformation_chain__[0](element):
                    yield from __recursive_generator__(subelement, __transformation_chain__[1:])
            else:
                yield element

        for element in self.__sequence_generator__:
            yield from __recursive_generator__(element, self.__transformation_chain__)
    
    def collect(self):
        return list(self.__results_generator__())
            
    def __contains__(self, element):
        return element in self.__results_generator__()

In [2]:
sc = Simple_Spark()
rdd = sc.parallelize((x for x in range(10)))

print(rdd.collect())

<class 'generator'>
run 1 time
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


In [3]:
rdd2 = rdd.map(lambda x: x * 2)
print(rdd2.collect())

<class 'function'>
<class 'list'>
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


In [4]:
print(rdd2.map(lambda x: x / 2).collect())

<class 'function'>
<class 'function'>
<class 'list'>
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]


In [5]:
gen = (x for x in range(10))

rdd = sc.parallelize(gen)
rdd2 = rdd.filter(lambda x: x % 2)
print(rdd2.collect())

rdd3 = rdd2.map(lambda x: x * 2)
print(rdd3.collect())

<class 'function'>
<class 'generator'>
run 1 time
[1, 3, 5, 7, 9]
<class 'function'>
<class 'function'>
<class 'list'>
[2, 6, 10, 14, 18]


In [6]:
rdd4 = rdd.map(lambda x: ("odd" if x % 2 else "even", x))
print(rdd4.collect())

rdd5 = rdd4.groupByKey()
print(rdd5.collect())

<class 'function'>
<class 'list'>
[('even', 0), ('odd', 1), ('even', 2), ('odd', 3), ('even', 4), ('odd', 5), ('even', 6), ('odd', 7), ('even', 8), ('odd', 9)]
<class 'function'>
<class 'function'>
<class 'list'>
[('even', [0, 2, 4, 6, 8]), ('odd', [1, 3, 5, 7, 9])]


In [7]:
rdd6 = rdd4.reduceByKey(lambda x, y: x + y)
print(rdd6.collect())

print(rdd6.map(lambda x: x[1]).reduce(lambda x, y: x + y))

<class 'function'>
<class 'function'>
<class 'list'>
[('even', 20), ('odd', 25)]
<class 'function'>
<class 'function'>
<class 'function'>
<class 'list'>
45


In [8]:
rdd = sc.parallelize([range(10), range(10, 20)])

rdd2 = rdd.flatMap(list)
print(rdd2.collect())
rdd3 = rdd2.filter(lambda x: x % 2)
print(rdd3.collect())

rdd4 = rdd3.map(lambda x: x * 2)
print(rdd4.collect())

<class 'function'>
<class 'list'>
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
<class 'function'>
<class 'function'>
<class 'list'>
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
<class 'function'>
<class 'function'>
<class 'function'>
<class 'list'>
[2, 6, 10, 14, 18, 22, 26, 30, 34, 38]


In [9]:
rdd5 = rdd2.map(lambda x: ("odd" if x % 2 else "even", x))
print(rdd5.collect())

rdd6 = rdd5.groupByKey()
print(rdd6.collect())

<class 'function'>
<class 'function'>
<class 'list'>
[('even', 0), ('odd', 1), ('even', 2), ('odd', 3), ('even', 4), ('odd', 5), ('even', 6), ('odd', 7), ('even', 8), ('odd', 9), ('even', 10), ('odd', 11), ('even', 12), ('odd', 13), ('even', 14), ('odd', 15), ('even', 16), ('odd', 17), ('even', 18), ('odd', 19)]
<class 'function'>
<class 'function'>
<class 'function'>
<class 'list'>
[('even', [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]), ('odd', [1, 3, 5, 7, 9, 11, 13, 15, 17, 19])]


In [10]:
rdd7 = rdd5.reduceByKey(lambda x, y: x + y)
print(rdd7.collect())

print(rdd7.map(lambda x: x[1]).reduce(lambda x, y: x + y))

print(("even", 90) in rdd7)

<class 'function'>
<class 'function'>
<class 'function'>
<class 'list'>
[('even', 90), ('odd', 100)]
<class 'function'>
<class 'function'>
<class 'function'>
<class 'function'>
<class 'list'>
190
<class 'function'>
<class 'function'>
<class 'function'>
<class 'list'>
True


In [11]:
temp_rdd = (sc.parallelize(x for x in range(10))
            .map(lambda x: ("odd" if x % 2 else "even", x))
            .reduceByKey(lambda x, y: x + y)
            .map(lambda kv: kv[1])
           )

In [12]:
temp_rdd.reduce(lambda x, y: x + y) # generator is reified once

<class 'function'>
<class 'function'>
<class 'function'>
<class 'generator'>
run 1 time


45

In [13]:
temp_rdd.reduce(lambda x, y: x + y) # second time, underlying collection is a list

<class 'function'>
<class 'function'>
<class 'function'>
<class 'list'>


45

In [14]:
def multi_2(x): return x * 2
def multi_3(x): return x * 3
def multi_4(x): return x * 4

rdd = sc.parallelize(range(10))
rdd2 = rdd.map(multi_2)
rdd3 = rdd2.map(multi_3)
rdd4 = rdd3.map(multi_4)

In [15]:
print(rdd2.collect())
print(6 in rdd2)
print(rdd3.collect())
print(rdd3.reduce(lambda x, y: x + y))
print(rdd3
    .map(lambda x: x * 2)
    .map(lambda x: x + 1)
    .map(lambda x: x / 2)
    .collect())

<class 'function'>
<class 'range'>
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
<class 'function'>
<class 'range'>
True
<class 'function'>
<class 'function'>
<class 'range'>
[0, 6, 12, 18, 24, 30, 36, 42, 48, 54]
<class 'function'>
<class 'function'>
<class 'range'>
270
<class 'function'>
<class 'function'>
<class 'function'>
<class 'function'>
<class 'function'>
<class 'range'>
[0.5, 6.5, 12.5, 18.5, 24.5, 30.5, 36.5, 42.5, 48.5, 54.5]


In [16]:
def multiply_and_print(multiplicand, comment):
    def multiply(number):
        print(number, comment)
        return multiplicand * number
    return multiply

rdd = sc.parallelize(range(10))
rdd2 = rdd.map(multiply_and_print(2, "mult2"))
rdd3 = rdd2.map(multiply_and_print(3, "mult3"))
rdd4 = rdd3.map(multiply_and_print(4, "mult4"))

rdd4.collect() # look at the order of lazy functions

<class 'function'>
<class 'function'>
<class 'function'>
<class 'range'>
0 mult2
0 mult3
0 mult4
1 mult2
2 mult3
6 mult4
2 mult2
4 mult3
12 mult4
3 mult2
6 mult3
18 mult4
4 mult2
8 mult3
24 mult4
5 mult2
10 mult3
30 mult4
6 mult2
12 mult3
36 mult4
7 mult2
14 mult3
42 mult4
8 mult2
16 mult3
48 mult4
9 mult2
18 mult3
54 mult4


[0, 24, 48, 72, 96, 120, 144, 168, 192, 216]
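The interleaved output above (`mult2`, `mult3`, `mult4` for one element before the next element starts) comes from pulling elements through generators one at a time, rather than running each `map()` over the whole dataset in turn. The same ordering appears with two plain chained generator expressions; `traced`, `f2`, `f3`, and `log` are hypothetical names:

```python
log = []


def traced(factor, label):
    """Hypothetical helper: multiply by `factor` and record the call order."""
    def step(x):
        log.append((label, x))
        return x * factor
    return step


f2 = traced(2, "mult2")
f3 = traced(3, "mult3")

# Chained generator expressions: each element flows through every stage
# before the next element is read from the source.
stage2 = (f2(x) for x in range(2))
stage3 = (f3(x) for x in stage2)
result = list(stage3)

print(result)  # [0, 6]
print(log)     # [('mult2', 0), ('mult3', 0), ('mult2', 1), ('mult3', 2)]
```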