<h1><center>Spark Context and basic MapReduce operations</center></h1>

## We have a 100x100 board.
### We throw 3 stones onto it, a million times.
<img src="misc/board.png" width="200">
## Week 3: Reading files with SparkContext and Intro to Lambda Expressions
### Given the XY position of each stone on the board, <br> can we calculate how many throws form a valid triangle?

In [1]:
from pyspark import SparkContext

In [2]:
sc = SparkContext.getOrCreate()

### Each line of the file is read as a string

In [3]:
triangles = sc.textFile("data/triangles.csv")
triangles.take(5)

['13 27, 68 55, 12 62',
 '64 96, 91 62, 43 51',
 '28 44, 93 59, 68 4',
 '60 8, 87 65, 93 98',
 '4 65, 2 77, 18 85']

<h1><center>Lambda Expressions</center> <img src="misc/lambda.jpg" width="400" align="middle"/></h1>

In [4]:
distance = lambda p1, p2: ((p1[0]-p2[0])**2 + (p1[1]-p2[1])**2)**0.5
def sure_distance(p1, p2):
    x1, y1 = p1 # (13, 27)
    x2, y2 = p2 # (68, 55)
    dist = ((x2 - x1)**2 + (y2- y1)**2)**0.5
    return dist

In [5]:
distance((4,0), (0,3))
sure_distance((4,0), (0,3))

5.0

In [6]:
l = [(1,'z'), (2,'a')]

In [7]:
sorted(l)

[(1, 'z'), (2, 'a')]

In [8]:
sorted(l, key=lambda x: x[1])

[(2, 'a'), (1, 'z')]

### Let's deal with each line alone

In [9]:
sample_line = triangles.take(1)[0]
sample_line

'13 27, 68 55, 12 62'

### 1) Split the string line on commas

In [10]:
split_comma = lambda line: line.split(',')
splitted_sample = split_comma(sample_line)
splitted_sample

['13 27', ' 68 55', ' 12 62']

### 2.1) Convert a point string to a tuple of integer x-y coordinates
#### PS. Tuples are your friends

In [35]:
point_str2tuple = lambda point_str: tuple([int(ps) for ps in point_str.split()]) # this is just a substep
point_str2tuple(splitted_sample[0])

(13, 27)

### 2.2) Convert every point string in the list to an integer x-y tuple

In [36]:
splitted2points = lambda splitted_line: tuple([point_str2tuple(i) for i in splitted_line])
sample_points = splitted2points(splitted_sample)
sample_points

((13, 27), (68, 55), (12, 62))

### 3) Validate a triangle

In [37]:
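def isvalid_triangle(p1, p2, p3):
    # Side lengths between each pair of points
    l1 = distance(p1, p2)
    l2 = distance(p2, p3)
    l3 = distance(p3, p1)
    # Triangle inequality: any two sides together must be longer than the third
    return l1 + l2 > l3 and l1 + l3 > l2 and l2 + l3 > l1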

In [38]:
points_valid_triangle = lambda points: isvalid_triangle(*points)
points_valid_triangle(sample_points)

True
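
The distance-based check compares sums of floating-point square roots, so points that lie exactly on one line could in principle be misclassified by rounding. As a hedged alternative sketch (not the notebook's method), the same question can be answered with integer arithmetic only: three points form a non-degenerate triangle exactly when the cross product of two edge vectors is non-zero. The name `is_nondegenerate` is hypothetical.

```python
def is_nondegenerate(p1, p2, p3):
    """Return True when the three integer points are not collinear."""
    (x1, y1), (x2, y2), (x3, y3) = p1, p2, p3
    # Twice the signed area of the triangle; zero means collinear or repeated points
    cross = (x2 - x1) * (y3 - y1) - (y2 - y1) * (x3 - x1)
    return cross != 0

print(is_nondegenerate((13, 27), (68, 55), (12, 62)))  # first line of the file
print(is_nondegenerate((0, 0), (1, 1), (2, 2)))        # three points on one diagonal
```

Because the coordinates are integers, this test involves no rounding at all, which makes it a useful cross-check on the float-based version.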

### 4) Combine all previous steps in one function

In [39]:
line2valid = lambda line: points_valid_triangle(
    splitted2points(
        split_comma(line)))
line2valid(sample_line)

True
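
The same pipeline can also be written with named functions instead of chained lambdas, which some readers find easier to test and debug. This is a minimal sketch in plain Python; `parse_point`, `parse_line`, and `line_is_valid` are hypothetical names, and `distance` is redefined here only so the block runs on its own.

```python
def parse_point(point_str):
    """'13 27' -> (13, 27); split() ignores surrounding whitespace."""
    x, y = (int(part) for part in point_str.split())
    return (x, y)

def parse_line(line):
    """'13 27, 68 55, 12 62' -> ((13, 27), (68, 55), (12, 62))."""
    return tuple(parse_point(p) for p in line.split(','))

def distance(p1, p2):
    return ((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2) ** 0.5

def line_is_valid(line):
    """Parse one line and apply the triangle inequality to its three points."""
    p1, p2, p3 = parse_line(line)
    l1, l2, l3 = distance(p1, p2), distance(p2, p3), distance(p3, p1)
    return l1 + l2 > l3 and l1 + l3 > l2 and l2 + l3 > l1

print(parse_line('13 27, 68 55, 12 62'))
print(line_is_valid('13 27, 68 55, 12 62'))
```

A function like `line_is_valid` can be passed to `map` in exactly the same way as a lambda.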

## Week 4: Map, Reduce and Filtering operations

<h1><center>Map/Reduce</center></h1>
<img src="misc/mapreduce.jpeg" width="1000">

In [40]:
validated_triangles = triangles.map(line2valid)
validated_triangles.take(5)

[True, True, True, True, True]

In [41]:
# True counts as 1 and False as 0, so summing the flags counts the valid triangles
num_triangles = validated_triangles.reduce(lambda x,y: x+y)
num_triangles

999074
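
To see what the reduce step does without a cluster, here is a small plain-Python analogue using `functools.reduce`. The list `flags` is a made-up stand-in for `validated_triangles`. Spark applies the function within and across partitions, so it should be associative and commutative, as addition is.

```python
from functools import reduce

flags = [True, False, True, True]   # hypothetical stand-in for validated_triangles

# reduce folds the list pairwise: ((True + False) + True) + True
count = reduce(lambda x, y: x + y, flags)
print(count)
```

Adding booleans works because Python treats `True` as 1 and `False` as 0, so the result is the same as `sum(flags)`.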

![title](misc/oneline.jpg)

In [42]:
num_triangles_short = triangles.map(lambda line:line.split(','))\
                                .map(lambda splitted_line: tuple([
                                    tuple([int(ps) for ps in point_str.split()])
                                        for point_str in splitted_line]))\
                                .map(lambda points: isvalid_triangle(*points))\
                                .reduce(lambda x, y: x+y)
num_triangles_short

999074

## Filter: Choose only valid triangles

In [43]:
valid_triangles = triangles.map(lambda line:line.split(','))\
                                .map(lambda splitted_line: tuple([
                                    tuple([int(ps) for ps in point_str.split()])
                                        for point_str in splitted_line]))\
                                .filter(lambda points: isvalid_triangle(*points))
valid_triangles.count()

999074

## RDD as a Python dictionary?

In [53]:
triangles_id = valid_triangles.map(lambda x: (hash(tuple(x)), tuple(x)))
triangles_id.take(5)

[(-973281609, ((13, 27), (68, 55), (12, 62))),
 (-1943038625, ((64, 96), (91, 62), (43, 51))),
 (935403028, ((28, 44), (93, 59), (68, 4))),
 (555573573, ((60, 8), (87, 65), (93, 98))),
 (1402881299, ((4, 65), (2, 77), (18, 85)))]

In [54]:
triangles_zip = valid_triangles.zipWithUniqueId()\
                                .map(lambda x: (x[1], x[0]))
triangles_zip.take(5)

[(0, ((13, 27), (68, 55), (12, 62))),
 (2, ((64, 96), (91, 62), (43, 51))),
 (4, ((28, 44), (93, 59), (68, 4))),
 (6, ((60, 8), (87, 65), (93, 98))),
 (8, ((4, 65), (2, 77), (18, 85)))]
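
The ids 0, 2, 4, 6, 8 are not consecutive because `zipWithUniqueId` numbers items per partition: items in the k-th of n partitions get ids k, n+k, 2n+k, and so on. The output above is consistent with the data being split across two partitions, although the notebook does not state the partition count. The sketch below emulates that numbering on plain lists; `unique_ids` and the sample partitions are hypothetical.

```python
def unique_ids(partitions):
    """Pair each item with k + i*n, where k is the partition index,
    i the item's position inside its partition, and n the partition count."""
    n = len(partitions)
    return [(item, k + i * n)
            for k, part in enumerate(partitions)
            for i, item in enumerate(part)]

# Two made-up partitions holding five items in total
pairs = unique_ids([['a', 'b', 'c'], ['d', 'e']])
print(pairs)
```

The ids are unique but can have gaps when partitions hold different numbers of items.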

### Let's record the circumference of each valid triangle

In [46]:
# Naming note: as an attribute this would be triangle.circumference;
# as a function it is get_circumference, called as get_circumference(*triangle)
def get_circumference(p1, p2, p3):
    l1 = distance(p1, p2)
    l2 = distance(p2, p3)
    l3 = distance(p1, p3)
    return l1 + l2 + l3

In [47]:
circumferences = valid_triangles.map(lambda points: (points, get_circumference(*points)))
circumferences.take(5)

[(((13, 27), (68, 55), (12, 62)), 153.16718353495094),
 (((64, 96), (91, 62), (43, 51)), 142.3197117729059),
 (((28, 44), (93, 59), (68, 4)), 183.69209268352833),
 (((60, 8), (87, 65), (93, 98)), 192.4716796394055),
 (((4, 65), (2, 77), (18, 85)), 54.46718011206216)]
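
As a quick sanity check on the first value, the same sum of side lengths can be computed with the standard library's `math.dist`, assuming Python 3.8 or later. The function name `perimeter` is hypothetical; it computes the same quantity the notebook calls the circumference.

```python
import math

def perimeter(p1, p2, p3):
    """Sum of the three side lengths, using math.dist for each side."""
    return math.dist(p1, p2) + math.dist(p2, p3) + math.dist(p3, p1)

print(perimeter((13, 27), (68, 55), (12, 62)))   # first triangle in the file
print(perimeter((0, 0), (3, 0), (0, 4)))         # a 3-4-5 right triangle
```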

## Sort triangles by circumference, largest first

In [52]:
sorted_triangles_circum = circumferences.sortBy(lambda triangle_circum: triangle_circum[1], ascending=False)
sorted_triangles_circum.take(5)

[(((95, 96), (1, 4), (97, 0)), 323.63359264863595),
 (((95, 96), (100, 1), (3, 97)), 323.6103632138558),
 (((91, 1), (100, 100), (1, 12)), 322.5355198617959),
 (((0, 5), (100, 2), (100, 91)), 320.938886629934),
 (((98, 97), (9, 0), (3, 94)), 320.88210920228073)]
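
When only the few largest items are needed, a full sort is more work than necessary. PySpark's RDD API offers `top` and `takeOrdered` for this case. The plain-Python sketch below shows the same idea with `heapq.nlargest`; the `records` list is made-up data shaped like the elements of `circumferences`.

```python
import heapq

# Hypothetical (triangle, circumference) pairs, shaped like the RDD's elements
records = [
    (((0, 0), (3, 0), (0, 4)), 12.0),
    (((0, 0), (6, 0), (0, 8)), 24.0),
    (((0, 0), (1, 0), (0, 1)), 3.414),
    (((0, 0), (5, 0), (0, 12)), 30.0),
]

# Keep only the 2 largest by circumference, without sorting the whole list
top2 = heapq.nlargest(2, records, key=lambda rec: rec[1])
print(top2)
```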

<img src="misc/hash.png" width="200" align="left">
<h1>"Hashing is your friend"</h1>
<h2><center>Efficiency</center></h2>

In [55]:
circumferences_hashed = valid_triangles.map(lambda points: (hash(points), get_circumference(*points)))
circumferences_hashed.take(5)

[(-973281609, 153.16718353495094),
 (-1943038625, 142.3197117729059),
 (935403028, 183.69209268352833),
 (555573573, 192.4716796394055),
 (1402881299, 54.46718011206216)]
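
A hash is a compact key, but two caveats apply: different triangles can in principle share a hash value, and the value returned by the built-in `hash` can differ between Python versions and builds. Where ids must be reproducible, a digest of the data is one option. The sketch below is an illustration only, not part of the notebook; `stable_id` is a hypothetical helper.

```python
import hashlib

def stable_id(points):
    """First 16 hex digits of the SHA-1 digest of the points' repr."""
    return hashlib.sha1(repr(points).encode('utf-8')).hexdigest()[:16]

triangle = ((13, 27), (68, 55), (12, 62))
print(hash(triangle))       # built-in hash: fast, value depends on the Python build
print(stable_id(triangle))  # digest-based id: slower, but reproducible
```

The trade-off is speed against reproducibility: the built-in hash is cheaper, while the digest gives the same id on every machine.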