# 데이터 저장
* 초기 RAW 데이터화

In [None]:
import os
import asyncio
import nest_asyncio
from dotenv import load_dotenv
from llama_parse import LlamaParse
nest_asyncio.apply()
load_dotenv()
api_key = os.getenv("LLAMA_CLOUD_API_KEY")
if not api_key:
    raise ValueError("❌ LLAMA_CLOUD_API_KEY가 설정되지 않았습니다. .env 파일을 확인하세요.")
parser = LlamaParse(result_type="text")  # ✅ "text"로 설정하여 순수 텍스트만 반환

async def process_pdf(folder_path="./랩큐"):
    """비동기로 폴더 내의 PDF 파일 읽기 및 텍스트 추출"""
    pdf_files = []
    for root, _, files in os.walk(folder_path):
        for file in files:
            if file.lower().endswith(".pdf"):
                pdf_files.append(os.path.join(root, file))
                
    for pdf_path in pdf_files:
        print(f"📄 처리할 PDF: {pdf_path}")
        documents = await parser.aload_data(pdf_path)
        extracted_text = "\n\n".join([f"--- Page {i+1} ---\n\n{doc.text}" for i, doc in enumerate(documents)])
        relative_path = os.path.relpath(pdf_path, folder_path)  # 기준 폴더 대비 상대 경로
        txt_relative_path = os.path.splitext(relative_path)[0] + ".txt"  # 확장자만 변경
        txt_filepath = os.path.join(folder_path, txt_relative_path)  # 원본 폴더 구조 유지
        with open(txt_filepath, "w", encoding="utf-8") as f:
            f.write(extracted_text)
            
        print(f"✅ 저장 완료: {txt_filepath}")
        
folder_path = "./랩큐"
asyncio.run(process_pdf(folder_path=folder_path))


In [None]:
import pdfplumber
from PIL import Image
import os

def save_pdf_images(pdf_path, base_folder):
    """PDF의 각 페이지를 이미지로 변환하여 저장"""
    with pdfplumber.open(pdf_path) as pdf:
        pdf_dir = os.path.splitext(os.path.relpath(pdf_path, base_folder))[0]  # 원본 폴더 구조 유지
        image_folder = os.path.join(base_folder, pdf_dir + "_images")  # "_images" 폴더에 저장
        os.makedirs(image_folder, exist_ok=True)

        for i, page in enumerate(pdf.pages):
            # 페이지를 이미지로 변환
            im = page.to_image(resolution=300)  # 300 DPI 해상도 설정
            image_path = os.path.join(image_folder, f"page_{i+1}.png")
            im.annotated.save(image_path, format="PNG")

        print(f"📷 총 {len(pdf.pages)}개의 페이지가 이미지로 저장되었습니다: {image_folder}")


## 이미지 데이터 처리
* RoboFlow를 활용해, image 데이터 graph crop 진행 및 학습 진행
  * 사용모델 yolov11
* 현재는 Project 제거 상태(데이터 보완을 위해)

In [None]:
# import the inference-sdk
from inference_sdk import InferenceHTTPClient
import os
from PIL import Image

API_KEY = os.getenv("ROBOFLOW_API_KEY", "Your API Key")
CLIENT = InferenceHTTPClient(
    api_url="https://detect.roboflow.com",
    api_key=API_KEY
)
folder_path = "/content/drive/MyDrive/랩큐"
idx = 0

for root, _, files in os.walk(folder_path):
  folder_name = os.path.basename(root)
  parent_folder = os.path.dirname(root)
  output_folder = os.path.join(parent_folder, f"{folder_name}")
  for file in files:
    if file.endswith(".png"):
      file_path = os.path.join(root, file)
      print(f"Processing: {file_path}")
      result = CLIENT.infer(file_path, model_id="graph_pr/2")
      if 'predictions' in result and result['predictions']:
        img = Image.open(file_path)
        img_width, img_height = img.size
        for graph in result['predictions']:
          # 좌표 변환
          x, y, width, height = map(int, [graph['x'], graph['y'], graph['width'], graph['height']])
          left = max(0, x - width // 2)
          upper = max(0, y - height // 2)
          right = min(img_width, x + width // 2)
          lower = min(img_height, y + height // 2)
          cropped_img = img.crop((left, upper, right, lower))
          output_file = os.path.join(output_folder, f"graph_{idx}.jpg")
          cropped_img.save(output_file)
          print(f"Saved: {output_file}")
          idx += 1
      else:
          print(f"No graphs detected in {file_path}")

## 이미지 데이터 처리 2
* 위에서 Yolo를 통해 추출된 데이터를 GPT4o-mini를 활용하여 그래프 요약 및 분석 진행.

In [None]:
import os
import base64
from openai import OpenAI
from dotenv import load_dotenv
import pandas as pd

# 환경 변수 로드
load_dotenv()

# OpenAI API 설정
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# 이미지가 포함된 최상위 폴더 경로
base_folder = "./tasty-data/랩큐/"

# 이미지 분석 함수
def analyze_image(image_path, company_name):
    with open(image_path, "rb") as image_file:
        # Base64 인코딩
        encoded_image = base64.b64encode(image_file.read()).decode("utf-8")

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": f"""너는 이미지 분석 전문가야. {company_name}의 이미지를 분석하여 그래프가 포함되었는지 판단합니다. 만약 그래프가 포함되었다면, 다음과 같은 구조화된 그래프 설명을 생성합니다:
1.축 정보: X축과 Y축 라벨 및 범위
2.추세 및 패턴: 선형, 지수, 사인파 등의 유형 식별
3.핵심 포인트: 극점(최고점, 최저점), 교차점, 마커 등
4.통찰: 그래프가 나타내는 의미
Chain-of-Thought 분석:
그래프를 분석하기 위해 다음 단계를 따릅니다:
1.축 정보: X축과 Y축의 라벨 및 범위를 식별합니다.
2.추세 및 형태: 그래프가 선형인지, 진동하는지, 지수적 증가를 보이는지 확인합니다.
3.핵심 포인트: 최고점, 최저점, 교차점, 마커 등을 찾아 강조합니다.
4.통찰: 그래프가 나타내는 내용을 요약합니다.
그리고 대답은 한국어로 진행해줘,  만약 그래프가 아니라면 3번과4번을 중점으로 진행해줘
대답의 마지막에 해당 ```이미지를 대표하는 설명```을 작성해줘"""},
            {
                "role": "user",
                "content": [
                    {"type": "image_url", 
                     "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"},}
                ],
            },
        ],
        max_tokens=800,
    )
    return response.choices[0].message.content
base_folder = "./랩큐/"
# 모든 하위 폴더까지 탐색하여 이미지 파일 처리
for root, dirs, files in os.walk(base_folder):
    # *_images로 끝나는 폴더만 선택
    if not root.endswith("_images"):
        continue
    folder_name = os.path.basename(root)  
    parent_dir = os.path.dirname(root)  
    company_name = os.path.splitext(os.path.basename(parent_dir))[0]
    output_file = os.path.join(parent_dir, f"{folder_name.split('_images')[0]}_img.xlsx")
    image_data_list = []
    for filename in files:
        if filename.lower().endswith((".jpg",)):  # 지원하는 이미지 확장자
            image_path = os.path.join(root, filename)
            try:
                summary = analyze_image(image_path, company_name)
                print(f"✅ 분석 완료: {image_path}")
                print(summary)
                image_data_list.append([summary, summary])
            except Exception as e:
                print(f"⚠️ 오류 발생: {image_path} - {str(e)}")
    if not image_data_list:
        continue
    new_df = pd.DataFrame(image_data_list, columns=["embedding", "text"])
    new_df.to_excel(output_file, index=False, engine="openpyxl")
    print(f"✅ '{output_file}' 파일에 결과 저장 완료.")
    


## 텍스트 데이터 처리
* Rough하게 저장된 txt 파일을 불러와 GPT 4o -mini를 활용해 텍스트와 표 데이터를 분리하는 작업 진행

In [None]:
import re
import os
from collections import defaultdict
import pandas as pd
from openai import OpenAI
from dotenv import load_dotenv


## 데이터 추출용 함수 선언
def extract_data(text_data_list):
    # 딕셔너리 key "embedding_data"와 "table_title"에 리스트를 저장
    table_embeddings = defaultdict(list)
    table_title = ""
    excel_data =[]
    for i, text in enumerate(text_data_list):
        text_data, table_data = text.split("표 데이터", 1)
        for txt in text_data.split("\n\n"):
            if not txt.strip():
                continue
            excel_data.append([txt, txt])
        for delimiter in ["### 그래프(도표) 데이터", "### 그래프 (도표) 데이터", "그래프(도표) 데이터"]:
            table_data = table_data.split(delimiter, 1)[0]
        
        for table in table_data.split("\n\n"):
            if not table.strip():  # 빈 테이블 건너뛰기
                continue

            col_keywords = ""  # 각 테이블마다 초기화
            row_keywords = ""
            rows = [row.strip() for row in table.split("\n") if row.strip()]

            # 유효한 테이블인지 확인
            if len(rows) < 2:
                if len(rows[0]) > 5:  # 제목으로 판단
                    if table_title not in table_embeddings["table_title"]:
                        table_title = rows[0].strip()
                    else:
                        table_title = ""
                continue

            if '|' in table:  # Markdown 표 형식 처리
                if table_title == "":  # 제목이 없는 경우
                    table_title = rows[0]
                    row_keywords = rows[1].split('|')
                else:
                    row_keywords = rows[0].split('|')
                
                row_keywords = ', '.join(row_keywords).strip()
                for row in rows[1:]:
                    if '---' not in row:  # 구분선 제외
                        cols = row.split('|')
                        if len(cols) > 1 and cols[1].strip():
                            col_keywords += ', ' + cols[1].strip()

            # 숫자 비중이 낮은 쪽을 키워드로 선택
            row_num_count = len(re.findall(r'\d', row_keywords))
            col_num_count = len(re.findall(r'\d', col_keywords))
            row_temp = row_num_count / max(1, len(row_keywords))
            col_temp = col_num_count / max(1, len(col_keywords)) 
            keywords = col_keywords if row_temp >= col_temp else row_keywords
            
            # 표 제목 정리: "표 X." 또는 "### 표 X:" 제거
            table_title = re.sub(r"^(\#*\**\s*표\s*\d+[\.\:\-]?\s*)", "", table_title).strip()
            # 제목과 키워드 조합
            if keywords:
                table_embedding = f"{table_title}{keywords.strip()}"
            else:
                table_embedding = table_title
            # 결과 저장
            table_embeddings["table_title"].append(table_title)
            excel_data.append([table_embedding, table])
            table_title = ""
            
    true_data = []
    for ebd, data in excel_data:
        if len(data) > 40:
            true_data.append([ebd,data])
    return true_data

# OpenAI API 설정 및 텍스트 분류 진행(표와 텍스트 데이터)
load_dotenv()
client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),  
)
def read_text_files_by_pages(base_folder):
    text_files = {}
    # 모든 하위 폴더 포함하여 .txt 파일 찾기
    for root, _, files in os.walk(base_folder):
        for file in files:
            if file.endswith(".txt"):
                file_path = os.path.join(root, file)
                with open(file_path, "r", encoding="utf-8") as in_file:
                    content = in_file.read().strip()
                    pages = content.split("\n--- Page ")  # 페이지 구분자 기준으로 분할 진행
                    text_files[file_path] = [p.strip() for p in pages if p.strip()]  # 빈 값 제거
    return text_files

def classify_text_with_gpt(page_content):

    response =  client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "당신은 문서를 분석하는 전문가입니다. 다음 문서에서 일반 텍스트와 표 데이터, 그래프(도표)데이터를 구분해 주세요 없으면 없음이라고 알려주세요. 표는 행을 띄어쓰기로 구분하고, 표를 여러개로 분리할 수 있다면 각각 분리해줘, 표의 빈칸은 N/A로 채우고 마크다운으로 예쁘게 정리해. 텍스트는 텍스트 내용 전체를 작성하고 표도 데이터를 놓치지말고, 표와 텍스트에 대한 구분은 표 데이터, 텍스트 데이터로 구분해."},
            {"role": "user", "content": f" \n\n{page_content}"}
        ],
        max_tokens=6000
    )
    return response.choices[0].message.content 


# 실행 코드
folder_path = "./랩큐/한화솔루션"  # 여기에 폴더 경로 입력
files_with_pages = read_text_files_by_pages(folder_path)

for file_path, pages in files_with_pages.items():
    base_name = os.path.splitext(os.path.basename(file_path))[0]  # 원본 파일 이름 (확장자 제거)
    output_file_name = f"{base_name}_gpt_text.xlsx"
    save_dir = os.path.dirname(file_path)
    output_path = os.path.join(save_dir, output_file_name)
    text_data_list = []
    # 파일이 이미 존재하면 건너뛰기
    if os.path.exists(output_path):
        print(f"⚠️ '{output_path}' 파일이 이미 존재하여 건너뜁니다.")
        continue
    for page in pages:
        data = classify_text_with_gpt(page)
        text_data_list.append(data)
    print(f"✅ '{file_path}' 파일 분석 완료.")
    true_data = extract_data(text_data_list)
    df = pd.DataFrame(true_data, columns=["embedding", "text"])
    df.to_excel(output_path, index=False, engine='openpyxl')
    print(f"✅ '{output_path}' 파일에 결과가 저장되었습니다.")


## 텍스트 데이터 처리 2
* 분리된 텍스트 데이터 전처리 및 불필요한 데이터 삭제 진행

In [None]:
import os
import pandas as pd
import re
file_path = "./랩큐"
def english_ratio(text):
        if not isinstance(text, str):
            return 0
        clean_text = re.sub(r'[^a-zA-Z가-힣0-9\s]', '', text)
        clean_text = clean_text.replace(' ', '')
        total_chars = len(clean_text)
        english_chars = len(re.findall(r'[a-zA-Z]', clean_text))
        return (english_chars / total_chars) * 100 if total_chars > 0 else 0
    
    
def clean_text(text: str) -> str:
    text = str(text)
    text = text.strip()  # 앞뒤 공백 제거
    text = re.sub(r"[\t]", " ", text)  # 탭 문자는 공백으로 변경
    text = re.sub(r"\n{2,}", " ", text)  # 연속된 개행(두 개 이상) → 공백으로 변경
    text = re.sub(r"[ ]+", " ", text)  # 여러 개의 공백을 하나로 변경
    return text

def is_meaningless_text(text: str) -> bool:
    if re.search(r"^\s*\|.*\|", text, re.MULTILINE):
        lines = text.strip().split("\n")
        max_columns = max(len(line.split("|")) for line in lines)
        columns = [col.strip() if col.strip() else f"col_{i}" for i, col in enumerate(lines[0].split("|"))]
        if len(columns) < max_columns:
            columns += [f"col_{i}" for i in range(len(columns), max_columns)]
        data = []
        try:
            for line in lines[2:]:
                values = [val.strip() if val.strip() else "N/A" for val in line.split("|")]
                if len(values) < max_columns:
                    values += ["N/A"] * (max_columns - len(values))
                data.append(values)
            df = pd.DataFrame(data, columns=columns[:max_columns])
            if df.empty:
                return False
            total_cells = df.size
            na_count = (df == "N/A").sum().sum()
            if total_cells > 0:  # 0으로 나누는 오류 방지
                na_ratio_exact = (na_count / total_cells) * 100
                #print(f"N/A 비율: {na_ratio_exact:.2f}%")
                if na_ratio_exact >= 61:
                    #print(df)
                    return True
            else:
                print("⚠ Warning: No valid cells to calculate N/A ratio!")
                return False
        except Exception as e:
            print(f"Error: {e}")
            return False
    return False

remove_keywords = [
    "이 자료에 게재된 내용들은 본인의 의견을 정확하게 반영하고 있으며",
    "본 자료에 기재된 내용들은 작성자 본인의 의견을 정확하게 반영하고 있으며",
    "본 조사분석자료에 게재된 내용들이 본인의 의견을 정확하게 반영하고 있으며",
    "이 자료는 조사분석 담당자가 객관적 사실에 근거해 작성하였으며",
    "동 자료는 기관투자가 또는 제3자에게 사전제공한 사실이 없습니다",
    "본 자료를 작성한 애널리스트는 외부 압력이나 간섭을 받지 않았으며",
    "고지사항 본 조사분석자료는 당사의 리서치센터가 신뢰할 수 있는 자료 및 정보로부터 얻은 것이나",
    "본 자료는 투자자의 투자를 권유할 목적으로 작성된 것이 아니라",
    "본 조사분석자료에는 외부의 부당한 압력이나 간섭 없이 애널리스트의 의견이 정확하게 반영되었음을 확인합니다",
    '당사의 금융투자분석사는 자료작성일 현재 본 자료에 관련하여 재산적 이해관계가 없습니다',
    "이 자료에 게재된 내용들은 작성자의 의견을 정확하게 반영하고 있으며",
    "이 문서의 내용이 본인의 의견을 반영하고 외부의 압력 없이 작성되었음을 나타냅니다",
    "본 조사자료는 고객의 투자에 정보를 제공할 목적으로 작성되었으며",
    "동 자료는 제공시점 현재 기관투자가 또는 제3자에게 사전 제공한 사실이 없습니다",
    "전일기준 당사에서 1% 이상 보유하고 있지 않습니다",
    "조사분석자료가 고객의 투자 참고용으로 작성되었음을 명시하고 있습니다.",
    "당사는 상기 명시한 사항 외 고지해야 하는 특별한 이해관계가 없습니다.",
    "이 이미지는 표 형태의 데이터로 구성되어 있으며, 그래프는 포함되어 있지 않습니다.",
    "자료: CJ제일제당, DS투자증권 리서치센터 추정",
    "매도 -10% 이하의 주가하락이 예상되는 경우 비중축소",
    "동 자료에 게재된 내용들은 외부의 압력이나 부당한 간섭없이 본인의 의견을 정확하게 반영하여 작성되었음을 확인합니다",
    "종목투자의견은 향후 12개월간 추천일 종가대비 해당종목의 예상 목표수익률을 의미함.",
    "Sell(매도): KOSPI 대비 기대수익률 -10% 이하",
    "본 자료를 작성한 애널리스트는 자료작성일 현재 추천 종목과 재산적 이해관계가 없습니다",
    "중립 : 업종내 커버리지 업체들의 투자의견이 시가총액 기준으로 중립적일 경우 중립 : 향후 6개월 수익률이 -10% ~-20%",
    "당사는 동 자료를 기관투자자 또는 제3자에게 사전 제공한 사실이 없습니다.",
    "본 조사분석자료는 당사의 리서치센터가 신뢰할 수 있는 자료 및 정보로부터 얻은 것이나, 당사가 그 정확성이나 완전성을 보장할 수 없고, 통지 없이 의견이 변경될 수 있습니다.",
    "외부의 압력이나 간섭 없이 신의성실하게 작성되었음을 밝힙니다.",
    "당 보고서 공표일 기준으로 해당 기업과 관련하여, 회사는 해당 종목을 1%이상 보유하고 있지 않습니다.",
    "본 분석자료는 투자자의 증권투자를 돕기 위한 참고자료이며",
    "당사는 개별 종목에 대해 향후 1 년간 +15% 이상의 절대수익률이 기대되는 종목에 대해 Buy(매수) 의견을 제시합니다.",
    "당사는 산업에 대해 향후 1 년간 해당 업종의 수익률이 과거 수익률에 비해 양호한 흐름을 보일 것으로 예상되는 경우에 Positive(긍정적) 의견을 제시하고 있습니다",
    "이 조사자료는 당사 리서치센터가 신뢰할 수 있는 자료 및 정보로부터 얻어진 것이나,",
    "Hold(보유): KOSPI 대비 기대수익률 -10~10%",
    "자료의 작성과 관련하여 외부의 압력이나 부당한 간섭을 받지 않았으며",
]

for root, _, files in os.walk(file_path):   
    for file in files:
        if file.endswith(".xlsx"):
            file_full_path = os.path.join(root, file)
            df = pd.read_excel(file_full_path, engine="openpyxl")
            # 텍스트 데이터 변환
            df['embedding'] = df['embedding'].apply(clean_text)
            df['text'] = df['text'].apply(clean_text)
            df['embedding'] = df['embedding'].astype(str).str.replace('텍스트 데이터', '', regex=False)
            df['embedding'] = df['embedding'].str.replace(r'#+', '#', regex=True)
            df['text'] = df['text'].astype(str).str.replace('텍스트 데이터', '', regex=False)
            df['text'] = df['text'].str.replace(r'#+', '#', regex=True) 
            df['embedding'] = df['embedding'].astype(str).str.replace('COMPANY REPORT', '', regex=False)
            df['text'] = df['text'].astype(str).str.replace('COMPANY REPORT', '', regex=False)
            for remove_keyword in remove_keywords:
                df = df[~df["embedding"].str.contains(remove_keyword, na=False, regex=False)]
                df = df[~df["text"].str.contains(remove_keyword, na=False, regex=False)] 
            df['english_ratio'] = df['text'].apply(english_ratio)
            df = df[(df['english_ratio'] < 98) & (df['text'].astype(str).str.len() > 40)]
            df = df[~df["text"].apply(is_meaningless_text)]
            df = df.drop(columns=['english_ratio'])
            df.to_excel(file_full_path, index=False, engine='openpyxl')
            print(f"Checked file: {file_full_path}")
