In [201]:
import io
import os

import numpy as np
import pandas as pd
import psycopg2
from psycopg2.extras import register_json

In [202]:
import psycopg2

class Database:
    """Small convenience wrapper around psycopg2 for a single PostgreSQL database.

    Every public method opens its own short-lived connection through
    ``getConnection`` and closes it before returning, on success and on error.
    """

    def __init__(self, host, database, user, password, port):
        """Store the connection settings; no connection is opened here."""
        self.password = password
        self.host = host
        self.db_name = database
        self.user = user
        self.port = port
        print("Database Ready")

    def getConnection(self):
        """Open and return a new psycopg2 connection. The caller must close it."""
        return psycopg2.connect(
            host=self.host,
            dbname=self.db_name,
            user=self.user,
            password=self.password,
            port=self.port)

    def selectDataframe(self, query):
        """Run ``query`` and return the result as a pandas DataFrame.

        Errors raised while connecting or reading propagate unchanged.
        """
        conn = self.getConnection()
        try:
            return pd.read_sql_query(query, con=conn)
        finally:
            conn.close()

    def select(self, query, parameter=None):
        """Execute ``query`` with bind values ``parameter`` and return all rows.

        Prints ``cant execute query`` and re-raises when execution fails.
        """
        params = () if parameter is None else parameter
        conn = self.getConnection()
        try:
            try:
                cursor = conn.cursor()
                cursor.execute(query, params)
            except Exception:
                print("cant execute query")
                raise
            return cursor.fetchall()
        finally:
            # Runs on every path, so a failing query no longer leaks the connection.
            conn.close()

    def delete(self, query):
        """Execute ``query`` and commit. Errors propagate to the caller."""
        conn = self.getConnection()
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            conn.commit()
            cursor.close()
        finally:
            conn.close()

    def insert(self, query, parameter=None):
        """Execute ``query`` with bind values ``parameter`` and commit.

        Returns ``None`` on success. On failure the error is printed, the
        transaction is rolled back and ``1`` is returned.
        """
        params = () if parameter is None else parameter
        conn = self.getConnection()
        try:
            cursor = conn.cursor()
            cursor.execute(query, params)
            conn.commit()
            cursor.close()
        except Exception as error:
            print("Error: %s" % error)
            conn.rollback()
            return 1
        finally:
            conn.close()

    def insert_returning_id(self, query, parameter=None):
        """Execute an ``INSERT ... RETURNING`` statement and return the first column
        of the first returned row.

        On failure the error is printed, the transaction is rolled back and
        ``-1`` is returned.
        """
        params = () if parameter is None else parameter
        conn = self.getConnection()
        try:
            cursor = conn.cursor()
            cursor.execute(query, params)
            id_returning = cursor.fetchone()[0]
            conn.commit()
            cursor.close()
            return id_returning
        except Exception as error:
            print("Error: %s" % error)
            conn.rollback()
            return -1
        finally:
            conn.close()

    def copy_from_df(self, df):
        """Bulk-load ``df`` into the ``raw_data`` table with ``COPY``.

        The frame is serialised as CSV with its index first and no header, so
        the index followed by the columns must line up with
        ``(timestamp, variable_id, value)``.
        """
        s_buf = io.StringIO()
        df.to_csv(s_buf, header=False, index=True)
        s_buf.seek(0)

        conn = self.getConnection()
        try:
            cur = conn.cursor()
            cur.copy_from(
                s_buf, "raw_data", sep=",", columns=("timestamp", "variable_id", "value"))
            conn.commit()
            cur.close()
        finally:
            conn.close()

    def insertDataframe(self, df, table):
        """Bulk-load ``df`` into ``table`` with ``COPY``.

        ``df`` must have a ``timestamp`` column; it becomes the index, and the
        remaining columns are written after it, so they must line up with
        ``(value, variable_id)``.

        Returns ``None`` on success. On failure the error is printed, the
        transaction is rolled back and ``1`` is returned.
        """
        # Serialise the frame to an in-memory CSV buffer (timestamp first, no header).
        my_df = df.set_index('timestamp')
        buffer = io.StringIO()
        my_df.to_csv(buffer, index_label='id', header=False)
        buffer.seek(0)

        conn = self.getConnection()
        try:
            cursor = conn.cursor()
            cursor.copy_from(buffer, table, sep=",", columns=("timestamp", "value", "variable_id"))
            conn.commit()
            cursor.close()
        except Exception as error:
            print("Error: %s" % error)
            conn.rollback()
            return 1
        finally:
            # The connection used to stay open after every call.
            conn.close()
        print("Insert dataframe done with success!")

    def toPandas(self, query, params=None):
        """Run ``query`` with optional bind values ``params`` and return a DataFrame.

        Errors raised while connecting or reading propagate unchanged.
        """
        conn = self.getConnection()
        try:
            return pd.read_sql(query, conn, params=params)
        finally:
            conn.close()

In [203]:
import pandas as pd

class Query:
    """Domain-level queries over the sensor tables, built on a ``Database`` instance."""

    # Fixed attributes given to the sensor created by insert_result_variable, in
    # column order: sensor_type_id, x, y, z, installation_date,
    # initial_frequency, site_gateway_id.
    _RESULT_SENSOR_DEFAULTS = (12, 1650233, 8392540, 100, '04/03/2021', 20, 1)

    def __init__(self, host, db_name, user, port, password):
        """Create the underlying ``Database`` from the connection settings."""
        self.db = Database(
            host=host, database=db_name, user=user, port=port, password=password)

    def get_df_by_variable_id(self, variable_id, variable_name):
        """Return the ``raw_data`` rows of ``variable_id`` as a DataFrame.

        The frame has two columns: ``timestamp`` and the values, with the
        values column named ``variable_name``.
        """
        query = """
            SELECT r.timestamp, r.value, r.variable_id FROM raw_data r
            WHERE r.variable_id = %s;
        """
        rows = self.db.select(query, (variable_id,))
        frame = pd.DataFrame(data=rows, columns=["timestamp", variable_name, 'id'])
        return frame.drop(columns=['id'])

    def getAllVariableData(self, variable_id):
        """Return every ``raw_data`` and ``measures`` row of ``variable_id``.

        Raises ``ValueError``/``TypeError`` when ``variable_id`` is not
        integer-like.
        """
        query = """
                SELECT timestamp, value, variable_id
                FROM raw_data
                WHERE variable_id=%s
                UNION ALL 
                SELECT timestamp, value, variable_id
                FROM measures
                WHERE variable_id=%s
                """
        # Bound as a parameter instead of being formatted into the SQL text;
        # int() also accepts numpy integers and numeric strings.
        variable_id = int(variable_id)
        return self.db.toPandas(query, params=(variable_id, variable_id))

    def deleteData(self, offset_variable_id, startDate):
        """Delete the ``measures`` rows of ``offset_variable_id`` at or after ``startDate``.

        ``startDate`` is sent as text and parsed by PostgreSQL with the
        ``YYYY-MM-DD HH24:MI:SS`` format. Database errors propagate.
        """
        sqlQuery = """
                    DELETE FROM measures
                    WHERE timestamp >= to_timestamp(%s,'YYYY-MM-DD HH24:MI:SS') and variable_id=%s
                    """
        # Run with bind parameters on a dedicated connection so that neither
        # value is ever spliced into the SQL text.
        conn = self.db.getConnection()
        try:
            cursor = conn.cursor()
            cursor.execute(sqlQuery, (str(startDate), int(offset_variable_id)))
            conn.commit()
            cursor.close()
        finally:
            conn.close()

    def insert_result_variable(self, df, res_name, res_metric_id):
        """Create a sensor and a variable named ``res_name`` and store ``df`` under it.

        ``df`` needs a ``timestamp`` column and a ``res`` column whose cells are
        indexable; the first element of each cell is stored as the value.
        Row inserts are best-effort: a failing row is reported by
        ``Database.insert`` and skipped.

        Returns ``(variable_id, sensor_id)``.
        Raises ``RuntimeError`` when the sensor or the variable cannot be created.
        """
        # Create a new sensor for the result variable.
        sensor_query = """
                INSERT INTO sensors(name,sensor_type_id,x,y,z,installation_date,initial_frequency,site_gateway_id) values(%s,%s,%s,%s,%s,%s,%s,%s) RETURNING id;
            """
        sensor_id = self.db.insert_returning_id(
            sensor_query, (res_name,) + self._RESULT_SENSOR_DEFAULTS)
        # insert_returning_id reports failure as -1; stop before writing rows
        # that would reference a sensor that does not exist.
        if sensor_id == -1:
            raise RuntimeError("could not create sensor %r" % (res_name,))

        # Create the variable attached to that sensor.
        variable_query = """
                INSERT INTO variables (name,metric_id,sensor_id) values(%s,%s,%s) RETURNING id;
            """
        variable_id = self.db.insert_returning_id(
            variable_query, (res_name, res_metric_id, sensor_id))
        if variable_id == -1:
            raise RuntimeError("could not create variable %r" % (res_name,))

        # Store the result values in raw_data, one row per timestamp.
        row_query = """
                INSERT INTO raw_data (timestamp, value, variable_id) values(%s,%s,%s);
            """
        for timestamp, res in zip(df['timestamp'], df['res']):
            self.db.insert(row_query, (timestamp, res[0], variable_id))

        return variable_id, sensor_id

   

In [242]:
# Connection settings are read from the environment so that no secret is stored
# in the notebook. The non-secret values keep their previous defaults.
dbHost = os.environ.get('THM_DB_HOST', '13.36.221.162')
dbName = os.environ.get('THM_DB_NAME', 'thminsight')
dbUser = os.environ.get('THM_DB_USER', 'thminsight')
dbPort = os.environ.get('THM_DB_PORT', '5432')
# Deliberately no default: export THM_DB_PASSWORD before starting the kernel.
# NOTE(review): when this is None, psycopg2 should omit the password and let
# libpq fall back to PGPASSWORD / ~/.pgpass -- confirm for the deployed version.
# The password that used to be hard-coded here must be rotated.
dbPassword = os.environ.get('THM_DB_PASSWORD')

In [243]:
# Keyword arguments make the differing password/port order of the two
# constructors explicit.
db = Database(
    host=dbHost,
    database=dbName,
    user=dbUser,
    password=dbPassword,
    port=dbPort,
)
queryManager = Query(
    host=dbHost,
    db_name=dbName,
    user=dbUser,
    port=dbPort,
    password=dbPassword,
)

Database Ready
Database Ready


In [290]:
# Variable name/id pairs from `variables` joined to `raw_data`. The join yields
# one row per matching raw_data row, so the same (name, id) pair repeats.
ids=db.toPandas('SELECT  name , id from variables join raw_data ON variables.id=raw_data.variable_id')

In [291]:
# SQL pairing each variable name with the metric, unit and metric_id of its metric.
sr='SELECT  name ,metric , unit , metric_id  from metrics join variables ON variables.metric_id=metrics.id'

In [292]:
# Load the variable/metric lookup described by `sr` into a DataFrame.
metrics=db.toPandas(sr)

In [298]:
# Display the variable name/id lookup.
ids

Unnamed: 0,name,id
0,Current_mA,1246
1,Current_mA,1249
2,Distance,1238
3,Temperature,1245
4,Temperature,1245
...,...,...
865062,courant_1,5409
865063,courant_1,5409
865064,courant_1,5409
865065,courant_1,5409


In [299]:
# Display the variable/metric lookup.
metrics

Unnamed: 0,name,metric,unit,metric_id
0,TBM2a_STA1_Pressure,sans unite,,39
1,TBM2a_STA1_Humidity,sans unite,,39
2,TBM2a_STA1_timerMesures,sans unite,,39
3,TBM2a_STA1_Voltage,sans unite,,39
4,TBM2a_STA1_Temperature,sans unite,,39
...,...,...,...,...
4049,TBM2a_STA1_MPO901_F1_Vt,angle,gon,11
4050,TBM2a_STA1_MPO901_F1_SD,position,m,15
4051,TBM2a_STA1_MPO901_F2_Hz,angle,gon,11
4052,TBM2a_STA1_MPO901_F2_Vt,angle,gon,11


In [301]:


# Variable ids of the input series, plus the identifiers of the result series.
id_temperature_var = 1245
id_water_level_var = 3142
id_udef_var = 1238
id_res_var = 1098
res_metric_id = 47
res_name = 'res'

# Fetch each input series as a (timestamp, <column>) frame:
# temperature first, then displacement, then water level.
df_temp, df_def, df_H = (
    queryManager.get_df_by_variable_id(var_id, column)
    for var_id, column in (
        (id_temperature_var, 'temperature'),
        (id_udef_var, 'deplacement'),
        (id_water_level_var, 'H'),
    )
)



In [305]:
# Display the displacement series fetched for id_udef_var.
df_def

Unnamed: 0,timestamp,deplacement
0,2022-02-19 09:28:36+00:00,-1.0
1,2022-02-18 05:08:44+00:00,-1.0
2,2022-02-23 01:48:13+00:00,-1.0
3,2022-02-22 08:28:17+00:00,-1.0
4,2022-02-22 22:28:14+00:00,-1.0
...,...,...
5049,2022-04-12 12:04:07+00:00,-1.0
5050,2022-04-12 12:44:06+00:00,-1.0
5051,2022-04-12 13:04:06+00:00,-1.0
5052,2022-04-12 13:24:06+00:00,-1.0


In [131]:
        # Ad-hoc query: all raw_data and measures rows of variable 1244, with the
        # id formatted into both WHERE clauses.
        query = """
                SELECT timestamp, value, variable_id
                FROM raw_data
                WHERE variable_id={0}
                UNION ALL 
                SELECT timestamp, value, variable_id
                FROM measures
                WHERE variable_id={0}
                """.format(1244)

In [132]:
# Run the ad-hoc `query` and display the resulting DataFrame.
db.selectDataframe(query)

Unnamed: 0,timestamp,value,variable_id
0,2022-02-18 11:09:46+00:00,-1.0,1244
1,2022-02-19 16:09:38+00:00,-1.0,1244
2,2022-02-17 17:49:52+00:00,-1.0,1244
3,2022-02-20 07:09:33+00:00,-1.0,1244
4,2022-02-20 08:49:33+00:00,-1.0,1244
...,...,...,...
5070,2022-04-12 13:16:42+00:00,-1.0,1244
5071,2022-04-12 13:36:42+00:00,-1.0,1244
5072,2022-04-12 13:56:41+00:00,-1.0,1244
5073,2022-04-12 14:16:41+00:00,-1.0,1244


In [104]:
# NOTE(review): `raw_data` is not defined in any visible cell, so this only works
# with leftover kernel state -- confirm where it comes from or drop the cell.
raw_data.head()

Unnamed: 0,alias,id
0,TBM2a_STA1_Pressure,1626
1,TBM2a_STA1_Humidity,1627
2,TBM2a_STA1_timerMesures,1628
3,TBM2a_STA1_Voltage,1629
4,TBM2a_STA1_Temperature,1625
