In [31]:
import os
import json
from datetime import datetime, timedelta
import numpy as np
from sentence_transformers import SentenceTransformer
import openai

# OpenAI API Key 설정 함수
def load_api_key(file_path="../api_key.txt"):
    """
    Load the OpenAI API key from a plain-text file.

    Args:
        file_path: Path of a text file whose content is the API key.
            Leading/trailing whitespace (including the final newline) is
            stripped from the content.

    Returns:
        The API key as a non-empty string.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If the file exists but holds nothing except whitespace.
    """
    try:
        # "utf-8-sig" makes the read independent of the platform locale and
        # drops a UTF-8 byte-order mark if an editor added one.
        with open(file_path, "r", encoding="utf-8-sig") as f:
            key = f.read().strip()
    except FileNotFoundError as err:
        raise FileNotFoundError(f"API 키 파일이 {file_path}에 없습니다.") from err

    # An empty key would otherwise be accepted here and only fail later,
    # at the first API request, with a much less obvious error.
    if not key:
        raise ValueError(f"API 키 파일 {file_path}이(가) 비어 있습니다.")
    return key

# Read the key once and hand the same value to the OpenAI client module.
openai.api_key = api_key = load_api_key()

In [32]:
class Persona:
    """
    Container for the fixed profile of the simulated persona.
    """
    def __init__(self):
        # Past experiences, kept as a list so callers can join them as needed.
        experiences = [
            "I was diagnosed with breast cancer during a routine examination",
            "I experienced a depressed mood and insomnia",
            "my back pain got worse impacting my sleep",
            "I started pain control and physical therapy",
            "I wanted to stop taking sleep medication",
        ]
        # Field values are stored verbatim, including their trailing spaces.
        profile_fields = [
            ("Age", "54  "),
            ("Gender", "Female  "),
            ("Existing Medical Conditions", "Breast cancer, depressed mood, insomnia, back pain  "),
            ("Symptoms", "Difficulties sleeping, worsened back pain  "),
            ("Experience", experiences),
        ]
        self.data = dict(profile_fields)

    def get_persona(self):
        """
        Return the persona dictionary held by this instance.

        The returned object is ``self.data`` itself, not a copy.
        """
        return self.data

In [33]:
class ShortTermMemory:
    """
    Short-term memory: scored observations plus the reflection trigger.

    Each observation is scored by a chat model (poignancy, emotion, emotion
    intensity), embedded, kept in memory and written to ``output_file``.
    Two budgets start at ``reflection_threshold`` and are reduced by every
    observation's scores; once either one is zero or below, the module-level
    ``generate_reflection`` is called and both budgets are reset.
    """

    # Emotions the model may answer with; mirrors the list spelled out in the prompt.
    VALID_EMOTIONS = ("joy", "sadness", "anger", "fear", "anticipation", "surprise", "trust", "disgust")

    # Returned by analyze_with_gpt when every attempt failed.
    # NOTE(review): "neutral" is NOT one of VALID_EMOTIONS, so records scored
    # through this fallback carry an out-of-set emotion. Kept unchanged for
    # compatibility with existing output files; confirm whether that is wanted.
    FALLBACK_ANALYSIS = (5, "neutral", 5)

    def __init__(self, reflection_threshold, max_memory_size, ltm, persona, output_file="../output/5min_short_term_persona0.json"):
        """
        Args:
            reflection_threshold: Starting value of both the poignancy and the
                emotion-score budget; also the value they are reset to.
            max_memory_size: Maximum number of entries kept in ``recent_memory``.
            ltm: Object handed to ``generate_reflection`` as the long-term store.
            persona: Object exposing ``get_persona()``.
            output_file: JSON file the whole memory is written to after every
                observation.
        """
        self.memory = {}          # node_id -> observation dict
        self.recent_memory = []   # sliding window of the latest observations
        self.active_memory = []   # observations since the last reflection
        self.current_poignancy = reflection_threshold
        self.current_emotion_score = reflection_threshold
        self.reflection_threshold = reflection_threshold
        self.max_memory_size = max_memory_size
        self.ltm = ltm
        self.persona = persona
        self.output_file = output_file
        self.node_id_file = "../output/node_id.json"
        # NOTE(review): node_id resumes from node_id_file, while self.memory
        # starts empty and save_to_file() rewrites output_file from
        # self.memory only - observations of an earlier run are not reloaded.
        self.node_id = self.load_node_id()
        # os.path.dirname() is "" for a bare file name and os.makedirs("")
        # raises FileNotFoundError, so only create a directory when there is one.
        output_dir = os.path.dirname(self.output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

    def load_node_id(self):
        """
        Return the ``node_id`` stored in ``node_id_file``, or 0.

        0 is returned when the file does not exist, when it has no
        ``"node_id"`` key, or when reading/parsing fails (the error is printed).
        """
        try:
            if os.path.exists(self.node_id_file):
                with open(self.node_id_file, "r", encoding="utf-8") as file:
                    return json.load(file).get("node_id", 0)
            else:
                return 0
        except Exception as e:
            print(f"Error loading node_id: {e}")
            return 0

    def save_node_id(self):
        """
        Write the current ``node_id`` to ``node_id_file``.

        Failures are printed, not raised.
        """
        try:
            # node_id_file may live in a different directory than output_file,
            # so make sure its directory exists before writing.
            node_id_dir = os.path.dirname(self.node_id_file)
            if node_id_dir:
                os.makedirs(node_id_dir, exist_ok=True)
            with open(self.node_id_file, "w", encoding="utf-8") as file:
                json.dump({"node_id": self.node_id}, file, ensure_ascii=False, indent=4)
        except Exception as e:
            print(f"Error saving node_id: {e}")

    def add_observation(self, timestamp, description):
        """
        Score, embed and store one observation, then check the reflection trigger.

        Args:
            timestamp: Object with ``strftime`` (e.g. ``datetime``); stored as
                ``"%Y-%m-%d %H:%M:%S"`` text.
            description: Text of the observation.

        Any exception raised along the way is printed and swallowed, so one
        failing observation does not stop the caller's loop.
        """
        try:
            persona_data = self.persona.get_persona()

            # Score the observation and compute its embedding.
            poignancy, emotion, emotion_score = self.analyze_with_gpt(description, persona_data)
            embedding = generate_embedding(description)

            observation = {
                "node_id": self.node_id,
                "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
                "description": description,
                "embedding": embedding,
                "poignancy": poignancy,
                "emotion": emotion,
                "emotion_score": emotion_score
            }

            # Store it, keeping recent_memory capped at max_memory_size entries.
            self.memory[self.node_id] = observation
            self.recent_memory.append(observation)
            if len(self.recent_memory) > self.max_memory_size:
                self.recent_memory.pop(0)

            self.active_memory.append(observation)
            self.node_id += 1

            # Both budgets count down towards the reflection trigger.
            self.current_poignancy -= poignancy
            self.current_emotion_score -= emotion_score

            print(f"Poignancy Threshold: {self.current_poignancy}, Emotion Score Threshold: {self.current_emotion_score}")

            if self.current_poignancy <= 0 or self.current_emotion_score <= 0:
                print("\n=== Reflection Triggered ===")
                # The reflection is built from recent_memory (the capped
                # window), not from active_memory.
                generate_reflection(self.recent_memory, self.reflection_threshold, self.ltm)
                self.current_poignancy = self.reflection_threshold
                self.current_emotion_score = self.reflection_threshold
                self.active_memory = []

            # Persist the full memory and the next node id after every observation.
            self.save_to_file()
            self.save_node_id()

        except Exception as e:
            print(f"Error adding observation: {description}. Exception: {e}")

    @staticmethod
    def _coerce_score(value):
        """
        Turn a score from the model's JSON into a number.

        Ints and floats pass through, numeric strings are converted, and
        integer-valued floats become ints.

        Raises:
            ValueError: For booleans, NaN/infinite values, or text that is not
                a number.
        """
        if isinstance(value, bool):
            raise ValueError(f"score must be numeric, got {value!r}")
        number = value if isinstance(value, (int, float)) else float(str(value).strip())
        if isinstance(number, float):
            if number != number or number in (float("inf"), float("-inf")):
                raise ValueError(f"score must be finite, got {value!r}")
            if number.is_integer():
                number = int(number)
        return number

    @staticmethod
    def _parse_analysis(generated_text):
        """
        Extract ``(poignancy, emotion, emotion_score)`` from a model reply.

        The JSON object is taken from the first ``{`` to the last ``}``, so
        code fences (with or without a language tag) and surrounding prose do
        not matter. The emotion is lower-cased and stripped; it is not checked
        against ``VALID_EMOTIONS`` here.

        Raises:
            ValueError: If no JSON object is present, the JSON is invalid, or
                a score is not numeric.
            KeyError: If one of the three expected keys is missing.
        """
        start = generated_text.find("{")
        end = generated_text.rfind("}")
        if start == -1 or end < start:
            raise ValueError("response does not contain a JSON object")
        result = json.loads(generated_text[start:end + 1])
        emotion = str(result["emotion"]).strip().lower()
        return (
            ShortTermMemory._coerce_score(result["poignancy"]),
            emotion,
            ShortTermMemory._coerce_score(result["emotion_score"]),
        )

    def analyze_with_gpt(self, description, persona_data, max_retries=50):
        """
        Ask the chat model to score a piece of text for the given persona.

        Args:
            description: Text to analyse.
            persona_data: Dict with the keys ``Age``, ``Gender``,
                ``Existing Medical Conditions``, ``Symptoms`` and
                ``Experience`` (an iterable of strings).
            max_retries: Number of attempts before giving up.

        Returns:
            ``(poignancy, emotion, emotion_score)``. After ``max_retries``
            failed attempts (API error, unparsable reply, or an emotion
            outside ``VALID_EMOTIONS``) ``FALLBACK_ANALYSIS`` is returned.
        """
        persona_text = (
            f"Persona Information:\n"
            f"Age: {persona_data['Age']}\n"
            f"Gender: {persona_data['Gender']}\n"
            f"Existing Medical Conditions: {persona_data['Existing Medical Conditions']}\n"
            f"Symptoms: {persona_data['Symptoms']}\n"
            f"Experience: {', '.join(persona_data['Experience'])}\n"
        )

        prompt = f"""
        {persona_text}

        On a scale of 1 to 10, where:
        - 1 represents a mundane event (e.g., brushing teeth, making bed)
        - 10 represents an extremely poignant event (e.g., a breakup, college acceptance)

        Please analyze the following memory and provide:
        1. Poignancy score (1-10).
        2. The primary emotion (choose only from: joy, sadness, anger, fear, anticipation, surprise, trust, disgust).
        3. The intensity of the emotion (1-10).

        Memory: "{description}"
        Respond with a JSON object containing the keys: "poignancy", "emotion", and "emotion_score".
        The "emotion" must strictly be one of the 8 options provided. If the memory does not clearly align with one emotion, select the closest matching option from the list.
        """

        for attempt in range(max_retries):
            try:
                print(f"Sending request to OpenAI API... (Attempt {attempt + 1})")
                response = openai.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": prompt}],
                    temperature=0.2,
                    top_p=0.8
                )
                generated_text = response.choices[0].message.content.strip()
                print(f"Generated Text: {generated_text}")

                poignancy, emotion, emotion_score = self._parse_analysis(generated_text)

                if emotion in self.VALID_EMOTIONS:
                    return poignancy, emotion, emotion_score
                print(f"Invalid emotion detected: {emotion}. Retrying...")
            except Exception as e:
                # API failures and malformed replies are both retried.
                print(f"Error analyzing with GPT: {e}")

        print("Max retries reached. Returning default values.")
        return self.FALLBACK_ANALYSIS

    def save_to_file(self):
        """
        Write the whole ``memory`` dict to ``output_file`` as JSON.

        The file is overwritten on every call; failures are printed, not raised.
        """
        try:
            with open(self.output_file, "w", encoding="utf-8") as file:
                json.dump(self.memory, file, ensure_ascii=False, indent=4)
        except Exception as e:
            print(f"Error writing to file: {e}")

In [34]:
def generate_embedding(text, model="text-embedding-ada-002"):
    """
    Return the embedding vector of ``text`` from the OpenAI embeddings API.

    Args:
        text: Text to embed; sent as a one-element input list.
        model: Name of the embedding model. Vectors produced by different
            models should not be compared with each other, so keep one model
            per stored data set.

    Returns:
        The embedding of the first (only) input, or ``[]`` when the request
        fails for any reason (the error is printed, not raised).
    """
    try:
        response = openai.embeddings.create(
            input=[text],
            model=model
        )
        return response.data[0].embedding
    except Exception as e:
        # Best-effort: callers treat an empty list as "no embedding available".
        print(f"Error generating embedding: {e}")
        return []


def generate_questions(memory, num_questions=3):
    """
    Ask the chat model for high-level reflective questions about observations.

    Args:
        memory: Iterable of observation dicts with ``"timestamp"`` and
            ``"description"`` keys.
        num_questions: How many questions the prompt asks for. The reply is
            not truncated or validated against this number.

    Returns:
        The non-empty lines of the model's reply, stripped, in order. ``[]``
        when ``memory`` is empty (no request is made) or when the request
        fails (the error is printed).
    """
    # Nothing to reflect on: skip the API round trip instead of sending a
    # prompt with an empty statement list.
    if not memory:
        return []

    memory_text = "\n".join(
        [f"Timestamp: {m['timestamp']}, Description: {m['description']}" for m in memory]
    )
    prompt = f"""
    Statements:
    {memory_text}

    Based on the above statements, what are the {num_questions} most important high-level questions we can ask to understand the behavior or motivations behind these observations?

    Provide the questions in a numbered list, without explanations.
    """
    try:
        response = openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            top_p=0.8
        )
        reply_lines = response.choices[0].message.content.strip().split("\n")
        return [q.strip() for q in reply_lines if q.strip()]
    except Exception as e:
        print(f"Error generating questions: {e}")
        return []


def find_relevant_shortterms(question, memory, top_k=5):
    """
    Return the short-term memories most similar to a question.

    The question is embedded with ``generate_embedding`` and compared to each
    memory's stored ``"embedding"`` by cosine similarity.

    Args:
        question: Question text.
        memory: Iterable of observation dicts; entries without a non-empty
            ``"embedding"`` are ignored.
        top_k: Maximum number of memories to return.

    Returns:
        Up to ``top_k`` memory dicts, most similar first. ``[]`` when the
        question could not be embedded (``generate_embedding`` returns ``[]``
        on failure) or when no memory has a comparable embedding.
    """
    query = np.asarray(generate_embedding(question), dtype=float)
    query_norm = np.linalg.norm(query) if query.size else 0.0
    # An empty or all-zero query vector cannot be compared; without this
    # guard np.dot raises on the shape mismatch and aborts the reflection.
    if query_norm == 0:
        return []

    similarities = []
    for m in memory:
        if "embedding" in m and m["embedding"]:
            candidate = np.asarray(m["embedding"], dtype=float)
            # Skip vectors of a different dimension (e.g. from another model).
            if candidate.shape != query.shape:
                continue
            denominator = query_norm * np.linalg.norm(candidate)
            if denominator == 0:
                continue  # zero vector: cosine similarity is undefined
            similarities.append((float(np.dot(query, candidate) / denominator), m))

    # Stable sort, highest similarity first.
    similarities.sort(reverse=True, key=lambda pair: pair[0])
    return [item[1] for item in similarities[:max(top_k, 0)]]


def generate_reflection(stm_memory, reflection_threshold, ltm, persona=None, analyzer=None):
    """
    Generate reflections from short-term memories and store them long-term.

    For every question returned by ``generate_questions`` the most relevant
    memories are selected, the chat model writes a short first-person
    reflection, the reflection is scored and embedded, and the result is
    passed to ``ltm.add_reflection``.

    Args:
        stm_memory: Iterable of observation dicts (``"timestamp"``,
            ``"description"``, ``"embedding"``) to reflect on.
        reflection_threshold: Accepted for interface compatibility; not used
            inside this function.
        ltm: Object exposing ``add_reflection(reflection_data)``.
        persona: Object exposing ``get_persona()``. When omitted, the
            module-level variable ``persona`` is used.
        analyzer: Object exposing ``analyze_with_gpt(text, persona_data)``
            returning ``(poignancy, emotion, emotion_score)``. When omitted,
            the module-level variable ``stm`` is used.

    Returns:
        List of the reflection dicts that were stored. A question whose
        reflection fails is skipped (the error is printed).

    Raises:
        NameError: If ``persona``/``analyzer`` is not passed and the matching
            module-level variable does not exist.
    """
    # Explicit arguments win; otherwise fall back to the module-level objects
    # this function has always relied on.
    module_scope = globals()
    persona_source = persona if persona is not None else module_scope.get("persona")
    if persona_source is None:
        raise NameError("generate_reflection needs a persona: pass persona=... or define a global 'persona'")
    scorer = analyzer if analyzer is not None else module_scope.get("stm")
    if scorer is None:
        raise NameError("generate_reflection needs an analyzer: pass analyzer=... or define a global 'stm'")

    persona_data = persona_source.get_persona()
    persona_text = (
        f"Age: {persona_data['Age']}\n"
        f"Gender: {persona_data['Gender']}\n"
        f"Existing Medical Conditions: {persona_data['Existing Medical Conditions']}\n"
        f"Symptoms: {persona_data['Symptoms']}\n"
        f"Experience: {', '.join(persona_data['Experience'])}\n"
    )

    # Work on a snapshot so later changes to the caller's list do not matter.
    recent_memory = list(stm_memory)
    questions = generate_questions(recent_memory)

    # Every reflection is stamped with the newest memory's timestamp; the
    # current time is only used when there are no memories at all.
    timestamp = recent_memory[-1]['timestamp'] if recent_memory else datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    reflections = []
    for question in questions:
        relevant_shortterms = find_relevant_shortterms(question, recent_memory)

        memory_text = "\n".join(
            [f"Timestamp: {m['timestamp']}, Description: {m['description']}" for m in relevant_shortterms]
        )
        prompt = f"""
        Persona Information:
        {persona_text}

        Relevant Observations:
        {memory_text}

        Question:
        {question}

        Imagine you are experiencing the situation described above. Reflect on your thoughts and feelings in response to the question. Use a personal and conversational tone, avoiding specific time references like "last night" or "this morning." Focus on expressing your current thoughts and emotions clearly in 2-3 sentences, as if journaling or sharing with a trusted friend.
        """
        try:
            response = openai.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                top_p=0.8
            )
            reflection = response.choices[0].message.content.strip()

            # Score and embed the reflection the same way observations are.
            poignancy, emotion, emotion_score = scorer.analyze_with_gpt(reflection, persona_data)
            embedding = generate_embedding(reflection)

            reflection_data = {
                "reflection": reflection,
                "timestamp": timestamp,
                "poignancy": poignancy,
                "emotion": emotion,
                "emotion_score": emotion_score,
                "embedding": embedding,
                "related_shortterms": relevant_shortterms,
                "questions": questions
            }

            ltm.add_reflection(reflection_data)
            reflections.append(reflection_data)

        except Exception as e:
            print(f"Error generating reflection: {e}")

    return reflections

In [35]:
class LongTermMemory:
    """
    Long-term memory: an append-only store of reflections persisted as JSON.

    Reflections are keyed by a counter that starts at 0 for every new
    instance; the output file is rewritten from the in-memory dict on each
    addition, so content of an existing file is not loaded or merged.
    """
    def __init__(self, output_file="../output/5min_long_term_persona0.json"):
        """
        Args:
            output_file: JSON file the memory is written to. Its parent
                directory is created when it does not exist.
        """
        self.memory = {}     # node_id -> reflection dict
        self.node_id = 0     # key for the next reflection
        self.output_file = output_file
        # os.path.dirname() is "" for a bare file name and os.makedirs("")
        # raises FileNotFoundError, so only create a directory when there is one.
        output_dir = os.path.dirname(self.output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

    def add_reflection(self, reflection_data):
        """
        Store one reflection under the next node id and persist the memory.

        Args:
            reflection_data: JSON-serialisable dict describing the reflection.
        """
        self.memory[self.node_id] = reflection_data
        self.node_id += 1
        self.save_to_file()

    def save_to_file(self):
        """
        Write the whole ``memory`` dict to ``output_file`` as JSON.

        The file is overwritten on every call; failures are printed, not raised.
        """
        try:
            with open(self.output_file, "w", encoding="utf-8") as file:
                json.dump(self.memory, file, ensure_ascii=False, indent=4)
        except Exception as e:
            print(f"Error writing to file: {e}")

    def print_memory(self):
        """
        Print a header followed by every stored reflection, in insertion order.
        """
        print("\n=== Long-Term Memory ===")
        for reflection in self.memory.values():
            print(reflection)

In [36]:
# Entry point: replay the recorded activities through the memory pipeline.
if __name__ == "__main__":
    # `persona` and `stm` are module-level names; generate_reflection reads them.
    persona = Persona()
    ltm = LongTermMemory(output_file="../output/5min_long_term_persona0.json")
    stm = ShortTermMemory(reflection_threshold=150, max_memory_size=100, ltm=ltm, persona=persona)

    # Single input file holding the activity entries
    json_file_path = "../data/dummy_dataset/final_dummy_data_0.json"

    try:
        print(f"Processing file: {json_file_path}")
        with open(json_file_path, "r", encoding="utf-8") as file:
            data = json.load(file)

        # The file is fully parsed above, so it no longer needs to stay open
        # while the entries are processed.
        for entry in data:
            timestamp = datetime.strptime(entry["timestamp"], "%Y.%m.%d.%H.%M.%S")
            description = entry["activity"]
            stm.add_observation(timestamp, description)

    except Exception as e:
        print(f"Error: {e}")

Processing file: ../data/dummy_dataset/final_dummy_data_0.json
Sending request to OpenAI API... (Attempt 1)
Generated Text: ```json
{
    "poignancy": 3,
    "emotion": "sadness",
    "emotion_score": 5
}
```
Poignancy Threshold: 147, Emotion Score Threshold: 145
Sending request to OpenAI API... (Attempt 1)
Generated Text: ```json
{
    "poignancy": 4,
    "emotion": "sadness",
    "emotion_score": 6
}
```
Poignancy Threshold: 143, Emotion Score Threshold: 139
Sending request to OpenAI API... (Attempt 1)
Generated Text: ```json
{
    "poignancy": 3,
    "emotion": "sadness",
    "emotion_score": 4
}
```
Poignancy Threshold: 140, Emotion Score Threshold: 135
Sending request to OpenAI API... (Attempt 1)
Generated Text: ```json
{
  "poignancy": 3,
  "emotion": "sadness",
  "emotion_score": 4
}
```
Poignancy Threshold: 137, Emotion Score Threshold: 131
Sending request to OpenAI API... (Attempt 1)
Generated Text: ```json
{
    "poignancy": 3,
    "emotion": "sadness",
    "emotion_score": 5

In [37]:
import json
from collections import Counter

# 파일에서 데이터를 로드하는 함수
def load_data(file_path):
    """Parse the UTF-8 JSON file at ``file_path`` and return its content."""
    with open(file_path, 'r', encoding='utf-8') as handle:
        parsed = json.load(handle)
    return parsed

# 감정 필드 확인 함수
def extract_and_check_emotions(file_path, valid_emotions):
    """
    Summarise the ``emotion`` values stored in a memory JSON file.

    Args:
        file_path: Path of a JSON file holding a mapping of node id -> node.
        valid_emotions: Iterable of the emotion labels considered valid.

    Returns:
        Dict with ``"unique_emotions"`` (set of labels found),
        ``"emotion_counts"`` (``Counter`` of labels) and
        ``"invalid_emotions"`` (labels found that are not in
        ``valid_emotions``). Nodes without an ``emotion`` key are ignored.
    """
    nodes = load_data(file_path)

    # Collect the label of every node that has one.
    observed = [entry['emotion'] for entry in nodes.values() if 'emotion' in entry]

    distinct = set(observed)
    tally = Counter(observed)

    # Anything outside the allowed label set is reported as invalid.
    unexpected = distinct - set(valid_emotions)

    summary = {}
    summary["unique_emotions"] = distinct
    summary["emotion_counts"] = tally
    summary["invalid_emotions"] = unexpected
    return summary

# The eight emotion labels treated as valid
valid_emotions = ['joy', 'sadness', 'anger', 'fear', 'anticipation', 'surprise', 'trust', 'disgust']

# Example paths of the short-term and long-term memory output files
short_term_path = "../output/5min_short_term_persona0.json"
long_term_path = "../output/5min_long_term_persona0.json"

# Short-term memory report, followed by one blank separator line
short_term_results = extract_and_check_emotions(short_term_path, valid_emotions)
print(
    "Short-Term Memory:",
    f"Unique Emotions: {short_term_results['unique_emotions']}",
    f"Emotion Counts: {short_term_results['emotion_counts']}",
    f"Invalid Emotions: {short_term_results['invalid_emotions']}",
    "",
    sep="\n",
)

# Long-term memory report
long_term_results = extract_and_check_emotions(long_term_path, valid_emotions)
print(
    "Long-Term Memory:",
    f"Unique Emotions: {long_term_results['unique_emotions']}",
    f"Emotion Counts: {long_term_results['emotion_counts']}",
    f"Invalid Emotions: {long_term_results['invalid_emotions']}",
    sep="\n",
)


Short-Term Memory:
Unique Emotions: {'fear', 'trust', 'neutral', 'surprise', 'joy', 'disgust', 'anticipation', 'sadness'}
Emotion Counts: Counter({'trust': 357, 'sadness': 269, 'anticipation': 223, 'joy': 77, 'fear': 17, 'surprise': 13, 'disgust': 3, 'neutral': 2})
Invalid Emotions: {'neutral'}

Long-Term Memory:
Unique Emotions: {'fear', 'trust', 'joy', 'anticipation', 'sadness'}
Emotion Counts: Counter({'sadness': 33, 'trust': 32, 'joy': 16, 'anticipation': 8, 'fear': 1})
Invalid Emotions: set()


In [29]:
import json

# Location of the dummy activity data set
file_path = '../data/dummy_dataset/final_dummy_data_0.json'

# Parse the JSON file
with open(file_path, 'r', encoding='utf-8') as file:
    data = json.load(file)

# Count the entries that contain an 'activity' key
activity_count = len([item for item in data if 'activity' in item])

print(f"'activity' 항목의 총 개수: {activity_count}")


'activity' 항목의 총 개수: 961


In [38]:
# import json

# def merge_json_files(file1_path, file2_path, output_path):
#     """
#     두 개의 JSON 파일을 합쳐 하나의 파일로 저장합니다.

#     Args:
#         file1_path (str): 첫 번째 JSON 파일 경로
#         file2_path (str): 두 번째 JSON 파일 경로
#         output_path (str): 합쳐진 결과를 저장할 JSON 파일 경로
#     """
#     try:
#         # 첫 번째 JSON 파일 읽기
#         with open(file1_path, "r", encoding="utf-8") as file1:
#             data1 = json.load(file1)

#         # 두 번째 JSON 파일 읽기
#         with open(file2_path, "r", encoding="utf-8") as file2:
#             data2 = json.load(file2)

#         # 데이터 타입 확인 및 병합
#         if isinstance(data1, list) and isinstance(data2, list):
#             # 데이터가 리스트 형식이면 리스트 병합
#             merged_data = data1 + data2
#         elif isinstance(data1, dict) and isinstance(data2, dict):
#             # 데이터가 딕셔너리 형식이면 딕셔너리 병합
#             merged_data = {**data1, **data2}
#         else:
#             raise ValueError("두 JSON 파일의 데이터 형식이 서로 다르거나 지원되지 않습니다. (list 또는 dict만 지원)")

#         # 합쳐진 데이터를 새로운 파일에 저장
#         with open(output_path, "w", encoding="utf-8") as output_file:
#             json.dump(merged_data, output_file, ensure_ascii=False, indent=4)

#         print(f"JSON 파일이 성공적으로 병합되었습니다. 결과 파일 경로: {output_path}")

#     except Exception as e:
#         print(f"JSON 병합 중 오류 발생: {e}")

# # 예제 실행
# file1_path = "../data/dummy_dataset/5min_dummy_data_0.json"
# file2_path = "../data/dummy_dataset/5min_dummy_data_1.json"
# output_path = "../data/dummy_dataset/5min_dummy_data_merge.json"

# merge_json_files(file1_path, file2_path, output_path)

In [46]:
# import json

# # longterm_with_dialogue.json 파일 경로
# file_path = "../output/5min_long_term_persona0.json"
# output_file_path = "../output/reflections_persona0.txt"  # 출력 파일 경로

# def extract_reflections(file_path):
#     try:
#         # JSON 파일 읽기
#         with open(file_path, "r", encoding="utf-8") as file:
#             data = json.load(file)
        
#         reflections = {}
#         # reflection 필드가 있는 노드만 추출
#         for node_id, content in data.items():
#             if "reflection" in content:  # "reflection" 키가 있는지 확인
#                 reflections[node_id] = content["reflection"]
        
#         return reflections
#     except Exception as e:
#         print(f"파일 읽기 중 오류 발생: {e}")
#         return {}

# def save_reflections_to_txt(reflections, output_file_path):
#     try:
#         with open(output_file_path, "w", encoding="utf-8") as file:
#             for node_id, reflection in reflections.items():
#                 file.write(f"노드 ID: {node_id}\n")
#                 file.write(f"Reflection: {reflection}\n\n")
#         print(f"Reflection 데이터가 {output_file_path} 파일에 저장되었습니다.")
#     except Exception as e:
#         print(f"파일 저장 중 오류 발생: {e}")

# # reflection 데이터 추출
# reflections = extract_reflections(file_path)

# # reflection 데이터 출력 및 저장
# if reflections:
#     print("\n=== 추출된 Reflection ===")
#     for node_id, reflection in reflections.items():
#         print(f"노드 ID: {node_id}")
#         print(f"Reflection: {reflection}\n")
    
#     # Reflection 데이터를 txt 파일로 저장
#     save_reflections_to_txt(reflections, output_file_path)
# else:
#     print("Reflection 데이터가 없습니다.")



=== 추출된 Reflection ===
노드 ID: 0
Reflection: I find myself sleeping for such a long stretch because my body is just so exhausted from the constant battle with pain and the emotional toll of everything I'm going through. It feels like a much-needed escape from the anxiety and the heaviness of my situation, even if just for a little while. I can't help but feel a mix of relief and worry—relief that I'm finally getting some rest, but worry that I'm relying on sleep as a way to cope with everything else.

노드 ID: 1
Reflection: Lately, I’ve been feeling a bit overwhelmed with everything going on—my battle with breast cancer, the persistent back pain, and the weight of my depressed mood. It’s like my body is telling me to rest, but I can't help but wonder if this prolonged sleep is just a way to escape or if it’s my mind and body trying to heal. I really want to find a balance, but it’s hard not to feel a little lost in all of this.

노드 ID: 2
Reflection: Lately, I’ve been feeling a bit overwh

In [40]:
# import json

# # File path for the uploaded JSON file
# file_path = "../output/shortterm_5min.json"

# # JSON 파일 열기 및 'description' 추출
# try:
#     with open(file_path, 'r', encoding='utf-8') as file:
#         data = json.load(file)
    
#     # Extract all descriptions from the JSON data
#     descriptions = [node.get("description", "") for node in data.values()]
    
#     # Save the descriptions to a new file or return them for user
#     output_file_path = "../output/descriptions_extracted.txt"
#     with open(output_file_path, 'w', encoding='utf-8') as output_file:
#         output_file.write("\n".join(descriptions))
    
#     output_message = f"Descriptions extracted successfully. File saved at: {output_file_path}"
# except Exception as e:
#     output_message = f"An error occurred: {str(e)}"

# output_message

In [42]:
# import json

# # JSON 파일 경로
# file_path = '../data/dummy_dataset/5min_dummy_data_merge.json'

# # JSON 파일 읽기
# with open(file_path, 'r', encoding='utf-8') as file:
#     data = json.load(file)

# # 'activity' 키의 개수 세기
# activity_count = sum(1 for item in data if 'activity' in item)

# print(f"'activity' 항목의 총 개수: {activity_count}")
