In [None]:
# =============================================================
# Copyright © 2020 Intel Corporation
# 
# SPDX-License-Identifier: MIT
# =============================================================

# 使用 Intel® Distribution of Modin 和 Intel® Extension for Scikit-learn 加速的端到端的 Census 工作流

这个示例基于1970年到2010年的美国人口普查数据，运行了一个端到端的机器学习工作流。
在ETL阶段使用Intel® Distribution of Modin加速，然后使用Intel® Extension for Scikit-learn加速库训练了一个Ridge Regression算法，预测美国人口收入和受教育程度之间的关系。

首先下载数据集

In [None]:
!wget https://storage.googleapis.com/intel-optimized-tensorflow/datasets/ipums_education2income_1970-2010.csv.gz

导入相关的python模块，忽略 warnings 信息。

In [None]:
import os
import numpy as np
import warnings

warnings.filterwarnings("ignore")

导入 Modin 库，设置 HDK (Heterogeneous Data Kernels) 作为后端计算引擎，该引擎基于OmniSciDB来获得针对特定dataframe操作集的高单节点可扩展性。

In [None]:
# To run on stock pandas instead, use this import in place of the Modin one:
# import pandas as pd

# Use Modin, imported under the usual `pd` alias so the rest of the notebook
# is written against the familiar pandas-style API.
import modin.pandas as pd

# Select 'hdk' as Modin's storage format.
# NOTE(review): this is set before any DataFrame is created — presumably
# required for the setting to take effect; confirm against the Modin docs.
import modin.config as cfg
cfg.StorageFormat.put('hdk')


导入Intel® Extension for Scikit-learn库，调用patch函数，从而在运行时调用底层 Intel® oneAPI Data Analytics Library 对机器学习算法进行加速。

In [None]:
# The next two lines import Intel(R) Extension for Scikit-learn and call its
# patch function to enable acceleration. To run on stock scikit-learn instead,
# comment out these two lines.
# NOTE(review): the patch is applied before the sklearn imports below; keep
# this order — presumably the imports must happen after patching to pick up
# the accelerated implementations (confirm against the sklearnex docs).
from sklearnex import patch_sklearn
patch_sklearn()

from sklearn import config_context
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
import sklearn.linear_model as lm

加载数据到 dataframe中（如果机器内存有限，请尝试读取部分数据）  
*注：Intel® Distribution of Modin 和 Intel® Extension for Scikit-learn 对数据量比较大的工作负载有较好的加速效果。*

In [None]:
import time

# Gzipped census extract expected in the current working directory.
CENSUS_CSV = 'ipums_education2income_1970-2010.csv.gz'

# Row limit handed to read_csv. None loads the complete dataset; the full
# workflow then needs roughly 30 GB of memory. On machines with less memory,
# set an integer (for example 5000000) to load only that many leading rows.
NROWS = None

dt_start = time.time()

df = pd.read_csv(CENSUS_CSV, nrows=NROWS)

# Wall-clock seconds spent loading the CSV.
print("read_csv time: ", time.time() - dt_start)

运行ETL操作对数据进行预处理，保留使用ridge regression算法预测收入和受教育程度之间的关联关系所需要的字段，清洗无效数据，根据通货膨胀调整收入，设置目标字段为EDUC

In [None]:
# Start the ETL stopwatch.
dt_start = time.time()

# Columns retained for the model: the features, the EDUC target and the
# CPI99 factor used for the inflation adjustment.
keep_cols = [
    "YEAR",
    "DATANUM",
    "SERIAL",
    "CBSERIAL",
    "HHWT",
    "CPI99",
    "GQ",
    "PERNUM",
    "SEX",
    "AGE",
    "INCTOT",
    "EDUC",
    "EDUCD",
    "EDUC_HEAD",
    "EDUC_POP",
    "EDUC_MOM",
    "EDUCD_MOM2",
    "EDUCD_POP2",
    "INCTOT_MOM",
    "INCTOT_POP",
    "INCTOT_MOM2",
    "INCTOT_POP2",
    "INCTOT_HEAD",
    "SEX_HEAD",
]
df = df[keep_cols]

# Remove invalid records: drop every row whose value equals the sentinel
# listed for that column. The filters are applied one after another.
for name, sentinel in (("INCTOT", 9999999), ("EDUC", -1), ("EDUCD", -1)):
    df = df[df[name] != sentinel]

# Adjust total income for inflation by scaling it with the CPI99 factor.
adjusted_income = df["INCTOT"] * df["CPI99"]
df["INCTOT"] = adjusted_income

# Replace missing values with -1 and cast every retained column to float64.
for column in keep_cols:
    df[column] = df[column].fillna(-1).astype("float64")

# EDUC is the regression target; EDUC and CPI99 are left out of the features.
X = df.drop(columns=["EDUC", "CPI99"])
y = df["EDUC"]

print("ETL time: ", time.time() - dt_start)

将数据集分为训练集和测试集，训练模型并运行预测。每次使用不同的随机种子重新划分数据，共重复 N_RUNS（默认为10）次，以降低单次划分带来的偶然性，得到更稳定的评估结果。

In [None]:
# Ridge regression estimator; the same object is re-fitted on every run.
clf = lm.Ridge()

mse_values, cod_values = [], []
N_RUNS = 10
# Number of leading runs treated as warm-up: they are executed and scored like
# any other run, but the stopwatch only starts once they have finished.
N_WARMUP_RUNS = 2
TRAIN_SIZE = 0.9
random_state = 777

# Hand contiguous float64 NumPy arrays to scikit-learn.
X = np.ascontiguousarray(X, dtype=np.float64)
y = np.ascontiguousarray(y, dtype=np.float64)

# Fallback start time: if N_RUNS <= N_WARMUP_RUNS the reset inside the loop is
# never reached, and without this line the report below would silently use a
# stale dt_start left over from an earlier cell.
dt_start = time.time()

# Repeated random hold-out validation: every run draws a fresh train/test
# split by advancing the seed.
for i in range(N_RUNS):

    # Warm-up finished: (re)start the stopwatch so that only the remaining
    # runs are timed.
    if i == N_WARMUP_RUNS:
        dt_start = time.time()

    # Split into training and test sets.
    X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=TRAIN_SIZE,
                                                        random_state=random_state)
    random_state += 777

    # Fit the model. assume_finite=True tells scikit-learn to skip its
    # finiteness validation of the inputs.
    with config_context(assume_finite=True):
        model = clf.fit(X_train, y_train)

    # Predict on the held-out split.
    y_pred = model.predict(X_test)

    # Record mean squared error and R^2 (coefficient of determination) for
    # this run; warm-up runs are recorded as well.
    mse_values.append(mean_squared_error(y_test, y_pred))
    cod_values.append(r2_score(y_test, y_pred))

print("Ridge Regression training & inference time: ", time.time() - dt_start)

使用均方误差（mean squared error）和r方分数（r square score）来检验回归结果

In [None]:
def _mean_and_sample_std(values):
    """Return ``(mean, sample standard deviation)`` for a sequence of numbers.

    The deviation uses the ``n - 1`` denominator. With a single value the
    sample deviation is undefined, so NaN is returned for it instead of
    failing with a division by zero.

    Raises:
        ValueError: if ``values`` is empty.
    """
    count = len(values)
    if count == 0:
        raise ValueError("no scores to summarise; run the training cell first")
    mean = sum(values) / count
    if count < 2:
        return mean, float("nan")
    squared_error = sum((value - mean) ** 2 for value in values)
    return mean, (squared_error / (count - 1)) ** 0.5


# Aggregate the per-run scores collected by the training loop.
mean_mse, mse_dev = _mean_and_sample_std(mse_values)
mean_cod, cod_dev = _mean_and_sample_std(cod_values)
print("mean MSE ± deviation: {:.9f} ± {:.9f}".format(mean_mse, mse_dev))
print("mean COD ± deviation: {:.9f} ± {:.9f}".format(mean_cod, cod_dev))

In [None]:
# Release resources: %reset clears the names defined in the interactive
# namespace; -f does so without asking for confirmation.
%reset -f