In [1]:
import numpy as np
import pandas as pd
import datetime
from matplotlib import pyplot
import gc
import multiprocessing as mp
from multiprocessing import Process, Queue
# Module-level multiprocessing queue; the breakpoint cells below pass it to
# clean_queue() and read (breakpoints, house_nb) tuples from it.
queue = Queue()

In [4]:
def read_data(file_title):
    """Load a whitespace-separated two-column file, sorted by time.

    Parameters
    ----------
    file_title : str or path-like
        Path of the file to read.

    Returns
    -------
    pandas.DataFrame
        Columns ``time`` and ``consumption``, rows ordered by ascending
        ``time`` and re-indexed 0..n-1.

    Raises
    ------
    ValueError
        If the file does not contain exactly two columns.
    """
    # Raw string: "\s" inside a normal string literal is an invalid escape
    # sequence and triggers a warning on recent Python versions.
    # NOTE(review): read_table treats the first line as a header, so if the
    # file has no header row its first sample is dropped -- confirm.
    df = pd.read_table(file_title, sep=r"\s+")
    df.columns = ["time", "consumption"]
    # Stable sort keeps rows with equal timestamps in file order.
    return df.sort_values("time", kind="mergesort").reset_index(drop=True)

In [3]:
# Load every available REDD house into a dict keyed "house_<n>".
redd_dfs = {
    "house_" + str(house_nb): read_data("datasets/redd_house_" + str(house_nb) + ".dat")
    for house_nb in [1, 2, 3, 5, 6]
}

In [4]:
def redd_house(house_nb):
    """Return the DataFrame stored in ``redd_dfs`` for house ``house_nb``."""
    key = "house_" + str(house_nb)
    return redd_dfs[key]

In [2]:
def no_data_breakpoints(df,house_nb,threshold):
    """Find the rows that are immediately followed by a gap in the data.

    A gap is a pair of consecutive rows whose ``time`` values differ by
    strictly more than ``threshold``.

    Parameters
    ----------
    df : pandas.DataFrame
        Must have a ``time`` column; rows are compared in their current order.
    house_nb : any
        Passed through unchanged so results can be matched to their input
        when the function is run through a process pool.
    threshold : number
        Largest time difference that is still considered continuous.

    Returns
    -------
    tuple
        ``(breakpoints, house_nb)`` where ``breakpoints`` is a list of the
        index labels of the last row before each gap (for the default
        0..n-1 index these are also the row positions).
    """
    times = df["time"].to_numpy()
    # Fewer than two samples cannot contain a gap (also covers an empty frame).
    if times.size < 2:
        return ([], house_nb)
    # gap_follows[j] is True when row j+1 is more than `threshold` after row j.
    gap_follows = (times[1:] - times[:-1]) > threshold
    # tolist() yields plain Python ints rather than numpy scalars.
    breakpoints = df.index[:-1][gap_follows].tolist()
    return (breakpoints, house_nb)

In [6]:
# Run the gap detection for each REDD house in its own process.
# NOTE(review): the return value of a Process target is discarded, and
# no_data_breakpoints returns its result rather than putting it on `queue`,
# so nothing computed here reaches the parent process.
processes = [Process(target=no_data_breakpoints, args=(redd_house(i),i,120)) for i in [1,2,3,5,6]]
for p in processes:
    p.start()
# Wait for every worker to finish before continuing.
for p in processes:
    p.join()

In [3]:
def clean_queue(queue):
    """Remove and discard items from ``queue`` until it reports empty."""
    while True:
        if queue.empty():
            return
        queue.get()

In [4]:
# Drain the shared queue. Anything still on it is discarded, so a later
# cell that reads from `queue` will only see items put after this call.
clean_queue(queue)

In [8]:
# Map each REDD house number to its list of breakpoints.
# no_data_breakpoints returns its result rather than putting it on `queue`,
# and the return value of a Process target is discarded, so the queue never
# holds results to read. Compute them through a pool instead; the context
# manager shuts the workers down once starmap has returned.
with mp.Pool(processes=3) as pool:
    redd_results = pool.starmap(
        no_data_breakpoints,
        [(redd_house(i), i, 120) for i in [1, 2, 3, 5, 6]],
    )
redd_breakpoint = {house_nb: breakpoints for breakpoints, house_nb in redd_results}

In [10]:
# Display the breakpoints found for each REDD house (house number -> list).
redd_breakpoint

{5: [15,
  228,
  444,
  9208,
  20267,
  24360,
  26905,
  27100,
  27107,
  29429,
  30759,
  30776,
  31224,
  32092,
  32629,
  42144,
  47686,
  58084,
  58726],
 2: [39373, 82756, 126039, 174149, 204448, 293914, 314904],
 3: [122,
  138,
  6776,
  6781,
  6841,
  6857,
  6870,
  6886,
  7010,
  7020,
  7139,
  13426,
  13973,
  15104,
  15142,
  15168,
  20860,
  25748,
  34752,
  35643,
  36697,
  38520,
  41930,
  43767,
  43984,
  49146,
  49666,
  54157,
  54201,
  54379,
  54730,
  54980,
  54997,
  55004,
  55088,
  55269,
  55276,
  55283,
  88670,
  155333,
  155471,
  155537,
  155715,
  155925,
  155930,
  201316,
  232674,
  245124,
  261778,
  306373,
  332920,
  332937,
  403798,
  403803],
 6: [12158, 29096, 227487, 267613, 341014],
 1: [4339,
  31447,
  63242,
  73690,
  126028,
  192837,
  255927,
  288468,
  317528,
  340385,
  340393,
  355349,
  445388,
  481466,
  710462,
  719638,
  745859]}

In [3]:
def add_zeros(df,house_nb,breakpoints):
    """Set ``consumption`` to 0 on every row labelled in ``breakpoints``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a ``consumption`` column. It is modified in place.
    house_nb : any
        Passed through unchanged so results can be matched to their input
        when the function is run through a process pool.
    breakpoints : iterable
        Index labels of the rows to zero.

    Returns
    -------
    tuple
        ``(df, house_nb)`` with ``df`` being the same object that was passed in.

    Raises
    ------
    KeyError
        If a label in ``breakpoints`` is not in ``df.index`` (a scalar
        ``.loc`` write per label would silently append a new row instead).
    """
    labels = list(breakpoints)
    # One vectorised assignment instead of one .loc write per label.
    if labels:
        df.loc[labels, "consumption"] = 0
    return (df, house_nb)

In [50]:
# Zero the consumption at every breakpoint of every REDD house, in parallel.
# The context manager shuts the worker processes down once starmap has
# returned, instead of leaving them alive when `pool` is rebound later.
with mp.Pool(processes=3) as pool:
    result = pool.starmap(
        add_zeros,
        [(redd_house(i), i, redd_breakpoint[i]) for i in [1, 2, 3, 5, 6]],
    )

In [56]:
# `result` is a list of (dataframe, house_nb) tuples, e.g.
#   result[0]    -> (dataframe of house 1, 1)
#   result[0][0] -> dataframe of house 1
#   result[0][1] -> 1
# Re-key it by house number.
clean_redd = {house_nb: frame for frame, house_nb in result}

In [58]:
# Spot-check house 2: print the row at each of its breakpoints.
for row_position in redd_breakpoint[2]:
    print(clean_redd[2].iloc[row_position])

time           1.303253e+09
consumption    0.000000e+00
Name: 39373, dtype: float64
time           1.303422e+09
consumption    0.000000e+00
Name: 82756, dtype: float64
time           1.303593e+09
consumption    0.000000e+00
Name: 126039, dtype: float64
time           1.303774e+09
consumption    0.000000e+00
Name: 174149, dtype: float64
time           1.303889e+09
consumption    0.000000e+00
Name: 204448, dtype: float64
time           1.304226e+09
consumption    0.000000e+00
Name: 293914, dtype: float64
time           1.304305e+09
consumption    0.000000e+00
Name: 314904, dtype: float64


In [60]:
# Write each cleaned REDD house to its own CSV file (no index column).
for house_nb, frame in clean_redd.items():
    frame.to_csv("Redd_Cleaned/house_" + str(house_nb) + ".csv", index=False)

In [10]:
def read_df(file_title,house_nb):
    """Load a two-column comma-separated file, sorted by time.

    Parameters
    ----------
    file_title : str or path-like
        Path of the CSV file to read.
    house_nb : any
        Passed through unchanged so results can be matched to their input
        when the function is run through a process pool.

    Returns
    -------
    tuple or None
        ``(df, house_nb)`` where ``df`` has columns ``time`` and
        ``consumption``, ordered by ascending ``time`` and re-indexed 0..n-1.
        ``None`` if the file is missing or unreadable, or does not have
        exactly two columns.
    """
    try:
        # NOTE(review): the first line is consumed as a header row -- confirm
        # the input files really start with one.
        df = pd.read_table(file_title, sep=",")
        df.columns = ["time", "consumption"]
        # Stable sort keeps rows with equal timestamps in file order.
        df = df.sort_values("time", kind="mergesort").reset_index(drop=True)
    except (OSError, ValueError):
        # Best effort: a missing or malformed file is reported as None.
        # pandas' parser and empty-file errors are ValueError subclasses.
        # Anything else (KeyboardInterrupt, MemoryError, ...) propagates.
        return None
    return (df, house_nb)

In [11]:
# Read the fridge-freezer file of houses 1..21 in parallel; read_df returns
# None for a house whose file cannot be read.
# The context manager shuts the worker processes down once starmap has
# returned, instead of leaving them alive when `pool` is rebound later.
with mp.Pool(processes=3) as pool:
    result = pool.starmap(
        read_df,
        [
            ("Refit/fridge_freezer/house_" + str(house_nb) + ".csv", house_nb)
            for house_nb in range(1, 22)
        ],
    )

In [6]:
# Force a full garbage-collection pass; the displayed value is what
# gc.collect() returns.
gc.collect()

15

In [12]:
# Keep only the houses whose file could be read, keyed by house number.
refit_fridge_freezers = {
    loaded[1]: loaded[0] for loaded in result if loaded is not None
}

In [13]:
# Display the loaded frames (house number -> DataFrame).
# NOTE(review): this prints every frame in the dict; a per-house shape or
# .head() would keep the output short.
refit_fridge_freezers

{2:                time  consumption
 0        1379455691           88
 1        1379455698           88
 2        1379455706           88
 3        1379455714           88
 4        1379455722           88
 ...             ...          ...
 5733521  1432800316           82
 5733522  1432800323           81
 5733523  1432800330           81
 5733524  1432800337           82
 5733525  1432800343           82
 
 [5733526 rows x 2 columns],
 3:                time  consumption
 0        1380136869            0
 1        1380136876            0
 2        1380136883            0
 3        1380136890            0
 4        1380136897            0
 ...             ...          ...
 6994589  1433242514            0
 6994590  1433242521            0
 6994591  1433242528            0
 6994592  1433242535            0
 6994593  1433242542            0
 
 [6994594 rows x 2 columns],
 4:                time  consumption
 0        1381486757            0
 1        1381486772            0
 2        1

In [14]:
# Detect gaps longer than 120 time units for every loaded fridge-freezer house.
# The context manager shuts the worker processes down once starmap has
# returned, instead of leaving them alive when `pool` is rebound later.
with mp.Pool(processes=4) as pool:
    result = pool.starmap(
        no_data_breakpoints,
        [(frame, house_nb, 120) for house_nb, frame in refit_fridge_freezers.items()],
    )

In [15]:
# Display the (breakpoints, house_nb) tuples returned by the pool.
result

[([5366,
   9099,
   11711,
   11880,
   12054,
   16901,
   21157,
   26978,
   113872,
   114028,
   117455,
   125073,
   127163,
   130011,
   130087,
   131042,
   132175,
   137506,
   137767,
   138726,
   138859,
   139759,
   140266,
   141953,
   160991,
   162473,
   162598,
   162759,
   180605,
   197103,
   197224,
   197760,
   220389,
   220652,
   221319,
   221691,
   239603,
   245479,
   246040,
   247738,
   248793,
   248946,
   256793,
   260157,
   270792,
   271046,
   271833,
   274475,
   275910,
   298446,
   322098,
   324826,
   329147,
   332168,
   332472,
   332886,
   333248,
   345504,
   392563,
   411398,
   417036,
   417067,
   417369,
   420188,
   422144,
   428098,
   429323,
   429693,
   432164,
   432485,
   432695,
   440090,
   441006,
   441280,
   442052,
   444225,
   454549,
   455737,
   455847,
   459263,
   466059,
   481393,
   485950,
   487250,
   487265,
   487285,
   487295,
   487347,
   487357,
   487367,
   487387,
   487397

In [16]:
# Re-key the (breakpoints, house_nb) tuples by house number.
breakpoint_refit_fridge_freezer = {
    house_nb: breakpoints for breakpoints, house_nb in result
}

In [17]:
# Zero the consumption at every breakpoint of every fridge-freezer house.
# The context manager shuts the worker processes down once starmap has
# returned, instead of leaving them alive when `pool` is rebound later.
with mp.Pool(processes=3) as pool:
    result = pool.starmap(
        add_zeros,
        [
            (refit_fridge_freezers[house_nb], house_nb, breakpoints)
            for house_nb, breakpoints in breakpoint_refit_fridge_freezer.items()
        ],
    )

In [18]:
# Re-key the (dataframe, house_nb) tuples by house number.
clean_refit_fridge_freezers = {house_nb: frame for frame, house_nb in result}

In [19]:
# Spot-check house 2: print the row at each of its breakpoints.
for row_position in breakpoint_refit_fridge_freezer[2]:
    print(clean_refit_fridge_freezers[2].iloc[row_position])

time           1379488052
consumption             0
Name: 5366, dtype: int64
time           1379511642
consumption             0
Name: 9099, dtype: int64
time           1379528855
consumption             0
Name: 11711, dtype: int64
time           1379530807
consumption             0
Name: 11880, dtype: int64
time           1379532896
consumption             0
Name: 12054, dtype: int64
time           1379563433
consumption             0
Name: 16901, dtype: int64
time           1379589361
consumption             0
Name: 21157, dtype: int64
time           1379624983
consumption             0
Name: 26978, dtype: int64
time           1380152041
consumption             0
Name: 113872, dtype: int64
time           1380153974
consumption             0
Name: 114028, dtype: int64
time           1380175529
consumption             0
Name: 117455, dtype: int64
time           1380222476
consumption             0
Name: 125073, dtype: int64
time           1380236134
consumption             0
Name: 1271

Name: 641687, dtype: int64
time           1394419563
consumption             0
Name: 642814, dtype: int64
time           1394426283
consumption             0
Name: 643791, dtype: int64
time           1394435703
consumption             0
Name: 645219, dtype: int64
time           1394475354
consumption             0
Name: 651067, dtype: int64
time           1394486494
consumption             0
Name: 651726, dtype: int64
time           1394488998
consumption             0
Name: 652025, dtype: int64
time           1394503873
consumption             0
Name: 654290, dtype: int64
time           1394517543
consumption             0
Name: 656383, dtype: int64
time           1394561839
consumption             0
Name: 662759, dtype: int64
time           1394567164
consumption             0
Name: 663471, dtype: int64
time           1394584114
consumption             0
Name: 666102, dtype: int64
time           1394596354
consumption             0
Name: 667967, dtype: int64
time           1394598709

In [20]:
def df_to_csv(df,house_nb,out_dir="Refit_Cleaned/fridge_freezer"):
    """Write ``df`` to ``<out_dir>/house_<house_nb>.csv`` without the index.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to export.
    house_nb : any
        House identifier used in the file name.
    out_dir : str, optional
        Destination directory; created if it does not exist. Taking it as a
        parameter lets one definition serve every appliance folder.

    Returns
    -------
    None
    """
    import os  # local import keeps this block self-contained

    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    df.to_csv(os.path.join(out_dir, "house_" + str(house_nb) + ".csv"), index=False)

In [21]:
# Export every cleaned fridge-freezer frame in parallel.
# The context manager shuts the worker processes down once starmap has
# returned, instead of leaving them alive when `pool` is rebound later.
with mp.Pool(processes=3) as pool:
    result = pool.starmap(
        df_to_csv,
        [(frame, house_nb) for house_nb, frame in clean_refit_fridge_freezers.items()],
    )

In [22]:
# Read the fridge file of houses 1..21 in parallel; read_df returns None for
# a house whose file cannot be read.
# The context manager shuts the worker processes down once starmap has
# returned, instead of leaving them alive when `pool` is rebound later.
with mp.Pool(processes=3) as pool:
    result = pool.starmap(
        read_df,
        [
            ("Refit/fridge/house_" + str(house_nb) + ".csv", house_nb)
            for house_nb in range(1, 22)
        ],
    )

In [23]:
# Display the raw pool output: one (dataframe, house_nb) tuple per house that
# was read, and None for each house that could not be read.
result

[(               time  consumption
  0        1381323977           74
  1        1381323991           75
  2        1381324006           74
  3        1381324021           74
  4        1381324035           74
  ...             ...          ...
  6960003  1436529365            0
  6960004  1436529372            0
  6960005  1436529378            0
  6960006  1436529385            0
  6960007  1436529392            0
  
  [6960008 rows x 2 columns],
  1),
 None,
 None,
 (               time  consumption
  0        1381486757           48
  1        1381486772           50
  2        1381486786           51
  3        1381486801           52
  4        1381486815           53
  ...             ...          ...
  6760506  1436263030            0
  6760507  1436263037            0
  6760508  1436263044            0
  6760509  1436263051            0
  6760510  1436263058            0
  
  [6760511 rows x 2 columns],
  4),
 None,
 None,
 (               time  consumption
  0        13833432

In [24]:
# Keep only the houses whose file could be read, keyed by house number.
refit_fridges = {
    loaded[1]: loaded[0] for loaded in result if loaded is not None
}

In [26]:
# Detect gaps longer than 120 time units for every loaded fridge house.
# The context manager shuts the worker processes down once starmap has
# returned, instead of leaving them alive when `pool` is rebound later.
with mp.Pool(processes=4) as pool:
    result = pool.starmap(
        no_data_breakpoints,
        [(frame, house_nb, 120) for house_nb, frame in refit_fridges.items()],
    )

In [27]:
# Display the (breakpoints, house_nb) tuples returned by the pool.
result

[([2987,
   3010,
   3030,
   9643,
   13965,
   15077,
   34138,
   36917,
   40288,
   65874,
   65875,
   81231,
   95785,
   113712,
   114696,
   126221,
   138312,
   164625,
   166581,
   167003,
   172239,
   184859,
   187949,
   191034,
   191057,
   193116,
   207239,
   208760,
   210201,
   210351,
   222435,
   222591,
   225017,
   241811,
   254449,
   262312,
   264842,
   268107,
   280519,
   291273,
   291844,
   292199,
   294368,
   294521,
   299752,
   306532,
   309097,
   309248,
   309493,
   323256,
   325640,
   359285,
   360376,
   372161,
   373559,
   375307,
   375357,
   375387,
   377153,
   390348,
   395619,
   398882,
   410204,
   419995,
   421882,
   427150,
   428392,
   429132,
   433695,
   435478,
   435777,
   436196,
   436853,
   437453,
   438716,
   448976,
   449376,
   455701,
   455906,
   459839,
   459862,
   460008,
   460581,
   468835,
   472170,
   486869,
   487090,
   487182,
   497336,
   499847,
   502051,
   508266,
   50

In [28]:
# Print how many breakpoints were found for each house.
for house_result in result:
    print(len(house_result[0]))

659
1229
367
323
213
106
89


In [29]:
# Re-key the (breakpoints, house_nb) tuples by house number.
breakpoint_refit_fridge = {house_nb: breakpoints for breakpoints, house_nb in result}

In [30]:
# Zero the consumption at every breakpoint of every fridge house.
# The context manager shuts the worker processes down once starmap has
# returned, instead of leaving them alive when `pool` is rebound later.
with mp.Pool(processes=3) as pool:
    result = pool.starmap(
        add_zeros,
        [
            (refit_fridges[house_nb], house_nb, breakpoints)
            for house_nb, breakpoints in breakpoint_refit_fridge.items()
        ],
    )

In [31]:
# Re-key the (dataframe, house_nb) tuples by house number.
clean_refit_fridges = {house_nb: frame for frame, house_nb in result}

In [32]:
# Spot-check house 4: print the row at each of its breakpoints.
for row_position in breakpoint_refit_fridge[4]:
    print(clean_refit_fridges[4].iloc[row_position])

time           1381496286
consumption             0
Name: 1691, dtype: int64
time           1381541026
consumption             0
Name: 9625, dtype: int64
time           1381638032
consumption             0
Name: 27294, dtype: int64
time           1381758735
consumption             0
Name: 49672, dtype: int64
time           1382044827
consumption             0
Name: 102280, dtype: int64
time           1382399084
consumption             0
Name: 168346, dtype: int64
time           1382612373
consumption             0
Name: 207173, dtype: int64
time           1382828398
consumption             0
Name: 245879, dtype: int64
time           1383190247
consumption             0
Name: 311536, dtype: int64
time           1383190387
consumption             0
Name: 311537, dtype: int64
time           1383217802
consumption             0
Name: 316339, dtype: int64
time           1383233984
consumption             0
Name: 319227, dtype: int64
time           1383238413
consumption             0
Name: 

Name: 1362236, dtype: int64
time           1392061715
consumption             0
Name: 1362246, dtype: int64
time           1392068309
consumption             0
Name: 1362256, dtype: int64
time           1392069626
consumption             0
Name: 1362276, dtype: int64
time           1392070766
consumption             0
Name: 1362296, dtype: int64
time           1392071548
consumption             0
Name: 1362326, dtype: int64
time           1392071966
consumption             0
Name: 1362336, dtype: int64
time           1392072326
consumption             0
Name: 1362346, dtype: int64
time           1392077246
consumption             0
Name: 1362356, dtype: int64
time           1392098827
consumption             0
Name: 1362357, dtype: int64
time           1392106251
consumption             0
Name: 1362364, dtype: int64
time           1392128069
consumption             0
Name: 1362386, dtype: int64
time           1392135870
consumption             0
Name: 1362396, dtype: int64
time        

Name: 3567975, dtype: int64
time           1410290417
consumption             0
Name: 3568019, dtype: int64
time           1410488004
consumption             0
Name: 3593874, dtype: int64
time           1410798571
consumption             0
Name: 3643066, dtype: int64
time           1411480819
consumption             0
Name: 3751137, dtype: int64
time           1411501781
consumption             0
Name: 3754331, dtype: int64
time           1411504322
consumption             0
Name: 3754695, dtype: int64
time           1411652462
consumption             0
Name: 3778094, dtype: int64
time           1411723664
consumption             0
Name: 3789325, dtype: int64
time           1411735666
consumption             0
Name: 3791165, dtype: int64
time           1411829019
consumption             0
Name: 3806367, dtype: int64
time           1411834420
consumption             0
Name: 3806612, dtype: int64
time           1411845007
consumption             0
Name: 3808293, dtype: int64
time        

In [33]:
def df_to_csv(df,house_nb,out_dir="Refit_Cleaned/fridge"):
    """Write ``df`` to ``<out_dir>/house_<house_nb>.csv`` without the index.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to export.
    house_nb : any
        House identifier used in the file name.
    out_dir : str, optional
        Destination directory; created if it does not exist. Taking it as a
        parameter lets one definition serve every appliance folder.

    Returns
    -------
    None
    """
    import os  # local import keeps this block self-contained

    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    df.to_csv(os.path.join(out_dir, "house_" + str(house_nb) + ".csv"), index=False)
# Export every cleaned fridge frame in parallel.
# The context manager shuts the worker processes down once starmap has
# returned, instead of leaving them alive after the cell finishes.
with mp.Pool(processes=3) as pool:
    result = pool.starmap(
        df_to_csv,
        [(frame, house_nb) for house_nb, frame in clean_refit_fridges.items()],
    )