In [11]:
import re
def preprocess_text(text):
    if isinstance(text, list):  # 리스트일 경우 첫 번째 값 반환
        text = text[0] if text else ""  # 리스트가 비어있으면 빈 문자열 반환
    
    if text is None:  # None 값 처리
        return ""
    
    # 줄바꿈 제거 및 공백 정리
    text = text.replace("\n", " ").strip()
    text = re.sub(r'\s+', ' ', text)  # 중복 공백 제거
    return text

    '''
    def time_converter(match):
        period = match.group(1)  # 오전 또는 오후
        hour = int(match.group(2))  # 시
        minute = match.group(3) or "00"  # 분 (없으면 기본값 00)
        

        if period == "오후" and hour != 12:
            hour += 12  # 오후는 12를 더함
        elif period == "오전" and hour == 12:
            hour = 0  # 오전 12시는 0시로 변환

        return f"{hour:02}:{minute}"  # 24시간 형식으로 반환

    text = re.sub(r"(오전|오후)\s*(\d{1,2})시\s*(\d{1,2})?분?", time_converter, text)
    '''
    



def extract_earliest(value, is_time=False):
    if value is None or value.strip() == "":  # 빈 값 처리
        return "Null"
    
    # 범위 구분자로 ~ 또는 - 처리
    if any(separator in value for separator in ["~", "-"]):
        for separator in ["~", "-"]:  # 모든 구분자에 대해 처리
            if separator in value:
                parts = value.split(separator)
                parts = [p.strip() for p in parts if p.strip()]  # 공백 제거 및 빈 값 필터링
                
                if not parts:  # 모든 값이 빈 값인 경우
                    return "Null"
                
                if is_time:
                    try:
                        return min(parts, key=lambda x: int(x.split(":")[0]) * 60 + int(x.split(":")[1]))
                    except (IndexError, ValueError):
                        return parts[0]  # 기본값으로 첫 번째 값 반환
                else:
                    return min(parts, key=lambda x: re.sub(r"[^\d]", "", x))  # 숫자만 비교
    return value


def preprocess_data(dataset):
    for item in dataset:
        # NoneType을 처리하기 위해 get 메서드 사용
        item["original_text"] = preprocess_text(item.get("original_text", ""))
        item["schedule_info"]["event_title"] = preprocess_text(item["schedule_info"].get("event_title", ""))
        item["schedule_info"]["date"] = extract_earliest(preprocess_text(item["schedule_info"].get("date", "")))
        item["schedule_info"]["time"] = extract_earliest(preprocess_text(item["schedule_info"].get("time", "Null")), is_time=True)
    return dataset


In [5]:
import json

 # 데이터셋 로드
def load_jsonl(file_path):
    with open(file_path, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f]
    
train_set= load_jsonl("/kaggle/input/dataset/filtered_train_set.jsonl")
test_set = load_jsonl("/kaggle/input/dataset/filtered_test_set.jsonl")
train_data = preprocess_data(train_set)
test_data = preprocess_data(test_set)
#print(train_data[:5])
def save_jsonl(data, file_path):
    with open(file_path, "w", encoding="utf-8") as f:
        for item in data:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")

# 파일 저장
save_jsonl(train_data, "/kaggle/working/filtered_train_data.jsonl")
save_jsonl(test_data, "/kaggle/working/filtered_test_data.jsonl")

In [9]:
def augment_text_with_context(item):
    text = item["original_text"]
    date = item["schedule_info"].get("date", "Unknown Date")
    time = item["schedule_info"].get("time", "Unknown Time")
    return f"{text} [Date: {date}] [Time: {time}]"

for item in train_data:
    item["original_text"] = augment_text_with_context(item)


In [94]:
import logging

import os
import torch
from tqdm import tqdm
from torch.utils.data import DataLoader, RandomSampler
from transformers import AdamW
import random
import numpy as np

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
# fine-tunning 이전 코드
'''
def read_data(file_path):
    import json
    datas = []
    with open(file_path, "r", encoding="utf8") as infile:
        for line in infile:
            item = json.loads(line)
            text = item["original_text"]
            title = item["schedule_info"]["event_title"]
            datas.append((text, title))
    return datas


def convert_data2feature(datas, max_length, max_dec_length, tokenizer):
    input_ids_features, attention_mask_features, decoder_input_features, decoder_attention_mask_features, label_features = [], [], [], [], []

    for text, title in tqdm(datas, desc="convert_data2feature"):
        # tokenizer를 사용하여 입력 문장을 word piece 단위로 분리
        tokenized_text = tokenizer.tokenize(text)
        tokenized_title= tokenizer.tokenize(title)
        #########################################
        # 인코더는 출력 없음
        input_ids = tokenizer.convert_tokens_to_ids(tokenized_text)
        attention_mask = [1]*len(input_ids)

        # 디코더의 입력:  start 심볼 + 본문 출력 : 본문 + end 심볼
        decoder_input = tokenizer.convert_tokens_to_ids(['<s>'] + tokenized_title)
        decoder_attention_mask = [1] *len(decoder_input)
        label = tokenizer.convert_tokens_to_ids(tokenized_title + ['</s>'])

        padding = [tokenizer.convert_tokens_to_ids(tokenizer.pad_token)] * (max_length - len(input_ids))
        input_ids += padding
        attention_mask += padding

        ### decoder padding
        decoder_padding = [tokenizer.convert_tokens_to_ids(tokenizer.pad_token)] * (max_dec_length - len(decoder_input))
        decoder_input += decoder_padding
        decoder_attention_mask += decoder_padding
        label += decoder_padding

        #########################################

        # 변환한 데이터를 각 리스트에 저장
        input_ids_features.append(input_ids[:max_length])
        attention_mask_features.append(attention_mask[:max_length])
        decoder_input_features.append(decoder_input[:max_dec_length])
        decoder_attention_mask_features.append(decoder_attention_mask[:max_length])
        label_features.append(label[:max_dec_length])

    # 변환한 데이터를 Tensor 객체에 담아 반환
    input_ids_features = torch.tensor(input_ids_features, dtype=torch.long)
    attention_mask_features = torch.tensor(attention_mask_features, dtype=torch.long)
    decoder_input_features = torch.tensor(decoder_input_features, dtype=torch.long)
    decoder_attention_mask_features = torch.tensor(decoder_attention_mask_features, dtype=torch.long)
    label_features = torch.tensor(label_features, dtype=torch.long)

    return input_ids_features, attention_mask_features, decoder_input_features, decoder_attention_mask_features, label_features
'''
# fine -tunning 코드
def read_data(file_path):
    import json
    datas = []
    with open(file_path, "r", encoding="utf8") as infile:
        for line in infile:
            item = json.loads(line)
            text = item["original_text"]
            title = item["schedule_info"]["event_title"]
            
            # `date`와 `time`을 데이터셋에서 가져오기
            date = item["schedule_info"].get("date", "")
            time = item["schedule_info"].get("time", "")
            
            datas.append((text, title, date, time))
    return datas
def convert_data2feature(datas, max_length, max_dec_length, tokenizer, boost_weight=2.0):
    input_ids_features, attention_mask_features, decoder_input_features, decoder_attention_mask_features, label_features, importance_masks = [], [], [], [], [], []

    for text, title, date, time in tqdm(datas, desc="convert_data2feature"):
        # Tokenize 텍스트 및 타이틀
        tokenized_text = tokenizer.tokenize(text)
        tokenized_title = tokenizer.tokenize(title)

        # Tokenize date와 time
        tokenized_date = tokenizer.tokenize(date)
        tokenized_time = tokenizer.tokenize(time)

        #########################################
        # 인코더 입력 데이터 준비
        input_ids = tokenizer.convert_tokens_to_ids(tokenized_text)
        attention_mask = [1] * len(input_ids)

        # 디코더 입력 준비
        decoder_input = tokenizer.convert_tokens_to_ids(['<s>'] + tokenized_title)
        decoder_attention_mask = [1] * len(decoder_input)
        label = tokenizer.convert_tokens_to_ids(tokenized_title + ['</s>'])

        # 중요도 마스크 생성: date와 time 토큰에 가중치 적용
        importance_mask = [
            boost_weight if token in tokenized_date or token in tokenized_time else 1.0
            for token in tokenized_text
        ]

        # Padding 처리
        padding = [tokenizer.pad_token_id] * (max_length - len(input_ids))
        input_ids += padding
        attention_mask += [0] * len(padding)
        importance_mask += [1.0] * len(padding)  # 패딩 토큰은 기본 가중치

        decoder_padding = [tokenizer.pad_token_id] * (max_dec_length - len(decoder_input))
        decoder_input += decoder_padding
        decoder_attention_mask += decoder_padding
        label += decoder_padding

        #########################################

        # 변환한 데이터를 각 리스트에 저장
        input_ids_features.append(input_ids[:max_length])
        attention_mask_features.append(attention_mask[:max_length])
        decoder_input_features.append(decoder_input[:max_dec_length])
        decoder_attention_mask_features.append(decoder_attention_mask[:max_dec_length])
        label_features.append(label[:max_dec_length])
        importance_masks.append(importance_mask[:max_length])

    # 텐서 변환
    input_ids_features = torch.tensor(input_ids_features, dtype=torch.long)
    attention_mask_features = torch.tensor(attention_mask_features, dtype=torch.long)
    decoder_input_features = torch.tensor(decoder_input_features, dtype=torch.long)
    decoder_attention_mask_features = torch.tensor(decoder_attention_mask_features, dtype=torch.long)
    label_features = torch.tensor(label_features, dtype=torch.long)
    importance_masks = torch.tensor(importance_masks, dtype=torch.float)

    return (input_ids_features, attention_mask_features, decoder_input_features, decoder_attention_mask_features, label_features, importance_masks)


def to_list(tensor):
    return tensor.detach().cpu().tolist()

In [97]:

logger = logging.getLogger(__name__)

from torch.utils.data import (DataLoader, TensorDataset, RandomSampler, SequentialSampler)
from transformers import BartForConditionalGeneration
from transformers import PreTrainedTokenizerFast
from transformers import BartTokenizer
from sklearn.metrics import accuracy_score

# 학습
def train(config):
    tokenizer = PreTrainedTokenizerFast.from_pretrained(config["pretrained_model_name_or_path"])
    model = BartForConditionalGeneration.from_pretrained(config["pretrained_model_name_or_path"]).cuda()

    # 데이터 읽기
    train_datas = read_data(config["train_data_path"])
    test_datas = read_data(config["test_data_path"])

    # 데이터 전처리
    train_features = convert_data2feature(train_datas, config["max_length"], config["max_dec_length"], tokenizer)
    test_features = convert_data2feature(test_datas, config["max_length"], config["max_dec_length"], tokenizer)

    # DataLoader 생성
    train_dataloader = DataLoader(
        TensorDataset(*train_features),
        sampler=RandomSampler(TensorDataset(*train_features)),
        batch_size=config["batch_size"]
    )
    test_dataloader = DataLoader(
        TensorDataset(*test_features),
        sampler=SequentialSampler(TensorDataset(*test_features)),
        batch_size=config["batch_size"]
    )

    # Optimizer 설정
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)
    #optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5, weight_decay=0.01)
    global_step = 1
    model.zero_grad()
    set_seed(config["seed"])

    for epoch in range(config["epoch"]):
        for step, batch in enumerate(train_dataloader):
            model.train()
            batch = tuple(t.cuda() for t in batch)

            outputs = model(
                input_ids=batch[0],
                attention_mask=batch[1],
                decoder_input_ids=batch[2],
                decoder_attention_mask=batch[3],
                labels=batch[4],
                return_dict=True
            )

            # 가중치 적용
            loss = outputs.loss
            importance_mask = batch[5]
            weighted_loss = (loss * importance_mask).mean()

            weighted_loss.backward()
            optimizer.step()
            model.zero_grad()
            global_step += 1

            if (global_step + 1) % 50 == 0:
                print(f"Step {global_step + 1}, Loss: {weighted_loss.item()}")

            if global_step % 500 == 0:
                evaluate(config, model, tokenizer, test_dataloader)
                output_dir = os.path.join(config["output_dir_path"], f"checkpoint-{global_step}")
                if not os.path.exists(output_dir):
                    os.makedirs(output_dir)
                model.save_pretrained(output_dir)

    return global_step, weighted_loss.item()

# 평가
def evaluate(config, model, tokenizer, test_dataloader):
    model.eval()
    total_correct, total_samples = 0, 0
    all_refs, all_preds = [], []

    for batch in tqdm(test_dataloader):
        batch = tuple(t.cuda() for t in batch)
        dec_outputs = model.generate(
            input_ids=batch[0],
            attention_mask=batch[1],
            max_length=config["max_dec_length"],
            eos_token_id=1,
            do_sample=False,
        )

        batch_size = batch[0].size(0)
        dec_outputs = dec_outputs.tolist()
        dec_labels = batch[4].tolist()

        for i in range(batch_size):
            pred = "".join(tokenizer.convert_ids_to_tokens(dec_outputs[i][1:])).replace("<pad>", "").replace("</s>", "")
            ref = "".join(tokenizer.convert_ids_to_tokens(dec_labels[i][:-1])).replace("<pad>", "").replace("</s>", "")
            all_refs.append(ref)
            all_preds.append(pred)
            total_correct += int(pred == ref)
            total_samples += 1

    accuracy = total_correct / total_samples if total_samples > 0 else 0
    print(f"Accuracy: {accuracy * 100:.2f}% ({total_correct}/{total_samples})")
    sklearn_accuracy = accuracy_score(all_refs, all_preds)
    print(f"Sklearn Accuracy: {sklearn_accuracy * 100:.2f}%")


 

fine tunning 이전 코드


In [20]:
'''
logger = logging.getLogger(__name__)

from torch.utils.data import (DataLoader, TensorDataset, RandomSampler, SequentialSampler)
from transformers import BartForConditionalGeneration
from transformers import PreTrainedTokenizerFast
from transformers import BartTokenizer
from sklearn.metrics import accuracy_score

def train(config):
    tokenizer = PreTrainedTokenizerFast.from_pretrained(config["pretrained_model_name_or_path"])
    model = BartForConditionalGeneration.from_pretrained(config["pretrained_model_name_or_path"]).cuda()

    #tokenizer = BartTokenizer.from_pretrained(config["pretrained_model_name_or_path"])
    #model = BartForConditionalGeneration.from_pretrained(config["pretrained_model_name_or_path"]).cuda()

    """ Train the model """
    # 학습 및 평가 데이터 읽기
    train_datas = read_data(config["train_data_path"])
    test_datas = read_data(config["test_data_path"])

    # 입력 데이터 전처리
    train_input_ids_features, train_attention_mask_features, train_decoder_input_features, train_decoder_attention_mask_features, train_label_features = \
        convert_data2feature(train_datas, config["max_length"],config["max_dec_length"], tokenizer)
    test_input_ids_features, test_attention_mask_features, test_decoder_input_features, test_decoder_attention_mask_features, test_label_features = \
        convert_data2feature(test_datas, config["max_length"], config["max_dec_length"], tokenizer)

    # 학습 데이터를 batch 단위로 추출하기 위한 DataLoader 객체 생성
    train_features = TensorDataset(train_input_ids_features, train_attention_mask_features, train_decoder_input_features, train_decoder_attention_mask_features, train_label_features)
    train_dataloader = DataLoader(train_features, sampler=RandomSampler(train_features),
                                  batch_size=config["batch_size"])

    # 평가 데이터를 batch 단위로 추출하기 위한 DataLoader 객체 생성
    test_features = TensorDataset(test_input_ids_features, test_attention_mask_features, test_decoder_input_features, test_decoder_attention_mask_features, test_label_features)
    test_dataloader = DataLoader(test_features, sampler=SequentialSampler(test_features),
                                 batch_size=config["batch_size"])

    # 모델 학습을 위한 optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)
    global_step = 1

    tr_loss, logging_loss = 0.0, 0.0
    model.zero_grad()

    set_seed(config["seed"])

    for epoch in range(config["epoch"]):
        for step, batch in enumerate(train_dataloader):
            # Skip past any already trained steps if resuming training
            model.train()
            batch = tuple(t.cuda() for t in batch)
            outputs = model(input_ids=batch[0],
                              attention_mask=batch[1],
                              decoder_input_ids=batch[2],
                              decoder_attention_mask=batch[3],
                              labels=batch[4],
                                return_dict=True)

            loss = outputs["loss"]

            loss.backward()
            if (global_step+1) % 50 == 0:
                print("{} Processed.. Total Loss : {}".format(global_step+1, loss.item()))

            tr_loss += loss.item()

            optimizer.step()
            model.zero_grad()
            global_step += 1

            # Save model checkpoint
            if global_step % 500 == 0:
                #평가
                evaluate(config, model, tokenizer, test_dataloader)
                output_dir = os.path.join(config["output_dir_path"], "checkpoint-{}".format(global_step))
                print("Model Save in {}".format(output_dir))
                if not os.path.exists(output_dir):
                    os.makedirs(output_dir)

                # Take care of distributed/parallel training
                model_to_save = model.module if hasattr(model, "module") else model
                model_to_save.save_pretrained(output_dir)

    return global_step, tr_loss / global_step

def evaluate(config, model, tokenizer, test_dataloader):
    model.eval()
    total_correct = 0
    total_samples = 0
    all_refs = []
    all_preds = []
    for batch in tqdm(test_dataloader):
        batch = tuple(t.cuda() for t in batch)

        dec_outputs = model.generate(input_ids = batch[0],
                                     attention_mask=batch[1],
                                     max_length=config["max_dec_length"],
                                     eos_token_id=1,
                                     do_sample=False,
                                     bad_words_ids=[[5]]
                                    )

        batch_size = batch[0].size()[0]

        dec_outputs = dec_outputs.tolist()
        dec_labels = batch[4].tolist()

        for index in range(batch_size):
            if 1 in dec_outputs[index]:
                dec_outputs[index] = dec_outputs[index]
            if -100 in dec_labels[index]:
                dec_labels[index] = dec_labels[index][:dec_labels[index].index(-100)]
            pred = "".join(tokenizer.convert_ids_to_tokens(dec_outputs[index][1:])).replace("Ġ", " ").replace("<pad>", "").replace("</s>", "").replace("▁", " ")
            ref = "".join(tokenizer.convert_ids_to_tokens(dec_labels[index][:-1])).replace("Ġ", " ").replace("<pad>", "").replace("</s>", "").replace("▁", " ")
            all_refs.append(ref)
            all_preds.append(pred)

            if pred == ref:
                total_correct += 1
            total_samples += 1

            #print("Correct : {}\nPredict  : {}\n".format(ref, pred))
     # Calculate accuracy
    accuracy = total_correct / total_samples if total_samples > 0 else 0
    print("\nAccuracy: {:.2f}% ({}/{})".format(accuracy * 100, total_correct, total_samples))

    # Optionally: Use sklearn's accuracy_score for further validation
    sklearn_accuracy = accuracy_score(all_refs, all_preds)
    print("Sklearn Accuracy: {:.2f}%".format(sklearn_accuracy * 100))

학습 실행

In [98]:
root_dir = "/kaggle/working/"
if (__name__ == "__main__"):
    save_dir = os.path.join(root_dir, "save")
    output_dir = os.path.join(root_dir, "output")
    cache_dir = os.path.join(root_dir, "cache")

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir)

    set_seed(seed=1234)

    config = {"mode": "train",
              "train_data_path": "/kaggle/working/filtered_test_data.jsonl",
              "test_data_path": "/kaggle/working/filtered_test_data.jsonl",
              "output_dir_path": output_dir,
              "save_dir_path": save_dir,
              "cache_dir_path": cache_dir,
              "pretrained_model_name_or_path": "hyunwoongko/kobart",
              "max_length": 512,
              "max_dec_length": 60,
              "epoch": 30,
              "batch_size": 16,
              "seed": 42,
              }

    if (config["mode"] == "train"):
        train(config)

You passed along `num_labels=3` with an incompatible id to label map: {'0': 'NEGATIVE', '1': 'POSITIVE'}. The number of labels wil be overwritten to 2.
convert_data2feature: 100%|██████████| 263/263 [00:00<00:00, 1067.14it/s]
convert_data2feature: 100%|██████████| 263/263 [00:00<00:00, 1198.75it/s]


Step 50, Loss: 5.016598151996732e-05
Step 100, Loss: 1.0974742508551572e-05
Step 150, Loss: 6.810571903770324e-06
Step 200, Loss: 4.172928129264619e-06
Step 250, Loss: 3.4979943848156836e-06
Step 300, Loss: 5.864443210157333e-06
Step 350, Loss: 1.2459269100872916e-06
Step 400, Loss: 2.2050262487027794e-06
Step 450, Loss: 7.236949386424385e-07
Step 500, Loss: 1.5641962818335742e-06


100%|██████████| 17/17 [00:08<00:00,  1.90it/s]
Non-default generation parameters: {'forced_eos_token_id': 1}


Accuracy: 98.48% (259/263)
Sklearn Accuracy: 98.48%


In [28]:
import base64
import requests
import re
def preprocess_text(text):
    if isinstance(text, list):  # 리스트일 경우 첫 번째 값 반환
        text = text[0] if text else ""  # 리스트가 비어있으면 빈 문자열 반환
    
    if text is None:  # None 값 처리
        return ""
    
    # 줄바꿈 제거 및 공백 정리
    text = text.replace("\n", " ").strip()
    text = re.sub(r'\s+', ' ', text)  # 중복 공백 제거
    return text
def extract_text_from_image_google_vision(image_path, api_key):
    url = f"https://vision.googleapis.com/v1/images:annotate?key={api_key}"

    with open(image_path, "rb") as image_file:
        image_content = base64.b64encode(image_file.read()).decode('utf-8')

    # 요청 페이로드 생성
    payload = {
        "requests": [
            {
                "image": {"content": image_content},  
                "features": [{"type": "TEXT_DETECTION"}]
            }
        ]
    }

    headers = {"Content-Type": "application/json"}

    # Google Vision API 호출
    response = requests.post(url, headers=headers, json=payload)
    
    if response.status_code == 200:
        result = response.json()
        try:
            # 텍스트만 추출
            texts = result["responses"][0]["textAnnotations"]
            return texts[0]["description"].strip() if texts else ""
        except Exception as e:
            print("결과 파싱 중 오류:", e)
            return ""
    else:
        print("API 호출 오류:", response.status_code, response.text)
        return ""

'''
def extract_schedule_with_gemini(api_key, text):
    url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:generateContent"
    headers = {"Content-Type": "application/json"}
    
    # Prompt 생성
    prompt = f"""
    아래 텍스트를 분석하여 일정에 기록할 수 있도록 일정 제목, 날짜, 시간과 같은 일정 정보를 JSON 형식으로 추출해주세요.
    반환되는 날짜는 반드시 "YYYY.MM.DD" 형식으로, 시간은 "HH:MM" (24시간제) 형식으로 통일해주세요.
    만약 날짜나 시간 정보가 명확하지 않다면, 해당 필드는 "Null"로 설정해주세요.
    여러 날짜와 시간이 나타난다면, 가장 이른 날짜와 시간을 기준으로 표시해주세요.

    텍스트: {text}

    JSON 형식 예시:
    {{
      "original_text": "전체 텍스트",
      "schedule_info": {{
        "event_title": "이벤트 제목",
        "date": "2024-12-25",
        "time": "14:00"
      }}
    }}
    """

    # 요청 페이로드 생성
    payload = {
        "contents": [{"parts": [{"text": prompt}]}]
    }
    # API 호출
    response = requests.post(url, headers=headers, json=payload, params={"key": api_key})
    
    if response.status_code == 200:
        try:
            response_text = response.json()["candidates"][0]["content"]["parts"][0]["text"]
            #return json.loads(response_text) 
            # JSON 형식으로 파싱
            return response_text
        except Exception as e:
            print("결과 파싱 중 오류 발생:", e)
            return None
    else:
        print("API 호출 오류:", response.status_code, response.text)
        return None
'''
gemini_api_key = "AIzaSyCEtG0v1gxLCykmxscNwFat0rgeWb990NA" 
text = extract_text_from_image_google_vision("/kaggle/input/image2/2024-12-19  10.39.46.png",  gemini_api_key)
text = preprocess_text(text)
#answer = extract_schedule_with_gemini( gemini_api_key, text)
#print(answer)

train Model로 title 추출

In [33]:
def generate_title_from_text(model, tokenizer, text, max_length, max_dec_length):
    """본문에서 제목 생성"""
    model.eval()

    # 입력 텍스트를 토큰화 및 Tensor 변환
    inputs = tokenizer(
        text, 
        return_tensors="pt", 
        max_length=max_length, 
        truncation=True, 
        padding="max_length"
    )

    input_ids = inputs['input_ids'].cuda()
    attention_mask = inputs['attention_mask'].cuda()

    # 모델을 사용해 제목 생성
    with torch.no_grad():
        generated_ids = model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_length=max_dec_length,
            eos_token_id=tokenizer.eos_token_id,
            do_sample=False
        )

    # 생성된 제목 디코딩
    title = tokenizer.decode(
        generated_ids[0], 
        skip_special_tokens=True, 
        clean_up_tokenization_spaces=True
    )
    return title


In [37]:
#test
if __name__ == "__main__":
    # 모델 및 토크나이저 로드
    model_path = "/kaggle/working/output/checkpoint-500"
    tokenizer_path = "hyunwoongko/kobart"
    model = BartForConditionalGeneration.from_pretrained(model_path).cuda()
    tokenizer = PreTrainedTokenizerFast.from_pretrained(tokenizer_path)

    # 이미지 경로
    print("Extracted Text:\n",text)

    # 학습된 모델로 제목 생성
    max_length = 250
    max_dec_length = 60
    generated_title = generate_title_from_text(model, tokenizer,text, max_length, max_dec_length)
    print("Generated Title:\n", generated_title)


You passed along `num_labels=3` with an incompatible id to label map: {'0': 'NEGATIVE', '1': 'POSITIVE'}. The number of labels wil be overwritten to 2.


Extracted Text:
 ITL Meet Up 2: 전체 타임라인 공개 12월 20일 (금) 진행될 TL Meet Up 2의 전체 타임라인을 공개합니다! 아래 안내 사항을 꼭 숙지하셔서 원활하게 행사에 참여하시길 바랍니다 입장 시간은 파트별 상이합니다. [1부]20시-20시 45분 (디자인 / 서버) 쉬는 시간: 20시 45분 - 21시 [2부]21시-21시 45분(PM/ 안드로이드 / iOS/웹) 쉬는시간 : 21시 45분 - 22시 [3부] 22시-22시 45분 (파트 무관 자유 네트워킹) 퇴장 시간: 22시 45분 - 23시 --- OTL 후보분들은 19시 30분(30분 전)까지 ZEP에 입장해주세요. PM/안드로이드/iOS/웹 참여자분들은 19시 50분(10분 전)까지 ZEP에 입장해주세요. 디자인/서버 참여자분들은 20시 50분(10분 전)까지 ZEP에 입장해주세요. 밋업이 시작되면 각 파트장님 인솔에 따라 이동해... 화할 때 주의가 필요한 방입니다. 잠금 ㅎ
Generated Title:
 ITL Meet Up 2: 전체 타임라인 공개


Pipeline을 통한 NER과정 수행

In [87]:
from transformers import pipeline
from dateutil.parser import parse
import re

import torch

# GPU 사용 가능 여부 확인
device = 0 if torch.cuda.is_available() else -1

ner_model = pipeline(
    "token-classification",
    model="monologg/koelectra-small-finetuned-naver-ner",
    tokenizer="monologg/koelectra-small-finetuned-naver-ner",
    device=device)

def extract_datetime_entities(text):
    """NER을 통해 텍스트에서 날짜 및 시간을 추출"""
    # NER 결과
    ner_results = ner_model(text)
    #print(ner_results)

    # 날짜 및 시간 엔티티만 필터링
    date_entities = [res["word"] for res in ner_results if "DAT" in res["entity"]]
    time_entities = [res["word"] for res in ner_results if "TIM" in res["entity"]]
    print(date_entities)
    print(time_entities)    
    # 정규화
    normalized_times = normalize_times(time_entities)
    normalized_dates = date_entities

    return normalized_dates,normalized_times


  '''normalized_dates = []
    for entity in date_entities:
        try:
            # 날짜/시간 정규화
            normalized_date = parse(entity, fuzzy=True)
            normalized_dates.append(normalized_date)
        except Exception:
            pass  # 정규화 실패시 무시
            '''


In [77]:
def get_earliest_date(dates):
    """가장 빠른 날짜/시간 반환"""
    if not dates:
        return None
    return min(dates)
def get_earliest_time(times):
    """가장 빠른 날짜/시간 반환"""
    if not times:
        return None
    return min(times)



어플리케이션 통신

In [None]:
from pyngrok import ngrok

# ngrok 인증 토큰 등록
ngrok.set_auth_token("2iLDfJFLUtVvODLsCKQghY5gqdg_42WA6QBqDa9BABji6QRae")

In [None]:
from flask import Flask, request, jsonify
import io
import requests
from pyngrok import ngrok
from transformers import pipeline
from dateutil.parser import parse
import re
import torch

# GPU 사용 가능 여부 확인
device = 0 if torch.cuda.is_available() else -1

gemini_api_key = "AIzaSyCEtG0v1gxLCykmxscNwFat0rgeWb990NA" 
model_path = "/kaggle/working/output/checkpoint-500"
tokenizer_path = "hyunwoongko/kobart"
model = BartForConditionalGeneration.from_pretrained(model_path).cuda()
tokenizer = PreTrainedTokenizerFast.from_pretrained(tokenizer_path)
# 학습된 모델로 제목 생성
max_length = 250
max_dec_length = 60

ner_model = pipeline(
    "token-classification",
    model="monologg/koelectra-small-finetuned-naver-ner",
    tokenizer="monologg/koelectra-small-finetuned-naver-ner",
    device=device)

app = Flask(__name__)

@app.route('/predict', methods=['POST'])
def predict():
    data = request.json
    if 'image' not in data:
        return jsonify({'error': 'No image URL provided'}), 400

    image_url = data['image']
    try:
        # 이미지 URL에서 이미지를 다운로드합니다.
        bucket_name, file_name = extract_bucket_and_filename(image_url)
        # 버킷 가져오기
        bucket = client.get_bucket(bucket_name)

        # 블롭(파일) 가져오기
        blob = bucket.blob(file_name)

        # 블롭 URL에서 파일 다운로드
        image_data = blob.download_as_bytes()

        # 이미지 열기
        img= Image.open(io.BytesIO(image_data)).convert('RGB')
#         response = requests.get(image_url)
#         response.raise_for_status()
#         img = Image.open(io.BytesIO(response.content)).convert('RGB')
    except requests.RequestException as e:
        return jsonify({'error': f'Failed to download image: {str(e)}'}), 400
    except Exception as e:
        return jsonify({'error': f'Failed to open image: {str(e)}'}), 400

    extracted_text = extract_text_from_image_google_vision(img) 
    generated_title = generate_title_from_text(model, tokenizer,extracted_text, max_length, max_dec_length)
    dates,times = extract_datetime_entities(extracted_text)
    date= get_earliest_date(dates)
    time = get_earliest_time(times)
    print(f"결과 확인: {generated_title,date,time}")

    return jsonify({'schedule_title': generated_title, 'date': date, 'time': time})
if __name__ == '__main__':
    # ngrok을 사용하여 공개 URL 생성
    public_url = ngrok.connect(5000)
    print(f"Public URL: {public_url}")
    app.run(host='0.0.0.0', port=5000)
