Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SLING-9389: Distribution Event Packages should contain queue item creation time #41

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.distribution.api</artifactId>
<version>0.4.0</version>
<version>0.4.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.sling</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.common.DistributionException;
Expand All @@ -36,8 +37,8 @@
import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -126,7 +127,8 @@ private Collection<SimpleDistributionResponse> scheduleImportPackage(Distributio
}

distributionEventFactory.generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_QUEUED,
DistributionComponentKind.AGENT, agentName, distributionPackage.getInfo());
DistributionComponentKind.AGENT, agentName, distributionPackage.getInfo(),
null);
} catch (DistributionException e) {
log.error("an error happened during dispatching items to the queue(s)", e);
distributionResponses.add(new SimpleDistributionResponse(DistributionRequestState.DROPPED, e.toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,43 @@
*/
package org.apache.sling.distribution.agent.impl;

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.component.impl.DistributionComponentKind;
import org.apache.sling.distribution.component.impl.SettingsUtils;
import org.apache.sling.distribution.event.DistributionEventTopics;
import org.apache.sling.distribution.event.impl.DistributionEventFactory;
import org.apache.sling.distribution.impl.CompositeDistributionResponse;
import org.apache.sling.distribution.impl.SimpleDistributionResponse;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.impl.DistributionPackageExporter;
import org.apache.sling.distribution.packaging.impl.DistributionPackageImporter;
import org.apache.sling.distribution.packaging.impl.DistributionPackageProcessor;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.DistributionQueueState;
import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.impl.SimpleAgentDistributionQueue;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.trigger.DistributionTrigger;
import org.apache.sling.distribution.util.impl.DistributionUtils;
import org.apache.sling.jcr.api.SlingRepository;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Basic implementation of a {@link DistributionAgent}
*/
Expand Down Expand Up @@ -355,7 +355,8 @@ public void disable() {

private void generatePackageEvent(String topic, DistributionPackage... distributionPackages) {
for (DistributionPackage distributionPackage : distributionPackages) {
distributionEventFactory.generatePackageEvent(topic, DistributionComponentKind.AGENT, name, distributionPackage.getInfo());
distributionEventFactory.generatePackageEvent(topic, DistributionComponentKind.AGENT, name,
distributionPackage.getInfo(), null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.sling.distribution.agent.impl;

import java.util.Calendar;

import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.common.DistributionException;
Expand All @@ -33,9 +35,9 @@
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
import org.apache.sling.distribution.util.impl.DistributionUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
Expand Down Expand Up @@ -108,6 +110,8 @@ private boolean processQueueItem(String queueName, DistributionQueueEntry queueE
DistributionPackage distributionPackage = null;
DistributionQueueItem queueItem = queueEntry.getItem();
DistributionQueueItemStatus queueItemStatus = queueEntry.getStatus();

final Calendar queueItemCreationTime = queueItemStatus.getEntered();
try {

String callingUser = queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER, String.class);
Expand Down Expand Up @@ -135,7 +139,7 @@ private boolean processQueueItem(String queueName, DistributionQueueEntry queueE

// generated event
distributionEventFactory.generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED,
DistributionComponentKind.AGENT, agentName, distributionPackage.getInfo());
DistributionComponentKind.AGENT, agentName, distributionPackage.getInfo(), queueItemCreationTime);

removeItemFromQueue = true;
final long endTime = System.currentTimeMillis();
Expand All @@ -152,7 +156,7 @@ private boolean processQueueItem(String queueName, DistributionQueueEntry queueE
if (errorQueueStrategy != null && queueItemStatus.getAttempts() > retryAttempts) {
removeItemFromQueue = reEnqueuePackage(distributionPackage);
distributionEventFactory.generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DROPPED,
DistributionComponentKind.AGENT, agentName, distributionPackage.getInfo());
DistributionComponentKind.AGENT, agentName, distributionPackage.getInfo(), queueItemCreationTime);
distributionLog.info("[{}] PACKAGE-QUEUED {}: distribution package {} was enqueued to an error queue", queueName, requestId, distributionPackage.getId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.sling.distribution.event.impl;

import java.util.Calendar;
import java.util.Dictionary;
import java.util.Hashtable;
import org.apache.felix.scr.annotations.Component;
Expand Down Expand Up @@ -50,7 +51,8 @@ private void generateEvent(@NotNull String distributionEventTopic, @NotNull Dict
}

public void generatePackageEvent(@NotNull String distributionEventTopic, @NotNull DistributionComponentKind kind,
@NotNull String name, @NotNull DistributionPackageInfo info) {
@NotNull String name, @NotNull DistributionPackageInfo info,
Calendar queueItemCreationTime) {
try {
Dictionary<String, Object> dictionary = new Hashtable<String, Object>();
dictionary.put(DistributionEventProperties.DISTRIBUTION_COMPONENT_NAME, name);
Expand All @@ -61,8 +63,12 @@ public void generatePackageEvent(@NotNull String distributionEventTopic, @NotNul
if (info.getPaths() != null) {
dictionary.put(DistributionEventProperties.DISTRIBUTION_PATHS, info.getPaths());
}
generateEvent(distributionEventTopic, dictionary);

if(null != queueItemCreationTime) {
dictionary.put(DistributionEventProperties.DISTRIBUTION_ENQUEUE_TIMESTAMP, queueItemCreationTime.getTimeInMillis());
}

generateEvent(distributionEventTopic, dictionary);
} catch (Throwable e) {
log.error("Cannot generate package event", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.sling.distribution.event.impl;

import java.util.Calendar;
import org.apache.sling.distribution.component.impl.DistributionComponentKind;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
import org.jetbrains.annotations.NotNull;
Expand All @@ -33,6 +34,6 @@ public interface DistributionEventFactory {
* @param distributionEventType the type of event to be generated
*/
void generatePackageEvent(@NotNull String distributionEventType, @NotNull DistributionComponentKind kind,
@NotNull String name, @NotNull DistributionPackageInfo info);
@NotNull String name, @NotNull DistributionPackageInfo info, Calendar queueItemCreationTime);

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.component.impl.DistributionComponentKind;
import org.apache.sling.distribution.event.DistributionEventTopics;
import org.apache.sling.distribution.event.impl.DistributionEventFactory;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.packaging.impl.DistributionPackageImporter;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
import org.apache.sling.distribution.packaging.impl.DistributionPackageImporter;
import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
import org.apache.sling.distribution.packaging.impl.ReferencePackage;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -79,7 +80,8 @@ public void importPackage(@NotNull ResourceResolver resourceResolver, @NotNull D
log.warn("could not install distribution package {}", distributionPackage.getId());
}

eventFactory.generatePackageEvent(DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED, DistributionComponentKind.IMPORTER, name, distributionPackage.getInfo());
eventFactory.generatePackageEvent(DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED, DistributionComponentKind.IMPORTER,
name, distributionPackage.getInfo(), null);
}

@Override
Expand All @@ -105,7 +107,8 @@ public DistributionPackageInfo importStream(@NotNull ResourceResolver resourceRe
if (packageBuilder.installPackage(resourceResolver, distributionPackage)) {
DistributionPackageInfo info = distributionPackage.getInfo();
log.info("package installed {}", info);
eventFactory.generatePackageEvent(DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED, DistributionComponentKind.IMPORTER, name, info);
eventFactory.generatePackageEvent(DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED,
DistributionComponentKind.IMPORTER, name, info, null);
return info;
} else {
throw new DistributionException("could not install package {}" + distributionPackage);
Expand Down Expand Up @@ -134,13 +137,15 @@ public DistributionPackageInfo importStream(@NotNull ResourceResolver resourceRe
packageInfo = packageBuilder.installPackage(resourceResolver, stream);
log.info("package installed {}", packageInfo);
}
eventFactory.generatePackageEvent(DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED, DistributionComponentKind.IMPORTER, name, packageInfo);
eventFactory.generatePackageEvent(DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED, DistributionComponentKind.IMPORTER,
name, packageInfo, null);
return packageInfo;
}
} else {
DistributionPackageInfo packageInfo = packageBuilder.installPackage(resourceResolver, stream);
log.info("package installed");
eventFactory.generatePackageEvent(DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED, DistributionComponentKind.IMPORTER, name, packageInfo);
eventFactory.generatePackageEvent(DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED,
DistributionComponentKind.IMPORTER, name, packageInfo, null);
return packageInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
*/
package org.apache.sling.distribution.trigger.impl;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestType;
Expand All @@ -32,13 +39,6 @@
import org.junit.Test;
import org.osgi.framework.BundleContext;

import static org.mockito.Mockito.mock;
import static org.junit.Assert.*;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Testcase for {@link DistributionEventDistributeDistributionTrigger}
*/
Expand Down Expand Up @@ -98,7 +98,7 @@ public DistributionComponentKind getComponentKind() {
public void handle(ResourceResolver resourceResolver, DistributionRequest request) {
// we simple fire an event, to cause the loop
eventFactory.generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED,
DistributionComponentKind.AGENT, "test", info);
DistributionComponentKind.AGENT, "test", info, null);
handled.addAndGet(1);
}
};
Expand All @@ -108,7 +108,7 @@ public void handle(ResourceResolver resourceResolver, DistributionRequest reques
Thread testExecution = new Thread() {
@Override public void run() {
eventFactory.generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED, DistributionComponentKind.AGENT,
"origin", info);
"origin", info, null);
}
};

Expand Down