In [None]:
import numpy as np

In [None]:
# Toy training set: 4 samples (rows) x 3 binary features (columns).
X_train = np.array([
    [0, 1, 1],
    [0, 0, 1],
    [0, 0, 0],
    [1, 1, 0]
])
# One class label per training row, in the same order as the rows of X_train.
Y_train = ['Y', 'N', 'Y', 'Y']
# Single sample to classify; kept 2-D so it can be iterated row by row.
X_test = np.array([[1, 1, 0]])

In [None]:
def get_label_indices(labels):
    """Group sample positions by class label.

    Args:
        labels: iterable of hashable labels, one per sample.

    Returns:
        A ``defaultdict(list)`` mapping each label to the ascending list of
        0-based positions at which that label occurs in ``labels``.
    """
    from collections import defaultdict
    grouped = defaultdict(list)
    position = 0
    for label in labels:
        grouped[label].append(position)
        position += 1
    return grouped

In [None]:
label_indices = get_label_indices(Y_train)
label_indices

In [None]:
def get_prior(label_indices):
    """Compute the prior probability of each label from its sample count.

    Args:
        label_indices: mapping of label -> list of sample indices carrying
            that label.

    Returns:
        dict mapping each label to (number of its samples) / (total number
        of samples). An empty mapping yields an empty dict.
    """
    counts = {}
    for label, indices in label_indices.items():
        counts[label] = len(indices)
    total_count = sum(counts.values())
    return {label: count / total_count for label, count in counts.items()}

In [None]:
prior = get_prior(label_indices)
prior

In [None]:
def get_likelihood(features, label_indices, smoothing=0):
    """Estimate, per label, the probability of each feature being set.

    Args:
        features: 2-D NumPy array indexed as ``features[rows, :]``; columns
            are summed per label.
        label_indices: mapping of label -> list of row indices belonging to
            that label.
        smoothing: additive smoothing term; it is added to every per-feature
            count and ``2 * smoothing`` is added to the sample count in the
            denominator (presumably because each feature has two possible
            values).

    Returns:
        dict mapping each label to a 1-D array with one value per feature:
        ``(column sum + smoothing) / (number of rows + 2 * smoothing)``.
    """
    likelihood = {}
    denominator_offset = 2 * smoothing
    for label in label_indices:
        indices = label_indices[label]
        smoothed_counts = features[indices, :].sum(axis=0) + smoothing
        likelihood[label] = smoothed_counts / (len(indices) + denominator_offset)
    return likelihood

In [None]:
likelihood = get_likelihood(X_train, label_indices, smoothing=1)
likelihood

In [None]:
def get_posterior(X, prior, likelihood):
    """Compute normalized posterior probabilities for binary-feature samples.

    The computation is carried out in log space: multiplying many
    probabilities directly underflows to 0.0 for long feature vectors, which
    makes the normalization divide by zero. Summing logarithms and
    normalizing relative to the largest log score avoids that.

    Args:
        X: iterable of samples; each sample is an iterable of truthy/falsy
            feature values.
        prior: dict mapping label -> prior probability.
        likelihood: dict mapping label -> indexable sequence holding, per
            feature, the probability of that feature being set given the
            label. Every label in ``likelihood`` must also be in ``prior``.

    Returns:
        A list with one dict per sample, mapping each label of ``prior`` to
        its posterior probability. The values of a dict sum to 1, except when
        every label has zero probability for the sample, in which case all
        values are 0.0.
    """
    import math

    neg_inf = float('-inf')

    def _log(p):
        # A zero probability eliminates the label; represent it as -inf
        # instead of letting math.log raise.
        return math.log(p) if p > 0 else neg_inf

    posteriors = []
    for x in X:
        log_posterior = {label: _log(p) for label, p in prior.items()}
        for label, likelihood_label in likelihood.items():
            for index, bool_value in enumerate(x):
                p = likelihood_label[index] if bool_value else (1 - likelihood_label[index])
                log_posterior[label] += _log(p)

        if not log_posterior:
            posteriors.append({})
            continue

        peak = max(log_posterior.values())
        if peak == neg_inf:
            # No label can explain this sample; there is nothing to normalize.
            posterior = {label: 0.0 for label in log_posterior}
        else:
            # Shifting by the peak keeps the largest weight at exactly 1.0,
            # so the sum below can never be zero.
            weights = {label: math.exp(value - peak) for label, value in log_posterior.items()}
            total = sum(weights.values())
            posterior = {label: weight / total for label, weight in weights.items()}
        posteriors.append(posterior)
    return posteriors

In [None]:
posterior = get_posterior(X_test, prior, likelihood)
posterior

### Implementation of the same algorithm using the `scikit-learn` package

In [None]:
from sklearn.naive_bayes import BernoulliNB

In [None]:
# Bernoulli naive Bayes classifier with smoothing parameter alpha=1.0;
# fit_prior=True asks the model to learn class priors from the training labels.
clf = BernoulliNB(alpha=1.0, fit_prior=True)

In [None]:
clf.fit(X_train, Y_train)

In [None]:
pred_prob = clf.predict_proba(X_test)
pred_prob

In [None]:
pred = clf.predict(X_test)
print(pred)

## Movie Recommender

In [None]:
import pandas as pd
from collections import defaultdict
from os import path
import numpy as np

In [None]:
pd.__version__

In [None]:
# Directory holding the ratings file, relative to the notebook's working directory.
DATA_PATH = "./data/ml-latest-small/"
# Hard-coded dimensions of the user x movie matrix allocated by load_ratig_data.
# NOTE(review): presumably the user and movie counts of this dataset — confirm against the files.
n_users = 610
n_movies = 9742
rating_path = path.join(DATA_PATH, "ratings.csv")

In [None]:
# Load the ratings table; load_ratig_data later reads its userId, movieId and rating columns.
df_rating = pd.read_csv(rating_path)
df_rating.head()

In [None]:
# Shift user ids so they start at 0 and can be used directly as row indices of
# the rating matrix. Subtracting the current minimum (rather than a hard-coded
# 1) keeps this cell idempotent: re-running it leaves zero-based ids unchanged
# instead of producing -1, which NumPy would silently treat as "last row".
df_rating["userId"] = df_rating["userId"] - df_rating["userId"].min()
df_rating.head()

In [None]:
df_rating["userId"].dtype

In [None]:
def load_ratig_data(df, n_users, n_movies):
    """Build a dense user x movie rating matrix from a ratings table.

    Args:
        df: DataFrame with ``userId``, ``movieId`` and ``rating`` columns.
            ``userId`` values are used directly as row indices, so they must
            be integers in ``[0, n_users)``.
        n_users: number of rows of the returned matrix.
        n_movies: number of columns of the returned matrix; must be at least
            the number of distinct ``movieId`` values in ``df``.

    Returns:
        A tuple ``(data, movie_n_rating, movie_id_mapping)``:
        ``data`` is a float32 array of shape ``(n_users, n_movies)`` whose
        cells default to 0 where no rating was written;
        ``movie_n_rating`` is a ``defaultdict(int)`` counting, per movie id,
        the ratings greater than 0;
        ``movie_id_mapping`` maps each movie id to its column index, assigned
        in order of first appearance.

    Raises:
        IndexError: if a user id falls outside ``[0, n_users)`` (negative ids
            would otherwise wrap around to the last rows), or if more
            distinct movies appear than ``n_movies`` columns exist.
    """
    data = np.zeros([n_users, n_movies], dtype=np.float32)
    movie_id_mapping = {}
    movie_n_rating = defaultdict(int)
    for row in df.itertuples(index=False):
        user_id, movie_id, rating = row.userId, row.movieId, row.rating
        if not 0 <= user_id < n_users:
            raise IndexError(
                f"userId {user_id} is outside the valid range [0, {n_users})"
            )
        # setdefault hands out the next free column on first sight of a movie
        # and returns the existing column otherwise.
        column = movie_id_mapping.setdefault(movie_id, len(movie_id_mapping))
        data[user_id, column] = rating
        if rating > 0:
            movie_n_rating[movie_id] += 1
    return data, movie_n_rating, movie_id_mapping

In [None]:
# Build the dense rating matrix plus the per-movie rating counts and the movie-id -> column mapping.
data, movie_n_rating, movie_id_mapping = load_ratig_data(df_rating, n_users, n_movies)

In [None]:
type(movie_n_rating)

In [None]:
def display_distribution(data):
    """Print how many times each distinct value occurs in ``data``.

    One line is printed per distinct value, in the sorted order returned by
    ``np.unique``. Nothing is returned.
    """
    distinct, occurrences = np.unique(data, return_counts=True)
    for position in range(len(distinct)):
        print(f"Number of rating {distinct[position]}: {occurrences[position]}")
display_distribution(data)

In [None]:
# Pick the (movie id, rating count) pair with the highest count. max() with a
# key finds it in a single pass, so the counts do not need to be sorted.
movie_id_most, n_rating_most = max(movie_n_rating.items(), key=lambda d: d[1])

print(f"Movie ID {movie_id_most} has {n_rating_most} ratings.")

In [None]:
# Features: every movie column except the one of the most-rated movie.
X_raw = np.delete(data, movie_id_mapping[movie_id_most], axis=1)
# Target: the rating column of the most-rated movie.
Y_raw = data[:, movie_id_mapping[movie_id_most]]

In [None]:
# Keep only the rows whose target value is > 0; cells that load_ratig_data
# never wrote to stay at their initial 0.
X = X_raw[Y_raw > 0]
Y = Y_raw[Y_raw > 0]
print(X.shape)
print(Y.shape)

In [None]:
display_distribution(Y)

In [None]:
# Ratings strictly above this threshold become the positive label (1),
# everything else the negative label (0).
recommend = 3
# Derive the binary target from the untouched raw ratings instead of
# overwriting Y in place. The in-place version is not idempotent: on a second
# run every value is already <= recommend, so all labels collapse to 0.
# The dtype of Y_raw is kept so downstream cells see the same array type.
Y = (Y_raw[Y_raw > 0] > recommend).astype(Y_raw.dtype)
n_pos = (Y == 1).sum()
n_neg = (Y == 0).sum()
print(f"pos: {n_pos}, neg: {n_neg}")

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 25% of the samples for testing; the fixed random_state makes the split reproducible.
X_train, X_test, Y_train, Y_test = train_test_split(X, 
                                                    Y, 
                                                    test_size=0.25, 
                                                    random_state=11)

In [None]:
print(len(Y_train), len(Y_test))

In [None]:
from sklearn.naive_bayes import MultinomialNB
# Multinomial naive Bayes with smoothing alpha=1.0 and class priors learned from Y_train.
# Note: this rebinds `clf`, replacing the BernoulliNB model from the first section.
clf = MultinomialNB(alpha=1.0, fit_prior=True)
clf.fit(X_train, Y_train)

In [None]:
prediction_prob = clf.predict_proba(X_test)
print(len(prediction_prob))

In [None]:
prediction = clf.predict(X_test)
print(prediction)

In [None]:
accuracy = clf.score(X_test, Y_test)
print(f"model accuracy {accuracy*100:.1f}%")

In [None]:
from sklearn.metrics import confusion_matrix, f1_score

In [None]:
print(confusion_matrix(Y_test, prediction, labels=[0, 1]))

In [None]:
print(f1_score(Y_test, prediction, labels=[0, 1]))

In [None]:
from sklearn.metrics import classification_report
report = classification_report(Y_test, prediction)
print(report)

In [None]:
# Manual ROC counting: for every threshold, count the test samples whose
# score is >= the threshold, split by their true label.
# Column 1 of predict_proba is taken as the positive-class score
# (the labels here are 0 and 1).
pos_prob = prediction_prob[:, 1]
thresholds = np.arange(0.0, 1.1, 0.05)
true_pos, false_pos = [0] * len(thresholds), [0] * len(thresholds)
# NOTE: the loop variable `pred` rebinds the `pred` array from the BernoulliNB section.
for pred, y in zip(pos_prob, Y_test):
    for i, threshold in enumerate(thresholds):
        if pred >= threshold:
            if  y == 1:
                true_pos[i] += 1
            else:
                false_pos[i] += 1
        else:
            # Thresholds are ascending, so once one is missed all later ones are missed too.
            break

In [None]:
# Number of positive / negative samples in the test labels; used below as the
# denominators of the true- and false-positive rates.
n_pos_test = (Y_test == 1).sum()
n_neg_test = (Y_test == 0).sum()

In [None]:
# True-positive rate per threshold, as a plain Python list.
true_pos_rate = [tp / n_pos_test for tp in true_pos]
# Same rates computed by dividing the whole list by the NumPy count at once;
# the result is a NumPy array rather than a list. The plot below uses this one.
tpr = true_pos / n_pos_test
# False-positive rate per threshold, again in both forms.
false_pos_rate = [fp / n_neg_test for fp in false_pos]
fpr = false_pos / n_neg_test

In [None]:
type(tpr)

In [None]:
type(true_pos_rate)

In [None]:
import matplotlib.pyplot as plt
# Plot the manually computed ROC curve.
plt.figure()
lw =2
plt.plot(fpr, tpr, color='darkorange', lw=lw)
# Dashed diagonal from (0, 0) to (1, 1) as a visual reference line.
plt.plot([0, 1], [0, 1], color='blue', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
# Upper y limit slightly above 1 so a curve touching 1.0 is not clipped.
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC')
# No legend: neither line is given a label.
plt.show()

In [None]:
from sklearn.model_selection import StratifiedKFold
# Number of cross-validation folds; also used later to average the summed AUCs.
k = 5
# Shuffled stratified folds; the fixed random_state makes the fold assignment reproducible.
k_fold = StratifiedKFold(n_splits=k, random_state=42, shuffle=True)

In [None]:
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import roc_auc_score

# Hyper-parameter grid: smoothing factor alpha x whether to learn class priors.
smoothing_factor_option = [1, 2, 3, 4, 5, 6]
fit_prior_option = [True, False]
# auc_record[alpha][fit_prior] accumulates the SUM of the per-fold AUC scores.
# The reporting cell divides by k, which assumes that no fold was skipped.
auc_record = {}

# Fold-local names are used on purpose. Rebinding X_train / X_test / Y_train /
# Y_test here would overwrite the hold-out split created earlier, and every
# later cell that uses those names would silently work on the last CV fold.
for train_indices, test_indices in k_fold.split(X, Y):
    X_fold_train, X_fold_test = X[train_indices], X[test_indices]
    Y_fold_train, Y_fold_test = Y[train_indices], Y[test_indices]

    # Nothing to score on an empty validation fold.
    if X_fold_test.shape[0] == 0:
        print("Skipping fold due to empty X_test.")
        continue

    # ROC AUC is undefined when only one class is present. This depends only
    # on the fold, so check once before fitting any model.
    fold_classes = set(Y_fold_test)
    if len(fold_classes) < 2:
        print(f"Skipping fold due to only one class present in Y_test: {fold_classes}")
        continue

    for alpha in smoothing_factor_option:
        alpha_record = auc_record.setdefault(alpha, {})
        for fit_prior in fit_prior_option:
            fold_clf = MultinomialNB(alpha=alpha, fit_prior=fit_prior)
            fold_clf.fit(X_fold_train, Y_fold_train)
            fold_pos_prob = fold_clf.predict_proba(X_fold_test)[:, 1]
            auc = roc_auc_score(Y_fold_test, fold_pos_prob)
            alpha_record[fit_prior] = auc + alpha_record.get(fit_prior, 0.0)

In [None]:
auc_record

In [None]:
# Print one line per (alpha, fit_prior) combination with its mean AUC.
# auc_record holds per-fold sums, hence the division by k.
# NOTE(review): dividing by k assumes each of the k folds contributed a score.
for smoothing, smoothing_record in auc_record.items():
    for fit_prior, auc in smoothing_record.items():
        print(f'    {smoothing}    {fit_prior}    {auc/k:.5f}')

In [None]:
# Refit with the best combination from the grid above — in our case (alpha=1, fit_prior=False).
# NOTE(review): X_train / Y_train / X_test / Y_test are whatever the most
# recently executed cell bound to those names — confirm this is the intended
# hold-out split.
clf = MultinomialNB(alpha=1.0, fit_prior=False)
clf.fit(X_train, Y_train)
pos_prob = clf.predict_proba(X_test)[:, 1]
print("AUC for best model: ", roc_auc_score(Y_test, pos_prob))

In [None]:
X_test

In [None]:
pos_prob

In [None]:
prob = clf.predict_proba(X_test)

In [None]:
len(X_test[1])

In [None]:
len(X_train[1])

## Heart Disease

In [None]:
import pandas as pd
import numpy as np
from os.path import join
from collections import defaultdict
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
import matplotlib.pyplot as plt

In [None]:
DATA_PATH = "./data/"

In [None]:
df_hd = pd.read_csv(join(DATA_PATH, "heart_disease_uci.csv"))

In [None]:
df_hd.head()

In [None]:
df_hd.info()

In [None]:
# Work on a new frame without the 'id' and 'dataset' columns; df_hd itself is left untouched.
processed_data = df_hd.drop(columns=['id', 'dataset'])

In [None]:
processed_data.sample(5)

In [None]:
# Keep only the rows that have no missing value in any column.
processed_data_dropped = processed_data.dropna()

In [None]:
processed_data_dropped.info()

In [None]:
print(processed_data_dropped['sex'].unique())
print(processed_data_dropped['cp'].unique())
print(processed_data_dropped['restecg'].unique())
print(processed_data_dropped['slope'].unique())
print(processed_data_dropped['thal'].unique())

In [None]:
# Label-encode the listed columns: each distinct value gets an integer code in
# order of first appearance. The value -> code dictionaries are kept in
# `mappings` so the codes can be translated back later.
columns_to_map = ['sex', 'cp', 'restecg', 'slope', 'thal']
mappings = {}
# Explicit copy so the column assignments below write to an independent frame
# rather than to a slice of `processed_data`.
processed_data_dropped = processed_data_dropped.copy()

for col in columns_to_map:
    unique_values = processed_data_dropped[col].unique()
    mappings[col] = {value: idx for idx, value in enumerate(unique_values)}
    # Perform mapping and convert to integer
    processed_data_dropped[col] = processed_data_dropped[col].map(mappings[col]).astype(int)

# info() prints its report itself and returns None, so it must not be wrapped
# in print() — that would append a stray "None" line to the output.
processed_data_dropped.info()


In [None]:
def convert_float_to_int(df, columns):
    """Return a copy of ``df`` with whole-number float columns cast to int.

    Args:
        df: input DataFrame; it is not modified.
        columns: iterable of column names to examine.

    Returns:
        A new DataFrame in which every listed column whose values all have a
        zero fractional part is converted with ``astype(int)``. Columns that
        contain a fractional value (or NaN, which fails the check) are left
        unchanged.
    """
    # Work on a copy so the caller's frame keeps its original dtypes.
    converted = df.copy()
    for col in columns:
        # Check if all values are effectively integers
        if np.all(converted[col] % 1 == 0):
            converted[col] = converted[col].astype(int)
    return converted

# Identify float columns (names of all columns with a floating-point dtype)
float_columns = processed_data_dropped.select_dtypes(include=['float']).columns

# Convert eligible float columns to int; columns containing fractional values stay float
df_final = convert_float_to_int(processed_data_dropped, float_columns)

In [None]:
df_final.info()

In [None]:
# Cast the 'exang' and 'fbs' columns to int.
# NOTE(review): presumably these are boolean flags that become 0/1 — confirm with df_final.info().
df_final['exang'] = df_final['exang'].astype(int)
df_final['fbs'] = df_final['fbs'].astype(int)

In [None]:
df_final.head(15)

In [None]:
df_final.info()

In [None]:
import seaborn as sns

In [None]:
# Annotated heatmap of the pairwise correlations between all columns of
# df_final, with values shown to one decimal place.
f,ax = plt.subplots(figsize=(20, 10))
sns.heatmap(df_final.corr(), annot=True, linewidths=0.5,linecolor="red", fmt= '.1f',ax=ax)
plt.show()

In [None]:
# Features are all columns except the last; the last column is the target.
# Note: this rebinds X and Y, replacing the arrays of the movie section.
X = df_final.iloc[:, :-1]
Y = df_final.iloc[:, -1]

In [None]:
from sklearn.preprocessing import StandardScaler
# Standardize every feature column; transform() returns a NumPy array, so X
# stops being a DataFrame after this cell.
# NOTE(review): the scaler is fitted on the full data set before the
# train/test split below, so test rows influence the scaling statistics.
# Consider fitting on the training split only.
scaler = StandardScaler()
scaler.fit(X)
X = scaler.transform(X)

In [None]:
# Binarize the target: every value > 0 becomes 1, zeros stay 0.
# The copy avoids writing into df_final through the column selected above.
Y = Y.copy()
Y[Y > 0] = 1

In [None]:
# Hold out 20% of the samples for testing; the fixed random_state makes the split reproducible.
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=23)

In [None]:
from sklearn.naive_bayes import GaussianNB
# Gaussian naive Bayes with default settings, fitted on the training split.
classifier = GaussianNB()
classifier.fit(X_train, Y_train)

In [None]:
Y_pred = classifier.predict(X_test)

In [None]:
from sklearn import metrics
# Accuracy of the predictions on the held-out test split, printed to three decimals.
print(f"Accuracy Score: {metrics.accuracy_score(Y_test, Y_pred):.3f}")

In [None]:
# Confusion matrix of true vs. predicted test labels, drawn as an annotated
# heatmap with integer cell values.
cm = confusion_matrix(Y_test, Y_pred)
f,ax = plt.subplots(figsize=(10, 10))
sns.heatmap(cm, annot=True, linewidths=0.5,linecolor="red", fmt= '.0f',ax=ax)
plt.show()

In [None]:
report = classification_report(Y_test, Y_pred)
print(report)