In [2]:
'''Deploy cross-node bandwidth measurement pods'''

from kubernetes import client, config

def check_pods_existence(namespace):
    core_v1 = client.CoreV1Api()
    try:
        pods = core_v1.list_namespaced_pod(namespace=namespace)
        return len(pods.items) > 0  # return True if pods exist
    except client.rest.ApiException as e:
        print(f"Exception when calling CoreV1Api->list_namespaced_pod: {e}")
        return False

def deploy_bandwidth_measurement_daemonset_if_needed():
    namespace = 'measure-nodes-bd'
    ds_name = 'bandwidth-measurement-ds'
    
    # Load Kubernetes configuration
    config.load_kube_config()
    apps_v1 = client.AppsV1Api()

    # Check if pods exist in the namespace
    pods_exist = check_pods_existence(namespace)

    # If no pods exist, deploy the DaemonSet
    if not pods_exist:
        # Define the body of the DaemonSet for `iperf3` server deployment
        ds_body = client.V1DaemonSet(
            api_version="apps/v1",
            kind="DaemonSet",
            metadata=client.V1ObjectMeta(name=ds_name),
            spec=client.V1DaemonSetSpec(
                selector=client.V1LabelSelector(
                    match_labels={"app": "bandwidth-measurement"}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(labels={"app": "bandwidth-measurement"}),
                    spec=client.V1PodSpec(
                        containers=[client.V1Container(
                            name="iperf3-server",
                            image="networkstatic/iperf3",
                            args=["-s"],  # Run in server mode
                            ports=[client.V1ContainerPort(container_port=5201)]
                        )],
                        restart_policy="Always"
                    )
                )
            )
        )

        # Deploy the DaemonSet
        try:
            apps_v1.create_namespaced_daemon_set(namespace=namespace, body=ds_body)
            print(f"Deployed DaemonSet {ds_name} in namespace {namespace}")
        except client.rest.ApiException as e:
            print(f"Exception when calling AppsV1Api->create_namespaced_daemon_set: {e}")
    else:
        print("Pods already exist in the namespace. Skipping DaemonSet deployment.")

# Call the function to deploy the DaemonSet if needed
deploy_bandwidth_measurement_daemonset_if_needed()

Pods already exist in the namespace. Skipping DaemonSet deployment.


In [3]:
import re
from kubernetes import client, config, stream
import concurrent.futures
import time

# Load Kubernetes configuration
config.load_kube_config()
v1 = client.CoreV1Api()

# Function to measure bandwidth between source and target pods with retries
def measure_bandwidth_from_source_to_target(v1, namespace, source_pod, target_pod, test_duration=5, max_retries=3):
    source_pod_name = source_pod.metadata.name
    source_pod_node_name = source_pod.spec.node_name
    target_pod_ip = target_pod.status.pod_ip
    target_pod_name = target_pod.metadata.name
    target_pod_node_name = target_pod.spec.node_name
    result = (source_pod_node_name, target_pod_node_name, None)

    if source_pod_name != target_pod_name:
        exec_command = ['iperf3', '-c', target_pod_ip, '-t', str(test_duration)]
        attempts = 0

        while attempts < max_retries:
            try:
                # Run iperf3 command
                resp = stream.stream(v1.connect_get_namespaced_pod_exec,
                                     source_pod_name,
                                     namespace,
                                     command=exec_command,
                                     stderr=True,
                                     stdin=False,
                                     stdout=True,
                                     tty=False)

                # Check for "server is busy" in the output
                if "the server is busy" in resp:
                    print(f"Server is busy for connection from {source_pod_name} to {target_pod_name}. Retrying...")
                    attempts += 1
                    time.sleep(test_duration/2)  # Wait before retrying
                    continue

                # Parse the output for bandwidth
                print(f"Full output from {source_pod_name} to {target_pod_name}:\n{resp}")
                match = re.search(r'(\d+\.?\d*\s[MKG]bits/sec)', resp)
                if match:
                    bandwidth = match.group(1)
                    result = (source_pod_node_name, target_pod_node_name, bandwidth)
                    print(f"Bandwidth from {source_pod_node_name} to {target_pod_node_name}: {bandwidth}")
                else:
                    print(f"Could not extract bandwidth for connection from {source_pod_name} to {target_pod_name}.")
                    result = (source_pod_node_name, target_pod_node_name, "Parsing Error")
                break  # Exit loop on success

            except Exception as e:
                print(f"Error executing command in pod {source_pod_name}: {e}")
                result = (source_pod_node_name, target_pod_node_name, "Error")
                break

        if attempts == max_retries:
            print(f"Max retries reached for connection from {source_pod_name} to {target_pod_name}.")
            result = (source_pod_node_name, target_pod_node_name, "Server Busy Error")

    return result

def measure_bandwidth(namespace='measure-nodes-bd', max_concurrent_tasks=3, test_duration=5):
    v1 = client.CoreV1Api()
    pods = v1.list_namespaced_pod(namespace, label_selector="app=bandwidth-measurement").items
    bandwidth_results = {}

    # Use ThreadPoolExecutor for controlled concurrent execution
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_tasks) as executor:
        futures = []
        for source_pod in pods:
            for target_pod in pods:
                if source_pod.metadata.name != target_pod.metadata.name:  # Skip self-tests
                    futures.append(executor.submit(measure_bandwidth_from_source_to_target,
                                                   v1, namespace, source_pod, target_pod, test_duration))

        # Collect the completed results
        for future in concurrent.futures.as_completed(futures):
            source_pod_node_name, target_pod_node_name, bandwidth = future.result()
            if source_pod_node_name not in bandwidth_results:
                bandwidth_results[source_pod_node_name] = {}
            bandwidth_results[source_pod_node_name][target_pod_node_name] = bandwidth

    return bandwidth_results

# Call the function to measure bandwidth after deploying the DaemonSet
namespace = 'measure-nodes-bd'
# Reduced concurrency and added retry logic
bandwidth_results = measure_bandwidth(namespace=namespace, max_concurrent_tasks=3, test_duration=10)
print(bandwidth_results)


Full output from bandwidth-measurement-ds-6dldb to bandwidth-measurement-ds-p2vgl:
Connecting to host 192.168.18.232, port 5201
[  5] local 192.168.69.209 port 40846 connected to 192.168.18.232 port 5201
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-1.00   sec  17.7 MBytes   148 Mbits/sec    0    921 KBytes       
[  5]   1.00-2.00   sec  14.7 MBytes   124 Mbits/sec    0   1.59 MBytes       
[  5]   2.00-3.00   sec  16.2 MBytes   136 Mbits/sec    0   2.38 MBytes       
[  5]   3.00-4.00   sec  18.8 MBytes   157 Mbits/sec    0   3.15 MBytes       
[  5]   4.00-5.00   sec  15.0 MBytes   126 Mbits/sec   17   2.36 MBytes       
[  5]   5.00-6.00   sec  17.5 MBytes   147 Mbits/sec    0   2.68 MBytes       
[  5]   6.00-7.00   sec  17.5 MBytes   147 Mbits/sec    0   2.89 MBytes       
[  5]   7.00-8.00   sec  18.8 MBytes   157 Mbits/sec    0   3.02 MBytes       
[  5]   8.00-9.00   sec  17.5 MBytes   147 Mbits/sec    0   3.02 MBytes       
[  5]   9.00-10.00  

In [4]:
import pandas as pd

# Convert the nested dictionary into a pandas DataFrame
df = pd.DataFrame(bandwidth_results)

# Transpose the DataFrame to align the source and destination workers as per convention
df = df.T

# Fill diagonal with 0's for self-latency (optional, if desired for clarity)
for worker in df.columns:
    df.at[worker, worker] = 0

# Save the DataFrame to a CSV file, and add the date, hour and minute to the file name
import datetime
now = datetime.datetime.now()
filename = f"bandwidth_results_{now.strftime('%Y-%m-%d_%H-%M')}.csv"
df.to_csv("/home/ubuntu/iDynamics/iBandwidth/measurer/data/"+filename)
df

Unnamed: 0,k8s-worker-5,k8s-worker-1,k8s-worker-8,k8s-worker-6,k8s-worker-2,k8s-worker-9,k8s-worker-7,k8s-worker-4,k8s-worker-3
k8s-worker-3,148 Mbits/sec,295 Mbits/sec,94.0 Mbits/sec,198 Mbits/sec,176 Mbits/sec,170 Mbits/sec,136 Mbits/sec,368 Mbits/sec,0
k8s-worker-1,91.9 Mbits/sec,0,125 Mbits/sec,172 Mbits/sec,152 Mbits/sec,436 Mbits/sec,351 Mbits/sec,612 Mbits/sec,115 Mbits/sec
k8s-worker-8,232 Mbits/sec,219 Mbits/sec,0,330 Mbits/sec,123 Mbits/sec,127 Mbits/sec,125 Mbits/sec,392 Mbits/sec,409 Mbits/sec
k8s-worker-5,0,175 Mbits/sec,392 Mbits/sec,392 Mbits/sec,185 Mbits/sec,191 Mbits/sec,488 Mbits/sec,248 Mbits/sec,199 Mbits/sec
k8s-worker-9,208 Mbits/sec,425 Mbits/sec,259 Mbits/sec,186 Mbits/sec,476 Mbits/sec,0,187 Mbits/sec,782 Mbits/sec,781 Mbits/sec
k8s-worker-6,156 Mbits/sec,168 Mbits/sec,116 Mbits/sec,0,78.9 Mbits/sec,105 Mbits/sec,246 Mbits/sec,18.9 Mbits/sec,177 Mbits/sec
k8s-worker-2,177 Mbits/sec,291 Mbits/sec,192 Mbits/sec,482 Mbits/sec,0,158 Mbits/sec,109 Mbits/sec,651 Mbits/sec,322 Mbits/sec
k8s-worker-7,179 Mbits/sec,273 Mbits/sec,273 Mbits/sec,364 Mbits/sec,27.0 Mbits/sec,289 Mbits/sec,0,140 Mbits/sec,645 Mbits/sec
k8s-worker-4,107 Mbits/sec,192 Mbits/sec,50.0 Mbits/sec,123 Mbits/sec,78.4 Mbits/sec,110 Mbits/sec,60.0 Mbits/sec,0,61.0 Mbits/sec


Bandwidth from k8s-worker-3 to k8s-worker-1: 806 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-8: 801 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-5: 814 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-9: 813 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-6: 813 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-2: 805 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-7: 815 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-4: 806 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-3: 806 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-8: 819 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-5: 814 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-9: 819 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-6: 807 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-2: 813 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-7: 813 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-4: 810 Mbits/sec
Bandwidth from k8s-worker-8 to k8s-worker-3: 794 Mbits/s