In [2]:
'''Deploy cross-node bandwidth measurement pods'''

from kubernetes import client, config

def check_pods_existence(namespace):
    core_v1 = client.CoreV1Api()
    try:
        pods = core_v1.list_namespaced_pod(namespace=namespace)
        return len(pods.items) > 0  # return True if pods exist
    except client.rest.ApiException as e:
        print(f"Exception when calling CoreV1Api->list_namespaced_pod: {e}")
        return False

def deploy_bandwidth_measurement_daemonset_if_needed():
    namespace = 'measure-nodes-bd'
    ds_name = 'bandwidth-measurement-ds'
    
    # Load Kubernetes configuration
    config.load_kube_config()
    apps_v1 = client.AppsV1Api()

    # Check if pods exist in the namespace
    pods_exist = check_pods_existence(namespace)

    # If no pods exist, deploy the DaemonSet
    if not pods_exist:
        # Define the body of the DaemonSet for `iperf3` server deployment
        ds_body = client.V1DaemonSet(
            api_version="apps/v1",
            kind="DaemonSet",
            metadata=client.V1ObjectMeta(name=ds_name),
            spec=client.V1DaemonSetSpec(
                selector=client.V1LabelSelector(
                    match_labels={"app": "bandwidth-measurement"}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(labels={"app": "bandwidth-measurement"}),
                    spec=client.V1PodSpec(
                        containers=[client.V1Container(
                            name="iperf3-server",
                            image="networkstatic/iperf3",
                            args=["-s"],  # Run in server mode
                            ports=[client.V1ContainerPort(container_port=5201)]
                        )],
                        restart_policy="Always"
                    )
                )
            )
        )

        # Deploy the DaemonSet
        try:
            apps_v1.create_namespaced_daemon_set(namespace=namespace, body=ds_body)
            print(f"Deployed DaemonSet {ds_name} in namespace {namespace}")
        except client.rest.ApiException as e:
            print(f"Exception when calling AppsV1Api->create_namespaced_daemon_set: {e}")
    else:
        print("Pods already exist in the namespace. Skipping DaemonSet deployment.")

# Call the function to deploy the DaemonSet if needed
deploy_bandwidth_measurement_daemonset_if_needed()

Pods already exist in the namespace. Skipping DaemonSet deployment.


In [7]:
import re
from kubernetes import client, config, stream
import concurrent.futures
import time

# Load Kubernetes configuration
config.load_kube_config()
v1 = client.CoreV1Api()

# Function to measure bandwidth between source and target pods with retries
def measure_bandwidth_from_source_to_target(v1, namespace, source_pod, target_pod, test_duration=5, max_retries=3):
    source_pod_name = source_pod.metadata.name
    source_pod_node_name = source_pod.spec.node_name
    target_pod_ip = target_pod.status.pod_ip
    target_pod_name = target_pod.metadata.name
    target_pod_node_name = target_pod.spec.node_name
    result = (source_pod_node_name, target_pod_node_name, None)

    if source_pod_name != target_pod_name:
        exec_command = ['iperf3', '-c', target_pod_ip, '-t', str(test_duration)]
        attempts = 0

        while attempts < max_retries:
            try:
                # Run iperf3 command
                resp = stream.stream(v1.connect_get_namespaced_pod_exec,
                                     source_pod_name,
                                     namespace,
                                     command=exec_command,
                                     stderr=True,
                                     stdin=False,
                                     stdout=True,
                                     tty=False)

                # Check for "server is busy" in the output
                if "the server is busy" in resp:
                    print(f"Server is busy for connection from {source_pod_name} to {target_pod_name}. Retrying...")
                    attempts += 1
                    time.sleep(test_duration/2)  # Wait before retrying
                    continue

                # Parse the output for bandwidth
                print(f"Full output from {source_pod_name} to {target_pod_name}:\n{resp}")
                match = re.search(r'(\d+\.?\d*\s[MKG]bits/sec)', resp)
                if match:
                    bandwidth = match.group(1)
                    result = (source_pod_node_name, target_pod_node_name, bandwidth)
                    print(f"Bandwidth from {source_pod_node_name} to {target_pod_node_name}: {bandwidth}")
                else:
                    print(f"Could not extract bandwidth for connection from {source_pod_name} to {target_pod_name}.")
                    result = (source_pod_node_name, target_pod_node_name, "Parsing Error")
                break  # Exit loop on success

            except Exception as e:
                print(f"Error executing command in pod {source_pod_name}: {e}")
                result = (source_pod_node_name, target_pod_node_name, "Error")
                break

        if attempts == max_retries:
            print(f"Max retries reached for connection from {source_pod_name} to {target_pod_name}.")
            result = (source_pod_node_name, target_pod_node_name, "Server Busy Error")

    return result

def measure_bandwidth(namespace='measure-nodes-bd', max_concurrent_tasks=3, test_duration=5):
    v1 = client.CoreV1Api()
    pods = v1.list_namespaced_pod(namespace, label_selector="app=bandwidth-measurement").items
    bandwidth_results = {}

    # Use ThreadPoolExecutor for controlled concurrent execution
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_tasks) as executor:
        futures = []
        for source_pod in pods:
            for target_pod in pods:
                if source_pod.metadata.name != target_pod.metadata.name:  # Skip self-tests
                    futures.append(executor.submit(measure_bandwidth_from_source_to_target,
                                                   v1, namespace, source_pod, target_pod, test_duration))

        # Collect the completed results
        for future in concurrent.futures.as_completed(futures):
            source_pod_node_name, target_pod_node_name, bandwidth = future.result()
            if source_pod_node_name not in bandwidth_results:
                bandwidth_results[source_pod_node_name] = {}
            bandwidth_results[source_pod_node_name][target_pod_node_name] = bandwidth

    return bandwidth_results

# Call the function to measure bandwidth after deploying the DaemonSet
namespace = 'measure-nodes-bd'
# Reduced concurrency and added retry logic
bandwidth_results = measure_bandwidth(namespace=namespace, max_concurrent_tasks=3, test_duration=10)
print(bandwidth_results)


Full output from bandwidth-measurement-ds-6dldb to bandwidth-measurement-ds-p2vgl:
Connecting to host 192.168.18.232, port 5201
[  5] local 192.168.69.209 port 55104 connected to 192.168.18.232 port 5201
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-1.00   sec  83.6 MBytes   701 Mbits/sec   21    930 KBytes       
[  5]   1.00-2.00   sec  73.3 MBytes   615 Mbits/sec    4   1.17 MBytes       
[  5]   2.00-3.00   sec  70.0 MBytes   587 Mbits/sec    0   1.42 MBytes       
[  5]   3.00-4.00   sec  70.0 MBytes   587 Mbits/sec    0   1.63 MBytes       
[  5]   4.00-5.00   sec  70.0 MBytes   587 Mbits/sec    0   1.82 MBytes       
[  5]   5.00-6.00   sec  70.0 MBytes   587 Mbits/sec    0   1.99 MBytes       
[  5]   6.00-7.00   sec  71.2 MBytes   598 Mbits/sec    0   2.14 MBytes       
[  5]   7.00-8.00   sec  70.0 MBytes   587 Mbits/sec    0   2.28 MBytes       
[  5]   8.00-9.00   sec  71.2 MBytes   598 Mbits/sec    0   2.44 MBytes       
[  5]   9.00-10.00  

In [8]:
import pandas as pd

# Convert the nested dictionary into a pandas DataFrame
df = pd.DataFrame(bandwidth_results)

# Transpose the DataFrame to align the source and destination workers as per convention
df = df.T

# Fill diagonal with 0's for self-latency (optional, if desired for clarity)
for worker in df.columns:
    df.at[worker, worker] = 0

# Save the DataFrame to a CSV file, and add the date, hour and minute to the file name
import datetime
now = datetime.datetime.now()
filename = f"bandwidth_results_{now.strftime('%Y-%m-%d_%H-%M')}.csv"
df.to_csv("/home/ubuntu/iDynamics/iBandwidth/measurer/data/"+filename)
df

Unnamed: 0,k8s-worker-5,k8s-worker-1,k8s-worker-8,k8s-worker-6,k8s-worker-9,k8s-worker-2,k8s-worker-7,k8s-worker-4,k8s-worker-3
k8s-worker-3,701 Mbits/sec,567 Mbits/sec,544 Mbits/sec,595 Mbits/sec,547 Mbits/sec,690 Mbits/sec,739 Mbits/sec,740 Mbits/sec,0
k8s-worker-1,788 Mbits/sec,0,784 Mbits/sec,797 Mbits/sec,769 Mbits/sec,793 Mbits/sec,798 Mbits/sec,804 Mbits/sec,796 Mbits/sec
k8s-worker-8,762 Mbits/sec,800 Mbits/sec,0,755 Mbits/sec,758 Mbits/sec,Error,Error,Error,803 Mbits/sec
k8s-worker-5,0,Error,Error,650 Mbits/sec,552 Mbits/sec,533 Mbits/sec,802 Mbits/sec,796 Mbits/sec,Error
k8s-worker-9,722 Mbits/sec,733 Mbits/sec,733 Mbits/sec,737 Mbits/sec,0,734 Mbits/sec,728 Mbits/sec,803 Mbits/sec,800 Mbits/sec
k8s-worker-6,521 Mbits/sec,800 Mbits/sec,527 Mbits/sec,0,754 Mbits/sec,553 Mbits/sec,442 Mbits/sec,744 Mbits/sec,796 Mbits/sec
k8s-worker-2,776 Mbits/sec,772 Mbits/sec,773 Mbits/sec,779 Mbits/sec,775 Mbits/sec,0,805 Mbits/sec,813 Mbits/sec,764 Mbits/sec
k8s-worker-7,769 Mbits/sec,764 Mbits/sec,749 Mbits/sec,803 Mbits/sec,803 Mbits/sec,809 Mbits/sec,0,805 Mbits/sec,807 Mbits/sec
k8s-worker-4,Error,727 Mbits/sec,Error,521 Mbits/sec,Error,623 Mbits/sec,760 Mbits/sec,0,726 Mbits/sec


Bandwidth from k8s-worker-3 to k8s-worker-1: 806 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-8: 801 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-5: 814 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-9: 813 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-6: 813 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-2: 805 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-7: 815 Mbits/sec
Bandwidth from k8s-worker-3 to k8s-worker-4: 806 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-3: 806 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-8: 819 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-5: 814 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-9: 819 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-6: 807 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-2: 813 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-7: 813 Mbits/sec
Bandwidth from k8s-worker-1 to k8s-worker-4: 810 Mbits/sec
Bandwidth from k8s-worker-8 to k8s-worker-3: 794 Mbits/s