In [1]:
import psycopg2 as psy
from datetime import datetime, timedelta
import json

from osgeo import ogr
from osgeo import osr
from datetime import datetime, timedelta

import numpy as np
import tiledb
import os

  """)


In [2]:
# create testdb
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

con = psy.connect("dbname=postgres host=timescaledb user=postgres password=foobar")

with con:
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    with con.cursor() as cur:
        cur.execute("DROP DATABASE IF EXISTS testdb;")
        cur.execute('CREATE DATABASE testdb')

# not sure if the AUTOCOMMIT is needed
with con:
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    with con.cursor() as cur:   
        cur.execute("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;")
con.close()

In [3]:
con = psy.connect("dbname=testdb host=timescaledb user=postgres password=foobar")

with con:
    with con.cursor() as cur:   
        cur.execute("CREATE EXTENSION IF NOT EXISTS postgis;")
        cur.execute("SELECT postgis_version();")
        print(cur.fetchall())
        cur.execute("SELECT srid, auth_name, proj4text FROM spatial_ref_sys LIMIT 3;")
        for x in cur.fetchall():
            print(x)
        cur.execute("SELECT srid, auth_name, proj4text FROM spatial_ref_sys WHERE srid = 3003")
        print(cur.fetchall())
con.close()

[('2.5 USE_GEOS=1 USE_PROJ=1 USE_STATS=1',)]
(3819, 'EPSG', '+proj=longlat +ellps=bessel +towgs84=595.48,121.69,515.35,4.115,-2.9383,0.853,-3.408 +no_defs ')
(3821, 'EPSG', '+proj=longlat +ellps=aust_SA +no_defs ')
(3824, 'EPSG', '+proj=longlat +ellps=GRS80 +towgs84=0,0,0,0,0,0,0 +no_defs ')
[(3003, 'EPSG', '+proj=tmerc +lat_0=0 +lon_0=9 +k=0.9996 +x_0=1500000 +y_0=0 +ellps=intl +towgs84=-104.1,-49.1,-9.9,0.971,-2.917,0.714,-11.68 +units=m +no_defs ')]


In [4]:
source = osr.SpatialReference()
source.ImportFromEPSG(4326)

target = osr.SpatialReference()
target.ImportFromEPSG(3003)

transform = osr.CoordinateTransformation(source, target)

def map_to_monte_mario(wkt):
    geom = ogr.CreateGeometryFromWkt(wkt)
    geom.Transform(transform)
    return geom.ExportToWkt()


In [5]:
def drop_table(conn, tname):
    SQL = "DROP TABLE IF EXISTS %s" % tname
    with conn:
        with conn.cursor() as cur:
            cur.execute(SQL)            

def drop_events_table(conn):
    drop_table(conn, 'events')
    
def drop_positions_table(conn):
    drop_table(conn, 'positions')

def create_events_table(conn):
    SQL = """
        CREATE TABLE events (
            time   TIMESTAMPTZ NOT NULL,
            sensor int4        NOT NULL);
        """
    with conn:
        with conn.cursor() as cur:
            cur.execute(SQL)
    SQL = "SELECT create_hypertable('events', 'time');"
    with conn:
        with conn.cursor() as cur:
            cur.execute(SQL)
            
def create_positions_table(conn):
    query = """
        CREATE TABLE positions (
            sensor    int4 primary key,
            name      VARCHAR(20),
            type      VARCHAR(20),
            category  VARCHAR(20),
            geom      geometry,
            precision REAL
        );
    """
    with conn:
        with conn.cursor() as cur:
            cur.execute(query)


In [6]:
con = psy.connect("dbname=testdb host=timescaledb user=postgres password=foobar")
drop_positions_table(con)
drop_events_table(con)
create_positions_table(con)
create_events_table(con)

In [7]:
def generate_point_sensors_definitions(conn, n, m, rect):
    ll = np.array(rect[0])
    ur = np.array(rect[1])
    sensor_types = ['stype%d' % i for i in range(m)]
    positions = ll + (ur - ll) * np.random.uniform(size=2*n).reshape([n, 2])
    labels = np.random.choice(sensor_types, n)
    SQL = """INSERT INTO positions (sensor, name, type, category, geom, precision)
             VALUES (%s, %s, %s, %s, ST_GeomFromText(%s, 3003), 10);"""
    with conn:
        for i, (l, p) in enumerate(zip(labels, positions)):
            with conn.cursor() as cur:
                cur.execute(SQL, 
                    (i, 'foo%s' % i, l, 'meteo',
                     map_to_monte_mario("POINT (%s %s)" % (p[0], p[1]))))
    sensors_by_type = {}
    for i,l in enumerate(labels):
        sensors_by_type.setdefault(l,[]).append(i)
    return sensors_by_type

In [8]:
n_sensors = 10
n_types = 2
rect = ((9.088151, 39.203460), (9.220304, 39.276140))
center = (0.5 * (rect[0][0] + rect[1][0]), 
          0.5 * (rect[0][1] + rect[1][1]))


%time sensors_by_type = generate_point_sensors_definitions(con, n_sensors, n_types, rect)

CPU times: user 3.16 ms, sys: 0 ns, total: 3.16 ms
Wall time: 11.1 ms


In [9]:
with con:
    with con.cursor() as cur:
        cur.execute('SELECT sensor, type, ST_AsText(ST_Transform(geom,4326)) from positions LIMIT 10;')
        for r in cur.fetchall():
            print(r)

(0, 'stype0', 'POINT(9.20346535422244 39.2610924352225)')
(1, 'stype0', 'POINT(9.21616870234363 39.2224230576244)')
(2, 'stype1', 'POINT(9.18227645177556 39.2069468930276)')
(3, 'stype1', 'POINT(9.16246496301775 39.2510251684259)')
(4, 'stype1', 'POINT(9.2017921778295 39.2250639046251)')
(5, 'stype1', 'POINT(9.15687199322154 39.2669552357252)')
(6, 'stype0', 'POINT(9.22029303374086 39.2113068367249)')
(7, 'stype0', 'POINT(9.17912085859247 39.2648172671239)')
(8, 'stype1', 'POINT(9.16244300274994 39.2734409044244)')
(9, 'stype0', 'POINT(9.21882094335519 39.2496724182223)')


In [10]:
def generate_point_sensor_type_table(conn, tname):
    SQL1 = """
        CREATE TABLE {} (
            sensor    int4,
            time   TIMESTAMPTZ NOT NULL,
            temperature REAL,
            humidity    REAL,
            foo    REAL
        );
    """.format(tname)
    SQL2 = "SELECT create_hypertable('{}', 'time');".format(tname)
    SQL3 = "CREATE INDEX {}_index on {}(sensor);".format(tname, tname)
    with conn:
        with conn.cursor() as cur:
            cur.execute(SQL1)
            cur.execute(SQL2)                        
            cur.execute(SQL3)


def generate_point_sensor_type_tables(conn, m):
    sensor_types = ['stype%d' % i for i in range(m)]    
    for tname in sensor_types:
        generate_point_sensor_type_table(conn, tname)

In [11]:
%time generate_point_sensor_type_tables(con, n_types)

CPU times: user 3.19 ms, sys: 201 µs, total: 3.39 ms
Wall time: 30.7 ms


In [12]:
def generate_time_series(n_events, start_date, plambda_secs, n_fields):
    deltas = np.cumsum(np.random.poisson(lam=plambda_secs, size=n_events))
    tstamps = [start_date + timedelta(seconds=int(t)) for t in deltas]
    data = np.random.uniform(size=n_fields*n_events).reshape(n_events, 
                                                             n_fields)
    return tstamps, data


def generate_point_sensors_data(conn, n_events, sensors_by_type,
                                start_date, plambda_secs):
    # FIXME not very idiomatic...
    n_sensors = sum(len(sensors_by_type[t]) for t in sensors_by_type)
    type_by_sensor = [None] * n_sensors
    for t in sensors_by_type:
        for i in sensors_by_type[t]:
            type_by_sensor[i] = t
    type_by_sensor = np.array(type_by_sensor)
    # --
    sensors = np.random.choice(n_sensors, n_events)
    stypes = type_by_sensor[sensors]
    tstamps, data = generate_time_series(n_events, start_date, plambda_secs, 3)
    SQL1 = "INSERT INTO events (time, sensor) VALUES (%s, %s);"
    SQL2 = """INSERT INTO {} (sensor, time, temperature, humidity, foo) 
              VALUES (%s, %s, %s, %s, %s);"""
    with conn:
        for i, st, t, d in zip(sensors, stypes, tstamps, data):
            with conn.cursor() as cur:
                cur.execute(SQL1, (t, int(i)))
                cur.execute(SQL2.format(st),
                            (int(i), t, d[0], d[1], d[2]))


In [13]:
n_events = 100
tstart = datetime.now()
plambda_sec = 5

%time generate_point_sensors_data(con, n_events, sensors_by_type, tstart, plambda_sec)

CPU times: user 16.3 ms, sys: 23 µs, total: 16.3 ms
Wall time: 74.7 ms


In [16]:
SQL = """
SELECT p.type, p.sensor, ST_AsText(ST_Transform(p.geom,4326))
FROM events e, positions p
WHERE e.time > %s AND e.time < %s
      AND ST_DWithin(p.geom, ST_GeomFromText(%s, 3003), %s)
      AND e.sensor = p.sensor
ORDER BY p.type;
"""

SQL1 = """
SELECT p.type, p.sensor, ST_AsText(ST_Transform(p.geom,4326))
FROM events e, positions p
WHERE e.time > %s AND e.time < %s
      AND e.sensor = p.sensor
ORDER BY p.type;
"""


t1 = datetime.now() - timedelta(minutes=10)
t2 = datetime.now()
pos = 'POINT(%s %s)' % center
radius = 10000

with con:
    with con.cursor() as cur:
        cur.execute(SQL, (t1, t2, map_to_monte_mario(pos), radius))
        # cur.execute(SQL1, (t1, t2))
        for r in cur.fetchall():
            print(r)

('stype0', 6, 'POINT(9.22029303374086 39.2113068367249)')
('stype0', 0, 'POINT(9.20346535422244 39.2610924352225)')
('stype1', 3, 'POINT(9.16246496301775 39.2510251684259)')
('stype1', 2, 'POINT(9.18227645177556 39.2069468930276)')
