In [4]:
from pyspark.context import SparkContext 
from pyspark.sql.types import Row
from pyspark.sql import SQLContext as sqlCtx
from pyspark.sql.window import Window
from pyspark.sql.functions import *
sc = SparkContext.getOrCreate()
# RDD.toDF (used by the loading cells below) is only attached to RDDs once a
# SQLContext / SparkSession has been instantiated, so create one explicitly
# rather than relying on a kernel that happens to have done it already.
sqlContext = sqlCtx(sc)

In [6]:
# Part 1: collapse duplicate rows to shrink the training data.
root = "./track2/"
trainPath = root + "training.txt"
# The log is split on tabs below; ask for at least two partitions when reading.
trainRDD = sc.textFile(trainPath, 2)
title = [
    "Click", "Impression", "DisplayURl", "AdID", "AdvertiseId", "Depth",
    "Position", "QueryID", "KeywordID", "TitleID", "DescriptionID", "UserID",
]
train = trainRDD.map(lambda line: line.split("\t"))
trainDF = train.toDF(title)
# Keep only the columns the later cells work with.
trainEX = trainDF.select("Click", "Impression", "AdID", "QueryID", "Depth", "Position", "UserID")
# Every retained column except UserID is cast to double; UserID stays as read.
trainU = trainEX.select(*[
    trainEX[c] if c == "UserID" else trainEX[c].cast("double").alias(c)
    for c in trainEX.columns
])
# Sum clicks and impressions over identical (AdID, QueryID, Depth, Position, UserID)
# rows. The totals land in columns named "sum(Click)" and "sum(Impression)".
trainDF = trainU.groupBy("AdID", "QueryID", "Depth", "Position", "UserID").agg(
    {"Click": "sum", "Impression": "sum"}
)

In [None]:
# Part 2: Non-big-data sample -- the 25,000 most frequent instances from the training data.
# After ranking by frequency, the first group alone has more than 25,000 instances,
# so take 250 instances from each of the first 100 groups to make 25,000 instances.
trainDFf = (trainDF.withColumn("Click", trainDF["sum(Click)"].cast("double"))
            .withColumn("Impression", trainDF["sum(Impression)"].cast("double"))
            .withColumn("AdID", trainDF["AdID"].cast("double"))
            .withColumn("QueryID", trainDF["QueryID"].cast("double"))
            .withColumn("Depth", trainDF["Depth"].cast("double"))
            .withColumn("Position", trainDF["Position"].cast("double"))
            .withColumn("UserID", trainDF["UserID"].cast("double"))
            )
# Frequency of every (AdID, QueryID) pair, attached back onto each row as "count".
trainFQ = trainDFf.groupBy(["AdID", "QueryID"]).count()
trainCT = trainDFf.join(trainFQ, ["AdID", "QueryID"], "outer")
# Rank rows inside each frequency group by descending UserID. dense_rank is the
# pyspark.sql.functions name (the original called an undefined `denseRK`).
trainRK = trainCT.withColumn("rank", dense_rank().over(Window.partitionBy("count").orderBy(desc("UserID"))))
trainF = trainRK.filter(col("rank") <= 250)
# Order AFTER the window step: the window repartitions the data, so a sort applied
# before it does not survive and limit() would otherwise pick arbitrary rows
# instead of the most frequent groups.
trainORD = trainF.sort(desc("count"), "rank")
trainDFF = trainORD.limit(25000)
# Write the very frame kept in trainDFF instead of recomputing a second limit().
trainDFF.write.csv("train_df_f.csv")

In [53]:
# Part 3: CTR per (Depth, Position) slot.
# trainDF is the Part 1 aggregate: its totals live in "sum(Click)" and
# "sum(Impression)", and it has no plain "Click" / "Impression" columns. Rename
# them first so the aggregation below resolves on a top-to-bottom run.
# (withColumnRenamed is a no-op when the source column is absent.)
pdNOR = (trainDF
         .withColumnRenamed("sum(Click)", "Click")
         .withColumnRenamed("sum(Impression)", "Impression")
         .groupby("Depth", "Position")
         .agg({"Click": "sum", "Impression": "sum"}))
# NOTE: `pd` here is a Spark DataFrame, not the pandas module alias.
pd = pdNOR.withColumn("pCTR", pdNOR["sum(Click)"] / pdNOR["sum(Impression)"])
positionNM = pd["Depth", "Position", "pCTR"]
positionNM = (positionNM.withColumn("Depth", positionNM["Depth"].cast("double"))
              .withColumn("Position", positionNM["Position"].cast("double"))
              .withColumn("pCTR", positionNM["pCTR"].cast("double")))
positionNM.show()

+-----+--------+-----------+
|Depth|Position|       pCTR|
+-----+--------+-----------+
|  2.0|     1.0|0.053358235|
|  1.0|     1.0|0.039917249|
|  2.0|     2.0|0.026754174|
|  3.0|     1.0|0.034008055|
|  3.0|     3.0| 0.01078655|
|  3.0|     2.0|0.016526995|
+-----+--------+-----------+



In [55]:
# Reload the sample from CSV as a seven-column frame and cast every column to double.
# NOTE(review): no visible cell writes "./trainDFF.csv" (Part 2 writes
# "train_df_f.csv" with a different column set) -- confirm where this file comes from.
tRDD = sc.textFile("./trainDFF.csv").map(lambda x : x.split(","))
# The original cell also contained the line `tRDD = get trainDF`, which is not
# valid Python and stopped the whole cell from parsing; it is dropped here.
trainDFF = tRDD.toDF(["AdID", "QueryID", "Depth", "Position", "UserID", "Impression", "Click"])
# Values arrive as strings from the text split; cast all of them to double.
trainDFF = trainDFF.select(*[trainDFF[c].cast("double").alias(c) for c in trainDFF.columns])
trainDFF.show(20)

+-----------+-------+-----+--------+-----------+----------+-----+
|       AdID|QueryID|Depth|Position|     UserID|Impression|Click|
+-----------+-------+-----+--------+-----------+----------+-----+
|2.1522776E7|    0.0|  1.0|     1.0|2.3907341E7|       1.0|  0.0|
|2.1522776E7|    0.0|  1.0|     1.0|2.3907195E7|       1.0|  0.0|
|2.1522776E7|    0.0|  1.0|     1.0|2.3907049E7|       1.0|  0.0|
|2.1522776E7|    0.0|  1.0|     1.0|2.3906018E7|       1.0|  0.0|
|2.1522776E7|    0.0|  1.0|     1.0|2.3905449E7|       1.0|  0.0|
|2.1522776E7|    0.0|  1.0|     1.0|2.3904198E7|       1.0|  0.0|
|2.1522776E7|    0.0|  1.0|     1.0|2.3903189E7|       1.0|  0.0|
|2.1522776E7|    0.0|  1.0|     1.0|2.3902876E7|       1.0|  0.0|
|2.1522776E7|    0.0|  1.0|     1.0|2.3902259E7|       1.0|  0.0|
|2.1522776E7|    0.0|  1.0|     1.0|2.3901976E7|       1.0|  0.0|
|2.1522776E7|    0.0|  1.0|     1.0|2.3900019E7|       1.0|  0.0|
|2.1522776E7|    0.0|  1.0|     1.0|2.3898575E7|       1.0|  0.0|
|2.1522776

In [45]:
# Show the sample size explicitly: a bare expression that is not the last
# statement of a cell is evaluated but never displayed.
print(trainDFF.count())
# Per-(Depth, Position) CTR. trainDF is the Part 1 aggregate whose totals are in
# "sum(Click)" / "sum(Impression)", so rename them before aggregating
# (withColumnRenamed is a no-op when the source column is absent).
pdNOR = (trainDF
         .withColumnRenamed("sum(Click)", "Click")
         .withColumnRenamed("sum(Impression)", "Impression")
         .groupby("Depth", "Position")
         .agg({"Click": "sum", "Impression": "sum"}))
# NOTE: `pd` here is a Spark DataFrame, not the pandas module alias.
pd = pdNOR.withColumn("pCTR",pdNOR["sum(Click)"] / pdNOR["sum(Impression)"])
positionNM = pd[("Depth", "Position", "pCTR")]
# Left join: keep every sample row and attach its slot's pCTR. A full outer join
# would also emit all-null rows for (Depth, Position) slots absent from the sample.
trainDFPRENOR = trainDFF.join(positionNM, ["Depth", "Position"], "left_outer")

25000

In [57]:
# Preview the first ten joined rows (sample columns plus pCTR).
trainDFPRENOR.show(n=10)

+-----+--------+-----------+-------+-----------+----------+-----+-----------+
|Depth|Position|       AdID|QueryID|     UserID|Impression|Click|       pCTR|
+-----+--------+-----------+-------+-----------+----------+-----+-----------+
|  1.0|     1.0|2.1522776E7|    0.0|2.3907341E7|       1.0|  0.0|0.039917249|
|  1.0|     1.0|2.1522776E7|    0.0|2.3907195E7|       1.0|  0.0|0.039917249|
|  1.0|     1.0|2.1522776E7|    0.0|2.3907049E7|       1.0|  0.0|0.039917249|
|  1.0|     1.0|2.1522776E7|    0.0|2.3906018E7|       1.0|  0.0|0.039917249|
|  1.0|     1.0|2.1522776E7|    0.0|2.3905449E7|       1.0|  0.0|0.039917249|
|  1.0|     1.0|2.1522776E7|    0.0|2.3904198E7|       1.0|  0.0|0.039917249|
|  1.0|     1.0|2.1522776E7|    0.0|2.3903189E7|       1.0|  0.0|0.039917249|
|  1.0|     1.0|2.1522776E7|    0.0|2.3902876E7|       1.0|  0.0|0.039917249|
|  1.0|     1.0|2.1522776E7|    0.0|2.3902259E7|       1.0|  0.0|0.039917249|
|  1.0|     1.0|2.1522776E7|    0.0|2.3901976E7|       1.0|  0.0

In [61]:
# Normalise each row's CTR by the CTR of its (Depth, Position) slot:
#   normedCTR = Click / (Impression * pCTR)
# NOTE: this rebinds trainDF from the Part 1 aggregate to the normalised sample;
# the cells below rely on that, so re-running earlier cells afterwards will see
# a different trainDF.
# (The bare `trainDFPRENOR` expression that opened this cell did nothing and was removed.)
trainDF = trainDFPRENOR.withColumn(
    "normedCTR",
    trainDFPRENOR["Click"] / (trainDFPRENOR["Impression"] * trainDFPRENOR["pCTR"]),
)
trainDF.show(10)

+-----+--------+-----------+-------+-----------+----------+-----+-----------+----------+
|Depth|Position|       AdID|QueryID|     UserID|Impression|Click|       pCTR|normed_CTR|
+-----+--------+-----------+-------+-----------+----------+-----+-----------+----------+
|  1.0|     1.0|2.1522776E7|    0.0|2.3907341E7|       1.0|  0.0|0.039917249|       0.0|
|  1.0|     1.0|2.1522776E7|    0.0|2.3907195E7|       1.0|  0.0|0.039917249|       0.0|
|  1.0|     1.0|2.1522776E7|    0.0|2.3907049E7|       1.0|  0.0|0.039917249|       0.0|
|  1.0|     1.0|2.1522776E7|    0.0|2.3906018E7|       1.0|  0.0|0.039917249|       0.0|
|  1.0|     1.0|2.1522776E7|    0.0|2.3905449E7|       1.0|  0.0|0.039917249|       0.0|
|  1.0|     1.0|2.1522776E7|    0.0|2.3904198E7|       1.0|  0.0|0.039917249|       0.0|
|  1.0|     1.0|2.1522776E7|    0.0|2.3903189E7|       1.0|  0.0|0.039917249|       0.0|
|  1.0|     1.0|2.1522776E7|    0.0|2.3902876E7|       1.0|  0.0|0.039917249|       0.0|
|  1.0|     1.0|2.152

In [62]:
# Rows whose normalised CTR is non-zero.
cc = trainDF.where(trainDF["normedCTR"] != 0)

In [67]:
# Part 5: Features
# User profiles are split on tabs into UserID, Gender, Age, all cast to double.
userRDD = sc.textFile(root + "userid_profile.txt").map(lambda x : x.split("\t"))
userDF = userRDD.toDF(["UserID", "Gender", "Age"])
userDF = (userDF.withColumn("UserID",userDF["UserID"].cast("double"))
          .withColumn("Gender",userDF["Gender"].cast("double"))
          .withColumn("Age",userDF["Age"].cast("double")))
# Left join: attach Gender/Age to each training row. The original full outer join
# also produced one row per profile with no training data, all of which the
# Position filter below then discarded (their Position is null), so the kept
# rows are the same.
trainP5 = trainDF.join(userDF, ['UserID'], "left_outer")
trainF = trainP5.filter(trainP5["Position"] > 0)
# pCTR was only needed to build normedCTR; drop it and cap the output at 25,000 rows.
trainData = trainF.drop("pCTR").limit(25000)
trainData.write.csv("train_data_1.csv")

In [97]:
# Test Data
testRRD = sc.textFile(root + "test.txt").map(lambda x:x.split("\t"))
title = ["Click","Impression", "DisplayURl", "AdID", "AdvertiseId", "Depth", "Position", "QueryID", "KeywordID","TitleID","DescriptionID", "UserID"]
# The test frame is named with the training header minus its first two entries
# (Click, Impression).
testDF = testRRD.toDF(title[2:])
testDF = testDF.drop("DisplayURl", "AdvertiseId","KeywordID","TitleID","DescriptionID")
testDF = (testDF.withColumn("AdID",testDF["AdID"].cast("double"))
          .withColumn("QueryID",testDF["QueryID"].cast("double"))
          .withColumn("Depth",testDF["Depth"].cast("double"))
          .withColumn("Position",testDF["Position"].cast("double"))
          .withColumn("UserID",testDF["UserID"].cast("double")))
# Left join: attach Gender/Age to each test row. The original full outer join
# also produced one row per profile with no test data, all of which the Position
# filter below then discarded (their Position is null), so the kept rows are the same.
testU = testDF.join(userDF, ["UserID"], "left_outer")
test = testU.filter(testU["Position"] > 0)
test.write.csv("test_data.csv")