Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions regression-test/suites/job_p0/test_base_insert_job.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ suite("test_base_insert_job") {
insert into ${tableName} values
('2023-03-18', 1, 1)
"""
// create recurring job
sql """
CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test' DO INSERT INTO ${tableName} (`timestamp`, `type`, `user_id`)
WITH
Expand All @@ -97,23 +98,25 @@ suite("test_base_insert_job") {
"""
Awaitility.await().atMost(30, SECONDS).until(
{
def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='RECURRING' """
println(onceJob)
onceJob.size() == 1 && '1' <= onceJob.get(0).get(0)
def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='RECURRING' """
// check job status and succeed task count larger than 1
jobSuccendCount.size() == 1 && '1' <= jobSuccendCount.get(0).get(0)

}
)
sql """
PAUSE JOB where jobname = '${jobName}'
"""
def pausedJobStatus = sql """
select status from jobs("type"="insert") where Name='${jobName}'
"""
assert pausedJobStatus.get(0).get(0) == "PAUSED"
def tblDatas = sql """select * from ${tableName}"""
println tblDatas
assert tblDatas.size() >= 2 //at least 2 records
def pauseJobId = sql """select id from jobs("type"="insert") where Name='${jobName}'"""
def taskStatus = sql """select status from tasks("type"="insert") where jobid= '${pauseJobId.get(0).get(0)}'"""
println taskStatus

def taskStatus = sql """select status from tasks("type"="insert") where JobName ='${jobName}'"""
for (int i = 0; i < taskStatus.size(); i++) {
assert taskStatus.get(i).get(0) != "FAILED" || taskStatus.get(i).get(0) != "STOPPED" || taskStatus.get(i).get(0) != "STOPPED"
assert taskStatus.get(i).get(0) =="CANCELLED" || taskStatus.get(i).get(0) =="FINISHED"
}
sql """
CREATE JOB ${jobMixedName} ON SCHEDULE every 1 second DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
Expand All @@ -122,6 +125,7 @@ suite("test_base_insert_job") {
println mixedNameJobs
assert mixedNameJobs.size() == 1 && mixedNameJobs.get(0).get(0) == jobMixedName
assert mixedNameJobs.get(0).get(1) == ''
// clean up job and table
sql """
DROP JOB IF EXISTS where jobname = '${jobName}'
"""
Expand All @@ -145,23 +149,25 @@ suite("test_base_insert_job") {
"""
def dataCount = sql """select count(*) from ${tableName}"""
assert dataCount.get(0).get(0) == 0
// create one time job
sql """
CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', 2, 1001);
"""

// wait job finished
Awaitility.await("create-one-time-job-test").atMost(30, SECONDS).until(
{
def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """
onceJob.size() == 1 && '1' == onceJob.get(0).get(0)
}
)
def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """
def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """
assert onceJob.size() == 1
//check succeed task count
assert '1' == onceJob.get(0).get(0)
def datas = sql """select status,taskid from tasks("type"="insert") where jobName= '${jobName}'"""
println datas
// table should have one record after job finished
assert datas.size() == 1
// one time job only has one task. when job finished, task status should be FINISHED
assert datas.get(0).get(0) == "FINISHED"
// check table data
def dataCount1 = sql """select count(1) from ${tableName} where user_id=1001"""
Expand All @@ -175,19 +181,20 @@ suite("test_base_insert_job") {
sql """
DROP JOB IF EXISTS where jobname = 'press'
"""

// create job with start time is current time and interval is 10 hours
sql """
CREATE JOB press ON SCHEDULE every 10 hour starts CURRENT_TIMESTAMP comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', 99, 99);
"""
Awaitility.await("create-immediately-job-test").atMost(60, SECONDS).until({
def pressJob = sql """ select SucceedTaskCount from jobs("type"="insert") where name='press'"""
println pressJob
// check job status and succeed task count is 1
pressJob.size() == 1 && '1' == onceJob.get(0).get(0)
})

sql """
DROP JOB IF EXISTS where jobname = 'past_start_time'
"""
// create job with start time is past time, job should be running
sql """
CREATE JOB past_start_time ON SCHEDULE every 10 hour starts '2023-11-13 14:18:07' comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', 99, 99);
"""
Expand All @@ -214,16 +221,24 @@ suite("test_base_insert_job") {
sql """
PAUSE JOB where jobname = '${jobName}'
"""
pausedJobStatus = sql """
select status from jobs("type"="insert") where Name='${jobName}'
"""
assert pausedJobStatus.get(0).get(0) == "PAUSED"
def tasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """
sql """
RESUME JOB where jobname = '${jobName}'
"""
println(tasks.size())
Awaitility.await("resume-job-test").atMost(60, SECONDS).until({
def afterResumeTasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """
println "resume tasks :" + afterResumeTasks
afterResumeTasks.size() > tasks.size()
})
// check resume job status
def afterResumeJobStatus = sql """
select status from jobs("type"="insert") where Name='${jobName}'
"""
assert afterResumeJobStatus.get(0).get(0) == "RUNNING"

// assert same job name
try {
Expand Down