In [1]:
### Required to download the current forecast data files. S

# import gdown

# url = 'https://drive.google.com/uc?id=1u5j6P2M5e97a0Olr0P3uTxbi7hiDtRdc'
# output = 'Air_Quality_Data.zip'
# gdown.download(url, output, quiet=False)

# url = 'https://drive.google.com/uc?id=1MmdGnTtZ2HXIjsu98QcMcXJZxpNoWq0E'
# output = 'Weather_Data.zip'
# gdown.download(url, output, quiet=False)

# !unzip -q Air_Quality_Data.zip
# !unzip -q Weather_Data.zip

In [2]:
import numpy as np
import pandas as pd

from lightgbm import LGBMRegressor
from sklearn.metrics import mean_squared_error

import os
import joblib
from joblib import Parallel, delayed

In [3]:

def preprocess_forecast_data(FORECAST_DATA_PATH):

  forecast_data = pd.read_csv('Zindi_PM2_5_forecast_data.csv', usecols = ['created_at','channel_id', 'pm2_5'])
  forecast_data['created_at'] = forecast_data['created_at'].apply(lambda x: x.replace(r'+03:00', ''))
  forecast_data['created_at'] = pd.to_datetime(forecast_data['created_at'], format='%Y-%m-%d %H:%M:%S')
  forecast_data['date'] = forecast_data['created_at'].dt.date
  forecast_data = forecast_data[forecast_data['channel_id'].isin(metadata['channel_id'])].reset_index(drop=True)
  channel_max_dates = forecast_data.groupby('channel_id').apply(lambda x: x['created_at'].max())


  ### Set this as a list of chanels to be used. Can be read from a file.
  use_channels = channel_max_dates[channel_max_dates > pd.to_datetime('2020-07-12')].index.tolist()
  
  
  forecast_data = forecast_data[forecast_data['channel_id'].isin(use_channels)]

  return forecast_data


def preprocess_metadata(METADATA_PATH):

  metadata = pd.read_csv('meta.csv')
  fts = [c for c in metadata if c not in ['chan_id']]
  cat_fts = metadata[fts].select_dtypes('object').columns.tolist()
  metadata[cat_fts] = metadata[cat_fts].apply(lambda x: pd.factorize(x)[0])
  metadata = metadata.rename({'chan_id': 'channel_id'}, axis=1)
  drop_fts = ['loc_ref', 'chan_ref', 'loc_mob_stat', 'airqo_name', 'district_lookup', 'county_lookup', 'coords', 'loc_start_date', 'loc_end_date', 'gmaps_link',
              'OSM_link', 'nearby_sources', 'geometry', 'geometry_43', 'event_logging_link']
  metadata = metadata.drop(drop_fts, axis=1)
  metadata.to_csv('metadata_to_use.csv', index = False)

  return metadata


def get_boundary_layer_mapper(BOUNDARY_LAYER_PATH):
  
  boundary_layer = pd.read_csv('boundary_layer.csv')
  boundary_layer_mapper = pd.Series(index = boundary_layer['hour'], data = boundary_layer['height'])  
  
  return boundary_layer_mapper

In [4]:
### These constants must be set before running

### Prediction will start from this date-hour
TEST_DATE_HOUR_START = pd.to_datetime('2020-07-11 09:00:00')

### Prediction will end at this date-hour
TEST_DATE_HOUR_END = pd.to_datetime('2020-07-12 08:00:00')

### Training will start at this date-hour
TRAIN_DATE_HOUR_START = pd.to_datetime('2019-06-01 00:00:00')

### Training will end at this date-hour
TRAIN_DATE_HOUR_END = pd.to_datetime('2020-07-11 08:00:00')


### Boolean value to indicate whether to train or only predict
TRAIN_MODEL_NOW = True
TARGET_COL = 'pm2_5'

########### PATHS #############

### These paths must be set
FORECAST_DATA_PATH = 'Zindi_PM2_5_forecast_data.csv'
METADATA_PATH = 'meta.csv'
BOUNDARY_LAYER_PATH = 'boundary_layer.csv'

### Trained Model Path. Not required if TRAIN_MODEL_NOW = True
TRAIN_MODEL_PATH = 'model.pkl'

In [5]:
#### Other Constants. Do not modify them

#### Does need to be changed for now. Indicates which channels to predict. 
CHANNELS_TO_PREDICT_PATH = 'all_channels.pkl'
N_HRS_BACK = 24 
SEQ_LEN = 24
ROLLING_SEQ_LEN = 24*90
MAX_LAGS = N_HRS_BACK + max(ROLLING_SEQ_LEN, SEQ_LEN) + 48 # Extra 48 or 2 days for safety
TEST_LAG_LAST_DATE_HOUR = TEST_DATE_HOUR_START - pd.Timedelta(hours = MAX_LAGS)

In [6]:
%%time

#### Required to preprocess some of the files

metadata = preprocess_metadata(METADATA_PATH)

#### Later preprocessed forecast data can be saved, so as to avoid preprocessing it again and again. Can add it separately to train and test pipelines to save memory
forecast_data = preprocess_forecast_data(FORECAST_DATA_PATH)

boundary_layer_mapper = get_boundary_layer_mapper(BOUNDARY_LAYER_PATH)

CPU times: user 46.3 s, sys: 3.98 s, total: 50.3 s
Wall time: 50.3 s


In [7]:
def get_agg_channel_data_train(chan_num, freq='1H'):
  '''
  Get Hourly Aggregates using Mean of the Data for training.
  '''

  chan = train_forecast_data[train_forecast_data['channel_id'] == chan_num]
  chan = chan.sort_values(by = 'created_at')[['created_at', 'pm2_5']].set_index('created_at')
  chan = chan.interpolate('time', limit_direction='both')

  chan_agg = chan.resample(freq).mean().reset_index()
  chan_agg['channel_id'] = chan_num

  return chan_agg

def get_agg_channel_data_test(chan_num, freq='1H'):
  '''
  Get Hourly Aggregates using Mean of the Data for testing.
  '''

  chan = test_forecast_data[test_forecast_data['channel_id'] == chan_num]
  chan = chan.sort_values(by = 'created_at')[['created_at', 'pm2_5']].set_index('created_at')
  chan = chan.interpolate('time', limit_direction='both')

  chan_agg = chan.resample(freq).mean().reset_index()
  chan_agg['channel_id'] = chan_num

  return chan_agg


In [8]:
def make_train():

  ### Aggregate data every 1 hour using mean
  all_channels = train_forecast_data['channel_id'].unique()
  joblib.dump(all_channels, 'all_channels.pkl')

  op = Parallel(n_jobs = -1)(delayed(get_agg_channel_data_train)(chan_num, freq='1H') for chan_num in all_channels)
  train = pd.concat(op, axis=0).reset_index(drop=True)[train_forecast_data.columns.tolist()]
  train = train.groupby('channel_id').apply(lambda x: x.set_index('created_at')['pm2_5'].interpolate('time', limit_direction = 'both')).reset_index()

  return train

In [9]:
def get_lag_features(df_tmp):

  df_tmp = df_tmp.sort_values(by='created_at')
  
  ### Shift Features
  df_tmp = df_tmp.assign(**{
      f'{TARGET_COL} (t-{t})': df_tmp.groupby('channel_id')[TARGET_COL].shift(t)
      for t in range(N_HRS_BACK + SEQ_LEN -1, N_HRS_BACK - 1, -1)
  })

  FIRST_SHIFT_COL = f'{TARGET_COL} (t-{N_HRS_BACK})'
  
  ### Rolling Features
  periods  = [4, 12, 24, 48, 24 * 7, 24 * 14, 24 * 31]

  for agg_func in ['min', 'max', 'mean', 'std']:
     df_tmp = df_tmp.assign(**{
      f'rolling_{agg_func}_{period}_hrs': df_tmp.groupby('channel_id')[FIRST_SHIFT_COL].transform(lambda x: x.rolling(period, min_periods=1).agg(agg_func))
      for period in periods
    })

  df_tmp = df_tmp.assign(**{
      f'rolling_range_{period}_hrs': df_tmp[f'rolling_max_{period}_hrs'] - df_tmp[f'rolling_min_{period}_hrs']
      for period in periods
  })

  df_tmp = df_tmp.sort_index()

  return df_tmp

def get_other_features(df_tmp):
  
  D_COL = 'created_at'
  for attr in ['hour', 'day', 'dayofweek', 'month', 'is_month_start', 'is_month_end', 'week', 'year']:
      df_tmp[f'{D_COL}_{attr}'] = getattr(df_tmp[D_COL].dt, attr)

  df_tmp['boundary_layer_height'] = df_tmp['created_at_hour'].map(boundary_layer_mapper)
  df_tmp['chan_id'] = df_tmp['channel_id']
  df_tmp = pd.get_dummies(df_tmp, columns=['chan_id'])

  df_tmp = pd.merge(df_tmp, metadata, on = 'channel_id', how = 'left')
  return df_tmp

def preprocess_df(df_tmp, is_test = False):

  df_tmp = get_lag_features(df_tmp)
  df_tmp = get_other_features(df_tmp)

  return df_tmp

In [10]:
def train_model(train):

  features = [c for c in train.columns if c not in ["created_at",  "pm2_5", "channel_id"]]
  TARGET_COL = "pm2_5"

  trn = train.groupby('channel_id').apply(lambda x: x[:-24*7]).reset_index(drop=True)
  val = train.groupby('channel_id').apply(lambda x: x[-24*7:]).reset_index(drop=True)
  y_trn, y_val  = trn[TARGET_COL], val[TARGET_COL]

  clf = LGBMRegressor(n_estimators=5000, learning_rate=0.05, colsample_bytree=0.4, reg_alpha=0, reg_lambda=1, max_depth=-1, random_state=1)
  clf.fit(trn[features], y_trn, eval_set = [(val[features], y_val)], verbose=50, early_stopping_rounds=150, eval_metric='rmse')

  val_preds = clf.predict(val[features])
  rmse_val = mean_squared_error(val[TARGET_COL], val_preds) ** 0.5
  print(f'Validation RMSE is {rmse_val}')

  best_iter = clf.best_iteration_
  clf = LGBMRegressor(n_estimators=best_iter, learning_rate=0.05, colsample_bytree=0.4, reg_alpha=2, reg_lambda=1, max_depth=-1, random_state=1)
  clf.fit(train[features], train[TARGET_COL])

  return clf

In [11]:
%%time

if TRAIN_MODEL_NOW == True:
  train_forecast_data = forecast_data[(forecast_data['created_at'] >= TRAIN_DATE_HOUR_START) & (forecast_data['created_at'] <= TRAIN_DATE_HOUR_END)].drop('date', axis=1)
  train = make_train()
  train = preprocess_df(train, is_test = False)
  clf = train_model(train)

  joblib.dump(clf, 'model.pkl')

Training until validation scores don't improve for 150 rounds.
[50]	valid_0's l2: 623.035	valid_0's rmse: 24.9607
[100]	valid_0's l2: 594.518	valid_0's rmse: 24.3827
[150]	valid_0's l2: 586.664	valid_0's rmse: 24.2211
[200]	valid_0's l2: 581.809	valid_0's rmse: 24.1207
[250]	valid_0's l2: 581.896	valid_0's rmse: 24.1225
[300]	valid_0's l2: 579.714	valid_0's rmse: 24.0773
[350]	valid_0's l2: 579.396	valid_0's rmse: 24.0707
[400]	valid_0's l2: 578.313	valid_0's rmse: 24.0481
[450]	valid_0's l2: 578.045	valid_0's rmse: 24.0426
[500]	valid_0's l2: 576.035	valid_0's rmse: 24.0007
[550]	valid_0's l2: 576.352	valid_0's rmse: 24.0073
[600]	valid_0's l2: 574.774	valid_0's rmse: 23.9744
[650]	valid_0's l2: 574.715	valid_0's rmse: 23.9732
[700]	valid_0's l2: 574.621	valid_0's rmse: 23.9712
[750]	valid_0's l2: 575.023	valid_0's rmse: 23.9796
[800]	valid_0's l2: 575.054	valid_0's rmse: 23.9803
Early stopping, best iteration is:
[695]	valid_0's l2: 574.048	valid_0's rmse: 23.9593
Validation RMSE is 

In [12]:
def get_next_24hrs_predictions():

  clf = joblib.load(TRAIN_MODEL_PATH)
  all_channels = joblib.load('all_channels.pkl')
  op = Parallel(n_jobs = -1)(delayed(get_agg_channel_data_test)(chan_num, freq='1H') for chan_num in all_channels)
  test = pd.concat(op, axis=0).reset_index(drop=True)[test_forecast_data.columns.tolist()]
  
  test = test.groupby('channel_id').apply(lambda x: x.set_index('created_at')['pm2_5'].interpolate('time', limit_direction = 'both')).reset_index()

  test = preprocess_df(test, is_test = True)
  test = test[(test['created_at'] >= TEST_DATE_HOUR_START) & (test['created_at'] <= TEST_DATE_HOUR_END) ]

  test_orig = test[["created_at",  "pm2_5", "channel_id"]].copy()

  features = [c for c in train.columns if c not in ["created_at",  "pm2_5", "channel_id"]]
  test_preds = clf.predict(test[features])
  
  test_orig['preds'] = test_preds

  return test_orig

def get_predictions_for_channel(next_24_hrs, chan_num):
  return next_24_hrs[next_24_hrs['channel_id'] == chan_num]

In [13]:
for f in [TRAIN_MODEL_PATH, CHANNELS_TO_PREDICT_PATH]:
  if not os.path.isfile(f):
    raise Exception(f'{f} not present. Please recheck the file path.')

In [14]:
### Predicting Next 24 hours for channels

test_forecast_data = forecast_data[(forecast_data['created_at'] >= TEST_LAG_LAST_DATE_HOUR) & (forecast_data['created_at'] <= TEST_DATE_HOUR_END + pd.Timedelta(days=2))].drop('date', axis=1)
next_24hrs_predictions = get_next_24hrs_predictions()
print(mean_squared_error(next_24hrs_predictions[TARGET_COL], next_24hrs_predictions['preds']) ** 0.5)
get_predictions_for_channel(next_24hrs_predictions, 672528)

23.66220148727405


Unnamed: 0,created_at,pm2_5,channel_id,preds
2232,2020-07-11 09:00:00,45.66,672528,54.395179
2233,2020-07-11 10:00:00,43.77875,672528,48.286154
2234,2020-07-11 11:00:00,43.269535,672528,45.62267
2235,2020-07-11 12:00:00,58.042791,672528,48.065547
2236,2020-07-11 13:00:00,63.115349,672528,47.478797
2237,2020-07-11 14:00:00,55.516579,672528,47.888279
2238,2020-07-11 15:00:00,49.266818,672528,47.285235
2239,2020-07-11 16:00:00,45.071395,672528,47.925711
2240,2020-07-11 17:00:00,45.014545,672528,52.060151
2241,2020-07-11 18:00:00,58.552326,672528,62.153624
