In [6]:
import requests
import pandas as pd
from pymongo import MongoClient
# Importer le module MySQLdb
import MySQLdb
from cassandra.cluster import Cluster


In [7]:
def extract(station_ids=("283164601", "283181971"), timeout=30):
    """Download the hourly averages of each AirQino station as DataFrames.

    Parameters
    ----------
    station_ids : sequence of str
        AirQino station identifiers appended to the ``getStationHourlyAvg``
        endpoint. Defaults to the two stations tracked by this notebook.
    timeout : float
        Seconds to wait for each HTTP request. Without a timeout,
        ``requests.get`` can block forever on an unresponsive server.

    Returns
    -------
    tuple of pandas.DataFrame, or None
        One DataFrame per station, in the order of ``station_ids``. Each is
        built from the payload's ``data`` records, plus a ``sensor`` column
        copied from ``header.station_name``. Returns ``None`` if any request
        did not answer HTTP 200.
    """
    print("Extract..")
    base_url = "https://airqino-api.magentalab.it/v3/getStationHourlyAvg/"

    # Fetch every station first, then validate, so the log order stays
    # "Recuperation ..." lines followed by "Verif..".
    responses = []
    for position, station_id in enumerate(station_ids, start=1):
        print(f"Recuperation station{position}..")
        responses.append(requests.get(base_url + str(station_id), timeout=timeout))

    print("Verif..")

    failed_codes = [response.status_code for response in responses
                    if response.status_code != 200]
    if failed_codes:
        # Keep the historical contract (return None) but say why.
        print(f"Echec: HTTP {failed_codes}")
        return None

    frames = []
    for response in responses:
        payload = response.json()
        frame = pd.DataFrame(payload['data'])
        frame['sensor'] = payload['header']['station_name']
        frames.append(frame)
    print("Df créés.")

    return tuple(frames)

In [8]:
def _with_date_column(station):
    """Return a copy of ``station`` with ``timestamp`` parsed and a ``date`` (midnight) column added."""
    timestamps = pd.to_datetime(station['timestamp'])
    # ``.dt.date`` drops the time of day; converting back yields a datetime64
    # column (midnight of each day) that can be used as a group key.
    return station.assign(timestamp=timestamps, date=pd.to_datetime(timestamps.dt.date))


def _daily_average(station):
    """Average ``CO`` and ``PM2.5`` per (date, sensor) for a frame produced by ``_with_date_column``."""
    # Grouping on ``sensor`` too carries the station name through the
    # aggregation. Re-attaching it afterwards by index alignment only works
    # when the hourly frame's index happens to start at 0.
    daily = (
        station.groupby(['date', 'sensor'])[['CO', 'PM2.5']]
        .mean()
        .reset_index()
    )
    # Keep the historical column order: date, CO, PM2.5, sensor.
    return daily.reindex(columns=['date', 'CO', 'PM2.5', 'sensor'])


def transform():
    """Extract both stations and compute their daily CO / PM2.5 averages.

    Returns
    -------
    tuple of two pandas.DataFrame
        One frame per station with columns ``date`` (datetime at midnight),
        ``CO``, ``PM2.5`` (daily means) and ``sensor``.

    Raises
    ------
    RuntimeError
        If ``extract()`` returned ``None`` (a station request failed).
    """
    extracted = extract()
    if extracted is None:
        raise RuntimeError("extract() failed: a station request did not return HTTP 200")
    station1, station2 = extracted

    print("Transform..")
    station1 = _with_date_column(station1)
    station2 = _with_date_column(station2)

    print("Calcul de moyennes..")
    daily_avg_st1 = _daily_average(station1)
    daily_avg_st2 = _daily_average(station2)
    print("Terminé")

    return daily_avg_st1, daily_avg_st2


In [9]:
def transform_for_mysql():
    """Extract both stations and reshape their raw hourly rows for MySQL.

    Unlike ``transform()``, no daily averaging happens. Every hourly row is
    kept, ``timestamp`` is parsed and renamed to ``date``, and ``sensor`` is
    overwritten with the fixed labels 'SMART188' (first station) and
    'SMART189' (second station).

    Returns
    -------
    tuple of two pandas.DataFrame
        Each with columns ``date``, ``sensor``, ``CO``, ``PM2.5``.
    """
    first, second = extract()
    print("Transform..")

    def _reshape(raw, label):
        # Parse the timestamps, then expose them under the ``date`` name.
        parsed = raw.assign(timestamp=pd.to_datetime(raw['timestamp']))
        renamed = parsed.rename(columns={"timestamp": "date"})
        renamed['sensor'] = label
        return renamed[['date', 'sensor', 'CO', 'PM2.5']]

    hourly_st1 = _reshape(first, 'SMART188')
    hourly_st2 = _reshape(second, 'SMART189')

    print("Terminé")

    return hourly_st1, hourly_st2


In [10]:
def load_in_mongo(host='localhost', port=27017, database='sensors_data', collection_name='data'):
    """Load the daily averages from ``transform()`` into MongoDB, skipping existing docs.

    A document is considered already present when the collection holds one
    with the same ``date`` and ``sensor``. Otherwise it is inserted with
    ``insert_one``.

    Parameters
    ----------
    host, port : str, int
        MongoDB server address (defaults: localhost:27017).
    database, collection_name : str
        Target database and collection (defaults: ``sensors_data`` / ``data``).
    """
    station1, station2 = transform()
    print("Transform..")

    print("Conection to mongo database..")

    client = MongoClient(host, port)
    try:
        collection = client[database][collection_name]

        print("To_dict")

        documents = station1.to_dict("records") + station2.to_dict("records")

        print("Loading..")
        for doc in documents:
            # (date, sensor) identifies a daily average; only insert unseen pairs.
            query = {"date": doc["date"], "sensor": doc["sensor"]}
            if collection.find_one(query) is None:
                collection.insert_one(doc)
                print(f"Inserted document with date {doc['date']} and sensor {doc['sensor']}")
            else:
                print(f"Document with date {doc['date']} and sensor {doc['sensor']} already exists")
        print("Ok")
    finally:
        # Release the client's connection pool even if a query fails.
        client.close()


In [11]:
def load_in_cassandra(contact_points=('localhost',), port=9042, keyspace='sensors_data'):
    """Load the daily averages from ``transform()`` into the Cassandra table ``data``.

    Rows whose (date, sensor) pair already exists are skipped. All values are
    passed as bound parameters rather than formatted into the CQL text. The
    station name comes from the external API, so string-built CQL would be
    injectable.

    Parameters
    ----------
    contact_points : sequence of str
        Cluster node addresses (default: localhost).
    port : int
        Native protocol port (default: 9042).
    keyspace : str
        Keyspace holding the ``data`` table (default: ``sensors_data``).
    """
    station1, station2 = transform()
    print("Transform..")

    print("Connection to cassandra cluster..")

    cluster = Cluster(contact_points=list(contact_points), port=port)
    try:
        # connect(keyspace) already selects the keyspace. Do not call
        # set_keyspace('data'): 'data' is the table name, not a keyspace.
        session = cluster.connect(keyspace)

        print("To_dict")

        documents = station1.to_dict("records") + station2.to_dict("records")

        print("Loading..")
        select_cql = "SELECT * FROM data WHERE date = %s AND sensor = %s"
        insert_cql = "INSERT INTO data (date, sensor, PM2_5, CO) VALUES (%s, %s, %s, %s)"
        for doc in documents:
            # The date is bound as its string form ('YYYY-MM-DD HH:MM:SS'),
            # the same literal the quoted f-string query used. The driver's
            # encoder dispatches on exact type, so a raw pandas Timestamp would
            # not be quoted.
            key = (str(doc['date']), str(doc['sensor']))
            if session.execute(select_cql, key).one() is None:
                # transform() names the column 'PM2.5'; the table column is PM2_5.
                values = key + (float(doc['PM2.5']), float(doc['CO']))
                session.execute(insert_cql, values)
                print(f"Inserted document with date {doc['date']} and sensor {doc['sensor']}")
            else:
                print(f"Document with date {doc['date']} and sensor {doc['sensor']} already exists")
        print("Ok")
    finally:
        # Close the driver's connections and background threads.
        cluster.shutdown()


In [12]:
def _sql_number(value):
    """Return ``value`` as a float, or None (SQL NULL) when it is missing/NaN."""
    return None if pd.isna(value) else float(value)


def load_in_mysql(host="localhost", user="root", passwd="", db="sensors_data"):
    """Load the daily averages from ``transform()`` into the MySQL table ``data``.

    Rows whose (date, sensor) pair is already in the table are skipped. New
    rows are committed together at the end. The connection is always closed,
    even if a statement fails.

    Parameters
    ----------
    host, user, passwd, db : str
        MySQL connection settings. The defaults reproduce the previous
        hard-coded local root connection. Prefer passing real credentials
        in rather than editing them into the notebook.
    """
    station1, station2 = transform()
    print("Connection to database..")
    conn = MySQLdb.connect(host=host, user=user, passwd=passwd, db=db)
    try:
        cursor = conn.cursor()

        print("To_dict")

        rows = station1.to_dict("records") + station2.to_dict("records")

        print("Loading..")
        select_sql = "SELECT * FROM data WHERE `date` = %s and `sensor` = %s"
        insert_sql = "INSERT INTO data (`date`, `sensor`, `PM2_5`, `CO`) VALUES (%s, %s, %s, %s)"
        for row in rows:
            key = (row["date"], row["sensor"])
            cursor.execute(select_sql, key)
            if cursor.rowcount == 0:
                # A day without readings averages to NaN; store it as NULL.
                values = key + (_sql_number(row["PM2.5"]), _sql_number(row["CO"]))
                cursor.execute(insert_sql, values)
        conn.commit()

        print("Ok")
    finally:
        conn.close()

In [13]:
# Run the pipeline end to end: extract both stations, compute daily averages,
# insert the (date, sensor) rows not yet present in the MySQL table `data`.
load_in_mysql()

Extract..
Recuperation station1..


Recuperation station2..
Verif..
Df créés.
Transform..
Calcul de moyennes..
Terminé
Connection to database..
To_dict
Loading..
Ok


In [14]:
# Same pipeline, loaded into MongoDB (sensors_data.data); documents whose
# (date, sensor) pair already exists are reported and skipped.
load_in_mongo()

Extract..
Recuperation station1..
Recuperation station2..
Verif..
Df créés.
Transform..
Calcul de moyennes..
Terminé
Transform..
Conection to mongo database..
To_dict
Loading..
Document with date 2022-12-11 00:00:00 and sensor SMART188 already exists
Document with date 2022-12-12 00:00:00 and sensor SMART188 already exists
Document with date 2022-12-13 00:00:00 and sensor SMART188 already exists
Document with date 2022-12-14 00:00:00 and sensor SMART188 already exists
Document with date 2022-12-15 00:00:00 and sensor SMART188 already exists
Document with date 2022-12-16 00:00:00 and sensor SMART188 already exists
Document with date 2022-12-17 00:00:00 and sensor SMART188 already exists
Document with date 2022-12-18 00:00:00 and sensor SMART188 already exists
Document with date 2022-12-19 00:00:00 and sensor SMART188 already exists
Document with date 2022-12-20 00:00:00 and sensor SMART188 already exists
Document with date 2022-12-21 00:00:00 and sensor SMART188 already exists
Document 

In [None]:
# Same pipeline, loaded into Cassandra.
# NOTE(review): this cell was never executed in the saved notebook (In [None]).
load_in_cassandra()