#### crawler

In [None]:
import requests
from bs4 import BeautifulSoup
from collections import deque
from urllib.parse import urljoin
from elasticsearch import Elasticsearch
from datetime import datetime
import re
# real-----------------------------------------

class Spider:
    """Breadth-first crawler that stores pages and their link graph in Elasticsearch.

    URLs are taken from a FIFO queue seeded with ``start_url``. Every page that
    is fetched successfully yields one document for ``web_pages`` and one for
    ``web_extended_info``; every hyperlink found on it yields one document for
    ``web_page_structure`` and one for ``reverse_web_page_structure``.
    Documents are buffered in memory and written out by the ``store_*`` methods.

    Note: constructing a ``Spider`` deletes and recreates all four indices.
    """

    # Seconds to wait for a server before giving up on a URL; without a
    # timeout a single unresponsive host would block the crawl forever.
    REQUEST_TIMEOUT = 10

    # Format used to parse the HTTP ``Last-Modified`` header.
    _LAST_MODIFIED_FORMAT = '%a, %d %b %Y %H:%M:%S %Z'

    # Index name -> mapping, created in this order by ``__init__``.
    _INDEX_MAPPINGS = {
        # Basic information about each fetched page.
        'web_pages': {
            "mappings": {
                "properties": {
                    "page_id": {"type": "integer"},
                    "url": {"type": "keyword"},
                    "title": {"type": "text"},
                    "content": {"type": "text"},
                    "last_modify_time": {"type": "date"},
                    "size": {"type": "integer"}
                }
            }
        },
        # Extra attributes extracted from the page text.
        'web_extended_info': {
            "mappings": {
                "properties": {
                    "page_id": {"type": "integer"},
                    "genre": {"type": "keyword"},
                    "plot_summary": {"type": "text"},
                    "plot_keywords": {"type": "keyword"},
                    "country": {"type": "keyword"},
                    "language": {"type": "keyword"},
                    "company": {"type": "keyword"},
                }
            }
        },
        # Link structure (parent -> child).
        'web_page_structure': {
            "mappings": {
                "properties": {
                    "parent_page_id": {"type": "integer"},
                    "child_page_id": {"type": "integer"}
                }
            }
        },
        # Reverse link structure (child -> parent).
        'reverse_web_page_structure': {
            "mappings": {
                "properties": {
                    "child_page_id": {"type": "integer"},
                    "parent_page_id": {"type": "integer"}
                }
            }
        },
    }

    # Patterns that pull the labelled fields out of the page text.
    _FIELD_PATTERNS = {
        "genre": re.compile(
            r'Genre:\s*(.*?)(?=\n(?!\n)(?=[A-Z])|more)', re.DOTALL),
        "plot_summary": re.compile(
            r'Plot Summary:\s*(.*?)(?=\n(?!\n)(?=[A-Z])|more)', re.DOTALL),
        "plot_keywords": re.compile(
            r'Plot Keywords:\s*([\s\S]*?)(?=\n[^\n:]*:|more|\Z)', re.DOTALL),
        "country": re.compile(
            r'Country:\s*(.*?)(?=\n\w+:|\Z)', re.DOTALL),
        "language": re.compile(
            r'Language:\s*(.*?)(?=\n\w+:|\Z)', re.DOTALL),
        "company": re.compile(
            r'Company:\s*(.*?)(?=\n(?!\n)(?=[A-Z])|more)', re.DOTALL),
    }

    def __init__(self, start_url, max_pages):
        """Set up crawl state, connect to Elasticsearch and recreate the indices.

        Args:
            start_url: URL the crawl starts from.
            max_pages: Maximum number of URLs ``crawl`` will attempt to fetch.
        """
        self.start_url = start_url
        self.max_pages = max_pages
        self.visited_urls = set()  # URLs already taken off the queue
        self.url_map = {}  # url -> page_id
        self.to_visit_urls = deque([start_url])
        self.page_count = 0
        self.page_id_counter = 1  # next unused page_id
        self.es = Elasticsearch("http://localhost:9200")

        for index_name, mapping in self._INDEX_MAPPINGS.items():
            self.create_index(index_name, mapping)

        # In-memory buffers, written out by the store_* methods.
        self.page_info_batch = []
        self.page_structure_batch = []
        self.reverse_page_structure_batch = []
        self.page_extended_info_batch = []

    def create_index(self, index_name, mapping):
        """Create ``index_name`` with ``mapping``, deleting any existing index first."""
        if self.es.indices.exists(index=index_name):
            self.es.indices.delete(index=index_name)
            print(f"索引 {index_name} 已删除")
        self.es.indices.create(index=index_name, body=mapping)
        print(f"索引 {index_name} 创建成功")

    @classmethod
    def _parse_last_modified(cls, header_value):
        """Return the ``Last-Modified`` header as a datetime.

        Falls back to the current local time when the header is missing or
        does not match the expected format, so one malformed header cannot
        abort the whole crawl.
        """
        if header_value:
            try:
                return datetime.strptime(header_value, cls._LAST_MODIFIED_FORMAT)
            except ValueError:
                pass
        return datetime.now()

    @classmethod
    def _extract_extended_info(cls, content):
        """Extract the labelled fields from the page text.

        Returns:
            A dict with keys ``genre``, ``plot_summary``, ``plot_keywords``,
            ``country``, ``language`` and ``company``. ``plot_summary`` is a
            string (``''`` if absent); every other value is a list of strings
            split on ``/`` (``[]`` if absent).
        """
        def captured(field):
            found = cls._FIELD_PATTERNS[field].search(content)
            return found.group(1).strip() if found else None

        def as_list(field, fix_nbsp=False):
            raw = captured(field)
            if raw is None:
                return []
            parts = [part for part in raw.split('/') if part.strip()]
            if fix_nbsp:
                # Only plot keywords have non-breaking spaces normalised.
                parts = [part.replace('\xa0', ' ') for part in parts]
            return [part.strip() for part in parts]

        return {
            "genre": as_list("genre"),
            "plot_summary": captured("plot_summary") or '',
            "plot_keywords": as_list("plot_keywords", fix_nbsp=True),
            "country": as_list("country"),
            "language": as_list("language"),
            "company": as_list("company"),
        }

    def _page_id_for(self, url):
        """Return the page_id of ``url``, allocating the next free id on first sight."""
        page_id = self.url_map.get(url)
        if page_id is None:
            page_id = self.page_id_counter
            self.page_id_counter += 1
            self.url_map[url] = page_id
        return page_id

    def fetch_page_links(self, url):
        """Download ``url``, buffer its documents and return its outgoing links.

        Returns:
            ``(page_id, links)`` where ``links`` holds the raw ``href`` values
            of every ``<a>`` tag, or ``(None, [])`` when the request fails or
            the status code is not 200.
        """
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            print(f"Failed to fetch {url}: {e}")
            return None, []
        if response.status_code != 200:
            return None, []

        last_modify_time = self._parse_last_modified(
            response.headers.get('Last-Modified'))

        soup = BeautifulSoup(response.text, 'lxml')
        title = soup.title.string if soup.title else ''
        content = soup.get_text(separator='\n')

        # Prefer urllib3's byte counter (a private attribute, so it is read
        # defensively); fall back to the length of the downloaded body.
        size = getattr(response.raw, '_fp_bytes_read', 0) or len(response.content)

        page_id = self._page_id_for(url)

        self.page_info_batch.append({
            "page_id": page_id,
            "url": url,
            "title": title,
            "content": content.replace('\n', '').replace('\r', ''),
            "last_modify_time": last_modify_time,
            "size": size,
        })
        extended_doc = {"page_id": page_id}
        extended_doc.update(self._extract_extended_info(content))
        self.page_extended_info_batch.append(extended_doc)

        links = [anchor.get('href') for anchor in soup.find_all('a', href=True)]
        return page_id, links

    def _index_batch(self, index_name, docs, id_field=None):
        """Index every document of ``docs``; use ``doc[id_field]`` as the id if given."""
        for doc in docs:
            if id_field is None:
                self.es.index(index=index_name, body=doc)
            else:
                self.es.index(index=index_name, id=doc[id_field], body=doc)

    def store_page_info(self):
        """Write the buffered page and extended-info documents, then empty both buffers."""
        self._index_batch('web_pages', self.page_info_batch, id_field='page_id')
        self.page_info_batch = []
        self._index_batch('web_extended_info', self.page_extended_info_batch,
                          id_field='page_id')
        self.page_extended_info_batch = []

    def store_page_structure(self):
        """Write the buffered forward and reverse link documents, then empty both buffers."""
        self._index_batch('web_page_structure', self.page_structure_batch)
        self.page_structure_batch = []
        self._index_batch('reverse_web_page_structure',
                          self.reverse_page_structure_batch)
        self.reverse_page_structure_batch = []

    def crawl(self, batch_size=20):
        """Crawl breadth-first until the queue is empty or ``max_pages`` is reached.

        ``page_count`` is incremented for every URL that is attempted, whether
        or not the fetch succeeds.

        Args:
            batch_size: Number of buffered documents that triggers a write.
        """
        while self.to_visit_urls and self.page_count < self.max_pages:
            url = self.to_visit_urls.popleft()
            if url in self.visited_urls:
                continue
            print(f"Crawling: {url}")
            self.visited_urls.add(url)
            self.page_count += 1

            parent_page_id, links = self.fetch_page_links(url)
            if parent_page_id is None:
                continue

            for link in links:
                new_full_url = urljoin(url, link)
                # A URL that already has an id has been queued (or visited)
                # before, so only newly discovered URLs are enqueued.
                first_sighting = new_full_url not in self.url_map
                child_page_id = self._page_id_for(new_full_url)
                if first_sighting:
                    self.to_visit_urls.append(new_full_url)

                self.page_structure_batch.append({
                    "parent_page_id": parent_page_id,
                    "child_page_id": child_page_id
                })
                self.reverse_page_structure_batch.append({
                    "child_page_id": child_page_id,
                    "parent_page_id": parent_page_id
                })

            # Each buffer is flushed according to its own size.
            if len(self.page_info_batch) >= batch_size:
                self.store_page_info()
            if len(self.page_structure_batch) >= batch_size:
                self.store_page_structure()

        # Write whatever is still buffered once the crawl is over.
        if self.page_info_batch or self.page_extended_info_batch:
            self.store_page_info()
        if self.page_structure_batch or self.reverse_page_structure_batch:
            self.store_page_structure()

    def _print_index(self, index_name):
        """Print the ``_source`` of up to 1000 documents from ``index_name``."""
        query = {
            "query": {
                "match_all": {}
            },
            "size": 1000
        }
        result = self.es.search(index=index_name, body=query)
        for hit in result['hits']['hits']:
            print(hit['_source'])

    def view_web_pages_data(self):
        """Print the documents stored in the ``web_pages`` index."""
        self._print_index('web_pages')

    def view_web_page_structure_data(self):
        """Print the documents stored in the ``web_page_structure`` index."""
        self._print_index('web_page_structure')

    def view_reverse_web_page_structure_data(self):
        """Print the documents stored in the ``reverse_web_page_structure`` index."""
        self._print_index('reverse_web_page_structure')


if __name__ == "__main__":
    # Crawl from the test page below, attempting at most 300 URLs.
    start_url, max_pages = 'https://www.cse.ust.hk/~kwtleung/COMP4321/testpage.htm', 300
    spider = Spider(start_url, max_pages)
    spider.crawl()

#### 新表示例

In [149]:
# Fetch up to 1000 documents from web_extended_info that have a "genre"
# field and print the stored source of each one.
query = {
    "query": {"bool": {"must": [{"exists": {"field": "genre"}}]}},
    "size": 1000,
}
es = Elasticsearch("http://localhost:9200")
result = es.search(index='web_extended_info', body=query)
for hit in result['hits']['hits']:
    print(hit['_source'])

{'page_id': 21, 'genre': ['Sci-Fi', 'Comedy'], 'plot_summary': "The class of nuke 'em high is back, and this time they're in college! Tromaville's nuclear factory has...", 'plot_keywords': ['Sequel', 'Psychotronic', 'Troma Film', 'Cult Favorite', 'Troma'], 'country': ['USA'], 'language': ['English'], 'company': ['Troma Entertainment']}
{'page_id': 24, 'genre': ['Comedy', 'Romance', 'Mystery'], 'plot_summary': "Baby photographer Ronnie Jackson, on death row in San Quentin, tells reporters how he got there: taking care of his private-eye neighbor's office...", 'plot_keywords': ['Brunette', 'Chase', 'Death Row', 'Detective', 'Frame Up'], 'country': ['USA'], 'language': ['English'], 'company': ['Hope Enterprises']}
{'page_id': 27, 'genre': ['Documentary', 'Music'], 'plot_summary': '', 'plot_keywords': ['Concert Film', 'Country', 'Independent Film'], 'country': ['USA'], 'language': ['English'], 'company': []}
{'page_id': 30, 'genre': ['Biography', 'Drama', 'Music', 'Romance'], 'plot_summary

  result = es.search(index='web_extended_info', body=query)
