This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

refactoring daos to enable different implementations and separation of concern
apanicker-nflx committed Jan 31, 2020
1 parent 7a83695 commit e602bad
Showing 48 changed files with 882 additions and 634 deletions.
@@ -43,6 +43,7 @@
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.core.execution.ApplicationException.Code;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.PollDataDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.util.Statements;
import java.util.ArrayList;
@@ -57,7 +58,7 @@
import org.slf4j.LoggerFactory;

@Trace
public class CassandraExecutionDAO extends CassandraBaseDAO implements ExecutionDAO {
public class CassandraExecutionDAO extends CassandraBaseDAO implements ExecutionDAO, PollDataDAO {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraExecutionDAO.class);
private static final String CLASS_NAME = CassandraExecutionDAO.class.getSimpleName();

@@ -530,7 +531,7 @@ public List<EventExecution> getEventExecutions(String eventHandlerName, String e
* for Cassandra backed Conductor
*/
@Override
public void updateLastPoll(String taskDefName, String domain, String workerId) {
public void updateLastPollData(String taskDefName, String domain, String workerId) {
throw new UnsupportedOperationException("This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}

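CassandraExecutionDAO now also implements PollDataDAO, the new interface this commit introduces for poll-data persistence. The interface file itself is not among the hunks shown here, but its shape can be inferred from the poll-data methods removed from ExecutionDAO further down in this diff; a minimal sketch, with comments assumed:

// Sketch of the assumed PollDataDAO contract, reconstructed from the methods this
// commit removes from ExecutionDAO and routes through ExecutionDAOFacade; only the
// signatures are taken from the diff, the comments are illustrative.
package com.netflix.conductor.dao;

import com.netflix.conductor.common.metadata.tasks.PollData;
import java.util.List;

public interface PollDataDAO {

    // Records the last poll made by a worker for the given task type and domain.
    void updateLastPollData(String taskDefName, String domain, String workerId);

    // Returns the poll data for the given task type within the given domain, if any.
    PollData getPollData(String taskDefName, String domain);

    // Returns all poll data recorded for the given task type, across domains.
    List<PollData> getPollData(String taskDefName);
}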
@@ -1,4 +1,4 @@
/**
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,21 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
*
*/
package com.netflix.conductor.common.metadata.events;

import com.github.vmg.protogen.annotations.ProtoEnum;
import com.github.vmg.protogen.annotations.ProtoField;
import com.github.vmg.protogen.annotations.ProtoMessage;
import com.google.protobuf.Any;
import com.github.vmg.protogen.annotations.*;

import javax.validation.Valid;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.validation.Valid;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

/**
* @author Viren
@@ -120,7 +120,7 @@ public Map<String, Map<String, Long>> getQueueSizes() {

private void refresh() {
try {
Set<String> events = metadataService.getEventHandlers().stream()
Set<String> events = metadataService.getAllEventHandlers().stream()
.map(EventHandler::getEvent)
.collect(Collectors.toSet());

@@ -473,13 +473,13 @@ public void rewind(String workflowId, boolean useLatestDefinitions) {

WorkflowDef workflowDef;
if (useLatestDefinitions) {
workflowDef = metadataDAO.getLatest(workflow.getWorkflowName())
workflowDef = metadataDAO.getLatestWorkflowDef(workflow.getWorkflowName())
.orElseThrow(() -> new ApplicationException(NOT_FOUND, String.format("Unable to find latest definition for %s", workflowId)));
workflow.setVersion(workflowDef.getVersion()); // setting this here to ensure backward compatibility and consistency for workflows without the embedded workflow definition
workflow.setWorkflowDefinition(workflowDef);
} else {
workflowDef = Optional.ofNullable(workflow.getWorkflowDefinition())
.orElseGet(() -> metadataDAO.get(workflow.getWorkflowName(), workflow.getWorkflowVersion())
.orElseGet(() -> metadataDAO.getWorkflowDef(workflow.getWorkflowName(), workflow.getWorkflowVersion())
.orElseThrow(() -> new ApplicationException(NOT_FOUND, String.format("Unable to find definition for %s", workflowId)))
);
}
@@ -638,7 +638,7 @@ void completeWorkflow(Workflow wf) {
if (workflow.getParentWorkflowId() != null) {
Workflow parent = executionDAOFacade.getWorkflowById(workflow.getParentWorkflowId(), false);
WorkflowDef parentDef = Optional.ofNullable(parent.getWorkflowDefinition())
.orElseGet(() -> metadataDAO.get(parent.getWorkflowName(), parent.getWorkflowVersion())
.orElseGet(() -> metadataDAO.getWorkflowDef(parent.getWorkflowName(), parent.getWorkflowVersion())
.orElseThrow(() -> new ApplicationException(NOT_FOUND, String.format("Unable to find parent workflow definition for %s", wf.getWorkflowId())))
);
LOGGER.debug("Completed sub-workflow {}, deciding parent workflow {}", workflow.getWorkflowId(), workflow.getParentWorkflowId());
@@ -734,7 +734,7 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
input.put("failureStatus", workflow.getStatus().toString());

try {
WorkflowDef latestFailureWorkflow = metadataDAO.getLatest(failureWorkflow)
WorkflowDef latestFailureWorkflow = metadataDAO.getLatestWorkflowDef(failureWorkflow)
.orElseThrow(() ->
new RuntimeException("Failure Workflow Definition not found for: " + failureWorkflow)
);
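The WorkflowExecutor call sites above move from metadataDAO.getLatest(...) and metadataDAO.get(...) to the more descriptive getLatestWorkflowDef(...) and getWorkflowDef(...). The MetadataDAO hunk itself is not shown on this page; the assumed shape of the renamed lookups, reconstructed from these call sites:

// Assumed signatures of the renamed MetadataDAO lookups; parameter names are
// illustrative and the rest of the interface is omitted from this sketch.
package com.netflix.conductor.dao;

import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import java.util.Optional;

public interface MetadataDAO {

    // Previously get(String, int): the workflow definition for a name and version, if present.
    Optional<WorkflowDef> getWorkflowDef(String name, int version);

    // Previously getLatest(String): the latest registered version for a name, if present.
    Optional<WorkflowDef> getLatestWorkflowDef(String name);

    // ... task-definition and other metadata methods unchanged by this refactoring
}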
@@ -116,7 +116,7 @@ private Integer getSubWorkflowVersion(Map<String, Object> resolvedParams, String
.map(Object::toString)
.map(Integer::parseInt)
.orElseGet(
() -> metadataDAO.getLatest(subWorkflowName)
() -> metadataDAO.getLatestWorkflowDef(subWorkflowName)
.map(WorkflowDef::getVersion)
.orElseThrow(() -> {
String reason = String.format("The Task %s defined as a sub-workflow has no workflow definition available ", subWorkflowName);
@@ -1,30 +1,42 @@
/*
* Copyright 2019 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.conductor.core.execution.tasks;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.service.MetadataService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class IsolatedTaskQueueProducer {

private static Logger logger = LoggerFactory.getLogger(IsolatedTaskQueueProducer.class);
private static final Logger logger = LoggerFactory.getLogger(IsolatedTaskQueueProducer.class);
private MetadataService metadataService;
private int pollingTimeOut;


@Inject
public IsolatedTaskQueueProducer(MetadataService metadataService, Configuration config) {
@@ -34,31 +46,28 @@ public IsolatedTaskQueueProducer(MetadataService metadataService, Configuration

if (listenForIsolationGroups) {

this.pollingTimeOut = config.getIntProperty("workflow.isolated.system.task.poll.time.secs", 10);
int pollingTimeOut = config.getIntProperty("workflow.isolated.system.task.poll.time.secs", 10);
logger.info("Listening for isolation groups");

Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(() -> addTaskQueues(), 1000, pollingTimeOut, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::addTaskQueues, 1000,
pollingTimeOut, TimeUnit.SECONDS);
} else {
logger.info("Isolated System Task Worker DISABLED");
}

}


private Set<TaskDef> getIsolationExecutionNameSpaces() {

Set<TaskDef> isolationExecutionNameSpaces = Collections.emptySet();

try {

List<TaskDef> taskDefs = metadataService.getTaskDefs();
isolationExecutionNameSpaces = taskDefs.stream().
filter(taskDef -> StringUtils.isNotBlank(taskDef.getIsolationGroupId())|| StringUtils.isNotBlank(taskDef.getExecutionNameSpace())).
collect(Collectors.toSet());

isolationExecutionNameSpaces = taskDefs.stream()
.filter(taskDef -> StringUtils.isNotBlank(taskDef.getIsolationGroupId()) || StringUtils
.isNotBlank(taskDef.getExecutionNameSpace()))
.collect(Collectors.toSet());
} catch (RuntimeException unknownException) {

logger.error("Unknown exception received in getting isolation groups, sleeping and retrying", unknownException);
logger.error("Unknown exception received in getting isolation groups, sleeping and retrying",
unknownException);
}
return isolationExecutionNameSpaces;
}
@@ -80,5 +89,4 @@ void addTaskQueues() {
}

}

}
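For reference, IsolatedTaskQueueProducer only creates dedicated queues for task definitions that declare an isolation group or execution namespace. A hypothetical registration showing such a definition; the task name, group id, and the exact registerTaskDef call are illustrative assumptions, not taken from this diff:

import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.service.MetadataService;
import java.util.Collections;

public class IsolationGroupRegistrationSketch {

    // Registers a task definition carrying an isolation group id, which is what
    // getIsolationExecutionNameSpaces() filters for; values are made up for the example.
    static void registerIsolatedTask(MetadataService metadataService) {
        TaskDef encodeTask = new TaskDef();
        encodeTask.setName("encode_video");
        encodeTask.setIsolationGroupId("gpu-cluster");
        metadataService.registerTaskDef(Collections.singletonList(encodeTask));
    }
}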
@@ -76,13 +76,13 @@ public WorkflowDef lookupForWorkflowDefinition(String name, Integer version) {
@VisibleForTesting
Optional<WorkflowDef> lookupWorkflowDefinition(String workflowName, int workflowVersion) {
Preconditions.checkArgument(StringUtils.isNotBlank(workflowName), "Workflow name must be specified when searching for a definition");
return metadataDAO.get(workflowName, workflowVersion);
return metadataDAO.getWorkflowDef(workflowName, workflowVersion);
}

@VisibleForTesting
Optional<WorkflowDef> lookupLatestWorkflowDefinition(String workflowName) {
Preconditions.checkArgument(StringUtils.isNotBlank(workflowName), "Workflow name must be specified when searching for a definition");
return metadataDAO.getLatest(workflowName);
return metadataDAO.getLatestWorkflowDef(workflowName);
}

public Workflow populateWorkflowWithDefinitions(Workflow workflow) {
@@ -125,7 +125,7 @@ private void populateVersionForSubWorkflow(WorkflowTask workflowTask) {
if (subworkflowParams.getVersion() == null) {
String subWorkflowName = subworkflowParams.getName();
Integer subWorkflowVersion =
metadataDAO.getLatest(subWorkflowName)
metadataDAO.getLatestWorkflowDef(subWorkflowName)
.map(WorkflowDef::getVersion)
.orElseThrow(
() -> {
@@ -1,5 +1,5 @@
/*
* Copyright 2016 Netflix, Inc.
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -12,6 +12,8 @@
*/
package com.netflix.conductor.core.orchestration;

import static com.netflix.conductor.core.execution.WorkflowExecutor.DECIDER_QUEUE;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.PollData;
@@ -26,8 +28,9 @@
import com.netflix.conductor.core.execution.ApplicationException.Code;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.dao.PollDataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.dao.RateLimitingDao;
import com.netflix.conductor.dao.RateLimitingDAO;
import com.netflix.conductor.metrics.Monitors;
import java.io.IOException;
import java.util.Collections;
@@ -42,10 +45,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.netflix.conductor.core.execution.WorkflowExecutor.DECIDER_QUEUE;

/**
* Service that acts as a facade for accessing execution data from the {@link ExecutionDAO}, {@link RateLimitingDao} and {@link IndexDAO} storage layers
* Service that acts as a facade for accessing execution data from the {@link ExecutionDAO}, {@link RateLimitingDAO} and {@link IndexDAO} storage layers
*/
@Singleton
public class ExecutionDAOFacade {
@@ -57,23 +58,21 @@ public class ExecutionDAOFacade {
private final ExecutionDAO executionDAO;
private final QueueDAO queueDAO;
private final IndexDAO indexDAO;
private final RateLimitingDao rateLimitingDao;
private final RateLimitingDAO rateLimitingDao;
private final PollDataDAO pollDataDAO;
private final ObjectMapper objectMapper;
private final Configuration config;

private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

@Inject
public ExecutionDAOFacade(ExecutionDAO executionDAO,
QueueDAO queueDAO,
IndexDAO indexDAO,
RateLimitingDao rateLimitingDao,
ObjectMapper objectMapper,
Configuration config) {
public ExecutionDAOFacade(ExecutionDAO executionDAO, QueueDAO queueDAO, IndexDAO indexDAO,
RateLimitingDAO rateLimitingDao, PollDataDAO pollDataDAO, ObjectMapper objectMapper, Configuration config) {
this.executionDAO = executionDAO;
this.queueDAO = queueDAO;
this.indexDAO = indexDAO;
this.rateLimitingDao = rateLimitingDao;
this.pollDataDAO = pollDataDAO;
this.objectMapper = objectMapper;
this.config = config;
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(4,
@@ -360,15 +359,15 @@ public void removeTask(String taskId) {
}

public List<PollData> getTaskPollData(String taskName) {
return executionDAO.getPollData(taskName);
return pollDataDAO.getPollData(taskName);
}

public PollData getTaskPollDataByDomain(String taskName, String domain) {
return executionDAO.getPollData(taskName, domain);
return pollDataDAO.getPollData(taskName, domain);
}

public void updateTaskLastPoll(String taskName, String domain, String workerId) {
executionDAO.updateLastPoll(taskName, domain, workerId);
pollDataDAO.updateLastPollData(taskName, domain, workerId);
}

/**
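With the constructor change above, ExecutionDAOFacade receives PollDataDAO and the renamed RateLimitingDAO as separate dependencies and delegates all poll-data reads and writes to PollDataDAO. A minimal construction sketch, for example in a unit test, assuming Mockito mocks stand in for the concrete bindings the server module would supply:

import static org.mockito.Mockito.mock;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.dao.PollDataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.dao.RateLimitingDAO;

public class ExecutionDAOFacadeWiringSketch {

    // Builds the facade with the new seven-argument constructor; poll-data calls such as
    // updateTaskLastPoll(...) are now routed to the PollDataDAO instance.
    static ExecutionDAOFacade buildFacade() {
        return new ExecutionDAOFacade(
                mock(ExecutionDAO.class), mock(QueueDAO.class), mock(IndexDAO.class),
                mock(RateLimitingDAO.class), mock(PollDataDAO.class),
                new ObjectMapper(), mock(Configuration.class));
    }
}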
53 changes: 53 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/EventHandlerDAO.java
@@ -0,0 +1,53 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.conductor.dao;

import com.netflix.conductor.common.metadata.events.EventHandler;
import java.util.List;

/**
* An abstraction to enable different Event Handler store implementations
*/
public interface EventHandlerDAO {

/**
* @param eventHandler the event handler to be added; an exception is thrown if a handler with the same
* name already exists
*/
void addEventHandler(EventHandler eventHandler);

/**
* @param eventHandler Event handler to be updated.
*/
void updateEventHandler(EventHandler eventHandler);

/**
* @param name name of the event handler to be removed from the system
*/
void removeEventHandler(String name);

/**
* @return All the event handlers registered in the system
*/
List<EventHandler> getAllEventHandlers();

/**
* @param event name of the event
* @param activeOnly if true, returns only the active handlers
* @return the list of all event handlers registered for the given event
*/
List<EventHandler> getEventHandlersForEvent(String event, boolean activeOnly);
}
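The new EventHandlerDAO abstraction decouples event-handler storage from the general metadata DAO so that each backend can provide its own implementation. A minimal in-memory sketch of the contract above, for illustration only; the real implementations persist to the backing store and use Conductor's own exception types:

import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.dao.EventHandlerDAO;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class InMemoryEventHandlerDAO implements EventHandlerDAO {

    private final Map<String, EventHandler> handlersByName = new ConcurrentHashMap<>();

    @Override
    public void addEventHandler(EventHandler eventHandler) {
        // The contract requires a failure when a handler with the same name already exists.
        if (handlersByName.putIfAbsent(eventHandler.getName(), eventHandler) != null) {
            throw new IllegalStateException("EventHandler " + eventHandler.getName() + " already exists");
        }
    }

    @Override
    public void updateEventHandler(EventHandler eventHandler) {
        handlersByName.put(eventHandler.getName(), eventHandler);
    }

    @Override
    public void removeEventHandler(String name) {
        handlersByName.remove(name);
    }

    @Override
    public List<EventHandler> getAllEventHandlers() {
        return new ArrayList<>(handlersByName.values());
    }

    @Override
    public List<EventHandler> getEventHandlersForEvent(String event, boolean activeOnly) {
        return handlersByName.values().stream()
                .filter(handler -> handler.getEvent().equals(event))
                .filter(handler -> !activeOnly || handler.isActive())
                .collect(Collectors.toList());
    }
}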
@@ -20,7 +20,6 @@
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.run.Workflow;

import java.util.List;

/**
@@ -245,11 +244,4 @@ public interface ExecutionDAO {
* @return list of matching events
*/
List<EventExecution> getEventExecutions(String eventHandlerName, String eventName, String messageId, int max);

void updateLastPoll(String taskDefName, String domain, String workerId);

PollData getPollData(String taskDefName, String domain);

List<PollData> getPollData(String taskDefName);

}
