In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
!tar xf spark-3.5.5-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.5.5/spark-3.5.5-bin-hadoop3"

In [3]:
import findspark
findspark.init("spark-3.5.5-bin-hadoop3")
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

## 1. Data Processing


In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
click_df = spark.read.load("drive/My Drive/CTR_data/filtered_train.csv", format='csv', header = True)

In [6]:
click_df.show(5)

+------+--------------------+-----+--------+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+----+------+---+
|   _c0|                  id|click|    hour|  C1|banner_pos| site_id|site_domain|site_category|  app_id|app_domain|app_category|device_id|device_ip|device_model|device_type|device_conn_type|  C14|C15|C16| C17|C18| C19|   C20|C21|
+------+--------------------+-----+--------+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+----+------+---+
|128037|1.107450855228460...|    0|14102101|1005|         1|e151e245|   7e091613|     f028772b|ecad2386|  7801e8d9|    07d7df22| a99f214a| 10dcdfb1|    88fe1d5d|          1|               0|20352|320| 50|2333|  0|  39|    -1|157|
|491755|1.240012241576984...|    0|14102103|1005|         0|1fbe01fe|   f3845767

In [7]:
click_df.tail(5)

[Row(_c0='40428218', id='9.854402622730424e+18', click='1', hour='14103023', C1='1005', banner_pos='0', site_id='85f751fd', site_domain='c4e18dd6', site_category='50e219e0', app_id='9c13b419', app_domain='2347f47a', app_category='f95efa07', device_id='a99f214a', device_ip='07f3354c', device_model='1f0bc64f', device_type='1', device_conn_type='0', C14='23161', C15='320', C16='50', C17='2667', C18='0', C19='47', C20='-1', C21='221'),
 Row(_c0='40055368', id='1.5167136474965117e+19', click='0', hour='14103020', C1='1005', banner_pos='0', site_id='85f751fd', site_domain='c4e18dd6', site_category='50e219e0', app_id='9c13b419', app_domain='2347f47a', app_category='f95efa07', device_id='a99f214a', device_ip='2eeea4d3', device_model='28570f08', device_type='1', device_conn_type='0', C14='23160', C15='320', C16='50', C17='2667', C18='0', C19='47', C20='-1', C21='221'),
 Row(_c0='40153544', id='1.3236782305235956e+19', click='0', hour='14103021', C1='1002', banner_pos='0', site_id='c545a354', si

In [8]:
click_df.count()

404290

In [9]:
click_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- id: string (nullable = true)
 |-- click: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- banner_pos: string (nullable = true)
 |-- site_id: string (nullable = true)
 |-- site_domain: string (nullable = true)
 |-- site_category: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- app_domain: string (nullable = true)
 |-- app_category: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- device_ip: string (nullable = true)
 |-- device_model: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- device_conn_type: string (nullable = true)
 |-- C14: string (nullable = true)
 |-- C15: string (nullable = true)
 |-- C16: string (nullable = true)
 |-- C17: string (nullable = true)
 |-- C18: string (nullable = true)
 |-- C19: string (nullable = true)
 |-- C20: string (nullable = true)
 |-- C21: string (nullable = true)



In [10]:
from pyspark.sql.functions import col, sum
click_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in click_df.columns]).show()

+---+---+-----+----+---+----------+-------+-----------+-------------+------+----------+------------+---------+---------+------------+-----------+----------------+---+---+---+---+---+---+---+---+
|_c0| id|click|hour| C1|banner_pos|site_id|site_domain|site_category|app_id|app_domain|app_category|device_id|device_ip|device_model|device_type|device_conn_type|C14|C15|C16|C17|C18|C19|C20|C21|
+---+---+-----+----+---+----------+-------+-----------+-------------+------+----------+------------+---------+---------+------------+-----------+----------------+---+---+---+---+---+---+---+---+
|  0|  0|    0|   0|  0|         0|      0|          0|            0|     0|         0|           0|        0|        0|           0|          0|               0|  0|  0|  0|  0|  0|  0|  0|  0|
+---+---+-----+----+---+----------+-------+-----------+-------------+------+----------+------------+---------+---------+------------+-----------+----------------+---+---+---+---+---+---+---+---+



In [11]:
from pyspark.sql.functions import countDistinct

unique_counts = click_df.agg(*(countDistinct(col).alias(col) for col in click_df.columns))
unique_counts.show()

+------+------+-----+----+---+----------+-------+-----------+-------------+------+----------+------------+---------+---------+------------+-----------+----------------+----+---+---+---+---+---+---+---+
|   _c0|    id|click|hour| C1|banner_pos|site_id|site_domain|site_category|app_id|app_domain|app_category|device_id|device_ip|device_model|device_type|device_conn_type| C14|C15|C16|C17|C18|C19|C20|C21|
+------+------+-----+----+---+----------+-------+-----------+-------------+------+----------+------------+---------+---------+------------+-----------+----------------+----+---+---+---+---+---+---+---+
|402586|402586|    2| 240|  7|         7|   2225|       2188|           22|  2241|       143|          27|    64742|   261706|        4380|          4|               4|2088|  8|  9|411|  4| 65|161| 60|
+------+------+-----+----+---+----------+-------+-----------+-------------+------+----------+------------+---------+---------+------------+-----------+----------------+----+---+---+---+---+---

In [18]:
from sklearn.preprocessing import LabelEncoder, StandardScaler
from pyspark.sql.functions import col
click_df = click_df.withColumn("click", col("click").cast("integer"))

# Select Relevant Features
categorical_cols = ["site_id", "site_domain", "site_category", "app_id", "app_domain", "app_category", "device_model"]
numerical_cols = ["hour", "C1", "banner_pos", "C15", "C16", "C18", "C19", "C20", "C21"]

# Convert Spark DataFrame to Pandas
click_pd = click_df.select(["click"] + categorical_cols + numerical_cols).toPandas()

# Label Encode Categorical Features
label_encoders = {}
for col in categorical_cols:
    le = LabelEncoder()
    click_pd[col] = le.fit_transform(click_pd[col])
    label_encoders[col] = le

# Standardize Numerical Features
scaler = StandardScaler()
click_pd[numerical_cols] = scaler.fit_transform(click_pd[numerical_cols])

# Split into Features (X) and Target (y)
X = click_pd.drop(columns=["click"]).values
y = click_pd["click"].values

## Train Model

In [13]:
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import numpy as np


In [19]:
class CTRDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32).unsqueeze(1)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Split Data into Train/Test
train_size = int(0.8 * len(X))
test_size = len(X) - train_size
train_X, test_X = X[:train_size], X[train_size:]
train_y, test_y = y[:train_size], y[train_size:]

train_dataset = CTRDataset(train_X, train_y)
test_dataset = CTRDataset(test_X, test_y)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [20]:
class CTRModel(nn.Module):
    def __init__(self, input_dim):
        super(CTRModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.sigmoid(self.fc3(x))
        return x

# Initialize Model
input_dim = X.shape[1]
model = CTRModel(input_dim)

# Define Loss Function & Optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [21]:
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")

Epoch 1/10, Loss: 17.132359325531965
Epoch 2/10, Loss: 17.14214170898993
Epoch 3/10, Loss: 17.142512704243902
Epoch 4/10, Loss: 17.142141710593854
Epoch 5/10, Loss: 17.14251270509304
Epoch 6/10, Loss: 17.14232720553191
Epoch 7/10, Loss: 17.142512703866508
Epoch 8/10, Loss: 17.14288369676177
Epoch 9/10, Loss: 17.142512702168236
Epoch 10/10, Loss: 17.142883695252195


In [22]:
from sklearn.metrics import accuracy_score, roc_auc_score

model.eval()
y_pred = []
y_true = []

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        preds = (outputs.numpy() > 0.5).astype(int)
        y_pred.extend(preds)
        y_true.extend(labels.numpy())

# Calculate Metrics
accuracy = accuracy_score(y_true, y_pred)
auc = roc_auc_score(y_true, y_pred)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"AUC-ROC: {auc:.4f}")


Test Accuracy: 0.8389
AUC-ROC: 0.5000
