<table class="ee-notebook-buttons" align="left">
    <td><a target="_blank"  href="https://github.com/NINAnor/urban-treeDetection"><img width=32px src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" style="filter: invert(100%)"/> View source on GitHub</a></td>
    <td><a target="_blank"  href="https://drive.google.com/drive/folders/1mEQBfa-tVViVWFt27XzUP4Wr19u1iuZm"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" /> Run in Google Colab</a></td>
</table>

# Urban Tree Detection | Extrapolating Ecosystem Service Values

**Author:** Willeke A'Campo

**Description:** Script to clean the output i-Tree Eco dataset and extrapolate the Ecosystem Service Values to all trees in the municipality. The regression is trained on the in situ tree dataset from the municipality or if not availabe on Oslo's tree in situ tree data. 

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# import geopandas as gpd
from scipy import stats

# from splot.esda import moran_scatterplot, plot_moran

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score

In [None]:
# import  py-packages
import os
import logging

# import local packages
from src import MUNICIPALITY, RAW_PATH, INTERIM_PATH
from src import logger

In [None]:
print(RAW_PATH)
print(MUNICIPALITY)

In [None]:
# configure seaborn plot
sns.set_theme(
    context="notebook",  # paper, notebook, talk, and poster
    style="darkgrid",  # darkgrid, whitegrid, dark, white, and ticks
    palette="dark",  # deep, muted, bright, pastel, dark, and colorblind
    font="sans-serif",  # font family: serif, sans-serif, cursive, fantasy, and monospace
    font_scale=1,  # 1 = default font size (12pt)
    color_codes=True,  # False = don't color code from current palette
    rc=None,  # dict for additional settings (e.g. axes.labelsize=15)
)

In [None]:
# configure logger
logger.setup_logger(logfile=False)
logging.info(f"Start preparing the tree database of {MUNICIPALITY}.")

# print global project variables
print(f"interim_data_path: {INTERIM_PATH}")

### Import Reference Data Oslo

In [None]:
# load the Excel file into a lookup dictionairy
excel_path = os.path.join(INTERIM_PATH, "kristiansand_itree_eco.xlsx")
print(excel_path)

workbook = pd.ExcelFile(excel_path)
sheet_names = workbook.sheet_names
print(f"workbook sheet names: {sheet_names}")

# import Oslo's reference dataset
df_raw = pd.read_excel(excel_path, sheet_name="oslo_reference_data")

# import metadata
df_metadata = pd.read_excel(excel_path, sheet_name="metadata")
df_metadata = df_metadata[["urban-treeDetection_colnames", "python_colnames", "dtype"]]

# import genus list
df_genus = pd.read_excel(excel_path, sheet_name="oslo_unique_genus")
genus_bins = df_genus["genus"].tolist()

print("Raw data information: Oslo Reference Data")
display(df_raw.head())
print("Rows in dataframe: ", len(df_raw))
print(f"\nunique genus: {genus_bins}")
print("\nMetadata information: ")
display(df_metadata.head())

### Clean dataset
- remove rows with missing values
- remove rows with negative values (not done)

In [None]:
# drop rows with missing values
df = df_raw.dropna()
# drop rows with missing values
df = df.dropna()
display(df.head())
print(df.shape)

- rename columns 
- trim column names
- set dtype of columns 

In [None]:
# trim and rename columns using metadata
col_name_mapping = dict(
    zip(df_metadata["urban-treeDetection_colnames"], df_metadata["python_colnames"])
)
df.columns = df.columns.str.strip()
df.rename(columns=col_name_mapping, inplace=True)
df.columns = df.columns.str.strip()

# update column dtypes using metadata
dtype_mapping = dict(zip(df_metadata["python_colnames"], df_metadata["dtype"]))

# remove all values from dtype_mapping if they are not in df.columns
dtype_mapping = {
    key: dtype_mapping[key] for key in dtype_mapping.keys() if key in df.columns
}

# convert column dtypes using dtype_mapping
for key in dtype_mapping.keys():
    # print(f"column: {key}\t current dtype: {df[key].dtypes} \ttarget dtype: {dtype_mapping[key]}")
    # if the current dtype is not the target dtype, convert the column
    if df[key].dtypes != dtype_mapping[key]:
        try:
            if dtype_mapping[key] == "int":
                df[key] = df[key].astype(int)
                print(f"column: {key} converted to {df[key].dtypes}")
            elif dtype_mapping[key] == "float64":
                df[key] = df[key].astype(float)
                print(f"column: {key} converted to {df[key].dtypes}")
            elif dtype_mapping[key] == "object":
                df.astype({key: "object"}).dtypes
                print(f"column: {key} converted to {df[key].dtypes}")
        except ValueError:
            print(
                f"column: {key} ({df[key].dtypes}) could not be converted to {dtype_mapping[key]}"
            )
            pass
    else:
        print(
            f"column: {key} ({df[key].dtypes}) has already dtype: {dtype_mapping[key]}"
        )
        pass

### Import and clean Reference dataset (Kristiansand)

In [None]:
# import target dataset
excel_path = os.path.join(INTERIM_PATH, "kristiansand_itree_eco.xlsx")
df_target = pd.read_excel(excel_path, sheet_name="kristiansand_target")

In [None]:
# drop rows with missing values
df_target = df_target.dropna()
# drop rows with missing values
df_target = df_target.dropna()
display(df_target.head())
print(df_target.shape)

In [None]:
col_list = df.columns

# drop columns that are in the target dataset
col_list = [col for col in col_list if col not in df_target.columns]
print(col_list)

# add col list as columns to target_df
df_target = pd.concat([df_target, pd.DataFrame(columns=col_list)], axis=1)

In [None]:
df_target.columns = df_target.columns.str.strip()

# remove all values from dtype_mapping if they are not in df.columns
dtype_mapping = {
    key: dtype_mapping[key] for key in dtype_mapping.keys() if key in df_target.columns
}

# convert column dtypes using dtype_mapping
for key in dtype_mapping.keys():
    # print(f"column: {key}\t current dtype: {df[key].dtypes} \ttarget dtype: {dtype_mapping[key]}")
    # if the current dtype is not the target dtype, convert the column
    if df_target[key].dtypes != dtype_mapping[key]:
        try:
            if dtype_mapping[key] == "int":
                df_target[key] = df_target[key].astype(int)
                print(f"column: {key} converted to {df_target[key].dtypes}")
            elif dtype_mapping[key] == "float64":
                df_target[key] = df_target[key].astype(float)
                print(f"column: {key} converted to {df_target[key].dtypes}")
            elif dtype_mapping[key] == "object":
                df_target.astype({key: "object"}).dtypes
                print(f"column: {key} converted to {df_target[key].dtypes}")
        except ValueError:
            print(
                f"column: {key} ({df_target[key].dtypes}) could not be converted to {dtype_mapping[key]}"
            )
            pass
    else:
        print(
            f"column: {key} ({df_target[key].dtypes}) has already dtype: {dtype_mapping[key]}"
        )
        pass

### Correlation analysis Oslo

In [None]:
# drop columns with name 'id' and 'genus'
df_corr = df.drop(
    ["tree_id", "crown_id", "dbh", "dbh_in_situ", "sp_in_situ", "lon", "lat"], axis=1
)
correlation_matrix = df_corr.corr()

fig, ax = plt.subplots(figsize=(12, 8))
sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm")
ax.set_title("Correlation Matirx", fontweight="ultralight", fontsize=25)
plt.xticks(rotation=45, fontweight="ultralight", fontsize=15)
plt.yticks(fontsize=15)
plt.show()

- select columns to keep for modelling
- round values for pred and response var to 3 decimals (ensure that X,Y columns are not rounded!)

In [None]:
# print(df.columns)
col_id = ["tree_id", "crown_id"]
col_coord = ["lon", "lat"]
predictors = ["tree_height", "crown_area", "pollution_zone", "genus"]
response_vars = [
    "carbon_storage",
    "carbon_seq",
    "runoff",
    "polution_no2",
    "polution_so2",
    "polution_pm25",
    "polution_co",
    "polution_o3",
    "totben_cap",
]

col_to_keep = col_id + predictors + response_vars + col_coord
print(col_to_keep)
df = df[col_to_keep]

# round response and predictor values to 2 decimals
df[predictors + response_vars] = df[predictors + response_vars].round(2)
display(df.head())


df_target = df_target[col_to_keep]
df_target[predictors + response_vars] = df_target[predictors + response_vars].round(2)
display(df_target)

In [None]:
# create a histogram for genus distribution
df_genus = df.groupby("genus").count()
df_genus = df_genus[["tree_id"]]
df_genus.rename(columns={"tree_id": "count"}, inplace=True)
df_genus.sort_values(by=["count"], inplace=True, ascending=False)
df_genus.reset_index(inplace=True)

# probability distribution of genus in percent
df_genus["probability"] = df_genus["count"] / df_genus["count"].sum()
df_genus["prob_percentage"] = round(df_genus["probability"] * 100)
display(df_genus.head())

df_genus["probability"].sum()

In [None]:
# plot histogram
fig, ax = plt.subplots(figsize=(30, 5))
sns.barplot(x="genus", y="prob_percentage", data=df_genus, ax=ax, color="darkgreen")
ax.set_title(
    "Tree Genus Probability Distribution in Oslo", fontweight="ultralight", fontsize=20
)
ax.set_xlabel("Genus", fontweight="bold")
ax.set_ylabel("Probability (%)", fontweight="bold")
plt.xticks(rotation=45, fontweight="ultralight")
plt.yticks(fontsize=10)
plt.show()

### Fill Target Dataset with Genus values using the probability distribution of Oslo 

In [None]:
# fil no data values in test_df['genus'] by using the probability distribution of df['genus']
df_target["genus"] = df_target["genus"].apply(
    lambda x: np.random.choice(df_genus["genus"], p=df_genus["probability"])
    if pd.isnull(x)
    else x
)

# print the probability distribution of target_data['genus']
target_data_genus = df_target.groupby("genus").count()
target_data_genus = target_data_genus[["tree_id"]]
target_data_genus.rename(columns={"tree_id": "count"}, inplace=True)
target_data_genus.sort_values(by=["count"], inplace=True, ascending=False)
target_data_genus.reset_index(inplace=True)

In [None]:
# probability distribution of genus in percent
target_data_genus["probability"] = (
    target_data_genus["count"] / target_data_genus["count"].sum()
)
target_data_genus["prob_percentage"] = round(target_data_genus["probability"] * 100)
display(target_data_genus.head())

# plot histogram
fig, ax = plt.subplots(figsize=(30, 5))
sns.barplot(
    x="genus", y="prob_percentage", data=target_data_genus, ax=ax, color="darkgreen"
)
ax.set_title("Test Tree Species Distribution", fontweight="ultralight", fontsize=20)
ax.set_xlabel("Genus", fontweight="bold")
ax.set_ylabel("Probability (%)", fontweight="bold")
plt.xticks(rotation=45, fontweight="ultralight")
plt.yticks(fontsize=10)
plt.show()

In [None]:
display(df_target.head())

### Non-weighted Regression with encoded genus info

In [None]:
# One-hot encoding for species
df["genus"] = df["genus"].astype("category")

# encode genus for regression
genus_encoded = pd.get_dummies(df["genus"], prefix="genus")

# get genus encoded column names
genus_encoded_cols = list(genus_encoded.columns)
# replace genus_ wiht "" in column names
genus_encoded_cols = [x.replace("genus_", "") for x in genus_encoded_cols]
genus_encoded_cols = [x.lower() for x in genus_encoded_cols]

# replace col names with formatted col names
genus_encoded.columns = genus_encoded_cols

df = pd.concat([df, genus_encoded], axis=1)
print(genus_encoded_cols)

In [None]:
target_genus_encoded = pd.get_dummies(df_target["genus"])
target_genus_encoded_cols = list(target_genus_encoded.columns)
target_genus_encoded_cols = [x.lower() for x in target_genus_encoded_cols]

# replace col names with formatted col names
target_genus_encoded.columns = target_genus_encoded_cols

df_target = pd.concat([df_target, target_genus_encoded], axis=1)
display(df_target.head())
print(target_genus_encoded)

In [None]:
# ensure that df and df_target contain the same column names
same_columns = df.columns.equals(df_target.columns)
print(same_columns)

# drop columns that are not in df_target
if not same_columns:
    df = df.drop([col for col in df.columns if col not in df_target.columns], axis=1)
    print(df.columns)
    print(df_target.columns)

In [None]:
# Extract the features (X) and the target variable (y) from the DataFrame
predictors = ["tree_height", "crown_area", "pollution_zone"] + genus_encoded_cols
response_vars = [
    "carbon_storage",
    "carbon_seq",
    "runoff",
    "polution_no2",
    "polution_so2",
    "polution_pm25",
    "polution_co",
    "polution_o3",
    "totben_cap",
]

for var in response_vars:
    y = df[var]

In [None]:
# genus encoded columns is all columns that are not within list 
no_genus = ['tree_id', 'crown_id', 'tree_height', 'crown_area', 'pollution_zone',
       'genus', 'carbon_storage', 'carbon_seq', 'runoff', 'polution_no2',
       'polution_so2', 'polution_pm25', 'polution_co', 'polution_o3',
       'totben_cap', 'lon', 'lat']
genus_encoded = [col for col in df.columns if col not in no_genus]
print(genus_encoded)

predictors = ["tree_height", "crown_area", "pollution_zone"] + genus_encoded

In [None]:
X = df[predictors]
X_target = df_target[predictors]
print(X_target)
seed = 42
test_size = 0.2

dict_results = {}

# TODO store model in dictionairy {model_name:model}
# TODO store model results in dataframe df_results dict. {model_name:df_results}

# loop over response variables and calc. linear regression
for var in response_vars:
    y = df[var]

    # SPLIT DATA
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=seed
    )

    # TRAIN MODEL
    # Create and fit the regression model
    model = LinearRegression()
    model.fit(X_train, y_train)

    b0 = round(model.intercept_, 1)
    b1 = round(model.coef_[0], 1)
    b2 = round(model.coef_[1], 1)
    b3 = round(model.coef_[2], 1)
    b4 = round(model.coef_[3], 1)
    y = var
    x1 = predictors[0]
    x2 = predictors[1]
    x3 = predictors[2]
    x4 = predictors[3]

    model_equation = f"y = {b0} + {b1}*{x1} + {b2}*{x2} + {b3}*{x3} + {b4}*{x4} + ..."

    # Make predictions on the training and testing sets
    y_train_pred = model.predict(X_train)
    y_test_pred = model.predict(X_test)

    # EVALUATE MODEL
    # store model results in dataframe df_results
    r2_train = round(r2_score(y_train, y_train_pred), 2)
    r2_test = round(r2_score(y_test, y_test_pred), 2)
    r2_train = round(r2_train, 2)
    rmse_train = round(np.sqrt(mean_squared_error(y_train, y_train_pred)), 2)

    df_results = pd.DataFrame(
        columns=[
            "model_name",
            "response_var",
            "equation",
            "train_rmse",
            "test_rmse",
            "train_r2",
            "test_r2",
        ]
    )
    df_results["model_name"] = f"model_{var}"
    df_results["response_var"] = var
    df_results["equation"] = model_equation
    df_results["train_rmse"] = rmse_train
    df_results["test_rmse"] = np.sqrt(mean_squared_error(y_test, y_test_pred))
    df_results["train_r2"] = r2_train
    df_results["test_r2"] = r2_test

    # append results dataframe to dict
    dict_results[f"model_{var}"] = df_results

    # DEPLOY MODEL
    # Apply the regression model to the df_target dataset
    y_target_pred = model.predict(X_target)

    # Fill the response columns in df_target with the predicted values
    df_target[var] = y_target_pred

    # DISPLAY RESULTS
    print(f"Model: {var}")
    print(f"Equation: {model_equation}")
    display(df_target.head())

    # plot results and save plot to output folder
    plt.clf()  # clear figure

    # Regression plot
    sns.jointplot(
        x=y_test,
        y=y_test_pred,
        kind="reg",
        truncate=False,
        # xlim=(0, 60), ylim=(0, 12),
        color="#427360",
        height=7,
    )
    plt.xlabel(f"actual {var}")
    plt.ylabel(f"predicted {var}")
    plt.figtext(
        0,
        -0.05,
        f"R2: {r2_train}   RMSE: {rmse_train}\n {model_equation}",
        ha="left",
        fontsize=8,
    )
    plt.show()

    # save plot to output folder
    # output_path = os.path.join(INTERIM_PATH, f'linear_reg_{var}.png')
    # plt.savefig(output_path, dpi=300, bbox_inches='tight')

### Export Results

In [None]:
display(df_target.head())

# round values
df_target[predictors + response_vars] = round(df_target[predictors + response_vars], 3)

# drop genus cols
drop_cols = target_genus_encoded.columns


df_target.drop(drop_cols, axis=1, inplace=True)
display(df_target.head())

In [None]:
# export the results in df_target
csv_path = os.path.join(INTERIM_PATH, "kristiansand_extrapolation_from_oslo_results.csv")
df_target.to_csv(csv_path, index=False, sep=";")

### BACKUP | Non-weighted Regression without genus

y = b0 + b1x1 + b2x2 + b3*x3 + ...

where:

y is the dependent variable,
x1, x2, x3, ... are the independent variables,
b0 is the y-intercept of the regression line, and
b1, b2, b3, ... are the coefficients for each independent variable.

In [None]:
# Extract the features (X) and the target variable (y) from the DataFrame
print(df.columns)

predictors = ["tree_height", "crown_area", "pollution_zone"]
response_vars = [
    "carbon_storage",
    "carbon_seq",
    "runoff",
    "polution_no2",
    "polution_so2",
    "polution_pm25",
    "polution_co",
    "polution_o3",
    "totben_cap",
]

for var in response_vars:
    y = df[var]

X = df[predictors]
seed = 42
test_size = 0.2

dict_results = {}

# TODO store model in dictionairy {model_name:model}
# TODO store model results in dataframe df_results dict. {model_name:df_results}

# loop over response variables and calc. linear regression
for var in response_vars:
    y = df[var]
    # display(X.head())
    # display(y.head())

    # Split the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=seed
    )

    # Create and fit the regression model
    model = LinearRegression()
    model.fit(X_train, y_train)

    b0 = round(model.intercept_, 1)
    b1 = round(model.coef_[0], 1)
    b2 = round(model.coef_[1], 1)
    b3 = round(model.coef_[2], 1)
    y = var
    x1 = predictors[0]
    x2 = predictors[1]
    x3 = predictors[2]
    model_equation = f"y = {b0} + {b1}*{x1} + {b2}*{x2} + {b3}*{x3}"

    # model_equation = f'y = {model.intercept_} + {model.coef_[0]}*x1 + \
    #      {model.coef_[1]}*x2 + {model.coef_[2]}*x3'

    # Make predictions on the training and testing sets
    y_train_pred = model.predict(X_train)
    y_test_pred = model.predict(X_test)

    # store model results in dataframe df_results
    # r2 train round to 2

    r2_train = round(r2_score(y_train, y_train_pred), 2)
    r2_test = round(r2_score(y_test, y_test_pred), 2)
    r2_train = round(r2_train, 2)
    df_results = pd.DataFrame(
        columns=[
            "model_name",
            "response_var",
            "equation",
            "train_rmse",
            "test_rmse",
            "train_r2",
            "test_r2",
        ]
    )
    df_results["model_name"] = f"model_{var}"
    df_results["response_var"] = var
    df_results["equation"] = model_equation
    df_results["train_rmse"] = np.sqrt(mean_squared_error(y_train, y_train_pred))
    df_results["test_rmse"] = np.sqrt(mean_squared_error(y_test, y_test_pred))
    df_results["train_r2"] = r2_train
    df_results["test_r2"] = r2_test

    # append results dataframe to dict
    dict_results[f"model_{var}"] = df_results

    # plot results and save plot to output folder
    fig1, ax1 = plt.subplots(figsize=(5, 5))
    # Add the regression line of best fit
    plt.plot(
        [y_test.min(), y_test.max()], [y_test.min(), y_test.max()], "--k", label="Ideal"
    )
    plt.plot(y_test, y_test_pred, "bo", label="Predictions")
    plt.legend()
    plt.scatter(y_test, y_test_pred)
    plt.xlabel(f"Actual {var}")
    plt.ylabel(f"Predicted {var}")
    plt.title(f"Actual vs Predicted {var}")
    # # print R2 and model equation under the plot
    # Add text under the figure in italic
    plt.figtext(0, -0.05, f"R2: {r2_train}\n {model_equation}", ha="left", fontsize=8)

    # plt.text(f"R2: {df_results['test_r2'][0]:.2f}", fontsize=12)
    # plt.text(f"Equation: {model_equation}", fontsize=12)

    # # save plot to output folder
    # output_path = os.path.join(INTERIM_PATH, f'linear_reg_{var}.png')
    # #plt.savefig(output_path, dpi=300, bbox_inches='tight')

    # FIGURE 2
    sns.jointplot(
        x=y_test,
        y=y_test_pred,
        kind="reg",
        truncate=False,
        # xlim=(0, 60), ylim=(0, 12),
        color="#427360",
        height=7,
    )
    plt.xlabel(f"Actual {var}")
    plt.ylabel(f"Predicted {var}")
    # # print R2 and model equation under the plot
    # Add text under the figure in italic
    plt.figtext(0, -0.05, f"R2: {r2_train}\n {model_equation}", ha="left", fontsize=8)
    plt.show()
    break