# [실습] MovieLens 데이터를 활용한 AI Agent 구축

본 실습에서는 이전에 구축한 지식 그래프를 활용하는 영화 추천 AI Agent 시스템을 만들어 보겠습니다.

- MovieLens 데이터셋 다운로드 링크: https://grouplens.org/datasets/movielens/latest/

# 1. Python 환경 설정

In [None]:
# 필요한 라이브러리 설치 (1회성 실행)
#!pip install rdflib owlrl google-generativeai 

# 라이브러리 임포트
from rdflib import Graph, Namespace
from rdflib.namespace import RDF, RDFS, OWL
import owlrl
import google.generativeai as genai

### Gemini API 키 설정

[Google AI Studio](https://ai.google.dev/gemini-api/docs/api-key?hl=ko)에서 무료 API 키를 발급받을 수 있습니다.

In [None]:
# Gemini API 키 설정
GEMINI_API_KEY = "your-api-key-here"

# 2. 지식 그래프 및 추론 엔진 로드

### 지식 그래프 로드

In [3]:
# 지식 그래프 로드
kg = Graph()
kg.parse('movielens_ontology.ttl', format='turtle')
kg.parse('movielens_data.ttl', format='turtle')

ex = Namespace("http://example.org/movielens#")

print(f"지식 그래프 로드 완료")
print(f"    총 트리플 수: {len(kg):,}개")

지식 그래프 로드 완료
    총 트리플 수: 546,429개


### OWL 추론 엔진 실행

In [4]:
# 추론 전 트리플 수 저장
triple_count_before = len(kg)

# OWL 추론 실행
print("추론 엔진 실행 중...")
owlrl.DeductiveClosure(owlrl.OWLRL_Semantics).expand(kg)

print(f"추론 완료")
print(f"    추론 후 트리플 수: {len(kg):,}개")
print(f"    추론으로 생성된 트리플: {len(kg) - triple_count_before:,}개")

추론 엔진 실행 중...
추론 완료
    추론 후 트리플 수: 1,145,130개
    추론으로 생성된 트리플: 598,701개


# 3. 온톨로지 스키마 추출

Agent가 SPARQL을 생성하려면 온톨로지 구조를 알아야 합니다.


In [5]:
class OntologySchemaExtractor:
    """온톨로지에서 스키마 정보 추출"""
    
    def __init__(self, graph):
        self.g = graph
        self.schema = self._extract_schema()
    
    def _extract_schema(self):
        """클래스와 속성 추출"""
        schema = {
            'classes': [],
            'datatype_properties': [],
            'object_properties': []
        }
        
        # 클래스 추출 (OWL 기본 클래스 제외)
        owl_builtin_classes = {'Thing', 'Nothing', 'Class', 'Property'}
        
        for cls in self.g.subjects(RDF.type, OWL.Class):
            class_name = str(cls).split('#')[-1]
            
            # OWL 내장 클래스 및 빈 문자열 제외
            if class_name not in owl_builtin_classes and class_name:
                schema['classes'].append(class_name)
        
        # 데이터 속성 추출
        for prop in self.g.subjects(RDF.type, OWL.DatatypeProperty):
            prop_name = str(prop).split('#')[-1]
            domain = self.g.value(prop, RDFS.domain)
            range_type = self.g.value(prop, RDFS.range)
            
            if domain and range_type:
                domain_name = str(domain).split('#')[-1]
                range_name = str(range_type).split('#')[-1]
                schema['datatype_properties'].append(
                    f"{prop_name}: {domain_name} -> {range_name}"
                )
        
        # 객체 속성 추출
        for prop in self.g.subjects(RDF.type, OWL.ObjectProperty):
            prop_name = str(prop).split('#')[-1]
            domain = self.g.value(prop, RDFS.domain)
            range_cls = self.g.value(prop, RDFS.range)
            
            if domain and range_cls:
                domain_name = str(domain).split('#')[-1]
                range_name = str(range_cls).split('#')[-1]
                schema['object_properties'].append(
                    f"{prop_name}: {domain_name} -> {range_name}"
                )
        
        return schema
    
    def get_schema_text(self):
        """Agent가 읽을 수 있는 형태로 변환"""
        text = "=== MovieLens Knowledge Graph Schema ===\n\n"
        
        text += "Classes:\n"
        for cls in sorted(self.schema['classes']):
            text += f"  - {cls}\n"
        
        text += "\nDatatype Properties:\n"
        for prop in sorted(self.schema['datatype_properties']):
            text += f"  - {prop}\n"
        
        text += "\nObject Properties:\n"
        for prop in sorted(self.schema['object_properties']):
            text += f"  - {prop}\n"
        
        return text

# 스키마 추출
schema_extractor = OntologySchemaExtractor(kg)
schema_text = schema_extractor.get_schema_text()

print(schema_text)

=== MovieLens Knowledge Graph Schema ===

Classes:
  - Genre
  - Movie
  - Person
  - ReviewEvent
  - User

Datatype Properties:
  - genreName: Genre -> string
  - hasScore: ReviewEvent -> decimal
  - timestamp: ReviewEvent -> integer
  - title: Movie -> string

Object Properties:
  - aboutMovie: ReviewEvent -> Movie
  - belongsToGenre: Movie -> Genre
  - hasReview: Movie -> ReviewEvent
  - ratedBy: ReviewEvent -> Person



# 4. Text-to-SPARQL Generator

### Gemini 기반 Generator 구현

In [6]:
class TextToSPARQLGenerator:
    """Gemini 기반 Text-to-SPARQL Generator"""
    
    def __init__(self, graph, schema_text, api_key):
        self.g = graph
        self.schema_text = schema_text
        
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel('gemini-2.5-flash')
    
    def generate_sparql(self, question: str) -> str:
        """자연어 → SPARQL 변환"""
        
        prompt = f"""You are a SPARQL query expert for MovieLens knowledge graph.

Schema:
{self.schema_text}

CRITICAL Rules:
1. User IDs follow the format 'ex:user_ID' (e.g., ex:user_3).
2. Relationship Direction (CRITICAL):
   - A Review is linked TO a User: ?review ex:ratedBy ex:user_ID
   - A Review is linked TO a Movie: ?review ex:aboutMovie ?movie
   - A Movie is linked TO a Genre: ?movie ex:belongsToGenre ?genre
3. String filtering: Use FILTER(str(?genreName) = "Action")
4. Always use PREFIX ex: <http://example.org/movielens#>

Examples:

Q: Action movies with rating above 4.5
A:
PREFIX ex: <http://example.org/movielens#>
SELECT ?title (AVG(?score) as ?avg_score)
WHERE {{
  ?movie ex:title ?title ;
         ex:belongsToGenre ?genre .
  ?genre ex:genreName ?genreName .
  FILTER(str(?genreName) = "Action")
  ?review ex:aboutMovie ?movie ;
          ex:hasScore ?score .
  FILTER(?score >= 4.5)
}}
GROUP BY ?title
ORDER BY DESC(?avg_score)
LIMIT 10

Q: Comedy movies with highest rating
A:
PREFIX ex: <http://example.org/movielens#>
SELECT ?title (AVG(?score) as ?avg_score)
WHERE {{
  ?movie ex:title ?title ;
         ex:belongsToGenre ?genre .
  ?genre ex:genreName ?genreName .
  FILTER(str(?genreName) = "Comedy")
  ?review ex:aboutMovie ?movie ;
          ex:hasScore ?score .
}}
GROUP BY ?title
ORDER BY DESC(?avg_score)
LIMIT 1

Now generate:
Q: {question}
A:
"""
        
        response = self.model.generate_content(prompt)
        query = response.text.strip()
        query = self._clean_query(query)
        
        return query
    
    def _clean_query(self, query):
        """쿼리 정리"""
        
        # A: 제거
        if "A:" in query or "A :" in query:
            import re
            query = re.split(r'A\s*:', query)[-1]
        
        # 코드 블록 제거
        if "```sparql" in query:
            query = query.split("```sparql")[1].split("```")[0]
        elif "```" in query:
            query = query.split("```")[1].split("```")[0]
        
        # 주석 제거
        lines = [l for l in query.split('\n') if not l.strip().startswith('#')]
        query = '\n'.join(lines)
        
        return query.strip()
    
    def execute_sparql(self, query: str) -> str:
        """SPARQL 실행 및 결과 포맷팅"""
        try:
            results = self.g.query(query)
            result_list = list(results)
            
            if not result_list:
                return "결과 없음"
            
            output = []
            for i, row in enumerate(result_list, 1):
                row_items = []
                
                for var in row.labels:
                    # row[var] 대신 getattr 사용 (메서드 충돌 방지)
                    value = row[var] if var in row.asdict() else getattr(row, var, None)
                    
                    # None 체크
                    if value is None:
                        continue
                    
                    # 값 변환
                    if hasattr(value, 'toPython'):
                        value = value.toPython()
                    
                    # URI 단순화
                    if isinstance(value, str) and '#' in value:
                        value = value.split('#')[-1]
                    
                    # 숫자 포맷팅
                    if isinstance(value, float):
                        # avg, score 등은 소수점 2자리
                        if any(keyword in var for keyword in ['avg', 'score', 'rating']):
                            value = f"{value:.2f}"
                        else:
                            value = f"{value:.1f}"
                    elif isinstance(value, int):
                        # count는 정수 그대로
                        value = str(value)
                    
                    row_items.append(f"{var}: {value}")
                
                output.append(f"{i}. {', '.join(row_items)}")
                
                if i >= 10:
                    break
            
            return "\n".join(output)
        
        except Exception as e:
            return f"쿼리 실행 오류: {str(e)}\n\n생성된 쿼리:\n{query}"

# 5. Agent 구성
간단한 Agent를 직접 구현합니다.

In [7]:
class SimpleSPARQLAgent:
    """간단한 SPARQL Agent"""
    
    def __init__(self, sparql_generator):
        self.generator = sparql_generator
    
    def run(self, question):
        """질문 → SPARQL 생성 → 실행 → 결과 반환"""
        
        print(f"\n{'='*60}")
        print(f"질문: {question}")
        print(f"{'='*60}\n")
        
        try:
            # 1단계: SPARQL 쿼리 생성
            print("[1/2] SPARQL 쿼리 생성 중...\n")
            query = self.generator.generate_sparql(question)
            
            print("생성된 SPARQL 쿼리:")
            print("-" * 60)
            print(query)
            print("-" * 60 + "\n")
            
            # 2단계: 쿼리 실행 및 결과 포맷팅
            print("[2/2] 쿼리 실행 중...\n")
            results = self.generator.execute_sparql(query)
            
            print("실행 결과:")
            print("-" * 60)
            print(results)
            print("-" * 60 + "\n")
            
            return results
            
        except Exception as e:
            print(f"오류 발생: {str(e)}\n")
            import traceback
            traceback.print_exc()
            return None

# 6. Agent 실행 및 결과

In [8]:
# Generator 생성
sparql_generator = TextToSPARQLGenerator(
    kg, 
    schema_text, 
    api_key=GEMINI_API_KEY
)

# Agent 생성
simple_agent = SimpleSPARQLAgent(sparql_generator)

### 시나리오 1: 장르별 고평점 영화 (다중 조건 필터링)

In [9]:
question = "Action 장르 중에서 리뷰가 3개 이상이고 평점이 4.5 이상인 영화를 추천해줘."

response = simple_agent.run(question)


질문: Action 장르 중에서 리뷰가 3개 이상이고 평점이 4.5 이상인 영화를 추천해줘.

[1/2] SPARQL 쿼리 생성 중...

생성된 SPARQL 쿼리:
------------------------------------------------------------
PREFIX ex: <http://example.org/movielens#>
SELECT ?title (AVG(?score) as ?avg_score) (COUNT(?review) as ?num_reviews)
WHERE {
  ?movie ex:title ?title ;
         ex:belongsToGenre ?genre .
  ?genre ex:genreName ?genreName .
  FILTER(str(?genreName) = "Action")
  ?review ex:aboutMovie ?movie ;
          ex:hasScore ?score .
}
GROUP BY ?title
HAVING (COUNT(?review) >= 3 && AVG(?score) >= 4.5)
ORDER BY DESC(?avg_score)
LIMIT 10
------------------------------------------------------------

[2/2] 쿼리 실행 중...

실행 결과:
------------------------------------------------------------
1. title: Dog Soldiers (2002), avg_score: 4.666666666666666666666666667, num_reviews: 3
2. title: Tekkonkinkreet (Tekkon kinkurîto) (2006), avg_score: 4.625, num_reviews: 4
3. title: Doctor Who: Voyage Of The Damned (2007), avg_score: 4.5, num_reviews: 3
4. title: Stu

In [10]:
# 검증
import pandas as pd

# 1. 데이터 로드
movies = pd.read_csv('ml-latest-small/movies.csv')
ratings = pd.read_csv('ml-latest-small/ratings.csv')

# 2. Action 장르 필터링
action_movies = movies[movies['genres'].str.contains('Action', case=False)]

# 3. 영화별로 [평균 평점]과 [리뷰 개수]를 동시에 계산
stats = ratings.groupby('movieId')['rating'].agg(['mean', 'count']).reset_index()

# 4. 조건 필터링 (리뷰 3개 이상 AND 평균 평점 4.5 이상)
qualified_stats = stats[(stats['count'] >= 3) & (stats['mean'] >= 4.5)]

# 5. 영화 정보와 병합
result = pd.merge(action_movies, qualified_stats, on='movieId')

# 6. 평점 높은 순으로 정렬하여 출력
final_recommendation = result.sort_values(by='mean', ascending=False)

print(f"--- 검증 결과: {len(final_recommendation)}건 ---")
print(final_recommendation[['title', 'mean', 'count']].head(10))

--- 검증 결과: 7건 ---
                                         title      mean  count
2                          Dog Soldiers (2002)  4.666667      3
4     Tekkonkinkreet (Tekkon kinkurîto) (2006)  4.625000      4
0   Taking of Pelham One Two Three, The (1974)  4.500000      4
1                        Stunt Man, The (1980)  4.500000      3
3  Flickering Lights (Blinkende lygter) (2000)  4.500000      3
5                  The Man from Nowhere (2010)  4.500000      3
6      Doctor Who: Voyage Of The Damned (2007)  4.500000      3


### 시나리오 2: 사용자별 선호 장르 분석 (다단계 경로 탐색 및 집계)

In [11]:
question = "사용자 2번이 가장 많이 봤던 장르 3개가 뭐야?"

response = simple_agent.run(question)


질문: 사용자 2번이 가장 많이 봤던 장르 3개가 뭐야?

[1/2] SPARQL 쿼리 생성 중...

생성된 SPARQL 쿼리:
------------------------------------------------------------
PREFIX ex: <http://example.org/movielens#>
SELECT ?genreName (COUNT(?genre) as ?genre_count)
WHERE {
  ?review ex:ratedBy ex:user_2 ;
          ex:aboutMovie ?movie .
  ?movie ex:belongsToGenre ?genre .
  ?genre ex:genreName ?genreName .
}
GROUP BY ?genreName
ORDER BY DESC(?genre_count)
LIMIT 3
------------------------------------------------------------

[2/2] 쿼리 실행 중...

실행 결과:
------------------------------------------------------------
1. genreName: Drama, genre_count: 17
2. genreName: Action, genre_count: 11
3. genreName: Crime, genre_count: 10
------------------------------------------------------------



In [12]:
import pandas as pd

# 데이터 로드
movies = pd.read_csv('ml-latest-small/movies.csv')
ratings = pd.read_csv('ml-latest-small/ratings.csv')

# 1. 사용자 1번의 평점 데이터 추출
user1_data = ratings[ratings['userId'] == 2]

# 2. 영화 정보와 합쳐서 장르 나열
user1_with_genres = pd.merge(user1_data, movies, on='movieId')

# 3. 장르별 개수 카운트 (가장 많이 본 순)
top3_genres_pandas = user1_with_genres['genres'].str.split('|').explode().value_counts().head(3)

print("--- 사용자 2번이 가장 많이 본 장르 TOP 3 ---")
print(top3_genres_pandas)

--- 사용자 2번이 가장 많이 본 장르 TOP 3 ---
genres
Drama     17
Action    11
Crime     10
Name: count, dtype: int64


### 시나리오 3: 시스템 내 전체 인원 파악 (클래스 계층 기반 상속 추론)

In [13]:
question = "시스템에 등록된 사람이 모두 몇 명?"

response = simple_agent.run(question)


질문: 시스템에 등록된 사람이 모두 몇 명?

[1/2] SPARQL 쿼리 생성 중...

생성된 SPARQL 쿼리:
------------------------------------------------------------
PREFIX ex: <http://example.org/movielens#>
SELECT (COUNT(DISTINCT ?person) as ?totalUsers)
WHERE {
  ?review ex:ratedBy ?person .
}
------------------------------------------------------------

[2/2] 쿼리 실행 중...

실행 결과:
------------------------------------------------------------
1. totalUsers: 610
------------------------------------------------------------



In [14]:
# 검증
import pandas as pd

# 1. 데이터 로드
ratings = pd.read_csv('ml-latest-small/ratings.csv')

# 2. 시스템에 등록된 고유 사용자(User) 수 계산
# ratings 파일의 'userId' 컬럼에 있는 고유한 값의 개수가 곧 등록된 사람 수입니다.
total_people_count = ratings['userId'].nunique()

print(f"--- 검증 결과 ---")
print(f"시스템에 등록된 총 인원(Unique Users): {total_people_count}명")

--- 검증 결과 ---
시스템에 등록된 총 인원(Unique Users): 610명
