# Feature Extraction (using KDD '15 Dataset)


In [1]:
from os.path import abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler

import datetime

### Zeppelin automatically binds spark variables to its interpreter.
### Thus no need to run the following line.

# spark = SparkSession.builder.getOrCreate()

In [2]:
def log2row(line):
    """Parse one comma-separated log line into a Row.

    Columns are read by position: 0 = enrollment id (converted to int),
    1 = timestamp shaped like ``YYYY-MM-DDTHH:MM:SS``, 2 = source,
    3 = event, 4 = object id. The timestamp is split into a ``date`` at
    midnight and a ``time`` pinned to the placeholder day 1990-01-01.
    """
    cols = line.split(',')
    enrollment = int(cols[0])
    source, event, object_id = cols[2], cols[3], cols[4]

    stamp = cols[1].split('T')
    ymd = stamp[0].split('-')
    hms = stamp[1].split(':')
    day = datetime.datetime(int(ymd[0]), int(ymd[1]), int(ymd[2]), 0, 0, 0)
    clock = datetime.datetime(1990, 1, 1, int(hms[0]), int(hms[1]), int(hms[2]))

    return Row(
        enrollment_id=enrollment,
        date=day,
        time=clock,
        source=source,
        event=event,
        object_id=object_id,
    )


def true2row(line):
    """Parse a ground-truth line into a Row.

    Column 0 is the enrollment id (int); column 1 is an integer flag that
    is converted to a boolean ``dropout`` value.
    """
    parts = line.split(',')
    return Row(enrollment_id=int(parts[0]), dropout=bool(int(parts[1])))
        

def date2row(line):
    """Parse a course-date line into a Row.

    Column 0 is the course id; columns 1 and 2 are ``YYYY-MM-DD`` dates
    that become midnight datetimes ``fromdate`` and ``todate``.
    """
    cols = line.split(',')
    course = cols[0]
    start_parts = cols[1].split('-')
    end_parts = cols[2].split('-')
    start = datetime.datetime(
        int(start_parts[0]), int(start_parts[1]), int(start_parts[2]), 0, 0, 0)
    end = datetime.datetime(
        int(end_parts[0]), int(end_parts[1]), int(end_parts[2]), 0, 0, 0)
    return Row(course_id=course, fromdate=start, todate=end)

def enrollment2row(line):
    """Parse an enrollment line into a Row.

    Column 0 is the enrollment id (int), column 1 the username and
    column 2 the course id.
    """
    cols = line.split(',')
    enrollment = int(cols[0])
    return Row(enrollment_id=enrollment, username=cols[1], course_id=cols[2])
  
    
def object2row(line):
    """Parse a course-object line into a Row.

    A 5-column line is ``course, module, category, children, timestamp``;
    a 4-column line omits ``children`` (stored as an empty string). The
    timestamp is either the literal ``null`` or ``YYYY-MM-DDTHH:MM:SS``.
    Lines with any other column count produce ``None``.
    """
    cols = line.split(',')
    width = len(cols)
    if width == 5:
        children, stamp = cols[3], cols[4]
    elif width == 4:
        children, stamp = "", cols[3]
    else:
        return None

    # Placeholder values used when the timestamp column is "null".
    day = datetime.datetime(1990, 1, 1, 0, 0, 0)
    clock = datetime.datetime(1990, 1, 1, 0, 0, 0)
    if stamp != "null":
        halves = stamp.split('T')
        ymd = halves[0].split('-')
        hms = halves[1].split(':')
        day = datetime.datetime(int(ymd[0]), int(ymd[1]), int(ymd[2]), 0, 0, 0)
        clock = datetime.datetime(1990, 1, 1, int(hms[0]), int(hms[1]), int(hms[2]))

    return Row(
        course_id=cols[0],
        module_id=cols[1],
        category=cols[2],
        children=children,
        date=day,
        time=clock,
    )



In [3]:
# Read each CSV file as an RDD of text lines and remove its header line.
# Parsing into Rows and DataFrame / temp-view creation happen in later cells.


###########################################################
############ TODO change it to your directory #############
DATAPATH = "file:///home/ss00/5_Feature_Extraction/featuredata"
###########################################################


def _load_lines(path):
    """Return (all lines, first line, lines different from the first line)."""
    lines = spark.sparkContext.textFile(path)
    header = lines.first()
    return lines, header, lines.filter(lambda x: x != header)


# use truncated dataset (10%).
log_table, log_head, log_table_no_header = _load_lines(
    "{}/log_10percent.csv".format(DATAPATH))

true_table, true_head, true_table_no_header = _load_lines(
    "{}/truth_train.csv".format(DATAPATH))

date_table, date_head, date_table_no_header = _load_lines(
    "{}/date.csv".format(DATAPATH))

enrollment_table, enrollment_head, enrollment_table_no_header = _load_lines(
    "{}/enrollment.csv".format(DATAPATH))

object_table, object_head, object_table_no_header = _load_lines(
    "{}/object.csv".format(DATAPATH))

In [4]:
# Turn every header-free text RDD into an RDD of Rows using the matching
# line parser. These are lazy transformations; nothing is parsed yet.
log_rows = log_table_no_header.map(log2row)
true_rows = true_table_no_header.map(true2row)
date_rows = date_table_no_header.map(date2row)
enrollment_rows = enrollment_table_no_header.map(enrollment2row)
object_rows = object_table_no_header.map(object2row)

In [5]:
# Build one DataFrame per parsed RDD, mark it cached, and register it as a
# temp view so the SQL cells can query it by name.
log_df = spark.createDataFrame(log_rows)
log_df.cache()
log_df.createOrReplaceTempView("log_t")

true_df = spark.createDataFrame(true_rows)
true_df.cache()
true_df.createOrReplaceTempView("true_t")

date_df = spark.createDataFrame(date_rows)
date_df.cache()
date_df.createOrReplaceTempView("date_t")

enrollment_df = spark.createDataFrame(enrollment_rows)
enrollment_df.cache()
enrollment_df.createOrReplaceTempView("enrollment_t")

object_df = spark.createDataFrame(object_rows)
object_df.cache()
object_df.createOrReplaceTempView("object_t")

In [6]:
# Sanity check: print the first three rows of every table, plus the
# schema of the log table.
log_df.show(3)
log_df.printSchema()
true_df.show(3)
date_df.show(3)
enrollment_df.show(3)
object_df.show(3)

In [7]:
%%sql
-- Per-enrollment event counts, pivoted into one column per event type.
-- The inner query counts log rows per (enrollment_id, event); the outer
-- query spreads those counts across the c_* columns.
SELECT enrollment_id,
sum(CASE WHEN event = "discussion" THEN count_event ELSE 0 END) c_discusion,
sum(CASE WHEN event = "wiki" THEN count_event ELSE 0 END) c_wiki,
sum(CASE WHEN event = "page_close" THEN count_event ELSE 0 END) c_page_close,
sum(CASE WHEN event = "access" THEN count_event ELSE 0 END) c_access,
sum(CASE WHEN event = "video" THEN count_event ELSE 0 END) c_video,
sum(CASE WHEN event = "navigate" THEN count_event ELSE 0 END) c_navigate,
sum(CASE WHEN event = "problem" THEN count_event ELSE 0 END) c_problem 
FROM 
 (SELECT enrollment_id, event, count(*) as count_event
 FROM log_t 
 group by enrollment_id, event) 
 group by enrollment_id 
 order by enrollment_id

In [8]:
# SQL 1
# COUNT: 7
# Per-enrollment event counts, one column per event type.
results1 = spark.sql('''SELECT enrollment_id,
sum(CASE WHEN event = "discussion" THEN count_event ELSE 0 END) c_discusion,
sum(CASE WHEN event = "wiki" THEN count_event ELSE 0 END) c_wiki,
sum(CASE WHEN event = "page_close" THEN count_event ELSE 0 END) c_page_close,
sum(CASE WHEN event = "access" THEN count_event ELSE 0 END) c_access,
sum(CASE WHEN event = "video" THEN count_event ELSE 0 END) c_video,
sum(CASE WHEN event = "navigate" THEN count_event ELSE 0 END) c_navigate,
sum(CASE WHEN event = "problem" THEN count_event ELSE 0 END) c_problem 
FROM 
 (SELECT enrollment_id, event, count(*) as count_event
 FROM log_t 
 group by enrollment_id, event) 
 group by enrollment_id 
 order by enrollment_id ''')
results1.cache()


In [9]:
%%sql
-- Per-enrollment log counts split by source ("browser" vs "server").
SELECT enrollment_id,
sum(CASE WHEN source = "browser" THEN count_event ELSE 0 END) s_browser,
sum(CASE WHEN source = "server" THEN count_event ELSE 0 END) s_server
FROM 
 (SELECT enrollment_id,source, count(*) as count_event FROM log_t group by enrollment_id, source order by enrollment_id)
 group by enrollment_id
 order by enrollment_id

In [10]:
# SQL 2
# COUNT: 2, ACCUMULATE: 9
# Per-enrollment log counts split by source (browser / server).
results2 = spark.sql('''SELECT enrollment_id,
sum(CASE WHEN source = "browser" THEN count_event ELSE 0 END) s_browser,
sum(CASE WHEN source = "server" THEN count_event ELSE 0 END) s_server
FROM 
 (SELECT enrollment_id,source, count(*) as count_event FROM log_t group by enrollment_id, source order by enrollment_id)
 group by enrollment_id
 order by enrollment_id
 ''')
results2.cache()


In [11]:
# Join SQL 2 with 1
# Full outer join on enrollment_id keeps enrollments present in either result.
joined_results1_2 = results2.join(
    results1, on='enrollment_id', how='outer'
).cache()


In [12]:
%%sql
-- Number of enrollments per course.
SELECT course_id, count(enrollment_id) as count_eid_per_course FROM enrollment_t group by course_id order by course_id

In [13]:
# SQL 3
# COUNT: 1, ACCUMULATE: 10
# Number of enrollments per course.
results3 = spark.sql(
    'SELECT course_id, count(enrollment_id) as count_eid_per_course '
    'FROM enrollment_t '
    'group by course_id order by course_id'
).cache()


In [14]:
%%sql
-- Number of enrollments per course whose truth row is marked as dropout.
SELECT course_id, count(e.enrollment_id) as dropout_per_course FROM enrollment_t as e, true_t as t WHERE e.enrollment_id = t.enrollment_id and t.dropout = 1 group by course_id order by course_id

In [15]:
# SQL 4
# COUNT: 1, ACCUMULATE: 11
# Number of dropout-labelled enrollments per course.
results4 = spark.sql(
    'SELECT course_id, count(e.enrollment_id) as dropout_per_course '
    'FROM enrollment_t as e, true_t as t '
    'WHERE e.enrollment_id = t.enrollment_id and t.dropout = 1 '
    'group by course_id order by course_id'
).cache()


In [16]:
%%sql
-- Dropout rate per course: dropout-labelled enrollments (sq2.do) divided by
-- all enrollments that have a truth row (sq1.ce).
SELECT sq1.course_id as course_id, sq2.do/sq1.ce as dropoutrate_per_course
FROM (SELECT course_id, count(e.enrollment_id) as ce FROM enrollment_t as e, true_t as t
        WHERE e.enrollment_id = t.enrollment_id group by course_id) as sq1,
     (SELECT course_id, count(dropout) as do FROM enrollment_t as e, true_t as t
        WHERE e.enrollment_id = t.enrollment_id and t.dropout = 1 group by course_id) as sq2
WHERE sq1.course_id = sq2.course_id order by sq1.course_id

In [17]:
# SQL 5
# COUNT: 1, ACCUMULATE: 12
# Dropout rate per course: dropout-labelled enrollments / labelled enrollments.
results5 = spark.sql(
    'SELECT sq1.course_id as course_id, sq2.do/sq1.ce as dropoutrate_per_course '
    'FROM (SELECT course_id, count(e.enrollment_id) as ce FROM enrollment_t as e, true_t as t '
    'WHERE e.enrollment_id = t.enrollment_id group by course_id) as sq1, '
    '(SELECT course_id, count(dropout) as do FROM enrollment_t as e, true_t as t '
    'WHERE e.enrollment_id = t.enrollment_id and t.dropout = 1 group by course_id) as sq2 '
    'WHERE sq1.course_id = sq2.course_id order by sq1.course_id'
).cache()


In [18]:
# Join SQL 3,4,5 with 1~2
# Course-level features are combined first, attached to every enrollment
# through enrollment_df, then merged with the per-enrollment features.
joined_result_3_4_5 = (
    results3
    .join(results4, on='course_id', how='outer')
    .join(results5, on='course_id', how='outer')
    .cache()
)
joined_result_3_4_5_edf = joined_result_3_4_5.join(
    enrollment_df, on='course_id', how='outer'
).cache()
joined_result1_2_3_4_5_edf = joined_result_3_4_5_edf.join(
    joined_results1_2, on='enrollment_id', how='outer'
).cache()


In [19]:
%%sql
-- Number of enrollments per user.
SELECT username, count(enrollment_id) as count_eid_per_user FROM enrollment_t group by username order by username

In [20]:
#SQL 6
# COUNT: 1, ACCUMULATE: 13
# Number of enrollments per user.
results6 = spark.sql(
    'SELECT username, count(enrollment_id) as count_eid_per_user '
    'FROM enrollment_t '
    'group by username order by username'
).cache()


In [21]:
%%sql
-- Number of dropout-labelled enrollments per user.
SELECT e.username as username, count(e.enrollment_id) as dropout_per_user FROM true_t as t, enrollment_t as e WHERE t.enrollment_id = e.enrollment_id and t.dropout = 1 group by e.username order by e.username

In [22]:
#SQL 7
# COUNT: 1, ACCUMULATE: 14
# Number of dropout-labelled enrollments per user.
results7 = spark.sql(
    'SELECT e.username as username, count(e.enrollment_id) as dropout_per_user '
    'FROM true_t as t, enrollment_t as e '
    'WHERE t.enrollment_id = e.enrollment_id and t.dropout = 1 '
    'group by e.username order by e.username'
).cache()


In [23]:
%%sql
-- Dropout rate per user: dropout-labelled enrollments (sq2.do) divided by
-- all of the user's enrollments that have a truth row (sq1.ce).
SELECT sq1.username as username , sq2.do/sq1.ce as dropoutrate_per_user
FROM (SELECT username, count(e.enrollment_id) as ce FROM enrollment_t as e, true_t as t
        WHERE e.enrollment_id = t.enrollment_id group by username) as sq1, 
     (SELECT username, count(dropout) as do FROM enrollment_t as e, true_t as t
        WHERE e.enrollment_id = t.enrollment_id and t.dropout = 1 group by username) as sq2
WHERE sq1.username = sq2.username order by sq1.username

In [24]:
# SQL 8
# COUNT: 1, ACCUMULATE: 15
# Dropout rate per user: dropout-labelled enrollments / labelled enrollments.
# The denominator (sq1) is restricted to enrollments that have a truth row,
# matching the per-course rate in SQL 5. Counting every enrollment of the
# user would mix unlabelled enrollments into the denominator and understate
# the rate.
results8 = spark.sql(
    'SELECT sq1.username as username , sq2.do/sq1.ce as dropoutrate_per_user '
    'FROM (SELECT username, count(e.enrollment_id) as ce FROM enrollment_t as e, true_t as t '
    'WHERE e.enrollment_id = t.enrollment_id group by username) as sq1, '
    '(SELECT username, count(dropout) as do FROM enrollment_t as e, true_t as t '
    'WHERE e.enrollment_id = t.enrollment_id and t.dropout = 1 group by username) as sq2 '
    'WHERE sq1.username = sq2.username order by sq1.username'
).cache()


In [25]:
# Join SQL 6,7,8 with 1~5
# User-level features are combined first, then attached to the accumulated
# table by username; username is dropped once the join is done.
joined_result_6_7_8 = (
    results6
    .join(results7, on='username', how='outer')
    .join(results8, on='username', how='outer')
    .cache()
)
joined_result1_2_3_4_5_6_7_8_edf = (
    joined_result_6_7_8
    .join(joined_result1_2_3_4_5_edf, on='username', how='outer')
    .drop('username')
    .cache()
)


In [26]:
%%sql
-- Activity period per enrollment: datediff between the latest and the
-- earliest log date.
SELECT e.enrollment_id, datediff( MAX(l.date), MIN(l.date) ) as period FROM log_t as l, enrollment_t  as e WHERE e.enrollment_id = l.enrollment_id group by e.enrollment_id order by e.enrollment_id

In [27]:
#SQL 9 
# COUNT: 1, ACCUMULATE: 16
# Activity period per enrollment: datediff between last and first log date.
results9 = spark.sql(
    'SELECT e.enrollment_id, datediff( MAX(l.date), MIN(l.date) ) as period '
    'FROM log_t as l, enrollment_t  as e '
    'WHERE e.enrollment_id = l.enrollment_id '
    'group by e.enrollment_id order by e.enrollment_id'
).cache()

# Uncomment and visualize
#z.show(results9)


In [28]:
# Join SQL 9 with 1~8
# Unlike the neighbouring join cells, this result is not cached.
joined_result1_2_3_4_5_6_7_8_9_edf = results9.join(
    joined_result1_2_3_4_5_6_7_8_edf, on='enrollment_id', how='outer'
)

In [29]:
%%sql
-- Number of distinct log dates per enrollment.
SELECT  e.enrollment_id, count(distinct(l.date)) as effective_study_days FROM log_t as l, enrollment_t as e WHERE e.enrollment_id = l.enrollment_id group by e.enrollment_id order by e.enrollment_id

In [30]:
# SQL 10
# COUNT: 1, ACCUMULATE: 17
# Number of distinct log dates per enrollment.
results10 = spark.sql(
    'SELECT  e.enrollment_id, count(distinct(l.date)) as effective_study_days '
    'FROM log_t as l, enrollment_t as e '
    'WHERE e.enrollment_id = l.enrollment_id '
    'group by e.enrollment_id order by e.enrollment_id'
).cache()


In [31]:
# Join SQL 10 with 1~9
# Final feature table: one outer join on enrollment_id, then print its schema.
joined_result1_2_3_4_5_6_7_8_9_10_edf = results10.join(
    joined_result1_2_3_4_5_6_7_8_9_edf, on='enrollment_id', how='outer'
).cache()
joined_result1_2_3_4_5_6_7_8_9_10_edf.printSchema()


In [32]:
# Drop the course key, replace missing feature values with 0, and attach the
# dropout label where one exists (left outer join keeps unlabelled rows).
joined_result_total = joined_result1_2_3_4_5_6_7_8_9_10_edf.drop('course_id')
joined_result_total_numeric = joined_result_total.na.fill(0)

agg_result = joined_result_total_numeric.join(
    true_df, on='enrollment_id', how='left_outer'
).cache()

# Split
# Rows with a label form the training set; rows without one form the test set.
labelled_rows = agg_result.filter('dropout is  not null')
train_dataset_df = labelled_rows.sort('enrollment_id').cache()
unlabelled_rows = agg_result.filter('dropout is null')
test_dataset_df = unlabelled_rows.sort('enrollment_id').drop('dropout').cache()
agg_result.printSchema()

In [33]:
# Render the joined feature table with the notebook's `z` display helper.
z.show(joined_result1_2_3_4_5_6_7_8_9_10_edf)

In [34]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.sql import functions as F
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql import types 

In [35]:
# Report how many training rows fall into each class.
dropout_flag = train_dataset_df['dropout']
dropped_count = train_dataset_df.filter(dropout_flag == True).count()
stayed_count = train_dataset_df.filter(dropout_flag == False).count()
print("The number of dropout: ", dropped_count)
print("The number of NOT dropout: ", stayed_count)

In [36]:
# Standardising scaler (centre and scale) from "temp_feature" to "scaled".
# NOTE(review): no visible cell produces a "temp_feature" column, and `scaler`
# is reassigned in the stage-building cell before it is used, so this
# looks like a leftover. Confirm before relying on it.
scaler = StandardScaler(withMean=True, withStd=True, inputCol="temp_feature", outputCol="scaled")


In [37]:
# Add an integer `label` column derived from the boolean `dropout` column.
label_column = train_dataset_df['dropout'].cast(types.IntegerType())
train_dataset_df = train_dataset_df.withColumn('label', label_column)
train_dataset_df.show()

In [38]:
# Share of training rows whose label is 1 (the author's note describes this
# as num_Majority / dataset_size).
positive_rows = train_dataset_df.filter(train_dataset_df['label'] == 1).count()
total_rows = train_dataset_df.count()
BalancingRatio = positive_rows / total_rows
print('BalancingRatio = {}'.format(BalancingRatio))


In [39]:
# Per-row weight: label-0 rows get BalancingRatio, all other rows get
# 1 - BalancingRatio, so each class is weighted by the other class's share.
is_label_zero = train_dataset_df['label'] == 0
weight_column = F.when(is_label_zero, BalancingRatio).otherwise(1 - BalancingRatio)
train_dataset_df = train_dataset_df.withColumn("classWeights", weight_column)
train_dataset_df.select("label", "classWeights").show(5)

In [40]:
# Build the preprocessing stages shared by every model pipeline:
# categorical encoding -> label indexing -> feature assembly -> scaling.
categoricalColumns = []
stages = []

# One StringIndexer + one-hot encoder per categorical column (none at present).
for c in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=c, outputCol=c + 'index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[c + 'classVec'])
    stages += [stringIndexer, encoder]

# Index the integer label; rows with an invalid label are skipped.
labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel').setHandleInvalid("skip")
stages += [labelIndexer]

# Every column of agg_result that is not an id, a label or categorical is
# treated as a numeric feature.
list_of_col_not_numeric = categoricalColumns + ['enrollment_id', 'label', 'dropout']
numericCols = agg_result.drop(*list_of_col_not_numeric).columns

# Iterate over categoricalColumns here: the previously used name `cateCol`
# is not defined anywhere and made this cell fail with NameError.
assemblerInputs = [c + 'classVec' for c in categoricalColumns] + numericCols
vectorAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="Features")

stages += [vectorAssembler]

# Scale to unit standard deviation without centring.
scaler = StandardScaler(inputCol='Features', outputCol='scaledFeatures', withStd=True, withMean=False)

stages += [scaler]

In [41]:
# Candidate classifiers. Both are trained on `indexedLabel` and the scaled
# feature vector; only logistic regression uses the class weights.
lr = LogisticRegression(labelCol="indexedLabel", featuresCol="scaledFeatures", weightCol="classWeights")
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="scaledFeatures")

lr_params = ParamGridBuilder().addGrid(lr.regParam, [0.1]).addGrid(lr.maxIter, [300]).build()
                                # .addGrid(lr.elasticNetParam, [0.1, 0.5, 1.0])\


rf_params = ParamGridBuilder().addGrid(rf.maxDepth, [5, 10])\
                                .addGrid(rf.numTrees, [200, 500]).build()
                                # .addGrid(rf.featureSubsetStrategy, ['sqrt'])


# The evaluator must score against the same column the models were trained
# on. StringIndexer numbers labels by descending frequency, so `indexedLabel`
# can be the inverse of the raw 0/1 `label`; scoring against `label` (the
# evaluator default) would then invert the AUC and make cross-validation
# prefer the worst parameter set.
binary_evaluator = BinaryClassificationEvaluator(labelCol='indexedLabel', metricName='areaUnderROC') #option, areaUnderPR, areaUnderROC
lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=lr_params, evaluator=binary_evaluator, numFolds=10, seed=900)
rf_cv = CrossValidator(estimator=rf, estimatorParamMaps=rf_params, evaluator=binary_evaluator, numFolds=10, seed=900)

# Full pipelines: shared preprocessing stages followed by one cross-validator.
stages_lr_cv = stages + [lr_cv]
stages_rf_cv = stages + [rf_cv]

In [42]:
# Fit preprocessing plus the cross-validated logistic regression on the
# training set.
prepPipeline = Pipeline(stages=stages_lr_cv)
pipelineModel = prepPipeline.fit(train_dataset_df)


In [43]:
# Preview the unlabelled rows that will be scored.
test_dataset_df.show()

In [44]:
# Run the fitted pipeline on the unlabelled rows and preview the output.
pipelineModel.transform(test_dataset_df).show()

In [45]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
# StringIndexer numbers labels by descending frequency, so slot 1 of the
# probability vector is not necessarily the raw label 1. Look up the index
# the fitted label indexer gave to label '1' (numeric labels are indexed as
# strings) and read that slot. Raises ValueError if label 1 never occurred
# in the training data.
label_indexer_model = pipelineModel.stages[stages.index(labelIndexer)]
dropout_index = label_indexer_model.labels.index('1')

proba_of_dropout = udf(lambda v: float(v[dropout_index]), FloatType())
second_element = proba_of_dropout  # old name kept for existing references

prediction = pipelineModel.transform(test_dataset_df)
prediction = prediction.withColumn("proba_as_1", proba_of_dropout('probability'))
prediction.select("enrollment_id", "proba_as_1").show()