# Extracting data from csvs

In [1]:
%store -r sensorIds
%store -r sensorPaths

In [2]:
# Switch for location processing: later cells only build location averages
# and bounding boxes when this flag is True.
processLocation = False

In [3]:
import pandas as pd
import numpy as np
import psycopg2
from datetime import datetime
from datetime import timezone
import os

In [4]:
def dataSplit(csvpath,dateString):
    """Split one sensor CSV into a dictionary holding one dataframe per day.

    Parameters:
        csvpath: path of the CSV file; it must contain a "timestamp" column
            (used as the index) and the column named by dateString.
        dateString: name of the column holding day-first date strings.

    Returns:
        dict mapping the midnight timestamp of each day (int, epoch seconds)
        to the dataframe of that day's records.

    Rows whose date cannot be parsed cannot be assigned to a day; they are
    reported and left out instead of aborting the whole split.
    """
    data = {} #one entry per day of records

    df_temp = pd.read_csv(csvpath,parse_dates=True, index_col="timestamp")

    #convert index from float to int
    df_temp.index = df_temp.index.astype(int,copy=False)

    #keep only the calendar date of each record; unparseable values become NaT
    df_temp['day'] = pd.to_datetime(df_temp[dateString], dayfirst=True, errors='coerce').dt.date

    #NaT has no timestamp, so rows without a valid date must be set aside first
    has_day = df_temp['day'].notna()
    skipped = int((~has_day).sum())
    if skipped:
        print('Skipped {} row(s) with an unparseable date in {}'.format(skipped, csvpath))
        df_temp = df_temp[has_day]

    #one pass over the frame; sort=False keeps the days in order of appearance
    for day, dft in df_temp.groupby('day', sort=False):
        #'midnight' timestamp of the day is the dictionary key
        timestampKey = int(pd.to_datetime(day).timestamp())

        #drop the helper column (we don't need it anymore)
        data[timestampKey] = dft.drop(columns='day')

    return data

In [5]:
#TODO read json from cypress to get the date range for the data. 
#convert date range into a list of date timestamps (dateList)
#for each date in dateList check if there is a matching timestampkey in measurement dictionary for each sensor 
#flag the missing days of data 


In [6]:
def testSize(my_dictionary,path):
    """Check that the per-day dataframes together hold as many records as the CSV.

    Parameters:
        my_dictionary: dict of dataframes produced by splitting the CSV.
        path: path of the original CSV file.

    Prints False and both record counts when the totals differ; prints
    nothing when they match.
    """
    #records across all extracted dataframes
    split_total = sum(len(day_frame.index.values) for day_frame in my_dictionary.values())

    #records in the original file
    original = pd.read_csv(path,parse_dates=True, index_col="timestamp")
    total = len(original.index.values)

    #nothing to report when the sizes match
    if split_total == total:
        return

    print(False)
    print("extracted dataframe size: " + str(split_total) + "\n" + "original dataframe size: " + str(total))



In [7]:
measurement_dictionary = {}
location_dictionary = {}

for key in sensorPaths:
    paths = sensorPaths[key]
    for position, csv_path in enumerate(paths):
        #the first path of a sensor holds measurements, any further path locations
        target = measurement_dictionary if position == 0 else location_dictionary
        target[key] = dataSplit(csv_path,"date")
        #test size per sensor
        testSize(target[key],csv_path)


In [8]:
#disable location processing when no location csv was found
if not location_dictionary:
    processLocation = False
    print("No locations found")

# Getting hourly averages for each day

In [9]:
def hourlyAverage(dictionary,measure_dict_type):
    """Summarise every day of data into hourly rows.

    Parameters:
        dictionary: dict mapping a day key to that day's dataframe; each
            dataframe needs a 'date' column of day-first date strings.
        measure_dict_type: True for measurement data (hourly mean of every
            column), False for location data (hourly bounding box).

    Returns:
        dict with the same keys; each value is a dataframe indexed by the
        epoch-second 'timestamp' of the hour. Location frames hold a single
        'boundingBox' column with a WKT polygon string.
    """
    daily_averages_dictionary = {}

    #loop through each day of data
    for timestampKey, day_frame in dictionary.items():

        df = day_frame.set_index('date')
        df.index = pd.to_datetime(df.index, dayfirst=True)
        #sort_index returns a new frame, so the result has to be kept
        df = df.sort_index()

        #resample mean if using measurement dictionary or min max if using location dictionary
        if measure_dict_type:
            df = df.resample('60min').mean()
        else:
            hourly_bounds = df.resample('60min').agg(['min','max'])

            hoursummary = []
            #loop through each hour of data
            try:
                for hour, row in hourly_bounds.iterrows():
                    #positional access: the first column supplies x, the second y
                    min_x = row.iloc[0]    # lat
                    max_x = row.iloc[1]    # lat

                    min_y = row.iloc[2]    # long
                    max_y = row.iloc[3]    # long

                    geometry_string = "POLYGON(({} {}, {} {}, {} {}, {} {},{} {}))".format(min_x,min_y,   min_x,max_y,   max_x,max_y,   max_x,min_y,   min_x,min_y)

                    hoursummary.append((hour,geometry_string))

            except IndexError as e:
                #fewer than two coordinate columns: no bounding box can be built
                print('The dataframe is empty therefore no bounding box will be applied :{}'.format(e))

            df = pd.DataFrame.from_records(hoursummary,columns=['date', 'boundingBox'])
            df = df.set_index('date')

        #generate timestamp index for new dataframe; casting to datetime64[s]
        #first gives whole seconds whatever resolution the index is stored in
        df['timestamp'] = pd.to_datetime(df.index).values.astype('datetime64[s]').astype(np.int64)
        df = df.set_index('timestamp')

        #store the result inside the loop so that every day is kept
        daily_averages_dictionary[timestampKey] = df

    return daily_averages_dictionary

In [10]:
sensor_dictionary = {}
measurement_averages_dictionary = {}
location_averages_dictionary = {}

for key in sensorIds:
    #merged location + measurement frames of this sensor, one per day;
    #stays empty when location processing is disabled
    hourly_dictionary = {}

    measurement_averages_dictionary[key] = hourlyAverage(measurement_dictionary[key],True)

    if processLocation:
        #a sensor without a location csv simply has no location averages
        if key in location_dictionary:
            location_averages_dictionary[key] = hourlyAverage(location_dictionary[key],False)
        else:
            location_averages_dictionary[key] = {}

        #only the current sensor is merged; the loop variable 'key' is not reused
        for timestampKey, tempdf in measurement_averages_dictionary[key].items():
            try:
                tempdf2 = location_averages_dictionary[key][timestampKey]
            except KeyError as e:
                print('Location data unavailable for this day {} : {}'.format(str(timestampKey),e))
                continue

            #merge both location and measurement into one dataframe
            hourly_dictionary[timestampKey] = pd.concat([tempdf2, tempdf], axis=1)

    #assign inside the loop so that every sensor gets its own entry
    sensor_dictionary[key] = hourly_dictionary

In [12]:
if processLocation:
    #preview the last day of the first sensor; an expression nested in an
    #'if' is not displayed by the notebook, so the preview is printed
    first_sensor_days = next(iter(sensor_dictionary.values()), {})
    if first_sensor_days:
        tempdf = list(first_sensor_days.values())[-1]
        print(tempdf.head(2))
    else:
        print("No merged location/measurement data to preview")

# Preparing data for upload into PostgreSQL

In [21]:
sensor_summaries = {}
sensor_data = {}


def _empty_location_frame(timestampKey):
    """Return a one-row placeholder location frame which can be converted to a Json."""
    return pd.DataFrame(
        {'latitude': [np.nan], 'longitude': [np.nan]},
        index=pd.Index([timestampKey], name='timestamp'),
    )


for key in sensorIds:

    #looking at each day of data
    for timestampKey, day_measurements in measurement_dictionary[key].items():

        # concatenating numbers into text:
        timestamp_sensor_key = "%s_%s" % (timestampKey, key)

        #defaults used when no location exists for this day:
        #the 'NULL' marker and a placeholder location frame
        geometry_string = 'NULL'
        ldf = None

        #only process locations if they exist
        if processLocation:
            #try extract the bounding box for the current day of data
            try:
                day_locations = location_dictionary[key][timestampKey]

                #create bounding box polygon
                # NOTE(review): x is taken from latitude and y from longitude;
                # confirm this axis order is what the b_box column expects
                min_y = day_locations['longitude'].min()
                max_y = day_locations['longitude'].max()

                min_x = day_locations['latitude'].min()
                max_x = day_locations['latitude'].max()

                location_frame = day_locations.drop('date', axis=1)
            except KeyError:
                print('No location dataframe found at timestampkey: {} for sensor {}'.format(timestampKey,key))
            else:
                #POLYGON(minx miny, minx Maxy, maxx Maxy, maxx miny, minx miny)
                geometry_string = "POLYGON(({} {}, {} {}, {} {}, {} {},{} {}))".format(min_x,min_y,   min_x,max_y,   max_x,max_y,   max_x,min_y,   min_x,min_y)
                ldf = location_frame

        if ldf is None:
            ldf = _empty_location_frame(timestampKey)

        #measurement dataframe of the current day, without the date strings
        mdf = day_measurements.drop('date', axis=1)

        #summaryArray = [timestamp_start,sensor_id,bounding_box,measurement_count]
        summaryArray = [timestampKey,int(key),geometry_string,len(mdf.index.values)]
        sensor_summaries[timestamp_sensor_key] = summaryArray

        #dataArray = [measurement_json,location_json]
        dataArray = [mdf.to_json(orient="columns"),ldf.to_json(orient="columns")]
        sensor_data[timestamp_sensor_key] = dataArray

# Exporting to PostgreSQL


# Writing records from a Dictionary of arrays to a SQL database
Loop over all the keys and execute an insert query for each one.

In [15]:
#Connecting to an existing database
#Connecting to an existing database.
#Each setting can be overridden through the matching PG* environment variable
#so that real credentials do not have to be written into the notebook;
#the previous values remain the defaults.
con = psycopg2.connect(
    host=os.environ.get("PGHOST", "localhost"),
    database=os.environ.get("PGDATABASE", "airQuality"),
    user=os.environ.get("PGUSER", "Riyad"),
    password=os.environ.get("PGPASSWORD", "123"),
    # attempt to connect for 3 seconds then raise exception
    connect_timeout=3)

In [16]:
#Opening a cursor to execute database operations
#Load the sensors table into a dataframe indexed by plume_id.
#pandas runs the query on the connection itself, so no cursor is opened here.
query = "SELECT * FROM sensor_network.sensors"
sensorsdf = pd.read_sql_query(query, con, index_col='plume_id')
sensorsdf = sensorsdf.convert_dtypes() #convert to correct types

In [17]:
# Preview the first rows of the sensors dataframe.
sensorsdf.head()

Unnamed: 0_level_0,sensor_serial_number,id,type_id,zephyr_id,active,last_update
plume_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
16701.0,02:00:00:00:40:45,5,1,,False,
18683.0,02:00:00:00:48:03,6,1,,False,
18720.0,02:00:00:00:48:28,8,1,,False,
18749.0,02:00:00:00:48:45,9,1,,False,
18704.0,02:00:00:00:48:18,10,1,,False,


In [18]:
#the statements are fixed; every value is passed as a query parameter
insert_data_sql = (
    "INSERT INTO sensor_data.archive_measurements (measurements,locations) "
    "VALUES(%s, %s) RETURNING id"
)
insert_summary_sql = (
    "INSERT INTO sensor_network.sensor_summaries "
    "(timestamp_start, sensor_id, b_box, sensor_data_id, measurement_count) "
    "VALUES(%s, %s, %s, %s, %s)"
)
update_sensor_sql = "UPDATE sensor_network.sensors SET last_update = %s WHERE id = %s"

for key in sensor_summaries:

    #the key is "<timestamp>_<sensorid>"
    timestamp_part, s = key.split('_', 1)
    #get the new key from sensors table
    sensor_id = sensorsdf.loc[int(s)]['id']

    summary_df = sensor_summaries[key]
    data_df = sensor_data[key]

    #None is sent to the database as SQL NULL, so one statement covers both cases
    b_box = None if summary_df[2] == 'NULL' else str(summary_df[2])

    #the key holds epoch seconds, so it is read explicitly as UTC
    current_date = datetime.fromtimestamp(int(timestamp_part), tz=timezone.utc)

    try:
        #the cursor is closed automatically, also when a statement fails
        with con.cursor() as cursor:
            #inserting sensor data and return the id of new record
            cursor.execute(insert_data_sql, (str(data_df[0]),str(data_df[1])))
            sensor_data_id = cursor.fetchone()[0]

            #inserting sensor summary
            cursor.execute(insert_summary_sql, (int(summary_df[0]), int(sensor_id), b_box, int(sensor_data_id), int(summary_df[3])))

            # update sensor table
            cursor.execute(update_sensor_sql, (current_date,int(sensor_id)))

        #one commit per record: the three statements succeed or fail together
        con.commit()
    #if table name does not exist exit loop
    except psycopg2.errors.UndefinedTable as error:
        con.rollback()
        print('ERROR: {}'.format(error))
        break
    except psycopg2.Error:
        #leave the connection usable before reporting any other database error
        con.rollback()
        raise

In [19]:
#closing the database connection once the notebook no longer needs it
con.close()