Sink and Source for Apache Parquet #1131

Merged: 23 commits, Aug 31, 2018

Conversation

dannylesnik (Contributor)

This commit contains Scala and Java DSLs for a Sink and a Source. I don't think there is a need to create a Flow stage as well, since I can't find a use case for it.

It contains unit tests for both DSLs.

@juanjoDiaz (Contributor)

I haven't gone through this in depth, but this PR seems very similar to #720 and I can see some of the same issues that I highlighted there:

I don't think that we should create connectors for specific file formats.
This should be a flow to format the content and then use the File sink to write to disk.

Along the same lines, I don't think that we should have a dependency on Hadoop at all. We should only have a dependency on Avro-Parquet.
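
A hedged sketch of the design described above, not actual Alpakka API: a formatting flow turns records into ByteString and the generic file sink writes them to disk. Record, the CSV formatting and the output path are illustrative placeholders.

import java.nio.file.Paths

import akka.NotUsed
import akka.stream.scaladsl.{FileIO, Flow, Sink}
import akka.util.ByteString

final case class Record(key: String, value: String)

// Format each record as a line of text...
val format: Flow[Record, ByteString, NotUsed] =
  Flow[Record].map(r => ByteString(s"${r.key},${r.value}\n"))

// ...and terminate the stream with the generic file sink instead of a format-specific one.
val toDisk: Sink[Record, NotUsed] =
  format.to(FileIO.toPath(Paths.get("records.csv")))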

Adding missing configuration to build.sbt
@dannylesnik (Contributor, Author)

@juanjoDiaz
@juanjoDiaz
Thank you for the feedback. Some points:

  • In my commit I also provided a Sink and a Source, so I believe I extended WIP - Parquet sink for Alpakka #720 with a Source implementation as well.

  • You mention that there should be a formatting flow with the File sink. In my opinion that is a fair approach, but don't forget that ParquetReader and ParquetWriter can read from and write to HDFS directly (we are using it this way on our production systems), so my Sink and Source can work directly with files located in HDFS.
    This is how we are using it in production (see the sketch after this list):
    1) The Source is a query to Elasticsearch with elastic4s; the Sink is a Parquet file located directly on HDFS.
    2) Run a Spark job on this data, which generates Parquet output files.
    3) Use the Parquet output files from HDFS as the Source, and elastic4s as the Sink to stream the job results from HDFS back into Elasticsearch.
    Using the Parquet Source and Sink, we can implement this functionality in a few lines of code.

  • And lastly, I don't depend on Hadoop: in my commit the Hadoop artifacts are in Test scope only. I use the Avro-Parquet library as the only dependency.
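
A hedged sketch of the production pipeline described in the second point (steps 1 and 3). AvroParquetSink and AvroParquetSource are the stages proposed in this PR; their imports are omitted because the final package was still under review, and the HDFS URIs, the schema and the stand-in source/sink are illustrative.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroParquetWriter}

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

val schema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"Event","fields":[{"name":"id","type":"string"}]}"""
)

// Step 1: stream query results (already mapped to GenericRecord) into a Parquet file on HDFS.
val fromElastic: Source[GenericRecord, _] = Source.empty // stand-in for the elastic4s query
val writer = AvroParquetWriter
  .builder[GenericRecord](new Path("hdfs://namenode/events/events.parquet")) // illustrative URI
  .withSchema(schema)
  .build()
fromElastic.runWith(AvroParquetSink(writer)) // Sink proposed in this PR

// Step 3: read the Spark job's Parquet output back and stream it towards Elasticsearch.
val reader = AvroParquetReader
  .builder[GenericRecord](new Path("hdfs://namenode/events/output.parquet")) // illustrative URI
  .build()
AvroParquetSource(reader).runWith(Sink.foreach(println)) // stand-in for the elastic4s sink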

@ennru (Member) commented Aug 17, 2018

I see you have trouble with Paradox; we recently changed to absolute paths. You can remove the $alpakka$ prefix.

@ennru ennru added the p:new label Aug 17, 2018
@dannylesnik (Contributor, Author)

@ennru Thank you for your help. This issue is fixed. Now I have some weird FTP unit test failure.

@ennru (Member) commented Aug 20, 2018

Looks very promising!

Without looking into the details, I'd like to ask you to change a few things in the structure:

  • move internal classes into an impl package
  • move the example code to docs.scaladsl and docs.javadsl (to surface any visibility problems)
  • add the module to .travis.yml
  • akka-stream, junit and akka-stream-testkit dependencies come in from Common
  • please annotate other dependencies with their license

@dannylesnik (Contributor, Author)

Hi @ennru

I've just made the changes you asked for.

@ennru (Member) left a comment

Looking good.
Would there be a way to create a writer which emits ByteString? Creating a Flow to emit data to be written or sent via other technologies would be useful, as @juanjoDiaz pointed out.

Even if the Parquet API doesn't support that case, I'd like to see a Flow complementing the Sink, maybe just emitting Done when the record is written.
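
A minimal sketch of such a flow, assuming a ParquetWriter is passed in; this is not the PR's implementation, just the shape of a pass-through stage that emits each record only after it has been written:

import akka.NotUsed
import akka.stream.scaladsl.Flow
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.ParquetWriter

def writeAndPass(writer: ParquetWriter[GenericRecord]): Flow[GenericRecord, GenericRecord, NotUsed] =
  Flow[GenericRecord].map { record =>
    writer.write(record) // blocking; a real stage should run on the IO dispatcher
    record
  }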


override def onUpstreamFailure(ex: Throwable): Unit = {
  super.onUpstreamFailure(ex)
  writer.close()
Member:

Please free resources in postStop. See #277
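
A minimal sketch of the suggested change, assuming writer is the stage's ParquetWriter; postStop runs whether the stage completes, fails or is cancelled, so the resource is released in one place:

override def postStop(): Unit = writer.close()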

Contributor Author:

Done.

import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.ParquetWriter

private[avroparquet] class AvroParquetFlow(writer: ParquetWriter[GenericRecord])
Member:

Add @InternalApi and a ScalaDoc comment stating INTERNAL API to all internal classes.
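
A sketch of the requested marker, using Akka's akka.annotation.InternalApi annotation on the class shown above:

import akka.annotation.InternalApi

/**
 * INTERNAL API
 */
@InternalApi
private[avroparquet] class AvroParquetFlow(writer: ParquetWriter[GenericRecord])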

Contributor Author:

Done.

writer.close()
}

@scala.throws[Exception](classOf[Exception])
Member:

Does the annotation add any value?

Contributor Author:

Done


override def onDownstreamFinish(): Unit = {
  super.onDownstreamFinish()
  reader.close()
Member:

Use postStop here, as well.

Contributor Author:

Done

@dannylesnik (Contributor, Author)

Hi @ennru.

I committed all the changes you requested in your code review.

Regarding your suggestion: I don't think that ByteString is the correct abstraction, since this is an Avro column record and I need the schema to manipulate it.
Working heavily with Akka Streams and the Hadoop ecosystem, I believe that 99 percent of the usage of this code will be storing events (which are typically case classes or raw JSONs fetched from a DB, received from an HTTP layer, or even from an actor publisher) on HDFS (or any other distributed storage), running a Map-Reduce job on them that stores its result in Parquet, and moving the output back into the system. I don't believe we need any other format on the way from a schema-based Parquet record to a case class.

What I suggest might be useful is to make AvroParquetFlow a public API and use it as .via(FlowStage[GenericRecord, GenericRecord]) in cases where storing to Parquet is not the last stage of the stream.
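
A hedged sketch of that usage; AvroParquetFlow is the flow stage proposed here, writer is assumed to be a ParquetWriter[GenericRecord], and the surrounding source and sink are stand-ins:

import akka.stream.scaladsl.{Sink, Source}
import org.apache.avro.generic.GenericRecord

val records: Source[GenericRecord, _] = Source.empty // stand-in for the real upstream
records
  .via(AvroParquetFlow(writer))   // write every record to Parquet...
  .runWith(Sink.foreach(println)) // ...and keep processing it downstream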

@ennru (Member) commented Aug 27, 2018

Ok, makes sense. Having access to a flow is the most important bit.

My advice for the documentation snippets was not clear enough. Please move the snippets to /avroparquet/src/test/java/docs/javadsl and /avroparquet/src/test/scala/docs/scaladsl.

You need to add Parquet to connectors.md to get it listed.

@dannylesnik (Contributor, Author)

  • Created public Java and Scala DSLs for the AvroParquetFlow API
  • Added a Spec for AvroParquetFlow
  • Added a documentation section for AvroParquetFlow to the Paradox docs
  • Added Parquet to connectors.md
  • Moved Examples.java to /avroparquet/src/test/java/docs/javadsl

@ennru (Member) left a comment

Please configure logging to be sent to a file instead (see other modules).

Have you tried the generated documentation? To get the current Paradox plugin, you'd need to re-base your branch.

You may move all API tests to docs.javadsl and docs.scaladsl. This makes sure all the API is accessible.

Please explain what the code snippets do in the docs.

Make sure the imports are part of the snippets (too many classes are called Path). You may use the same snippet tag multiple times in a source file to get everything into one snippet.
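
A sketch of the snippet-tag convention referred to here, assuming a tag named init-writer and placeholders file and schema: the same tag can open and close several regions of a test source file, and Paradox stitches them into one snippet, so the imports travel with the code.

// #init-writer
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
// #init-writer

// ... other test code ...

// #init-writer
val writer: ParquetWriter[GenericRecord] =
  AvroParquetWriter.builder[GenericRecord](new Path(file)).withSchema(schema).build()
// #init-writer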


override def onPush(): Unit = {
  val obtainedValue = grab(in)
  writer.write(obtainedValue)
Member:

As the write operation in most cases will be blocking, the Parquet stages should use Akka's IODispatcher.

Please add this to both flow and source stages:

  import akka.stream.{ActorAttributes, Attributes}

  override protected def initialAttributes: Attributes =
    super.initialAttributes and ActorAttributes.IODispatcher

@danelkotev (Contributor), Aug 28, 2018:

@ennru Hi, can you give a brief explanation of what the IODispatcher is and why it's necessary here? I believe it provides a separate dispatcher for I/O, configured through akka.stream.blocking-io-dispatcher. If this is the case, it should be added to both the Sink and the Source.

Member:

Yes, the IODispatcher can be configured centrally in Akka. Its size may need to be adapted if there are many blocking things happening in your Actor system. And yes, it should be selected for all stages executing blocking IO.
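
A hedged sketch of overriding that choice for a single stage if the shared blocking-IO pool is not enough; writer is assumed to be in scope, and the dispatcher name is illustrative and must exist in the configuration:

import akka.stream.ActorAttributes
import akka.stream.scaladsl.Sink
import org.apache.avro.generic.GenericRecord

// Run the blocking writes on a dedicated dispatcher instead of the default blocking-IO one.
val tunedSink = Sink.foreach[GenericRecord](record => writer.write(record))
  .withAttributes(ActorAttributes.dispatcher("custom-blocking-dispatcher"))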

@dannylesnik (Contributor, Author)

  • Logging configured

  • All API unit tests moved to docs.javadsl and docs.scaladsl

  • Added more explanation and imports in documentation

  • Added IO Dispatcher to Flow and Sink.

  • generated avroparquet.html using code/paradox. Looks OK.

@ennru (Member) left a comment

We're closing in...

if (entries != null) {
  for (String s : entries) {
    File currentFile = new File(index.getPath(), s);
    currentFile.delete();
Member:

Did you consider using File.createTempFile and File.deleteOnExit?

Contributor Author:

File.createTempFile can't work here because I'm creating the Hadoop Path directly from a path String; however, File.deleteOnExit works perfectly here. Thanks.
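
A minimal sketch of the approach described, with an illustrative folder name: the JVM File handle is registered for cleanup on exit, while the Hadoop Path the Parquet writer needs is built from the same path string.

import java.io.File
import org.apache.hadoop.fs.Path

val folder = new File("./parquet-test-output") // illustrative test output folder
folder.mkdirs()
folder.deleteOnExit()
val outputPath = new Path(folder.getPath + "/test.parquet")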

@@ -0,0 +1,13 @@
<configuration>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>target/files.log</file>
Member:

The log file should be named after the module, i.e. avroparquet.log.

@@ -8,6 +8,7 @@
* [Apache Geode](geode.md)
* [Apache Kafka](kafka.md)
* [Apache Kudu](kudu.md)
* [Apache Parquet](avroparquet.md)
Member:

Was it intentional to use "Apache" instead of "Avro"? Both are Apache projects, but it would be better to follow the module name.

@dannylesnik (Contributor, Author)

Changes committed.

@dannylesnik (Contributor, Author)

@ennru Changes committed.

@ennru (Member) left a comment

LGTM.

@ennru ennru merged commit 84db4a5 into akka:master Aug 31, 2018
@ennru (Member) commented Aug 31, 2018

Thank you for your contribution! Keep them coming.

@ennru ennru added this to the 0.21 milestone Aug 31, 2018
@dannylesnik dannylesnik deleted the avroparquet branch August 31, 2018 14:30
sebastianharko pushed a commit to sebastianharko/alpakka that referenced this pull request Sep 5, 2018
dannylesnik added a commit to dannylesnik/alpakka that referenced this pull request Sep 8, 2018
@dannylesnik dannylesnik restored the avroparquet branch September 10, 2018 14:50