Avroparquet: Lower bound higher kinded #2273

Merged: 19 commits, merged on Apr 29, 2020
@@ -0,0 +1,7 @@
# added generic type parameter to the factory methods
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.alpakka.avroparquet.javadsl.AvroParquetFlow.create")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.alpakka.avroparquet.javadsl.AvroParquetSource.create")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.alpakka.avroparquet.javadsl.AvroParquetSink.create")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.alpakka.avroparquet.scaladsl.AvroParquetSink.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.alpakka.avroparquet.scaladsl.AvroParquetFlow.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.alpakka.avroparquet.scaladsl.AvroParquetSource.apply")
@@ -13,12 +13,12 @@ import org.apache.parquet.hadoop.ParquetWriter
* Internal API
*/
@InternalApi
private[avroparquet] class AvroParquetFlow(writer: ParquetWriter[GenericRecord])
extends GraphStage[FlowShape[GenericRecord, GenericRecord]] {
private[avroparquet] class AvroParquetFlow[T <: GenericRecord](writer: ParquetWriter[T])
extends GraphStage[FlowShape[T, T]] {

val in: Inlet[GenericRecord] = Inlet("AvroParquetSink.in")
val out: Outlet[GenericRecord] = Outlet("AvroParquetSink.out")
override val shape: FlowShape[GenericRecord, GenericRecord] = FlowShape.of(in, out)
val in: Inlet[T] = Inlet("AvroParquetSink.in")
val out: Outlet[T] = Outlet("AvroParquetSink.out")
override val shape: FlowShape[T, T] = FlowShape.of(in, out)

override protected def initialAttributes: Attributes =
super.initialAttributes and ActorAttributes.IODispatcher
@@ -13,10 +13,10 @@ import org.apache.parquet.hadoop.ParquetReader
* Internal API
*/
@InternalApi
private[avroparquet] class AvroParquetSource(reader: ParquetReader[GenericRecord])
extends GraphStage[SourceShape[GenericRecord]] {
private[avroparquet] class AvroParquetSource[T <: GenericRecord](reader: ParquetReader[T])
extends GraphStage[SourceShape[T]] {

val out: Outlet[GenericRecord] = Outlet("AvroParquetSource")
val out: Outlet[T] = Outlet("AvroParquetSource")

override protected def initialAttributes: Attributes =
super.initialAttributes and ActorAttributes.IODispatcher
@@ -44,5 +44,5 @@ private[avroparquet] class AvroParquetSource(reader: ParquetReader[GenericRecord
override def postStop(): Unit = reader.close()

}
override def shape: SourceShape[GenericRecord] = SourceShape.of(out)
override def shape: SourceShape[T] = SourceShape.of(out)
}
@@ -10,6 +10,6 @@ import org.apache.parquet.hadoop.ParquetWriter

object AvroParquetFlow {

def create(writer: ParquetWriter[GenericRecord]): Flow[GenericRecord, GenericRecord, NotUsed] =
def create[T <: GenericRecord](writer: ParquetWriter[T]): Flow[T, T, NotUsed] =
Flow.fromGraph(new akka.stream.alpakka.avroparquet.impl.AvroParquetFlow(writer))
}
@@ -12,9 +12,9 @@ import org.apache.parquet.hadoop.ParquetWriter

object AvroParquetSink {

def create(writer: ParquetWriter[GenericRecord]): Sink[GenericRecord, CompletionStage[Done]] =
def create[T <: GenericRecord](writer: ParquetWriter[T]): Sink[T, CompletionStage[Done]] =
Flow
.fromGraph(new akka.stream.alpakka.avroparquet.impl.AvroParquetFlow(writer: ParquetWriter[GenericRecord]))
.fromGraph(new akka.stream.alpakka.avroparquet.impl.AvroParquetFlow(writer: ParquetWriter[T]))
.toMat(Sink.ignore(), Keep.right[NotUsed, CompletionStage[Done]])

}
@@ -11,6 +11,6 @@ import org.apache.parquet.hadoop.ParquetReader

object AvroParquetSource {

def create(reader: ParquetReader[GenericRecord]): Source[GenericRecord, NotUsed] =
def create[T <: GenericRecord](reader: ParquetReader[T]): Source[T, NotUsed] =
Source.fromGraph(new akka.stream.alpakka.avroparquet.impl.AvroParquetSource(reader))
}
@@ -10,6 +10,6 @@ import org.apache.parquet.hadoop.ParquetWriter

object AvroParquetFlow {

def apply(writer: ParquetWriter[GenericRecord]): Flow[GenericRecord, GenericRecord, NotUsed] =
def apply[T <: GenericRecord](writer: ParquetWriter[T]): Flow[T, T, NotUsed] =
Flow.fromGraph(new akka.stream.alpakka.avroparquet.impl.AvroParquetFlow(writer))
}
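
Editor's note (not part of the diff): a usage sketch of the widened scaladsl flow with an avro4s `Record`, which is a `GenericRecord` subtype, mirroring the new spec further down; the `Document` case class, object name, and file path are illustrative:

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetFlow
import akka.stream.scaladsl.{Flow, Sink, Source}
import com.sksamuel.avro4s.{Record, RecordFormat}
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

object GenericFlowSketch {
  case class Document(id: String, body: String)
  val format: RecordFormat[Document] = RecordFormat[Document]

  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Document","fields":[{"name":"id","type":"string"},{"name":"body","type":"string"}]}"""
  )
  val conf = new Configuration()
  val writer: ParquetWriter[Record] =
    AvroParquetWriter.builder[Record](new Path("./target/flow-sketch.parquet")).withConf(conf).withSchema(schema).build()

  // the element type is now Record instead of plain GenericRecord
  val flow: Flow[Record, Record, NotUsed] = AvroParquetFlow[Record](writer)

  // convert case classes with avro4s and write them through the flow
  def write(docs: List[Document])(implicit mat: Materializer) =
    Source(docs.map(format.to(_))).via(flow).runWith(Sink.ignore)
}
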
@@ -12,7 +12,7 @@ import scala.concurrent.Future

object AvroParquetSink {

def apply(writer: ParquetWriter[GenericRecord]): Sink[GenericRecord, Future[Done]] =
def apply[T <: GenericRecord](writer: ParquetWriter[T]): Sink[T, Future[Done]] =
Flow.fromGraph(new akka.stream.alpakka.avroparquet.impl.AvroParquetFlow(writer)).toMat(Sink.ignore)(Keep.right)

}
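
Editor's note (not part of the diff): the scaladsl sink likewise materializes a `Future[Done]` for any `GenericRecord` subtype; a brief sketch, with the writer constructed as in the flow sketch above:

import akka.Done
import akka.stream.Materializer
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetSink
import akka.stream.scaladsl.Source
import com.sksamuel.avro4s.Record
import org.apache.parquet.hadoop.ParquetWriter

import scala.concurrent.Future

object GenericSinkSketch {
  // writes every Record and completes the Future when the stream finishes
  def writeAll(records: List[Record], writer: ParquetWriter[Record])(implicit mat: Materializer): Future[Done] =
    Source(records).runWith(AvroParquetSink[Record](writer))
}
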
@@ -10,7 +10,7 @@ import org.apache.parquet.hadoop.ParquetReader

object AvroParquetSource {

def apply(reader: ParquetReader[GenericRecord]): Source[GenericRecord, NotUsed] =
def apply[T <: GenericRecord](reader: ParquetReader[T]): Source[T, NotUsed] =
Source.fromGraph(new akka.stream.alpakka.avroparquet.impl.AvroParquetSource(reader))

}
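
Editor's note (not part of the diff): on the source side, the reader's element type now drives the stream's element type. A sketch assuming a file previously written with a matching schema:

import akka.stream.Materializer
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetSource
import akka.stream.scaladsl.Sink
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport}
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile

import scala.concurrent.Future

object GenericSourceSketch {
  def readAll(file: String)(implicit mat: Materializer): Future[Seq[GenericRecord]] = {
    val conf = new Configuration()
    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
    val reader: ParquetReader[GenericRecord] =
      AvroParquetReader.builder[GenericRecord](HadoopInputFile.fromPath(new Path(file), conf)).withConf(conf).build()
    // here T = GenericRecord; a narrower subtype can be used when the reader is known to produce it
    AvroParquetSource(reader).runWith(Sink.seq)
  }
}
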
@@ -74,6 +74,7 @@ public void setup() {
public void createNewParquetFile()
throws InterruptedException, IOException, TimeoutException, ExecutionException {
// #init-writer

Configuration conf = new Configuration();
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
ParquetWriter<GenericRecord> writer =
1 change: 1 addition & 0 deletions avroparquet/src/test/java/docs/javadsl/Examples.java
@@ -39,6 +39,7 @@ public class Examples {
ActorMaterializer materializer = ActorMaterializer.create(system);

// #init-reader

Configuration conf = new Configuration();

ParquetReader<GenericRecord> reader =
114 changes: 95 additions & 19 deletions avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquet.scala
@@ -3,48 +3,124 @@
*/

package docs.scaladsl

import java.io.File

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.testkit.TestKit
import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.AvroReadSupport
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroParquetWriter, AvroReadSupport}
import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter}
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.scalacheck.Gen
import org.scalatest.{BeforeAndAfterAll, Suite}

import scala.language.higherKinds
import scala.reflect.io.Directory
import scala.util.Random

trait AbstractAvroParquet {
trait AbstractAvroParquet extends BeforeAndAfterAll {
this: Suite with TestKit =>

case class Document(id: String, body: String)

//#init-system
implicit val system: ActorSystem = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()
//#init-system

val conf = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)

val schema: Schema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"Document\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"body\",\"type\":\"string\"}]}"
)

val genDocument: Gen[Document] =
Gen.oneOf(Seq(Document(id = Gen.alphaStr.sample.get, body = Gen.alphaLowerStr.sample.get)))
val genDocuments: Int => Gen[List[Document]] = n => Gen.listOfN(n, genDocument)

val format: RecordFormat[Document] = RecordFormat[Document]

val folder: String = "./" + Random.alphanumeric.take(8).mkString("")

def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
val genFinalFile: Gen[String] = for {
fileName <- Gen.alphaLowerStr
} yield { folder + "/" + fileName + ".parquet" }

import scala.reflect.io.Directory
val directory = new Directory(new File(folder))
directory.deleteRecursively()
}
val genFile: Gen[String] = Gen.oneOf(Seq(Gen.alphaLowerStr.sample.get + ".parquet"))

val conf: Configuration = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)

def docToRecord(document: Document): GenericRecord =
def parquetWriter[T <: GenericRecord](file: String, conf: Configuration, schema: Schema): ParquetWriter[T] =
AvroParquetWriter.builder[T](new Path(file)).withConf(conf).withSchema(schema).build()

def parquetReader[T <: GenericRecord](file: String, conf: Configuration): ParquetReader[T] =
AvroParquetReader.builder[T](HadoopInputFile.fromPath(new Path(file), conf)).withConf(conf).build()

def docToGenericRecord(document: Document): GenericRecord =
new GenericRecordBuilder(schema)
.set("id", document.id)
.set("body", document.body)
.build()

def fromParquet(file: String, configuration: Configuration): List[GenericRecord] = {
val reader = parquetReader[GenericRecord](file, conf)
var record: GenericRecord = reader.read()
var result: List[GenericRecord] = List.empty[GenericRecord]
while (record != null) {
result = result ::: record :: Nil
record = reader.read()
}
result
}

def sourceDocumentation(): Unit = {
// #prepare-source
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.AvroReadSupport

val conf: Configuration = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
// #prepare-source
}

def sinkDocumentation(): Unit = {
// #prepare-sink
import com.sksamuel.avro4s.Record
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroReadSupport

val file: String = "./sample/path/test.parquet"
val conf: Configuration = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
val writer: ParquetWriter[Record] =
AvroParquetWriter.builder[Record](new Path(file)).withConf(conf).withSchema(schema).build()
// #prepare-sink
if (writer != null) { // forces val usage
}
}

def initWriterDocumentation(): Unit = {
// #init-writer
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.hadoop.ParquetReader

val file: String = "./sample/path/test.parquet"
val writer: ParquetWriter[GenericRecord] =
AvroParquetWriter.builder[GenericRecord](new Path(file)).withConf(conf).withSchema(schema).build()
// #init-writer
// #init-reader
val reader: ParquetReader[GenericRecord] =
AvroParquetReader.builder[GenericRecord](HadoopInputFile.fromPath(new Path(file), conf)).withConf(conf).build()
// #init-reader
if (writer != null && reader != null) { // forces val usage
}
}

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
val directory = new Directory(new File(folder))
directory.deleteRecursively()
}
}
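
Editor's note (not part of the diff): the fixture relies on avro4s' `RecordFormat` to bridge the `Document` case class and `Record`, the `GenericRecord` subtype that the new generic tests exercise. A small round-trip sketch under that assumption:

import com.sksamuel.avro4s.{Record, RecordFormat}

object RecordFormatRoundTrip {
  case class Document(id: String, body: String)
  val format: RecordFormat[Document] = RecordFormat[Document]

  // case class -> avro Record, usable wherever a T <: GenericRecord is expected
  val record: Record = format.to(Document("id-1", "body-1"))

  // and back again, as the spec does after reading parquet content
  val doc: Document = format.from(record)
}
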
121 changes: 65 additions & 56 deletions avroparquet/src/test/scala/docs/scaladsl/AvroParquetFlowSpec.scala
@@ -4,68 +4,77 @@

package docs.scaladsl

import java.util.concurrent.TimeUnit

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetFlow
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import akka.{Done, NotUsed}
import akka.testkit.TestKit
import com.sksamuel.avro4s.Record
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroParquetWriter, AvroReadSupport}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.specs2.mutable.Specification
import org.specs2.specification.AfterAll

import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration.Duration

class AvroParquetFlowSpec extends Specification with AbstractAvroParquet with AfterAll {

"AvroParquet" should {

"insert records in parquet as part of Flow stage" in assertAllStagesStopped {

val docs = List[Document](Document("id1", "sdaada"), Document("id1", "sdaada"), Document("id3", " fvrfecefedfww"))

val source = Source.fromIterator(() => docs.iterator)

val file = folder + "/test.parquet"

val conf = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)

val writer: ParquetWriter[GenericRecord] =
AvroParquetWriter.builder[GenericRecord](new Path(file)).withConf(conf).withSchema(schema).build()

//#init-flow
val flow: Flow[GenericRecord, GenericRecord, NotUsed] = AvroParquetFlow(writer)

val result = source
.map(f => docToRecord(f))
.via(flow)
.runWith(Sink.ignore)
//#init-flow

Await.result[Done](result, Duration(5, TimeUnit.SECONDS))
val dataFile = new org.apache.hadoop.fs.Path(file)

val reader =
AvroParquetReader.builder[GenericRecord](HadoopInputFile.fromPath(dataFile, conf)).withConf(conf).build()

var r: GenericRecord = reader.read()

val b: mutable.Builder[GenericRecord, Seq[GenericRecord]] = Seq.newBuilder[GenericRecord]

while (r != null) {
b += r
r = reader.read()
}
b.result().length shouldEqual 3
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures

class AvroParquetFlowSpec
extends TestKit(ActorSystem("FlowSpec"))
with AnyWordSpecLike
with Matchers
with AbstractAvroParquet
with ScalaFutures
with BeforeAndAfterAll {

"Parquet Flow" should {

"insert avro records in parquet from `GenericRecord`" in assertAllStagesStopped {
//given
val n: Int = 2
val file: String = genFinalFile.sample.get
// #init-flow
val records: List[GenericRecord]
// #init-flow
= genDocuments(n).sample.get.map(docToGenericRecord)
val writer: ParquetWriter[GenericRecord] = parquetWriter(file, conf, schema)

//when
// #init-flow
val source: Source[GenericRecord, NotUsed] = Source(records)
val avroParquet: Flow[GenericRecord, GenericRecord, NotUsed] = AvroParquetFlow(writer)
val result =
source
.via(avroParquet)
.runWith(Sink.seq)
// #init-flow

result.futureValue

//then
val parquetContent: List[GenericRecord] = fromParquet(file, conf)
parquetContent.length shouldEqual n
parquetContent should contain theSameElementsAs records
}

"insert avro records in parquet from a subtype of `GenericRecord`" in assertAllStagesStopped {
//given
val n: Int = 2
val file: String = genFinalFile.sample.get
val documents: List[Document] = genDocuments(n).sample.get
val avroDocuments: List[Record] = documents.map(format.to(_))
val writer: ParquetWriter[Record] = parquetWriter[Record](file, conf, schema)

//when
Source(avroDocuments)
.via(AvroParquetFlow[Record](writer))
.runWith(Sink.seq)
.futureValue

//then
val parquetContent: List[GenericRecord] = fromParquet(file, conf)
parquetContent.length shouldEqual n
parquetContent.map(format.from(_)) should contain theSameElementsAs documents
}
}

}