# In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
import random
from openai import OpenAI
import os
from dotenv import load_dotenv
import re
from typing import Dict, List, Tuple
from dataclasses import dataclass
from functools import lru_cache

# Sampling configuration: number of respondents drawn and the seed that
# makes the draw repeatable.
SAMPLE_SIZE = 50
RANDOM_SEED = 42

# Two groups of AGE codes (kept as strings) used to split respondents.
AGE_FILTERS = [
    ["3.0", "4.0", "5.0", "6.0"],
    ["7.0", "8.0", "9.0", "10.0", "11.0"],
]

# Columns that are one-hot encoded for the additional analysis.
DEMOGRAPHIC_COLUMNS = [
    "AGE",
    "SEX",
    "MARRIED",
    "GRADUATE UNIVERSITY",
    "CHILD",
    "WORK",
    "HOUSE INCOME",
]

# Instruction text sent to the model ahead of the questionnaire itself.
# The lines are joined with single newlines; there is no trailing newline.
prompt = "\n".join(
    (
        "Understanding Your Attitude Towards Covid-19 Prevention",
        "We invite you to participate in this questionnaire to help us gain insights into your attitudes and behaviors regarding Covid-19 prevention. Your responses will be invaluable in shaping effective public health strategies.",
        "Instructions:",
        "・Carefully consider each question independently.",
        "・Select the option that best reflects your current situation and personal views.",
        "・Answer each question based solely on your own experiences and beliefs.",
        "Answer Format Example:",
        "Go shopping everyday",
        "['1.Very true']",
        "Thank you for your participation.",
    )
)

# Answer scale offered for every question, from strongest agreement to
# strongest disagreement.
options_list = [
    "1.Very true",
    "2.True",
    "3.Neither",
    "4.Not true",
    "5.Not at all",
]

# Prevention behaviours the respondent is asked about, in presentation order.
questions_list = [
    "Avoid a poorly-ventilated closed space",
    "Avoid large gatherings",
    "Avoid conversations or shouting in close proximity",
    "Avoid places where items 1-3 above overlap",
    "Do not go to dinner with friends",
    "Do not go to mass gatherings",
    "Participate in virtual events using online tools",
    "Undertake frequent handwashing",
    "Undertake cough etiquette (use handkerchiefs or sleeves instead of hands)",
    "Disinfect things around",
    "Avoid going out when you have a cold",
    "Avoid going to clinic even when having a cold symptom",
    "Prepare consultation and transportation methods for when you feel ill",
    "Always wear a surgical-style mask when going out",
    "Stockpile surgical-style masks",
    "Stockpile food, toilet paper, tissue paper, etc.",
    "Avoid contact with younger people",
    "Avoid contact with older people",
    "Get sufficient rest and sleep",
    "Eat a nutritious diet",
    "Do exercise such as jogging or exercise using DVD",
]

# Full questionnaire text: a lead-in sentence, then one paragraph per
# question consisting of the question and the Python repr of the option list.
# NOTE: the variable name is spelled "questionaire" (single "n"); it is kept
# as-is because other code may refer to it by this name.
questionaire = (
    "Have you ever conducted anything to prevent novel coronavirus infections or outbreaks?\n\n"
    + "\n\n".join(f"{item}\n{options_list}" for item in questions_list)
)

@dataclass
class SurveyData:
    """Container for the three inputs the analysis works from.

    Instances are created positionally as
    ``SurveyData(raw_data, subquestion_dict, choice_dict)``, so the field
    order below is part of the interface.
    """

    # Survey responses as a DataFrame.
    raw_data: pd.DataFrame
    # Lookup dictionary describing the sub-questions (schema not enforced here).
    subquestion_dict: Dict
    # Lookup dictionary describing the answer choices (schema not enforced here).
    choice_dict: Dict

class OpenAIClient:
    """Small wrapper around the OpenAI chat-completions client."""

    def __init__(self):
        """Load environment variables from a ``.env`` file and build the client.

        The API key is read from the ``OPENAI_API_KEY`` environment variable.
        """
        load_dotenv()
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

    def generate_completion(self, role: str, prompt: str, questionnaire: str) -> str:
        """Request one chat completion and return the text of the first choice.

        Args:
            role: Text sent as the system message.
            prompt: Instruction text placed first in the user message.
            questionnaire: Text appended to ``prompt`` after a newline.

        Returns:
            The message content of the first choice, or ``""`` when the API
            returns no text content, so callers always receive a ``str``.
        """
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": role},
                {"role": "user", "content": f"{prompt}\n{questionnaire}"},
            ],
            temperature=0.7,
            max_tokens=1000,
        )
        content = response.choices[0].message.content
        # The content field may be None; normalise it so the declared return
        # type holds and downstream regex parsing does not fail.
        return "" if content is None else content

class DataProcessor:
    """Stateless helpers for loading the survey files and summarising answers.

    Every method is a ``@staticmethod``; the class only acts as a namespace.
    """

    @staticmethod
    def load_data() -> "SurveyData":
        """Read the survey CSV and the two JSON lookup files.

        Reads ``data.csv``, ``subquestion_dict.json`` and ``choice_dict.json``
        relative to the current working directory.

        Returns:
            A ``SurveyData`` built from the frame, the sub-question dictionary
            and the choice dictionary, in that order.
        """
        # The line right after the header (row index 1) is skipped and missing
        # cells are replaced with the string "0".
        df_raw_data = pd.read_csv("data.csv", header=0, skiprows=[1]).fillna("0")
        # JSON is UTF-8 by specification; do not rely on the locale default.
        with open("subquestion_dict.json", "r", encoding="utf-8") as f:
            dict_subquestion = json.load(f)
        with open("choice_dict.json", "r", encoding="utf-8") as f:
            dict_choice = json.load(f)
        return SurveyData(df_raw_data, dict_subquestion, dict_choice)

    @staticmethod
    def get_choice_and_question_text(df_single_lines: pd.DataFrame, dict_subquestion: Dict, dict_choice: Dict) -> Dict:
        """Translate coded rows into ``{question text: answer}`` dictionaries.

        Only the first 22 columns of ``df_single_lines`` are translated. For a
        column ``col``, ``dict_subquestion[col][0]`` is used as a category and
        ``dict_subquestion[col][1]`` as the output key. When the category is
        ``"AGE"`` the raw cell value is kept; otherwise the value is looked up
        as ``dict_choice[category][str(cell)]``.

        Returns:
            A dict keyed by 1-based row position whose values are the
            per-row ``{question text: answer}`` dicts.

        Raises:
            KeyError: If a column or a cell value is missing from the lookups.
        """
        columns = df_single_lines.columns[:22]

        def process_line(series_single_line):
            answers = {}
            for col in columns:
                entry = dict_subquestion[col]
                question_text = entry[1]
                if entry[0] == "AGE":
                    answers[question_text] = series_single_line[col]
                else:
                    answers[question_text] = dict_choice[entry[0]][str(series_single_line[col])]
            return answers

        return {j + 1: process_line(df_single_lines.iloc[j]) for j in range(df_single_lines.shape[0])}

    @staticmethod
    def aggregate_data_dict(dict_data: Dict, list_questions: List, list_options: List) -> Dict:
        """Count, per question, how many respondents gave each option.

        Args:
            dict_data: ``{respondent id: {question: answer}}``.
            list_questions: Questions to tabulate (used as dict keys).
            list_options: Options to count for every question (used as dict keys).

        Returns:
            ``{question: {option: count}}``; options nobody chose count as 0.
        """
        return {
            question: {
                option: sum(1 for ans in dict_data.values() if ans.get(question) == option)
                for option in list_options
            }
            for question in list_questions
        }

    @staticmethod
    def separate_data_dicts(dict_data: Dict, list_filters: List[List[str]], key_filter: str) -> Dict:
        """Split respondents into groups by the value stored under ``key_filter``.

        Group ``i`` (returned under the key ``f"dict_{i}"``) holds every
        respondent whose ``key_filter`` value is listed in ``list_filters[i]``.
        A value matches either directly or through its ``str()`` form, so a
        float such as ``3.0`` matches the filter code ``"3.0"``.

        Raises:
            KeyError: If a respondent has no ``key_filter`` entry.
        """
        def matches(value, allowed) -> bool:
            return value in allowed or str(value) in allowed

        return {
            f'dict_{idx}': {k: v for k, v in dict_data.items() if matches(v[key_filter], allowed)}
            for idx, allowed in enumerate(list_filters)
        }

    @staticmethod
    def calculate_ratio(df_aggregate: pd.DataFrame, attribute: str) -> pd.DataFrame:
        """Return the percentage of "1.Very true" + "2.True" answers per row.

        The denominator is the sum of all columns in the row; the result is
        rounded to two decimals and returned as a single-column frame named
        ``f"Ratio of true:{attribute}"``.

        Raises:
            KeyError: If either of the two columns is missing.
        """
        count_true = df_aggregate["1.Very true"] + df_aggregate["2.True"]
        ratio = (count_true * 100 / df_aggregate.sum(axis=1)).round(2)
        return pd.DataFrame({f"Ratio of true:{attribute}": ratio})

    @staticmethod
    def merge_dicts(*dicts: Dict) -> Dict:
        """Merge dict-of-dicts, combining the inner dicts of equal outer keys.

        Outer keys keep the order in which they are first seen (the result is
        deterministic); for duplicate inner keys the value from the later
        input wins. Inner dicts are copied, never shared with the inputs.
        """
        merged: Dict = {}
        for source in dicts:
            for key, inner in source.items():
                target = merged.setdefault(key, {})
                for k, v in inner.items():
                    target[k] = v
        return merged

    @staticmethod
    def create_dummies(df: pd.DataFrame, list_columns: List[str]) -> pd.DataFrame:
        """Return ``pd.get_dummies`` applied to the selected columns of ``df``."""
        return pd.get_dummies(df[list_columns])

    @staticmethod
    def calculate_odds_ratio(df: pd.DataFrame) -> List[float]:
        """Compute one smoothed odds ratio per row, relative to the first row.

        For each row, cells are compared position-wise with the first row:
        TP counts positions where both equal 1, TN where both equal 0, FP where
        the first row is 0 and the row is 1, FN where the first row is 1 and
        the row is 0. The result is ``(TP*TN + 0.001) / (FP*FN + 0.001)``; the
        0.001 terms keep the division defined when a count is zero.

        Returns:
            A list with one value per row, or ``[]`` for a frame with no rows.
        """
        if len(df) == 0:
            return []

        series_base_row = df.iloc[0]
        base_is_one = series_base_row == 1
        base_is_zero = series_base_row == 0

        odds_ratios = []
        for _, series_row in df.iterrows():
            row_is_one = series_row == 1
            row_is_zero = series_row == 0
            tp = np.sum(base_is_one & row_is_one)
            tn = np.sum(base_is_zero & row_is_zero)
            fp = np.sum(base_is_zero & row_is_one)
            fn = np.sum(base_is_one & row_is_zero)
            odds_ratios.append((tp * tn + 0.001) / (fp * fn + 0.001))
        return odds_ratios

class Visualizer:
    """Plotting helpers built on matplotlib."""

    @staticmethod
    def table_plot(df: pd.DataFrame, width: int, height: int):
        """Render ``df`` as a table filling a new figure and show it.

        Args:
            df: Frame whose values, column labels and index labels are drawn.
            width: Figure width passed to ``figsize``.
            height: Figure height passed to ``figsize``.
        """
        _, axes = plt.subplots(figsize=(width, height))
        # Hide the axes so only the table is visible.
        axes.axis('off')

        table_options = {
            "cellText": df.values,
            "colLabels": df.columns,
            "rowLabels": df.index,
            "loc": 'center',
            # Stretch the table over the whole axes area.
            "bbox": [0, 0, 1, 1],
        }
        axes.table(**table_options)
        plt.show()

class SurveyAnalyzer:
    """Runs the analysis pipeline over a loaded ``SurveyData``.

    The pipeline samples respondents, tabulates their answers overall and per
    age group, asks the language model to answer the questionnaire in the
    role of each sampled respondent, and prints odds ratios.
    """

    # Matches answers written in the requested format, e.g. ['1.Very true'].
    _ANSWER_PATTERN = re.compile(r"\['(.*?)'\]")

    def __init__(self, survey_data: "SurveyData"):
        """Store the survey data and create the API client and data helper."""
        self.survey_data = survey_data
        self.openai_client = OpenAIClient()
        self.data_processor = DataProcessor()

    @staticmethod
    def _question_texts(subquestion_dict: Dict) -> List:
        """Return the question text (element 1) of every sub-question entry.

        These texts are the keys used in the per-respondent answer dicts, so
        they are what the aggregation must look up.
        """
        return [entry[1] for entry in subquestion_dict.values()]

    @staticmethod
    def _option_texts(choice_dict: Dict) -> List:
        """Return every distinct choice text across all categories, in first-seen order."""
        seen: Dict = {}
        for code_to_text in choice_dict.values():
            for text in code_to_text.values():
                seen.setdefault(text, None)
        return list(seen)

    def run_analysis(self):
        """Execute all analysis steps on a fixed-seed sample of respondents."""
        df_sampled_data = self.survey_data.raw_data.sample(SAMPLE_SIZE, random_state=RANDOM_SEED)
        dict_answer = self.data_processor.get_choice_and_question_text(
            df_sampled_data, self.survey_data.subquestion_dict, self.survey_data.choice_dict
        )

        # The raw dictionary values are lists / nested dicts, which cannot be
        # used as dict keys during aggregation; use the hashable texts instead.
        list_questions = self._question_texts(self.survey_data.subquestion_dict)
        # NOTE(review): the ratio step expects "1.Very true" and "2.True" among
        # these texts - confirm against choice_dict.json.
        list_options = self._option_texts(self.survey_data.choice_dict)

        self.analyze_aggregate_data(dict_answer, list_questions, list_options)
        self.analyze_age_groups(dict_answer, list_questions, list_options)
        self.generate_ai_responses(dict_answer)
        self.perform_additional_analysis(df_sampled_data)

    def analyze_aggregate_data(self, dict_answer: Dict, list_questions: List, list_options: List):
        """Tabulate all answers and plot the share of "true" answers per question."""
        dict_aggregate = self.data_processor.aggregate_data_dict(dict_answer, list_questions, list_options)
        df_aggregate = pd.DataFrame.from_dict(dict_aggregate).T
        df_calculate = self.data_processor.calculate_ratio(df_aggregate, "all")
        Visualizer.table_plot(df_calculate, 10, 10)

    def analyze_age_groups(self, dict_answer: Dict, list_questions: List, list_options: List):
        """Repeat the aggregate analysis once per group of AGE codes in ``AGE_FILTERS``."""
        separated_dict = self.data_processor.separate_data_dicts(dict_answer, AGE_FILTERS, "AGE")

        for idx, age_codes in enumerate(AGE_FILTERS):
            dict_aggregate = self.data_processor.aggregate_data_dict(
                separated_dict[f'dict_{idx}'], list_questions, list_options
            )
            df_aggregate = pd.DataFrame.from_dict(dict_aggregate).T
            # Label each table with its own AGE codes so the groups can be told apart.
            group_label = "AGE " + "/".join(str(code) for code in age_codes)
            df_calculate = self.data_processor.calculate_ratio(df_aggregate, group_label)
            Visualizer.table_plot(df_calculate, 10, 10)

    def generate_ai_responses(self, dict_answer: Dict):
        """Collect model-generated answers for every sampled respondent."""
        # The result is not used yet; further analysis of the AI-generated
        # responses can be added here.
        self.get_response_dict(dict_answer)

    def perform_additional_analysis(self, df_sampled_data: pd.DataFrame):
        """One-hot encode the demographic columns and print row-wise odds ratios."""
        df_dummies = self.data_processor.create_dummies(df_sampled_data, DEMOGRAPHIC_COLUMNS)
        df_merged_data = pd.concat([df_sampled_data, df_dummies], axis=1)
        # get_dummies yields uint8 columns on older pandas and bool columns on
        # newer versions; convert both to plain integers.
        list_binary_cols = df_merged_data.select_dtypes(include=['uint8', 'bool']).columns
        df_merged_data[list_binary_cols] = df_merged_data[list_binary_cols].astype(int)

        list_odds_ratios = self.data_processor.calculate_odds_ratio(df_merged_data)
        print("Odds Ratios:", list_odds_ratios)

    def get_response_dict(self, answer_dict: Dict) -> Tuple[pd.DataFrame, Dict]:
        """Ask the model to fill in the questionnaire once per respondent.

        Results are not memoised: the argument is a ``dict`` (unhashable, so it
        cannot be an ``lru_cache`` key) and the completions are sampled with a
        non-zero temperature.

        Args:
            answer_dict: ``{respondent id: attributes}``; the attributes are
                embedded in the system message via ``str()``.

        Returns:
            ``(DataFrame, dict)`` where the dict maps each respondent id to its
            ``{question: answer}`` mapping and the frame is built from that dict.
        """
        dict_response = {}
        for respondent_id, attributes in answer_dict.items():
            role = f"You are the person with the following attributes: {str(attributes)}"
            # The module-level questionnaire text is named "questionaire".
            content = self.openai_client.generate_completion(role, prompt, questionaire)
            dict_response[respondent_id] = self.extract_answer(content)
        df_response = pd.DataFrame(dict_response)
        return df_response, dict_response

    @staticmethod
    def extract_answer(content: str) -> Dict[str, str]:
        """Pair each question with the corresponding ``['...']`` answer in ``content``.

        Answers are matched to ``questions_list`` by position; surplus answers
        or questions are dropped. Empty or ``None`` content yields ``{}``.
        """
        answers = SurveyAnalyzer._ANSWER_PATTERN.findall(content or "")
        return dict(zip(questions_list, answers))

def main():
    """Load the survey files from disk and run the full analysis."""
    SurveyAnalyzer(DataProcessor.load_data()).run_analysis()


# Only run the analysis when executed as a script, not on import.
if __name__ == "__main__":
    main()