In [1]:
import requests
import arrow
import pprint
import json
from urllib.parse import urlencode
from functools import reduce
from tqdm import tqdm

In [15]:
import logging
from utils.log import config_log

# Configure logging through the project-local helper.
# NOTE(review): the meaning of the two positional names and of the keyword
# flags is defined in utils.log, which is not visible here — confirm there.
config_log(
    "notion_api",
    "DBtexts",
    log_root='./logs',
    print_terminal=True,
    enable_monitor=False,
)

## 环境变量

In [2]:
# Create an integration token at https://www.notion.so/my-integrations/
# and store it on the first line of ./NOTION_TOKEN.
with open("./NOTION_TOKEN", "r") as token_file:
    # strip() drops the trailing newline that readlines()[0] used to keep;
    # a line break inside the Authorization header value is not a valid header.
    token = token_file.readline().strip()

# Notion API version sent with every request (an earlier run used "2021-08-16").
notion_version = "2022-06-28"

# Headers shared by every Notion API call in this notebook.
notion_header = {"Authorization": f"Bearer {token}",
                 "Notion-Version": notion_version,
                 "Content-Type": "application/json",
                 }

In [3]:
# ID of the Notion database to read.
database_id = 'a2594f51053a47b3a58a171017ea0435'

# Query filter: "Label" is a multi-select property of that database and
# only pages where it is non-empty are requested.
label_is_set = {"property": "Label", "multi_select": {"is_not_empty": True}}
extra_data = {"filter": {"and": [label_is_set]}}

## 读取数据库

In [50]:
from typing import List
class NotionDBText:
    """
    Read all rich-text fragments stored in the pages of one Notion database.

    After ``read()`` has run:
      * ``total_pages``  - page objects returned by the database query
      * ``total_blocks`` - one list of child blocks per page
      * ``total_texts``  - one list of ``plain_text`` strings per page
    """
    def __init__(self, notion_header: dict, database_id: str, extra_data: dict = None):
        """
        :param notion_header: HTTP headers sent with every request.
        :param database_id: ID of the database to query.
        :param extra_data: optional JSON body for the query request
            (e.g. a filter); defaults to an empty body.
        """
        self.header = notion_header
        self.database_id = database_id
        # A fresh dict per instance: a ``dict()`` default in the signature is
        # created once and would be shared by every instance.
        self.extra_data = extra_data if extra_data is not None else {}
        # Three independent lists; ``[[]] * 3`` would bind all three
        # attributes to the very same list object.
        self.total_texts, self.total_blocks, self.total_pages = [], [], []
        # Block types that read_rich_text() attempts to extract text from.
        self.block_types = ["paragraph", "bulleted_list_item", "numbered_list_item",
                            "toggle", "to_do", "quote",
                            "callout", "synced_block", "template",
                            "column", "child_page", "child_database", "table",
                            "heading_1", "heading_2", "heading_3"]

    def read(self):
        """Fetch pages, then their blocks, then extract the texts."""
        self.total_pages = self.read_pages()
        self.total_blocks = self.read_blocks(self.total_pages)
        self.total_texts = self.read_rich_text(self.total_blocks)

    def read_pages(self):
        """
        Query the database and return every page, following pagination.

        :return: list of page objects (the concatenated ``results`` arrays).
        :raises KeyError: if a response lacks ``results``/``has_more``/
            ``next_cursor`` (e.g. the API answered with an error object).
        """
        total_pages = []
        has_more = True
        next_cursor = ''
        # Keep requesting while the API reports another page of results.
        while has_more:
            # Build the body from a copy so the cursor is neither written to
            # an unrelated global nor left behind in self.extra_data, where it
            # would make a later read_pages() call start from a stale cursor.
            payload = dict(self.extra_data)
            if next_cursor:
                payload['start_cursor'] = next_cursor
            r_database = requests.post(
                url=f"https://api.notion.com/v1/databases/{self.database_id}/query",
                headers=self.header,
                data=json.dumps(payload),
            )
            respond = json.loads(r_database.text)
            total_pages.extend(respond["results"])
            next_cursor = respond['next_cursor']
            # Without a cursor the same request would be repeated forever.
            has_more = bool(respond['has_more'] and next_cursor)
        logging.info(f'{len(total_pages)} pages when {arrow.now()}')
        return total_pages

    def read_blocks(self, pages: List):
        """
        Fetch the direct child blocks of every page.

        :param pages: page objects; only their ``"id"`` is used.
        :return: list with one list of block objects per page. A response
            without ``results`` contributes an empty list for that page.
        """
        total_blocks = []
        for page in tqdm(pages, desc='read blocks'):
            page_id = page["id"]
            page_blocks = []
            next_cursor = None
            # The children endpoint is paginated as well; follow the cursor so
            # that long pages are not cut off after the first response.
            while True:
                r_page = requests.get(
                    url=f"https://api.notion.com/v1/blocks/{page_id}/children",
                    headers=self.header,
                    params={"start_cursor": next_cursor} if next_cursor else None,
                )
                respond = json.loads(r_page.text)
                page_blocks.extend(respond.get("results", []))
                next_cursor = respond.get("next_cursor")
                if not (respond.get("has_more") and next_cursor):
                    break
            total_blocks.append(page_blocks)
        return total_blocks

    def read_rich_text(self, blocks: List):
        """
        Extract the ``plain_text`` of every rich-text item in ``blocks``.

        :param blocks: list of per-page block lists, as returned by
            read_blocks().
        :return: list with one list of strings per page. Blocks whose type is
            not in ``self.block_types`` are skipped with a warning; blocks
            of a listed type that carry no ``rich_text`` are logged as errors
            and skipped.
        """
        total_texts = []
        for page_blocks in blocks:
            page_texts = []
            for block in page_blocks:
                block_type = block['type']
                if block_type not in self.block_types:
                    logging.warning(block_type + ' not in type list')
                    continue
                try:
                    # Build the list first so a malformed item adds nothing.
                    page_texts.extend([x['plain_text'] for x in block[block_type]['rich_text']])
                except (KeyError, TypeError):
                    # Only lookup failures are expected here; .get() keeps the
                    # handler itself from raising when the payload is missing.
                    logging.error(block_type + '|' + json.dumps(block.get(block_type)))
            total_texts.append(page_texts)
        return total_texts

In [51]:
# Build the reader from the header, database ID and query filter defined above.
notion_db = NotionDBText(notion_header, database_id, extra_data)

In [52]:
# Fetch pages, then their blocks, then extract the texts (one HTTP request per page or more).
notion_db.read()

[2023-01-29 05:26:14.616] [INFO] [298233] [410687785.py] [42] [67 pages when 2023-01-29T05:26:14.616211+00:00]
read blocks: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 67/67 [00:26<00:00,  2.52it/s]


In [53]:
# Spot-check: the text fragments extracted from the first page.
notion_db.total_texts[0]

['外部的数据大家都可以用，怎样能把MP的独有数据利用起来？或者设计独有的数据收集？', '数据是否能够按照博弈论，分类成外部公开、半公开数据、独家数据？']

## 分析结果

In [170]:
# !pip install scikit-learn jieba -q

import pandas as pd
import jieba

# 标点符号
import sys
from unicodedata import category
# Every Unicode character whose general category starts with "P" (punctuation).
codepoints = range(sys.maxunicode + 1)
punctuation = set()
for codepoint in codepoints:
    char = chr(codepoint)
    if category(char).startswith("P"):
        punctuation.add(char)

# 停用词
from glob import glob
# Merge every stop-word list under ./stopwords into one set (one word per line).
stopfiles = glob("./stopwords/*stopwords.txt")
# Start from an empty set so that a missing ./stopwords directory yields an
# empty stop-word set instead of a TypeError from reducing an empty list.
stopwords = set()
for stop_path in stopfiles:
    # The context manager closes each file; the one-liner left the handles open.
    with open(stop_path, "r") as stop_file:
        stopwords.update(line.strip() for line in stop_file)

In [171]:
def check_stopwords(word):
    """
    Tell whether ``word`` should be dropped from a tokenised sentence.

    A word is dropped when it is a stop word, consists only of digits,
    is a punctuation character, or is empty / whitespace only.
    """
    if word in stopwords:
        return True
    if word.isdigit():
        return True
    if word in punctuation:
        return True
    return not word.strip()

### 分词、清洗、建立映射

In [172]:
from functional import seq
# Flatten the per-page text lists into a single list of text fragments.
text_list = []
for item in notion_db.total_texts:
    text_list.extend(item)

# Tokenise every fragment with jieba.
split_text_list = [jieba.lcut(fragment, HMM=True) for fragment in text_list]

# Remove stop words from every tokenised fragment
# (use seq(split_text_list) directly to keep them).
sequence = seq(split_text_list).map(
    lambda tokens: [token for token in tokens if not check_stopwords(token)]
)

# Vocabulary: the union of the tokens that survived the filtering.
uniqueWords = sequence.map(set).reduce(set.union)

In [178]:
# Lookup table: lower-cased word -> text fragments that contain the word.
word2sents = {}
for word in uniqueWords:
    word2sents[word.lower()] = []

# The substring test uses the word as tokenised (case-sensitive);
# only the dictionary key is lower-cased.
for text in text_list:
    for word in uniqueWords:
        if word not in text:
            continue
        word2sents[word.lower()].append(text)

### 使用标准tf-idf工具来分析

**TODO**

目前把所有页面的句子混在同一个列表里计算 tf-idf，这样并不合理；应先按文档（page）分组，再把每个文档作为一条语料来计算。

In [179]:
from sklearn.feature_extraction.text import TfidfVectorizer

# Re-join each filtered token list with spaces so the vectorizer receives
# one string per text fragment.
vectorizer = TfidfVectorizer()
joined_sentences = sequence.map(" ".join).to_list()
vectors = vectorizer.fit_transform(joined_sentences)
feature_names = vectorizer.get_feature_names_out()
# Dense copy: one row per text fragment, one column per vectorizer feature.
denselist = vectors.todense().tolist()
df = pd.DataFrame(data=denselist, columns=feature_names)

#### 按不同统计方法逆序输出所有词的tf-idf

In [180]:
# Mean tf-idf per word after discarding that word's minimum and maximum scores.
# Series.between() includes both bounds by default, so the former
# `between(min, max)` mask kept every value and the result was the plain mean;
# strict comparisons are what actually exclude the extremes.
col_min = df.min()
col_max = df.max()
df_drop_maxmin = df.where(df.gt(col_min) & df.lt(col_max))
# mean() skips the masked (NaN) cells; a word left with no values gets NaN
# and is sorted to the end.
df_drop_maxmin.mean().sort_values(ascending=False).to_csv("./tf_idf_top.drop_maxmin.csv")

In [181]:
# Largest tf-idf score of each word, highest first.
column_max = df.max(axis=0)
column_max.sort_values(ascending=False).to_csv("./tf_idf_top.max.csv")
# Summed tf-idf score of each word, highest first.
column_sum = df.sum(axis=0)
column_sum.sort_values(ascending=False).to_csv("./tf_idf_top.sum.csv")

In [182]:
# Inspect the three words with the largest summed tf-idf and the texts they occur in.
top_words = df.sum(axis=0).sort_values(ascending=False).head(3).index
for word in top_words:
    print(word)
    # Column names come from the vectorizer's own tokenisation, which need not
    # reproduce every jieba token, so a feature may have no entry in the lookup
    # table; show an empty list instead of failing with KeyError.
    print(word2sents.get(word, []))
    print("-" * 10)

知识
['追求伟大的知识分子，癫狂、荒谬', '我需要解决什么问题？精力有限，需要管理想要了解的问题，而不是知识', '空洞的知识没有用（例如同花顺的知识图谱，几百万三元组，吃灰）', '如果你本身没有需要解决的问题（或者说专注研究的领域），那么知识管理只是个伪命题', '我们需要管理的不是知识，而是自己的精力和想要了解的问题。', '你的知识应该是像一条大河一样，有上游涓涓的溪流，也能灌溉更多的良田。', '如果要将知识讲给别人，应该怎么组织结构？把它输出为教案形式', '知识', '阅读论文、网页等短中篇文献，整理归纳后导出工作区PDF到notion存档，链接个人知识网络', '文献的知识密度要大', '存档数据库、整合前三者，输出自己的思维和成体系的知识', '纯知识：', '投资能力（量化工具、方法，股票知识）、']
----------
思考
['我是一个很喜欢思考和研究时事政治的答主。但我从来不写时事政治感悟。\n我觉得，在现实中讨论任何政治有关的言论都是极其愚蠢的。你支持A，得罪B，支持B得罪A。\n', '可能是慢？更多时间思考？', '压制收藏的欲望，必须用大脑去思考，吸收有用信息，然后输出为自己的内容，这样才能将', '关于社会核心本质的思考', '只能作为思考的中间产物', '可以批注二度思考nobility的手写笔记，组织成结构', 'xmind（思考）', '周报思考', '如果有轮回转世，那中国民间信仰的敬天崇祖就失去了标的，毕竟祖宗都转世去了，烧钱给谁？所以一切相信有轮回转世的宗教，比如佛教、伊斯兰教、基督教、印度教，在中国都面临着与本土信仰解释性发生冲突的问题。很遗憾，最后被改变的是本土文明，道教原本是不讲轮回的，崇尚长生，只修这一世，结果受佛教影响太深，几乎所有道教宗派现在都认同轮回转世。其中的矛盾点不知前人是否思考过，也许是刻意回避了，因为两个都解释不了的东西，即使矛盾也影响不大，无法对自己证真，自然就无法对别人证伪🌝', '没有人有权以“思考能力不足”为由，剥夺他人的权利。何为“独立思考能力”，它的界限在哪里。', '开放式办公场所的弊病之一，就是嘈杂的外部环境会干扰思考，即使戴着降噪还是很难完全屏蔽人声。对算法这种需要沉浸思考的工种很不友好。']
----------
时间
['。\n你说自己无神论，有神论的觉得的你是傻逼以后

### 自定义(不是tf*idf)

In [47]:
# Vocabulary: the union of all tokens across the filtered sentences.
uniqueWords = sequence.map(set).reduce(set.union)

In [49]:
def computeTF(wordDict, bagOfWords):
    """
    Compute the term frequency of every word in ``wordDict``.

    :param wordDict: mapping word -> number of occurrences in the document.
    :param bagOfWords: the document's tokens; only its length is used.
    :return: mapping word -> count / len(bagOfWords). For an empty
        ``bagOfWords`` every frequency is 0.0 instead of raising
        ZeroDivisionError.
    """
    bagOfWordsCount = len(bagOfWords)
    if bagOfWordsCount == 0:
        return {word: 0.0 for word in wordDict}
    return {word: count / float(bagOfWordsCount) for word, count in wordDict.items()}

In [50]:
def computeIDF(documents):
    """
    Compute the inverse document frequency of every word.

    :param documents: list of mappings word -> count, one per document.
    :return: mapping word -> log(N / df), where N is the number of documents
        and df the number of documents in which the word's count is > 0.
        Words that occur in no document get 0.0 (the ratio is undefined),
        and an empty ``documents`` list yields an empty dict.
    """
    import math
    N = len(documents)

    # Collect the vocabulary from every document, not just the first one, so a
    # word missing from documents[0] no longer raises KeyError.
    docFreq = {}
    for document in documents:
        for word, val in document.items():
            docFreq.setdefault(word, 0)
            if val > 0:
                docFreq[word] += 1

    idfDict = {}
    for word, freq in docFreq.items():
        # Guard against division by zero for words with no occurrences.
        idfDict[word] = math.log(N / float(freq)) if freq else 0.0
    return idfDict

## test