# 豆瓣电影图谱
1. 从豆瓣爬取电影与书籍信息
2. 将爬取到的json数据转化为MySQL并存储
3. 将 MySQL 数据导出为 CSV 数据
4. 将 CSV 数据导入 Neo4j

In [None]:
from pathlib import Path
import pymysql
import json
import pandas as pd

## 爬取豆瓣电影与书籍详细信息

爬虫实现可参考以下 GitHub 项目：
+ [https://github.com/weizhixiaoyi/DouBan-Spider](https://github.com/weizhixiaoyi/DouBan-Spider)
+ [https://github.com/Jack-Cherish/python-spider](https://github.com/Jack-Cherish/python-spider)
+ [https://github.com/facert/awesome-spider](https://github.com/facert/awesome-spider)
+ [https://github.com/binux/pyspider](https://github.com/binux/pyspider)
+ [https://github.com/wistbean/learn_python3_spider](https://github.com/wistbean/learn_python3_spider)

In [None]:
# Directory holding the raw crawler dumps (one JSON object per line).
root_dir = Path('../data') / 'movie_kg' / 'txt'
# Show every file found there: its full path, then its bare file name.
for file in root_dir.glob('*'):
    print(f'path_name:{file} \nname:\t{file.name}\n')

## Exploratory Data Analysis

In [None]:
# Print the first five raw records to eyeball the JSON structure.
# The file is opened once and read line by line; re-opening it on every loop
# iteration would print the first record over and over.
from itertools import islice

with open(root_dir / "small_movie_info.txt", "r", encoding="utf-8") as f:
    for line in islice(f, 5):
        print(line)

- 电影信息(**small_movie_info.txt**)包括电影id、图片链接、名称、导演名称、编剧名称、主演名称、类型、制片国家、语言、上映日期、片长、季数、集数、其他名称、剧情简介、评分、评分人数，共67245条数据信息。虽说是电影信息，但其中也包括电视剧、综艺、动漫、纪录片、短片。

- 电影演员信息(**small_movie_person_info.txt**)包括演员id、姓名、图片链接、性别、星座、出生日期、出生地、职业、更多中文名、更多外文名、家庭成员、简介，共89592条数据信息。这里所指的演员包括电影演员、编剧、导演。

## 提取电影类别，并存入 MySQL
Transform JSON into SQL

创建数据库（库名需与后文连接参数和 CSV 导出命令中使用的 `movie_kg` 保持一致；数据为中文，显式指定 utf8mb4 字符集）

```sh
mysql -u root -p

CREATE DATABASE movie_kg DEFAULT CHARACTER SET utf8mb4;
```

In [None]:
# Connect to the MySQL database that holds the movie knowledge-graph tables.
# The database name must match the one created with CREATE DATABASE and the
# one used by the CSV export commands further down (`movie_kg`).
import os

kg_conn = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    # Read the password from the environment instead of committing it to the notebook.
    password=os.environ.get('MYSQL_PASSWORD', ''),
    db='movie_kg',
    # Crawled text is mostly Chinese; pin utf8mb4 rather than rely on driver/server defaults.
    charset='utf8mb4',
    # Rows come back as dicts keyed by column name (later cells index rows by column name).
    cursorclass=pymysql.cursors.DictCursor,
)

In [None]:
# Load the crawled movie records: one JSON object per line.
movie_file_path = '../data/movie_kg/txt/small_movie_info.txt'
with open(movie_file_path, 'r', encoding='utf-8') as movie_file:
    movie_str_list = movie_file.readlines()
# Skip blank lines (e.g. a trailing newline at EOF), which json.loads rejects.
movie_json_list = [json.loads(movie) for movie in movie_str_list if movie.strip()]

In [None]:
# Collect every distinct genre that appears on any movie.
movie_genres = {genre for movie in movie_json_list for genre in movie['genres']}
# Sort by length, then by name. The name tie-breaker makes the order
# deterministic: string set iteration order changes between interpreter runs
# (hash randomization), and the list positions become the movie_genre_id values
# inserted below, so without it the same genre could get a different id per run.
movie_genres = sorted(movie_genres, key=lambda g: (len(g), g))
print(movie_genres)

## 存储信息到表`movie_genre`之中

#### 创建表

In [None]:
# To rebuild the table from scratch, drop it first:
# sql = "DROP TABLE  movie_genre;"
# with kg_conn.cursor() as cursor:
#   cursor.execute(sql)
#   kg_conn.commit()

# Genre lookup table. `if not exists` keeps the cell re-runnable; utf8mb4 is
# declared explicitly because genre names are Chinese and a latin1 server
# default (as in MySQL 5.7) cannot store them.
sql = '''
create table if not exists movie_genre
(
  movie_genre_id   int         not null
    primary key,
  movie_genre_name varchar(20) not null
) default charset = utf8mb4;
'''
with kg_conn.cursor() as cursor:
    cursor.execute(sql)
    kg_conn.commit()

#### 数据插入

In [None]:
# Insert every genre with a 1-based id that follows its position in movie_genres.
insert_genre_sql = "INSERT INTO `movie_genre` (`movie_genre_id`, `movie_genre_name`) VALUES (%s, %s)"
try:
    with kg_conn.cursor() as cursor:
        cursor.executemany(insert_genre_sql, list(enumerate(movie_genres, start=1)))
    kg_conn.commit()
except Exception as err:
    # Discard the partial batch. Otherwise a later commit() or DDL statement
    # (MySQL DDL commits implicitly) would silently persist it.
    kg_conn.rollback()
    print('movie_genres数据插入错误' + str(err))

#### 测试插入结果

In [None]:
# Sanity check: look at the first five genre rows that were just inserted.
sql = 'SELECT * FROM movie_genre LIMIT 5;'
cursor = kg_conn.cursor()
with cursor:
    cursor.execute(sql)
    result = cursor.fetchall()
result

## 存储信息到表`movie_info`之中

In [None]:
# Drop the table first if it needs to be rebuilt:
# sql = "DROP TABLE  movie_info;"
# with kg_conn.cursor() as cursor:
#   cursor.execute(sql)
#   kg_conn.commit()

# Create the movie table. `if not exists` keeps the cell re-runnable, and
# utf8mb4 is declared explicitly because names and summaries are Chinese text
# that a latin1 server default cannot store.
sql = '''
create table if not exists movie_info
(
  movie_info_id           int          not null
    primary key,
  movie_info_name         text         null,
  movie_info_image_url    varchar(200) null,
  movie_info_country      varchar(200) null,
  movie_info_language     varchar(200) null,
  movie_info_pubdate      varchar(200) null,
  movie_info_duration     varchar(200) null,
  movie_info_other_name   text         null,
  movie_info_summary      mediumtext   null,
  movie_info_rating       varchar(10)  null,
  movie_info_review_count varchar(200) null
) default charset = utf8mb4;
'''
with kg_conn.cursor() as cursor:
    cursor.execute(sql)
    kg_conn.commit()

In [None]:
# Store one movie_info row per crawled movie. List-valued fields (countries,
# languages, release dates, durations, alternative names) are flattened into
# comma-separated strings.
insert_movie_sql = (
    "INSERT INTO `movie_info` (`movie_info_id`, `movie_info_image_url`, `movie_info_name`, `movie_info_country`,"
    "`movie_info_language`, `movie_info_pubdate`, `movie_info_duration`, `movie_info_other_name`,"
    "`movie_info_summary`, `movie_info_rating`, `movie_info_review_count`) "
    "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
)
try:
    with kg_conn.cursor() as cursor:
        for movie in movie_json_list:
            rating = movie['rating']
            cursor.execute(insert_movie_sql, (
                movie['id'],
                movie['image_url'],
                movie['name'],
                ','.join(movie['countries']),
                ','.join(movie['languages']),
                ','.join(movie['pubdates']),
                ','.join(movie['durations']),
                ','.join(movie['other_names']),
                movie['summary'],
                rating['average'],
                rating['reviews_count'],
            ))
    kg_conn.commit()
except Exception as err:
    # Roll back the partial load. Otherwise a later commit() or DDL statement
    # (MySQL DDL commits implicitly) would persist a half-filled table.
    kg_conn.rollback()
    print('movie_info插入数据错误' + str(err))

In [None]:
# Show one inserted movie row. LIMIT matters: PyMySQL's default (buffered)
# cursor reads every result row into memory during execute(), even when only
# fetchone() is called afterwards.
sql = 'SELECT * FROM movie_info LIMIT 1'
with kg_conn.cursor() as cursor:
    cursor.execute(sql)
    result = cursor.fetchone()
result

## 提取电影演员(演员、编剧、导演)信息到`movie_person`之中

In [None]:
# To rebuild the table from scratch, drop it first:
# sql = "DROP TABLE  movie_person;"
# with kg_conn.cursor() as cursor:
#   cursor.execute(sql)
#   kg_conn.commit()

# Person table (actors, writers and directors). `if not exists` keeps the cell
# re-runnable; utf8mb4 is declared explicitly for the Chinese text columns.
sql = '''
create table if not exists movie_person
(
  movie_person_id            int          not null
    primary key,
  movie_person_name          text         null,
  movie_person_image_url     varchar(200) null,
  movie_person_gender        varchar(100) null,
  movie_person_constellation varchar(200) null,
  movie_person_birthday      varchar(200) null,
  movie_person_birthplace    text         null,
  movie_person_profession    varchar(200) null,
  movie_person_other_name    text         null,
  movie_person_introduction  mediumtext   null
) default charset = utf8mb4;
'''

with kg_conn.cursor() as cursor:
    cursor.execute(sql)
    kg_conn.commit()

In [None]:
# Load the crawled person records (one JSON object per line) and insert them
# into movie_person.
movie_person_file_path = '../data/movie_kg/txt/small_movie_person_info.txt'
with open(movie_person_file_path, 'r', encoding='utf-8') as person_file:
    movie_person_str_list = person_file.readlines()
# Skip blank lines (e.g. a trailing newline at EOF), which json.loads rejects.
movie_person_json_list = [json.loads(line) for line in movie_person_str_list if line.strip()]

insert_person_sql = (
    "INSERT INTO `movie_person` (`movie_person_id`, `movie_person_name`, `movie_person_image_url`,"
    "`movie_person_gender`, `movie_person_constellation`, `movie_person_birthday`, `movie_person_birthplace`,"
    "`movie_person_profession`, `movie_person_other_name`, "
    "`movie_person_introduction`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
)
try:
    with kg_conn.cursor() as cursor:
        for movie_person in movie_person_json_list:
            # NOTE(review): assumes both other_*_name fields are strings; they are
            # concatenated with no separator. If they are lists, the result is a
            # list that cannot be bound to a single column — confirm against the data.
            movie_person_other_name = movie_person['other_english_name'] + movie_person['other_chinese_name']
            cursor.execute(insert_person_sql, (
                movie_person['id'],
                movie_person['name'],
                movie_person['image_url'],
                movie_person['gender'],
                movie_person['constellation'],
                movie_person['birthday'],
                movie_person['birthplace'],
                movie_person['profession'],
                movie_person_other_name,
                movie_person['introduction'],
            ))
    kg_conn.commit()
except Exception as err:
    # Roll back the partial load so a later commit()/DDL cannot persist it.
    kg_conn.rollback()
    print('movie_person数据插入错误' + str(err))

In [None]:
# Show one inserted person row. LIMIT avoids buffering the whole table
# client-side (PyMySQL's default cursor reads all rows on execute()).
sql = "SELECT * FROM movie_person LIMIT 1;"
with kg_conn.cursor() as cursor:
    cursor.execute(sql)
    result = cursor.fetchone()
result

## 电影和演员之间进行关联`actor_to_movie`

In [None]:
# To rebuild the table from scratch, drop it first:
# sql = "DROP TABLE  actor_to_movie;"
# with kg_conn.cursor() as cursor:
#   cursor.execute(sql)
#   kg_conn.commit()

# Movie <-> actor link table; both columns are foreign keys.
# `if not exists` keeps the cell re-runnable.
sql = '''
create table if not exists actor_to_movie
(
  movie_info_id  int not null,
  movie_actor_id int not null,
  primary key (movie_info_id, movie_actor_id),
  constraint actor_to_movie_movie
  foreign key (movie_info_id) references movie_info (movie_info_id),
  constraint actor_to_movie_person
  foreign key (movie_actor_id) references movie_person (movie_person_id)
);
'''
with kg_conn.cursor() as cursor:
    cursor.execute(sql)
    kg_conn.commit()

In [None]:
# Link each movie to the actors credited on it. Only ids present in
# movie_person are inserted, because actor_to_movie has a foreign key on them.
actor_to_movie_sql = "INSERT INTO `actor_to_movie` (`movie_info_id`, `movie_actor_id`) VALUES (%s, %s)"
try:
    with kg_conn.cursor() as cursor:
        # Load every known person id once, instead of issuing a string-built
        # SELECT for each (movie, actor) pair.
        cursor.execute('SELECT `movie_person_id` FROM `movie_person`')
        known_person_ids = {row['movie_person_id'] for row in cursor.fetchall()}
        for movie in movie_json_list:
            movie_info_id = movie['id']
            actor_ids = set()
            for actor in movie['actors']:
                try:
                    actor_ids.add(int(actor['id']))
                except (TypeError, ValueError):
                    continue  # empty ('') or malformed id
            for actor_id in actor_ids & known_person_ids:
                cursor.execute(actor_to_movie_sql, (movie_info_id, actor_id))
    kg_conn.commit()
except Exception as err:
    # Roll back the partial load so a later commit()/DDL cannot persist it.
    kg_conn.rollback()
    print('actor_to_movie数据插入错误' + str(err))

In [None]:
# Index actor_to_movie by actor id. InnoDB already creates an index on a
# foreign-key column when the table is created, and names it after the
# CONSTRAINT (`actor_to_movie_person`). An unconditional CREATE INDEX with that
# name therefore fails with "Duplicate key name", so only create the index when
# no index starts with movie_actor_id.
index_exists_sql = '''
SELECT 1
FROM information_schema.statistics
WHERE table_schema = DATABASE()
  AND table_name = 'actor_to_movie'
  AND column_name = 'movie_actor_id'
  AND seq_in_index = 1
LIMIT 1;
'''
sql = '''
create index actor_to_movie_person
  on actor_to_movie (movie_actor_id);
'''
with kg_conn.cursor() as cursor:
    cursor.execute(index_exists_sql)
    if cursor.fetchone() is None:
        cursor.execute(sql)
        kg_conn.commit()

In [None]:
# Show the first ten movie/actor links. LIMIT avoids buffering the whole table
# client-side (PyMySQL's default cursor reads all rows on execute()).
sql = "SELECT * FROM actor_to_movie LIMIT 10;"
with kg_conn.cursor() as cursor:
    cursor.execute(sql)
    result = cursor.fetchmany(10)
result

## 电影和编剧之间进行关联 `writer_to_movie`

In [None]:
# To rebuild the table from scratch, drop it first:
# sql = "DROP TABLE  writer_to_movie;"
# with kg_conn.cursor() as cursor:
#   cursor.execute(sql)
#   kg_conn.commit()

# Movie <-> writer link table; both columns are foreign keys.
# `if not exists` keeps the cell re-runnable.
sql = '''
create table if not exists writer_to_movie
(
  movie_info_id   int not null,
  movie_writer_id int not null,
  primary key (movie_info_id, movie_writer_id),
  constraint writer_to_movie_movie
  foreign key (movie_info_id) references movie_info (movie_info_id),
  constraint writer_to_movie_person
  foreign key (movie_writer_id) references movie_person (movie_person_id)
);
'''
with kg_conn.cursor() as cursor:
    cursor.execute(sql)
    kg_conn.commit()

In [None]:
# Link each movie to its credited writers. Only ids present in movie_person are
# inserted, because writer_to_movie has a foreign key on them.
writer_to_movie_sql = "INSERT INTO `writer_to_movie` (`movie_info_id`, `movie_writer_id`) VALUES (%s, %s)"
try:
    with kg_conn.cursor() as cursor:
        # Load every known person id once, instead of issuing a string-built
        # SELECT for each (movie, writer) pair.
        cursor.execute('SELECT `movie_person_id` FROM `movie_person`')
        known_person_ids = {row['movie_person_id'] for row in cursor.fetchall()}
        for movie in movie_json_list:
            movie_info_id = movie['id']
            writer_ids = set()
            for writer in movie['writers']:
                try:
                    writer_ids.add(int(writer['id']))
                except (TypeError, ValueError):
                    continue  # empty ('') or malformed id
            for writer_id in writer_ids & known_person_ids:
                cursor.execute(writer_to_movie_sql, (movie_info_id, writer_id))
    kg_conn.commit()
except Exception as err:
    # Roll back the partial load so a later commit()/DDL cannot persist it.
    kg_conn.rollback()
    print('writer_to_movie数据插入错误' + str(err))

In [None]:
# Index writer_to_movie by writer id. InnoDB already creates an index on a
# foreign-key column when the table is created, and names it after the
# CONSTRAINT (`writer_to_movie_person`). An unconditional CREATE INDEX with that
# name therefore fails with "Duplicate key name", so only create the index when
# no index starts with movie_writer_id.
index_exists_sql = '''
SELECT 1
FROM information_schema.statistics
WHERE table_schema = DATABASE()
  AND table_name = 'writer_to_movie'
  AND column_name = 'movie_writer_id'
  AND seq_in_index = 1
LIMIT 1;
'''
sql = '''
create index writer_to_movie_person
  on writer_to_movie (movie_writer_id);
'''
with kg_conn.cursor() as cursor:
    cursor.execute(index_exists_sql)
    if cursor.fetchone() is None:
        cursor.execute(sql)
        kg_conn.commit()

In [None]:
# Show the first ten movie/writer links. LIMIT avoids buffering the whole table
# client-side (PyMySQL's default cursor reads all rows on execute()).
sql = "SELECT * FROM writer_to_movie LIMIT 10;"
with kg_conn.cursor() as cursor:
    cursor.execute(sql)
    result = cursor.fetchmany(10)
result

## 电影和电影类别之间进行关联

In [None]:
# To rebuild the table from scratch, drop it first:
# sql = "DROP TABLE  movie_to_genre;"
# with kg_conn.cursor() as cursor:
#   cursor.execute(sql)
#   kg_conn.commit()

# Movie <-> genre link table; both columns are foreign keys.
# `if not exists` keeps the cell re-runnable.
sql = '''
create table if not exists movie_to_genre
(
  movie_info_id  int not null,
  movie_genre_id int not null,
  primary key (movie_info_id, movie_genre_id),
  constraint movie_to_genre_movie
  foreign key (movie_info_id) references movie_info (movie_info_id),
  constraint movie_to_genre_genre
  foreign key (movie_genre_id) references movie_genre (movie_genre_id)
);
'''
with kg_conn.cursor() as cursor:
    cursor.execute(sql)
    kg_conn.commit()

In [None]:
# Link each movie to its genres, using the ids assigned in movie_genre.
movie_to_genre_sql = 'INSERT INTO `movie_to_genre` (`movie_info_id`, `movie_genre_id`) VALUES (%s, %s)'
try:
    with kg_conn.cursor() as cursor:
        # Fetch the whole name -> id mapping with one query. A string-built
        # SELECT per (movie, genre) pair would also break on names containing quotes.
        cursor.execute('SELECT `movie_genre_id`, `movie_genre_name` FROM `movie_genre`')
        genre_id_by_name = {row['movie_genre_name']: row['movie_genre_id'] for row in cursor.fetchall()}
        for movie in movie_json_list:
            movie_info_id = movie['id']
            # Use a loop-local name so the global `movie_genres` list is not
            # overwritten. dict.fromkeys drops repeated genres (which would
            # violate the primary key) while keeping their order.
            for genre in dict.fromkeys(movie['genres']):
                # An unknown genre raises KeyError, which aborts and rolls back below.
                movie_genre_id = genre_id_by_name[str(genre)]
                cursor.execute(movie_to_genre_sql, (movie_info_id, movie_genre_id))
    kg_conn.commit()
except Exception as err:
    # Roll back the partial load so a later commit()/DDL cannot persist it.
    kg_conn.rollback()
    print('movie_to_genre数据插入错误' + str(err))

In [None]:
# Index movie_to_genre by genre id. InnoDB already creates an index on the
# foreign-key column movie_genre_id when the table is created (named after the
# `movie_to_genre_genre` constraint), so a second index would be redundant.
# Only create it when no index starts with that column.
index_exists_sql = '''
SELECT 1
FROM information_schema.statistics
WHERE table_schema = DATABASE()
  AND table_name = 'movie_to_genre'
  AND column_name = 'movie_genre_id'
  AND seq_in_index = 1
LIMIT 1;
'''
sql = '''
create index movie_to_genre_idx
  on movie_to_genre (movie_genre_id);
'''
with kg_conn.cursor() as cursor:
    cursor.execute(index_exists_sql)
    if cursor.fetchone() is None:
        cursor.execute(sql)
        kg_conn.commit()

In [None]:
# Show the first ten movie/genre links. LIMIT avoids buffering the whole table
# client-side (PyMySQL's default cursor reads all rows on execute()).
sql = "SELECT * FROM movie_to_genre LIMIT 10;"
with kg_conn.cursor() as cursor:
    cursor.execute(sql)
    result = cursor.fetchmany(10)
result

## 显示所有 Table

In [None]:
# List every table that now exists in the connected database.
sql = '''
SHOW TABLES;
'''
cursor = kg_conn.cursor()
with cursor:
    cursor.execute(sql)
    result = cursor.fetchall()
result

RDB 转换成 RDF 有两种方式：一是 Direct Mapping（直接映射）；另一种是 R2RML（RDB to RDF Mapping Language），规范见 https://www.w3.org/TR/r2rml/。


可以使用 D2RQ 工具将 RDB 数据转换为 RDF 形式（本 notebook 后续实际采用的是导出 CSV 再导入 Neo4j 的方式）。
直接映射规则为:

- 数据库的表作为本体中的类（Class）。
- 表的列作为属性（Property）。
- 表的行作为实例/资源。
- 表的单元格值为字面量。
- 如果单元格所在的列是外键，那么其值为IRI，或者说实体/资源。

## 在命令行中执行以下命令生成 CSV

注意：`mysql -e` 的输出重定向到文件时以制表符（而非逗号）分隔各列，因此下文使用 `pd.read_table` 读取这些文件。

```
bin/mysql -A movie_kg -uroot -ppassword -e 'select * from movie_person;' > ~/code/git/fhaoguo/KnowledgeGraph/data/movie_kg/csv/movie_person.csv

bin/mysql -A movie_kg -uroot -ppassword -e 'select * from movie_info;' > ~/code/git/fhaoguo/KnowledgeGraph/data/movie_kg/csv/movie_info.csv

bin/mysql -A movie_kg -uroot -ppassword -e 'select * from movie_genre;' > ~/code/git/fhaoguo/KnowledgeGraph/data/movie_kg/csv/movie_genre.csv

bin/mysql -A movie_kg -uroot -ppassword -e 'select * from movie_to_genre;' > ~/code/git/fhaoguo/KnowledgeGraph/data/movie_kg/csv/movie_to_genre.csv

bin/mysql -A movie_kg -uroot -ppassword -e 'select * from actor_to_movie;' > ~/code/git/fhaoguo/KnowledgeGraph/data/movie_kg/csv/actor_to_movie.csv

bin/mysql -A movie_kg -uroot -ppassword -e 'select * from writer_to_movie;' > ~/code/git/fhaoguo/KnowledgeGraph/data/movie_kg/csv/writer_to_movie.csv
```

## 编辑属性

In [49]:
# Turn the MySQL batch export of movie_person into a neo4j-admin node file:
# the first column becomes the node :ID, the rest become string properties,
# and every row gets the `person` label.
# quoting=QUOTE_NONE: `mysql -e` does not quote fields, so a value starting
# with '"' must not be parsed as the start of a quoted field.
# NOTE: this overwrites the exported file in place (as comma-separated CSV),
# so re-running the cell without re-exporting from MySQL will mis-parse it.
# NOTE(review): `:ID` has no ID-space group, so person, movie and genre ids
# share one global id space during import — confirm they never collide.
import csv

movie_person_csv_path = "../data/movie_kg/csv/movie_person.csv"
movie_person = pd.read_table(movie_person_csv_path, quoting=csv.QUOTE_NONE)
movie_person.columns = [
    f"{name}:ID" if i == 0 else f"{name}:string"
    for i, name in enumerate(movie_person.columns)
]
movie_person[":LABEL"] = "person"
movie_person.to_csv(movie_person_csv_path, index=False)

In [50]:
# Turn the MySQL batch export of movie_info into a neo4j-admin node file:
# the first column becomes the node :ID, the rest become string properties,
# and every row gets the `movie` label.
# quoting=QUOTE_NONE: `mysql -e` does not quote fields, so a title or summary
# starting with '"' must not be parsed as the start of a quoted field.
# NOTE: this overwrites the exported file in place (as comma-separated CSV),
# so re-running the cell without re-exporting from MySQL will mis-parse it.
import csv

movie_info_csv_path = "../data/movie_kg/csv/movie_info.csv"
movie_info = pd.read_table(movie_info_csv_path, quoting=csv.QUOTE_NONE)
movie_info.columns = [
    f"{name}:ID" if i == 0 else f"{name}:string"
    for i, name in enumerate(movie_info.columns)
]
movie_info[":LABEL"] = "movie"
movie_info.to_csv(movie_info_csv_path, index=False)

In [51]:
# Turn the MySQL batch export of movie_genre into a neo4j-admin node file:
# the first column becomes the node :ID, the rest become string properties,
# and every row gets the `genre` label.
# quoting=QUOTE_NONE: `mysql -e` does not quote fields.
# NOTE: this overwrites the exported file in place (as comma-separated CSV),
# so re-running the cell without re-exporting from MySQL will mis-parse it.
import csv

movie_genre_csv_path = "../data/movie_kg/csv/movie_genre.csv"
movie_genre = pd.read_table(movie_genre_csv_path, quoting=csv.QUOTE_NONE)
movie_genre.columns = [
    f"{name}:ID" if i == 0 else f"{name}:string"
    for i, name in enumerate(movie_genre.columns)
]
movie_genre[":LABEL"] = "genre"
movie_genre.to_csv(movie_genre_csv_path, index=False)

## Clean Relationships

In [52]:
# Node ids present in each node file; the relationship cells below keep only
# edges whose two endpoints are in these sets, so no edge dangles in Neo4j.
movie_info_set, movie_genre_set, movie_person_set = (
    set(frame[column].to_list())
    for frame, column in (
        (movie_info, "movie_info_id:ID"),
        (movie_genre, "movie_genre_id:ID"),
        (movie_person, "movie_person_id:ID"),
    )
)

In [53]:
# Build the movie -> genre relationship file for neo4j-admin, keeping only rows
# whose start (movie) and end (genre) ids exist in the node files.
movie_to_genre = pd.read_table("../data/movie_kg/csv/movie_to_genre.csv")
movie_to_genre.columns = [":START_ID", ":END_ID"]
has_both_ends = (
    movie_to_genre[":START_ID"].isin(movie_info_set)
    & movie_to_genre[":END_ID"].isin(movie_genre_set)
)
# .assign returns a new frame, so no column is written into a filtered slice
# (which triggers pandas' SettingWithCopyWarning).
movie_to_genre = movie_to_genre[has_both_ends].assign(**{":TYPE": "movie_to_genre"})
movie_to_genre.to_csv("../data/movie_kg/csv/movie_to_genre2.csv", index=False)

In [54]:
# Build the actor relationship file for neo4j-admin (start = movie id,
# end = person id, following the table's column order), keeping only rows
# whose endpoints exist in the node files.
actor_to_movie = pd.read_table("../data/movie_kg/csv/actor_to_movie.csv")
actor_to_movie.columns = [":START_ID", ":END_ID"]
has_both_ends = (
    actor_to_movie[":START_ID"].isin(movie_info_set)
    & actor_to_movie[":END_ID"].isin(movie_person_set)
)
# .assign returns a new frame, so no column is written into a filtered slice
# (which triggers pandas' SettingWithCopyWarning).
actor_to_movie = actor_to_movie[has_both_ends].assign(**{":TYPE": "actor_to_movie"})
actor_to_movie.to_csv("../data/movie_kg/csv/actor_to_movie2.csv", index=False)

In [55]:
# Build the writer relationship file for neo4j-admin (start = movie id,
# end = person id, following the table's column order), keeping only rows
# whose endpoints exist in the node files.
writer_to_movie = pd.read_table("../data/movie_kg/csv/writer_to_movie.csv")
writer_to_movie.columns = [":START_ID", ":END_ID"]
has_both_ends = (
    writer_to_movie[":START_ID"].isin(movie_info_set)
    & writer_to_movie[":END_ID"].isin(movie_person_set)
)
# .assign returns a new frame, so no column is written into a filtered slice
# (which triggers pandas' SettingWithCopyWarning).
writer_to_movie = writer_to_movie[has_both_ends].assign(**{":TYPE": "writer_to_movie"})
writer_to_movie.to_csv("../data/movie_kg/csv/writer_to_movie2.csv", index=False)

## CSV导入Neo4j（ONgDB）

```
bin/ongdb-admin import --database graph.db --nodes=import/movie_genre.csv --nodes=import/movie_info.csv --nodes=import/movie_person.csv --relationships=import/actor_to_movie2.csv --relationships=import/movie_to_genre2.csv --relationships=import/writer_to_movie2.csv
```

## 效果预览