The goal of this notebook is to build a slightly sophisticated dataset.  

In [1]:
#import riiideducation
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import dask
from sklearn.metrics import roc_auc_score
import sys
import copy
from dask.distributed import Client, LocalCluster
from dask import dataframe as dd 



def npt(mystring) :
    print("**{}** : {}".format(sys._getframe(1).f_code.co_name,mystring))
    
#env = riiideducation.make_env()

In a previous notebook, I read in and then pickled the data.  Lets see how fast i can load it.  Note, you need to add the other notebook to this project.

In [2]:
full_df = pd.read_pickle('full_df.pkl')
full_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 99271300 entries, 0 to 99271299
Data columns (total 8 columns):
 #   Column                          Dtype  
---  ------                          -----  
 0   timestamp                       int64  
 1   user_id                         int32  
 2   content_id                      int16  
 3   content_type_id                 int8   
 4   task_container_id               int16  
 5   answered_correctly              int8   
 6   prior_question_elapsed_time     float32
 7   prior_question_had_explanation  boolean
dtypes: boolean(1), float32(1), int16(2), int32(1), int64(1), int8(2)
memory usage: 3.0 GB


In [3]:
full_df

Unnamed: 0,timestamp,user_id,content_id,content_type_id,task_container_id,answered_correctly,prior_question_elapsed_time,prior_question_had_explanation
0,0,115,5692,0,1,1,,
1,0,1805962620,5547,0,0,0,,
2,0,2015251289,4024,0,0,1,,
3,0,867941388,6659,0,0,1,,
4,0,867946278,3977,0,0,1,,
...,...,...,...,...,...,...,...,...
99271295,87193076570,626308830,8185,0,9217,0,13000.0,True
99271296,87193279051,626308830,6686,0,9218,0,10000.0,True
99271297,87193332075,626308830,5860,0,9219,1,21000.0,True
99271298,87193355096,626308830,11465,0,9220,0,25000.0,True


# Simple Question averages

Lets do a simple calculation to find average pct correct per question

In [None]:
%%time
question_averages_df = full_df[['content_id','answered_correctly']].groupby(['content_id']).agg(['sum','count'])
question_averages_df.columns = ['correct','total']
question_averages_df["pct_correct"] = question_averages_df['correct'] / question_averages_df['total']

In [None]:
question_averages_df.describe()

## Dask Version

In [74]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=40,
    threads_per_worker=1,
    processes=True,memory_limit=10e9)
client = Client(cluster)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 32903 instead
  http_address["port"], self.http_server.port


In [75]:
client

0,1
Client  Scheduler: tcp://127.0.0.1:46716  Dashboard: http://127.0.0.1:32903/status,Cluster  Workers: 40  Cores: 40  Memory: 400.00 GB


## Recycle Cluster

In [325]:
def recycle_cluster(client, cluster) :
    if cluster == None :
        cluster = LocalCluster(n_workers=40,
            threads_per_worker=1,
            processes=True,memory_limit=10e9)
    if(client == None) :
        client = Client(cluster)
    else :
        client.close()
        client = Client(cluster)
    
    return client, cluster

In [326]:
client, cluster = recycle_cluster(client, cluster)

In [327]:
full_ddf = dd.from_pandas(full_df, npartitions=5)

## Add Question  Averages to full_df

In [328]:
# %%time
# def add_question_averages() :
#     question_averages_ddf = full_ddf[['content_id','answered_correctly']].groupby(['content_id']).agg(['sum','count'])
#     question_averages_ddf.columns = ['correct','total']
# question_averages_ddf["pct_correct"] = question_averages_ddf['correct'] / question_averages_ddf['total']
# question_averages_ddf.compute()

**10X Speedup for DASK !**

In [329]:
# full_df.iloc[0:1000,:].merge(question_averages_df,how="inner",left_on="content_id",right_on="content_id",)

## Downsample + Train_Val split

In [330]:
def downsample_and_split(df) :
    #num_samples = len(full_df)
    #split_point = int(0.85*num_samples)
    #train_df = full_df.iloc[0:split_point]
    #val_df = full_df.iloc[split_point:num_samples]
    # val_df.head(), timestamp = 17010520340
    #num_samples = len(full_ddf)
    #split_point = int(0.85*num_samples)
    train_ddf = df[df["timestamp"]< 17010520340]
    val_ddf = df[df["timestamp"]> 17010520340]
    return train_ddf,val_ddf


In [331]:
def sigmoid(x) :
    return (1.0 / (1.0+ np.exp(-1.0*x)))
sigmoid(0)

0.5

## Users > N Questions Answered

In [332]:
train_ddf,val_ddf = downsample_and_split(full_ddf)

user_q_count = train_ddf.groupby('user_id').agg({"user_id":"count"})
user_q_count.columns = ["count"]
user_q_count.columns
user_q_count = user_q_count.persist()
for i in range(1,16) :
    print("users > than {} is {}".format( np.power(2,i),  len(user_q_count[user_q_count["count"] > np.power(2,i)])))

users > than 2 is 393146
users > than 4 is 392323
users > than 8 is 389104
users > than 16 is 362047
users > than 32 is 218884
users > than 64 is 147230
users > than 128 is 102072
users > than 256 is 66486
users > than 512 is 38255
users > than 1024 is 18443
users > than 2048 is 7031
users > than 4096 is 1762
users > than 8192 is 254
users > than 16384 is 2
users > than 32768 is 0


In [333]:
prolific_users = user_q_count[user_q_count["count"] > 1000].compute().index.values.flatten().tolist()
print(len(prolific_users))

19010


about 10% of users are answering greater than 500 Q/A pairs

In [334]:
# train_ddf[~train_ddf["user_id"].isin(prolific_users)]

# Dask Cartesian Join Experiment!

So the goal is that by user_id i want to create this table ..

1. partial cartesian join on user_id
user_id, content_id, answered_correctly, timestamp, user_id_p, content_id_p, answered_correctly_p, timestamp_p

2.  then filter by timestamp > timestamp_p




In [335]:
# Get non-prolific users ..
t_ddf = train_ddf[~train_ddf["user_id"].isin(prolific_users)]
t_ddf = t_ddf.persist()
print(len(t_ddf),t_ddf.npartitions)
t_ddf.head(5)

42021035 5


Unnamed: 0,timestamp,user_id,content_id,content_type_id,task_container_id,answered_correctly,prior_question_elapsed_time,prior_question_had_explanation
0,0,115,5692,0,1,1,,
1,0,1805962620,5547,0,0,0,,
2,0,2015251289,4024,0,0,1,,
4,0,867946278,3977,0,0,1,,
5,0,867947333,7900,0,0,1,,


## Large table join .. use index join ..

In [336]:
# cluster.scale(40, memory=10e9)

In [337]:
# Large hash join using columns .. avoid... 
# cols = ['user_id', 'content_id', 'answered_correctly','timestamp' ]
# tj = t_ddf[cols].merge(t_ddf[cols], how="left", left_on='user_id', right_on='user_id',suffixes=('_c','_p'),npartitions=12000) #
# tjf = tj[tj.timestamp_c > tj.timestamp_p]
# #tjf.
# #df.repartition(npartitions=10) 
# #tjf = tjf.persist()
# print(len(tjf))

## Setup Index Join, Partition =5.0MB

In [338]:
tu_ddf = t_ddf.set_index('user_id').repartition(partition_size="2.5MB")
print(len(tu_ddf),tu_ddf.npartitions)

42021035 473


## Check Dataframe distributions

In [339]:
## Check Distribution
def cl(df) :
    return len(df)

def part_dist(df) :
    #rv=  df.groupby('user_id').agg({'user_id':'count'})
    rv=  df.index.value_counts().to_dict()
    rv = {k: v for k, v in sorted(rv.items(), key=lambda item: -1*item[1])}
    rv = np.histogram(list(rv.values()),bins=[1,100,500,1000,5000,10000])
    #return zip(rv.index, rv.values)
    return rv


In [340]:
#np.histogram(list(pl[50].values()),bins=[100,500,1000,5000,10000])
pl=tu_ddf.map_partitions(part_dist).compute()
print(len(pl))
pl


473


0      ([566, 170, 43, 0, 0], [1, 100, 500, 1000, 500...
1      ([591, 167, 40, 0, 0], [1, 100, 500, 1000, 500...
2      ([600, 167, 39, 0, 0], [1, 100, 500, 1000, 500...
3      ([656, 166, 40, 0, 0], [1, 100, 500, 1000, 500...
4      ([570, 156, 44, 0, 0], [1, 100, 500, 1000, 500...
                             ...                        
468    ([669, 155, 44, 0, 0], [1, 100, 500, 1000, 500...
469    ([565, 173, 43, 0, 0], [1, 100, 500, 1000, 500...
470    ([561, 164, 47, 0, 0], [1, 100, 500, 1000, 500...
471    ([566, 178, 40, 0, 0], [1, 100, 500, 1000, 500...
472    ([577, 162, 44, 0, 0], [1, 100, 500, 1000, 500...
Length: 473, dtype: object

## Join data

In [341]:
cols = ['content_id', 'answered_correctly','timestamp' ]
tju = tu_ddf[cols].merge(tu_ddf[cols], how="inner", left_index=True, right_index=True, suffixes=('_c','_p'),npartitions=1000) #
tjfu = tju[tju.timestamp_c > tju.timestamp_p]

In [342]:
#tjfu.visualize("test.png")

In [343]:
tjfu = tjfu.persist()
# wait a minute, this triggers a backgroupnd task

In [344]:
#tjfu = tjfu.repartition(partition_size="500MB")
print(len(tjfu))

7920502621


## Compute Q/A history statistics

In [345]:
tjg = tjfu.groupby(['content_id_c','answered_correctly_c','content_id_p']).agg({'answered_correctly_p':['sum','count']},split_out=20)
tjg = tjg.persist()
print(len(tjg))



253485474


In [355]:
tjg.compute()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,answered_correctly_p,answered_correctly_p
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,sum,count
content_id_c,answered_correctly_c,content_id_p,Unnamed: 3_level_2,Unnamed: 4_level_2
0,0,15,25.0,48
0,0,23,4.0,8
0,0,51,43.0,60
0,0,63,10.0,20
0,0,79,26.0,32
...,...,...,...,...
13522,1,12502,0.0,1
13522,1,12671,0.0,1
13522,1,12918,1.0,1
13522,1,12966,1.0,1


In [350]:
tjg.memory_usage_per_partition().compute()

0     267804658
1     267508201
2     267711985
3     267643651
4     267671539
5     267799513
6     267681611
7     267659981
8     267658918
9     267812189
10    267557627
11    267736471
12    267571747
13    267665806
14    267830740
15    267710284
16    267682501
17    267595309
18    267727441
19    267708730
dtype: int64

In [484]:
# tjg.to_parquet("nb_model_lt_1K.pq")
#c= tjg.loc[(0),:].compute()
#a = tjg.map_partitions(lambda df : df.iloc[0:1]).compute()
idx = (0,0,0)
#b = tjg.map_partitions(lambda df : df.index[0]).compute()

def getvals(df,idx) :
    try :
        rv = df.loc[idx]
    except :
        rv=pd.Series(dtype='int64')
        
    return rv

c = tjg.map_partitions(getvals,idx).compute()

def get_prob(idx) :
    prob = tjg.map_partitions(getvals,idx).compute()
    return prob[0]/prob[1]
for i in range(0,5) :
    print( get_prob((0,0,i)) )

0.35714285714285715
0.7
0.45454545454545453
0.75
0.46875


In [442]:
b = a.unstack(level=1)
b.loc[(0,0)]
a.loc[(0,0,0)]

a.index = a.index.rename(['a','b','c'])
a.loc[(0,0,15)]
# Multi-column access
type(a.index[0][2])

numpy.int64

In [472]:
c

answered_correctly_p  sum      25.0
                      count    48.0
dtype: float64

# [Data prep] Build NB Dataset

## Question model

In [None]:
# Correct / Total count stored in a List(fornow ..) (num correct , total)

class QuestionHistory:
    def __init__(self,q_idx) :
        self.q_idx = q_idx
        self.qh_correct = {}
        self.qh_incorrect = {}
        self.correct = 0
        self.total_q = 0
        self.print_threshold = 0
    
    # Inserrt hirstory into either Answer Correct / Incorrect history
    def insert_history(self, answer_correct, q_idx, q_idx_res, debug=False) :
        #if(q_idx == self.q_idx) :
        #    print("Warning: q_idx:{} shouldnt be inserting question result for current question model here".format(q_idx))
        #    return
        
        if debug : npt("Current Question Model : {}  Current question result:{}".format(self.q_idx,answer_correct))
        if debug : npt("Adding history for Question:{} with result:{}".format(q_idx,q_idx_res))

        qh_list = self.qh_correct if answer_correct == True else self.qh_incorrect
        if(q_idx not in qh_list.keys()) :
            qh_list[q_idx] = [q_idx_res,1]  # correct,total cnt
        else :
            if(q_idx_res) :
                qh_list[q_idx][0] += 1
            qh_list[q_idx][1] += 1
    
    def add_result(self, answer_correct) :
        if(answer_correct) :
            self.correct +=1
        self.total_q +=1
        
    # Utility to return question history prob | answer correct/incorret 
    # P(QH_i | AC), P(QH_i | AI)
    def qh_prob(self, answer_correct, idx, debug=False) :
        epsilon = 1e-3
        default_prob = 0.5
        qh_list = self.qh_correct if answer_correct else self.qh_incorrect
        if(idx in qh_list.keys()) :
            qh_prob = qh_list[idx][0]/qh_list[idx][1]
            if debug : npt("question {} given answer_correct={}  correct_cnt={} total={} ".format(idx,answer_correct,qh_list[idx][0],qh_list[idx][1]))
        else :
            qh_prob = 0.5 # no record yet1
        if(qh_prob == 0.0): 
            qh_prob = epsilon
        elif(qh_prob == 1.0):
            qh_prob -= epsilon
        
        if(qh_prob == 0) :
            npt("error")
        
        return qh_prob

        
    # Average probability of getting this answer correct in general
    # if small sample size, then just do 50%
    def q_prob(self) :
        epsilon = 0.001
        rv = 0.5 if self.total_q == 0 else self.correct /  self.total_q
        if(rv == 0) :
            rv += epsilon
        return rv
    
    def filter_low_freq_questions(self, answer_correct, idx_list,nb_history_minumum=10) :
        '''
        if not too many questions answered , then just remove otherwise it will screw up calc
        '''
        qh_list = self.qh_correct if answer_correct else self.qh_incorrect
        rv = [q for q in idx_list if q in qh_list.keys()]
        rv = [q for q in rv if qh_list[q][1] > nb_history_minumum]
        return rv
        
        
    def log_ratio_qh(self, correct_question_idx, incorrect_question_idx, nb_history_minumum=1,debug=False) :
        '''
        Calculate log probability of question history
        return p = p1*...pn / p1*...pn + (1-p1)*....(1-pn) turned to summation.  log p1 + ...
        '''
        correct_question_idx   = self.filter_low_freq_questions(1,correct_question_idx,nb_history_minumum)
        incorrect_question_idx = self.filter_low_freq_questions(1,incorrect_question_idx,nb_history_minumum)
        # avoid div by zero
        epsilon = 1e-3 if(self.q_prob() == 1.0) else 0.0
        log_ac_ai = np.log((self.q_prob()-epsilon )/ (1-(self.q_prob()-epsilon) ))

        if debug : npt("Question:{} average correct:{} log(p/1-p):{}".format(self.q_idx,self.q_prob(),log_ac_ai))

        log_qh_g_ac = 0
        log_qh_g_ai = 0
        # handle correct 
        corr_eqn = "\nP(QH|AC) P(AC) =\n"
        incorr_eqn = "\nP(QH|AI) P(AI) =\n"
        for i, q_list in enumerate([incorrect_question_idx,correct_question_idx]) :
            if(i==1) :
                if debug : npt("Correct Bin for question {}".format(self.q_idx))
            else :
                if debug : npt("Incorrect Bin for question {}".format(self.q_idx))
            
            if debug : npt("{:10s} {:10s} {:10s} {:10s} {:10s}".format("question", "corr%", "inc%", "logcorr%", "loginc%"))
            for q in q_list :
                # self.qh_prob(1, q) # probability QH question coreret given answer correct for current question
                # self.qh_prob(0, q) # probability QH question coreret given answer incorrect for current question
                if debug : npt("{:<10d} {:<10.3f} {:<10.3f} {:<10.3f} {:<10.3f}".format(q,self.qh_prob(i, q),1-self.qh_prob(i, q),np.log(self.qh_prob(i, q)),np.log(1-self.qh_prob(i, q))))

                if(i==1) : # corr
                    log_qh_g_ac += np.log(self.qh_prob(1, q))
                    if debug :corr_eqn += "P(Q{}=1|AC) {} * ".format(q, self.qh_prob(1, q))
                    log_qh_g_ai += np.log(self.qh_prob(0, q))
                    if debug :incorr_eqn += "P(Q{}=1|AI) {} * ".format(q, self.qh_prob(0, q))
                else : # inc
                    log_qh_g_ac += np.log(1-self.qh_prob(1, q))
                    if debug :corr_eqn += "P(Q{}=0|AC) {} * ".format(q, 1-self.qh_prob(1, q))
                    log_qh_g_ai += np.log(1-self.qh_prob(0, q))
                    if debug :incorr_eqn += "P(Q{}=0|AI) {} * ".format(q, 1-self.qh_prob(0, q))
                    
        # Debug eqn
        if debug :corr_eqn += "P(AC) {}".format(self.q_prob())
        if debug :incorr_eqn += "P(AC) {}".format(1-self.q_prob())

        if debug : npt("{}".format(corr_eqn))
        if debug : npt("{}".format(incorr_eqn))
        # spam classifier equation !
        if debug : print("log(ac/ai) {}, logsum qh given correct {} logsum qh given incorr {}".format( log_ac_ai ,log_qh_g_ac , log_qh_g_ai))
        log_prob_qh = log_ac_ai + log_qh_g_ac - log_qh_g_ai
        return log_prob_qh, sigmoid(log_prob_qh)
    
    def prob_qh(self, qh_idxs) :
        return np.exp(self.log_prob_qh(qh_idxs))
    
    
    def __add__(self,other) :
        _qm = QuestionHistory(self.q_idx)
        _qm.qh_correct = copy.deepcopy(self.qh_correct)
        #print(self.qh_correct)
        #print(other.qh_correct)
        for k,[c,t] in other.qh_correct.items():
            if(k in _qm.qh_correct.keys()) :
                _qm.qh_correct[k][0] += c
                _qm.qh_correct[k][1] += t
            else :
                _qm.qh_correct[k] = [c,t]
        _qm.qh_incorrect = copy.deepcopy(self.qh_incorrect)
        #print(self.qh_incorrect)
        #print(other.qh_incorrect)
        for k,[c,t] in other.qh_incorrect.items():
            if(k in _qm.qh_incorrect.keys()) :
                _qm.qh_incorrect[k][0] += c
                _qm.qh_incorrect[k][1] += t
            else :
                _qm.qh_incorrect[k] = [c,t]
                #qm.qh_incorrect[k][ = t
                
        #qm.qh_incorrect = self.qh_incorrect + other.qh_incorrect
        _qm.correct = self.correct +other.correct
        _qm.total_q = self.total_q +other.total_q
        return _qm
        
    def __repr__(self) :
        count_threshold = self.print_threshold
        rv = "Question {} model. Only printing for question history > {}\n".format(self.q_idx,count_threshold)
        rv += "Total num correct  :{}\n".format(self.correct)
        rv += "Total num questions:{}\n".format(self.total_q)
        for i,qhl in enumerate([self.qh_incorrect, self.qh_correct]) :
            rv += "incorrect bin\n" if i == 0 else "correct bin\n"
            for k,cor_tot in qhl.items() :
                if(cor_tot[1] > count_threshold):
                    rv += "Question {} Corr,Total {} q_prob {}\n".format(k,cor_tot,self.qh_prob(i,k))
        return rv
        

In [None]:
qm = QuestionHistory(0)
# question 0 model.  if q1 correct, q0 correct
#  if q2 correct q0 incorrect


# User 1 Q history 
qm.insert_history(answer_correct=1,q_idx=1,q_idx_res=1)
qm.insert_history(answer_correct=1,q_idx=1,q_idx_res=1)
qm.insert_history(answer_correct=1,q_idx=1,q_idx_res=1)
qm.insert_history(answer_correct=1,q_idx=1,q_idx_res=0)
qm.insert_history(answer_correct=1,q_idx=2,q_idx_res=0)
qm.insert_history(answer_correct=1,q_idx=2,q_idx_res=0)
qm.insert_history(answer_correct=1,q_idx=2,q_idx_res=0)
qm.insert_history(answer_correct=1,q_idx=2,q_idx_res=0)
qm.insert_history(answer_correct=1,q_idx=2,q_idx_res=1)
qm.insert_history(answer_correct=1,q_idx=3,q_idx_res=0)
qm.insert_history(answer_correct=1,q_idx=4,q_idx_res=1)
qm.add_result(1)

# User 2 Q history 
qm.insert_history(answer_correct=0,q_idx=1,q_idx_res=0,debug=True)
qm.insert_history(answer_correct=0,q_idx=1,q_idx_res=0)
qm.insert_history(answer_correct=0,q_idx=1,q_idx_res=0)
qm.insert_history(answer_correct=0,q_idx=1,q_idx_res=1)
qm.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm.insert_history(answer_correct=0,q_idx=2,q_idx_res=1)
qm.insert_history(answer_correct=0,q_idx=2,q_idx_res=1)
qm.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm.insert_history(answer_correct=0,q_idx=3,q_idx_res=0)
qm.insert_history(answer_correct=0,q_idx=4,q_idx_res=1)
qm.add_result(0)
#
qm1 = QuestionHistory(0)
qm1.insert_history(answer_correct=0,q_idx=1,q_idx_res=0,debug=True)
qm1.insert_history(answer_correct=0,q_idx=1,q_idx_res=0)
qm1.insert_history(answer_correct=0,q_idx=1,q_idx_res=0)
qm1.insert_history(answer_correct=0,q_idx=1,q_idx_res=1)
qm1.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm1.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm1.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm1.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm1.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm1.insert_history(answer_correct=0,q_idx=2,q_idx_res=1)
qm1.insert_history(answer_correct=0,q_idx=2,q_idx_res=1)
qm1.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm1.insert_history(answer_correct=0,q_idx=2,q_idx_res=0)
qm1.insert_history(answer_correct=0,q_idx=3,q_idx_res=0)
qm1.insert_history(answer_correct=0,q_idx=2224,q_idx_res=1)
qm1.insert_history(answer_correct=0,q_idx=14,q_idx_res=1)
qm1.add_result(0)
#1

In [None]:
#a = qm1 + qm
np.sum([qm1,qm])

In [None]:
qm.filter_low_freq_questions(1,[1,2,99],nb_history_minumum=0)

In [None]:
print(qm)
print(qm.log_ratio_qh([2,4],[1,3],nb_history_minumum=0)) # asking the question relative to correct only ..
#history.prob_qh([1,2]) # correct only !


# QM_DICT

This is a class to hold a dictionary of question models and add them together in a reduction ..

In [None]:
class QmDict():
    def __init__(self,qm_dict):
        self.qm_dict = copy.deepcopy(qm_dict)
        
    def __add__(self,other) :
        _qmd = QmDict(self.qm_dict)
        for k,v in other.qm_dict.items():
            if(k in _qmd.qm_dict.keys()) :
                _qmd.qm_dict[k] = v + _qmd.qm_dict[k]  # add question models together
            else :
                _qmd.qm_dict[k] = v
        return _qmd
    
    def keys(self) :
        return self.qm_dict.keys()
    
    def __getitem__(self,key) :
        return self.qm_dict[key]
    
    def __repr__(self) :
        rv = "QmDict object holding {} models".format(len(self.qm_dict.keys()))
        return rv
        
        

In [None]:
qm

In [None]:
qmd1 = QmDict({0:qm,1:qm1})
qmd2 = QmDict({1:qm,2:qm1})
print(qmd1)
qmd1.qm_dict[5]= qm1
print(qmd1.keys())
print(qmd1[0])

In [None]:
qms = qmd1 + qmd2
qms = np.sum([qmd1,qmd2])

#qms.qm_dict[0]
qms.qm_dict[2]


# User history

In [None]:
class UserHistory():
    '''
    Class to track ordered history of answers
    '''
    def __init__(self,user_id, initial_data=None) :
        self.user_id = user_id
        self.corr_list = []
        self.incr_list = []
        if(initial_data is not None) :
            self.bulk_add(initial_data)   
            
    def bulk_add(self, df) :
        questions = list(df.content_id.values)
        answers = list(df.answered_correctly.values)
        corr = [q for q,a in zip(questions,answers) if a==1]
        incorr = [q for q,a in zip(questions,answers) if a==0]
        self.corr_list = self.corr_list +corr
        self.incr_list = self.incr_list +incorr
        
        
        
    #def single_add(row) :
    #    a=1
        
    def get_history(self) :
        return self.corr_list, self.incr_list
    
    def __repr__(self) :
        return "UserHistory {} with {} corr_list and {} incr_list entries\n".format(self.user_id, len(self.corr_list),len(self.incr_list))
        
        
        

In [None]:
# test
df = train_df.head(3)
display(df)
user_test = UserHistory(8,df)
print(user_test)

user_test.bulk_add(df)
user_test.bulk_add(df)

user_test.get_history()


# Mega Model - 15K NB's

In [None]:
class MegaModel() :
    def __init__(self, train_df=None, val_df=None) :
        self.nb_models = QmDict({})
        self.train_df = train_df
        self.val_df = val_df
        self.user_history_dict = {}
        self.history_depth = 200 # how far back in Q/A history do we scan ...
            
    
    def get_users(self, question_filter=None) :
        df = self.train_df
        if(question_filter is not None) :
            npt("Returning only users that answered content_id:{}".format(question_filter))
            df = df[df.content_id==question_filter]
        user_ids = sorted(list(df.user_id.unique()))
        print("Num unique users {}".format(len(user_ids)))

        return user_ids
    
    def get_user_transactions_from_training(self, user_idx) :
        '''
        Not to be confused with my user history dict.  This just fetches a pandas 
        data frame from training dataframe (for now)
        '''
        user_transacitons_df = self.train_df[self.train_df["user_id"] == user_idx]
        return user_transacitons_df
    
    # This is expensive .. O(user_hist^2)
    def process_user(self, user_idx, debug=True) :
        npt("user:{}".format(user_idx))
        user_history_df = self.get_user_transactions_from_training(user_idx)
        # add to custom user history data structre.  To be used later for inference !
        user_history_struct = UserHistory(user_idx,user_history_df)
        
        nr = len(user_history_df)
        if debug : print("user hist len {}".format(nr))        
        if debug : display(user_history_df)

        # use content_id / answered_correctly ...
        # 1st row in user_history has no history.  start with second row to end, and on inner loop add hist up to that row..
        qh_idx =  list(user_history_df.content_id)
        qh_ans =  list(user_history_df.answered_correctly)
        qmd = QmDict({})
        for i in range(1,nr) :
            curr_q_idx = qh_idx[i]
            curr_q_correct =  qh_ans[i]
            # qm = self.nb_models[curr_q_idx]
            qmd.qm_dict[curr_q_idx] = QuestionHistory(curr_q_idx)
            qm = qmd.qm_dict[curr_q_idx]
            # log result of current question
            qm.add_result(curr_q_correct)
            
            start = 0 if i < self.history_depth else i-self.history_depth
            for j in range(start,i) :
                prev_q_idx = qh_idx[j]
                prev_q_correct =  qh_ans[j]
                qm.insert_history(answer_correct=curr_q_correct,q_idx=prev_q_idx,q_idx_res=prev_q_correct, debug=False)
          #print("Done adding stats for user:{}, history_len:{}".format(user_idx,nr))
        # Emitting these partials to be reduced later by dask!
        # process_user no longer side effects ...
        return qmd,user_history_struct

    def partition_data(self, df, pct_train=0.85):
        num_samples = len(df)
        split_point = int(pct_train*num_samples)
        self.train_df = df.iloc[0:split_point]
        self.val_df   = df.iloc[split_point:num_samples]
        npt("Added {} train {} val records".format(len(self.train_df),len(self.val_df)))

    
    def train(self, num_users=None, user_print_freq=1000,pct_print_freq=5):
        user_list = self.get_users()
        
        prev_pct=-1
        job_list = []
        
        # Create a bunch of dask delayed objects !
        for i,user in enumerate(user_list) :
            job_list.append(dask.delayed(self.process_user)(user,debug=False))
        
        npt("Processing {} jobs".format(len(job_list)))
        job_list = dask.compute(job_list)
        
        # job_list is interleaved here ... and has for tuple([[qm,uh],[qm,uh]] , blank)
        # To handle qm_dict and user_history separtely, break apart and handle
        # Convert [qm,uh],[qm,uh] => [qm,qm],[uh,uh]
        # neat zip trick !
        [qmd,user_history_list] = list(zip(*job_list[0]))
        # print(qmd)
        # print(uh)
        ##################
        # handle qmd /nbmodels
        ##################
        npt("Tree sum for nb_models")
        def add(q1,q2):
            return q1+q2
        add = dask.delayed(add)
        # Tree Sum ! -> https://examples.dask.org/delayed.html
        L=qmd
        while len(L) > 1:
            print(len(L))
            new_L = []
            for i in range(0, len(L)-1, 2):
                lazy = add(L[i], L[i + 1])  # add neighbors
                new_L.append(lazy)
            L = new_L                       # swap old list for new
        #display(L.visualize())
        # Compute final tree sum
        nbm = dask.compute(L)
        # nbm is a tuple([QmDict],blank) to get QmDict need nb[0][0]
        self.nb_models = nbm[0][0]
        
        ##################
        # handle userhistory
        ##################
        npt("Adding user history")
        for uh in user_history_list :
            self.user_history_dict[uh.user_id] = uh

        npt("Completed!")
        
            #pct = int(100*i/len(user_list))
            
            #if(i%user_print_freq == 0 ) :
            #    print("Processed {} users ".format(i))
        #
            #
            #if(pct!=prev_pct and pct%pct_print_freq==0) :
            #    print("Processed {} %".format(pct))
            #    prev_pct=pct
            ##    break
        #
            #if(num_users is not None and i>num_users) : 
            #    break
#

    # Inference hacks fomr now
    def inference_row(self,user_id,content_id,nb_history_minumum,debug) :
        
        user_hist = mm.user_history_dict[user_id].get_history() if user_id in mm.user_history_dict.keys() else [[],[]]
        # user_hist -> corr_list, incorr_list
        #print(mm.nb_models[content_id])
        prediction = [9999999, 0.505050505050505]
        if(content_id in mm.nb_models.keys()) :
            prediction = mm.nb_models[content_id].log_ratio_qh(user_hist[0],user_hist[1],nb_history_minumum=nb_history_minumum,debug=debug)
        else :
            npt("Warning no model yet avail for content_id {}".format(content_id))
        if debug : npt("Prediction = {}".format(prediction))
        prob_correct = prediction[1]
        return prob_correct
    
    def inference_df(self,df,nb_history_minumum=1,debug=False) :
        
        dfn = df.copy()
        tmp = pd.Series([self.inference_row(uid,cid,nb_history_minumum,debug) for uid,cid in zip(df['user_id'],df['content_id']) ], index=df.index)
        #npt(tmp, len(tmp),len(df))
        dfn.loc[:,"answered_correctly"] = tmp
        
        # return an inference_df ..
        if("row_id" not in df.columns) :
            npt("Adding row_id=user_id for local inferencing")
            dfn["row_id"] = dfn["user_id"]
        # This is what the final submittal tool desires - here row_id can just be the index if inferencing on validation set.
        return dfn[['row_id','answered_correctly','content_type_id','content_id']]

 
    def rmse(self,df,qa_df) :
        a=0
    
    def histogram_stats(self) :
        a=0
        # get the number of correct vs incorrect per user / per question!
    
    def __repr__(self):
        rv = "Model Information\n"
        # Number Models ->
        rv += "Num of models total         : {}\n".format(len(self.nb_models.qm_dict))
        rv += "Num of models updated       : {}\n".format(sum([1 for k,v in mm.nb_models.qm_dict.items() if v.total_q > 0]))
        rv += "Num of models updated w/corr: {}\n".format(sum([1 for k,v in mm.nb_models.qm_dict.items() if len(v.qh_correct) > 0]))
        rv += "Num of models updated w/incr: {}\n".format(sum([1 for k,v in mm.nb_models.qm_dict.items() if len(v.qh_incorrect) > 0]))
        
        # Number of Users
        rv += "Num of unique users processed: {}\n".format(len(self.user_history_dict.keys()))
        # Training Size 
        if(self.train_df is not None) : 
            rv += "training size : {}\n".format(len(self.train_df))
            rv += "training num uniq questions : {}\n".format(len(self.train_df.content_id.unique()))
            rv += "training num uniq users     : {}\n".format(len(self.train_df.user_id.unique()))
        # val size
        if(self.val_df is not None) :
            rv += "valid size : {}\n".format(len(self.val_df))
            rv += "training num uniq questions : {}\n".format(len(self.val_df.content_id.unique()))
            rv += "training num uniq users     : {}\n".format(len(self.val_df.user_id.unique()))
            user_intersection = set(self.val_df.user_id.unique()).intersection(self.train_df.user_id.unique())
            question_intersection = set(self.val_df.content_id.unique()).intersection(self.train_df.content_id.unique())
            rv += "training/val num intersection users     : {}\n".format(len(user_intersection))
            rv += "training/val num intersection questions : {}\n".format(len(question_intersection))
         # Avg user history
        # TBD
        return rv
    
    

# Train MegaModel 
* Todo add dask

In [None]:
# Process your data .. use dask [hmm yes]??
# cluster = LocalCluster(n_workers=10)
# client = Client(cluster)


In [None]:
# client.close()
# print(client)
# cluster.close()
# print(cluster)
# del cluster
# 

In [None]:

mm = MegaModel()
jl=None
with LocalCluster(n_workers=40,
    processes=True,
    threads_per_worker=1,
    ip='tcp://p10a114.pbm.ihost.com:9994',
    dashboard_address=':6555'
) as cluster, Client(cluster) as client:

    print(mm)    
    # if i just take the first N rows, the cardinatlity of users is to high for me to test !! 
    # keeping a smaller set of users helps testing ..
    downsample_df = full_df[full_df['user_id']%1900 == 0]
    downsample_df = downsample_df[downsample_df['content_id']%1 == 0]
    mm.partition_data(downsample_df,pct_train=0.80)
    mm.train( num_users=None)
    #print(mm)


#mm
# Processing // Training Algorithm 
# Foreach user in dset
# get history. 
# from bottom process backwards and add samples to correct / incorrect counters for the question at hand based on history
#foreach row in df .. do
#add result!
#assert(len(user_list)==393656)
#user_list = mm.get_users(question_filter=675)
#len(user_list)
#assert(len(user_list)==16726)


In [None]:
# dask.visualize(L)
# M=dask.compute(L)

In [None]:
# Chek size of objects
# sys.getsizeof(mm) #56
# sys.getsizeof(mm.nb_models.qm_dict)

mm.nb_models+mm.nb_models


In [None]:
# Convert [qm,uh],[qm,uh] => [qm,qm],[uh,uh]
# neat zip trick !
#qmd_uh = list(zip(*jl[0]))
#qmd_uh[0] # just qmd !
#dask.compute(qmd_uh[0])

In [None]:
#mm.nb_models
#mm.user_history_dict

# Aggregation Examle

In [None]:
#user_agg = mm.train_df[['user_id','content_id']].groupby(['user_id']).agg([ 'count'])
#user_agg.columns = ['count']
#multi_q_users = user_agg[user_agg['count']>1]
#multi_q_users = multi_q_users.index.values
#mm.train_df[mm.train_df['user_id'] in multi_q_users]

In [None]:
#mm.nb_models[4301]

## Inference

In [None]:
pd.options.display.float_format = '{:,.3f}'.format

inf_df = mm.val_df.iloc[0:300,:]
ans_df = mm.inference_df(inf_df,nb_history_minumum=5,debug=False)
display(mm.val_df.head(2))
ans_df["answered_correctly_orig"] = inf_df["answered_correctly"] 
# add simple question averages column

display(question_averages_df.head())
question_averages_df.columns = ["total_correct","total_questions","avg_correct"]
ans_df = ans_df.merge(question_averages_df,how="inner",left_on="content_id",right_on="content_id")

ans_df["nb_sqerr"] = np.power(ans_df["answered_correctly"]-ans_df["answered_correctly_orig"],2)
ans_df["avg_sqerr"] = np.power(ans_df["avg_correct"]-ans_df["answered_correctly_orig"],2)


display(ans_df)
print("nb sse ", np.sum(ans_df.nb_sqerr.values))
print("avg sse ", np.sum(ans_df.avg_sqerr.values))

In [None]:
mm.nb_models[1000]

In [None]:
# Save model
import pickle
pickle.dump(obj=mm, file=open("mm_147900.nbm", "wb" ))

In [None]:
!ls -l
sys.getsizeof(mm)

In [None]:
print("User number")
idx=2
print(user_list[idx])
print("Correct // Incorrect history")
print(mm.user_history_dict[user_list[idx]].get_history())

In [None]:
# Get userid / content_id ...
row = val_df.iloc[0]
row.user_id

user_hist = train_df[train_df.user_id == row.user_id]

prediction = mm.nb_models[row.content_id].log_ratio_qh([],[],tot_cutoff=1)
#def log_ratio_qh(self, correct_question_idx, incorrect_question_idx, tot_cutoff=10,debug=True) :


In [None]:
print(mm.nb_models[675].correct)
print(mm.nb_models[675].total_q)

print(mm.nb_models[675].log_ratio_qh([1291],[731],tot_cutoff=0)) # asking the question relative to correct only ..
#print(mm.nb_models[675].qh_correct[731])
#print(mm.nb_models[675].qh_incorrect[731])
print(mm.nb_models[675])

#Key 4078 Value [3, 8] q_prob 0.375
#Key 1291 Value [11, 14] q_prob 0.7857142857142857
#Key 1131 Value [7, 11] q_prob 0.6363636363636364
#Key 1145 Value [11, 11] q_prob 1.0
#Key

In [None]:
print(mm.nb_models[675])

In [None]:
row=train_df.iloc[0]
print(row)
qm = mm.nb_models[row.content_id]
qm.q_idx = row.content_id
qm.insert_history(answer_correct=row.answered_correctly,q_idx=4,q_idx_res=1)


In [None]:
0.75*0.75 / (0.75*0.75 + 0.25*0.25)

Save out to file

In [None]:
# question_averages_df.to_csv('qa.csv')

In [None]:
# !ls

## Submit Code

In [None]:
## # You can only iterate through a result from `env.iter_test()` once
# so be careful not to lose it once you start iterating.
iter_test = env.iter_test()

In [None]:
# (test_df, sample_prediction_df) = next(iter_test)

In [None]:
# sample_prediction_df

In [None]:
# test_df

In [None]:
def make_prediction(test_df, question_averages_df) :
    # Join Tables ...
    prediction_df = test_df[['row_id','content_id','content_type_id']].merge(question_averages_df,left_on='content_id',right_on='content_id',how='left')
    # import numpy as np
    # prediction_df['random'] = np.random.rand(len(prediction_df))
    prediction_df = prediction_df.rename(columns={"pct_correct": "answered_correctly"})
    print("testdf len : {}".format(len(test_df)))
    print("DF len : {}".format(len(prediction_df)))
    return prediction_df[['row_id','answered_correctly','content_type_id']]

In [None]:
# Test Function
# prediction_df = make_prediction(test_df,question_averages_df)
# env.predict(prediction_df)

In [None]:
# prediction_df

In [None]:
for (test_df, sample_prediction_df) in iter_test:
    test_df = test_df.loc[test_df.content_type_id == 0].reset_index(drop=True)
    prediction_df = make_prediction(test_df,question_averages_df)
    env.predict(prediction_df[['row_id', 'answered_correctly']])

In [None]:
prediction_df

In [None]:
test_df