In [9]:
import requests
from requests.structures import CaseInsensitiveDict
import json
from lxml import etree
import os
import re
from IPython.display import HTML, display
import pickle
import jieba


class MySearcherC5:
    """Keyword search over a small corpus of tech.163.com news articles.

    Each entry of ``news_list`` is a list ``[docurl, title, text]``. Search
    results are cached per lower-cased keyword, and the cache is warmed at
    construction time with every word jieba segments out of the corpus.
    """

    def __init__(self, scale=1):
        """Load the corpus, repeat it ``scale`` times and warm the cache.

        Args:
            scale: Repetition factor applied to ``news_list``. List
                repetition copies references, so the repeated rows are the
                very same list objects as the originals.
        """
        self.news_list = []
        self.fetch_data()
        self.news_list *= scale
        self.cache = {}
        self.cache_segmented_words()

    def fetch_data(self):
        """Populate ``news_list`` from the local pickle, or crawl it once.

        When ``news_list.dat`` exists it is loaded as-is. Otherwise the three
        listing feeds are downloaded, each not-yet-seen article is fetched,
        the text of the ``<p>`` elements under ``div.post_body`` is extracted,
        and the non-empty result is pickled for later runs.
        """
        news_list_file = 'news_list.dat'

        if os.path.exists(news_list_file):
            # NOTE(security): pickle.load executes arbitrary code from the
            # file; only load a news_list.dat that this notebook wrote itself.
            with open(news_list_file, 'rb') as file:
                self.news_list = pickle.load(file)
            return

        urls = [
            "https://tech.163.com/special/00097UHL/tech_datalist.js?callback=data_callback",
            "https://tech.163.com/special/00097UHL/tech_datalist_02.js?callback=data_callback",
            "https://tech.163.com/special/00097UHL/tech_datalist_03.js?callback=data_callback",
        ]
        headers = CaseInsensitiveDict()
        headers["Referer"] = "https://tech.163.com/"
        headers["user-agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"

        url_set = set()
        processed_count = 0

        for url in urls:
            # Same timeout as the article requests below, so a stalled
            # listing feed cannot hang the constructor forever.
            resp = requests.get(url, headers=headers, timeout=30)
            # The feed is JSONP: strip the "data_callback(" prefix and the
            # trailing ")" to get at the JSON array.
            json_data = json.loads(resp.text[len('data_callback('):-1])

            for news in json_data:
                title = news['title']
                docurl = news['docurl']

                if docurl not in url_set:
                    doc_resp = requests.get(docurl, headers=headers, timeout=30)
                    doc_resp.encoding = 'utf-8'
                    tree = etree.HTML(doc_resp.text)

                    post_body = tree.xpath("//div[@class='post_body']")
                    if post_body:
                        paragraphs = post_body[0].xpath(".//p")
                        html = ''.join(etree.tostring(p, method='html', encoding='unicode') for p in paragraphs)
                        text = ''.join(t.strip() for t in etree.HTML(html).xpath("//text()") if t.strip())

                        self.news_list.append([docurl, title, text])

                    # Remember the URL even when no body was found so it is
                    # not fetched again from a later feed.
                    url_set.add(docurl)

                processed_count += 1
                if processed_count % 15 == 0:
                    print(f'{processed_count} processed.')

        if self.news_list:
            with open(news_list_file, 'wb') as file:
                pickle.dump(self.news_list, file)

    def score(self, item, keyword):
        """Return the relevance of ``item`` for ``keyword``.

        Case-insensitive occurrence counts: each hit in the title (``item[1]``)
        is worth 5, each hit in the body (``item[2]``) is worth 3.
        """
        keyword_lower = keyword.lower()
        return (item[1].lower().count(keyword_lower) * 5 + item[2].lower().count(keyword_lower) * 3)

    def search_keywords(self, keyword):
        """Return ``(index, score)`` pairs for entries containing ``keyword``.

        The match is a case-insensitive substring test on title and body.
        Results are sorted by descending score and memoised in ``self.cache``
        under the lower-cased keyword; the cached list object itself is
        returned.
        """
        keyword_lower = keyword.lower()
        if keyword_lower in self.cache:
            result = self.cache[keyword_lower]
        else:
            result = [(i, self.score(item, keyword)) for i, item in enumerate(self.news_list) if (keyword_lower in item[1].lower() or keyword_lower in item[2].lower())]
            result.sort(key=lambda x: x[1], reverse=True)
            self.cache[keyword_lower] = result

        return result

    def highlight(self, text, keyword):
        """Wrap every case-insensitive occurrence of ``keyword`` in a red span.

        The keyword is matched literally: it is escaped before being placed
        in the pattern, so keywords containing regex metacharacters (for
        example ``c++`` or ``a.b``) neither raise nor over-match. An empty
        keyword leaves ``text`` unchanged.
        """
        if not keyword:
            return text
        return re.sub(pattern=f'({re.escape(keyword)})', repl=r'<span style="color:#dd4b39">\1</span>', string=text, flags=re.IGNORECASE)

    def render_search_result(self, keyword):
        """Display each hit as ``[score] <linked, highlighted title>``."""
        result = self.search_keywords(keyword)
        for item in result:
            clickable_title = f'<a href="{self.news_list[item[0]][0]}" target="_blank">{self.highlight(self.news_list[item[0]][1], keyword)}</a>'
            display(HTML(f'[{item[1]}] {clickable_title}'))

    def cache_segmented_words(self):
        """Warm the cache with every word jieba finds in every entry.

        Each entry's title + body is segmented in ``cut_all`` mode and each
        distinct word is searched once, which stores its result in the cache.
        """
        for news in self.news_list:
            for word in set(jieba.cut(news[1] + news[2], cut_all=True)):
                if word not in self.cache:
                    # Called for its side effect of filling self.cache.
                    self.search_keywords(word)



In [10]:
# Profile construction of the C5 searcher with the corpus repeated 10 times (scale=10).
%prun searcherC5 = MySearcherC5(scale=10)

# Line 78 in the profile is the list comprehension, i.e. the document scan in search_keywords.

 

         38129453 function calls in 219.694 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 27155088  183.466    0.000  183.466    0.000 {method 'lower' of 'str' objects}
    14023   24.962    0.002  211.123    0.015 3975915539.py:78(<listcomp>)
   159850    3.385    0.000    3.715    0.000 __init__.py:180(get_DAG)
  1130100    1.693    0.000    6.074    0.000 __init__.py:198(__cut_all)
  1279080    1.646    0.000    1.646    0.000 {method 'count' of 'str' objects}
   639540    1.056    0.000   17.189    0.000 3975915539.py:68(score)
  1177620    0.919    0.000    7.410    0.000 __init__.py:289(cut)
        1    0.719    0.719  219.667  219.667 3975915539.py:93(cache_segmented_words)
  1176260    0.558    0.000    0.558    0.000 {method 'match' of 're.Pattern' objects}
  2191330    0.265    0.000    0.265    0.000 {method 'append' of 'list' objects}
  1726290    0.197    0.000    0.197    0.000 {built-in method builtins.le

In [11]:

class MySearcherC6V1(MySearcherC5):

    """
    Cut down the number of str.lower() calls.

    The text does have to be lower-cased, but that can be done once in a
    preprocessing pass instead of once per searched word.
    """
    def __init__(self, scale=1):
        """Same start-up as the parent, with a lower-casing pass before the cache warm-up."""
        self.news_list = []
        self.fetch_data()
        self.news_list *= scale
        self.cache = {}
        # Must run before cache_segmented_words: search_keywords reads item[3].
        self.normalize_case_in_news()
        self.cache_segmented_words()


    # Concatenate title and body, lower-case the whole thing once, and append
    # it to the row, giving [url, title, body, lower-cased title+body].
    def normalize_case_in_news(self):
        """Append the lower-cased ``title + body`` to each row as ``item[3]``.

        ``news_list *= scale`` repeats references to the same row lists, so a
        plain append per index would add ``scale`` copies of the lowered text
        to every underlying row. Rows that already carry the extra element
        are skipped, which also makes the method idempotent.
        """
        for item in self.news_list:
            if len(item) > 3:
                continue
            item.append((item[1] + item[2]).lower())

    def search_keywords(self, keyword):
        """Return cached ``(index, score)`` pairs, scanning ``item[3]`` on a miss.

        Membership is tested against the pre-lowered text in ``item[3]``
        rather than lower-casing title and body on every lookup. Results are
        sorted by descending score and stored under the lower-cased keyword.
        """
        keyword_lower = keyword.lower()
        if keyword_lower in self.cache:
            result = self.cache[keyword_lower]
        else:
            result = [(i, self.score(item, keyword)) for i, item in enumerate(self.news_list) if (keyword_lower in item[3])]
            result.sort(key=lambda x: x[1], reverse=True)
            self.cache[keyword_lower] = result

        return result

In [12]:
%%time
# Time construction of the C6V1 searcher at scale=10, the same scale as the C5 profile.
searcherC6V1 = MySearcherC6V1(scale=10)

CPU times: total: 36.2 s
Wall time: 36.9 s


In [13]:
class MySearcherC6V2(MySearcherC6V1):

    """
    Build the cache by walking the documents and collecting their words.
    """
    def cache_segmented_words(self):
        """Fill ``self.cache`` with one ``(index, score)`` list per word.

        Every entry is segmented once with jieba in ``cut_all`` mode; each
        distinct lower-cased word gets that entry's ``(index, score)`` pair
        appended to its list. Once all entries are processed, every list is
        sorted by descending score.
        """
        for doc_index, entry in enumerate(self.news_list):
            tokens = {token.lower() for token in jieba.cut(entry[1] + entry[2], cut_all=True)}
            for token in tokens:
                posting = (doc_index, self.score(entry, token))
                self.cache.setdefault(token, []).append(posting)

        for postings in self.cache.values():
            postings.sort(key=lambda pair: pair[1], reverse=True)


In [14]:
%%time
# Time construction of the C6V2 searcher at scale=10, for comparison with the C6V1 timing.
searcherC6V2 = MySearcherC6V2(scale=10)

CPU times: total: 16.3 s
Wall time: 16.5 s


In [15]:
class MySearcherC6V3(MySearcherC6V2):
    """
    Drop the document scan from search_keywords.

    With the inverted index in place, a word is treated as present only if it
    appears in the index; a word missing from the index is assumed to be
    absent from every document, so there is no point in walking all documents
    again. This avoids needless work that would otherwise fall on the first
    user to search a word.

    The flip side is that everything now rides on the cache: if something
    that should be found is not, the segmentation is to blame. The remedy is
    jieba's load_userdict.
    """

    def search_keywords(self, keyword):
        """Return the cached result list for ``keyword``, or a new empty list.

        The lookup key is the lower-cased keyword; no documents are scanned
        and nothing is written to the cache on a miss.
        """
        return self.cache.get(keyword.lower(), [])