In [12]:
import requests
import pymongo
import pandas as pd
import numpy as np
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta


## EXTRACTION DE MES DONNEES

## definition d'une  fonction d'extraction

In [2]:
#definir une  fonction d'extraction de donnees
#cette fonction prend en paramtre l'url et retourne un dictionnaire d'information sur le capteur et les 
#donnees receuillis par le capteur sous forme de data frame

def extraction(url):
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        
        info = data['header']
        donnees =  pd.DataFrame(data['data'])
        
    else:
        return "erreur lors de la connection"
    
    return info, donnees



In [3]:
#EXEMPLE 1:  d'extraction des donnees depuis l'url avec l'id 283164601

info_smart188, data_SMART188 = extraction("https://airqino-api.magentalab.it/v3/getStationHourlyAvg/283164601")

In [4]:
#EXEMPLE 2:  d'extraction des donnees depuis l'url avec l'id 283181971

info_smart189, data_SMART189 = extraction("https://airqino-api.magentalab.it/v3/getStationHourlyAvg/283181971")

In [13]:
info_smart188

{'station_name': 'SMART188',
 'station_lat': 5.369297504425049,
 'station_lon': -3.958930253982544}

In [5]:
data_SMART188

Unnamed: 0,timestamp,CO,T,T. int.,NO2,O3,PM10,PM2.5,RH
0,2022-12-08 15:00:00,0.387553,17.753334,283.90000,23.237047,2.898020,14.300000,8.066667,59.210000
1,2022-12-08 16:00:00,0.510957,17.903334,285.13333,23.184278,3.218808,13.633333,7.333334,60.230000
2,2022-12-08 17:00:00,0.534507,17.973333,285.73334,23.149902,3.415739,13.033334,7.300000,60.590000
3,2022-12-08 18:00:00,0.484383,17.750000,283.86667,23.411600,2.076512,14.533334,8.433333,61.676666
4,2022-12-08 19:00:00,0.555620,17.446667,281.66666,23.623089,0.953288,17.666666,10.966666,62.930000
...,...,...,...,...,...,...,...,...,...
8149,2023-12-08 10:00:00,-0.641933,32.230000,437.70000,17.850210,37.296852,17.833334,7.533333,64.320000
8150,2023-12-08 11:00:00,-0.771043,33.186670,450.00000,17.320896,40.510357,15.366667,5.700000,60.243336
8151,2023-12-08 12:00:00,-0.809643,33.256668,453.00000,16.940329,42.474796,13.966666,5.166666,59.576668
8152,2023-12-08 13:00:00,-0.752519,33.058064,444.19354,16.768282,42.761086,15.032258,5.322581,60.819355


## CALCUL DE MES MOYENNES POUR CHAQUE CAPTEUR

#### definition d'une fonction de calcul de moyennes

In [7]:
#definir une fonction pour le calcule de moyennes.
#Cette fonction prend en paramettre  les donnees d'un capteur et calcule la moyenne par jour de CO et de PM_2.5 ,  
#elle retourne ensuite les resultats sous forme d'un data frame

def calcul_moy(data):
    df = pd.DataFrame({'date': data['timestamp'],'moy_CO': data['CO'] ,'moy_PM2.5': data['PM2.5']})
    df['date'] = df['date'].str.slice(stop=-9)   #supression des heures et minutes
    moyennes = df.groupby('date')[['moy_CO','moy_PM2.5']].mean().reset_index()
    return moyennes



In [8]:
#EXEMPLE : calcul de la moyennes de CO et PM2.5 pour le smart 188

moy_smart188 = calcul_moy(data_SMART188)

In [9]:
#EXEMPLE : calcul de la moyennes de CO et PM2.5 pour le smart 189

moy_smart189 = calcul_moy(data_SMART189)

In [10]:
moy_smart189

Unnamed: 0,date,moy_CO,moy_PM2.5
0,2022-12-08,0.509186,12.455555
1,2022-12-09,0.485723,8.419893
2,2022-12-10,0.482676,6.627778
3,2022-12-11,0.301162,3.828719
4,2022-12-12,0.344802,8.723611
...,...,...,...
340,2023-12-04,-0.063040,13.094400
341,2023-12-05,-0.120018,15.155583
342,2023-12-06,-0.017683,19.635708
343,2023-12-07,0.081053,18.790637


## insertion des resultats  dans mongo

In [17]:
#definir une fonction d'insertion des resultats du calcule de moyenne.
#cette fonction prend en paramettre le data_frame obtenu lors du calcule de la moyenne
# par jour du CO et du PM2.5 d'un capteur
#puis une base de donnees mongo et une collection
#elle stock ensuite le data frame sous forme de plusieurs dictionnaire dans mongo


def insertion(data,db,collection):
    
    client = pymongo.MongoClient("mongodb://localhost:27017/" )
    database = client[db]
    collection = database[collection]
    
    for i in range(len(data)):
        res = {"date":data.iloc[i,0],"moy_co":data.iloc[i,1],"moy_pm":data.iloc[i,2]}
        result = collection.insert_one(res)
    
    client.close()

  

In [18]:
#insertion des resulats du calcul de moyennes pour smart188 dans mongo
insertion(moy_smart188,"base_data354","moyen1" )

In [19]:
#insertion des resulats du calcul de moyennes pour smart189 dans mongo
insertion(moy_smart189,"base_data354","moyen2" )

### definition d'une fonction d'insertion de nos donnees dans mongo pour la visualisation

In [18]:
#definir une fonction d'insertion des donnees d'un capteur  dans la base mongo
#elle prend en paramettre les donnees, la base de donnees et le mon d'une collection

def insertion_data(data,db,collection):
    
    data['timestamp'] = data['timestamp'].str.slice(stop=-9)  #suppression des heure et des minutes 
    
    client = pymongo.MongoClient("mongodb://localhost:27017/" )
    database = client[db]
    collection = database[collection]
    
    for i in range(len(data)):
        res = {"date":data.iloc[i,0],"CO":data.iloc[i,1],"T":data.iloc[i,2],
              "T. int.":data.iloc[i,3],"NO2":data.iloc[i,4],"O3":data.iloc[i,5],
               "PM10":data.iloc[i,6],"PM2.5":data.iloc[i,7],"RH":data.iloc[i,8]
              }
        
        result = collection.insert_one(res)
    
    client.close()
    
#Nous avons definit ici une fonction qui stock toute les donnees collecter par un capteur dans une base de donnees mongo

In [19]:
#Exemple : insertion des donnees de smart_188 dans la base de donnees
insertion_data(data_SMART188,"base_data354","donnee_smart188")

In [20]:
#insertion des donnees de smart_189 dans la base de donnees
insertion_data(data_SMART189,"base_data354","donnee_smart189")

# AUTOMATISATION

### Pour automatiser mon code, je peux utiliser le dag airflow suivant:

####  Exemple d'un  dag qui recupere les donnees de nos capteurs, calcule les moyennes de CO et de PM2.5 par heure, et stock le 
#### resultat dans la base de donnees mongo db
#### NB: cette operation est effectuer toute les heure 

In [None]:

# Paramètres du Mon DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 12, 6),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),}
    'email': ['nebout@email.com'], 
    'email_on_failure': True,
 
    
#DAG 

dag = DAG(
    'projet_data354',
    default_args=default_args,
    description='Extraction toutes les heures',
    schedule_interval='@hourly',)



# Tâche 1: Extraction des donnees  du premier capteur
t1_extrac_S188 = PythonOperator(
    task_id='extract_smart188',
    python_callable=extraction,
    op_args=['https://airqino-api.magentalab.it/v3/getStationHourlyAvg/283164601'],  # URL capteur smart 188
    dag=dag,
)


# Tâche 2: Extraction des donnees du deuxieme capteur
t2_extrac_S189 = PythonOperator(
    task_id='extract_data_smart189',
    python_callable=extraction,
    op_args=['https://airqino-api.magentalab.it/v3/getStationHourlyAvg/283181971'],  # URL capteur smart 189
    dag=dag,
)




# Tâche 3: calcule  de moyennes pour  smart_188 .
t3_moy_s188 = PythonOperator(
    task_id='moy_S188',
    python_callable=calcul_moy,
    provide_context=True,
    op_kwargs={'return2_tache1': '{{ ti.xcom_pull(task_ids="t1_extrac_S188")[1] }}'},
    dag=dag,
)



# Tâche 4: calcule  de moyennes pour  smart_189.
t4_moy_s189 = PythonOperator(
    task_id='moy_S189',
    python_callable=calcul_moy,
    provide_context=True,
    op_kwargs={'return2_tache2': '{{ ti.xcom_pull(task_ids="t2_extrac_S189")[1] }}'},
    dag=dag,
)




# Tâche 5: insertion des moyennes de s188 dans mongo.
t5_ins_s188 = PythonOperator(
    task_id='ins_S188',
    python_callable=insertion,
    op_args=[t3_moy_s188.output,"base_data354","moy_SMART188"],
    dag=dag,
)



# Tâche 6: insertion des moyennes de s189 dans mongo.
t6_ins_s189 = PythonOperator(
    task_id='ins_S189',
    python_callable=insertion,
    op_args=[t4_moy_s189.output,"base_data354","moy_SMART189"],
    dag=dag,
)



# Définir l'ordre d'exécution des tâches
t1_extrac_S188 >> t3_moy_s188 >> t5_ins_s188
t2_extrac_S189 >> t4_moy_s189 >> t6_ins_s189
