In [1]:
import sys
from BayesNetReader import BayesNetReader
from DataReader import CSV_DataReader
import pandas as pd 

In [5]:
class CPT_Generator(BayesNetReader):
    """Estimate the conditional probability tables (CPTs) of a Bayesian network.

    Constructing an instance runs the whole pipeline: the network
    description is read from a configuration file, value counts are
    gathered from a CSV training file, smoothed probabilities are derived
    from those counts, and the configuration file is then overwritten with
    the same header fields plus the generated CPTs.

    Attributes:
        configfile_name: path of the configuration file (read, then rewritten).
        bn: BayesNetReader created from ``configfile_name``.
        csv: CSV_DataReader created from the training file.
        countings: maps each structure entry to ``{value key: count}``.
        CPTs: maps each structure entry to ``{value key: probability}``.
        constant_l: additive smoothing constant applied to every count.
    """

    configfile_name = None
    bn = None
    nbc = None
    # Class-level placeholders kept for backward compatibility only; every
    # instance rebinds its own dictionaries in __init__ so that results are
    # never shared between generators.
    countings = {}
    CPTs = {}
    constant_l = 1  # to avoid zero probabilities

    def __init__(self, configfile_name, datafile_name):
        """Read the network and the data, then generate and write the CPTs.

        Args:
            configfile_name: configuration file describing the network; it
                is overwritten at the end of construction.
            datafile_name: CSV file with the training data.
        """
        self.configfile_name = configfile_name
        # Fresh per-instance tables (the class-level dicts would otherwise
        # accumulate entries from every generator created in the process).
        self.countings = {}
        self.CPTs = {}
        # NOTE: the parent initialiser is not invoked on self; a separate
        # reader object is held in self.bn instead.
        self.bn = BayesNetReader(configfile_name)
        self.csv = CSV_DataReader(datafile_name)
        self.generate_prior_and_conditional_countings()
        self.generate_probabilities_from_countings()
        self.write_CPTs_to_configuration_file()

    def generate_prior_and_conditional_countings(self):
        """Fill ``self.countings`` with value counts for each structure entry.

        Entries are expected to look like ``P(X)`` (prior) or ``P(X|Y,Z)``
        (conditional).

        Raises:
            ValueError: if an entry contains more than one '|'.
        """
        print("\nGENERATING countings for prior/conditional distributions...")
        print("-------------------------------------------------------------")

        for distribution in self.bn.bn["structure"]:
            print(str(distribution))
            # "P(X|Y,Z)" becomes "P X|Y,Z " so the variable is the second word
            flattened = distribution.replace('(', ' ').replace(')', ' ')
            tokens = flattened.split("|")
            if len(tokens) > 2:
                # previously such an entry silently reused the counts of the
                # preceding distribution
                raise ValueError(
                    "malformed structure entry: " + str(distribution))

            variable = tokens[0].split()[1]
            variable_index = self.get_variable_index(variable)

            if len(tokens) == 1:
                # countings for a prior distribution
                counts = self.initialise_counts(variable)
                self.get_counts(variable_index, None, counts)
            else:
                # countings for a conditional distribution
                parents = [name.strip() for name in tokens[1].split(',')]
                parent_indexes = self.get_parent_indexes(parents)
                counts = self.initialise_counts(variable, parents)
                self.get_counts(variable_index, parent_indexes, counts)

            self.countings[distribution] = counts
            print("counts="+str(counts))
            print()

    def generate_probabilities_from_countings(self):
        """Turn ``self.countings`` into smoothed probabilities in ``self.CPTs``.

        Each probability is ``(count + l) / (total + J*l)`` where ``l`` is
        ``constant_l``. For priors ``J`` is the number of counted values; for
        conditionals ``J`` is the number of values of the child variable and
        ``total`` is taken per combination of parent values.
        """
        print("\nGENERATING prior and conditional probabilities...")
        print("---------------------------------------------------")

        smoothing = self.constant_l
        for distribution, counts in self.countings.items():
            print(str(distribution))
            tokens = distribution.split("|")
            variable = tokens[0].replace("P(", "").strip()
            cpt = {}

            if len(tokens) == 1:
                # prior probabilities
                total = sum(counts.values())
                Jl = len(counts)*smoothing
                for key, count in counts.items():
                    cpt[key] = (count+smoothing)/(total+Jl)

            elif len(tokens) == 2:
                # conditional probabilities, normalised per parent combination
                Jl = len(self.csv.rv_key_values[variable])*smoothing
                for parents_value in self.get_parent_values(counts):
                    suffix = "|"+parents_value
                    matching = [(key, count) for key, count in counts.items()
                                if key.endswith(suffix)]
                    total = sum(count for _, count in matching)
                    for key, count in matching:
                        cpt[key] = (count+smoothing)/(total+Jl)

            self.CPTs[distribution] = cpt
            print("CPT="+str(cpt))
            print()

    def get_variable_index(self, variable):
        """Return the column position of ``variable`` in the CSV header.

        Prints a warning and returns None when the variable is not a column.
        """
        for position, name in enumerate(self.csv.rand_vars):
            if variable == name:
                return position
        print("WARNING: couldn't find index of variables=%s" % (variable))
        return None

    def get_parent_indexes(self, parents):
        """Return the column position of every name in ``parents``, in order."""
        return [self.get_variable_index(parent) for parent in parents]

    def get_parent_values(self, counts):
        """Return the distinct parent-value parts of the keys in ``counts``.

        Keys have the form ``value|parent_values``; the result keeps the
        order in which each parent combination first appears.
        """
        return list(dict.fromkeys(key.split('|')[1] for key in counts))

    def initialise_counts(self, variable, parents=None):
        """Build a zeroed count table for ``variable``.

        Args:
            variable: name of the variable being counted.
            parents: optional list of parent variable names.

        Returns:
            ``{value: 0}`` without parents, otherwise
            ``{"value|p1,p2,...": 0}`` for every combination of parent values.
        """
        child_values = self.csv.rv_key_values[variable]

        if parents is None:
            # variables without parents
            return {var_val: 0 for var_val in child_values}

        # Enumerate every combination of parent values. The nesting order
        # (new value outermost, existing prefixes innermost) determines the
        # order of the keys and therefore of the lines written to the file.
        combinations = []
        for position, parent in enumerate(parents):
            domain = self.csv.rv_key_values[parent]
            if position == 0:
                combinations = list(domain)
            else:
                combinations = [prefix+','+var_val
                                for var_val in domain
                                for prefix in combinations]

        # variables with parents
        return {var_val+'|'+par_val: 0
                for var_val in child_values
                for par_val in combinations}

    def get_counts(self, variable_index, parent_indexes, counts):
        """Accumulate occurrences from the training rows into ``counts``.

        Args:
            variable_index: column of the counted variable.
            parent_indexes: columns of its parents, or None for a prior.
            counts: table created by ``initialise_counts``; updated in place.
        """
        for values in self.csv.rv_all_values:
            if parent_indexes is None:
                # case: prior probability
                key = values[variable_index]
            else:
                # case: conditional probability. join() keeps the separator
                # even when a parent value is an empty string, so the key
                # always matches one created by initialise_counts.
                parents_values = ','.join(
                    values[index] for index in parent_indexes)
                key = values[variable_index]+'|'+parents_values
            counts[key] += 1

    @staticmethod
    def _as_config_list(value):
        """Render ``value`` as text with brackets/quotes removed and ';' separators."""
        text = str(value).replace('[', '').replace(']', '')
        return text.replace('\'', '').replace(', ', ';')

    def write_CPTs_to_configuration_file(self):
        """Overwrite the configuration file with the header fields and the CPTs.

        Writes ``name``, ``random_variables`` and ``structure`` followed by
        one ``CPT(...)`` section per entry of ``self.CPTs``. Every
        probability line except the last of a section ends with ';'.
        """
        print("\nWRITING config file with CPT tables...")
        print("See rewritten file "+str(self.configfile_name))
        print("---------------------------------------------------")
        name = self.bn.bn["name"]
        rand_vars = self._as_config_list(self.bn.bn["random_variables_raw"])
        structure = self._as_config_list(self.bn.bn["structure"])

        with open(self.configfile_name, 'w') as cfg_file:
            cfg_file.write("name:"+str(name)+"\n\n")
            cfg_file.write("random_variables:"+str(rand_vars)+"\n\n")
            cfg_file.write("structure:"+str(structure)+"\n\n")
            for key, cpt in self.CPTs.items():
                cfg_file.write(str(key.replace("P(", "CPT("))+":\n")
                last_position = len(cpt)
                for position, (domain_vals, probability) in enumerate(
                        cpt.items(), start=1):
                    line = str(domain_vals)+"="+str(probability)
                    if position < last_position:
                        line += ";"
                    cfg_file.write(line+"\n")
                cfg_file.write('\n')

In [None]:
# Command-line entry point: expects a configuration file and a training file.
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("USAGE: CPT_Generator.py [your_config_file.txt] [training_file.csv]")
        print("EXAMPLE> CPT_Generator.py config-playtennis.txt play_tennis-train.csv")
        # sys.exit is the programmatic way to stop; the bare exit() helper is
        # installed by the site module for interactive sessions. The exit
        # status is kept at 0 to match the previous behaviour.
        sys.exit(0)
    else:
        configfile_name = sys.argv[1]
        datafile_name = sys.argv[2]
        CPT_Generator(configfile_name, datafile_name)

In [4]:
#############################################################################
# CSV_DataReader.py
#
# This program is the data reading code of the Naive Bayes classifier from week 1.
# It assumes the existence of data in CSV format, where the first line contains
# the names of random variables -- the last being the variable to predict.
#
# Version: 1.0, Date: 20 September 2024
# Contact: hcuayahuitl@lincoln.ac.uk
#############################################################################


class CSV_DataReader:
    """Load a comma-separated data file into plain Python containers.

    The first non-blank line is taken as the header (variable names); every
    following non-blank line is one data instance. The last variable in the
    header is recorded as the variable to predict.

    Attributes:
        rand_vars: variable names, in header order.
        rv_key_values: variable name -> distinct values in order of first
            appearance.
        rv_all_values: one list of string values per data instance.
        predictor_variable: last name in ``rand_vars`` (None if no header).
        num_data_instances: number of data rows read.
    """

    # Class-level defaults kept for backward compatibility; __init__ gives
    # every instance its own containers so nothing is shared.
    rand_vars = []
    rv_key_values = {}
    rv_all_values = []
    predictor_variable = None
    num_data_instances = 0

    def __init__(self, file_name):
        """Create a reader and, unless ``file_name`` is None, load the file."""
        self.rand_vars = []
        self.rv_key_values = {}
        self.rv_all_values = []
        self.predictor_variable = None
        self.num_data_instances = 0
        if file_name is not None:
            self.read_data(file_name)

    def read_data(self, data_file):
        """Read ``data_file``, replacing any previously loaded content.

        Blank lines are ignored. Values are split on ',' without any quoting
        rules. A summary of what was read is printed at the end.
        """
        print("\nREADING data file %s..." % (data_file))
        print("---------------------------------------")

        # start from a clean state so that re-reading does not accumulate
        self.rand_vars = []
        self.rv_key_values = {}
        self.rv_all_values = []
        self.predictor_variable = None
        self.num_data_instances = 0

        with open(data_file) as csv_file:
            for line in csv_file:
                line = line.strip()
                if not line:
                    # a blank line carries no instance; skipping it avoids
                    # an index error when its single empty field is unpacked
                    continue
                if not self.rand_vars:
                    self.rand_vars = line.split(',')
                    for variable in self.rand_vars:
                        self.rv_key_values[variable] = []
                else:
                    values = line.split(',')
                    self.rv_all_values.append(values)
                    self.update_variable_key_values(values)
                    self.num_data_instances += 1

        if self.rand_vars:
            self.predictor_variable = self.rand_vars[-1]

        print("RANDOM VARIABLES=%s" % (self.rand_vars))
        print("VARIABLE KEY VALUES=%s" % (self.rv_key_values))
        print("VARIABLE VALUES=%s" % (self.rv_all_values))
        print("PREDICTOR VARIABLE=%s" % (self.predictor_variable))
        print("|data instances|=%d" % (self.num_data_instances))

    def update_variable_key_values(self, values):
        """Record any value of ``values`` not yet seen for its variable.

        ``values`` must hold one entry per header variable; a shorter row
        raises IndexError.
        """
        for position, variable in enumerate(self.rand_vars):
            known_values = self.rv_key_values[variable]
            value_in_focus = values[position]
            if value_in_focus not in known_values:
                known_values.append(value_in_focus)

In [3]:
#############################################################################
# BayesNetUtil.py
#
# Implements functions to simplify the implementation of algorithms for
# probabilistic inference with Bayesian networks.
#
# Version: 1.0, 06 October 2022
# Contact: hcuayahuitl@lincoln.ac.uk
#############################################################################


def tokenise_query(prob_query):
    """Split a probabilistic query string into its query variable and evidence.

    Args:
        prob_query: text of the form ``P(X|A=a,B=b)`` or ``P(X)``. The first
            two characters and the last character are discarded.

    Returns:
        A dict with ``"query_var"`` (the text before the first '|') and
        ``"evidence"`` (a dict mapping each evidence variable to its value;
        empty when the query has no evidence part). Surrounding whitespace
        is removed from names and values.

    Raises:
        IndexError: if an evidence item has no '='.
    """
    print("\nTOKENISING probabilistic query="+str(prob_query))

    # strip the leading "P(" and the trailing ")"
    body = prob_query[2:]
    body = body[:len(body)-1]

    parts = body.split("|")
    # a query such as "P(X)" has no evidence section at all
    evidence_text = parts[1] if len(parts) > 1 else ""

    evidence = {}
    for pair in evidence_text.split(','):
        if not pair.strip():
            # nothing between two commas, or no evidence given
            continue
        tokens = pair.split('=')
        evidence[tokens[0].strip()] = tokens[1].strip()

    query = {}
    query["query_var"] = parts[0].strip()
    query["evidence"] = evidence

    print("query="+str(query))
    return query


def get_parents(child, bn):
    """Look up the parents of ``child`` in the network structure.

    Scans ``bn["structure"]`` in order and stops at the first entry that
    starts with ``P(child)`` or ``P(child|``.

    Returns:
        None when ``child`` has a prior entry; otherwise the text between
        the first and second '|' of the entry with its last character
        removed (for ``P(X|A,B)`` this is ``"A,B"``).

    If no entry matches, an error is printed and the interpreter is
    terminated through ``exit(0)``.
    """
    prior_prefix = "P("+child+")"
    conditional_prefix = "P("+child+"|"

    for entry in bn["structure"]:
        if entry.startswith(prior_prefix):
            return None
        if entry.startswith(conditional_prefix):
            parent_part = entry.split("|")[1]
            # drop the closing parenthesis
            return parent_part[:-1]

    print("ERROR: Couldn't find parent(s) of variable "+str(child))
    exit(0)


def get_probability_given_parents(V, v, evidence, bn):
    """Return the CPT entry for ``V = v`` given the values of its parents.

    Args:
        V: variable name.
        v: value of ``V``.
        evidence: mapping from variable name to value; must contain every
            parent of ``V``.
        bn: network dictionary holding ``"structure"`` and the ``CPT(...)``
            tables.

    Returns:
        ``bn["CPT(V)"][v]`` when ``V`` has no parents, otherwise
        ``bn["CPT(V|parents)"]["v|p1,p2,..."]`` with the parent values taken
        from ``evidence`` in the order the parents are listed.
    """
    parent_spec = get_parents(V, bn)

    if parent_spec is None:
        return bn["CPT("+V+")"][v]

    table = bn["CPT("+V+"|"+parent_spec+")"]
    parent_values = [evidence[name] for name in parent_spec.split(",")]
    # key layout: child value, '|', then comma-separated parent values
    return table[v+"|"+",".join(parent_values)]


def get_domain_values(V, bn):
    """Collect the values that variable ``V`` can take, based on its CPT.

    A ``CPT(V)`` table contributes all of its keys; a ``CPT(V|...)`` table
    contributes the part of each key before the first '|', without
    duplicates and in order of first appearance.

    If nothing is found, an error is printed and the interpreter is
    terminated through ``exit(0)``.
    """
    prior_key = "CPT("+V+")"
    conditional_prefix = "CPT("+V+"|"
    found = []

    for name, table in bn.items():
        if name == prior_key:
            found = list(table.keys())
            continue
        if not name.startswith(conditional_prefix):
            continue
        for entry in table.keys():
            candidate = entry.split("|")[0]
            if candidate not in found:
                found.append(candidate)

    if not found:
        print("ERROR: Couldn't find values of variable "+str(V))
        exit(0)

    return found


def get_index_of_variable(V, bn):
    """Return the position of ``V`` within ``bn["random_variables"]``.

    If ``V`` is not listed, an error is printed and the interpreter is
    terminated through ``exit(0)``.
    """
    for position, name in enumerate(bn["random_variables"]):
        if V == name:
            return position

    print("ERROR: Couldn't find index of variable "+str(V))
    exit(0)


def normalise(counts):
    """Scale the values of ``counts`` so that they sum to one.

    Args:
        counts: mapping from value to a numeric count.

    Returns:
        A new dict with the same keys, each count divided by the total and
        converted to float. An empty mapping yields an empty dict.

    Raises:
        ZeroDivisionError: if the mapping is non-empty and its counts sum
            to zero.
    """
    total = sum(counts.values())
    return {value: float(count/total) for value, count in counts.items()}


In [2]:
# Load the dementia MRI feature table from disk into df1
df1 = pd.read_csv(
    filepath_or_buffer=r'C:\Users\Student\Documents\Module Dev Containers\AAI-assignment\mycode\dementia_data-MRI-features.csv'
)

In [None]:
# Load the dementia MRI feature table from disk into df1
df1 = pd.read_csv(
    filepath_or_buffer=r'C:\Users\Student\Documents\Module Dev Containers\AAI-assignment\mycode\dementia_data-MRI-features.csv'
)

# Report the (rows, columns) size, then preview the first five rows
print(df1.shape)
print(df1.head(n=5))

In [37]:
# # Using pandas `cut` function for equal-width bins, 4 bins in this case
# bins=[59, 69, 79, 89, 99]
# labels=["60-69", "70-79", "80-89", "90+"]
# df1['AgeGroup'] = pd.cut(df1['Age'], bins=bins, labels=labels)

# df1.head(5)

In [None]:
# Checking unique values in column 'Group'
df1['Group'].unique()

# Moving the rows whose 'Group' is 'Converted' into df2. The explicit copy
# makes df2 an independent frame, so later assignments to df2 neither warn
# about nor depend on it being a slice of df1.
df2 = df1.loc[df1['Group'] == 'Converted'].copy()

# Dropping the rows from df1 that have been assigned to df2 using the corresponding index values
df1 = df1.drop(df2.index)

# df1 is the data frame that doesn't have the converted data
df1.head(40)

# df2 is the new data frame that contains the converted data
# df2.head(5)

In [None]:
# Work on an independent frame so the label updates below are applied to df2
# itself rather than to a slice of another DataFrame.
df2 = df2.copy()

# Highest visit number recorded for each patient, aligned row-by-row with df2.
# Held in a local Series, so no temporary column has to be added and dropped.
last_visit = df2.groupby('Subject ID')['Visit'].transform('max')

# Updating the 'Group' column: every visit before a patient's last one is
# labelled 'Nondemented', the last visit is labelled 'Demented'
df2.loc[df2['Visit'] < last_visit, 'Group'] = 'Nondemented'
df2.loc[df2['Visit'] == last_visit, 'Group'] = 'Demented'

# Displaying the updated DataFrame
df2.head(37)

In [None]:
# Stack df1 and the relabelled df2 row-wise into a single frame
frames = [df1, df2]
df = pd.concat(frames, axis=0)

# Inspect the remaining labels and preview the combined data
df['Group'].unique()
df.head(n=50)

In [41]:
# # 3. Domain-specific binning (if you have specific thresholds)
# # Define custom bins based on clinical knowledge or research
# # Example: Small, Medium, Large ranges (these ranges are illustrative)
# bins = [1000, 1400, 1600, 2000]
# labels = ['Small', 'Medium', 'Large']
# df1['eTIV_custom'] = pd.cut(df1['eTIV'], bins=bins, labels=labels)

# # Display the first few rows
# df1.head(10)

In [None]:
# Give the 'M/F' column the clearer name 'Gender'
# NOTE(review): 'Gender' is also in the drop list below, so the renamed
# column is removed again straight away -- confirm this is intended.
df.rename(columns={'M/F': 'Gender'}, inplace=True)

# Remove the columns that are not needed, ignoring any that are absent
columns_to_drop = ['Subject ID', 'MRI ID', 'Hand', 'Gender', 'MR Delay']
existing_columns_to_drop = list(
    filter(lambda name: name in df.columns, columns_to_drop)
)
df.drop(columns=existing_columns_to_drop, inplace=True)

# Show which columns remain
print(df.columns)

In [None]:
# Count the missing entries in each column of the DataFrame
df.isna().sum(axis=0)

In [None]:
# Imputing missing values in the 'SES' column with the mode.
# The result is assigned back to the frame: calling fillna(inplace=True) on
# a column accessor is chained assignment, which pandas deprecates and which
# leaves df untouched when Copy-on-Write is enabled.
df['SES'] = df['SES'].fillna(df['SES'].mode()[0])

# Imputing missing values in the 'MMSE' column with the mean
df['MMSE'] = df['MMSE'].fillna(df['MMSE'].mean())

# Re-check the per-column missing counts after imputation
df.isna().sum()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Bar chart of the number of rows per 'Group' label
sns.countplot(x='Group', data=df, palette='Set2').set_title('Dementia Group');

In [None]:
# Kernel density estimate of 'Age', one curve per 'Group' label
sns.displot(data=df, x='Age', hue='Group', palette='Set2', kind="kde");

In [None]:

# Annotated heatmap of the pairwise correlations between the numeric columns
sns.heatmap(data=df.corr(numeric_only=True), annot=True, cmap='coolwarm', vmin=-1);

In [None]:
# bnlearn is used below but is not imported anywhere else in this file, so
# it is imported here (cell-level, like the plotting imports above).
import bnlearn

# Learn the structure of the Bayesian Network
model = bnlearn.structure_learning.fit(df)

# Visualize the structure of the learned network
# model.plot()
bnlearn.plot(model)