In [1]:
import os
os.chdir('../..')

In [2]:
import numpy as np
from nilmtk import STATS_CACHE

from utils.constants import DatasetType, DAY_IN_SEC
from utils.data_reading import clean_and_store_data, read_cleaned_data, \
    convert_to_nilmtk_format, store_processed_stable_periods, \
    get_full_keys_of_stable_periods, read_stable_periods
from utils.preprocessing import generate_sync_signals, get_stable_periods, \
    interpolate_missed_data, generate_async_signals, reformat_to_accumulated
from utils.timing import time_measure
from utils.visualization import plot_sync_async_comparison

STATS_CACHE.store.close()


DS = DatasetType.IDEAL
DURATION = DAY_IN_SEC*30
MAX_GAP = 300

# convert_to_nilmtk_format(DS)
# clean_and_store_data(DS)
# ideal = read_cleaned_data(DS)
# process_stable_periods(DS, DURATION, MAX_GAP)

full_keys = get_full_keys_of_stable_periods(DS)
accumulated = read_stable_periods(DS, [full_keys[0]])[0]

[reading 1 stable periods of IDEAL] finished in 0m 0.14s


In [3]:
sync_signals = generate_sync_signals(accumulated, 100)
print('len(sync_signals)', len(sync_signals))

async_signals = generate_async_signals(accumulated, 60000)
print('len(async_signals)', len(async_signals))

[generating sync signals] finished in 0m 0.00s
len(sync_signals) 25921
[generating async signals] finished in 0m 0.36s
len(async_signals) 25475


In [19]:
import numpy as np
import pandas as pd
from numpy.polynomial.polynomial import Polynomial
from scipy.interpolate import InterpolatedUnivariateSpline, splrep, PPoly
from multiprocessing.pool import ThreadPool
from multiprocessing import Pool

from utils.preprocessing import datetime_index_to_floats
from utils.timing import time_measure


def fill_squared_coef_task(squared_coefs, interpolations, i):
    p = Polynomial(interpolations.c[:, i][::-1]) / 100
    p_squared = p ** 2
    coefs_to_apply = p_squared.coef[::-1]
    coefs_to_apply = np.pad(coefs_to_apply, (0, 3 - len(coefs_to_apply)))
#     return coefs_to_apply
    for ind, c in enumerate(coefs_to_apply):
        squared_coefs[ind, i] = c

        
def fill_squared_coef_task_single_arg(arg):
    fill_squared_coef_task(*arg)

def accumulated_distance(original: pd.Series, generated: pd.Series):
#     with time_measure(f'accumulated_distance'):
    with time_measure(f'linear interpolation of diff', is_active=False):
        f_dates_orig = datetime_index_to_floats(original.index)
        f_dates_gen = datetime_index_to_floats(generated.index)
        united_dates = f_dates_orig.union(f_dates_gen)
        point_count = len(united_dates)

        lin_orig = InterpolatedUnivariateSpline(f_dates_orig, original.values, k=1)
        lin_gen = InterpolatedUnivariateSpline(f_dates_gen, generated.values, k=1)
        all_values_orig = lin_orig(united_dates)
        all_values_gen = lin_gen(united_dates)
        diff = all_values_orig - all_values_gen

        tck = splrep(united_dates, diff, k=1, s=0)
        interpolations = PPoly.from_spline(tck)

        assert interpolations.c.shape[0] == 2, 'must be linear'
        assert interpolations.c.shape[1] == point_count + 1, 'interval count is wrong'

    squared_coefs = np.empty((3, point_count + 1), dtype=np.float64)
    with time_measure(f'interpolating squared'):
        arguments = [(squared_coefs, interpolations, i) for i in range(point_count + 1)]
#         for i in range(point_count + 1):
#             fill_squared_coef_task(squared_coefs, interpolations, i)

        THREAD_NUM = 8
        with ThreadPool(THREAD_NUM) as pool:
            pool.map(fill_squared_coef_task_single_arg, arguments, chunksize=point_count//THREAD_NUM)
#             for i in range(0, point_count + 1):
#                 _ = pool.apply_async(fill_squared_coef_task, (squared_coefs, interpolations, i))
#             pool.close()
#             pool.join()
#             res = [pool.apply_async(fill_squared_coef_task, (squared_coefs, interpolations, i)) for i in range(point_count + 1)]
#             for r in res:  
#                 r.wait()
#         pool.join()
            
#         with Pool(4) as pool:
#             _ = pool.apply_async(fill_squared_coef_task, args=(squared_coefs, interpolations, i))
#             pool.join()

    squared_diffs = PPoly(squared_coefs, interpolations.x)
        
    with time_measure(f'integration', is_active=False):
        distance = squared_diffs.integrate(united_dates[0], united_dates[-1])
        print('distance')
        print(distance)
        return distance


def consumption_rate_distance():
    pass




In [20]:
accumulated_distance(accumulated[:30000], async_signals[:650])

[interpolating squared] finished in 0m 6.38s
distance
18933408.153005347


array(18933408.15300535)

In [14]:
accumulated_distance(accumulated[:500], async_signals[:5])
accumulated_distance(accumulated[:100000], async_signals[:1000])

[interpolating squared] finished in 0m 0.13s
distance
15512.302950351565
[interpolating squared] finished in 0m 18.91s
distance
170770353926498.44


array(1.70770354e+14)

In [14]:
accumulated_distance(accumulated[:500], async_signals[:5])
accumulated_distance(accumulated[:100000], async_signals[:1000])
# accumulated_distance(accumulated, async_signals)
# accumulated_distance(accumulated, sync_signals)


[interpolating squared] finished in 0m 0.08s
distance
15512.302950351565
[interpolating squared] finished in 0m 17.01s
distance
170770353926498.44


array(1.70770354e+14)

In [None]:
# accumulated_distance(accumulated[:30000], async_signals[:650])
# distance
# 18933408.153005347
# [accumulated_distance] finished in 0m 5.54s

In [21]:
from time import sleep, perf_counter

def task(id):
    print(f'Начинаем задачу {id}...')
    sleep(1)
    return f'Завершили задачу {id}'

In [None]:

with time_measure(f'integration'):
    with Pool(4) as pool:
        results = pool.map(task, range(4))
        for result in results:
            print(result)


    # results = [task(i) for i in range(4)]
    # for result in results:
    #     print(result)

