In [1]:
import ast
import os
import random
import time
from datetime import datetime

import numpy as np
import psycopg2 as pg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, register_adapter, AsIs

# Teach psycopg2 to pass these NumPy scalar types through as-is (AsIs)
# when they appear as query parameters.
for numpy_scalar_type in (np.uint16, np.int64, np.int32, np.float64):
    register_adapter(numpy_scalar_type, AsIs)

# Build a New Database and Table.

In [2]:
# Connection settings for the local test server.
# User and password come from the standard libpq environment variables when
# set, and fall back to the previous 'postgres' defaults otherwise, so real
# credentials never need to be written into the notebook.
u = os.environ.get('PGUSER', 'postgres')      # User.
p = os.environ.get('PGPASSWORD', 'postgres')  # Password.
database_name = 'acs_test'
table_name = 'acs'

In [3]:
# Maintenance connection to the default `postgres` database.
# Keyword arguments replace the interpolated DSN string so that a user name or
# password containing spaces or quotes is passed through intact.
maintcon = pg2.connect(dbname='postgres', user=u, password=p)
# Autocommit: PostgreSQL refuses to run CREATE DATABASE inside a transaction block.
maintcon.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
maintcur = maintcon.cursor()

In [4]:
# Create the database on first run. On later runs it already exists, which is
# the only failure that is safe to ignore.
statement = f'CREATE DATABASE {database_name}'
try:
    maintcur.execute(statement)
except pg2.Error as exc:
    # SQLSTATE 42P04 is PostgreSQL's duplicate_database. Anything else
    # (permissions, bad name, lost connection) is a real failure and must
    # surface instead of being silently swallowed.
    if exc.pgcode != '42P04':
        raise

In [5]:
# Connection to the test database itself.
# Keyword arguments replace the interpolated DSN string so that credentials or
# a database name containing spaces or quotes are passed through intact.
dbcon = pg2.connect(dbname=database_name, user=u, password=p)
# Autocommit: every statement issued through `dbcur` is committed immediately.
dbcon.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
dbcur = dbcon.cursor()

In [6]:
# Table layout: a timestamp key, two FLOAT[] columns (wavelength, a) and one
# TEXT column (current).
pk = 'time'
fields_dtypes = dict(
    time='TIMESTAMP',
    wavelength='FLOAT[]',
    a='FLOAT[]',
    current='TEXT',
)

# Render "name TYPE" for each column, then append the primary-key clause.
column_defs = [f'{name} {sql_type}' for name, sql_type in fields_dtypes.items()]
column_defs.append(f'PRIMARY KEY ({pk})')
fields_dtypes_str = ', '.join(column_defs)

statement = f"CREATE TABLE IF NOT EXISTS {table_name}({fields_dtypes_str})"
dbcur.execute(statement)

# Generate Test Data

In [7]:
# Everything that does not change between rows is built once, outside the loop.
#
# `.tolist()` turns the bins into plain Python ints. With NumPy scalars as keys,
# str(dict) under NumPy >= 2 writes `np.int64(410)` instead of `410`, and the
# stored text can then no longer be read back with ast.literal_eval.
wvls = np.arange(410, 750, 4).tolist()  # Generate ACS-like bins.
columns = ('time', 'wavelength', 'a', 'current')
fields = ', '.join(columns)
values = ', '.join(['%s'] * len(columns))
statement = f"INSERT INTO {table_name} ({fields}) VALUES ({values})"

for _ in range(10000):
    vals = [random.uniform(0, 1) for _ in wvls]  # Create random floats between 0 and 1.
    txt = str(dict(zip(wvls, vals)))  # Same spectrum, serialized as a dict literal.
    insert_data = {'time': datetime.now(),
                   'wavelength': wvls,
                   'a': vals,
                   'current': txt}
    data = [insert_data[column] for column in columns]
    dbcur.execute(statement, data)
    # NOTE(review): presumably this paces the rows so that the `time` primary
    # key never collides between consecutive inserts - confirm before shortening.
    time.sleep(0.05)

# Get Size of Columns

In [8]:
def column_storage_size(cursor, table, column):
    """Sum ``pg_column_size(column)`` over every row of ``table``.

    Parameters
    ----------
    cursor : an open database cursor used to run the query.
    table, column : identifiers interpolated directly into the SQL text;
        pass trusted names only.

    Returns
    -------
    The result of ``cursor.fetchall()`` for the aggregate query; the total is
    read as ``result[0][0]``.
    """
    cursor.execute(f'SELECT sum(pg_column_size({column})) FROM {table}')
    return cursor.fetchall()


# One call per column. The table comes from `table_name` rather than a
# hard-coded literal, so the measurement follows the table created above.
current_size = column_storage_size(dbcur, table_name, 'current')
wavelength_size = column_storage_size(dbcur, table_name, 'wavelength')
a_size = column_storage_size(dbcur, table_name, 'a')

In [9]:
# Each *_size value is a fetchall() result, so the total sits at [0][0].
text_bytes = current_size[0][0]
print(f'Text Storage Size: {text_bytes} bytes')

# The array encoding uses two columns, so its total is the sum of both.
array_bytes = wavelength_size[0][0] + a_size[0][0]
print(f'Array Storage Size: {array_bytes} bytes')

Text Storage Size: 21479896 bytes
Array Storage Size: 14080000 bytes


# Get Time to Obtain Data

In [15]:
%%time
# Read back the array-encoded spectrum (two FLOAT[] columns) for every row.
# The table is taken from `table_name` instead of a hard-coded `ACS`, so this
# query always targets the table the notebook created.
statement = f'SELECT time, wavelength, a FROM {table_name}'
dbcur.execute(statement)
array_data = dbcur.fetchall()

CPU times: total: 328 ms
Wall time: 580 ms


In [16]:
%%time
# Read back the text-encoded spectrum (single TEXT column) for every row.
# The table is taken from `table_name` instead of a hard-coded `ACS`, so this
# query always targets the table the notebook created.
statement = f'SELECT time, current FROM {table_name}'
dbcur.execute(statement)
text_data = dbcur.fetchall()

CPU times: total: 0 ns
Wall time: 97.3 ms


# Convert Data to Lists

In [24]:
%%time
# Each fetched row is (time, wavelength, a); split out the two array columns.
wvls = [wavelengths for _time, wavelengths, _a in array_data]
a = [a_values for _time, _wavelengths, a_values in array_data]

CPU times: total: 0 ns
Wall time: 1.06 ms


In [25]:
%%time
# Each fetched row is (time, current); keep only the serialized text.
txt = [serialized for _time, serialized in text_data]
# Parse every stored dict literal back into a Python dict.
no_txt = [ast.literal_eval(serialized) for serialized in txt]
# Split each parsed dict into its keys (wavelengths) and values (a).
txt_wvls = [list(parsed.keys()) for parsed in no_txt]
txt_a = [list(parsed.values()) for parsed in no_txt]

CPU times: total: 2.53 s
Wall time: 2.66 s
