In [None]:
import zipfile


def unzip(zip_filepath, dest_path):
    """
        Extract every member of a zip archive into a directory.

        zip_filepath: path of the zip archive to open.
        dest_path: directory the archive members are extracted into.
    """
    with zipfile.ZipFile(zip_filepath) as archive:
        # The archive is closed automatically when the with-block exits.
        archive.extractall(path=dest_path)


def get_dataset_filename(zip_filepath):
    """
        Return the name of the first member listed in a zip archive.

        zip_filepath: path of the zip archive to inspect.
        Raises IndexError when the archive has no members.
    """
    with zipfile.ZipFile(zip_filepath) as archive:
        member_names = archive.namelist()
    # Only the first listed entry is used as the data set file name.
    return member_names[0]


def cal_acc(true_labels, pred_labels):
    """
        Compute classification accuracy.

        true_labels: sized sequence of ground-truth labels.
        pred_labels: sized sequence of predicted labels, same length.

        Returns the fraction of positions where both labels compare equal.
        Returns 0.0 for empty input instead of dividing by zero.
        Raises ValueError when the two sequences differ in length.
    """
    n_total = len(true_labels)
    if n_total != len(pred_labels):
        raise ValueError(
            'true_labels and pred_labels must have the same length '
            '({} != {})'.format(n_total, len(pred_labels)))
    if n_total == 0:
        # Nothing to score; avoid ZeroDivisionError.
        return 0.0

    # Pair the labels positionally so pandas/NumPy containers with a
    # non-default index are handled the same way as plain lists.
    n_correct = sum(1 for true, pred in zip(true_labels, pred_labels)
                    if true == pred)
    return n_correct / n_total


In [None]:
from skimage import io
import os
import re
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
import pandas as pd
import math
import numpy as np
from skimage import exposure, img_as_float


# Directory where downloaded profile images are saved
profile_image_path = './pro_img/'


def inspect_dataset(df_data):
    """
        Print basic information about a loaded data set.

        df_data: the DataFrame to describe.

        Prints the DataFrame.info() report, the row/column counts and the
        first rows (DataFrame.head()). Returns nothing.
    """
    print('数据集基本信息：')
    # DataFrame.info() writes its report itself and returns None, so it is
    # called directly; wrapping it in print() would emit a stray "None" line.
    df_data.info()
    n_rows, n_cols = df_data.shape[0], df_data.shape[1]
    print('数据集有{}行，{}列'.format(n_rows, n_cols))
    print('数据预览:')
    print(df_data.head())


def check_profile_image(img_link):
    """
        Check whether a profile-image link is usable and, if so, download it.

        img_link: link of the profile image.

        Returns the local path the image was saved to, or '' when the image
        could not be read, its file name contains none of the accepted
        extensions, or it could not be saved.
    """
    # Accepted image file extensions
    valid_img_ext_lst = ['.jpeg', '.png', '.jpg']

    try:
        # Read the image first; an unreadable link raises and is reported below.
        img_data = io.imread(img_link)
        # The file name is whatever follows the last '/'.
        image_name = img_link.rsplit('/', 1)[-1]
        lowered_name = image_name.lower()
        if not any(valid_img_ext in lowered_name
                   for valid_img_ext in valid_img_ext_lst):
            # No accepted extension in the file name: skip silently.
            return ''

        # Make sure the target directory exists before saving.
        os.makedirs(profile_image_path, exist_ok=True)
        target_path = os.path.join(profile_image_path, image_name)
        io.imsave(target_path, img_data)
    except Exception:
        # Exception (not a bare except) so that Ctrl-C can still interrupt a
        # long download loop. The path is only returned after a successful
        # save, so a failed save no longer yields a path to a missing file.
        print('头像链接 {} 无效'.format(img_link))
        return ''

    return target_path


def clean_text(text):
    """
        Clean a piece of text.

        text: the string to clean.

        Lower-cases the text, replaces every "whitespace + non-word
        character" pair and every "non-word character + whitespace" pair
        with a single space, then collapses whitespace runs into one space.
        Returns the cleaned string.
    """
    # Just in case the input is not lower-cased yet
    text = text.lower()

    # Raw strings keep the same patterns while avoiding invalid
    # escape-sequence warnings for '\s' and '\W'.
    text = re.sub(r'\s\W', ' ', text)
    text = re.sub(r'\W\s', ' ', text)
    text = re.sub(r'\s+', ' ', text)

    return text


def split_train_test(df_data, size=0.8, labels=(0, 1)):
    """
        Split a data set into training and test sets, class by class.

        df_data: DataFrame with a 'label' column.
        size: fraction of each class that goes into the training set.
        labels: label values to include; each one is split separately so
            that every class keeps the same train/test proportion.

        For each label the first floor(n * size) rows (in their existing
        order) go to the training set and the remaining rows to the test
        set; no shuffling is done.

        Returns (df_train, df_test), both with a fresh 0-based index. As
        before, the two reset_index() calls leave the previous indexes
        behind as the extra columns 'index' and 'level_0'.
    """
    train_parts = []
    test_parts = []

    for label in labels:
        # Rows of this class, re-indexed from 0 (old index kept as 'index')
        df_label = df_data[df_data['label'] == label].reset_index()

        # Number of rows of this class that go to the training set
        split_line_no = math.floor(df_label.shape[0] * size)
        train_parts.append(df_label.iloc[:split_line_no, :])
        test_parts.append(df_label.iloc[split_line_no:, :])

    # DataFrame.append was removed in pandas 2.0; pd.concat builds the same
    # frames in one step. An empty `labels` yields empty frames.
    df_train = pd.concat(train_parts) if train_parts else pd.DataFrame()
    df_test = pd.concat(test_parts) if test_parts else pd.DataFrame()

    return df_train.reset_index(), df_test.reset_index()


def get_word_list_from_data(text_s):
    """
        Collect the words of every text in the data set into one flat list.

        text_s: iterable of strings (e.g. a pandas Series); each string is
            split on single spaces.

        Returns the list of all words in order of appearance.
    """
    word_list = []
    # Iterating the Series yields its values directly; Series.iteritems()
    # was removed in pandas 2.0.
    for text in text_s:
        word_list.extend(text.split(' '))
    return word_list


# Cache of the English stop words, filled on first use
_ENGLISH_STOPWORDS = None


def _get_english_stopwords():
    """
        Return the English stop words as a frozenset, loading them once.
    """
    global _ENGLISH_STOPWORDS
    if _ENGLISH_STOPWORDS is None:
        _ENGLISH_STOPWORDS = frozenset(stopwords.words('english'))
    return _ENGLISH_STOPWORDS


def proc_text(text):
    """
        Tokenize a text and remove English stop words.

        text: the string to process.

        Tokens are the matches of the regular expression \\w+. Returns the
        remaining tokens joined by single spaces.
    """
    tokenizer = RegexpTokenizer(r'\w+')
    words = tokenizer.tokenize(text)
    # Fetch the stop words once instead of calling stopwords.words('english')
    # for every token; the set also makes each membership test O(1).
    stop_words = _get_english_stopwords()
    filtered_words = [word for word in words if word not in stop_words]
    return " ".join(filtered_words)


def extract_tf_idf(text_s, text_collection, common_words_freqs):
    """
        Extract TF-IDF features.

        text_s: pandas Series (or other sized iterable with .shape) of texts.
        text_collection: object providing tf_idf(word, text).
        common_words_freqs: sequence of (word, count) pairs; one feature
            column is produced per word, in this order.

        Returns an array of shape (number of texts, number of words). An
        entry is text_collection.tf_idf(word, text) when `word in text`
        holds and 0 otherwise.
    """
    # Only TF-IDF is used here as an example; word counts or other text
    # features could be added as extra features.

    n_sample = text_s.shape[0]
    common_words = [word for word, _ in common_words_freqs]

    # Initialise with zeros so that absent words need no explicit assignment
    X = np.zeros([n_sample, len(common_words)])

    print('提取tf-idf特征...')
    # Rows are filled by position (enumerate) rather than by index label, so
    # a Series whose index is not 0..n-1 is handled correctly.
    # Series.iteritems() was removed in pandas 2.0.
    for row, text in enumerate(text_s):
        for col, word in enumerate(common_words):
            # NOTE(review): if `text` is a plain string this is a substring
            # test, not a whole-word test -- confirm that is intended.
            if word in text:
                X[row, col] = text_collection.tf_idf(word, text)

    return X


def hex_to_rgb(value):
    """
        Convert a hexadecimal colour code to its RGB values.

        value: six hexadecimal digits such as 'ff8000'; a leading '#'
            (as in '#ff8000') is accepted and ignored.

        Returns [red, green, blue], each an int parsed from two digits.
        Raises ValueError when a digit pair is not valid hexadecimal.
    """
    hex_digits = value.lstrip('#')
    return [int(hex_digits[i:i + 2], 16) for i in range(0, 6, 2)]


def extract_rgb_feat(hex_color_s):
    """
        Extract RGB values from hexadecimal colour codes as features.

        hex_color_s: pandas Series of hexadecimal colour codes.

        Returns an array of shape (number of codes, 3) whose rows are the
        values returned by hex_to_rgb for each code.
    """
    n_sample = hex_color_s.shape[0]
    n_feat = 3

    # Initialise
    X = np.zeros([n_sample, n_feat])

    print('提取RGB特征...')
    # Rows are filled by position (enumerate) rather than by index label, so
    # a Series whose index is not 0..n-1 is handled correctly.
    # Series.iteritems() was removed in pandas 2.0.
    for row, hex_val in enumerate(hex_color_s):
        X[row, :] = hex_to_rgb(hex_val)

    return X


def extract_rgb_hist_feat(img_path_s):
    """
        Extract RGB histogram features from images.

        img_path_s: pandas Series of image file paths.

        Returns an array of shape (number of images, 300): for each image
        the 100-bin histograms of the red, green and blue channels, in that
        order. For an image that is not 3-dimensional (e.g. grayscale) the
        single histogram is repeated three times.
    """
    n_sample = img_path_s.shape[0]
    n_bins = 100    # number of bins per channel
    n_feat = n_bins * 3

    # Initialise
    X = np.zeros([n_sample, n_feat])

    print('提取RGB直方图特征...')
    # Rows are filled by position (enumerate) rather than by index label, so
    # a Series whose index is not 0..n-1 is handled correctly.
    # Series.iteritems() was removed in pandas 2.0.
    for row, img_path in enumerate(img_path_s):
        # Load the image and convert it with img_as_float
        img_data = img_as_float(io.imread(img_path))

        if img_data.ndim == 3:
            # Colour image: one histogram per channel (the first three)
            hist_r, _ = exposure.histogram(img_data[:, :, 0], nbins=n_bins)
            hist_g, _ = exposure.histogram(img_data[:, :, 1], nbins=n_bins)
            hist_b, _ = exposure.histogram(img_data[:, :, 2], nbins=n_bins)
        else:
            # No channel axis (2-D image): reuse one histogram for R, G and B.
            # np.concatenate copies the data, so no explicit copies are needed.
            hist, _ = exposure.histogram(img_data, nbins=n_bins)
            hist_r = hist_g = hist_b = hist

        # Concatenate in R, G, B order (previously R, B, G) so the column
        # layout matches the documented "RGB" histogram.
        X[row, :] = np.concatenate((hist_r, hist_g, hist_b))

    return X


In [None]:
import os
import pandas as pd
from common_tools import get_dataset_filename, unzip, cal_acc
from pd_tools import inspect_dataset, check_profile_image, \
    split_train_test, clean_text, proc_text, get_word_list_from_data, \
    extract_tf_idf, extract_rgb_feat, extract_rgb_hist_feat
import nltk
from nltk.text import TextCollection
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.decomposition import PCA


# Data set locations
dataset_path = './dataset'  # directory holding the data set
zip_filename = 'twitter-user-gender-classification.zip'  # name of the zip file
zip_filepath = os.path.join(dataset_path, zip_filename)  # full path of the zip file
cln_datapath = './cln_data.csv'     # path of the cleaned data

# Whether this is the first run. When True, run_main() unzips the archive,
# cleans the raw data, downloads the profile images and writes cln_datapath
# before training; otherwise it reads cln_datapath directly.
is_first_run = True


def run_main():
    """
        Main function.

        On the first run (is_first_run) it unzips the data set, filters and
        cleans it, downloads the profile images and writes the result to
        cln_datapath. It then reads the cleaned data, splits it into
        training and test sets, extracts text (TF-IDF) and colour/image
        (RGB, RGB histogram) features, standardises them, trains one
        logistic-regression model on the full features and one on the
        PCA-reduced features, prints both test accuracies, and finally
        deletes the extracted data set file.
    """
    # Declare variables
    dataset_filename = get_dataset_filename(zip_filepath)  # data set file name (inside the zip)
    dataset_filepath = os.path.join(dataset_path, dataset_filename)  # path of the extracted data set file

    if is_first_run:

        print('解压zip...', end='')
        unzip(zip_filepath, dataset_path)
        print('完成.')

        # Read the data
        data = pd.read_csv(dataset_filepath, encoding='latin1',
                           usecols=['gender', 'description', 'link_color',
                                    'profileimage', 'sidebar_color', 'text'])
        # 1. Inspect the loaded data set
        inspect_dataset(data)

        # 2. Data cleaning
        # 2.1. Keep only the rows whose 'gender' is 'male' or 'female'
        filtered_data = data[(data['gender'] == 'male') | (data['gender'] == 'female')]

        # 2.2 Drop the rows whose 'description' is missing
        filtered_data = filtered_data.dropna(subset=['description'])

        # 2.3 Drop the rows whose 'link_color' or 'sidebar_color' is not a
        # 6-character value (only the length is checked here)
        filtered_data = filtered_data[filtered_data['link_color'].str.len() == 6]
        filtered_data = filtered_data[filtered_data['sidebar_color'].str.len() == 6]

        # 2.4 Clean the text data
        print('清洗文本数据...')
        cln_desc = filtered_data['description'].apply(clean_text)
        cln_text = filtered_data['text'].apply(clean_text)
        filtered_data['cln_desc'] = cln_desc
        filtered_data['cln_text'] = cln_text

        # 2.5 Check whether each profile image link is valid and add a new
        # column holding the path the image was saved to
        print('下载头像数据...')
        saved_img_s = filtered_data['profileimage'].apply(check_profile_image)
        filtered_data['saved_image'] = saved_img_s
        # Drop the rows without a valid profile image
        filtered_data = filtered_data[filtered_data['saved_image'] != '']

        # Save the processed data
        filtered_data.to_csv(cln_datapath, index=False)

    # Read the processed data
    clean_data = pd.read_csv(cln_datapath, encoding='latin1',
                             usecols=['gender', 'cln_desc', 'cln_text',
                                      'link_color', 'sidebar_color', 'saved_image'])

    # Show the label distribution
    print(clean_data.groupby('gender').size())

    # Encode male -> 0, female -> 1 in a new 'label' column
    clean_data.loc[clean_data['gender'] == 'male', 'label'] = 0
    clean_data.loc[clean_data['gender'] == 'female', 'label'] = 1

    # 3. Split the data set
    # Tokenize and remove stop words
    proc_desc_s = clean_data['cln_desc'].apply(proc_text)
    clean_data['desc_words'] = proc_desc_s

    proc_text_s = clean_data['cln_text'].apply(proc_text)
    clean_data['text_words'] = proc_text_s

    df_train, df_test = split_train_test(clean_data)
    # Show basic information about the training and test sets
    print('训练集中各类的数据个数：', df_train.groupby('label').size())
    print('测试集中各类的数据个数：', df_test.groupby('label').size())

    # 4. Feature engineering
    # 4.1 Feature extraction for the training data
    print('训练样本特征提取：')
    # 4.1.1 Text data
    # description data
    print('统计description词频...')
    n_desc_common_words = 50
    desc_words_in_train = get_word_list_from_data(df_train['desc_words'])
    fdisk = nltk.FreqDist(desc_words_in_train)
    desc_common_words_freqs = fdisk.most_common(n_desc_common_words)
    # NOTE(review): 'descriptino' in the message below is a typo in a runtime
    # string; it is left untouched here because it is program output.
    print('descriptino中出现最多的{}个词是：'.format(n_desc_common_words))
    for word, count in desc_common_words_freqs:
        print('{}: {}次'.format(word, count))
    print()

    # Extract the TF-IDF features of the description text
    print('提取desc文本特征...', end=' ')
    desc_collection = TextCollection(df_train['desc_words'].values.tolist())
    tr_desc_feat = extract_tf_idf(df_train['desc_words'], desc_collection, desc_common_words_freqs)
    print('完成')
    print()

    # text data
    print('统计text词频...')
    n_text_common_words = 50
    text_words_in_train = get_word_list_from_data(df_train['text_words'])
    fdisk = nltk.FreqDist(text_words_in_train)
    text_common_words_freqs = fdisk.most_common(n_text_common_words)
    print('text中出现最多的{}个词是：'.format(n_text_common_words))
    for word, count in text_common_words_freqs:
        print('{}: {}次'.format(word, count))
    print()

    # Extract the TF-IDF features of the text column
    text_collection = TextCollection(df_train['text_words'].values.tolist())
    print('提取text文本特征...', end=' ')
    tr_text_feat = extract_tf_idf(df_train['text_words'], text_collection, text_common_words_freqs)
    print('完成')
    print()

    # 4.1.2 Image data
    # RGB features of the link colour and the sidebar colour
    tr_link_color_feat_ = extract_rgb_feat(df_train['link_color'])
    tr_sidebar_color_feat = extract_rgb_feat(df_train['sidebar_color'])

    # RGB histogram features of the profile image
    tr_profile_img_hist_feat = extract_rgb_hist_feat(df_train['saved_image'])

    # Combine the text features and the image features
    tr_feat = np.hstack((tr_desc_feat, tr_text_feat, tr_link_color_feat_,
                         tr_sidebar_color_feat, tr_profile_img_hist_feat))

    # Standardise the features (the scaler is fitted on the training data)
    scaler = StandardScaler()
    tr_feat_scaled = scaler.fit_transform(tr_feat)

    # Training-set labels
    tr_labels = df_train['label'].values

    # 4.2 Feature extraction for the test data
    print('测试样本特征提取：')
    # 4.2.1 Text data
    # description data
    # Extract the TF-IDF features of the description text, reusing the
    # collection and the common words obtained from the training set
    print('提取desc文本特征...', end=' ')
    te_desc_feat = extract_tf_idf(df_test['desc_words'], desc_collection, desc_common_words_freqs)
    print('完成')
    print()

    # text data
    # Extract the TF-IDF features of the text column
    print('提取text文本特征...', end=' ')
    te_text_feat = extract_tf_idf(df_test['text_words'], text_collection, text_common_words_freqs)
    print('完成')
    print()

    # 4.2.2 Image data
    # RGB features of the link colour and the sidebar colour
    te_link_color_feat_ = extract_rgb_feat(df_test['link_color'])
    te_sidebar_color_feat = extract_rgb_feat(df_test['sidebar_color'])

    # RGB histogram features of the profile image
    te_profile_img_hist_feat = extract_rgb_hist_feat(df_test['saved_image'])

    # Combine the text features and the image features
    te_feat = np.hstack((te_desc_feat, te_text_feat, te_link_color_feat_,
                         te_sidebar_color_feat, te_profile_img_hist_feat))

    # Standardise the test features with the scaler fitted on the training data
    te_feat_scaled = scaler.transform(te_feat)

    # Test-set labels
    te_labels = df_test['label'].values

    # 4.3 PCA dimensionality reduction
    pca = PCA(n_components=0.95)  # keep the components explaining 95% of the cumulative variance
    tr_feat_scaled_pca = pca.fit_transform(tr_feat_scaled)
    te_feat_scaled_pca = pca.transform(te_feat_scaled)

    # 5. Build and train the models to compare the results with and without PCA
    # Model on the features without PCA
    lr_model = LogisticRegression()
    lr_model.fit(tr_feat_scaled, tr_labels)

    # Model on the features after PCA
    lr_pca_model = LogisticRegression()
    lr_pca_model.fit(tr_feat_scaled_pca, tr_labels)

    # 6. Model testing
    pred_labels = lr_model.predict(te_feat_scaled)
    pred_pca_labels = lr_pca_model.predict(te_feat_scaled_pca)
    # Accuracy
    print('未进行PCA操作:')
    print('样本维度：', tr_feat_scaled.shape[1])
    print('准确率：{}'.format(cal_acc(te_labels, pred_labels)))

    print()
    print('进行PCA操作后:')
    print('样本维度：', tr_feat_scaled_pca.shape[1])
    print('准确率：{}'.format(cal_acc(te_labels, pred_pca_labels)))

    # 7. Delete the extracted data set file to free disk space
    if os.path.exists(dataset_filepath):
        os.remove(dataset_filepath)


# Run the pipeline only when this file is executed as a script
if __name__ == '__main__':
    run_main()
