# Lapse: Latency & Power-Aware Placement of Data Stream Applications on Edge Computing

**Abstract:** Data Stream Processing (DSP) systems have gained considerable attention in edge computing environments to handle data streams from diverse sources, notably IoT devices, in real-time at the network’s edge. However, their effective utilization concerning end-to-end processing latency, SLA violations, and infrastructure power consumption in heterogeneous environments depends on the adopted placement strategy, posing a significant challenge. This paper introduces Lapse, an innovative cost-based heuristic algorithm specifically crafted to optimize the placement of DSP applications within edge computing environments. Lapse aims to concurrently minimize latency SLA violations and curtail the overall power consumption of the underlying infrastructure. Simulation-driven experiments indicate that Lapse outperforms baseline strategies, substantially reducing the power consumption of the infrastructure by up to 24.42% and SLA violations by up to 75%.

<!-- There are two convenient options to reproduce our experiments. First, you can clone the repository and execute this notebook on your local machine. Alternatively, you can access and run it directly on [MyBinder](https://mybinder.org/v2/gh/carloshkayser/lapse/master?filepath=analysis.ipynb).

[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/carloshkayser/lapse/HEAD?labpath=analysis.ipynb) -->


Let's define the name of our dataset:

In [None]:
dataset = "dataset"

from IPython.display import IFrame
IFrame(f"datasets/{dataset}.pdf", width=800, height=600)

Now, let's execute the experiments:

In [None]:
!python -B -m simulator --dataset datasets/{dataset}.json --algorithm storm

In [None]:
!python -B -m simulator --dataset datasets/{dataset}.json --algorithm storm_la

In [None]:
!python -B -m simulator --dataset datasets/{dataset}.json --algorithm aels

In [None]:
!python -B -m simulator --dataset datasets/{dataset}.json --algorithm aels_pa

In [None]:
!python -B -m simulator --dataset datasets/{dataset}.json --algorithm lapse
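
The five invocations above differ only in the `--algorithm` value, so they could also be driven from a single loop. The sketch below is one possible way to do that, assuming the same `simulator` module and flags used in the cells above; `build_commands` and `run_all` are hypothetical helper names, not part of the repository.

```python
import subprocess
import sys

# Algorithm identifiers taken from the cells above.
ALGORITHMS = ["storm", "storm_la", "aels", "aels_pa", "lapse"]


def build_commands(dataset, algorithms=ALGORITHMS):
    """Return one argument list per algorithm, mirroring the `!python` cells."""
    return [
        [sys.executable, "-B", "-m", "simulator",
         "--dataset", f"datasets/{dataset}.json",
         "--algorithm", algorithm]
        for algorithm in algorithms
    ]


def run_all(dataset):
    """Run every algorithm in sequence; stop at the first failing run."""
    for command in build_commands(dataset):
        subprocess.run(command, check=True)
```

Calling `run_all(dataset)` from the repository root would then replace the five separate cells. Keeping the command construction in its own function makes it easy to inspect the commands before running them.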

---


## Results

With the experiments executed, we can now analyze the results. First, let's import some libraries and load the results:


In [None]:
!mkdir -p plots

In [None]:
from glob import glob

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import msgpack

pd.set_option("display.max_colwidth", None)

sns.set_theme(style="whitegrid")

colors = ["#F12B2E", "#BEC42E", "#25912E", "#3454D1", "#FF8019"]
sns.set_palette(sns.color_palette(colors, desat=0.75))


In [None]:
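def read_results(path):
    """Collect the last metrics record of every Topology log under `path`."""
    data = []
    for algorithm in glob(path):
        for log in glob(algorithm + "/*"):
            # Only logs whose path contains "Topology" are read.
            if "Topology" in log:
                with open(log, "rb") as f:
                    # Keep only the last logged entry of the file.
                    metrics = msgpack.load(f, strict_map_key=False)[-1]
                    # Tag the record with the directory it came from.
                    metrics["algorithm"] = algorithm
                    data.append(metrics)

    return data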


In [None]:
data = read_results(f"logs/algorithm=*;dataset={dataset};")
results_df = pd.DataFrame(data)

results_df["algorithm"]


In [None]:
results_df["algorithm"] = results_df["algorithm"].str.split(";").str[0].str.split("=").str[1]

results_df[["algorithm", "overall_occupation", "overall_power_consumption", "number_of_processing_latency_sla_violation"]].sort_values(
    by=["algorithm"], ascending=True
).reset_index(drop=True)
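
The chained `str.split` calls above assume directory names of the form `logs/algorithm=<name>;dataset=<dataset>;`, as matched by the glob pattern in the previous cell. To illustrate what that expression does to a single path, here is a plain-Python sketch; `algorithm_from_path` is a hypothetical helper.

```python
def algorithm_from_path(path):
    """Extract the value of the `algorithm=` field from a log directory path."""
    first_field = path.split(";")[0]  # e.g. "logs/algorithm=storm_la"
    return first_field.split("=")[1]  # e.g. "storm_la"


example = "logs/algorithm=storm_la;dataset=dataset;"
print(algorithm_from_path(example))  # storm_la
```

Note that this relies on `algorithm=` being the first `;`-separated field; a path with a different field order would need a different parser.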


In [None]:
names = {
    "storm": "Storm",
    "storm_la": "Storm-LA",
    "aels": "AELS",
    "aels_pa": "AELS-PA",
    "lapse": "Lapse",
}

rename = lambda x: names.get(x, x)

results_df["algorithm"] = results_df["algorithm"].apply(rename)

results_df = results_df.sort_values(by=["algorithm"], ascending=True).reset_index(drop=True)

results_df["algorithm"]


#### SLA Violations by Chain Size


In [None]:
sla_violations_by_chain_size = results_df[["algorithm", "chain_size"]]

sla_violations_by_chain_size = sla_violations_by_chain_size.explode("chain_size").reset_index(drop=True)

sla_violations_by_chain_size = pd.concat(
    [sla_violations_by_chain_size[["algorithm"]], pd.json_normalize(sla_violations_by_chain_size["chain_size"])], axis=1
)

sla_violations_by_chain_size


In [None]:
data = {}
for index, row in sla_violations_by_chain_size.iterrows():
    if row["algorithm"] not in data:
        data[row["algorithm"]] = {}

    if row["chain_size"] not in data[row["algorithm"]]:
        data[row["algorithm"]][row["chain_size"]] = 0

    data[row["algorithm"]][row["chain_size"]] += row["delay_sla_violations"]

data
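
The same nested-dictionary accumulation is repeated in this notebook for SLA violations, power consumption, and occupation. A small helper could express the pattern once. This is only a sketch: `sum_by` is a hypothetical name, and the rows below are made-up values used purely to show the shape of the result.

```python
from collections import defaultdict


def sum_by(rows, outer_key, inner_key, value_key):
    """Sum `value_key` for every (outer_key, inner_key) pair in `rows`.

    `rows` is an iterable of dict-like records, for example
    `some_dataframe.to_dict("records")`.
    """
    totals = defaultdict(lambda: defaultdict(int))
    for row in rows:
        totals[row[outer_key]][row[inner_key]] += row[value_key]
    # Convert to plain dicts so the result prints and compares cleanly.
    return {outer: dict(inner) for outer, inner in totals.items()}


# Made-up rows, only to illustrate the output structure.
rows = [
    {"algorithm": "Lapse", "chain_size": 2, "delay_sla_violations": 1},
    {"algorithm": "Lapse", "chain_size": 2, "delay_sla_violations": 0},
    {"algorithm": "Storm", "chain_size": 4, "delay_sla_violations": 3},
]
print(sum_by(rows, "algorithm", "chain_size", "delay_sla_violations"))
# {'Lapse': {2: 1}, 'Storm': {4: 3}}
```

In the notebook this could be called as `sum_by(sla_violations_by_chain_size.to_dict("records"), "algorithm", "chain_size", "delay_sla_violations")`, and the result passed to `pd.DataFrame` as before.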


In [None]:
df = pd.DataFrame(data)

df = df.transpose()

ax = df.plot.bar(stacked=True, figsize=(5, 5))

plt.ylabel("Latency SLA Violations", fontsize=16, fontweight="bold", labelpad=10)

# sort both labels and handles by labels
handles, labels = ax.get_legend_handles_labels()
labels, handles = zip(*sorted(zip(labels, handles), key=lambda t: int(t[0])))

plt.legend(
    handles,
    labels,
    borderaxespad=-2,
    bbox_to_anchor=(0, 0.8, 1, 0.2),
    loc="upper center",
    mode="expand",
    ncol=3,
    prop={"size": 14},
    markerscale=2,
    frameon=False,
)

# set grid style
ax.grid(axis="x")
ax.grid(axis="y", linestyle="--")

# increase tick size
ax.tick_params(axis="both", which="major", labelsize=16)

# put x axis labels on an angle
for item in ax.get_xticklabels():
    item.set_rotation(45)

plt.savefig(f"plots/{dataset}-sla-violations-by-chain-size.pdf", dpi=300, bbox_inches="tight")

plt.show()


#### Power Consumption


In [None]:
data_by_model = results_df[["algorithm", "model"]]
data_by_model = data_by_model.explode("model").reset_index(drop=True)
data_by_model = pd.concat([data_by_model[["algorithm"]], pd.json_normalize(data_by_model["model"])], axis=1)

data_by_model


In [None]:
data = {}
for index, row in data_by_model.iterrows():
    if row["algorithm"] not in data:
        data[row["algorithm"]] = {}

    if row["codename"] not in data[row["algorithm"]]:
        data[row["algorithm"]][row["codename"]] = 0

    data[row["algorithm"]][row["codename"]] += row["power_consumption"]

data


In [None]:
df = pd.DataFrame(data)

# set algorithm as index
df = df.transpose()

ax = df.plot.bar(stacked=True, figsize=(5, 5))

plt.ylabel("Power Consumption (W)", fontsize=16, fontweight="bold", labelpad=10)

plt.legend(
    borderaxespad=-2,
    bbox_to_anchor=(0, 0.8, 1, 0.2),
    loc="upper center",
    mode="expand",
    ncol=3,
    prop={"size": 14},
    markerscale=2,
    frameon=False,
)

# increase tick size
ax.tick_params(axis="both", which="major", labelsize=16)

ax.grid(axis="x")
ax.grid(axis="y", linestyle="--")

# put x axis labels on an angle
for item in ax.get_xticklabels():
    item.set_rotation(45)

plt.savefig(f"plots/{dataset}-power-consumption-by-model.pdf", dpi=300, bbox_inches="tight")

plt.show()


#### Edge Servers Occupation


In [None]:
data = {}
for index, row in data_by_model.iterrows():
    if row["algorithm"] not in data:
        data[row["algorithm"]] = {}

    if row["codename"] not in data[row["algorithm"]]:
        data[row["algorithm"]][row["codename"]] = 0

    data[row["algorithm"]][row["codename"]] += row["occupation"]

data


In [None]:
# Normalize data
for algorithm in data:
    total = sum(data[algorithm].values())

    for model in data[algorithm]:
        data[algorithm][model] = data[algorithm][model] / total * 100

data
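
The normalization converts each algorithm's per-model occupation into a share of that algorithm's total, so every stacked bar sums to 100%. Below is a worked example with made-up numbers. `normalize_to_percent` is a hypothetical helper; unlike the cell above, it returns a new dictionary and, as an added assumption, reports 0% for an all-zero row instead of dividing by zero.

```python
def normalize_to_percent(data):
    """Return a copy of `data` where each inner dict is expressed in percent."""
    normalized = {}
    for algorithm, per_model in data.items():
        total = sum(per_model.values())
        normalized[algorithm] = {
            # Guard against an all-zero row (assumption: report 0% in that case).
            model: (value / total * 100 if total else 0.0)
            for model, value in per_model.items()
        }
    return normalized


# Made-up occupation values for illustration only.
example = {"Lapse": {"model_a": 2, "model_b": 6}}
print(normalize_to_percent(example))
# {'Lapse': {'model_a': 25.0, 'model_b': 75.0}}
```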


In [None]:
df = pd.DataFrame(data)

# set algorithm as index
df = df.transpose()

ax = df.plot.bar(stacked=True, figsize=(5, 5))

plt.ylabel("Edge Server Occupation (%)", fontsize=16, fontweight="bold", labelpad=8)

plt.legend(
    borderaxespad=-2,
    bbox_to_anchor=(0, 0.8, 1, 0.2),
    loc="upper center",
    mode="expand",
    ncol=4,
    prop={"size": 14},
    markerscale=2,
    frameon=False,
)

ax.grid(axis="x")
ax.grid(axis="y", linestyle="--")

# increase tick size
ax.tick_params(axis="both", which="major", labelsize=16)

# put x axis labels on an angle
for item in ax.get_xticklabels():
    item.set_rotation(45)

plt.savefig(f"plots/{dataset}-occupation-by-model.pdf", dpi=300, bbox_inches="tight")

plt.show()


#### Application Communication Path Size by Chain Size


In [None]:
results_df[["algorithm", "path_size", "path_size_by_sla", "path_size_by_chain_size"]]


In [None]:
data = {"algorithm": [], "app_size": [], "path_size": []}
for index, row in results_df.iterrows():
    for app_size, path_size in row["path_size_by_chain_size"].items():
        for size in path_size:
            data["algorithm"].append(row["algorithm"])
            data["app_size"].append(app_size)
            data["path_size"].append(size)

df = pd.DataFrame(data)
df
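
By default, a seaborn bar plot shows the mean of `y` within each `x`/`hue` group, so the bar heights in the plot below are mean path sizes per algorithm and chain size. The sketch below reproduces that aggregation in plain Python on made-up records; `mean_path_size` is a hypothetical helper.

```python
from collections import defaultdict
from statistics import mean


def mean_path_size(records):
    """Average `path_size` for each (algorithm, app_size) pair."""
    groups = defaultdict(list)
    for record in records:
        groups[(record["algorithm"], record["app_size"])].append(record["path_size"])
    return {key: mean(values) for key, values in groups.items()}


# Made-up long-format records mirroring the columns built above.
records = [
    {"algorithm": "Lapse", "app_size": 2, "path_size": 1.0},
    {"algorithm": "Lapse", "app_size": 2, "path_size": 2.0},
    {"algorithm": "Storm", "app_size": 2, "path_size": 4.0},
]
print(mean_path_size(records))
# {('Lapse', 2): 1.5, ('Storm', 2): 4.0}
```

With pandas, the same numbers can be obtained from the long-format frame via `df.groupby(["algorithm", "app_size"])["path_size"].mean()`.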


In [None]:
from matplotlib.patheffects import withStroke, Normal

# Calculate aspect ratio based on figsize
figsize = (8, 5)
height = figsize[1]
width = figsize[0]
aspect_ratio = width / height

g = sns.catplot(
    data=df,
    kind="bar",
    x="app_size",
    y="path_size",
    hue="algorithm",
    height=height,  # Set the height of the figure
    aspect=aspect_ratio,  # Use the calculated aspect ratio
    width=0.9,
    # saturation=1,
)

# Remove legend
g._legend.remove()

plt.xlabel("Number of operators", fontsize=16, fontweight="bold", labelpad=8)
plt.ylabel("Path Size", fontsize=16, fontweight="bold", labelpad=8)

plt.legend(
    borderaxespad=-2,
    bbox_to_anchor=(0, 0.8, 1, 0.2),
    loc="upper center",
    mode="expand",
    ncol=5,
    prop={"size": 14},
    markerscale=2,
    frameon=False,
)

# set grid style
g.ax.grid(axis="y", linestyle="--")

# draw grid lines behind the bars
g.ax.set_axisbelow(True)

# Increase tick size
g.ax.tick_params(axis="both", which="major", labelsize=16)

# Keep x-axis labels horizontal
for item in g.ax.get_xticklabels():
    item.set_rotation(0)

# Annotate the bars with their values
for p in g.ax.patches:
    stroke = withStroke(linewidth=2, foreground="black")
    normal = Normal()

    g.ax.annotate(
        "{:.1f}".format(p.get_height()),
        (p.get_x() + p.get_width() / 2.0, 0.5),
        ha="center",
        va="bottom",
        fontsize=14,
        color="white",
        weight="bold",
        path_effects=[stroke, normal],
    )


plt.savefig(f"plots/{dataset}-path-size-by-chain-size.pdf", dpi=300, bbox_inches="tight")
plt.show()


#### Bandwidth Available for each Data Flow


In [None]:
bw_avail_for_each_flow = results_df[["algorithm", "bandwidth_available_for_each_flow_percentage"]].copy()
bw_avail_for_each_flow["bandwidth_available_for_each_flow_percentage"] = bw_avail_for_each_flow[
    "bandwidth_available_for_each_flow_percentage"
].apply(lambda x: list(x.values()))
bw_avail_for_each_flow = bw_avail_for_each_flow.explode("bandwidth_available_for_each_flow_percentage")

bw_avail_for_each_flow.head(5)


In [None]:
bw_avail_for_each_flow = bw_avail_for_each_flow.astype({"bandwidth_available_for_each_flow_percentage": float})
bw_avail_for_each_flow.head(5)


In [None]:
# summary statistics (count, mean, std, min, quartiles, max) for each algorithm
bw_avail_for_each_flow.groupby("algorithm").describe()


In [None]:
plt.figure(figsize=(8, 5))

sns.boxplot(
    x="algorithm",
    y="bandwidth_available_for_each_flow_percentage",
    data=bw_avail_for_each_flow,
    showmeans=True,
    meanprops={"marker": "o", "markerfacecolor": "white", "markeredgecolor": "black", "markersize": 8},
    showfliers=False,
    whis=(0, 100),
)

# set grid style
plt.grid(axis="y", linestyle="--")

# remove legend
plt.legend([], [], frameon=False)

# increase tick size
plt.tick_params(axis="both", which="major", labelsize=16)

# start at 0
plt.ylim(0, 105)

# set y axis ticks
plt.yticks(np.arange(0, 110, 10))

plt.xlabel("Algorithm", fontsize=18, fontweight="bold", labelpad=10)
plt.ylabel("Bandwidth Available (%)", fontsize=18, fontweight="bold", labelpad=10)

plt.savefig(f"plots/{dataset}-bandwidth-available-flow.pdf", dpi=300, bbox_inches="tight")
