In [1]:

import numpy as np
import scipy.stats
from collections import Counter
import random
import pandas as pd
from sklearn.model_selection import train_test_split


In [14]:

class Dtree :
  def __init__(self , 
               X = None ,
               y = None ,
               depth = 0,
               MAX_DEPTH = 3,
               error = "entropy",
               features = None ,
               label = None,
               feature = None, 
               Type = None,
               AllFeatures = [],
               value =None,
               NextSplitBy = None,
               confidenceLevel = 0.05,
               minSampleSize = 20
               ):
    
        self.children = {}
        self.X = X
        self.y = y
        self.MAX_DEPTH = MAX_DEPTH
        self.AllFeatures = AllFeatures
        self.NextSplitBy = NextSplitBy
        self.feature = feature
        self.value = value
        self.label = label
        self.Type = Type
        
        self.confidenceLevel = confidenceLevel
        self.depth = depth
        self.error = error
        self.minSampleSize = minSampleSize

        self.Bulid()


  def Bulid (self):
    
    if self.depth >= self.MAX_DEPTH:
        self.label = max (dict (self.y.value_counts()))
        self.Type = "leaf"
        return 

        

    if len (self.y.unique()) == 1: #one class left in Y
      self.label = max (dict (self.y.value_counts()))
      self.Type = "leaf"
      return 


    if len(self.AllFeatures) == 0: # No features left
      self.label = max (dict (self.y.value_counts()))
      self.Type = "leaf"
      return


    if len (self.y) < self.minSampleSize :   #min sample examples
      self.Type = "leaf"
      self.label = max (dict (self.y.value_counts()))
      return 



    self.NextSplitBy = self.MakeSplitDecision(self.AllFeatures , self.error)  

    if self.NextSplitBy is None:
      self.AllFeatures = []
      self.label = self.y.unique()[0]
      self.Type ="leaf"
      return 

    if not (self.chi_square(self.NextSplitBy , self.X , self.y)):
        self.AllFeatures =[]
        return

    if len(self.y.unique()) > 1 : 
        
        possibleValues = self.X[self.NextSplitBy].unique()

        for v in possibleValues:
          
          SubsetX = self.X[self.X[self.NextSplitBy] == v] # return rows where feature == specific values
          SubsetY = self.y[SubsetX.index]
     
          m = [f for f in self.AllFeatures if f != self.NextSplitBy]

          child = Dtree(X = SubsetX.copy() ,
                        y = SubsetY.copy() ,
                        error = self.error,
                        depth = self.depth +1 ,
                        label = max (dict (SubsetY.value_counts())),
                        AllFeatures = m,
                        MAX_DEPTH = self.MAX_DEPTH,
                        feature = self.NextSplitBy,
                        value = v,
                        Type = "internal",
                        confidenceLevel = self.confidenceLevel,
                        minSampleSize = self.minSampleSize
                        )
   
          self.children[v]= child # append to children
        
        for v in possibleValues:          
          self.children[v].Bulid()
            

            

    return self

  
  def predict (self , df):
    """ recieves a dataframe and returns a list of predictions """


    predictions = []
    setOfFeature= set()
    for indx in range(len(df)):

        pred = self.predictOne(dict(df.iloc[indx]).copy()) 

        predictions.append(pred)

    return predictions

  def predictOne(self , row) -> str:


      node = self
      label = "Undefined"
        
      features = None #self.AllFeatures

      while (node is not None ):

        if len(node.children) != 0 :

              branchName = row[node.NextSplitBy]
              child =None
              try :
                child = node.children[branchName] 
              except:
                return node.label

              row.pop (node.NextSplitBy)

              node = child 
 
        else:
              label = node.label
              break

      return label 



  def MakeSplitDecision (self , features , error):
    """
    Perform spliting on the tree until:
        1- the maximum depth is reached, 
        2- one class remains, or
        3- A performance metric is achieved.
    """
    
    #determine the best split based on the features

    BestFeature = None
    BestInformationGain = -1
    BestFeatureList = []

    #consider all error metrics for the split. For a list of features, 
    #we take the feature that satisfies most of error metrics

    if error == "all" or error == "All": 
      for err in {"Entropy" , "MCE" , "gini"}:
        BestFeatureList = {}

        for f in features:  

          IG = self.CalculateIG(f , err)

          if BestInformationGain < IG:
            BestFeature = f
            BestInformationGain = IG

        BestFeatureList[err] = BestFeature

      return max(Counter(BestFeatureList.values())) #return the feature suggested by most error metrics

    else:
      for f in features:  

          IG = self.CalculateIG(f , error)

          if BestInformationGain < IG:
            BestFeature = f
            BestInformationGain = IG

      return BestFeature


  def chi_square(self, feature , X , y  ):
        
    """
    Given a feature, return True if that feature useful for performing the next split.
    return False, if not.
    """
    if self.confidenceLevel == 0: #always expand
        return True
    
    possibleValues = set(X[feature])

    X_counts = dict (X[feature].value_counts()) 

    y_counts = dict (Counter(y))


    Sum = 0
    for v in possibleValues:
      SubsetX = X[X[feature] == v] 
      SubsetY = y[SubsetX.index] 

      SubsetY_counts = dict (Counter(SubsetY))


      for key in y_counts.keys():
        actual = SubsetY_counts.get(key , 0)
        expected = len(SubsetY) * ((y_counts.get(key , 0) / len(y)))

        Sum += ((actual - expected)**2)/expected


    dFreedom = (len(y_counts) - 1) * (len(possibleValues) - 1)
    
    

    chi_value = scipy.stats.chi2.ppf(1- self.confidenceLevel, df=dFreedom)


    if (Sum > chi_value):
      return True

    return False
  
  def CalculateIG(self , feature , error ):
    """
      loop over attribute's values to calculate the maximum Information Gain.
    """

    #Get unique values for that attributes:
    Xtemp = self.X.copy()
    ytemp = self.y.copy()

    PossibleValuesForFeature= Xtemp[feature].unique()

    IG = None

    if error == "MCE" or error == "mce":
      IG = self.MCE(Xtemp, ytemp)
    elif error == "entropy" or error == "Entropy":
      IG = self.Entropy(Xtemp, ytemp)
    elif error == "gini" or error == "Gini":
      IG = self.GiniIndex(Xtemp, ytemp)
    else:
      print("enter a valid error metric name , {MCE, entropy, Gini, or  All}.")      
    
    

    for value in PossibleValuesForFeature:
      SubsetX = Xtemp[Xtemp[feature] == value] # return rows where feature == specific values
      SubsetY = ytemp [SubsetX.index]

      IG -=  (SubsetX.shape[0] / Xtemp.shape[0]) * self.Entropy(SubsetX, SubsetY) 
      if error == "MCE" or error == "mce":
         IG -=  (SubsetX.shape[0] / Xtemp.shape[0])  * self.MCE(SubsetX, SubsetY) 
  
         
      elif error == "entropy" or error == "Entropy":
        IG -= (SubsetX.shape[0] / Xtemp.shape[0]) * self.Entropy(SubsetX, SubsetY) 
         

      elif error == "gini" or error == "Gini":
        IG -=  (SubsetX.shape[0] / Xtemp.shape[0]) * self.GiniIndex(SubsetX, SubsetY) 
         

      else:
        print("enter a valid error metric name , {MCE, entropy, Gini, or  All}.") 

    return IG



  def GiniIndex(self, Xtemp, ytemp ):
    
    gini = 0
    
    for c in ytemp.unique(): #returns the rows where class == c   
      splitCandidate = Xtemp[ytemp[:] == c] 

      L1 = len(splitCandidate) 
      L2 = len(Xtemp)

      ratio = L1 / L2
      gini += (ratio ** 2)

    return 1- gini 




  def Entropy(self, XTemp , yTemp):
    """
    For attribute x_i, calculate the value of entropy 
    """

    entropy = 0
    for c in yTemp.unique(): #returns the rows where class == c   
      splitCandidate= XTemp[yTemp[:] == c] 

      L1 = len(splitCandidate) 
      L2 = len(XTemp)

      ratio = L1 / L2
      entropy += ratio * np.log2(ratio)
    

    return -1 * entropy #take every class i and do (- p_i .log p_i)





  def MCE(self , Xtemp, ytemp):
    """
     - Miss-Classification Error(MCE): measures the amount
     of samples that classified with an incorrect label. 

     - calculating the MCE for each attributes (A) then 
     the attribute with minimum MCE is selected at each node.
    """
    mceList = []
    
    for c in ytemp.unique(): #returns the rows where class == c   
      splitCandidate = Xtemp[ytemp[:] == c] 

      L1 = len(splitCandidate) 
      L2 = len(Xtemp)

      ratio = L1 / L2
      mceList.append(ratio)

    return 1- max (mceList) 


In [3]:
class DecisionTreeClassifier:

  def __init__(self, error_metric= "entropy",
               confidence_level = 0.05 ,
               MAX_DEPTH = 3,
               n_ensembles = 1,
               stopping_critera = "Chi_square",
               minimumSampleSize = 20) :
    
    self.forest = []
    self.MAX_DEPTH = MAX_DEPTH
    self.stopping_critera = stopping_critera
    self.error_metric = error_metric
    self.confidence_level = confidence_level
    self.minimumSampleSize = minimumSampleSize
    self.n_ensembles = n_ensembles
      
  def fit (self, X_temp, y_temp):
    
    if self.n_ensembles == 1 :
        Tree = Dtree (X = X_temp ,
                       y= y_temp,
                       AllFeatures= X_temp.columns,
                       Type = "root",
                       MAX_DEPTH = self.MAX_DEPTH,
                       error = self.error_metric ,
                       confidenceLevel = self.confidence_level,
                       minSampleSize = self.minimumSampleSize)
        
        self.forest.append(Tree)
        
    elif self.n_ensembles > 1:
        self.ensemble(X_temp, y_temp)
        
    else:
        raise ValueError('Error n_ensembles cannot be negative!')
        
    
  def predict (self, X):
    
    ids = X.index  
    predictions = {i:{"IE":0 ,"EI":0, "N":0 } for i in ids}
    
    for i in ids:  #iterate over all rows
        
        #predict row_i using all models
        for tree in self.forest : 
            
            # predict one row using tree_i
            label = tree.predictOne(dict(X.loc[i])) 
            
            # increase the predicted label counter
            predictions[i][label] +=1    

    #extract labels with max predicted count    
    forestPredictions = [max(predictions[i],
                             key=predictions[i].get) for i in predictions.keys()]

    return np.array(forestPredictions)

    

  def accuracy_score(self, X_test , y_true):
        y_pred = self.predict(X_test)
        return np.sum(np.equal(y_true, y_pred)) / len(y_true)
    

  def ensemble (self, X, y):
    
    
    #sampled rows should be attached with the correct label.
    # We need to sample both X and y together.
    df = X.copy()
    df["label"] = y
    
    # reset index to start from zero, pd.sample method takes ordered dataframe
    df = df.reset_index(drop=True) 
    
    
    for i in range (self.n_ensembles):

        df_sampled = df.sample(frac=1 , axis= 0 , replace=True).reset_index(drop=True) 
        
        y_sampled = df_sampled["label"]
        X_sampled = df_sampled.drop(labels='label', axis=1)
        
        
        Tree = Dtree (X = X_sampled,
                       y= y_sampled,
                       AllFeatures= X_sampled.columns,
                       Type = "root",
                       MAX_DEPTH = self.MAX_DEPTH,
                       error = self.error_metric ,
                       confidenceLevel = self.confidence_level,
                       minSampleSize = self.minimumSampleSize)
                      
        self.forest.append(Tree)
        print("Classifier #", i+1, "completed training: ")
        print("\tAccuracy on Training data: ", self.accuracy_score(X_sampled, y_sampled))
        print()
  


In [4]:

def replace (s , replace, listOfStr):
    """ given a string return a new a string with a char replaced randomly from a given list."""
    countReplace = s.count(replace)
    for i in range (s.count(replace)):
        s = s.replace(replace , random.choice(listOfStr) , 1 )
    
    return s


def split(s: str ,chunk_size: int ):
    """Split a string into chunks based on chunk_size """
    
    listStr =[]
    
    for i in range(0, len(s), chunk_size) :
        listStr.append(s[i:i+chunk_size])

    return listStr

def n_grams(df , chunk_size=1): 
    """Split the samples into features of grams based into chunk_size """
    
    features = list(range(1, int( len(df["sample"][0])/chunk_size ) +1 , 1))
    List = [split(s , chunk_size) for s in df["sample"]]
    
    df = pd.concat(
        [df, pd.DataFrame(
            List, 
            index=df.index, 
            columns=features) ] , axis=1 )

    return df , features

def preprocess (df, ngrams = 1 , yFlag = False , duplicates = "no", unknown="no"):
    
    """perform the preprocessing steps:
        - replace ambigous letters
        - generate features based on the n-gram model
        - deal with duplicate rows
        
        parameters:
            y_flag: True, if df contains labels. The labels will be extracted.
            duplicates: if "drop", the duplicate rows will be dropped. Otherwise, they will be kept.
            unknown: if "drop", rows with ambiguous letters will be dropped. 
                     Otherwise, they will be replaced based on "replacements" dictionary.
            
    """
    
    replecements = {"D": ["A", "G", "T"],
                "N": ["A", "G", "C", "T"],
                "S": ["C" , "G"],
                "R" : ["A" , "G"]}
    
    if duplicates == "drop":
        df.drop_duplicates(subset=['sample'])
        
    if unknown == "drop":
        unknowDNA = df["sample"].apply(lambda x: ("N" in x) or ("D" in x) or ("S" in x) or ("R" in x))
        df = df.drop( unknowDNA[unknowDNA ==True].index )
    else:
        df['sample'] = df['sample'].apply(lambda x: replace(x, 'D' , replecements["D"]))
        df['sample'] = df['sample'].apply(lambda x: replace(x, 'N' , replecements["N"]))
        df['sample'] = df['sample'].apply(lambda x: replace(x, 'S' , replecements["S"]))
        df['sample'] = df['sample'].apply(lambda x: replace(x, 'R' , replecements["R"]))

        
    df, features = n_grams(df , chunk_size = ngrams)
    X = df[features]
    y =None
    
    if yFlag:
        y = df["label"]
        
    return X , y


In [5]:
dataset = pd.read_csv("/Users/ajararweh/Downloads/train.csv" , names = ["id", "sample", "label"])
df = dataset.copy()

X , y = preprocess(df, ngrams= 1 , yFlag=True, duplicates="no")

In [6]:
X_train, X_CV, y_train, y_CV = train_test_split(X, y, test_size = 0.2,random_state = 0)


classifier = DecisionTreeClassifier(error_metric="gini" ,
                                    minimumSampleSize =4 ,
                                    MAX_DEPTH = 7,
                                    confidence_level= 0.999,
                                    n_ensembles=2)

classifier.fit(X_train,y_train)

print("Train: ",classifier.accuracy_score(X_train,y_train))
print ("CV: "  , classifier.accuracy_score(X_CV, y_CV))

Classifier # 1 completed training: 
	Accuracy on Training data:  0.998125

Classifier # 2 completed training: 
	Accuracy on Training data:  0.9825

Train:  0.95625
CV:  0.8875


In [None]:
testData = pd.read_csv("/Users/ajararweh/Downloads/test.csv" , names = ["id", "sample"])
X_test , _ = preprocess(testData , yFlag=False, duplicates="no")

In [10]:
y_pred = pd.DataFrame({"id":testData["id"], "class":classifier.predict(X_test)})

In [11]:
y_pred.to_csv("/Users/ajararweh/Downloads/submission31.csv", index = False )