[Gobblin-17] Add Elasticsearch writer (rest + transport) #2419
Force-pushed: 39adda3 to 2d2a3ce
@@ -47,7 +47,7 @@ ext.externalDependency = [
 "datanucleusCore": "org.datanucleus:datanucleus-core:3.2.10",
 "datanucleusRdbms": "org.datanucleus:datanucleus-rdbms:3.2.9",
 "eventhub": "com.microsoft.azure:azure-eventhubs:0.9.0",
-"guava": "com.google.guava:guava:15.0",
+"guava": "com.google.guava:guava:18.0",
It's not a good time to upgrade Guava to version 18: the Hadoop API depends on Guava APIs that have been removed since Guava 16.
Elasticsearch client breaks without it :(
@zxcware do you know which Hadoop API breaks with the Guava version upgrade?
/**
 * An interface to log errors during responses
 */
public interface ResponseErrorLogger {
Maybe rename logError to logException? And rename class to ResponseExceptionLogger?
int port = testServer.getPort();
// int serverPort = couchbaseTestServer.getServerPort();

/**
Why do we need to keep this block commented out?

private void fillServerPort()
{
/**
Why does this block need to be commented out?
throws InterruptedException, ExecutionException, TimeoutException {
Pair<WriteResponse, Throwable> writeResponseThrowablePair = writeResponseQueue.poll(timeout, unit);
if (writeResponseThrowablePair == null) {
throw new TimeoutException("Timeout exceeded while waiting for future to be done");
In this timeout case, do we need to invoke the callback?
This is a client specifying how long they want to wait on the future... they could decide to issue this call again... so IMO the callback should only be fired when the underlying call to the destination succeeds or fails.
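The behaviour described in this reply can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `QueueBackedFuture`, `complete`, and the `String` response type are hypothetical stand-ins, and the sketch assumes a single response that is consumed by the first successful `get`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutWithoutCallbackSketch {

  /** Hypothetical stand-in for the future under review; not the PR's class. */
  static final class QueueBackedFuture {
    private final BlockingQueue<String> responseQueue = new ArrayBlockingQueue<>(1);
    private final Runnable callback;

    QueueBackedFuture(Runnable callback) {
      this.callback = callback;
    }

    /** Called once the destination has answered; this is the only place the callback fires. */
    void complete(String response) {
      responseQueue.add(response);
      callback.run();
    }

    /** A timed wait that gives up without touching the callback, so the caller may retry. */
    String get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
      String response = responseQueue.poll(timeout, unit);
      if (response == null) {
        throw new TimeoutException("Timeout exceeded while waiting for future to be done");
      }
      return response;
    }
  }

  public static void main(String[] args) throws Exception {
    QueueBackedFuture future = new QueueBackedFuture(() -> System.out.println("callback fired"));
    try {
      future.get(10, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      System.out.println("timed out; callback not fired");
    }
    future.complete("ok");
    System.out.println(future.get(10, TimeUnit.MILLISECONDS));
  }
}
```

The point of the design is that a timeout is a statement about the caller's patience, not about the write itself, so the caller can simply call `get` again.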
Mostly minor comments. Also, please include your shading work for guava.
@@ -62,7 +62,11 @@ void append (D record) {
 records.add(record);
 }

-boolean hasRoom (D record) {
+boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy) {
+if (records.isEmpty() && largeMessagePolicy == LargeMessagePolicy.ATTEMPT) {
Throw an exception on LargeMessagePolicy.FAIL, per the contract?
done now... but in a different place (tryAppend), not hasRoom.
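One way the split between `hasRoom` and `tryAppend` might look is sketched below. Everything beyond the names `hasRoom`, `tryAppend`, `LargeMessagePolicy.ATTEMPT`, and `LargeMessagePolicy.FAIL` is an assumption made for illustration: the `Batch` shape, the character-count size measure, the `maxBytes` limit, and the choice of `IllegalStateException` are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;

final class LargeMessagePolicySketch {

  // Only the two values named in this thread; the real enum may define more.
  enum LargeMessagePolicy { ATTEMPT, FAIL }

  /** Hypothetical batch that measures record size by string length. */
  static final class Batch {
    private final List<String> records = new ArrayList<>();
    private final int maxBytes;
    private int usedBytes = 0;

    Batch(int maxBytes) {
      this.maxBytes = maxBytes;
    }

    /** Pure capacity check: an empty batch always accepts one record under ATTEMPT. */
    boolean hasRoom(String record, LargeMessagePolicy policy) {
      if (records.isEmpty() && policy == LargeMessagePolicy.ATTEMPT) {
        return true;
      }
      return usedBytes + record.length() <= maxBytes;
    }

    /** Appends if possible; under FAIL, a record too large for an empty batch is an error. */
    boolean tryAppend(String record, LargeMessagePolicy policy) {
      if (hasRoom(record, policy)) {
        records.add(record);
        usedBytes += record.length();
        return true;
      }
      if (records.isEmpty() && policy == LargeMessagePolicy.FAIL) {
        // Assumed exception type; the PR may use a different one.
        throw new IllegalStateException("Record exceeds maximum batch size of " + maxBytes);
      }
      return false; // batch is full; the caller is expected to start a new one
    }
  }

  public static void main(String[] args) {
    Batch batch = new Batch(4);
    System.out.println(batch.tryAppend("oversized", LargeMessagePolicy.ATTEMPT));
  }
}
```

Keeping the throw out of `hasRoom` leaves that method a side-effect-free predicate, which matches the reply that the check was added in `tryAppend` instead.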
compile project(":gobblin-utility")
compile project(":gobblin-metrics-libs:gobblin-metrics")

compile "org.elasticsearch.client:transport:5.6.8"
Why is this not part of the externalDependency spec and referenced here as externalDependency.elasticSearchClient, etc.?
#

TARGET_DIR="test-elasticsearch"
ES_VERSION=5.6.8
Should this be passed in from the Gradle script as a parameter, since the Gradle script 'knows' about it through project properties (dependency versions are defined in the Gradle scripts)?
Not sure if it can be made that generic and configurable... the download locations for different versions of elasticsearch were different when I spot-checked a couple.
Not going to do this... too much work.
nw, create a Jira :)
}
catch (Exception e) {
//TODO
throw new NoSuchFieldException("Could not find field ");
Propagate e with the NoSuchFieldException?
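Since `NoSuchFieldException` only offers a no-argument and a message-only constructor, one way to carry the original exception along is `Throwable.initCause`. The sketch below shows the idea on a hypothetical reflection-based lookup; `readField` and `Sample` are illustrative names, not code from this PR.

```java
import java.lang.reflect.Field;

final class PropagateCauseSketch {

  /** Hypothetical sample type used only to exercise the lookup. */
  static final class Sample {
    private final String name = "gobblin";
  }

  /** Reads a declared field by name, preserving the underlying failure as the cause. */
  static Object readField(Object target, String fieldName) throws NoSuchFieldException {
    try {
      Field field = target.getClass().getDeclaredField(fieldName);
      field.setAccessible(true);
      return field.get(target);
    } catch (Exception e) {
      // NoSuchFieldException has no (String, Throwable) constructor, so attach the cause afterwards.
      NoSuchFieldException wrapped = new NoSuchFieldException("Could not find field " + fieldName);
      wrapped.initCause(e);
      throw wrapped;
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(readField(new Sample(), "name"));
    try {
      readField(new Sample(), "missing");
    } catch (NoSuchFieldException e) {
      System.out.println(e.getMessage() + " (cause: " + e.getCause().getClass().getSimpleName() + ")");
    }
  }
}
```

Including the field name in the message also addresses the dangling `"Could not find field "` string in the hunk above.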
import com.typesafe.config.Config;


public interface JsonSerializer<T> extends Closeable {
Why is this specific to Elasticsearch? It looks like a helpful common util.
I think our ser-de + type-mapping approach needs a re-think. I didn't want to take on that effort while writing this integration. We can use this approach as one strawman when we design framework-wide support for multiple formats.
String keyStoreType = ConfigUtils
.getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE,
ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE_DEFAULT);
String keyStoreFilePassword = ConfigUtils
Use the password manager? It's a one-line change, but it makes this secure. Or create a ticket and assign it to me; I will do it.


private Process elasticProcess;
private int _port = 9200;
How about using a free port available at the time, like: https://stackoverflow.com/questions/51099027/find-free-port-in-java
We had frequent transient test failures in service/cluster test cases when they used fixed ports to set up the prerequisite infrastructure for running tests.
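A common way to do this with the standard library is to bind a `ServerSocket` to port 0 and read back the port the OS assigned. The sketch below is a generic illustration; `FreePortSketch` and `findFreePort` are hypothetical names, not helpers that exist in this PR.

```java
import java.io.IOException;
import java.net.ServerSocket;

final class FreePortSketch {

  /** Asks the OS for an ephemeral port by binding to port 0, then releases it. */
  static int findFreePort() throws IOException {
    try (ServerSocket socket = new ServerSocket(0)) {
      return socket.getLocalPort();
    }
  }

  public static void main(String[] args) throws IOException {
    // The port is only known to be free at this instant; another process could
    // still grab it before the test server binds to it.
    int port = findFreePort();
    System.out.println("Elasticsearch test server could be started on port " + port);
  }
}
```

This narrows the window for port clashes compared with a fixed 9200 but does not close it entirely, so a retry on bind failure may still be worthwhile.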
Force-pushed: 108c3dd to f25f95c
+1
@@ -240,12 +261,16 @@ public void close() {
public void flush() {
try {
ArrayList<Batch> batches = this.incomplete.all();
LOG.info ("flush on {} batches", batches.size());
int numOutstandingRecords = 0;
long?

task installTestDependencies(type:Exec) {
workingDir "${project.rootDir}/gobblin-modules/gobblin-elasticsearch/"
commandLine './scripts/install_test_deps.sh'
nit: fail build for non-zero exit?
this.hostAddresses = new ArrayList<>(hosts.size());
for (String host : hosts) {

List<String> hostSplit = hostSplitter.splitToList(host);
There should be some Apache util to do this :)
I don't see this writer mentioned in the official docs... is there a Jira issue already logged for it? I'm thinking it might be good to at least document this lightly so folks know it's available now in releases?
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Adds two Elasticsearch writer types (transport and REST)
Tests
ElasticsearchRestClientWriterTest
ElasticsearchTransportClientWriterTest
Commits