# Advanced Queries

In [1]:
import time
import statistics as stats
start_program = time.time()

In [2]:
import random
import time
from tqdm import tqdm
    
def str_time_prop(start, end, time_format, prop):
    """Get a time at a proportion of a range of two formatted times.

    start and end should be strings specifying times formatted in the
    given format (strftime-style), giving an interval [start, end].
    prop specifies how a proportion of the interval to be taken after
    start.  The returned time will be in the specified format.
    """

    stime = time.mktime(time.strptime(start, time_format))
    etime = time.mktime(time.strptime(end, time_format))

    ptime = stime + prop * (etime - stime)

    return time.strftime(time_format, time.localtime(ptime))


def random_date(start, end, prop, dform = '%Y-%m-%dT%H:%M:%S'):
    return str_time_prop(start, end, dform, prop)
    
def get_list(elm, n_elm, max_r = 10, prefix = '', suffix = '', apostrophe = True):
    res = ''
    elms = random.sample(range(max_r), n_elm)
    for i in range(n_elm): 
        item = prefix + elm + str(elms[i]) +  suffix 
        if apostrophe: 
            item = "'" + item + "'"
        res += item 
        if i < n_elm - 1: 
            res += ", "
    return res

import math

def percentile(data, perc: int):
    size = len(data)
    return sorted(data)[int(math.ceil((size * perc) / 100)) - 1]


In [3]:
query1, query2, query3, query4 , query5 = [{},{}],[{},{}],[{},{}],[{},{}],[{},{}]


In [4]:
max_duration = {
    1: 5,
    2: 5,
    3: 5,
    4: 5,
    5: 5
}
rangesUnit = {
    1: "minute",
    2: "minute",
    3: "minute",
    4: "minute",
    5: "minute"
}

n_it = 10


In [5]:
random.seed(1)

set_st = [str(random.randint(0,9)) for i in range(500)]
set_s = [str(random.randint(0,99)) for i in range(500)]
set_date = [random.random() for i in range(500)]


# Druid

In [6]:
# A simple class
# attribute
d_q1 = """WITH series AS (
   SELECT __time, "value" FROM d1 WHERE id_station='st<stid>' AND s = 's<sid>'
   AND __time > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> 
   and __time < TIMESTAMP '<timestamp>'
),
stats AS (
   SELECT
       avg("value") as series_mean ,
       stddev("value") as series_stddev
   FROM
       series
)
SELECT
   __time, "value",
   ("value" - series_mean) / CASE WHEN series_stddev = 0 THEN 1 ELSE series_stddev END as zscore
FROM
   series,
   stats"""


d_q2 = """
WITH series AS (
   SELECT  "__time", "value" FROM d1 WHERE id_station='st<stid>' AND s = 's<sid>' AND __time > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> AND __time < TIMESTAMP '<timestamp>'
),
bounds AS (
   SELECT
       avg("value") - stddev("value") AS lower_bound,
       avg("value") + stddev("value") AS upper_bound
   FROM
       series
)
SELECT
   "__time", "value",
   "value" NOT BETWEEN lower_bound AND upper_bound AS is_anomaly
FROM
   series,
   bounds
"""

# d_q3 = """select id_station, AVG("value") FROM d1 
#     where __time > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> \
#     and __time < TIMESTAMP '<timestamp>' and s = 's<sid>'
#     GROUP BY id_station"""



d_q4 = """
WITH t1 AS (
  SELECT "__time", "value" as s1 FROM d1 WHERE "id_station"='st<stid>' AND "s" = 's<sid1>'
  AND "__time" > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit>
  and "__time" < TIMESTAMP '<timestamp>'
),
t2 AS (
  SELECT "__time", "value" as s2 FROM d1 WHERE "id_station"='st1' AND "s" = 's<sid2>'
  AND "__time" > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit>
  and "__time" < TIMESTAMP '<timestamp>'
)
SELECT ((SUM(s1 * s2) - (SUM(s1) * SUM(s2)) / COUNT(*))) /
  (SQRT(SUM(s1 * s1) - (SUM(s1) * SUM (s1)) / COUNT(*)) * SQRT(SUM(s2 * s2) - (SUM(s2) * SUM(s2)) / COUNT(*) )) 
AS pearson_corr 
FROM
  t1,
  t2
WHERE t1."__time" = t2."__time"
"""



d_q5 = """
WITH t1 AS (
  SELECT "__time", "value" as s1 FROM d1 WHERE "id_station"='st<stid>' AND "s" = 's<sid1>'
  AND "__time" > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit>
  and "__time" < TIMESTAMP '<timestamp>'
),
t2 AS (
  SELECT "__time", "value" as s2 FROM d1 WHERE "id_station"='st<stid>' AND "s" = 's<sid2>'
  AND "__time" > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit>
  and "__time" < TIMESTAMP '<timestamp>'
)
select sum(power(s1-s2,2))
FROM
  t1,
  t2
WHERE t1."__time" = t2."__time"
"""
#druid = Druid()


In [7]:
# query = PyDruid('http://diufrm118:8083', 'druid/v2/')

# ts = query.timeseries(
#     datasource='d1',
#     granularity={"type": "duration", "duration": 5000},
#     aggregations={"value": stringfirst("value")},
#     intervals='2019-03-01/pt1h',
#     filter=Dimension('s') == 's4',
#     context={"skipEmptyBuckets": "false"}   
# )



# # print(ts)

# query.export_pandas()

# # query.execute(d_q5)
# # print((time.time()-start)*1000)


In [8]:
from pydruid.client import *
from pylab import plt
from pydruid.db import connect
from pydruid.utils.aggregators import *
from pydruid.utils.filters import *

class Druid:
 
    # A sample method 
    @staticmethod
    def query(query, max_d, rangesUnit, n_it):
        conn = connect(host='diufrm102', port=8082, path='/druid/v2/sql/', scheme='http')
        curs = conn.cursor()
        curs.execute("select * FROM d1 where id_station in ('st5') and s='s14' and __time > TIMESTAMP '2019-03-04 00:00:00' - INTERVAL '1' DAY and __time < TIMESTAMP '2019-03-04 00:00:00' ")
        curs.fetchall()
        results = [[],[]]
        for duration in tqdm(range(int(max_d/5), max_d + 1, int(max_d/5))):
            runtimes = []
            for i in range(n_it):
                date = random_date("2019-04-01 00:00:00", "2019-05-01 00:00:00", set_date[(duration*i)%500], dform = '%Y-%m-%d %H:%M:%S')
                temp = query.replace("<timestamp>", date)
                temp = temp.replace("<nb>", str(duration))
                temp = temp.replace("<sid>", str(set_s[(duration*i)%500]))
                temp = temp.replace("<sid1>", str(set_s[(duration*i)%500]))
                temp = temp.replace("<sid2>", str(set_s[(duration*(i+1))%500]))
                temp = temp.replace("<stid>", str(set_st[(duration*i)%500]))
                temp = temp.replace("<rangesUnit>", str(rangesUnit))
                start = time.time()
#                 print(temp)
                curs.execute(temp)
                curs.fetchall()
#                 print(temp, curs.rowcount)
                runtimes.append((time.time()-start)*1000)
            #print(runtimes)
            print(temp)
            results[0].append(stats.mean(runtimes))
            results[1].append(stats.stdev(runtimes))
        conn.close()
        return results[0],results[1]
            
    


In [None]:
query1[0]["druid"],query1[1]["druid"] = Druid.query(d_q1, max_duration[1], rangesUnit[1], n_it)


In [None]:
query2[0]["druid"],query2[1]["druid"] = Druid.query(d_q2, max_duration[2], rangesUnit[2], n_it)


In [None]:
query4[0]["druid"],query4[1]["druid"] = Druid.query(d_q4, max_duration[4], rangesUnit[4], n_it)


In [None]:
query5[0]["druid"],query5[1]["druid"] = Druid.query(d_q5, max_duration[5], rangesUnit[5], n_it)


# eXtremeDB

In [9]:
# A simple class
# attribute
e_q1 = """
select seq_search(t,<timestamp> - <nb> * <rangesUnit>,<timestamp>) as tt, s<sid>@tt,(s<sid>@tt - (select seq_search(t,<timestamp> - <nb> * <rangesUnit>,<timestamp>) as tt,
	seq_avg(s<sid>@tt) 
	from d1_v
	where id_station='st<stid>'))/(select seq_dev(s<sid>) as series_stddev 
	from d1_v where id_station='st<stid>') 
	from d1_v where id_station='st<stid>';
"""


e_q2 = """
select seq_search(t,<timestamp> - <nb> * <rangesUnit>,<timestamp>) as tt, s<sid>@tt,s<sid>@tt < 
(select diff from (select seq_search(t,<timestamp> - <nb> * <rangesUnit>,<timestamp>) as tt, 
	seq_avg(s<sid>@tt)-seq_dev(s<sid>@tt) as diff
	from d1_v where id_station='st<stid>')) 
or s<sid>@tt > (select diff from (select seq_search(t,<timestamp> - <nb> * <rangesUnit>,<timestamp>) as tt, 
	seq_avg(s<sid>@tt)+seq_dev(s<sid>@tt) as diff
	from d1_v where id_station='st<stid>')) as is_anomaly  
from d1_v 
where id_station='st<stid>';
"""


# e_q3 = """
# select id_station,seq_filter_pos(t > <timestamp>) as period,
# seq_avg(s<sid>@period) as mean,seq_sum(s<sid>@period*60)/(60*61/2) as weighted_mean 
# from d1_v;
# """

e_q3 = """
select seq_search(t,<timestamp> - <nb> * <rangesUnit>,<timestamp>) as tt, seq_wavg(s<sid1>@tt,s<sid2>@tt) FROM d1_v WHERE id_station = 'st<stid>';
"""

e_q4 = """
select seq_search(t,<timestamp> - <nb> * <rangesUnit>,<timestamp>) as tt, seq_corr(s<sid1>@tt,s<sid2>@tt) FROM d1_v WHERE id_station = 'st<stid>';
"""

e_q5 = """
select seq_search(t,<timestamp> - <nb> * <rangesUnit>,<timestamp>) as tt, seq_sum(seq_pow(abs(s<sid1>@tt-s<sid2>@tt),2)) FROM d1_v WHERE id_station = 'st<stid>';
"""


In [10]:
from tqdm import tqdm 
import exdb 
import datetime
exdb.init_runtime(debug = False, shm = False, disk = False, tmgr = 'mursiw', UsePerfmon = True)
con = exdb.connect('diufrm118', 5001)
curs = con.cursor()
res = []
for i in range(10):
    start = time.time()
    curs.execute("select seq_search(t,1555315999 - 12 * 86400,1555315999) as tt, !seq_filter_search(s87@tt > 0.95, tt) as fe, s87@fe FROM d1_v WHERE id_station = 'st4';")
    #curs.fetchall()
    res.append((time.time()-start)*1000)
con.close()
print(res)

[80.41763305664062, 79.60247993469238, 78.77135276794434, 78.70340347290039, 78.70674133300781, 78.72343063354492, 78.69863510131836, 78.70984077453613, 78.67574691772461, 78.62997055053711]


In [11]:
class EXtremeDB:
 
    # A sample method 
    @staticmethod
    def query(query, max_d, rangesUnit, n_it):
        # map the inputs to the function blocks
        import exdb 
        import datetime
        exdb.init_runtime(debug = False, shm = False, disk = False, tmgr = 'mursiw')
        con = exdb.connect('diufrm118', 5001)
        curs = con.cursor()
        curs.execute("SELECT s23 FROM d1_v where id_station = 'st3'")
        curs.fetchall()
        results = [[],[]]
        options = {"day" : 60 * 60* 24,
                   "week" : 60 * 60* 24 * 7,
                   "minute" : 60,
                   "hour" : 60 * 60,
                   "second" : 1,
                   "month" : 60 * 60 * 24 * 30,
                   "year" :  60 * 60 * 24 * 30 * 12
        }
        for duration in tqdm(range(int(max_d/5), max_d + 1, int(max_d/5))):
            runtimes = []
            for i in range(n_it):
                date = random_date("2019-04-01 00:00:00", "2019-05-01 00:00:00", set_date[(duration*i)%500], dform = '%Y-%m-%d %H:%M:%S')
                date = int(time.mktime(datetime.datetime.strptime(date, '%Y-%m-%d %H:%M:%S').timetuple()))
                temp = query.replace("<timestamp>", str(date))
                temp = temp.replace("<nb>", str(duration))
                temp = temp.replace("<sid>", str(set_s[(duration*i)%500]))
                temp = temp.replace("<sid1>", str(set_s[(duration*i)%500]))
                temp = temp.replace("<sid2>", str(set_s[(duration*(i+1))%500]))
                temp = temp.replace("<stid>", str(set_st[(duration*i)%500]))
                temp = temp.replace("<rangesUnit>", str(options[rangesUnit]))
#                 print(temp)
                start = time.time()
                curs.execute(temp)
                # print(curs.rowcount)
                curs.fetchall()
                diff = (time.time()-start)*1000
                #print(temp, diff)
                runtimes.append(diff)
            print(runtimes)
            #print(temp)
            results[0].append(stats.mean(runtimes))
#             results[1].append(percentile(runtimes,95))
            results[1].append(stats.stdev(runtimes))
        con.close()
        return results


In [None]:
query1[0]["extreme"],query1[1]["extreme"] = EXtremeDB.query(e_q1, max_duration[1], rangesUnit[1], n_it)
query1[0]["extreme"],query1[1]["extreme"] 

In [None]:
query2[0]["extreme"],query2[1]["extreme"] = EXtremeDB.query(e_q2, max_duration[2], rangesUnit[2], n_it)
query2[0]["extreme"],query2[1]["extreme"] 

In [None]:
query3[0]["extreme"],query3[1]["extreme"] = EXtremeDB.query(e_q3, max_duration[3], rangesUnit[3], n_it)
query3[0]["extreme"],query3[1]["extreme"]
# https://www.mcobject.com/docs/extremedb.htm#Users_Guides/SQL/SQL_Language_Reference/Functions/Window_Functions.htm

In [None]:
query4[0]["extreme"],query4[1]["extreme"] = EXtremeDB.query(e_q4, max_duration[4], rangesUnit[4], n_it)
query4[0]["extreme"],query4[1]["extreme"] 

In [None]:
query5[0]["extreme"],query5[1]["extreme"] = EXtremeDB.query(e_q5, max_duration[5], rangesUnit[5], n_it)
query5[0]["extreme"],query5[1]["extreme"] 

# Influx

# MonetDB

In [12]:
m_q1 = """WITH series AS (
   SELECT time, s<sid> FROM d1 WHERE id_station='st<stid>' AND time > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> AND time < TIMESTAMP '<timestamp>'
)
SELECT
   time,
   (s<sid> - (avg(s<sid>) OVER ())) / (stddev_samp(s<sid>) OVER ()) as zscore
FROM
   series;"""


m_q2 = """WITH series AS (
   SELECT time, s<sid> FROM  d1 WHERE id_station='st<stid>' AND time > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> AND time < TIMESTAMP '<timestamp>'
),
bounds AS (
   SELECT
       avg(s<sid>) - stddev_samp(s<sid>) AS lower_bound,
       avg(s<sid>) + stddev_samp(s<sid>) AS upper_bound
   FROM
       series
)
SELECT
   time, s<sid>,
   s<sid> NOT BETWEEN lower_bound AND upper_bound AS is_anomaly
FROM
   series,
   bounds;"""


# m_q3 = """SELECT id_station, avg(s<sid>) as mean,
# sum(s<sid> *
# (60 - extract(second from (timestamptz '<timestamp>' - interval '<nb>' <rangesUnit>)))
# ) / (60 * 61 / 2) as weighted_mean
# FROM d1 WHERE "time" > timestamptz '<timestamp>'
# GROUP BY id_station;"""

m_q3 = """
SELECT
id_station,
avg(s<sid>) as mean,
sum(
s<sid> *
(60 - extract(second from (timestamptz '<timestamp>'  - interval '1' minute)))
) / (60 * 61 / 2) as weighted_mean
FROM
d1
WHERE
"time" > timestamptz '<timestamp>'  - interval '1' minute
and "time" < timestamptz '<timestamp>' 
GROUP BY
id_station;
"""

# m_q3 = """
# WITH setup AS (
# 	SELECT lag(s<sid>) OVER (PARTITION BY id_station ORDER BY time) as prev_temp, 
# 		sys.epoch(time) as ts_e, 
# 		sys.epoch(lag(time) OVER (PARTITION BY id_station ORDER BY time)) as prev_ts_e, 
# 		s<sid>, id_station
# 	FROM  d1 WHERE "time" > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> AND "time" < TIMESTAMP '<timestamp>'), 
# nextstep AS (
# 	SELECT CASE WHEN prev_temp is NULL THEN NULL 
# 		ELSE (prev_temp + s<sid>) / 2 * (ts_e - prev_ts_e) END as weighted_sum, 
# 		* 
# 	FROM setup)
# SELECT id_station,
#     avg(s<sid>), -- the regular average
# 	sum(weighted_sum) / (max(ts_e) - min(ts_e)) as time_weighted_average -- our derived average
# FROM nextstep
# GROUP BY id_station;
# """

m_q4 = """
SELECT ((SUM(s<sid1> * s<sid2>) - (SUM(s<sid1>) * SUM(s<sid2>)) / COUNT(*))) / 
    (SQRT(SUM(s<sid1> * s<sid1>) - (SUM(s<sid1>) * SUM (s<sid1>)) / COUNT(*)) * SQRT(SUM(s<sid2> * s<sid2>) - (SUM(s<sid2>) * SUM(s<sid2>)) / COUNT(*) )) 
    AS pearson_corr FROM d1  
    WHERE id_station='st<stid>' AND time > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> AND time < TIMESTAMP '<timestamp>';
"""

m_q5 = """
select sum(power(s<sid1>-s<sid2>,2)) from d1 WHERE id_station='st<stid>' AND time > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> AND time < TIMESTAMP '<timestamp>';
"""


In [13]:
import pymonetdb
import time

class MonetDB:
 
    # A sample method 
    @staticmethod
    def query(query, max_d, rangesUnit, n_it):
        connection = pymonetdb.connect(username="monetdb", port=54320, password="monetdb", hostname="diufrm118", database="mydb")
        cursor = connection.cursor()
        cursor.execute("""select time, s91 FROM d1 where id_station='st4' AND time > TIMESTAMP '2019-03-09T13:43:54' - INTERVAL '3' day AND time < TIMESTAMP '2019-03-09T13:43:54'""")
        cursor.fetchall()
        results = [[],[]]
        for duration in tqdm(range(int(max_d/5), max_d + 1, int(max_d/5))):
            runtimes = []
            for i in range(n_it):
                date = random_date("2019-04-01T00:00", "2019-05-01T00:00", set_date[(duration*i)%500], dform = '%Y-%m-%dT%H:%M')
                temp = query.replace("<timestamp>", date)
                temp = temp.replace("<nb>", str(duration))
                temp = temp.replace("<rangesUnit>", str(rangesUnit))
                temp = temp.replace("<sid>", str(set_s[(duration*i)%500]))
                temp = temp.replace("<sid1>", str(set_s[(duration*i)%500]))
                temp = temp.replace("<sid2>", str(set_s[(duration*(i+1))%500]))
                temp = temp.replace("<stid>", str(set_st[(duration*i)%500]))
#                 print(temp)
                start = time.time()
                cursor.execute(temp)
                cursor.fetchall()
                runtimes.append((time.time()-start)*1000)
            #print(temp)
            results[0].append(stats.mean(runtimes))
            results[1].append(percentile(runtimes,95))
#             results[1].append(stats.stdev(runtimes))
        connection.close()
        return results

    # A sample method 
    @staticmethod
    def queryZ(query, max_d, rangesUnit, n_it):
        connection = pymonetdb.connect(username="monetdb", port=54320, password="monetdb", hostname="diufrm118", database="mydb")
        cursor = connection.cursor()
        cursor.execute("""select time, s91 FROM d1 where id_station='st4' AND time > TIMESTAMP '2019-03-09T13:43:54' - INTERVAL '3' day AND time < TIMESTAMP '2019-03-09T13:43:54'""")
        cursor.fetchall()
        results = [[],[]]
        for duration in tqdm(range(int(max_d/5), max_d + 1, int(max_d/5))):
            runtimes = []
            for i in range(n_it):
                date = random_date("2019-04-01 00:00", "2019-05-01 00:00", set_date[(duration*i)%500], dform = '%Y-%m-%d %H:%M')
                temp = query.replace("<timestamp>", date)
                temp = temp.replace("<nb>", str(duration))
                temp = temp.replace("<rangesUnit>", str(rangesUnit))
                temp = temp.replace("<sid>", str(set_s[(duration*i)%500]))
                temp = temp.replace("<stid>", str(set_st[(duration*i)%500]))
                #print(temp)
                start = time.time()
                cursor.execute(temp)
                cursor.fetchall()
                runtimes.append((time.time()-start)*1000)
            #print(temp)
            results[0].append(stats.mean(runtimes))
#             results[1].append(percentile(runtimes,95))
            results[1].append(stats.stdev(runtimes))
        connection.close()
        return results


In [None]:
query1[0]["monetdb"],query1[1]["monetdb"] = MonetDB.query(m_q1, max_duration[1], rangesUnit[1], n_it)


In [None]:
query2[0]["monetdb"],query2[1]["monetdb"] = MonetDB.query(m_q2, max_duration[2], rangesUnit[2], n_it)


In [None]:
query3[0]["monetdb"],query3[1]["monetdb"] = MonetDB.query(m_q3, max_duration[3], rangesUnit[3], n_it)
query3[0]["monetdb"],query3[1]["monetdb"] 

In [None]:
query4[0]["monetdb"],query3[1]["monetdb"] = MonetDB.query(m_q4, max_duration[4], rangesUnit[4], n_it)


In [None]:
query5[0]["monetdb"],query5[1]["monetdb"] = MonetDB.query(m_q5, max_duration[5], rangesUnit[5], n_it)


# QuestDB

In [14]:
# q_q1 = """
# WITH series AS (
#     SELECT time, s<sid> FROM  d1 WHERE id_station='st<stid>' AND ts < '<timestamp>' AND ts >  '<timestamp>' - <nb>*<rangesUnit>* 1000000L '
# ),
# stats AS (
#    SELECT
#        avg(s<sid>) as series_mean ,
#        avg(s<sid>) as series_stddev
#    FROM
#        series
# )
# SELECT
#    s<sid>,
#    (s<sid> - series_mean) / series_stddev as zscore
# FROM
#    series CROSS JOIN stats
# """

# q_q2 = """"""


# q_q3 = """SELECT id_station, avg(s<sid>) as mean,
# sum(s<sid> *
# (60 - extract(second from (timestamptz '<timestamp>' - interval '1' minute)))
# ) / (60 * 61 / 2) as weighted_mean
# FROM d1 WHERE ts IN '<timestamp>;1m'
# GROUP BY id_station;"""

# m_q3 = """
# WITH setup AS (
# 	SELECT lag(s<sid>) OVER (PARTITION BY id_station ORDER BY time) as prev_temp, 
# 		sys.epoch(time) as ts_e, 
# 		sys.epoch(lag(time) OVER (PARTITION BY id_station ORDER BY time)) as prev_ts_e, 
# 		s<sid>, id_station
# 	FROM  d1 WHERE "time" > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> AND "time" < TIMESTAMP '<timestamp>'), 
# nextstep AS (
# 	SELECT CASE WHEN prev_temp is NULL THEN NULL 
# 		ELSE (prev_temp + s<sid>) / 2 * (ts_e - prev_ts_e) END as weighted_sum, 
# 		* 
# 	FROM setup)
# SELECT id_station,
#     avg(s<sid>), -- the regular average
# 	sum(weighted_sum) / (max(ts_e) - min(ts_e)) as time_weighted_average -- our derived average
# FROM nextstep
# GROUP BY id_station;
# """

q_q4 = """
SELECT ((SUM(s<sid1> * s<sid2>) - (SUM(s<sid1>) * SUM(s<sid2>)) / COUNT())) / 
    (SQRT(SUM(s<sid1> * s<sid1>) - (SUM(s<sid1>) * SUM (s<sid1>)) / COUNT()) * SQRT(SUM(s<sid2> * s<sid2>) - (SUM(s<sid2>) * SUM(s<sid2>)) / COUNT() )) 
    AS pearson_corr FROM d1 
    WHERE  id_station='st<stid>' AND ts < '<timestamp>' AND ts >  '<timestamp>' - <nb>*<rangesUnit>* 1000000L
"""


q_q5 = """
select sum(power(s<sid1>-s<sid2>,2)) from d1
WHERE  id_station='st<stid>' AND ts < '<timestamp>' AND ts >  '<timestamp>' - <nb>*<rangesUnit>* 1000000L
"""


In [15]:

class QuestDB:
 
    # A sample method 
    @staticmethod
    def query(query, max_d, rangesUnit, n_it):
        import psycopg2
        import time
        connection = psycopg2.connect(user="admin",
                                          password="quest",
                                          host="diufrm118",
                                          port="8812",
                                          database="d1")
        options = {"day" : 60 * 60* 24,
                   "week" : 60 * 60* 24 * 7,
                   "minute" : 60,
                   "hour" : 60 * 60,
                   "second" : 1,
                   "month" : 60 * 60 * 24 * 30,
                   "year" :  60 * 60 * 24 * 30 * 12
        }
        cursor = connection.cursor()
        cursor.execute("select ts, s9 FROM d1 where id_station='st4' AND ts IN '2019-03-23;1d'")
        cursor.fetchall()
        results = [[],[]]
        for duration in tqdm(range(int(max_d/5), max_d + 1, int(max_d/5))):
            runtimes = []
            for i in range(n_it):
                date = random_date("2019-03-01", "2019-04-01", set_date[(duration*i)%500], dform = '%Y-%m-%d')
                temp = query.replace("<timestamp>", date)
                temp = temp.replace("<nb>", str(duration))
                temp = temp.replace("<rangesUnit>", str(options[rangesUnit]))
                temp = temp.replace("<sid>", str(set_s[(duration*i)%500]))
                temp = temp.replace("<sid1>", str(set_s[(duration*i)%500]))
                temp = temp.replace("<sid2>", str(set_s[(duration*(i+1))%500]))
                temp = temp.replace("<stid>", str(set_st[(duration*i)%500]))
                start = time.time()
                print(temp)
                cursor.execute(temp)
                cursor.fetchall()
                #print(temp, cursor.rowcount)
                #print(len)
                runtimes.append((time.time()-start)*1000)
#             print(temp)
            results[0].append(stats.mean(runtimes))
            print(runtimes)
#             results[1].append(percentile(runtimes,95))
            results[1].append(stats.stdev(runtimes))
        connection.close()
        return results


In [None]:
query4[0]["questdb"],query4[1]["questdb"] = QuestDB.query(q_q4, max_duration[4], rangesUnit[4], n_it)
query4[1]["questdb"]

In [None]:
query5[0]["questdb"],query5[1]["questdb"] = QuestDB.query(q_q5, max_duration[5], rangesUnit[5], n_it)
query5[1]["questdb"]

# TimescaleDB

In [16]:
t_q1 = """WITH series AS (
   SELECT time, s<sid> FROM d1 WHERE id_station='st<stid>' AND time > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> AND time < TIMESTAMP '<timestamp>'
)
SELECT
   time,
   (s<sid> - (avg(s<sid>) OVER ())) / (stddev_samp(s<sid>) OVER ()) as zscore
FROM
   series;"""


t_q2 = """WITH series AS (
   SELECT time, s<sid> FROM  d1 WHERE id_station='st<stid>' AND time > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> AND time < TIMESTAMP '<timestamp>'
),
bounds AS (
   SELECT
       avg(s<sid>) - stddev(s<sid>) AS lower_bound,
       avg(s<sid>) + stddev(s<sid>) AS upper_bound
   FROM
       series
)
SELECT
   time, s<sid>,
   s<sid> NOT BETWEEN lower_bound AND upper_bound AS is_anomaly
FROM
   series,
   bounds;"""


t_q3 = """SELECT id_station, avg(s<sid>) as mean, sum(
s<sid> *
(60 - extract(seconds from '<timestamp>'::timestamptz - interval '1 minute'))
) / (60 * 61 / 2) as weighted_mean
FROM d1 WHERE "time" > TIMESTAMP '<timestamp>'::timestamptz - INTERVAL '1 minute' 
AND time < TIMESTAMP '<timestamp>'
GROUP BY id_station;
;
"""


# t_q3 = """
# WITH setup AS (
# 	SELECT lag(s<sid>) OVER (PARTITION BY id_station ORDER BY time) as prev_temp, 
# 		extract('epoch' FROM time) as ts_e, 
# 		extract('epoch' FROM lag(time) OVER (PARTITION BY id_station ORDER BY time)) as prev_ts_e, 
# 		s<sid>, id_station
# 	FROM  d1 WHERE "time" > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> AND "time" < TIMESTAMP '<timestamp>'), 
# nextstep AS (
# 	SELECT CASE WHEN prev_temp is NULL THEN NULL 
# 		ELSE (prev_temp + s<sid>) / 2 * (ts_e - prev_ts_e) END as weighted_sum, 
# 		*
# 	FROM setup)
# SELECT id_station,
#     avg(s<sid>), -- the regular average
# 	sum(weighted_sum) / (max(ts_e) - min(ts_e)) as time_weighted_average -- our derived average
# FROM nextstep
# GROUP BY id_station;
# """

t_q4 = """
SELECT corr(s<sid1>, s<sid2>)
FROM d1
WHERE id_station='st<stid>' 
AND time > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> 
AND time < TIMESTAMP '<timestamp>';
"""

# t_q4 = """
# SELECT ((SUM(s<sid1> * s<sid2>) - (SUM(s<sid1>) * SUM(s<sid2>)) / COUNT(*))) / 
#     (SQRT(SUM(s<sid1> * s<sid1>) - (SUM(s<sid1>) * SUM (s<sid1>)) / COUNT(*)) * SQRT(SUM(s<sid2> * s<sid2>) - (SUM(s<sid2>) * SUM(s<sid2>)) / COUNT(*) )) 
#     AS pearson_corr FROM d1  
#     WHERE id_station='st<stid>' AND time > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> AND time < TIMESTAMP '<timestamp>';
# """

t_q5 = """
select sum(power(s<sid1>-s<sid2>,2)) from d1
    WHERE id_station='st<stid>' AND time > TIMESTAMP '<timestamp>' - INTERVAL '<nb>' <rangesUnit> AND time < TIMESTAMP '<timestamp>';
"""


In [17]:

class TimescaleDB:
 
    # A sample method 
    @staticmethod
    def query(query, max_d, rangesUnit, n_it):
        import psycopg2
        CONNECTION = "postgres://postgres:postgres@diufrm118:5432/postgres"
        conn = psycopg2.connect(CONNECTION)
        cursor = conn.cursor()
        cursor.execute("select time, s4 FROM d1 where id_station='st1' AND time > TIMESTAMP '2019-03-06T16:57:36' - INTERVAL '1' day AND time < TIMESTAMP '2019-03-06T16:57:36';")
        cursor.fetchall()
        results = [[],[]]
        for duration in tqdm(range(int(max_d/5), max_d + 1, int(max_d/5))):
            runtimes = []
            for i in range(n_it):
                date = random_date("2019-04-01T00:00", "2019-05-01T00:00", set_date[(duration*i)%500], dform = '%Y-%m-%dT%H:%M')
                temp = query.replace("<timestamp>", date)
                temp = temp.replace("<nb>", str(duration))
                temp = temp.replace("<rangesUnit>", str(rangesUnit))
                temp = temp.replace("<sid>", str(set_s[(duration*i)%500]))
                temp = temp.replace("<sid1>", str(set_s[(duration*i)%500]))
                temp = temp.replace("<sid2>", str(set_s[(duration*(i+1))%500]))
                temp = temp.replace("<stid>", str(set_st[(duration*i)%500]))
                start = time.time()
#                 print(temp)
                cursor.execute(temp)
                cursor.fetchall()
                #print(cursor.fetchall())
                runtimes.append((time.time()-start)*1000)
            #print(temp)
            results[0].append(stats.mean(runtimes))
            results[1].append(percentile(runtimes,95))
#             results[1].append(stats.stdev(runtimes))
        conn.close()
        return results

    
    @staticmethod
    def queryZ(query, max_d, rangesUnit, n_it):
        import psycopg2
        CONNECTION = "postgres://postgres:postgres@diufrm118:5432/postgres"
        conn = psycopg2.connect(CONNECTION)
        cursor = conn.cursor()
        cursor.execute("select time, s4 FROM d1 where id_station='st1' AND time > TIMESTAMP '2019-03-06T16:57:36' - INTERVAL '1' day AND time < TIMESTAMP '2019-03-06T16:57:36';")
        cursor.fetchall()
        results = [[],[]]
        for duration in tqdm(range(int(max_d/5), max_d + 1, int(max_d/5))):
            runtimes = []
            for i in range(n_it):
                date = random_date("2019-04-01 00:00 UTC", "2019-05-01 00:00 UTC", set_date[(duration*i)%500], dform = '%Y-%m-%d %H:%M UTC')
                temp = query.replace("<timestamp>", date)
                temp = temp.replace("<nb>", str(duration))
                temp = temp.replace("<rangesUnit>", str(rangesUnit))
                temp = temp.replace("<sid>", str(set_s[(duration*i)%500]))
                temp = temp.replace("<stid>", str(set_st[(duration*i)%500]))
                start = time.time()
                #print(temp)
                cursor.execute(temp)
                cursor.fetchall()
                #print(cursor.fetchall())
                runtimes.append((time.time()-start)*1000)
            #print(temp)
            results[0].append(stats.mean(runtimes))
#             results[1].append(percentile(runtimes,95))
            results[1].append(stats.stdev(runtimes))
        conn.close()
        return results


In [None]:
query1[0]["timescaledb"],query1[1]["timescaledb"] = TimescaleDB.query(t_q1, max_duration[1], rangesUnit[1], n_it)


In [None]:
query2[0]["timescaledb"],query2[1]["timescaledb"] = TimescaleDB.query(t_q2, max_duration[2], rangesUnit[2], n_it)


In [None]:
query3[0]["timescaledb"],query3[1]["timescaledb"] = TimescaleDB.query(t_q3, max_duration[3], rangesUnit[3], n_it)
query3[0]["timescaledb"],query3[1]["timescaledb"]

In [None]:
query4[0]["timescaledb"],query4[1]["timescaledb"] = TimescaleDB.query(t_q4, max_duration[4], rangesUnit[4], n_it)
query4[0]["timescaledb"],query4[1]["timescaledb"]

In [None]:
query5[0]["timescaledb"],query5[1]["timescaledb"] = TimescaleDB.query(t_q5, max_duration[5], rangesUnit[5], n_it)
query5[0]["timescaledb"],query5[1]["timescaledb"]

# Plot Results

In [None]:
from matplotlib import pyplot as plt

colors = ['r', 'b', 'g', 'm', 'c', 'y']
colors_dic = {}
for i in range(len(list(query1[0].keys()))): 
    colors_dic[list(query1[0].keys())[i]] = colors[i]
colors = colors_dic
queries = [query1, query2, query3]

for q in queries:
    plt.figure()
    pp = []
    for i in range(len(q[0].keys())):
        sys = list(q[0])[i]
        x = [j for j in range(int(max_duration[1]/5), max_duration[1] + 1, int(max_duration[1]/5))]
        y = q[0][sys]
        yerr = q[1][list(q[0])[i]]
        p = plt.plot(x, y, '-', color='%s' % colors[sys])
        plt.yscale('log')
        pp.append(p[0])
        plt.errorbar(x, y=y, yerr=yerr, color='%s' % colors[sys]) 
    plt.legend(pp, q[0].keys(), numpoints=1)
    plt.plot()


#     fig = plt.figure()
#     plt.errorbar(np.arange(len(query1[0][k])), query1[0][k], yerr=query1[1][k], label = k)

In [None]:
import pandas as pd
dfs = [
    pd.DataFrame.from_dict(query1[0],orient='index').transpose(),
    pd.DataFrame.from_dict(query2[0],orient='index').transpose(),
    pd.DataFrame.from_dict(query3[0],orient='index').transpose(),
    pd.DataFrame.from_dict(query4[0],orient='index').transpose(),
    pd.DataFrame.from_dict(query5[0],orient='index').transpose()
]

In [None]:
dfs

In [None]:
import numpy as np
for i in range(len(dfs)): 
    rang = [j for j in range(int(max_duration[1+i]/5), max_duration[1+i] + 1, int(max_duration[1+i]/5))]
    dfs[i].index = np.array(rang)
    dfs[i].set_index([pd.Index(rang)]).plot(title='query' + str(i+1), xlabel='window range ('+rangesUnit[i+1] + ')', ylabel='time (ms)', logy = True,kind = 'line')
    print(dfs[i].head())
    dfs[i].to_csv('results/q'+str(i)+'.txt', sep = '\t')
    

In [None]:
stop_program = time.time()

In [None]:
print('Benchmark Runtime: %s minutes' % str((stop_program - start_program)/60))

In [None]:
# import PyGnuplot as gp
# import numpy as np
# X = np.arange(10)
# Y = np.sin(X/(2*np.pi))
# Z = Y**2.0
# gp.s([X,Y,Z])
# gp.c('plot "tmp.dat" u 1:2 w lp')
# gp.c('replot "tmp.dat" u 1:3 w lp')
# gp.p('myfigure.ps')

In [None]:

# db1 = exdb.open_database("d1_v")
# print(db1)
# con1 = db1.connect();
# cursor = con1.cursor()
# cursor.execute("SELECT count(*) FROM d1_v")
# res = cursor.fetchall()


In [18]:
def queryAll(duration_range):
    
    max_d = 1000
    rangesUnit = "minute"
    results = [{} for i in range(5)]
    for i in range(5):
        results[i]["druid"] = []
        results[i]["extremedb"] = []
        results[i]["influx"] = []
        results[i]["monetdb"] = []
        results[i]["questdb"] = []
        results[i]["timescaledb"] = []
        
    for duration in tqdm(duration_range):
        Dduration = duration
        DrangesUnit = rangesUnit
        if Dduration > 99: 
            Dduration = Dduration // 60
            Dduration -= Dduration%5
            DrangesUnit = "hour"  
            if Dduration > 99: 
                Dduration = Dduration // 24
                Dduration -= Dduration%5
                DrangesUnit = "day"          
        results[0]["druid"].append(Druid.query(d_q1, Dduration, DrangesUnit, n_it)[0][-1])
        results[0]["extremedb"].append(EXtremeDB.query(e_q1, duration, rangesUnit, n_it)[0][-1])
#         results[0]["influx"].append(Influx.query(i_q1, duration, rangesUnit, n_it)[0][-1])
        results[0]["monetdb"].append(MonetDB.query(m_q1, duration, rangesUnit, n_it)[0][-1])
#         results[0]["questdb"].append(QuestDB.query(q_q1, duration, rangesUnit, n_it)[0][-1])
        results[0]["timescaledb"].append(TimescaleDB.query(t_q1, duration, rangesUnit, n_it)[0][-1])

        results[1]["druid"].append(Druid.query(d_q2, Dduration, DrangesUnit, n_it)[0][-1])
        results[1]["extremedb"].append(EXtremeDB.query(e_q2, duration, rangesUnit, n_it)[0][-1])
#         results[1]["influx"].append(Influx.query(i_q2, duration, rangesUnit, n_it)[0][-1])
        results[1]["monetdb"].append(MonetDB.query(m_q2, duration, rangesUnit, n_it)[0][-1])
#         results[1]["questdb"].append(QuestDB.query(q_q2, duration, rangesUnit, n_it)[0][-1])
        results[1]["timescaledb"].append(TimescaleDB.query(t_q2, duration, rangesUnit, n_it)[0][-1])

#         results[2]["druid"].append(Druid.query(d_q3, Dduration, DrangesUnit, n_it)[0][-1])
        results[2]["extremedb"].append(EXtremeDB.query(e_q3, duration, rangesUnit, n_it)[0][-1])
#         results[2]["influx"].append(Influx.query(i_q3, duration, rangesUnit, n_it)[0][-1])
        results[2]["monetdb"].append(MonetDB.query(m_q3, duration, rangesUnit, n_it)[0][-1])
#         results[2]["questdb"].append(QuestDB.query(q_q3, duration, rangesUnit, n_it)[0][-1])
        results[2]["timescaledb"].append(TimescaleDB.query(t_q3, duration, rangesUnit, n_it)[0][-1])

        results[3]["druid"].append(Druid.query(d_q4, Dduration, DrangesUnit, n_it)[0][-1])
        results[3]["extremedb"].append(EXtremeDB.query(e_q4, duration, rangesUnit, n_it)[0][-1])
#         results[3]["influx"].append(Influx.query(i_q4, duration, rangesUnit, n_it)[0][-1])
        results[3]["monetdb"].append(MonetDB.query(m_q4, duration, rangesUnit, n_it)[0][-1])
        results[3]["questdb"].append(QuestDB.query(q_q4, duration, rangesUnit, n_it)[0][-1])
        results[3]["timescaledb"].append(TimescaleDB.query(t_q4, duration, rangesUnit, n_it)[0][-1])

        
        results[4]["druid"].append(Druid.query(d_q5, duration, rangesUnit, n_it)[0][-1])
        results[4]["extremedb"].append(EXtremeDB.query(e_q5, duration, rangesUnit, n_it)[0][-1])
#         results[4]["influx"].append(Influx.query(i_q5, duration, rangesUnit, n_it)[0][-1])
        results["monetdb"].append(MonetDB.query(m_q5, duration, rangesUnit, n_it)[0][-1])
        results[4]["questdb"].append(QuestDB.query(q_q5, duration, rangesUnit, n_it)[0][-1])
        results[4]["timescaledb"].append(TimescaleDB.query(t_q5, duration, rangesUnit, n_it)[0][-1])
    return results

In [19]:
max_range = 1*60*24*30*2
duration_range = [ 2**i for i in range(1, int(math.log2(max_range)+1)) ]
results = queryAll(duration_range)
results[4] = { k: results[4][k] for k in ['extremedb', 'influx', 'questdb', 'timescaledb'] }

results       


  0%|                                                                                                                                                                   | 0/16 [00:00<?, ?it/s]


ValueError: range() arg 3 must not be zero