Preparation for model training

In [14]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline, FeatureUnion
import joblib

Encoders

In [15]:
class CustomWindDirectionEncoder(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.directional_strings = ['S', 'SSW', 'SW', 'SSE', 'WNW', 'NNW', 'WSW', 'NW', 'N', 'NE', 'ENE', 'E', 'ESE', 'SE', 'NNE', 'W']
    
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        
        X = X.copy()
        X['wind_direction'] = X['wind_direction'].apply(lambda x: x if x in self.directional_strings else 'OTHER')
        
        X_encoded = pd.get_dummies(X, columns=['wind_direction'], prefix='', prefix_sep='')
        
        for col in self.directional_strings:
            if col not in X_encoded.columns:
                X_encoded[col] = 0
        
        X_encoded = X_encoded[self.directional_strings]
        X_encoded = X_encoded.astype(int)
        
        return X_encoded

In [16]:
class CustomWeatherIdEncoder(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.weather_ids = [804, 500, 741, 803, 801, 800, 200, 501, 721, 300, 211, 502, 711, 212, 701, 600, 616, 612, 511, 601, 602, 301]
    
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        
        X = X.copy()
        X_encoded = pd.get_dummies(X, columns=['weather_id'], prefix='', prefix_sep='')
        
        for weather_id in self.weather_ids:
            if str(weather_id) not in X_encoded.columns:
                X_encoded[str(weather_id)] = 0
        
        X_encoded = X_encoded[[str(weather_id) for weather_id in self.weather_ids]]
        X_encoded = X_encoded.astype(int)
        
        return X_encoded

In [17]:
class WindDegToDirectionEncoder(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.directional_strings = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW']
    
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        def wind_deg_to_direction(deg):
            if deg < 0 or deg > 360:
                return 'MISSING'
            idx = int((deg + 11.25) / 22.5) % 16
            return self.directional_strings[idx]
        
        X['wind_direction'] = X['wind_deg'].apply(wind_deg_to_direction)
        return X[['wind_direction']]

Pipeline definition

In [18]:
class CascadedModel(BaseEstimator):
    def __init__(self, models):
        self.models = models

    def predict(self, X):
        results = {}
        for grid_code in self.models:
            taxi_model = self.models[grid_code]['taxi_model']
            aqi_model = self.models[grid_code]['aqi_model']

            X_taxi = X.copy()
            X_taxi_encoded = self._preprocess(X_taxi, include_taxi_density=False)
            
            predicted_taxi_density = taxi_model.predict(X_taxi_encoded)
            X_aqi = X.copy()
            X_aqi['taxi_density'] = predicted_taxi_density
            X_aqi_encoded = self._preprocess(X_aqi, include_taxi_density=True)

            predicted_aqi = aqi_model.predict(X_aqi_encoded)
            results[grid_code] = predicted_aqi[0]

        return results

    def _preprocess(self, X, include_taxi_density=False):
        X = X.copy()
        wind_direction = WindDegToDirectionEncoder().fit_transform(X)
        wind_direction_encoded = CustomWindDirectionEncoder().fit_transform(wind_direction)
        weather_id_encoded = CustomWeatherIdEncoder().fit_transform(X[['weather_id']])

        X_encoded = pd.concat([X.drop(columns=['wind_deg', 'wind_direction', 'weather_id']).reset_index(drop=True),
                               wind_direction_encoded.reset_index(drop=True),
                               weather_id_encoded.reset_index(drop=True)], axis=1)
        
        if include_taxi_density:
            X_encoded = X_encoded[['time_stamp', 'taxi_density', 'humidity', 'wind_direction', 'temp', 'wind_speed', 'wind_gust', 'pressure', 'weather_id']]
        else:
            X_encoded = X_encoded[['time_stamp', 'humidity', 'wind_direction', 'temp', 'wind_speed', 'wind_gust', 'pressure', 'weather_id']]
        
        return X_encoded

Model training

In [19]:
final_grid_dataset = pd.read_csv('final_grid_dataset_final.csv')

taxi_model_inputs = ['time_stamp', 'humidity', 'wind_direction', 'temp', 'wind_speed', 'wind_gust', 'pressure', 'weather_id']
taxi_model_output = 'taxi_density'
aqi_model_inputs = ['time_stamp', 'taxi_density', 'humidity', 'wind_direction', 'temp', 'wind_speed', 'wind_gust', 'pressure', 'weather_id']
aqi_model_output = 'pm2.5_aqi'

wind_direction_encoder = CustomWindDirectionEncoder()
weather_id_encoder = CustomWeatherIdEncoder()

rf_taxi_models = {}
rf_aqi_models = {}
cascaded_models = {}

num_of_grids = final_grid_dataset['grid_code'].unique().size
count = 0

for grid_code in final_grid_dataset['grid_code'].unique():
    data = final_grid_dataset[final_grid_dataset['grid_code'] == grid_code]

    X_taxi = data[taxi_model_inputs]
    y_taxi = data[taxi_model_output]
    X_aqi = data[aqi_model_inputs]
    y_aqi = data[aqi_model_output]

    wind_direction_encoded = wind_direction_encoder.fit_transform(X_taxi[['wind_direction']])
    weather_id_encoded = weather_id_encoder.fit_transform(X_taxi[['weather_id']])

    X_taxi_encoded = pd.concat([X_taxi.drop(columns=['wind_direction', 'weather_id']).reset_index(drop=True), 
                                wind_direction_encoded.reset_index(drop=True), 
                                weather_id_encoded.reset_index(drop=True)], axis=1)
    X_aqi_encoded = pd.concat([X_aqi.drop(columns=['wind_direction', 'weather_id']).reset_index(drop=True), 
                               wind_direction_encoded.reset_index(drop=True), 
                               weather_id_encoded.reset_index(drop=True)], axis=1)

    taxi_model = RandomForestRegressor(random_state=42)
    taxi_model.fit(X_taxi_encoded, y_taxi)
    rf_taxi_models[grid_code] = taxi_model

    aqi_model = RandomForestRegressor(random_state=42)
    aqi_model.fit(X_aqi_encoded, y_aqi)
    rf_aqi_models[grid_code] = aqi_model

    cascaded_models[grid_code] = {
        'taxi_model': taxi_model,
        'aqi_model': aqi_model
    }
    
    count += 1
    print(f'grid_code: {grid_code}, {count} / {num_of_grids}.')

joblib.dump(cascaded_models, 'cascaded_models.pkl')

grid_code: 0@7, 1 / 358.
grid_code: 0@8, 2 / 358.
grid_code: 0@9, 3 / 358.
grid_code: 1@9, 4 / 358.
grid_code: 3@6, 5 / 358.
grid_code: 3@7, 6 / 358.
grid_code: 4@6, 7 / 358.
grid_code: 4@7, 8 / 358.
grid_code: 4@8, 9 / 358.
grid_code: 4@10, 10 / 358.
grid_code: 4@11, 11 / 358.
grid_code: 5@6, 12 / 358.
grid_code: 5@7, 13 / 358.
grid_code: 5@8, 14 / 358.
grid_code: 5@9, 15 / 358.
grid_code: 5@10, 16 / 358.
grid_code: 5@11, 17 / 358.
grid_code: 5@12, 18 / 358.
grid_code: 5@13, 19 / 358.
grid_code: 5@14, 20 / 358.
grid_code: 5@15, 21 / 358.
grid_code: 5@16, 22 / 358.
grid_code: 6@6, 23 / 358.
grid_code: 6@7, 24 / 358.
grid_code: 6@9, 25 / 358.
grid_code: 6@10, 26 / 358.
grid_code: 6@11, 27 / 358.
grid_code: 6@12, 28 / 358.
grid_code: 6@13, 29 / 358.
grid_code: 6@14, 30 / 358.
grid_code: 6@15, 31 / 358.
grid_code: 6@16, 32 / 358.
grid_code: 6@17, 33 / 358.
grid_code: 6@18, 34 / 358.
grid_code: 6@19, 35 / 358.
grid_code: 6@20, 36 / 358.
grid_code: 7@6, 37 / 358.
grid_code: 7@7, 38 / 358.
g

MemoryError: could not allocate 1048576 bytes

In [None]:
cascaded_model = CascadedModel(models=cascaded_models)
joblib.dump(cascaded_model, 'aqi_model.pkl')

Test model

In [None]:
aqi_model = joblib.load('aqi_model.pkl')

input_data = {
    'time_stamp': 1680310800,
    'humidity': 62,
    'wind_deg': 281,
    'temp': 287.59444444444443,
    'wind_speed': 4.4704,
    'wind_gust': 0.0,
    'pressure': 1009.482859,
    'weather_id': 804
}

prediction = aqi_model.predict(pd.DataFrame([input_data]))
prediction