## Spark

In [5]:
# Mount Google Drive so files stored there are reachable from this Colab runtime.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
import os

# Root of the mounted Drive; later cells build the dataset path from it.
# NOTE(review): this listing shows no adult.csv at this level (only MyDrive and hidden
# folders), which is consistent with the AnalysisException recorded when the CSV is
# loaded — the file presumably lives under MyDrive; confirm the intended folder.
cur_path = os.path.join("/content", "drive")
os.listdir(cur_path)

['.shortcut-targets-by-id', 'MyDrive', '.file-revisions-by-id', '.Trash-0']

In [4]:
!pip install pyspark 
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=d2f00b902b8ee8654959bd662c60f6107dd8f0f78cd465f959701b1da635b6ff
  Stored in directory: /root/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fon

In [7]:
!pip install scikit-optimize

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting scikit-optimize
  Downloading scikit_optimize-0.9.0-py2.py3-none-any.whl (100 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m100.3/100.3 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
Collecting pyaml>=16.9
  Downloading pyaml-21.10.1-py2.py3-none-any.whl (24 kB)
Installing collected packages: pyaml, scikit-optimize
Successfully installed pyaml-21.10.1 scikit-optimize-0.9.0


In [8]:
from pyspark.sql import SparkSession

# Local Spark session using 5 worker threads; reuses an existing session if one is running.
spark = (
    SparkSession.builder
    .master("local[5]")
    .appName('tree')
    .getOrCreate()
)

In [9]:
# Column names for the 15 CSV fields; dots become underscores so the names are valid
# identifiers (and are not read as struct-field access by Spark).
raw_cols = "age|  workclass|fnlwgt|   education|education.num|marital.status|       occupation|  relationship| race|   sex|capital.gain|capital.loss|hours.per.week|native.country|income".split("|")
raw_cols = [x.strip().replace('.','_') for x in raw_cols]

# Read with header=false, so Spark would name the columns _c0.._c14; toDF renames them all
# in one projection and fails fast if the file does not have exactly len(raw_cols) columns.
# NOTE(review): cur_path is the Drive root, whose listing shows no adult.csv; the
# AnalysisException recorded for this cell suggests the file lives elsewhere (presumably
# under MyDrive) — confirm the path.
df = (
    spark.read.format("csv")
    .option("header", "false")
    .option("inferSchema", "true")
    .load(os.path.join(cur_path, 'adult.csv'))
    .toDF(*raw_cols)
)

AnalysisException: ignored

### Pandas

In [10]:
#df_pd = pd.read_csv("../../../data/p_dsi/big_data_scaling_sp23/project/DT-MapReduce/adult.csv").clean_names()
# Collect the Spark DataFrame to the driver as pandas for the scikit-learn workflow below.
# NOTE(review): the commented-out alternative needs `pd`, which is only imported in the next
# cell, and presumably pyjanitor for .clean_names() — confirm before re-enabling it.
# NOTE(review): the recorded NameError is presumably because the previous cell failed before
# `df` was assigned.
df_pd = df.toPandas()

NameError: ignored

In [None]:
from pandas.api.types import is_string_dtype
from pandas.api.types import is_numeric_dtype
from pandas.api.types import is_categorical_dtype
import numpy as np
import pandas as pd
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report, roc_auc_score, roc_curve, confusion_matrix
from sklearn.model_selection import GridSearchCV, train_test_split, RandomizedSearchCV
from skopt import BayesSearchCV
from sklearn.preprocessing import LabelEncoder, OrdinalEncoder, MinMaxScaler, Normalizer, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Inspect the column names produced by the rename step.
df_pd.columns

In [None]:
# Combine marital status and relationship into a single categorical feature, then drop the
# two source columns together with sex, race, education_num and native_country.
df_pd = df_pd.assign(marital_relation=df_pd["marital_status"] + "_" + df_pd["relationship"])
df_pd = df_pd.drop(columns=["sex", "race", "education_num", "native_country", "marital_status", "relationship"])

In [None]:
# Raw income label values, before the next cell maps them to 0/1.
df_pd["income"].unique()

In [None]:
# Binary target: 0 for "<=50K", 1 otherwise. The raw values carry a leading space
# (' <=50K'); comparing on the stripped string keeps the mapping correct whether or not
# the space is present, instead of silently labelling every unspaced value as 1.
df_pd["income"] = np.where(df_pd["income"].str.strip() == '<=50K', 0, 1)

In [None]:
# Check that the income column now holds only the 0/1 encoding.
df_pd["income"].unique()

### Feature Engineering

In [None]:
# De-duplicate rows, then one-hot encode the string columns.
pd_new = df_pd.drop_duplicates().pipe(pd.get_dummies)

In [None]:
# (rows, columns) after de-duplication and one-hot encoding.
pd_new.shape

In [None]:
# Target is the 0/1 income column; every other column is a feature.
y = pd_new['income']
X = pd_new.drop(columns='income')

In [None]:
# train_test_split returns (X_train, X_test, y_train, y_test). Unpacking in that order makes
# the 80% partition the training set and the test_size=0.2 partition the hold-out set
# (the previous order trained on 20% and evaluated on 80%).
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)

### Training

In [None]:
# ct = ColumnTransformer([("ohe", OneHotEncoder(handle_unknown="ignore"), stri), ("ord", ordi, ["education"]), ("min_max", MinMaxScaler(), num)])
# Tree-based models don't need feature scaling or normalization, so no scaler is applied.

In [None]:
# Entropy-criterion tree. random_state fixes the random feature subsampling used when
# max_features is below the number of features (the search below tries 3..19), so results
# are reproducible across runs.
dt = DecisionTreeClassifier(criterion="entropy", random_state=1)

In [None]:
import numpy as np

# Candidate values 3, 4, ..., 19 shared by both searched hyper-parameters.
search_range = np.arange(3, 20)
max_features = list(search_range)
max_depth = list(search_range)

In [None]:
# Search space for BayesSearchCV: a single sub-space over both hyper-parameters.
param_grid = [dict(max_features=max_features, max_depth=max_depth)]

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Bayesian hyper-parameter search: 30 sampled configurations, each scored with 5-fold CV.
# random_state makes the sequence of sampled configurations reproducible.
bayes_search = BayesSearchCV(dt, param_grid, cv=5, n_iter=30, random_state=1)

In [None]:
# Run the search on the training split; the trailing ';' suppresses the estimator repr.
bayes_search.fit(X_train, y_train);

In [None]:
# Hyper-parameters of the best configuration found by the search.
bayes_search.best_params_

In [None]:
# Cross-validated score of that best configuration.
bayes_search.best_score_

In [None]:
# One row per evaluated configuration, best rank first.
cv_table = pd.DataFrame(bayes_search.cv_results_)
result = cv_table.sort_values(by="rank_test_score")

In [None]:
# Display every evaluated configuration, best-ranked first.
result

In [None]:
# Estimator fitted with the best hyper-parameters found by the search.
model = bayes_search.best_estimator_

In [None]:
# Show the chosen estimator and its parameters.
model

In [None]:
from sklearn import tree

# Render the tuned tree. Class 0 is "<=50K" and class 1 is ">50K", matching the income
# encoding. feature_names is passed as a plain list of str (the documented type) rather than
# a pandas Index, and the plot is drawn on an explicit Axes.
fig, ax = plt.subplots(figsize=(30, 20))
_ = tree.plot_tree(model,
                   feature_names=list(X.columns),
                   class_names=["<=50K", ">50K"],
                   filled=True,
                   ax=ax)

In [None]:
#text_representation = tree.export_text(model[1])
#print(text_representation)

In [None]:
# Predictions on the hold-out split.
pred = model.predict(X_test)

In [None]:
# Per-class precision / recall / F1 on the hold-out split (printed as plain text).
print(classification_report(y_test, pred))

In [None]:
# Confusion matrix of the hold-out predictions (rows: true label, columns: predicted label).
conf = confusion_matrix(y_test,pred)

In [None]:
# Annotated confusion matrix. fmt="d" prints the integer counts in full; the default format
# switches large counts to scientific notation (e.g. 1.2e+03).
ax = sns.heatmap(conf, annot=True, fmt="d", cmap="RdBu")
ax.set(xlabel="Predicted label", ylabel="True label", title="Confusion matrix (hold-out split)");

### Feature Importance

In [None]:
# Feature importances of the fitted tree, one value per column of X.
fi = model.feature_importances_

In [None]:
# Importance table indexed by feature name, largest magnitude first. `magnitude` is the
# absolute importance and `sign` its sign; the bar plot below colours bars by `sign`.
features = (
    pd.DataFrame({'feat_names': X.columns, 'feat_importance': fi})
    .assign(magnitude=lambda frame: frame['feat_importance'].abs(),
            sign=lambda frame: np.sign(frame['feat_importance']))
    .set_index('feat_names')
    .sort_values(by='magnitude', ascending=False)
)
features.head(20)

In [None]:
# Horizontal bar chart of the 20 most important features, coloured by sign.
top_features = features.iloc[:20]
fig, ax = plt.subplots(figsize=(20, 8))
sns.barplot(y=top_features.index, x='magnitude', hue='sign', data=top_features,
            orient='h', dodge=False, ax=ax);

In [None]:
# Names of the 15 most important features; they form the Spark dataset built below.
important_features = features.index[:15].tolist()
important_features

In [None]:
# Explicit copy: later cells add and rename columns on this frame, and writing into a column
# subset of X would otherwise trigger pandas' SettingWithCopyWarning (hidden here by the
# global warnings filter set earlier).
df_selected = X[important_features].copy()

In [None]:
# Re-attach the 0/1 target, aligned on the row index.
df_selected = df_selected.assign(income=y)

In [None]:
# Normalise column names before handing the frame to Spark: remove spaces and turn hyphens
# into underscores.
df_selected = df_selected.rename(columns=lambda name: name.replace(' ', '').replace('-', '_'))

### Now let's use the selected features to build a decision tree from scratch with MapReduce

In [None]:
from pyspark.sql.functions import col, count, when

In [None]:
# Putting df_selected back to a spark df
# Putting df_selected back to a spark df
# NOTE(review): this rebinds `df` (previously the raw CSV DataFrame); later cells read
# df.columns expecting the selected features plus 'income'.
df = spark.createDataFrame(df_selected)
df.printSchema()

In [None]:
random_seed = 0
train_fraction, test_fraction = 0.8, 0.2

# Seeded 80/20 split of the Spark DataFrame so the partition is reproducible.
train_data, test_data = df.randomSplit(weights=[train_fraction, test_fraction], seed=random_seed)

In [None]:
def gini_impurity(dataframe, label_col):
    """Gini impurity ``1 - sum_k p_k**2`` of ``label_col`` over the rows of ``dataframe``.

    The row total is the sum of the per-class counts, so a single Spark job (the
    ``groupBy(...).count()``) is enough; no separate ``dataframe.count()`` is run.

    Parameters
    ----------
    dataframe : pyspark.sql.DataFrame
        Rows to measure.
    label_col : str
        Name of the class-label column.

    Returns
    -------
    float
        Impurity in [0, 1). An empty DataFrame returns 1.0.
    """
    class_counts = [row['count'] for row in dataframe.groupBy(label_col).count().collect()]
    total_count = sum(class_counts)
    impurity = 1.0

    for n in class_counts:
        p = n / total_count
        impurity -= p * p

    return impurity

def split_dataframe(dataframe, feature, value):
    """Partition ``dataframe`` on a threshold of one column.

    Returns ``(left, right)``: rows with ``feature <= value`` and rows with
    ``feature > value``. Both are lazy Spark filters; rows where the comparison is null
    appear in neither.
    """
    column = col(feature)
    return dataframe.filter(column <= value), dataframe.filter(column > value)

def find_best_split(dataframe, label_col, features):
    """Choose the (feature, threshold) split with the lowest weighted Gini impurity.

    One candidate threshold is evaluated per feature: for a feature with exactly two
    distinct values it is the smaller value (0 for 0/1 dummy columns), otherwise the
    median. Rows go left when ``feature <= threshold`` (see ``split_dataframe``).

    Parameters
    ----------
    dataframe : pyspark.sql.DataFrame
        Rows reaching the current node.
    label_col : str
        Target column; skipped if it appears in ``features``.
    features : iterable of str
        Candidate split columns.

    Returns
    -------
    tuple
        ``(best_feature, best_value)``, or ``(None, None)`` when the DataFrame is empty or
        no feature has at least two distinct values.
    """
    best_gini = float('inf')
    best_feature = None
    best_value = None

    # Loop-invariant: count the node's rows once instead of once per candidate.
    total_count = dataframe.count()
    if total_count == 0:
        return best_feature, best_value

    for feature in features:
        if feature == label_col:
            continue  # splitting on the target itself would leak the label
        feature_values = dataframe.select(feature).toPandas()[feature].values
        distinct_values = np.unique(feature_values)
        if len(distinct_values) < 2:
            continue  # constant column: any threshold leaves one side empty
        if len(distinct_values) == 2:
            # Two-valued column: the lower value separates the two groups exactly.
            # .item() gives a plain Python scalar for the Spark column comparison.
            candidates = [distinct_values[0].item()]
        else:
            candidates = [float(np.quantile(feature_values, 0.5))]

        for value in candidates:
            left_split, right_split = split_dataframe(dataframe, feature, value)
            left_count = left_split.count()
            right_count = right_split.count()

            split_gini = (left_count * gini_impurity(left_split, label_col)
                          + right_count * gini_impurity(right_split, label_col)) / total_count

            if split_gini < best_gini:
                best_gini = split_gini
                best_feature = feature
                best_value = value

    return best_feature, best_value

In [None]:
class DecisionTreeNode:
    """One node of a binary decision tree grown over a Spark DataFrame.

    ``build_tree`` records the majority label of the node's rows in ``majority_class``
    (used when the node acts as a leaf). Unless a stopping rule applies, it then splits the
    rows with ``find_best_split`` and recursively grows ``left`` (``feature <= value``) and
    ``right`` (``feature > value``) children.

    Parameters
    ----------
    data : pyspark.sql.DataFrame
        Rows that reach this node.
    label_col : str
        Name of the target column.
    features : list of str
        Candidate split columns; ``label_col`` is ignored if present.
    max_depth : int
        Nodes at this depth become leaves.
    min_samples_split : int
        Nodes with fewer rows than this become leaves.
    depth : int
        Depth of this node (the root is 0).
    """

    def __init__(self, data, label_col, features, max_depth=5, min_samples_split=10, depth=0):
        self.data = data
        self.label_col = label_col
        self.features = features
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.depth = depth
        self.left = None
        self.right = None
        self.split_feature = None
        self.split_value = None
        self.majority_class = None

    def build_tree(self):
        """Grow the subtree rooted at this node, then drop the node's DataFrame reference.

        A node becomes a leaf when any of the following holds:

        * it has no rows;
        * it is pure;
        * it is at ``max_depth``;
        * it has fewer than ``min_samples_split`` rows;
        * no split puts rows on both sides.

        The DataFrame is released on every node, not only on leaves, because the finished
        tree is captured in a Spark UDF closure and a DataFrame cannot be serialized there.
        """
        data, self.data = self.data, None

        # Per-class row counts, most frequent class first.
        class_counts = (data.groupBy(self.label_col).count()
                        .orderBy("count", ascending=False).collect())
        if not class_counts:
            return  # empty partition: leaf with majority_class None
        self.majority_class = class_counts[0][self.label_col]
        n_rows = sum(row["count"] for row in class_counts)

        if (self.depth >= self.max_depth
                or n_rows < self.min_samples_split
                or len(class_counts) == 1):  # pure node: a split cannot change any prediction
            return

        candidate_features = [f for f in self.features if f != self.label_col]
        split_feature, split_value = find_best_split(data, self.label_col, candidate_features)
        if split_feature is None:  # find_best_split found no usable split
            return

        left_data, right_data = split_dataframe(data, split_feature, split_value)
        if left_data.count() == 0 or right_data.count() == 0:
            return  # degenerate split: keep this node as a leaf

        self.split_feature, self.split_value = split_feature, split_value
        self.left = DecisionTreeNode(left_data, self.label_col, self.features,
                                     self.max_depth, self.min_samples_split, self.depth + 1)
        self.right = DecisionTreeNode(right_data, self.label_col, self.features,
                                      self.max_depth, self.min_samples_split, self.depth + 1)
        self.left.build_tree()
        self.right.build_tree()

In [None]:
label_col = 'income'
# Offer only the feature columns as split candidates; df.columns also contains the label,
# and splitting on it would leak the target into the tree.
feature_cols = [c for c in df.columns if c != label_col]

root = DecisionTreeNode(train_data, label_col, feature_cols)
root.build_tree()


In [None]:
from pyspark.sql.functions import udf, expr
from pyspark.sql.types import StringType

def predict(node, features, label_col):
    """Route one instance down the tree and return the majority label where it stops.

    Parameters
    ----------
    node : DecisionTreeNode
        Subtree root. A node lacking either child is treated as a leaf.
    features : Mapping[str, Any]
        Feature name -> value for the instance.
    label_col : str
        Unused; kept for signature compatibility.

    Returns
    -------
    The ``majority_class`` of the leaf reached. If the instance is missing the current
    split feature's value, it is the ``majority_class`` of the node where that happens.
    """
    while node.left and node.right:
        value = features[node.split_feature]
        if value is None:
            # Spark's <= / > filters sent null rows to neither child during training, and
            # comparing None would raise TypeError, so stop at this node.
            break
        node = node.left if value <= node.split_value else node.right
    return node.majority_class

# Feature columns, in the order their values are passed to the UDF. The label column is
# excluded so a test row's target is never part of the routing features.
fs = [c for c in df.columns if c != label_col]

def predict_udf(*features):
    """UDF body: pair the positional column values with their names in `fs`, then walk `root`."""
    instance_features = dict(zip(fs, features))
    return predict(root, instance_features, label_col)

# Declare the label column's own Spark type as the UDF return type so `prediction` is
# directly comparable with the 0/1 integer `income` column (StringType does not match it).
prediction_udf = udf(predict_udf, test_data.schema[label_col].dataType)

test_data_with_predictions = test_data.withColumn("prediction", prediction_udf(*fs))
test_data_with_predictions.show()

In [None]:
# Feature columns passed to the prediction UDF (label column excluded).
fs