# **Implementing a Random Forest from scratch and running it on spotify data to predict whether a user would like a particular song or not. Sources I referred for this code are cited at the end of this notebook! :)**

Importing Required Libraries

In [1]:
import numpy as np
import pandas as pd
import random
from pprint import pprint

Reading CSV data and dropping unecessary columns

In [4]:
spotifyData = pd.read_csv("data1.csv")
spotifyData = spotifyData.drop(['song_title', 'artist', 'serial_num'], axis=1)
spotifyData.head()

Unnamed: 0,acousticness,danceability,duration_ms,energy,instrumentalness,key,liveness,loudness,mode,speechiness,tempo,time_signature,valence,target
0,0.0102,0.833,204600,0.434,0.0219,2,0.165,-8.795,1,0.431,150.062,4.0,0.286,1
1,0.199,0.743,326933,0.359,0.00611,1,0.137,-10.401,1,0.0794,160.083,4.0,0.588,1
2,0.0344,0.838,185707,0.412,0.000234,2,0.159,-7.148,1,0.289,75.044,4.0,0.173,1
3,0.604,0.494,199413,0.338,0.51,5,0.0922,-15.236,1,0.0261,86.468,4.0,0.23,1
4,0.18,0.678,392893,0.561,0.512,5,0.439,-11.648,0,0.0694,174.004,4.0,0.904,1


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Function to custom split training and testing data

In [5]:
def custom_train_test(spotifyData, split_size) :
	#Get random indices from data
	index_test_data = random.sample(population=spotifyData.index.tolist(), k=split_size)
	#Make test_data from these random indices
	test_data = spotifyData.loc[index_test_data]

	#Remaining data after dropping these indices = training data
	train_data = spotifyData.drop(index_test_data)

	return test_data,train_data

Creating training and testing data

In [6]:
random.seed(0)
test_data, train_data = custom_train_test(spotifyData,300)

Base function - Checks if we have unique classes or not. If there is just one unique class, we have reached a classification

In [7]:
def are_distinct_classes_present(arrData):
	distinct_values = np.unique(arrData[:,-1])
	if(distinct_values.size == 1):
		return 1
	else:
		return 0

Function to check which classification is occurring maximum, and hence needs to be returned

In [8]:
def max_occurring_class(arrData):
  values, counts = np.unique(arrData[:,-1],return_counts=True)
  max_occurring_index = counts.argmax()
  max_occurring_class_value = values[max_occurring_index]
  return max_occurring_class_value

Function to find points to split decision tree

In [9]:
def find_split_points(arrData,subspace):
  rows,columns = arrData.shape
  split_points = {}

  all_columns = list(range(columns-1))

  if((subspace != None) and (subspace <= len(all_columns))):
    all_columns = random.sample(population=all_columns, k=subspace)
  
  for col in all_columns :
    data_points = arrData[:, col]
    distinct_values = np.unique(data_points)
    split_points[col] = distinct_values
        
  return split_points

Function to split a dataset into two based on value passed

In [10]:
def split_dataset(arrData, col, value):
  split_values = arrData[:, col]

  if(feature_list[col] == "cont"):
    data1 = arrData[split_values <= value]
    data2 = arrData[split_values > value]
  else:
    data1 = arrData[split_values == value]
    data2 = arrData[split_values != value]

  return data1, data2

Get entropy of datasets

In [11]:
def entropy_single_dataset(arrData):
	values, unique_counts = np.unique(arrData[:,-1],return_counts=True)
	probArr = unique_counts / unique_counts.sum()
	
	entropy1 = sum(probArr * -np.log2(probArr))
	
	return entropy1

def entropy_multiple_datasets(data1, data2):
	prob_data1 = len(data1) / (len(data1) + len(data2))
	prob_data2 = len(data2) / (len(data1) + len(data2))
	
	total_entropy = (prob_data1 * entropy_single_dataset(data1) + prob_data2 * entropy_single_dataset(data2))

	return total_entropy

Finalize point to split decision tree based on entropy of available options

In [12]:
def final_split_point(arrData, split_points):
	best_entropy = 9999

	for col in split_points:
		for value in split_points[col]:
			data1 , data2 = split_dataset(arrData, col=col, value=value)
			curr_entropy = entropy_multiple_datasets(data1,data2)

			if(curr_entropy <= best_entropy):
				best_entropy = curr_entropy
				best_split_col = col
				best_split_value = value

	return best_split_col , best_split_value

Make the decision trees with minimum number of elements passed and maximum depth

In [13]:
def make_decision_trees(data, num_calls=0,minimum=2, maximum=20, subspace=None):

	if num_calls == 0 :
		global headers, feature_list
		feature_list = differentiate_features(data)
		headers = data.columns
		arrData = data.to_numpy()
	else:
		arrData = data


	if((are_distinct_classes_present(arrData) == 1) or (len(data) < minimum) or (num_calls == maximum)) :
		return max_occurring_class(arrData)

	else:
		num_calls += 1

		split_points = find_split_points(arrData,subspace)
		best_split_col , best_split_value = final_split_point(arrData,split_points)
		data1 , data2 = split_dataset(arrData, best_split_col, best_split_value)
	
		if ((len(data1) == 0) or (len(data2) == 0)):
			return max_occurring_class(arrData)

		name = headers[best_split_col]
		if(feature_list[best_split_col] == "cont"):
			split_title = "{} <= {}".format(name, best_split_value)
		else:
			split_title = "{} = {}".format(name, best_split_value)

		sub_tree = {split_title: []}

		yes = make_decision_trees(data1, num_calls, minimum, maximum,subspace)
		no = make_decision_trees(data2, num_calls, minimum, maximum,subspace)

		if yes == no:
			sub_tree = yes
		else:
			sub_tree[split_title].append(yes)
			sub_tree[split_title].append(no)


		return sub_tree

Testing tree , uncomment print statement to test it out! :D

In [22]:
tree = make_decision_trees(train_data, maximum=10)
#pprint(tree)

Function to classify one particular test

In [16]:
def test_one(test1,tree):
  split_title = list(tree.keys())[0]
  feature, operator, value = split_title.split()

  if (operator == "<="):
    if test1[feature] <= float(value):
      classification = tree[split_title][0] #Yes Answer
    else:
      classification = tree[split_title][1]
  else:
    if str(test1[feature]) == value:
      classification = tree[split_title][0] #Yes Answer
    else:
      classification = tree[split_title][1]    

  if not (type(classification) is dict):
    return classification

  else:
    return test_one(test1, classification)


Calculate the accuracy of our predictions based on the actual values in the dataset

In [17]:
def accuracy(predicted_targets,targets):
    correctness = predicted_targets == targets
    calculated_accuracy = correctness.mean()
    return calculated_accuracy


Run the test_one function for all options

In [18]:
def test_all(data, tree):
    result = data.apply(test_one, args=(tree,), axis=1)
    return result

Split feature set between categorical and continuous to decide how they will be compared to each other while building the trees

In [19]:
def differentiate_features(data):
  feature_list = []
  for col in data.columns:
    if(len(data[col].unique()) <= 20):
      feature_list.append("cat")
    else:
      feature_list.append("cont")

  return feature_list


Bootstrap our data with replacement

In [20]:
def lets_bootstrap(data,n):
  random_indices = np.random.randint(low=0, high=len(data), size=n)
  new_df = data.iloc[random_indices]
  return new_df

Actual RF function and predictions

In [21]:
def rf_final(train_data, num_trees, num_bootstrap, num_features, maximum_d):
    forest = []
    for i in range(num_trees):
        bootstrap_data = lets_bootstrap(train_data, num_bootstrap)
        tree = make_decision_trees(bootstrap_data, maximum=maximum_d, subspace=num_features)
        forest.append(tree)
    
    return forest

def random_forest_predictions(test_data, forest):
    predictions_data = {}
    for i in range(len(forest)):
        column = "tree_{}".format(i)
        predictions = test_all(test_data, tree=forest[i])
        predictions_data[column] = predictions

    predictions_data = pd.DataFrame(predictions_data)
    random_forest_predictions = predictions_data.mode(axis=1)[0]
    
    return random_forest_predictions

Calling the forest and prediction functions. Uncomment the print statement to print our forest being formed

In [24]:
forest = rf_final(train_data, num_trees=50, num_bootstrap=800, num_features=8, maximum_d=50)
predictions = random_forest_predictions(test_data, forest)
calculated_accuracy = accuracy(predictions, test_data.target)
#pprint(forest, width=20)
print("Accuracy = {}".format(calculated_accuracy))


Accuracy = 0.77


Sources referred -

[1] https://www.bogotobogo.com/python/scikit-learn/scikt_machine_learning_Decision_Tree_Learning_Informatioin_Gain_IG_Impurity_Entropy_Gini_Classification_Error.php               
[2] https://python-course.eu/machine-learning/decision-trees-in-python.php   
[3] https://www.sebastian-mantey.com       
[4] https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html  
[5] https://towardsdatascience.com/master-machine-learning-random-forest-from-scratch-with-python-3efdd51b6d7a              
[6] https://towardsdatascience.com/random-forest-explained-7eae084f3ebe
