Skip to content

Commit

Permalink
Add possibility to schedule jobs by interval in seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoslot committed Feb 4, 2023
1 parent d39e7f5 commit 784cb47
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 11 deletions.
40 changes: 39 additions & 1 deletion expected/pg_cron-test.out
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ SELECT cron.unschedule(1);
-- Invalid input: input too long
SELECT cron.schedule(repeat('a', 1000), '');
ERROR: invalid schedule: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
-- Invalid input: missing parts
SELECT cron.schedule('* * * *', 'SELECT 1');
ERROR: invalid schedule: * * * *
-- Invalid input: trailing characters
SELECT cron.schedule('60 secondc', 'SELECT 1');
ERROR: invalid schedule: 60 secondc
SELECT cron.schedule('60 seconds c', 'SELECT 1');
ERROR: invalid schedule: 60 seconds c
-- Invalid input: 0 seconds
SELECT cron.schedule('0 seconds', 'SELECT 1');
ERROR: invalid schedule: 0 seconds
-- Try to update pg_cron on restart
SELECT cron.schedule('@restar', 'ALTER EXTENSION pg_cron UPDATE');
ERROR: invalid schedule: @restar
Expand Down Expand Up @@ -200,12 +211,39 @@ CREATE OR REPLACE FUNCTION public.func1(text, current_setting) RETURNS text
CREATE OR REPLACE FUNCTION public.func1(current_setting) RETURNS text
LANGUAGE sql volatile AS 'INSERT INTO test(data) VALUES (current_user); SELECT current_database()::text;';
CREATE CAST (current_setting AS text) WITH FUNCTION public.func1(current_setting) AS IMPLICIT;
CREATE EXTENSION pg_cron VERSION '1.4';
CREATE EXTENSION pg_cron;
select * from public.test;
data
------
(0 rows)

-- valid interval jobs
SELECT cron.schedule('1 second', 'SELECT 1');
schedule
----------
1
(1 row)

SELECT cron.schedule('60 seconds', 'SELECT 1');
schedule
----------
2
(1 row)

SELECT cron.schedule('600 seconds ', 'SELECT 1');
schedule
----------
3
(1 row)

SELECT jobid, jobname, schedule, command FROM cron.job ORDER BY jobid;
jobid | jobname | schedule | command
-------+---------+---------------+----------
1 | | 1 second | SELECT 1
2 | | 60 seconds | SELECT 1
3 | | 600 seconds | SELECT 1
(3 rows)

-- cleaning
DROP EXTENSION pg_cron;
drop user pgcron_cront;
Expand Down
2 changes: 1 addition & 1 deletion include/cron.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ typedef struct _entry {
uid_t uid;
gid_t gid;
char **envp;
char *cmd;
int secondsInterval;
bitstr_t bit_decl(minute, MINUTE_COUNT);
bitstr_t bit_decl(hour, HOUR_COUNT);
bitstr_t bit_decl(dom, DOM_COUNT);
Expand Down
2 changes: 2 additions & 0 deletions include/task_states.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ typedef struct CronTask
PGconn *connection;
PostgresPollingStatusType pollingStatus;
TimestampTz startDeadline;
TimestampTz lastStartTime;
uint32 secondsInterval;
bool isSocketReady;
bool isActive;
char *errorMessage;
Expand Down
18 changes: 17 additions & 1 deletion sql/pg_cron-test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ SELECT cron.unschedule(1);
-- Invalid input: input too long
SELECT cron.schedule(repeat('a', 1000), '');

-- Invalid input: missing parts
SELECT cron.schedule('* * * *', 'SELECT 1');

-- Invalid input: trailing characters
SELECT cron.schedule('60 secondc', 'SELECT 1');
SELECT cron.schedule('60 seconds c', 'SELECT 1');

-- Invalid input: 0 seconds
SELECT cron.schedule('0 seconds', 'SELECT 1');

-- Try to update pg_cron on restart
SELECT cron.schedule('@restar', 'ALTER EXTENSION pg_cron UPDATE');
SELECT cron.schedule('@restart', 'ALTER EXTENSION pg_cron UPDATE');
Expand Down Expand Up @@ -117,9 +127,15 @@ CREATE OR REPLACE FUNCTION public.func1(current_setting) RETURNS text

CREATE CAST (current_setting AS text) WITH FUNCTION public.func1(current_setting) AS IMPLICIT;

CREATE EXTENSION pg_cron VERSION '1.4';
CREATE EXTENSION pg_cron;
select * from public.test;

-- valid interval jobs
SELECT cron.schedule('1 second', 'SELECT 1');
SELECT cron.schedule('60 seconds', 'SELECT 1');
SELECT cron.schedule('600 seconds ', 'SELECT 1');
SELECT jobid, jobname, schedule, command FROM cron.job ORDER BY jobid;

-- cleaning
DROP EXTENSION pg_cron;
drop user pgcron_cront;
Expand Down
2 changes: 0 additions & 2 deletions src/entry.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ static int set_element(bitstr_t *, int, int, int);
void
free_entry(entry *e)
{
if (e->cmd)
free(e->cmd);
free(e);
}

Expand Down
60 changes: 57 additions & 3 deletions src/job_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ static void AlterJob(int64 jobId, text *scheduleText, text *commandText,
text *databaseText, text *usernameText, bool *active);

static Oid GetRoleOidIfCanLogin(char *username);
static entry * ParseSchedule(char *scheduleText);
static bool ParseInterval(char *scheduleText, uint32 *secondsInterval);


/* SQL-callable functions */
PG_FUNCTION_INFO_V1(cron_schedule);
Expand Down Expand Up @@ -215,7 +218,7 @@ ScheduleCronJob(text *scheduleText, text *commandText, text *databaseText,

/* check schedule is valid */
schedule = text_to_cstring(scheduleText);
parsedSchedule = parse_cron_entry(schedule);
parsedSchedule = ParseSchedule(schedule);

if (parsedSchedule == NULL)
{
Expand Down Expand Up @@ -977,7 +980,7 @@ TupleToCronJob(TupleDesc tupleDescriptor, HeapTuple heapTuple)
}
}

parsedSchedule = parse_cron_entry(job->scheduleText);
parsedSchedule = ParseSchedule(job->scheduleText);
if (parsedSchedule != NULL)
{
/* copy the schedule and free the allocated memory immediately */
Expand Down Expand Up @@ -1290,7 +1293,7 @@ AlterJob(int64 jobId, text *scheduleText, text *commandText, text *databaseText,
if (scheduleText != NULL)
{
schedule = text_to_cstring(scheduleText);
parsedSchedule = parse_cron_entry(schedule);
parsedSchedule = ParseSchedule(schedule);

if (parsedSchedule == NULL)
{
Expand Down Expand Up @@ -1475,3 +1478,54 @@ JobTableExists(void)

return jobTableOid != InvalidOid;
}


/*
* ParseSchedule attempts to parse a cron schedule or an interval in seconds.
* The returned pointer is allocated using malloc and should be freed by the
* caller.
*/
static entry *
ParseSchedule(char *scheduleText)
{
uint32 secondsInterval = 0;

/*
* Parse as interval on seconds or fall back to trying cron schedule.
*/
if (ParseInterval(scheduleText, &secondsInterval))
{
entry *schedule = calloc(sizeof(entry), sizeof(char));
schedule->secondsInterval = secondsInterval;
return schedule;
}

return parse_cron_entry(scheduleText);
}


/*
* ParseInterval returns whether scheduleText is of the form
* <positive number> second[s].
*/
static bool
ParseInterval(char *scheduleText, uint32 *secondsInterval)
{
char plural = '\0';
char extra = '\0';
int numParts = sscanf(scheduleText, "%u second%c %c", secondsInterval, &plural,
&extra);

if (numParts == 1)
{
/* <number> second (allow "2 second") */
return *secondsInterval > 0;
}
else if (numParts == 2 && plural == 's')
{
/* <number> seconds (allow "1 seconds") */
return *secondsInterval > 0;
}

return false;
}
43 changes: 40 additions & 3 deletions src/pg_cron.c
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,27 @@ StartAllPendingRuns(List *taskList, TimestampTz currentTime)
RebootJobsScheduled = true;
}

foreach(taskCell, taskList)
{
CronTask *task = (CronTask *) lfirst(taskCell);

if (task->secondsInterval > 0 && task->isActive)
{
/*
* For interval jobs, if a task takes longer than the interval,
* we only queue up once. So if a task that is supposed to run
* every 30 seconds takes 5 minutes, we start another run
* immediately after 5 minutes, but then return to regular cadence.
*/
if (task->pendingRunCount == 0 &&
TimestampDifferenceExceeds(task->lastStartTime, currentTime,
task->secondsInterval * 1000))
{
task->pendingRunCount += 1;
}
}
}

if (lastMinute == 0)
{
lastMinute = TimestampMinuteStart(currentTime);
Expand Down Expand Up @@ -1052,6 +1073,22 @@ PollForTasks(List *taskList)

if (task->state == CRON_TASK_WAITING && task->pendingRunCount == 0)
{
/*
* Make sure we do not wait past the next run time of an interval
* job.
*/
if (task->secondsInterval > 0)
{
TimestampTz nextRunTime =
TimestampTzPlusMilliseconds(task->lastStartTime,
task->secondsInterval * 1000);

if (TimestampDifferenceExceeds(nextRunTime, nextEventTime, 0))
{
nextEventTime = nextRunTime;
}
}

/* don't poll idle tasks */
continue;
}
Expand Down Expand Up @@ -1122,9 +1159,7 @@ PollForTasks(List *taskList)
pollTimeout = waitSeconds * 1000 + waitMicros / 1000;
if (pollTimeout <= 0)
{
pfree(polledTasks);
pfree(pollFDs);
return;
pollTimeout = 1;
}
else if (pollTimeout > MaxWait)
{
Expand Down Expand Up @@ -1238,6 +1273,8 @@ ManageCronTask(CronTask *task, TimestampTz currentTime)
else
task->state = CRON_TASK_START;

task->lastStartTime = currentTime;

RunningTaskCount++;

/* Add new entry to audit table. */
Expand Down
8 changes: 8 additions & 0 deletions src/task_states.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ RefreshTaskHash(void)

task = GetCronTask(job->jobId);
task->isActive = job->active;
task->secondsInterval = job->schedule.secondsInterval;
}

CronJobCacheValid = true;
Expand All @@ -122,6 +123,13 @@ GetCronTask(int64 jobId)
if (!isPresent)
{
InitializeCronTask(task, jobId);

/*
* We only initialize last run when entering into the hash.
* The net effect is that the timer for the first run of an
* interval job starts when pg_cron first learns about the job.
*/
task->lastStartTime = GetCurrentTimestamp();
}

return task;
Expand Down

0 comments on commit 784cb47

Please sign in to comment.