[BEAM-2657] Create Solr IO #3618
CaoManhDat wants to merge 14 commits into apache:master from CaoManhDat:master
Conversation
@jkff I will do a first round of review and ping you back once we have cleaned things up, so you can take a break and do a shorter review.

Hi @iemejia, it seems that the last Jenkins failure ( https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/13334/console ) is not related to this PR.

@iemejia How can we re-run the Jenkins job?

You have to write 'Retest this please' and it will do it. However, after the latest commit the tests are breaking for me locally too.
iemejia
left a comment
Excellent, your implementation is really good. I have some minor comments, but it looks really close to being mergeable.
<dependency>
  <groupId>org.apache.solr</groupId>
  <artifactId>solr-solrj</artifactId>
  <version>6.6.0</version>
I would really prefer that we use the LTS version here. Is there any particular reason that prevents using version 5.5.x? Considering that version 5 is the current LTS, we should probably support it, and with this continue supporting users on Java 7 as well as users of the big data distros who are still on pre-Java-8 versions.

Can you please move this version into a Maven property?
<module>kinesis</module>
<module>mongodb</module>
<module>mqtt</module>
<module>solr</module>
Please also add the module in the parent pom (for reference for the javadoc) and in sdks/java/javadoc/pom.xml so the doc will also be part of the release.
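For reference, wiring the new module into the javadoc aggregation might look roughly like this; the beam-sdks-java-io-solr artifactId is an assumption based on Beam's naming convention at the time, not something stated in this thread:

```xml
<!-- sdks/java/javadoc/pom.xml: pull in the new IO so its javadoc is
     bundled with the release (artifactId assumed from Beam's convention) -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-solr</artifactId>
  <version>${project.version}</version>
</dependency>
```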
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
Make the scope test, since the logging is only used in the tests.
</dependency>

<!-- test -->

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
  <version>1.7.10</version>
Use the same version property as in the parent pom (so they don't get misaligned): <version>${slf4j.version}</version>
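In other words, assuming the parent pom defines an slf4j.version property, the test dependency would look like:

```xml
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
  <!-- inherit the version from the parent pom's property -->
  <version>${slf4j.version}</version>
  <scope>test</scope>
</dependency>
```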
@AutoValue.Builder abstract static class Builder {

  abstract Builder setZkHost(String zkHost);
Is there any chance we can 'easily' support single-host Solr? I know this is less common, but it is still a valid use case for devs starting to play with this. (Take this more as a question than something to fix.)
We're doing the same thing in the master (7.0) branch. It can lead to some problems, so should we postpone this feature to another issue?
Yes, feel free to create some JIRA issues for this and other future work. I didn't know that 7.0 fixes this; in any case it is more important to cover the parallel case, so better to have the CloudSolr version first.
+ "called with null collectioin");
ConnectionConfiguration connectionConfiguration = new AutoValue_SolrIO_ConnectionConfiguration
    .Builder().setZkHost(zkHost).setCollection(collection).build();
return connectionConfiguration;
@AutoValue public abstract static class Write
    extends PTransform<PCollection<SolrInputDocument>, PDone> {

  @Nullable abstract ConnectionConfiguration getConnectionConfiguration();
extends PTransform<PCollection<SolrInputDocument>, PDone> {

  @Nullable abstract ConnectionConfiguration getConnectionConfiguration();
Same as above for the extra newlines.
assertThat("consumed bytes equal to encoded bytes", cis.getCount(),
    equalTo((long) bytes.length));
assertThat(decoded.entrySet(), equalTo(value.entrySet()));
assertThat(decoded.getChildDocuments(), equalTo(value.getChildDocuments()));
Maybe take a look at the CoderProperties class for some helper validation of the Coder.
@iemejia I think this PR is ready for review.
jkff
left a comment
Thanks, this is one of the highest-quality IO PRs I've seen! I have a bunch of comments but most of them are pretty minor.
Also, how long does SolrIOTest run? If it's less than ~20 seconds then it's fine to keep as-is; otherwise we should consider using a mock/fake/in-memory implementation per https://beam.apache.org/documentation/io/testing/.
 * @param <T> type of SolrClient
 */
class AuthorizedSolrClient<T extends SolrClient> implements Closeable {
  protected T solrClient;
Please make these private final and, when necessary, introduce getters.
/**
 * Client for interact with SolrCloud.
 */
class AuthorizedCloudSolrClient extends AuthorizedSolrClient<CloudSolrClient> {
Seems like inheritance here is unnecessary, you could instead have a private variable of type AuthorizedSolrClient and delegate to it. Composition is generally easier to understand/debug than inheritance.
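As a sketch of that suggestion, with hypothetical stand-in names (PlainClient/CloudClient are not the actual classes in the PR), composition looks like this:

```java
// Illustrative sketch (hypothetical names): the cloud variant holds a plain
// client wrapper and delegates to it, instead of inheriting from it.
class PlainClient {
    private boolean closed = false;

    String query(String q) { return "result:" + q; }

    void close() { closed = true; }

    boolean isClosed() { return closed; }
}

class CloudClient {
    private final PlainClient delegate;

    CloudClient(PlainClient delegate) { this.delegate = delegate; }

    // Shared behavior is forwarded explicitly...
    String query(String q) { return delegate.query(q); }

    void close() { delegate.close(); }

    // ...and cloud-only behavior lives alongside it, in one obvious place.
    String clusterState() { return "cluster-state"; }
}
```

The delegation makes it explicit which calls reach the underlying client, which tends to be easier to trace when debugging than an inheritance chain.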
}

DocCollection getDocCollection(String collection) {
  solrClient.connect();
Why does this method need to connect, but others don't?
The other calls implicitly call connect(); this one (solrClient.getZkStateReader()) does not.
  throw new CoderException("Invalid encoded SolrDocument length: " + len);
}
byte[] bytes = new byte[len];
in.readFully(bytes);
I wonder if you can avoid a copy of these bytes here, e.g. by using a BoundedInputStream: codec.unmarshal(new BoundedInputStream(is, len)).
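A minimal stdlib sketch of the idea: commons-io ships a ready-made BoundedInputStream, so this toy version only illustrates the mechanism of capping reads so a codec can consume the stream directly instead of an intermediate byte[] copy.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Toy bounded stream: lets a consumer read at most `remaining` bytes
// from the underlying stream, then reports end-of-stream.
class BoundedStream extends FilterInputStream {
    private long remaining;

    BoundedStream(InputStream in, long limit) {
        super(in);
        this.remaining = limit;
    }

    @Override public int read() throws IOException {
        if (remaining <= 0) {
            return -1; // pretend end-of-stream once the limit is hit
        }
        int b = in.read();
        if (b >= 0) {
            remaining--;
        }
        return b;
    }

    @Override public int read(byte[] buf, int off, int len) throws IOException {
        if (remaining <= 0) {
            return -1;
        }
        int n = in.read(buf, off, (int) Math.min(len, remaining));
        if (n > 0) {
            remaining -= n;
        }
        return n;
    }

    // Demo helper: drain a bounded view of the stream into a String.
    static String readAll(InputStream in, long limit) {
        try {
            StringBuilder sb = new StringBuilder();
            BoundedStream bounded = new BoundedStream(in, limit);
            int b;
            while ((b = bounded.read()) >= 0) {
                sb.append((char) b);
            }
            return sb.toString();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```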
 * @param zkHost host of zookeeper
 * @param collection the collection toward which the requests will be issued
 * @return the connection configuration object
 * @throws IOException when it fails to connect to Solr
This method can't throw an IOException, it only builds a simple Java object but doesn't actually connect
+ "'authentication':{"
+ " 'blockUnknown': true,"
+ " 'class':'solr.BasicAuthPlugin',"
+ " 'credentials':{'solr':'orwp2Ghgj39lmnrZOTm7Qtre1VqHFDfwAEzr0ApbN3Y="
Explain where the magic value comes from?
ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
zkStateReader.getZkClient()
    .setData("/security.json", securityJson.getBytes(Charset.defaultCharset()), true);
Thread.sleep(1000);
Why this sleep? Don't we already wait for up to 60s below?
@Before
public void before() throws Exception {
  SolrIOTestUtils.clearCollection(SOLR_COLLECTION, solrClient);
Hm, weird: we create the collection and insert documents in BeforeClass, but clear the collection before every test. What's the point of inserting the documents then?
  SolrIOTestUtils.clearCollection(SOLR_COLLECTION, solrClient);
}

@Test(expected = SolrException.class)
Consider using @Rule ExpectedException
 */
static List<SolrInputDocument> createDocuments(long numDocs) {
  String[] scientists = {
      "Einstein",
Consider using the more diverse set of scientists from CassandraIOTest :)
String[] scientists = {
"Lovelace",
"Franklin",
"Meitner",
"Hopper",
"Curie",
"Faraday",
"Newton",
"Bohr",
"Galilei",
"Maxwell"
};
@jkff Just as a comment: the tests run below 20s, and I think this 20s guideline should be relaxed for embedded implementations of some of the Apache data stores, which depend on multiple infrastructure pieces. For example, this one requires starting a ZooKeeper, and the Hadoop ones are even worse because they require not only a ZooKeeper but also a mini DFS cluster, so these things take time; it is normally not an issue with the IO itself. I think it is much better to have a test with an embedded implementation than a fake, even at the cost of some extra seconds.
Yeah, I mean more like "if it's above 20s, let's see if we can make it faster" rather than a blanket ban on tests longer than that :)
@jkff the test runs in around 10 seconds; maybe you are looking at the 60-second timeout above. I think 10 seconds is acceptable.
@jkff I think this PR is ready for another round of review.
jkff
left a comment
Thanks! This is almost done I think. My comments on the main code are very minor; so I dug deeper into the tests.
private final String password;

AuthorizedSolrClient(T solrClient, ConnectionConfiguration configuration) {
  checkArgument(
Seems like the consensus on the beam-dev@ thread was that we're going to condense the style of these messages to something like checkArgument(solrClient != null, "solrClient can not be null"). But I can fix up myself when merging.
  return process(query);
}

<T2 extends SolrResponse> T2 process(SolrRequest<T2> request)
Could call the type parameters respectively ClientT and ResponseT
byte[] bytes = baos.toByteArray();
VarInt.encode(bytes.length, outStream);
outStream.write(bytes);
How about instead: codec.marshal(value, outStream) to avoid the copying of bytes, similarly to how you do it in decode (except this time you don't even need a wrapper)
Hmm, how can I know the length of the bytes ahead of time?
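The constraint being discussed can be sketched as follows: a length-prefixed format has to buffer the payload first, because its size is only known after serialization. A fixed 4-byte prefix is used here instead of Beam's VarInt, for brevity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of length-prefixed framing: encode buffers the payload to learn
// its size, then writes "length, then payload"; decode reads the length
// first so it knows exactly how many bytes belong to this record.
class LengthPrefix {
    static byte[] encode(byte[] payload) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            DataOutputStream data = new DataOutputStream(out);
            data.writeInt(payload.length); // 4-byte prefix: payload size
            data.write(payload);           // then the buffered payload
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    static byte[] decode(ByteArrayInputStream in) {
        try {
            DataInputStream data = new DataInputStream(in);
            byte[] payload = new byte[data.readInt()];
            data.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```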
 *
 * }</pre>
 *
 * <p>Optionally, you can provide {@code withBatchSize()}
Are there cases when the user can provide a substantially better value than the default? I think in all other IOs that use batching, we eventually decided against making it configurable by the user (except in tests, via @VisibleForTesting).
The value is mentioned in a comment in ElasticsearchIO (which is the same as SolrIO):
"Default is 100. Maximum is 10 000. If documents are small, increasing batch size might improve read performance. If documents are big, you might need to decrease batchSize."
How about 1000 as the default value?
Hmm, I don't remember why we decided to keep this in ESIO. I'm not a Solr expert; I'm just providing the general guidance: keep the parameter if there are cases where it's necessary to make the pipeline work at all, otherwise omit it (litmus test: if you omit the parameter, would anybody file a bug about it?). We discourage parameters whose role is fine-tuning of performance, but parameters necessary to make a pipeline work - well, there's no way around having them. I don't know which is this parameter.
I commented these lines in my local copy:
// 1000 for batch size is good enough in many cases,
// e.g. if documents are large, around 10KB, the response size will be around 10MB;
// if documents are small, around 1KB, the response size will be around 1MB
But if a document is about 100 bytes, the response size will be around 100KB (which is too small). On the other hand, if a document is very large, around 100KB, the response size will be around 100MB. So 1000 is good enough for many cases, but not perfect.
But I'm thinking about a solution like this: in the first request, start with 10; if the response time is acceptable (< 10 seconds), double the batchSize. That way it is safe to mark withBatchSize(int batchSize) as @VisibleForTesting and remove the method in the future.
public class SolrIO {

  public static Read read() {
    return new AutoValue_SolrIO_Read.Builder().setBatchSize(100L).setQuery("*:*").build();
Perhaps add some comments explaining respectively why 100 and 1000 are good values? I suppose in both cases we should use the biggest value possible, constrained only by request/response size limits?
@BeforeClass
public static void beforeClass() throws Exception {
  //setup credential for solr user
Hm, this needs more explanation. It seems like Solr expects a security.json file in a certain well-defined format, and you're constructing a file that matches the password you'll use for the IO transforms below, right? It would be useful to add a comment with a link to the official documentation of the security.json format.
//setup credential for solr user
String password = "SolrRocks";
byte[] salt = new byte[32];
String base64Salt = BaseEncoding.base64().encode(salt);
Odd that the password is being salted with 32 zero bytes - that's not much of a salt. However, more odd is that I don't see the salt being used anywhere else, except for computing the hash. Does Solr expect that all passwords will be salted with 32 zero bytes? Otherwise, I'm confused about how it will salt the provided password in credentials before comparing it to the salted hash in security.json.
The salt can be an arbitrary array with arbitrary content (so an empty array is also valid; I can create a random salt if you want).
As you can see, the salt is used in security.json:
// this credential corresponds to a user, in the test that is the "solr" user
credential = sha256(password, base64Salt) + " " + base64Salt
I took the above formula from the Solr source. Internally, when userA and passA are sent to Solr, Solr does something like this:
credential = getCredentialOfUser(userA).split(" ");
if ( sha256(passA, credential[1]) != credential[0] ) return 401
I'm not an expert in security, so I'm not sure whether this setup is good enough.
Ohh, sorry, I misread the code and didn't notice that the salt was part of the credential. Yeah, I'd suggest just creating a random salt.
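The scheme described in the thread, with a random salt, can be sketched like this. This follows the shape of the pseudocode above (hash plus salt joined by a space, verified by re-hashing), not necessarily Solr's exact hashing rounds, and the class name is hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;

// Sketch of "credential = hash(password, salt) + ' ' + salt" with a random
// salt instead of 32 zero bytes.
class SaltedCredential {
    static String hash(String password, String base64Salt) {
        try {
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            sha.update(Base64.getDecoder().decode(base64Salt));
            sha.update(password.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(sha.digest());
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }

    static String credential(String password) {
        byte[] salt = new byte[32];
        new SecureRandom().nextBytes(salt); // random, not all-zero
        String base64Salt = Base64.getEncoder().encodeToString(salt);
        return hash(password, base64Salt) + " " + base64Salt;
    }

    // Verification mirrors the pseudocode in the thread: re-hash the
    // submitted password with the stored salt and compare.
    static boolean verify(String password, String credential) {
        String[] parts = credential.split(" ");
        return hash(password, parts[1]).equals(parts[0]);
    }
}
```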
assertThat(
    "Wrong estimated size beyond maximum",
    estimatedSize,
    greaterThan(SolrIOTestUtils.MAX_DOC_SIZE * NUM_DOCS));
Also should be lessThan, and I'm confused about why the test is passing.
This is a mistake. The upper bound is not correct either.
}

@Test
public void testSizes() throws Exception {
Generally this test file seems to share almost all of the code with the IT. Is it possible to deduplicate them, or have just one test class, run either with an embedded instance for unit tests, or with a provided instance for IT?
Or rather should we follow the example of JdbcIO tests, and have the unit tests be quite extensive, and integration tests verify just one scenario? Compare: https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java vs. https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
Hmm, I think it is good enough for SolrIOIT to test only testSplitsVolume() and testWriteVolume(). What do you think?
Looking at https://beam.apache.org/documentation/io/testing/, I think SolrIOIT should be quite different. I didn't realize how much guidance we had on writing integration tests! Sorry for not consulting that document earlier. Please take a look at it, section "I/O Transform Integration Tests" and in particular "Implementing Integration Tests". I think in practice it means converting SolrIOIT to a single "write then read" test, using TestRow, unique collection names, etc. It may also be a good idea to set that aside for a separate PR, because the rest of the current PR is looking good, and I feel fairly safe shipping SolrIO with just the unit tests and adding the integration test separately, since the unit tests use a real in-memory instance and give pretty good coverage.
Yeah, I think removing SolrIOIT is a good idea.
    .withMaxBatchSize(BATCH_SIZE);
// write bundles size is the runner decision, we cannot force a bundle size,
// so we test the Writer as a DoFn outside of a runner.
DoFnTester<SolrInputDocument, Void> fnTester = DoFnTester.of(new SolrIO.Write.WriteFn(write));
I think DoFnTester can be used with a try-with-resources statement so you don't have to .close() it explicitly at the end.
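For illustration, the try-with-resources pattern works with any AutoCloseable; the toy class below stands in for DoFnTester and only demonstrates the automatic close:

```java
// Toy AutoCloseable: try-with-resources calls close() automatically when
// the block exits, even if an exception is thrown inside it.
class TryWithResourcesDemo {
    static boolean closed = false;

    static class Tracked implements AutoCloseable {
        @Override public void close() {
            closed = true;
        }
    }

    static void run() {
        // No explicit close() call needed at the end of the block.
        try (Tracked t = new Tracked()) {
            // use the resource here
        }
    }
}
```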
Note to self: submit apache/beam-site#278 after this one.
iemejia
left a comment
@jkff I think this is in pretty good shape now. I think it can be merged soon. And if there are still minor issues we can fix them on merge time.
Once merged we will create some individual JIRAs for the ITs and the pending stuff, like support for other versions of Solr + the other ideas that @CaoManhDat mentioned in the JIRA issue.
Oops, I forgot to say LGTM in the previous comment. I don't really have major things to say after the last round; as I said, any missing details can be addressed in future issues.
jkff
left a comment
Thanks! Merging after doing a couple of finishing touches myself.
Thanks a lot for your contribution @CaoManhDat, and thanks for keeping up the great work during the review process. Don't hesitate now to create new JIRAs for any pending things like the ITs, and of course for any other issue or idea you have.
This PR introduces SolrIO (this component borrows some design ideas from ElasticsearchIO), providing both a bounded source and a sink.