<a href="https://colab.research.google.com/github/CrAvila/IA/blob/main/Taller1/Stroke_PM_Front.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install gradio

In [54]:
# Function to classify inputs from the user
def classify_input(value, config):
    """Return the category label whose range contains *value*.

    ``config`` holds two parallel sequences: ``"ranges"`` (pairs treated as
    half-open ``[start, end)`` intervals) and ``"categories"`` (one label per
    range).  The first matching range wins.  When no range matches, the last
    entry of ``"categories"`` is returned.
    """
    bounds = config["ranges"]
    labels = config["categories"]
    # -1 doubles as "nothing matched": indexing with it yields the last label
    hit = next(
        (pos for pos, (low, high) in enumerate(bounds) if low <= value < high),
        -1,
    )
    return labels[hit]

In [58]:
# Model

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from random import random
from itertools import combinations

# Download the dataset CSV from the Google Drive link and load it into a DataFrame
data = pd.read_csv('https://drive.google.com/uc?export=download&id=1NYlG6ZYmh-TdHgEuzTz1K-yfFxIoFEUC')

# Events defined in the model
events = {
    # Numeric variables: (lower, upper) bins compared as lower <= value < upper,
    # with one category label per bin.
    "quantitative": {
        "age": {
            "ranges": [(0, 18), (18, 65), (65, float('inf'))],
            "categories": ['Child', 'Adult', 'Senior'],
        },
        "avg_glucose_level": {
            "ranges": [(0, 80), (80, 120), (120, float('inf'))],
            "categories": ['Low', 'Normal', 'High'],
        },
        "bmi": {
            "ranges": [(0, 18.5), (18.5, 25), (25, float('inf'))],
            # One label more than there are ranges: the trailing 'Unknown'
            # is never reached by a range index.
            "categories": ['Underweight', 'Normal', 'Overweight', 'Unknown'],
        },
    },

    # Categorical variables: the distinct values present in the dataset
    "qualitative": {
        column: data[column].unique().tolist()
        for column in ("gender", "work_type", "Residence_type", "smoking_status")
    },

    # Two-state variables, encoded as 0 / 1
    "boolean": {
        flag: {"statuses": [0, 1]}
        for flag in ("hypertension", "heart_disease", "ever_married", "stroke")
    },
}

# Helper: map one raw cell of a quantitative column to its category label
def _categorize_value(value, ranges, categories):
    """Return the label of the first ``lower <= value < upper`` range.

    Non-numeric cells (for example strings) are returned unchanged.  Numeric
    cells that fall in no range yield None -- this includes NaN, because every
    comparison with NaN is false -- so that ``process_data`` can label them
    'Unknown' with ``fillna``.
    """
    if not isinstance(value, (int, float)):
        return value
    for (lower, upper), label in zip(ranges, categories):
        if lower <= value < upper:
            return label
    return None


# Function to process original data
def process_data(df, variables_to_use, event_config=None):
    """Turn the raw dataset into the categorical table the model is built on.

    Steps, applied to a copy of ``df`` (the input is never modified):

    1. every quantitative column described in the configuration and present
       in ``df`` is replaced by its category label;
    2. ``ever_married`` (when present) is recoded from "No"/"Yes" to 0/1;
    3. only ``variables_to_use`` plus the ``stroke`` target are kept, in that
       order, with ``stroke`` last unless the caller already listed it;
    4. remaining missing values become the string 'Unknown'.

    Args:
        df: DataFrame holding at least the selected columns and ``stroke``.
        variables_to_use: names of the predictor columns to keep.
        event_config: model definition with a ``"quantitative"`` mapping of
            column name -> ``{"ranges": [...], "categories": [...]}``.
            Defaults to the module-level ``events``.

    Returns:
        A new DataFrame with the selected, categorised columns.

    Raises:
        KeyError: if a selected column (or ``stroke``) is missing from ``df``.
    """
    if event_config is None:
        event_config = events

    processed_dataframe = df.copy()

    # Classify the quantitative values; columns absent from df are skipped
    for column, spec in event_config["quantitative"].items():
        if column not in processed_dataframe.columns:
            continue
        processed_dataframe[column] = processed_dataframe[column].apply(
            _categorize_value, args=(spec["ranges"], spec["categories"])
        )

    replace_mapping = {
        "No": 0,
        "Yes": 1
    }

    if 'ever_married' in processed_dataframe.columns:
        processed_dataframe['ever_married'] = processed_dataframe['ever_married'].replace(replace_mapping)

    new_columns = list(variables_to_use)
    # Avoid a duplicated target column when the caller already asked for it
    if 'stroke' not in new_columns:
        new_columns.append('stroke')

    # Selecting the wanted columns drops everything else, so the frame passed
    # in (not a module-level one) is the only source of column names.
    processed_dataframe = processed_dataframe[new_columns]

    return processed_dataframe.fillna('Unknown')

def split_data(df, test_percentage):
    """Split ``df`` into test and train sets, stratified on ``stroke``.

    Rows with ``stroke == 1`` and rows with ``stroke == 0`` are shuffled
    separately (fixed seed 42, index reset), and the first
    ``int(test_percentage * group_size)`` rows of each shuffled group go to
    the test set; the rest go to the train set.

    Returns:
        ``(test_set, train_set)`` -- note the order.  In both frames the
        stroke rows come first, followed by the no-stroke rows.
    """
    def shuffle_and_cut(label):
        # Shuffle one class and cut it into (test part, train part)
        group = df[df['stroke'] == label]
        shuffled = group.sample(frac=1, random_state=42).reset_index(drop=True)
        cut = int(test_percentage * len(shuffled))
        return shuffled.iloc[:cut], shuffled.iloc[cut:]

    stroke_test, stroke_train = shuffle_and_cut(1)
    healthy_test, healthy_train = shuffle_and_cut(0)

    final_test_set = pd.concat([stroke_test, healthy_test])
    final_train_set = pd.concat([stroke_train, healthy_train])
    return final_test_set, final_train_set

# Predictor columns used by the model; process_data adds the 'stroke' target itself
variables_to_use = [
    'age',
    'avg_glucose_level',
    'bmi',
    'hypertension',
    'heart_disease',
]

# Categorise the raw data, then hold out a 0.2 fraction of each stroke class.
# split_data returns the test set first and the train set second.
processed_data = process_data(data, variables_to_use)
test_set, train_set = split_data(processed_data, 0.2)

def build_tree(data):
    """Enumerate every combination of column values as a tree of branches.

    Columns are taken in ``data.columns`` order; level ``i`` of the tree
    fixes the first ``i + 1`` columns.  A branch is identified by a tuple of
    ``(column, value)`` pairs, one per fixed column.

    Every combination of the values observed in each column is visited, even
    when no row matches it, so such branches are stored with count 0 and
    probability 0.0.

    Args:
        data: DataFrame whose columns are all treated as categorical.

    Returns:
        ``(count_dict, prob_dict)``, both shaped ``{level: {branch: number}}``.
        ``count_dict`` holds the number of rows matching the whole branch.
        ``prob_dict`` holds that count divided by the number of rows matching
        the parent branch (or by 1 when the parent matched nothing), i.e. the
        probability of the last value given all earlier ones.
    """
    count_dict = {}
    prob_dict = {}

    unique_values_dict = {
        column: data[column].unique().tolist() for column in data.columns
    }
    categories = list(unique_values_dict)

    def calculate_probabilities(df, conditions=(), index=0):
        # ``df`` already contains only the rows matching ``conditions``

        # Stop once every column has been fixed
        if index == len(categories):
            return

        current_category = categories[index]
        # An empty parent would divide by zero; its children all count 0 anyway
        parent_size = len(df) or 1

        # Iterate through the values of the current column
        for value in unique_values_dict[current_category]:
            new_conditions = conditions + ((current_category, value),)

            # Only the newest condition needs applying to the parent's rows
            filtered_df = df[df[current_category] == value]
            count = len(filtered_df)

            # Store the count and the probability of the branch
            prob_dict.setdefault(index, {})[new_conditions] = count / parent_size
            count_dict.setdefault(index, {})[new_conditions] = count

            calculate_probabilities(filtered_df, new_conditions, index + 1)

    calculate_probabilities(data)

    return count_dict, prob_dict

# Calculate counts and probabilities as a tree, using the training split only
count_tree, probability_tree = build_tree(train_set)

def get_probability(model, variables, input_data, events):
    """Look up the stored probability of ``stroke == 1`` for one input.

    Args:
        model: probability tree shaped ``{level: {branch: probability}}``,
            where a branch is a tuple of ``(variable, value)`` pairs.
        variables: variable names, in the order the tree was built with.
        input_data: one value per variable.  Strings are taken as already
            categorised.  Other values of a quantitative variable are mapped
            to the category whose ``lower <= value < upper`` range contains
            them.
        events: model definition; only ``events['quantitative']`` is read.

    Returns:
        The value stored at level ``len(variables)`` for the branch made of
        the categorised inputs followed by ``('stroke', 1)``.

    Raises:
        KeyError: if the model has no such branch.
    """
    quantitative = events['quantitative']

    # Categorize the quantitative variables
    def get_from_tree(variable, value):
        if isinstance(value, str):
            return value
        if variable not in quantitative:
            return value
        if value is None:
            return 'Unknown'

        spec = quantitative[variable]
        for (lower, upper), category in zip(spec['ranges'], spec['categories']):
            if lower <= value < upper:
                return category

        # No range matched (NaN, out of bounds).  Use the label that
        # process_data gives such cells via fillna('Unknown') rather than
        # leaking the raw number into the lookup key.
        return 'Unknown'

    # Create the tuple of tuples for the input
    input_tuple = tuple(
        (variable, get_from_tree(variable, value))
        for variable, value in zip(variables, input_data)
    )

    # Specific for this model
    key = input_tuple + (('stroke', 1),)
    level = len(variables)

    # Extract probability
    try:
        return model[level][key]
    except KeyError:
        raise KeyError(
            f"No branch {key!r} at level {level} of the model"
        ) from None



def get_stroke_risk(model, variables, input_data):
    """Return ``(risk, probability)`` for one input.

    ``probability`` comes from ``get_probability`` using the module-level
    ``events``.  ``risk`` is 1 when a fresh ``random()`` draw is below the
    probability or when the probability exceeds 0.05, and 0 otherwise.
    """
    probability = get_probability(model, variables, input_data, events)
    draw = random()

    flagged = draw < probability or probability > 0.05

    return (1 if flagged else 0), probability

In [60]:
def predict_stroke(age, avg_glucose_average, bmi_value, bmi_known, hypertension, heart_disease):
    """Gradio callback: turn the form values into the two output strings.

    Args:
        age: number entered in the "Age" field.
        avg_glucose_average: number entered in the "Average Glucose" field.
        bmi_value: number entered in the "Body Mass Index" field.
        bmi_known: state of the "I don't know my BMI" checkbox -- despite the
            name, True means the BMI is NOT known.
        hypertension: state of the "Hypertension" checkbox.
        heart_disease: state of the "Heart Disease" checkbox.

    Returns:
        ``(prediction_result, desc)``: the risk sentence and a sentence
        quoting the stored probability as a percentage.
    """
    # An empty gr.Number field arrives as None, which cannot be compared
    # with the range bounds in classify_input.
    if age is None or avg_glucose_average is None:
        return "Please enter your age and average glucose level.", ""

    # Classify the input features
    quantitative = events["quantitative"]
    age_category = classify_input(age, quantitative["age"])
    glucose_category = classify_input(avg_glucose_average, quantitative["avg_glucose_level"])

    # A BMI field left empty is treated like a ticked "I don't know" box
    if bmi_known or bmi_value is None:
        bmi_category = "Unknown"
    else:
        bmi_category = classify_input(bmi_value, quantitative["bmi"])

    hypertension = 1 if hypertension else 0
    heart_disease = 1 if heart_disease else 0

    inputs = [age_category, glucose_category, bmi_category, hypertension, heart_disease]
    r, p = get_stroke_risk(probability_tree, variables_to_use, inputs)
    pf = "{:.2%}".format(p)

    prediction_result = f"You are{' ' if r == 1 else ' not '}at risk of a stroke."
    desc = f"{pf} of people with these characteristics suffered a stroke."

    return prediction_result, desc

# No import of gradio is visible in the cells above, so bring `gr` into scope here
import gradio as gr

# Build the form: six inputs, two text outputs, one button wired to predict_stroke
with gr.Blocks() as app:
    age = gr.Number(label="Age")
    avg_glucose_average = gr.Number(label="Average Glucose")
    # `value` is the initial-value parameter of gr.Number; the previous
    # `default=None` keyword triggered a warning when the cell ran
    bmi_value = gr.Number(label="Body Mass Index", value=None)
    bmi_known = gr.Checkbox(label="I don't know my BMI")
    hypertension = gr.Checkbox(label="Hypertension")
    heart_disease = gr.Checkbox(label="Heart Disease")

    output = [gr.Textbox(label="Stroke Prediction"),
              gr.Textbox(label="Description")]

    predict_btn = gr.Button("Predict")
    # Inputs are passed positionally, in this order, to predict_stroke
    predict_btn.click(fn=predict_stroke, inputs=[age, avg_glucose_average, bmi_value, bmi_known, hypertension, heart_disease], outputs=output)

app.launch()

  bmi_value = gr.Number(label="Body Mass Index", default=None)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Note: opening Chrome Inspector may crash demo inside Colab notebooks.

To create a public link, set `share=True` in `launch()`.


<IPython.core.display.Javascript object>

