### Verify the Spark context
The Spark context is available through the variable `sc`

In [None]:
sc

### Create a python list of integers [1..10]

In [None]:
# range(1, 11) stops before 11, so this yields the integers 1 through 10.
# list() materialises the range directly; no comprehension is needed.
my_list = list(range(1, 11))
print(my_list)

### Create an RDD from a python list

In [None]:
rdd = sc.parallelize(my_list)

### Display the rdd content: collect

In [None]:
result = rdd.collect()
print(result)

### Display the rdd content: First

In [None]:
rdd.first()

### Display the rdd content: take

In [None]:
result = rdd.take(1)
print(result)

In [None]:
result = rdd.take(4)
print(result)

### Verify the data structure

In [None]:
print(f"The rdd class type: {type(rdd)}")
print(f"The result class type: {type(result)}")

### Increment all elements of the rdd
[1, 2, 3] -> [2,3,4]

Apply a transformation using the map function

In [None]:
rdd_inc = rdd.map(lambda x: x + 1)
rdd_inc.collect()

#### Using a function inside the map

In [None]:
"""
Define the inc function
param1: integer
return: integer
"""
def inc(ele: int) -> int:
    return ele + 1

# Apply the transformation
rdd_inc_by_function = rdd.map(inc)

# Display the content of an RDD
rdd_inc_by_function.collect()

### Compute the square of each element
[1,2,3] -> [1,4,9]

Using lambda function

In [None]:
sq_rdd = rdd.map(lambda x: x * x)
sq_rdd.collect()

#### Using a defined function

In [None]:
def square(x: int) -> int:
    """Return ``x`` multiplied by itself.

    Args:
        x: The integer to square.

    Returns:
        ``x * x``.
    """
    return x * x


# The function squares its argument, so ``double`` was a misleading name.
# The old name stays bound so that existing calls such as ``rdd.map(double)``
# keep working unchanged.
double = square
sq_rdd = rdd.map(double)
sq_rdd.collect()

### In a single line
Create -> Transform -> Display

In [None]:
sc.parallelize(my_list).map(lambda x: x * x).collect()

### Count the number of elements of an RDD

In [None]:
rdd_inc.count()

### Filter odd numbers

In [None]:
# filter keeps the elements for which the predicate is True, so test for a
# remainder of 1 to keep the odd numbers (the previous `== 0` kept the even ones).
rdd.filter(lambda x: x % 2 == 1).collect()

#### Using a defined function

In [None]:
def odd(ele: int) -> bool:
    """Return True when ``ele`` is odd, False when it is even.

    The comparison already produces a bool, so it is returned directly
    instead of going through an if/else.

    Args:
        ele: The integer to test.

    Returns:
        True if ``ele`` leaves a remainder of 1 when divided by 2.
    """
    # Python's % with a positive divisor never returns a negative result,
    # so this also holds for negative odd numbers (e.g. -3 % 2 == 1).
    return ele % 2 == 1

rdd.filter(odd).collect()

### Count the number of odd ints

In [None]:
rdd.filter(lambda ele: ele % 2 == 1).count()

### Sort RDD

In [None]:
# `random` must be imported in this cell: this is its first use when the
# notebook is executed from top to bottom.
import random

# Draw 20 integers from the inclusive range 1..10.
random_list = [random.randint(1, 10) for _ in range(20)]
rdd = sc.parallelize(random_list)
# Sort ascending, using each element as its own sort key.
rdd.sortBy(lambda x: x).collect()

### Sort RDD descending

In [None]:
rdd.sortBy(lambda x:x, False).collect()

### Deduplicate

In [None]:
rdd.distinct().collect()

### Deduplicate and sort

In [None]:
rdd.distinct().sortBy(lambda x:x).collect()

### Deduplicate, sort the RDD in descending order and display the two largest elements

In [None]:
rdd.distinct().sortBy(lambda x:x, False).take(2)

### Exercise 1
1. Create a python list of integers
2. Create an RDD from the python list
3. Apply a first transformation: f(x) = x - 1
4. Apply a second transformation: g(x) = x * 2
5. Display the final result

### How the reduce works: 
[1, 2, 3, 4] -> [3, 3, 4] -> [6, 4] -> 10

In [None]:
result = sc.parallelize(my_list).reduce(lambda x , y: x + y)
print(result)
print(type(result))

#### Using a defined function

In [None]:
def reducer(x: int, y: int) -> int:
    """Combine two values into one by adding them.

    Intended as the binary function handed to ``reduce``.

    Args:
        x: Left operand.
        y: Right operand.

    Returns:
        The sum of ``x`` and ``y``.
    """
    total = x + y
    return total
sc.parallelize(my_list).reduce(reducer)

In [None]:
# Chain the two Exercise 1 transformations, f(x) = x - 1 then g(x) = x * 2,
# and fold the transformed values into their sum with reduce.
sc.parallelize(random_list) \
    .map(lambda x: x-1) \
    .map(lambda x: x*2) \
    .reduce(lambda x,y: x+y)

### Exercise 2
1. Create a python list composed of 20 random integers between (1,10)
2. Create an rdd from the python list
3. Deduplicate the numbers
4. Create two rdds even and odd from the initial rdd
5. Sum all the elements of each RDD

### Built in functions

In [None]:
print(f"The max: {rdd.max()}")
print(f"The min: {rdd.min()}")
print(f"The mean: {rdd.mean()}")
print(f"The standard deviation: {rdd.stdev()}")
print(f"The number of elements: {rdd.count()}")

print(rdd.stats())


In [None]:
import random

# Twenty draws from the inclusive range 1..100; the loop index is unused.
random_list = [random.randint(1, 100) for _ in range(20)]
# Bare expression so the notebook displays the list as the cell output.
random_list

In [None]:
rdd = sc.parallelize(random_list)
avg = rdd.reduce(lambda x,y : x+y) / rdd.count()
print(avg)

In [None]:
# Hand-rolled max/min: each reduce keeps the larger (respectively the smaller)
# value of every pair it compares, leaving a single value at the end.
max_value = rdd.reduce(lambda x,y: x if x>y else y)
min_value = rdd.reduce(lambda x,y: x if x<y else y)
print(f'max value {max_value}')
print(f'min value {min_value}')

## Complex structures

### FlatMap

In [None]:
# A list of lists: every element of the RDD is itself a list of integers.
ma_liste = [
    [1,2,3],
    [4,5,6],
    [7,8,9]
]
# The identity lambda returns each inner list unchanged; flatMap then
# concatenates those returned lists into a single flat sequence.
sc.parallelize(ma_liste).flatMap(lambda x:x).collect()

In [None]:
from typing import List, Tuple
users: List[Tuple[str, int]] = [
    ('Eli', 32),
    ('Dana', 44),
    ('Joe', 15),
    ('Anis', 20)
]

#### Filter adult users (age > 18) then sort the users by their age

In [None]:
sc.parallelize(users)\
    .filter(lambda x: x[1] > 18)\
    .sortBy(lambda x: x[1])\
    .collect()

### Exercise 3

The dataset below represents a list of cars, each defined by ('Constructor', 'Model', 'Color', 'ModelYear')
1. Create an RDD from the following list
2. Filter out the rows that contain empty values
3. Sort the data by ModelYear

In [None]:
from typing import Optional

# Each row is (constructor, model, color, model year). The model and the
# color are None in some rows, so both are annotated as Optional[str].
cars: List[Tuple[str, Optional[str], Optional[str], int]] = [
    ('VolksWagen', 'Golf', 'Yellow', 2018),
    ('Toyota', 'Yaris', 'White', 2020),
    ('VolksWagen', 'Golf', None, 2010),
    ('Peugeot', '208', 'Green', 2005),
    ('Honda', None, 'Blue', 2003)
]

### Exercise 4
Consider the following dataset
1. Create an RDD from the python list
2. Filter out the None values
3. Sort the data by user age
4. Display the number of persons per state

In [None]:
data = [
    {'Name': 'Elis', 'Age': 31, 'State': 'New York'},
    {'Name': 'Tatiana', 'Age': None, 'State': 'Louisiana'},
    {'Name': None, 'Age': 23, 'State': 'Alaska'},
    {'Age': 20, 'State': 'Hawai'},
    {'Name': 'Kedy', 'Age': 40, 'State': 'Colorado'},
    {'Name': 'Nadana', 'Age': 23, 'State': 'New York'}
]

In [None]:
notes = sc.parallelize([('math', 3),('science', 4),('philosophy', 10)])
notes.sortBy(lambda x:x[1],False).collect()

In [None]:
SUM = notes.map(lambda a: a[1]).reduce(lambda a,b: a+b)
COUNT = notes.count()
SUM / COUNT

## String types

### Read a text file

In [None]:
sc.textFile('/var/log/kern.log') \
    .map(lambda s: s.lower()) \
    .filter(lambda s: 'error' in s)\
    .collect()

In [None]:
rdd = sc.textFile('book.txt')
rdd.take(1)

In [None]:
rdd.map(lambda s: s.lower()).take(10)

In [None]:
rdd.map(lambda s: s.lower()) \
    .filter(lambda s: 'ebook' in s) \
    .count()

In [None]:
rdd.filter(lambda s: 'ebook' in s.lower()).count()

In [None]:
'Correctable Errors collector initialized'.lower()

In [None]:
rdd = sc.textFile('book.txt')
rdd.take(1)

In [None]:
rdd.count()

In [None]:
'Ceci est une phrase'.split()

In [None]:
rdd.flatMap(lambda s: s.split()).take(10)

In [None]:
rdd.flatMap(lambda s: s.split()).map(lambda x: x.lower()).take(10)

In [None]:
'Ceci, est une phrase avec, des ponctuations'.split()

In [None]:
'Ceci,'.split(',')

In [None]:
'est'.split(',')

In [None]:
'Ceci,'.strip(',')

### WordCount
1. Download the [book]()
2. Compute the number of occurrences of each word; remove the punctuation and lowercase all the words first