Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merged with pr389 and pr382 #390

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ private boolean isValidCronExpression(String cronExpression) {
private boolean isValidBaseLine(List<JobDataSegment> segments) {
assert segments != null;
for (JobDataSegment jds : segments) {
if (jds.isBaseline()) {
if (jds.isAsTsBaseline()) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public JobHealth getHealthInfo() {

@RequestMapping(path = "/jobs/download", method = RequestMethod.GET)
public ResponseEntity<Resource> download(@RequestParam("jobName") String jobName, @RequestParam("ts") long timestamp) throws Exception {
String path = jobService.getJobHdfsPersistPath(jobName, timestamp);
String path = jobService.getJobHdfsSinksPath(jobName, timestamp);
InputStreamResource resource = new InputStreamResource(FSUtil.getMissSampleInputStream(path));
return ResponseEntity.ok().
header("content-disposition", "attachment; filename = sampleMissingData.json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void setJobStartTime(JobDetail jobDetail) throws SchedulerException {
private void setSourcesPartitionsAndPredicates(List<DataSource> sources) throws Exception {
boolean isFirstBaseline = true;
for (JobDataSegment jds : job.getSegments()) {
if (jds.isBaseline() && isFirstBaseline) {
if (jds.isAsTsBaseline() && isFirstBaseline) {
Long tsOffset = TimeUtil.str2Long(jds.getSegmentRange().getBegin());
measure.setTimestamp(jobStartTime + tsOffset);
isFirstBaseline = false;
Expand Down Expand Up @@ -347,7 +347,7 @@ private void setJobDataMap(JobDetail jobDetail, String pJobName) throws IOExcept

private void preProcessMeasure() throws IOException {
for (DataSource source : measure.getDataSources()) {
Map cacheMap = source.getCacheMap();
Map cacheMap = source.getCheckpointMap();
//to skip batch job
if (cacheMap == null) {
return;
Expand All @@ -357,7 +357,7 @@ private void preProcessMeasure() throws IOException {
cache = cache.replaceAll("\\$\\{SOURCE_NAME}", source.getName());
cache = cache.replaceAll("\\$\\{TARGET_NAME}", source.getName());
cacheMap = toEntity(cache, Map.class);
source.setCacheMap(cacheMap);
source.setCheckpointMap(cacheMap);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ public interface JobService {

JobHealth getHealthInfo();

String getJobHdfsPersistPath(String jobName, long timestamp);
String getJobHdfsSinksPath(String jobName, long timestamp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -545,24 +545,25 @@ public Boolean isJobHealthy(Long jobId) {
}

@Override
public String getJobHdfsPersistPath(String jobName, long timestamp) {
public String getJobHdfsSinksPath(String jobName, long timestamp) {
List<AbstractJob> jobList = jobRepo.findByJobNameAndDeleted(jobName, false);
if (jobList.size() == 0) {
return null;
}
if (jobList.get(0).getType().toLowerCase().equals("batch")) {
return getPersistPath(ENV_BATCH) + "/" + jobName + "/" + timestamp + "";
return getSinksPath(ENV_BATCH) + "/" + jobName + "/" + timestamp + "";
}

return getPersistPath(ENV_STREAMING) + "/" + jobName + "/" + timestamp + "";
return getSinksPath(ENV_STREAMING) + "/" + jobName + "/" + timestamp + "";
}

private String getPersistPath(String jsonString) {
private String getSinksPath(String jsonString) {
try {
JSONObject obj = new JSONObject(jsonString);
JSONArray persistArray = obj.getJSONArray("persist");
JSONArray persistArray = obj.getJSONArray("sinks");
for (int i = 0; i < persistArray.length(); i++) {
if (persistArray.getJSONObject(i).get("type").equals("hdfs")) {
Object type = persistArray.getJSONObject(i).get("type");
if (type instanceof String && "hdfs".equalsIgnoreCase(String.valueOf(type))) {
return persistArray.getJSONObject(i).getJSONObject("config").getString("path");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@ public class JobDataSegment extends AbstractAuditableEntity {
@NotNull
private String dataConnectorName;

private boolean baseline = false;
private boolean asTsBaseline = false;

@OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
@JoinColumn(name = "segment_range_id")
private SegmentRange segmentRange = new SegmentRange();

@JsonProperty("as.baseline")
public boolean isBaseline() {
return baseline;
public boolean isAsTsBaseline() {
return asTsBaseline;
}

public void setBaseline(boolean baseline) {
this.baseline = baseline;
public void setAsTsBaseline(boolean asTsBaseline) {
this.asTsBaseline = asTsBaseline;
}

@JsonProperty("segment.range")
Expand Down Expand Up @@ -85,12 +85,12 @@ public JobDataSegment() {

public JobDataSegment(String dataConnectorName, boolean baseline) {
this.dataConnectorName = dataConnectorName;
this.baseline = baseline;
this.asTsBaseline = baseline;
}

public JobDataSegment(String dataConnectorName, boolean baseline, SegmentRange segmentRange) {
this.dataConnectorName = dataConnectorName;
this.baseline = baseline;
this.asTsBaseline = baseline;
this.segmentRange = segmentRange;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public enum DataType {

private String version;

@JsonInclude(JsonInclude.Include.NON_NULL)
private String dataFrameName;

@JsonInclude(JsonInclude.Include.NON_NULL)
private String dataUnit;

Expand Down Expand Up @@ -131,6 +134,15 @@ private String getConfig() {
return config;
}

@JsonProperty("dataframe.name")
public String getDataFrameName() {
return dataFrameName;
}

public void setDataFrameName(String dataFrameName) {
this.dataFrameName = dataFrameName;
}

@JsonProperty("data.unit")
public String getDataUnit() {
return dataUnit;
Expand Down Expand Up @@ -204,16 +216,19 @@ public void load() throws IOException {
public DataConnector() {
}

public DataConnector(String name, DataType type, String version, String config) throws IOException {
public DataConnector(String name, DataType type, String version,
String config, String dataFrameName) throws IOException {
this.name = name;
this.type = type;
this.version = version;
this.config = config;
this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, Object>>() {
});
this.dataFrameName = dataFrameName;
}

public DataConnector(String name, String dataUnit, Map configMap, List<SegmentPredicate> predicates) {
public DataConnector(String name, String dataUnit, Map configMap,
List<SegmentPredicate> predicates) {
this.name = name;
this.dataUnit = dataUnit;
this.configMap = configMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ public class DataSource extends AbstractAuditableEntity {
@JoinColumn(name = "data_source_id")
private List<DataConnector> connectors = new ArrayList<>();

private boolean baseline = false;

@JsonIgnore
@Column(length = 1024)
private String cache;
private String checkpoint;

@Transient
@JsonInclude(JsonInclude.Include.NON_NULL)
private Map<String, Object> cacheMap;
private Map<String, Object> checkpointMap;


public String getName() {
Expand All @@ -83,36 +85,44 @@ public void setConnectors(List<DataConnector> connectors) {
this.connectors = connectors;
}

private String getCache() {
return cache;
public boolean isBaseline() {
return baseline;
}

public void setBaseline(boolean baseline) {
this.baseline = baseline;
}

private String getCheckpoint() {
return checkpoint;
}

private void setCache(String cache) {
this.cache = cache;
private void setCheckpoint(String checkpoint) {
this.checkpoint = checkpoint;

}

@JsonProperty("cache")
public Map<String, Object> getCacheMap() {
return cacheMap;
@JsonProperty("checkpoint")
public Map<String, Object> getCheckpointMap() {
return checkpointMap;
}

public void setCacheMap(Map<String, Object> cacheMap) {
this.cacheMap = cacheMap;
public void setCheckpointMap(Map<String, Object> checkpointMap) {
this.checkpointMap = checkpointMap;
}

@PrePersist
@PreUpdate
public void save() throws JsonProcessingException {
if (cacheMap != null) {
this.cache = JsonUtil.toJson(cacheMap);
if (checkpointMap != null) {
this.checkpoint = JsonUtil.toJson(checkpointMap);
}
}

@PostLoad
public void load() throws IOException {
if (!StringUtils.isEmpty(cache)) {
this.cacheMap = JsonUtil.toEntity(cache, new TypeReference<Map<String, Object>>() {
if (!StringUtils.isEmpty(checkpoint)) {
this.checkpointMap = JsonUtil.toEntity(checkpoint, new TypeReference<Map<String, Object>>() {
});
}
}
Expand All @@ -124,4 +134,14 @@ public DataSource(String name, List<DataConnector> connectors) {
this.name = name;
this.connectors = connectors;
}

public DataSource(String name, boolean baseline,
Map<String, Object> checkpointMap,
List<DataConnector> connectors) {
this.name = name;
this.baseline = baseline;
this.checkpointMap = checkpointMap;
this.connectors = connectors;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,14 @@ public GriffinMeasure() {
super();
}

public GriffinMeasure(String name, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) {
public GriffinMeasure(String name, String owner, List<DataSource> dataSources,
EvaluateRule evaluateRule,
List<String> sinksList) {
this.name = name;
this.owner = owner;
this.dataSources = dataSources;
this.evaluateRule = evaluateRule;
setSinksList(sinksList);
}

public GriffinMeasure(Long measureId, String name, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@ Licensed to the Apache Software Foundation (ASF) under one

package org.apache.griffin.core.measure.entity;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Inheritance;
import javax.persistence.InheritanceType;
import com.fasterxml.jackson.annotation.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.lang.StringUtils;
import org.apache.griffin.core.util.JsonUtil;

import javax.persistence.*;
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

@Entity
@Inheritance(strategy = InheritanceType.JOINED)
Expand All @@ -53,6 +54,13 @@ public abstract class Measure extends AbstractAuditableEntity {
@JsonInclude(JsonInclude.Include.NON_NULL)
private String organization;

@Transient
@JsonInclude(JsonInclude.Include.NON_NULL)
private List<String> sinksList = Arrays.asList("ELASTICSEARCH", "HDFS");

@JsonIgnore
private String sinks;

private boolean deleted = false;

public String getName() {
Expand Down Expand Up @@ -96,6 +104,23 @@ public void setOwner(String owner) {
this.owner = owner;
}

@JsonProperty("sinks")
public List<String> getSinksList() {
return sinksList;
}

public void setSinksList(List<String> sinksList) {
this.sinksList = sinksList;
}

private String getSinks() {
return sinks;
}

private void setSinks(String sinks) {
this.sinks = sinks;
}

public boolean isDeleted() {
return deleted;
}
Expand All @@ -104,6 +129,26 @@ public void setDeleted(boolean deleted) {
this.deleted = deleted;
}

@PrePersist
@PreUpdate
public void save() throws JsonProcessingException {
if (sinksList != null) {
this.sinks = JsonUtil.toJson(sinksList);
} else {
this.sinks = null;
}
}

@PostLoad
public void load() throws IOException {
if (!StringUtils.isEmpty(sinks)) {
this.sinksList = JsonUtil.toEntity(sinks, new TypeReference<List<String>>() {
});
} else {
this.sinksList = null;
}
}

public Measure() {
}

Expand Down
Loading