In [1]:
# Uploading the dataset and reading it into a pandas DataFrame
from google.colab import files
import pandas as pd

# Open Colab's upload widget when the cell runs; the result is iterated below
# as a mapping keyed by the uploaded file names.
uploaded = files.upload()

# Stop with a clear message if nothing was uploaded. Without this guard `df`
# is never assigned and the final `df.head()` fails with a NameError.
if not uploaded:
    raise ValueError("No file was uploaded; upload the dataset CSV and re-run this cell.")

# Read each uploaded CSV into a pandas DataFrame.
# NOTE: if several files are uploaded, `df` ends up holding the last one only.
for fn in uploaded.keys():
    print(f"Loaded file: {fn}")
    df = pd.read_csv(fn)

# Display the first 5 rows of the DataFrame to get a glimpse of the data
df.head()

Saving dataset.csv to dataset.csv
Loaded file: dataset.csv


Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00


In [2]:
# Model 1: baseline linear pricing model
# This function calculates a price based on a base price and occupancy rate.
def baseline_linear_price(row, base_price=10, alpha=5):
    """Return a price that grows linearly with how full the lot is.

    Args:
        row: Record (e.g. a DataFrame row or dict) providing 'Occupancy'
            and 'Capacity'.
        base_price: Price returned when the occupancy is zero.
        alpha: Amount added on top of ``base_price`` when the occupancy
            equals the capacity.

    Returns:
        ``base_price + alpha * (Occupancy / Capacity)``.
    """
    fill_fraction = row['Occupancy'] / row['Capacity']
    surcharge = alpha * fill_fraction
    return base_price + surcharge

In [3]:
# Install the catboost library, which is used for gradient boosting
%pip install catboost

Collecting catboost
  Downloading catboost-1.2.8-cp311-cp311-manylinux2014_x86_64.whl.metadata (1.2 kB)
Downloading catboost-1.2.8-cp311-cp311-manylinux2014_x86_64.whl (99.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.2/99.2 MB[0m [31m7.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: catboost
Successfully installed catboost-1.2.8


In [4]:
import numpy as np

# Function to calculate a demand score based on various factors
def demand_score(row, weights):
    """Combine a row's demand signals into a single weighted score.

    Args:
        row: Record (e.g. a DataFrame row or dict) providing 'Occupancy',
            'Capacity', 'QueueLength', 'TrafficConditionNearby',
            'IsSpecialDay' and 'VehicleType'. 'VehicleType' is multiplied
            directly by its weight, so it must already be numeric.
        weights: Mapping with the keys 'occ', 'queue', 'traffic', 'special'
            and 'vehicle'.

    Returns:
        The weighted sum of the signals. The occupancy ratio, queue length,
        special-day flag and vehicle weight are added; the traffic term is
        subtracted.
    """
    # Traffic labels are converted to numbers; any label that is not in the
    # table (including a missing value) contributes 0.
    traffic_levels = {'low': 0, 'average': 0.5, 'high': 1}
    traffic_level = traffic_levels.get(row['TrafficConditionNearby'], 0)

    occupancy_term = weights['occ'] * (row['Occupancy'] / row['Capacity'])
    queue_term = weights['queue'] * row['QueueLength']
    traffic_term = weights['traffic'] * traffic_level
    special_term = weights['special'] * row['IsSpecialDay']
    vehicle_term = weights['vehicle'] * row['VehicleType']

    # Note the minus sign: heavier nearby traffic lowers the score.
    return occupancy_term + queue_term - traffic_term + special_term + vehicle_term

# Function to calculate a demand based price
def demand_based_price(row, base_price=10, weights=None, lambda_=0.3,
                       demand_min=None, demand_max=None):
    """Turn a row's demand score into a bounded price.

    The row's demand score is min-max normalised and then used to scale the
    base price: ``base_price * (1 + lambda_ * normalised)``. The result is
    clipped to the range ``[0.5 * base_price, 2 * base_price]``.

    Args:
        row: Record passed straight to ``demand_score``.
        base_price: Price charged at the lowest observed demand.
        weights: Weight mapping for ``demand_score``. Required, despite the
            ``None`` default kept for backward compatibility.
        lambda_: Fraction of ``base_price`` added at the highest demand.
        demand_min: Lowest demand score used for normalisation. When omitted,
            the minimum of the notebook-level ``df['Demand']`` column is used.
        demand_max: Highest demand score used for normalisation. When omitted,
            the maximum of the notebook-level ``df['Demand']`` column is used.

    Returns:
        The clipped price.

    Raises:
        TypeError: If ``weights`` is not provided.
    """
    if weights is None:
        raise TypeError("demand_based_price() requires a 'weights' mapping")

    # Get the demand score for the current row
    demand = demand_score(row, weights)

    # Callers that price many rows can pass the bounds once instead of having
    # the whole 'Demand' column rescanned on every call.
    if demand_min is None:
        demand_min = df['Demand'].min()
    if demand_max is None:
        demand_max = df['Demand'].max()

    # If every demand score is identical the span is zero; treat the row as
    # being at the bottom of the range instead of dividing by zero.
    demand_span = demand_max - demand_min
    if demand_span == 0:
        normalized = 0.0
    else:
        normalized = (demand - demand_min) / demand_span

    # Calculate the price based on the base price and normalized demand
    price = base_price * (1 + lambda_ * normalized)
    # Clip the price to ensure it stays within 0.5x to 2x the base price
    return np.clip(price, 0.5 * base_price, 2 * base_price)

# Assigning weights manually for the demand score calculation
weights = {'occ': 1.0, 'queue': 0.5, 'traffic': 0.3, 'special': 1.2, 'vehicle': 0.8}

# Convert the VehicleType labels to numeric weights; labels missing from the
# table become NaN. The dtype guard makes the cell safe to re-run: mapping the
# already-numeric column a second time would turn every value into NaN.
if not pd.api.types.is_numeric_dtype(df['VehicleType']):
    df['VehicleType'] = df['VehicleType'].map({'car': 1, 'bike': 0.5, 'truck': 1.5, 'cycle': 0.25})

# Precompute the 'Demand' column for all rows (demand_based_price normalises against it)
df['Demand'] = df.apply(lambda x: demand_score(x, weights), axis=1)

# Calculate the demand-based price and store it in a new column
df['Price_Demand'] = df.apply(lambda x: demand_based_price(x, weights=weights), axis=1)

In [5]:
from sklearn.neighbors import BallTree
import numpy as np

# Function to calculate the average price of nearby parking lots using ball tree
def nearby_lots_price_optimized(row_index, all_data, tree, radius_km=1.0,
                                lot_id_col='SystemCodeNumber'):
    """Average 'Price_Demand' of the other parking lots within a radius.

    Args:
        row_index: Position (not label) of the row in ``all_data``; it is
            used with ``.iloc`` and compared with the positions the tree
            returns.
        all_data: DataFrame with 'Latitude', 'Longitude' and 'Price_Demand'
            columns, and optionally the ``lot_id_col`` column.
        tree: Spatial index exposing ``query_radius``. It must have been
            built over the rows of ``all_data`` in the same order, with the
            coordinates in radians.
        radius_km: Search radius in kilometres.
        lot_id_col: Column identifying the parking lot. Rows that belong to
            the same lot as ``row_index`` are not treated as competitors.
            Pass ``None`` (or a name that is not a column) to exclude only
            the row itself.

    Returns:
        The mean 'Price_Demand' of the neighbouring rows, or the row's own
        'Price_Demand' when no neighbour is left after filtering.
    """
    # Kilometres -> degrees (about 111.32 km per degree) -> radians, the unit
    # the haversine metric works in.
    radius_radians = np.deg2rad(radius_km / 111.32)

    # Positions of all rows whose coordinates lie within the radius; the
    # queried row itself is part of the result.
    query_point = np.deg2rad(all_data[['Latitude', 'Longitude']].iloc[[row_index]])
    indices = tree.query_radius(query_point, r=radius_radians)[0]

    # Drop the queried row itself.
    nearby_indices = indices[indices != row_index]

    # Drop other observations of the same lot. They share the lot's
    # coordinates, so they always fall inside the radius and would otherwise
    # make the "competitor" average largely the lot's own prices.
    if lot_id_col is not None and lot_id_col in all_data.columns:
        own_lot = all_data[lot_id_col].iloc[row_index]
        same_lot = (all_data[lot_id_col].iloc[nearby_indices] == own_lot).to_numpy()
        nearby_indices = nearby_indices[~same_lot]

    if nearby_indices.size > 0:
        # NOTE(review): this averages over every observation of the nearby
        # lots, not only those taken at the same time as this row.
        return all_data['Price_Demand'].iloc[nearby_indices].mean()
    # No competitor in range: fall back to the lot's own demand-based price.
    return all_data['Price_Demand'].iloc[row_index]

# Create a BallTree spatial index from the latitude and longitude data.
# The coordinates are converted to radians, as the haversine metric requires.
tree = BallTree(np.deg2rad(df[['Latitude', 'Longitude']]), metric='haversine')

# Compute the average price of nearby lots for every row.
# nearby_lots_price_optimized works with row positions (it indexes with .iloc
# and compares against the positions the tree returns), so iterate over
# positions rather than index labels; the two only coincide for a default index.
df['Competitor_Price'] = [
    nearby_lots_price_optimized(row_position, df, tree)
    for row_position in range(len(df))
]


# Function to calculate a competitive price based on occupancy and nearby competitor prices
def competitive_price(row, base_price=10):
    """Pick a price from the lot's occupancy and the nearby average price.

    Args:
        row: Record (e.g. a DataFrame row or dict) providing 'Occupancy',
            'Capacity' and 'Competitor_Price'.
        base_price: Reference price of the lot.

    Returns:
        * ``Competitor_Price - 1`` when the lot is at or over capacity;
        * ``base_price + 1`` when competitors charge more than ``base_price``;
        * ``base_price`` otherwise.
    """
    competitor_avg = row['Competitor_Price']

    # Full lot: undercut the nearby average by one unit.
    if row['Occupancy'] >= row['Capacity']:
        return competitor_avg - 1

    # Room left: nudge the price up only if competitors are more expensive.
    return base_price + 1 if competitor_avg > base_price else base_price

# Price every row with the competitive rule and keep the result in a new column
df['Price_Competitive'] = df.apply(competitive_price, axis=1)

In [6]:
from sklearn.model_selection import train_test_split, GridSearchCV
from catboost import CatBoostRegressor
from sklearn.metrics import mean_squared_error

# Prepare data for the CatBoost model.
# NOTE: the target 'Price_Demand' was computed earlier in this notebook from
# these same columns (via demand_score / demand_based_price), so the model is
# learning a deterministic formula and a very small test error is expected.
features = ['Occupancy', 'Capacity', 'QueueLength', 'TrafficConditionNearby', 'IsSpecialDay', 'VehicleType']
target = 'Price_Demand'

X = df[features]
y = df[target]

# Position of 'TrafficConditionNearby' in the feature list; it is the only
# feature declared as categorical ('VehicleType' was mapped to numbers earlier).
categorical_features_indices = [features.index('TrafficConditionNearby')]

# Split the data into training and testing sets (fixed random_state for a repeatable split)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) # 80% for training, 20% for testing

# Initialize the CatBoostRegressor model; silent=True turns off its training log
model = CatBoostRegressor(silent=True)

# Hyper-parameter grid: 2 x 2 x 2 = 8 combinations are evaluated
params = {
    'depth': [4, 6],           # Depth of the trees in the model
    'learning_rate': [0.05, 0.1], # Step size shrinkage used in update to prevent overfitting
    'iterations': [100, 200]   # Number of boosting iterations (trees)
}

# Set up GridSearchCV; candidates are ranked by negated MSE
grid = GridSearchCV(model, params, cv=3, scoring='neg_mean_squared_error') # cv=3 means 3-fold cross-validation

# Run the search on the training data; cat_features is passed through as a fit parameter
grid.fit(X_train, y_train, cat_features=categorical_features_indices)

# Print the best parameters found by GridSearchCV
print("Best Params:", grid.best_params_)

# Get the best model that resulted from the grid search
best_model = grid.best_estimator_
# Make predictions on the held-out test set using the best model
preds = best_model.predict(X_test)

# Calculate and print the Mean Squared Error (MSE) of the predictions on the test set
print("Test MSE:", mean_squared_error(y_test, preds))

Best Params: {'depth': 6, 'iterations': 200, 'learning_rate': 0.1}
Test MSE: 5.104717036144589e-05


In [7]:
from bokeh.plotting import figure, output_notebook, show
# Configure Bokeh to render plots inline in the notebook output
output_notebook()

# Create a Bokeh figure for the plot
p = figure(title="Dynamic Pricing Over Time", x_axis_label='Time', y_axis_label='Price', width=800)

# Select a sample parking lot (the first SystemCodeNumber that appears in the data)
# and keep only its rows
sample_lot_id = df['SystemCodeNumber'].unique()[0]
sample_lot = df[df['SystemCodeNumber'] == sample_lot_id]

# X-axis values are the row positions 0..n-1 within the sample lot, not real
# timestamps: 'LastUpdatedDate' / 'LastUpdatedTime' are not used here.
# NOTE(review): this assumes the rows are already in chronological order - confirm.
x_axis_data = list(range(len(sample_lot)))

# Add one line per pricing model
# 'Price_Demand' for the sample lot, in row order
p.line(x_axis_data, sample_lot['Price_Demand'], legend_label='Demand-Based', line_color="blue")
# 'Price_Competitive' for the sample lot, in row order
p.line(x_axis_data, sample_lot['Price_Competitive'], legend_label='Competitive', line_color="green")

# Render the figure
show(p)