# **C4.5 on Spark**

In [31]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [32]:
from google.colab import drive
drive.mount("/content/gdrive")

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [33]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
import time

In [58]:
from pyspark import SparkContext
sc = SparkContext(master = 'local')

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark Example").config("spark.some.config.option").getOrCreate()

ValueError: ignored

In [59]:
data = spark.read.csv('/content/gdrive/My Drive/IT488/lymph.csv', header=True, inferSchema=True)

In [60]:
data.show(5)

+----------+---------------+-------------+-------------+-------+------------+---------------+---------------+---------------+---------------+--------------+--------------+---------------+---------------+-------------+--------------+---------------+--------------+------------+
|lymphatics|block_of_affere|bl_of_lymph_c|bl_of_lymph_s|by_pass|extravasates|regeneration_of|early_uptake_in|lym_nodes_dimin|lym_nodes_enlar|changes_in_lym|defect_in_node|changes_in_node|changes_in_stru|special_forms|dislocation_of|exclusion_of_no|no_of_nodes_in|       class|
+----------+---------------+-------------+-------------+-------+------------+---------------+---------------+---------------+---------------+--------------+--------------+---------------+---------------+-------------+--------------+---------------+--------------+------------+
|    arched|            yes|           no|           no|     no|          no|             no|            yes|              1|              4|         round|   lac_centra

In [63]:
index = [0,1,2,3,4,5,6,7,8,9,10]

categorical_columns = [data.columns[i] for i in index]

In [64]:
stringindexer_stages = [StringIndexer(inputCol=c, outputCol='strindexed_' + c) for c in categorical_columns]
# encode label column and add it to stringindexer_stages
stringindexer_stages += [StringIndexer(inputCol='class', outputCol='label')]

In [65]:
onehotencoder_stages = [OneHotEncoder(inputCol='strindexed_' + c, outputCol='onehot_' + c) for c in categorical_columns]

In [67]:
feature_columns = ['onehot_' + c for c in categorical_columns]
vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [68]:
all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]
pipeline = Pipeline(stages=all_stages)

In [69]:
pipeline_model = pipeline.fit(data)

In [70]:
final_columns = feature_columns + ['features', 'label']
data_df = pipeline_model.transform(data).select(final_columns)
            
data_df.show(5)

+-----------------+----------------------+--------------------+--------------------+--------------+-------------------+----------------------+----------------------+----------------------+----------------------+---------------------+--------------------+-----+
|onehot_lymphatics|onehot_block_of_affere|onehot_bl_of_lymph_c|onehot_bl_of_lymph_s|onehot_by_pass|onehot_extravasates|onehot_regeneration_of|onehot_early_uptake_in|onehot_lym_nodes_dimin|onehot_lym_nodes_enlar|onehot_changes_in_lym|            features|label|
+-----------------+----------------------+--------------------+--------------------+--------------+-------------------+----------------------+----------------------+----------------------+----------------------+---------------------+--------------------+-----+
|    (2,[0],[1.0])|         (1,[0],[1.0])|       (1,[0],[1.0])|       (1,[0],[1.0])| (1,[0],[1.0])|          (1,[],[])|         (1,[0],[1.0])|         (1,[0],[1.0])|         (2,[0],[1.0])|         (3,[2],[1.0])|      

In [71]:
training, test = data_df.randomSplit([0.7, 0.3], seed=2000)

In [73]:
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')

In [75]:
st_time = time.time()
dtc = dt.fit(training)
pred = dtc.transform(test)
ed_time = time.time()
print(ed_time-st_time)
pred.show(3)

1.6835670471191406
+-----------------+----------------------+--------------------+--------------------+--------------+-------------------+----------------------+----------------------+----------------------+----------------------+---------------------+--------------------+-----+-------------+--------------------+----------+
|onehot_lymphatics|onehot_block_of_affere|onehot_bl_of_lymph_c|onehot_bl_of_lymph_s|onehot_by_pass|onehot_extravasates|onehot_regeneration_of|onehot_early_uptake_in|onehot_lym_nodes_dimin|onehot_lym_nodes_enlar|onehot_changes_in_lym|            features|label|rawPrediction|         probability|prediction|
+-----------------+----------------------+--------------------+--------------------+--------------+-------------------+----------------------+----------------------+----------------------+----------------------+---------------------+--------------------+-----+-------------+--------------------+----------+
|        (2,[],[])|         (1,[0],[1.0])|           (1,[],[

In [48]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_iris

In [None]:

# evaluator=MulticlassClassificationEvaluator(predictionCol="prediction")
# acc = evaluator.evaluate(pred)
 
# print("Prediction Accuracy: ", acc)
 
# y_pred=pred.select("prediction").collect()
# y_orig=pred.select("label").collect()

# cm = confusion_matrix(y_orig, y_pred)
# print("Confusion Matrix:")
# print(cm)
 


In [76]:
print(dtc.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_21ea23b903f6, depth=5, numNodes=29, numClasses=3, numFeatures=16
  If (feature 2 in {0.0})
   If (feature 8 in {0.0})
    If (feature 5 in {0.0})
     Predict: 2.0
    Else (feature 5 not in {0.0})
     If (feature 1 in {0.0})
      Predict: 0.0
     Else (feature 1 not in {0.0})
      If (feature 6 in {0.0})
       Predict: 0.0
      Else (feature 6 not in {0.0})
       Predict: 1.0
   Else (feature 8 not in {0.0})
    If (feature 11 in {0.0})
     If (feature 14 in {0.0})
      If (feature 6 in {0.0})
       Predict: 0.0
      Else (feature 6 not in {0.0})
       Predict: 1.0
     Else (feature 14 not in {0.0})
      Predict: 1.0
    Else (feature 11 not in {0.0})
     If (feature 1 in {0.0})
      Predict: 1.0
     Else (feature 1 not in {0.0})
      Predict: 0.0
  Else (feature 2 not in {0.0})
   If (feature 11 in {0.0})
    If (feature 7 in {0.0})
     Predict: 1.0
    Else (feature 7 not in {0.0})
     If (feature 1 in {

In [50]:
def parse_debug_string_lines(lines):
    
    block = []
    while lines:


        if lines[0].startswith('If'):
            bl = ' '.join(lines.pop(0).split()[1:]).replace('(', '').replace(')', '')
            block.append({'name': bl, 'children': parse_debug_string_lines(lines)})


            if lines[0].startswith('Else'):
                be = ' '.join(lines.pop(0).split()[1:]).replace('(', '').replace(')', '')
                block.append({'name': be, 'children': parse_debug_string_lines(lines)})
        elif not lines[0].startswith(('If', 'Else')):
            block2 = lines.pop(0)
            block.append({'name': block2})
        else:
            break
    
    return block


def debug_str_to_json(debug_string):
    data = []
    for line in debug_string.splitlines():
        if line.strip():
            line = line.strip()
            data.append(line)
        else:
            break
        if not line: break

    json = {'name': 'Root', 'children': parse_debug_string_lines(data[1:])}
    

    return json

In [51]:
import json

dict_tree_json = debug_str_to_json(dtc.toDebugString)

print(json.dumps(dict_tree_json,indent = 1 ))


{
 "name": "Root",
 "children": [
  {
   "name": "feature 14 in {0.0}",
   "children": [
    {
     "name": "feature 8 in {0.0}",
     "children": [
      {
       "name": "feature 5 in {0.0}",
       "children": [
        {
         "name": "Predict: 2.0"
        }
       ]
      },
      {
       "name": "feature 5 not in {0.0}",
       "children": [
        {
         "name": "feature 1 in {0.0}",
         "children": [
          {
           "name": "Predict: 0.0"
          }
         ]
        },
        {
         "name": "feature 1 not in {0.0}",
         "children": [
          {
           "name": "Predict: 1.0"
          }
         ]
        }
       ]
      }
     ]
    },
    {
     "name": "feature 8 not in {0.0}",
     "children": [
      {
       "name": "feature 24 in {0.0}",
       "children": [
        {
         "name": "feature 15 in {0.0}",
         "children": [
          {
           "name": "Predict: 1.0"
          }
         ]
        },
        {
         "nam

In [52]:
f_type_to_flist_dict = data_df.schema['features'].metadata["ml_attr"]["attrs"]

f_index_to_name_dict = {}

for f_type, f_list in f_type_to_flist_dict.items():




  for f in f_list:

    f_index = f['idx']

    f_name = f['name']

    f_index_to_name_dict[f_index] = f_name




print(f_index_to_name_dict)



{0: 'onehot_lymphatics_arched', 1: 'onehot_lymphatics_deformed', 2: 'onehot_block_of_affere_yes', 3: 'onehot_bl_of_lymph_c_no', 4: 'onehot_bl_of_lymph_s_no', 5: 'onehot_by_pass_no', 6: 'onehot_extravasates_yes', 7: 'onehot_regeneration_of_no', 8: 'onehot_early_uptake_in_yes', 9: 'onehot_changes_in_lym_oval', 10: 'onehot_changes_in_lym_round', 11: 'onehot_defect_in_node_lac_central', 12: 'onehot_defect_in_node_lacunar', 13: 'onehot_defect_in_node_lac_margin', 14: 'onehot_changes_in_node_lac_margin', 15: 'onehot_changes_in_node_lacunar', 16: 'onehot_changes_in_node_lac_central', 17: 'onehot_changes_in_stru_faint', 18: 'onehot_changes_in_stru_coarse', 19: 'onehot_changes_in_stru_diluted', 20: 'onehot_changes_in_stru_drop_like', 21: 'onehot_changes_in_stru_grainy', 22: 'onehot_changes_in_stru_stripped', 23: 'onehot_changes_in_stru_reticular', 24: 'onehot_special_forms_vesicles', 25: 'onehot_special_forms_chalices', 26: 'onehot_dislocation_of_yes', 27: 'onehot_exclusion_of_no_yes'}
