## Data Loading & Cleaning

In [1]:
from pyspark.sql.types import BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, StructType


# Explicit column types for train.csv, declared in the order the columns are added below.
schema = (
    StructType()
    .add("row_id", LongType())
    .add("timestamp", LongType())
    .add("user_id", IntegerType())
    .add("content_id", ShortType())
    .add("content_type_id", ByteType())
    .add("task_container_id", ShortType())
    .add("user_answer", ByteType())
    .add("answered_correctly", ByteType())
    .add("prior_question_elapsed_time", FloatType())
    .add("prior_question_had_explanation", BooleanType())
)

# Load the CSV (first line is the header) with the declared schema, then
# discard every row that contains a null in any column.
data = spark.read.csv(
    's3://riiid-test-answer-prediction/train.csv',
    schema=schema,
    header=True,
).dropna()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9,application_1639175647689_0011,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Quick sanity check of the cleaned dataset: width, length and column types.
# NOTE: count() is a Spark action and triggers a full pass over the data.
print('Total Columns: %d' % len(data.columns))
print('Total Rows: %d' % data.count())
data.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total Columns: 10
Total Rows: 98878794
root
 |-- row_id: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- content_id: short (nullable = true)
 |-- content_type_id: byte (nullable = true)
 |-- task_container_id: short (nullable = true)
 |-- user_answer: byte (nullable = true)
 |-- answered_correctly: byte (nullable = true)
 |-- prior_question_elapsed_time: float (nullable = true)
 |-- prior_question_had_explanation: boolean (nullable = true)

## Data Preparation

### Transformation

In [3]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline


# Columns packed into the single "features" vector consumed by the classifiers.
# The label column ("answered_correctly") is intentionally not part of this list.
# NOTE(review): "row_id" and "user_id" look like identifiers and "user_answer"
# is presumably the response being graded — these may leak the label or fail
# to generalise; confirm they are meant to be model inputs.
assemblerInputs = ["row_id", "timestamp", "user_id", "content_id", "content_type_id", "task_container_id",
                   "user_answer", "prior_question_elapsed_time", "prior_question_had_explanation"]
assembler = VectorAssembler(inputCols = assemblerInputs,
                            outputCol = "features")


pipeline = Pipeline(stages = [assembler])

# This cell overwrites `data`, so on a second run `data` already carries a
# "features" column and VectorAssembler refuses to write to an existing output
# column. Dropping it first makes the cell safe to re-run; drop() is a no-op
# when the column is absent (i.e. on the first run).
data = data.drop(assembler.getOutputCol())

pipelineModel = pipeline.fit(data)
data = pipelineModel.transform(data)
data.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- row_id: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- content_id: short (nullable = true)
 |-- content_type_id: byte (nullable = true)
 |-- task_container_id: short (nullable = true)
 |-- user_answer: byte (nullable = true)
 |-- answered_correctly: byte (nullable = true)
 |-- prior_question_elapsed_time: float (nullable = true)
 |-- prior_question_had_explanation: boolean (nullable = true)
 |-- features: vector (nullable = true)

### Train-test split

In [4]:
# 70/30 random split; the fixed seed keeps the partitioning repeatable.
train, test = data.randomSplit(weights=[0.7, 0.3], seed=42)

# Report the size of each partition (each count() is a separate Spark action).
print("Training Dataset Count: ", train.count(), sep="")
print("Test Dataset Count: ", test.count(), sep="")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Training Dataset Count: 69213351
Test Dataset Count: 29665443

## Prediction

### Prediction with LogisticRegression

In [5]:
from pyspark.ml.classification import LogisticRegression


# Logistic regression on the assembled feature vector, predicting
# "answered_correctly"; the optimiser is capped at 10 iterations.
lr = LogisticRegression(
    labelCol='answered_correctly',
    featuresCol='features',
    maxIter=10,
)
lrModel = lr.fit(train)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the held-out split with the fitted logistic-regression model.
predictions = lrModel.transform(test)

# Area under the ROC curve is the evaluator's default metric; it is spelled
# out here so the reported number is unambiguous.
evaluator = BinaryClassificationEvaluator(
    labelCol='answered_correctly',
    metricName='areaUnderROC',
)
print('LogisticRegression AUC =', evaluator.evaluate(predictions))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

LogisticRegression AUC = 0.5588427383284479

### Prediction with DecisionTreeClassifier

In [7]:
from pyspark.ml.classification import DecisionTreeClassifier


# Shallow decision tree (depth limited to 3) on the assembled feature vector,
# predicting "answered_correctly".
dt = DecisionTreeClassifier(
    labelCol='answered_correctly',
    featuresCol='features',
    maxDepth=3,
)
dtModel = dt.fit(train)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Score the held-out split with the fitted decision tree.
predictions = dtModel.transform(test)

# Evaluate by area under the ROC curve; the metric is configured on the
# evaluator itself rather than passed as a per-call override.
evaluator = BinaryClassificationEvaluator(
    labelCol='answered_correctly',
    metricName='areaUnderROC',
)
print('DecisionTreeClassifier AUC =', evaluator.evaluate(predictions))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DecisionTreeClassifier AUC = 0.534533167057924

### Prediction with RandomForestClassifier

In [9]:
from pyspark.ml.classification import RandomForestClassifier


# Random forest on the assembled feature vector, predicting
# "answered_correctly"; every other hyperparameter is left at the library default.
rf = RandomForestClassifier(
    labelCol='answered_correctly',
    featuresCol='features',
)
rfModel = rf.fit(train)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Score the held-out split with the fitted random forest.
predictions = rfModel.transform(test)

# Evaluate by area under the ROC curve; the metric is configured on the
# evaluator itself rather than passed as a per-call override.
evaluator = BinaryClassificationEvaluator(
    labelCol='answered_correctly',
    metricName='areaUnderROC',
)
print('RandomForestClassifier AUC =', evaluator.evaluate(predictions))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

RandomForestClassifier AUC = 0.5782046231409025