[BEAM-245] Add Cassandra IO #592

Closed
jbonofre wants to merge 11 commits into apache:master from jbonofre:BEAM-245-CASSANDRA

jbonofre
Member

@jbonofre jbonofre commented Jul 6, 2016

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

Initial version of CassandraIO.

TODO:

  • fix and enable the tests (Cassandra daemon related)
  • usage of entity should be optional and the source should be able to return a PCollection<Row>

@jbonofre
Member Author

R: @jkff

@jbonofre
Member Author

jbonofre commented Nov 9, 2016

Rebased, AutoValue use, etc. Not fully ready for review anyway.

Contributor

@jkff jkff left a comment

High-level notes:

  • Audit the code for uses of raw types: I found several. There should be zero uses of raw types in the entire PR - if you feel that a particular use is necessary, let's discuss.
  • For initial splitting, please reuse ByteKeyRange rather than hand-implementing similar logic.
  • Add logging to places where something really important happens (e.g. especially various fallback cases when splitting)
  • Please use SourceTestUtils to test your Source.
  • I'm confused by the sink implementation: it seems to be similar to the Sink/WriteOperation API, but I don't see why this is necessary: can't it be done with a simple DoFn?
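
To illustrate the raw-types note above, here is a minimal JDK-only sketch. It is not code from this PR, and `RawTypeDemo` and its methods are made-up names. It shows why raw types are discouraged: the compiler cannot check what goes into a raw `List`, so the mistake only surfaces as a `ClassCastException` when the element is read back.

```java
import java.util.ArrayList;
import java.util.List;

class RawTypeDemo {
    // Raw type: the compiler cannot check element types, so a wrong element
    // is only detected when it is read back and cast.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static boolean rawListFailsAtRuntime() {
        List raw = new ArrayList();
        raw.add(42); // compiles, although the list is meant to hold strings
        try {
            String value = (String) raw.get(0);
            return value == null; // not reached: the cast above throws
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Parameterized type: names.add(42) would be rejected at compile time.
    static List<String> typedList() {
        List<String> names = new ArrayList<>();
        names.add("cassandra");
        return names;
    }

    public static void main(String[] args) {
        System.out.println(rawListFailsAtRuntime());
        System.out.println(typedList());
    }
}
```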

@jbonofre
Member Author

@jkff thanks for the update. Regarding your comment:

  • yes, I will fix the raw types
  • ok, thanks for the tip
  • agree, I will
  • yes, as I did in ElasticsearchIO, generally speaking, the tests should be improved
  • agree, I will simplify with a "regular" DoFn

@jbonofre jbonofre force-pushed the BEAM-245-CASSANDRA branch 2 times, most recently from 3f51ada to 12184b8 Compare December 18, 2016 16:59
@jbonofre
Member Author

jbonofre commented Dec 18, 2016

Rebased and implemented:

  • Reimplement sink using "regular" DoFn approach.
  • Fixed the getEstimatedSizeBytes() and splitIntoBundles() methods.
  • Fixed test using CassandraUnit.

TODO:

  • Add logging in key places.
  • Use of ByteKeyRange (I have to evaluate).
  • Use of SourceTestUtils in the test.
  • Maybe set Mapper optional (I have to think about that).

However, the PR is now in a better shape IMHO.

@asfbot

asfbot commented Dec 18, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6062/
--none--

@jbonofre
Member Author

Travis is failing because Cassandra 3.7 is Java 8 only. Do we need Java 7 compliant dependencies? I guess so.

@jbonofre
Member Author

I improved the test. However, as Cassandra only works with Java 8, I have to execute the tests only with Java 8 (not Java 7). I will do as we do in the java8 examples.

@echauchot
Contributor

echauchot commented Dec 23, 2016

JB, for splitIntoBundles you use a token range from -2^63 to 2^63, but these values are specific to Cassandra's Murmur3Partitioner. It is the default since Cassandra v1.2, but the user can change it to RandomPartitioner, etc., in which case the range bounds won't be the same. IMHO, rather than using absolute values, it might be better to read all the token ranges from the system.size_estimates table, in the fields rangeStart and rangeEnd. That way the ranges will be dynamic. However, that system table is only available in Cassandra v2.1.5+. Besides, this is what the Cassandra connector for Spark does.
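
To make the partitioner point concrete, here is a rough JDK-only sketch. It is not the PR's splitIntoBundles code, and `TokenRangeSplitter` and its members are hypothetical names. It splits a token ring into contiguous ranges given explicit bounds. The bounds used are the documented ones: Murmur3Partitioner tokens run from -2^63 to 2^63 - 1, while RandomPartitioner tokens run from 0 to 2^127 - 1, so hard-coding the first pair gives wrong splits on a cluster configured with the second.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

class TokenRangeSplitter {
    static final BigInteger MURMUR3_MIN = BigInteger.valueOf(Long.MIN_VALUE); // -2^63
    static final BigInteger MURMUR3_MAX = BigInteger.valueOf(Long.MAX_VALUE); // 2^63 - 1
    static final BigInteger RANDOM_MIN = BigInteger.ZERO;
    static final BigInteger RANDOM_MAX =
        BigInteger.ONE.shiftLeft(127).subtract(BigInteger.ONE); // 2^127 - 1

    /** Splits [min, max] into n contiguous, non-overlapping inclusive ranges {start, end}. */
    static List<BigInteger[]> split(BigInteger min, BigInteger max, int n) {
        if (n < 1 || max.compareTo(min) < 0) {
            throw new IllegalArgumentException("need n >= 1 and min <= max");
        }
        BigInteger size = max.subtract(min).add(BigInteger.ONE);
        BigInteger count = BigInteger.valueOf(n);
        if (count.compareTo(size) > 0) {
            throw new IllegalArgumentException("more splits than tokens");
        }
        List<BigInteger[]> ranges = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            // Range i covers [min + size*i/n, min + size*(i+1)/n - 1].
            BigInteger start = min.add(size.multiply(BigInteger.valueOf(i)).divide(count));
            BigInteger end = min.add(size.multiply(BigInteger.valueOf(i + 1L)).divide(count))
                .subtract(BigInteger.ONE);
            ranges.add(new BigInteger[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (BigInteger[] r : split(MURMUR3_MIN, MURMUR3_MAX, 4)) {
            System.out.println(r[0] + " .. " + r[1]);
        }
    }
}
```

Reading the ranges from system.size_estimates, as suggested above, avoids choosing bounds by hand altogether; the sketch only shows why the bounds cannot be a constant.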

@echauchot
Contributor

Another thing, for the write: currently, it is not batched. It would be better I think to use batch API (BatchStatement) in place of asynchronously writing record by record.

@doanduyhai

@echauchot

No, don't use BatchStatement. It'll kill your performance. Batch statements are there for denormalization, when you need to update several tables together.

Async insert is the best solution for performance
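
As a rough illustration of the async approach, here is a JDK-only sketch of asynchronous writes with a cap on the number of requests in flight. It is not the PR's writer. `BoundedAsyncWriter`, `asyncWrite`, and `maxInFlight` are hypothetical names, and the `asyncWrite` function is only a stand-in for a driver call such as `executeAsync`. The in-flight cap is an assumption added here so that a fast producer cannot queue an unbounded number of requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class BoundedAsyncWriter<T> {
    private final Function<T, CompletableFuture<Void>> asyncWrite; // stand-in for a driver call
    private final Semaphore inFlight;
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    BoundedAsyncWriter(Function<T, CompletableFuture<Void>> asyncWrite, int maxInFlight) {
        this.asyncWrite = asyncWrite;
        this.inFlight = new Semaphore(maxInFlight);
    }

    /** Submits one record; blocks only while maxInFlight writes are outstanding. */
    void write(T record) {
        try {
            inFlight.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a write slot", e);
        }
        CompletableFuture<Void> future;
        try {
            future = asyncWrite.apply(record);
        } catch (RuntimeException e) {
            inFlight.release(); // do not leak the permit if submission itself fails
            throw e;
        }
        pending.add(future.whenComplete((ignored, error) -> inFlight.release()));
    }

    /** Waits for all submitted writes; throws CompletionException if any of them failed. */
    void flush() {
        try {
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        } finally {
            pending.clear();
        }
    }
}

class BoundedAsyncWriterDemo {
    /** Writes the given number of records to a fake sink and returns how many arrived. */
    static int run(int records, int maxInFlight) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger written = new AtomicInteger();
        try {
            BoundedAsyncWriter<Integer> writer = new BoundedAsyncWriter<Integer>(
                record -> CompletableFuture.runAsync(written::incrementAndGet, pool),
                maxInFlight);
            for (int i = 0; i < records; i++) {
                writer.write(i);
            }
            writer.flush();
            return written.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(100, 8));
    }
}
```

In a DoFn-based sink, `flush()` would naturally be called when a bundle finishes, so that no write is still outstanding when the bundle is committed.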

@jbonofre
Member Author

Good point about the partitioner. I implemented it that way because it's the default in Cassandra. I will improve this.
Regarding BatchStatement, I don't think it's a good idea, especially with the Mapper. I prefer a sync approach, optionally with batching in the IO (just storing a number of entities or rows).

@echauchot
Contributor

Did not know about BatchStatement, thanks @doanduyhai

@echauchot
Contributor

echauchot commented Dec 27, 2016

I'll work on the partitioner part, and also on splitIntoBundles and getEstimatedSize, because these parts are linked.

Contributor

@jkff jkff left a comment

Thanks!

<source>1.7</source>
<target>1.7</target>
<compilerArgs>
<arg>-Xlint:all</arg>
Contributor

Why all these disabled warnings? If you're adding a new module, might as well make it lint-clean.

Member Author

It's just the same configuration as in the parent pom.

<description>IO to read and write with Apache Cassandra database</description>

<properties>
<compiler.error.flag></compiler.error.flag>
Contributor

What does this do?

Member Author

It's to disable -Werror which fails with Cassandra on Java7 (Cassandra requires Java8).

<arg>-Xlint:-varargs</arg>
</compilerArgs>
<showWarnings>true</showWarnings>
<showDeprecation>false</showDeprecation>
Contributor

Likewise. Don't think there's a strong reason to use deprecated APIs in this connector.

Member Author

As previous comment, it's duplication of the parent pom configuration updated for Cassandra.


<profiles>
<profile>
<!-- Skip tests on Java7 as Cassandra requires Java8 -->
Contributor

Do you mean the embedded test instance of Cassandra, or do you mean the Cassandra API? Please clarify in the comment.

Member Author

It's the embedded Cassandra instance used in the test, which requires Java8. I will update the comment accordingly.

*
* <p>CassandraIO provides a source to read and returns a bounded collection of entities as {@code
* PCollection<Entity>}.
* An entity is built by Cassandra mapper based on a POJO containing annotations.
Contributor

Just want to confirm whether this is how people usually interact with Cassandra, i.e. whether they'll find this familiar. E.g. is this similar to how the Spark cassandra connector works?

If not, you may want to adjust the API to let the user supply an arbitrary row mapper function, like in JdbcIO, and allow using entity mappers as just a special case of that.

Member Author

The Spark cassandra connector also uses an Entity AFAIK.

So, my preferred option is to use an Entity. However, for convenience, if the user doesn't provide an Entity, they will get a PCollection of Row (implementing this part now).

@Before
public void startCassandra() throws Exception {
EmbeddedCassandraServerHelper.startEmbeddedCassandra("/cassandra.yaml",
"target/cassandra", 30000);
Contributor

Do not use target/ - use a TempDirectory rule.
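
The comment above most likely refers to JUnit's `TemporaryFolder` rule; that reading is an assumption. As a JDK-only sketch of the same idea, the hypothetical helper below creates a fresh directory with `data` and `hints` subdirectories under the system temp location and removes it afterwards, so nothing is written under `target/`. How the resulting path is handed to the embedded Cassandra instance is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

class TempCassandraDir {
    /** Creates a fresh temp directory containing empty data/ and hints/ subdirectories. */
    static Path create() {
        try {
            Path root = Files.createTempDirectory("cassandra-test-");
            Files.createDirectories(root.resolve("data"));
            Files.createDirectories(root.resolve("hints"));
            return root;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deletes the directory tree, files first and directories on the way back up. */
    static void deleteRecursively(Path root) {
        try {
            Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                        throws IOException {
                    Files.delete(file);
                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult postVisitDirectory(Path dir, IOException exc)
                        throws IOException {
                    if (exc != null) {
                        throw exc;
                    }
                    Files.delete(dir);
                    return FileVisitResult.CONTINUE;
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path root = create();
        System.out.println("created " + root);
        deleteRecursively(root);
    }
}
```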

@Test
@Category(NeedsRunner.class)
public void testRead() throws Exception {
Pipeline pipeline = TestPipeline.create();
Contributor

TestPipeline is now a @Rule.

Member Author

@jbonofre jbonofre Dec 28, 2016

Using @Rule for TestPipeline causes:

java.io.NotSerializableException: org.apache.beam.sdk.testing.TestPipeline

when I do MapElements.via(new SimpleFunction()).

It's because CassandraIOTest is Serializable (required for the functions) but not TestPipeline. I'm testing transient there.
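
The failure and the `transient` workaround can be reproduced with plain Java serialization. The sketch below uses made-up classes, not Beam's: `NotSerializableHelper` plays the role of the non-serializable pipeline field. An anonymous function declared inside a test method keeps a reference to the enclosing test instance, so serializing the function serializes the test class and all of its non-transient fields.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientFieldDemo {
    /** Stand-in for a non-serializable helper such as a test pipeline. */
    static class NotSerializableHelper {}

    /** Serializable test class with a plain field: serialization fails. */
    static class TestWithPlainField implements Serializable {
        private static final long serialVersionUID = 1L;
        final NotSerializableHelper helper = new NotSerializableHelper();
    }

    /** Same class with the field marked transient: the field is skipped. */
    static class TestWithTransientField implements Serializable {
        private static final long serialVersionUID = 1L;
        final transient NotSerializableHelper helper = new NotSerializableHelper();
    }

    /** Returns true if Java serialization of the object succeeds. */
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new TestWithPlainField()));
        System.out.println(serializes(new TestWithTransientField()));
    }
}
```

A transient field is not restored on deserialization, which is acceptable here as long as the deserialized copy of the test class never touches the field.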

@@ -0,0 +1,1073 @@
#
Contributor

Clean up this file - remove cruft, unnecessary settings and unnecessary comments?

# Directory where Cassandra should store hints.
# If not set, the default directory is $CASSANDRA_HOME/data/hints.
# hints_directory: /var/lib/cassandra/hints
hints_directory: target/cassandra/hints
Contributor

This should also use the temp directory.

# data_file_directories:
# - /var/lib/cassandra/data
data_file_directories:
- target/cassandra/data
Contributor

Likewise here and line 217. I hope there's a better way to tell Cassandra where to write. I also hope it has defaults for most of the options declared in this file...

@jbonofre
Member Author

jbonofre commented Jun 2, 2017

retest this please

@coveralls

Coverage Status

Changes Unknown when pulling 8581685 on jbonofre:BEAM-245-CASSANDRA into apache:master.

Contributor

@ssisk ssisk left a comment

This looks good for the IO ITs, thanks for making sure the tests can run repeatedly!

public class CassandraIOIT implements Serializable {

private static IOTestPipelineOptions options;
private static String writeTableName;
Contributor

not used any more

@jbonofre
Member Author

jbonofre commented Jun 6, 2017

Agree to merge?

@jbonofre
Member Author

jbonofre commented Jun 6, 2017

Rebased.

@coveralls

Coverage Status

Coverage decreased (-0.007%) to 70.61% when pulling fc618c2 on jbonofre:BEAM-245-CASSANDRA into 6d64c6e on apache:master.

@jbonofre
Member Author

jbonofre commented Jun 6, 2017

The build passed:

2017-06-06T07:19:20.608 [INFO] ------------------------------------------------------------------------
2017-06-06T07:19:20.608 [INFO] BUILD SUCCESS
2017-06-06T07:19:20.608 [INFO] ------------------------------------------------------------------------

but Jenkins complains.

I'm launching a new build.

@jbonofre
Member Author

jbonofre commented Jun 6, 2017

retest this please

@coveralls

Coverage Status

Coverage remained the same at 70.617% when pulling fc618c2 on jbonofre:BEAM-245-CASSANDRA into 6d64c6e on apache:master.

@jbonofre
Member Author

jbonofre commented Jun 6, 2017

@jkff @ssisk good for you guys?

@jkff
Contributor

jkff commented Jun 6, 2017

Was good for me already, and seems @ssisk is good too. Go ahead and merge, thanks!

@jkff
Contributor

jkff commented Jun 6, 2017

Please squash the commits though.

@asfgit asfgit closed this in c189d5c Jun 7, 2017
@jbonofre jbonofre deleted the BEAM-245-CASSANDRA branch June 7, 2017 06:06
@nevillelyh
Contributor

So if I understand the writer code correctly, createWriter is called per DoFn instance, e.g. creating one connection per CPU per worker, and then doing blocking mapper.save(entity) per element?
