diff --git a/src/test/resources/avro-test/brec01.avro b/src/test/resources/avro-test/brec01.avro deleted file mode 100644 index f14a86b..0000000 Binary files a/src/test/resources/avro-test/brec01.avro and /dev/null differ diff --git a/src/test/resources/avro-test/brec02.avro b/src/test/resources/avro-test/brec02.avro deleted file mode 100644 index 0759617..0000000 Binary files a/src/test/resources/avro-test/brec02.avro and /dev/null differ diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/AvroFileTestUtils.scala b/src/test/scala/io/epiphanous/flinkrunner/model/AvroFileTestUtils.scala new file mode 100644 index 0000000..ca71e56 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/AvroFileTestUtils.scala @@ -0,0 +1,97 @@ +package io.epiphanous.flinkrunner.model + +import org.apache.flink.api.common.serialization.BulkWriter +import org.apache.flink.api.scala.createTypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.core.fs.local.LocalDataInputStream +import org.apache.flink.core.fs.{ + FSDataOutputStream, + FileInputSplit, + FileSystem, + Path +} +import org.apache.flink.testutils.TestFileSystem + +import java.io.File +import java.nio.file.{Files, Paths} +import java.nio.file.attribute.BasicFileAttributeView +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +trait AvroFileTestUtils { + + val overwrite = FileSystem.WriteMode.OVERWRITE + + def getStream(path: String): FSDataOutputStream = + new TestFileSystem().create(new Path(path), overwrite) + + def getWriter(path: String, isParquet: Boolean): BulkWriter[BWrapper] = { + new EmbeddedAvroWriterFactory[BWrapper, BRecord, MyAvroADT]( + isParquet + ).create(getStream(path)) + } + + def readFile(path: String, isParquet: Boolean): List[BWrapper] = + if (isParquet) readParquetFile(path) else readAvroFile(path) + + def readAvroFile(file: String): List[BWrapper] = { + val fileSize = getFileInfoView(Paths.get(file)).readAttributes().size() + val path = new Path(file) + val inputFormat = + new EmbeddedAvroInputFormat[BWrapper, BRecord, MyAvroADT](path) + inputFormat.open(new FileInputSplit(1, path, 0L, fileSize, null)) + val reuse: BWrapper = BWrapper(new BRecord()) + val pop: ArrayBuffer[BWrapper] = ArrayBuffer.empty + var done = false + while (!done) { + val b = inputFormat.nextRecord(reuse) + done = b == null + if (!done) pop += b + } + inputFormat.close() + pop.toList + } + + def readParquetFile(file: String): List[BWrapper] = { + val fileSize = getFileInfoView(Paths.get(file)).readAttributes().size() + val inputFormat = + new EmbeddedAvroParquetInputFormat[BWrapper, BRecord, MyAvroADT]() + val input = new LocalDataInputStream(new File(file)) + val reader = inputFormat.createReader( + new Configuration(), + input, + fileSize, + fileSize + ) + val pop: ArrayBuffer[BWrapper] = ArrayBuffer.empty + var done = false + while (!done) { + val b = reader.read() + done = b == null + if (!done) pop += b + } + reader.close() + pop.toList + } + + def writeFile( + path: String, + isParquet: Boolean, + pop: List[BWrapper]): Unit = { + val writer = getWriter(path, isParquet) + pop.foreach(writer.addElement) + writer.finish() + } + + def getFileInfoView(path: java.nio.file.Path): BasicFileAttributeView = + Files + .getFileAttributeView( + path, + classOf[BasicFileAttributeView] + ) + + def getTempFile(isParquet: Boolean): Try[java.nio.file.Path] = { + val pa = if (isParquet) "parquet" else "avro" + Try(Files.createTempFile(s"$pa-test-", s".$pa")) + } +}