A set of tools for CSV-to-Avro conversion, written in Scala on top of the Univocity CSV parser. Available for Scala 2.11, 2.12 and 2.13.
- `CsvIterator`: a wrapper around the Univocity CSV parser that adapts it to the Scala Collections Framework. Produces `CsvRow`s.
- `CsvRow`: a simple map-like structure for fast access to the values of a parsed CSV row.
- `CsvParser`: parses a `CsvRow` into Avro's `GenericData.Record`.
  - Arrays of primitives are supported when a corresponding delimiter is provided to split the string value.
  - Nested records are supported when a corresponding delimiter is provided to build compound field names.
  - Data not present in the schema is ignored.
  - Extensible `StringParsers`. Some predefined ones:
    - numeric and boolean types
    - `BYTES` and `FIXED` as base64 strings
    - logical types: `date`, `time-millis`, `time-micros`, `timestamp-millis`, `timestamp-micros`
  - Extensible validations.
  - Comprehensive exceptions: general, missing value and wrong type, each containing the field path and a description.
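To illustrate the two delimiter options in the list above, here is a minimal, library-independent sketch of the idea: a cell value is split on an array delimiter, and header names containing a record delimiter are grouped into nested maps. The delimiters (`;` and `.`) and the names `DelimiterSketch`, `splitArray` and `nest` are hypothetical and chosen for illustration only; they are not part of avro-tools-csv's API, and the library's actual behaviour may differ.

```scala
import java.util.regex.Pattern

// Hypothetical helpers illustrating the delimiter idea; NOT the avro-tools-csv API.
object DelimiterSketch {
  // Split one CSV cell into array elements on a literal delimiter (';' assumed here).
  // Assumption: an empty cell becomes an empty array rather than Seq("").
  def splitArray(value: String, delimiter: String = ";"): Seq[String] =
    if (value.isEmpty) Seq.empty
    else value.split(Pattern.quote(delimiter), -1).toSeq

  // Turn flat "header -> value" pairs into nested maps, treating the delimiter ('.' assumed)
  // as a path separator: "address.city" ends up under Map("address" -> Map("city" -> ...)).
  def nest(row: Map[String, String], delimiter: String = "."): Map[String, Any] = {
    // Headers without the delimiter are plain fields of the current record
    val (leaves, nested) = row.partition { case (header, _) => !header.contains(delimiter) }
    // The remaining headers are grouped by their first path segment and nested recursively
    val children = nested
      .groupBy { case (header, _) => header.substring(0, header.indexOf(delimiter)) }
      .map { case (prefix, fields) =>
        prefix -> nest(
          fields.map { case (header, v) => header.substring(header.indexOf(delimiter) + delimiter.length) -> v },
          delimiter
        )
      }
    leaves ++ children
  }
}
```

For example, `DelimiterSketch.nest(Map("id" -> "7", "address.city" -> "Paris"))` yields `Map("id" -> "7", "address" -> Map("city" -> "Paris"))`. In the library itself the delimiters are presumably supplied through the `arrayDelimiter` and `recordDelimiter` parameters of `CsvParser`, which the example below leaves as `None`.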
Add the dependency to `build.sbt`:

```scala
libraryDependencies += "io.github.agolovenko" %% "avro-tools-csv" % "0.8.0"
```
Example:

```scala
import com.univocity.parsers.csv.CsvParserSettings
import io.github.agolovenko.avro._
import io.github.agolovenko.avro.csv.{CsvIterator, CsvParser, CsvRow}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets.UTF_8

// Sample input: a header line followed by one data row
val csv =
  """
    |rstring,rint,rlong,rfloat,rdouble,rboolean,rdate
    |qwerty,123,123456789012345667,123.45,12345.12345,true,2021-11-23
    |""".stripMargin

// Target Avro schema; rdate uses the "date" logical type on top of an int
val schema = new Schema.Parser().parse(
  """
    |{
    |  "type": "record",
    |  "name": "r1",
    |  "fields": [
    |    {
    |      "name": "rstring",
    |      "type": "string"
    |    },
    |    {
    |      "name": "rint",
    |      "type": "int"
    |    },
    |    {
    |      "name": "rlong",
    |      "type": "long"
    |    },
    |    {
    |      "name": "rfloat",
    |      "type": "float"
    |    },
    |    {
    |      "name": "rdouble",
    |      "type": "double"
    |    },
    |    {
    |      "name": "rboolean",
    |      "type": "boolean"
    |    },
    |    {
    |      "name": "rdate",
    |      "type": {
    |        "type": "int",
    |        "logicalType": "date"
    |      }
    |    }
    |  ]
    |}
    |""".stripMargin)

// Read the input on the calling thread instead of a separate Univocity reader thread
val settings = new CsvParserSettings()
settings.setReadInputOnSeparateThread(false)

// No custom headers: column names come from the header line of the CSV
val csvRows: Iterator[CsvRow] = CsvIterator(settings, customHeaders = None)(csv)

// No array or record delimiters: every column maps to a flat primitive field
val parser = new CsvParser(schema, arrayDelimiter = None, recordDelimiter = None, StringParsers.primitiveParsers)

// Records are produced lazily as the iterator is consumed
val records: Iterator[GenericData.Record] = csvRows.map(parser.apply)
```
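As a rough cross-check of what the example should produce, the sketch below converts strings into the encodings the Avro specification defines for the logical types listed above (for instance, `date` is an `int` counting days since the Unix epoch), plus base64 decoding into the `ByteBuffer` that Avro's generic API uses for `bytes`. It relies only on `java.time` and `java.util.Base64`. The object and function names are hypothetical, the ISO-8601 input formats are an assumption, and avro-tools-csv's own `StringParsers` may accept different formats.

```scala
import java.nio.ByteBuffer
import java.time.{Instant, LocalDate, LocalTime}
import java.time.temporal.ChronoUnit
import java.util.Base64

// Hypothetical helpers following the Avro spec's logical-type encodings.
// Assumption: inputs are ISO-8601 strings; this is NOT avro-tools-csv's documented behaviour.
object LogicalTypeSketch {
  // date: int days since 1970-01-01
  def date(s: String): Int = LocalDate.parse(s).toEpochDay.toInt

  // time-millis: int milliseconds after midnight
  def timeMillis(s: String): Int = (LocalTime.parse(s).toNanoOfDay / 1000000L).toInt

  // time-micros: long microseconds after midnight
  def timeMicros(s: String): Long = LocalTime.parse(s).toNanoOfDay / 1000L

  // timestamp-millis: long milliseconds since the Unix epoch (UTC instant)
  def timestampMillis(s: String): Long = Instant.parse(s).toEpochMilli

  // timestamp-micros: long microseconds since the Unix epoch (UTC instant)
  def timestampMicros(s: String): Long = ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse(s))

  // bytes: base64 text decoded into a ByteBuffer
  def bytes(s: String): ByteBuffer = ByteBuffer.wrap(Base64.getDecoder.decode(s))
}
```

Under this encoding, `LogicalTypeSketch.date("2021-11-23")` is `18954`, so, assuming the library follows the specification, that is the int the example's `rdate` field should hold.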
For more examples, check out the tests.