In [1]:
import pandas as pd
import numpy as np

In [None]:
# Firstly combine the two versions of estates df
# To get the information about finances while retaining
# the new Deployment data
estates_df = load_estates_df("data/Estate_Informationv3.xlsx")

In [None]:
# Cleaning column - Condition Grade
estates_df['Condition Grade'] = estates_df['Condition Grade'].replace(
    ['Not Surveyed', 'Not yet undertaken', 'Not surveyed', 'No grade available', np.nan],
    np.nan
)

# Cleaning column - Ownership
estates_df["Ownership"] = estates_df["Ownership"].replace(
['Leased In', 'Leased in'], "Leased In")

# Cleaning the Property Size column
estates_df["Property Size GIA sq.m"] = estates_df["Property Size GIA sq.m"].replace(
['TBC', 'Variable'], np.nan)

# Cleaning column - Year of Construction
def clean_year_value(year):
    if pd.isna(year) or year in ['Unknown', 'No survey available', 'no info.']:
        return np.nan
    elif year == "1900s / 1995":
        year = 1995
    elif isinstance(year, str):
        year = year.lower().strip()
        if 'circa' in year or 'c.' in year:
            year = year.replace('circa', '').replace('c.', '').replace("'", '').replace('s', '').strip()
        if 'refurbished' in year:
            year = year.split(' ')[-1]  # Take the year after 'refurbished'
        if '/' in year:
            parts = year.split('/')
            year = str(int(sum(int(part.strip()) for part in parts) / len(parts)))  # Average the years
        if 'post' in year.lower():
            year = '1945'
        if 's' in year:
            year = year.replace('s', '')
        year = year.replace('+', '').replace("'", '').replace(' ', '')
    try:
        return int(year)
    except ValueError:
        return np.nan

# Year
estates_df["Year of Construction"] = estates_df["Year of Construction"].apply(clean_year_value)



#no_condition_filter = estates_df["Condition Grade"].isna()
#estates_df.loc[no_condition_filter]

In [None]:
estates_df[estates_df["Ownership"].isna()]

In [None]:
# Encoding columns
def create_mappings(df, column):
    unique_values = pd.Series(df[column].unique())
    unique_values = unique_values[unique_values.notna()].sort_values()
    encoding = {value: idx for idx, value in enumerate(unique_values, start=1)}  # Start from 1 for easier decoding
    decoding = {idx: value for value, idx in encoding.items()}
    # Explicitly include NaN handling
    encoding[np.nan] = np.nan
    decoding[np.nan] = np.nan
    return encoding, decoding


estates_df_encoded = estates_df.copy()

# Creating Mappings
condition_encoding, condition_decoding = create_mappings(estates_df, 'Condition Grade')
classification_encoding, classification_decoding = create_mappings(estates_df, 'Primary Classification (Use)')
year_encoding, year_decoding = create_mappings(estates_df, 'Year of Construction')
division_encoding, division_decoding = create_mappings(estates_df, 'Division')
location_encoding, location_decoding = create_mappings(estates_df, 'Location Category (A=Urban, B=Rural, C=Remote, D=Islands)')
ownership_encoding, ownership_decoding = create_mappings(estates_df, 'Ownership')
building_encoding, building_decoding = create_mappings(estates_df, 'Record Level')



estates_df_encoded["Condition Grade"] = estates_df_encoded["Condition Grade"].map(condition_encoding)
estates_df_encoded["Primary Classification (Use)"] = estates_df_encoded["Primary Classification (Use)"].map(classification_encoding)
estates_df_encoded["Year of Construction"] = estates_df_encoded["Year of Construction"].map(year_encoding)
estates_df_encoded["Division"] = estates_df_encoded["Division"].map(division_encoding)
estates_df_encoded["Location Category (A=Urban, B=Rural, C=Remote, D=Islands)"] = estates_df_encoded["Location Category (A=Urban, B=Rural, C=Remote, D=Islands)"].map(location_encoding)
estates_df_encoded["Ownership"] = estates_df_encoded["Ownership"].map(ownership_encoding)
estates_df_encoded["Record Level"] = estates_df_encoded["Ownership"].map(building_encoding)

In [None]:
def categorical_imputation(df, column_name, columns_for_imputation, thresh):
    """
    Imputes missing categorical values in a specified column using KNN classifier.
    
    Args:
    - df (DataFrame): Input DataFrame containing categorical columns with missing values.
    - column_name (str): Name of the column to impute.
    - columns_for_imputation (list): List of columns to consider for imputation, including column_name.
    
    Returns:
    - DataFrame: DataFrame with imputed values.
    """
    # Make a copy of the original DataFrame
    df_copy = df.copy()
    df_copy = df_copy[columns_for_imputation]
    
    X_cols = [c for c in columns_for_imputation if c != column_name]
    
    # Filter rows where more than one column is missing (other than column_name)
    df_filtered = df_copy.dropna(subset=X_cols, thresh=1)
    
    if df_filtered.empty:
        print(f"No valid rows found for imputation in DataFrame. Returning original DataFrame.")
        return df_copy
    
    # Separate data into features (X) and target (y)
    X = df_filtered.dropna()[X_cols]  # Features (without target column and without NaNs)
    y = df_filtered.dropna()[column_name]  # Target (non-NaN values of target column)
    
    # Split data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # Initialize KNeighborsClassifier
    knn_imputer = KNeighborsClassifier(n_neighbors=6)
    
    # Fit KNN model on training data
    knn_imputer.fit(X_train, y_train)
    
    # Evaluate accuracy on testing data
    y_pred = knn_imputer.predict(X_test)
    accuracy = accuracy_score(list(y_test.values), y_pred)
    print(f"Accuracy of KNN classifier: {accuracy:.2f}")
    
    # Predict missing values in testing data
    X_missing = df_copy.loc[df_copy[column_name].isnull(), X_cols]
    
    # Loop through each row in X_missing
    imputed_values = []
    for index, row in X_missing.iterrows():
        # Check if more than one column is NaN in the row
        if row.isnull().sum() > 0:
            continue  # Skip imputation for this row
        
        # Predict and save imputed value
        imputed_value = knn_imputer.predict([row])[0]
        imputed_values.append(imputed_value)
        
        # Assign imputed value back to df_copy
        df_copy.loc[index, column_name] = imputed_value
    

    return df_copy

In [None]:
columns_for_imputation = [
    "Ownership",
    "Record Level"
]

_ = categorical_imputation(estates_df_encoded, "Ownership", columns_for_imputation)

In [None]:
columns_for_imputation = [
    "Property Size GIA sq.m",
    "Primary Classification (Use)",
    "Ownership",
    "Year of Construction",
    "Division",
    "Location Category (A=Urban, B=Rural, C=Remote, D=Islands)", 
    "Condition Grade"
]

_ = categorical_imputation(estates_df_imputed, "Condition Grade", columns_for_imputation)

In [None]:
estates_df_encoded[columns_for_imputation[1:]] = estates_df_encoded[columns_for_imputation[1:]].astype('Int64')

In [None]:
# Imputing using a KNN Imputer
from sklearn.impute import KNNImputer

columns_for_imputation = [
    "Property Size GIA sq.m",
    "Primary Classification (Use)",
    "Ownership",
    "Year of Construction",
    "Division",
    "Location Category (A=Urban, B=Rural, C=Remote, D=Islands)", 
    "Condition Grade"
]

decoding_dicts = [
    None,
    classification_decoding,
    ownership_decoding,
    year_decoding,
    division_decoding,
    location_decoding,
    condition_decoding
]

#knn_imputer = KNNImputer(n_neighbors=3)
#estates_df_imputed = estates_df_encoded.copy()

#estates_df_imputed[columns_for_imputation] = knn_imputer.fit_transform(estates_df_encoded[columns_for_imputation])

In [None]:
# Decoding
def decode_column(df, column, decoding_dict):
    return df[column].map(decoding_dict)


for i, col in enumerate(columns_for_imputation):
    d = decoding_dicts[i]
    if d is None:
        continue
    estates_df_imputed[col] = decode_column(estates_df_imputed, col, d)


In [None]:
# Filters
police_station_filter = estates_df["Property Classification"] == "Police Station"
local_policing_filter = estates_df["Primary Classification (Use)"] == "Local Policing"
deployment_filter = estates_df["Deployment station? Y/N"] == "Yes"
estates_df = estates_df[police_station_filter & local_policing_filter & deployment_filter]