diff --git a/cuebot/src/main/resources/conf/ddl/postgres/migrations/V13__Add_history_control.sql b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V13__Add_history_control.sql new file mode 100644 index 000000000..16896d4c5 --- /dev/null +++ b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V13__Add_history_control.sql @@ -0,0 +1,397 @@ +-- Add history control + +CREATE OR REPLACE FUNCTION trigger__after_insert_job() +RETURNS TRIGGER AS $body$ +BEGIN + INSERT INTO job_stat (pk_job_stat,pk_job) VALUES(NEW.pk_job,NEW.pk_job); + INSERT INTO job_resource (pk_job_resource,pk_job) VALUES(NEW.pk_job,NEW.pk_job); + INSERT INTO job_usage (pk_job_usage,pk_job) VALUES(NEW.pk_job,NEW.pk_job); + INSERT INTO job_mem (pk_job_mem,pk_job) VALUES (NEW.pk_job,NEW.pk_job); + + IF NOT EXISTS (SELECT FROM config WHERE str_key='DISABLE_HISTORY') THEN + + INSERT INTO job_history + (pk_job, pk_show, pk_facility, pk_dept, str_name, str_shot, str_user, int_ts_started) + VALUES + (NEW.pk_job, NEW.pk_show, NEW.pk_facility, NEW.pk_dept, + NEW.str_name, NEW.str_shot, NEW.str_user, epoch(current_timestamp)); + + END IF; + + RETURN NULL; +END; +$body$ +LANGUAGE PLPGSQL; + + +CREATE OR REPLACE FUNCTION trigger__before_delete_job() +RETURNS TRIGGER AS $body$ +DECLARE + js JobStatType; +BEGIN + IF NOT EXISTS (SELECT FROM config WHERE str_key='DISABLE_HISTORY') THEN + + SELECT + job_usage.int_core_time_success, + job_usage.int_core_time_fail, + job_usage.int_gpu_time_success, + job_usage.int_gpu_time_fail, + job_stat.int_waiting_count, + job_stat.int_dead_count, + job_stat.int_depend_count, + job_stat.int_eaten_count, + job_stat.int_succeeded_count, + job_stat.int_running_count, + job_mem.int_max_rss, + job_mem.int_gpu_mem_max + INTO + js + FROM + job_mem, + job_usage, + job_stat + WHERE + job_usage.pk_job = job_mem.pk_job + AND + job_stat.pk_job = job_mem.pk_job + AND + job_mem.pk_job = OLD.pk_job; + + UPDATE + job_history + SET + pk_dept = OLD.pk_dept, + int_core_time_success = js.int_core_time_success, + int_core_time_fail = js.int_core_time_fail, + int_gpu_time_success = js.int_gpu_time_success, + int_gpu_time_fail = js.int_gpu_time_fail, + int_frame_count = OLD.int_frame_count, + int_layer_count = OLD.int_layer_count, + int_waiting_count = js.int_waiting_count, + int_dead_count = js.int_dead_count, + int_depend_count = js.int_depend_count, + int_eaten_count = js.int_eaten_count, + int_succeeded_count = js.int_succeeded_count, + int_running_count = js.int_running_count, + int_max_rss = js.int_max_rss, + int_gpu_mem_max = js.int_gpu_mem_max, + b_archived = true, + int_ts_stopped = COALESCE(epoch(OLD.ts_stopped), epoch(current_timestamp)) + WHERE + pk_job = OLD.pk_job; + + END IF; + + DELETE FROM depend WHERE pk_job_depend_on=OLD.pk_job OR pk_job_depend_er=OLD.pk_job; + DELETE FROM frame WHERE pk_job=OLD.pk_job; + DELETE FROM layer WHERE pk_job=OLD.pk_job; + DELETE FROM job_env WHERE pk_job=OLD.pk_job; + DELETE FROM job_stat WHERE pk_job=OLD.pk_job; + DELETE FROM job_resource WHERE pk_job=OLD.pk_job; + DELETE FROM job_usage WHERE pk_job=OLD.pk_job; + DELETE FROM job_mem WHERE pk_job=OLD.pk_job; + DELETE FROM comments WHERE pk_job=OLD.pk_job; + + RETURN OLD; +END +$body$ +LANGUAGE PLPGSQL; + + +CREATE OR REPLACE FUNCTION trigger__after_job_finished() +RETURNS TRIGGER AS $body$ +DECLARE + ts INT := cast(epoch(current_timestamp) as integer); + js JobStatType; + ls LayerStatType; + one_layer RECORD; +BEGIN + IF NOT EXISTS (SELECT FROM config WHERE str_key='DISABLE_HISTORY') THEN + + SELECT + job_usage.int_core_time_success, + job_usage.int_core_time_fail, + job_usage.int_gpu_time_success, + job_usage.int_gpu_time_fail, + job_stat.int_waiting_count, + job_stat.int_dead_count, + job_stat.int_depend_count, + job_stat.int_eaten_count, + job_stat.int_succeeded_count, + job_stat.int_running_count, + job_mem.int_max_rss, + job_mem.int_gpu_mem_max + INTO + js + FROM + job_mem, + job_usage, + job_stat + WHERE + job_usage.pk_job = job_mem.pk_job + AND + job_stat.pk_job = job_mem.pk_job + AND + job_mem.pk_job = NEW.pk_job; + + UPDATE + job_history + SET + pk_dept = NEW.pk_dept, + int_core_time_success = js.int_core_time_success, + int_core_time_fail = js.int_core_time_fail, + int_gpu_time_success = js.int_gpu_time_success, + int_gpu_time_fail = js.int_gpu_time_fail, + int_frame_count = NEW.int_frame_count, + int_layer_count = NEW.int_layer_count, + int_waiting_count = js.int_waiting_count, + int_dead_count = js.int_dead_count, + int_depend_count = js.int_depend_count, + int_eaten_count = js.int_eaten_count, + int_succeeded_count = js.int_succeeded_count, + int_running_count = js.int_running_count, + int_max_rss = js.int_max_rss, + int_gpu_mem_max = js.int_gpu_mem_max, + int_ts_stopped = ts + WHERE + pk_job = NEW.pk_job; + + FOR one_layer IN (SELECT pk_layer from layer where pk_job = NEW.pk_job) + LOOP + SELECT + layer_usage.int_core_time_success, + layer_usage.int_core_time_fail, + layer_usage.int_gpu_time_success, + layer_usage.int_gpu_time_fail, + layer_stat.int_total_count, + layer_stat.int_waiting_count, + layer_stat.int_dead_count, + layer_stat.int_depend_count, + layer_stat.int_eaten_count, + layer_stat.int_succeeded_count, + layer_stat.int_running_count, + layer_mem.int_max_rss, + layer_mem.int_gpu_mem_max + INTO + ls + FROM + layer_mem, + layer_usage, + layer_stat + WHERE + layer_usage.pk_layer = layer_mem.pk_layer + AND + layer_stat.pk_layer = layer_mem.pk_layer + AND + layer_mem.pk_layer = one_layer.pk_layer; + + UPDATE + layer_history + SET + int_core_time_success = ls.int_core_time_success, + int_core_time_fail = ls.int_core_time_fail, + int_gpu_time_success = ls.int_gpu_time_success, + int_gpu_time_fail = ls.int_gpu_time_fail, + int_frame_count = ls.int_total_count, + int_waiting_count = ls.int_waiting_count, + int_dead_count = ls.int_dead_count, + int_depend_count = ls.int_depend_count, + int_eaten_count = ls.int_eaten_count, + int_succeeded_count = ls.int_succeeded_count, + int_running_count = ls.int_running_count, + int_max_rss = ls.int_max_rss, + int_gpu_mem_max = ls.int_gpu_mem_max + WHERE + pk_layer = one_layer.pk_layer; + END LOOP; + + END IF; + + /** + * Delete any local core assignments from this job. + **/ + DELETE FROM job_local WHERE pk_job=NEW.pk_job; + + RETURN NEW; +END; +$body$ +LANGUAGE PLPGSQL; + + +CREATE OR REPLACE FUNCTION trigger__after_insert_layer() +RETURNS TRIGGER AS $body$ +BEGIN + INSERT INTO layer_stat (pk_layer_stat, pk_layer, pk_job) VALUES (NEW.pk_layer, NEW.pk_layer, NEW.pk_job); + INSERT INTO layer_resource (pk_layer_resource, pk_layer, pk_job) VALUES (NEW.pk_layer, NEW.pk_layer, NEW.pk_job); + INSERT INTO layer_usage (pk_layer_usage, pk_layer, pk_job) VALUES (NEW.pk_layer, NEW.pk_layer, NEW.pk_job); + INSERT INTO layer_mem (pk_layer_mem, pk_layer, pk_job) VALUES (NEW.pk_layer, NEW.pk_layer, NEW.pk_job); + + IF NOT EXISTS (SELECT FROM config WHERE str_key='DISABLE_HISTORY') THEN + + INSERT INTO layer_history + (pk_layer, pk_job, str_name, str_type, int_cores_min, int_mem_min, int_gpus_min, int_gpu_mem_min, b_archived,str_services) + VALUES + (NEW.pk_layer, NEW.pk_job, NEW.str_name, NEW.str_type, NEW.int_cores_min, NEW.int_mem_min, NEW.int_gpus_min, NEW.int_gpu_mem_min, false, NEW.str_services); + + END IF; + + RETURN NEW; +END; +$body$ +LANGUAGE PLPGSQL; + + +CREATE OR REPLACE FUNCTION trigger__before_delete_layer() +RETURNS TRIGGER AS $body$ +DECLARE + js LayerStatType; +BEGIN + IF NOT EXISTS (SELECT FROM config WHERE str_key='DISABLE_HISTORY') THEN + + SELECT + layer_usage.int_core_time_success, + layer_usage.int_core_time_fail, + layer_usage.int_gpu_time_success, + layer_usage.int_gpu_time_fail, + layer_stat.int_total_count, + layer_stat.int_waiting_count, + layer_stat.int_dead_count, + layer_stat.int_depend_count, + layer_stat.int_eaten_count, + layer_stat.int_succeeded_count, + layer_stat.int_running_count, + layer_mem.int_max_rss, + layer_mem.int_gpu_mem_max + INTO + js + FROM + layer_mem, + layer_usage, + layer_stat + WHERE + layer_usage.pk_layer = layer_mem.pk_layer + AND + layer_stat.pk_layer = layer_mem.pk_layer + AND + layer_mem.pk_layer = OLD.pk_layer; + + UPDATE + layer_history + SET + int_core_time_success = js.int_core_time_success, + int_core_time_fail = js.int_core_time_fail, + int_gpu_time_success = js.int_gpu_time_success, + int_gpu_time_fail = js.int_gpu_time_fail, + int_frame_count = js.int_total_count, + int_waiting_count = js.int_waiting_count, + int_dead_count = js.int_dead_count, + int_depend_count = js.int_depend_count, + int_eaten_count = js.int_eaten_count, + int_succeeded_count = js.int_succeeded_count, + int_running_count = js.int_running_count, + int_max_rss = js.int_max_rss, + int_gpu_mem_max = js.int_gpu_mem_max, + b_archived = true + WHERE + pk_layer = OLD.pk_layer; + + END IF; + + DELETE FROM layer_resource where pk_layer=OLD.pk_layer; + DELETE FROM layer_stat where pk_layer=OLD.pk_layer; + DELETE FROM layer_usage where pk_layer=OLD.pk_layer; + DELETE FROM layer_env where pk_layer=OLD.pk_layer; + DELETE FROM layer_mem where pk_layer=OLD.pk_layer; + DELETE FROM layer_output where pk_layer=OLD.pk_layer; + + RETURN OLD; +END; +$body$ +LANGUAGE PLPGSQL; + + +CREATE OR REPLACE FUNCTION trigger__frame_history_open() +RETURNS TRIGGER AS $body$ +DECLARE + str_pk_alloc VARCHAR(36) := null; + int_checkpoint INT := 0; +BEGIN + + IF NOT EXISTS (SELECT FROM config WHERE str_key='DISABLE_HISTORY') THEN + + IF OLD.str_state = 'RUNNING' THEN + + IF NEW.int_exit_status = 299 THEN + + EXECUTE 'DELETE FROM frame_history WHERE int_ts_stopped = 0 AND pk_frame=$1' USING + NEW.pk_frame; + + ELSE + If NEW.str_state = 'CHECKPOINT' THEN + int_checkpoint := 1; + END IF; + + EXECUTE + 'UPDATE + frame_history + SET + int_mem_max_used=$1, + int_gpu_mem_max_used=$2, + int_ts_stopped=$3, + int_exit_status=$4, + int_checkpoint_count=$5 + WHERE + int_ts_stopped = 0 AND pk_frame=$6' + USING + NEW.int_mem_max_used, + NEW.int_gpu_mem_max_used, + epoch(current_timestamp), + NEW.int_exit_status, + int_checkpoint, + NEW.pk_frame; + END IF; + END IF; + + IF NEW.str_state = 'RUNNING' THEN + + SELECT pk_alloc INTO str_pk_alloc FROM host WHERE str_name=NEW.str_host; + + EXECUTE + 'INSERT INTO + frame_history + ( + pk_frame, + pk_layer, + pk_job, + str_name, + str_state, + int_cores, + int_mem_reserved, + int_gpus, + int_gpu_mem_reserved, + str_host, + int_ts_started, + pk_alloc + ) + VALUES + ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)' + USING NEW.pk_frame, + NEW.pk_layer, + NEW.pk_job, + NEW.str_name, + 'RUNNING', + NEW.int_cores, + NEW.int_mem_reserved, + NEW.int_gpus, + NEW.int_gpu_mem_reserved, + NEW.str_host, + epoch(current_timestamp), + str_pk_alloc; + END IF; + + END IF; + RETURN NULL; + +END; +$body$ +LANGUAGE PLPGSQL; diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HistoryControlTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HistoryControlTests.java new file mode 100644 index 000000000..e8dc15517 --- /dev/null +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HistoryControlTests.java @@ -0,0 +1,198 @@ + +/* + * Copyright Contributors to the OpenCue Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package com.imageworks.spcue.test.dispatcher; + +import java.io.File; +import java.util.List; +import javax.annotation.Resource; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.test.annotation.Rollback; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.transaction.annotation.Transactional; + +import com.imageworks.spcue.DispatchHost; +import com.imageworks.spcue.FrameInterface; +import com.imageworks.spcue.JobDetail; +import com.imageworks.spcue.LayerDetail; +import com.imageworks.spcue.VirtualProc; +import com.imageworks.spcue.dao.LayerDao; +import com.imageworks.spcue.dispatcher.Dispatcher; +import com.imageworks.spcue.dispatcher.FrameCompleteHandler; +import com.imageworks.spcue.grpc.host.HardwareState; +import com.imageworks.spcue.grpc.report.FrameCompleteReport; +import com.imageworks.spcue.grpc.report.RenderHost; +import com.imageworks.spcue.grpc.report.RunningFrameInfo; +import com.imageworks.spcue.service.AdminManager; +import com.imageworks.spcue.service.HostManager; +import com.imageworks.spcue.service.JobLauncher; +import com.imageworks.spcue.service.JobManager; +import com.imageworks.spcue.test.TransactionalTest; +import com.imageworks.spcue.util.CueUtil; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@ContextConfiguration +public class HistoryControlTests extends TransactionalTest { + + @Resource + AdminManager adminManager; + + @Resource + FrameCompleteHandler frameCompleteHandler; + + @Resource + HostManager hostManager; + + @Resource + JobLauncher jobLauncher; + + @Resource + JobManager jobManager; + + @Resource + LayerDao layerDao; + + @Resource + Dispatcher dispatcher; + + private static final String HOSTNAME = "beta"; + private static final String DELETE_HISTORY = + "DELETE FROM frame_history; " + + "DELETE FROM job_history; "; + private static final String DISABLE_HISTORY = + "INSERT INTO " + + "config (pk_config,str_key) " + + "VALUES " + + "(uuid_generate_v1(),'DISABLE_HISTORY');"; + + @Before + public void setTestMode() { + dispatcher.setTestMode(true); + } + + public void launchJob() { + jobLauncher.testMode = true; + jobLauncher.launch( + new File("src/test/resources/conf/jobspec/jobspec_gpus_test.xml")); + } + + @Before + public void createHost() { + RenderHost host = RenderHost.newBuilder() + .setName(HOSTNAME) + .setBootTime(1192369572) + .setFreeMcp(76020) + .setFreeMem((int) CueUtil.GB8) + .setFreeSwap(20760) + .setLoad(0) + .setTotalMcp(195430) + .setTotalMem(CueUtil.GB8) + .setTotalSwap(CueUtil.GB2) + .setNimbyEnabled(false) + .setNumProcs(40) + .setCoresPerProc(100) + .setState(HardwareState.UP) + .setFacility("spi") + .putAttributes("SP_OS", "Linux") + .setNumGpus(8) + .setFreeGpuMem(CueUtil.GB16 * 8) + .setTotalGpuMem(CueUtil.GB16 * 8) + .build(); + + hostManager.createHost(host, + adminManager.findAllocationDetail("spi", "general")); + } + + public DispatchHost getHost() { + return hostManager.findDispatchHost(HOSTNAME); + } + + public void launchAndDeleteJob() { + launchJob(); + + JobDetail job = jobManager.findJobDetail("pipe-default-testuser_test0"); + LayerDetail layer = layerDao.findLayerDetail(job, "layer0"); + jobManager.setJobPaused(job, false); + + DispatchHost host = getHost(); + List procs = dispatcher.dispatchHost(host); + VirtualProc proc = procs.get(0); + + RunningFrameInfo info = RunningFrameInfo.newBuilder() + .setJobId(proc.getJobId()) + .setLayerId(proc.getLayerId()) + .setFrameId(proc.getFrameId()) + .setResourceId(proc.getProcId()) + .build(); + FrameCompleteReport report = FrameCompleteReport.newBuilder() + .setFrame(info) + .setExitStatus(0) + .build(); + frameCompleteHandler.handleFrameCompleteReport(report); + + assertTrue(jobManager.isLayerComplete(layer)); + assertTrue(jobManager.isJobComplete(job)); + + jdbcTemplate.update("DELETE FROM job WHERE pk_job=?", job.getId()); + } + + @Test + @Transactional + @Rollback(true) + public void testEnabled() { + jdbcTemplate.update(DELETE_HISTORY); + assertEquals(Integer.valueOf(0), jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM job_history", Integer.class)); + assertEquals(Integer.valueOf(0), jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM frame_history", Integer.class)); + + launchAndDeleteJob(); + + assertEquals(Integer.valueOf(3), jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM job_history", Integer.class)); + assertEquals(Integer.valueOf(1), jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM frame_history", Integer.class)); + } + + @Test + @Transactional + @Rollback(true) + public void testDisabled() { + jdbcTemplate.update(DELETE_HISTORY); + jdbcTemplate.update(DISABLE_HISTORY); + + assertEquals(Integer.valueOf(0), jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM job_history", Integer.class)); + assertEquals(Integer.valueOf(0), jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM frame_history", Integer.class)); + + launchAndDeleteJob(); + + assertEquals(Integer.valueOf(0), jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM job_history", Integer.class)); + assertEquals(Integer.valueOf(0), jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM frame_history", Integer.class)); + } +} +