# **AI Application Specialist Practical Problem 1**

## assets.zip 파일을 업로드 하세요

In [None]:
from google.colab import files
uploaded = files.upload()

## 업로드한 파일의 압축을 푸세요

In [None]:
!unzip ./assets.zip

## 시험을 위한 라이브러리를 설치하세요. 설치 중간에 나오는 error는 무시해 주세요~. colab환경과 라이브러리 충돌에 관한 에러로 시험의 결과에 영향을 주지 않습니다.

In [None]:
!pip install -q datasets
!pip install -q openai

## Helper 함수들을 실행해 주세요

In [None]:
### helper functions
def get_queries():
    queries = [
        "삼성 갤럭시s24를 매력적인 한문장으로 홍보해주세요.",
        "삼성 갤럭시 버즈3를 십대들이 좋아하는 한문장으로 홍보해주세요.",
        "삼성 갤럭시 링 홍보문구를 스펙 고려해서 한문장으로 만들어 주세요.",
        "삼성 전자렌지를 매력적인 한문장으로 홍보해주세요.",
        "삼성 냉장고를 한 문장으로 홍보한다면?",
    ]

    return queries

def get_prompt(context, query):
    prompt_msg = f"<|USER|>Context information is below.\n---------------------\n{context}\n---------------------\nGiven the context information and not prior knowledge, answer the query.\nQuery: {query}\nAnswer: <|ASSISTANT|>"
    return prompt_msg

def pretty_print(title, msg):
    print("#" * 10," ", title," ", "#" * 10)
    print(msg)
    print("#" * 34)
    print("\n")

def get_specs_docs(spec_docs_dir =""):
    from pathlib import Path
    from glob import glob
    spec_paths = glob(str(Path(spec_docs_dir)) + "/*.txt")
    spec_raw_text_li = []
    for spec_path in spec_paths:
        with open(Path(spec_path), encoding="utf-8") as f:
            content = f.read()
            spec_raw_text_li.append(content)
    return spec_raw_text_li

def display_plt_img(img_path):
    import matplotlib.pyplot as plt
    from matplotlib.gridspec import GridSpec
    from PIL import Image
    plt.figure(figsize = (20,16))
    plt.imshow(Image.open(img_path))
    plt.axis('off')
    plt.show()


# Step 1. Retreiver

5개 제품의 Spec문서는 ./data/spec_docs/ 에 txt파일 형태로 제공, 사용자의 각 제품에 대한 질문은 get_queries()로 확인 가능함.
각각 질문에 대응해서 적합한 spec_docs 폴더 안의 spec documents를 찾아내는 Retriever Class를 완성하세요

In [None]:
##### model 및 spec docs 불러오기 ####
embedding_model_name = "BAAI/bge-m3"
SPEC_DOCS_DIR = "./data/spec_docs"

In [None]:
spec_docs = get_specs_docs(spec_docs_dir=SPEC_DOCS_DIR)
for i, spec_doc in enumerate(spec_docs):
    print(i,spec_doc)

In [None]:
queries = get_queries()
print(queries)

In [None]:
### Retriever를 위한 라이브러리 불러오기
import warnings
warnings.filterwarnings('ignore')
from transformers import AutoTokenizer, AutoModel
import torch

Q A-1) get_embedding(text)구현 (text->embedding 구현)
 - 함수 인자로 받은 text를 self.model을 이용, embedding vector로 변환 후 반환
   1. self.tokenizer를 사용하여 tokenization 후, self.model에 입력하여 embedding matrix (#tokens * embedding_size) 생성
   2. embedding matrix를 torch.mean을 사용하여 embedding_size의 embedding vector 반환

Q A-2) build_doc_embedding()구현 (spec문서들->embedding 구현)
 - 각 document에 대한 embedding을 생성
   1. 각 document를 get_embedding()을 사용하여 tensor 타입의 matrix (len(self.documents) * self.model.config.hidden_size) 반환

Q A-3) retrieve_doc(query)구현 (text->top-1 doc)
- 함수 인자로 받은 query를 documents_embeddings와 유사도 비교, query와 가장 유사한 document 1개를 반환.
- 유사도 비교는 Retriever class에 구현된 cosine_similarity() 사용

In [None]:
class Retriever:
    def __init__(self, documents, embedding_model_name):
        self.documents = documents
        self.tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
        self.model = AutoModel.from_pretrained(embedding_model_name)
        self.documents_embeddings = self.build_docs_embedding()

    def get_embedding(self, text):
        ### Q A-1) 작성 필요
        return embedding

    def build_docs_embedding(self):
        ### Q A-2) 작성 필요
        return documents_embeddings

    def retrieve_doc(self, query):
        ### Q A-3) 작성 필요
        return doc

    def cosine_similarity(self, query_embedding):
        scores = self.documents_embeddings.matmul(query_embedding) / (
                    torch.linalg.norm(self.documents_embeddings, dim=1) * torch.linalg.norm(query_embedding))
        return scores



### Step A-2 Retriever 결과 확인

In [None]:
queries = get_queries()
spec_docs = get_specs_docs(spec_docs_dir=SPEC_DOCS_DIR)

### retriever 설정 ###
retriever = Retriever(spec_docs.copy(), embedding_model_name)

for i, query in enumerate(queries):

    pretty_print(str(i)+" QUESTION", query)
    context = retriever.retrieve_doc(query=query)
    pretty_print(str(i)+" Context",context)

## Step B Finetune

In [None]:
query = get_queries()[0] ### Galaxy에 대한 질문만 수행
spec_docs = get_specs_docs(spec_docs_dir=SPEC_DOCS_DIR)

### retriever 설정 ###
retriever = Retriever(spec_docs.copy(), embedding_model_name)

### Retriever 결과 확인 ###
pretty_print("QUESTION", query)
context = retriever.retrieve_doc(query=query)
pretty_print("RAG",context)

### sLM을 위한 prompt 정의###
prompt = get_prompt(context=context, query=query)

pretty_print("Prompt",prompt)

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer, DataCollatorForLanguageModeling
from datasets import DatasetDict

path_pretrained_model = "./models/pretrained_model/"
path_finetuned_model = "./models/finetuned_model/"
path_json_dataset = "./data/training_data.jsonl"

device_map = "cpu"
# hyperparams for generation
max_new_tokens = 64

In [None]:
# Tokenizing and generating a response
def generate_response(prompt, model, tokenizer):
    tokenized_prompt = tokenizer([prompt], return_tensors='pt')
    generated_ids = model.generate(tokenized_prompt.input_ids,
                   tokenizer=tokenizer,
                   max_new_tokens=max_new_tokens)
    #Stripping the input text from generated_ids
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(tokenized_prompt.input_ids, generated_ids)
    ]

    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    return response

### 현재 Pretrained 모델의 학습전 Response 확인

In [None]:
# Loading the pretrained model
model = AutoModelForCausalLM.from_pretrained(
    path_pretrained_model,
    device_map=device_map
)

#Loading the tokenizer
tokenizer = AutoTokenizer.from_pretrained(
    path_pretrained_model,
    device_map=device_map
)

response = generate_response(prompt, model, tokenizer)
print("response(before training):", response)


Q B-1) training dataset(jsonl format) 로드하여 DatasetDict 객체를 생성하세요 (아래 조건 만족 필요)
 - split: "train"

In [None]:
# Loading the dataset
### Q B-1-1) 작성 필요
dataset =

### Q B-1-2) 작성 필요
dataset = dataset.map(lambda x: tokenizer(x["### B-1-2) 작성 필요"], truncation=True, padding=True, max_length=512), batched=True, remove_columns=dataset["train"].column_names)

Q B-2) DataCollatorForLanguageModeling 객체를 생성하세요 (적절한 params 사용)

In [None]:
# Constructing a collator
data_collator = ### Q B-2) 작성 필요

Q B-3) 제시된 값들을 이용하여 TrainingArguments 객체를 생성하세요
- learning rate: 7e-4
- epoch: 100
- batch size: 4
- cpu 사용: True
- report_to: "none"

In [None]:
# Setting TrainingArguments
training_args = ### Q B-3) 작성 필요

Constructing Trainer and Training the model

In [None]:
# Constructing trainer, training and saving the model.
trainer = Trainer(model, train_dataset=dataset['train'], args=training_args, data_collator=data_collator)
trainer.train()

trainer.save_model(path_finetuned_model)

In [None]:
# pretrained_model's tokenizer copy to finetuned_model
import shutil
copy_file_li = ["tokenizer.json", "tokenizer_config.json","special_tokens_map.json"]
for copy_file in copy_file_li:
    shutil.copy(f"{path_pretrained_model}/{copy_file}", f"{path_finetuned_model}/{copy_file}")

#### Step B Finetune한 모델 홍보문구 결과 확인

In [None]:
# Loading the finetuned model
model = AutoModelForCausalLM.from_pretrained(
    path_finetuned_model,
    device_map=device_map
)

#Loading the tokenizer
tokenizer = AutoTokenizer.from_pretrained(
    path_finetuned_model,
    device_map=device_map
)

response = generate_response(prompt, model, tokenizer)
print("response(after training):", response)

sLM의 출력 결과를 slogan의 변수에 저장하세요

In [None]:
slogan = response

# Step C api call로 홍보 이미지 생성

In [None]:
open_api_key = ""

Q C-1) 아래 지시사항을 만족하는 call_api_dalle라는 함수를 완성하세요.

시험 응시자 개별 open_api_key를 하기 변수에 입력하세요

[지시사항]
* 프롬프트: target의 주변을 target과 어울리는 배경으로 채워주는 프롬프트
* 모델이름: dall-e-3
* 해상도: 가로, 세로 1024 px
* 품질수준: standard
* 생성이미지 장수: 1
* 리턴: 생성된 이미지 위치

In [None]:
def call_api_dalle(target =""):
    import urllib.request
    from openai import OpenAI
    import matplotlib.pyplot as plt
    import matplotlib.image as img
    client = OpenAI(api_key=open_api_key)
    response = client.images.generate(
        ### Q C-1) 작성 필요
    )
    image_url = response.data[0].url
    image_path = f"./{str(target)}_result.png"
    urllib.request.urlretrieve(image_url,image_path)
    print("Image_Saved",image_path)

    plt.imshow(img.imread(image_path))
    plt.show()

    return image_path


In [None]:
call_api_dalle("갤럭시 s24 스마트폰")

생성된 이미지의 위치를 generated_dall_img_path 저장하세요

In [None]:
generated_dall_img_path = "./갤럭시 s24 스마트폰_result.png"

## 홍보문구와 이미지를 최종 확인

생성된 홍보문구를 출력하세요

In [None]:
pretty_print("홍보문구",slogan)

생성된 홍보이미지를 출력하세요

In [None]:
display_plt_img(generated_dall_img_path) ### 제공

#### 실행한 jupyter notebook을 다운로드 한 후 이름을 knox id로 변경후 메일(ai.edu@samsung.com)로 제출하세요.
#### <span style="color:red">제출 파일 확인(knox id.ipynb )</span>
#### 시험 감독관에게 이야기 후 퇴실하세요.