Skip to content
Permalink
Browse files
[GOBBLIN-1533] Add completeness watermark to iceberg tables (#3385)
* [GOBBLIN-1533] Add completeness watermark to iceberg tables

* updated hive metadata writer test

* Add apache header

* Added correct default partition type

* Fixed kafka audit url and logic to get topic name for iceberg table

* Changes based on review

* Make audit check granularity configurable

* Added additional optimization to check for current hour during completion watermark calculation

* Optimization to skip the audit check if it's up to date, by checking the seconds from epoch between current watermark and now

* fixed test case

* Replace hours from epoch with duration

* Moved logging

* Update partition spec with late field even when schema has been updated
  • Loading branch information
vikrambohra committed Sep 10, 2021
1 parent 15ded96 commit 47707df00a6884ada5974a5f5203408ce1efb890
Showing 23 changed files with 1,063 additions and 34 deletions.
@@ -22,7 +22,9 @@

/**
 * A type of client used to query the audit counts from Pinot backend
 *
 * @deprecated use {@link org.apache.gobblin.completeness.audit.AuditCountClient} instead
 */
@Deprecated
public interface AuditCountClient {
  /**
   * Fetches the audit counts of a topic for the given range.
   *
   * @param topic topic to query
   * @param start start of the queried range (presumably millis from epoch, as documented on the
   *              replacement interface; confirm)
   * @param end end of the queried range (same unit as {@code start})
   * @return a map from a name to its count (presumably tier name to record count; confirm)
   * @throws IOException if the counts cannot be fetched
   */
  Map<String, Long> fetch (String topic, long start, long end) throws IOException;
}
@@ -21,7 +21,9 @@

/**
* A factory class responsible for creating {@link AuditCountClient}
* @deprecated use {@link org.apache.gobblin.completeness.audit.AuditCountClientFactory} instead
*/
@Deprecated
public interface AuditCountClientFactory {
String AUDIT_COUNT_CLIENT_FACTORY = "audit.count.client.factory";
AuditCountClient createAuditCountClient (State state);
@@ -40,14 +40,17 @@
import javax.annotation.concurrent.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.completeness.audit.AuditCountHttpClient;
import org.apache.gobblin.configuration.State;

/**
* A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient}
* to perform audit count query.
* @deprecated use {@link AuditCountHttpClient} instead
*/
@Slf4j
@ThreadSafe
@Deprecated
public class KafkaAuditCountHttpClient implements AuditCountClient {

// Keys
@@ -18,12 +18,15 @@
package org.apache.gobblin.compaction.audit;

import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.completeness.audit.AuditCountHttpClientFactory;
import org.apache.gobblin.configuration.State;

/**
* Factory to create an instance of type {@link KafkaAuditCountHttpClient}
* @deprecated use {@link AuditCountHttpClientFactory} instead
*/
@Alias("KafkaAuditCountHttpClientFactory")
@Deprecated
public class KafkaAuditCountHttpClientFactory implements AuditCountClientFactory {

public KafkaAuditCountHttpClient createAuditCountClient (State state) {
@@ -44,6 +44,7 @@
* Use {@link AuditCountClient} to retrieve all record count across different tiers
* Compare one specific tier (gobblin-tier) with all other reference tiers and determine
* if verification should be passed based on a pre-defined threshold.
* TODO (8/31/21): Use {@link org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier}
*/
@Slf4j
public class CompactionAuditCountVerifier implements CompactionVerifier<FileSystemDataset> {
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Build script for a Java library module.
apply plugin: 'java'
// Gradle's test-fixtures plugin: lets this module publish reusable test helpers to other projects.
apply plugin: 'java-test-fixtures'

dependencies {
// Main-code dependencies: the gobblin-api project plus the HTTP client library
// (resolved through the externalDependency map, which is defined outside this file).
compile project(":gobblin-api")
compile externalDependency.httpclient

// Test-only dependencies.
testCompile externalDependency.testng
testCompile externalDependency.mockito

}

configurations {
// Explicitly keep transitive dependency resolution enabled for the compile configuration.
compile { transitive = true }
}

// Extra project property; presumably read by the root build to categorize modules (confirm).
ext.classification="library"
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.completeness.audit;

import java.io.IOException;
import java.util.Map;


/**
* A type of client used to query audit counts.
*
* <p>A single query covers one dataset and one time range and yields one count per tier.
*/
public interface AuditCountClient {
/**
* Fetches the audit counts of a dataset for the given time range.
*
* @param datasetName query dataset
* @param start start timestamp in millis from epoch
* @param end end timestamp in millis from epoch
* @return a map of {@code <tier, counts>}
* @throws IOException if the audit counts cannot be fetched
*/
Map<String, Long> fetch(String datasetName, long start, long end) throws IOException;
}
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.completeness.audit;

import org.apache.gobblin.configuration.State;


/**
* A factory class responsible for creating {@link AuditCountClient}
*/
public interface AuditCountClientFactory {
// Configuration key; presumably names the factory implementation to use (confirm against callers).
String AUDIT_COUNT_CLIENT_FACTORY = "audit.count.client.factory";
/**
* Creates an {@link AuditCountClient} from the given configuration.
*
* @param state configuration handed to the client being created
* @return the created client
*/
AuditCountClient createAuditCountClient(State state);
}
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.completeness.audit;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import javax.annotation.concurrent.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.State;


/**
 * A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient}
 * to perform audit count query.
 *
 * <p>Every {@link #fetch(String, long, long)} call issues one HTTP GET (retried on failure) against
 * the configured base URL and reads the {@code totalsPerTier} object of the JSON response.
 */
@Slf4j
@ThreadSafe
public class AuditCountHttpClient implements AuditCountClient {

  // Keys
  public static final String AUDIT_HTTP_PREFIX = "audit.http";
  // NOTE(review): the prefix has no trailing '.', so the two pool keys below resolve to
  // "audit.httpmax.total" and "audit.httpmax.per.route". The values are deliberately left as they
  // are so that existing configurations keep working; confirm whether the dot was intended.
  public static final String CONNECTION_MAX_TOTAL = AUDIT_HTTP_PREFIX + "max.total";
  public static final int DEFAULT_CONNECTION_MAX_TOTAL = 10;
  public static final String MAX_PER_ROUTE = AUDIT_HTTP_PREFIX + "max.per.route";
  public static final int DEFAULT_MAX_PER_ROUTE = 10;


  public static final String AUDIT_REST_BASE_URL = "audit.rest.base.url";
  public static final String AUDIT_REST_MAX_TRIES = "audit.rest.max.tries";
  public static final String AUDIT_REST_START_QUERYSTRING_KEY = "audit.rest.querystring.start";
  public static final String AUDIT_REST_END_QUERYSTRING_KEY = "audit.rest.querystring.end";
  public static final String AUDIT_REST_START_QUERYSTRING_DEFAULT = "start";
  public static final String AUDIT_REST_END_QUERYSTRING_DEFAULT = "end";


  // Http Client
  private final PoolingHttpClientConnectionManager cm;
  private final CloseableHttpClient httpClient;
  private static final JsonParser PARSER = new JsonParser();

  private final String baseUrl;
  private final String startQueryString;
  private final String endQueryString;
  private final String topicQueryString = "topic";
  private final int maxNumTries;

  /**
   * Builds a client backed by a pooled HTTP connection manager.
   *
   * @param state configuration; read for the pool sizes ({@link #CONNECTION_MAX_TOTAL},
   *              {@link #MAX_PER_ROUTE}), the base URL ({@link #AUDIT_REST_BASE_URL}), the retry limit
   *              ({@link #AUDIT_REST_MAX_TRIES}, default 5) and the names of the start/end query
   *              parameters
   */
  public AuditCountHttpClient(State state) {
    int maxTotal = state.getPropAsInt(CONNECTION_MAX_TOTAL, DEFAULT_CONNECTION_MAX_TOTAL);
    int maxPerRoute = state.getPropAsInt(MAX_PER_ROUTE, DEFAULT_MAX_PER_ROUTE);

    cm = new PoolingHttpClientConnectionManager();
    cm.setMaxTotal(maxTotal);
    cm.setDefaultMaxPerRoute(maxPerRoute);
    httpClient = HttpClients.custom()
        .setConnectionManager(cm)
        .build();

    this.baseUrl = state.getProp(AUDIT_REST_BASE_URL);
    this.maxNumTries = state.getPropAsInt(AUDIT_REST_MAX_TRIES, 5);
    this.startQueryString = state.getProp(AUDIT_REST_START_QUERYSTRING_KEY, AUDIT_REST_START_QUERYSTRING_DEFAULT);
    this.endQueryString = state.getProp(AUDIT_REST_END_QUERYSTRING_KEY, AUDIT_REST_END_QUERYSTRING_DEFAULT);
  }


  /**
   * Queries the audit endpoint for the counts of {@code topic} between {@code start} and {@code end}.
   *
   * @param topic value sent as the {@code topic} query parameter
   * @param start value sent as the start query parameter
   * @param end value sent as the end query parameter
   * @return the per-tier counts found in the response
   * @throws IOException if the base URL is not configured, the request keeps failing, or the
   *                     response cannot be parsed
   */
  public Map<String, Long> fetch (String topic, long start, long end) throws IOException {
    if (this.baseUrl == null) {
      // Without this check the URL assembly below fails with a bare NullPointerException.
      throw new IOException("Missing required property: " + AUDIT_REST_BASE_URL);
    }
    // Drop a single trailing slash so the query string attaches directly to the path.
    String trimmedBaseUrl = this.baseUrl.endsWith("/")
        ? this.baseUrl.substring(0, this.baseUrl.length() - 1)
        : this.baseUrl;
    String fullUrl = trimmedBaseUrl + "?" + this.topicQueryString + "=" + topic
        + "&" + this.startQueryString + "=" + start + "&" + this.endQueryString + "=" + end;
    log.info("Full URL is {}", fullUrl);
    String response = getHttpResponse(fullUrl);
    return parseResponse (fullUrl, response, topic);
  }



  /**
   * Expects <code>response</code> being parsed to be as below.
   *
   * <pre>
   * {
   *  "totalsPerTier": {
   *     "tier1": 79341895,
   *     "tier2": 79341892
   *  }
   * }
   * </pre>
   *
   * @param fullUrl request URL, used only in error messages
   * @param response raw JSON response body
   * @param topic queried topic, used only in error messages
   * @return a map from tier name to count
   * @throws IOException if the response is not valid JSON, has no {@code totalsPerTier} object, or
   *                     holds a count that is not an integral number
   */
  @VisibleForTesting
  public static Map<String, Long> parseResponse(String fullUrl, String response, String topic) throws IOException {
    Map<String, Long> result = Maps.newHashMap();
    try {
      JsonObject jsonObj = PARSER.parse(response).getAsJsonObject();
      JsonObject countsPerTier = jsonObj.getAsJsonObject("totalsPerTier");
      if (countsPerTier == null) {
        // getAsJsonObject returns null for an absent member; iterating it would throw an NPE.
        throw new IOException(String.format(
            "No totalsPerTier found in JSON response: %s for topic: %s, request url: %s ", response, topic, fullUrl));
      }

      for (Map.Entry<String, JsonElement> entry : countsPerTier.entrySet()) {
        String tier = entry.getKey();
        long count = Long.parseLong(entry.getValue().getAsString());
        result.put(tier, count);
      }
    } catch (RuntimeException e) {
      // Covers malformed JSON, unexpected element types and non-numeric counts alike, so callers
      // only ever have to deal with the declared IOException.
      throw new IOException(String.format("Unable to parse JSON response: %s for request url: %s ", response,
          fullUrl), e);
    }

    return result;
  }

  /**
   * Executes a GET on {@code fullUrl} and returns the response body, retrying on failure.
   *
   * <p>A non-2xx status counts as a failure. After a failed attempt the method sleeps
   * {@code 2 * attemptNumber} seconds before the next one; the request is attempted at most
   * {@code maxNumTries + 1} times.
   *
   * @param fullUrl URL to request
   * @return the response body as a string
   * @throws IOException if every attempt failed, or the thread was interrupted while backing off
   */
  private String getHttpResponse(String fullUrl) throws IOException {
    HttpUriRequest req = new HttpGet(fullUrl);

    for (int numTries = 0;; numTries++) {
      try (CloseableHttpResponse response = this.httpClient.execute(req)) {
        int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode < 200 || statusCode >= 300) {
          throw new IOException(
              String.format("status code: %d, reason: %s", statusCode, response.getStatusLine().getReasonPhrase()));
        }
        return EntityUtils.toString(response.getEntity());
      } catch (IOException e) {
        String errMsg = "Unable to get or parse HTTP response for " + fullUrl;
        if (numTries >= this.maxNumTries) {
          throw new IOException (errMsg, e);
        }
        long backOffSec = (numTries + 1) * 2L;
        log.error(errMsg + ", will retry in " + backOffSec + " sec", e);
        try {
          Thread.sleep(TimeUnit.SECONDS.toMillis(backOffSec));
        } catch (InterruptedException e1) {
          // Restore the flag and stop: with the flag set every further sleep would fail at once,
          // turning the remaining retries into a tight loop.
          Thread.currentThread().interrupt();
          IOException interrupted = new IOException(errMsg + ", interrupted while waiting to retry", e);
          interrupted.addSuppressed(e1);
          throw interrupted;
        }
      }
    }
  }
}
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.completeness.audit;

import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.configuration.State;


/**
 * {@link AuditCountClientFactory} implementation that hands out {@link AuditCountHttpClient}
 * instances. Registered under the alias {@code AuditCountHttpClientFactory}.
 */
@Alias("AuditCountHttpClientFactory")
public class AuditCountHttpClientFactory implements AuditCountClientFactory {

  /**
   * Builds a new HTTP-backed audit count client.
   *
   * @param state configuration passed straight to the {@link AuditCountHttpClient} constructor
   * @return a newly constructed {@link AuditCountHttpClient}
   */
  public AuditCountHttpClient createAuditCountClient (State state) {
    final AuditCountHttpClient client = new AuditCountHttpClient(state);
    return client;
  }
}

0 comments on commit 47707df

Please sign in to comment.