In [None]:
!pip install scikit-learn


In [None]:
!pip install google-cloud-translate

In [None]:
!pip install googletrans

In [None]:
!pip install spacy googlemaps requests transformers torch datasets faiss-cpu faiss-gpu numpy

In [1]:
gmaps_key = 'AIzaSyD8wsLwdy0gyFfTnJBsj2ZA-biZ1OlKHPI'

In [18]:
import faiss
import numpy as np
from transformers import RagTokenizer, RagSequenceForGeneration
from sklearn.feature_extraction.text import TfidfVectorizer
import googlemaps

# Google Places API 설정
gmaps = googlemaps.Client(key=gmaps_key)

# RAG 모델과 토크나이저 로드
tokenizer = RagTokenizer.from_pretrained("facebook/rag-sequence-nq")
model = RagSequenceForGeneration.from_pretrained("facebook/rag-sequence-nq")

# 구글 리뷰를 수집하는 함수
def fetch_reviews(location, radius=1000):
    places_result = gmaps.places_nearby(location=location, radius=radius, type='restaurant')
    reviews = []

    for place in places_result['results']:
        place_details = gmaps.place(place_id=place['place_id'])
        place_reviews = place_details.get('result', {}).get('reviews', [])
        reviews.extend([review['text'] for review in place_reviews])

    return reviews

# 리뷰 데이터를 벡터화하여 FAISS 인덱스 생성
def create_faiss_index(reviews):
    vectorizer = TfidfVectorizer(stop_words='english')
    X = vectorizer.fit_transform(reviews).toarray().astype('float32')
    dimension = X.shape[1]
    index = faiss.IndexFlatL2(dimension)
    index.add(X)
    return index, vectorizer

# 사용자 입력에 대한 응답 생성 (RAG 모델 사용)
def generate_response_rag(user_input, reviews, index, vectorizer):
    # 사용자 입력을 벡터화
    input_vector = vectorizer.transform([user_input]).toarray().astype('float32')
    # FAISS 인덱스에서 가장 유사한 문서 검색
    distances, indices = index.search(input_vector, k=5)  # k=5: 상위 5개 문서 검색

    # 검색된 리뷰 출력
    context = [reviews[i] for i in indices[0]]

    # RAG 모델을 사용하여 응답 생성
    # Tokenize context and input
    inputs = tokenizer(user_input, return_tensors="pt")
    context_encodings = tokenizer(context, return_tensors="pt", padding=True, truncation=True)

    # Generate response
    outputs = model.generate(
        input_ids=inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        context_input_ids=context_encodings["input_ids"],
        context_attention_mask=context_encodings["attention_mask"],
        num_beams=4,
        early_stopping=True
    )
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)

    return response, context

# 주요 함수 호출 예시
def main():
    location = (37.5665, 126.978)  # 서울 시청의 좌표 (위도, 경도)
    reviews = fetch_reviews(location)

    if not reviews:
        print("No reviews found.")
        return

    index, vectorizer = create_faiss_index(reviews)

    user_input = "서울에서 좋은 레스토랑 추천해 주세요."
    rag_response, similar_reviews = generate_response_rag(user_input, reviews, index, vectorizer)

    print("RAG Model Generated Response:", rag_response)
    print("Similar Reviews:")
    for review in similar_reviews:
        print("-", review)

if __name__ == "__main__":
    main()


The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'RagTokenizer'. 
The class this function is called from is 'DPRQuestionEncoderTokenizer'.
The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'RagTokenizer'. 
The class this function is called from is 'DPRQuestionEncoderTokenizerFast'.
The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'RagTokenizer'. 
The class this function is called from is 'BartTokenizer'.
The tokenizer class you load from this checkpoint is not the same type as the class this function is called fr

AssertionError: Make sure that `context_input_ids` are passed, if no `retriever` is set. Alternatively, you can set a retriever using the `set_retriever(...)` function.

디버깅용 코드

구글 리뷰 수집

In [2]:
import googlemaps

# Google Places API 설정
gmaps = googlemaps.Client(key=gmaps_key)

def fetch_reviews(location, radius=1000):
    try:
        places_result = gmaps.places_nearby(location=location, radius=radius, type='restaurant')
        reviews = []

        for place in places_result['results']:
            place_details = gmaps.place(place_id=place['place_id'])
            place_reviews = place_details.get('result', {}).get('reviews', [])
            reviews.extend([review['text'] for review in place_reviews])

        return reviews
    except Exception as e:
        print(f"Error fetching reviews: {e}")
        return []

# 테스트 코드
location = (37.5665, 126.978)  # 서울 시청의 좌표
reviews = fetch_reviews(location)
print(f"Fetched {len(reviews)} reviews")
#reviews


Fetched 93 reviews


2. 한글 리뷰만 추출

In [32]:
import googlemaps
import re

# Google Places API 설정
gmaps = googlemaps.Client(key=gmaps_key)

def is_korean(text):
    return bool(re.search('[가-힣]', text))

def fetch_reviews(location, radius=1000):
    try:
        places_result = gmaps.places_nearby(location=location, radius=radius, type='restaurant')
        reviews = []

        for place in places_result['results']:
            place_details = gmaps.place(place_id=place['place_id'])
            place_reviews = place_details.get('result', {}).get('reviews', [])
            # 한글 리뷰만 필터링
            korean_reviews = [review['text'] for review in place_reviews if is_korean(review['text'])]
            reviews.extend(korean_reviews)

        return reviews
    except Exception as e:
        print(f"Error fetching reviews: {e}")
        return []

# 테스트 코드
location = (37.5665, 126.978)  # 서울 시청의 좌표
reviews = fetch_reviews(location)
print(f"Fetched {len(reviews)} Korean reviews")
reviews


Fetched 1 Korean reviews


['Pretty famous korean restaurant. The food was really good. If you get the course meal you get some of everything. Went and ate the lunch course meal for ₩25,000 per person. Everything was tasty and delicious. They have a main dining area and private rooms for groups. This restaurant is in the basement of the 하나로 building.']

반경 10km 검색

In [3]:
import googlemaps
import time

# Google Maps API 설정
gmaps = googlemaps.Client(key=gmaps_key)

def get_location_from_city(city_name):
    try:
        geocode_result = gmaps.geocode(city_name)
        if geocode_result:
            # 첫 번째 결과의 위치 정보를 사용
            location = geocode_result[0]['geometry']['location']
            return (location['lat'], location['lng'])
        else:
            print(f"Could not find location for city: {city_name}")
            return None
    except Exception as e:
        print(f"Error getting location: {e}")
        return None

def fetch_reviews(location, radius=10000):  # 10km = 10000m
    reviews = []
    next_page_token = None

    while True:
        try:
            # 식당 리뷰 수집
            places_result = gmaps.places_nearby(location=location, radius=radius, type='restaurant', page_token=next_page_token)
            for place in places_result['results']:
                place_name = place['name']
                place_details = gmaps.place(place_id=place['place_id'])
                place_reviews = place_details.get('result', {}).get('reviews', [])
                for review in place_reviews:
                    reviews.append((place_name, review['text']))

            # 관광지 리뷰 수집
            places_result = gmaps.places_nearby(location=location, radius=radius, type='tourist_attraction', page_token=next_page_token)
            for place in places_result['results']:
                place_name = place['name']
                place_details = gmaps.place(place_id=place['place_id'])
                place_reviews = place_details.get('result', {}).get('reviews', [])
                for review in place_reviews:
                    reviews.append((place_name, review['text']))

            # 다음 페이지 토큰 처리
            next_page_token = places_result.get('next_page_token')
            if not next_page_token:
                break

            # Google API 요청 시 대기 시간 추가
            time.sleep(2)

        except Exception as e:
            print(f"Error fetching reviews: {e}")
            break

    return reviews

# 테스트 코드
city_name = input("Enter the city name: ")
location = get_location_from_city(city_name)

if location:
    reviews = fetch_reviews(location)
    if reviews:
        print(f"Fetched {len(reviews)} reviews")
        for place_name, review_text in reviews:
            print(f"Place: {place_name}\nReview: {review_text}\n")
    else:
        print("No reviews found.")
else:
    print("Failed to fetch location.")


Enter the city name: 서울
Fetched 553 reviews
Place: Hotel Prince Seoul
Review: The hotel room is perfect for me as a solo traveller, and the staff were really helpful. My toilet kept flushing and the safe was a little faulty so the staff kindly helped me fix the problem. The location is just nice; a few feet away from the Myeongdong train station, and it’s just across the street from the shopping area!! AND there’s a Starbucks just next to the hotel!! There’s 2 Oliveyoung branches near the hotel, which made it really convenient for shopping, and a nearby small 7-11 convenience store for any late night cravings ☺️✨ I also managed to exchange my small notes for bigger notes with the help of the staff, and they’re willing to go the extra mile to search up the stores I wanted to visit (but didn’t know where the nearest branch was) if they were unsure themselves. Really friendly people, and they have free storage lockers for hotel guests who would like to store their luggages before their ch

Faiss 인덱스 생성

In [4]:
import faiss
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer

def create_faiss_index(reviews):
    try:
        vectorizer = TfidfVectorizer(stop_words='english')
        X = vectorizer.fit_transform(reviews).toarray().astype('float32')
        dimension = X.shape[1]
        index = faiss.IndexFlatL2(dimension)
        index.add(X)
        return index, vectorizer
    except Exception as e:
        print(f"Error creating FAISS index: {e}")
        return None, None

# 테스트 코드
if reviews:
    index, vectorizer = create_faiss_index(reviews)
    if index is not None:
        print("FAISS index created successfully")
    else:
        print("Failed to create FAISS index")


Error creating FAISS index: 'tuple' object has no attribute 'lower'
Failed to create FAISS index


에러수정

In [5]:
import faiss
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer

def create_faiss_index(reviews):
    try:
        # 모든 리뷰가 문자열인지 확인
        reviews = [str(review) for review in reviews]

        vectorizer = TfidfVectorizer(stop_words='english')
        X = vectorizer.fit_transform(reviews).toarray().astype('float32')

        print(f"Vectorized reviews shape: {X.shape}")

        if X.size == 0:
            raise ValueError("No data to add to the FAISS index.")

        dimension = X.shape[1]
        index = faiss.IndexFlatL2(dimension)

        if X.shape[0] > 0:
            index.add(X)
        else:
            raise ValueError("The input matrix is empty.")

        print(f"Index dimension: {dimension}")
        return index, vectorizer

    except Exception as e:
        print(f"Error creating FAISS index: {e}")
        return None, None

# 테스트 코드
if reviews:
    index, vectorizer = create_faiss_index(reviews)
    if index is not None:
        print("FAISS index created successfully")
    else:
        print("Failed to create FAISS index")
else:
    print("No reviews to process")


Vectorized reviews shape: (553, 2812)
Index dimension: 2812
FAISS index created successfully


RAG 모델 로드

In [6]:
from transformers import RagTokenizer, RagSequenceForGeneration

def load_rag_model():
    try:
        tokenizer = RagTokenizer.from_pretrained("facebook/rag-sequence-nq")
        model = RagSequenceForGeneration.from_pretrained("facebook/rag-sequence-nq")
        return tokenizer, model
    except Exception as e:
        print(f"Error loading RAG model: {e}")
        return None, None

# 테스트 코드
tokenizer, model = load_rag_model()
if tokenizer and model:
    print("RAG model loaded successfully")
else:
    print("Failed to load RAG model")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'RagTokenizer'. 
The class this function is called from is 'DPRQuestionEncoderTokenizer'.
The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'RagTokenizer'. 
The class this function is called from is 'DPRQuestionEncoder

RAG model loaded successfully


RAG 모델 실행

In [9]:
import googletrans
from transformers import RagTokenizer, RagSequenceForGeneration
import torch

# 번역 함수 정의
def translate_text(text, target_language='en'):
    translator = googletrans.Translator()
    translation = translator.translate(text, dest=target_language)
    return translation.text

def generate_response_rag(user_input, reviews, index, vectorizer, tokenizer, model):
    try:
        # 사용자 입력을 영어로 번역
        print("번역 시작")
        translated_input = translate_text(user_input)
        if translated_input is None:
            raise ValueError("Translation failed or returned None.")
        print(f"Translated User Input: {translated_input}")

        # 사용자 입력을 벡터화
        print("입력 벡터화")
        input_vector = vectorizer.transform([translated_input]).toarray().astype('float32')
        print(f"입력벡터 출력 : {input_vector.shape}")

        # FAISS 인덱스에서 가장 유사한 문서 검색
        print("유사한 문서 검색")
        distances, indices = index.search(input_vector, k=5)  # k=5: 상위 5개 문서 검색
        print(f"Distances: {distances}")
        print(f"Indices: {indices}")

        # 검색된 리뷰 출력
        context = [reviews[i] for i in indices[0]]
        print("검색된 리뷰 출력")
        for i, review in enumerate(context):
            print(f"Context {i}: {review}")

        # RAG 모델을 사용하여 응답 생성
        print("Tokenizing user input and context...")
        inputs = tokenizer(translated_input, return_tensors="pt")
        context_encodings = tokenizer(context, return_tensors="pt", padding=True, truncation=True)

        # Model inputs 확인
        print("Model inputs:")
        print(f"User input IDs: {inputs['input_ids']}")
        print(f"User input attention mask: {inputs['attention_mask']}")
        print(f"Context input IDs: {context_encodings['input_ids']}")
        print(f"Context attention mask: {context_encodings['attention_mask']}")

        # Generate response
        print("Generating response...")
        outputs = model.generate(
            input_ids=inputs["input_ids"].to(model.device),
            attention_mask=inputs["attention_mask"].to(model.device),
            decoder_input_ids=context_encodings["input_ids"].to(model.device),
            decoder_attention_mask=context_encodings["attention_mask"].to(model.device),
            num_beams=4,
            early_stopping=True,
            max_length=50  # 생성 길이를 조절
        )
        response = tokenizer.decode(outputs[0], skip_special_tokens=True)

        return response, context
    except Exception as e:
        print(f"Error generating response: {e}")
        return None, []

# 테스트 코드
if tokenizer and model and reviews:
    user_input = "한국의 역사적인 내용이 많이 담겨있는 관광지를 가고싶어"
    rag_response, similar_reviews = generate_response_rag(user_input, reviews, index, vectorizer, tokenizer, model)

    if rag_response:
        print("RAG Model Generated Response:", rag_response)
        print("Similar Reviews:")
        for review in similar_reviews:
            print("-", review)
    else:
        print("Failed to generate response")


번역 시작
Error generating response: 'NoneType' object has no attribute 'group'
Failed to generate response


번역

In [11]:
from google.cloud import translate_v2 as translate

def translate_text(text, target_language='en'):
    try:
        translate_client = translate.Client()
        result = translate_client.translate(text, target_language=target_language)
        return result['translatedText']
    except Exception as e:
        print(f"Translation error: {e}")
        return None

# 테스트 코드
user_input = "한국의 역사적인 내용이 많이 담겨있는 관광지를 가고싶어"
translated_input = translate_text(user_input)
print(f"Translated Input: {translated_input}")


Translation error: ("Failed to retrieve http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/?recursive=true from the Google Compute Engine metadata service. Status: 404 Response:\nb''", <google.auth.transport.requests._Response object at 0x795b46452020>)
Translated Input: None
