Skip to content
Permalink
Browse files
AMBARI-24974. Sometimes Task Log is not refreshed in UI after operati…
…on completes. (mpapirkovskyy) (#2745)
  • Loading branch information
mpapirkovskyy committed Dec 27, 2018
1 parent d069e15 commit 63df198782b4caba069efd0cee35ef875ab550b8
Showing 10 changed files with 693 additions and 5 deletions.
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.api.stomp;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import org.springframework.web.socket.messaging.SessionUnsubscribeEvent;

@Component
public class NamedTasksSubscribeListener {
private static Logger LOG = LoggerFactory.getLogger(NamedTasksSubscribeListener.class);

@Autowired
private NamedTasksSubscriptions namedTasksSubscriptions;

@EventListener
public void subscribe(SessionSubscribeEvent sse)
{
MessageHeaders msgHeaders = sse.getMessage().getHeaders();
String sessionId = (String) msgHeaders.get("simpSessionId");
String destination = (String) msgHeaders.get("simpDestination");
String id = (String) msgHeaders.get("simpSubscriptionId");
if (sessionId != null && destination != null && id != null) {
namedTasksSubscriptions.addDestination(sessionId, destination, id);
}
LOG.info(String.format("API subscribe was arrived with sessionId = %s, destination = %s and id = %s",
sessionId, destination, id));
}

@EventListener
public void unsubscribe(SessionUnsubscribeEvent suse)
{
MessageHeaders msgHeaders = suse.getMessage().getHeaders();
String sessionId = (String) msgHeaders.get("simpSessionId");
String id = (String) msgHeaders.get("simpSubscriptionId");
if (sessionId != null && id != null) {
namedTasksSubscriptions.removeId(sessionId, id);
}
LOG.info(String.format("API unsubscribe was arrived with sessionId = %s and id = %s",
sessionId, id));
}

@EventListener
public void disconnect(SessionDisconnectEvent sde)
{
MessageHeaders msgHeaders = sde.getMessage().getHeaders();
String sessionId = (String) msgHeaders.get("simpSessionId");
if (sessionId != null) {
namedTasksSubscriptions.removeSession(sessionId);
}
LOG.info(String.format("API disconnect was arrived with sessionId = %s",
sessionId));
}
}
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.api.stomp;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;

@Singleton
public class NamedTasksSubscriptions {
private static Logger LOG = LoggerFactory.getLogger(NamedTasksSubscriptions.class);

private ConcurrentHashMap<String, List<SubscriptionId>> taskIds = new ConcurrentHashMap<>();
private final Pattern pattern = Pattern.compile("^/events/tasks/(\\d*)$");
private final Lock taskIdsLock = new ReentrantLock();

private Provider<TaskStatusListener> taskStatusListenerProvider;

@Inject
public NamedTasksSubscriptions(Provider<TaskStatusListener> taskStatusListenerProvider) {
this.taskStatusListenerProvider = taskStatusListenerProvider;
}

public void addTaskId(String sessionId, Long taskId, String id) {
try {
taskIdsLock.lock();
taskIds.compute(sessionId, (sid, ids) -> {
if (ids == null) {
ids = new ArrayList<>();
}
AtomicBoolean completed = new AtomicBoolean(false);
taskStatusListenerProvider.get().getActiveTasksMap().computeIfPresent(taskId, (tid, task) -> {
if (task.getStatus().isCompletedState()) {
completed.set(true);
}
return task;
});
if (!completed.get()) {
ids.add(new SubscriptionId(taskId, id));
}
return ids;
});
LOG.info(String.format("Task subscription was added for sessionId = %s, taskId = %s, id = %s",
sessionId, taskId, id));
} finally {
taskIdsLock.unlock();
}
}

public void removeId(String sessionId, String id) {
taskIds.computeIfPresent(sessionId, (sid, tasks) -> {
Iterator<SubscriptionId> iterator = tasks.iterator();
while (iterator.hasNext()) {
if (iterator.next().getId().equals(id)) {
iterator.remove();
LOG.info(String.format("Task subscription was removed for sessionId = %s, id = %s", sessionId, id));
}
}
return tasks;
});
}

public void removeTaskId(Long taskId) {
try {
taskIdsLock.lock();
for (String sessionId : taskIds.keySet()) {
taskIds.computeIfPresent(sessionId, (id, tasks) -> {
Iterator<SubscriptionId> iterator = tasks.iterator();
while (iterator.hasNext()) {
if (iterator.next().getTaskId().equals(taskId)) {
iterator.remove();
LOG.info(String.format("Task subscription was removed for sessionId = %s and taskId = %s",
sessionId, taskId));
}
}
return tasks;
});
}
} finally {
taskIdsLock.unlock();
}
}

public void removeSession(String sessionId) {
try {
taskIdsLock.lock();
taskIds.remove(sessionId);
LOG.info(String.format("Task subscriptions were removed for sessionId = %s", sessionId));
} finally {
taskIdsLock.unlock();
}
}

public Long matchDestination(String destination) {
Matcher m = pattern.matcher(destination);
if (m.matches()) {
return Long.parseLong(m.group(1));
}
return null;
}

public void addDestination(String sessionId, String destination, String id) {
Long taskId = matchDestination(destination);
if (taskId != null) {
addTaskId(sessionId, taskId, id);
}
}

public boolean checkTaskId(Long taskId) {
for (List<SubscriptionId> ids: taskIds.values()) {
for (SubscriptionId subscriptionId : ids) {
if (subscriptionId.getTaskId().equals(taskId)) {
return true;
}
}
}
return false;
}

public class SubscriptionId {
private final Long taskId;
private final String id;

public SubscriptionId(Long taskId, String id) {
this.taskId = taskId;
this.id = id;
}

public Long getTaskId() {
return taskId;
}

public String getId() {
return id;
}
}
}
@@ -20,7 +20,6 @@
import javax.servlet.ServletContext;

import org.apache.ambari.server.agent.stomp.HeartbeatController;
import org.apache.ambari.server.api.stomp.TestController;
import org.apache.ambari.server.events.DefaultMessageEmitter;
import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
@@ -41,7 +40,7 @@

@Configuration
@EnableWebSocketMessageBroker
@ComponentScan(basePackageClasses = {TestController.class, HeartbeatController.class})
@ComponentScan(basePackageClasses = {HeartbeatController.class})
@Import({RootStompConfig.class,GuiceBeansConfig.class})
public class AgentStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
private org.apache.ambari.server.configuration.Configuration configuration;
@@ -17,6 +17,7 @@
*/
package org.apache.ambari.server.configuration.spring;

import org.apache.ambari.server.api.stomp.NamedTasksSubscriptions;
import org.apache.ambari.server.api.stomp.TestController;
import org.apache.ambari.server.events.DefaultMessageEmitter;
import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener;
@@ -50,6 +51,11 @@ public STOMPUpdateListener requestSTOMPListener(Injector injector) {
return new STOMPUpdateListener(injector, DefaultMessageEmitter.DEFAULT_API_EVENT_TYPES);
}

@Bean
public NamedTasksSubscriptions namedTasksSubscribtions(Injector injector) {
return injector.getInstance(NamedTasksSubscriptions.class);
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/v1")
@@ -43,6 +43,7 @@ public class DefaultMessageEmitter extends MessageEmitter {
put(STOMPEvent.Type.AGENT_CONFIGS, "/configs");
put(STOMPEvent.Type.CONFIGS, "/events/configs");
put(STOMPEvent.Type.HOSTCOMPONENT, "/events/hostcomponents");
put(STOMPEvent.Type.NAMEDTASK, "/events/tasks");
put(STOMPEvent.Type.REQUEST, "/events/requests");
put(STOMPEvent.Type.SERVICE, "/events/services");
put(STOMPEvent.Type.HOST, "/events/hosts");
@@ -72,6 +73,7 @@ public class DefaultMessageEmitter extends MessageEmitter {
STOMPEvent.Type.UI_TOPOLOGY,
STOMPEvent.Type.CONFIGS,
STOMPEvent.Type.HOSTCOMPONENT,
STOMPEvent.Type.NAMEDTASK,
STOMPEvent.Type.REQUEST,
STOMPEvent.Type.SERVICE,
STOMPEvent.Type.HOST,
@@ -103,6 +105,6 @@ public void emitMessage(STOMPEvent event) throws AmbariException {

@Override
protected String getDestination(STOMPEvent stompEvent) {
return DEFAULT_DESTINATIONS.get(stompEvent.getType());
return stompEvent.completeDestination(DEFAULT_DESTINATIONS.get(stompEvent.getType()));
}
}

0 comments on commit 63df198

Please sign in to comment.