## Bulk insert in situ soil moisture data into database

This notebook shows how to read data from a text file in JSON format (one JSON object per line) into a pandas DataFrame and then bulk insert it into a table in a PostgreSQL database.

Brandi Downs, October 2024

In [None]:
import psycopg2
import configparser
import getpass
import pandas as pd
import os
import time
from io import StringIO
import glob
import numpy as np
import json
import csv

In [None]:
# Read in data: collect every .txt file in the staging directory, sorted by name
fpath = '/home/jovyan/sds-ondemand/data/inundation/staging/'
# fpath already ends with '/', so join the pattern with os.path.join instead of
# concatenating '/*.txt', which produced a doubled slash in every returned path.
pattern = os.path.join(fpath, '*.txt')
files = sorted(glob.glob(pattern))
files

In [None]:
pd.set_option('display.max_colwidth', None)

# Each non-empty line of the input file holds one JSON object.
# NOTE: this reads the second file (index 1) of the sorted file list.
with open(files[1], 'r', encoding='utf-8') as file:
    # Skip blank lines (e.g. a stray empty line at the end of the file);
    # json.loads raises on an empty string.
    temp = [json.loads(line) for line in file if line.strip()]

# Flatten nested objects into columns named "<parent>_<child>"
new_measurements = pd.json_normalize(temp, sep='_')
# Keep an untouched copy of the parsed records before any column is modified
new_measurements_original = new_measurements.copy()
new_measurements['Timestamp'] = pd.to_datetime(new_measurements['Timestamp'])
new_measurements

To connect to the database, you need the database connection properties stored in a configuration file.<br>
The configuration file should look like this and be saved in .ini format:

[DEFAULT] <br>
dbname=database_name <br>
host=host_address <br>
port=port_number <br>

where database_name, host_address, and port_number are replaced by their values without quotation marks.<br>
<br>
<font size=4>Example contents of <tt>calvaldb_config.ini</tt>:</font><br>
[DEFAULT] <br>
dbname=my_db <br>
host=nisar-dev.abcdef123456.us-west-2.rds.amazonaws.com <br>
port=1234 <br>

In [None]:
# Establish a connection to the database

# Path to the .ini file holding dbname/host/port; the leading '~' is expanded
# with os.path.expanduser inside get_config_data().
config_file = '~/sds-ondemand/calvaldb_config.ini'

def get_config_data(config_path=None):
    """Read database connection properties and prompt for credentials.

    Parameters
    ----------
    config_path : str, optional
        Path to the .ini configuration file ('~' is expanded). When omitted,
        the module-level ``config_file`` is used.

    Returns
    -------
    tuple
        ``(dbname, host, port, user, password)``. ``dbname``, ``host`` and
        ``port`` are the strings read from the ``[DEFAULT]`` section; ``user``
        and ``password`` are typed in interactively.

    Raises
    ------
    FileNotFoundError
        If the configuration file does not exist.
    Exception
        If the file cannot be parsed, or if ``dbname``, ``host`` or ``port``
        is missing or empty.
    """
    if config_path is None:
        config_path = config_file
    file = os.path.expanduser(config_path)

    # Check if the file exists
    if not os.path.exists(file):
        raise FileNotFoundError(f"Configuration file not found: {file}")

    # Parse the configuration file
    config = configparser.ConfigParser()
    try:
        config.read(file)
    except Exception as e:
        # Chain the original error so the parser's traceback is not lost
        raise Exception(f"Configuration file error: {e}") from e

    # Get database connection properties
    conn_properties = config['DEFAULT']
    required = ('dbname', 'host', 'port')
    values = {key: conn_properties.get(key) for key in required}
    missing = [key for key in required if not values[key]]
    if missing:
        # Name the offending keys so the user knows what to add to the file
        raise Exception(f"Missing required properties: {', '.join(missing)}")

    # Get username and password (getpass keeps the password off the screen)
    user = input('User name: ')
    password = getpass.getpass('Password: ')

    return values['dbname'], values['host'], values['port'], user, password

# Read the connection settings, prompt for credentials, then open the
# connection and a cursor that the following cells use for queries.
dbname, host, port, user, password = get_config_data()
conn = psycopg2.connect(
    dbname=dbname,
    host=host,
    port=port,
    user=user,
    password=password,
)
cur = conn.cursor()


In [None]:
#conn.rollback()

In [None]:
# List soil moisture calval sites

# Get column names in table-definition order. Without ORDER BY the rows of
# information_schema.columns may come back in any order, which would attach
# the wrong labels to the columns returned by SELECT * below.
schema_name = 'soil_moisture'
table_name = 'site'
query = """
    SELECT column_name
    FROM information_schema.columns
    WHERE table_schema = %(schema_name)s AND table_name = %(table_name)s
    ORDER BY ordinal_position;
"""
cur.execute(query, {'schema_name': schema_name, 'table_name': table_name})
type_cols = [row[0] for row in cur.fetchall()]

# Load the site table into a pandas DataFrame
query = "SELECT * FROM soil_moisture.site ORDER BY site_id;"
cur.execute(query)
sites = pd.DataFrame(cur.fetchall(), columns=type_cols)
sites.head()

In [None]:
# List soil moisture sensors

# Get column names in table-definition order. Without ORDER BY the rows of
# information_schema.columns may come back in any order, which would attach
# the wrong labels to the columns returned by SELECT * below.
schema_name = 'soil_moisture'
table_name = 'sensor'
query = """
    SELECT column_name
    FROM information_schema.columns
    WHERE table_schema = %(schema_name)s AND table_name = %(table_name)s
    ORDER BY ordinal_position;
"""
cur.execute(query, {'schema_name': schema_name, 'table_name': table_name})
type_cols = [row[0] for row in cur.fetchall()]

# Load the sensor table into a pandas DataFrame
query = "SELECT * FROM soil_moisture.sensor ORDER BY sensor_id;"
cur.execute(query)
sensors = pd.DataFrame(cur.fetchall(), columns=type_cols)
sensors


In [None]:
# Tag every new measurement row with the id of the sensor it belongs to
# (hard-coded to sensor 1 here).
new_measurements = new_measurements.assign(sensor_id=1)
new_measurements.head()

In [None]:
#conn.rollback()

In [None]:
# List existing soil moisture measurements in database and get last measurement_id

# Get column names in table-definition order. Without ORDER BY the rows of
# information_schema.columns may come back in any order, which would attach
# the wrong labels to the columns returned by SELECT * below.
schema_name = 'soil_moisture'
table_name = 'measurement'
query = """
    SELECT column_name
    FROM information_schema.columns
    WHERE table_schema = %(schema_name)s AND table_name = %(table_name)s
    ORDER BY ordinal_position;
"""
cur.execute(query, {'schema_name': schema_name, 'table_name': table_name})
type_cols = [row[0] for row in cur.fetchall()]

# Fetch only the row with the highest measurement_id
query = "SELECT * FROM soil_moisture.measurement ORDER BY measurement_id DESC LIMIT 1;"
cur.execute(query)
measurements = pd.DataFrame(cur.fetchall(), columns=type_cols)
measurements


In [None]:
# Map the column names produced from the JSON records onto the column names
# used by the database table, then apply the mapping.
col_map = dict([
    ('Timestamp', 'datetime_utc'),
    ('Payload Data_battery voltage (V)', 'battery_voltage_v'),
    ('Payload Data_soil moisture (%)', 'soil_moisture_pct'),
    ('Payload Data_soil temperature (C)', 'soil_temperature_c'),
    ('Payload Data_soil temperature (F)', 'soil_temperature_f'),
    ('Payload Data_soil conductivity (uS/cm)', 'soil_conductivity_us_cm'),
])
new_measurements = new_measurements.rename(col_map, axis='columns')
new_measurements.head()

In [None]:
# Build the frame to upload: only the columns listed below, in this order,
# with rows sorted chronologically.
upload_columns = [
    'sensor_id',
    'datetime_utc',
    'soil_moisture_pct',
    'soil_conductivity_us_cm',
    'soil_temperature_c',
    'soil_temperature_f',
    'battery_voltage_v',
]
df = new_measurements.loc[:, upload_columns].sort_values(by="datetime_utc")
df

In [None]:
# # Ensure new data to be uploaded has not already been uploaded to the database by comparing measurement_ids from the csv
# # and the database

# last_old_meas_id = M['measurement_id'][0]    # the last (most recent) measurement_id from the calval database
# first_new_meas_id = df['measurement_id'][0]  # the first measurement_id from the new dataset / csv
# print(last_old_meas_id, first_new_meas_id)

# # if measurement_id already exists in database, subset the new dataframe to include 
# # only those measurement ids that have not yet been uploaded to the database
# if first_new_meas_id <= last_old_meas_id:
#     print('Measurement ID ' + str(first_new_meas_id) + ' already exists in database. Subsetting dataframe.')
#     next_meas_id = last_old_meas_id + 1
#     df = df.loc[df['measurement_id'] >= next_meas_id]
# else:
#     print('Measurement ID ' + str(first_new_meas_id) + ' does not alreay exist in database. Okay to proceed.')

In [None]:
# If a query results in an error, roll the cursor back to the last transaction with:
#conn.rollback()

### For this section, the column names in the dataframe must be the same as the column names in the database table.


In [None]:
# Show the row(s) held in `measurements` (the highest measurement_id fetched from the database)
measurements.tail()

In [None]:
# Preview the first rows of the DataFrame prepared for upload
df.head()

In [None]:
#conn.rollback()

In [None]:
# StringIO() is an in-memory stream for text that gives you file-like access to strings.
# Serialize the DataFrame to CSV with DataFrame.to_csv: it writes missing values
# (NaN/NaT) as empty fields, which COPY ... WITH CSV loads as NULL. Writing
# df.values through csv.writer would emit the literal text "nan"/"NaT" instead.
sio = StringIO()
df.to_csv(sio, header=False, index=False)

# set the stream position back to the start so COPY reads the whole buffer
sio.seek(0)

# Insert dataframe into table
start_time = time.perf_counter()  # monotonic clock, suited to measuring elapsed time

sql_query = """COPY soil_moisture.measurement (sensor_id, datetime_utc, soil_moisture_pct, soil_conductivity_us_cm, soil_temperature_c, soil_temperature_f, battery_voltage_v) 
            FROM STDIN WITH CSV"""
try:
    cur.copy_expert(sql=sql_query, file=sio)
    conn.commit()
except Exception:
    # A failed statement leaves the transaction aborted; roll back so the
    # connection is usable again, then surface the original error.
    conn.rollback()
    raise

end_time = time.perf_counter()
total_time = end_time - start_time
print(f"Insert time: {total_time} seconds")

In [None]:
# Close the cursor and connection to the database.
# Run this last: `cur` and `conn` cannot be used for queries afterwards.

cur.close()
conn.close()