From ee12183602588657348ed8a3bd46bd666f5b2599 Mon Sep 17 00:00:00 2001 From: Chen Date: Mon, 5 Jun 2017 17:28:31 +0800 Subject: [PATCH 01/12] del system.out.println --- .../java/org/apache/griffin/core/measure/DataConnector.java | 1 - .../org/apache/griffin/core/schedule/SchedulerController.java | 1 - .../java/org/apache/griffin/core/schedule/SparkSubmitJob.java | 4 ++-- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java index cd7e0a4ea..6ebee1821 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java +++ b/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java @@ -102,7 +102,6 @@ public Map getConfig() { public DataConnector() { - System.out.println(); } public DataConnector(ConnectorType type,String version, Map config){ diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java index beae5b78b..c42b54203 100644 --- a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java +++ b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java @@ -89,7 +89,6 @@ public List> jobs() throws SchedulerException, map.put("measure", (String) jd.getJobDataMap().get("measure")); map.put("sourcePat",jd.getJobDataMap().getString("sourcePat")); map.put("targetPat",jd.getJobDataMap().getString("targetPat")); - System.out.print("dataStartTimestamp:"+jd.getJobDataMap().getString("dataStartTimestamp")); if(jd.getJobDataMap().getString("dataStartTimestamp")!=null && !jd.getJobDataMap().getString("dataStartTimestamp").equals("")) map.put("dataStartTimestamp",jd.getJobDataMap().getString("dataStartTimestamp")); map.put("jobStartTime",jd.getJobDataMap().getString("jobStartTime")); diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java index e9ccb4aae..0e9379839 100644 --- a/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java +++ b/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java @@ -108,7 +108,7 @@ public void execute(JobExecutionContext context) { periodTime = jd.getJobDataMap().getString("periodTime"); //prepare current system timestamp long currentSystemTimestamp = System.currentTimeMillis(); - + logger.info("currentSystemTimestamp: "+currentSystemTimestamp); if (sourcePattern != null && !sourcePattern.equals("")) { sourcePatternItemSet = sourcePattern.split("-"); long currentTimstamp = setCurrentTimestamp(currentSystemTimestamp); @@ -254,7 +254,7 @@ public Properties getsparkJobProperties() throws IOException { throw new FileNotFoundException("property file '" + propFileName + "' not found in the classpath"); } } catch (Exception e) { - System.out.println("Exception: " + e); + logger.info("Exception: " + e); } finally { inputStream.close(); } From 692e1d418f1bfdcc01e71dc0487349484f1d14aa Mon Sep 17 00:00:00 2001 From: Chen Date: Tue, 6 Jun 2017 09:26:55 +0800 Subject: [PATCH 02/12] replacenisempty --- .../griffin/core/schedule/SparkSubmitJob.java | 9 ++++----- .../griffin/core/schedule/SparkSubmitJobTest.java | 15 +++++++++++---- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java index 0e9379839..afd1af156 100644 --- a/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java +++ b/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java @@ -109,13 +109,13 @@ public void execute(JobExecutionContext context) { //prepare current system timestamp long currentSystemTimestamp = System.currentTimeMillis(); logger.info("currentSystemTimestamp: "+currentSystemTimestamp); - if (sourcePattern != null && !sourcePattern.equals("")) { + if (sourcePattern != null && !sourcePattern.isEmpty()) { sourcePatternItemSet = sourcePattern.split("-"); long currentTimstamp = setCurrentTimestamp(currentSystemTimestamp); setDataConnectorPartitions(measure.getSource(), sourcePatternItemSet, partitionItemSet, currentTimstamp); jd.getJobDataMap().put("lastTime", currentTimstamp + ""); } - if (targetPattern != null && !targetPattern.equals("")) { + if (targetPattern != null && !targetPattern.isEmpty()) { targetPatternItemSet = targetPattern.split("-"); long currentTimstamp = setCurrentTimestamp(currentSystemTimestamp); setDataConnectorPartitions(measure.getTarget(), targetPatternItemSet, partitionItemSet, currentTimstamp); @@ -125,7 +125,6 @@ public void execute(JobExecutionContext context) { RestTemplate restTemplate = new RestTemplate(); setSparkJobDO(); // String result = restTemplate.postForObject(uri, sparkJobDO, String.class); - logger.info("measure: \n"+measure); String result = restTemplate.postForObject(uri, sparkJobDO, String.class); logger.info(result); ScheduleResult scheduleResult=new ScheduleResult(); @@ -179,14 +178,14 @@ public void setDataConnectorPartitions(DataConnector dc, String[] patternItemSet public long setCurrentTimestamp(long currentSystemTimestamp) { long currentTimstamp=0; - if (eachJoblastTimestamp != null && !eachJoblastTimestamp.equals("")) { + if (eachJoblastTimestamp != null && !eachJoblastTimestamp.isEmpty()) { try { currentTimstamp = Long.parseLong(eachJoblastTimestamp) + Integer.parseInt(periodTime) * 1000; }catch (Exception e){ logger.info("lasttime or periodTime format problem! "+e); } } else { - if (dataStartTimestamp != null && !dataStartTimestamp.equals("")) { + if (dataStartTimestamp != null && !dataStartTimestamp.isEmpty()) { try{ currentTimstamp = Long.parseLong(dataStartTimestamp); }catch (Exception e){ diff --git a/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java b/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java index 18283b499..1b6ecf762 100644 --- a/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java +++ b/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java @@ -23,10 +23,12 @@ import org.apache.griffin.core.measure.repo.MeasureRepo; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.client.RestTemplate; import java.io.IOException; import java.text.SimpleDateFormat; @@ -86,11 +88,15 @@ public void test_execute() throws JsonProcessingException { when(ssj.measureRepo.findByName("bevssoj")).thenReturn(measure); // ssj.execute(context); -// -// RestTemplate restTemplate =mock(RestTemplate.class); + RestTemplate restTemplate =spy(RestTemplate.class); + Mockito.when(new RestTemplate()).thenReturn(restTemplate); +// PowerMockito.whenNew(RestTemplate.class).withNoArguments().thenReturn(restTemplate); + +// RestTemplate restTemplate =spy(RestTemplate.class); // String uri="http://10.9.246.187:8998/batches"; -// SparkJobDO sparkJobDO=mock(SparkJobDO.class); -// when(restTemplate.postForObject(uri, sparkJobDO, String.class)).thenReturn(null); + String uri="http://localhost:8998/batches"; + SparkJobDO sparkJobDO=mock(SparkJobDO.class); + when(restTemplate.postForObject(uri, sparkJobDO, String.class)).thenReturn(null); // // // long currentSystemTimestamp=System.currentTimeMillis(); @@ -98,6 +104,7 @@ public void test_execute() throws JsonProcessingException { // // verify(ssj.measureRepo).findByName("bevssoj"); // verify(jdmap,atLeast(2)).put("lastTime",currentTimstamp+""); + } @Test From 647c739f5dc8254ccc642afd7a6837c87133835b Mon Sep 17 00:00:00 2001 From: Chen Date: Wed, 7 Jun 2017 13:18:47 +0800 Subject: [PATCH 03/12] SparkSubmit powermock test and part of other test --- service/pom.xml | 12 ++ .../core/measure/MeasureServiceImpl.java | 9 +- .../core/measure/repo/MeasureRepo.java | 9 ++ .../griffin/core/schedule/JobHealth.java | 19 ++- .../core/schedule/Repo/ScheduleStateRepo.java | 20 ++- .../griffin/core/schedule/SparkSubmitJob.java | 8 +- .../{repo => }/MeasureControllerTest.java | 88 +++++++++++- .../core/measure/MeasureServiceImplTest.java | 74 ++++++++++ .../core/measure/repo/MeasureRepoTest.java | 126 +++--------------- .../core/schedule/SparkSubmitJobTest.java | 59 ++++---- 10 files changed, 278 insertions(+), 146 deletions(-) rename service/src/test/java/org/apache/griffin/core/measure/{repo => }/MeasureControllerTest.java (54%) create mode 100644 service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java diff --git a/service/pom.xml b/service/pom.xml index 0bbf573a9..2096782fb 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -133,6 +133,18 @@ test + + org.powermock + powermock-api-mockito + ${powermock.version} + + + org.powermock + powermock-module-junit4 + ${powermock.version} + + + diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java index ff74aa71f..0b6b03a07 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java @@ -87,7 +87,14 @@ public List getAllMeasureNameByOwner(String owner){ } public GriffinOperationMessage updateMeasure(@RequestBody Measure measure) { - +// Long measureId=measure.getId(); +// if (measureRepo.findOne(measureId)==null){ +// return GriffinOperationMessage.RESOURCE_NOT_FOUND; +// }else{ +// measureRepo.updateMeasure(measureId,measure.getDescription(),measure.getOrganization(),measure.getSource(),measure.getTarget(),measure.getEvaluateRule()); +//// System.out.print(res); +// return GriffinOperationMessage.UPDATE_MEASURE_SUCCESS; +// } String name=measure.getName(); Measure temp_mesaure=measureRepo.findByName(name); if (temp_mesaure==null){ diff --git a/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java b/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java index 5bc84a46e..1ed2fb2d9 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java +++ b/service/src/main/java/org/apache/griffin/core/measure/repo/MeasureRepo.java @@ -16,7 +16,10 @@ package org.apache.griffin.core.measure.repo; +import org.apache.griffin.core.measure.DataConnector; +import org.apache.griffin.core.measure.EvaluateRule; import org.apache.griffin.core.measure.Measure; +import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; import org.springframework.stereotype.Repository; @@ -37,4 +40,10 @@ public interface MeasureRepo extends CrudRepository { @Query("select m.organization from Measure m "+ "where m.name= ?1") String findOrgByName(String measureName); + + @Modifying + @Query("update Measure m "+ + "set m.description= ?2,m.organization= ?3,m.source= ?4,m.target= ?5,m.evaluateRule= ?6 where m.id= ?1") + void updateMeasure(Long Id, String description, String organization, DataConnector source, DataConnector target, EvaluateRule evaluateRule); + } diff --git a/service/src/main/java/org/apache/griffin/core/schedule/JobHealth.java b/service/src/main/java/org/apache/griffin/core/schedule/JobHealth.java index 538a90914..0d648359e 100644 --- a/service/src/main/java/org/apache/griffin/core/schedule/JobHealth.java +++ b/service/src/main/java/org/apache/griffin/core/schedule/JobHealth.java @@ -1,8 +1,21 @@ +/*- +* Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + package org.apache.griffin.core.schedule; -/** - * Created by xiangrchen on 6/1/17. - */ + public class JobHealth { private int health; private int invalid; diff --git a/service/src/main/java/org/apache/griffin/core/schedule/Repo/ScheduleStateRepo.java b/service/src/main/java/org/apache/griffin/core/schedule/Repo/ScheduleStateRepo.java index a04335330..9cafc7f44 100644 --- a/service/src/main/java/org/apache/griffin/core/schedule/Repo/ScheduleStateRepo.java +++ b/service/src/main/java/org/apache/griffin/core/schedule/Repo/ScheduleStateRepo.java @@ -1,5 +1,21 @@ +/*- +* Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + package org.apache.griffin.core.schedule.Repo; + import org.apache.griffin.core.schedule.ScheduleState; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.Query; @@ -8,9 +24,7 @@ import java.util.List; -/** - * Created by xiangrchen on 5/31/17. - */ + @Repository public interface ScheduleStateRepo extends CrudRepository{ @Query("select s from ScheduleState s " + diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java index afd1af156..3f7c0be0c 100644 --- a/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java +++ b/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java @@ -124,9 +124,9 @@ public void execute(JobExecutionContext context) { //final String uri = "http://10.9.246.187:8998/batches"; RestTemplate restTemplate = new RestTemplate(); setSparkJobDO(); -// String result = restTemplate.postForObject(uri, sparkJobDO, String.class); String result = restTemplate.postForObject(uri, sparkJobDO, String.class); logger.info(result); + //save result info into DataBase ScheduleResult scheduleResult=new ScheduleResult(); Gson gson=new Gson(); try { @@ -134,8 +134,10 @@ public void execute(JobExecutionContext context) { }catch (Exception e){ logger.info("scheduleResult covert error!"+e); } - ScheduleState scheduleState=new ScheduleState(groupName,jobName,scheduleResult.getId(),scheduleResult.getState(),scheduleResult.getAppId(),System.currentTimeMillis()); - scheduleStateRepo.save(scheduleState); + if(scheduleResult!=null) { + ScheduleState scheduleState = new ScheduleState(groupName, jobName, scheduleResult.getId(), scheduleResult.getState(), scheduleResult.getAppId(), System.currentTimeMillis()); + scheduleStateRepo.save(scheduleState); + } } public Map genPartitions(String[] patternItemSet, String[] partitionItemSet, long timestamp) { diff --git a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureControllerTest.java b/service/src/test/java/org/apache/griffin/core/measure/MeasureControllerTest.java similarity index 54% rename from service/src/test/java/org/apache/griffin/core/measure/repo/MeasureControllerTest.java rename to service/src/test/java/org/apache/griffin/core/measure/MeasureControllerTest.java index b5ca9d01e..3da5d6f21 100644 --- a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureControllerTest.java +++ b/service/src/test/java/org/apache/griffin/core/measure/MeasureControllerTest.java @@ -13,13 +13,14 @@ */ -package org.apache.griffin.core.measure.repo; +package org.apache.griffin.core.measure; -import org.apache.griffin.core.measure.*; +import org.apache.griffin.core.util.GriffinOperationMessage; import org.codehaus.jackson.map.ObjectMapper; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; import org.springframework.boot.test.mock.mockito.MockBean; @@ -28,11 +29,12 @@ import org.springframework.test.web.servlet.MockMvc; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import static org.hamcrest.CoreMatchers.is; import static org.mockito.BDDMockito.given; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @@ -49,11 +51,24 @@ public class MeasureControllerTest { public void setup(){ } - @Test public void testGetAllMeasures() throws IOException,Exception{ Measure measure = createATestMeasure("viewitem_hourly","bullseye"); + given(service.getAllMeasures()).willReturn(Arrays.asList(measure)); + + mvc.perform(get("/measures").contentType(MediaType.APPLICATION_JSON)) +// .andDo(print()) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.[0].name",is("viewitem_hourly"))) + ; + } + + + @Test + public void testGetMeasuresById() throws IOException,Exception{ + Measure measure = createATestMeasure("viewitem_hourly","bullseye"); + given(service.getMeasuresById(1L)).willReturn(measure); mvc.perform(get("/measures/1").contentType(MediaType.APPLICATION_JSON)) @@ -65,17 +80,76 @@ public void testGetAllMeasures() throws IOException,Exception{ @Test public void testGetMeasureByName() throws IOException,Exception{ - Measure measure = createATestMeasure("viewitem_hourly","bullseye"); given(service.getMeasuresByName("viewitem_hourly")).willReturn(measure); mvc.perform(get("/measures/findByName/viewitem_hourly").contentType(MediaType.APPLICATION_JSON)) -// .andDo(print()) .andExpect(status().isOk()) .andExpect(jsonPath("$.name",is("viewitem_hourly"))) ; } + @Test + public void testDeleteMeasuresById() throws Exception{ + Mockito.doNothing().when(service).deleteMeasuresById(1L); + + mvc.perform(delete("/measures/deleteById/1").contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + ; + } + + @Test + public void testDeleteMeasuresByName() throws Exception{ + String measureName="viewitem_hourly"; + given(service.deleteMeasuresByName(measureName)).willReturn(GriffinOperationMessage.DELETE_MEASURE_BY_NAME_SUCCESS); + + mvc.perform(delete("/measures/deleteByName/"+measureName).contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(jsonPath("$",is("DELETE_MEASURE_BY_NAME_SUCCESS"))) + ; + } + + @Test + public void testUpdateMeasure() throws Exception{ + String measureName="viewitem_hourly"; + String org="bullseye"; + Measure measure=createATestMeasure(measureName,org); + ObjectMapper mapper=new ObjectMapper(); + String measureJson=mapper.writeValueAsString(measure); + given(service.updateMeasure(measure)).willReturn(GriffinOperationMessage.UPDATE_MEASURE_SUCCESS); + + mvc.perform(post("/measures/update").contentType(MediaType.APPLICATION_JSON).content(measureJson)) + .andExpect(status().isOk()) + .andExpect(jsonPath("$",is("UPDATE_MEASURE_SUCCESS"))) + ; + } + + @Test + public void testGetAllMeasureNameOfOwner() throws Exception{ + String Owner="test1"; + String measureName="viewitem_hourly"; + given(service.getAllMeasureNameByOwner(Owner)).willReturn(Arrays.asList(measureName)); + + mvc.perform(get("/measures/owner/"+Owner).contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.[0]",is("viewitem_hourly"))) + ; + } + + @Test + public void testCreateNewMeasure() throws Exception{ + String measureName="viewitem_hourly"; + String org="bullseye"; + Measure measure=createATestMeasure(measureName,org); + ObjectMapper mapper=new ObjectMapper(); + String measureJson=mapper.writeValueAsString(measure); + given(service.createNewMeasure(measure)).willReturn(GriffinOperationMessage.CREATE_MEASURE_SUCCESS); + + mvc.perform(post("/measures/add").contentType(MediaType.APPLICATION_JSON).content(measureJson)) + .andExpect(status().isOk()) + .andExpect(jsonPath("$",is("CREATE_MEASURE_SUCCESS"))) + ; + } private Measure createATestMeasure(String name,String org)throws IOException,Exception{ HashMap configMap1=new HashMap<>(); @@ -98,4 +172,6 @@ private Measure createATestMeasure(String name,String org)throws IOException,Exc return measure; } + + } diff --git a/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java new file mode 100644 index 000000000..e103c0cd6 --- /dev/null +++ b/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java @@ -0,0 +1,74 @@ +/*- +* Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + +package org.apache.griffin.core.measure; + + +import org.apache.griffin.core.measure.repo.MeasureRepo; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.fail; +import static org.junit.Assert.assertTrue; + + +@RunWith(SpringRunner.class) +public class MeasureServiceImplTest { + + @TestConfiguration + public static class HiveMetastoreServiceConfiguration{ + @Bean + public MeasureServiceImpl service(){ + return new MeasureServiceImpl(); + } + } + @MockBean + private MeasureRepo measureRepo; + + @Autowired + private MeasureServiceImpl service; + + @Before + public void setup(){ + + } + + @Test + public void testGetAllMeasures(){ + try { + Iterable tmp = service.getAllMeasures(); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot get all Measure from dbs"); + } + } + + @Test + public void testGetMeasuresById(){ + try { + Measure tmp = service.getMeasuresById(1); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot get all tables in db default"); + } + } + +} diff --git a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java index ce5ab729f..0f6355574 100644 --- a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java +++ b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java @@ -15,8 +15,6 @@ package org.apache.griffin.core.measure.repo; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.griffin.core.measure.DataConnector; import org.apache.griffin.core.measure.EvaluateRule; import org.apache.griffin.core.measure.Measure; @@ -44,97 +42,21 @@ public class MeasureRepoTest { private MeasureRepo measureRepo; @Before - public void setup(){ + public void setup() throws Exception { entityManager.clear(); entityManager.flush(); + setEntityManager(); } @Test - public void testFindAllOrganizations(){ - HashMap configMap1=new HashMap<>(); - configMap1.put("database","default"); - configMap1.put("table.name","test_data_src"); - HashMap configMap2=new HashMap<>(); - configMap2.put("database","default"); - configMap2.put("table.name","test_data_tgt"); - String configJson1 = null; - String configJson2 = null; - try { - configJson1 = new ObjectMapper().writeValueAsString(configMap1); - configJson2 = new ObjectMapper().writeValueAsString(configMap2); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - - DataConnector source = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson1); - DataConnector target = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson2); - - String rules = "$source.uage > 100 AND $source.uid = $target.uid AND $source.uage + 12 = $target.uage + 10 + 2 AND $source.udes + 11 = $target.udes + 1 + 1"; - - EvaluateRule eRule = new EvaluateRule(1,rules); - - Measure measure = new Measure("m1","bevssoj description", Measure.MearuseType.accuracy, "bullseye", source, target, eRule,"owner1"); - entityManager.persistAndFlush(measure); - - DataConnector source2 = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson1); - DataConnector target2 = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson2); - EvaluateRule eRule2 = new EvaluateRule(1,rules); - Measure measure2 = new Measure("m2","test description", Measure.MearuseType.accuracy, "org1", source2, target2, eRule2,"owner1"); - entityManager.persistAndFlush(measure2); - - DataConnector source3 = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson1); - DataConnector target3 = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson2); - EvaluateRule eRule3 = new EvaluateRule(1,rules); - Measure measure3 = new Measure("m3","test_just_inthere description", Measure.MearuseType.accuracy, "org2", source3, target3, eRule3,"owner1"); - entityManager.persistAndFlush(measure3); - - + public void testFindAllOrganizations() throws Exception { List orgs = measureRepo.findOrganizations(); assertThat(orgs.size()).isEqualTo(3); - } @Test - public void testFindNameByOrganization(){ - HashMap configMap1=new HashMap<>(); - configMap1.put("database","default"); - configMap1.put("table.name","test_data_src"); - HashMap configMap2=new HashMap<>(); - configMap2.put("database","default"); - configMap2.put("table.name","test_data_tgt"); - String configJson1 = null; - String configJson2 = null; - try { - configJson1 = new ObjectMapper().writeValueAsString(configMap1); - configJson2 = new ObjectMapper().writeValueAsString(configMap2); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - - DataConnector source = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson1); - DataConnector target = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson2); - - String rules = "$source.uage > 100 AND $source.uid = $target.uid AND $source.uage + 12 = $target.uage + 10 + 2 AND $source.udes + 11 = $target.udes + 1 + 1"; - - EvaluateRule eRule = new EvaluateRule(1,rules); - - Measure measure = new Measure("m1","bevssoj description", Measure.MearuseType.accuracy, "bullseye", source, target, eRule,"owner1"); - entityManager.persistAndFlush(measure); - - DataConnector source2 = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson1); - DataConnector target2 = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson2); - EvaluateRule eRule2 = new EvaluateRule(1,rules); - Measure measure2 = new Measure("m2","test description", Measure.MearuseType.accuracy, "org1", source2, target2, eRule2,"owner1"); - entityManager.persistAndFlush(measure2); - - DataConnector source3 = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson1); - DataConnector target3 = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson2); - EvaluateRule eRule3 = new EvaluateRule(1,rules); - Measure measure3 = new Measure("m3","test_just_inthere description", Measure.MearuseType.accuracy, "org2", source3, target3, eRule3,"owner1"); - entityManager.persistAndFlush(measure3); - - + public void testFindNameByOrganization() throws Exception { List orgs = measureRepo.findNameByOrganization("org1"); assertThat(orgs.size()).isEqualTo(1); assertThat(orgs.get(0)).isEqualToIgnoringCase("m2"); @@ -142,21 +64,20 @@ public void testFindNameByOrganization(){ } @Test - public void testFindOrgByName(){ + public void testFindOrgByName() throws Exception { + String org = measureRepo.findOrgByName("m3"); + assertThat(org).isEqualTo("org2"); + } + + private Measure createATestMeasure(String name,String org)throws Exception{ HashMap configMap1=new HashMap<>(); configMap1.put("database","default"); configMap1.put("table.name","test_data_src"); HashMap configMap2=new HashMap<>(); configMap2.put("database","default"); configMap2.put("table.name","test_data_tgt"); - String configJson1 = null; - String configJson2 = null; - try { - configJson1 = new ObjectMapper().writeValueAsString(configMap1); - configJson2 = new ObjectMapper().writeValueAsString(configMap2); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } + String configJson1 = new org.codehaus.jackson.map.ObjectMapper().writeValueAsString(configMap1); + String configJson2 = new org.codehaus.jackson.map.ObjectMapper().writeValueAsString(configMap2); DataConnector source = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson1); DataConnector target = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson2); @@ -165,24 +86,19 @@ public void testFindOrgByName(){ EvaluateRule eRule = new EvaluateRule(1,rules); - Measure measure = new Measure("m1","bevssoj description", Measure.MearuseType.accuracy, "bullseye", source, target, eRule,"owner1"); + Measure measure = new Measure(name,"bevssoj description", Measure.MearuseType.accuracy, org, source, target, eRule,"test1"); + + return measure; + } + + public void setEntityManager() throws Exception { + Measure measure=createATestMeasure("m1","bullseye"); entityManager.persistAndFlush(measure); - DataConnector source2 = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson1); - DataConnector target2 = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson2); - EvaluateRule eRule2 = new EvaluateRule(1,rules); - Measure measure2 = new Measure("m2","test description", Measure.MearuseType.accuracy, "org1", source2, target2, eRule2,"owner1"); + Measure measure2=createATestMeasure("m2","org1"); entityManager.persistAndFlush(measure2); - DataConnector source3 = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson1); - DataConnector target3 = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson2); - EvaluateRule eRule3 = new EvaluateRule(1,rules); - Measure measure3 = new Measure("m3","test_just_inthere description", Measure.MearuseType.accuracy, "org2", source3, target3, eRule3,"owner1"); + Measure measure3=createATestMeasure("m3","org2"); entityManager.persistAndFlush(measure3); - - - String org = measureRepo.findOrgByName("m3"); - assertThat(org).isEqualTo("org2"); - } } diff --git a/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java b/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java index 1b6ecf762..97789af25 100644 --- a/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java +++ b/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java @@ -16,14 +16,17 @@ package org.apache.griffin.core.schedule; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.griffin.core.measure.DataConnector; import org.apache.griffin.core.measure.EvaluateRule; import org.apache.griffin.core.measure.Measure; import org.apache.griffin.core.measure.repo.MeasureRepo; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; @@ -42,8 +45,10 @@ /** * Created by xiangrchen on 5/8/17. */ -//@RunWith(SpringJUnit4ClassRunner.class) -public class SparkSubmitJobTest { +//@RunWith(MockitoJUnitRunner.class) +@RunWith(PowerMockRunner.class) +@PrepareForTest(SparkSubmitJob.class) +public class SparkSubmitJobTest{ private SparkSubmitJob ssj; @@ -57,7 +62,7 @@ public void setUp() throws IOException { } @Test - public void test_execute() throws JsonProcessingException { + public void test_execute() throws Exception { JobExecutionContext context=mock(JobExecutionContext.class); JobDetail jd = mock(JobDetail.class); when(context.getJobDetail()).thenReturn(jd); @@ -71,40 +76,44 @@ public void test_execute() throws JsonProcessingException { when(jdmap.getString("dataStartTimestamp")).thenReturn("1460174400000"); when(jdmap.getString("lastTime")).thenReturn(""); when(jdmap.getString("periodTime")).thenReturn("10"); + Measure measure = createATestMeasure("viewitem_hourly","bullseye"); + when(ssj.measureRepo.findByName("bevssoj")).thenReturn(measure); + + RestTemplate restTemplate =Mockito.mock(RestTemplate.class); + PowerMockito.whenNew(RestTemplate.class).withAnyArguments().thenReturn(restTemplate); + String uri=""; + SparkJobDO sparkJobDO= Mockito.mock(SparkJobDO.class); + PowerMockito.when(restTemplate.postForObject(uri, sparkJobDO, String.class)).thenReturn(null); + ssj.execute(context); + + long currentSystemTimestamp=System.currentTimeMillis(); + long currentTimstamp = ssj.setCurrentTimestamp(currentSystemTimestamp); +// + verify(ssj.measureRepo).findByName("bevssoj"); + verify(jdmap,atLeast(2)).put("lastTime",currentTimstamp+""); + } + + private Measure createATestMeasure(String name,String org)throws IOException,Exception{ HashMap configMap1=new HashMap<>(); configMap1.put("database","default"); configMap1.put("table.name","test_data_src"); HashMap configMap2=new HashMap<>(); configMap2.put("database","default"); configMap2.put("table.name","test_data_tgt"); - String configJson1 = new ObjectMapper().writeValueAsString(configMap1); - String configJson2 = new ObjectMapper().writeValueAsString(configMap2); + String configJson1 = new org.codehaus.jackson.map.ObjectMapper().writeValueAsString(configMap1); + String configJson2 = new org.codehaus.jackson.map.ObjectMapper().writeValueAsString(configMap2); + DataConnector source = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson1); DataConnector target = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson2); + String rules = "$source.uage > 100 AND $source.uid = $target.uid AND $source.uage + 12 = $target.uage + 10 + 2 AND $source.udes + 11 = $target.udes + 1 + 1"; + EvaluateRule eRule = new EvaluateRule(1,rules); - Measure measure = new Measure("viewitem_hourly","bevssoj description", Measure.MearuseType.accuracy, "bullyeye", source, target, eRule,"test1"); - when(ssj.measureRepo.findByName("bevssoj")).thenReturn(measure); -// ssj.execute(context); - RestTemplate restTemplate =spy(RestTemplate.class); - Mockito.when(new RestTemplate()).thenReturn(restTemplate); -// PowerMockito.whenNew(RestTemplate.class).withNoArguments().thenReturn(restTemplate); - -// RestTemplate restTemplate =spy(RestTemplate.class); -// String uri="http://10.9.246.187:8998/batches"; - String uri="http://localhost:8998/batches"; - SparkJobDO sparkJobDO=mock(SparkJobDO.class); - when(restTemplate.postForObject(uri, sparkJobDO, String.class)).thenReturn(null); -// -// -// long currentSystemTimestamp=System.currentTimeMillis(); -// long currentTimstamp = ssj.setCurrentTimestamp(currentSystemTimestamp); -// -// verify(ssj.measureRepo).findByName("bevssoj"); -// verify(jdmap,atLeast(2)).put("lastTime",currentTimstamp+""); + Measure measure = new Measure(name,"bevssoj description", Measure.MearuseType.accuracy, org, source, target, eRule,"test1"); + return measure; } @Test From 4c006d875b59685cf689722aa061c7464fa6ec42 Mon Sep 17 00:00:00 2001 From: Chen Date: Fri, 9 Jun 2017 15:49:57 +0800 Subject: [PATCH 04/12] ut --- service/pom.xml | 2 + .../metastore/HiveMetastoreController.java | 2 +- .../core/metastore/HiveMetastoreProxy.java | 24 +- .../core/metastore/HiveMetastoreService.java | 95 +------- .../metastore/HiveMetastoreServiceImpl.java | 124 +++++++++++ .../core/metastore/KafkaSchemaController.java | 2 +- .../core/metastore/KafkaSchemaService.java | 112 +--------- .../metastore/KafkaSchemaServiceImpl.java | 139 ++++++++++++ .../core/schedule/SchedulerController.java | 189 ++-------------- .../core/schedule/SchedulerRequestBody.java | 35 +++ .../core/schedule/SchedulerService.java | 24 ++ .../core/schedule/SchedulerServiceImpl.java | 210 ++++++++++++++++++ .../griffin/core/schedule/SparkSubmitJob.java | 11 +- .../core/measure/MeasureServiceImplTest.java | 130 ++++++++++- .../HiveMetastoreControllerTest.java | 2 +- ...java => HiveMetastoreServiceImplTest.java} | 8 +- .../metastore/KafkaSchemaControllerTest.java | 2 +- .../metastore/KafkaSchemaServiceImplTest.java | 61 +++++ .../core/metric/MetricServiceImplTest.java | 49 ++++ .../schedule/SchedulerControllerTest.java | 104 +++++++++ .../schedule/SchedulerServiceImplTest.java | 172 ++++++++++++++ .../core/schedule/SparkSubmitJobTest.java | 4 - .../resources/application-test.properties | 58 +++++ service/src/test/resources/quartz.properties | 11 + 24 files changed, 1168 insertions(+), 402 deletions(-) create mode 100644 service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImpl.java create mode 100644 service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImpl.java create mode 100644 service/src/main/java/org/apache/griffin/core/schedule/SchedulerService.java create mode 100644 service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java rename service/src/test/java/org/apache/griffin/core/metastore/{HiveMetastoreServiceTests.java => HiveMetastoreServiceImplTest.java} (94%) create mode 100644 service/src/test/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImplTest.java create mode 100644 service/src/test/java/org/apache/griffin/core/metric/MetricServiceImplTest.java create mode 100644 service/src/test/java/org/apache/griffin/core/schedule/SchedulerControllerTest.java create mode 100644 service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java create mode 100644 service/src/test/resources/application-test.properties create mode 100644 service/src/test/resources/quartz.properties diff --git a/service/pom.xml b/service/pom.xml index 4ae013d07..3ab5a3658 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -156,11 +156,13 @@ org.powermock powermock-api-mockito ${powermock.version} + test org.powermock powermock-module-junit4 ${powermock.version} + test diff --git a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreController.java b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreController.java index 85a2d36ed..4fd7dbe85 100644 --- a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreController.java +++ b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreController.java @@ -30,7 +30,7 @@ public class HiveMetastoreController { @Autowired - HiveMetastoreService hiveMetastoreService; + HiveMetastoreServiceImpl hiveMetastoreService; @RequestMapping("/db") public Iterable getAllDatabases() { diff --git a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreProxy.java b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreProxy.java index 185bc2bc4..6330cd157 100644 --- a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreProxy.java +++ b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreProxy.java @@ -37,20 +37,20 @@ public class HiveMetastoreProxy @Bean public HiveMetaStoreClient initHiveMetastoreClient(){ - HiveConf hiveConf = new HiveConf(); - hiveConf.set("hive.metastore.local", "false"); - hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); - hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, uris); - try { - client= new HiveMetaStoreClient(hiveConf); - } catch (MetaException e) { - log.error("Failed to connect hive metastore",e.getMessage()); - client = null; + HiveConf hiveConf = new HiveConf(); + hiveConf.set("hive.metastore.local", "false"); + hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, uris); + try { + client= new HiveMetaStoreClient(hiveConf); + } catch (MetaException e) { + log.error("Failed to connect hive metastore",e.getMessage()); + client = null; + } + + return client; } - return client; -} - public void destroy() throws Exception { if(null!=client) client.close(); } diff --git a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreService.java b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreService.java index c907b45ff..ba4f4bfc5 100644 --- a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreService.java +++ b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreService.java @@ -15,104 +15,21 @@ package org.apache.griffin.core.metastore; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; -import org.springframework.util.StringUtils; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; +public interface HiveMetastoreService { -@Service -public class HiveMetastoreService { + public Iterable getAllDatabases(); - private static final Logger log = LoggerFactory.getLogger(HiveMetastoreService.class); + public Iterable getAllTableNames(String dbName); - @Autowired - HiveMetaStoreClient client; + public List getAllTable(String db); - @Value("${hive.metastore.dbname}") - private String defaultDbName; - - private String getUseDbName(String dbName) { - if (!StringUtils.hasText(dbName)) return defaultDbName; - else return dbName; - } - - public Iterable getAllDatabases() { - Iterable results = null; - try { - results = client.getAllDatabases(); - } catch (MetaException e) { - log.error("Can not get databases : ",e.getMessage()); - } - return results; - } - - public Iterable getAllTableNames(String dbName) { - Iterable results = null; - String useDbName = getUseDbName(dbName); - try { - results = client.getAllTables(useDbName); - } catch (Exception e) { - log.warn("Exception fetching tables info" + e.getMessage()); - } - return results; - } - - public List
getAllTable(String db){ - List
results = new ArrayList
(); - String useDbName = getUseDbName(db); - try { - Iterable tables = client.getAllTables(useDbName); - for (String table: tables) { - Table tmp = client.getTable(db,table); - results.add(tmp); - } - } catch (Exception e) { - log.warn("Exception fetching tables info" + e.getMessage()); - } - return results; - } - - public Map> getAllTable(){ - Map> results = new HashMap>(); - Iterable dbs = getAllDatabases(); - for(String db: dbs){ - List
alltables = new ArrayList
(); - String useDbName = getUseDbName(db); - try { - Iterable tables = client.getAllTables(useDbName); - for (String table: tables) { - Table tmp = client.getTable(db,table); - alltables.add(tmp); - } - } catch (Exception e) { - log.warn("Exception fetching tables info" + e.getMessage()); - } - results.put(db,alltables); - } - return results; - } - - public Table getTable(String dbName, String tableName) { - Table result = null; - String useDbName = getUseDbName(dbName); - try { - result = client.getTable(useDbName, tableName); - } catch (Exception e) { - log.warn("Exception fetching table info : " +tableName + " : " + e.getMessage()); - } - return result; - } + public Map> getAllTable(); + public Table getTable(String dbName, String tableName); } diff --git a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImpl.java new file mode 100644 index 000000000..1b48cf4fb --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImpl.java @@ -0,0 +1,124 @@ +/*- + * Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + */ + +package org.apache.griffin.core.metastore; + +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +@Service +public class HiveMetastoreServiceImpl implements HiveMetastoreService{ + + private static final Logger log = LoggerFactory.getLogger(HiveMetastoreServiceImpl.class); + + @Autowired + HiveMetaStoreClient client; + + @Value("${hive.metastore.dbname}") + private String defaultDbName; + + private String getUseDbName(String dbName) { + if (!StringUtils.hasText(dbName)) return defaultDbName; + else return dbName; + } + + @Override + public Iterable getAllDatabases() { + Iterable results = null; + try { + results = client.getAllDatabases(); + } catch (MetaException e) { + log.error("Can not get databases : ",e.getMessage()); + } + return results; + } + + @Override + public Iterable getAllTableNames(String dbName) { + Iterable results = null; + String useDbName = getUseDbName(dbName); + try { + results = client.getAllTables(useDbName); + client.reconnect(); + } catch (Exception e) { + log.warn("Exception fetching tables info" + e.getMessage()); + } + return results; + } + + @Override + public List
getAllTable(String db){ + List
results = new ArrayList
(); + String useDbName = getUseDbName(db); + try { + Iterable tables = client.getAllTables(useDbName); + for (String table: tables) { + Table tmp = client.getTable(db,table); + results.add(tmp); + } + } catch (Exception e) { + log.warn("Exception fetching tables info" + e.getMessage()); + } + return results; + } + + @Override + public Map> getAllTable(){ + Map> results = new HashMap>(); + Iterable dbs = getAllDatabases(); + for(String db: dbs){ + List
alltables = new ArrayList
(); + String useDbName = getUseDbName(db); + try { + Iterable tables = client.getAllTables(useDbName); + for (String table: tables) { + Table tmp = client.getTable(db,table); + alltables.add(tmp); + } + } catch (Exception e) { + log.warn("Exception fetching tables info" + e.getMessage()); + } + results.put(db,alltables); + } + return results; + } + + @Override + public Table getTable(String dbName, String tableName) { + Table result = null; + String useDbName = getUseDbName(dbName); + try { + result = client.getTable(useDbName, tableName); + } catch (Exception e) { + log.warn("Exception fetching table info : " +tableName + " : " + e.getMessage()); + } + return result; + } + + +} diff --git a/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaController.java b/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaController.java index f4ae18de0..11149a0b1 100644 --- a/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaController.java +++ b/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaController.java @@ -28,7 +28,7 @@ public class KafkaSchemaController { @Autowired - KafkaSchemaService kafkaSchemaService; + KafkaSchemaServiceImpl kafkaSchemaService; @RequestMapping("/schema/{id}") public SchemaString getSchemaString(@PathVariable("id") Integer id) { diff --git a/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaService.java b/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaService.java index fc1e69308..bccfc5022 100644 --- a/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaService.java +++ b/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaService.java @@ -18,116 +18,18 @@ import io.confluent.kafka.schemaregistry.client.rest.entities.Config; import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.http.ResponseEntity; -import org.springframework.stereotype.Service; -import org.springframework.util.StringUtils; -import org.springframework.web.client.RestTemplate; -import java.util.Arrays; +public interface KafkaSchemaService { + public SchemaString getSchemaString(Integer id); -@Service -public class KafkaSchemaService { + public Iterable getSubjects(); - private static final Logger log = LoggerFactory.getLogger(KafkaSchemaService.class); + public Iterable getSubjectVersions(String subject); - @Value("${kafka.schema.registry.url}") - private String url; + public Schema getSubjectSchema(String subject, String version); + public Config getTopLevelConfig(); - private String registryUrl(final String path) { - if (StringUtils.hasText(path)) { - String usePath = path; - if (!path.startsWith("/")) usePath = "/" + path; - return this.url + usePath; - } - return ""; - } + public Config getSubjectLevelConfig(String subject); - public SchemaString getSchemaString(Integer id) { - String path = "/schemas/ids/" + id; - String regUrl = registryUrl(path); - SchemaString result = null; - try { - RestTemplate restTemplate = new RestTemplate(); - ResponseEntity res = restTemplate.getForEntity(regUrl, SchemaString.class); - result = res.getBody(); - } catch (Exception e) { - log.error("Exception getting schema of id " + id + " : ", e.getMessage()); - } - return result; - } - - public Iterable getSubjects() { - String path = "/subjects"; - String regUrl = registryUrl(path); - Iterable result = null; - try { - RestTemplate restTemplate = new RestTemplate(); - ResponseEntity res = restTemplate.getForEntity(regUrl, String[].class); - result = Arrays.asList(res.getBody()); - } catch (Exception e) { - log.error("Exception getting subjects : ", e.getMessage()); - } - return result; - } - - public Iterable getSubjectVersions(String subject) { - String path = "/subjects/" + subject + "/versions"; - String regUrl = registryUrl(path); - Iterable result = null; - try { - RestTemplate restTemplate = new RestTemplate(); - ResponseEntity res = restTemplate.getForEntity(regUrl, Integer[].class); - result = Arrays.asList(res.getBody()); - } catch (Exception e) { - log.error("Exception getting subject " + subject + " versions : ", e.getMessage()); - } - return result; - } - - public Schema getSubjectSchema(String subject, String version) { - String path = "/subjects/" + subject + "/versions/" + version; - String regUrl = registryUrl(path); - Schema result = null; - try { - RestTemplate restTemplate = new RestTemplate(); - ResponseEntity res = restTemplate.getForEntity(regUrl, Schema.class); - result = res.getBody(); - } catch (Exception e) { - log.error("Exception getting subject " + subject + " with version " + version + " : ", e.getMessage()); - } - return result; - } - - public Config getTopLevelConfig() { - String path = "/config"; - String regUrl = registryUrl(path); - Config result = null; - try { - RestTemplate restTemplate = new RestTemplate(); - ResponseEntity res = restTemplate.getForEntity(regUrl, Config.class); - result = res.getBody(); - } catch (Exception e) { - log.error("Exception getting top level config : ", e.getMessage()); - } - return result; - } - - public Config getSubjectLevelConfig(String subject) { - String path = "/config/" + subject; - String regUrl = registryUrl(path); - Config result = null; - try { - RestTemplate restTemplate = new RestTemplate(); - ResponseEntity res = restTemplate.getForEntity(regUrl, Config.class); - result = res.getBody(); - } catch (Exception e) { - log.error("Exception getting subject " + subject + " level config : ", e.getMessage()); - } - return result; - } } diff --git a/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImpl.java new file mode 100644 index 000000000..74b60e9c3 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImpl.java @@ -0,0 +1,139 @@ +/*- + * Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + */ + +package org.apache.griffin.core.metastore; + +import io.confluent.kafka.schemaregistry.client.rest.entities.Config; +import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; +import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; +import org.springframework.web.client.RestTemplate; + +import java.util.Arrays; + +@Service +public class KafkaSchemaServiceImpl implements KafkaSchemaService{ + + private static final Logger log = LoggerFactory.getLogger(KafkaSchemaServiceImpl.class); + + @Value("${kafka.schema.registry.url}") + private String url; + + + private String registryUrl(final String path) { + if (StringUtils.hasText(path)) { + String usePath = path; + if (!path.startsWith("/")) usePath = "/" + path; + return this.url + usePath; + } + return ""; + } + + @Override + public SchemaString getSchemaString(Integer id) { + String path = "/schemas/ids/" + id; + String regUrl = registryUrl(path); + SchemaString result = null; + try { + RestTemplate restTemplate = new RestTemplate(); + ResponseEntity res = restTemplate.getForEntity(regUrl, SchemaString.class); + result = res.getBody(); + } catch (Exception e) { + log.error("Exception getting schema of id " + id + " : ", e.getMessage()); + } + return result; + } + + @Override + public Iterable getSubjects() { + String path = "/subjects"; + String regUrl = registryUrl(path); + Iterable result = null; + try { + RestTemplate restTemplate = new RestTemplate(); + ResponseEntity res = restTemplate.getForEntity(regUrl, String[].class); + result = Arrays.asList(res.getBody()); + } catch (Exception e) { + log.error("Exception getting subjects : ", e.getMessage()); + } + return result; + } + + @Override + public Iterable getSubjectVersions(String subject) { + String path = "/subjects/" + subject + "/versions"; + String regUrl = registryUrl(path); + Iterable result = null; + try { + RestTemplate restTemplate = new RestTemplate(); + ResponseEntity res = restTemplate.getForEntity(regUrl, Integer[].class); + result = Arrays.asList(res.getBody()); + } catch (Exception e) { + log.error("Exception getting subject " + subject + " versions : ", e.getMessage()); + } + return result; + } + + @Override + public Schema getSubjectSchema(String subject, String version) { + String path = "/subjects/" + subject + "/versions/" + version; + String regUrl = registryUrl(path); + Schema result = null; + try { + RestTemplate restTemplate = new RestTemplate(); + ResponseEntity res = restTemplate.getForEntity(regUrl, Schema.class); + result = res.getBody(); + } catch (Exception e) { + log.error("Exception getting subject " + subject + " with version " + version + " : ", e.getMessage()); + } + return result; + } + + @Override + public Config getTopLevelConfig() { + String path = "/config"; + String regUrl = registryUrl(path); + Config result = null; + try { + RestTemplate restTemplate = new RestTemplate(); + ResponseEntity res = restTemplate.getForEntity(regUrl, Config.class); + + result = res.getBody(); + } catch (Exception e) { + log.error("Exception getting top level config : ", e.getMessage()); + } + return result; + } + + @Override + public Config getSubjectLevelConfig(String subject) { + String path = "/config/" + subject; + String regUrl = registryUrl(path); + Config result = null; + try { + RestTemplate restTemplate = new RestTemplate(); + ResponseEntity res = restTemplate.getForEntity(regUrl, Config.class); + result = res.getBody(); + } catch (Exception e) { + log.error("Exception getting subject " + subject + " level config : ", e.getMessage()); + } + return result; + } +} diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java index c42b54203..6447fa959 100644 --- a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java +++ b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java @@ -15,29 +15,18 @@ package org.apache.griffin.core.schedule; -import org.apache.griffin.core.schedule.Repo.ScheduleStateRepo; -import org.quartz.*; -import org.quartz.impl.matchers.GroupMatcher; +import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.domain.PageRequest; -import org.springframework.data.domain.Pageable; -import org.springframework.data.domain.Sort; -import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.web.bind.annotation.*; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import java.io.Serializable; import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; - -import static org.quartz.JobBuilder.newJob; -import static org.quartz.JobKey.jobKey; -import static org.quartz.TriggerBuilder.newTrigger; -import static org.quartz.TriggerKey.triggerKey; +import java.util.List; +import java.util.Map; @RestController @RequestMapping("/jobs") @@ -45,186 +34,38 @@ public class SchedulerController { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerController.class); @Autowired - private SchedulerFactoryBean factory; - @Autowired - private ScheduleStateRepo scheduleStateRepo; - - public static final String ACCURACY_BATCH_GROUP = "BA"; + SchedulerService schedulerService; @RequestMapping("/") - public List> jobs() throws SchedulerException, + public List> getJobs() throws SchedulerException, ParseException { - Scheduler scheduler = factory.getObject(); - - List> list = new ArrayList<>(); - for (String groupName : scheduler.getJobGroupNames()) { - for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher - .jobGroupEquals(groupName))) { - String jobName = jobKey.getName(); - String jobGroup = jobKey.getGroup(); - - JobDetail jd = scheduler.getJobDetail(jobKey); - - List triggers = (List) scheduler - .getTriggersOfJob(jobKey); - Map map = new HashMap<>(); - - if (triggers.size() > 0) { - //always get next run - Date nextFireTime = triggers.get(0).getNextFireTime(); - Date previousFireTime=triggers.get(0).getPreviousFireTime(); -// triggers.get(0).getScheduleBuilder() - - Trigger.TriggerState triggerState=scheduler.getTriggerState(triggers.get(0).getKey()); - - map.put("jobName", jobName); - map.put("groupName", jobGroup); - map.put("nextFireTime", nextFireTime.getTime()); - if (previousFireTime!=null) - map.put("previousFireTime",previousFireTime.getTime()); - else - map.put("previousFireTime",-1); - map.put("triggerState",triggerState); - - map.put("measure", (String) jd.getJobDataMap().get("measure")); - map.put("sourcePat",jd.getJobDataMap().getString("sourcePat")); - map.put("targetPat",jd.getJobDataMap().getString("targetPat")); - if(jd.getJobDataMap().getString("dataStartTimestamp")!=null && !jd.getJobDataMap().getString("dataStartTimestamp").equals("")) - map.put("dataStartTimestamp",jd.getJobDataMap().getString("dataStartTimestamp")); - map.put("jobStartTime",jd.getJobDataMap().getString("jobStartTime")); - map.put("periodTime",jd.getJobDataMap().getString("periodTime")); - list.add(map); - } - - } - } - return list; - } - - @RequestMapping(value = "/{groupName}/{jobName}", method = RequestMethod.DELETE) - public String removeTask(@PathVariable String groupName, @PathVariable String jobName) - throws SchedulerException { - Scheduler scheduler = factory.getObject(); - JobKey jobKey = new JobKey(jobName, groupName); - scheduler.deleteJob(jobKey); - return "task has been removed"; + return schedulerService.getJobs(); } - //@RequestMapping(value = "/add/{groupName}/{jobName}/{measureName}/{sourcePat}/{targetPat}") @RequestMapping(value = "/add/{groupName}/{jobName}/{measureName}", method = RequestMethod.POST) @ResponseBody @Produces(MediaType.APPLICATION_JSON) - public Boolean addTask(@PathVariable String groupName, + public boolean addJob(@PathVariable String groupName, @PathVariable String jobName, @PathVariable String measureName, @RequestBody SchedulerRequestBody schedulerRequestBody) { - //@RequestParam(value = "sourcePat", required = false) String sourcePat, - //@RequestParam(value = "targetPat", required = false) String targetPat -// int jobStartTime = 0; - Date jobStartTime=null; - int periodTime = 0; - SimpleDateFormat format=new SimpleDateFormat("YYYYMMdd HH:mm:ss"); - try{ - periodTime = Integer.parseInt(schedulerRequestBody.getPeriodTime()); - jobStartTime=format.parse(schedulerRequestBody.getJobStartTime()); - }catch (Exception e){ - LOGGER.info("jobStartTime or periodTime format error! "+e); - return false; - } - try { - Scheduler scheduler = factory.getObject(); - TriggerKey triggerKey = triggerKey(jobName, groupName); - - if (scheduler.checkExists(triggerKey)) { - scheduler.unscheduleJob(triggerKey); - } - JobKey jobKey = jobKey(jobName, groupName); - JobDetail jobDetail; - if (scheduler.checkExists(jobKey)) { - jobDetail = scheduler.getJobDetail(jobKey); - setJobDetail(jobDetail,schedulerRequestBody,measureName,groupName,jobName); - scheduler.addJob(jobDetail, true); - } else { - jobDetail = newJob(SparkSubmitJob.class) - .storeDurably() - .withIdentity(jobKey) - .build(); - - setJobDetail(jobDetail,schedulerRequestBody,measureName,groupName,jobName); - scheduler.addJob(jobDetail, false); - } - - Trigger trigger = newTrigger() - .withIdentity(triggerKey) - .forJob(jobDetail) - //hardcode to -// .withSchedule(CronScheduleBuilder.cronSchedule("0 0/1 0 * * ?")) - .withSchedule(SimpleScheduleBuilder.simpleSchedule() - .withIntervalInSeconds(periodTime) - .repeatForever()) - .startAt(jobStartTime) - .build(); - scheduler.scheduleJob(trigger); - - return true; - - } catch (SchedulerException e) { - LOGGER.error("", e); - return false; - } - } - - private void setJobDetail(JobDetail jobDetail,SchedulerRequestBody schedulerRequestBody,String measureName,String groupName,String jobName){ - jobDetail.getJobDataMap().put("measure", measureName); - jobDetail.getJobDataMap().put("sourcePat", schedulerRequestBody.getSourcePat()); - jobDetail.getJobDataMap().put("targetPat", schedulerRequestBody.getTargetPat()); - jobDetail.getJobDataMap().put("dataStartTimestamp", schedulerRequestBody.getDataStartTimestamp()); - jobDetail.getJobDataMap().put("jobStartTime", schedulerRequestBody.getJobStartTime()); - jobDetail.getJobDataMap().put("periodTime", schedulerRequestBody.getPeriodTime()); - jobDetail.getJobDataMap().put("lastTime", ""); - jobDetail.getJobDataMap().put("groupName",groupName); - jobDetail.getJobDataMap().put("jobName",jobName); + return schedulerService.addJob(groupName,jobName,measureName,schedulerRequestBody); } - @RequestMapping(value = "/groups/{group}/jobs/{name}", method = RequestMethod.DELETE) + @RequestMapping(value = "/del/{group}/{name}", method = RequestMethod.DELETE) public boolean deleteJob(@PathVariable String group, @PathVariable String name) { - try { - Scheduler scheduler = factory.getObject(); - scheduler.deleteJob(new JobKey(name, group)); - return true; - } catch (SchedulerException e) { - LOGGER.error(e.getMessage()); - return false; - } + return schedulerService.deleteJob(group,name); } - @RequestMapping("/instances/groups/{group}/jobs/{name}/{page}/{size}") - public List findInstancesOfJob(@PathVariable String group,@PathVariable String name,@PathVariable int page,@PathVariable int size){ - Pageable pageRequest=new PageRequest(page,size, Sort.Direction.DESC,"timestamp"); - return scheduleStateRepo.findByGroupNameAndJobName(group,name,pageRequest); + @RequestMapping("/instances/{group}/{jobName}/{page}/{size}") + public List findInstancesOfJob(@PathVariable String group,@PathVariable String jobName, + @PathVariable int page,@PathVariable int size){ + return schedulerService.findInstancesOfJob(group,jobName,page,size); } @RequestMapping("/statics") public JobHealth getHealthInfo() throws SchedulerException { - Scheduler scheduler=factory.getObject(); - int jobCount=scheduler.getJobGroupNames().size(); - int health=0; - int invalid=0; - for (String groupName : scheduler.getJobGroupNames()){ - for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))){ - String jobName=jobKey.getName(); - String jobGroup=jobKey.getGroup(); - Pageable pageRequest=new PageRequest(0,1, Sort.Direction.DESC,"timestamp"); - ScheduleState scheduleState=scheduleStateRepo.findByGroupNameAndJobName(jobGroup,jobName,pageRequest).get(0); - if(scheduleState.getState().equals("starting")){ - health++; - }else{ - invalid++; - } - } - } - JobHealth jobHealth=new JobHealth(health,invalid,jobCount); - return jobHealth; + return schedulerService.getHealthInfo(); } } diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerRequestBody.java b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerRequestBody.java index d33472f15..a5671b498 100644 --- a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerRequestBody.java +++ b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerRequestBody.java @@ -65,4 +65,39 @@ public void setPeriodTime(String periodTime) { this.periodTime = periodTime; } + public SchedulerRequestBody(String sourcePat, String targetPat, String dataStartTimestamp, String jobStartTime, String periodTime) { + this.sourcePat = sourcePat; + this.targetPat = targetPat; + this.dataStartTimestamp = dataStartTimestamp; + this.jobStartTime = jobStartTime; + this.periodTime = periodTime; + } + + public SchedulerRequestBody() { + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SchedulerRequestBody that = (SchedulerRequestBody) o; + + if (sourcePat != null ? !sourcePat.equals(that.sourcePat) : that.sourcePat != null) return false; + if (targetPat != null ? !targetPat.equals(that.targetPat) : that.targetPat != null) return false; + if (dataStartTimestamp != null ? !dataStartTimestamp.equals(that.dataStartTimestamp) : that.dataStartTimestamp != null) + return false; + if (jobStartTime != null ? !jobStartTime.equals(that.jobStartTime) : that.jobStartTime != null) return false; + return periodTime != null ? periodTime.equals(that.periodTime) : that.periodTime == null; + } + + @Override + public int hashCode() { + int result = sourcePat != null ? sourcePat.hashCode() : 0; + result = 31 * result + (targetPat != null ? targetPat.hashCode() : 0); + result = 31 * result + (dataStartTimestamp != null ? dataStartTimestamp.hashCode() : 0); + result = 31 * result + (jobStartTime != null ? jobStartTime.hashCode() : 0); + result = 31 * result + (periodTime != null ? periodTime.hashCode() : 0); + return result; + } } diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerService.java b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerService.java new file mode 100644 index 000000000..5d57ba19f --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerService.java @@ -0,0 +1,24 @@ +package org.apache.griffin.core.schedule; + +import org.quartz.SchedulerException; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * Created by xiangrchen on 6/7/17. + */ +public interface SchedulerService { + public List> getJobs() throws SchedulerException; + + public Boolean addJob(String groupName,String jobName,String measureName,SchedulerRequestBody schedulerRequestBody); + + public Boolean deleteJob(String groupName,String jobName); + + public List findInstancesOfJob(String group,String name,int page,int size); + + public JobHealth getHealthInfo() throws SchedulerException; + + +} diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java new file mode 100644 index 000000000..c3b37eb3e --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java @@ -0,0 +1,210 @@ +/*- + * Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + */ + +package org.apache.griffin.core.schedule; + +import org.apache.commons.lang.StringUtils; +import org.apache.griffin.core.schedule.Repo.ScheduleStateRepo; +import org.quartz.*; +import org.quartz.impl.matchers.GroupMatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Sort; +import org.springframework.scheduling.quartz.SchedulerFactoryBean; +import org.springframework.stereotype.Service; + +import java.io.Serializable; +import java.text.SimpleDateFormat; +import java.util.*; + +import static org.quartz.JobBuilder.newJob; +import static org.quartz.JobKey.jobKey; +import static org.quartz.TriggerBuilder.newTrigger; +import static org.quartz.TriggerKey.triggerKey; + +@Service +public class SchedulerServiceImpl implements SchedulerService { + + private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerServiceImpl.class); + + @Autowired + private SchedulerFactoryBean factory; + @Autowired + private ScheduleStateRepo scheduleStateRepo; + + public static final String ACCURACY_BATCH_GROUP = "BA"; + + + @Override + public List> getJobs() throws SchedulerException { + Scheduler scheduler = factory.getObject(); + List> list = new ArrayList<>(); + for (String groupName : scheduler.getJobGroupNames()) { + for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher + .jobGroupEquals(groupName))) { + setJobsByKey(list,scheduler, jobKey); + } + } + return list; + } + + public void setJobsByKey(List> list,Scheduler scheduler, JobKey jobKey) throws SchedulerException { + List triggers = (List) scheduler.getTriggersOfJob(jobKey); + if (triggers.size() > 0) { + //always get next run + Map map = new HashMap<>(); + setMap(map,scheduler,jobKey); + list.add(map); + } + } + + + public void setMap(Map map,Scheduler scheduler,JobKey jobKey) throws SchedulerException { + List triggers = (List) scheduler.getTriggersOfJob(jobKey); + JobDetail jd = scheduler.getJobDetail(jobKey); + Date nextFireTime = triggers.get(0).getNextFireTime(); + Date previousFireTime=triggers.get(0).getPreviousFireTime(); + Trigger.TriggerState triggerState=scheduler.getTriggerState(triggers.get(0).getKey()); + + map.put("jobName", jobKey.getName()); + map.put("groupName", jobKey.getGroup()); + if (nextFireTime!=null){ + map.put("nextFireTime", nextFireTime.getTime()); + } + else { + map.put("nextFireTime", -1); + } + if (previousFireTime!=null) { + map.put("previousFireTime", previousFireTime.getTime()); + } + else { + map.put("previousFireTime", -1); + } + map.put("triggerState",triggerState); + map.put("measure", jd.getJobDataMap().getString("measure")); + map.put("sourcePat",jd.getJobDataMap().getString("sourcePat")); + map.put("targetPat",jd.getJobDataMap().getString("targetPat")); + if(StringUtils.isNotEmpty(jd.getJobDataMap().getString("dataStartTimestamp"))) { + map.put("dataStartTimestamp", jd.getJobDataMap().getString("dataStartTimestamp")); + } + map.put("jobStartTime",jd.getJobDataMap().getString("jobStartTime")); + map.put("periodTime",jd.getJobDataMap().getString("periodTime")); + } + + @Override + public Boolean addJob(String groupName, String jobName, String measureName, SchedulerRequestBody schedulerRequestBody) { + Date jobStartTime=null; + int periodTime = 0; + SimpleDateFormat format=new SimpleDateFormat("YYYYMMdd HH:mm:ss"); + try{ + periodTime = Integer.parseInt(schedulerRequestBody.getPeriodTime()); + jobStartTime=format.parse(schedulerRequestBody.getJobStartTime()); + }catch (Exception e){ + LOGGER.info("jobStartTime or periodTime format error! "+e); + return false; + } + try { + Scheduler scheduler = factory.getObject(); + TriggerKey triggerKey = triggerKey(jobName, groupName); + if (scheduler.checkExists(triggerKey)) { + scheduler.unscheduleJob(triggerKey); + } + JobKey jobKey = jobKey(jobName, groupName); + JobDetail jobDetail; + if (scheduler.checkExists(jobKey)) { + jobDetail = scheduler.getJobDetail(jobKey); + setJobDetail(jobDetail,schedulerRequestBody,measureName,groupName,jobName); + scheduler.addJob(jobDetail, true); + } else { + jobDetail = newJob(SparkSubmitJob.class) + .storeDurably() + .withIdentity(jobKey) + .build(); + setJobDetail(jobDetail,schedulerRequestBody,measureName,groupName,jobName); + scheduler.addJob(jobDetail, false); + } + Trigger trigger = newTrigger() + .withIdentity(triggerKey) + .forJob(jobDetail) +// .withSchedule(CronScheduleBuilder.cronSchedule("0 0/1 0 * * ?")) + .withSchedule(SimpleScheduleBuilder.simpleSchedule() + .withIntervalInSeconds(periodTime) + .repeatForever()) + .startAt(jobStartTime) + .build(); + scheduler.scheduleJob(trigger); + return true; + } catch (SchedulerException e) { + LOGGER.error("", e); + return false; + } + } + + public void setJobDetail(JobDetail jobDetail,SchedulerRequestBody schedulerRequestBody,String measureName,String groupName,String jobName){ + jobDetail.getJobDataMap().put("measure", measureName); + jobDetail.getJobDataMap().put("sourcePat", schedulerRequestBody.getSourcePat()); + jobDetail.getJobDataMap().put("targetPat", schedulerRequestBody.getTargetPat()); + jobDetail.getJobDataMap().put("dataStartTimestamp", schedulerRequestBody.getDataStartTimestamp()); + jobDetail.getJobDataMap().put("jobStartTime", schedulerRequestBody.getJobStartTime()); + jobDetail.getJobDataMap().put("periodTime", schedulerRequestBody.getPeriodTime()); + jobDetail.getJobDataMap().put("lastTime", ""); + jobDetail.getJobDataMap().put("groupName",groupName); + jobDetail.getJobDataMap().put("jobName",jobName); + } + + @Override + public Boolean deleteJob(String group, String name) { + try { + Scheduler scheduler = factory.getObject(); + scheduler.deleteJob(new JobKey(name, group)); + return true; + } catch (SchedulerException e) { + LOGGER.error(e.getMessage()); + return false; + } + } + + @Override + public List findInstancesOfJob(String group, String name, int page, int size) { + Pageable pageRequest=new PageRequest(page,size, Sort.Direction.DESC,"timestamp"); + return scheduleStateRepo.findByGroupNameAndJobName(group,name,pageRequest); + } + + @Override + public JobHealth getHealthInfo() throws SchedulerException { + Scheduler scheduler=factory.getObject(); + int jobCount=scheduler.getJobGroupNames().size(); + int health=0; + int invalid=0; + for (String groupName : scheduler.getJobGroupNames()){ + for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))){ + String jobName=jobKey.getName(); + String jobGroup=jobKey.getGroup(); + Pageable pageRequest=new PageRequest(0,1, Sort.Direction.DESC,"timestamp"); + ScheduleState scheduleState=scheduleStateRepo.findByGroupNameAndJobName(jobGroup,jobName,pageRequest).get(0); + if(scheduleState.getState().equals("starting")){ + health++; + }else{ + invalid++; + } + } + } + JobHealth jobHealth=new JobHealth(health,invalid,jobCount); + return jobHealth; + } +} diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java index 3f7c0be0c..0c5af115e 100644 --- a/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java +++ b/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.gson.Gson; +import org.apache.commons.lang.StringUtils; import org.apache.griffin.core.measure.DataConnector; import org.apache.griffin.core.measure.Measure; import org.apache.griffin.core.measure.repo.MeasureRepo; @@ -109,13 +110,13 @@ public void execute(JobExecutionContext context) { //prepare current system timestamp long currentSystemTimestamp = System.currentTimeMillis(); logger.info("currentSystemTimestamp: "+currentSystemTimestamp); - if (sourcePattern != null && !sourcePattern.isEmpty()) { + if (StringUtils.isNotEmpty(sourcePattern)) { sourcePatternItemSet = sourcePattern.split("-"); long currentTimstamp = setCurrentTimestamp(currentSystemTimestamp); setDataConnectorPartitions(measure.getSource(), sourcePatternItemSet, partitionItemSet, currentTimstamp); jd.getJobDataMap().put("lastTime", currentTimstamp + ""); } - if (targetPattern != null && !targetPattern.isEmpty()) { + if (StringUtils.isNotEmpty(targetPattern)) { targetPatternItemSet = targetPattern.split("-"); long currentTimstamp = setCurrentTimestamp(currentSystemTimestamp); setDataConnectorPartitions(measure.getTarget(), targetPatternItemSet, partitionItemSet, currentTimstamp); @@ -180,14 +181,14 @@ public void setDataConnectorPartitions(DataConnector dc, String[] patternItemSet public long setCurrentTimestamp(long currentSystemTimestamp) { long currentTimstamp=0; - if (eachJoblastTimestamp != null && !eachJoblastTimestamp.isEmpty()) { + if (StringUtils.isNotEmpty(eachJoblastTimestamp)) { try { currentTimstamp = Long.parseLong(eachJoblastTimestamp) + Integer.parseInt(periodTime) * 1000; }catch (Exception e){ logger.info("lasttime or periodTime format problem! "+e); } } else { - if (dataStartTimestamp != null && !dataStartTimestamp.isEmpty()) { + if (StringUtils.isNotEmpty(dataStartTimestamp)) { try{ currentTimstamp = Long.parseLong(dataStartTimestamp); }catch (Exception e){ @@ -235,8 +236,6 @@ public void setSparkJobDO() { sparkJobDO.setJars(jars); List files = new ArrayList<>(); -// files.add(props.getProperty("sparkJob.files_1")); - sparkJobDO.setFiles(files); } diff --git a/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java index e103c0cd6..3ceb6bba2 100644 --- a/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java +++ b/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java @@ -16,7 +16,9 @@ package org.apache.griffin.core.measure; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.griffin.core.measure.repo.MeasureRepo; +import org.apache.griffin.core.util.GriffinOperationMessage; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -26,15 +28,21 @@ import org.springframework.context.annotation.Bean; import org.springframework.test.context.junit4.SpringRunner; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + import static org.assertj.core.api.Assertions.fail; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; - +import static org.mockito.BDDMockito.given; @RunWith(SpringRunner.class) public class MeasureServiceImplTest { @TestConfiguration - public static class HiveMetastoreServiceConfiguration{ + public static class MeasureServiceImplConfiguration{ @Bean public MeasureServiceImpl service(){ return new MeasureServiceImpl(); @@ -48,7 +56,6 @@ public MeasureServiceImpl service(){ @Before public void setup(){ - } @Test @@ -67,8 +74,123 @@ public void testGetMeasuresById(){ Measure tmp = service.getMeasuresById(1); assertTrue(true); }catch (Throwable t){ - fail("Cannot get all tables in db default"); + fail("Cannot get Measure in db By Id: 1"); + } + } + + @Test + public void testGetMeasuresByName(){ + try { + Measure tmp = service.getMeasuresByName("viewitem_hourly"); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot get Measure in db By name: viewitem_hourly"); + } + } + + @Test + public void testDeleteMeasuresById(){ + try { + service.deleteMeasuresById(1L); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot delete Measure in db By Id: 1"); + } + } + + @Test + public void testDeleteMeasuresByName(){ + try { + String measureName="viewitem_hourly"; + given(measureRepo.findByName(measureName)).willReturn(null); + GriffinOperationMessage message=service.deleteMeasuresByName("viewitem_hourly"); + assertEquals(message,GriffinOperationMessage.RESOURCE_NOT_FOUND); + assertTrue(true); + + String org="bullseye"; + Measure measure=createATestMeasure(measureName,org); + given(measureRepo.findByName(measureName)).willReturn(measure); + GriffinOperationMessage message1=service.deleteMeasuresByName("viewitem_hourly"); + assertEquals(message1,GriffinOperationMessage.DELETE_MEASURE_BY_NAME_SUCCESS); + }catch (Throwable t){ + fail("Cannot delete Measure in db By name: viewitem_hourly"); } } + @Test + public void testCreateNewMeasure(){ + try { + String measureName="viewitem_hourly"; + String org="bullseye"; + Measure measure=createATestMeasure(measureName,org); + given(measureRepo.findByName(measureName)).willReturn(null); + GriffinOperationMessage message=service.createNewMeasure(measure); + assertEquals(message,GriffinOperationMessage.CREATE_MEASURE_FAIL); + assertTrue(true); + + Measure measure1=createATestMeasure(measureName,"bullseye1"); + given(measureRepo.findByName(measureName)).willReturn(measure1); + GriffinOperationMessage message1=service.createNewMeasure(measure); + assertEquals(message1,GriffinOperationMessage.CREATE_MEASURE_FAIL_DUPLICATE); + + given(measureRepo.findByName(measureName)).willReturn(null); + given(measureRepo.save(measure)).willReturn(measure); + GriffinOperationMessage message2=service.createNewMeasure(measure); + assertEquals(message2,GriffinOperationMessage.CREATE_MEASURE_SUCCESS); + }catch (Throwable t){ + fail("Cannot create new measure viewitem_hourly"); + } + } + + @Test + public void testGetAllMeasureNameByOwner(){ + try { + String measureName="viewitem_hourly"; + String org="bullseye"; + Measure measure=createATestMeasure(measureName,org); + String owner="test1"; + given(measureRepo.findAll()).willReturn(Arrays.asList(measure)); + List namelist=service.getAllMeasureNameByOwner(owner); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot get all measure name by owner test1"); + } + } + + @Test + public void testUpdateMeasure(){ + try { + String measureName="viewitem_hourly"; + String org="bullseye"; + Measure measure=createATestMeasure(measureName,org); + GriffinOperationMessage message=service.updateMeasure(measure); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot create new measure viewitem_hourly"); + } + } + + private Measure createATestMeasure(String name,String org)throws IOException,Exception{ + HashMap configMap1; + configMap1 = new HashMap<>(); + configMap1.put("database","default"); + configMap1.put("table.name","test_data_src"); + HashMap configMap2=new HashMap<>(); + configMap2.put("database","default"); + configMap2.put("table.name","test_data_tgt"); + String configJson1 = new ObjectMapper().writeValueAsString(configMap1); + String configJson2 = new ObjectMapper().writeValueAsString(configMap2); + + DataConnector source = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson1); + DataConnector target = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson2); + + String rules = "$source.uage > 100 AND $source.uid = $target.uid AND $source.uage + 12 = $target.uage + 10 + 2 AND $source.udes + 11 = $target.udes + 1 + 1"; + + EvaluateRule eRule = new EvaluateRule(1,rules); + + Measure measure = new Measure(name,"bevssoj description", Measure.MearuseType.accuracy, org, source, target, eRule,"test1"); + + return measure; + } + } diff --git a/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreControllerTest.java b/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreControllerTest.java index 2f8801d13..a8482333e 100644 --- a/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreControllerTest.java +++ b/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreControllerTest.java @@ -35,7 +35,7 @@ public class HiveMetastoreControllerTest { private MockMvc mockMvc; @Mock - HiveMetastoreService hiveMetastoreService; + HiveMetastoreServiceImpl hiveMetastoreService; @InjectMocks private HiveMetastoreController hiveMetastoreController; diff --git a/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceTests.java b/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImplTest.java similarity index 94% rename from service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceTests.java rename to service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImplTest.java index 7444f098c..c7ffc6332 100644 --- a/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceTests.java +++ b/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImplTest.java @@ -38,13 +38,13 @@ @RunWith(SpringRunner.class) @TestPropertySource(properties = {"hive.metastore.uris=thrift://10.9.246.187:9083"}) -public class HiveMetastoreServiceTests { +public class HiveMetastoreServiceImplTest { @TestConfiguration public static class HiveMetastoreServiceConfiguration{ @Bean - public HiveMetastoreService service(){ - return new HiveMetastoreService(); + public HiveMetastoreServiceImpl service(){ + return new HiveMetastoreServiceImpl(); } @Value("${hive.metastore.uris}") @@ -67,7 +67,7 @@ public HiveMetaStoreClient client(){ } } - @Autowired private HiveMetastoreService service; + @Autowired private HiveMetastoreServiceImpl service; @Before diff --git a/service/src/test/java/org/apache/griffin/core/metastore/KafkaSchemaControllerTest.java b/service/src/test/java/org/apache/griffin/core/metastore/KafkaSchemaControllerTest.java index d50438bfd..5ef258047 100644 --- a/service/src/test/java/org/apache/griffin/core/metastore/KafkaSchemaControllerTest.java +++ b/service/src/test/java/org/apache/griffin/core/metastore/KafkaSchemaControllerTest.java @@ -37,7 +37,7 @@ public class KafkaSchemaControllerTest { private MockMvc mockMvc; @Mock - KafkaSchemaService kafkaSchemaService; + KafkaSchemaServiceImpl kafkaSchemaService; @InjectMocks private KafkaSchemaController kafkaSchemaController; diff --git a/service/src/test/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImplTest.java new file mode 100644 index 000000000..258725a7a --- /dev/null +++ b/service/src/test/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImplTest.java @@ -0,0 +1,61 @@ +/*- + * Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + */ + +package org.apache.griffin.core.metastore; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +@TestPropertySource(properties = {"kafka.schema.registry.url = http://10.65.159.119:8081"}) +public class KafkaSchemaServiceImplTest { + @TestConfiguration + public static class KafkaSchemaServiceConfiguration { + @Bean + public KafkaSchemaServiceImpl service() { + return new KafkaSchemaServiceImpl(); + } + + @Value("${kafka.schema.registry.url}") + String urls; + } + + @Autowired + private KafkaSchemaServiceImpl service; + + @Before + public void setup(){ + + } + + @Test + public void testGetSchemaString(){ +// try { +// SchemaString tmp = service.getSchemaString(1); +// assertTrue(true); +// }catch (Throwable t){ +// fail("Cannot get all tables from all dbs"); +// } + } + + +} diff --git a/service/src/test/java/org/apache/griffin/core/metric/MetricServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/metric/MetricServiceImplTest.java new file mode 100644 index 000000000..671808afd --- /dev/null +++ b/service/src/test/java/org/apache/griffin/core/metric/MetricServiceImplTest.java @@ -0,0 +1,49 @@ +package org.apache.griffin.core.metric; + +import org.apache.griffin.core.measure.repo.MeasureRepo; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.fail; +import static org.junit.Assert.assertTrue; + +/** + * Created by xiangrchen on 6/7/17. + */ +@RunWith(SpringRunner.class) +public class MetricServiceImplTest { + @TestConfiguration + static class MetricServiceConfiguration{ + @Bean + public MetricServiceImpl service(){ + return new MetricServiceImpl(); + } + } + + @MockBean + private MeasureRepo measureRepo; + + @Autowired + private MetricServiceImpl service; + + @Before + public void setup(){ + } + + @Test + public void testGetOrgByMeasureName(){ + try { + String measureName="viewitem_hourly"; + String tmp = service.getOrgByMeasureName(measureName); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot get org by measure name viewitem_hourly"); + } + } +} diff --git a/service/src/test/java/org/apache/griffin/core/schedule/SchedulerControllerTest.java b/service/src/test/java/org/apache/griffin/core/schedule/SchedulerControllerTest.java new file mode 100644 index 000000000..c4eb3eccb --- /dev/null +++ b/service/src/test/java/org/apache/griffin/core/schedule/SchedulerControllerTest.java @@ -0,0 +1,104 @@ +package org.apache.griffin.core.schedule; + +import org.codehaus.jackson.map.ObjectMapper; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.http.MediaType; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.servlet.MockMvc; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.is; +import static org.mockito.BDDMockito.given; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +/** + * Created by xiangrchen on 6/7/17. + */ +@RunWith(SpringRunner.class) +@WebMvcTest(value = SchedulerController.class,secure = false) +public class SchedulerControllerTest { + @Autowired + private MockMvc mvc; + + @MockBean + private SchedulerService service; + + @Before + public void setup(){ + } + + @Test + public void testGetJobs() throws Exception { + Map map=new HashMap(); + map.put("jobName", "job1"); + map.put("groupName", "BA"); + given(service.getJobs()).willReturn(Arrays.asList(map)); + + mvc.perform(get("/jobs/").contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.[0].jobName",is("job1"))) + ; + } + + @Test + public void testAddJob() throws Exception { + String groupName="BA"; + String jobName="job1"; + String measureName="viewitem_hourly"; + SchedulerRequestBody schedulerRequestBody=new SchedulerRequestBody("YYYYMMdd-HH","YYYYMMdd-HH","111","20170607","100"); + ObjectMapper mapper=new ObjectMapper(); + String schedulerRequestBodyJson=mapper.writeValueAsString(schedulerRequestBody); + given(service.addJob(groupName,jobName,measureName,schedulerRequestBody)).willReturn(true); + + mvc.perform(post("/jobs/add/BA/job1/viewitem_hourly").contentType(MediaType.APPLICATION_JSON).content(schedulerRequestBodyJson)) + .andExpect(status().isOk()) + .andExpect(jsonPath("$",is(true))) + ; + } + + @Test + public void testDeleteJob() throws Exception { + String groupName="BA"; + String jobName="job1"; + given(service.deleteJob(groupName,jobName)).willReturn(true); + mvc.perform(delete("/jobs/del/BA/job1").contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(jsonPath("$",is(true))) + ; + } + + @Test + public void testFindInstancesOfJob() throws Exception { + String group="BA"; + String job="job1"; + int page=0; + int size=2; + ScheduleState scheduleState=new ScheduleState(group, job, 1, "NORMAL", "", System.currentTimeMillis()); + given(service.findInstancesOfJob(group,job,page,size)).willReturn(Arrays.asList(scheduleState)); + mvc.perform(get("/jobs/instances/BA/job1/0/2").contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.[0].groupName",is("BA"))) + ; + } + + @Test + public void testGetHealthInfo() throws Exception { + JobHealth jobHealth=new JobHealth(1,2,3); + given(service.getHealthInfo()).willReturn(jobHealth); + mvc.perform(get("/jobs/statics").contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.health",is(1))) + ; + } +} diff --git a/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java new file mode 100644 index 000000000..20b781c85 --- /dev/null +++ b/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java @@ -0,0 +1,172 @@ +/*- + * Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + */ + +package org.apache.griffin.core.schedule; + +import org.apache.griffin.core.schedule.Repo.ScheduleStateRepo; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.quartz.*; +import org.quartz.impl.triggers.CronTriggerImpl; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.quartz.SchedulerFactoryBean; +import org.springframework.test.context.junit4.SpringRunner; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.BDDMockito.given; + +@RunWith(SpringRunner.class) +public class SchedulerServiceImplTest { + + @TestConfiguration +// @ComponentScan("org.apache.griffin.core.schedule") + public static class SchedulerServiceConfiguration{ + @Bean + public SchedulerServiceImpl service(){ + return new SchedulerServiceImpl(); + } + @Bean + public SchedulerFactoryBean factoryBean(){ + return new SchedulerFactoryBean(); + } +// @Bean +// public SchedulerConfig schedulerConfig(){ +// return new SchedulerConfig(); +// } + } + + @MockBean + private ScheduleStateRepo scheduleStateRepo; + + + @MockBean + private SchedulerFactoryBean factory; + + @Autowired + private SchedulerServiceImpl service; + + @Before + public void setup(){ + } + + @Test + public void testGetJobs(){ + try { + Scheduler scheduler=Mockito.mock(Scheduler.class); + given(factory.getObject()).willReturn(scheduler); + List> tmp = service.getJobs(); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot get all jobs info from dbs"); + } + } + + @Test + public void testSetJobsByKey() throws SchedulerException { + List> list=new ArrayList>(); + Scheduler scheduler=Mockito.mock(Scheduler.class); + JobKey jobKey= new JobKey("TEST"); + List triggers=new ArrayList(); + Trigger trigger=new CronTriggerImpl(); + triggers.add(trigger); + given((List) scheduler.getTriggersOfJob(jobKey)).willReturn(triggers); + + JobDetail jd=Mockito.mock(JobDetail.class); + given(scheduler.getJobDetail(jobKey)).willReturn(jd); + + JobDataMap jobDataMap=Mockito.mock(JobDataMap.class); + given(jd.getJobDataMap()).willReturn(jobDataMap); + + service.setJobsByKey(list,scheduler,jobKey); + + } + + @Test + public void testAddJob(){ + try { + String groupName="BA"; + String jobName="job1"; + String measureName="m1"; + SchedulerRequestBody schedulerRequestBody=new SchedulerRequestBody(); + Scheduler scheduler=Mockito.mock(Scheduler.class); + given(factory.getObject()).willReturn(scheduler); + Boolean tmp = service.addJob(groupName,jobName,measureName,schedulerRequestBody); + assertEquals(tmp,false); + assertTrue(true); + + SchedulerRequestBody schedulerRequestBody1=new SchedulerRequestBody("YYYYMMdd-HH","YYYYMMdd-HH", + System.currentTimeMillis()+"","20170605 15:29:30","1000"); + Scheduler scheduler1=Mockito.mock(Scheduler.class); + given(factory.getObject()).willReturn(scheduler1); + Boolean tmp1 = service.addJob(groupName,jobName,measureName,schedulerRequestBody1); + assertEquals(tmp1,true); + }catch (Throwable t){ + fail("Cannot add job "); + } + } + + @Test + public void testDeleteJob(){ + try { + String groupName="BA"; + String jobName="job1"; + Scheduler scheduler=Mockito.mock(Scheduler.class); + given(factory.getObject()).willReturn(scheduler); + Boolean tmp = service.deleteJob(groupName,jobName); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot delete job"); + } + } + + @Test + public void testFindInstancesOfJob(){ + try { + String groupName="BA"; + String jobName="job1"; + int page=0; + int size=2; + List tmp = service.findInstancesOfJob(groupName,jobName,page,size); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot find instances of Job"); + } + } + + @Test + public void testGetHealthInfo(){ + try { + Scheduler scheduler=Mockito.mock(Scheduler.class); + given(factory.getObject()).willReturn(scheduler); + JobHealth tmp = service.getHealthInfo(); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot find instances of Job"); + } + } + +} diff --git a/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java b/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java index 97789af25..ce59858c5 100644 --- a/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java +++ b/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java @@ -42,10 +42,6 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; -/** - * Created by xiangrchen on 5/8/17. - */ -//@RunWith(MockitoJUnitRunner.class) @RunWith(PowerMockRunner.class) @PrepareForTest(SparkSubmitJob.class) public class SparkSubmitJobTest{ diff --git a/service/src/test/resources/application-test.properties b/service/src/test/resources/application-test.properties new file mode 100644 index 000000000..1af0ac97e --- /dev/null +++ b/service/src/test/resources/application-test.properties @@ -0,0 +1,58 @@ +#spring.datasource.url= jdbc:mysql://localhost:3306/griffintest?autoReconnect=true&useSSL=false +#spring.datasource.username =root +#spring.datasource.password =cxr123 +# +#spring.datasource.driver-class-name=com.mysql.jdbc.Driver +# +### Hibernate ddl auto (validate,create, create-drop, update) +# +#spring.jpa.hibernate.ddl-auto = update +#spring.jpa.show-sql=true +#spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect +# +##spring.jpa.hibernate.ddl-auto = create-drop +##spring.jpa.database = HSQL +##spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.HSQLDialect +##spring.datasource.driverClassName = org.hsqldb.jdbcDriver +##spring.datasource.url: jdbc:hsqldb:mem:scratchdb +##spring.datasource.username = sa +##spring.datasource.password = + +#spring.jpa.database: HSQL +# +## +## +### Naming strategy +#spring.jpa.hibernate.naming-strategy = org.hibernate.cfg.ImprovedNamingStrategy +# +# hive metastore +#hive.metastore.uris = thrift://10.9.246.187:9083 +#hive.metastore.dbname = default +# +## kafka schema registry +#kafka.schema.registry.url = http://10.65.159.119:8081 + +key=test + +spring.datasource.url= jdbc:mysql://localhost:3306/quartz?autoReconnect=true&useSSL=false +spring.datasource.username =griffin +spring.datasource.password =123456 + +spring.datasource.driver-class-name=com.mysql.jdbc.Driver + +## Hibernate ddl auto (validate,create, create-drop, update) + +spring.jpa.hibernate.ddl-auto = update +spring.jpa.show-sql=true +spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect +# +# +## Naming strategy +spring.jpa.hibernate.naming-strategy = org.hibernate.cfg.ImprovedNamingStrategy + +# hive metastore +hive.metastore.uris = thrift://10.9.246.187:9083 +hive.metastore.dbname = default + +# kafka schema registry +kafka.schema.registry.url = http://10.65.159.119:8081 \ No newline at end of file diff --git a/service/src/test/resources/quartz.properties b/service/src/test/resources/quartz.properties new file mode 100644 index 000000000..a7c07ab98 --- /dev/null +++ b/service/src/test/resources/quartz.properties @@ -0,0 +1,11 @@ +org.quartz.scheduler.instanceName=spring-boot-quartz +org.quartz.scheduler.instanceId=AUTO +org.quartz.threadPool.threadCount=5 +org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX +org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate +org.quartz.jobStore.useProperties=true +org.quartz.jobStore.misfireThreshold=60000 +org.quartz.jobStore.tablePrefix=QRTZ_ + +org.quartz.jobStore.isClustered=true +org.quartz.jobStore.clusterCheckinInterval=20000 \ No newline at end of file From b1310765ecd77df8dab48c4f4bfde97568d6603a Mon Sep 17 00:00:00 2001 From: Chen Date: Fri, 9 Jun 2017 17:58:36 +0800 Subject: [PATCH 05/12] version 1 --- .../org/apache/griffin/core/metastore/HiveMetastoreProxy.java | 3 +++ service/src/main/resources/application-dev.properties | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreProxy.java b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreProxy.java index 6330cd157..c40c20e50 100644 --- a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreProxy.java +++ b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreProxy.java @@ -24,6 +24,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; +import javax.annotation.PreDestroy; + @Component public class HiveMetastoreProxy { @@ -51,6 +53,7 @@ public HiveMetaStoreClient initHiveMetastoreClient(){ return client; } + @PreDestroy public void destroy() throws Exception { if(null!=client) client.close(); } diff --git a/service/src/main/resources/application-dev.properties b/service/src/main/resources/application-dev.properties index 12f8865ce..9574b387b 100644 --- a/service/src/main/resources/application-dev.properties +++ b/service/src/main/resources/application-dev.properties @@ -6,7 +6,7 @@ spring.datasource.driver-class-name=com.mysql.jdbc.Driver ## Hibernate ddl auto (validate,create, create-drop, update) -spring.jpa.hibernate.ddl-auto = create-drop +spring.jpa.hibernate.ddl-auto = update spring.jpa.show-sql=true spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect # From 4d9277f0e3336ea5b2313fe643da6f442a70594b Mon Sep 17 00:00:00 2001 From: Chen Date: Mon, 12 Jun 2017 16:30:58 +0800 Subject: [PATCH 06/12] test --- service/pom.xml | 6 ++ .../metastore/HiveMetastoreController.java | 18 ++--- .../core/metastore/HiveMetastoreService.java | 12 +-- .../metastore/HiveMetastoreServiceImpl.java | 43 +++++++++-- .../metastore/KafkaSchemaServiceImpl.java | 13 ++-- .../core/schedule/SchedulerServiceImpl.java | 2 +- .../src/main/resources/sparkJob.properties | 4 +- .../core/measure/MeasureServiceImplTest.java | 4 - .../HiveMetastoreServiceImplTest.java | 36 ++------- .../metastore/KafkaSchemaServiceImplTest.java | 76 +++++++++++++++---- .../schedule/SchedulerServiceImplTest.java | 19 ++++- 11 files changed, 154 insertions(+), 79 deletions(-) diff --git a/service/pom.xml b/service/pom.xml index efbd91a68..c9847ccf2 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -73,6 +73,12 @@ spring-boot-starter-data-jpa + + org.springframework.retry + spring-retry + 1.1.5.RELEASE + + diff --git a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreController.java b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreController.java index 4fd7dbe85..911ccb851 100644 --- a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreController.java +++ b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreController.java @@ -16,7 +16,9 @@ package org.apache.griffin.core.metastore; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; @@ -33,39 +35,37 @@ public class HiveMetastoreController { HiveMetastoreServiceImpl hiveMetastoreService; @RequestMapping("/db") - public Iterable getAllDatabases() { + public Iterable getAllDatabases() throws MetaException{ return hiveMetastoreService.getAllDatabases(); } @RequestMapping("/table") - public Iterable getDefAllTables() { + public Iterable getDefAllTables() throws MetaException{ return hiveMetastoreService.getAllTableNames(""); } @RequestMapping("/{db}/table") - public Iterable getAllTableNamess(@PathVariable("db") String dbName) { + public Iterable getAllTableNamess(@PathVariable("db") String dbName) throws MetaException{ return hiveMetastoreService.getAllTableNames(dbName); } @RequestMapping("/{db}/alltables") - public List
getAllTables(@PathVariable("db") String dbName) { + public List
getAllTables(@PathVariable("db") String dbName) throws TException { return hiveMetastoreService.getAllTable(dbName); } @RequestMapping("/alltables") - public Map> getAllTables() { + public Map> getAllTables() throws TException{ return hiveMetastoreService.getAllTable(); } @RequestMapping("/table/{table}") - public Table getDefTable(@PathVariable("table") String tableName) { + public Table getDefTable(@PathVariable("table") String tableName) throws TException{ return hiveMetastoreService.getTable("", tableName); } @RequestMapping("/{db}/table/{table}") - public Table getTable(@PathVariable("db") String dbName, @PathVariable("table") String tableName) { + public Table getTable(@PathVariable("db") String dbName, @PathVariable("table") String tableName) throws TException{ return hiveMetastoreService.getTable(dbName, tableName); } - - } diff --git a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreService.java b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreService.java index ba4f4bfc5..40d6610d0 100644 --- a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreService.java +++ b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreService.java @@ -15,21 +15,23 @@ package org.apache.griffin.core.metastore; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; import java.util.List; import java.util.Map; public interface HiveMetastoreService { - public Iterable getAllDatabases(); + public Iterable getAllDatabases() throws MetaException; - public Iterable getAllTableNames(String dbName); + public Iterable getAllTableNames(String dbName) throws MetaException; - public List
getAllTable(String db); + public List
getAllTable(String db) throws TException; - public Map> getAllTable(); + public Map> getAllTable() throws TException; - public Table getTable(String dbName, String tableName); + public Table getTable(String dbName, String tableName) throws TException; } diff --git a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImpl.java index 1b48cf4fb..ef8db9376 100644 --- a/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImpl.java @@ -18,10 +18,13 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; @@ -47,32 +50,44 @@ private String getUseDbName(String dbName) { else return dbName; } + @Retryable(value = { MetaException.class }, + maxAttempts = 2, + backoff = @Backoff(delay = 5000)) @Override - public Iterable getAllDatabases() { + public Iterable getAllDatabases() throws MetaException{ Iterable results = null; try { results = client.getAllDatabases(); } catch (MetaException e) { + reconnect(); log.error("Can not get databases : ",e.getMessage()); + throw e; } return results; } + @Retryable(value = { MetaException.class }, + maxAttempts = 2, + backoff = @Backoff(delay = 5000)) @Override - public Iterable getAllTableNames(String dbName) { + public Iterable getAllTableNames(String dbName) throws MetaException{ Iterable results = null; String useDbName = getUseDbName(dbName); try { results = client.getAllTables(useDbName); - client.reconnect(); } catch (Exception e) { + reconnect(); log.warn("Exception fetching tables info" + e.getMessage()); + throw e; } return results; } + @Retryable(value = { TException.class }, + maxAttempts = 2, + backoff = @Backoff(delay = 5000)) @Override - public List
getAllTable(String db){ + public List
getAllTable(String db) throws TException{ List
results = new ArrayList
(); String useDbName = getUseDbName(db); try { @@ -82,13 +97,18 @@ public List
getAllTable(String db){ results.add(tmp); } } catch (Exception e) { + reconnect(); log.warn("Exception fetching tables info" + e.getMessage()); + throw e; } return results; } + @Retryable(value = { TException.class }, + maxAttempts = 2, + backoff = @Backoff(delay = 5000)) @Override - public Map> getAllTable(){ + public Map> getAllTable() throws TException{ Map> results = new HashMap>(); Iterable dbs = getAllDatabases(); for(String db: dbs){ @@ -101,24 +121,33 @@ public Map> getAllTable(){ alltables.add(tmp); } } catch (Exception e) { + reconnect(); log.warn("Exception fetching tables info" + e.getMessage()); + throw e; } results.put(db,alltables); } return results; } + @Retryable(value = { TException.class }, + maxAttempts = 2, + backoff = @Backoff(delay = 5000)) @Override - public Table getTable(String dbName, String tableName) { + public Table getTable(String dbName, String tableName) throws TException{ Table result = null; String useDbName = getUseDbName(dbName); try { result = client.getTable(useDbName, tableName); } catch (Exception e) { + reconnect(); log.warn("Exception fetching table info : " +tableName + " : " + e.getMessage()); + throw e; } return result; } - + private void reconnect() throws MetaException{ + client.reconnect(); + } } diff --git a/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImpl.java index 74b60e9c3..87209c2a4 100644 --- a/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImpl.java @@ -36,6 +36,7 @@ public class KafkaSchemaServiceImpl implements KafkaSchemaService{ @Value("${kafka.schema.registry.url}") private String url; + RestTemplate restTemplate = new RestTemplate(); private String registryUrl(final String path) { if (StringUtils.hasText(path)) { @@ -52,7 +53,7 @@ public SchemaString getSchemaString(Integer id) { String regUrl = registryUrl(path); SchemaString result = null; try { - RestTemplate restTemplate = new RestTemplate(); +// RestTemplate restTemplate = new RestTemplate(); ResponseEntity res = restTemplate.getForEntity(regUrl, SchemaString.class); result = res.getBody(); } catch (Exception e) { @@ -67,7 +68,7 @@ public Iterable getSubjects() { String regUrl = registryUrl(path); Iterable result = null; try { - RestTemplate restTemplate = new RestTemplate(); +// RestTemplate restTemplate = new RestTemplate(); ResponseEntity res = restTemplate.getForEntity(regUrl, String[].class); result = Arrays.asList(res.getBody()); } catch (Exception e) { @@ -82,7 +83,7 @@ public Iterable getSubjectVersions(String subject) { String regUrl = registryUrl(path); Iterable result = null; try { - RestTemplate restTemplate = new RestTemplate(); +// RestTemplate restTemplate = new RestTemplate(); ResponseEntity res = restTemplate.getForEntity(regUrl, Integer[].class); result = Arrays.asList(res.getBody()); } catch (Exception e) { @@ -97,7 +98,7 @@ public Schema getSubjectSchema(String subject, String version) { String regUrl = registryUrl(path); Schema result = null; try { - RestTemplate restTemplate = new RestTemplate(); +// RestTemplate restTemplate = new RestTemplate(); ResponseEntity res = restTemplate.getForEntity(regUrl, Schema.class); result = res.getBody(); } catch (Exception e) { @@ -112,7 +113,7 @@ public Config getTopLevelConfig() { String regUrl = registryUrl(path); Config result = null; try { - RestTemplate restTemplate = new RestTemplate(); +// RestTemplate restTemplate = new RestTemplate(); ResponseEntity res = restTemplate.getForEntity(regUrl, Config.class); result = res.getBody(); @@ -128,7 +129,7 @@ public Config getSubjectLevelConfig(String subject) { String regUrl = registryUrl(path); Config result = null; try { - RestTemplate restTemplate = new RestTemplate(); +// RestTemplate restTemplate = new RestTemplate(); ResponseEntity res = restTemplate.getForEntity(regUrl, Config.class); result = res.getBody(); } catch (Exception e) { diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java index c3b37eb3e..b15395a04 100644 --- a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java @@ -38,7 +38,7 @@ import static org.quartz.TriggerKey.triggerKey; @Service -public class SchedulerServiceImpl implements SchedulerService { +public class SchedulerServiceImpl implements SchedulerService{ private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerServiceImpl.class); diff --git a/service/src/main/resources/sparkJob.properties b/service/src/main/resources/sparkJob.properties index 0d2cb27e7..518f2e4c2 100644 --- a/service/src/main/resources/sparkJob.properties +++ b/service/src/main/resources/sparkJob.properties @@ -13,5 +13,5 @@ sparkJob.jars_1=hdfs:///livy/datanucleus-api-jdo-3.2.6.jar sparkJob.jars_2=hdfs:///livy/datanucleus-core-3.2.10.jar sparkJob.jars_3=hdfs:///livy/datanucleus-rdbms-3.2.9.jar sparkJob.dateAndHour=dt,hour -sparkJob.uri=http://localhost:8998/batches -#sparkJob.uri=http://10.9.246.187:8998/batches \ No newline at end of file +#sparkJob.uri=http://localhost:8998/batches +sparkJob.uri=http://10.9.246.187:8998/batches \ No newline at end of file diff --git a/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java index 3ceb6bba2..66ff5fe03 100644 --- a/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java +++ b/service/src/test/java/org/apache/griffin/core/measure/MeasureServiceImplTest.java @@ -180,16 +180,12 @@ private Measure createATestMeasure(String name,String org)throws IOException,Exc configMap2.put("table.name","test_data_tgt"); String configJson1 = new ObjectMapper().writeValueAsString(configMap1); String configJson2 = new ObjectMapper().writeValueAsString(configMap2); - DataConnector source = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson1); DataConnector target = new DataConnector(DataConnector.ConnectorType.HIVE, "1.2", configJson2); String rules = "$source.uage > 100 AND $source.uid = $target.uid AND $source.uage + 12 = $target.uage + 10 + 2 AND $source.udes + 11 = $target.udes + 1 + 1"; - EvaluateRule eRule = new EvaluateRule(1,rules); - Measure measure = new Measure(name,"bevssoj description", Measure.MearuseType.accuracy, org, source, target, eRule,"test1"); - return measure; } diff --git a/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImplTest.java index c7ffc6332..ae1757ed0 100644 --- a/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImplTest.java +++ b/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImplTest.java @@ -15,18 +15,15 @@ package org.apache.griffin.core.metastore; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.context.annotation.Bean; -import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit4.SpringRunner; import java.util.List; @@ -37,7 +34,7 @@ @RunWith(SpringRunner.class) -@TestPropertySource(properties = {"hive.metastore.uris=thrift://10.9.246.187:9083"}) +//@TestPropertySource(properties = {"hive.metastore.uris=thrift://10.9.246.187:9083"}) public class HiveMetastoreServiceImplTest { @TestConfiguration @@ -46,29 +43,13 @@ public static class HiveMetastoreServiceConfiguration{ public HiveMetastoreServiceImpl service(){ return new HiveMetastoreServiceImpl(); } - - @Value("${hive.metastore.uris}") - String urls; - @Bean - public HiveMetaStoreClient client(){ - - HiveMetaStoreClient client = null; - HiveConf hiveConf = new HiveConf(); - hiveConf.set("hive.metastore.local", "false"); - hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); - hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, urls); - try { - client= new HiveMetaStoreClient(hiveConf); - } catch (MetaException e) { - client = null; - } - - return client; - - } } - @Autowired private HiveMetastoreServiceImpl service; + @MockBean + private HiveMetaStoreClient client; + + @Autowired + private HiveMetastoreServiceImpl service; @Before public void setup(){ @@ -76,8 +57,7 @@ public void setup(){ } @Test - public void testGetAllTables(){ - + public void testGetAllDatabases(){ try { Iterable tmp = service.getAllDatabases(); assertTrue(true); diff --git a/service/src/test/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImplTest.java index 258725a7a..56a0a8cea 100644 --- a/service/src/test/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImplTest.java +++ b/service/src/test/java/org/apache/griffin/core/metastore/KafkaSchemaServiceImplTest.java @@ -15,18 +15,23 @@ package org.apache.griffin.core.metastore; +import io.confluent.kafka.schemaregistry.client.rest.entities.Config; +import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; +import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.web.client.RestTemplate; + +import static org.assertj.core.api.Assertions.fail; +import static org.junit.Assert.assertTrue; @RunWith(SpringRunner.class) -@TestPropertySource(properties = {"kafka.schema.registry.url = http://10.65.159.119:8081"}) public class KafkaSchemaServiceImplTest { @TestConfiguration public static class KafkaSchemaServiceConfiguration { @@ -34,9 +39,6 @@ public static class KafkaSchemaServiceConfiguration { public KafkaSchemaServiceImpl service() { return new KafkaSchemaServiceImpl(); } - - @Value("${kafka.schema.registry.url}") - String urls; } @Autowired @@ -44,18 +46,66 @@ public KafkaSchemaServiceImpl service() { @Before public void setup(){ - + service.restTemplate= Mockito.mock(RestTemplate.class); } @Test public void testGetSchemaString(){ -// try { -// SchemaString tmp = service.getSchemaString(1); -// assertTrue(true); -// }catch (Throwable t){ -// fail("Cannot get all tables from all dbs"); -// } + try { + SchemaString tmp = service.getSchemaString(1); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot get all tables from all dbs"); + } + } + + @Test + public void testGetSubjects(){ + try { + Iterable tmp = service.getSubjects(); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot get all tables from all dbs"); + } + } + + @Test + public void testGetSubjectVersions(){ + try { + Iterable tmp = service.getSubjectVersions(""); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot get all tables from all dbs"); + } } + @Test + public void testGetSubjectSchema(){ + try { + Schema tmp = service.getSubjectSchema("",""); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot get all tables from all dbs"); + } + } + @Test + public void testGetTopLevelConfig(){ + try { + Config tmp = service.getTopLevelConfig(); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot get all tables from all dbs"); + } + } + + @Test + public void testGetSubjectLevelConfig(){ + try { + Config tmp = service.getSubjectLevelConfig(""); + assertTrue(true); + }catch (Throwable t){ + fail("Cannot get all tables from all dbs"); + } + } } diff --git a/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java index 20b781c85..10ff6da9d 100644 --- a/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java +++ b/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java @@ -21,18 +21,20 @@ import org.junit.runner.RunWith; import org.mockito.Mockito; import org.quartz.*; +import org.quartz.impl.matchers.GroupMatcher; import org.quartz.impl.triggers.CronTriggerImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.context.annotation.Bean; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Sort; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.test.context.junit4.SpringRunner; import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; import static org.assertj.core.api.Assertions.fail; import static org.junit.Assert.assertEquals; @@ -162,10 +164,19 @@ public void testGetHealthInfo(){ try { Scheduler scheduler=Mockito.mock(Scheduler.class); given(factory.getObject()).willReturn(scheduler); + given(scheduler.getJobGroupNames()).willReturn(Arrays.asList("BA")); + JobKey jobKey= new JobKey("TEST"); + Set jobKeySet=new HashSet(); + jobKeySet.add(jobKey); + given(scheduler.getJobKeys(GroupMatcher.jobGroupEquals("BA"))).willReturn((jobKeySet)); + + Pageable pageRequest=new PageRequest(0,1, Sort.Direction.DESC,"timestamp"); + List scheduleStateList=new ArrayList(); + given(scheduleStateRepo.findByGroupNameAndJobName(jobKey.getGroup(),jobKey.getName(),pageRequest)).willReturn(scheduleStateList); JobHealth tmp = service.getHealthInfo(); assertTrue(true); }catch (Throwable t){ - fail("Cannot find instances of Job"); + fail("Cannot get Health info "+t); } } From 4f39a1878c6f6d7f11bd405771fb123ba5fea035 Mon Sep 17 00:00:00 2001 From: Chen Date: Mon, 12 Jun 2017 18:25:55 +0800 Subject: [PATCH 07/12] schedulerServiceImple UT --- .../core/schedule/SchedulerServiceImpl.java | 14 +++++++++----- .../metastore/HiveMetastoreServiceImplTest.java | 11 ++++++++++- .../core/schedule/SchedulerServiceImplTest.java | 5 ----- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java index b15395a04..7c70623d5 100644 --- a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java @@ -196,11 +196,15 @@ public JobHealth getHealthInfo() throws SchedulerException { String jobName=jobKey.getName(); String jobGroup=jobKey.getGroup(); Pageable pageRequest=new PageRequest(0,1, Sort.Direction.DESC,"timestamp"); - ScheduleState scheduleState=scheduleStateRepo.findByGroupNameAndJobName(jobGroup,jobName,pageRequest).get(0); - if(scheduleState.getState().equals("starting")){ - health++; - }else{ - invalid++; + ScheduleState scheduleState=new ScheduleState(); + if (scheduleStateRepo.findByGroupNameAndJobName(jobGroup,jobName,pageRequest)!=null + &&scheduleStateRepo.findByGroupNameAndJobName(jobGroup,jobName,pageRequest).size()>0){ + scheduleState=scheduleStateRepo.findByGroupNameAndJobName(jobGroup,jobName,pageRequest).get(0); + if(scheduleState.getState().equals("starting")){ + health++; + }else{ + invalid++; + } } } } diff --git a/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImplTest.java index ae1757ed0..9934e5de6 100644 --- a/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImplTest.java +++ b/service/src/test/java/org/apache/griffin/core/metastore/HiveMetastoreServiceImplTest.java @@ -26,11 +26,14 @@ import org.springframework.context.annotation.Bean; import org.springframework.test.context.junit4.SpringRunner; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import static org.assertj.core.api.Assertions.fail; import static org.junit.Assert.assertTrue; +import static org.mockito.BDDMockito.given; @RunWith(SpringRunner.class) @@ -80,7 +83,9 @@ public void testGetAllTableNames(){ @Test public void testGetAllTableByDBName(){ try { - List
tmp = service.getAllTable("default"); + String useDbName="default"; + given(client.getAllTables(useDbName)).willReturn(Arrays.asList("cout","cout1")); + List
tmp = service.getAllTable(useDbName); assertTrue(true); }catch (Throwable t){ fail("Cannot get all tables in default db"); @@ -90,6 +95,10 @@ public void testGetAllTableByDBName(){ @Test public void testGetAllTable(){ try { + Iterable dbs=new ArrayList<>(); + given(service.getAllDatabases()).willReturn(Arrays.asList("default","griffin")); + String useDbName="default"; + given(client.getAllTables(useDbName)).willReturn(Arrays.asList("cout","cout1")); Map> tmp = service.getAllTable(); assertTrue(true); }catch (Throwable t){ diff --git a/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java index 10ff6da9d..066777e24 100644 --- a/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java +++ b/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java @@ -45,7 +45,6 @@ public class SchedulerServiceImplTest { @TestConfiguration -// @ComponentScan("org.apache.griffin.core.schedule") public static class SchedulerServiceConfiguration{ @Bean public SchedulerServiceImpl service(){ @@ -55,10 +54,6 @@ public SchedulerServiceImpl service(){ public SchedulerFactoryBean factoryBean(){ return new SchedulerFactoryBean(); } -// @Bean -// public SchedulerConfig schedulerConfig(){ -// return new SchedulerConfig(); -// } } @MockBean From e599a9c42d19cc325b7bfc9d4ac7960da580d7f6 Mon Sep 17 00:00:00 2001 From: Chen Date: Tue, 13 Jun 2017 10:13:56 +0800 Subject: [PATCH 08/12] config --- service/src/main/resources/sparkJob.properties | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service/src/main/resources/sparkJob.properties b/service/src/main/resources/sparkJob.properties index 518f2e4c2..0d2cb27e7 100644 --- a/service/src/main/resources/sparkJob.properties +++ b/service/src/main/resources/sparkJob.properties @@ -13,5 +13,5 @@ sparkJob.jars_1=hdfs:///livy/datanucleus-api-jdo-3.2.6.jar sparkJob.jars_2=hdfs:///livy/datanucleus-core-3.2.10.jar sparkJob.jars_3=hdfs:///livy/datanucleus-rdbms-3.2.9.jar sparkJob.dateAndHour=dt,hour -#sparkJob.uri=http://localhost:8998/batches -sparkJob.uri=http://10.9.246.187:8998/batches \ No newline at end of file +sparkJob.uri=http://localhost:8998/batches +#sparkJob.uri=http://10.9.246.187:8998/batches \ No newline at end of file From df73de496432bd8a6306c35ef3203cb70663cf47 Mon Sep 17 00:00:00 2001 From: Chen Date: Tue, 13 Jun 2017 10:20:00 +0800 Subject: [PATCH 09/12] README modify --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 0eda770fd..274d5fb82 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,7 @@ Release: ``` Create a directory in Hdfs, and put our measure package into it. ``` + cp /measure/target/measure-0.1.3-incubating-SNAPSHOT.jar /measure/target/griffin-measure.jar hdfs dfs -put /measure/target/griffin-measure.jar / ``` After all our environment services startup, we can start our server. From 8b7c102d2704f51c0c3228daaac92f7df2e2ca60 Mon Sep 17 00:00:00 2001 From: Chen Date: Tue, 13 Jun 2017 15:11:45 +0800 Subject: [PATCH 10/12] scheduler next firetime modify --- .../core/schedule/SchedulerServiceImpl.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java index 7c70623d5..52fbead79 100644 --- a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java @@ -108,12 +108,13 @@ public void setMap(Map map,Scheduler scheduler,JobKey jobK @Override public Boolean addJob(String groupName, String jobName, String measureName, SchedulerRequestBody schedulerRequestBody) { - Date jobStartTime=null; int periodTime = 0; - SimpleDateFormat format=new SimpleDateFormat("YYYYMMdd HH:mm:ss"); + Date jobStartTime=null; + SimpleDateFormat format=new SimpleDateFormat("yyyyMMdd HH:mm:ss"); try{ periodTime = Integer.parseInt(schedulerRequestBody.getPeriodTime()); jobStartTime=format.parse(schedulerRequestBody.getJobStartTime()); + setJobStartTime(jobStartTime,periodTime); }catch (Exception e){ LOGGER.info("jobStartTime or periodTime format error! "+e); return false; @@ -135,6 +136,7 @@ public Boolean addJob(String groupName, String jobName, String measureName, Sche .storeDurably() .withIdentity(jobKey) .build(); + //set JobDetail setJobDetail(jobDetail,schedulerRequestBody,measureName,groupName,jobName); scheduler.addJob(jobDetail, false); } @@ -155,6 +157,17 @@ public Boolean addJob(String groupName, String jobName, String measureName, Sche } } + public void setJobStartTime(Date jobStartTime,int periodTime){ + long currentTimestamp=System.currentTimeMillis(); + long jobstartTimestamp=jobStartTime.getTime(); + //if jobStartTime is before currentTimestamp, set it as the latest trigger time in the future + if(jobStartTime.before(new Date(currentTimestamp))){ + long n=(currentTimestamp-jobstartTimestamp)/(long)(periodTime*1000); + jobstartTimestamp=jobstartTimestamp+(n+1)*(long)(periodTime*1000); + jobStartTime.setTime(jobstartTimestamp); + } + } + public void setJobDetail(JobDetail jobDetail,SchedulerRequestBody schedulerRequestBody,String measureName,String groupName,String jobName){ jobDetail.getJobDataMap().put("measure", measureName); jobDetail.getJobDataMap().put("sourcePat", schedulerRequestBody.getSourcePat()); From ee367ba6bb4e0ee887ac557323d1f213c4593875 Mon Sep 17 00:00:00 2001 From: Chen Date: Wed, 14 Jun 2017 14:43:32 +0800 Subject: [PATCH 11/12] jobstartTime adjust --- .../apache/griffin/core/schedule/SchedulerServiceImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java index 52fbead79..152fdf8e7 100644 --- a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerServiceImpl.java @@ -29,7 +29,6 @@ import org.springframework.stereotype.Service; import java.io.Serializable; -import java.text.SimpleDateFormat; import java.util.*; import static org.quartz.JobBuilder.newJob; @@ -110,10 +109,11 @@ public void setMap(Map map,Scheduler scheduler,JobKey jobK public Boolean addJob(String groupName, String jobName, String measureName, SchedulerRequestBody schedulerRequestBody) { int periodTime = 0; Date jobStartTime=null; - SimpleDateFormat format=new SimpleDateFormat("yyyyMMdd HH:mm:ss"); +// SimpleDateFormat format=new SimpleDateFormat("yyyyMMdd HH:mm:ss"); try{ periodTime = Integer.parseInt(schedulerRequestBody.getPeriodTime()); - jobStartTime=format.parse(schedulerRequestBody.getJobStartTime()); +// jobStartTime=format.parse(schedulerRequestBody.getJobStartTime()); + jobStartTime=new Date(Long.parseLong(schedulerRequestBody.getJobStartTime())); setJobStartTime(jobStartTime,periodTime); }catch (Exception e){ LOGGER.info("jobStartTime or periodTime format error! "+e); From 1324e6a2cbe96620b23199d4b5b736fe266feab7 Mon Sep 17 00:00:00 2001 From: Chen Date: Wed, 14 Jun 2017 15:36:21 +0800 Subject: [PATCH 12/12] schedulerServiceImplTest --- .../apache/griffin/core/schedule/SchedulerServiceImplTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java index 066777e24..372db9ed7 100644 --- a/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java +++ b/service/src/test/java/org/apache/griffin/core/schedule/SchedulerServiceImplTest.java @@ -116,7 +116,7 @@ public void testAddJob(){ assertTrue(true); SchedulerRequestBody schedulerRequestBody1=new SchedulerRequestBody("YYYYMMdd-HH","YYYYMMdd-HH", - System.currentTimeMillis()+"","20170605 15:29:30","1000"); + System.currentTimeMillis()+"",System.currentTimeMillis()+"","1000"); Scheduler scheduler1=Mockito.mock(Scheduler.class); given(factory.getObject()).willReturn(scheduler1); Boolean tmp1 = service.addJob(groupName,jobName,measureName,schedulerRequestBody1);