Skip to content

Commit

Permalink
added/removed files around avro source tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nextdude committed Sep 18, 2022
1 parent 1ab1b1a commit 8b68d0a
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 0 deletions.
Binary file removed src/test/resources/avro-test/brec01.avro
Binary file not shown.
Binary file removed src/test/resources/avro-test/brec02.avro
Binary file not shown.
@@ -0,0 +1,97 @@
package io.epiphanous.flinkrunner.model

import org.apache.flink.api.common.serialization.BulkWriter
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.local.LocalDataInputStream
import org.apache.flink.core.fs.{
FSDataOutputStream,
FileInputSplit,
FileSystem,
Path
}
import org.apache.flink.testutils.TestFileSystem

import java.io.File
import java.nio.file.{Files, Paths}
import java.nio.file.attribute.BasicFileAttributeView
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

trait AvroFileTestUtils {

val overwrite = FileSystem.WriteMode.OVERWRITE

def getStream(path: String): FSDataOutputStream =
new TestFileSystem().create(new Path(path), overwrite)

def getWriter(path: String, isParquet: Boolean): BulkWriter[BWrapper] = {
new EmbeddedAvroWriterFactory[BWrapper, BRecord, MyAvroADT](
isParquet
).create(getStream(path))
}

def readFile(path: String, isParquet: Boolean): List[BWrapper] =
if (isParquet) readParquetFile(path) else readAvroFile(path)

def readAvroFile(file: String): List[BWrapper] = {
val fileSize = getFileInfoView(Paths.get(file)).readAttributes().size()
val path = new Path(file)
val inputFormat =
new EmbeddedAvroInputFormat[BWrapper, BRecord, MyAvroADT](path)
inputFormat.open(new FileInputSplit(1, path, 0L, fileSize, null))
val reuse: BWrapper = BWrapper(new BRecord())
val pop: ArrayBuffer[BWrapper] = ArrayBuffer.empty
var done = false
while (!done) {
val b = inputFormat.nextRecord(reuse)
done = b == null
if (!done) pop += b
}
inputFormat.close()
pop.toList
}

def readParquetFile(file: String): List[BWrapper] = {
val fileSize = getFileInfoView(Paths.get(file)).readAttributes().size()
val inputFormat =
new EmbeddedAvroParquetInputFormat[BWrapper, BRecord, MyAvroADT]()
val input = new LocalDataInputStream(new File(file))
val reader = inputFormat.createReader(
new Configuration(),
input,
fileSize,
fileSize
)
val pop: ArrayBuffer[BWrapper] = ArrayBuffer.empty
var done = false
while (!done) {
val b = reader.read()
done = b == null
if (!done) pop += b
}
reader.close()
pop.toList
}

def writeFile(
path: String,
isParquet: Boolean,
pop: List[BWrapper]): Unit = {
val writer = getWriter(path, isParquet)
pop.foreach(writer.addElement)
writer.finish()
}

def getFileInfoView(path: java.nio.file.Path): BasicFileAttributeView =
Files
.getFileAttributeView(
path,
classOf[BasicFileAttributeView]
)

def getTempFile(isParquet: Boolean): Try[java.nio.file.Path] = {
val pa = if (isParquet) "parquet" else "avro"
Try(Files.createTempFile(s"$pa-test-", s".$pa"))
}
}

0 comments on commit 8b68d0a

Please sign in to comment.