## Import needed libraries

In [1]:
# findspark has to run before the first pyspark import: it locates the Spark
# installation and makes its Python bindings importable.
import findspark
findspark.init()

from pyspark import SQLContext, SparkContext
from pyspark.sql.functions import mean, col, udf
from pyspark.sql.types import *
import numpy as np
import pandas

# Spark context with the 'local' master, wrapped in the SQL entry point that
# is used for all DataFrame I/O below.
spark_context = SparkContext('local')
sqlc = SQLContext(spark_context)

# Both splits are tab-separated files with a header row. No schema is given
# or inferred here, so later cells compare values as strings (e.g. == "1").
train = sqlc.read.csv('data/train.csv', sep='\t', header=True)
test = sqlc.read.csv('data/test.csv', sep='\t', header=True)



## Split Problem Hierarchy

In [2]:
@udf
def get_unit(str):
    """Return the text before the first comma of a problem hierarchy.

    A null hierarchy yields null instead of raising inside the Spark worker.
    The parameter keeps its original name (which shadows the builtin) so the
    UDF's signature is unchanged.
    """
    if str is None:
        return None
    return str.split(",")[0]

@udf
def get_section(str):
    """Return the second comma-separated field of a problem hierarchy.

    Returns null when the hierarchy is null or contains no comma, rather than
    failing the Spark job with AttributeError / IndexError. The field is
    returned exactly as split (surrounding whitespace is not stripped).
    The parameter keeps its original name so the UDF's signature is unchanged.
    """
    if str is None:
        return None
    parts = str.split(",")
    # A hierarchy without a comma has no section field.
    return parts[1] if len(parts) > 1 else None

def _add_hierarchy_parts(frame):
    """Append "Problem Unit" and "Problem Section" columns derived from "Problem Hierarchy"."""
    frame = frame.withColumn("Problem Unit", get_unit(frame["Problem Hierarchy"]))
    return frame.withColumn("Problem Section", get_section(frame["Problem Hierarchy"]))


train = _add_hierarchy_parts(train)
test = _add_hierarchy_parts(test)
# "Problem Hierarchy" itself is intentionally kept on both frames at this point.

## Get the number of KCs

In [3]:
@udf
def KC_num(str):
    """Count the "~~"-separated entries of a KC string; 0 for an empty or null value."""
    if not str:
        return 0
    # n separators delimit n + 1 entries.
    return str.count("~~") + 1
# Attach the KC count to both splits (train first, then test).
train, test = (
    frame.withColumn("KC num", KC_num(frame['KC(Default)']))
    for frame in (train, test)
)

## Get opportunity min and avg

In [4]:
@udf
def opp_avg(str):
    """Average the "~~"-separated integers of an opportunity string.

    Returns 0 for an empty or null value. Each piece is parsed with int(),
    so a non-integer piece raises ValueError.
    """
    if not str:
        return 0
    counts = [int(piece) for piece in str.split("~~")]
    return 1.0 * sum(counts) / len(counts)
# Attach the per-row opportunity average to both splits.
train, test = (
    frame.withColumn("Opportunity Average", opp_avg(frame['Opportunity(Default)']))
    for frame in (train, test)
)

@udf
def opp_min(str):
    """Return the smallest of the "~~"-separated integers of an opportunity string.

    Returns 0 for an empty or null value. Each piece is parsed with int(),
    so a non-integer piece raises ValueError.
    """
    if not str:
        return 0
    return min(int(piece) for piece in str.split("~~"))
# Attach the per-row opportunity minimum to both splits.
train, test = (
    frame.withColumn("Opportunity Min", opp_min(frame["Opportunity(Default)"]))
    for frame in (train, test)
)


## Step Name Part

In [5]:
@udf
def step_name_rename(str):
    """Collapse step names containing an arithmetic/equality symbol into "basic op".

    Any name containing one of + - * / ^ = becomes "basic op"; other names
    pass through unchanged. Empty or null names yield null.
    """
    if not str:
        return None
    if any(symbol in str for symbol in "+-*/^="):
        return "basic op"
    return str

def _replace_step_name(frame):
    """Swap "Step Name" for its renamed version; the new column ends up last."""
    return (
        frame.withColumn("renamed Step Name", step_name_rename(frame["Step Name"]))
        .drop("Step Name")
        .withColumnRenamed("renamed Step Name", "Step Name")
    )


train = _replace_step_name(train)
test = _replace_step_name(test)

## Encode 

In [6]:
def encode(column):
    """Replace the values of ``column`` with integer ids in both splits.

    The id table is built from the distinct values found in the global
    ``train`` and ``test`` frames together, so both splits share one mapping.
    Both globals are rebound to frames in which ``column`` holds the id; the
    encoded column is appended, so it moves to the last position.

    Args:
        column: Name of the column to encode; it must exist in both frames.
    """
    global train, test
    # Select the column before the union: DataFrame.union matches columns by
    # position, so unioning the full frames would depend on both frames
    # having identical layouts (and would shuffle every column).
    rows = train.select(column).union(test.select(column)).distinct().collect()
    # Sort so identical data always gets identical ids; the collect order of
    # distinct() is arbitrary. A null value, if present, sorts first.
    values = sorted(
        (row[column] for row in rows),
        key=lambda value: (value is not None, value or ""),
    )
    index = {value: position for position, value in enumerate(values)}

    @udf
    def get_id(value):
        return index[value]

    encoded_column = "encoded" + " " + column
    train = train.withColumn(encoded_column, get_id(train[column]))
    train = train.drop(column)
    train = train.withColumnRenamed(encoded_column, column)
    test = test.withColumn(encoded_column, get_id(test[column]))
    test = test.drop(column)
    test = test.withColumnRenamed(encoded_column, column)

col_encode = ['Anon Student Id','Problem Name','Problem Unit','Problem Section','Step Name']
# The loop variable is not called `col`: at module level that would rebind
# the `col` function imported from pyspark.sql.functions.
for column_name in col_encode:
    encode(column_name)


## CFAR

In [7]:
# Training rows whose first attempt was correct (the column holds the string "1").
correct_train = train.filter(train["Correct First Attempt"] == "1")


def CFAR(column):
    """Add a "<column> CFAR" column (correct-first-attempt rate) to both splits.

    For every distinct value of ``column`` in the global ``train`` frame the
    rate is (rows in ``correct_train`` with that value) / (rows in ``train``
    with that value). Values that never occur in ``train`` receive the
    unweighted mean of all per-value rates. Rebinds the globals ``train`` and
    ``test``.

    Args:
        column: Name of the column to group by; it must exist in ``train``,
            ``test`` and ``correct_train``.

    Raises:
        ZeroDivisionError: if ``train`` has no rows.
    """
    global train, test, correct_train
    correct_counts = {
        row[column]: row['count']
        for row in correct_train.groupBy(column).count().collect()
    }
    total_counts = {
        row[column]: row['count']
        for row in train.groupBy(column).count().collect()
    }
    # Convert before dividing so the ratio is a true division on any Python
    # version; values with no correct rows get a rate of 0.0.
    rates = {
        value: float(correct_counts.get(value, 0)) / total
        for value, total in total_counts.items()
    }
    avg = float(sum(rates.values())) / len(rates)

    @udf
    def get_rate(value):
        return rates.get(value, avg)

    train = train.withColumn(column + " CFAR", get_rate(train[column]))
    test = test.withColumn(column + " CFAR", get_rate(test[column]))

CFAR_list = ['Anon Student Id','Problem Name','Problem Unit','Problem Section','Step Name','KC(Default)']
# The loop variable is not called `col`: at module level that would rebind
# the `col` function imported from pyspark.sql.functions.
for column_name in CFAR_list:
    CFAR(column_name)

## Drop useless columns

In [8]:
# Columns removed from both splits before export.
columns_to_drop = [
    'Row', 'Step Start time', 'First Transaction Time', 'Correct Transaction Time',
    'Step End Time', 'Step Duration (sec)', 'Correct Step Duration (sec)',
    'Error Step Duration (sec)', 'Incorrects', 'Hints', 'Corrects',
    'KC(Default)', "Opportunity(Default)", "Problem Hierarchy",
]

# DataFrame.drop accepts several column names at once, so no loop is needed
# (and no module-level loop variable rebinds the imported `col` function).
train = train.drop(*columns_to_drop)
test = test.drop(*columns_to_drop)


## export

In [9]:
# Convert each split to pandas and write it as a tab-separated file with a
# header row and no index column.
for _frame, _path in (
    (train, 'data/train_preprocess.csv'),
    (test, 'data/test_preprocess.csv'),
):
    _frame.toPandas().to_csv(_path, sep='\t', header=True, index=False)