In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
# Start (or reuse) a Spark session for this notebook
spark = SparkSession.builder.appName("CTR Prediction").getOrCreate()

# Read the CSV files into DataFrames. Only the header option is set (no schema,
# no inferSchema), so Spark loads every column as a string.
train = spark.read.format("csv").option("header", "true").load("train.csv")

test = spark.read.format("csv").option("header", "true").load("test.csv")

# `click` is the LogisticRegression label later in the notebook and must be
# numeric; left as a string column the pipeline fit fails. The test file may
# not carry the label (TODO confirm), so only cast it where it is present.
train = train.withColumn("click", train["click"].cast("double"))
if "click" in test.columns:
    test = test.withColumn("click", test["click"].cast("double"))

In [3]:
from pyspark.sql.functions import floor,col

In [4]:
from pyspark.sql import functions as F


def add_time_features(df):
    """Derive calendar features from the raw `hour` column.

    NOTE(review): assumes `hour` is encoded as YYMMDDHH, as in the Avazu CTR
    dataset whose column names this notebook uses (e.g. 14102100 is
    2014-10-21 00:00). Under that encoding the previous arithmetic produced the
    two-digit year (`hour / 1e6`), a meaningless year-based value mod 7, and
    DD.HH (`hour % 10000 / 100`). Confirm the encoding against the data.

    Adds:
        day  -- day of month (1-31)
        dow  -- day of week, Spark convention: 1 = Sunday ... 7 = Saturday
        time -- hour of day (0-23)

    Values that do not parse with the pattern yield nulls in all three columns.
    """
    # Cast so the same code works whether `hour` was read as string or integer.
    ts = F.to_timestamp(F.col("hour").cast("string"), "yyMMddHH")
    return (
        df.withColumn("day", F.dayofmonth(ts))
          .withColumn("dow", F.dayofweek(ts))
          .withColumn("time", F.hour(ts))
    )


train = add_time_features(train)
test = add_time_features(test)

In [6]:
from pyspark.sql.functions import stddev, mean, col

# Drop rows whose C18 / C20 / C21 value lies more than 3 standard deviations
# from the training mean (bounds inclusive). Columns are processed in order and
# `train` is re-filtered inside the loop, so each column's statistics are
# computed on the rows that survived the previous columns' filters.
for col_name in ['C18', 'C20', 'C21']:
    # One aggregation job for both statistics instead of two separate collects.
    stats = train.agg(
        mean(col(col_name)).alias("mu"),
        stddev(col(col_name)).alias("sigma"),
    ).first()
    col_mean, col_stddev = stats["mu"], stats["sigma"]

    # Null statistics (no non-null values, or too few rows for a sample
    # stddev) would make the bound arithmetic raise a TypeError; skip instead.
    if col_mean is None or col_stddev is None:
        continue

    in_range = col(col_name).between(col_mean - 3 * col_stddev,
                                     col_mean + 3 * col_stddev)
    train = train.filter(in_range)
    # Test rows are filtered with the *training* bounds.
    # NOTE(review): dropping rows from `test` means those rows never receive a
    # prediction; if `test` is a scoring/submission set, confirm this is intended.
    test = test.filter(in_range)

In [8]:
from pyspark.sql.functions import concat_ws, col

# Separator placed between concatenated values. With an empty separator,
# different value tuples can collide (e.g. "ab" + "c" and "a" + "bc" both give
# "abc"), silently merging unrelated categories.
CONCAT_SEP = "|"

# New categorical column -> source columns whose values are concatenated into it.
COMBINED_CATEGORICALS = {
    "site": ["site_id", "site_domain", "site_category"],
    "app": ["app_id", "app_domain", "app_category"],
    "device": ["device_id", "device_ip", "device_model"],
    "type": ["device_type", "device_conn_type"],
}


def add_combined_features(df):
    """Add combined categorical columns and the summed `C` column to `df`.

    Each key of COMBINED_CATEGORICALS becomes a string column built with
    `concat_ws` (which skips null inputs). `C` is the arithmetic sum of
    C14, C15, C16, C17 and C19 (null if any of them is null).
    """
    for new_col, parts in COMBINED_CATEGORICALS.items():
        df = df.withColumn(new_col, concat_ws(CONCAT_SEP, *parts))
    return df.withColumn(
        "C", col("C14") + col("C15") + col("C16") + col("C17") + col("C19")
    )


# Apply the identical transformation to both splits.
train = add_combined_features(train)
test = add_combined_features(test)

In [12]:
from pyspark.ml.feature import StringIndexer
# Combined categorical columns to index, in a fixed order so the feature-vector
# layout is reproducible across kernels (iterating a set of strings is not,
# because of Python's string hash randomisation).
INDEXED_COLUMNS = ['site', 'app', 'device', 'type']

# handleInvalid="keep" maps labels not seen during fit to an extra index instead
# of raising at transform time. Unseen labels are very likely on a held-out
# split, especially for `device` (device_id + device_ip + device_model).
# NOTE(review): these are ordinal codes; a linear model treats them as
# magnitudes. Consider one-hot (or hashed) encoding instead.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep")
    for column in INDEXED_COLUMNS
    if column in train.columns
]

In [13]:
from pyspark.ml.feature import OneHotEncoder
# Raw low-cardinality categoricals to one-hot encode, in a fixed order.
ONE_HOT_COLUMNS = ['banner_pos', 'device_type']

# OneHotEncoder only accepts numeric category indices, but the CSV was read
# without a schema, so these columns are strings. Each column is therefore
# indexed first (StringIndexer) and the index is then one-hot encoded into
# `<column>_vec`. The indexer precedes its encoder in the list, so the list
# can be spliced into a Pipeline as-is.
ohe = []
for column in ONE_HOT_COLUMNS:
    if column not in train.columns:
        continue
    ohe.append(StringIndexer(inputCol=column, outputCol=column + "_index",
                             handleInvalid="keep"))
    ohe.append(OneHotEncoder(inputCol=column + "_index", outputCol=column + "_vec"))

In [14]:
# Build the feature vector from:
#   * the final output column of every encoding stage; intermediate columns
#     that another stage consumes (e.g. an index fed to a OneHotEncoder) are
#     skipped, and
#   * the engineered numeric columns, when present.
# The previous version requested `<column>_indexed` for every column, but no
# stage produces an `_indexed` column, so fitting the pipeline failed.
encoding_stages = indexers + ohe
consumed_cols = {stage.getInputCol() for stage in encoding_stages}
encoded_cols = [
    stage.getOutputCol()
    for stage in encoding_stages
    if stage.getOutputCol() not in consumed_cols
]
numeric_cols = [c for c in ["C", "day", "dow", "time"] if c in train.columns]

assembler = VectorAssembler(inputCols=encoded_cols + numeric_cols, outputCol="features")

In [15]:
# Logistic-regression classifier: reads the assembled `features` vector and
# predicts the `click` label.
lr = LogisticRegression(featuresCol="features", labelCol="click")

In [16]:
# Evaluate model quality with BinaryClassificationEvaluator against the `click`
# label; the metric is spelled out here but is the evaluator's default
# (area under the ROC curve).
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="click")

In [17]:
# Assemble the pipeline: string indexers, then the one-hot stages, then the
# vector assembler, then the classifier.
pipeline = Pipeline(stages=[*indexers, *ohe, assembler, lr])

In [18]:
# Split the labelled data 70:30 into training and evaluation sets; the fixed
# seed makes the split repeatable.
trainingData, testData = train.randomSplit(weights=[0.7, 0.3], seed=1234)

In [19]:
# Fit every pipeline stage on the training split.
model = pipeline.fit(dataset=trainingData)

: 

: 