In [2]:
from kubernetes import client, config, stream
import pandas as pd
import matplotlib.pyplot as plt

from prometheus_api_client import PrometheusConnect

from datetime import datetime, timedelta
from difflib import diff_bytes

import seaborn as sns
import numpy as np
import random
import multiprocessing as mp
from timeit import default_timer as timer
from datetime import datetime, timedelta


# Kubernetes Config, initialize Kubernetes client

config.load_kube_config()
v1 = client.CoreV1Api()

# Prometheus Config
#prom_url = "http://<PROMETHEUS_SERVER_IP>:<PORT>"
# prom_url = "http://10.110.188.57:9090"
prom_url = "http://10.105.116.175:9090"

prom = PrometheusConnect(url=prom_url, disable_ssl=True)
#test prom connection
prom_connect_response = prom.custom_query(query="up")
print(prom_connect_response)

[{'metric': {'__name__': 'up', 'app': 'compose-post-service', 'instance': '192.168.18.238:15020', 'job': 'kubernetes-pods', 'namespace': 'social-network2', 'pod': 'compose-post-service-6876776d5d-jvlxx', 'pod_template_hash': '6876776d5d', 'security_istio_io_tlsMode': 'istio', 'service': 'compose-post-service', 'service_istio_io_canonical_name': 'compose-post-service', 'service_istio_io_canonical_revision': 'latest'}, 'value': [1717079244.067, '1']}, {'metric': {'__name__': 'up', 'app': 'compose-post-service', 'instance': '192.168.69.197:15020', 'job': 'kubernetes-pods', 'namespace': 'social-network3', 'pod': 'compose-post-service-866f6d7b74-vv756', 'pod_template_hash': '866f6d7b74', 'security_istio_io_tlsMode': 'istio', 'service': 'compose-post-service', 'service_istio_io_canonical_name': 'compose-post-service', 'service_istio_io_canonical_revision': 'latest'}, 'value': [1717079244.067, '1']}, {'metric': {'__name__': 'up', 'app': 'compose-post-service', 'instance': '192.168.69.208:1502

In [2]:
'''Deploy cross-node communication measuring pods'''

from kubernetes import client, config

def check_pods_existence(namespace):
    core_v1 = client.CoreV1Api()
    try:
        pods = core_v1.list_namespaced_pod(namespace=namespace)
        return len(pods.items) > 0 # return True
    except client.rest.ApiException as e:
        print(f"Exception when calling CoreV1Api->list_namespaced_pod: {e}")
        return False

def deploy_latency_measurement_daemonset_if_needed():
    namespace = 'measure-nodes'
    ds_name = 'latency-measurement-ds'
    
    # Assuming config and apps_v1 have been defined as before
    config.load_kube_config()
    apps_v1 = client.AppsV1Api()

    # Check if pods exist in the namespace
    pods_exist = check_pods_existence(namespace)

    # If no pods exist, deploy the DaemonSet
    if not pods_exist:
        # Define the body of the DaemonSet
        ds_body = client.V1DaemonSet(
            api_version="apps/v1",
            kind="DaemonSet",
            metadata=client.V1ObjectMeta(name=ds_name),
            spec=client.V1DaemonSetSpec(
                selector=client.V1LabelSelector(
                    match_labels={"app": "latency-measurement"}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(labels={"app": "latency-measurement"}),
                    spec=client.V1PodSpec(
                        containers=[client.V1Container(
                            name="latency-container",
                            image="curlimages/curl",
                            security_context=client.V1SecurityContext(
                                capabilities=client.V1Capabilities(
                                    add=["NET_RAW"]  # Required for ping
                                )
                            )
                        )],
                        restart_policy="Always"
                    )
                )
            )
        )

        # Deploy the DaemonSet
        try:
            apps_v1.create_namespaced_daemon_set(namespace=namespace, body=ds_body)
            print(f"Deployed DaemonSet {ds_name} in namespace {namespace}")
        except client.rest.ApiException as e:
            print(f"Exception when calling AppsV1Api->create_namespaced_daemon_set: {e}")
    else:
        print("Pods already exist in the namespace. Skipping DaemonSet deployment.")

# Call the function to deploy the DaemonSet if needed
deploy_latency_measurement_daemonset_if_needed()

'''if the latency measuring pods are deployed, then run the mapping-problem'''

Pods already exist in the namespace. Skipping DaemonSet deployment.


'if the latency measuring pods are deployed, then run the mapping-problem'

In [4]:

# Function to measure node-to-node latency
def measure_http_latency(namespace='measure-nodes'):
    v1 = client.CoreV1Api()
    pods = v1.list_namespaced_pod(namespace, label_selector="app=latency-measurement").items
    latency_results = {}
    
    for source_pod in pods:
        source_pod_name = source_pod.metadata.name
        source_pod_node_name = source_pod.spec.node_name
        latency_results[source_pod_node_name] = {}
        
        for target_pod in pods:
            target_pod_ip = target_pod.status.pod_ip
            target_pod_name = target_pod.metadata.name
            target_pod_node_name = target_pod.spec.node_name
            if source_pod_name != target_pod_name:
                exec_command = ['curl', '-o', '/dev/null', '-s', '-w', '%{time_total}', f'http://{target_pod_ip}']
                try:
                    resp = stream.stream(v1.connect_get_namespaced_pod_exec,
                                         source_pod_name,
                                         namespace,
                                         command=exec_command,
                                         stderr=True,
                                         stdin=False,
                                         stdout=True,
                                         tty=False)
                    latency_results[source_pod_node_name][target_pod_node_name] = float(resp) * 1000
                except Exception as e:
                    print(f"Error executing command in pod {source_pod_name}: {e}")
                    latency_results[source_pod_node_name][target_pod_node_name] = np.inf  # Use infinity for errors
    
    return latency_results

# Call the function to measure latency
namespace = 'measure-nodes'
latency_results = measure_http_latency(namespace=namespace)

# Convert the nested dictionary into a pandas DataFrame and handle self-latency
df_latency = pd.DataFrame(latency_results).T
for worker in df_latency.columns:
    df_latency.at[worker, worker] = 0

# Convert DataFrame to delay matrix (numpy array)
delay_matrix = df_latency.to_numpy()

# Function to retrieve ready deployments
def get_ready_deployments(namespace):
    ready_deployments = []
    deployments = client.AppsV1Api().list_namespaced_deployment(namespace)
    for deployment in deployments.items:
        if deployment.status.ready_replicas == deployment.spec.replicas:
            ready_deployments.append(deployment.metadata.name)
    return ready_deployments

# Function to calculate transmitted requests
def transmitted_req_calculator(workload_src, workload_dst, timerange, step_interval):
    end_time = datetime.now() - timedelta(minutes=60*2)
    start_time = end_time - timedelta(minutes=timerange)

    istio_tcp_sent_query = f'istio_tcp_sent_bytes_total{{reporter="source",source_workload="{workload_src}",destination_workload="{workload_dst}"}}'
    istio_tcp_received_query = f'istio_tcp_received_bytes_total{{reporter="source",source_workload="{workload_src}",destination_workload="{workload_dst}"}}'

    istio_tcp_sent_response = prom.custom_query_range(
        query=istio_tcp_sent_query,
        start_time=start_time,
        end_time=end_time,
        step=step_interval
    )
    istio_tcp_received_response = prom.custom_query_range(
        query=istio_tcp_received_query,
        start_time=start_time,
        end_time=end_time,
        step=step_interval
    )

    if (not istio_tcp_sent_response or not istio_tcp_sent_response[0]['values']) and (not istio_tcp_received_response or not istio_tcp_received_response[0]['values']):
        return 0
    else:
        values_sent = istio_tcp_sent_response[0]['values']
        values_received = istio_tcp_received_response[0]['values']

        begin_timestamp, begin_traffic_sent_counter = values_sent[0]
        end_timestamp, end_traffic_sent_counter = values_sent[-1]
        begin_timestamp, begin_traffic_received_counter = values_received[0]
        end_timestamp, end_traffic_received_counter = values_received[-1]

        data_points_num_sent = len(values_sent)
        data_points_num_recevied = len(values_received)

        average_traffic_sent = (int(end_traffic_sent_counter) - int(begin_traffic_sent_counter)) / data_points_num_sent
        average_traffic_received = (int(end_traffic_received_counter) - int(begin_traffic_received_counter)) / data_points_num_recevied

        average_traffic_bytes = int((average_traffic_sent + average_traffic_received) / 2)
        return average_traffic_bytes

# Retrieve the list of ready deployments
namespace = 'social-network'
ready_deployments = get_ready_deployments(namespace)

# Initialize an empty DataFrame for the exec_graph
df_exec_graph = pd.DataFrame(index=ready_deployments, columns=ready_deployments, data=0.0)

# Fill the DataFrame with average request values
for deployment_src in ready_deployments:
    for deployment_dst in ready_deployments:
        if deployment_src != deployment_dst:
            average_requests = transmitted_req_calculator(
                workload_src=deployment_src, 
                workload_dst=deployment_dst, 
                timerange=10,  # look back at the 60mins long history
                step_interval='1m' # window step in 1 miniute
            )
            df_exec_graph.at[deployment_src, deployment_dst] = average_requests

# Plot the heatmap for exec_graph
# plt.figure(figsize=(15, 12))
# sns.heatmap(df_exec_graph, cmap='viridis', fmt=".2f")
# plt.title('Average Requests per 5 Minutes among Pods')
# plt.xlabel('Destination Pods')
# plt.ylabel('Source Pods')
# plt.show()

# Convert DataFrame to exec_graph (numpy array)
exec_graph = df_exec_graph.to_numpy()

# Define the functions for placement and cost calculation
def calculate_communication_cost(exec_graph, placement, delay_matrix):
    cost = 0
    for u in range(len(exec_graph)):
        for v in range(len(exec_graph[u])):
            if exec_graph[u][v] > 0:
                server_u = placement[u]
                server_v = placement[v]
                cost += exec_graph[u][v] * delay_matrix[server_u][server_v]
    return cost

def greedy_placement_worker(exec_graph, delay_matrix, placement, num_servers, start, end):
    current_cost = calculate_communication_cost(exec_graph, placement, delay_matrix)
    improved = True
    while improved:
        improved = False
        for u in range(start, end):
            current_server = placement[u]
            for new_server in range(num_servers):
                if new_server != current_server:
                    new_placement = placement.copy()
                    new_placement[u] = new_server
                    new_cost = calculate_communication_cost(exec_graph, new_placement, delay_matrix)
                    if new_cost < current_cost:
                        placement = new_placement
                        current_cost = new_cost
                        improved = True
                        break
    return placement, current_cost

def parallel_greedy_placement(exec_graph, delay_matrix, placement, num_servers, num_workers=4):
    pool = mp.Pool(num_workers)
    num_microservices = len(exec_graph)
    chunk_size = num_microservices // num_workers # //: divide with integral result (discard remainder)
    chunks = [(exec_graph, delay_matrix, placement, num_servers, i*chunk_size, (i+1)*chunk_size) for i in range(num_workers)]
    results = pool.starmap(greedy_placement_worker, chunks)
    pool.close()
    pool.join()

    # Combine results from different workers
    for result in results:
        placement, _ = result
    
    return placement, calculate_communication_cost(exec_graph, placement, delay_matrix)

# test usage with the generated delay matrix and initial placement
M = len(exec_graph)  # Number of microservices
N = len(delay_matrix)  # Number of servers

# Initial random placement
initial_placement = [random.randint(0, N - 1) for _ in range(M)]

# Perform parallel greedy placement
final_placement, total_cost = parallel_greedy_placement(exec_graph, delay_matrix, initial_placement, N, num_workers=mp.cpu_count())

print("initial_placement:", initial_placement)

print("Final Placement:", final_placement)
print("Total Communication Cost:", total_cost)


initial_placement: [7, 6, 6, 7, 3, 5, 3, 2, 8, 0, 8, 4, 6, 8, 5, 6, 2, 8, 8, 8, 2, 7, 4, 2, 2, 4, 1]
Final Placement: [7, 6, 6, 7, 3, 5, 3, 2, 8, 0, 8, 4, 6, 8, 5, 6, 2, 8, 8, 8, 2, 7, 4, 2, 2, 4, 1]
Total Communication Cost: 144256.735
