From ebc864fe280a283393ca049bb1b77d9bad8278e5 Mon Sep 17 00:00:00 2001 From: devleeskNex Date: Thu, 25 Apr 2019 18:11:08 +0900 Subject: [PATCH] =?UTF-8?q?OKE=EC=97=90=20=EB=8C=80=EC=9D=91=20=ED=95=98?= =?UTF-8?q?=EA=B8=B0=20=EC=9C=84=ED=95=B4=20K8S=EC=A0=95=EB=B3=B4=EC=A4=91?= =?UTF-8?q?=20nodeName=EC=9D=84=20Host=20name=EC=9C=BC=EB=A1=9C=20?= =?UTF-8?q?=EB=B3=80=EA=B2=BD=ED=95=A8.=20OKE=EB=8A=94=20node=20name?= =?UTF-8?q?=EB=B6=80=EB=B6=84=EC=9D=84=20IP=EB=A1=9C=20=EB=8C=80=EC=B2=B4?= =?UTF-8?q?=ED=95=A8.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/akka/actor/JsonK8SAPIParserActor.java | 102 +++++++++++++----- .../akka/actor/KafkaDockerConsumerActor.java | 7 +- .../akka/actor/KafkaHostConsumerActor.java | 8 +- .../akka/actor/KafkaK8SAPIConsumerActor.java | 7 +- .../com/nexcloud/api/rabitmq/Publish.java | 8 +- 5 files changed, 99 insertions(+), 33 deletions(-) diff --git a/NexCollector/src/main/java/com/nexcloud/api/akka/actor/JsonK8SAPIParserActor.java b/NexCollector/src/main/java/com/nexcloud/api/akka/actor/JsonK8SAPIParserActor.java index a52add1..ac8820c 100644 --- a/NexCollector/src/main/java/com/nexcloud/api/akka/actor/JsonK8SAPIParserActor.java +++ b/NexCollector/src/main/java/com/nexcloud/api/akka/actor/JsonK8SAPIParserActor.java @@ -60,6 +60,7 @@ public class JsonK8SAPIParserActor extends UntypedActor{ private String data; private Map containerMap = new HashMap(); + private Map hostNameInfo = new HashMap(); private Map hostInfo = new HashMap(); private Map hostCpuInfo = new HashMap(); @@ -102,38 +103,41 @@ public void onReceive(Object message){ public void exec( SendData sendData, String data ) { - RedisCluster redisCluster = null; - String msg = ""; + RedisCluster redisCluster = null; + String msg = ""; - String pattern = "#####.###"; - DecimalFormat dformat = new DecimalFormat( pattern ); + String nodeName = null; + String nodeIP = null; + + String pattern = "#####.###"; + DecimalFormat dformat = new DecimalFormat( pattern ); - ConsumerRecords records = null; + ConsumerRecords records = null; - Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create(); + Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create(); try{ // Redis Cluster connection - redisCluster = RedisCluster.getInstance(sendData.getRedis_host(), Integer.parseInt(sendData.getRedis_port())); + redisCluster = RedisCluster.getInstance(sendData.getRedis_host(), Integer.parseInt(sendData.getRedis_port())); /** * Agent metric data */ - ResponseData resData = null; - Header header = null; - String body = null; + ResponseData resData = null; + Header header = null; + String body = null; - msg = ""; + msg = ""; - Host host = null; - Docker docker = null; + Host host = null; + Docker docker = null; - K8SDaemonSetThread daemonsetThread = K8SDaemonSetThread.getInstance(); - K8SDeploymentThread deploymentThread = K8SDeploymentThread.getInstance(); - K8SEndpointThread endpointThread = K8SEndpointThread.getInstance(); - K8SNamespaceThread namespaceThread = K8SNamespaceThread.getInstance(); - K8SStatefulsetThread statefulThread = K8SStatefulsetThread.getInstance(); - K8SReplicasetThread replicaThread = K8SReplicasetThread.getInstance(); + K8SDaemonSetThread daemonsetThread = K8SDaemonSetThread.getInstance(); + K8SDeploymentThread deploymentThread = K8SDeploymentThread.getInstance(); + K8SEndpointThread endpointThread = K8SEndpointThread.getInstance(); + K8SNamespaceThread namespaceThread = K8SNamespaceThread.getInstance(); + K8SStatefulsetThread statefulThread = K8SStatefulsetThread.getInstance(); + K8SReplicasetThread replicaThread = K8SReplicasetThread.getInstance(); resData = Util.JsonTobean(data, ResponseData.class); header = resData.getHeader(); @@ -154,6 +158,7 @@ public void exec( SendData sendData, String data ) { hostInfo.clear(); hostCpuInfo.clear(); + hostNameInfo.clear(); ips = gson.fromJson(data, new TypeToken>(){}.getType()); @@ -164,6 +169,7 @@ public void exec( SendData sendData, String data ) host = Util.JsonTobean(data, Host.class); hostInfo.put(host.getHost_name(), ip); hostCpuInfo.put(host.getHost_name(), host.getCpu().getCpu_total()); + hostNameInfo.put(ip, host.getHost_name()); data = redisCluster.get(Const.DOCKER, ip); @@ -343,6 +349,21 @@ public void exec( SendData sendData, String data ) pod_mem_used = 0d; pod_mem_limit = 0d; + + + nodeName = item.getSpec().getNodeName(); + nodeIP = hostInfo.get(nodeName); + + /** + * OKE같은 경우 Kubernetes Master의 node관리가 host ip로 관리되기ㅣ 때문에 + * 실제 host ip정보로 node name을 세팅한다 + */ + if( nodeIP == null ) + { + nodeIP = nodeName; + nodeName = hostNameInfo.get(nodeIP); + } + // Container for( Container container : item.getSpec().getContainers() ) { @@ -356,7 +377,7 @@ public void exec( SendData sendData, String data ) mem_used = 0d; mem_limit = 0d; - msg += "k8s_container,pod="+item.getMetadata().getName()+",container="+container.getName()+",namespace="+item.getMetadata().getNamespace()+",node_name="+item.getSpec().getNodeName()+",node_ip="+hostInfo.get(item.getSpec().getNodeName()); + msg += "k8s_container,pod="+item.getMetadata().getName()+",container="+container.getName()+",namespace="+item.getMetadata().getNamespace()+",node_name="+nodeName+",node_ip="+nodeIP; if( container.getResources() != null && container.getResources().getLimits() != null) { @@ -487,7 +508,7 @@ public void exec( SendData sendData, String data ) } // POD - msg += "k8s_pod,pod="+item.getMetadata().getName()+",namespace="+item.getMetadata().getNamespace()+",node_name="+item.getSpec().getNodeName()+",node_ip="+hostInfo.get(item.getSpec().getNodeName())+",pod_ip="+item.getStatus().getPodIP(); + msg += "k8s_pod,pod="+item.getMetadata().getName()+",namespace="+item.getMetadata().getNamespace()+",node_name="+nodeName+",node_ip="+nodeIP+",pod_ip="+item.getStatus().getPodIP(); // Pod phase Failed if( item.getStatus().getPhase() != null && "Failed".equals(item.getStatus().getPhase())) @@ -575,8 +596,10 @@ else if( "Unknown".equals(condition.getStatus()) ) pod_cpu_used = Float.parseFloat(dformat.format(pod_cpu_used)); + try{ - pod_cpu_used_percent = Float.parseFloat(dformat.format(((pod_cpu_used/hostCpuInfo.get(item.getSpec().getNodeName()))*100)*hostCpuInfo.get(item.getSpec().getNodeName()))); + pod_cpu_used_percent = Float.parseFloat(dformat.format(((pod_cpu_used/hostCpuInfo.get(nodeName))*100)*hostCpuInfo.get(nodeName))); + //pod_cpu_used_percent = Float.parseFloat(dformat.format(((pod_cpu_used/hostCpuInfo.get(item.getSpec().getNodeName()))*100)*hostCpuInfo.get(item.getSpec().getNodeName()))); }catch(Exception e){ pod_cpu_used_percent = 0f; } @@ -601,7 +624,8 @@ else if( "Unknown".equals(condition.getStatus()) ) // Node Used & Used percent nodeResource = null; - if( nodeResourceMap.get(item.getSpec().getNodeName()) == null ) + //if( nodeResourceMap.get(item.getSpec().getNodeName()) == null ) + if( nodeResourceMap.get(nodeName) == null ) { nodeResource = new Resource(); @@ -610,7 +634,8 @@ else if( "Unknown".equals(condition.getStatus()) ) } else { - nodeResource = nodeResourceMap.get(item.getSpec().getNodeName()); + //nodeResource = nodeResourceMap.get(item.getSpec().getNodeName()); + nodeResource = nodeResourceMap.get(nodeName); Double node_cpu_used = Double.parseDouble(nodeResource.getUsed().getCpu())+Double.parseDouble(Float.toString(pod_cpu_used)); Double node_mem_used = Double.parseDouble(nodeResource.getUsed().getMemory())+pod_mem_used; @@ -618,7 +643,8 @@ else if( "Unknown".equals(condition.getStatus()) ) nodeResource.getUsed().setCpu(""+node_cpu_used); nodeResource.getUsed().setMemory(""+node_mem_used); } - nodeResourceMap.put(item.getSpec().getNodeName(), nodeResource); + //nodeResourceMap.put(item.getSpec().getNodeName(), nodeResource); + nodeResourceMap.put(nodeName, nodeResource); msg += ",limit_cpu="+pod_limit_cpu+",request_cpu="+pod_request_cpu+",cpu_used="+pod_cpu_used+",cpu_used_percent="+pod_cpu_used_percent; @@ -718,10 +744,26 @@ else if( "Unknown".equals(condition.getStatus()) ) } } - msg += "k8s_node,node_name="+item.getMetadata().getName()+",node_ip="+hostInfo.get(item.getMetadata().getName()); + + nodeName = item.getMetadata().getName(); + nodeIP = hostInfo.get(nodeName); + + /** + * OKE같은 경우 Kubernetes Master의 node관리가 host ip로 관리되기ㅣ 때문에 + * 실제 host ip정보로 node name을 세팅한다 + */ + if( nodeIP == null ) + { + nodeIP = nodeName; + nodeName = hostNameInfo.get(nodeIP); + } + + //msg += "k8s_node,node_name="+item.getMetadata().getName()+",node_ip="+hostInfo.get(item.getMetadata().getName()); + msg += "k8s_node,node_name="+nodeName+",node_ip="+nodeIP; msg += " unschedulable="+unschedulable+",allocate_cpu="+item.getStatus().getCapacity().getCpu()+",allocate_mem="+item.getStatus().getAllocatable().getMemory()+",allocate_pod="+item.getStatus().getAllocatable().getPods()+",total_cpu="+item.getStatus().getCapacity().getCpu()+",total_mem="+item.getStatus().getCapacity().getMemory()+",total_pod="+item.getStatus().getCapacity().getPods(); - Resource resource = nodeResourceMap.get(item.getMetadata().getName()); + //Resource resource = nodeResourceMap.get(item.getMetadata().getName()); + Resource resource = nodeResourceMap.get(nodeName); if( resource != null ) { double node_cpu_used_percent= 0d; @@ -753,9 +795,13 @@ else if( "Unknown".equals(condition.getStatus()) ) msg += ",ready_true="+ready_true; msg += ",ready_unknown="+ready_unknown+"\n"; + item.getMetadata().setNode_name(nodeName); + item.getMetadata().setNode_ip(nodeIP); + + /* item.getMetadata().setNode_name(item.getMetadata().getName()); item.getMetadata().setNode_ip(hostInfo.get(item.getMetadata().getName())); - + */ cluster_total_cpus += Integer.parseInt(item.getStatus().getCapacity().getCpu()); cluster_total_mem += Double.parseDouble(item.getStatus().getAllocatable().getMemory()); cluster_total_pod += Integer.parseInt(item.getStatus().getAllocatable().getPods()); diff --git a/NexCollector/src/main/java/com/nexcloud/api/akka/actor/KafkaDockerConsumerActor.java b/NexCollector/src/main/java/com/nexcloud/api/akka/actor/KafkaDockerConsumerActor.java index e4c1a53..2d17d9f 100644 --- a/NexCollector/src/main/java/com/nexcloud/api/akka/actor/KafkaDockerConsumerActor.java +++ b/NexCollector/src/main/java/com/nexcloud/api/akka/actor/KafkaDockerConsumerActor.java @@ -17,7 +17,9 @@ import java.io.IOException; import java.net.URLEncoder; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.slf4j.Logger; @@ -148,7 +150,10 @@ public void shutdownCompleted(ShutdownSignalException cause) { Channel channel = connection.createChannel(); - channel.queueDeclare(sendData.getKafka_topic(), false, false, true, null); + Map args = new HashMap(); + args.put("x-queue-mode", "lazy"); + channel.queueDeclare(sendData.getKafka_topic(), false, false, false, args); + //channel.queueDeclare(sendData.getKafka_topic(), false, false, true, null); channel.addShutdownListener(new ShutdownListener() { @Override diff --git a/NexCollector/src/main/java/com/nexcloud/api/akka/actor/KafkaHostConsumerActor.java b/NexCollector/src/main/java/com/nexcloud/api/akka/actor/KafkaHostConsumerActor.java index 695d8fb..134f323 100644 --- a/NexCollector/src/main/java/com/nexcloud/api/akka/actor/KafkaHostConsumerActor.java +++ b/NexCollector/src/main/java/com/nexcloud/api/akka/actor/KafkaHostConsumerActor.java @@ -17,7 +17,9 @@ import java.io.IOException; import java.net.URLEncoder; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.slf4j.Logger; @@ -143,7 +145,11 @@ public void shutdownCompleted(ShutdownSignalException cause) { Channel channel = connection.createChannel(); - channel.queueDeclare(sendData.getKafka_topic(), false, false, true, null); + Map args = new HashMap(); + args.put("x-queue-mode", "lazy"); + channel.queueDeclare(sendData.getKafka_topic(), false, false, false, args); + //channel.queueDeclare(sendData.getKafka_topic(), false, false, true, null); + channel.addShutdownListener(new ShutdownListener() { @Override diff --git a/NexCollector/src/main/java/com/nexcloud/api/akka/actor/KafkaK8SAPIConsumerActor.java b/NexCollector/src/main/java/com/nexcloud/api/akka/actor/KafkaK8SAPIConsumerActor.java index 503d45c..20cd2ea 100644 --- a/NexCollector/src/main/java/com/nexcloud/api/akka/actor/KafkaK8SAPIConsumerActor.java +++ b/NexCollector/src/main/java/com/nexcloud/api/akka/actor/KafkaK8SAPIConsumerActor.java @@ -17,7 +17,9 @@ import java.io.IOException; import java.net.URLEncoder; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.slf4j.Logger; @@ -144,7 +146,10 @@ public void shutdownCompleted(ShutdownSignalException cause) { Channel channel = connection.createChannel(); - channel.queueDeclare(sendData.getKafka_topic(), false, false, true, null); + Map args = new HashMap(); + args.put("x-queue-mode", "lazy"); + channel.queueDeclare(sendData.getKafka_topic(), false, false, false, args); + //channel.queueDeclare(sendData.getKafka_topic(), false, false, true, null); channel.addShutdownListener(new ShutdownListener() { @Override diff --git a/NexCollector/src/main/java/com/nexcloud/api/rabitmq/Publish.java b/NexCollector/src/main/java/com/nexcloud/api/rabitmq/Publish.java index 4f70e4f..01f3665 100644 --- a/NexCollector/src/main/java/com/nexcloud/api/rabitmq/Publish.java +++ b/NexCollector/src/main/java/com/nexcloud/api/rabitmq/Publish.java @@ -225,10 +225,14 @@ public void shutdownCompleted(ShutdownSignalException cause) { //logger.error("Channel name::"+channelName); - channel.queueDeclare(channelName, false, false, true, null); + Map args = new HashMap(); + args.put("x-queue-mode", "lazy"); + channel.queueDeclare(channelName, false, false, false, args); + //channel.queueDeclare(channelName, false, false, true, null); channel.basicPublish("", channelName, null, message.getBytes()); - channel.queueDeclare(channelName+"_work", false, false, true, null); + channel.queueDeclare(channelName+"_work", false, false, false, args); + //channel.queueDeclare(channelName+"_work", false, false, true, null); channel.basicPublish("", channelName+"_work", null, message.getBytes()); channel.close();