## 数据处理

不想让某个字/词语参与分析：在[stopwords.txt](stopwords.txt)里添加这个字/词  
想自定义某个词语：在[self_dict.txt](self_dict.txt)里添加这个词


In [13]:
import pandas as pd
import numpy as np
import jieba
import re

# Lookup table from the message type code (as a string) to a readable label.
# It is applied to the 'Type' column with Series.map further down.
Type = {
    '1': '文字',
    '3': '图片',
    '43': '视频',
    '-1879048185': '微信运动排行榜',
    '5': '',  # NOTE(review): label intentionally left empty in the original; confirm what type 5 is
    '47': '表情包',
    '268445456': '撤回的消息',
    '34': '语音',
    '419430449': '转账',
    '50': '语音电话',
    '10000': '进群、撤回、拒收等系统消息',
    '822083633': '回复消息',
    '922746929': '拍一拍',
    '1090519089': '发送文件',
    '318767153': '付款成功',
    '436207665': '发红包',
}

# Load the user-defined words listed in self_dict.txt into jieba.
jieba.load_userdict('self_dict.txt')
# Use a forward slash in the path: it works on Windows and POSIX alike, and
# avoids the invalid '\m' escape sequence the backslash form contained.
df = pd.read_csv('db_tables/messages.csv')  # TODO: change this to the CSV file you exported with 留痕

In [None]:
# Replace the numeric type codes with readable labels (codes missing from Type become NaN).
# NOTE: this overwrites df['Type'], so running the cell twice maps the labels again.
df['Type'] = df['Type'].astype(str).map(Type)

# Count messages per (type, direction) and spread the direction into columns.
type_counts = df.groupby(['Type', 'IsSender']).size().reset_index(name='Count')
type_pivot = (
    type_counts
    .pivot_table(index='Type', columns='IsSender', values='Count', fill_value=0)
    # Rename by label instead of assigning a fixed-length list, so the cell
    # still works when only sent or only received messages are present.
    .rename(columns={0: 'Received', 1: 'Sent'})
    .reindex(columns=['Received', 'Sent'], fill_value=0)
)
type_pivot.columns.name = None  # drop the leftover 'IsSender' axis label
type_pivot.reset_index(inplace=True)
print('你的所有聊天总览：')
type_pivot

In [None]:
# Parse the timestamps and keep only messages after the start of 2024.
# NOTE: this rebinds the global df, so every later cell sees the filtered data.
# NOTE(review): there is no upper bound, so messages from later years are kept too — confirm intent.
df['StrTime'] = pd.to_datetime(df['StrTime'])
df = df[df['StrTime'] > '2024-01-01 00:00:00']

# Count messages per (type, direction) and spread the direction into columns.
type_counts = df.groupby(['Type', 'IsSender']).size().reset_index(name='Count')
type_pivot = (
    type_counts
    .pivot_table(index='Type', columns='IsSender', values='Count', fill_value=0)
    # Rename by label instead of assigning a fixed-length list, so the cell
    # still works when only sent or only received messages are present.
    .rename(columns={0: 'Received', 1: 'Sent'})
    .reindex(columns=['Received', 'Sent'], fill_value=0)
)
type_pivot.columns.name = None  # drop the leftover 'IsSender' axis label
type_pivot.reset_index(inplace=True)
print('你的2024聊天总览：')
type_pivot


In [3]:
def contains_chinese(text:str):
    """Return True when ``text`` contains at least one character in U+4E00..U+9FFF."""
    first_hit = re.search(r'[\u4e00-\u9fff]', text)
    return first_hit is not None

# Clean raw message text
def clean(text):  # returns the cleaned sentence
    """Strip URLs, punctuation and redundant whitespace from one message.

    Args:
        text: Raw message content. Non-string values (for example the NaN that
            pandas.read_csv produces for empty cells or texts such as "NA")
            are treated as empty.

    Returns:
        ``''`` for non-strings and for messages that start with a URL.
        For text without Chinese characters, the text with every non-word,
        non-space character replaced by a space. Otherwise the text with
        punctuation removed, newlines dropped and whitespace runs collapsed.
    """
    if not isinstance(text, str):
        return ''

    # re.match anchors at the start, so only messages that begin with a URL are dropped.
    url_pattern = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
    if re.match(url_pattern, text):
        return ''

    text = re.sub(r'[^\w\s]', ' ', text)  # anything that is neither a word character nor whitespace
    if not contains_chinese(text):
        return text

    text = text.rstrip()
    text = text.replace('\n', '')
    text = text.replace('·', ' ')
    text = text.replace('...', '…')
    # CJK symbols / full-width forms, plus ASCII punctuation. '[', '\' and ']'
    # are escaped so the class is not closed early; the previous pattern ended
    # the class at the first ']' and therefore never removed '_' (a \w
    # character that survives the substitution above).
    punctuation = r"[\u3000-\u303F\uff00-\uffef]|[!?\"#$%&'()*+,\-./:;<=>@\[\\\]^_`{|}~]"
    text = re.sub(punctuation, ' ', text)
    text = re.sub(r'\s+', ' ', text)  # collapse redundant whitespace
    return text

# Tokenization:
_STOPWORDS_CACHE = None  # filled on first use by _load_stopwords()


def _load_stopwords():
    """Read stopwords.txt once and return its stripped lines as a set."""
    global _STOPWORDS_CACHE
    if _STOPWORDS_CACHE is None:
        with open('stopwords.txt', 'r', encoding='utf-8') as file:
            _STOPWORDS_CACHE = {line.strip() for line in file}
    return _STOPWORDS_CACHE


def tokenize(text:str):
    """Split one cleaned message into a list of tokens.

    Args:
        text: Cleaned message text.

    Returns:
        ``[]`` for ``None``, non-string or empty input. For text without
        Chinese characters, a one-element list holding the stripped text.
        Otherwise the jieba tokens that are neither stop words nor blank.

    Note:
        The stop-word file is read only once; re-run this cell after editing
        stopwords.txt so the change is picked up.
    """
    # Guard first: the original check came after `text` had already been used,
    # and it returned '' although every other path returns a list.
    if not isinstance(text, str) or not text:
        return []
    if not contains_chinese(text):
        return [text.strip()]

    stopwords = _load_stopwords()
    return [word for word in jieba.cut(text) if word not in stopwords and word.strip() != '']

In [None]:
# Only plain-text messages are analysed from here on; the 'Type' column is
# no longer needed once the filter has been applied.
text_rows = df[df['Type'] == '文字']
pro_df = text_rows[['IsSender', 'StrTime', 'StrContent', 'Remark', 'NickName']]


In [5]:
# Write the list of chat contacts to a file, so that group chats can be picked out by hand
unique_contacts = pro_df.drop_duplicates(subset=['NickName'])[['NickName']]
unique_contacts.to_csv('unique_contacts.csv', index=False)

# Contacts excluded from the analysis:
groups = ['文件传输助手']  # TODO: add your own

In [None]:
# Drop the excluded contacts, plus every contact I sent fewer than 20 text messages to
pro_df = pro_df[~pro_df['NickName'].isin(groups)]

sent_by_me = pro_df[pro_df['IsSender'] == 1]
message_counts = sent_by_me.groupby('NickName').size()
talkers_to_keep = message_counts[message_counts.ge(20)].index

pro_df = pro_df[pro_df['NickName'].isin(talkers_to_keep)]
pro_df

In [None]:
# Text cleaning
# pro_df was produced by boolean filtering; take an explicit copy before
# adding columns so pandas does not emit SettingWithCopyWarning.
pro_df = pro_df.copy()
pro_df['CleanedContent'] = pro_df['StrContent'].apply(clean)
pro_df['TokenizedContent'] = pro_df['CleanedContent'].apply(tokenize)
pro_df.head()

## 总结：

In [None]:
# Overall statistics:
# Only string cells are counted; a NaN cell is a float and has no length.
all_send_messages = pro_df[pro_df['IsSender'] == 1]['StrContent'].tolist()
send_characters = sum(len(message) for message in all_send_messages if isinstance(message, str))

all_received_messages = pro_df[pro_df['IsSender'] == 0]['StrContent'].tolist()
received_characters = sum(len(message) for message in all_received_messages if isinstance(message, str))

print(f'今年你一共发出了{len(all_send_messages)}条文本消息，收到了{len(all_received_messages)}条文本消息。')
print(f'一共发送了{send_characters}个字符，收到了{received_characters}个字符。\n')

# Ranking by total number of messages:
top_chat = pro_df.groupby('NickName').agg(
    MessageCount=('NickName', 'size'),       # Count the number of messages
    Remark=('Remark', 'first')               # Get the first 'Remark' for each 'NickName'
).reset_index()
top_chat.sort_values(by='MessageCount', ascending=False, inplace=True)
top_chat_templates = (
    '其中你和{name}（备注：{remark}）聊得最多，一共有{count}条消息。',
    '其次是{name}（备注：{remark}），一共有{count}条消息。',
    '接着是{name}（备注：{remark}），一共有{count}条消息。',
)
# zip stops at the shorter input, so fewer than three contacts no longer raises.
for template, row in zip(top_chat_templates, top_chat.itertuples(index=False)):
    print(template.format(name=row.NickName, remark=row.Remark, count=row.MessageCount))
print()


# Contacts I sent the most messages to
top_senders = pro_df[pro_df['IsSender'] == 1].groupby('NickName').size().sort_values(ascending=False)
sender_templates = (
    '你发送消息最多的联系人是{name}，一共发送了{count}条消息。',
    '其次是{name}，一共发送了{count}条消息。',
    '接着是{name}，一共发送了{count}条消息。',
)
# Iterate (label, value) pairs rather than using top_senders[0]: integer keys
# on a string-labelled Series relied on a deprecated positional fallback.
for template, (name, count) in zip(sender_templates, top_senders.items()):
    print(template.format(name=name, count=count))
print()


# Contacts I received the most messages from
top_receivers = pro_df[pro_df['IsSender'] == 0].groupby('NickName').size().sort_values(ascending=False)
receiver_templates = (
    '你接收消息最多的联系人是{name}，一共收到了{count}条消息。',
    '其次是{name}，一共收到了{count}条消息。',
    '接着是{name}，一共收到了{count}条消息。',
)
for template, (name, count) in zip(receiver_templates, top_receivers.items()):
    print(template.format(name=name, count=count))
print()


In [17]:
# 词频统计
from collections import Counter
from wordcloud import WordCloud
import matplotlib.pyplot as plt


def word_frequency(df, font_path='msyh.ttc'):
    """Count token frequencies in ``df`` and display them as a word cloud.

    Args:
        df: DataFrame with a ``TokenizedContent`` column whose cells are
            token lists.
        font_path: Font file handed to ``WordCloud``; defaults to the
            previously hard-coded ``'msyh.ttc'``.

    Returns:
        A ``collections.Counter`` mapping each token to its number of
        occurrences. It is empty (and no figure is shown) when no message
        contributes any token.
    """
    word_counter = Counter()
    # Iterate the frame that was passed in; the previous version read the
    # global pro_df, so per-contact calls counted every contact's messages.
    for token_list in df['TokenizedContent']:
        # Skip empty messages and messages that consist of one single-character token.
        if len(token_list) < 1 or (len(token_list) == 1 and len(token_list[0]) <= 1):
            continue
        word_counter.update(token_list)

    # WordCloud needs at least one word to draw, so skip the plot when there is none.
    if not word_counter:
        return word_counter

    wc = WordCloud(font_path=font_path, background_color='white')
    wc.generate_from_frequencies(word_counter)
    plt.figure(figsize=(10, 5))  # Set the figure size
    plt.imshow(wc, interpolation='bilinear')  # Display the word cloud
    plt.axis("off")  # Do not show axes to keep it clean
    plt.show()

    return word_counter
    

In [None]:
# Word cloud and word counts over all kept messages, then look up two specific words.
all_word_count = word_frequency(pro_df)
count_xs, count_bb = (all_word_count.get(word, 0) for word in ('想死', '拔杯'))
print(f'我在2024说了{count_xs}次想死')
print(f'说了{count_bb}次拔杯')

## 按联系人：

In [None]:
# Split the messages into one DataFrame per contact, keyed by nickname.
contact_dfs = {nickname: group_df for nickname, group_df in pro_df.groupby('NickName')}

# Show one contact's messages (the key looks like a placeholder — substitute a real nickname).
contact_dfs["someone's nickname"]

In [None]:
# Pass one contact's DataFrame to word_frequency (the key looks like a
# placeholder — substitute a real nickname before running).
word_frequency(contact_dfs["someone's nickname"])