In [None]:
#pip install matplotlib
#!pip install scikit-learn #ensures it's installed in the correct env
#pip install -U scikit-learn

In [None]:
#imports only
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import sklearn
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit, RandomizedSearchCV
from scipy import stats
import sklearn.preprocessing as skl_pre
import sklearn.linear_model as skl_lm
import sklearn.discriminant_analysis as skl_da
import sklearn.neighbors as skl_nb
from sklearn.ensemble import RandomForestClassifier
from scipy import stats
import seaborn as sns
from pandas.plotting import scatter_matrix


In [None]:
file_path = 'siren_data_train.csv'
data = pd.read_csv(file_path)

#Split the binary columns into zeroes and ones and use that going forwards
data = pd.get_dummies(data, columns=["building","noise", "in_vehicle", "asleep","no_windows"])
data_copy = data.copy()

data.head()

In [None]:
data.describe()

# Create new features - distance, age group and distance group

### Create age group

#### Created new age buckets based on:
https://www.nidcd.nih.gov/health/statistics/hearing-loss-increases-with-age


![title](hearing_w_age.png)

In [None]:

#original bins
#age_bins = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
#age_labels = ['0-10', '10-20', '20-30', '30-40', '40-50', '50-60', '60-70', '70-80', '80-90', '90-100']

# Create age groups column
#data['age_group'] = pd.cut(data['age'], bins=age_bins, labels=age_labels, right=False)

#new bins based on hearing with age
new_age_bins = [0, 40, 50, 60, 70, float('inf')]
new_age_labels = ['0-39', '40-49', '50-59', '60-69', '70+']

# Create age groups column with new bins
data['age_group'] = pd.cut(data['age'], bins=new_age_bins, labels=new_age_labels, right=False)



In [None]:
data

### Create distance

In [None]:
#add distance to horn
# Calculate Euclidean distance between person and nearest horn
data['dist'] = np.sqrt((data['xcoor'] - data['near_x'])**2 + (data['ycoor'] - data['near_y'])**2)


### Create distance group

In [None]:

# Add distance groups
# Create bins for distance ranging from 0 to 3000 with a step size of 100
step_size = 500
distance_bins = np.arange(0, 3100, step_size)

# Create labels for distance bins
distance_labels = [f'{i}-{i+step_size}' for i in range(0, 3000, step_size)]
# Append a bin edge for values greater than 3000
distance_bins = np.append(distance_bins, np.inf)
distance_labels.append('>3000')

# Assign each distance value to a corresponding bin
data['distance_groups'] = pd.cut(data['dist'], bins=distance_bins, labels=distance_labels, right=False)

### Check data

In [None]:
data.head()

# Generate dummies for newly created categorical data but keep original columns for the plots later

In [None]:
dummies = pd.get_dummies(data, columns=["age_group", "distance_groups", "heard"])

keep_these_original_columns_for_plots = data[["heard", "distance_groups", "age_group"]]

data = pd.concat([keep_these_original_columns_for_plots, dummies], axis=1)

data_copy = data.copy()


In [None]:
data.head()

# Generating samples

In [None]:
#random sampling
def split_train_test(data, test_ratio): 
    shuffled_indices = np.random.permutation(len(data)) #creates array of indices that are randomly shuffled
    test_set_size = int(len(data) * test_ratio)
    test_indices = shuffled_indices[:test_set_size]
    train_indices = shuffled_indices[test_set_size:]
    return data.iloc[train_indices], data.iloc[test_indices]

#stratified sampling, where we create a sample that takes the distribution of age into account
def stratified_sampling(data, test_ratio, important_data_column):
    split = StratifiedShuffleSplit(n_splits=1, test_size = test_ratio, random_state=42)
    for train_index, test_index in split.split(data, important_data_column):
        strat_train_set = data.loc[train_index]
        strat_test_set = data.loc[test_index]
    return strat_train_set, strat_test_set


#create samples
train, test = stratified_sampling(data, 0.2, data["age"])
#print(len(test_set))
#print(len(train_set))

In [None]:
train


In [None]:
test

In [None]:
#compare performances, stratified sampling is better for age group

import pandas as pd
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit

def split_comparison(data, target_column, test_size=0.2, random_state=None):
    # Original data overall performance
    overall_performance = data[target_column].value_counts(normalize=True)
    
    # Stratified Sampling
    split = StratifiedShuffleSplit(n_splits=1, test_size=test_size, random_state=random_state)
    for train_index, test_index in split.split(data, data[target_column]):
        strat_train_set = data.loc[train_index]
        strat_test_set = data.loc[test_index]
    
    # Random Sampling
    rand_train_set, rand_test_set = train_test_split(data, test_size=test_size, random_state=random_state)
    
    # Calculate performance metrics for stratified sampling
    strat_train_performance = strat_train_set[target_column].value_counts(normalize=True)
    strat_test_performance = strat_test_set[target_column].value_counts(normalize=True)
    
    # Calculate performance metrics for random sampling
    rand_train_performance = rand_train_set[target_column].value_counts(normalize=True)
    rand_test_performance = rand_test_set[target_column].value_counts(normalize=True)
    
    # Calculate percentage error
    strat_train_error = ((strat_train_performance - overall_performance) / overall_performance) * 100
    strat_test_error = ((strat_test_performance - overall_performance) / overall_performance) * 100
    
    rand_train_error = ((rand_train_performance - overall_performance) / overall_performance) * 100
    rand_test_error = ((rand_test_performance - overall_performance) / overall_performance) * 100
    
    # Create comparison table
    comparison_df = pd.DataFrame({
        'Overall performance': overall_performance,
        'Stratified': strat_test_performance,
        'Random': rand_test_performance,
        'Strat. % error': strat_test_error,
        'Rand. % error': rand_test_error
    })
    
    return comparison_df

# Example usage:
comparison_table = split_comparison(data, target_column='age', test_size=0.2, random_state=42)
print(comparison_table)


# Which features shall we include?

#### Drop some columns to easier filter out whats important

In [None]:
data.head()

In [None]:
mini_data = data.copy().drop(columns=["near_fid", "near_y", "near_x", "xcoor", "ycoor"], inplace=False)

### Check correlations

In [None]:
corr_matrix = mini_data.corr()
corr_matrix["heard_1"].sort_values(ascending=False)

In [None]:
#sns.heatmap(mini_data.corr());

# Increase the size of the heatmap.
plt.figure(figsize=(16, 6))
# Store heatmap object in a variable to easily access it when you want to include more features (such as title).
# Set the range of values to be displayed on the colormap from -1 to 1, and set the annotation to True to display the correlation values on the heatmap.
heatmap = sns.heatmap(mini_data.corr(), vmin=-1, vmax=1, annot=False)
# Give a title to the heatmap. Pad defines the distance of the title from the top of the heatmap.
heatmap.set_title('Correlation Heatmap', fontdict={'fontsize':12}, pad=12);

### From the above heatmap, we can see that these features positively affects heard_1:
Being in a building, not having any noise, not being in a vehicle, not being asleet (slight corr), to have windows, and being in the agegroup 0-50 y/o along with being 0-1000 (metres?) away from the horn




# Set X_train, X_test, y_train and y_test

In [None]:
#drop categorical values 
train = train.drop(columns=["distance_groups", "age_group"], axis=1)
test = test.drop(columns=["distance_groups", "age_group"], axis=1)

In [None]:
#right now we have all features, pls change

X_train = train.drop(columns=['heard', "heard_0", "heard_1"])
y_train = train['heard']

# Extract features and target for testing set
X_test = test.drop(columns=['heard', "heard_0", "heard_1"])
y_test = test['heard']


In [None]:
X_train

In [None]:
y_train

# (i) Does the distance to the nearest horn affect whether a person hears the siren or not?

In [None]:
data.columns

In [None]:
#Comparing distance and heard

#temp_data = data_copy

# Create stacked bar chart
pd.crosstab(data['distance_groups'], data['heard']).plot.bar(stacked=True)
plt.xlabel('Distance to Horn')
plt.ylabel('Count')
plt.title('Relationship between Distance to Horn and "Heard"')
plt.legend(title='Heard')
plt.show()

In [None]:
dist_num = data.pivot_table(index = ['distance_groups'], columns= 'heard', aggfunc='size')

dist_num[1] = dist_num[1].fillna(0)
dist_num[0] = dist_num[0].fillna(0)
dist_num_1 = np.array(dist_num[1])
dist_num_0 = np.array(dist_num[0])

dist_procent = (dist_num_1)/(dist_num_1+dist_num_0)*100

index_dist = np.array(dist_num.index)


plt.plot(index_dist, dist_procent)
plt.xlabel('Distance ()')
plt.title("Distance to horn and % who heard it from that distance")
plt.xticks(rotation=45)
plt.ylabel('The procent hearing (%)')

#### Is there a statistical difference between those that heard the signal and those that didn't given the distance?

In [None]:
from scipy import stats

# Split data into two groups based on 'heard' column
heard_group = data[data['heard'] == 1]['dist']
not_heard_group = data[data['heard'] == 0]['dist']


# Visualize distributions (optional)
# Example: sns.histplot(heard_group, label='Heard')
# Example: sns.histplot(not_heard_group, label='Not Heard')
# Add legend, labels, and titles as necessary

# Perform t-test
t_statistic, p_value = stats.ttest_ind(heard_group, not_heard_group)

# Interpret results
alpha = 0.05
if p_value < alpha:
    print("There is a statistically significant difference in the distributions.")
else:
    print("There is no statistically significant difference in the distributions.")


# (ii) Are the people who hear the siren younger than the people who do not hear it?

In [None]:

#Comparing age and heard

# Create stacked bar chart
pd.crosstab(data['age_group'], data['heard']).plot.bar(stacked=True)
plt.xlabel('Age Group')
plt.ylabel('Count')
plt.title('Relationship between Age Groups and "Heard"')
plt.legend(title='Heard')
plt.show()

In [None]:
#The relationship between age and hearing
age_num = data_copy.pivot_table(index = ['age'], columns= 'heard', aggfunc='size')

age_num[1] = age_num[1].fillna(0)
age_num[0] = age_num[0].fillna(0)
age_num_1 = np.array(age_num[1])
age_num_0 = np.array(age_num[0])

age_procent = (age_num_1)/(age_num_1+age_num_0)*100 #procent som hör per åldersgrupp

index_age = np.array(age_num.index)

plt.plot(index_age, age_procent)
plt.xlabel('age (years)')
plt.ylabel('The procent hearing (%)')



# (iii) Does the direction towards the nearest horn affect whether a person hears the siren or not?

In [None]:
# Create bins for angle ranging from 0 to 2000 with a step size of 100
angle_bins = np.arange(0, 181, 12)

# Create labels for angle bins
angle_labels = [f'{i}-{i+15}' for i in range(0, 180, 12)]

# Assign each angle value to a corresponding bin
data['ange_span'] = pd.cut(data['near_angle'], bins=angle_bins, labels=angle_labels, right=False)

angle_num = data.pivot_table(index = ['ange_span'], columns= 'heard', aggfunc='size')

angle_num_1 = np.array(angle_num[1])
angle_num_0 = np.array(angle_num[0])

angle_procent = (angle_num_1 )/(angle_num_1 +angle_num_0)*100 #procent som hör

index_angle = np.array(angle_num.index)

plt.plot(index_angle, angle_procent)
plt.xlabel('')
plt.ylabel('The procent hearing (%)')


# Random Forest

In [None]:
#Prepering the data for training and validation

data['dist'] = np.sqrt((data.near_x - data.xcoor)**2 + (data.near_y - data.ycoor)**2)

feat = ['near_fid', 'near_x', 'near_y', 'near_angle', 'xcoor', 'ycoor',
       'age', 'building_0', 'building_1', 'noise_0', 'noise_1', 'in_vehicle_0',
       'in_vehicle_1', 'asleep_0', 'asleep_1', 'no_windows_0', 'no_windows_1', 'dist']

x = data[feat] #inputs

y = data.heard # output

x = data[feat] #inputs

y = data['heard'] # output 
train_X, val_X, train_y, val_y = train_test_split(x,y, random_state=0, train_size=0.8)

In [None]:
model_tree = RandomForestClassifier()

par = {'n_estimators': stats.randint(50, 750), 'criterion':['gini', 'entropy'], 'max_depth': [4, 5, 6, 7, 8, 9], 'min_samples_split' : [1,2,3,4, 5, 6]}


RSC = RandomizedSearchCV(model_tree, param_distributions = par, n_iter = 10, cv=5, n_jobs = -1 )

RSC.fit(train_X, train_y)

model_tree = RSC.best_estimator_

model_tree.fit(train_X, train_y)

model_tree_prediction = model_tree.predict(val_X)

print(pd.crosstab(model_tree_prediction, val_y))
print(f"acc: {np.mean(model_tree_prediction == val_y)}")

In [None]:
print(pd.crosstab(model_tree_prediction, val_y))
print(f"acc: {np.mean(model_tree_prediction == val_y)}")

# LDA and QDA

### LDA

In [None]:
import sklearn.discriminant_analysis as skl_da

In [None]:
lda_model = skl_da.LinearDiscriminantAnalysis()
lda_model.fit(X_train, y_train)

In [None]:
predict_prob_lda = lda_model.predict_proba(X_test)

print("The class order in the model: ")
print(lda_model.classes_)

print("Examples of predicted probabilities for the above classes: ")
with np.printoptions(suppress=True, precision=6):
    print(predict_prob_lda[0:5])

In [None]:
prediction_lda = np.empty(len(X_test), dtype=object)
prediction_lda = np.where(predict_prob_lda[:, 0] >= 0.5, 0, 1) #0 is not heard, 1 is heard
print("Five first predictions: ")
print(prediction_lda[0:5], "\n")


# Confusion matrix
print("Confusion matrix: \n")
print(pd.crosstab(prediction_lda, y_test), "\n")

#Accuracy
print(f"Accuracy: {np.mean(prediction_lda == y_test):.3f}")

### QDA

In [None]:
qda_model = skl_da.QuadraticDiscriminantAnalysis()
qda_model.fit(X_train, y_train)

In [None]:
# As we get this warning above, we need to determine which variables correlate


In [None]:
X_train

In [None]:
#### Check which features correlate pairwise

In [None]:
X_train

#### Minimizing pairwise correlating features 
in_vehicle_0  correlates with noise_0 and same for opposite --> choose only noise






In [None]:
#choose either one of these "groups" to activate different combinations of features
#with removal of features that pairwise correlate with another feature

#using this set gives accuracy of 74%.... in comparison, using all features gives accuracy 90%
#X_train_mini = X_train.drop(columns=["xcoor", "ycoor", "near_x", "near_y", "near_angle", "dist", "age"], axis=1)
#X_test_mini = X_test.drop(columns=["xcoor", "ycoor", "near_x", "near_y", "near_angle", "dist", "age"], axis=1)

#all features
X_train_mini = X_train
X_test_mini = X_test




In [None]:
# Calculating the correlation matrix
corr_matrix = X_train_mini.corr()

# Plotting the correlation matrix
plt.figure(figsize=(8, 6))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', vmin=-1, vmax=1)
plt.title('Correlation Matrix')
plt.show()

In [None]:
qda_model = skl_da.QuadraticDiscriminantAnalysis()
qda_model.fit(X_train_mini, y_train)

###  add this info to the report: https://stats.stackexchange.com/questions/29385/collinear-variables-in-multiclass-lda-training

In [None]:
### As many values correlate together, one idea is to group them going forward. 
# Otherwise we likely get weird results below. But for now I will let it be, fix laterrr

In [None]:
predict_prob_qda = qda_model.predict_proba(X_test_mini)

print("The class order in the model: ")
print(qda_model.classes_)

print("Examples of predicted probabilities for the above classes: ")
with np.printoptions(suppress=True, precision=6):
    print(predict_prob_qda[0:5])

In [None]:
prediction_qda = np.empty(len(X_test), dtype=object)
prediction_qda = np.where(predict_prob_qda[:, 0] >= 0.5, 0, 1) #0 is not heard, 1 is heard
print("Five first predictions: ")
print(prediction_qda[0:5], "\n")


# Confusion matrix
print("Confusion matrix: \n")
print(pd.crosstab(prediction_qda, y_test), "\n")

#Accuracy
print(f"Accuracy: {np.mean(prediction_qda == y_test):.3f}")

#### This is from another course

In [None]:
def classification_report_interval(
    y_true,
    y_pred,
    labels=None,
    alpha = 0.01,
    union_bound_correction=True
):
    """Produces a classification report with precision, recall and accuracy
    It also uses Hoeffdings inequality to produce confidence intervals around
    each measurement. We can do this with or without multiple measurement
    correction (union bound correction).

    Example output is:
                labels           precision             recall

               0.0  0.88 : [0.50,1.00] 0.40 : [0.15,0.65]
               1.0  0.56 : [0.34,0.78] 0.93 : [0.65,1.00]

          accuracy                                        0.64 : [0.45,0.83]

    Parameters:
    y_true                          -- The true labels
    y_pred                          -- The predicted labels
    labels                          -- TODO
    alpha[0.01]                     -- The confidence level of the intervals
    union_bound_correction[True]    -- If we should compensate with the union bound because we
                                    have multiple intervals to compute in order to keep the level
                                    of confidence for all intervals jointly.

    Returns:
    a printable string.
    """
    import numpy as np

    def precision_recall(y_true,
        y_pred,
        labels=None,alpha=0.01, correction=1):
        p = []
        r = []
        f1 = []
        support = []
        for label in labels:
            y_true_pred_label = y_true[y_pred == label]
            precision = np.mean(y_true_pred_label == label)
            delta = (1/np.sqrt(len(y_true_pred_label)))*np.sqrt((1/2)*np.log(2*correction/alpha))
            p.append("%.2f : [%.2f,%.2f]" % (precision, np.maximum(precision-delta,0),np.minimum(precision+delta,1)))

            y_pred_true_label = y_pred[y_true == label]
            recall = np.mean(y_pred_true_label == label)
            delta = (1/np.sqrt(len(y_pred_true_label)))*np.sqrt((1/2)*np.log(2*correction/alpha))
            r.append("%.2f : [%.2f,%.2f]" % (recall, np.maximum(recall-delta,0),np.minimum(recall+delta,1)))

        return (p,r)

    def accuracy_interval(y_true,y_pred,alpha=0.01,correction=1):
        acc = np.mean(y_true == y_pred)
        delta = (1/np.sqrt(len(y_true)))*np.sqrt((1/2)*np.log(2*correction/alpha))
        return "%.2f : [%.2f,%.2f]" % (acc, np.maximum(acc-delta,0),np.minimum(acc+delta,1))

    digits = 18
    target_names = None
    if labels is None:
        labels = list(set(y_true).union(set(y_pred)))
        labels_given = False
    else:
        labels = np.asarray(labels)
        labels_given = True

    target_names = ["%s" % l for l in labels]

    headers = ["precision", "recall"]
    # compute per-class results without averaging
    # Simple correction using the union bound
    # We are computing 2 intervals for each label for precision and recall
    # In addition we are computing 2 intervals for accuracy
    # This is in total 2*n_labels+2
    if (union_bound_correction):
        correction = 2*len(labels)+2
    else:
        correction=1
    p, r = precision_recall(
        y_true,
        y_pred,
        labels=labels,
        alpha=alpha,
        correction=correction
    )

    rows = zip(target_names, p, r)

    name_width = max(len(cn) for cn in target_names)
    width = max(name_width, digits)
    head_fmt = "{:>{width}s} " + " {:>{digits}}" * len(headers)
    report = head_fmt.format("labels", *headers, width=width,digits=digits)
    report += "\n\n"
    row_fmt = "{:>{width}s} " + " {:>{digits}s}" * 2 + "\n"
    for row in rows:
        report += row_fmt.format(*row, width=width, digits=digits)
    row_fmt_acc = "{:>{width}s} " + " {:>{digits}s}" * 2 + " {:>{digits}s}""\n"
    report += "\n"
    accuracy = accuracy_interval(y_true,y_pred,alpha=alpha,correction=correction)
    report+=row_fmt_acc.format(*("accuracy","","",accuracy),width=width,digits=digits)

    return report

In [None]:
report_lda = classification_report_interval(
    y_test.values.reshape(-1),
    prediction_lda,
    alpha = 0.05,
)

print("Performance assessment LDA")
print("Confidence intervals are within brackets")
print(report_lda)

In [None]:
report_qda = classification_report_interval(
    y_test.values.reshape(-1),
    prediction_qda,
    alpha = 0.05,
)

print("Performance assessment QDA")
print("Confidence intervals are within brackets")
print(report_qda)