In [1]:
import findspark
# findspark.init() locates the Spark installation and makes pyspark importable,
# so it has to run before any `import pyspark...` line below.
findspark.init()

import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.types import *

In [2]:
# Re-use the running context when this cell is executed again: constructing a
# second SparkContext('local') in the same kernel raises ValueError.
sparkc = SparkContext.getOrCreate(SparkConf().setMaster('local'))
sqlc = SQLContext(sparkc)
# Tab-separated inputs with a header row. No schema is given, so every column
# is read as a string.
train_data = sqlc.read.csv('./train.csv', sep='\t', header=True)
test_data = sqlc.read.csv('./test.csv', sep='\t', header=True)



In [3]:
# Columns not used as features: the row id, the timestamps and durations, and
# the per-step outcome counters.
# NOTE(review): DataFrame.drop ignores names that match no column, so a typo
# here fails silently. Confirm each entry (e.g. the casing of 'Step Start time')
# against the file header.
to_drop = [
    'Row',
    'Step Start time',
    'First Transaction Time',
    'Correct Transaction Time',
    'Step End Time',
    'Step Duration (sec)',
    'Correct Step Duration (sec)',
    'Error Step Duration (sec)',
    'Incorrects',
    'Hints',
    'Corrects',
]
train_data, test_data = [frame.drop(*to_drop) for frame in (train_data, test_data)]

In [4]:
def set_unit(str):
    """Return the text before the first comma of a 'Problem Hierarchy' value.

    A value with no comma is returned unchanged. A null cell (None) yields None
    instead of raising AttributeError inside the Spark UDF.
    """
    # The parameter keeps its original name (it shadows the builtin `str`)
    # so that existing callers are unaffected.
    if str is None:
        return None
    return str.split(',')[0]
    
def set_section(str):
    """Return the text between the first and second comma of a 'Problem Hierarchy' value.

    Whitespace after the comma is kept (e.g. 'Unit X, Section Y' -> ' Section Y').
    A null cell, or a value without a comma, yields None. Previously these
    cases raised AttributeError/IndexError and aborted the Spark job.
    """
    if str is None:
        return None
    parts = str.split(',')
    if len(parts) < 2:
        return None
    return parts[1]
ph = 'Problem Hierarchy'
udfset_unit = F.udf(set_unit, StringType())
udfset_section = F.udf(set_section, StringType())


def _split_hierarchy(frame):
    """Replace 'Problem Hierarchy' with separate 'Problem Unit' / 'Problem Section' columns."""
    return (frame
            .withColumn('Problem Unit', udfset_unit(ph))
            .withColumn('Problem Section', udfset_section(ph))
            .drop(ph))


test_data = _split_hierarchy(test_data)
train_data = _split_hierarchy(train_data)

In [5]:
# encode
def change_code(a):
    """Label-encode column `a` of the global train_data and test_data.

    Every distinct value of `a` across train and test (including null, if
    present) is mapped to a 1-based integer code, so both frames share one
    encoding. The encoded column replaces the original and becomes the last
    column of each frame.

    Values are sorted before numbering (nulls last) so that re-running on the
    same data reproduces the same codes. The row order returned by
    ``distinct().collect()`` is not something to rely on.

    Rebinds the module-level ``train_data`` and ``test_data``.
    """
    global train_data, test_data
    # Union only the column being encoded. This avoids moving whole rows and
    # does not depend on train/test having their columns in the same positions
    # (union() pairs columns by position, not by name).
    rows = train_data.select(a).union(test_data.select(a)).distinct().collect()
    values = sorted((row[a] for row in rows), key=lambda v: (v is None, v))
    dict_1 = {value: code for code, value in enumerate(values, start=1)}

    def code1(value):
        # Every value of `a` in train/test was collected above.
        return dict_1[value]

    udfcode = F.udf(code1, IntegerType())
    new_a = 'New ' + a
    # Build under a temporary name, drop the source column, then take over its name.
    test_data = (test_data.withColumn(new_a, udfcode(a))
                 .drop(a)
                 .withColumnRenamed(new_a, a))
    train_data = (train_data.withColumn(new_a, udfcode(a))
                  .drop(a)
                  .withColumnRenamed(new_a, a))


# Columns to label-encode; each call rebinds train_data / test_data.
new_code = ['Anon Student Id', 'Problem Name', 'Problem Unit', 'Problem Section', 'Step Name']
for column in new_code:
    change_code(column)

In [6]:
# Derive numeric features from the '~~'-separated KC(Default) and Opportunity(Default) columns
def KC(str):
    """Count the knowledge components in a '~~'-separated KC(Default) value.

    Empty or null cells count as 0. Otherwise the result is one more than the
    number of '~~' separators.
    """
    return str.count('~~') + 1 if str else 0

def Opp(str):
    """Return the mean of the '~~'-separated numbers in an Opportunity(Default) value.

    Empty or null cells give 0.0. Each item is parsed with float() instead of
    eval(). eval() executed arbitrary expressions read from the data file, and
    it rejected integers with leading zeros such as '07'. A non-numeric item
    now raises ValueError.
    """
    if not str:
        return 0.0
    values = [float(item) for item in str.split('~~')]
    return sum(values) / len(values)

udf_KC = F.udf(KC, IntegerType())
udf_Opp = F.udf(Opp, FloatType())
o_d = 'Opportunity(Default)'
k_c = 'KC Count'
o_a = 'Opportunity Average'


def _add_kc_opportunity_features(frame):
    """Append 'KC Count' and 'Opportunity Average', then drop 'Opportunity(Default)'.

    'KC(Default)' is left in place; a later cell groups by it before dropping it.
    """
    return (frame
            .withColumn(k_c, udf_KC('KC(Default)'))
            .withColumn(o_a, udf_Opp(o_d))
            .drop(o_d))


train_data = _add_kc_opportunity_features(train_data)
test_data = _add_kc_opportunity_features(test_data)

In [7]:
# Per-group correct-first-attempt statistics computed from the training set
# Training rows whose 'Correct First Attempt' is the string '1' (all CSV
# columns are strings). Rows with a null value are excluded by the comparison.
mark_data = train_data.where(F.col('Correct First Attempt') == '1')
# Name of the column produced by GroupedData.count().
CT = 'count'

def Personal_Count():
    """Add 'Personal CFAC': each student's number of correct first attempts in train.

    Students present in train_data but absent from mark_data get 0. Ids not
    seen in train_data get the mean count over the train students.
    Rebinds the module-level ``train_data`` and ``test_data``.
    """
    global train_data, test_data
    ASI = 'Anon Student Id'
    train_rows = train_data.groupBy(ASI).count().collect()
    mark_rows = mark_data.groupBy(ASI).count().collect()

    # Start every train student at 0, then overwrite with the observed counts.
    counts = dict.fromkeys((row[ASI] for row in train_rows), 0)
    counts.update((row[ASI], row[CT]) for row in mark_rows)
    float_mean = float(sum(counts.values()) / len(train_rows))

    def Id_Count(key):
        return float(counts[key]) if key in counts else float_mean

    udf_Id_Count = F.udf(Id_Count, FloatType())
    train_data = train_data.withColumn('Personal CFAC', udf_Id_Count(ASI))
    test_data = test_data.withColumn('Personal CFAC', udf_Id_Count(ASI))


Personal_Count()

In [8]:
def Personal_Rate():
    """Add 'Personal CFAR': each student's correct-first-attempt rate in train.

    For each 'Anon Student Id' in train_data, the rate is
    (rows in mark_data) / (rows in train_data). Students with no correct first
    attempt get 0. Ids not seen in train_data fall back to the unweighted mean
    of the per-student rates, or 0.0 when train_data has no rows (previously
    a ZeroDivisionError).

    Rebinds the module-level ``train_data`` and ``test_data``.
    """
    global train_data, test_data
    ASI = 'Anon Student Id'
    correct = {row[ASI]: row[CT] for row in mark_data.groupBy(ASI).count().collect()}
    total = {row[ASI]: row[CT] for row in train_data.groupBy(ASI).count().collect()}
    # mark_data is a filter of train_data, so every key of `correct` is in `total`.
    d = {key: correct.get(key, 0) / n for key, n in total.items()}
    float_mean = float(sum(d.values()) / len(d)) if d else 0.0

    def Id_Rate(key):
        return float(d[key]) if key in d else float_mean

    udf_Id_Rate = F.udf(Id_Rate, FloatType())
    train_data = train_data.withColumn('Personal CFAR', udf_Id_Rate(ASI))
    test_data = test_data.withColumn('Personal CFAR', udf_Id_Rate(ASI))


Personal_Rate()

In [9]:
def Problem_Rate():
    """Add 'Problem CFAR': each problem's correct-first-attempt rate in train.

    For each 'Problem Name' in train_data, the rate is
    (rows in mark_data) / (rows in train_data). Problems with no correct first
    attempt get 0. Names not seen in train_data fall back to the unweighted
    mean of the per-problem rates, or 0.0 when train_data has no rows
    (previously a ZeroDivisionError).

    Rebinds the module-level ``train_data`` and ``test_data``.
    """
    global train_data, test_data
    PN = 'Problem Name'
    correct = {row[PN]: row[CT] for row in mark_data.groupBy(PN).count().collect()}
    total = {row[PN]: row[CT] for row in train_data.groupBy(PN).count().collect()}
    # mark_data is a filter of train_data, so every key of `correct` is in `total`.
    d = {key: correct.get(key, 0) / n for key, n in total.items()}
    float_mean = float(sum(d.values()) / len(d)) if d else 0.0

    def Name_Rate(key):
        return float(d[key]) if key in d else float_mean

    udf_Name_Rate = F.udf(Name_Rate, FloatType())
    train_data = train_data.withColumn('Problem CFAR', udf_Name_Rate(PN))
    test_data = test_data.withColumn('Problem CFAR', udf_Name_Rate(PN))


Problem_Rate()

In [10]:
def Unit_Rate():
    """Add 'Unit CFAR': each problem unit's correct-first-attempt rate in train.

    For each 'Problem Unit' in train_data, the rate is
    (rows in mark_data) / (rows in train_data). Units with no correct first
    attempt get 0. Units not seen in train_data fall back to the unweighted
    mean of the per-unit rates, or 0.0 when train_data has no rows
    (previously a ZeroDivisionError).

    Rebinds the module-level ``train_data`` and ``test_data``.
    """
    global train_data, test_data
    PU = 'Problem Unit'
    correct = {row[PU]: row[CT] for row in mark_data.groupBy(PU).count().collect()}
    total = {row[PU]: row[CT] for row in train_data.groupBy(PU).count().collect()}
    # mark_data is a filter of train_data, so every key of `correct` is in `total`.
    d = {key: correct.get(key, 0) / n for key, n in total.items()}
    float_mean = float(sum(d.values()) / len(d)) if d else 0.0

    def Unitid_Rate(key):
        return float(d[key]) if key in d else float_mean

    udf_Unitid_Rate = F.udf(Unitid_Rate, FloatType())
    train_data = train_data.withColumn('Unit CFAR', udf_Unitid_Rate(PU))
    test_data = test_data.withColumn('Unit CFAR', udf_Unitid_Rate(PU))


Unit_Rate()

In [11]:
def Section_Rate():
    """Add 'Section CFAR': each problem section's correct-first-attempt rate in train.

    For each 'Problem Section' in train_data, the rate is
    (rows in mark_data) / (rows in train_data). Sections with no correct first
    attempt get 0. Sections not seen in train_data fall back to the unweighted
    mean of the per-section rates, or 0.0 when train_data has no rows
    (previously a ZeroDivisionError).

    Rebinds the module-level ``train_data`` and ``test_data``.
    """
    global train_data, test_data
    PS = 'Problem Section'
    correct = {row[PS]: row[CT] for row in mark_data.groupBy(PS).count().collect()}
    total = {row[PS]: row[CT] for row in train_data.groupBy(PS).count().collect()}
    # mark_data is a filter of train_data, so every key of `correct` is in `total`.
    d = {key: correct.get(key, 0) / n for key, n in total.items()}
    float_mean = float(sum(d.values()) / len(d)) if d else 0.0

    def Sectionid_Rate(key):
        return float(d[key]) if key in d else float_mean

    udf_Sectionid_Rate = F.udf(Sectionid_Rate, FloatType())
    train_data = train_data.withColumn('Section CFAR', udf_Sectionid_Rate(PS))
    test_data = test_data.withColumn('Section CFAR', udf_Sectionid_Rate(PS))


Section_Rate()

In [12]:
def Step_Rate():
    """Add 'Step CFAR': each step name's correct-first-attempt rate in train.

    For each 'Step Name' in train_data, the rate is
    (rows in mark_data) / (rows in train_data). Steps with no correct first
    attempt get 0. Steps not seen in train_data fall back to the unweighted
    mean of the per-step rates, or 0.0 when train_data has no rows
    (previously a ZeroDivisionError).

    Rebinds the module-level ``train_data`` and ``test_data``.
    """
    global train_data, test_data
    SN = 'Step Name'
    correct = {row[SN]: row[CT] for row in mark_data.groupBy(SN).count().collect()}
    total = {row[SN]: row[CT] for row in train_data.groupBy(SN).count().collect()}
    # mark_data is a filter of train_data, so every key of `correct` is in `total`.
    d = {key: correct.get(key, 0) / n for key, n in total.items()}
    float_mean = float(sum(d.values()) / len(d)) if d else 0.0

    def Name_Rate(key):
        return float(d[key]) if key in d else float_mean

    udf_Name_Rate = F.udf(Name_Rate, FloatType())
    train_data = train_data.withColumn('Step CFAR', udf_Name_Rate(SN))
    test_data = test_data.withColumn('Step CFAR', udf_Name_Rate(SN))


Step_Rate()

In [13]:
def KC_Rate():
    """Add 'KC CFAR': the correct-first-attempt rate per raw 'KC(Default)' value in train.

    For each 'KC(Default)' value in train_data (including null, if present),
    the rate is (rows in mark_data) / (rows in train_data). Values with no
    correct first attempt get 0. Values not seen in train_data fall back to
    the unweighted mean of the per-value rates, or 0.0 when train_data has no
    rows (previously a ZeroDivisionError).

    Rebinds the module-level ``train_data`` and ``test_data``.
    """
    global train_data, test_data
    # Named kc_col so it no longer shadows the module-level KC() function.
    kc_col = 'KC(Default)'
    correct = {row[kc_col]: row[CT] for row in mark_data.groupBy(kc_col).count().collect()}
    total = {row[kc_col]: row[CT] for row in train_data.groupBy(kc_col).count().collect()}
    # mark_data is a filter of train_data, so every key of `correct` is in `total`.
    d = {key: correct.get(key, 0) / n for key, n in total.items()}
    float_mean = float(sum(d.values()) / len(d)) if d else 0.0

    def KCid_Rate(key):
        return float(d[key]) if key in d else float_mean

    udf_KCid_Rate = F.udf(KCid_Rate, FloatType())
    train_data = train_data.withColumn('KC CFAR', udf_KCid_Rate(kc_col))
    test_data = test_data.withColumn('KC CFAR', udf_KCid_Rate(kc_col))


KC_Rate()
# The raw KC text was only needed as the grouping key above.
train_data = train_data.drop('KC(Default)')
test_data = test_data.drop('KC(Default)')

In [14]:
# Collect both Spark frames to the driver as pandas DataFrames, then write
# them as tab-separated files.
# NOTE(review): to_csv keeps its default index=True, so the pandas row index is
# written as an extra unnamed first column. Confirm downstream readers expect it.
train_pd, test_pd = (frame.toPandas() for frame in (train_data, test_data))
for frame_pd, out_path in ((train_pd, './train_pyspark_data.csv'),
                           (test_pd, './test_pyspark_data.csv')):
    frame_pd.to_csv(out_path, sep='\t')