In [2]:
import pandas as pd
import numpy as np

In [None]:
# 1. Load the feature-engineered data
file_path = '../data/processed/feature_engineered_30min.csv'
df = pd.read_csv(file_path, index_col=0, parse_dates=True)

In [4]:
import numpy as np

# Create empty DataFrames with float64 dtype to properly handle NaN values
train_df = df.astype('float64').copy()
train_df[:] = np.nan

test_df = df.astype('float64').copy()
test_df[:] = np.nan

In [5]:
# 3. For each column, compute local min_date and max_date, then split 80%/20%
for col in df.columns:
    # local min/max valid dates for this column
    min_date = df[col].first_valid_index()
    max_date = df[col].last_valid_index()

    # If the column is entirely NaN or empty
    if min_date is None or max_date is None or min_date == max_date:
        continue  # skip this column

    # Calculate the total time span
    total_span = max_date - min_date
    # 80% cutoff
    cutoff_date = min_date + 0.8 * total_span

    # TRAIN portion: from min_date to cutoff_date
    train_mask = (df.index >= min_date) & (df.index <= cutoff_date)
    # TEST portion: from cutoff_date to max_date
    test_mask = (df.index > cutoff_date) & (df.index <= max_date)

    # Assign values to train_df and test_df
    train_df.loc[train_mask, col] = df.loc[train_mask, col]
    test_df.loc[test_mask, col] = df.loc[test_mask, col]

In [6]:
# 4. Optional: drop rows that are entirely NaN (if you prefer a smaller DataFrame)
train_df.dropna(axis=0, how='all', inplace=True)
test_df.dropna(axis=0, how='all', inplace=True)

In [8]:
# 5. Save to disk
train_df.to_csv('../data/processed/train_local_80pct.csv')
test_df.to_csv('../data/processed/test_local_20pct.csv')

print("Local train/test splits saved to data/processed/")

Local train/test splits saved to data/processed/


# LOC

In [2]:
import pandas as pd
import numpy as np
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler

In [3]:
# 1) Load train and test
train_df = pd.read_csv('../data/processed/train_local_80pct.csv', index_col=0, parse_dates=True)
test_df = pd.read_csv('../data/processed/test_local_20pct.csv', index_col=0, parse_dates=True)

# 2) Select numeric columns & fill NaNs
train_data = train_df.select_dtypes(include='number').fillna(0)
test_data = test_df.select_dtypes(include='number').fillna(0)

# 3) Scale
scaler = StandardScaler()
X_train = scaler.fit_transform(train_data)
X_test = scaler.transform(test_data)

In [None]:
lof = LocalOutlierFactor(
    n_neighbors=20,      # how many neighbors to consider
    contamination=0.05,  # fraction of outliers to expect
    novelty=True         # IMPORTANT: allows separate train/test usage
)

In [None]:
# 1) Fit on the training data
lof.fit(X_train)

# 2) Predict on the test data
#   +1 = normal, -1 = anomaly
y_test_pred = lof.predict(X_test)

# 3) LOF scores: bigger = more normal, smaller = more outlier
scores_test = lof.decision_function(X_test)

In [6]:
results_df = test_df.copy()
results_df['lof_label'] = y_test_pred
results_df['lof_score'] = scores_test

# Filter anomalies
anomalies = results_df[results_df['lof_label'] == -1]
print(f"LOF flagged {len(anomalies)} anomalies out of {len(results_df)} test points.")


LOF flagged 14026 anomalies out of 14026 test points.


# One class SVM

In [7]:
import pandas as pd
import numpy as np
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler

# 1) Load train and test
train_df = pd.read_csv('../data/processed/train_local_80pct.csv', index_col=0, parse_dates=True)
test_df = pd.read_csv('../data/processed/test_local_20pct.csv', index_col=0, parse_dates=True)

# 2) Select numeric columns & fill NaNs
train_data = train_df.select_dtypes(include='number').fillna(0)
test_data = test_df.select_dtypes(include='number').fillna(0)

# 3) Scale
scaler = StandardScaler()
X_train = scaler.fit_transform(train_data)
X_test = scaler.transform(test_data)


In [8]:
oc_svm = OneClassSVM(
    kernel='rbf',
    gamma='auto',   # or pick a numeric value, e.g., 0.001
    nu=0.05         # fraction of outliers you expect
)

oc_svm.fit(X_train)


: 

: 

In [None]:
y_test_pred = oc_svm.predict(X_test)  # +1 = normal, -1 = anomaly
scores_test = oc_svm.decision_function(X_test)


In [None]:
results_df = test_df.copy()
results_df['svm_label'] = y_test_pred
results_df['svm_score'] = scores_test

anomalies = results_df[results_df['svm_label'] == -1]
print(f"One-Class SVM flagged {len(anomalies)} anomalies out of {len(results_df)} test points.")
