# NF26 — Stockage en haute volumétrie et applications
# TD7 — Algorithmes en Streaming et Map-Reduce

## 1 Question préliminaires
Question Pour 𝑑 = 4, estimer la valeur maximale de 𝑛 de telle sorte à ce que 𝑋 puisse
tenir dans 8GiB de RAM en utilisant des flottant standards.
-  float = 32 bits 
- double = 64 bits
- 1 octet = 8 bits 


Question Donner une solution simple qui permet de doubler la valeur maximale de 𝑛
en perdant en précision sur la représentation des flottants.
- 1/2 milliard de ligne 


## 2 Traitements en haute volumétrie, en streaming
### 2.1 Algorithmes élémentaires


**Question** *Créer un générateur qui permet de lire les données en streaming et qui
retourne pour chaque ligne une dataclass.*

In [28]:
import csv
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable

In [18]:
import numpy as np
import csv
from dataclasses import dataclass
from datetime import datetime
def data_reader() -> dict: 
    for file in ["202403-citibike-tripdata_1.csv","202403-citibike-tripdata_2.csv", "202403-citibike-tripdata_3.csv"] : 
        with open(file, 'r') as data:
            yield from csv.DictReader(data)


In [7]:
reader = data_reader()
next(reader)

{'ride_id': '0FC89A53DF9D7E90',
 'rideable_type': 'electric_bike',
 'started_at': '2024-03-07 19:49:43',
 'ended_at': '2024-03-07 20:20:33',
 'start_station_name': '48 St & Skillman Ave',
 'start_station_id': '6283.05',
 'end_station_name': 'Kingston Ave & Park Pl',
 'end_station_id': '4016.03',
 'start_lat': '40.746153593',
 'start_lng': '-73.916188598',
 'end_lat': '40.67308',
 'end_lng': '-73.94191',
 'member_casual': 'member'}

In [23]:
@dataclass(frozen=True)
class Trip:
    started_at: datetime
    ended_at: datetime
    start_lat: float
    start_lng: float
    end_lat: float
    end_lng: float


In [35]:
def get_data() -> dict:
    for path in Path("202403-citibike-tripdata.csv").rglob("*.csv"):
        reader = csv.DictReader(path.open("r"))
        for row in reader:
            try:
                yield Trip(
                    started_at=datetime.strptime(row["started_at"], "%Y-%m-%d %H:%M:%S"),
                    ended_at=datetime.strptime(row["ended_at"], "%Y-%m-%d %H:%M:%S"),
                    **{
                        col: float(row[col])
                        for col in ("start_lat", 'start_lng',"end_lat", 'end_lng')
                    }
                )
            except ValueError:
                pass


In [36]:
next(iter(get_data()))

Trip(started_at=datetime.datetime(2024, 3, 7, 19, 49, 43), ended_at=datetime.datetime(2024, 3, 7, 20, 20, 33), start_lat=40.746153593, start_lng=-73.916188598, end_lat=40.67308, end_lng=-73.94191)

**Question** *Créer la fonction mean qui à l’aide du générateur calcule en streaming la
moyenne empirique 𝑇̄ de la durée d’un trajet et la moyenne empirique 𝐷̄ de la distance
parcourue, en une passe sur le jeu de données.*

In [40]:
first_100 = []
for i, el in enumerate(get_data()) : 
    first_100.append(el)
    if i> 100 : 
        break
#first_100

In [39]:
n_trips= 0
n=0
for i, el in enumerate(get_data()) : 
    n+=1
    n_trips+=(el.ended_at-el.started_at).seconds
print(n_trips/n) 

733.2550470748557


### 2.2 Algorithmes approchés en streaming
**Question** Créer la fonction median qui à l’aide du générateur calcule en streaming une
valeur approchée de la médiane de la durée d’un trajet.


In [42]:
#calcul du min et max possible en streaming mais pas de la median
precision = 10
median= 0
mini = np.Inf  
maxi = -np.Inf
for i, el in enumerate(get_data()):
    duree = (el.ended_at-el.started_at).seconds
    if duree < mini : 
        mini= duree
    if maxi < duree : 
        maxi = duree
median_estimate = (mini+maxi)/2
median_min= mini
median_max= maxi
while(median_estimate - median_min > precision):
    n_trips_below_estimate = 0
    n_trips_above_estimate = 0
    for el in get_data():
        if (el.ended_at-el.started_at).seconds < median_estimate : 
            n_trips_below_estimate+=1
        else : 
            n_trips_above_estimate +=1
    if n_trips_below_estimate < n_trips_above_estimate : 
        median_min = median_estimate
    else : 
        median_max = median_estimate
    median_estimate = (median_min + median_max/2)

KeyboardInterrupt: 

## 3 Traitements en haute volumétrie, Map-Reduce avec PySpark
Question À l’aide de l’API RDD de PySpark, calculer :
- La station de départ la plus fréquente,
- Les 4 trajets les plus fréquents l’après-midi,
- Une valeur approchée du 3ème quartile de la durée d’un trajet.
