Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add something #217

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
20af844
SEARCH-28536 , fix defect in jesque when working in RESET_TO_HIGHEST_…
dvizelman Aug 13, 2017
2267aac
extract the current time in millis to local variable
dvizelman Aug 17, 2017
f8c5b8b
Merge pull request #1 from kenshoo/delayed_tasks_priority_queues
dvizelman Aug 17, 2017
d85fc13
lua based ack implementation
dvizelman Aug 20, 2017
cb3e738
Use separate schedulers for isAlive and requeueInactive jobs.
dvizelman Aug 23, 2017
fab0091
Break down watchdog acivate() to smaller functions
dvizelman Aug 23, 2017
f4fb112
Allow watchdog to use 1.8 features
dvizelman Aug 23, 2017
45f07bf
Merge pull request #3 from kenshoo/lua_based_ack_implementation
dvizelman Aug 24, 2017
bd8dbbc
Make watchdog working with jedis pool
dvizelman Sep 4, 2017
63e8ab9
switch off script effects replication
dvizelman Sep 5, 2017
9a26725
Merge pull request #5 from kenshoo/watchdog_jedis_pool
dvizelman Sep 6, 2017
a141b7e
Introduce redis restart recovery
dvizelman Sep 7, 2017
118615d
Update FINISHED/FAILED recovery statuses immidiately after listener f…
dvizelman Sep 8, 2017
fddf9b9
On workers restart check whether redis recovery process was interrupt…
dvizelman Sep 10, 2017
8005759
On redis restart watchdog script should be reloaded
dvizelman Sep 11, 2017
0767d1d
on redis restart reload worker scripts
dvizelman Sep 12, 2017
2e0dc4b
Make WorkerImpl use dedicated ExceptionHandler in order to reload Wor…
dvizelman Sep 12, 2017
6066f01
Merge pull request #6 from kenshoo/redis_restart_recovery_listener
dvizelman Sep 13, 2017
3482c6e
job uniqueness plugin mechanism
dvizelman Sep 17, 2017
3a0a7d0
move push to queue to the lua script; save uniqness key in jobJson du…
dvizelman Sep 18, 2017
8aad2bd
remove default value for delay in RetryJobException
dvizelman Sep 19, 2017
fd15b9e
unique key would be generated in lua script based on the Job's class …
dvizelman Sep 19, 2017
0c6ac00
Merge pull request #7 from kenshoo/jesque_job_uniqueness
nanimrod Sep 19, 2017
4cf5a6a
Do not run both remove in flight keys and resubmit in flight key upon…
dvizelman Sep 25, 2017
dc01bfe
Merge pull request #8 from kenshoo/move_resubmit_to_finally
nanimrod Sep 25, 2017
a9d621e
Add logs to client (#10)
dvizelman Nov 7, 2017
988fe81
Update jackson library
VolodymyrMudryk Sep 8, 2020
d6905c8
Merge pull request #11 from kenshoo/VolodymyrMudryk/setDateFormat
mmarusich Sep 8, 2020
9711c8e
SOCIAL-1 : for testing
olexandrabrosimov Jul 25, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jedis.version>2.9.0</jedis.version>
<jackson.version>2.8.5</jackson.version>
<jackson.version>2.9.5</jackson.version>
<slf4j.version>1.7.21</slf4j.version>
<logback.version>1.1.7</logback.version>
<junit.version>4.12</junit.version>
Expand Down Expand Up @@ -137,8 +137,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/net/greghaines/jesque/DuplicateJobException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package net.greghaines.jesque;

/**
* Created by dimav
* on 14/09/17 22:23.
*/
public class DuplicateJobException extends RuntimeException{

public DuplicateJobException(final String message) {

super(message);
}
}
18 changes: 18 additions & 0 deletions src/main/java/net/greghaines/jesque/RetryJobException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package net.greghaines.jesque;

/**
* Created by dimav
* on 14/09/17 22:23.
*/
public class RetryJobException extends RuntimeException{
private final long delay;

public RetryJobException(final String message, long delay) {
super(message);
this.delay = delay;
}

public long getDelay() {
return delay;
}
}
137 changes: 119 additions & 18 deletions src/main/java/net/greghaines/jesque/client/AbstractClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,42 @@
import static net.greghaines.jesque.utils.ResqueConstants.QUEUES;

import net.greghaines.jesque.Config;
import net.greghaines.jesque.DuplicateJobException;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.json.ObjectMapperFactory;
import net.greghaines.jesque.utils.JedisUtils;
import net.greghaines.jesque.utils.JesqueUtils;
import net.greghaines.jesque.utils.ScriptUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisException;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Common logic for Client implementations.
*
*
* @author Greg Haines
* @author Animesh Kumar
*/
public abstract class AbstractClient implements Client {
private static final Logger LOG = LoggerFactory.getLogger(AbstractClient.class);

private static final String ENQUEUE = "enqueue";
private static final String PRIORITY_ENQUEUE = "priorityEnqueue";
private static final String DELAYED_ENQUEUE = "delayedEnqueue";
private static final String DUPLICATED = "duplicated";
private final String namespace;
protected boolean jobUniquenessValidation;
private static final String PUSH_LUA = "/clientScripts/jesque_push.lua";
private static final AtomicReference<String> pushScriptHash = new AtomicReference<>(null);

/**
* Constructor.
*
*
* @param config
* used to get the namespace for key creation
*/
Expand All @@ -56,9 +72,34 @@ protected String getNamespace() {
return this.namespace;
}

protected boolean getJobUniquenessValidation() {
return jobUniquenessValidation;
}

private static boolean isScriptLoaded = false;

static void loadScript(Jedis jedis) {
if(!isScriptLoaded) {
synchronized (AbstractClient.class) {
if(!isScriptLoaded) {
doLoadScript(jedis);
isScriptLoaded = true;
}
}
}
}

private static void doLoadScript(final Jedis jedis) {
try {
pushScriptHash.set(jedis.scriptLoad(ScriptUtils.readScript(PUSH_LUA)));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* Builds a namespaced Redis key with the given arguments.
*
*
* @param parts
* the key parts to be joined
* @return an assembled String key
Expand All @@ -67,6 +108,8 @@ protected String key(final String... parts) {
return JesqueUtils.createKey(this.namespace, parts);
}

abstract Jedis getJedis();

/**
* {@inheritDoc}
*/
Expand All @@ -75,6 +118,9 @@ public void enqueue(final String queue, final Job job) {
validateArguments(queue, job);
try {
doEnqueue(queue, ObjectMapperFactory.get().writeValueAsString(job));
} catch (JedisException re) {
doLoadScript(getJedis());
throw re;
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
Expand All @@ -90,6 +136,9 @@ public void priorityEnqueue(final String queue, final Job job) {
validateArguments(queue, job);
try {
doPriorityEnqueue(queue, ObjectMapperFactory.get().writeValueAsString(job));
} catch (JedisException re) {
doLoadScript(getJedis());
throw re;
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
Expand Down Expand Up @@ -122,7 +171,7 @@ public boolean acquireLock(final String lockName, final String lockHolder, final

/**
* Actually enqueue the serialized job.
*
*
* @param queue
* the queue to add the Job to
* @param msg
Expand All @@ -134,7 +183,7 @@ public boolean acquireLock(final String lockName, final String lockHolder, final

/**
* Actually enqueue the serialized job with high priority.
*
*
* @param queue
* the queue to add the Job to
* @param msg
Expand All @@ -146,7 +195,7 @@ public boolean acquireLock(final String lockName, final String lockHolder, final

/**
* Actually acquire the lock based upon the client acquisition model.
*
*
* @param lockName
* the name of the lock to acquire
* @param timeout
Expand All @@ -157,13 +206,13 @@ public boolean acquireLock(final String lockName, final String lockHolder, final
* @throws Exception
* in case something goes wrong
*/
protected abstract boolean doAcquireLock(final String lockName, final String lockHolder,
protected abstract boolean doAcquireLock(final String lockName, final String lockHolder,
final int timeout) throws Exception;

/**
* Helper method that encapsulates the minimum logic for adding a job to a
* queue.
*
*
* @param jedis
* the connection to Redis
* @param namespace
Expand All @@ -174,14 +223,20 @@ protected abstract boolean doAcquireLock(final String lockName, final String loc
* the job serialized as JSON
*/
public static void doEnqueue(final Jedis jedis, final String namespace, final String queue, final String jobJson) {
jedis.sadd(JesqueUtils.createKey(namespace, QUEUES), queue);
jedis.rpush(JesqueUtils.createKey(namespace, QUEUE, queue), jobJson);
doEnqueue(jedis,namespace,queue, jobJson , false);
}

static void doEnqueue(final Jedis jedis, final String namespace, final String queue, final String jobJson, boolean jobUniquenessValidation) {
final String queuesKey = JesqueUtils.createKey(namespace, QUEUES);
final String queueKey = JesqueUtils.createKey(namespace, QUEUE, queue);

doEnqueue(jedis, ENQUEUE, queue, jobJson, queueKey, queuesKey, jobUniquenessValidation);
}

/**
* Helper method that encapsulates the minimum logic for adding a high
* priority job to a queue.
*
*
* @param jedis
* the connection to Redis
* @param namespace
Expand All @@ -192,13 +247,19 @@ public static void doEnqueue(final Jedis jedis, final String namespace, final St
* the job serialized as JSON
*/
public static void doPriorityEnqueue(final Jedis jedis, final String namespace, final String queue, final String jobJson) {
jedis.sadd(JesqueUtils.createKey(namespace, QUEUES), queue);
jedis.lpush(JesqueUtils.createKey(namespace, QUEUE, queue), jobJson);
doPriorityEnqueue(jedis,namespace, queue, jobJson,false);
}

static void doPriorityEnqueue(final Jedis jedis, final String namespace, final String queue, final String jobJson , boolean jobUniquenessValidation) {
final String queuesKey = JesqueUtils.createKey(namespace, QUEUES);
final String queueKey = JesqueUtils.createKey(namespace, QUEUE, queue);

doEnqueue(jedis, PRIORITY_ENQUEUE, queue, jobJson, queueKey, queuesKey, jobUniquenessValidation);
}

/**
* Helper method that encapsulates the logic to acquire a lock.
*
*
* @param jedis
* the connection to Redis
* @param namespace
Expand Down Expand Up @@ -269,16 +330,49 @@ public static boolean doAcquireLock(final Jedis jedis, final String namespace, f
}

public static void doDelayedEnqueue(final Jedis jedis, final String namespace, final String queue, final String jobJson, final long future) {
final String key = JesqueUtils.createKey(namespace, QUEUE, queue);
doDelayedEnqueue(jedis,namespace,queue,jobJson,future, false);
}

static void doDelayedEnqueue(final Jedis jedis, final String namespace, final String queue, final String jobJson, final long future, boolean jobUniquenessValidation) {
final String queueKey = JesqueUtils.createKey(namespace, QUEUE, queue);
// Add task only if this queue is either delayed or unused
if (JedisUtils.canUseAsDelayedQueue(jedis, key)) {
jedis.zadd(key, future, jobJson);
jedis.sadd(JesqueUtils.createKey(namespace, QUEUES), queue);
if (JedisUtils.canUseAsDelayedQueue(jedis, queueKey)) {
final String queuesKey = JesqueUtils.createKey(namespace, QUEUES);
doEnqueue(jedis, DELAYED_ENQUEUE, queue, jobJson, queueKey, queuesKey, jobUniquenessValidation, future);
} else {
throw new IllegalArgumentException(queue + " cannot be used as a delayed queue");
}
}

private static void doEnqueue(final Jedis jedis, final String enqueueType, final String queue, final String jobJson, final String queueKey, final String queuesKey, final boolean jobUniquenessValidation) {
doEnqueue(jedis, enqueueType, queue, jobJson, queueKey, queuesKey, jobUniquenessValidation, 0L);
}

private static void doEnqueue(final Jedis jedis, final String enqueueType, final String queue, final String jobJson, final String queueKey, final String queuesKey, final boolean jobUniquenessValidation, final long future) {
final String pushStatus;
LOG.info("Jesque starting {} job {} to the queue {} with delay {} ", enqueueType, jobJson, queue, future);
try {
final String uniquenessValidation = String.valueOf(jobUniquenessValidation);

if(future==0) {
pushStatus = (String) jedis.evalsha(pushScriptHash.get(), 2, queuesKey, queueKey, enqueueType, getCurrTime(), queue, jobJson, uniquenessValidation);
} else{
pushStatus = (String) jedis.evalsha(pushScriptHash.get(), 2, queuesKey, queueKey, enqueueType, getCurrTime(), queue, jobJson, uniquenessValidation, String.valueOf(future));
}
} catch (Exception e) {
LOG.error("Jesque {} job {} to the queue {} has failed", enqueueType, jobJson, queue, e);
throw e;
}

if(DUPLICATED.equals(pushStatus)){
final String duplicatedJobMessage = String.format("Jesque duplicated job %s has been found", jobJson);
LOG.warn(duplicatedJobMessage,jobJson);
throw new DuplicateJobException(duplicatedJobMessage);
}

LOG.info("Jesque {} job {} to the queue {} has finished successfully", enqueueType, jobJson, queue);
}

protected abstract void doDelayedEnqueue(String queue, String msg, long future) throws Exception;

/**
Expand All @@ -289,6 +383,9 @@ public void delayedEnqueue(final String queue, final Job job, final long future)
validateArguments(queue, job, future);
try {
doDelayedEnqueue(queue, ObjectMapperFactory.get().writeValueAsString(job), future);
} catch (JedisException re) {
doLoadScript(getJedis());
throw re;
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
Expand Down Expand Up @@ -411,4 +508,8 @@ private static void validateArguments(final String queue, final Job job, final l
throw new IllegalArgumentException("frequency must be greater than one second");
}
}

private static String getCurrTime() {
return String.valueOf(System.currentTimeMillis());
}
}
29 changes: 26 additions & 3 deletions src/main/java/net/greghaines/jesque/client/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,35 @@ public ClientImpl(final Config config) {
* if the config is null
*/
public ClientImpl(final Config config, final boolean checkConnectionBeforeUse) {
this(config,checkConnectionBeforeUse,false);
}

/**
* Create a new ClientImpl, which creates it's own connection to Redis using
* values from the config.
*
* @param config
* used to create a connection to Redis
* @param checkConnectionBeforeUse
* check to make sure the connection is alive before using it
* @param jobUniquenessValidation avoid duplicate jobs submission
* @throws IllegalArgumentException
* if the config is null
*/
public ClientImpl(final Config config, final boolean checkConnectionBeforeUse, boolean jobUniquenessValidation) {
super(config);
this.config = config;
this.jedis = new Jedis(config.getHost(), config.getPort(), config.getTimeout());
authenticateAndSelectDB();
this.checkConnectionBeforeUse = checkConnectionBeforeUse;
this.keepAliveService = null;
this.jobUniquenessValidation = jobUniquenessValidation;
loadScript(jedis);
}

@Override
Jedis getJedis() {
return jedis;
}

/**
Expand Down Expand Up @@ -106,7 +129,7 @@ public void run() {
@Override
protected void doEnqueue(final String queue, final String jobJson) {
ensureJedisConnection();
doEnqueue(this.jedis, getNamespace(), queue, jobJson);
doEnqueue(this.jedis, getNamespace(), queue, jobJson, getJobUniquenessValidation());
}

/**
Expand All @@ -115,7 +138,7 @@ protected void doEnqueue(final String queue, final String jobJson) {
@Override
protected void doPriorityEnqueue(final String queue, final String jobJson) {
ensureJedisConnection();
doPriorityEnqueue(this.jedis, getNamespace(), queue, jobJson);
doPriorityEnqueue(this.jedis, getNamespace(), queue, jobJson, getJobUniquenessValidation());
}

/**
Expand Down Expand Up @@ -145,7 +168,7 @@ public void end() {
@Override
protected void doDelayedEnqueue(final String queue, final String msg, final long future) throws Exception {
ensureJedisConnection();
doDelayedEnqueue(this.jedis, getNamespace(), queue, msg, future);
doDelayedEnqueue(this.jedis, getNamespace(), queue, msg, future, getJobUniquenessValidation());
}

/**
Expand Down
Loading