This notebook contains all the code used for the random forest experiments. Data and results can be found in the officially used data folder. This file is a cleaned-up version of the file the experiments were run in. I kept that file accessible in the legacy code folder as "randomforest_legacy.ipynb", but this file contains all the useful code with none of the errant test functions and print statements. I also added more comments to this file for readability.

In [None]:
from pyrosm.data import sources
import pyrosm
from collections import Counter, defaultdict
import json
import pandas as pd
import ast
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import f1_score, precision_score, recall_score, classification_report
from skmultilearn.model_selection import iterative_train_test_split
import pickle
from openpyxl import Workbook

# For below import to work move convert_report2excel from support files to the same directory as this file
from convert_report2excel import convert_report2excel

In [None]:
def create_tag_lists(pois, n: int) -> tuple[list[str], dict[str, int]]:
    """
    Count how often each tag key occurs in ``pois`` and index the frequent ones.

    Args:
        pois: DataFrame-like object; only its "tags" column is read. Cells that
            are not dicts (and empty dicts) contribute nothing. A missing
            "tags" column is treated as having no tags at all.
        n: Frequency cut-off. A tag is allowed only when it was counted
            strictly more than ``n`` times.

    Returns:
        all_good_tags: the tag keys counted more than ``n`` times, in order of
            first appearance.
        tag2idx: dictionary linking every allowed tag to its position in
            ``all_good_tags``.
    """

    # This function is also used and introduced in our data notebook
    # In this notebook we don't use the full functionality, we just need the tag2idx for our vectors
    # For a better breakdown of this function, check the data notebook

    tag_freq = defaultdict(int)

    # Read the column directly: iterrows() would build a full Series for every
    # row just to look up this one cell
    tag_column = pois["tags"] if "tags" in pois else ()
    for tags in tag_column:
        if isinstance(tags, dict):
            for tag_key in tags:
                tag_freq[tag_key] += 1

    all_good_tags = [tag for tag, count in tag_freq.items() if count > n]
    tag2idx = {tag: i for i, tag in enumerate(all_good_tags)}

    sorted_freq = dict(sorted(tag_freq.items(), key=lambda x: x[1], reverse=True))
    print(f"All tags sorted by frequency: {sorted_freq}")
    print(f" All allowed tags: {all_good_tags}")
    print(f"Len all tags: {len(tag_freq)}, Len good tags: {len(all_good_tags)}")

    # We only return these two objects
    # During testing and in other versions I have returned tag_freq or idx2tag but that wasn't needed for the experiments
    return (all_good_tags, tag2idx)

def tags_to_vec(tag_dict, tag2idx):
    """
    Encode a collection of tag names as a multi-hot float32 vector.

    ``tag_dict`` may be a dict (its keys are used) or a list of tag names.
    Names missing from ``tag2idx`` are ignored, and any other input type
    yields the all-zero vector of length ``len(tag2idx)``.
    """
    encoded = np.zeros(len(tag2idx), dtype=np.float32)
    if not isinstance(tag_dict, (dict, list)):
        return encoded

    known_positions = (tag2idx[name] for name in tag_dict if name in tag2idx)
    for position in known_positions:
        encoded[position] = 1
    return encoded

def tags_to_vec_singular(solo_tag, tag2idx):
    """
    One-hot encode a single tag name as a float32 vector of length ``len(tag2idx)``.

    A tag that is not a key of ``tag2idx`` yields the all-zero vector.
    """
    one_hot = np.zeros(len(tag2idx), dtype=np.float32)
    if solo_tag not in tag2idx:
        return one_hot

    one_hot[tag2idx[solo_tag]] = 1
    return one_hot

def vector_pois(pois, tag2idx, n):
    """
    Build (input, target) training vectors by hiding one random tag per POI.

    For every dict in ``pois["tags"]`` that has at least ``n`` tags, one tag
    key is picked at random as the target and the remaining keys form the
    input. Both sides are encoded with ``tags_to_vec``, so tags that are not
    in ``tag2idx`` simply leave their vector untouched.

    Args:
        pois: object whose ``"tags"`` entry iterates over per-POI tag dicts;
            non-dict cells are skipped.
        tag2idx: mapping from allowed tag name to vector position.
        n: minimum number of tags a POI needs to be used. Empty dicts are
            always skipped because there is no tag to hold out.

    Returns:
        X, y: 2-D arrays with one row per kept POI and ``len(tag2idx)``
        columns. When no POI qualifies, both have zero rows.
    """
    X = []
    y = []

    for tag in pois["tags"]:
        # `not tag` guards the n <= 0 case: an empty dict would otherwise
        # crash below when the held-out key is looked up
        if not isinstance(tag, dict) or not tag or len(tag) < n:
            continue

        tag_keys = list(tag.keys())

        # Shuffling and then taking the middle element selects a random tag
        np.random.shuffle(tag_keys)
        feature_idx = len(tag_keys) // 2

        # Only the tag names matter for the encoding, so plain lists suffice
        held_out = tag_keys[feature_idx]
        input_tags = tag_keys[:feature_idx] + tag_keys[feature_idx + 1:]

        X.append(tags_to_vec(input_tags, tag2idx))
        y.append(tags_to_vec([held_out], tag2idx))

    if X:
        X = np.stack(X)
        y = np.stack(y)
    else:
        # np.stack refuses an empty list; return correctly shaped empty arrays
        X = np.zeros((0, len(tag2idx)), dtype=np.float32)
        y = np.zeros((0, len(tag2idx)), dtype=np.float32)

    print("X shape:", X.shape)

    print("y_shape", y.shape)

    return X, y

def vector_pois_test(poisX, poisy, tag2idx, n):
    """
    Encode the test questions and their answers as aligned vectors.

    Args:
        poisX: iterable of tag collections (the visible tags of each test POI).
        poisy: iterable with the single held-out tag of each test POI, in the
            same order as ``poisX``.
        tag2idx: mapping from allowed tag name to vector position.
        n: minimum number of visible tags a question needs to be used.

    Returns:
        X, y: 2-D arrays with one row per kept question and ``len(tag2idx)``
        columns; row i of ``y`` is the answer to row i of ``X``.

    Raises:
        ValueError: if ``poisX`` and ``poisy`` differ in length.
    """
    questions = list(poisX)
    answers = list(poisy)
    if len(questions) != len(answers):
        raise ValueError(
            f"poisX has {len(questions)} entries but poisy has {len(answers)}"
        )

    X = []
    y = []

    # Questions and answers are walked together so that skipping a question
    # also skips its answer; otherwise the rows of X and y drift apart
    for tag, answer in zip(questions, answers):
        if len(tag) < n:
            # Show which question was dropped for having too few tags
            print(tag)
            continue
        X.append(tags_to_vec(tag, tag2idx))
        y.append(tags_to_vec_singular(answer, tag2idx))

    if X:
        X = np.stack(X)
        y = np.stack(y)
    else:
        # np.stack refuses an empty list; return correctly shaped empty arrays
        X = np.zeros((0, len(tag2idx)), dtype=np.float32)
        y = np.zeros((0, len(tag2idx)), dtype=np.float32)

    print("X_shape:", X.shape)

    print("y_shape", y.shape)

    return X, y

In [None]:
# Importing the training set to create the tree
# NOTE(review): pickle.load can run arbitrary code while unpickling, so these
# files must come from a trusted source
with open('trainingset', 'rb') as fp:
    pois = pickle.load(fp)
# And the test set for querying and answers for checking
with open('testset_questions', 'rb') as fp:
    test_pois_X = pickle.load(fp)
with open('testset_answer', 'rb') as fp:
    test_pois_y = pickle.load(fp)

In [None]:
n_good_tags = 10 # A tag must occur more than this many times in the training set to be allowed
n_per_instance = 1 # Minimum number of tags an instance needs to have to be part of the data
                   # With 1, empty tag dicts are dropped but single-tag instances are kept:
                   # their only tag becomes the target, so their input vector is all zeros


# Build the tag vocabulary from the training set only
good_tags, tag2idx = create_tag_lists(pois, n_good_tags)

# Training vectors: one randomly chosen tag per instance is hidden and used as the target
X, Y = vector_pois(pois, tag2idx, n_per_instance)
# NOTE(review): the minimum tag count is hard-coded to 1 here instead of n_per_instance — confirm this is intended
X_test, y_test = vector_pois_test(test_pois_X, test_pois_y, tag2idx, 1)


# Hold out 20% of the training vectors for validation (used for threshold tuning further down)
# iterative_train_test_split returns X_train, y_train, X_held_out, y_held_out in that order
X_train, y_train, X_val, y_val = iterative_train_test_split(X, Y, test_size=0.2)

In [None]:
# Set check_model to True to rerun the hyperparameter search instead of
# reusing the stored configuration
check_model = False
if not check_model:
    # The previous best model
    # NOTE(review): unlike the search below, no random_state is set here, so
    # repeated runs can produce different forests — confirm this is intended
    best_model = RandomForestClassifier(max_features='log2', n_estimators=300, max_depth=None)
else:
    model = RandomForestClassifier(random_state=42)

    # Define search space
    param_dist = {
        'n_estimators': [100, 200, 300],
        'max_depth': [None, 10, 20, 30],
        'max_features': ['sqrt', 'log2']
    }

    # Wrap in RandomizedSearchCV
    search = RandomizedSearchCV(
        model,
        param_distributions=param_dist,
        n_iter=20,  # Try 20 random combinations
        cv=3,       # 3-fold cross-validation
        scoring='f1_weighted',  # Or accuracy, or a custom metric
        n_jobs=-1,  # Use all cores
        verbose=1
    )

    search.fit(X_train, y_train)

    # Best model after search
    best_model = search.best_estimator_
    print(best_model)

In [None]:
# Train the selected forest on the training split (the validation split stays unseen)
best_model.fit(X_train, y_train)

In [None]:
def find_best_thresholds(y_true, y_probs, thresholds=np.linspace(0.1, 0.9, 9)):
    """
    Pick, for every label column, the decision threshold with the highest F1.

    Args:
        y_true: 2-D array of true labels, indexed as ``y_true[:, i]`` per label.
        y_probs: 2-D array of scores with the same layout as ``y_true``.
        thresholds: candidate thresholds to try, in order. Defaults to the nine
            values 0.1, 0.2, ..., 0.9.

    Returns:
        1-D array with one threshold per label. A label keeps the fallback 0.5
        when no candidate reaches an F1 above 0; on ties the first candidate
        with the best F1 wins.
    """
    # NOTE: the default used to be np.arange(0.1, 0.9, 9), whose third argument
    # is the step size, so only the single threshold 0.1 was ever tried
    best_thresholds = []
    for i in range(y_true.shape[1]):
        label_true = y_true[:, i]
        label_probs = y_probs[:, i]

        best_f1 = 0
        best_thresh = 0.5
        for thresh in thresholds:
            preds = (label_probs >= thresh).astype(int)
            f1 = f1_score(label_true, preds, zero_division=0)
            # Strict comparison: a later threshold must beat the current best
            if f1 > best_f1:
                best_f1 = f1
                best_thresh = thresh
        best_thresholds.append(best_thresh)
    return np.array(best_thresholds)

# Score the validation split; the result is iterated below as one 2-D array per label
Y_probs_val = best_model.predict_proba(X_val)      
# Keep column 1 of every per-label array; a label whose array has a single column gets all zeros
# NOTE(review): this assumes column 1 is the "tag present" class and that a single column
# means the tag was never present in training — confirm
# The transpose puts samples on the rows and labels on the columns, matching y_val
prob_matrix_val = np.array([
    probs[:, 1] if probs.shape[1] > 1 else np.zeros(probs.shape[0])
    for probs in Y_probs_val]).T

# One tuned threshold per label, chosen on the validation split
thresholds = find_best_thresholds(y_val, prob_matrix_val)

In [None]:
# Score the test questions; the result is iterated below as one 2-D array per label
Y_probs_test = best_model.predict_proba(X_test)

# Same extraction as for the validation split: column 1 per label, zeros when
# a label's array has only one column
prob_matrix_test = np.array([
     probs[:, 1] if probs.shape[1] > 1 else np.zeros(probs.shape[0])
     for probs in Y_probs_test]).T

# Compare every column against its own tuned threshold to get 0/1 predictions
Y_preds = (prob_matrix_test >= thresholds).astype(int)

In [None]:
# Print the per-label precision/recall/F1 report for the test set
# zero_division=0 scores undefined metrics as 0
print(classification_report(y_test, Y_preds, zero_division=0))

In [None]:
# Start from an empty workbook so that only the report sheet ends up in the file
workbook = Workbook()
workbook.remove(workbook.active) # Delete default sheet.

# Same report as printed above, but as a dict so it can be written out
report = classification_report(
    y_test,
    Y_preds,
    zero_division=0,
    output_dict=True
)

# convert_report2excel comes from the support files
# NOTE(review): presumably it adds a sheet named sheet_name to the workbook and returns it — confirm
workbook = convert_report2excel(
    workbook=workbook,
    report=report,
    sheet_name = "randomforest10_report"
)
workbook.save("randomforest10_report.xlsx")