In [1]:
from pyspark.sql import SparkSession


# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("CTR")
    .getOrCreate()
)


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/06 18:47:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql.types import StructField, StringType, StructType, IntegerType

In [3]:
# Explicit CSV schema: every column is nullable; `click` and `hour` are read
# as integers, everything else (including the numeric-looking C* codes) as
# strings so they can be treated as categorical features later on.
schema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in [
        ("id", StringType),
        ("click", IntegerType),
        ("hour", IntegerType),
        ("C1", StringType),
        ("banner_pos", StringType),
        ("site_id", StringType),
        ("site_domain", StringType),
        ("site_category", StringType),
        ("app_id", StringType),
        ("app_domain", StringType),
        ("app_category", StringType),
        ("device_id", StringType),
        ("device_ip", StringType),
        ("device_model", StringType),
        ("device_type", StringType),
        ("device_conn_type", StringType),
        ("C14", StringType),
        ("C15", StringType),
        ("C16", StringType),
        ("C17", StringType),
        ("C18", StringType),
        ("C19", StringType),
        ("C20", StringType),
        ("C21", StringType),
    ]
])


In [4]:
# Load the training CSV with the explicit schema; the first line is a header row.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv("desktop/train.csv")
)

In [5]:
# Sanity check: confirm the DataFrame picked up the column names and types declared in `schema`.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- click: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- C1: string (nullable = true)
 |-- banner_pos: string (nullable = true)
 |-- site_id: string (nullable = true)
 |-- site_domain: string (nullable = true)
 |-- site_category: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- app_domain: string (nullable = true)
 |-- app_category: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- device_ip: string (nullable = true)
 |-- device_model: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- device_conn_type: string (nullable = true)
 |-- C14: string (nullable = true)
 |-- C15: string (nullable = true)
 |-- C16: string (nullable = true)
 |-- C17: string (nullable = true)
 |-- C18: string (nullable = true)
 |-- C19: string (nullable = true)
 |-- C20: string (nullable = true)
 |-- C21: string (nullable = true)



In [6]:
# Total number of rows in the raw dataset (count() is an action, so this scans the whole file).
df.count()

                                                                                

40428967

In [7]:
# Discard the columns that are not used as model features.
df = df.drop('id', 'hour', 'device_id', 'device_ip')

In [8]:
# Spark ML estimators look for the target in a column called "label" by default.
df = df.withColumnRenamed(existing="click", new="label")

In [9]:
# Show the remaining columns: the renamed target plus the candidate features.
df.columns

['label',
 'C1',
 'banner_pos',
 'site_id',
 'site_domain',
 'site_category',
 'app_id',
 'app_domain',
 'app_category',
 'device_model',
 'device_type',
 'device_conn_type',
 'C14',
 'C15',
 'C16',
 'C17',
 'C18',
 'C19',
 'C20',
 'C21']

In [10]:
# 70/30 train/test split; the fixed seed keeps the split reproducible.
df_train, df_test = df.randomSplit(weights=[0.7, 0.3], seed=42)


In [11]:
# Mark the training split for caching and run a full count so the cache is
# actually populated (cache() returns the same DataFrame).
df_train.cache().count()


                                                                                

28303473

In [12]:
# Mark the test split for caching and run a full count so the cache is
# actually populated (cache() returns the same DataFrame).
df_test.cache().count()

                                                                                

12125494

In [13]:
# Every column except the target is treated as a categorical feature.
categorical = [column for column in df_train.columns if column != 'label']
print(categorical)

['C1', 'banner_pos', 'site_id', 'site_domain', 'site_category', 'app_id', 'app_domain', 'app_category', 'device_model', 'device_type', 'device_conn_type', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21']


In [14]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder


In [15]:
# One StringIndexer per categorical column, writing to "<column>_indexed".
# handleInvalid="keep" puts labels unseen during fitting into an extra bucket
# instead of raising an error at transform time.
indexers = [
    StringIndexer(
        inputCol=column,
        outputCol=f"{column}_indexed",
        handleInvalid="keep",
    )
    for column in categorical
]

In [16]:
# One-hot encode every indexed column into a matching "<indexed column>_encoded" vector column.
indexed_columns = [indexer.getOutputCol() for indexer in indexers]
encoder = OneHotEncoder(
    inputCols=indexed_columns,
    outputCols=[f"{indexed}_encoded" for indexed in indexed_columns],
)

In [17]:
# Concatenate all one-hot vectors into the single "features" column the classifier consumes.
assembler = VectorAssembler(outputCol="features", inputCols=encoder.getOutputCols())


In [18]:
from pyspark.ml import Pipeline

# Preprocessing pipeline: index every categorical column, one-hot encode the
# indices, then assemble the encoded vectors into a single feature vector.
stages = [*indexers, encoder, assembler]
pipeline = Pipeline(stages=stages)

# Fit on the training split only, so the category vocabularies come from training data.
one_hot_encoder = pipeline.fit(df_train)


                                                                                

In [19]:
# Apply the fitted preprocessing pipeline to the training split; this adds the
# *_indexed, *_encoded and "features" columns next to the original ones.
df_train_encoded = one_hot_encoder.transform(df_train)


# Preview the first rows (very wide: original + indexed + encoded columns).
df_train_encoded.show()


23/08/06 18:52:58 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/08/06 18:52:59 WARN DAGScheduler: Broadcasting large task binary with size 5.0 MiB
[Stage 66:>                                                         (0 + 1) / 1]

+-----+----+----------+--------+-----------+-------------+--------+----------+------------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+----------+------------------+---------------+-------------------+---------------------+--------------+------------------+--------------------+--------------------+-------------------+------------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------------+--------------------------+-----------------------+---------------------------+-----------------------------+----------------------+--------------------------+----------------------------+----------------------------+---------------------------+--------------------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+
|label|  C1|banner_pos| site_id|site_domain|s

                                                                                

In [20]:
# Keep only the two columns the classifier needs.
df_train_encoded = df_train_encoded.select(["label", "features"])

# cache() is lazy and show() only evaluates enough partitions to print 20 rows,
# so run a full count() to populate the cache *while df_train is still cached*.
# Otherwise releasing df_train below would force the later model fit to re-read
# and re-split the raw CSV in order to rebuild the encoded data.
df_train_encoded.cache()
df_train_encoded.count()

df_train_encoded.show()

# The encoded training data is now fully materialized in the cache, so the
# un-encoded training split can be released.
df_train.unpersist()

23/08/06 18:53:02 WARN DAGScheduler: Broadcasting large task binary with size 1223.7 KiB


+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|(31532,[5,7,3758,...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,1322,...|
|    0|(31532,[5,7,1322,...|
|    0|(31532,[5,7,1539,...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
+-----+--------------------+
only showing top 20 rows



DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]

In [21]:
# Encode the test split with the pipeline fitted on the training split and
# keep only the two columns the classifier/evaluator need.
df_test_encoded = one_hot_encoder.transform(df_test)
df_test_encoded = df_test_encoded.select(["label", "features"])

# cache() is lazy and show() only evaluates enough partitions to print 20 rows,
# so run a full count() to populate the cache *while df_test is still cached*.
# Otherwise releasing df_test below would force later stages to re-read and
# re-split the raw CSV in order to rebuild the encoded data.
df_test_encoded.cache()
df_test_encoded.count()

df_test_encoded.show()

# The encoded test data is now fully materialized in the cache, so the
# un-encoded test split can be released.
df_test.unpersist()


23/08/06 18:53:05 WARN DAGScheduler: Broadcasting large task binary with size 1223.7 KiB


+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,792,4...|
|    0|(31532,[5,7,1322,...|
|    0|(31532,[5,7,1322,...|
|    0|(31532,[5,7,4557,...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
|    0|(31532,[5,7,14,45...|
+-----+--------------------+
only showing top 20 rows



DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]

In [22]:
from pyspark.ml.classification import LogisticRegression

# Plain logistic regression: no regularization (regParam=0, elasticNetParam=0)
# and at most 20 optimizer iterations.
classifier = LogisticRegression(
    maxIter=20,
    regParam=0.0,
    elasticNetParam=0.0,
)

In [None]:
# Train the classifier on the encoded training data. fit() runs the training
# job before returning, so the data is fully consumed by the next statement.
lr_model = classifier.fit(df_train_encoded)

# Training is done; release the cached encoded training data.
df_train_encoded.unpersist()

In [None]:
# transform() is lazy: it only defines the scoring plan. Cache the predictions
# and force a full pass with count() *before* releasing the encoded test data;
# otherwise the cached features would be discarded before any prediction is
# computed, and later actions would have to rebuild them from the raw CSV.
predictions = lr_model.transform(df_test_encoded).cache()
predictions.count()

# Every prediction is now materialized in the cache, so the encoded test data
# is no longer needed.
df_test_encoded.unpersist()

In [None]:
# Mark the predictions for caching (cache() returns the same DataFrame) and preview the first rows.
predictions.cache().show()


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the model on the test predictions using the area under the ROC curve,
# computed from the classifier's raw prediction column.
ev = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    rawPredictionCol="rawPrediction",
)
print(ev.evaluate(predictions))

# End of the notebook: shut the Spark session down.
spark.stop()