In [190]:
import requests
import zipfile
import subprocess
import os
import uuid
import shlex

# Maps the solver names accepted by BKT.fit to the code handed to the
# trainhmm executable through its "-s" option.
algorithm_lookup = dict(
    bw="1.1",
    gd="1.2",
    cgd_pr="1.3.1",
    cgd_fr="1.3.2",
    cgd_hs="1.3.3",
)


class BKT(object):
    """Bayesian Knowledge Tracing model fitted through the HMM-scalable tool.

    The class downloads and builds the HMM-scalable sources
    (http://yudelson.info/hmm-scalable), runs its ``trainhmm`` executable on
    a step file generated from the training data, and parses the model file
    the tool writes.
    """

    def __init__(self,
                 hmm_folder='hmm-scalable-818d905234a8600a8e3a65bb0f7aa4cf06423f1a',
                 git_commit='818d905234a8600a8e3a65bb0f7aa4cf06423f1a'):
        """ Configure where the HMM-scalable tool lives.

        Parameters
        ----------
        hmm_folder : string, optional
            Folder containing the HMM-scalable sources and binaries. The
            default matches the folder name found in the archive downloaded
            for the default ``git_commit``.

        git_commit : string, optional
            Git commit of HMM-scalable fetched by ``download``.
        """
        # Git commit to download hmm-scalable
        self.git_commit = git_commit
        # Set HMM-scalable folder.
        self.hmm_folder = hmm_folder
        # Fitted params; stays None until fit or set_params is called.
        self.params = None

    def download(self):
        """ Download and build the HMM-scalable tool.

        This implementation is a wrapper around the
        HMM-scalable tool (http://yudelson.info/hmm-scalable).
        This function downloads the original implementation at
        ``self.git_commit``, extracts it into the current working directory
        and runs ``make all`` inside ``self.hmm_folder``.

        Raises
        ------
        requests.HTTPError
            If GitHub does not answer the archive request successfully.
        RuntimeError
            If ``make all`` exits with a non-zero return code.
        """
        archive_path = '/tmp/hmm-scalable.zip'

        # Download zipfile from GitHub
        results = requests.get(
            'https://github.com/myudelson/hmm-scalable/archive/%s.zip' % self.git_commit)
        # Fail here instead of saving an error page and failing later as a bad zip.
        results.raise_for_status()
        with open(archive_path, 'wb') as f:
            f.write(results.content)

        # Extract zipfile (the context manager closes the archive handle)
        with zipfile.ZipFile(archive_path) as archive:
            archive.extractall(path='.')

        # Install. The command is a constant string, so shell=True carries no
        # injection risk; it also keeps a missing ``make`` reported through the
        # return code (and the RuntimeError below) rather than an OSError.
        process = subprocess.Popen("make all", stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE, cwd=self.hmm_folder, shell=True)
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            raise RuntimeError("Could not build HMM tool. Check if the make utility is installed "
                               "and if the folder has appropriate permissions.\n "
                               "Code: %d\n"
                               "Error: %s" % (process.returncode,
                                              stderr.decode("utf-8", "replace")))

    def fit(self, data, q_matrix, solver='bw', iterations=200):
        """ Fit BKT model to data.
        As of July 2019, just default parameters are allowed.

        Parameters
        ----------
        data : {array-like}, shape (n_steps, 3)
            Sequence of students steps. Each of the three dimensions are:
            Observed outcome: 0 for fail and 1 for success
            Student id: student unique identifier
            Question id: question id in q_matrix
            NOTE(review): the outcome is written to the tool's input file
            unchanged, and the unit tests in this file pass ``outcome + 1``;
            confirm which encoding HMM-scalable expects.

        q_matrix: matrix, shape (n_questions, n_concepts)
            Each row is a question and each column a concept.
            If the concept is present in the question, the
            correspondent cell should contain 1, otherwise, 0.

        solver: string, optional
            Algorithm used to fit the BKT model. Available solvers are:
            'bw': Baum-Welch (default)
            'gd': Gradient Descent
            'cgd_pr': Conjugate Gradient Descent (Polak-Ribiere)
            'cgd_fr': Conjugate Gradient Descent (Fletcher-Reeves)
            'cgd_hs': Conjugate Gradient Descent (Hestenes-Stiefel)

        iterations: integer, optional
            Maximum number of iterations

        Returns
        -------
        self: object

        Raises
        ------
        KeyError
            If ``solver`` is not one of the available solvers.
        RuntimeError
            If the ``trainhmm`` executable exits with a non-zero return code.

        Notes
        -----
        This is a wrapper around the HMM-scalable tool (http://yudelson.info/hmm-scalable)
        """
        # Resolve the solver first so an unknown name fails before any file is written.
        solver_code = algorithm_lookup[solver]
        # Plain ndarray rows make the skill lookup work for lists and np.matrix as well.
        q_matrix = np.asarray(q_matrix)

        if not os.path.exists("hmm_files"):
            os.makedirs("hmm_files")
        # Absolute paths keep working whatever hmm_folder (the tool's cwd) is.
        filename = os.path.abspath("hmm_files/%s" % uuid.uuid4().hex)
        data_path = "%s.txt" % filename
        model_path = "%s_model.txt" % filename

        # Create data file in the format expected by the tool
        with open(data_path, "w") as step_file:
            for outcome, student_id, question_id in data:
                # Column indices of the concepts flagged with 1 for this question
                skill_ids = np.flatnonzero(q_matrix[question_id] == 1)
                skills = "~".join(str(skill) for skill in skill_ids)
                step_file.write("%s\t%s\t%s\t%s\n" % (outcome, student_id, question_id, skills))

        # Run train program. Arguments are passed as a list so that paths with
        # spaces survive; "-d ~" tells the tool the multi-skill delimiter used above.
        args = ["./trainhmm", "-s", solver_code, "-d", "~", "-m", "1",
                "-i", str(int(iterations)), data_path, model_path]
        process = subprocess.Popen(args, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE, cwd=self.hmm_folder)
        # communicate() drains both pipes while waiting; calling wait() first
        # could block forever once the child fills a pipe buffer.
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            raise RuntimeError("Could not train HMM model. Check if the HMM files are properly created and "
                               "accessible.\n"
                               "Code: %d\n"
                               "Error: %s" % (process.returncode,
                                              stderr.decode("utf-8", "replace")))

        # Extract fitted params
        with open(model_path, "r") as model_file:
            content = model_file.read()
        self.params = self._parse_model(content)
        return self

    @staticmethod
    def _parse_model(content):
        """ Parse the text of a model file into per-skill parameters.

        Only lines of these tab-separated shapes are read:
        ``<digits>\\t<digits>`` (second field is the skill),
        ``PI`` followed by two decimals, and ``A`` / ``B`` followed by four
        decimals each. Values must be written as ``digits.digits``.

        Returns
        -------
        params : list of dict with keys "skill" (str), "priors" (array of
            2 values), "transitions" and "emissions" (2x2 arrays filled
            row by row).
        """
        skills = re.findall(r'^\d+\t(\d+)$', content, flags=re.M)
        priors = re.findall(r'^PI\t(\d+\.\d+\t\d+\.\d+)$', content, flags=re.M)
        transitions = re.findall(r'^A\t(\d+\.\d+\t\d+\.\d+\t\d+\.\d+\t\d+\.\d+)$', content, flags=re.M)
        emissions = re.findall(r'^B\t(\d+\.\d+\t\d+\.\d+\t\d+\.\d+\t\d+\.\d+)$', content, flags=re.M)

        params = []
        # The i-th PI/A/B lines are taken to belong to the i-th skill line.
        for idx, skill in enumerate(skills):
            params.append({
                "skill": skill,
                "priors": np.asarray([float(i) for i in priors[idx].split("\t")]),
                "transitions": np.asarray([float(i) for i in transitions[idx].split("\t")]).reshape((2, 2)),
                "emissions": np.asarray([float(i) for i in emissions[idx].split("\t")]).reshape((2, 2)),
            })
        return params

    def get_params(self):
        """ Get fitted params.

        Returns
        -------
        params : list. List containing the prior, transition and emission values for each skill.

        Raises
        ------
        RuntimeError
            If no params are available yet.
        """
        if self.params is None:
            raise RuntimeError("You should run fit before getting params")
        return self.params

    def set_params(self, params):
        """ Set model params. No validation is done for this function.
        Make sure the params variable is in the expected format.

        Returns
        -------
        self: object
        """
        self.params = params
        return self

        

In [111]:
import re

In [177]:
# Scratch cell: replays the model-file parsing done in BKT.fit on a saved run.
filename = "hmm_files/fit"
# The second assignment wins; only this model file is actually read.
filename = "hmm_files/c7b99343e4144c4da4b50f22210771ea"

# Load the whole model file produced by the tool
with open("%s_model.txt" % filename, "r") as model_file:
    content = model_file.read()

# One match per skill block: skill line, then its PI / A / B rows
skills = re.findall(r'^\d+\t(\d+)$', content, flags=re.MULTILINE)
priors = re.findall(r'^PI\t(\d+\.\d+\t\d+\.\d+)$', content, flags=re.MULTILINE)
transitions = re.findall(r'^A\t(\d+\.\d+\t\d+\.\d+\t\d+\.\d+\t\d+\.\d+)$', content, flags=re.MULTILINE)
emissions = re.findall(r'^B\t(\d+\.\d+\t\d+\.\d+\t\d+\.\d+\t\d+\.\d+)$', content, flags=re.MULTILINE)

params = []
for idx, skill in enumerate(skills):
    # Tab-separated decimals -> float arrays; A and B are laid out as 2x2, row by row
    params.append(dict(
        skill=skill,
        priors=np.array(list(map(float, priors[idx].split("\t")))),
        transitions=np.array(list(map(float, transitions[idx].split("\t")))).reshape(2, 2),
        emissions=np.array(list(map(float, emissions[idx].split("\t")))).reshape(2, 2),
    ))

[{'skill': '0', 'priors': array([0.17653284, 0.82346716]), 'transitions': array([[1.        , 0.        ],
       [0.20830107, 0.79169893]]), 'emissions': array([[0.76312566, 0.23687434],
       [0.07136991, 0.92863009]])}, {'skill': '2', 'priors': array([0.17653284, 0.82346716]), 'transitions': array([[1.        , 0.        ],
       [0.20830107, 0.79169893]]), 'emissions': array([[0.76312566, 0.23687434],
       [0.07136991, 0.92863009]])}]


In [117]:
import re
# Scratch check that the pattern matches and captures both numbers.
string = 'value is between 5 and 10'
re.compile(r'value is between (.*) and (.*)').match(string)
# To print the captured groups, bind the match object to a name first,
# e.g. m = re.match(...); print(m.group(1), m.group(2))

<_sre.SRE_Match object; span=(0, 25), match='value is between 5 and 10'>

### Unit tests

In [234]:
import unittest
import os
from simulate_student import SimulateStudent
import numpy as np

class TestBKT(unittest.TestCase):
    """Tests for the BKT wrapper: tool download, model fitting, and the
    format of the fitted parameters."""

    # Expected keys of each per-skill params dict and the type of each value.
    # Kept as a class attribute so that unittest.TestCase.__init__ is not
    # overridden and the standard unittest runner can instantiate the case.
    PARAMS_KEYS = {
        "skill": str, "priors": np.ndarray,
        "transitions": np.ndarray, "emissions": np.ndarray
    }

    @staticmethod
    def _fit_simulated_model(n_students=50, n_questions=10):
        """ Fit a BKT model on simulated students and return it.

        Every simulated student answers ``n_questions`` questions, and every
        question is tagged with the first and third concept of a
        three-concept q_matrix.
        """
        # p(L0)
        pi = [0.26, 0.74]
        # p(T)
        A = [[1, 0], [0.17, 0.83]]
        # p(S) and p(G)
        B = [[0.7, 0.3], [0.13, 0.87]]

        data = []
        for i in range(n_students):
            observations = SimulateStudent(pi, A, B).simulate(n_questions)[0]
            for question_id, outcome in enumerate(observations):
                # Outcomes are shifted by one before being handed to fit;
                # presumably the tool expects 1-based observations - verify.
                data.append([outcome + 1, i, question_id])
        q_matrix = np.array([[1, 0, 1]] * n_questions)
        model = BKT()
        model.fit(data, q_matrix)
        return model

    def test_download(self):
        """ Testing HMM-scalable download """
        model = BKT()
        model.download()

        # Check if directory exists and it contains items
        self.assertGreater(len(os.listdir(model.hmm_folder)), 1)

    def test_fit(self):
        """ Testing if fit tool is able to run and fit data """
        model = self._fit_simulated_model()

        # Make sure self.params variable is complete after this test
        self.assertIsNotNone(model.params)

    def test_params(self):
        """ Testing if fitted params are on the expected format """
        model = self._fit_simulated_model()

        params = model.get_params()

        for model_param in params:
            for key, value in self.PARAMS_KEYS.items():
                # Assert that all keys are there
                self.assertIn(key, model_param)

                # Assert key type is correct
                self.assertIsInstance(model_param[key], value)

In [60]:
# Call the test method directly on a fresh instance (no unittest runner involved).
TestBKT().test_download()

In [193]:
# Call the test method directly on a fresh instance (no unittest runner involved).
TestBKT().test_fit()

In [235]:
# Call the test method directly on a fresh instance (no unittest runner involved).
TestBKT().test_params()