# Avazu Click-Through Rate Prediction
Kaggle competition page: https://www.kaggle.com/c/avazu-ctr-prediction

In [1]:
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import GradientBoostedTrees
from time import time

In [2]:
# Initialise Spark with the default configuration (no app name or master is set here).
sc = SparkContext()
# Read the raw training file as an RDD of text lines.
# NOTE(review): "train" is a path relative to the working directory — presumably
# the uncompressed Avazu train CSV; confirm.
ctr_rdd = sc.textFile("train")
# Drop the header: take the first line, then filter out every line equal to it.
header = ctr_rdd.first()
all_rdd = ctr_rdd.filter(lambda x:x!=header)
# Number of data rows (shown as the cell output).
all_rdd.count()

40428967

In [3]:
# Down-sample the data so the remaining cells can run on a single machine:
# keep roughly 0.1% of the rows, sampled without replacement, with a fixed
# seed so the sample is reproducible.
sample_rdd = all_rdd.sample(withReplacement=False, fraction=0.001, seed=123)
sample_rdd.count()

40600

In [4]:
# Split the sample into a training set (80%) and a test set (20%).
# An explicit seed (the same 123 passed to `sample`) makes the split — and
# therefore every count, feature map and model downstream — reproducible;
# without a seed the split changes from run to run.
train_test_rdd = sample_rdd.randomSplit([0.8, 0.2], seed=123)
train_rdd, test_rdd = train_test_rdd
train_rdd.count()

32517

In [5]:
# Show one raw training record (a single comma-separated line) to inspect the format.
train_rdd.first()

'10445426567380108283,0,14102100,1002,0,2c4ed2f7,c4e18dd6,50e219e0,ecad2386,7801e8d9,07d7df22,6598e836,32ff2d84,44b1eaab,0,0,19665,320,50,2253,2,303,-1,52'

In [6]:
# Preprocess one raw CSV line into a (key, categorical values, numeric values) triple.
def data_prepare(line, cat_cols=slice(5, 14), num_cols=slice(15, None)):
    """Split one raw CSV record into ``(key, vc, vn)``.

    Args:
        line: one comma-separated data line (header already removed).
        cat_cols: slice selecting the categorical (string-valued) columns.
            Defaults to columns 5..13 inclusive, the original hard-coded range.
        num_cols: slice selecting the numeric columns. Defaults to column 15
            onward, the original hard-coded range.

    Returns:
        ``(key, vc, vn)`` where ``key`` is ``"<col 0>::<col 1>"`` (the record
        id and the click label; ``featureProcess`` later splits the label back
        out of it), ``vc`` is the list of categorical strings and ``vn`` is the
        list of numeric fields, still as unparsed strings.

    Raises:
        IndexError: if the line has fewer than two comma-separated fields.

    NOTE(review): with the default slices, columns 2-4 and column 14 are not
    used as features. In the Avazu schema column 14 is presumably
    ``device_type`` — confirm whether skipping it is intended; pass
    ``num_cols=slice(14, None)`` to include it.
    """
    tokens = line.split(",")
    # The label is carried inside the key so a single string travels with the
    # record through the RDD pipeline.
    key = tokens[0] + "::" + tokens[1]
    return (key, tokens[cat_cols], tokens[num_cols])

# Apply the per-line preprocessing to both splits (lazy RDD transformations).
train_kv, test_kv = (rdd.map(data_prepare) for rdd in (train_rdd, test_rdd))
# Display the first preprocessed training record.
train_kv.first()

('10445426567380108283::0',
 ['2c4ed2f7',
  'c4e18dd6',
  '50e219e0',
  'ecad2386',
  '7801e8d9',
  '07d7df22',
  '6598e836',
  '32ff2d84',
  '44b1eaab'],
 ['0', '19665', '320', '50', '2253', '2', '303', '-1', '52'])

In [7]:
# Categorical-feature handling.
def parseCatFeatures(x):
    """Pair every categorical value with its position.

    Args:
        x: a ``(key, vc, vn)`` triple as produced by ``data_prepare``; only
            ``x[1]`` (the list of categorical strings) is read.

    Returns:
        A list of ``(position, value)`` tuples, e.g. ``[(0, 'a'), (1, 'b')]``.
        Keeping the position means the same string appearing in two different
        columns yields two distinct keys when the index map is built.
    """
    return list(enumerate(x[1]))

# Turn every preprocessed record of both splits into its list of
# (position, value) categorical pairs.
train_cat, test_cat = (kv.map(parseCatFeatures) for kv in (train_kv, test_kv))
train_cat.first()

[(0, '2c4ed2f7'),
 (1, 'c4e18dd6'),
 (2, '50e219e0'),
 (3, 'ecad2386'),
 (4, '7801e8d9'),
 (5, '07d7df22'),
 (6, '6598e836'),
 (7, '32ff2d84'),
 (8, '44b1eaab')]

In [8]:
# Build a dictionary from each distinct (position, value) categorical pair seen
# in the TRAINING split to a unique integer index.
# NOTE: despite the variable name, this is an integer (label) encoding, not a
# one-hot vector — featureProcess later stores one index per categorical column.
oneMap = (
    train_cat
    .flatMap(lambda pairs: pairs)  # flatten per-record lists into single pairs
    .distinct()
    .zipWithIndex()                # attach indices 0..n-1
    .collectAsMap()                # bring the whole mapping to the driver
)
len(oneMap)

38148

In [9]:
# Build the Spark LabeledPoint for one record.
def featureProcess(x, unknown_index=-1):
    """Convert a ``(key, vc, vn)`` triple into a ``LabeledPoint``.

    Each categorical value is replaced by its integer index from the
    module-level ``oneMap`` (keyed by ``(position, value)``); the numeric
    strings are parsed as floats and appended after them.

    Args:
        x: a ``(key, vc, vn)`` triple as produced by ``data_prepare``; the
            label is the part of ``key`` after ``"::"``.
        unknown_index: feature value used for a ``(position, value)`` pair
            that is not in ``oneMap`` (a category never seen in the training
            split). Defaults to -1.

    Returns:
        ``LabeledPoint(label, categorical_indices + numeric_values)``.

    Raises:
        ValueError: if the label or a numeric field is not a number.
        IndexError: if ``key`` contains no ``"::"`` separator.
    """
    # Unseen pairs must not map to 0: zipWithIndex gives index 0 to a real
    # training category, so 0 would make them indistinguishable from it.
    # zipWithIndex never produces -1, so the default sentinel cannot collide.
    cat_feature_ohe = [oneMap.get(pair, unknown_index) for pair in enumerate(x[1])]
    num_feature = [float(y) for y in x[2]]
    label = int(x[0].split("::")[1])
    return LabeledPoint(label, cat_feature_ohe + num_feature)

# Featurise both splits into RDDs of LabeledPoint.
train, test = (kv.map(featureProcess) for kv in (train_kv, test_kv))
train.first()

LabeledPoint(0.0, [15561.0,26740.0,2210.0,8651.0,31448.0,3437.0,17168.0,27401.0,25083.0,0.0,19665.0,320.0,50.0,2253.0,2.0,303.0,-1.0,52.0])

In [10]:
# Train a gradient-boosted-trees (GBDT) classifier.
# categoricalFeaturesInfo is empty, so every feature — including the
# integer-encoded categorical indices — is treated as continuous.
# numIterations=100 is PySpark's default, written out for clarity; the cell
# output confirms the resulting model has 100 trees.
model = GradientBoostedTrees.trainClassifier(
    train,
    categoricalFeaturesInfo={},
    numIterations=100,
)
model.numTrees()

100

In [11]:
# Prediction smoke test on a single test record (to keep things simple, the
# predictions are not evaluated here).
temp = test.first()
pred = model.predict(temp.features)
# Prints the true label followed by the predicted class.
print(temp.label, pred)

0.0 0.0
