#### Le but de ce TP est d'explorer les fonctionalités d'un stockage objet avec [MINIO](https://min.io/), une solution open source rapide et compatible S3.  

#### Nous y verrons comment configurer un serveur MinIO, gérer des buckets et interagir avec les objets

#### Vous pouvez retrouver la documentation [ici](https://min.io/docs/minio/linux/developers/python/API.html#) 

Pré-Requis

0 - docker, docker-compose

1 - git checkout main && git pull && git checkout -b "you-branche-name"

2 - cd Tps/TP2

3 - docker compose -f docker/docker-compose.yaml build 

4 - docker compose -f docker/docker-compose.yaml up

5 - Aller sur l'ui de minio sur http://localhost:9001 (user: miniouser, password: miniopassword)

6 - Aller sur jupyter sur http://localhost:8888 (puis ouvrir le tp puis commencer à travailler)

### On va explorer ensemble les fonctionnalités de minio (ou d'un stockage objet en général)

**Buckets**: Les buckets, unités fondamentales du stockage objet dans MinIO, permettent d'organiser et de gérer efficacement les données. On peut associer plusieurs configurations à un bucket. \
**Droits d'accès**: Vous pouvez définir qui a accès au bucket (public ou privé) et spécifier des permissions détaillées (lecture, écriture, etc.). \
**Versioning**: Cette option permet de conserver différentes versions des objets, utile pour suivre les modifications ou restaurer des données supprimées. \
**Politique de cycle de vie**: Elle automatise la gestion des objets en définissant des règles pour leur archivage ou leur suppression après une certaine durée. \
**Chiffrement**: Les données du bucket peuvent être protégées par un chiffrement pour garantir leur sécurité. \
**Notifications**: Configurez des événements pour envoyer des alertes lorsque des actions spécifiques (ajout, suppression) se produisent dans le bucket. \
**Replication**: Permet de copier automatiquement les données du bucket vers un autre, souvent dans un but de sauvegarde ou de haute disponibilité.

Les imports

In [1]:
from minio import Minio
import pandas as pd
import s3fs

In [2]:
MINIO_ACCESS_KEY = "miniouser"
MINIO_SECRET_KEY = "miniopassword"
MINIO_ENDPOINT_URL = "http://minio:9000"
BUCKET_NAME = "dataplatform"

In [3]:
storage_options={
   'key': MINIO_ACCESS_KEY,
   'secret': MINIO_SECRET_KEY,
   'endpoint_url': MINIO_ENDPOINT_URL,
}

Initialisons le client minio

In [4]:
client = Minio(
    "minio:9000",
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=False,  # Set to True if you're using HTTPS
    region = "eu-west-1"
    
)

Créer un bucket 

Referer vous à la documentation https://min.io/docs/minio/linux/developers/python/API.html#

In [20]:
bucket_name = "sales-raw-data-v2"

def create_bucket(bucket_name: str):
    if not  client.bucket_exists(bucket_name):
        client.make_bucket(bucket_name)
    else:
        print("Hey! This bucket exists already")


In [15]:
!ls

TP_datalake_Stockage-Object-Minio.ipynb  raw_data.csv


Lister les buckets

In [11]:
buckets = client.list_buckets()
print("la liste des buckets: ")
for bucket in buckets:
    print(bucket.name)

la liste des buckets: 
sales-raw-data
sales-raw-data-v2


Upload un fichier dans un bucket avec la méthode *fput_object*

In [14]:
! touch raw_data.csv

In [13]:
result = client.fput_object(
    bucket_name, "raw_data.csv", "raw_data.csv"
)

Supprimer ce fichier

In [16]:
client.remove_object(bucket_name,"raw_data.csv" )

Vérifier que la suppression s'est bien passée 

In [17]:
objects = client.list_objects(bucket_name)
for obj in objects:
    print(obj)

Supprimer le bucket que vous avez créer

In [19]:
client.remove_bucket("sales-raw-data")

On va à présent reprendre faire un peu de data processing 

Créer un bucket nommé: *dataplatform*

In [21]:
create_bucket("dataplatform")

Lire les données sources et réécrivez le dataframe dans minio dans le bucket que vous venez de créer

In [22]:
import psycopg2
import pandas as pd

In [23]:
db_params = {
    'dbname': 'dataplatform',
    'user': 'postgres',
    'password': 'postgrespassword',
    'host': 'postgres-db',
    'port': 5432
}
table_name = "stock_data"

In [24]:
def get_conn():
    conn = psycopg2.connect(**db_params)
    return conn

In [25]:
conn = get_conn()

Lire les données depuis la table postgres

In [26]:
query = f"SELECT * FROM {table_name}"
raw_df = pd.read_sql(query, conn)

  raw_df = pd.read_sql(query, conn)


AttributeError: 'Series' object has no attribute 'year'

In [29]:
raw_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 619040 entries, 0 to 619039
Data columns (total 7 columns):
 #   Column  Non-Null Count   Dtype  
---  ------  --------------   -----  
 0   date    619040 non-null  object 
 1   open    619029 non-null  float64
 2   high    619032 non-null  float64
 3   low     619032 non-null  float64
 4   close   619040 non-null  float64
 5   volume  619040 non-null  int64  
 6   name    619040 non-null  object 
dtypes: float64(4), int64(1), object(2)
memory usage: 33.1+ MB


In [39]:
raw_df["year"] = pd.DatetimeIndex(raw_df['date']).year

In [40]:
raw_df

Unnamed: 0,date,open,high,low,close,volume,name,year
0,2013-02-08,15.07,15.12,14.63,14.75,8407500,AAL,2013
1,2013-02-11,14.89,15.01,14.26,14.46,8882000,AAL,2013
2,2013-02-12,14.45,14.51,14.10,14.27,8126000,AAL,2013
3,2013-02-13,14.30,14.94,14.25,14.66,10259500,AAL,2013
4,2013-02-14,14.94,14.96,13.16,13.99,31879900,AAL,2013
...,...,...,...,...,...,...,...,...
619035,2018-02-01,76.84,78.27,76.69,77.82,2982259,ZTS,2018
619036,2018-02-02,77.53,78.12,76.73,76.78,2595187,ZTS,2018
619037,2018-02-05,76.64,76.92,73.18,73.83,2962031,ZTS,2018
619038,2018-02-06,72.74,74.56,72.13,73.27,4924323,ZTS,2018


##### Comme vu en cours on va sauvegarder les données brutes au niveau du stockage objet en appliquant les bonnes pratiques
- format
- partitionnement
- etc

In [33]:
#TODO
raw_df.to_parquet("s3://dataplatform/stock_data_stock_partitioned.parquet", compression="snappy", partition_cols="name",storage_options=storage_options)

In [41]:
#TODO
raw_df.to_parquet("s3://dataplatform/stock_data_year_partitioned.parquet", compression="snappy", partition_cols="year",storage_options=storage_options)

##### On va essayer de lire ces données à niveau depuis le stockage objet.

In [42]:
raw_from_minio = pd.read_parquet("s3://dataplatform/stock_data_year_partitioned.parquet", storage_options=storage_options)

In [43]:
raw_from_minio

Unnamed: 0,date,open,high,low,close,volume,name,year
0,2013-02-08,15.07,15.12,14.63,14.75,8407500,AAL,2013
1,2013-02-11,14.89,15.01,14.26,14.46,8882000,AAL,2013
2,2013-02-12,14.45,14.51,14.10,14.27,8126000,AAL,2013
3,2013-02-13,14.30,14.94,14.25,14.66,10259500,AAL,2013
4,2013-02-14,14.94,14.96,13.16,13.99,31879900,AAL,2013
...,...,...,...,...,...,...,...,...
619035,2018-02-01,76.84,78.27,76.69,77.82,2982259,ZTS,2018
619036,2018-02-02,77.53,78.12,76.73,76.78,2595187,ZTS,2018
619037,2018-02-05,76.64,76.92,73.18,73.83,2962031,ZTS,2018
619038,2018-02-06,72.74,74.56,72.13,73.27,4924323,ZTS,2018


In [49]:
# TO DO
stock_agg = raw_from_minio.groupby("name").agg({"close": "mean"}).reset_index().rename(columns={"name": "stock_name", "close": "price"})


In [54]:
stock_agg_sorted = stock_agg.sort_values(by="price", ascending=False)

##### Un peu d'analyse
- Le prix moyen de chaque action ?
- Les actions les plus cheres, les moins cheres ?

In [None]:
#TODO


##### On peut sauvegarder ce nouveau dataset aggrégé

In [57]:
#TODO
stock_agg_sorted.to_parquet("s3://dataplatform/clean_data/stock_data_aggregated.parquet", compression="snappy",storage_options=storage_options)
