In [None]:
try:
    import arrow
    from tqdm import tqdm
    import pandas as pd
    from functional import seq
    from ruamel.yaml import YAML
    import jieba
    from sklearn.feature_extraction.text import TfidfVectorizer
except ModuleNotFoundError:
    %pip install arrow ruamel.yaml tqdm pandas pyfunctional scikit-learn jieba -q

In [None]:
import requests
import json
from urllib.parse import urlencode
from pathlib import Path
from functools import reduce

In [None]:
# 日志模块
import logging
from notion_rich_text_analysis.parameter.log import config_log

# Set up project logging for this notebook run
# (see notion_rich_text_analysis.parameter.log.config_log for argument meanings).
config_log(
    "notion_rich_text_analysis",
    "notebook",
    print_terminal=True,
    enable_monitor=False,
    log_root='notion_rich_text_analysis/logs',
)

In [None]:
# Use YAML 1.2 (ruamel.yaml)
from ruamel.yaml import YAML
# Shared parser instance; config.yaml is loaded through it in a later cell.
yaml = YAML()
# Constructor for the custom `!join` YAML tag.
def join(loader, node):
    """Build the value of a ``!join``-tagged YAML sequence.

    Every item of the sequence is converted with ``str`` and the pieces are
    concatenated without a separator, e.g. ``!join [a, 1, b]`` -> ``"a1b"``.
    """
    items = loader.construct_sequence(node)
    return ''.join(map(str, items))

## register the tag handler
# Register `join` (defined above) as the constructor for the `!join` tag, so
# `!join [...]` values in documents loaded through `yaml` become strings.
yaml.constructor.add_constructor('!join', join)

## 变量


In [None]:
with open('config.yaml', encoding='utf-8') as f:
    config = yaml.load(f)

# HTTP headers sent with every Notion API request (config.yaml: notion -> header)
notion_header = config['notion']['header']

## 读取数据库


In [None]:
from typing import List, Optional
class NotionDBText:
    """
    Read every rich-text fragment from all pages of a Notion database.

    Call ``read()``; afterwards ``total_pages`` holds the database pages,
    ``total_blocks`` the top-level child blocks of each page and
    ``total_texts`` the plain texts of each page (one list per page).
    """
    # The Notion API caps page_size at 100 for paginated endpoints.
    page_size = 100

    def __init__(self, header: dict, database_id: str, extra_data: Optional[dict] = None):
        """
        Args:
            header: HTTP headers sent with every Notion API request.
            database_id: ID of the Notion database to query.
            extra_data: body of the database query request (e.g. a filter).
                It is never modified by this class. Defaults to an empty dict.
        """
        self.header = header
        self.database_id = database_id
        # Fresh dict per instance: a shared `dict()` default leaked state between instances.
        self.extra_data = {} if extra_data is None else extra_data
        # Three distinct lists (`[[]] * 3` would alias a single list three times).
        self.total_texts, self.total_blocks, self.total_pages = [], [], []
        self.block_types = ["paragraph", "bulleted_list_item", "numbered_list_item", 
                            "toggle", "to_do", "quote", 
                            "callout", "synced_block", "template", 
                            "column", "child_page", "child_database", "table",
                            "heading_1","heading_2","heading_3"]
    
    def read(self):
        """Read pages, then their blocks, then the blocks' plain texts."""
        self.total_pages = self.read_pages()
        self.total_blocks = self.read_blocks(self.total_pages)
        self.total_texts = self.read_rich_text(self.total_blocks)
        
    def read_pages(self):
        """
        Query the database and return all its pages, following ``next_cursor``
        until ``has_more`` is false (or no cursor is returned).

        ``self.extra_data`` is copied into each request body and the cursor is
        added to the copy, so the caller's dict never keeps a stale
        ``start_cursor`` between runs.

        Raises:
            requests.HTTPError: if Notion answers with an error status.
        """
        total_pages = []
        next_cursor = None
        while True:
            payload = dict(self.extra_data)
            if next_cursor:
                payload['start_cursor'] = next_cursor
            r_database = requests.post(
                url=f"https://api.notion.com/v1/databases/{self.database_id}/query",
                headers=self.header,
                data=json.dumps(payload),
            )
            if not r_database.ok:
                # Surface Notion's error body instead of a bare KeyError('results').
                logging.error(f'query database {self.database_id} failed: '
                              f'{r_database.status_code} {r_database.text}')
                r_database.raise_for_status()
            respond = json.loads(r_database.text)
            total_pages.extend(respond["results"])
            next_cursor = respond.get('next_cursor')
            # Stop also when has_more is set without a cursor (would loop forever).
            if not (respond.get('has_more') and next_cursor):
                break
        logging.info(f'{len(total_pages)} pages when {arrow.now()}')
        return total_pages
    
    def read_blocks(self, pages: List):
        """
        Return the top-level child blocks of each page (one list per page, in
        the order of ``pages``). Nested children are not fetched.
        """
        total_blocks = []
        for page in tqdm(pages, desc='read blocks'):
            total_blocks.append(self._read_page_blocks(page["id"]))
        return total_blocks

    def _read_page_blocks(self, page_id):
        """Fetch all top-level child blocks of one page, following pagination."""
        blocks = []
        params = {'page_size': self.page_size}
        while True:
            r_page = requests.get(
                        url=f"https://api.notion.com/v1/blocks/{page_id}/children",
                        headers=self.header,
                        params=params,
                        )
            respond = json.loads(r_page.text)
            if "results" not in respond:
                # Best effort, as before: keep what was read so far for this page.
                logging.warning(f'no results for page {page_id}: {r_page.text}')
            blocks.extend(respond.get("results", []))
            next_cursor = respond.get("next_cursor")
            if not (respond.get("has_more") and next_cursor):
                return blocks
            params = {'page_size': self.page_size, 'start_cursor': next_cursor}
        
    def read_rich_text(self, blocks: List):
        """
        Extract the ``plain_text`` of every rich-text fragment, one list per page.

        Blocks whose type is not in ``self.block_types`` are skipped, and their
        types are collected in ``self.unsupported_types``. Supported blocks whose
        payload has no readable ``rich_text`` list are logged and skipped.
        """
        total_texts = []
        self.unsupported_types = set()
        for page_blocks in blocks:
            page_texts = []
            for block in page_blocks:
                block_type = block['type']
                if block_type not in self.block_types:
                    self.unsupported_types.add(block_type)
                    continue
                # .get: the error handler below must not raise on a missing payload.
                payload = block.get(block_type)
                try:
                    # Build the full list first so a bad fragment adds nothing partial.
                    page_texts.extend([x['plain_text'] for x in payload['rich_text']])
                except Exception:
                    logging.error(block_type + '|' + json.dumps(payload))
            total_texts.append(page_texts)
        return total_texts

In [None]:
# notion_db = NotionDBText()

# notion_db.read()

# notion_db.total_texts[-1]

## 分析结果


In [None]:
# !pip install -U pkuseg

# !wget https://github.com/lancopku/pkuseg-python/releases/download/v0.0.25/default_v2.zip

# import pkuseg
# from pathlib import Path

# seg = pkuseg.pkuseg(model_name=Path('./pkuseg_model'))  # 程序会自动下载所对应的细领域模型

# text = seg.cut('我爱北京天安门')              # 进行分词
# print(text)

### 停用词


In [None]:
# !pip install scikit-learn jieba -q

# Punctuation: every Unicode code point whose general category starts with "P"
import sys
from unicodedata import category
codepoints = range(sys.maxunicode + 1)
punctuation = {c for k in codepoints if category(c := chr(k)).startswith("P")}

# Stopwords: union of all ./stopwords/*stopwords.txt files, one entry per line.
# No matching file simply yields no file-based stopwords (reduce() without an
# initializer used to raise TypeError in that case).
from glob import glob
stopfiles = glob("./stopwords/*stopwords.txt")
stopwords = set()
for file in stopfiles:
    # Explicit encoding instead of the platform default; assumes the files are
    # UTF-8 (a leading BOM is tolerated). `with` closes each file.
    with open(file, "r", encoding="utf-8-sig") as f:
        stopwords.update(line.strip() for line in f)
stopwords = stopwords | punctuation

In [None]:
class NotionTextAnalysis(NotionDBText):
    '''
    Analyse the rich text of a Notion database with TF-IDF.

    Constructing an instance immediately reads the whole database (via the
    inherited ``read``). ``run`` then tokenises the text with jieba, computes
    TF-IDF with scikit-learn and writes the result files.
    '''
    def __init__(self, header, task_name, task_describe, database_id, extra_data):
        """
        Args:
            header: HTTP headers for the Notion API.
            task_name: used in log messages and result file names.
            task_describe: free-text description, used as the markdown title.
            database_id: Notion database to read.
            extra_data: request body for the database query (e.g. a filter).
        """
        super().__init__(header, database_id, extra_data)
        logging.info(f'{task_name} start, {task_describe}')
        self.read()
        logging.info(f'Unsupported types: {self.unsupported_types}')
        
        self.task_name = task_name
        self.task_describe = task_describe
        self.database_id = database_id
        self.extra_data = extra_data
    
    def run(self, stopwords=frozenset(), output_dir=Path('./'), top_n=5):
        """
        Tokenise and clean the texts, compute TF-IDF and write the results.

        Args:
            stopwords: words dropped after tokenisation (only membership is tested).
            output_dir: directory for the result files; created if missing.
            top_n: number of top words written together with their sentences.
        """
        self.handling_sentences(stopwords)
        self.tf_idf_dataframe = self.tf_idf(self.sequence)
        self.output(self.task_name, self.task_describe, output_dir, top_n)
        
    @staticmethod
    def check_stopwords(word: str, stopwords: set):
        """
        Return True if ``word`` should be dropped: it is a stopword, consists
        only of digits, or is blank.
        """
        return word in stopwords \
                 or word.isdigit() \
                 or not word.strip()
    
    @staticmethod
    def check_sentence_available(text: str):
        """
        Return True if the sentence should be analysed.
        """
        # Skip texts starting with "#": they may be tags, and the prefix can also
        # be used to exclude duplicated content kept for versioning.
        if text.startswith("#"):
            return False
        return True
    
    def handling_sentences(self, stopwords):
        '''
        Prepare all texts: tokenise with jieba, drop stopwords and build the
        word -> sentences lookup.

        Sets ``self.sequence`` (token lists per sentence), ``self.unique_words``
        and ``self.word2sents``.

        Raises:
            ValueError: if there is no text, no sentence, or an empty vocabulary.
        '''
        import jieba
        
        # Was any rich text read from the database?
        if not self.total_texts:
            logging.error(f'该任务未获取到符合条件的文本，请检查筛选条件。database ID: {self.database_id}; extra data: {self.extra_data}')
            raise ValueError('empty rich texts.')
        
        # Drop sentences that must not be analysed
        text_list = [text for item in self.total_texts 
                     for text in item  
                     if self.check_sentence_available(text)]
        # Tokenise
        split_text_list = [jieba.lcut(text, HMM=True) for text in text_list]
        
        # Remove stopwords
        self.sequence = seq(split_text_list).map(
            lambda sent: [word for word in sent 
                          if not self.check_stopwords(word, stopwords)])

        # Is the sequence empty?
        if not self.sequence:
            logging.error(f'该任务未获取到符合条件的文本，请检查停用词。database ID: {self.database_id}; extra data: {self.extra_data}')
            raise ValueError('empty rich texts.')
        
        # Vocabulary
        self.unique_words = (self.sequence
                           .map(lambda sent: set(sent))
                           .reduce(lambda x, y: x.union(y)))
        
        # Is the vocabulary empty?
        if not self.unique_words:
            logging.error(f'词表为空，请检查筛选条件及停用词。database ID: {self.database_id}; extra data: {self.extra_data}')
            raise ValueError('empty unique words')
        
        # word --> sentences lookup
        self.word2sents = self._word2sent(text_list, self.unique_words)
    
    @staticmethod
    def _word2sent(text_list, unique_words):
        '''
        Map each word (lower-cased key) to the set of sentences containing it
        as a (case-sensitive) substring.
        '''
        word2sents = {word.lower(): set() for word in unique_words}

        for text in text_list:
            for word in unique_words:
                if word in text:
                    word2sents[word.lower()].add(text)
        return word2sents
    
    @staticmethod
    def tf_idf(sequence):
        '''
        TF-IDF with scikit-learn's ``TfidfVectorizer`` (default settings).

        Each token list is joined with spaces and re-tokenised by the vectorizer,
        so the feature names may not match the jieba tokens exactly.

        Returns:
            DataFrame with one row per sentence and one column per feature.
        '''
        from sklearn.feature_extraction.text import TfidfVectorizer

        vectorizer = TfidfVectorizer()
        vectors = vectorizer.fit_transform(sequence.map(lambda x: " ".join(x)).to_list())
        feature_names = vectorizer.get_feature_names_out()
        denselist = vectors.todense().tolist()
        df = pd.DataFrame(denselist, columns=feature_names)
        return df

    @staticmethod
    def empty_func(*args, **kwargs):
        """Fallback aggregation that produces no result."""
        return

    def output(self, task_name, task_describe, output_dir=Path('./'), top_n=5):
        '''
        Write the analysis results to ``output_dir``: one CSV per aggregation
        (``by_mean_drop_maxmin``, ``by_max``, ``by_sum``), each with words in
        descending order, plus a markdown file with the top words and their
        sentences.
        '''
        import re
        
        self.directory = Path(output_dir)
        # parents=True so nested output paths also work
        self.directory.mkdir(parents=True, exist_ok=True)
        
        result_type = 'tf_idf'
        task_name_clean = re.sub(r"\s", "_", task_name.strip())
        result_suffix = f'{task_name_clean}.{result_type}.analysis_result'
        result_attr_list = ['by_mean_drop_maxmin', 'by_max', 'by_sum']
        for attr in result_attr_list:
            # The fallback must be looked up on self; the bare name is not in scope here.
            result = getattr(self, attr, self.empty_func)(self.tf_idf_dataframe)
            # Compare with None: `not series` raises "truth value ... is ambiguous".
            if result is None:
                continue
            result.to_csv(self.directory / f"{result_suffix}.{attr}.csv")
        self.top_freq(self.tf_idf_dataframe, 
                        f'{result_suffix}.top{top_n}_word_with_sentences.md', task_describe, top_n)
        
        logging.info(f'{self.task_name} result files have been saved to {output_dir}.')
    
    def top_freq(self, df, file_name, task_describe, top_n):
        '''
        Write a markdown file listing the ``top_n`` words by summed TF-IDF, each
        followed by the sentences containing it (the word shown in bold).
        '''
        with open(self.directory / file_name, "w", encoding="utf-8") as f:
            f.write('# ' + task_describe + '\n\n')
            for word in df.sum(axis=0).sort_values(ascending=False).head(top_n).index:
                f.write('## '+ word + '\n\n')
                # Vectorizer features need not be keys of word2sents (different
                # tokenisation), so a missing word lists no sentences instead of raising.
                sentences = self.word2sents.get(word, ())
                f.write('\n\n'.join([sent.replace("\n", " ").replace(word, f'**{word}**') for sent in sentences]) + '\n\n')

    @staticmethod
    def by_mean_drop_maxmin(df):
        '''
        Per-column trimmed mean: drop one maximum and one minimum value, then
        average the remaining ones. With fewer than three rows nothing can be
        trimmed and the plain mean is used. Sorted in descending order.
        '''
        # The previous inclusive `between(min, max)` filter kept every value,
        # so it silently computed the plain mean.
        n_rows = len(df)
        if n_rows < 3:
            return df.mean().sort_values(ascending=False)
        trimmed = (df.sum() - df.max() - df.min()) / (n_rows - 2)
        return trimmed.sort_values(ascending=False)
        
    @staticmethod
    def by_max(df):
        '''Per-column maximum, sorted in descending order.'''
        return df.max(axis=0).sort_values(ascending=False)
    
    @staticmethod
    def by_sum(df):
        '''Per-column sum, sorted in descending order.'''
        return df.sum(axis=0).sort_values(ascending=False)

In [None]:
for task in config['task']:
    # Skip tasks switched off in config.yaml
    if not task['run']:
        continue
    # Task name and description
    task_name = task['name']
    task_describe = task['describe']

    # ID of the database to read
    database_id = task['database_id']

    # Query body, e.g. a filter on a property (such as Label) of that database.
    # Copied so state added during a run never stays in `config` for a re-run;
    # a missing `extra` entry means "no filter".
    extra_data = dict(task.get('extra') or {})
    try:
        notion_text_analysis = NotionTextAnalysis(notion_header, task_name, task_describe, database_id, extra_data)
        notion_text_analysis.run(stopwords, output_dir=Path('./results'), top_n=10)
    except Exception as e:
        # Keep going with the next task, but log the full traceback.
        logging.exception(f'{task_name} failed. \n{e}')

### 自定义分析方法(不是 tf\*idf)


In [None]:
def computeTF(wordDict, bagOfWords):
    """
    Term frequency of each word in a single document.

    Args:
        wordDict: mapping word -> number of occurrences in the document.
        bagOfWords: the document's tokens; only its length is used.

    Returns:
        dict word -> count / len(bagOfWords). An empty ``bagOfWords`` gives 0.0
        for every word instead of raising ZeroDivisionError.
    """
    bagOfWordsCount = len(bagOfWords)
    if bagOfWordsCount == 0:
        return {word: 0.0 for word in wordDict}
    return {word: count / float(bagOfWordsCount) for word, count in wordDict.items()}

In [None]:
def computeIDF(documents):
    """
    Inverse document frequency ``log(N / df)`` for every word of a corpus.

    Args:
        documents: list of mappings word -> count, one per document.

    Returns:
        dict word -> idf over the union of all documents' keys; ``{}`` for an
        empty corpus.

    Raises:
        ZeroDivisionError: if a word has a count of 0 in every document.
    """
    import math
    N = len(documents)

    # Union of all keys: the first document alone may not contain every word.
    idfDict = dict.fromkeys((word for document in documents for word in document), 0)
    for document in documents:
        for word, val in document.items():
            if val > 0:
                idfDict[word] += 1

    return {word: math.log(N / float(df)) for word, df in idfDict.items()}

## test
