In [1]:
import time
import pandas as pd
from content_cleaning import *

In [2]:
# 初始化处理器
processor = XHSMixedLanguageProcessor(cache_size=2000, max_workers=12)

# # 示例笔记
# sample_notes = [
#     "今天去了Dallas的鲜芋仙 #美食打卡# 这家MeetFresh真的超级好吃！The taro balls were amazing! 强烈推荐大家尝试～",
#     "新开的台湾甜品店@小红书美食博主 服务态度nice，芋圆Q弹，仙草冻很香 #DFW美食# [笑哭R] 2001 Coit Rd真的很方便",
#     "约上闺蜜一起去吃甜品，牛奶冰+芋圆组合👍 The dessert was incredibly refreshing on such a hot day! https://xiaohongshu.com/...",
#     "今日份打卡：鲜芋仙 Park Pavillion Center，人均$15左右，店内环境整洁，服务态度很好，definitely worth the price!",
# ]

# processed_text = processor.process_text(sample_notes[0], enable_translation=True)
# print("单条处理结果:", processed_text)
# processed_texts = processor.batch_process(sample_notes)
# print("\n批量处理结果:")
# for i, text in enumerate(processed_texts):
#     print(f"{i+1}. {text}")

# Load the cooked contents data
cont = pd.read_json('..\..\Data\processed\contents_cooked.json')

# combine the title and note_body into a single string
def process_text(note):
    return note['title'] + ' ' + note['note_body']

# Apply the function to the DF
cont['text'] = cont.apply(process_text, axis=1).astype(str)

# Apply the batch processing function to the DF.text column
start_time = time.time()
processed_texts = processor.batch_process(cont['text'].tolist(), enable_translation=True)
end_time = time.time()

print(f"批量处理耗时: {end_time - start_time:.2f}秒")

# save the processed texts to a new column in the DF
cont['semantic_proc_text'] = processed_texts
# remove the original text column
cont.drop(columns=['text'], inplace=True)
# save the processed DF to a new JSON file
cont.to_json('..\..\Data\processed\contents_cooked_semantic.json', orient='records', lines=True, force_ascii=False)


批量处理耗时: 449.05秒


In [3]:
cont.columns

Index(['note_id', 'user_id', 'title', 'note_body', 'tag_list', 'image_count',
       'content_type_video', 'hot_note', 'post_time', 'last_update_time',
       'scraped_time', 'elapsed_time', 'liked_count', 'collected_count',
       'comment_count', 'share_count', 'interaction_count',
       'semantic_proc_text'],
      dtype='object')

In [None]:
sample_note = cont.text

In [None]:
processed_texts = processor.batch_process(cont_sample.text.tolist(), verbose=True)
print("\n批量处理结果:")
for i, text in enumerate(processed_texts):
    print(f"{i+1}. {text}")

In [16]:
processor.get_stats()

{'cache_size': 61,
 'cache_capacity': 2000,
 'cache_hits': 2,
 'translation_requests': 61,
 'cache_hit_rate': 0.03278688524590164}

定义domain_keywords, 并提前compile 正则表达式和实例化重复使用的翻译器

In [None]:
# Add domain keywords
DOMAIN_KEYWORDS = {
    '鲜芋仙', 'Meet Fresh', 'MeetFresh', '台湾美食', '甜品', 
    '芋圆', 'taro', '仙草', 'grass jelly', '奶茶', 'milk tea',
    '豆花', 'tofu pudding', '奶刨冰', 'milked shaved ice',
    '红豆汤', 'purple rice soup', '紫米粥', 'red bean soup',
    '2001 Coit Rd', 'Park Pavillion Center', '(972) 596-6088',
    '餐厅', '餐馆', '美食', '台湾小吃', '台湾甜品', '冰激凌'
}

# Pre-compile regex patterns for efficiency
ZH_CHAR_PATTERN = re.compile(r'[\u4e00-\u9fff]')  # 中文字符检测
NON_ZH_PATTERN = re.compile(r'[a-zA-Z]')  # 拉丁字母检测
SPLIT_PATTERN = re.compile(r'([。！？?!.])')  # 分句正则
URL_PATTERN = re.compile(
    r'(?:https?://)?(?:[a-zA-Z0-9\u4e00-\u9fff-]+\.)+[a-zA-Z]{2,}'
    r'(?:/\S*)?(?:\?\S*)?(?:#\S*)?',
    flags=re.IGNORECASE
)
TOPIC_PATTERN = re.compile(r'#([^#]+)#')
MENTION_PATTERN = re.compile(r'@[\w\u4e00-\u9fff-]+')  # 支持中英文用户名
XIAOHONGSHU_TAG_PATTERN = re.compile(r'\[(话题|表情|地点)\s*[^\]]*\]')
BRACKET_CONTENT_PATTERN = re.compile(r'\[([^\]]+)\]')
HTML_TAG_PATTERN = re.compile(r'<[^>]+>')
WHITESPACE_PATTERN = re.compile(r'[\t\n\u3000]')
MULTI_SPACE_PATTERN = re.compile(r'\s+')

# Translation cache
translation_cache = {}

# Initialize translator once
_translator = GoogleTranslator(source='auto', target='zh-CN')

In [None]:
class TextSanitizer:
    def __init__(self):
        # 预编译所有正则表达式
        self.mention_pattern = re.compile(r'@\w+') # @提及正则
        self.url_pattern = re.compile(r'(https?://\S+)') # URL正则
        self.emoji_dict = emoji.EMOJI_DATA  # 预加载表情符号库
        self.protected_terms = self._load_protected_terms()
        
    def _load_protected_terms(self):
        # 从文件/数据库加载保护词表（品牌词、产品词等）
        return {'鲜芋仙', 'MeetFresh', 'ParkPavilion'}  
    
    def pipeline(self, text):
        """单条文本处理管道"""
        if not text:
            return text
        
        text = basic_clean(text)
        text = social_media_clean(text)
        text = language_optimize(text, protected_terms=self.protected_terms, enable_translation=True)
        return text
    
    def batch_process(self, texts):
        """批量处理文本"""
        if not texts:
            return []
            
        # 计算合适的核心数
        num_cores = max(1, os.cpu_count() * 8 // 10)
        chunk_size = max(1, len(texts) // (num_cores * 4))
        
        # 合并相似处理步骤，减少线程切换
        def process_chunk(chunk):
            return [self.pipeline(text) for text in chunk]
        
        # 创建文本块
        chunks = [texts[i:i+chunk_size] for i in range(0, len(texts), chunk_size)]
        
        # 并行处理
        with ProcessPoolExecutor(max_workers=num_cores) as executor:
            results = list(executor.map(process_chunk, chunks))
        
        # 展平结果
        processed_texts = []
        for chunk_result in results:
            processed_texts.extend(chunk_result)
        
        return processed_texts



In [None]:
# Load the cooked contents data
cont = pd.read_json('..\..\Data\processed\contents_cooked.json')

# combine the title and note_body into a single string
def process_text(note):
    return note['title'] + ' ' + note['note_body']

# Apply the function to the DataFrame
cont['text'] = cont.apply(process_text, axis=1).astype(str)

# Load the MeetFresh user dictionary
jieba.load_userdict("MF_dict.txt")

# prepare the regex patterns for text processing
ZH_CHAR_PATTERN = re.compile(r'[\u4e00-\u9fff]')  # 中文字符检测
NON_ZH_PATTERN = re.compile(r'[a-zA-Z]')  # 拉丁字母检测
SPLIT_PATTERN = re.compile(r'([。！？?!.])')  # 分句正则

# 复用翻译器实例（降低初始化开销）
_translator = GoogleTranslator(source='auto', target='zh-CN')

In [None]:
domain_keywords = {
    '鲜芋仙', 'Meet Fresh', 'MeetFresh', '台湾美食', '甜品', 
    '芋圆', 'taro', '仙草', 'grass jelly', '奶茶', 'milk tea',
    '豆花', 'tofu pudding', '奶刨冰', 'milked shaved ice',
    '红豆汤', 'purple rice soup', '紫米粥', 'red bean soup',
    '2001 Coit Rd', 'Park Pavillion Center', '(972) 596-6088',
    '餐厅', '餐馆', '美食', '台湾小吃', '台湾甜品', '冰激凌'
}

In [None]:
# slice the 1st row of cont
cont.iloc[11]

中文文本清洗黄金四步法

In [5]:
def fullwidth_to_halfwidth(text:str) -> str:
    """全角转半角（保留￥符号）"""
    translation_table = str.maketrans({
        '！': '!', '“': '"', '”': '"', '‘': "'", '’': "'",
        '、': ',', '，': ',', '；': ';', '：': ':', '？': '?',
        '《': '<', '》': '>', '【': '[', '】': ']', '·': '.',
        '～': '~', '—': '-', '（': '(', '）': ')', '　': ' '
    })
    return text.translate(translation_table)

def normalize_punctuation(text:str) -> str:
    """符号标准化（保留emoji, retain 字符的位置信息但牺牲了效率, 之后可以考虑优化"""
    # 定义保留符号集（新增%$￥）
    keep_symbols = {"'", '"', ',', '.', ':', ';', '!', '?', '-', 
                   '(', ')', '<', '>', '[', ']', '&', '#', '@',
                   '%', '$', '￥', '/', '=', '+', '~', '^'}
    
    # 字符级处理, 
    cleaned_chars = []
    for char in text:
        # 保留条件：字母数字/汉字/keep_symbols/emoji
        if (char.isalnum() or
            '\u4e00' <= char <= '\u9fff' or
            char in keep_symbols or
            emoji.is_emoji(char)):
            cleaned_chars.append(char)
        else:
            cleaned_chars.append(' ')
    
    return ''.join(cleaned_chars)

def remove_urls(text:str) -> str:
    """适配中文域名的URL移除"""
    url_pattern = re.compile(
        r'(?:https?://)?(?:[a-zA-Z0-9\u4e00-\u9fff-]+\.)+[a-zA-Z]{2,}'
        r'(?:/\S*)?(?:\?\S*)?(?:#\S*)?',
        flags=re.IGNORECASE
    )
    return url_pattern.sub('', text)

def basic_clean(
        text : str
    ) -> str:
    """
    对文本进行基础层清洗，包括去除HTML标签、URL、特殊符号处理, 空格标准化等。
    1. 移除HTML标签
    2. 移除URL（适配中文域名）
    3. 处理特殊符号（保留常用符号和emoji, 全角标点转半角）
    4. 标准化空白字符

    Args:
        text (str): 待清洗的文本。
    Returns:
        str: 清洗后的文本。
    """
    # 替换所有空白符（含小红书常见的全角空格\u3000、换行、制表符）
    text = re.sub(r'[\t\n\u3000]', ' ', text)
    # HTML标签移除
    text = re.sub(r'<[^>]+>', '', text)
    # URL移除, 适配中文域名
    text = remove_urls(text)
    # 全角转半角（保留全角￥）
    text = fullwidth_to_halfwidth(text)
    # 符号标准化
    text = normalize_punctuation(text)
    # 标准化合并连续空格
    text = re.sub(r'\s+', ' ', text).strip()
    return text

In [6]:
# apply the basic_clean function to the 'text' column
cont['text'] = cont['text'].apply(basic_clean)

In [7]:
cont['text'][6]

'[达拉斯.吃]快乐小羊,回到儿时澳门豆捞坊 Happy Lamb Hot Pot 📍 240 Legacy Dr ste 116, Plano, TX 75023 感谢快乐小羊的邀请 虽然不知道小羊快不快乐[笑哭R]但我吃的很快乐 特别喜欢小羊家环境,舒适低调不喧哗,适合朋友聊天 所有菜品自取,更自由快捷,绝大多数都很新鲜(只有个别鹌鹑蛋黄不知为何有点咸鸭蛋味道,也许是我敏感[害羞R])我们选的金汤酸辣锅和特制香辣锅 金汤锅其实更像酸菜锅,酸菜味挺浓,还有很麻的口感,下了鱼片和牛肉,秒变酸菜鱼和酸菜牛 香辣锅是台湾健康感麻辣小火锅味儿,相对重庆火锅更为清淡,也更能体现食材本身的味道 强烈推荐他家臻品羊肉片,肥瘦合适还有奶香味 然后手打羊肉丸,和香菜混合制作,特别好吃 台湾酸甜口包心菜泡菜永远吃不腻 作为碳水大户,炒饭炒面炸馒头炸麻球完全满足了我的需求,炸鸡翅和橙子也很不错,小朋友饭应有尽有~ 重点来了!小羊给的两杯鸡尾酒把不太喝酒的我给惊艳了!图10中矮的那杯清冽的酒精带着丝丝甜味,醇香口感干脆利落,会一直想喝不停 有柠檬片的那杯是一种混合果汁配着清淡酸奶泡的无酒精鸡尾酒,打败我爱的所有果汁 没想到小羊要靠鸡尾酒出道了[笑哭R] 一顿饭下来,不仅吃的满足,整个氛围也让人想起童年回忆里的澳门豆捞坊,干净惬意有情调 我就喜欢这样安安静静享受火锅的快乐 #达拉斯火锅[话题]# #达拉斯美食[话题]# #达拉斯生活[话题]# #达拉斯[话题]# #达拉斯探店[话题]# #达拉斯周边[话题]# #达拉斯周末[话题]# #达拉斯吃喝玩乐[话题]# #快乐小羊[话题]# #快乐小羊火锅[话题]#'

社交媒体特征处理层

In [8]:
def social_media_clean(text:str, strategy='demojize') -> str:
    """
    社交媒体文本清洗，主要针对小红书平台的特定格式和符号进行处理。
    1. 移除话题标签（#）但保留关键词
    2. 移除@提及, support 中英文复合用户名
    3. 转换表情符号（可选：移除或转换为文本描述）
    4. 处理小红书特有方括号
    5. 去除多余空格

    Args:
        text (str): 待清洗的文本。
    Returns:
        str: 清洗后的文本。
    """

    # 移除话题标签但保留关键词（如 #达拉斯美食# → 达拉斯美食）
    text = re.sub(r'#([^#]+)#', r'\1', text)
    
    # 移除@提及(包含其变体, 如@小红书用户)
    text = re.sub(r'@[\w\u4e00-\u9fff-]+', '', text)  # 支持中英文用户名
    
    # 转换Emoji（可选策略）
    if strategy == 'remove':
        text = emoji.replace_emoji(text, replace='')
    elif strategy == 'demojize':
        # 将emoji转换为文本描述（如:😀 → :grinning_face:）
        text = emoji.demojize(text, delimiters=(' [', '] '))
    
    # 处理小红书特有的方括号标签（删除系统标签, 如[话题]→'')
    text = re.sub(r'\[话题\s*[^\]]*\]', '', text)  # 删除系统标签 

    # 继续处理小红书方括号标签（同时保留关键文本信息, 如[笑哭R]→笑哭R）
    text = re.sub(r'\[(?:表情|地点)\s*[^\]]*\]', '', text)
    text = re.sub(r'\[([^\]]+)\]', r'\1', text)  # 去除方括号但保留内容
    
    return text.strip()

In [9]:
# apply the social_media_clean function to the 'text' column
cont['text'] = cont['text'].apply(social_media_clean, strategy='demojize')

中文英文混合语言环境优化层

In [10]:
def ratio_of_chinese(text: str) -> float:
    """返回 text 中（Unicode范围4E00-9FFF）汉字的占比。"""
    if not text:
        return 1.0
    zh_chars = ZH_CHAR_PATTERN.findall(text)
    return len(zh_chars) / len(text) if len(zh_chars) > 0 else 0.0

def mask_protected_terms(text: str, protected_terms: set) -> (str, dict):
    """
    将 text 中出现的保护词用占位符替换，并返回替换后的文本以及占位符映射字典。
    比如 "MeetFresh" -> "[EN_TERM_0]"
    """
    if not protected_terms:
        return text, {}
    
    # 按术语长度降序排列（优先匹配长词）
    sorted_terms = sorted(protected_terms, key=lambda x: len(x), reverse=True)
    pattern = re.compile(
        r'\b(' + '|'.join(map(re.escape, sorted_terms)) + r')\b', 
        flags=re.IGNORECASE
    )
    
    placeholder_map = {}
    idx = 0
    
    def _repl(m):
        nonlocal idx
        placeholder = f"[EN_TERM_{idx}]"
        placeholder_map[placeholder] = m.group(0)
        idx += 1
        return placeholder
    
    return pattern.sub(_repl, text), placeholder_map

def unmask_protected_terms(text: str, placeholder_map: dict) -> str:
    """将翻译后文本中的占位符恢复成原始英文术语"""
    for placeholder, original in placeholder_map.items():
        text = text.replace(placeholder, original)
    return text

@lru_cache(maxsize=5010)
def cached_translate(text: str) -> str:
    """带缓存的翻译方法，整合了安全检查"""
    # 1. 空字符串或仅包含空格、标点、数字等，可直接返回原文
    if not text or text.strip().isdigit():
        return text 
    
    # 2. 避免超过 5000 字符的文本
    if len(text) > 5000:
        print(f"Warning: text too long ({len(text)} chars). Truncating for translation.")
        text = text[:4900]  # 适当截断以防API限制

    # 3. 调用翻译，并捕捉异常
    try:
        return _translator.translate(text)
    except Exception as e:
        print(f"Translation failed: {e}")
        return text

def language_optimize(
    text: str,
    threshold: float = 0.5,
    protected_terms: set = None,
    enable_translation: bool = True
) -> str:
    """终极优化版语言处理"""
    if not enable_translation or not text:
        return text
    
    if protected_terms is None:
        protected_terms = {
            "MeetFresh", "VIP", "AI", "DFW", 
            "Grass Jelly", "Taro", "Milk Tea",
            "Red Bean Soup", "Purple Rice Soup",
            "Tofu Pudding", "Shaved Ice",
            "Purple Rice", '2001 Coit Rd', 'Park Pavillion Center'
        }
    
    # 预处理：快速过滤纯中文文本或空文本
    if not text or not NON_ZH_PATTERN.search(text):
        return text
    
    # 分句优化（减少内存占用）
    buffer = []
    segments = SPLIT_PATTERN.split(text)
    
    for i in range(0, len(segments), 2):
        sentence = segments[i]
        if not sentence:
            continue
        
        # 快速跳过纯中文句
        if NON_ZH_PATTERN.search(sentence):
            # 先保护重要术语
            masked, ph_map = mask_protected_terms(sentence, protected_terms)
            
            # 只翻译中文占比低于阈值的句子
            if ratio_of_chinese(masked) < threshold:
                translated = cached_translate(masked)
                unmasked = unmask_protected_terms(translated, ph_map)
                buffer.append(unmasked)
            else:
                buffer.append(sentence)
        else:
            buffer.append(sentence)
        
        # 添加分隔符
        if i+1 < len(segments):
            buffer.append(segments[i+1])
    
    return ''.join(buffer)

In [13]:
# apply the language_optimize function to the 'text' column
cont['text'] = cont['text'].apply(language_optimize, enable_translation = True)

高级语义清洗层

In [17]:
def chinese_semantic_clean(
    texts: List[str],
    freq_threshold: float = 0.25,          # 词频阈值
    doc_freq_threshold: float = 0.6,       # 文档频率阈值
    min_word_length: int = 2,              # 最小词长度（过滤单字词）
    custom_stopwords: Optional[Set[str]] = None,
    domain_keywords: Optional[Set[str]] = None,
    return_noise_terms: bool = False       # 是否返回识别出的噪声词
) -> Union[List[str], tuple]:
    """
    基于词频统计的中文语义清洗函数
    
    Args:
        texts: 待清洗的文本列表
        freq_threshold: 词频阈值，超过此阈值的词被视为噪声（除非在domain_keywords中）
        doc_freq_threshold: 文档频率阈值，出现在超过此比例文档中的词被视为噪声
        min_word_length: 最小词长度，小于此长度的词不参与统计
        custom_stopwords: 自定义停用词集合
        domain_keywords: 领域关键词集合（这些词不会被过滤）
        return_noise_terms: 是否返回识别出的噪声词
        
    Returns:
        清洗后的文本列表，如果return_noise_terms为True，则同时返回识别的噪声词集合
    """
    # 默认小红书常见噪声词
    default_stopwords = {
        # 情感强化词
        "真的", "真是", "太", "好", "很", "非常", "超级", "绝对", "简直",
        # 网络用语
        "哈哈", "哈哈哈", "啊啊", "啊啊啊", "呜呜", "呜呜呜", "omg", "OMG",
        "xswl", "awsl", "yyds", "绝绝子", "无语子",
        # 口头禅
        "真的是", "就是", "反正", "然后", "其实", "那个", "这个", "所以",
        "emmm", "emm", "啊这", "蹲一个", "冲鸭",
        # 标点符号组合
        "～～", "…"
    }
    
    # 合并自定义停用词
    stopwords = default_stopwords.copy()
    if custom_stopwords:
        stopwords.update(custom_stopwords)
    
    # 初始化领域关键词集合
    domain_keywords = domain_keywords or set()
    
    # 统计词频和文档频率
    total_docs = len(texts)
    word_counts = Counter()
    doc_counts = defaultdict(int)
    
    print(f"正在统计词频（共{total_docs}篇文本）...")
    for text in texts:
        words = jieba.lcut(text)
        # 过滤太短的词
        words = [w for w in words if len(w) >= min_word_length]
        
        # 更新全局词频
        word_counts.update(words)
        
        # 更新文档频率（每个文档中的词只计算一次）
        unique_words = set(words)
        for word in unique_words:
            doc_counts[word] += 1
    
    # 计算相对频率
    word_freq = {word: count/total_docs for word, count in word_counts.items()}
    doc_freq = {word: count/total_docs for word, count in doc_counts.items()}
    
    # 识别噪声词（高频但不是领域关键词）
    noise_terms = {
        word for word, freq in word_freq.items()
        if (freq > freq_threshold or  # 全局高频
            doc_freq[word] > doc_freq_threshold) and  # 文档高频
           word not in domain_keywords  # 不是领域关键词
    }
    
    # 合并自定义停用词
    noise_terms.update(stopwords)
    
    print(f"已识别噪声词{len(noise_terms)}个，开始清洗...")
    
    # 构建过滤模式
    pattern = re.compile('|'.join(noise_terms))
    cleaned_texts = [pattern.sub('', text) for text in texts]
    
    # 评估整体效果
    total_original_length = sum(len(t) for t in texts)
    total_cleaned_length = sum(len(t) for t in cleaned_texts)
    compression_ratio = total_cleaned_length / total_original_length
    
    print(f"清洗完成! 噪声去除率: {(1-compression_ratio):.2%}")
    print(f"原始总字符数: {total_original_length}")
    print(f"清洗后总字符数: {total_cleaned_length}")
    
    if return_noise_terms:
        return cleaned_texts, noise_terms
    
    return cleaned_texts


In [None]:
# apply the chinese_semantic_clean function to the 'text' column
cont['text'] = chinese_semantic_clean(
    cont['text'].tolist(),
    freq_threshold=0.25,
    doc_freq_threshold=0.6,
    min_word_length=2,
    custom_stopwords=None,
    domain_keywords=domain_keywords,
    return_noise_terms=False
)
# 语义清洗后，去除空文本
cont = cont[cont['text'].str.strip() != '']
#cont = cont.reset_index(drop=True)

全流程集成示例

In [25]:
class TextSanitizer:
    def __init__(self):
        self.mention_pattern = re.compile(r'@\w+') # @提及正则
        self.url_pattern = re.compile(r'(https?://\S+)') # URL正则
        self.emoji_dict = emoji.EMOJI_DATA  # 预加载表情符号库
        self.protected_terms = self._load_protected_terms()
        
    def _load_protected_terms(self):
        # 从文件/数据库加载保护词表（品牌词、产品词等）
        return {'鲜芋仙', 'MeetFresh', 'ParkPavilion'}  
    
    def pipeline(self, text):
        text = basic_clean(text)
        text = social_media_clean(text)
        text = language_optimize(text, protected_terms=self.protected_terms, enable_translation=True)
        text = language_optimize(text)
        return text
    
    def batch_process(self, texts):
        # 并行加速（利用80%CPU核心）
        num_cores = os.cpu_count() * 8 // 10
        # 确保至少使用一个核心
        num_cores = max(1, num_cores)
        # 使用ProcessPoolExecutor进行并行处理
        with ProcessPoolExecutor(max_workers=num_cores) as executor:
            return list(executor.map(self.pipeline, texts))

In [None]:
# 实例化 TextSanitizer 类
sanitizer = TextSanitizer()
# 批量处理文本
cont['text'] = sanitizer.batch_process(cont['text'].tolist())
# 语义清洗后，去除空文本
cont = cont[cont['text'].str.strip() != '']