# This is the python template for Assignment 04.  
- You must use this template.  
- You must not change any signatures of the methods, only edit the sections indicated with "Write your code here."  
- The return value of every function has to be in the right format; otherwise the submission is desk-rejected.  
- Plagiarism leads to failing the assignment!  
- We will terminate the script after 10 minutes, so try to use efficient algorithms.

In [1]:
import pandas as pd

In [2]:
def get_name():
    """Return the name recorded for this assignment submission."""
    submitter_name = "Irem Begüm Gündüz"
    return submitter_name
def get_matriculationnumber():
    """Return the matriculation number recorded for this submission as an int."""
    matriculation_number = 7026821
    return matriculation_number

## Useful information:

The structure of a CART is a dict. Use the same names as shown in the example; using other names makes your format invalid and leads to a desk reject.

In [3]:
# Example CART: a root node "X" that splits on "aes" and has two leaf
# successors. Leaves carry None for the split fields and for both successors.
cart = {
    "name": "X",
    "mean": 456,
    "split_by_feature": "aes",
    "error_of_split": 0.0,
    "successor_left": {
        "name": "XL",
        "mean": 1234,
        "split_by_feature": None,
        "error_of_split": None,
        "successor_left": None,
        "successor_right": None,
    },
    "successor_right": {
        "name": "XR",
        "mean": 258,
        "split_by_feature": None,
        "error_of_split": None,
        "successor_left": None,
        "successor_right": None,
    },
}

The names of the features must be used exactly as defined in this list; using other names makes your format invalid and leads to a desk reject.

In [4]:
# Feature names, in the order given by the assignment template.
features = [
    "secompress",
    "encryption",
    "aes",
    "blowfish",
    "algorithm",
    "rar",
    "zip",
    "signature",
    "timestamp",
    "segmentation",
    "onehundredmb",
    "onegb",
]

# Task 1: Create a CART

In [5]:
def compute_performance(dataframe, dataframe2=None, metric="performance"):
    """Compute the per-feature split-error table for one or two sample sets.

    Args:
        dataframe: Samples to evaluate.
        dataframe2: Optional second sample set. When it is given (even if it
            contains no rows) a pair of tables is returned, so the number of
            returned values depends only on how the function is called and
            never on the data.
        metric: Name of the column holding the measured value.

    Returns:
        The results table for ``dataframe`` alone, or the tuple
        ``(results_table, results_table2)`` when ``dataframe2`` is given.
    """
    # Each frame is handled on its own; the feature names are extracted per
    # frame so that the two frames may have different columns.
    results_table = compute_performance_helper(
        dataframe, extract_features(dataframe), metric
    )
    if dataframe2 is None:
        return results_table

    results_table2 = compute_performance_helper(
        dataframe2, extract_features(dataframe2), metric
    )
    return results_table, results_table2

def compute_performance_helper(dataframe,features, metric= "performance"):
    """Build a table with the total squared error of splitting on each feature.

    For every name in ``features`` the rows are divided into those where the
    feature column equals 1 and those where it equals 0. Each side contributes
    the sum of squared deviations of ``metric`` from that side's mean; the mean
    and each side's error are rounded to two decimals. An empty side uses a
    mean of 0 and therefore contributes 0.

    Returns:
        The table produced by ``results_helper_df()`` with one
        ``[feature, total_error]`` row appended per feature, in input order.
    """
    def _side_squared_error(side):
        # Mean of the metric on this side (0 when the side has no rows),
        # rounded before it is used as the reference for the deviations.
        row_count = len(side)
        if row_count != 0:
            side_mean = round(side[metric].sum() / row_count, 2)
        else:
            side_mean = 0
        return round(((side[metric] - side_mean) ** 2).sum(), 2)

    table = results_helper_df()
    for feature_name in features:
        with_feature = dataframe[dataframe[feature_name] == 1]
        without_feature = dataframe[dataframe[feature_name] == 0]

        split_error = _side_squared_error(with_feature) + _side_squared_error(without_feature)

        # Append as the next row of the table.
        table.loc[len(table)] = [feature_name, split_error]
    return table

def results_helper_df():
    """Return an empty table with the columns 'feature' and 'total_error'."""
    column_names = ['feature', 'total_error']
    return pd.DataFrame(columns=column_names)

def extract_features(df, df2=None):
    """Return the lower-cased feature column names of one or two frames.

    Every column except 'performance' and 'Id' counts as a feature.

    Args:
        df: Frame whose columns are inspected.
        df2: Optional second frame. When it is given (even if it has no rows)
            a pair of lists is returned, so the return shape never depends on
            the contents of the frames.

    Returns:
        A list of feature names for ``df``, or the tuple
        ``(features, features2)`` when ``df2`` is given.

    NOTE(review): the names are lower-cased, so using them to index a frame
    only works when the frame's columns are already lower-case.
    """
    def _feature_names(frame):
        return [
            col.lower()
            for col in frame.columns
            if col != 'performance' and col != 'Id'
        ]

    if df2 is None:
        return _feature_names(df)
    return _feature_names(df), _feature_names(df2)

def get_split_var(df, tot_error_col='total_error', feature_col='feature'):
    """Choose the feature to split on from a table of per-feature errors.

    Args:
        df: Table with one row per candidate feature.
        tot_error_col: Name of the column holding the split error.
        feature_col: Name of the column holding the feature name.

    Returns:
        The feature with the smallest error. When several features share the
        smallest error, the alphabetically first of them is returned. None is
        returned when the table is empty, when any error is missing, or when
        every feature shares the same smallest error (no split is better than
        another).
    """
    # No candidates at all, or an undefined error: there is nothing to pick.
    if df.empty or df[tot_error_col].isna().any():
        return None

    errors = df[tot_error_col]
    tied = df.loc[errors == errors.min(), feature_col]

    # A tie across all features carries no information about where to split.
    if len(tied) > 1 and tied.nunique() == df[feature_col].nunique():
        return None

    # Alphabetical order breaks a tie; a single winner is returned unchanged.
    return tied.sort_values().values[0]


def build_tree(df, results=None, feature_col='feature', totError='total_error', name='X'):
    """Recursively build the CART dict for the samples in ``df``.

    Args:
        df: Samples with a 'performance' column and 0/1 feature columns.
        results: Optional precomputed per-feature error table for ``df``. It is
            computed with ``compute_performance`` when omitted or empty.
        feature_col: Name of the feature column in ``results``.
        totError: Name of the error column in ``results``.
        name: Name of this node; successors get 'L' / 'R' appended.

    Returns:
        A dict with the keys 'name', 'mean', 'split_by_feature',
        'error_of_split', 'successor_left' and 'successor_right'. A leaf has
        None for the last four entries.
    """
    if results is None or results.empty:
        # compute the initial performance
        results = compute_performance(df, metric='performance')

    # Start as a leaf; the split fields are filled in only if a split happens.
    tree = {
        'name': name,
        'mean': df['performance'].mean(),
        'split_by_feature': None,
        'error_of_split': None,
        'successor_left': None,
        'successor_right': None,
    }

    # A table that is still empty means there is no candidate feature left.
    if results.empty:
        return tree
    splitting_var = get_split_var(results, tot_error_col=totError, feature_col=feature_col)
    if splitting_var is None:
        return tree

    # The split feature is dropped so it cannot be chosen again further down.
    left_df = df[df[splitting_var] == 1].drop(columns=splitting_var)
    right_df = df[df[splitting_var] == 0].drop(columns=splitting_var)

    # A split that leaves one side without samples does not separate anything;
    # keep the node as a leaf instead of recursing into an empty frame.
    if len(left_df) == 0 or len(right_df) == 0:
        return tree

    tree['split_by_feature'] = splitting_var
    tree['error_of_split'] = results.loc[results[feature_col] == splitting_var, totError].values[0]

    # Each successor computes the error table for its own samples.
    tree['successor_left'] = build_tree(left_df, name=name + 'L')
    tree['successor_right'] = build_tree(right_df, name=name + 'R')
    return tree


In [6]:
def get_cart(sample_set_csv):
    """Read the samples from the CSV file at ``sample_set_csv`` and return their CART.

    The file is loaded into a dataframe and handed to ``build_tree``.
    """
    samples = pd.read_csv(sample_set_csv)
    return build_tree(samples)


# Task 2a: Highest influencing feature

In [7]:
def get_highest_influence_feature_helper(cart):
    """Find the node with the largest "error_of_split" in a CART (sub)tree.

    Args:
        cart: CART node dict with the keys "name", "error_of_split",
            "split_by_feature", "successor_left" and "successor_right".

    Returns:
        A tuple ``(node_name, error, feature)`` holding the "name",
        "error_of_split" and "split_by_feature" of the winning node. For a
        leaf the last two entries are None.
    """
    best_name = cart["name"]
    best_error = cart["error_of_split"]
    best_feature = cart["split_by_feature"]

    # Left is visited before right, and the comparison is strict, so on equal
    # errors the node seen first (parent, then left subtree) is kept.
    for side in ("successor_left", "successor_right"):
        successor = cart[side]
        if successor is None:
            continue

        child_name, child_error, child_feature = get_highest_influence_feature_helper(successor)

        # A subtree without any split reports None and cannot win.
        if child_feature is None or child_error is None:
            continue

        # A node without an error of its own is beaten by any real split.
        if best_error is None or child_error > best_error:
            best_name = child_name
            best_error = child_error
            best_feature = child_feature

    return best_name, best_error, best_feature

In [8]:
def get_highest_influence_feature(cart):
    """Return the third entry of the helper's result for ``cart``: the split feature."""
    _node_name, _split_error, split_feature = get_highest_influence_feature_helper(cart)
    return split_feature

# Task 2b: Calculate the error rate

In [9]:
def predict_performance(tree, feature):
    """Predict the performance of one configuration by walking the CART.

    Args:
        tree: CART node dict with the keys "split_by_feature", "mean",
            "successor_left" and "successor_right".
        feature: The configuration to evaluate. Either an object indexed by
            feature name whose value is 1 for a selected feature (for example
            a dataframe row or a dict), or a set, frozenset, list or tuple
            containing the names of the selected features.

    Returns:
        The "mean" of the leaf that the configuration reaches. A selected
        feature follows "successor_left", anything else "successor_right".
    """
    # Collections of names cannot be indexed by feature name; for them a
    # feature counts as selected when its name is a member.
    by_membership = isinstance(feature, (set, frozenset, list, tuple))

    node = tree
    while node['split_by_feature'] is not None:
        split_feature = node['split_by_feature']
        if by_membership:
            is_selected = split_feature in feature
        else:
            is_selected = feature[split_feature] == 1
        node = node['successor_left'] if is_selected else node['successor_right']

    # No more splits: the leaf mean is the prediction.
    return node['mean']

In [10]:
def get_error_rate(cart, sample_set_csv):
    """Return the mean absolute prediction error of ``cart`` on a CSV sample set.

    The CSV at ``sample_set_csv`` is read into a dataframe. For every row the
    CART prediction is compared with the row's 'performance' value, and the
    absolute differences are averaged over all rows.
    """
    samples = pd.read_csv(sample_set_csv)

    # Absolute prediction error of each row, accumulated in row order.
    absolute_errors = (
        abs(predict_performance(cart, row) - row['performance'])
        for _, row in samples.iterrows()
    )
    return sum(absolute_errors) / len(samples)

# Task 2c: Generate optimal configuration

In [11]:
def _predicted_mean(node, selected):
    """Return the "mean" of the leaf reached when exactly ``selected`` is chosen."""
    # predict_performance indexes its second argument by feature name, which a
    # set does not support, so the walk is done here by membership instead.
    while node['split_by_feature'] is not None:
        side = 'successor_left' if node['split_by_feature'] in selected else 'successor_right'
        node = node[side]
    return node['mean']


def _complete_feature_groups(config):
    """Adjust ``config`` in place so its feature groups are consistent; return it."""
    # Archive format: fall back to "rar" only when neither format is selected.
    if "zip" not in config and "rar" not in config:
        config.add("rar")

    # Parent/child groups: without the parent no child may stay selected; with
    # the parent at least one child is needed, so a default is added.
    groups = (
        ("encryption", ("aes", "blowfish"), "blowfish"),
        ("segmentation", ("onehundredmb", "onegb"), "onegb"),
    )
    for parent, children, default_child in groups:
        if parent not in config:
            config.difference_update(children)
        elif not any(child in config for child in children):
            config.add(default_child)
    return config


def get_optimal_configuration(cart, partial_config):
    """Extend ``partial_config`` towards the branch with the lowest predicted mean.

    Starting at the root, each split is decided greedily: the left successor
    is evaluated with the split feature added to the configuration, the right
    successor with the configuration as it is, and the side with the smaller
    (or equal) predicted mean is followed. Once a leaf is reached the feature
    groups are completed (see ``_complete_feature_groups``).

    Args:
        cart: CART node dict to descend.
        partial_config: Iterable of feature names that are already selected.

    Returns:
        A new set with the resulting configuration; ``partial_config`` is not
        modified.

    NOTE(review): when the right side wins, a split feature that is already in
    ``partial_config`` stays in the result although the right branch stands
    for "feature not selected" - confirm the intended handling.
    """
    config = set(partial_config)
    node = cart

    while node['split_by_feature'] is not None:
        with_feature = config | {node['split_by_feature']}

        left_performance = _predicted_mean(node['successor_left'], with_feature)
        right_performance = _predicted_mean(node['successor_right'], config)

        if left_performance <= right_performance:
            node, config = node['successor_left'], with_feature
        else:
            node = node['successor_right']

    return _complete_feature_groups(config)


# Tests:  
In the following cells, we provide you with some test cases (but not all possible test cases!).

In [12]:
# Task 1

# CART that get_cart is compared against for "Performance_01.csv".
test_cart = {
    'name': 'X',
    'mean': 763.2,
    'split_by_feature': 'segmentation',
    'error_of_split': 6.0,
    'successor_left': {
        'name': 'XL',
        'mean': 772.0,
        'split_by_feature': 'onegb',
        'error_of_split': 0.0,
        'successor_left': {
            'name': 'XLL',
            'mean': 770.0,
            'split_by_feature': None,
            'error_of_split': None,
            'successor_left': None,
            'successor_right': None,
        },
        'successor_right': {
            'name': 'XLR',
            'mean': 773.0,
            'split_by_feature': None,
            'error_of_split': None,
            'successor_left': None,
            'successor_right': None,
        },
    },
    'successor_right': {
        'name': 'XR',
        'mean': 750.0,
        'split_by_feature': None,
        'error_of_split': None,
        'successor_left': None,
        'successor_right': None,
    },
}


# Report whether the generated CART equals the expected one.
print("passed" if get_cart("Performance_01.csv") == test_cart else "failed")

passed


In [13]:
# Task 2b
# Report whether the error rate of test_cart on "Performance_02b.csv" equals 5.
print("passed" if get_error_rate(test_cart, "Performance_02b.csv") == 5 else "failed")

passed


In [14]:
# Task 2c
# Two-leaf CART that splits on 'zip'.
test_cart_v2 = {
    'name': 'X',
    'mean': 763.2,
    'split_by_feature': 'zip',
    'error_of_split': 0.0,
    'successor_left': {
        'name': 'XL',
        'mean': 772.0,
        'split_by_feature': None,
        'error_of_split': None,
        'successor_left': None,
        'successor_right': None,
    },
    'successor_right': {
        'name': 'XR',
        'mean': 750.0,
        'split_by_feature': None,
        'error_of_split': None,
        'successor_left': None,
        'successor_right': None,
    },
}

optimal_config = get_optimal_configuration(
    test_cart_v2,
    {
        "secompress",
        "encryption",
        "aes",
        "algorithm",
        "signature",
        "timestamp",
        "segmentation",
        "onehundredmb",
    },
)
reference = {
    'aes',
    'algorithm',
    'encryption',
    'onehundredmb',
    'rar',
    'secompress',
    'segmentation',
    'signature',
    'timestamp',
}

# Report whether the computed configuration equals the reference set.
print("passed" if optimal_config == reference else "failed")

passed
