Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic scheduling scenario updated to enable static clustering. #7

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 20 additions & 23 deletions src/org/fog/entities/MicroserviceFogDevice.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,23 +115,6 @@ public void addPlacementRequest(PlacementRequest pr) {
sendNow(getId(), FogEvents.PROCESS_PRS);
}

private void sendThroughFreeClusterLink(Tuple tuple, Integer clusterNodeID) {
double networkDelay = tuple.getCloudletFileSize() / getClusterLinkBandwidth();
setClusterLinkBusy(true);
double latency = (getClusterMembersToLatencyMap()).get(clusterNodeID);
send(getId(), networkDelay, FogEvents.UPDATE_CLUSTER_TUPLE_QUEUE);

if (tuple instanceof ManagementTuple) {
send(clusterNodeID, networkDelay + latency + ((ManagementTuple) tuple).processingDelay, FogEvents.MANAGEMENT_TUPLE_ARRIVAL, tuple);
//todo
// if (Config.ENABLE_NETWORK_USAGE_AT_PLACEMENT)
// NetworkUsageMonitor.sendingManagementTuple(latency, tuple.getCloudletFileSize());
} else {
send(clusterNodeID, networkDelay + latency, FogEvents.TUPLE_ARRIVAL, tuple);
NetworkUsageMonitor.sendingTuple(latency, tuple.getCloudletFileSize());
}
}

protected void setDeviceType(String deviceType) {
if (deviceType.equals(MicroserviceFogDevice.CLIENT) || deviceType.equals(MicroserviceFogDevice.FCN) ||
deviceType.equals(MicroserviceFogDevice.FON) || deviceType.equals(MicroserviceFogDevice.CLOUD))
Expand Down Expand Up @@ -197,7 +180,7 @@ protected void processTupleArrival(SimEvent ev) {
if (tuple.getDirection() == Tuple.UP) {
int destination = controllerComponent.getDestinationDeviceId(tuple.getDestModuleName());
if (destination == -1) {
System.out.println("Service DiscoveryInfo missing. Tuple routing stopped for : " + tuple.getDestModuleName());
System.out.println("Service DiscoveryInfo missing in device : " + getId() + "-" + getDeviceType() + ". Tuple routing stopped for : " + tuple.getDestModuleName());
return;
}
tuple.setDestinationDeviceId(destination);
Expand Down Expand Up @@ -452,14 +435,14 @@ protected void processModuleArrival(SimEvent ev) {
module.updateVmProcessing(CloudSim.clock(), getVmAllocationPolicy().getHost(module).getVmScheduler()
.getAllocatedMipsForVm(module));

System.out.println("Module " + module.getName() + "created on " + getName() + " under Launch module");
System.out.println("Module " + module.getName() + " created on " + getName() + " under Launch module");
Logger.debug("Module deploy success", "Module " + module.getName() + " placement on " + getName() + " successful. vm id : " + module.getId());
} else {
Logger.error("Module deploy error", "Module " + module.getName() + " placement on " + getName() + " failed");
System.out.println("Module " + module.getName() + " placement on " + getName() + " failed");
}
} else {
System.out.println("Module " + module.getName() + " already deplyed on" + getName());
System.out.println("Module " + module.getName() + " already deployed on " + getName());
}
}

Expand Down Expand Up @@ -653,21 +636,35 @@ protected void sendUpFreeLink(Tuple tuple) {
} else {
super.sendUpFreeLink(tuple);
}
}

protected void sendToCluster(Tuple tuple, int clusterNodeID) {
if (tuple instanceof ManagementTuple) {
double networkDelay = tuple.getCloudletFileSize() / getClusterLinkBandwidth();
setClusterLinkBusy(true);
double latency = getClusterMembersToLatencyMap().get(clusterNodeID);
send(getId(), networkDelay, FogEvents.UPDATE_CLUSTER_TUPLE_QUEUE);
send(clusterNodeID, networkDelay + latency + ((ManagementTuple) tuple).processingDelay, FogEvents.MANAGEMENT_TUPLE_ARRIVAL, tuple);
//todo
// if (Config.ENABLE_NETWORK_USAGE_AT_PLACEMENT)
// NetworkUsageMonitor.sendingManagementTuple(latency, tuple.getCloudletFileSize());
} else {
super.sendToCluster(tuple, clusterNodeID);
}
}

public void updateRoutingTable(int destId, int nextId) {
routingTable.put(destId, nextId);
}

private void updateCLusterConsInRoutingTable() {
for(int deviceId:clusterMembers){
routingTable.put(deviceId,deviceId);
for (int deviceId : clusterMembers) {
routingTable.put(deviceId, deviceId);
}
}

public void removeMonitoredDevice(FogDevice fogDevice) {
controllerComponent.removeMonitoredDevice(fogDevice);
controllerComponent.removeMonitoredDevice(fogDevice);
}

public void addMonitoredDevice(FogDevice fogDevice) {
Expand Down
16 changes: 13 additions & 3 deletions src/org/fog/placement/DistributedMicroservicePlacementLogic.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ public void updateResources(Map<Integer, Map<String, Double>> resourceAvailabili

@Override
public void postProcessing() {

currentCpuLoad = 0.0;
currentModuleMap = new ArrayList<>();
currentModuleLoadMap = new HashMap<>();
currentModuleInstanceNum = new HashMap<>();
prStatus = new HashMap<>();
}

private PlacementLogicOutput generatePlacementMap() {
Expand All @@ -87,7 +91,7 @@ private PlacementLogicOutput generatePlacementMap() {
placement.put(placementRequest.getPlacementRequestId(), placementRequest.getPlacedMicroservices());
}

Map<Integer, Map<Application, List<ModuleLaunchConfig>>> perDevice = new HashMap<>();
Map<Integer, Map<Application, List<ModuleLaunchConfig>>> perDevice = new HashMap<>(); // per this algo only contains microservices placed on this device
Map<Integer, List<Pair<String, Integer>>> serviceDiscoveryInfo = new HashMap<>();
if (placement != null) {
for (int prID : placement.keySet()) {
Expand All @@ -101,6 +105,8 @@ private PlacementLogicOutput generatePlacementMap() {
for (String microserviceName : placement.get(prID).keySet()) {
int deviceID = placement.get(prID).get(microserviceName);

if(deviceID!=fogDevice.getId())
continue;
//service discovery info propagation
List<Integer> clientDevices = getClientServiceNodeIds(application, microserviceName, placementRequest.getPlacedMicroservices(), placement.get(prID));
for (int clientDevice : clientDevices) {
Expand Down Expand Up @@ -139,7 +145,11 @@ private PlacementLogicOutput generatePlacementMap() {

}

return new PlacementLogicOutput(perDevice, serviceDiscoveryInfo, prStatus);
Map<PlacementRequest,Integer> prStatusTemp = new HashMap<>();
for(PlacementRequest pr:prStatus.keySet()){
prStatusTemp.put(pr,prStatus.get(pr));
}
return new PlacementLogicOutput(perDevice, serviceDiscoveryInfo, prStatusTemp);
}

public void mapModules() {
Expand Down
28 changes: 26 additions & 2 deletions src/org/fog/placement/MicroservicesController.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class MicroservicesController extends SimEntity {
* @param fogDevices
* @param sensors
* @param applications
* Used when monitored devices of each Fog Orchestration Node(which runs the placement logic) is not known but calculated dynamically.
*/
public MicroservicesController(String name, List<FogDevice> fogDevices, List<Sensor> sensors, List<Application> applications, List<Integer> clusterLevels, Double clusterLatency, int placementLogic) {
super(name);
Expand All @@ -46,6 +47,18 @@ public MicroservicesController(String name, List<FogDevice> fogDevices, List<Sen

}

/**
*
* @param name
* @param fogDevices
* @param sensors
* @param applications
* @param clusterLevels
* @param clusterLatency
* @param placementLogic
* @param monitored
* Used when monitored devices of each Fog Orchestration Node(which runs the placement logic) is pre-set.(@param - monitored)
*/
public MicroservicesController(String name, List<FogDevice> fogDevices, List<Sensor> sensors, List<Application> applications, List<Integer> clusterLevels, Double clusterLatency, int placementLogic, Map<Integer, List<FogDevice>> monitored) {
super(name);
this.fogDevices = fogDevices;
Expand All @@ -61,6 +74,11 @@ public MicroservicesController(String name, List<FogDevice> fogDevices, List<Sen
protected void init() {
connectWithLatencies();

/** supports 3 modes of clustering
* 1. No clustering : Config.ENABLE_STATIC_CLUSTERING = false, Config.ENABLE_DYNAMIC_CLUSTERING = false
* 2. Static clustering : Config.ENABLE_STATIC_CLUSTERING = true (Nodes of the same level that shares the same parent)
* 3. Dynamic clustering : Clustered based on location of the nodes and their transmission powers
**/
if (Config.ENABLE_STATIC_CLUSTERING) {
for (Integer id : clustering_levels)
createClusterConnections(id, fogDevices, Config.clusteringLatency);
Expand All @@ -74,7 +92,12 @@ protected void init() {
protected void init(Map<Integer, List<FogDevice>> monitored) {
connectWithLatencies();

if (!Config.ENABLE_STATIC_CLUSTERING) {
/** supports 3 modes of clustering
* 1. No clustering : Config.ENABLE_STATIC_CLUSTERING = false, Config.ENABLE_DYNAMIC_CLUSTERING = false
* 2. Static clustering : Config.ENABLE_STATIC_CLUSTERING = true (Nodes of the same level that shares the same parent)
* 3. Dynamic clustering : Clustered based on location of the nodes and their transmission powers
**/
if (Config.ENABLE_STATIC_CLUSTERING) {
for (Integer id : clustering_levels)
createClusterConnections(id, fogDevices, Config.clusteringLatency);
}
Expand Down Expand Up @@ -130,12 +153,13 @@ protected void generateRoutingTable() {
for (FogDevice f : fogDevices) {
((MicroserviceFogDevice) f).addRoutingTable(routing.get(f.getId()));
}

}

public void startEntity() {
//In STATIC mode initial placement is carried out before simulation start
if (MicroservicePlacementConfig.SIMULATION_MODE == "STATIC")
initiatePlacementRequestProcessing();
// In DYNAMIC placement initial placement can be carried out after the start of the simulation to simulate the effect of placement delays
if (MicroservicePlacementConfig.SIMULATION_MODE == "DYNAMIC")
initiatePlacementRequestProcessingDynamic();

Expand Down
27 changes: 12 additions & 15 deletions src/org/fog/test/perfeval/MicroservicesAppSample1.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class MicroservicesAppSample1 {

static int proxyServers = 2; // proxy server
static Integer[] gatewayDevices = new Integer[]{3, 3}; // GW devices
static Integer[] mobilesPerL2 = new Integer[]{3, 2, 1, 2, 3, 1}; // eg : client end devices ( mobiles )
static Integer[] mobilesPerL2 = new Integer[]{7, 4, 7, 8, 5, 4}; // eg : client end devices ( mobiles )
private static int l2Num = 0; // fog adding l1 nodes
static Integer deviceNum = 0;

Expand All @@ -50,7 +50,7 @@ public class MicroservicesAppSample1 {
static Integer[] cpus = new Integer[]{2800, 6000};
static Integer[] ram = new Integer[]{2048, 4096};

static double ECG_TRANSMISSION_TIME = 5;
static double ECG_TRANSMISSION_TIME = 10;

//cluster link latency 2ms
static Double clusterLatency = 2.0;
Expand All @@ -64,10 +64,11 @@ public class MicroservicesAppSample1 {

/**
* Config properties
* SIMULATION_MODE -> dynamic
* SIMULATION_MODE -> DYNAMIC
* PR_PROCESSING_MODE -> SEQUENTIAL
* ENABLE_RESOURCE_DATA_SHARING -> true
* DYNAMIC_CLUSTERING -> false
* STATIC_CLUSTERING -> true (with clustering) or false (without clustering)
*/
public static void main(String[] args) {

Expand Down Expand Up @@ -99,7 +100,7 @@ public static void main(String[] args) {
createFogDevices(broker.getId());

List<Integer> clusterLevelIdentifier = new ArrayList<>();
clusterLevelIdentifier.add(2);
clusterLevelIdentifier.add(2); // level of the clustered devices

Map<Integer, List<FogDevice>> monitored = new HashMap<>();
for (FogDevice f : fogDevices) {
Expand Down Expand Up @@ -173,10 +174,10 @@ private static FogDevice addL2Devices(String id, int userId, int parentId, int p
FogDevice dept;
if (diffResource) {
int pos = deviceNum % 2;
dept = createFogDevice("L2-" + id, cpus[pos], ram[pos], 1250000, 18750, 2, 0.0, 107.339, 83.4333, MicroserviceFogDevice.FON);
dept = createFogDevice("L2-" + parentId + "-" + id, cpus[pos], ram[pos], 1250000, 18750, 2, 0.0, 107.339, 83.4333, MicroserviceFogDevice.FON);
deviceNum = deviceNum + 1;
} else {
dept = createFogDevice("L2-" + id, 2800, 2048, 1250000, 18750, 2, 0.0, 107.339, 83.4333, MicroserviceFogDevice.FON);
dept = createFogDevice("L2-" + parentId + "_" + id, 2800, 2048, 1250000, 18750, 2, 0.0, 107.339, 83.4333, MicroserviceFogDevice.FON);
}
fogDevices.add(dept);
dept.setParentId(parentId);
Expand All @@ -194,12 +195,11 @@ private static FogDevice addMobile(String id, int userId, int parentId) {

Application application = applications.get(0);
String appId = application.getAppId();
double throughput = 200;

FogDevice mobile = createFogDevice("m-" + id, 1000, 2048, 18750, 250, 3, 0, 87.53, 82.44, MicroserviceFogDevice.CLIENT);
mobile.setParentId(parentId);

Sensor eegSensor = new Sensor("s-" + id, "ECG", userId, appId, new DeterministicDistribution(1000 / (throughput / 9 * 10))); // inter-transmission time of EEG sensor follows a deterministic distribution
Sensor eegSensor = new Sensor("s-" + id, "ECG", userId, appId, new DeterministicDistribution(ECG_TRANSMISSION_TIME)); // inter-transmission time of EEG sensor follows a deterministic distribution
eegSensor.setApp(application);
sensors.add(eegSensor);

Expand Down Expand Up @@ -339,17 +339,14 @@ private static Application createApplication(String appId, int userId) {
/*
* Adding modules (vertices) to the application model (directed graph)
*/
application.addAppModule("client", 128, 605, 100); // adding module Client to the application model MB,MIPS,MB,kbps
application.addAppModule("ECGFeature_Extractor", 256, 630, 200); // adding module Concentration Calculator to the application model
application.addAppModule("ECG_Analyser", 512, 100, 2000); // adding module Connector to the application model
application.addAppModule("client", 128, 300, 100); // adding module Client to the application model MB,MIPS,MB,kbps
application.addAppModule("ECGFeature_Extractor", 256, 450, 200); // adding module Concentration Calculator to the application model
application.addAppModule("ECG_Analyser", 512, 1100, 2000); // adding module Connector to the application model

/*
* Connecting the application modules (vertices) in the application model (directed graph) with edges
*/
if (ECG_TRANSMISSION_TIME == 10)
application.addAppEdge("ECG", "client", 2000, 500, "ECG", Tuple.UP, AppEdge.SENSOR); // adding edge from EEG (sensor) to Client module carrying tuples of type EEG
else
application.addAppEdge("ECG", "client", 3000, 500, "ECG", Tuple.UP, AppEdge.SENSOR);
application.addAppEdge("ECG", "client", 2000, 500, "ECG", Tuple.UP, AppEdge.SENSOR); // adding edge from EEG (sensor) to Client module carrying tuples of type EEG
application.addAppEdge("client", "ECGFeature_Extractor", 3500, 500, "_SENSOR", Tuple.UP, AppEdge.MODULE); // adding edge from Client to Concentration Calculator module carrying tuples of type _SENSOR
application.addAppEdge("ECGFeature_Extractor", "ECG_Analyser", 100, 10000, 1000, "ECG_FEATURES", Tuple.UP, AppEdge.MODULE); // adding periodic edge (period=1000ms) from Concentration Calculator to Connector module carrying tuples of type PLAYER_GAME_STATE
application.addAppEdge("ECGFeature_Extractor", "client", 14, 500, "ECG_FEATURE_ANALYSIS", Tuple.DOWN, AppEdge.MODULE); // adding edge from Concentration Calculator to Client module carrying tuples of type CONCENTRATION
Expand Down