In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
import pandas as pd

from d_imm.imm_model import DistributedIMM

In [2]:
import os
import sys

# Environment for the local Spark runtime.
# JAVA_HOME: respect a value already exported by the machine and only fall
# back to the original JDK location when nothing is configured.
os.environ.setdefault("JAVA_HOME", "C:\\Program Files\\Java\\jdk1.8.0_261")

# Run the Spark driver and workers on the same interpreter as this kernel
# instead of a user-specific absolute path, so the notebook works on any
# machine/virtualenv and the driver and worker Python versions always match.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [3]:
# Create (or reuse) a local Spark session that uses every available core,
# with 4 GB configured for both the executor and the driver.
spark = (
    SparkSession.builder
    .appName("KMeansIrisExample")
    .master("local[*]")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [4]:
# SparkContext backing the session. NOTE(review): `sc` is not referenced by
# any other cell visible in this notebook — confirm it is still needed.
sc = spark.sparkContext

In [5]:
# Fetch the Iris dataset from the UCI Machine Learning Repository.
# The file is read without a header row, so the column names are supplied here.
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
column_names = [
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
    "species",
]
iris_df = pd.read_csv(url, names=column_names, header=None)

In [6]:
# Convert the pandas DataFrame to a Spark DataFrame
df_1 = spark.createDataFrame(iris_df)

# Number of additional row-wise copies of the dataset to append.
# 0 keeps the data unstacked, which is what the original loop bound did and
# what the recorded outputs in this notebook reflect (150 samples).
# Set it to 4 to stack the dataset 5 times in total for scale testing.
N_EXTRA_COPIES = 0

df = df_1
for _ in range(N_EXTRA_COPIES):
    df = df.union(df_1)

In [7]:
# Pack the four numeric measurement columns into one vector column named
# "features".
assembler = (
    VectorAssembler()
    .setInputCols(["sepal_length", "sepal_width", "petal_length", "petal_width"])
    .setOutputCol("features")
)

feature_df = assembler.transform(df)

In [8]:
# K-Means estimator: k=3 (one cluster per Iris species), fixed seed,
# reading from the assembled "features" column.
kmeans = KMeans(k=3, seed=1, featuresCol="features")

# Train the clustering model on the assembled feature vectors
model = kmeans.fit(feature_df)

In [10]:
# Fit the distributed IMM tree on the assembled features, using the trained
# K-Means model as the clustering to explain. verbose=4 produces the detailed
# log shown in the cell output.
d_imm_tree = DistributedIMM(spark, 3, verbose=4).fit(
    feature_df,
    model,
)

Running 'fit' method
Cluster centers: [array([6.85384615, 3.07692308, 5.71538462, 2.05384615]), array([5.006, 3.418, 1.464, 0.244]), array([5.88360656, 2.74098361, 4.38852459, 1.43442623])]
Sample of clustered data:
+-----------------+----------+
|         features|prediction|
+-----------------+----------+
|[5.1,3.5,1.4,0.2]|         1|
|[4.9,3.0,1.4,0.2]|         1|
|[4.7,3.2,1.3,0.2]|         1|
|[4.6,3.1,1.5,0.2]|         1|
|[5.0,3.6,1.4,0.2]|         1|
+-----------------+----------+
only showing top 5 rows
Time taken to build the histogram: 0 minutes and 30.88 seconds
Histogram: [[Split(feature_index=0, threshold=np.float64(4.55), categories=None, is_continuous=True), Split(feature_index=0, threshold=np.float64(4.65), categories=None, is_continuous=True), Split(feature_index=0, threshold=np.float64(4.85), categories=None, is_continuous=True), Split(feature_index=0, threshold=np.float64(4.95), categories=None, is_continuous=True), Split(feature_index=0, threshold=np.float64(5.05)

In [11]:
# Reuse the assembler's input columns as the labels for the tree's features
feature_names = assembler.getInputCols()

# Show the names so the mapping from feature index to column can be checked
print("Feature names:", feature_names)

# Render the tree. Plotting is best-effort: a failure is reported but does
# not stop the notebook.
try:
    d_imm_tree.plot(filename="iris_imm_tree_3", feature_names=feature_names, view=True)
except Exception as exc:
    print(f"An error occurred while plotting the tree: {exc}")
else:
    print("Tree plot saved as 'iris_imm_tree_3.png' and displayed.")

Feature names: ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
Tree plot saved as 'iris_imm_tree_3.png' and displayed.


In [12]:
# Display the feature importance of the fitted tree. The recorded output is a
# list with one entry per input feature, in the assembler's column order —
# presumably the number of splits using each feature; verify against
# DistributedIMM.feature_importance.
d_imm_tree.feature_importance()

Running 'feature_importance' method


[0, 0, 2, 0]

In [23]:
import pickle

# Destination file for the serialized tree
tree_save_path = "iris_imm_tree.pkl"

# Persist the fitted tree structure (`d_imm_tree.tree`), not the whole
# DistributedIMM object
with open(tree_save_path, mode="wb") as tree_file:
    pickle.dump(d_imm_tree.tree, tree_file)

print(f"IMM tree saved successfully to {tree_save_path}")




IMM tree saved successfully to iris_imm_tree.pkl


In [ ]:
# Load the IMM tree back from disk.
# NOTE: pickle.load can execute arbitrary code — only load files that this
# notebook (or another trusted source) wrote.
with open(tree_save_path, "rb") as f:
    loaded_d_imm_tree = pickle.load(f)

# The file holds the tree's root Node itself (the save step pickles
# `d_imm_tree.tree`), so the loaded object is used directly; dereferencing
# `.tree` on it would raise AttributeError.
print(loaded_d_imm_tree)

In [13]:
from d_imm.imm_model import Node
from pyspark.ml.functions import vector_to_array

# ---------------------------------------------------------------------------
# Hand-built reference tree. Both splits test feature index 2 (petal length).
# The statistics noted for each leaf are the ones recorded in this cell's
# output.
# ---------------------------------------------------------------------------

# Root: split on petal length at 1.9
root_node = Node()
root_node.feature, root_node.value = 2, 1.9

# Left child: leaf for cluster 1 (recorded: 50 samples, 0 mistakes)
root_node.left = Node()
root_node.left.value = 1

# Right child: internal node, split on petal length at 5.1
root_node.right = Node()
root_node.right.feature, root_node.right.value = 2, 5.1

# Right-left child: leaf for cluster 2 (recorded: 66 samples, 5 mistakes)
root_node.right.left = Node()
root_node.right.left.value = 2

# Right-right child: leaf for cluster 0 (recorded: 34 samples, 0 mistakes)
root_node.right.right = Node()
root_node.right.right.value = 0

# Separate IMM instance used only to exercise the statistics pass
d_imm = DistributedIMM(spark, k=3, verbose=1)

# K-Means assignments, plus an array copy of the feature vector
clustered_data = (
    model
    .transform(feature_df)
    .select("features", "prediction")
)
clustered_data_vector = clustered_data.withColumn(
    "features_array",
    vector_to_array("features"),
)

# Populate samples/mistakes on the manually created tree
d_imm.fill_stats_distributed(root_node, clustered_data_vector)

# Print the per-node statistics for verification; internal nodes report
# samples only, leaves report samples and mistakes
report = [
    ("Root Node Stats: Samples =", root_node, False),
    ("Left Child Stats: Samples =", root_node.left, True),
    ("Right Child Stats: Samples =", root_node.right, False),
    ("Right-Left Child Stats: Samples =", root_node.right.left, True),
    ("Right-Right Child Stats: Samples =", root_node.right.right, True),
]
for heading, node, is_leaf in report:
    if is_leaf:
        print(heading, node.samples, "Mistakes =", node.mistakes)
    else:
        print(heading, node.samples)

Root Node Stats: Samples = 150
Left Child Stats: Samples = 50 Mistakes = 0
Right Child Stats: Samples = 100
Right-Left Child Stats: Samples = 66 Mistakes = 5
Right-Right Child Stats: Samples = 34 Mistakes = 0


TESTING FEATURE IMPORTANCE 

In [14]:
# Feature importance of `d_imm`, the instance whose statistics were filled
# from the manually built tree. The recorded output matches the one shown
# for the fitted `d_imm_tree`.
feature_imp = d_imm.feature_importance()
print(feature_imp)

Running 'feature_importance' method
[0, 0, 2, 0]


In [17]:
# Show the fitted tree's root object; the recorded output is the default repr
# of a d_imm.imm_model.Node.
d_imm_tree.tree

<d_imm.imm_model.Node at 0x1e0c26dd910>

TESTING SCORE FUNCTIONS

In [15]:
# Evaluate the fitted tree with the UDF-based scoring methods
score_value = d_imm_tree.score(feature_df)
surrogate_score_value = d_imm_tree.surrogate_score(feature_df)

# Report both costs to four decimal places
print(
    "\n===== Distributed IMM Score Testing =====",
    f"Score (K-Means Cost): {score_value:.4f}",
    f"Surrogate Score (K-Means Surrogate Cost): {surrogate_score_value:.4f}",
    sep="\n",
)

# Sanity check: the surrogate cost must not be below the k-means cost
assert score_value <= surrogate_score_value, (
    "Surrogate score should be greater than or equal to the normal score."
)

print("✅ Score function tests passed successfully.")

ImportError: PyArrow >= 4.0.0 must be installed; however, it was not found.

In [16]:
# Evaluate the fitted tree with the Spark SQL-based scoring methods
score_value = d_imm_tree.score_sql(feature_df)
surrogate_score_value = d_imm_tree.surrogate_score_sql(feature_df)

# Report both costs to four decimal places
print(
    "\n===== Distributed IMM Score Testing =====",
    f"Score (K-Means Cost): {score_value:.4f}",
    f"Surrogate Score (K-Means Surrogate Cost): {surrogate_score_value:.4f}",
    sep="\n",
)

# Sanity check: the surrogate cost must not be below the k-means cost
assert score_value <= surrogate_score_value, (
    "Surrogate score should be greater than or equal to the normal score."
)

print("✅ Score function tests passed successfully.")

ImportError: PyArrow >= 4.0.0 must be installed; however, it was not found.