# 环境准备

In [1]:
# Environment setup: bring the pyalink API into the notebook namespace and
# start from a clean local execution environment.
from pyalink.alink import *
import sys
import os

# Reset whatever environment may already be active, then start a local one.
resetEnv()
# The first argument of useLocalEnv is the parallelism.
useLocalEnv(2)


Use one of the following command to start using pyalink:
使用以下一条命令来开始使用 pyalink：
 - useLocalEnv(parallelism, flinkHome=None, config=None)
 - useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", config=None)
Call resetEnv() to reset environment and switch to another.
使用 resetEnv() 来重置运行环境，并切换到另一个。

JVM listening on 127.0.0.1:51134


JavaObject id=o6

# 数据准备

In [2]:
# Schema of the Avazu CSV files; the same string is passed to both the batch
# and the stream CSV sources.
# It is assembled from adjacent string literals so that no source indentation
# ends up inside the schema text (a backslash continuation inside a single
# literal would embed the leading spaces of every continued line).
schemaStr = (
    "id string, click string, dt string, C1 string, banner_pos int, site_id string, "
    "site_domain string, site_category string, app_id string, app_domain string, "
    "app_category string, device_id string, device_ip string, device_model string, "
    "device_type string, device_conn_type string, C14 int, C15 int, C16 int, C17 int, "
    "C18 int, C19 int, C20 int, C21 int"
)

# Batch source: the avazu-small.csv file, read with the shared schema.
# This data is what the feature pipeline and the initial model are fitted on.
batchTrainDataFn = "http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-small.csv"
trainBatchData = (
    CsvSourceBatchOp()
    .setFilePath(batchTrainDataFn)
    .setSchemaStr(schemaStr)
    .setIgnoreFirstLine(True)  # skip the first line (presumably a header row)
)
# Column configuration shared by the feature pipeline and the model operators.
labelColName = "click"     # label column
vecColName = "vec"         # output column of the feature hasher
numHashFeatures = 30000    # number of hashed features / vector size

# Columns declared as categorical to the feature hasher.
categoryColNames = [
    "C1", "banner_pos", "site_category", "app_domain",
    "app_category", "device_type", "device_conn_type",
    "site_id", "site_domain", "device_id", "device_model",
]

# Numerical columns; these are standard-scaled by the feature pipeline.
numericalColNames = [
    "C14", "C15", "C16", "C17",
    "C18", "C19", "C20", "C21",
]

# All columns handed to the feature hasher (categorical and numerical).
selectedColNames = [
    "C1", "banner_pos", "site_category", "app_domain",
    "app_category", "device_type", "device_conn_type",
    "C14", "C15", "C16", "C17", "C18", "C19", "C20", "C21",
    "site_id", "site_domain", "device_id", "device_model",
]

# Stream source: the avazu-ctr-train-8M.csv file, read with the same schema
# as the batch data.
wholeDataFile = "http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-ctr-train-8M.csv"
data = (
    CsvSourceStreamOp()
    .setFilePath(wholeDataFile)
    .setSchemaStr(schemaStr)
    .setIgnoreFirstLine(True)  # skip the first line (presumably a header row)
)

# Split the stream with fraction 0.5: the operator's main output is used as
# the online training stream, its side output 0 as the prediction/eval stream.
spliter = SplitStreamOp().setFraction(0.5).linkFrom(data)
train_stream_data = spliter
test_stream_data = spliter.getSideOutput(0)

# 在线学习五步骤
<ul>
    <li>步骤一、特征工程</li>
    <li>步骤二、批式模型训练</li>
    <li>步骤三、在线模型训练（FTRL）</li>
    <li>步骤四、在线预测</li>
    <li>步骤五、在线评估</li>
</ul>

# 步骤一、特征工程

In [3]:
# Set up the feature engineering pipeline:
#   1. standard-scale the numerical columns;
#   2. hash the selected columns (with the categorical ones declared as such)
#      into the vector column `vecColName` with `numHashFeatures` features.
feature_pipeline = (
    Pipeline()
    .add(StandardScaler().setSelectedCols(numericalColNames))
    .add(
        FeatureHasher()
        .setSelectedCols(selectedColNames)
        .setCategoricalCols(categoryColNames)
        .setOutputCol(vecColName)
        .setNumFeatures(numHashFeatures)
    )
)

# Fit the pipeline on the batch data and save the fitted model to a file in
# the current working directory.
FEATURE_PIPELINE_MODEL_FILE = os.path.join(os.getcwd(), "feature_pipe_model.csv")

# Remove a model file left over from a previous run so that re-running this
# cell (e.g. Restart Kernel -> Run All) does not fail on the already existing
# output file. Only a regular file is removed; anything else is left alone.
if os.path.isfile(FEATURE_PIPELINE_MODEL_FILE):
    os.remove(FEATURE_PIPELINE_MODEL_FILE)

feature_pipeline.fit(trainBatchData).save(FEATURE_PIPELINE_MODEL_FILE)

# Trigger execution of the batch job (presumably this is what actually
# performs the fit and writes the model file -- confirm against Alink docs).
BatchOperator.execute()

# Load the saved pipeline model; the following cells use it to transform
# both the batch data and the streams.
feature_pipelineModel = PipelineModel.load(FEATURE_PIPELINE_MODEL_FILE)


# 步骤二、批式模型训练

In [4]:
# Train the initial logistic-regression model on the feature-transformed
# batch data; the FTRL operators are constructed from this model.
lr = LogisticRegressionTrainBatchOp()
initModel = (
    lr.setVectorCol(vecColName)
    .setLabelCol(labelColName)
    .setWithIntercept(True)
    .setMaxIter(10)
    .linkFrom(feature_pipelineModel.transform(trainBatchData))
)

# 步骤三、在线模型训练（FTRL）

In [5]:
# FTRL online training: constructed from the initial batch model and fed with
# the feature-transformed training stream.
model = (
    FtrlTrainStreamOp(initModel)
    .setVectorCol(vecColName)
    .setLabelCol(labelColName)
    .setWithIntercept(True)
    .setAlpha(0.1)
    .setBeta(0.1)
    .setL1(0.01)
    .setL2(0.01)
    .setTimeInterval(10)
    .setVectorSize(numHashFeatures)  # same size as the hashed feature vector
    .linkFrom(feature_pipelineModel.transform(train_stream_data))
)

# 步骤四、在线预测

In [6]:
# FTRL online prediction: takes the model stream produced by the FTRL trainer
# as first input and the feature-transformed test stream as second input.
# Only the label column is kept next to the prediction and its detail column.
predResult = (
    FtrlPredictStreamOp(initModel)
    .setVectorCol(vecColName)
    .setPredictionCol("pred")
    .setReservedCols([labelColName])
    .setPredictionDetailCol("details")
    .linkFrom(model, feature_pipelineModel.transform(test_stream_data))
)

# Show the prediction stream in the notebook.
predResult.print(key="predResult", refreshInterval=30, maxLimit=20)

'DataStream predResult: (Updated on 2019-12-05 15:03:33)'

Unnamed: 0,click,pred,details
0,0,0,"{""0"":""0.9046159047711626"",""1"":""0.0953840952288..."
1,1,0,"{""0"":""0.7301554114492774"",""1"":""0.2698445885507..."
2,0,0,"{""0"":""0.9354702479573089"",""1"":""0.0645297520426..."
3,1,0,"{""0"":""0.7472443769874088"",""1"":""0.2527556230125..."
4,0,0,"{""0"":""0.7313933609276811"",""1"":""0.2686066390723..."
5,0,0,"{""0"":""0.7579078017993002"",""1"":""0.2420921982006..."
6,0,0,"{""0"":""0.9658883764493819"",""1"":""0.0341116235506..."
7,0,0,"{""0"":""0.8916428187684737"",""1"":""0.1083571812315..."
8,0,0,"{""0"":""0.964470362868512"",""1"":""0.03552963713148..."
9,0,0,"{""0"":""0.7879843998010425"",""1"":""0.2120156001989..."


# 步骤五、在线评估

In [None]:
# Online evaluation of the prediction stream.
# The evaluation output is linked into a JsonValueStreamOp that extracts
# Accuracy, AUC and ConfusionMatrix from the JSON in the "Data" column while
# keeping the "Statistics" column; the result of that link is then printed.
(
    EvalBinaryClassStreamOp()
    .setLabelCol(labelColName)
    .setPredictionCol("pred")
    .setPredictionDetailCol("details")
    .setTimeInterval(10)
    .linkFrom(predResult)
    .link(
        JsonValueStreamOp()
        .setSelectedCol("Data")
        .setReservedCols(["Statistics"])
        .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix"])
        .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix"])
    )
    .print(key="evaluation", refreshInterval=30, maxLimit=20)
)
# Start the streaming job; the trailing semicolon suppresses the cell's
# return-value display in Jupyter.
StreamOperator.execute();

'DataStream evaluation: (Updated on 2019-12-05 15:03:31)'

Unnamed: 0,Statistics,Accuracy,AUC,ConfusionMatrix
0,all,0.8286096670786908,0.7182165258211499,"[[5535,5007],[112297,561587]]"
1,window,0.8464953470502861,0.7283501551891348,"[[485,456],[8534,49090]]"
2,all,0.830019475336848,0.7191075542108774,"[[6020,5463],[120831,610677]]"
3,window,0.8455799884444143,0.7227709897015594,"[[512,416],[8671,49247]]"
4,all,0.8311614455307001,0.719465721678977,"[[6532,5879],[129502,659924]]"
5,window,0.8444954128440367,0.7259189182276968,"[[545,455],[8698,49162]]"
6,all,0.8320733080282608,0.7199603254520217,"[[7077,6334],[138200,709086]]"
