Skip to content

Commit

Permalink
OKE에 대응 하기 위해 K8S정보중 nodeName을 Host name으로 변경함. OKE는 node name부분을 IP로…
Browse files Browse the repository at this point in the history
… 대체함.
  • Loading branch information
devleeskNex committed Apr 25, 2019
1 parent 21d85ae commit ebc864f
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 33 deletions.
Expand Up @@ -60,6 +60,7 @@ public class JsonK8SAPIParserActor extends UntypedActor{
private String data;

private Map<String, Containers> containerMap = new HashMap<String, Containers>();
private Map<String, String> hostNameInfo = new HashMap<String, String>();
private Map<String, String> hostInfo = new HashMap<String, String>();
private Map<String, Integer> hostCpuInfo = new HashMap<String, Integer>();

Expand Down Expand Up @@ -102,38 +103,41 @@ public void onReceive(Object message){

public void exec( SendData sendData, String data )
{
RedisCluster redisCluster = null;
String msg = "";
RedisCluster redisCluster = null;
String msg = "";

String pattern = "#####.###";
DecimalFormat dformat = new DecimalFormat( pattern );
String nodeName = null;
String nodeIP = null;

String pattern = "#####.###";
DecimalFormat dformat = new DecimalFormat( pattern );

ConsumerRecords<String, String> records = null;
ConsumerRecords<String, String> records = null;

Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();
Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();

try{
// Redis Cluster connection
redisCluster = RedisCluster.getInstance(sendData.getRedis_host(), Integer.parseInt(sendData.getRedis_port()));
redisCluster = RedisCluster.getInstance(sendData.getRedis_host(), Integer.parseInt(sendData.getRedis_port()));

/**
* Agent metric data
*/
ResponseData resData = null;
Header header = null;
String body = null;
ResponseData resData = null;
Header header = null;
String body = null;

msg = "";
msg = "";

Host host = null;
Docker docker = null;
Host host = null;
Docker docker = null;

K8SDaemonSetThread daemonsetThread = K8SDaemonSetThread.getInstance();
K8SDeploymentThread deploymentThread = K8SDeploymentThread.getInstance();
K8SEndpointThread endpointThread = K8SEndpointThread.getInstance();
K8SNamespaceThread namespaceThread = K8SNamespaceThread.getInstance();
K8SStatefulsetThread statefulThread = K8SStatefulsetThread.getInstance();
K8SReplicasetThread replicaThread = K8SReplicasetThread.getInstance();
K8SDaemonSetThread daemonsetThread = K8SDaemonSetThread.getInstance();
K8SDeploymentThread deploymentThread = K8SDeploymentThread.getInstance();
K8SEndpointThread endpointThread = K8SEndpointThread.getInstance();
K8SNamespaceThread namespaceThread = K8SNamespaceThread.getInstance();
K8SStatefulsetThread statefulThread = K8SStatefulsetThread.getInstance();
K8SReplicasetThread replicaThread = K8SReplicasetThread.getInstance();

resData = Util.JsonTobean(data, ResponseData.class);
header = resData.getHeader();
Expand All @@ -154,6 +158,7 @@ public void exec( SendData sendData, String data )
{
hostInfo.clear();
hostCpuInfo.clear();
hostNameInfo.clear();

ips = gson.fromJson(data, new TypeToken<List<String>>(){}.getType());

Expand All @@ -164,6 +169,7 @@ public void exec( SendData sendData, String data )
host = Util.JsonTobean(data, Host.class);
hostInfo.put(host.getHost_name(), ip);
hostCpuInfo.put(host.getHost_name(), host.getCpu().getCpu_total());
hostNameInfo.put(ip, host.getHost_name());


data = redisCluster.get(Const.DOCKER, ip);
Expand Down Expand Up @@ -343,6 +349,21 @@ public void exec( SendData sendData, String data )
pod_mem_used = 0d;
pod_mem_limit = 0d;



nodeName = item.getSpec().getNodeName();
nodeIP = hostInfo.get(nodeName);

/**
* OKE같은 경우 Kubernetes Master의 node관리가 host ip로 관리되기ㅣ 때문에
* 실제 host ip정보로 node name을 세팅한다
*/
if( nodeIP == null )
{
nodeIP = nodeName;
nodeName = hostNameInfo.get(nodeIP);
}

// Container
for( Container container : item.getSpec().getContainers() )
{
Expand All @@ -356,7 +377,7 @@ public void exec( SendData sendData, String data )
mem_used = 0d;
mem_limit = 0d;

msg += "k8s_container,pod="+item.getMetadata().getName()+",container="+container.getName()+",namespace="+item.getMetadata().getNamespace()+",node_name="+item.getSpec().getNodeName()+",node_ip="+hostInfo.get(item.getSpec().getNodeName());
msg += "k8s_container,pod="+item.getMetadata().getName()+",container="+container.getName()+",namespace="+item.getMetadata().getNamespace()+",node_name="+nodeName+",node_ip="+nodeIP;

if( container.getResources() != null && container.getResources().getLimits() != null)
{
Expand Down Expand Up @@ -487,7 +508,7 @@ public void exec( SendData sendData, String data )
}

// POD
msg += "k8s_pod,pod="+item.getMetadata().getName()+",namespace="+item.getMetadata().getNamespace()+",node_name="+item.getSpec().getNodeName()+",node_ip="+hostInfo.get(item.getSpec().getNodeName())+",pod_ip="+item.getStatus().getPodIP();
msg += "k8s_pod,pod="+item.getMetadata().getName()+",namespace="+item.getMetadata().getNamespace()+",node_name="+nodeName+",node_ip="+nodeIP+",pod_ip="+item.getStatus().getPodIP();

// Pod phase Failed
if( item.getStatus().getPhase() != null && "Failed".equals(item.getStatus().getPhase()))
Expand Down Expand Up @@ -575,8 +596,10 @@ else if( "Unknown".equals(condition.getStatus()) )

pod_cpu_used = Float.parseFloat(dformat.format(pod_cpu_used));


try{
pod_cpu_used_percent = Float.parseFloat(dformat.format(((pod_cpu_used/hostCpuInfo.get(item.getSpec().getNodeName()))*100)*hostCpuInfo.get(item.getSpec().getNodeName())));
pod_cpu_used_percent = Float.parseFloat(dformat.format(((pod_cpu_used/hostCpuInfo.get(nodeName))*100)*hostCpuInfo.get(nodeName)));
//pod_cpu_used_percent = Float.parseFloat(dformat.format(((pod_cpu_used/hostCpuInfo.get(item.getSpec().getNodeName()))*100)*hostCpuInfo.get(item.getSpec().getNodeName())));
}catch(Exception e){
pod_cpu_used_percent = 0f;
}
Expand All @@ -601,7 +624,8 @@ else if( "Unknown".equals(condition.getStatus()) )

// Node Used & Used percent
nodeResource = null;
if( nodeResourceMap.get(item.getSpec().getNodeName()) == null )
//if( nodeResourceMap.get(item.getSpec().getNodeName()) == null )
if( nodeResourceMap.get(nodeName) == null )
{
nodeResource = new Resource();

Expand All @@ -610,15 +634,17 @@ else if( "Unknown".equals(condition.getStatus()) )
}
else
{
nodeResource = nodeResourceMap.get(item.getSpec().getNodeName());
//nodeResource = nodeResourceMap.get(item.getSpec().getNodeName());
nodeResource = nodeResourceMap.get(nodeName);

Double node_cpu_used = Double.parseDouble(nodeResource.getUsed().getCpu())+Double.parseDouble(Float.toString(pod_cpu_used));
Double node_mem_used = Double.parseDouble(nodeResource.getUsed().getMemory())+pod_mem_used;

nodeResource.getUsed().setCpu(""+node_cpu_used);
nodeResource.getUsed().setMemory(""+node_mem_used);
}
nodeResourceMap.put(item.getSpec().getNodeName(), nodeResource);
//nodeResourceMap.put(item.getSpec().getNodeName(), nodeResource);
nodeResourceMap.put(nodeName, nodeResource);


msg += ",limit_cpu="+pod_limit_cpu+",request_cpu="+pod_request_cpu+",cpu_used="+pod_cpu_used+",cpu_used_percent="+pod_cpu_used_percent;
Expand Down Expand Up @@ -718,10 +744,26 @@ else if( "Unknown".equals(condition.getStatus()) )
}
}

msg += "k8s_node,node_name="+item.getMetadata().getName()+",node_ip="+hostInfo.get(item.getMetadata().getName());

nodeName = item.getMetadata().getName();
nodeIP = hostInfo.get(nodeName);

/**
* OKE같은 경우 Kubernetes Master의 node관리가 host ip로 관리되기ㅣ 때문에
* 실제 host ip정보로 node name을 세팅한다
*/
if( nodeIP == null )
{
nodeIP = nodeName;
nodeName = hostNameInfo.get(nodeIP);
}

//msg += "k8s_node,node_name="+item.getMetadata().getName()+",node_ip="+hostInfo.get(item.getMetadata().getName());
msg += "k8s_node,node_name="+nodeName+",node_ip="+nodeIP;
msg += " unschedulable="+unschedulable+",allocate_cpu="+item.getStatus().getCapacity().getCpu()+",allocate_mem="+item.getStatus().getAllocatable().getMemory()+",allocate_pod="+item.getStatus().getAllocatable().getPods()+",total_cpu="+item.getStatus().getCapacity().getCpu()+",total_mem="+item.getStatus().getCapacity().getMemory()+",total_pod="+item.getStatus().getCapacity().getPods();

Resource resource = nodeResourceMap.get(item.getMetadata().getName());
//Resource resource = nodeResourceMap.get(item.getMetadata().getName());
Resource resource = nodeResourceMap.get(nodeName);
if( resource != null )
{
double node_cpu_used_percent= 0d;
Expand Down Expand Up @@ -753,9 +795,13 @@ else if( "Unknown".equals(condition.getStatus()) )
msg += ",ready_true="+ready_true;
msg += ",ready_unknown="+ready_unknown+"\n";

item.getMetadata().setNode_name(nodeName);
item.getMetadata().setNode_ip(nodeIP);

/*
item.getMetadata().setNode_name(item.getMetadata().getName());
item.getMetadata().setNode_ip(hostInfo.get(item.getMetadata().getName()));

*/
cluster_total_cpus += Integer.parseInt(item.getStatus().getCapacity().getCpu());
cluster_total_mem += Double.parseDouble(item.getStatus().getAllocatable().getMemory());
cluster_total_pod += Integer.parseInt(item.getStatus().getAllocatable().getPods());
Expand Down
Expand Up @@ -17,7 +17,9 @@
import java.io.IOException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
Expand Down Expand Up @@ -148,7 +150,10 @@ public void shutdownCompleted(ShutdownSignalException cause) {

Channel channel = connection.createChannel();

channel.queueDeclare(sendData.getKafka_topic(), false, false, true, null);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare(sendData.getKafka_topic(), false, false, false, args);
//channel.queueDeclare(sendData.getKafka_topic(), false, false, true, null);

channel.addShutdownListener(new ShutdownListener() {
@Override
Expand Down
Expand Up @@ -17,7 +17,9 @@
import java.io.IOException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
Expand Down Expand Up @@ -143,7 +145,11 @@ public void shutdownCompleted(ShutdownSignalException cause) {

Channel channel = connection.createChannel();

channel.queueDeclare(sendData.getKafka_topic(), false, false, true, null);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare(sendData.getKafka_topic(), false, false, false, args);
//channel.queueDeclare(sendData.getKafka_topic(), false, false, true, null);


channel.addShutdownListener(new ShutdownListener() {
@Override
Expand Down
Expand Up @@ -17,7 +17,9 @@
import java.io.IOException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
Expand Down Expand Up @@ -144,7 +146,10 @@ public void shutdownCompleted(ShutdownSignalException cause) {

Channel channel = connection.createChannel();

channel.queueDeclare(sendData.getKafka_topic(), false, false, true, null);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare(sendData.getKafka_topic(), false, false, false, args);
//channel.queueDeclare(sendData.getKafka_topic(), false, false, true, null);

channel.addShutdownListener(new ShutdownListener() {
@Override
Expand Down
Expand Up @@ -225,10 +225,14 @@ public void shutdownCompleted(ShutdownSignalException cause) {

//logger.error("Channel name::"+channelName);

channel.queueDeclare(channelName, false, false, true, null);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare(channelName, false, false, false, args);
//channel.queueDeclare(channelName, false, false, true, null);
channel.basicPublish("", channelName, null, message.getBytes());

channel.queueDeclare(channelName+"_work", false, false, true, null);
channel.queueDeclare(channelName+"_work", false, false, false, args);
//channel.queueDeclare(channelName+"_work", false, false, true, null);
channel.basicPublish("", channelName+"_work", null, message.getBytes());

channel.close();
Expand Down

0 comments on commit ebc864f

Please sign in to comment.