Install dask and its dependencies using the following commands.

In [1]:
#!pip3 install modin[dask]
#!pip3 install 'fsspec>=0.3.3'
#!pip3 install gcsfs --upgrade

In [1]:
#import modin.pandas as pd
import dask.dataframe as dd
import pandas as pd
import numpy as np

from sklearn.preprocessing import LabelEncoder

In [3]:
#import tensorflow

In [2]:
%%time

building_metadata = dd.read_csv("gs://123test_bucket/building_metadata.csv")
sample_submission = dd.read_csv("gs://123test_bucket/sample_submission.csv")
test = dd.read_csv("gs://123test_bucket/test.csv")
train = dd.read_csv("gs://123test_bucket/train.csv")
weather_test = dd.read_csv("gs://123test_bucket/weather_test.csv")
weather_train = dd.read_csv("gs://123test_bucket/weather_train.csv")

CPU times: user 320 ms, sys: 76 ms, total: 396 ms
Wall time: 2.5 s


In [5]:
train = train.repartition(npartitions=20)

In [6]:
building_metadata.head(2)

Unnamed: 0,site_id,building_id,primary_use,square_feet,year_built,floor_count
0,0,0,Education,7432,2008.0,
1,0,1,Education,2720,2004.0,


In [7]:
sample_submission.head(2)

Unnamed: 0,row_id,meter_reading
0,0,0
1,1,0


In [8]:
test.head(2)

Unnamed: 0,row_id,building_id,meter,timestamp
0,0,0,0,2017-01-01 00:00:00
1,1,1,0,2017-01-01 00:00:00


In [9]:
train.head(2)

Unnamed: 0,building_id,meter,timestamp,meter_reading
0,0,0,2016-01-01 00:00:00,0.0
1,1,0,2016-01-01 00:00:00,0.0


In [27]:
# Reducing memory

## Function to reduce the DF size
def reduce_mem_usage(df, verbose=True):
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_mem = df.memory_usage().sum().compute() / 1024**2    
    for col in df.columns:
        col_type = df[col].dtype
        if col_type in numerics:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)  
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)    
    end_mem = df.memory_usage().sum().compute() / 1024**2
    if verbose: print('Mem. usage decreased to {:5.2f} Mb ({:.1f}% reduction)'.format(end_mem, 100 * (start_mem - end_mem) / start_mem))
    return df

In [28]:
train = reduce_mem_usage(train) #no change in memory type when using dask

Mem. usage decreased to 231.36 Mb (0.0% reduction)


In [None]:
test = reduce_mem_usage(test)
weather_train = reduce_mem_usage(weather_train)
weather_test = reduce_mem_usage(weather_test)
building_metadata = reduce_mem_usage(building_metadata)

In [9]:
print(building_metadata.head(2))
print(test.head(2))
print(train.head(2))
print(weather_test.head(2))
print(weather_train.head(2))

   site_id  building_id primary_use  square_feet  year_built  floor_count
0        0            0   Education         7432      2008.0          NaN
1        0            1   Education         2720      2004.0          NaN
   row_id  building_id  meter            timestamp
0       0            0      0  2017-01-01 00:00:00
1       1            1      0  2017-01-01 00:00:00
   building_id  meter            timestamp  meter_reading
0            0      0  2016-01-01 00:00:00            0.0
1            1      0  2016-01-01 00:00:00            0.0
   site_id            timestamp  air_temperature  cloud_coverage  \
0        0  2017-01-01 00:00:00             17.8             4.0   
1        0  2017-01-01 01:00:00             17.8             2.0   

   dew_temperature  precip_depth_1_hr  sea_level_pressure  wind_direction  \
0             11.7                NaN              1021.4           100.0   
1             12.8                0.0              1022.0           130.0   

   wind_speed 

In [10]:
weather_train = weather_train.groupby('site_id').apply(lambda group: group.interpolate(limit_direction='both'), meta = weather_train)
weather_test = weather_test.groupby('site_id').apply(lambda group: group.interpolate(limit_direction='both'), meta = weather_test)

In [11]:
weather_train.head(2)

Unnamed: 0,site_id,timestamp,air_temperature,cloud_coverage,dew_temperature,precip_depth_1_hr,sea_level_pressure,wind_direction,wind_speed
0,0,2016-01-01 00:00:00,25.0,6.0,20.0,-1.0,1019.7,0.0,0.0
1,0,2016-01-01 01:00:00,24.4,4.0,21.1,-1.0,1020.2,70.0,1.5


In [58]:
train_sub = train[train.building_id.isin(np.arange(0,10))].compute()
# Sorting
train_sub = train_sub.sort_values(by = ['building_id', 'meter', 'timestamp'])

In [59]:
len(train_sub)

102372

In [62]:
check = train_sub.groupby(['building_id', 'meter', 'timestamp' ])['meter_reading'].count()

In [76]:
lag = 24*7
all_cols = set(train_sub.columns)
cols = list(set(all_cols) - set(['timestamp']))

def ts_lag(df, cols=cols,  group =['building_id', 'meter'], lag = lag):
    for i in range(0, lag):
        num = i+1
        cols_shift = {x:x + str(num) for x in cols}
        sub = df[cols]
        sub_shift = sub.groupby(group).shift(num).rename(cols_shift, axis=1)
        check = sub_shift.index.value_counts()
        if i == 1:
            return sub_shift
            print(i)
            print(check[check >1])
        df = df.merge(sub_shift, 
                      left_on = df.index,
                      right_on = sub_shift.index).drop('key_0', axis = 1)  
        #print(len(df))
    return df
    

In [35]:
def pd_shift(df, cols, period):
    #df = df.reset_index()
    #ind = df['index']
    df = df.drop(cols, axis = 1)

    epmty_df = pd.DataFrame(columns = df.columns)
    epmty_df = epmty_df.append([pd.Series()]*period, ignore_index=True)

    period = -1*period
    
    epmty_df = epmty_df.append(df.head(period))
    #epmty_df = epmty_df.set_index(ind)
    
    return epmty_df

In [16]:
lag = 24*7
all_cols = set(train_sub.columns)
cols = list(set(all_cols) - set(['timestamp']))

def ts_lag2(df, cols=cols,  group =['building_id', 'meter'], lag = lag):
    for i in range(0, lag):
        num = i+1
        cols_shift = {x:x + str(num) for x in cols}
        sub = df[cols]
        ind = df.index
        sub_shift = sub.groupby(group).apply(pd_shift, cols = group,  period = num).rename(cols_shift, axis=1)
        #sub_shift.index = sub_shift.index.get_level_values(-1)
        #print(sub_shift.head())
        sub_shift.index = ind
        df = df.merge(sub_shift, 
                      left_on = df.index,
                      right_on = sub_shift.index).drop('key_0', axis = 1)
        #if i%10 == 0:
        #    print(len(df))
        
    return df
    

In [17]:
#result = pd_shift(train, 1)
#len(result) == len(train)

There is an error that one gets when using the pd_shift function on dask dataframes.
See here for some help maybe: 
https://stackoverflow.com/questions/51212688/how-to-use-all-the-cpu-cores-using-dask

In [77]:
%%time
train_sub_lag = ts_lag(train_sub)

CPU times: user 88 ms, sys: 4 ms, total: 92 ms
Wall time: 89.2 ms


In [85]:
train_sub_lag#.groupby(train_sub_lag.index)['meter_reading2'].count()

Unnamed: 0,meter_reading2
0,
1,
2,0.0
3,0.0
4,0.0
...,...
107497,0.0
107498,0.0
107499,0.0
107500,0.0


In [36]:
%%time
train_sub_lag2 = ts_lag2(train_sub)

CPU times: user 45.9 s, sys: 10.9 s, total: 56.8 s
Wall time: 56.4 s


In [37]:
len(train_sub_lag) == len(train_sub_lag2)

True

In [30]:
#train_sub_lag.dropna()

In [43]:
(train_sub_lag.columns == train_sub_lag.columns).all()

True

In [57]:
train_sub_lag2[train_sub_lag2['building_id'] == 9].tail(20)

Unnamed: 0,building_id,meter,timestamp,meter_reading,meter_reading1,meter_reading2,meter_reading3,meter_reading4,meter_reading5,meter_reading6,...,meter_reading159,meter_reading160,meter_reading161,meter_reading162,meter_reading163,meter_reading164,meter_reading165,meter_reading166,meter_reading167,meter_reading168
107482,9,1,2016-12-31 04:00:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
107483,9,1,2016-12-31 05:00:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
107484,9,1,2016-12-31 06:00:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
107485,9,1,2016-12-31 07:00:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
107486,9,1,2016-12-31 08:00:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
107487,9,1,2016-12-31 09:00:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
107488,9,1,2016-12-31 10:00:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
107489,9,1,2016-12-31 11:00:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
107490,9,1,2016-12-31 12:00:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
107491,9,1,2016-12-31 13:00:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [53]:
train_sub_lag.loc[train_sub_lag['meter_reading168'] != train_sub_lag2['meter_reading168'],
                  ['building_id', 'meter', 'timestamp', 'meter_reading168']]

Unnamed: 0,building_id,meter,timestamp,meter_reading168
0,0,0,2016-01-01 00:00:00,
1,0,0,2016-01-01 01:00:00,
2,0,0,2016-01-01 02:00:00,
3,0,0,2016-01-01 03:00:00,
4,0,0,2016-01-01 04:00:00,
...,...,...,...,...
100535,9,0,2016-12-31 19:00:00,57.3351
100536,9,0,2016-12-31 20:00:00,57.7446
100537,9,0,2016-12-31 21:00:00,57.3351
100538,9,0,2016-12-31 22:00:00,57.3351


In [50]:
train_sub_lag2.loc[train_sub_lag['meter_reading168'] != train_sub_lag2['meter_reading168'],
                  ['building_id', 'meter', 'timestamp', 'meter_reading168']]

Unnamed: 0,building_id,meter,timestamp,meter_reading168
0,0,0,2016-01-01 00:00:00,
1,0,0,2016-01-01 01:00:00,
2,0,0,2016-01-01 02:00:00,
3,0,0,2016-01-01 03:00:00,
4,0,0,2016-01-01 04:00:00,
...,...,...,...,...
100535,9,0,2016-12-31 19:00:00,42.2104
100536,9,0,2016-12-31 20:00:00,0.0000
100537,9,0,2016-12-31 21:00:00,0.0000
100538,9,0,2016-12-31 22:00:00,126.6310


In [44]:
(train_sub_lag.fillna(-999) == train_sub_lag2.fillna(-999)).all()

building_id          True
meter                True
timestamp            True
meter_reading        True
meter_reading1       True
                    ...  
meter_reading164    False
meter_reading165    False
meter_reading166    False
meter_reading167    False
meter_reading168    False
Length: 172, dtype: bool

#### To-do: investigate where the values are not matching in the two dataframes.

In [241]:
train_sub_lag.head(2)

Unnamed: 0,building_id,meter,timestamp,meter_reading,meter_reading1,meter_reading2,meter_reading3,meter_reading4,meter_reading5,meter_reading6,...,meter_reading159,meter_reading160,meter_reading161,meter_reading162,meter_reading163,meter_reading164,meter_reading165,meter_reading166,meter_reading167,meter_reading168
0,0,0,2016-01-01 00:00:00,0.0,,,,,,,...,,,,,,,,,,
1,0,0,2016-01-01 01:00:00,0.0,0.0,,,,,,...,,,,,,,,,,


In [242]:
train_sub_lag2.head(2)

Unnamed: 0,building_id,meter,timestamp,meter_reading,meter_reading1,meter_reading2,meter_reading3,meter_reading4,meter_reading5,meter_reading6,...,meter_reading159,meter_reading160,meter_reading161,meter_reading162,meter_reading163,meter_reading164,meter_reading165,meter_reading166,meter_reading167,meter_reading168
0,0,0,2016-01-01 00:00:00,0.0,,,,,,,...,,,,,,,,,,
1,0,0,2016-01-01 01:00:00,0.0,0.0,,,,,,...,,,,,,,,,,


In [164]:
train_sub.head()

Unnamed: 0,building_id,meter,timestamp,meter_reading
0,0,0,2016-01-01 00:00:00,0.0
2301,0,0,2016-01-01 01:00:00,0.0
4594,0,0,2016-01-01 02:00:00,0.0
6893,0,0,2016-01-01 03:00:00,0.0
9189,0,0,2016-01-01 04:00:00,0.0


In [43]:
#In place of sorting
train = train.set_index('timestamp')

In [None]:
train.groupby(['building_id', 'meter'])['meter_reading'].tail(-1)

In [20]:
%%time
train_lag = ts_lag(train)

AttributeError: 'Column not found: shift'