In [36]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

In [35]:
import json
import pandas as pd
import numpy as np
from datetime import datetime
from pathlib import Path

# anomaly detection
from sklearn.ensemble import IsolationForest

# settings
pd.options.plotting.backend = "plotly"

In [43]:
import snowflake.connector
import pandas as pd
from snowflake.connector.pandas_tools import write_pandas

prediction_table = "4_FEATURIZ_DRIFTOPTIMERING_PREDICT_TEST"
training_table = "4_FEATURIZ_DRIFTOPTIMERING_TRAINING_TEST"
output_table = "1_RAW_DRIFTOPTIMERINGSMODEL_TEST"

conn = snowflake.connector.connect(
        user='GOVTECH_ROBOT_USER',
        password='Y^?:mZ"s|dD%c&8G3cun',
        account='iu89556.west-europe.azure',    
        role="GOVTECH_DB_RW",
        warehouse='GOVTECH_ANALYST_WH',
        database='GOVTECH_DB',
        schema='RAW'
    )

In [2]:
def get_snowflake_data(conn, table):

    cursor = conn.cursor()

    cursor.execute(f'Select * from "RAW"."{table}";')

    # convert the tuple of tuples to a Pandas DataFrame
    df = pd.DataFrame(cursor.fetchall(), columns=['ID', 'Kommune', 'Skole', 'Date', 'Time', 'Dayname',
                      'Tidspunkt_Type', 'Type', 'Navn', 'CO2', 'Temp', 'Motion', 'IAQ', 'Booket', 'Skemalagt'])

    return df

In [44]:
dataf = get_snowflake_data(conn, "4_FEATURIZ_DRIFTOPTIMERING_TRAINING_TEST")
# capitalize column names
dataf.columns = dataf.columns.str.upper()

In [45]:
dataf.sample().to_dict(orient='records')

[{'ID': '03.S.06',
  'KOMMUNE': 'Aarhus',
  'SKOLE': 'Strandskolen',
  'DATE': datetime.date(2022, 12, 28),
  'TIME': datetime.time(5, 30),
  'DAYNAME': 'Onsdag',
  'TIDSPUNKT_TYPE': 'Skole',
  'TYPE': 'Skole-ferie',
  'NAVN': 'Juleferie',
  'CO2': 414.0,
  'TEMP': 14.0,
  'MOTION': nan,
  'IAQ': 0.02,
  'BOOKET': nan,
  'SKEMALAGT': nan}]

In [46]:
types = ['Normal Dag', 'Skole-ferie', 'Helligdag', 'Mærkedage']
# map the types to numbers
dataf.TYPE.map({types[i]: i for i in range(len(types))})

array(['Normal Dag', 'Skole-ferie', 'Helligdag', 'Mærkedage'],
      dtype=object)

### IMPORT

In [23]:
class DataLoader():

    SENSOR_COLUMNS = ["CO2", "TEMP", "MOTION", "IAQ", "SKEMALAGT", "BOOKET"]

    @classmethod
    def diagnose(cls, df, col, dfunc="unique", **kwargs):
        print(getattr(df[col], dfunc)(**kwargs))
        return df

    @classmethod
    def fill_na(cls, df, cols, values, types):
        return df.assign(
            **{col: df[col].fillna(value).astype(_type) for col, value, _type in zip(cols, values, types)}
            # .fillna(value).astype(_type
            # fillna(method="ffill", limit=2)
        )

    @classmethod
    def display_missing_values(cls, df, print_nans):
        if print_nans:
            for i, munc in df.groupby('KOMMUNE'):
                print(f"Missing values for {i}")
                display(
                    
                    munc
                    # filter between the first and last timeslot of activity
                    .sort_values("DATETIME", ascending=True)
                    [lambda d: d["DATETIME"].between(
                            *(
                                d.
                                dropna(
                                    subset=cls.SENSOR_COLUMNS,
                                    how="all"
                                )
                                ["DATETIME"]
                                .iloc[[0, -1]]
                            ),
                            inclusive="both"
                        )
                    ]
                    
                    .groupby('ID')
                    [cls.SENSOR_COLUMNS]
                    .apply(lambda x: x.isnull().sum()/len(x))
                    .style.format(precision=2)
                    .background_gradient(cmap='Reds', axis=0, vmin=0, vmax=1)
                )
        return df

    @classmethod 
    def merge_dt(cls, df, date, time, name, sep=" "):
        return df.assign(**{name: lambda d: pd.to_datetime(d[date].astype(str) + sep + d[time].astype(str))})

    @classmethod
    def drop_cols(cls, df, cols):
        return df.drop(columns=cols)
    
    @classmethod
    def full_process(cls, df, print_nans, **kwargs):
        
        return (
            df
            # .pipe(cls.drop_cols, cols=["KOMMUNE_DATO_LOKALE_TIME"])
            .pipe(cls.diagnose, col="KOMMUNE", dfunc="unique")
            .pipe(cls.merge_dt, date="DATE", time="TIME", name="DATETIME")
            .pipe(cls.display_missing_values, print_nans)
            .pipe(cls.fill_na, 
                cols=cls.SENSOR_COLUMNS[:-1],
                values=[487, 20.0, 0.0, .03, 0.0],
                types=[float, float, float, float, float]
            )
        )

    @classmethod
    def _load(cls, path):
        return pd.read_csv(path)
    
    @classmethod
    def load(cls, path = "data/Skemaer.csv", steps : dict = {}, print_nans = True, df = None, **kwargs):
        if isinstance(df, pd.DataFrame):
            dataf = df
        elif path:
            dataf = cls._load(path)
        else:
            raise ValueError("No data source specified")
        

        if "all" in steps:
            return cls.full_process(dataf, print_nans, **kwargs)

        for func, func_kwargs in steps.items():
            dataf = getattr(cls, func)(dataf, **func_kwargs)
            
        return dataf

### Pipelines

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np

class RoomStdMean(BaseEstimator, TransformerMixin):
    def __init__(self, high_value_threshold=2000):
        self.high_value_threshold = high_value_threshold
    
    def fit(self, X, y=None):
        filtered_X = X[(X['CO2'] < self.high_value_threshold) & (X['CO2'].notna())]
        self.room_stats = filtered_X.groupby('ID')['CO2'].agg(['mean', 'std']).to_dict()
        return self
    
    def transform(self, X):
        X['CO2_Z_SCORE'] = X.apply(
            lambda row: (row['CO2'] - self.room_stats['mean'].get(row['ID'], np.nan)) / self.room_stats['std'].get(row['ID'], np.nan)
            if pd.notna(row['CO2']) else np.nan, axis=1
        )
        return X
    

class StatelessFeatures(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X['CO2_ROLLING_MEAN'] = X['CO2'].rolling(window=4).mean()
        X['TEMP_ROLLING_MEAN'] = X['TEMP'].rolling(window=4).mean()
        X['CO2/TEMP'] = X['CO2'] / X['TEMP']
        X['MINUTES'] = X['DATETIME'].dt.hour * 60 + X['DATETIME'].dt.minute
        X['DOW'] = X['DATETIME'].dt.dayofweek
        X['BOOKET'] = X['BOOKET'].fillna(0.0)
        X['Motion_Available'] = ~X['MOTION'].isna()
        X['Motion_or_CO2'] = X['MOTION'].fillna(X['CO2'])
        return X

In [None]:
from sklearn.pipeline import Pipeline

pipeline = Pipeline([
    ('stateless_features', StatelessFeatures()),
    ('room_std_mean', RoomStdMean(high_value_threshold=2000))
])


### PREPROCESSING

In [48]:
def filter_values(df, col, values):
    return df[lambda d: d[col].isin(values)]


def drop_inactive_ranges(df, range_length=5):
    # Identify intervals of 5+ rows of identical values
    to_drop = (
        df.groupby("ID")
        ["CO2"].transform(
            lambda x: x.rolling(range_length).apply(
                lambda d: d.nunique() == 1
            )
        )
    )
    cdataf = df.drop(index=to_drop[lambda d: d.eq(1.0)].index)
    print(f"Turns {df.shape[0]} rows into {cdataf.shape[0]} rows - Dropping {(df.shape[0] - cdataf.shape[0])/1000}K rows")
    return cdataf

def add_date_range_group(grp):
    grp['DATE_RANGE_GROUP'] = grp['DATETIME'].transform(lambda x: (x.diff().dt.total_seconds()/ 60).ne(15).cumsum())
    return grp

def acceleration_features(df):

    return (
        df.sort_values("DATETIME").groupby("ID").apply(add_date_range_group)
        .reset_index(drop=True)
        .assign(
            
            CO2_ACC=lambda d: d.groupby(["ID", "DATE_RANGE_GROUP"])["CO2"].ffill().pct_change().fillna(0).replace(np.inf, 0),
            TEMP_ACC=lambda d: d.groupby(["ID", "DATE_RANGE_GROUP"])["TEMP"].ffill().pct_change().fillna(0).replace(np.inf, 0),
            MOTION_ACC=lambda d: d.groupby(["ID", "DATE_RANGE_GROUP"])["MOTION"].ffill().pct_change().fillna(0).replace(np.inf, 0),
            IAQ_ACC=lambda d: d.groupby(["ID", "DATE_RANGE_GROUP"])["IAQ"].ffill().pct_change().fillna(0).replace(np.inf, 0),
        )
    )


def preprocess_for_modelling(df):
    return (
        df
        .drop(columns=[
            "DATE",
            "TIDSPUNKT_TYPE",
            "TYPE",
            "DATE_RANGE_GROUP",
            "DAYNAME",
            "TIME",
            "SKOLE",
            "KOMMUNE",
            "NAVN"
            ]
        )
    )

def remove_outliers(df, columns):
    if 'CO2' in columns:
        df = df[(df['CO2'] >= 100) & (df['CO2'] <= 6000)]
        
    if 'TEMP' in columns:
        df = df[(df['TEMP'] >= 1) & (df['TEMP'] <= 50)]
        
    return df

def engineer_features(df):
    df['CO2_ROLLING_MEAN'] = df['CO2'].rolling(window=4).mean()
    df['TEMP_ROLLING_MEAN'] = df['TEMP'].rolling(window=4).mean()
    df['CO2/TEMP'] = df['CO2'] / df['TEMP']
    return df

def encode_time_features(df):
    # Example: Convert TIME to total minutes past midnight

    types = ['Normal Dag', 'Skole-ferie', 'Helligdag', 'Mærkedage']

    df = (
        df.assign(
            MINUTES=lambda d: d["DATETIME"].dt.hour * 60 + d["DATETIME"].dt.minute,
            TYPE=lambda d: d["TYPE"].map({types[i]: i for i in range(len(types))}),
            SCHOOL_TIME = lambda d: d["TIDSPUNKT_TYPE"].map({'Fritid': False, 'Skole': True}),
            DOW=lambda d: d["DATETIME"].dt.dayofweek,
            BOOKET=lambda d: d["BOOKET"].fillna(0.0),

        )
    )

    # Rolling standard deviation
    df['CO2_ROLLING_STD'] = df.groupby('ID')['CO2'].transform(lambda x: x.rolling(window=4).std())

    # Z-score for each room
    mean_std_df = df.groupby('ID')['CO2'].agg(['mean', 'std']).reset_index()
    mean_std_df.columns = ['ID', 'CO2_MEAN', 'CO2_STD']
    df = pd.merge(df, mean_std_df, on='ID', how='left')
    df['CO2_Z_SCORE'] = (df['CO2'] - df['CO2_MEAN']) / df['CO2_STD']
    return df


def utilize_motion_sensors(df):
    df['Motion_Available'] = ~df['MOTION'].isna()
    df['Motion_or_CO2'] = df['MOTION'].fillna(df['CO2'])
    return df


### Estimate usage

In [25]:
def estimate_usage(
        data : pd.DataFrame,
        usage_coeff : float = 2.1,
        usage_limit : float = .2
    ) -> float:

    """Estimate usage score for a given room based
    on the number of timeslots with assumed activity,
    e.g., a booked timeslot or a timeslot in school 
    schema """

    used_slots = max(
        data[lambda d:
            d["SKEMALAGT"].astype(bool) | 
            d["BOOKET"].fillna(0).astype(bool)
        ].shape[0],  # number of slots with assumed activity
        0.1
    )
    

    # Evaluate against heuristic usage limit
    est_usage = min(
        usage_coeff * used_slots / data.shape[0], 
        usage_limit
    )

    # print(
    #     f"{data['ID'].iloc[0]} | "
    #     + f"Est. usage score {est_usage:.2f} | "
    #     + f"usage coeff: {usage_coeff:.2f}"
    # )
    return est_usage

### MODELLING

In [26]:
def fit_predict(df, **kwargs) -> tuple:
    model_IF = IsolationForest(**kwargs)
    model_IF.fit(df)

    scores = model_IF.decision_function(df)
    predictions = model_IF.predict(df)
    return scores, predictions


def format_predictions(preds : list) -> list:
    return [1 if pred == -1 else 0 for pred in preds]


def format_scores(scores : list) -> list:
    """ Normalize scores in range [-1, 1] to
    [0, 1] where 1 is most anomalous

    Args:
        scores (list): List of scores
    """
    return np.interp(scores, (min(scores), max(scores)), (0, 1))


def run_model(
        room : pd.DataFrame,
        features : list,
        usage_coeff : float,
        usage_limit : float,
        **kwargs
    ) -> pd.DataFrame:

    """ Run fit_predict for every room in every building """

    _est_usage = estimate_usage(room, usage_coeff, usage_limit)
    
    scores, predictions = fit_predict(
        room[features], 
        contamination=_est_usage,
        **kwargs
    )
    return room.assign(
        usage_score=1 - format_scores(scores),
        in_use=format_predictions(predictions)
    )


### Heuristics

In [27]:

def add_heuristics(data, apply_rules):
    
    return data if not apply_rules else (

        data
        .assign(
            in_use=lambda d: np.where(
                d["CO2"].lt(400),
                0,
                d["in_use"],
            ),
        )
        .assign( # Remove anomalies when CO2 accelerates or CO2 is high
            in_use=lambda d: np.where(
                (d["CO2_ACC"].gt(0) & d["CO2"].gt(600)) | (d["CO2"].gt(1000)),
                1, 
                d["in_use"]
            )
        )

        .assign(
            # If the prior rules change the in_use value, then
            # update the usage score to reflect this change
            usage_score=lambda d: np.where(
                (d["in_use"].eq(1) & d["usage_score"].lt(.5))
                |
                (d["in_use"].eq(0) & d["usage_score"].gt(.5)),
                1 - d["usage_score"],
                d["usage_score"]
            )
        )

        # .assign( 
        #     # Combine single usages - 1 IF n-1 = 1, n = 0, n+1 = 1, n+2 = 1
        #     # Use for cases with a premature prediction of use
        #     # NOTE: The check for 487 is to check for the fillna value.
        #     # Otherwise, this does not work for Favrskov, since every
        #     # other room in this school is a fillna value.
        #     in_use=lambda d: np.where(
        #         (d["in_use"].shift(-1).eq(1) & d["CO2"] != 487)
        #         & 
        #         d["in_use"].eq(0) 
        #         & 
        #         (d["in_use"].shift(1).eq(1) & d["CO2"].shift(1) != 487)
        #         & 
        #         (d["in_use"].shift(2).eq(1) & d["CO2"].shift(2) != 487),
        #         1,
        #         d["in_use"]
        #     )
        # )

        # .assign( 
        #     # Remove single in_use - 0 IF n-1 = 0, n = 1, n+1 = 0
        #     # NOTE: The check for 487 is to check for the fillna value.
        #     # Otherwise, this does not work for Favrskov, since every
        #     # other room in this school is a fillna value.
        #     in_use=lambda d: np.where(
        #         (d["in_use"].shift(-1).eq(0) & d["CO2"].shift(-1) != 487)
        #         & 
        #         d["in_use"].eq(1) 
        #         & 
        #         (d["in_use"].shift(1).eq(0) & d["CO2"].shift(1) != 487),
        #         0,
        #         d["in_use"]
        #     )
        # )

    )


### Exports

In [28]:
def create_dir(run_id, _kommune) -> None:
    Path(f"results/{run_id}/{_kommune}").mkdir(parents=True, exist_ok=True)


def export_plots(data, kommune, run_id):

    for i, room in data.groupby("ID"):

        _kommune = kommune.lower()
        create_dir(run_id, _kommune)

        fig = room.plot.bar(    
            x='DATETIME',
            y='CO2',
            color='in_use',
            title=f'Anvendelsesmodel - Lokale {i} - {kommune} Kommune',
            width=3000,
            hover_data=data[["CO2_ACC"]],
        )
        fig.update_traces(dict(marker_line_width=0))
        fig.write_html(f'results/{run_id}/{_kommune}/anomaly-{_kommune}-{i}.html')
        
    return data

### Exit report

In [29]:
def exit_report(df, kommune, original_data):

    original = original_data[lambda d: d["KOMMUNE"] == kommune]
    print(
        f"EXIT REPORT - {kommune} | Size: {df.shape[0]} | "
        + f"Orig. size: {original.shape[0]}\n"
        + f"Mean usage rate: {df['in_use'].mean():.2f} | "
        + f"Mean usage score: {df['usage_score'].mean():.2f}\n"
        + "\n\n"
    )
    assert df['ID'].nunique() == original['ID'].nunique(), "Flow dropped rooms!"
    return df

*** 

### Pipeline

#### Set feature set

In [30]:
room_features = [
    # "SKEMALAGT",
    "CO2_ACC",
    # "TEMP_ACC"
]

#### Define flow

In [31]:
def heartbeat(df, kommune, msg=""):
    print(f"🧪 | Size: {df.shape[0]} | {df.columns} | {msg}")
    return df

In [42]:
def flow(
        data : pd.DataFrame,
        run_id : str,
        usage_coeff : float,
        usage_limit : float,
        random_state : int,
        apply_rules : bool,
        features : list,
    ) -> pd.DataFrame:

    """ Run the full flow for all municipalities"""
                
    for kommune in data["KOMMUNE"].unique(): # kommune means municipality in Danish
        print(f"Running flow for {kommune}")

        yield (
            
            # Processing
            data

            # filter out data from other municipalities
            .pipe(filter_values, col="KOMMUNE", values=[kommune])

            # drop consecutive rows with identical values, because
            # these are likely to be inactive periods or faulty sensors
            .pipe(drop_inactive_ranges, range_length=5)
            .pipe(remove_outliers, columns=['CO2', 'TEMP'])

            # adds features for acceleration
            # such as CO2_ACC, TEMP_ACC, MOTION_ACC, IAQ_ACC
            .pipe(acceleration_features)
            .pipe(engineer_features)
            .pipe(encode_time_features)
            .pipe(utilize_motion_sensors)
            .pipe(heartbeat, kommune, msg="Test")
            

            # Creates other features such as 
            # 
            # AKTIVITET=lambda d: pd.factorize(d["TIDSPUNKT_TYPE"])[0],
            # DOW=lambda d: d["DATETIME"].dt.dayofweek,
            # HOUR=lambda d: d["DATETIME"].dt.hour,
            # DAY_TYPE=lambda d: pd.factorize(d["TYPE"])[0],
            #BOOKET=lambda d: d["BOOKET"].fillna(0.0),
            .pipe(preprocess_for_modelling)

            # Modelling
            .groupby("ID").apply(
                run_model,
                features=features,
                usage_limit=usage_limit, # the heuristic limit for usage, i.e. estimated anomaly rate / contamination
                usage_coeff=usage_coeff, # A coefficient to scale the estimated anomaly rate / contamination
                # Note that the contamination is estimated based on the number of timeslots with assumed activity
                # assumed activity is either a booked timeslot or a timeslot in an external activity schema
                # we cannot always assume that either is correct.
                random_state=random_state, # random state for reproducibility
            )
            .reset_index(drop=True)

            # heuristics
            # adds heuristic rules such as 
            # - if CO2 is below 400, then the room is not in use
            # - if CO2 is above 600 and CO2 is accelerating, then the room is in use
            # - if CO2 is above 1000, then the room is in use
            # - if the room is in use, then the usage score should be above 0.5
            # - if the room is not in use, then the usage score should be below 0.5
            
            .pipe(
                add_heuristics,
                apply_rules=apply_rules
            )

            # Export plots
            .pipe(export_plots, kommune=kommune, run_id=run_id)

            # Postprocess 
            [["DATETIME", "ID", "in_use", "usage_score"]]
            .assign(KOMMUNE=kommune)

            # Exit report
            .pipe(exit_report, kommune=kommune, original_data=data)
        )

### Run flow

In [33]:
def run_flow(**kwargs):

    run_id = f"RUN-{datetime.now().strftime('%Y-%m-%d-%H-%M')}"
    print(f"RUNNING FLOW '{run_id}'\n")

    return {
        "run_id": run_id,
        "data": pd.concat(flow(run_id=run_id, **kwargs)),
        "paramters": {k: v for k, v in kwargs.items() if k != "data"}
    }


In [37]:
results = run_flow(
    data=DataLoader.load(
        df=dataf,
        print_nans=False,
        steps=["all"]
    ),
    usage_coeff=2.1,
    usage_limit=.2,
    random_state=123,
    apply_rules=True,
    features=room_features,
)

['Favrskov' 'Syddjurs' 'Aarhus']
RUNNING FLOW 'RUN-2023-09-28-00-09'

Running flow for Favrskov
Turns 10166 rows into 10166 rows - Dropping 0.0K rows
EXIT REPORT - Favrskov | Size: 10166 | Orig. size: 10166
Mean usage rate: 0.13 | Mean usage score: 0.19



Running flow for Syddjurs
Turns 29760 rows into 4984 rows - Dropping 24.776K rows
EXIT REPORT - Syddjurs | Size: 4984 | Orig. size: 29760
Mean usage rate: 0.19 | Mean usage score: 0.21



Running flow for Aarhus
Turns 7692 rows into 7692 rows - Dropping 0.0K rows
EXIT REPORT - Aarhus | Size: 7692 | Orig. size: 7692
Mean usage rate: 0.15 | Mean usage score: 0.21





### Combine with original data and save

In [38]:
(
    DataLoader.load(
        path="data/full.csv",
        steps={
            "merge_dt": dict(date="DATE", time="TIME", name="DATETIME")
        }
    )
    .merge(
        results["data"],
        on=["DATETIME", "ID", "KOMMUNE"],
        how="left"
    )
    [['DATE', 'TIME', 'DATETIME', 'ID', 'KOMMUNE', 'in_use', 'usage_score']]
    .to_csv(f"results/{results['run_id']}/results.csv", index=False)
)


### Save run parameters

In [None]:
# filter out dataframes
# save parameters as json
with open(f"results/{results['run_id']}/run_params.json", 'w') as fp:
    json.dump(results["paramters"], fp)

### Diagnostics

In [None]:
results["data"].usage_score.value_counts(bins=20).sort_index()

In [None]:
results["data"].usage_score.hist(
    bins=100, 
    title="Usage score distribution",
    histnorm='percent',


).update_layout(
    showlegend=False,
    xaxis_title="Usage score",
    # add '0' suffix to yaxis
    # yaxis_tickformat=".1%/100"
)

In [None]:
df = results["data"].groupby(["KOMMUNE", "ID"]).agg(
    {
        "usage_score": ["mean"],
        "in_use": ["mean", "count"],
    }
).reset_index(level=[0, 1])
df.columns = ["KOMMUNE", "ID", "MEAN_USAGE_SCORE", "MEAN_IN_USE_RATE", "COUNT"]

In [None]:
df.round(2).plot.bar(
    x="ID",
    y="MEAN_IN_USE_RATE",
    title="Mean predicted usage rate per room",
    color="KOMMUNE",
    width=600,
    hover_data=df[["MEAN_IN_USE_RATE", "COUNT"]],
)

In [None]:
df = results["data"].assign(DAY=lambda d: d["DATETIME"].dt.date).groupby(["KOMMUNE", "DAY"]).agg(
    {
        "usage_score": ["mean"],
        "in_use": ["mean", "count"],
    }
).reset_index(level=[0, 1])
df.columns = ["KOMMUNE", "DAY", "MEAN_USAGE_SCORE", "MEAN_USAGE_RATE", "COUNT"]

In [None]:
fig = df.plot.bar(    
    x='DAY',
    y='MEAN_USAGE_RATE',
    color='KOMMUNE',
    width=800,
)
fig.update_traces(dict(marker_line_width=0))
fig
