In [1]:
import cv2
import requests
import pandas as pd
import numpy as np
from PIL import Image, ImageEnhance
import pytesseract
from io import BytesIO
from gensim.models import FastText as FastText
import re
from datetime import datetime, timedelta
import xml.etree.ElementTree as ET
from typing import Optional, Dict, Any, List
from dotenv import load_dotenv
import os

In [2]:
# Load environment variables via python-dotenv (typically from a local .env
# file) so the API key does not have to be hardcoded in the notebook.
load_dotenv()

# Service key for the KOPIS open API; os.getenv returns None when the
# KOPIS_API_KEY variable is not set.
KOPIS_API_KEY = os.getenv('KOPIS_API_KEY')

In [4]:
class KopisAPI:
    """Small client for the KOPIS open API (kopis.or.kr) performance endpoints.

    Responses are XML documents in which each performance is a ``<db>``
    element whose children carry the individual fields.
    """

    # Seconds to wait for the server before giving up on a request.
    REQUEST_TIMEOUT = 10

    def __init__(self, service_key):
        """Store the API service key and the REST base URL."""
        self.service_key = service_key
        self.base_url = "http://www.kopis.or.kr/openApi/restful"

    def get_performance_list(self, start_date, end_date, rows=100, cpage=1):
        """Fetch one page of the performance list.

        Args:
            start_date: Start of the date range, sent as ``stdate``
                (callers in this notebook pass ``YYYYMMDD`` strings).
            end_date: End of the date range, sent as ``eddate``.
            rows: Number of rows requested per page (default 100).
            cpage: Page number to request (default 1).

        Returns:
            A list of dicts, one per ``<db>`` element, mapping each child
            tag to its text (``None`` for empty elements).

        Raises:
            requests.RequestException: On network failure or HTTP error status.
            xml.etree.ElementTree.ParseError: If the body is not valid XML.
        """
        url = f"{self.base_url}/pblprfr"
        params = {
            'service': self.service_key,
            'stdate': start_date,
            'eddate': end_date,
            'rows': rows,
            'cpage': cpage,
        }
        response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        root = ET.fromstring(response.content)

        performances = []
        for db in root.findall('.//db'):
            performances.append({child.tag: child.text for child in db})

        return performances

    def get_performance_detail(self, mt20id: str) -> Optional[Dict[str, Any]]:
        """Fetch the detail record of one performance.

        Args:
            mt20id: Performance identifier appended to the detail URL.

        Returns:
            A dict mapping each non-empty child tag of the first ``<db>``
            element to its stripped text. The ``styurls`` entry, when the
            element is present, is a list of the non-empty ``<styurl>``
            image URLs. Returns ``None`` when the request fails, the body
            is not valid XML, or the response holds no ``<db>`` element.
        """
        url = f"{self.base_url}/pblprfr/{mt20id}"
        params = {"service": self.service_key}

        try:
            response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            root = ET.fromstring(response.content)
        except (requests.RequestException, ET.ParseError) as e:
            print(f"API 요청 오류: {e}")
            return None

        # find() returns a single element (or None); findall() would return a
        # list, which is never None and whose items are <db> nodes rather than
        # the fields inside them.
        db = root.find('.//db')
        if db is None:
            return None

        detail: Dict[str, Any] = {}
        for elem in db:
            if elem.tag == 'styurls':
                # Debug output of the raw XML structure.
                print(f"styurls element found for {mt20id}")
                print(f"styurls content: {ET.tostring(elem, encoding='unicode')}")

                # Collect the introduction-image URLs, skipping blank entries.
                urls = []
                for styurl in elem.findall('styurl'):
                    if styurl.text and styurl.text.strip():
                        print(f"Found image URL: {styurl.text}")
                        urls.append(styurl.text.strip())
                detail['styurls'] = urls
            elif elem.text and elem.text.strip():
                detail[elem.tag] = elem.text.strip()

        # Debug summary.
        if 'styurls' in detail:
            print(f"Total styurls found for {mt20id}: {len(detail['styurls'])}")
        else:
            print(f"No styurls found for {mt20id}")

        return detail

# 텍스트 처리 클래스 정의

In [7]:
class TextProcessor:
    """OCR text extraction from images plus FastText-based text vectorization.

    Images are downloaded, normalized to a fixed width, run through Tesseract
    (via pytesseract) in several configurations, and the recognized text is
    cleaned. A FastText model can then be trained on the cleaned texts and
    used to turn a text into the mean of its word vectors.
    """

    # Dimensionality of the FastText vectors (and of the zero fallback vector).
    VECTOR_SIZE = 100
    # Width in pixels every image is resized to before OCR.
    TARGET_WIDTH = 1000
    # Seconds to wait for an image download.
    REQUEST_TIMEOUT = 10
    # Tesseract language packs used for recognition.
    OCR_LANG = 'kor+eng'
    # Page-segmentation modes tried on every image section.
    OCR_CONFIGS = (
        '--oem 3 --psm 6',  # assume a single uniform block of text
        '--oem 3 --psm 1',  # automatic page segmentation with OSD
        '--oem 3 --psm 4',  # assume a single column of text
    )
    # Conventional install location on Windows; used only if it exists.
    DEFAULT_WINDOWS_TESSERACT = r'C:\Program Files\Tesseract-OCR\tesseract.exe'

    def __init__(self, tesseract_cmd=None):
        """Create a processor with no trained model.

        Args:
            tesseract_cmd: Optional path to the Tesseract executable. When
                omitted, the ``TESSERACT_CMD`` environment variable is used,
                then the default Windows install path if that file exists;
                otherwise pytesseract's own default is left untouched.
        """
        self.model = None
        cmd = tesseract_cmd or os.getenv('TESSERACT_CMD')
        if not cmd and os.path.isfile(self.DEFAULT_WINDOWS_TESSERACT):
            cmd = self.DEFAULT_WINDOWS_TESSERACT
        if cmd:
            pytesseract.pytesseract.tesseract_cmd = cmd

    def enhance_image(self, img):
        """Return a copy of a PIL image with contrast and sharpness doubled."""
        img = ImageEnhance.Contrast(img).enhance(2.0)   # increase contrast
        img = ImageEnhance.Sharpness(img).enhance(2.0)  # increase sharpness
        return img

    def preprocess_image(self, img_array):
        """Binarize an image array with OpenCV to make text stand out.

        Args:
            img_array: Image as a NumPy array. A 3-D array is treated as RGB
                channel order (what ``np.array`` yields for a PIL RGB image);
                a 2-D array is treated as already grayscale.

        Returns:
            The denoised, Otsu-thresholded and morphologically closed image.
        """
        # Grayscale conversion (PIL-derived arrays are RGB, not BGR).
        if img_array.ndim == 3:
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
        else:
            gray = img_array

        # Noise removal.
        denoised = cv2.fastNlMeansDenoising(gray)

        # Binarization with an automatically chosen (Otsu) threshold.
        _, binary = cv2.threshold(denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

        # Morphological closing to reinforce text regions.
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
        processed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)

        return processed

    def get_image_sections(self, img, n_sections=3):
        """Split an image into horizontal bands stacked top to bottom.

        Args:
            img: Object exposing PIL's ``size`` and ``crop`` interface.
            n_sections: Number of bands (default 3).

        Returns:
            A list of cropped images. The last band absorbs any remainder
            rows so the whole height is covered. If the image is shorter
            than ``n_sections`` pixels it is returned as a single section.
        """
        width, height = img.size
        section_height = height // n_sections if n_sections > 0 else 0
        if section_height == 0:
            return [img]

        sections = []
        for i in range(n_sections):
            top = i * section_height
            bottom = height if i == n_sections - 1 else (i + 1) * section_height
            sections.append(img.crop((0, top, width, bottom)))

        return sections

    # Backward-compatible alias for the originally defined (singular) name.
    get_image_section = get_image_sections

    def extract_text_from_image(self, image_url):
        """Download an image and return the cleaned text recognized in it.

        The image is resized to ``TARGET_WIDTH``, each horizontal section is
        OCR'd with every configuration in ``OCR_CONFIGS`` (keeping the longest
        result per section), and the binarized full image is OCR'd once more.

        Args:
            image_url: URL of the image to download.

        Returns:
            The cleaned, combined text, or ``""`` if any step fails (the
            error is printed; this method is deliberately best-effort).
        """
        try:
            print(f"이미지 다운로드 시도: {image_url}")
            response = requests.get(image_url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            img = Image.open(BytesIO(response.content))

            # Normalize GIF/palette/RGBA/grayscale images to plain RGB so the
            # enhancement and OpenCV steps always see three channels.
            if img.mode != 'RGB':
                img = img.convert('RGB')

            # Normalize the image width, preserving the aspect ratio.
            width, height = img.size
            scale = self.TARGET_WIDTH / float(width)
            target_height = max(1, int(float(height) * scale))
            img = img.resize((self.TARGET_WIDTH, target_height), Image.Resampling.LANCZOS)

            # PIL enhancement followed by OpenCV preprocessing.
            enhanced_img = self.enhance_image(img)
            processed_img = self.preprocess_image(np.array(enhanced_img))

            texts = []

            # OCR every section with each configuration; keep the longest text.
            for section in self.get_image_sections(img):
                section_texts = []
                for config in self.OCR_CONFIGS:
                    text = pytesseract.image_to_string(
                        section,
                        lang=self.OCR_LANG,
                        config=config
                    )
                    if text.strip():
                        section_texts.append(text)

                if section_texts:
                    texts.append(max(section_texts, key=len))

            # One more OCR pass over the binarized full image.
            processed_text = pytesseract.image_to_string(
                processed_img,
                lang=self.OCR_LANG,
                config='--oem 3 --psm 6'
            )
            texts.append(processed_text)

            # Combine and clean everything that was recognized.
            combined_text = ' '.join(texts)
            cleaned_text = self.clean_text(combined_text)

            # Log the result.
            print(f"추출된 총 텍스트 길이: {len(cleaned_text)}")
            print(f"텍스트 샘플: {cleaned_text[:200]}...")

            return cleaned_text

        except Exception as e:
            print(f"이미지 처리 중 오류 발생: {str(e)}")
            return ""

    def clean_text(self, text):
        """Normalize OCR output.

        Replaces every character that is not a word character, whitespace or
        Hangul syllable with a space, collapses whitespace (including line
        breaks), drops repeated words while keeping first-occurrence order,
        and lowercases the result.

        Args:
            text: Raw text; ``None`` or empty yields ``""``.

        Returns:
            The cleaned text.
        """
        if not text:
            return ""

        # Remove unwanted characters. The Hangul range must be written without
        # a space, otherwise it becomes a " -힣" range that keeps punctuation.
        text = re.sub(r'[^\w\s가-힣]', ' ', text)

        # Collapse runs of whitespace (this also unifies line breaks).
        text = re.sub(r'\s+', ' ', text)

        # Remove repeated words, preserving the order of first occurrence.
        words = list(dict.fromkeys(text.split()))
        text = ' '.join(words)

        return text.strip().lower()

    def train_model(self, texts):
        """Train the FastText model on whitespace-tokenized texts.

        Args:
            texts: Iterable of strings; empty or blank entries are ignored.

        On success the trained model is stored in ``self.model``. If there
        is nothing to train on, or training fails, a message is printed and
        ``self.model`` is left unchanged.
        """
        texts = [text for text in texts if text and text.strip()]
        if not texts:
            print("경고: 학습할 텍스트가 없습니다.")
            return

        sentences = [text.split() for text in texts]
        try:
            self.model = FastText(
                sentences=sentences,
                vector_size=self.VECTOR_SIZE,
                window=5,
                min_count=1,
                workers=4
            )
            print(f"모델 학습 완료: {len(sentences)} 문장")
        except Exception as e:
            print(f"모델 학습 오류: {str(e)}")

    def get_text_vector(self, text):
        """Vectorize a text as the mean of its word vectors.

        Args:
            text: Text to vectorize; split on whitespace.

        Returns:
            A 1-D NumPy array. A zero vector is returned when no model has
            been trained or when none of the words is known to the model.
        """
        if self.model is None:
            print("경고: 모델이 학습되지 않았습니다.")
            return np.zeros(self.VECTOR_SIZE)

        wv = self.model.wv
        word_vectors = [wv[word] for word in (text or "").split() if word in wv]
        if not word_vectors:
            return np.zeros(getattr(self.model, 'vector_size', self.VECTOR_SIZE))
        return np.mean(word_vectors, axis=0)


# 공연 추천 시스템 클래스 정의

In [13]:
class PerformanceRecommender:
    """Recommend performances by comparing a user's text with OCR'd poster text.

    Collaborators (duck-typed):
        api_client: must provide ``get_performance_list(start, end)`` and
            ``get_performance_detail(mt20id)``.
        text_processor: must provide ``extract_text_from_image(url)``,
            ``train_model(texts)`` and ``get_text_vector(text)``.
    """

    # Columns of the collected DataFrame; fixed so an empty result still has them.
    _COLUMNS = ['mt20id', 'title', 'plot']

    def __init__(self, api_client, text_processor):
        """Store the collaborators; no data is collected yet."""
        self.api_client = api_client
        self.text_processor = text_processor
        self.performances_df = None

    def _extract_image_text(self, url, label, mt20id):
        """Best-effort OCR of one image URL; returns "" for non-http URLs or on error."""
        if not (isinstance(url, str) and url.startswith('http')):
            return ""
        try:
            return self.text_processor.extract_text_from_image(url) or ""
        except Exception as e:
            print(f'{label} 처리 오류({mt20id}): {str(e)}')
            return ""

    @staticmethod
    def _cosine_similarity(a, b):
        """Cosine similarity of two vectors; 0.0 when either has zero length."""
        denom = float(np.linalg.norm(a) * np.linalg.norm(b))
        if denom == 0.0:
            return 0.0
        return float(np.dot(a, b) / denom)

    def collect_performance_data(self, days=30, max_items=10):
        """Collect performances starting today and OCR all of their images.

        Args:
            days: Length of the date window, counted from today.
            max_items: Maximum number of listed performances to process
                (default 10, kept small for testing); ``None`` processes all.

        Returns:
            A DataFrame with columns ``mt20id``, ``title`` and ``plot`` (the
            combined poster and introduction-image text). It is also stored
            in ``self.performances_df``.
        """
        now = datetime.now()
        start_date = now.strftime("%Y%m%d")
        end_date = (now + timedelta(days=days)).strftime("%Y%m%d")

        perf_list = self.api_client.get_performance_list(start_date, end_date) or []
        if max_items is not None:
            perf_list = perf_list[:max_items]

        performances = []
        for perf in perf_list:
            mt20id = perf.get('mt20id')
            if not mt20id:
                continue

            detail = self.api_client.get_performance_detail(mt20id)
            if not detail:
                continue

            # Text from the poster image.
            poster_text = self._extract_image_text(detail.get('poster'), '포스터 이미지', mt20id)

            # Text from every introduction image.
            styurls = detail.get('styurls')
            if not isinstance(styurls, list):
                styurls = []
            intro_texts = [
                self._extract_image_text(img_url, '소개이미지', mt20id)
                for img_url in styurls
            ]

            # Combine all non-empty texts.
            all_text = ' '.join(filter(None, [poster_text] + intro_texts))

            performances.append({
                'mt20id': mt20id,
                'title': detail.get('prfnm', ''),
                'plot': all_text if all_text.strip() else ""
            })

        # Build the frame once, after every performance has been processed.
        self.performances_df = pd.DataFrame(performances, columns=self._COLUMNS)
        return self.performances_df

    def prepare_model(self):
        """Train the text model on the collected plots.

        Raises:
            ValueError: If ``collect_performance_data`` has not been run.
        """
        if self.performances_df is None:
            raise ValueError("공연 데이터를 먼저 수집하세요")

        plots = self.performances_df['plot'].tolist()
        self.text_processor.train_model(plots)

    def get_recommendations(self, user_plot, top_n=5):
        """Rank the collected performances by similarity to ``user_plot``.

        Args:
            user_plot: Free text describing what the user is looking for.
            top_n: Number of recommendations to return.

        Returns:
            A DataFrame with columns ``title`` and ``similarity`` holding the
            ``top_n`` most similar performances. As a side effect the
            ``similarity`` column of ``self.performances_df`` is (re)written.

        Raises:
            ValueError: If ``collect_performance_data`` has not been run.
        """
        if self.performances_df is None:
            raise ValueError("공연 데이터를 먼저 수집하세요.")

        user_vector = self.text_processor.get_text_vector(user_plot)

        # Cosine similarity of the user text against every performance plot.
        similarities = [
            self._cosine_similarity(user_vector, self.text_processor.get_text_vector(plot))
            for plot in self.performances_df['plot']
        ]

        # An explicit float dtype keeps nlargest working on an empty frame.
        self.performances_df['similarity'] = pd.Series(
            similarities, index=self.performances_df.index, dtype=float
        )
        recommendations = self.performances_df.nlargest(top_n, 'similarity')
        return recommendations[['title', 'similarity']]
