In [79]:
import pandas as pd
import numpy as np
from scipy.stats import chi2


In [80]:
# Column labels for a split DNA sequence: one per position ('p1' .. 'p60'),
# followed by the class column 'label'.
column_names = ['p' + str(position) for position in range(1, 61)] + ['label']


In [81]:
def read_training_data():
    """
    Load the labelled training sequences from 'training.csv'.

    The file is read without a header row; only the columns at index 1
    (the DNA sequence) and index 2 (the class label) are used. Each sequence
    is split into one character per column and the label is appended as the
    last column. The resulting columns are renamed using `column_names`.

    Returns the resulting DataFrame.
    """
    raw = pd.read_csv('training.csv', header=None, usecols=[1, 2])
    raw.columns = ['data', 'label']

    def split_into_positions(sequence):
        # One Series entry per character of the sequence.
        return pd.Series(list(sequence))

    positions = raw['data'].apply(split_into_positions)

    # The label is stored under integer key 60, i.e. right after characters 0..59.
    positions[60] = raw['label']
    positions.columns = column_names

    return positions

In [82]:
def read_testing_data():
    """
    Load the unlabelled test sequences from 'testing.csv'.

    The file is read without a header row; only the column at index 1
    (the DNA sequence) is used. Each sequence is split into one character per
    column and the columns are renamed using the first 60 entries of
    `column_names`.

    Returns the resulting DataFrame.
    """
    raw = pd.read_csv('testing.csv', header=None, usecols=[1])
    raw.columns = ['data']

    def split_into_positions(sequence):
        # One Series entry per character of the sequence.
        return pd.Series(list(sequence))

    positions = raw['data'].apply(split_into_positions)
    positions.columns = column_names[0:60]

    return positions

In [83]:
def calculate_misc_error(labels_column):
    """
    Calculates the misclassification error at a node.
      1.labels_column = The entire labels column from the current dataset at that node

    The error is 1 minus the share of the most frequent label.
    An empty column yields 1.
    """
    _, label_counts = np.unique(labels_column, return_counts=True)

    # No labels at all: the largest share is taken to be 0.
    if label_counts.size == 0:
        return 1

    largest_share = label_counts.max() / label_counts.sum()
    return 1 - largest_share

In [84]:
def calculate_entropy(labels_column):
    """
    Calculates the entropy (in bits) at a node.
      1.labels_column = The entire labels column from the current dataset at that node

    Entropy is the sum over labels of -p * log2(p), where p is the share of
    that label in the column. An empty column yields 0.
    """
    _, label_counts = np.unique(labels_column, return_counts=True)
    total = label_counts.sum()

    # Share of each distinct label among all rows at this node.
    shares = (count / total for count in label_counts)

    return sum(-share * np.log2(share) for share in shares)

In [85]:
def calculate_gini_index(labels_column):
    """
    Calculates the Gini Index at a node using below argument.
      1.labels_column = The entire labels column from the current dataset at that node

    The Gini index is 1 - sum(p_i ** 2), where p_i is the share of label i.
    It is 0 for a pure node and approaches 1 as labels become evenly spread
    over many classes. An empty column yields 1 (nothing is subtracted).
    """
    # Getting the counts of each unique label in the label column.
    _, label_counts = np.unique(labels_column, return_counts=True)

    # Share of each label among all rows at this node.
    proportions = label_counts / np.sum(label_counts)

    # Gini index: one minus the sum of squared shares. (No logarithm is
    # involved -- that term belongs to the entropy formula only.)
    return 1 - np.sum(np.square(proportions))

In [86]:
def information_gain(data,splitting_attribute, impurity_method):
    """
    Calculates the Information Gain of the dataset on the splitting attribute using the below parameters.
     1.data= dataset for which the Information Gain needs to be calculated; must contain a 'label' column.
     2.splitting_attribute = the attribute (column name) on which the dataset is split.
     3.impurity_method = name of the impurity measure: 'misc_error' or 'entropy';
                         any other value selects the Gini index.

    Returns the impurity of the labels before the split minus the
    size-weighted sum of the impurities of each child subset.
    """
    # Map the method name to its implementation. Unrecognised names fall back
    # to the Gini index, as they always have.
    impurity_functions = {
        'misc_error': calculate_misc_error,
        'entropy': calculate_entropy,
    }
    impurity = impurity_functions.get(impurity_method, calculate_gini_index)

    labels = data['label']
    attribute_column = data[splitting_attribute]

    # Impurity of the node before splitting.
    impurity_before_split = impurity(labels)

    # Distinct values of the splitting attribute and how many rows carry each.
    attribute_values, value_counts = np.unique(attribute_column, return_counts=True)
    total_rows = np.sum(value_counts)

    # Weighted impurity of the children. Rows are selected with a boolean mask
    # on the label column only, so missing values in unrelated columns cannot
    # silently drop rows and skew the weights.
    impurity_after_split = np.sum([
        (count / total_rows) * impurity(labels[attribute_column == value])
        for value, count in zip(attribute_values, value_counts)
    ])

    # Information gain: impurity before the split minus weighted impurity after it.
    return impurity_before_split - impurity_after_split

In [87]:
def _most_frequent_label(labels):
    """Return the most frequent value in `labels`; ties go to the value that sorts first."""
    unique_labels, counts = np.unique(labels, return_counts=True)
    return unique_labels[np.argmax(counts)]


def ID3_algorithm(current_data,original_dataset,attributes,impurity_method,source_node_label=None,confidence=0.99):
    """
    Builds the Decision Tree using the below parameters.
     1.current_data= Dataset at the current node; must contain a 'label' column.
     2.original_dataset = The original dataset for the whole decision tree.
     3.attributes = The attributes (column names) still available for splitting.
     4.impurity_method = 'misc_error' or 'gini_index'; any other value selects entropy.
     5.source_node_label = Most frequent label at the parent node (None at the root).
     6.confidence = Confidence level passed to the chi-square test that decides
                    whether a node is expanded (default 0.99).

    Returns either a single label (leaf) or a dict of the form
        {split_attribute: {attribute_value: subtree, ...}, 'max': majority_label}
    where 'max' holds the most frequent label at that node. If the chi-square
    test rejects the split, the inner dict is left empty.
    """
    # An empty dataset must be handled first: every check below indexes into
    # the labels and would fail on an empty column.
    if len(current_data) == 0:
        return _most_frequent_label(original_dataset['label'])

    # If there is only one label present in the current dataset, then return it.
    labels_present = np.unique(current_data['label'])
    if len(labels_present) == 1:
        return labels_present[0]

    # Most frequent label at the current node.
    majority_label = _most_frequent_label(current_data['label'])

    # If there are no more attributes, return the parent's most frequent label;
    # at the root (no parent) fall back to this node's own majority label.
    if len(attributes) == 0:
        return source_node_label if source_node_label is not None else majority_label

    # Anything other than the two named alternatives means entropy.
    criterion = impurity_method if impurity_method in ('misc_error', 'gini_index') else 'entropy'

    # Information Gain of every candidate attribute; pick the highest.
    gains = [information_gain(current_data, attribute, criterion) for attribute in attributes]
    best_attribute = attributes[int(np.argmax(gains))]

    # New (sub)tree rooted at the best attribute, remembering the majority
    # label so classification can fall back to it for unseen values.
    decision_tree = {best_attribute: {}, 'max': majority_label}

    # Expand the node only if the chi-square test finds the split significant.
    if chi_square(current_data, best_attribute, 'label', confidence):

        # The chosen attribute is not available further down this branch.
        remaining_attributes = [a for a in attributes if a != best_attribute]

        # One child per observed value of the chosen attribute.
        for value in np.unique(current_data[best_attribute]):
            subset = current_data[current_data[best_attribute] == value]
            decision_tree[best_attribute][value] = ID3_algorithm(
                subset, original_dataset, remaining_attributes,
                impurity_method, majority_label, confidence)

    # Return the decision tree.
    return decision_tree
    
    
        
        

In [88]:
def chi_square(dataset, attribute_name, label_attribute_name,confidence):
    """
    Performs a chi-square test of independence between an attribute and the label.
    Returns a boolean: True if the split on the given attribute is significant.
     1.dataset= Dataset at the current node.
     2.attribute_name = The attribute on which we are performing the test.
     3.label_attribute_name = The attribute name of the label column.
     4.confidence = Confidence level currently used in the test.

    Returns False when the attribute or the label has fewer than two distinct
    values, since the test then has no degrees of freedom.
    """
    attribute_column = dataset[attribute_name]
    label_column = dataset[label_attribute_name]

    # Unique values and their counts for the tested attribute and for the label.
    attribute_values, attribute_counts = np.unique(attribute_column, return_counts=True)
    label_values, label_counts = np.unique(label_column, return_counts=True)

    # With a single attribute value or a single label there are zero degrees
    # of freedom and nothing to test; decide explicitly instead of relying on
    # a NaN critical value comparing as False.
    if len(attribute_values) < 2 or len(label_values) < 2:
        return False

    total_rows = np.sum(label_counts)

    # Accumulate sum((observed - expected)^2 / expected) over the contingency table.
    actual_chi_value = 0
    for attribute_value, attribute_count in zip(attribute_values, attribute_counts):
        in_branch = attribute_column == attribute_value
        for label_value, label_count in zip(label_values, label_counts):
            real_count = int((in_branch & (label_column == label_value)).sum())
            expected_count = (attribute_count * label_count) / total_rows
            actual_chi_value = actual_chi_value + (np.square(real_count - expected_count) / expected_count)

    # Degrees of freedom of the contingency table.
    dof = (len(attribute_values) - 1) * (len(label_values) - 1)

    # Critical chi-square value at the requested confidence level.
    expected_chi_value = chi2.ppf(confidence, dof)

    # The split is significant if the statistic reaches the critical value.
    return bool(actual_chi_value >= expected_chi_value)

In [89]:
def classify(sequence,decision_tree):
    """
    Classifies the DNA sequence using the below parameters.
     1. sequence = Input test DNA sequence as a mapping from attribute name to value.
     2. decision_tree = Decision Tree built using the Training Data.

    Walks down the tree following the branch that matches the sequence's value
    at each node's split attribute until a label (non-dict) is reached. If the
    value has no branch at some node, the most frequent label stored under
    that node's 'max' key is returned instead.
    """
    node = decision_tree
    while isinstance(node, dict):
        # The split attribute is the node's only key other than 'max';
        # do not depend on which of the two was inserted first.
        split_attribute = next((key for key in node if key != 'max'), None)
        if split_attribute is None:
            node = node['max']
            continue
        try:
            node = node[split_attribute][sequence[split_attribute]]
        except KeyError:
            # Unseen value (or pruned node with no branches): use the majority label.
            node = node['max']
    # Return classified label.
    return node

In [90]:
def classification_test(test_dataset,decision_tree,output_path='results.csv',first_id=2001):
    """
    Performs the classification test on the test dataset and writes the output to a file.
     1. test_dataset = Dataset with all the test DNA sequences.
     2. decision_tree = Decision Tree built using the Training Data.
     3. output_path = CSV file the predictions are written to (default 'results.csv').
     4. first_id = id assigned to the first test sequence; following sequences
                   get consecutive ids (default 2001).

    The output file has the columns 'id' and 'class' and no index column.
    """
    # Converting the test DNA sequences to one {attribute: value} dict per row.
    sequences = test_dataset.to_dict(orient="records")

    # Classify every sequence up front, then build the result frame in one go
    # rather than growing it one cell at a time.
    predictions = [classify(sequence, decision_tree) for sequence in sequences]
    results = pd.DataFrame(
        {
            'id': list(range(first_id, first_id + len(predictions))),
            'class': predictions,
        },
        columns=['id', 'class'],
    )

    results.to_csv(output_path, index=False)



In [91]:
# Load the labelled training sequences (one column per position plus 'label').
training_dataset=read_training_data()

# Build a decision tree from the training data. The first 60 columns are the
# position attributes; 'entropy' selects the impurity measure used for splitting.
decision_tree = ID3_algorithm(training_dataset,training_dataset,training_dataset.columns[:60],'entropy')

# Load the unlabelled test sequences.
test_dataset = read_testing_data()

In [92]:
# Classify every test sequence with the trained tree and write the predictions to results.csv.
classification_test(test_dataset,decision_tree)