In [2]:
import pandas as pd
import os
import numpy as np
import btrdb
import uuid
import json

from scipy.io import loadmat

from datetime import datetime, timedelta
from btrdb.utils import timez

from multiprocessing import Pool

# About the data

https://drive.google.com/drive/folders/0B6vSVhUYNe86TGQ3bmgwdG1ySnM

In [7]:
path_to_data = os.path.join('../../../../../Volumes/NO NAME/PQ_Data/Day1')
clean_data = os.path.join('../../../../../Volumes/NO NAME/PQ_Data/clean')
if not os.path.exists(clean_data):
    os.mkdir(clean_data)

In [8]:
cols = ['hours', 'minutes', 'seconds', 'phA_volt', 'phB_volt', 'phC_volt', 'phA_amp', 'phB_amp', 'phC_amp']
units = [None, None, None,'volts','volts','volts','amps','amps','amps']


In [9]:
def split_df(x, key='Data_VI', st=0, end=None):
    if end==None:
        end=len(x[key])
    times = pd.DataFrame(x[key][:,:3], columns=cols[:3])
    meas = pd.DataFrame(x[key][:,3:], columns=cols[3:])
    return times, meas

def make_ns(times, t0=timez.to_nanoseconds(datetime(2020,1,1))):
    nano = 1e9 * (3600*times['hours'] + 60*times['minutes'] + times['seconds'].astype(float))
    nano += t0
    return nano.astype(int).tolist()
    
#     return [t0 + timez.ns_delta(**t) for i, t in times.iterrows()]


In [10]:
laurels_api_key = '2301C47D67FB1C2C48D0CC7B'
db = btrdb.connect("api.ni4ai.org:4411", apikey=laurels_api_key)

collection = "lndunn/PQdata/underground"

metadata = {'documentation': 'https://grouper.ieee.org/groups/td/pq/data/downloads/XLPE_data.pdf',
            'source': 'https://drive.google.com/drive/folders/0B6vSVhUYNe86TGQ3bmgwdG1ySnM',
            
           'equipment_specs': {'cable type': 'XLPE underground cable',
                                        'cable rating': '8 kV',
                                        'Year of installation': '1991',
                                        'PT ratio': '8050:115',
                                        'CT ratio': '600:5',},
            
            'measurement_params': {'Sampling mode': 'Continuous (gap-less) data recording',
                                       'Sampling rate': '64 samples/cycle',
                                       'Measured signals': 'Three-phase bus voltages and feeder currents',}
           }

In [11]:
streams = db.streams_in_collection(collection)
for s in streams:
    pv = s.update(collection=collection,
                 annotations=metadata,
                 replace=True)

In [12]:
import psutil
psutil.virtual_memory()

svmem(total=8589934592, available=4160274432, percent=51.6, used=4428562432, free=357138432, active=2829770752, inactive=1902387200, wired=1598791680)

In [9]:
# # restructure raw data into csv files
# for hr in range(1,25):
#     x = loadmat(os.path.join(path_to_data, 'Hour_%i.mat'%(hr)))
#     times, meas = split_df(x)
    
#     ts = pd.Series(make_ns(times), name='timestamp')
    
#     dest = os.path.join(clean_data, 'timestamp')
#     if not os.path.exists(dest):
#         os.mkdir(dest)
#     ts.to_csv(os.path.join(dest, 'hour%i.csv'%(hr)), index=False, header=True)
    
#     for key in meas.keys():
#         dest = os.path.join(clean_data, key)
#         if not os.path.exists(dest):
#             os.mkdir(dest)
#         meas[key].to_csv(os.path.join(dest, 'hour%i.csv'%(hr)), index=False, header=True)


In [13]:
os.listdir(clean_data)

['timestamp',
 'phA_volt',
 'phB_volt',
 'phC_volt',
 'phA_amp',
 'phB_amp',
 'phC_amp']

In [14]:
uuids = {}
for s in db.streams_in_collection(collection):
    uuids[s.name] = s.uuid

In [15]:
def check_count(stream, tstamps, tol=5):
    start = timez.ns_to_datetime(tstamps[0])
    end = timez.ns_to_datetime(tstamps[-1])
    
    count = stream.count(start, end, precise=True)
    return tol > np.abs(len(tstamps) - count)
    
def check_extremes(stream, tstamps, vals):
    start = timez.ns_to_datetime(tstamps[0])
    end = timez.ns_to_datetime(tstamps[-1])
    
    spoints, _ = s.aligned_windows(start, end, 44)
    min_val = min(s.min for s in spoints)
    max_val = min(s.max for s in spoints)
    
    correct_min = min_val == min(vals)
    correct_max = max_val == max(vals)
    
    return correct_min and correct_max 

In [29]:
def get_data(name, hours):
    for hr in hours:
        fname = os.path.join(clean_data, name, 'hour%i.csv'%(hr))
        if hr == hours[0]:
            vals = pd.read_csv(fname).rename(columns={name: hr})
        else:
            vals[hr] = pd.read_csv(fname)
    return np.array(vals).flatten().tolist()

def func(hours):
    print('starting', hours)
    tstamps = get_data('timestamp', hours)
    for stream in os.listdir(clean_data):
        t0 = datetime.now()
        if stream == 'timestamp':
            continue
            
        s = db.stream_from_uuid(uuids[stream])
        
        if check_count(s, tstamps):
            continue
            
        else:
            vals = get_data(stream, hours)
        
            print('\n', '\tpreprocessing', '\n\t', str(datetime.now()-t0).split('.')[0])

            s.delete(tstamps[0], tstamps[-1])
            print('\n', '\tdeleting old data', '\n\t', str(datetime.now()-t0).split('.')[0])

            s.insert(list(zip(tstamps, vals)))
            print('\n', '\tnew data inserted', '\n\t', str(datetime.now()-t0).split('.')[0])
        
            if check_count(s, tstamps):
                continue
                
        print('\nfinished with', stream)
    print(hours[-1], 'window complete')
        
dt = 4
windows = [[h+i for i in range(dt)] for h in range(1,25) if h%dt==1]

def mp_handler():    
    p = Pool(4)
    p.map(func, windows)




In [30]:
mp_handler()

starting [9, 10, 11, 12]
starting [5, 6, 7, 8]
starting [1, 2, 3, 4]
starting [13, 14, 15, 16]
8 window complete
16 window complete
starting [17, 18, 19, 20]
starting [21, 22, 23, 24]
20 window complete

 	preprocessing 
	 0:03:34

 	preprocessing 
	 0:03:35


 	deleting old data 
	 0:03:39
 	deleting old data 
	 0:03:39

 	new data inserted 
	 1:12:47

 	preprocessing 
	 0:12:37

 	deleting old data 
	 0:12:39

 	new data inserted 
	 1:52:11
12 window complete

 	new data inserted 
	 0:52:05

 	preprocessing 
	 0:00:28

 	deleting old data 
	 0:00:28

 	new data inserted 
	 0:16:20

 	preprocessing 
	 0:00:25

 	deleting old data 
	 0:00:26

 	new data inserted 
	 1:06:50

 	preprocessing 
	 0:46:19

 	deleting old data 
	 0:46:27

 	new data inserted 
	 1:32:15
4 window complete


ValueError: Value out of range: -71996360350720

In [27]:
windows

[[1, 2, 3, 4],
 [5, 6, 7, 8],
 [9, 10, 11, 12],
 [13, 14, 15, 16],
 [17, 18, 19, 20],
 [21, 22, 23, 24]]

In [23]:
from multiprocessing import Pool
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker(vals):
    inputs, the_time = vals
    print (" Processs %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print (" Process %s\tDONE" % inputs)

def mp_handler():
    p = Pool(2)
    p.map(mp_worker, data)

mp_handler()

['a', '2']
 Processs a	Waiting 2 seconds
['b', '4']
 Processs b	Waiting 4 seconds
 Process a	DONE
['c', '6']
 Processs c	Waiting 6 seconds
 Process b	DONE
['d', '8']
 Processs d	Waiting 8 seconds
 Process c	DONE
['e', '1']
 Processs e	Waiting 1 seconds
 Process e	DONE
['f', '3']
 Processs f	Waiting 3 seconds
 Process d	DONE
['g', '5']
 Processs g	Waiting 5 seconds
 Process f	DONE
['h', '7']
 Processs h	Waiting 7 seconds
 Process g	DONE
 Process h	DONE


In [54]:



for window in windows:
    for hr in window:
        fname = os.path.join(clean_data, 'timestamp', 'hour%i.csv'%(hr))
        if hr == window[0]:
            tstamps = pd.read_csv(fname).rename(columns={'timestamp': hr})
        else:
            tstamps[hr] = pd.read_csv(fname)
    tstamps = np.array(tstamps).flatten().tolist()
    
    for stream in os.listdir(clean_data):
        t0 = datetime.now()
        if stream == 'timestamp':
            continue
            
        s = db.stream_from_uuid(uuids[stream])
        
        if check_count(s, tstamps):
            continue
        
        for hr in window:
            fname = os.path.join(clean_data, stream, 'hour%i.csv'%(hr))
            if hr == window[0]:
                vals = pd.read_csv(fname).rename(columns={stream: hr})
            else:
                vals[hr] = pd.read_csv(fname)
        vals = np.array(vals).flatten().tolist()
    
        print('\n', '\tpreprocessing', '\n\t', str(datetime.now()-t0).split('.')[0])

        s.delete(tstamps[0], tstamps[-1])
        print('\n', '\tdeleting old data', '\n\t', str(datetime.now()-t0).split('.')[0])

        s.insert(list(zip(tstamps, vals)))
        print('\n', '\tnew data inserted', '\n\t', str(datetime.now()-t0).split('.')[0])
        
        if check_count(s, tstamps):
            continue
        

2

In [104]:
# streams = db.streams_in_collection(collection)
# for s in streams:
#     s.obliterate()

In [105]:
# for col, unit in zip(cols[3:], units[3:]):
#     print (col, unit)
#     streams = db.streams_in_collection(collection)
#     if col not in [s.name for s in streams]:
#         s = db.create(uuid=uuid.uuid4(),
#                       collection=collection,
#                       tags={"name": col, "unit": unit},
# #                       annotations=json.dumps(metadata)
#                      )
        
# print(db.streams_in_collection(collection))

phA_volt volts
phB_volt volts
phC_volt volts
phA_amp amps
phB_amp amps
phC_amp amps
[<Stream collection=lndunn/PQdata/underground name=phA_volt>, <Stream collection=lndunn/PQdata/underground name=phB_volt>, <Stream collection=lndunn/PQdata/underground name=phC_volt>, <Stream collection=lndunn/PQdata/underground name=phA_amp>, <Stream collection=lndunn/PQdata/underground name=phB_amp>, <Stream collection=lndunn/PQdata/underground name=phC_amp>]


In [6]:

streams = db.streams_in_collection(collection)

In [24]:
time = datetime.now()
t0 = timez.to_nanoseconds(datetime(2020,1,1))

hr = 24

x = loadmat(os.path.join(path_to_data, 'Hour_%i.mat'%(hr)))
times, meas = split_df(x)
    
ts = make_ns(times)

assert len(ts[:50]) == len(list(set(ts[:50])))
    
for j, s in enumerate(db.streams_in_collection(collection)):
    print('\t', j, s.name)
    _last = s.latest()
    if _last == None:
        pass
    elif _last[0][0] == ts[-1]:
        continue
    elif _last[0][0] > ts[0]:
        s.delete(ts[0], ts[-1])

    s.insert(list(zip(ts, meas[s.name].tolist())))
    print('\t', datetime.now()-time)


	 0 phA_volt
	 0:02:48.531801
	 1 phB_volt
	 0:05:34.973421
	 2 phC_volt
	 0:08:10.965217
	 3 phA_amp
	 0:10:45.826814
	 4 phB_amp
	 0:13:24.439823
	 5 phC_amp
	 0:15:53.788878


In [23]:
time = datetime.now()
t0 = timez.to_nanoseconds(datetime(2020,1,1))

issues = []
for hr in range(1,25):
    x = loadmat(os.path.join(path_to_data, 'Hour_%i.mat'%(hr)))
    times, meas = split_df(x)
    
    ts = make_ns(times)
    
    assert len(ts[:50]) == len(list(set(ts[:50])))
    
    for s in db.streams_in_collection(collection):
        count = s.count(ts[0], ts[-1])
        if count != len(ts):
            issues.append((hr, s.name, count, len(ts), len(list(set(ts)))))
            print(issues[-1])

(1, 'phB_volt', 16082217, 13824000, 13824000)
(1, 'phB_amp', 16082217, 13824000, 13824000)
(1, 'phC_volt', 16082217, 13824000, 13824000)
(1, 'phA_amp', 16082217, 13824000, 13824000)
(1, 'phC_amp', 16082217, 13824000, 13824000)
(1, 'phA_volt', 16082217, 13824000, 13824000)
(2, 'phB_volt', 12663170, 13824000, 13824000)
(2, 'phB_amp', 12663170, 13824000, 13824000)
(2, 'phC_volt', 12663170, 13824000, 13824000)
(2, 'phA_amp', 12663170, 13824000, 13824000)
(2, 'phC_amp', 12663170, 13824000, 13824000)
(2, 'phA_volt', 12663170, 13824000, 13824000)
(3, 'phB_volt', 12663170, 13824000, 13824000)
(3, 'phB_amp', 12663170, 13824000, 13824000)
(3, 'phC_volt', 12663170, 13824000, 13824000)
(3, 'phA_amp', 12663170, 13824000, 13824000)
(3, 'phC_amp', 12663170, 13824000, 13824000)
(3, 'phA_volt', 12663170, 13824000, 13824000)
(4, 'phB_volt', 12663171, 13824000, 13824000)
(4, 'phB_amp', 12663171, 13824000, 13824000)
(4, 'phC_volt', 12663171, 13824000, 13824000)
(4, 'phA_amp', 12663171, 13824000, 13824000)

KeyboardInterrupt: 

In [27]:
len(times), len(meas), sum(pd.notnull(meas['phA_amp'])), len(times)/60/60/60

(13824000, 13824000, 13824000, 64.0)

In [17]:
x = loadmat(os.path.join(path_to_data, 'Hour_%i.mat'%(hr)))

In [60]:
# iterate through all hours in day
# (takes a long time, will probably time out)

time = datetime.now()
t0 = timez.to_nanoseconds(datetime(2020,1,1))

for hr in range(1,24):
    print('hour', hr, '...')
    
    last = streams[-1].latest()
    
    if last == None:
        pass
    elif last[0][0] > t0 + 1e9*3600*(hr-1):
        continue

    x = loadmat(os.path.join(path_to_data, 'Hour_%i.mat'%(hr)))
    times, meas = split_df(x)
    
    ts = make_ns(times)
    
    for j, s in enumerate(db.streams_in_collection(collection)):
        _last = streams[j].latest()
        if _last == None:
            # this means stream has no data yet
            pass
        elif _last[0][0] == ts[-1]:
            # skip if day already finished ingesting
            continue

        s.insert(list(zip(ts, meas[s.name].tolist())))
        print('\t', s.name, '\t', datetime.now()-time)


hour 1 ...
	 phA_volt 	 0:02:09.189075
	 phB_volt 	 0:04:10.313639
	 phC_volt 	 0:06:24.983900


_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "Connection reset by peer"
	debug_error_string = "{"created":"@1605994435.207225000","description":"Error received from peer ipv4:34.202.86.244:4411","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Connection reset by peer","grpc_status":14}"
>

In [74]:

for s in streams:
    if s.name == 'phA_amp':
        s.obliterate()
        s = db.create(uuid=uuid.uuid4(),
                      collection=collection,
                      tags={"name": 'phA_amp', "unit": 'amps'},
#                       annotations=json.dumps(metadata)
                     )

In [77]:
len(db.streams_in_collection(collection)), j

(6, 3)

In [None]:
import os
duration = 1  # seconds
freq = 440  # Hz
os.system('play -nq -t alsa synth {} sine {}'.format(duration, freq))

In [16]:
# hr = 3
# x = loadmat(os.path.join(path_to_data, 'Hour_%i.mat'%(hr)))
# times, meas = split_df(x)

# print(times.shape, meas.shape)

# ixs = np.arange(0, len(meas)+1, 100000)
for i, (st, end) in enumerate(zip(ixs[i:-1], ixs[i+1:])):

    ts = make_ts(times.iloc[st:end])
    for s in streams:
        vals = meas[s.name].iloc[st:end].tolist()
        s.insert(list(zip(ts, vals)))

    if i % 10 == 0:
        print ('\t', i, 'of', len(ixs))


	 0 of 139
	 10 of 139


_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "Broken pipe"
	debug_error_string = "{"created":"@1605859326.937908000","description":"Error received from peer ipv4:54.227.221.51:4411","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Broken pipe","grpc_status":14}"
>

In [14]:
i

19