In [None]:
! pip install psycopg2
! pip install cachetools
! pip install pandas
! pip install seaborn

import heapq as hq
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import psycopg2
import psycopg2.extras
import seaborn as sns
from cachetools import cached, TTLCache

In [None]:
# Apply seaborn's theme and make every figure 15x10 (matplotlib figsize units) by default.
sns.set(rc={'figure.figsize':(15, 10)})

In [None]:
def create_connection():
    """Open a new psycopg2 connection to the sensor database.

    Connection settings are read from the standard PostgreSQL environment
    variables (``PGDATABASE``, ``PGUSER``, ``PGPASSWORD``, ``PGHOST``) so that
    credentials do not have to live in the notebook.  When a variable is not
    set, the previous hard-coded value is used, so existing setups keep working.

    Returns:
        A new ``psycopg2`` connection.  The caller is responsible for closing
        it; using the connection in a ``with`` statement only ends the
        transaction, it does not close the connection.
    """
    dbname = os.environ.get('PGDATABASE', 'postgres')
    user = os.environ.get('PGUSER', 'postgres')
    password = os.environ.get('PGPASSWORD', 'postgres')
    host = os.environ.get('PGHOST', '192.168.11.2')
    return psycopg2.connect(dbname=dbname, user=user, password=password, host=host)

In [None]:
def fetch_devices(connection, sensor):
	"""Return the distinct ``device_id`` values found in ``sensor_data_<sensor>``.

	``sensor`` is interpolated into the table name as-is, so it must be a
	trusted identifier.
	"""
	query = f"""SELECT DISTINCT device_id FROM sensor_data_{sensor}"""
	with connection.cursor() as cursor:
		cursor.execute(query)
		records = cursor.fetchall()
	# Each record is a one-column row; unwrap it.
	return [record[0] for record in records]

def fetch_tail(connection, sensor, signal, device_id, include_timestamp=False):
	"""Fetch the most recent stretch of a signal that still needs weights.

	The query works in three steps for the given device:

	1. ``last_received_at``: the newest ``received_at`` among rows that already
	   have an entry in ``weights_<sensor>_<signal>``.
	2. ``last_peak``: the row with the highest weight among weighted rows
	   received in the five minutes before that timestamp.
	3. Every row of the device whose ``id`` is at or after that peak (or every
	   row of the device when no weighted row exists yet).

	Rows are returned ordered by ``id`` because callers treat the result as an
	ordered series.

	Args:
		connection: open psycopg2 connection.
		sensor: sensor name, interpolated into table names (trusted identifier).
		signal: column name of the signal, interpolated into the SQL (trusted
			identifier).
		device_id: device whose rows are fetched (passed as a bound parameter).
		include_timestamp: when true, also select ``received_at``.

	Returns:
		``pandas.DataFrame`` indexed by ``id`` with a ``signal`` column and,
		if requested, a ``received_at`` column.
	"""
	columns = ['id', 'signal']
	if include_timestamp:
		columns.append('received_at')
	# The selected columns must match `columns`; previously `received_at` was
	# added to `columns` but never selected.
	timestamp_column = ', received_at' if include_timestamp else ''

	sql = f"""WITH last_received_at AS (
				SELECT MAX(received_at) as last_received_at
				FROM sensor_data_{sensor}
				JOIN weights_{sensor}_{signal} USING (id)
				WHERE device_id = %s
			), last_peak AS (
				SELECT *
				FROM sensor_data_{sensor}
				JOIN weights_{sensor}_{signal} USING (id)
				WHERE device_id = %s
				AND received_at > (SELECT last_received_at - interval '5 minute'  FROM last_received_at)
				ORDER BY weight DESC
				LIMIT 1
			)
			SELECT id, {signal} as signal{timestamp_column}
			FROM sensor_data_{sensor}
			WHERE device_id = %s AND id >= COALESCE((SELECT id FROM last_peak), 0)
			ORDER BY id ASC"""

	with connection.cursor() as cursor:
		cursor.execute(sql, (device_id, device_id, device_id,))
		return pd.DataFrame.from_records(cursor.fetchall(), index=['id'], columns=columns)

def fetch(connection, sensor, signal, device_id, include_timestamp=False):
	"""Load the last day of one signal for one device, oldest row first.

	Args:
		connection: open psycopg2 connection.
		sensor: sensor name, interpolated into the table name.
		signal: signal column name, interpolated into the SQL.
		device_id: device to select (bound parameter).
		include_timestamp: also return the ``received_at`` column.

	Returns:
		``pandas.DataFrame`` indexed by ``id`` with a ``signal`` column and,
		optionally, ``received_at``.
	"""
	if include_timestamp:
		columns = ['id', 'signal', 'received_at']
		timestamp_column = ', received_at '
	else:
		columns = ['id', 'signal']
		timestamp_column = ''

	query = f"""SELECT id, {signal} as signal {timestamp_column} 
			FROM sensor_data_{sensor}
			WHERE device_id = %s AND received_at > now() - interval '1 day'
			ORDER BY received_at ASC"""

	with connection.cursor() as cursor:
		cursor.execute(query, (device_id,))
		records = cursor.fetchall()
	return pd.DataFrame.from_records(records, index=['id'], columns=columns)

def fetch_smart_ui(connection, sensor, signal, device_id, percentile):
	"""Load a thinned-out version of a device's signal for plotting.

	The result is the union (duplicates removed by ``UNION``) of:

	* the device's row with the lowest ``id``,
	* every weighted row whose ``PERCENT_RANK()`` over weight (descending) is
	  below ``percentile``,
	* the device's row with the highest ``id``.

	The query has no final ``ORDER BY``, so row order is up to the database.

	Returns:
		``pandas.DataFrame`` indexed by ``id`` with ``signal`` and
		``received_at`` columns.
	"""
	query = f"""WITH top_weights AS (
					SELECT id, {signal} as signal, received_at, PERCENT_RANK() OVER (ORDER BY weight DESC) percentile
					FROM sensor_data_{sensor}
					JOIN weights_{sensor}_{signal} USING (id)
					WHERE device_id = %s
				)
				(SELECT id, {signal} as signal, received_at
				FROM sensor_data_{sensor}
				WHERE device_id = %s
				ORDER BY id ASC LIMIT 1)
				UNION
				(SELECT id, signal, received_at
				FROM top_weights
				WHERE percentile < %s)
				UNION
				(SELECT id, {signal} as signal, received_at
				FROM sensor_data_{sensor}
				WHERE device_id = %s
				ORDER BY id DESC LIMIT 1)"""
	# One bound value per %s placeholder, in order of appearance.
	params = (device_id, device_id, percentile, device_id,)

	with connection.cursor() as cursor:
		cursor.execute(query, params)
		records = cursor.fetchall()
	return pd.DataFrame.from_records(records, index=['id'], columns=['id', 'signal', 'received_at'])

def fetch_statistics(connection, sensor, signal, device_id):
	"""Return the ``(avg, stddev_pop)`` row of a signal over all rows of a device.

	The values come straight from ``cursor.fetchone()``; no conversion is
	applied here.
	"""
	query = f"""SELECT avg({signal}), stddev_pop({signal}) FROM sensor_data_{sensor} WHERE device_id = %s"""
	with connection.cursor() as cursor:
		cursor.execute(query, (device_id,))
		statistics = cursor.fetchone()
	return statistics

def fetch_weights(connection, sensor, signal, device_id):
	"""Load the stored weights of one signal for one device, ordered by ``id``.

	Only rows present in both ``sensor_data_<sensor>`` and
	``weights_<sensor>_<signal>`` are returned (inner join on ``id``).

	Returns:
		``pandas.DataFrame`` indexed by ``id`` with a float ``weight`` column.
	"""
	query = f"""	SELECT id, weight
					FROM sensor_data_{sensor}
					JOIN weights_{sensor}_{signal} USING (id)
					WHERE device_id = %s
					ORDER BY id"""
	with connection.cursor() as cursor:
		cursor.execute(query, (device_id,))
		records = cursor.fetchall()

	frame = pd.DataFrame.from_records(records, index=['id'], columns=['id', 'weight'])
	# The driver may hand back non-float numerics; normalise to float.
	frame['weight'] = frame['weight'].astype('float')
	return frame

def fetch_scd30_ppm():
	"""Fetch the last day of SCD30 ``ppm`` readings (with timestamps) for device ``'zero'``.

	Opens a dedicated connection and always closes it afterwards.
	"""
	connection = create_connection()
	try:
		return fetch(connection, 'scd30', 'ppm', 'zero', include_timestamp=True)
	finally:
		# psycopg2's `with connection:` only ends the transaction and leaves the
		# connection open, so close it explicitly to avoid leaking one per call.
		connection.close()

def fetch_scd30_ppm_smart(percentile=0.1):
	"""Fetch the weight-thinned SCD30 ``ppm`` series for device ``'zero'``.

	Args:
		percentile: forwarded to ``fetch_smart_ui``; rows whose weight
			percent-rank is below this value are kept.

	Opens a dedicated connection and always closes it afterwards.
	"""
	connection = create_connection()
	try:
		return fetch_smart_ui(connection, 'scd30', 'ppm', 'zero', percentile=percentile)
	finally:
		# psycopg2's `with connection:` only ends the transaction and leaves the
		# connection open, so close it explicitly to avoid leaking one per call.
		connection.close()

def fetch_scd30_ppm_weights():
	"""Fetch the stored SCD30 ``ppm`` weights for device ``'zero'``.

	Opens a dedicated connection and always closes it afterwards.
	"""
	connection = create_connection()
	try:
		return fetch_weights(connection, 'scd30', 'ppm', 'zero')
	finally:
		# psycopg2's `with connection:` only ends the transaction and leaves the
		# connection open, so close it explicitly to avoid leaking one per call.
		connection.close()

def fetch_sgp40_voc():
	"""Fetch the last day of SGP40 ``voc`` readings (with timestamps) for device ``'zero'``.

	Opens a dedicated connection and always closes it afterwards.
	"""
	connection = create_connection()
	try:
		return fetch(connection, 'sgp40', 'voc', 'zero', include_timestamp=True)
	finally:
		# psycopg2's `with connection:` only ends the transaction and leaves the
		# connection open, so close it explicitly to avoid leaking one per call.
		connection.close()

In [None]:
def remove_old_data(connection, sensor):
	"""Delete rows of ``sensor_data_<sensor>`` received more than one day ago.

	The statement is executed on ``connection`` but not committed here.
	"""
	statement = f"""DELETE FROM sensor_data_{sensor}
		WHERE received_at < now() - interval '1 day'"""
	with connection.cursor() as cursor:
		cursor.execute(statement)

def update_weight(connection, sensor, signal, series):
	"""Upsert ``(id, weight)`` pairs into ``weights_<sensor>_<signal>``.

	An existing row is only overwritten when the new weight is larger than the
	stored one.  Nothing is committed here.

	Args:
		connection: open psycopg2 connection.
		sensor: sensor name, interpolated into the table name.
		signal: signal name, interpolated into the table name.
		series: mapping-like object whose ``items()`` yields ``(id, weight)``.
	"""
	statement = f"""INSERT INTO weights_{sensor}_{signal} (id, weight) VALUES %s
		ON CONFLICT (id) DO UPDATE SET weight = EXCLUDED.weight
		WHERE weights_{sensor}_{signal}.weight < EXCLUDED.weight"""
	rows = list(series.items())
	with connection.cursor() as cursor:
		psycopg2.extras.execute_values(cursor, statement, rows)

In [None]:
def calculate_weights(data, ratio = 1):
    """Assign each sample a weight describing how much it shapes the curve.

    The series is refined greedily: starting from the segment spanning the
    first and last sample, the interior sample that deviates most from the
    straight line between the segment's endpoints is selected, its deviation
    becomes its weight, and the segment is split at that sample.  Both halves
    are queued with the parent's deviation as priority, so the halves of the
    segment with the largest deviation are refined first.

    Args:
        data: one-dimensional sequence of numeric samples (array, list, ...).
        ratio: fraction of samples to process; refinement stops after
            ``max(10, int(len(data) * ratio))`` samples (the two endpoints
            count as already processed).

    Returns:
        ``numpy.ndarray`` of floats with one weight per sample.  Endpoints and
        samples that were never selected have weight 0.  Inputs with fewer
        than three samples have no interior point and yield all zeros (an
        empty array for empty input).
    """
    y = np.asarray(data, dtype=float)
    n = len(y)
    weights = np.zeros(n, dtype=float)
    if n < 3:
        # No interior samples: nothing can deviate from the endpoint line.
        return weights

    x = np.arange(n)

    processed = 2  # the two endpoints
    limit = max(10, int(n * ratio))

    # Min-heap keyed on the negated error, i.e. largest error first.
    queue = [(0, (0, n - 1))]

    while queue and processed < limit:
        _, (left, right) = hq.heappop(queue)

        if right - left <= 1:
            # Adjacent (or identical) endpoints leave no interior sample.
            continue

        y_range = y[left:right + 1]
        x_range = x[left:right + 1]

        # Straight line through the segment's endpoints: y = a*x + b.
        x1, y1, x2, y2 = x_range[0], y_range[0], x_range[-1], y_range[-1]
        a = (y2 - y1) / (x2 - x1)
        b = -x1 * (y2 - y1) / (x2 - x1) + y1
        y_hat = a*x_range + b

        # Deviation of the interior samples only; endpoints lie on the line.
        diff = np.abs(y_range - y_hat)[1:-1]

        offset = int(np.argmax(diff))
        error = diff[offset]
        i = left + 1 + offset

        weights[i] = error
        hq.heappush(queue, (-error, (left, i)))
        hq.heappush(queue, (-error, (i, right)))
        processed += 1

    return weights

In [None]:
def calculate_weights_for_series(series, ratio=1):
    """Run ``calculate_weights`` on a pandas Series and keep its index.

    Prints how long the calculation took (wall-clock seconds, two decimals).

    Args:
        series: ``pandas.Series`` of samples.
        ratio: forwarded to ``calculate_weights``.

    Returns:
        ``pandas.Series`` of weights sharing ``series``' index.
    """
    values = series.to_numpy()

    started_at = time.time()
    computed = calculate_weights(values, ratio)
    elapsed = time.time() - started_at
    print(f"weight calculation took {elapsed:.2f}")

    return pd.Series(computed, index=series.index)

In [None]:
def process_weights():
    """Recompute and store weights for every sensor, signal and device.

    For each sensor, rows older than one day are removed first.  Then, per
    signal and device, the not-yet-weighted tail of the signal is fetched,
    standardised with the device's overall mean and standard deviation, run
    through the weight calculation, and the positive weights are upserted.

    Everything runs in one transaction on a dedicated connection: it is
    committed once at the end, and the connection is closed in all cases.
    Closing without a commit (i.e. after an error) discards the pending work.
    """
    sensors = {'scd30': ['ppm', 'temperature', 'humidity'],'sgp40': ['voc']}
    connection = create_connection()
    try:
        for sensor, signals in sensors.items():
            remove_old_data(connection, sensor)
            devices = fetch_devices(connection, sensor)
            for signal in signals:
                for device in devices:
                    df = fetch_tail(connection, sensor, signal, device)

                    print(f"{sensor}/{signal}/{device} tail length {len(df)}")

                    if len(df) < 3:
                        # Fewer than three samples means no interior point,
                        # hence no positive weight to store.
                        continue

                    mean, std = fetch_statistics(connection, sensor, signal, device)
                    if mean is None or not std:
                        # No statistics, or a constant signal (stddev 0):
                        # standardising would divide by zero and a flat
                        # signal has no peaks to weight anyway.
                        continue

                    normalized = (df.signal.astype(float) - float(mean)) / float(std)

                    weights = calculate_weights_for_series(normalized)
                    weights = weights[weights > 0]
                    update_weight(connection, sensor, signal, weights)
        connection.commit()
    finally:
        # `with connection:` would only end the transaction; close explicitly
        # so the connection is not leaked.
        connection.close()

In [None]:
# Summarise the distribution of the stored SCD30 ppm weights for device 'zero'.
weights = fetch_scd30_ppm_weights()
# Natural log of each weight; np.log gives -inf/NaN for non-positive values
# (process_weights only stores weights > 0).
weights['log_weight'] = np.log(weights.weight)
# Optional: histogram of the log-weights.
#sns.histplot(data=weights, x="log_weight")

# Central tendency on the raw and on the log scale.
mean_weight = np.mean(weights.weight)
median_weight = np.median(weights.weight)
mean_log_weight = np.mean(weights.log_weight)
median_log_weight = np.median(weights.log_weight)

print(f"mean: {mean_weight}, median: {median_weight}, mean_log: {mean_log_weight}, median_log:{median_log_weight}")

# Number of rows above each of the four candidate thresholds.
print(len(weights.weight[weights.weight > mean_weight]))
print(len(weights.weight[weights.weight > median_weight]))
print(len(weights.log_weight[weights.log_weight > mean_log_weight]))
print(len(weights.log_weight[weights.log_weight > median_log_weight]))


In [None]:
# Compare the weight-thinned series with the full last-day series on one plot.
# percentile=1 keeps every weighted row whose percent-rank is below 1.
smart = fetch_scd30_ppm_smart(percentile=1)
smart['type'] = 'smart'
original = fetch_scd30_ppm()
original['type'] = 'original'
# Row counts of the thinned vs. the full series.
print(len(smart), len(original))
# ignore_index drops the 'id' index; the plot only needs received_at/signal/type.
graph = pd.concat([original, smart], ignore_index=True)
sns.lineplot(data=graph, x='received_at', y='signal', hue='type')

In [None]:
# Delete rows older than one day, then recompute and upsert the weights for all
# sensors, signals and devices (writes to the database).
process_weights()