In [1]:
!pip install xgboost




In [2]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from xgboost import XGBClassifier
import warnings
warnings.filterwarnings("ignore")

df = pd.read_csv("on-street-parking-bay-sensors.csv")
df.columns = df.columns.str.lower()

print("Dataset Loaded:", df.shape)

df["slot_id"] = df["kerbsideid"]

unique_slot_ids = df['slot_id'].unique()
if len(unique_slot_ids) > 37:
    selected_slot_ids = unique_slot_ids[:37]
    df = df[df['slot_id'].isin(selected_slot_ids)]
    print(f"Dataset reduced to {len(df['slot_id'].unique())} unique slot IDs.")
else:
    print(f"Dataset already has {len(unique_slot_ids)} or fewer unique slot IDs.")

print("New dataset shape:", df.shape)

Dataset Loaded: (3309, 6)
Dataset reduced to 37 unique slot IDs.
New dataset shape: (37, 7)


In [3]:
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Ensure 'time' column is datetime and localized
df["time"] = pd.to_datetime(df["lastupdated"], errors="coerce")
df = df.dropna(subset=["time"])

if df["time"].dt.tz is None:
    df["time"] = df["time"].dt.tz_localize('UTC')
else:
    df["time"] = df["time"].dt.tz_convert('UTC')

# Create 'status_now' column
df["status_now"] = df["status_description"].astype(str).str.lower().replace({
    "unoccupied": 0,
    "free": 0,
    "occupied": 1,
    "present": 1
}).astype(int)

# Create time-based features
df["hour"] = df["time"].dt.hour
df["day"] = df["time"].dt.day
df["weekday"] = df["time"].dt.weekday

# Encode 'status_description' to 'status_encoded'
le = LabelEncoder()
df["status_encoded"] = le.fit_transform(df["status_description"])
print("Classes:", le.classes_)

# Define features and target
feature_cols = ["hour", "day", "weekday"]
X = df[feature_cols]
y = df["status_encoded"]
print("Using Features:", feature_cols)

# Split data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=42, shuffle=True
)

print("Preprocessing steps completed.")

Classes: ['Present' 'Unoccupied']
Using Features: ['hour', 'day', 'weekday']
Preprocessing steps completed.


In [4]:
print(df.columns)

Index(['lastupdated', 'status_timestamp', 'zone_number', 'status_description',
       'kerbsideid', 'location', 'slot_id', 'time', 'status_now', 'hour',
       'day', 'weekday', 'status_encoded'],
      dtype='object')


In [5]:
df["slot_id"] = df["kerbsideid"]


In [6]:
df["time"] = pd.to_datetime(df["lastupdated"], errors="coerce")
df = df.dropna(subset=["time"])

# Ensure timezone is UTC
if df["time"].dt.tz is None:
    df["time"] = df["time"].dt.tz_localize('UTC')
else:
    df["time"] = df["time"].dt.tz_convert('UTC')

In [7]:
df["status_now"] = df["status_description"].astype(str).str.lower().replace({
    "unoccupied": 0,
    "free": 0,
    "occupied": 1,
    "present": 1
}).astype(int)


In [8]:
df["hour"] = df["time"].dt.hour
df["day"] = df["time"].dt.day
df["weekday"] = df["time"].dt.weekday

In [9]:
le = LabelEncoder()
df["status_encoded"] = le.fit_transform(df["status_description"])

print("\nClasses:", le.classes_)


Classes: ['Present' 'Unoccupied']


In [10]:
feature_cols = ["hour", "day", "weekday"]  # NO bay_id used in training

X = df[feature_cols]
y = df["status_encoded"]

print("\nUsing Features:", feature_cols)


Using Features: ['hour', 'day', 'weekday']


In [11]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=42, shuffle=True
)


In [12]:
model = XGBClassifier(
    n_estimators=300,
    max_depth=8,
    learning_rate=0.08,
    subsample=0.8,
    colsample_bytree=0.8,
    objective="multi:softmax",
    num_class=len(df["status_encoded"].unique()),
    random_state=42,
    eval_metric="mlogloss"
)

model.fit(X_train, y_train)

In [13]:
train_acc = model.score(X_train, y_train)
test_acc = model.score(X_test, y_test)

print("\nTraining Accuracy:", train_acc)
print("Test Accuracy:", test_acc)

print("\nClassification Report:")
print(classification_report(y_test, model.predict(X_test)))



Training Accuracy: 0.9259259259259259
Test Accuracy: 0.7

Classification Report:
              precision    recall  f1-score   support

           0       0.83      0.71      0.77         7
           1       0.50      0.67      0.57         3

    accuracy                           0.70        10
   macro avg       0.67      0.69      0.67        10
weighted avg       0.73      0.70      0.71        10



In [None]:
from IPython.display import clear_output
import time

CHECK_INTERVAL = 5

# Create a global mapping from slot_id to p-id
# Get all unique slot_ids currently in the DataFrame and sort them for consistent mapping
sorted_unique_slot_ids = sorted(df['slot_id'].unique())
slot_id_map = {slot_id: f"p{i+1}" for i, slot_id in enumerate(sorted_unique_slot_ids)}

# Placeholder function for predict_future_free_live.
def predict_future_free_live(df_latest, model, slot_id_map):
    try:
        # Ensure 'time' column is datetime and localize it if not already
        if df_latest['time'].dt.tz is None:
            df_latest['time'] = df_latest['time'].dt.tz_localize('UTC')
        else:
            df_latest['time'] = df_latest['time'].dt.tz_convert('UTC')

        # Simulate features for 30 minutes in the future
        future_time = df_latest['time'] + pd.Timedelta(minutes=30)
        future_df = pd.DataFrame({
            'hour': future_time.dt.hour,
            'day': future_time.dt.day,
            'weekday': future_time.dt.weekday
        })

        # Make predictions using the model
        future_predictions = model.predict(future_df)

        # Filter for slots predicted to be free (assuming 0 is free)
        free_future_slots_indices = future_predictions == 0
        original_free_future_slot_ids = df_latest.loc[free_future_slots_indices, 'slot_id'].tolist()

        # Map original slot_ids to p-ids
        mapped_free_future_slot_ids = [slot_id_map.get(sid, str(sid)) for sid in original_free_future_slot_ids]
        return mapped_free_future_slot_ids
    except Exception as e:
        print(f"Error in predict_future_free_live: {e}")
        return [] # Return empty list on error or if prediction isn't possible




while True:
    clear_output(wait=True)

    # Get latest reading per parking slot
    df_latest = df.sort_values("time").groupby("slot_id").tail(1).reset_index(drop=True)

    # Pass slot_id_map to the prediction function
    free_future = predict_future_free_live(df_latest, model, slot_id_map)

    # Get current free slots and map their slot_ids to p-ids
    original_free_now_slot_ids = df_latest.loc[df_latest["status_now"] == 0, "slot_id"].tolist()
    free_now = [slot_id_map.get(sid, str(sid)) for sid in original_free_now_slot_ids]

    print(f"[{time.strftime('%H:%M:%S')}]")

    # Future predictions
    if free_future:
        print("üîÆ Slots FREE in 30 minutes:")
        print(", ".join(map(str, free_future)))
    else:
        print("‚ö†Ô∏è No slots will be free in 30 minutes.")

    # Current free slots
    print("\nüü¢ Slots FREE right now:")
    if free_now:
        print(", ".join(map(str, free_now)))
    else:
        print("No slots free now.")

    time.sleep(CHECK_INTERVAL)

[12:08:52]
üîÆ Slots FREE in 30 minutes:
p21, p8, p9, p10, p13, p22, p15, p14, p18, p16, p17, p34, p24, p36, p35, p37, p23, p11, p12

üü¢ Slots FREE right now:
p2, p5, p4, p1, p6, p7, p8, p9, p20, p32, p29, p28, p30, p36, p19, p26, p25
