In [None]:
import os
import re
from collections import Counter
import math

import pkuseg
from transformers import AutoTokenizer, AutoModel
import torch
import numpy as np
import pandas as pd

  from .autonotebook import tqdm as notebook_tqdm


In [27]:
from tqdm import tqdm
def load_reports_mda(base_folder: str = './2000-2023/', trim_to_section: bool = False) -> dict:
    """Walk ``base_folder`` and collect the MD&A text of every ``.txt`` report.

    For each file the text is kept from the first occurrence of
    ``管理层讨论与分析`` or ``MD&A`` onwards; files containing neither marker
    are skipped.

    Args:
        base_folder: Directory that is walked recursively with ``os.walk``.
        trim_to_section: When False (default) the extracted text runs from the
            marker to the end of the file. When True it is additionally cut at
            the first following line that starts with a Chinese-numeral
            heading such as ``二、`` or that consists solely of five or more
            capital letters/spaces.
            NOTE(review): reports whose MD&A body itself uses ``一、``-style
            sub-headings would be cut right after the title, which is
            presumably why trimming is not the default — confirm before
            enabling it.

    Returns:
        Dict mapping the file name without its ``.txt`` suffix (e.g.
        ``"300097-智云股份-2013"``) to the extracted text. Files sharing a name
        in different sub-folders overwrite each other.
    """
    start_pattern = re.compile(r'(管理层讨论与分析|MD&A)')
    # Both alternatives begin with a real newline; writing ``\\n`` inside a raw
    # string would match a literal backslash followed by "n" instead.
    end_pattern = re.compile(r'\n[一二三四五六七八九十]+、|\n[A-Z ]{5,}\n')

    def extract_mda(text: str) -> str:
        """Return the text from the MD&A marker onwards, or "" if there is none."""
        m = start_pattern.search(text)
        if not m:
            return ""
        tail = text[m.start():]
        if not trim_to_section:
            return tail
        end_match = end_pattern.search(tail)
        return tail[:end_match.start()] if end_match else tail

    mda_texts = {}
    for root, _, files in os.walk(base_folder):
        # One progress bar per directory, labelled with the directory name.
        for fname in tqdm(files, desc=f"Loading MD&A in {os.path.basename(root)}"):
            if not fname.endswith('.txt'):
                continue
            key = fname.rsplit('.txt', 1)[0]   # e.g. "300097-智云股份-2013"
            path = os.path.join(root, fname)
            with open(path, 'r', encoding='utf-8') as f:
                text = f.read()
            mda = extract_mda(text)
            if mda:
                mda_texts[key] = mda
    return mda_texts


def segment_documents(texts: dict, model_name: str = 'web') -> dict:
    """Tokenise every MD&A text with pkuseg while showing a progress bar.

    Args:
        texts: Mapping of document key to raw text.
        model_name: Name of the pkuseg model passed to ``pkuseg.pkuseg``.

    Returns:
        ``{document_key: [token1, token2, ...]}`` with whitespace-only tokens
        removed.
    """
    segmenter = pkuseg.pkuseg(model_name=model_name)
    result = {}
    progress = tqdm(texts.items(), desc="Tokenizing documents")
    for doc_key, content in progress:
        pieces = segmenter.cut(content)
        # Drop tokens that are empty once surrounding whitespace is removed.
        result[doc_key] = list(filter(lambda piece: piece.strip(), pieces))
    return result

In [None]:
from transformers import AutoTokenizer, AutoModel
import torch
import numpy as np
from tqdm.auto import tqdm

def build_static_embeddings(model_name: str = "langboat/moss-moon-003-sft"):
    """Load a Hugging Face checkpoint and expose its input embeddings as a dict.

    Args:
        model_name: Checkpoint identifier handed to
            ``AutoTokenizer.from_pretrained`` and ``AutoModel.from_pretrained``.

    Returns:
        ``(tokenizer, wp_emb)`` where ``wp_emb`` maps every token string of the
        tokenizer vocabulary to its row of the model's input-embedding matrix
        (a NumPy array).

    Raises:
        IndexError: If a vocabulary id has no row in the embedding matrix.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
    model = AutoModel.from_pretrained(model_name).eval()

    # Only the static embedding table is read and it is consumed as a CPU NumPy
    # array, so the model is intentionally not moved to an accelerator: doing
    # so would copy every parameter to the device just to copy one matrix back.
    weight = model.get_input_embeddings().weight.detach().cpu().numpy()

    vocab = tokenizer.get_vocab()  # dict: token_str -> token_id
    wp_emb = {}
    for tok, idx in tqdm(vocab.items(),
                         desc="Building static embeddings",
                         unit="token",
                         total=len(vocab)):
        wp_emb[tok] = weight[idx]

    return tokenizer, wp_emb

# === 基于 Transformer 扩充并获取相似度评分 ===

def load_stop_words(path: str = 'stop_words.txt') -> set:
    """Read a UTF-8 stop-word file (one word per line) into a set.

    Lines are stripped of surrounding whitespace; lines that are empty after
    stripping are ignored.
    """
    stop_words = set()
    with open(path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            word = raw_line.strip()
            if word:
                stop_words.add(word)
    return stop_words
    
from tqdm.auto import tqdm
import numpy as np
import math
import re
from collections import Counter

from tqdm.auto import tqdm
import numpy as np
import math
import re
from collections import Counter

def expand_seed_with_pmi_multi(
    seed_words,
    tokenized_docs,
    tokenizer,
    wp_emb,
    topn=100,
    center_k=100,
    per_seed_k=50,
    sim_thresh=0.01,
    pmi_thresh=0.1,
    stop_words_path='stop_words.txt'
):
    """Expand a seed lexicon using embedding similarity plus a document-level PMI filter.

    Steps:
      1. Embed each seed as the mean of its word-piece vectors and average the
         seeds into a centre vector.
      2. Collect candidates: the ``center_k`` vocabulary tokens closest (cosine)
         to the centre plus the ``per_seed_k`` closest to each individual seed.
      3. Keep a candidate if it co-occurs in at least one document with some
         seed and the document-level PMI with that seed is ``>= pmi_thresh``.
      4. Rank by cosine similarity to the centre, drop stop words, single
         character tokens and tokens containing anything other than CJK
         ideographs, ASCII letters or digits, and return the best ``topn``.

    Args:
        seed_words: Iterable of seed strings.
        tokenized_docs: ``{doc_id: [token, ...]}``.
        tokenizer: Object with a ``tokenize(str) -> list[str]`` method.
        wp_emb: ``{token: vector}`` lookup of static embeddings.
        topn: Maximum number of results returned.
        center_k: Number of nearest neighbours taken around the seed centre.
        per_seed_k: Number of nearest neighbours taken around each seed.
        sim_thresh: Currently unused; kept so existing callers keep working.
        pmi_thresh: Minimum PMI for a candidate/seed pair.
        stop_words_path: File passed to ``load_stop_words``.

    Returns:
        List of ``(word, similarity_to_centre)`` tuples sorted by descending
        similarity (ties broken alphabetically), at most ``topn`` long. Empty
        if no seed can be embedded or no vocabulary token is eligible.
    """
    # --- 1. seed vectors and their centre --------------------------------
    seed_vecs = []
    for w in seed_words:
        vecs = [wp_emb[p] for p in tokenizer.tokenize(w) if p in wp_emb]
        if vecs:
            seed_vecs.append(np.mean(vecs, axis=0))
    if not seed_vecs:
        return []
    center = np.mean(seed_vecs, axis=0)

    # NOTE(review): only WordPiece-style "##" continuation pieces are excluded
    # here; vocabularies that mark sub-words differently are not filtered —
    # confirm this matches the tokenizer in use.
    tokens = [t for t in wp_emb if not t.startswith('##')]
    if not tokens:
        return []
    token_pos = {t: i for i, t in enumerate(tokens)}  # O(1) lookup instead of list.index
    emb_matrix = np.stack([wp_emb[t] for t in tokens])
    norms = np.linalg.norm(emb_matrix, axis=1)

    def cosine_to(vec):
        """Cosine similarity of every vocabulary row to ``vec``."""
        return emb_matrix.dot(vec) / (norms * np.linalg.norm(vec) + 1e-12)

    # --- 2. candidate generation ------------------------------------------
    cos_center = cosine_to(center)
    all_candidates = {tokens[i] for i in np.argsort(cos_center)[::-1][:center_k]}
    for seed_vec in seed_vecs:
        nearest = np.argsort(cosine_to(seed_vec))[::-1][:per_seed_k]
        all_candidates.update(tokens[i] for i in nearest)

    # --- 3. document-level PMI filter -------------------------------------
    # Postings are only needed for candidates and seeds, so the rest of the
    # corpus vocabulary is not indexed.
    n_docs = len(tokenized_docs)
    needed = all_candidates.union(seed_words)
    inverted_index = {}
    for doc_id, toks in tokenized_docs.items():
        for t in needed.intersection(toks):
            inverted_index.setdefault(t, set()).add(doc_id)
    seed_postings = [inverted_index[s] for s in seed_words if s in inverted_index]

    filtered_pmi = []
    for w in tqdm(all_candidates, desc="PMI filtering"):
        docs_w = inverted_index.get(w)
        if not docs_w:
            continue
        for docs_s in seed_postings:
            co = len(docs_w & docs_s)
            if co == 0:
                continue
            pmi = math.log((co * n_docs) / (len(docs_w) * len(docs_s)) + 1e-12)
            if pmi >= pmi_thresh:
                # Rank by similarity to the seed centre; one qualifying seed is enough.
                filtered_pmi.append((w, float(cos_center[token_pos[w]])))
                break

    # --- 4. rank, clean, truncate -----------------------------------------
    # Word is the secondary key so equal scores do not depend on set ordering.
    filtered_pmi.sort(key=lambda item: (-item[1], item[0]))

    stop_words = load_stop_words(stop_words_path)
    valid_word = re.compile(r'^[\u4e00-\u9fffA-Za-z0-9]+$')
    # Filter BEFORE truncating so that rejected entries are back-filled by the
    # next-best candidates and up to ``topn`` usable words are returned.
    final = [
        (w, s) for w, s in filtered_pmi
        if len(w) > 1 and w not in stop_words and valid_word.match(w)
    ]
    return final[:topn]

import math
from collections import Counter

def compute_idf(tokenized_docs: dict) -> dict:
    """Module Y: compute smoothed IDF weights.

    Each token's weight is ``log((N + 1) / (df + 1)) + 1`` where ``N`` is the
    number of documents and ``df`` the number of documents containing the token.

    Args:
        tokenized_docs: ``{doc_id: [token, ...]}``.

    Returns:
        ``{token: idf}`` for every token that occurs in at least one document.
    """
    doc_freq = Counter()
    for doc_tokens in tokenized_docs.values():
        # A token counts once per document, however often it is repeated.
        doc_freq.update(set(doc_tokens))

    smoothed_total = len(tokenized_docs) + 1
    result = {}
    for term, count in doc_freq.items():
        result[term] = math.log(smoothed_total / (count + 1)) + 1
    return result

def assign_weights_auto(expanded_sims: list, idf_dict: dict) -> dict:
    """Weight each expanded word by ``similarity * (idf / max_idf)``.

    Args:
        expanded_sims: List of ``(word, similarity)`` pairs.
        idf_dict: ``{word: idf}``; words missing from it get an IDF of 0.0.

    Returns:
        ``{word: weight}``. An empty ``expanded_sims`` yields an empty dict.
    """
    if not expanded_sims:
        # max() of an empty sequence would raise ValueError.
        return {}

    idfs = [idf_dict.get(w, 0.0) for w, _ in expanded_sims]
    # Fall back to 1.0 when every IDF is 0 so the division below is defined.
    max_idf = max(idfs) or 1.0
    return {w: sim * (idf / max_idf) for (w, sim), idf in zip(expanded_sims, idfs)}

from collections import Counter as _Counter

def compute_ai_scores(tokenized_docs: dict, dict_weights: dict) -> dict:
    """Score each document by its length-normalised, weighted dictionary hits.

    For every document the score is
    ``log1p(sum(weight[w] * tf[w]) / max(len(tokens), 1))`` where ``tf`` is the
    term frequency inside that document.

    Args:
        tokenized_docs: ``{doc_id: [token, ...]}``.
        dict_weights: ``{word: weight}``; words not listed weigh 0.0.

    Returns:
        ``{doc_id: score}``; an empty document scores 0.0.
    """
    scores = {}
    for ticker, tokens in tokenized_docs.items():
        tf = _Counter(tokens)
        # Iterate over distinct words: looping over every occurrence while also
        # multiplying by tf would count each word tf**2 times.
        raw = sum(dict_weights.get(w, 0.0) * count for w, count in tf.items())
        norm = raw / max(len(tokens), 1)
        scores[ticker] = math.log1p(norm)
    return scores


In [None]:
# === Module 0.5: load the seed words from a file ===
def load_seed_words(path: str = 'seed_words.txt') -> list:
    """Read seed words (one per line, UTF-8) and return them de-duplicated.

    Blank lines are skipped, surrounding whitespace is stripped, and the first
    occurrence of each word determines its position in the result.
    """
    with open(path, 'r', encoding='utf-8') as handle:
        stripped = (raw.strip() for raw in handle)
        # dict keys keep insertion order, giving an order-preserving de-dup.
        ordered_unique = dict.fromkeys(word for word in stripped if word)
    return list(ordered_unique)

# Load the de-duplicated seed vocabulary and preview its first ten entries.
seed = load_seed_words('seed_words.txt')
print(f'已加载 {len(seed)} 个种子词：', seed[:10], '…')

已加载 57 个种子词： ['人工智能', '计算机视觉', '图像识别', '知识图谱', '增强现实', '特征提取', '支持向量机', '知识表示', '模式识别', '物联网'] …


In [None]:
# Extract the MD&A text of every report, then tokenize it with pkuseg.
mda_texts = load_reports_mda('./2000-2023MDA文本按年份/')
tokenized = segment_documents(mda_texts, model_name='web')

Loading MD&A in : 100%|██████████| 1/1 [00:00<00:00, 11650.84it/s]
Loading MD&A in 2022: 100%|██████████| 5069/5069 [00:01<00:00, 3399.64it/s]
Loading MD&A in 2023: 100%|██████████| 5254/5254 [00:01<00:00, 4092.44it/s]
Loading MD&A in 2019: 100%|██████████| 3789/3789 [00:01<00:00, 3782.66it/s]
Loading MD&A in 2021: 100%|██████████| 4662/4662 [00:01<00:00, 4182.03it/s]
Loading MD&A in 2020: 100%|██████████| 4228/4228 [00:01<00:00, 3809.76it/s]
Tokenizing documents: 100%|██████████| 11219/11219 [54:32<00:00,  3.43it/s] 


In [None]:
# Save
# np.save('my_file.npy', tokenized) 

In [None]:
# Tokenizing the corpus is slow, so the token lists are cached on disk.
# SECURITY: allow_pickle=True unpickles arbitrary Python objects — only load a
# cache file that this notebook wrote itself.
TOKEN_CACHE_PATH = "my_file.npy"
if os.path.exists(TOKEN_CACHE_PATH):
    loaded = np.load(TOKEN_CACHE_PATH, allow_pickle=True)

    # np.save wraps the dict in a 0-d object array; .item() unwraps it.
    tokenized = loaded.item()
else:
    # No cache yet: persist the freshly computed `tokenized` dict so the next
    # run can skip the segmentation step instead of failing on a missing file.
    np.save(TOKEN_CACHE_PATH, tokenized)

In [None]:

# Build the token -> static-embedding lookup from the given checkpoint.
tokenizer, wp_emb = build_static_embeddings("sentence-transformers/paraphrase-multilingual-mpnet-base-v2")

# Expand the seed list: candidates come from embedding neighbours of the seed
# centre (center_k) and of each seed (per_seed_k), are filtered by PMI against
# the tokenized corpus, and at most `topn` (word, similarity) pairs are kept.
expanded_sims = expand_seed_with_pmi_multi(
    seed,
    tokenized,
    tokenizer,
    wp_emb,
    topn=250,
    center_k=120,
    per_seed_k=60)

Building static embeddings: 100%|██████████| 250002/250002 [00:00<00:00, 4831963.45token/s]
PMI filtering: 100%|██████████| 905/905 [00:00<00:00, 1976.63it/s]


In [20]:
# Persist the expanded dictionary as a two-column CSV (word, similarity);
# the file is written with the 'utf-8-sig' encoding.
expanded_df = pd.DataFrame(expanded_sims, columns=['word', 'similarity'])
expanded_df.to_csv('expanded_dictionary.csv', index=False, encoding='utf-8-sig')
print("完整扩充词典已保存至 expanded_dictionary.csv")

完整扩充词典已保存至 expanded_dictionary.csv


In [None]:
# Compute the IDF weight of every token that occurs in the corpus.
idf_dict = compute_idf(tokenized)

In [None]:
# Automatically assign a weight to each word of the expanded dictionary
# (similarity scaled by normalised IDF).
dict_weights = assign_weights_auto(expanded_sims, idf_dict)

In [None]:
# Compute the AI score of every document.
ai_scores = compute_ai_scores(tokenized, dict_weights)

In [None]:

# Turn the {"code-name-year": score} mapping into a tidy table.
df = pd.DataFrame({
    'Ticker': list(ai_scores.keys()),
    'AI_score': list(ai_scores.values())
})

# Keys look like "300097-智云股份-2013". The code is everything before the first
# hyphen and the year everything after the last one, so a company name that
# itself contains "-" stays in one piece instead of producing extra columns.
split_cols = df['Ticker'].str.extract(
    r'^(?P<code>[^-]*)-(?P<name>.*)-(?P<year>[^-]*)$'
)
malformed = df.loc[split_cols['year'].isna(), 'Ticker']
if not malformed.empty:
    raise ValueError(
        f"Document keys not in 'code-name-year' form: {malformed.tolist()[:5]}"
    )

df = pd.concat([split_cols, df['AI_score']], axis=1)

df['year'] = df['year'].astype(int)
df['AI_score'] = df['AI_score'].astype(float)
# Left-pad stock codes with zeros to six characters.
df['code'] = df['code'].str.zfill(6)

df.to_csv('ai_scores_2023_split.csv', index=False, encoding='utf-8-sig')
print("已生成 ai_scores_2023_split.csv，包含 code, name, year, AI_score 四列。")

已生成 ai_scores_2023_split.csv，包含 code, name, year, AI_score 四列。


In [None]:

import pandas as pd

def compute_total_ai_score(df: pd.DataFrame, base_year: int = None) -> pd.DataFrame:
    """Aggregate yearly AI scores into one recency-weighted score per stock.

    Each row is weighted by ``year - base_year + 1`` and the result per
    ``(code, name)`` group is ``sum(AI_score * weight) / sum(weight)``.

    Args:
        df: Frame with the columns ``code``, ``name``, ``year`` and
            ``AI_score``. It is not modified.
        base_year: Year that receives weight 1; defaults to the smallest
            ``year`` in ``df``.

    Returns:
        Frame with the columns ``code``, ``name`` and ``total_AI_score``, one
        row per ``(code, name)`` pair, sorted by the group keys.
    """
    if base_year is None:
        base_year = df['year'].min()

    weight = df['year'] - base_year + 1

    # Work on a private frame so the caller's DataFrame does not silently gain
    # a 'weight' column.
    work = pd.DataFrame({
        'code': df['code'],
        'name': df['name'],
        '_weighted': df['AI_score'] * weight,
        '_weight': weight,
    })

    # Two grouped sums replace groupby.apply: same numbers, no per-group Python
    # call, and no pandas warning about apply operating on grouping columns.
    sums = work.groupby(['code', 'name'])[['_weighted', '_weight']].sum()
    stock_scores = (sums['_weighted'] / sums['_weight']).reset_index(name='total_AI_score')
    return stock_scores

In [None]:

# Collapse the per-year scores into one weighted score per stock.
stock_scores = compute_total_ai_score(df)

# Keep the 100 stocks with the highest aggregated score.
top100 = stock_scores.sort_values('total_AI_score', ascending=False).head(100)

# Export both the full ranking and the top-100 subset.
stock_scores.to_csv('stock_total_ai_score.csv', index=False, encoding='utf-8-sig')
top100.to_csv('top100_ai.csv', index=False, encoding='utf-8-sig')

  .apply(lambda g: (g['AI_score'] * g['weight']).sum() / g['weight'].sum())
