# Outils informatiques pour le big data

## Examen du vendredi 7 septembre 2018


## Question 1  (1 point)
Ecrire une fonction `reverse` qui renvoie l'entier passé en argument mais lu de droite à gauche.
```py
>>> reverse(13)
31
```

In [None]:
def reverse(n):
    return int(str(n)[::-1])
reverse(12334567)

**Ecrire une fonction `palindromic` qui revoie `True` si le nombre passé en argument est palindromique:**
```py
>>> palindromic(121), palindromic(122)
True, False
```

In [None]:
def palindromic(n):
    s = str(n)
    return s == s[::-1]

**En utilisant les deux fonctions précedentes, calculer la somme des nombres non palindromiques inférieurs à une valeur nmax possédant la propriété suivante**:

*le carré du nombre inversé est égale au nombre inversé élévé au carré*.

- écrire une version utilisant un structure `for...if...`
- écrire une autre version en utilisant les deux fonctions python `filter` et `sum`.

In [None]:
def skinny_sum_with_for_loop(nmax):
    result = 0
    for n in range(nmax):
        if not palindromic(n) and reverse(n)**2 == reverse(n**2):
            result += n
    return result

print(skinny_sum_with_for_loop(1000))
%timeit skinny_sum_with_for_loop(1000)

In [None]:
def skinny_sum_with_filter(nmax):
    f = lambda n: not palindromic(n) and reverse(n)**2 == reverse(n**2)
    return sum(filter(f,range(nmax)))

print(skinny_sum_with_filter(1000))
%timeit skinny_sum_with_filter(1000)

# Calcul parallèle

On souhaite appliqué la fonction reverse en utilisant le calcul parallèle.

**Paralléliser la fonction `serial_map` en utilisant [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html)**

In [None]:
%%time 
from time import sleep

def perform_computation(f, result, data, i):
    sleep(1) # pause d'une seconde
    print ("Computing the ", i, "th result...")
    result[i] = f(data[i])

def serial_map(f, data):
    result = [None] * len(data)
    for i in range(len(data)):
        perform_computation(f, result, data, i)
    return result

data = [12, 86, 8, 24, 11, 21, 66, 11, 29, 34]

result = serial_map(lambda x: reverse(x), data )

print(result)

In [None]:
%%time

from concurrent.futures import ThreadPoolExecutor

def parallel_map(f, data):
    result = [None] * len(data)
    tasks = []
    with ThreadPoolExecutor() as e:
        for i in range(len(data)):
            tasks.append(e.submit(perform_computation,f, result, data, i))
        _ = [t.result() for t in tasks]
    return result

data = [12, 86, 8, 24, 11, 21, 66, 11, 29, 34]

result = parallel_map(lambda x: reverse(x), data )

print(result)    

## Genome

- Ecrivez un programme pour calculer les fréquences des sequences de cinq bases d'un génome, et trier ces sequences dans l'ordre décroissant des fréquences.

**Lire les lignes du fichier "genome.txt" et créer à partir de celles-ci une liste de sequences de longueur 5**

```py
>>> seq_split("masterpythonbigdatas")
['maste','rpyth','onbig','datas')
```

**Ecrire une fonction calculant la fréquence d'apparition des différentes séquences**

**Afficher les 10 premières séquences les plus fréquentes avec la valeur de leur fréquence.**

## Version parallèle 

Coder une version parallèle en utilisant 
- [Spark RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [Dask Bag](http://dask.pydata.org/en/latest/bag-creation.html)

In [None]:
def group_characters(line, n=5):
    result = ''
    i = 0
    for ch in line:
        result = result + ch
        i = i + 1
        if i % n == 0:
            yield result
            result = ''

def group_and_split(line):
    return [sequence for sequence in group_characters(line)]

In [None]:
group_and_split('abcdefghijklmno')

In [None]:
genome = 'genome.txt'
from operator import itemgetter
with open(genome) as f:
    data = f.readlines()
    
res = dict()
for line in data:
    for key in group_and_split(line):
        try:
            res[key] += 1
        except KeyError:
            res[key] = 1
        
dict(sorted(res.items(), key=itemgetter(1), reverse=True)[:10])

In [None]:
from pyspark import SparkContext

sc = SparkContext('local[*]')

In [None]:

genome = 'genome.txt'

rdd = sc.textFile(genome)
counts = rdd.flatMap(group_and_split) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda x, y: x + y)

counts.takeOrdered(10,lambda atuple: -atuple[1])

In [None]:
sc.stop()

In [None]:
import dask.bag as db
b = db.read_text(genome)

In [None]:
frequencies = b.map(group_and_split).flatten().frequencies().topk(10, lambda x: x[1])

In [None]:
frequencies.compute()