In [2]:
import pandas as pd
import numpy as np
from google.colab import files
from itertools import combinations
from scipy.stats import beta, chi2_contingency
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional, Union

@dataclass
class AntibioticResult:
    name: str
    s_percentage: float
    lower_ci: float
    upper_ci: float
    s_count: int
    r_count: int
    tested_percentage: Optional[float] = None
    global_effectiveness: Optional[float] = None
    lower_ci_global: Optional[float] = None
    upper_ci_global: Optional[float] = None

@dataclass
class CombinationResult:
    combination: Tuple[str, str]
    effectiveness: float
    global_effectiveness: float
    lower_ci: float
    upper_ci: float
    s_count: int
    r_count: int
    tested_percentage: float
    lower_ci_global: Optional[float] = None
    upper_ci_global: Optional[float] = None
    p_value_vs_drug1: Optional[float] = None
    p_value_vs_drug2: Optional[float] = None

class AntibioticAnalyzer:
    VALID_VALUES = {'S', 'R', 'N'}

    def __init__(self, data_file_path: str, gram_file_path: str, exclusion_file_path: Optional[str] = None, confidence_level: float = 0.95):
        self.data_file_path = data_file_path
        self.gram_file_path = gram_file_path
        self.exclusion_file_path = exclusion_file_path
        self.CONFIDENCE_LEVEL = confidence_level
        self.gram_stain_map = self._load_gram_stain_map()
        self.excluded_count = 0
        self.data = self._load_and_validate_data()

    def _load_gram_stain_map(self) -> Dict[str, str]:
        try:
            df = pd.read_excel(self.gram_file_path)
            return {str(row['Microorganism']).lower(): str(row['Gram_stain']).lower().replace('gram-', '') for index, row in df.iterrows()}
        except Exception:
            return {}

    def _save_gram_stain_map(self):
        df = pd.DataFrame(list(self.gram_stain_map.items()), columns=['Microorganism', 'Gram_stain'])
        df['Gram_stain'] = 'Gram-' + df['Gram_stain']
        df.to_excel(self.gram_file_path, index=False)

    def _get_gram_stain(self, organism_name: str) -> str:
        organism_lower = organism_name.lower()
        for key, value in self.gram_stain_map.items():
            if key in organism_lower:
                return value
        print(f"\nMicroorganism '{organism_name}' not found in the Gram stain map.")
        while True:
            response = input("Is it Gram-positive or Gram-negative? (type 'positive' or 'negative'): ").lower().strip()
            if response in ['positive', 'negative']:
                self.gram_stain_map[organism_lower] = response
                self._save_gram_stain_map()
                return response
            print("Invalid input. Please type 'positive' or 'negative'.")

    def _load_and_validate_data(self) -> pd.DataFrame:
        try:
            data = pd.read_excel(self.data_file_path)
            if 'Microorganism' not in data.columns:
                data.rename(columns={data.columns[0]: 'Microorganism'}, inplace=True)
            if self.exclusion_file_path:
                try:
                    exclusion_df = pd.read_excel(self.exclusion_file_path)
                    exclusion_list = exclusion_df.iloc[:, 0].str.lower().tolist()
                    initial_count = len(data)
                    data = data[~data['Microorganism'].str.lower().str.contains('|'.join(exclusion_list), regex=True, na=False)]
                    self.excluded_count = initial_count - len(data)
                    print(f"\nSuccessfully excluded {self.excluded_count} isolates based on the provided list.")
                except Exception as e:
                    print(f"Warning: Could not apply exclusion list. Error: {e}")
                    self.excluded_count = 0
            data['Gram_stain'] = data['Microorganism'].apply(self._get_gram_stain)
            antibiotic_cols = data.columns[1:-1]
            for col in antibiotic_cols:
                if data[col].dtype == 'object':
                    data[col] = data[col].str.upper().str.strip()
            data.replace({'I': 'S'}, inplace=True)
            data.fillna('N', inplace=True)
            self._validate_data(data)
            return data
        except Exception as e:
            raise ValueError(f"Error loading data file: {e}")

    def _validate_data(self, data: pd.DataFrame):
        if data.empty: raise ValueError("No valid data remaining after filtering.")
        antibiotic_cols = data.columns[1:-1]
        invalid_cells = []
        for i, row in data.iterrows():
            for col in antibiotic_cols:
                if row[col] not in self.VALID_VALUES:
                    invalid_cells.append(f"Row {i+2}, Col {col}: {row[col]}")
        if invalid_cells:
            msg = "\n".join(invalid_cells[:5])
            if len(invalid_cells) > 5: msg += "\n..."
            raise ValueError(f"Invalid values detected (only S, R, N allowed):\n{msg}")

    def _calculate_confidence_interval(self, success_count: int, failure_count: int) -> Tuple[float, float]:
        n = success_count + failure_count
        if n == 0: return 0.0, 0.0
        alpha = 1 - self.CONFIDENCE_LEVEL
        lower = beta.ppf(alpha / 2, success_count + 0.5, failure_count + 0.5)
        upper = beta.ppf(1 - alpha / 2, success_count + 0.5, failure_count + 0.5)
        if success_count == 0: lower = 0.0
        if failure_count == 0: upper = 1.0
        return lower, upper

    def _perform_chi_square(self, s_comb: int, f_comb: int, s_single: int, f_single: int) -> Optional[float]:
        try:
            obs = np.array([[s_comb, f_comb], [s_single, f_single]])
            if np.sum(obs) < 2: return None
            chi2, p, dof, ex = chi2_contingency(obs, correction=True)
            return p
        except Exception:
            return None

    def _calculate_single_antibiotics(self, df: pd.DataFrame, threshold: float) -> List[AntibioticResult]:
        if df.empty: return []
        filtered_data = self._apply_threshold_filter(df, threshold)
        results = []
        total_samples = len(filtered_data)
        for col in filtered_data.columns[1:-1]:
            s_count = (filtered_data[col] == 'S').sum()
            r_count = (filtered_data[col] == 'R').sum()
            total_tested = s_count + r_count
            s_percentage = (s_count / total_tested * 100) if total_tested > 0 else 0
            lower_ci, upper_ci = self._calculate_confidence_interval(s_count, r_count)
            global_failures = total_samples - s_count
            global_effectiveness = (s_count / total_samples) * 100
            lower_ci_global, upper_ci_global = self._calculate_confidence_interval(s_count, global_failures)
            results.append(AntibioticResult(col, s_percentage, lower_ci * 100, upper_ci * 100, s_count, r_count, (total_tested / total_samples) * 100, global_effectiveness, lower_ci_global * 100, upper_ci_global * 100))
        return sorted(results, key=lambda x: x.s_percentage, reverse=True)

    def _calculate_combinations(self, df: pd.DataFrame, threshold: float) -> List[CombinationResult]:
        if df.empty: return []
        filtered_data = self._apply_threshold_filter(df, threshold)
        antibiotic_cols = filtered_data.columns[1:-1]
        combinations_list = list(combinations(antibiotic_cols, 2))
        results = []
        single_stats = {}
        for col in antibiotic_cols:
            s = (filtered_data[col] == 'S').sum()
            tested = (filtered_data[col] != 'N').sum()
            single_stats[col] = {'s': s, 'r': tested - s}
        for comb in combinations_list:
            col1, col2 = comb
            mask = (filtered_data[col1] != 'N') | (filtered_data[col2] != 'N')
            s_count = ((filtered_data[col1] == 'S') | (filtered_data[col2] == 'S')).sum()
            tested_count = mask.sum()
            r_count_local = tested_count - s_count
            effectiveness = (s_count / tested_count * 100) if tested_count > 0 else 0
            lower_ci, upper_ci = self._calculate_confidence_interval(s_count, r_count_local)
            global_failures = len(filtered_data) - s_count
            global_effectiveness = (s_count / len(filtered_data)) * 100
            lower_ci_global, upper_ci_global = self._calculate_confidence_interval(s_count, global_failures)
            s1, f1 = single_stats[col1]['s'], single_stats[col1]['r']
            p1 = self._perform_chi_square(s_count, r_count_local, s1, f1)
            s2, f2 = single_stats[col2]['s'], single_stats[col2]['r']
            p2 = self._perform_chi_square(s_count, r_count_local, s2, f2)
            results.append(CombinationResult(comb, effectiveness, global_effectiveness, lower_ci * 100, upper_ci * 100, s_count, ((filtered_data[col1] == 'R') & (filtered_data[col2] == 'R')).sum(), (tested_count / len(filtered_data)) * 100, lower_ci_global * 100, upper_ci_global * 100, p1, p2))
        return sorted(results, key=lambda x: x.effectiveness, reverse=True)

    def _apply_threshold_filter(self, df: pd.DataFrame, threshold: float) -> pd.DataFrame:
        if df.empty: return pd.DataFrame()
        threshold_val = threshold / 100.0
        valid_columns = []
        for col in df.columns[1:-1]:
            tested_ratio = df[col].isin(['S', 'R']).sum() / len(df)
            if tested_ratio >= threshold_val:
                valid_columns.append(col)
        return df[['Microorganism'] + valid_columns + ['Gram_stain']]

    def analyze(self, threshold: float) -> Dict:
        all_isolates = self.data
        return {
            'All Isolates': {'Singles': self._calculate_single_antibiotics(all_isolates, threshold), 'Combinations': self._calculate_combinations(all_isolates, threshold)},
            'Gram-Positive Isolates': {'Singles': self._calculate_single_antibiotics(all_isolates[all_isolates['Gram_stain'] == 'positive'], threshold), 'Combinations': self._calculate_combinations(all_isolates[all_isolates['Gram_stain'] == 'positive'], threshold)},
            'Gram-Negative Isolates': {'Singles': self._calculate_single_antibiotics(all_isolates[all_isolates['Gram_stain'] == 'negative'], threshold), 'Combinations': self._calculate_combinations(all_isolates[all_isolates['Gram_stain'] == 'negative'], threshold)}
        }

    def generate_summary_data(self) -> Dict:
        summary = {}
        summary['Exclusion Count'] = pd.DataFrame({'Number of Excluded Isolates': [self.excluded_count]})
        counts = self.data['Gram_stain'].value_counts()
        summary['Gram Stain Distribution'] = pd.DataFrame({'Count': counts, 'Percentage': (counts/len(self.data)*100).round(2)})
        path_counts = self.data['Microorganism'].value_counts().head(10)
        summary['Top 10 Pathogens'] = pd.DataFrame({'Count': path_counts, 'Percentage': (path_counts/len(self.data)*100).round(2)})
        return summary

class ResultsExporter:
    @staticmethod
    def to_excel(all_results: Dict, summary_data: Dict, filename: str = "antibiotic_results_v2.xlsx") -> None:
        wb = Workbook()
        ws_summary = wb.create_sheet('Summary', 0)
        ws_summary.append(['Excluded Isolates'])
        for r in dataframe_to_rows(summary_data['Exclusion Count'], index=False, header=False): ws_summary.append(r)
        ws_summary.append([])
        ws_summary.append(['Gram Stain Distribution'])
        for r in dataframe_to_rows(summary_data['Gram Stain Distribution'], index=True, header=True): ws_summary.append(r)
        ws_summary.append([])
        ws_summary.append(['Top 10 Pathogens'])
        for r in dataframe_to_rows(summary_data['Top 10 Pathogens'], index=True, header=True): ws_summary.append(r)

        group_names = {'All Isolates': 'All', 'Gram-Positive Isolates': 'Gram+', 'Gram-Negative Isolates': 'Gram-'}
        for analysis_type, data in all_results.items():
            short_name = group_names[analysis_type]
            ws_single = wb.create_sheet(f"{short_name} - Singles")
            single_df = pd.DataFrame([vars(r) for r in data['Singles']])
            for r in dataframe_to_rows(single_df, index=False, header=True): ws_single.append(r)
            ws_comb = wb.create_sheet(f"{short_name} - Combs")
            comb_df = pd.DataFrame([vars(r) for r in data['Combinations']])
            comb_df['Combination'] = comb_df['combination'].apply(lambda x: f"{x[0]} + {x[1]}")
            comb_df.drop('combination', axis=1, inplace=True)
            if 'p_value_vs_drug1' in comb_df.columns:
                comb_df['p_value_vs_drug1'] = comb_df['p_value_vs_drug1'].apply(lambda x: f"{x:.4f}" if pd.notnull(x) else "N/A")
                comb_df['p_value_vs_drug2'] = comb_df['p_value_vs_drug2'].apply(lambda x: f"{x:.4f}" if pd.notnull(x) else "N/A")
            for r in dataframe_to_rows(comb_df, index=False, header=True): ws_comb.append(r)
        del wb['Sheet']
        wb.save(filename)
        files.download(filename)
        print(f"\nResults successfully exported to '{filename}'.")

def main():
    try:
        print("--- STEP 1: Upload the DATABASE file (e.g., my_data.xlsx) ---")
        uploaded_data = files.upload()
        data_file_path = next(iter(uploaded_data))
        print(f"-> OK, database file loaded: {data_file_path}\n")

        print("--- STEP 2: Upload the GRAM STAIN MAP (e.g., gram_map.xlsx) ---")
        print("(If you don't have one, upload an empty Excel with headers 'Microorganism' and 'Gram_stain')")
        uploaded_gram = files.upload()
        gram_file_path = next(iter(uploaded_gram))
        print(f"-> OK, gram map file loaded: {gram_file_path}\n")

        exclusion_file_path = None
        exclude_choice = input("Do you want to upload an EXCLUSION list? (yes/no): ").lower().strip()
        if exclude_choice == 'yes':
            print("--- STEP 3: Upload the EXCLUSION file ---")
            uploaded_exclusion = files.upload()
            exclusion_file_path = next(iter(uploaded_exclusion))
            print(f"-> OK, exclusion file loaded.\n")

        threshold = float(input("\nEnter minimum testing threshold (0-100): "))
        confidence_input = float(input("Enter confidence level (e.g., 95): "))

        print("\nProcessing data...")
        analyzer = AntibioticAnalyzer(data_file_path, gram_file_path, exclusion_file_path, confidence_level=confidence_input/100.0)
        all_results = analyzer.analyze(threshold)
        summary = analyzer.generate_summary_data()
        ResultsExporter.to_excel(all_results, summary)

    except Exception as e:
        print(f"\nError: {e}")

if __name__ == "__main__":
    main()

--- STEP 1: Upload the DATABASE file (e.g., my_data.xlsx) ---


Saving EMOCOLT.xlsx to EMOCOLT (2).xlsx
-> OK, database file loaded: EMOCOLT (2).xlsx

--- STEP 2: Upload the GRAM STAIN MAP (e.g., gram_map.xlsx) ---
(If you don't have one, upload an empty Excel with headers 'Microorganism' and 'Gram_stain')


Saving gram_stain_map.xlsx to gram_stain_map.xlsx
-> OK, gram map file loaded: gram_stain_map.xlsx

Do you want to upload an EXCLUSION list? (yes/no): no

Enter minimum testing threshold (0-100): 20
Enter confidence level (e.g., 95): 95

Processing data...

Microorganism 'Rhizobium  radiobacter' not found in the Gram stain map.
Is it Gram-positive or Gram-negative? (type 'positive' or 'negative'): negative


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>


Results successfully exported to 'antibiotic_results_v2.xlsx'.
