Acxiom#345 Added DataStream trait and reader/writer traits. Implemented a CSVDataRowReader.
dafreels committed Mar 22, 2023
1 parent 5705db3 commit ba66e5e
Showing 7 changed files with 233 additions and 10 deletions.
21 changes: 20 additions & 1 deletion metalus-core/pom.xml
@@ -49,6 +49,18 @@
<artifactId>json4s-ext_${scala.compat.version}</artifactId>
<version>${json4s.version}</version>
</dependency>
<!-- Compression Libraries -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.22</version>
</dependency>
<!-- CSV and Fixed Width File Parsing -->
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>2.9.1</version>
</dependency>
<!-- Test libraries -->
<dependency>
<groupId>log4j</groupId>
@@ -205,7 +217,6 @@
<exclude>junit:junit</exclude>
<exclude>jmock:*</exclude>
<exclude>*:xml-apis</exclude>
<!-- <exclude>com.thoughtworks.paranamer:*:jar:</exclude>-->
<exclude>com.fasterxml:*:jar:</exclude>
<exclude>org.apache.maven:lib:tests</exclude>
<exclude>org.apache.kafka:*:jar:</exclude>
@@ -241,6 +252,14 @@
<pattern>com.thoughtworks.paranamer</pattern>
<shadedPattern>metalus.com.thoughtworks.paranamer</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>metalus.org.apache.commons</shadedPattern>
</relocation>
<relocation>
<pattern>com.univocity</pattern>
<shadedPattern>metalus.com.univocity</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
@@ -12,12 +12,28 @@ trait Connector {
def connectorType: String

/**
* Returns a DataRowReader or None. The reader can be used to window data from the connector.
*
* @param properties Optional properties required by the reader.
* @return A DataRowReader or None.
*/
def getReader(properties: Option[DataStreamOptions]): Option[DataRowReader] = None

/**
* Returns a DataRowWriter or None. The writer can be used to window data to the connector.
*
* @param properties Optional properties required by the writer.
* @return A DataRowWriter or None.
*/
def getWriter(properties: Option[DataStreamOptions]): Option[DataRowWriter] = None

/**
* Using the provided PipelineContext and the optional credentialName and credential, this function will
* attempt to provide a Credential for use by the connector.
*
* @param pipelineContext The current PipelineContext for this session.
* @return A credential or None.
*/
protected def getCredential(pipelineContext: PipelineContext): Option[Credential] = {
if (credentialName.isDefined) {
pipelineContext.credentialProvider.get.getNamedCredential(credentialName.get)
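To illustrate how these new trait methods are meant to be consumed, here is a minimal sketch (the connector value, path, and println are illustrative; the option keys filePath and useHeader match those read by the LocalFileConnector implementation below):

// Sketch: drain a connector-provided DataRowReader until next() returns None.
val connector: Connector = LocalFileConnector("example", None, None)
val options = DataStreamOptions(None, Map("filePath" -> "/tmp/data.csv", "useHeader" -> true))
connector.getReader(Some(options)).foreach { reader =>
  reader.open()
  try {
    Iterator.continually(reader.next()).takeWhile(_.isDefined).foreach { rows =>
      rows.get.foreach(row => println(row.mkString("|")))
    }
  } finally {
    reader.close()
  }
}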
@@ -0,0 +1,62 @@
package com.acxiom.metalus.connectors

import com.acxiom.metalus.{Constants, PipelineException}
import com.acxiom.metalus.sql.{Row, Schema}

/**
* Represents a stream of data.
*/
trait DataRowStream {
/**
* Closes the stream.
*/
def close(): Unit

/**
* Opens the stream for processing.
*/
def open(): Unit
}

/**
* Provides the ability to read from a data stream.
*/
trait DataRowReader extends DataRowStream {
/**
* Fetches the next set of rows from the stream. An empty list indicates the stream is open but no data was available,
* while None indicates the stream is closed and no more data is available.
*
* @return A list of rows or None if the end of the stream has been reached.
*/
def next(): Option[List[Row]]
}

/**
* Provides the ability to write data to a stream.
*/
trait DataRowWriter extends DataRowStream {
/**
* Prepares the provided row and pushes it to the stream. The format of the data will be determined by the
* implementation.
*
* @param row A single row to push to the stream.
* @throws PipelineException - will be thrown if this call cannot be completed.
*/
@throws(classOf[PipelineException])
def process(row: Row): Unit = process(List(row))

/**
* Prepares the provided rows and pushes them to the stream. The format of the data will be determined by the
* implementation.
*
* @param rows A list of Row objects.
* @throws PipelineException - will be thrown if this call cannot be completed.
*/
@throws(classOf[PipelineException])
def process(rows: List[Row]): Unit
}


case class DataStreamOptions(schema: Option[Schema],
options: Map[String, Any] = Map(),
rowBufferSize: Int = Constants.ONE_HUNDRED)
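This commit ships a reader implementation only (the CSVFileDataRowReader below); no DataRowWriter implementation is included. As a purely hypothetical shape reference for the writer side of the contract, a minimal in-memory writer might look like this:

// Hypothetical writer, not part of this commit; shown only to clarify the call sequence.
// open() resets state, process(List[Row]) accumulates, close() releases resources.
class BufferedRowWriter extends DataRowWriter {
  private val buffer = scala.collection.mutable.ListBuffer[Row]()
  override def open(): Unit = buffer.clear()
  override def process(rows: List[Row]): Unit = buffer ++= rows
  override def close(): Unit = println(s"captured ${buffer.size} rows")
}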
@@ -1,7 +1,15 @@
package com.acxiom.metalus.connectors

import com.acxiom.metalus.{Credential, PipelineContext}
import com.acxiom.metalus.fs.{FileManager, LocalFileManager}
import com.acxiom.metalus.sql.{Attribute, AttributeType, Row, Schema}
import com.acxiom.metalus.utils.DriverUtils
import com.acxiom.metalus.{Constants, Credential, PipelineContext}
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings, UnescapedQuoteHandling}
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.compressors.z.ZCompressorInputStream

import java.io.{BufferedReader, InputStreamReader}

/**
* Provides access to the local file system. Credentials are not used by this connector.
@@ -20,4 +20,80 @@ case class LocalFileConnector(override val name: String,
* @return A FileManager for this specific connector type
*/
override def getFileManager(pipelineContext: PipelineContext): FileManager = new LocalFileManager()

override def getReader(properties: Option[DataStreamOptions]): Option[DataRowReader] = {
val options = properties.getOrElse(DataStreamOptions(None))
val filePath = options.options.getOrElse("filePath", "INVALID_FILE_PATH").toString
if (filePath.split('.').contains("csv")) {
Some(CSVFileDataRowReader(new LocalFileManager(), options))
} else {
None
}
}
}

case class CSVFileDataRowReader(fileManager: FileManager, properties: DataStreamOptions) extends DataRowReader {
private val settings = new CsvParserSettings()
private val format = settings.getFormat
format.setComment('\u0000')
format.setDelimiter(properties.options.getOrElse("fileDelimiter", ",").toString)
properties.options.get("fileQuote").asInstanceOf[Option[String]].foreach(q => format.setQuote(q.head))
properties.options.get("fileQuoteEscape").asInstanceOf[Option[String]].foreach(q => format.setQuoteEscape(q.head))
properties.options.get("fileRecordDelimiter").asInstanceOf[Option[String]].foreach(r => format.setLineSeparator(r))
settings.setEmptyValue("")
settings.setNullValue("")
settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_CLOSING_QUOTE)
private val csvParser = new CsvParser(settings)

private val filePath = properties.options.getOrElse("filePath", "INVALID_FILE_PATH").toString
private val file = {
if (!fileManager.exists(filePath)) {
throw DriverUtils.buildPipelineException(Some("A valid file path is required to read data!"), None, None)
}
fileManager.getFileResource(filePath)
}
private val inputStreamReader = {
val inputStream = file.getInputStream()
new BufferedReader(new InputStreamReader(filePath.split('.').last.toLowerCase match {
case "gz" => new GzipCompressorInputStream(inputStream, true)
case "bz2" => new BZip2CompressorInputStream(inputStream)
case "z" => new ZCompressorInputStream(inputStream)
case _ => inputStream
}))
}
private val schema = {
if (properties.options.getOrElse("useHeader", false).toString.toBoolean) {
Some(Schema(csvParser.parseLine(inputStreamReader.readLine()).map { column =>
Attribute(column, AttributeType("string"), None, None)
}))
} else {
properties.schema
}
}

override def next(): Option[List[Row]] = {
val rows = Range(Constants.ZERO, properties.rowBufferSize).foldLeft(List[Row]()) { (list, _) =>
val line = Option(inputStreamReader.readLine())
if (line.isDefined) {
list :+ Row(csvParser.parseLine(line.get), schema, line)
} else {
list
}
}
if (rows.isEmpty) {
None
} else {
Some(rows)
}
}

override def close(): Unit = inputStreamReader.close() // release the underlying stream

override def open(): Unit = {}
}
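Note how the two extension checks above interact: getReader only requires some dot-separated segment of the path to equal csv, while the stream wrapper keys off the last extension, so compressed CSV files are handled transparently. A quick sketch (the path is illustrative):

// "/tmp/data.csv.gz".split('.') contains "csv", so a CSVFileDataRowReader is created,
// and the trailing "gz" extension selects GzipCompressorInputStream for decompression.
val gzOptions = DataStreamOptions(None, Map("filePath" -> "/tmp/data.csv.gz", "useHeader" -> true))
val gzReader = LocalFileConnector("gz-example", None, None).getReader(Some(gzOptions))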
@@ -7,6 +7,6 @@ package com.acxiom.metalus.sql
* @param schema An optional schema that provides metadata about each column
* @param nativeRow The original data representation.
*/
case class Row(columns: Array[_], schema: Option[Schema], nativeRow: Option[Any]) {
def mkString(sep: String): String = columns.mkString(sep)
}
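The columns type change from Array[Any] to Array[_] matters because Scala arrays are invariant: the Array[String] returned by CsvParser.parseLine is not an Array[Any], so the old signature forced a conversion at every call site. A small illustration:

val parsed: Array[String] = Array("1", "jane", "doe")
// Compiles against columns: Array[_]; with Array[Any] this would have needed parsed.map(x => x: Any).
val row = Row(parsed, None, None)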
@@ -0,0 +1,42 @@
package com.acxiom.metalus.connectors

import com.acxiom.metalus.Constants
import org.scalatest.funspec.AnyFunSpec

import java.io.File
import java.nio.file.{Files, StandardCopyOption}

class LocalFileConnectorTests extends AnyFunSpec {
describe("LocalFileConnector - DataRowReader") {
it ("should read data from a file in chunks") {
val source = File.createTempFile("placeholder", ".txt")
source.deleteOnExit()
val dataFilePath = s"${source.getParentFile.getAbsolutePath}/MOCK_DATA.csv"
Files.copy(getClass.getResourceAsStream("/MOCK_DATA.csv"),
new File(dataFilePath).toPath,
StandardCopyOption.REPLACE_EXISTING)
val localFileConnector = LocalFileConnector("my-connector", None, None)
val options = DataStreamOptions(None,
Map("filePath" -> dataFilePath, "fileDelimiter" -> ",", "useHeader" -> true),
Constants.TWELVE)
val reader = localFileConnector.getReader(Some(options))
assert(reader.isDefined)
val firstRows = reader.get.next()
assert(firstRows.isDefined && firstRows.get.length == Constants.TWELVE)
val firstRow = firstRows.get.head
assert(firstRow.columns.length == Constants.SEVEN)
assert(firstRow.schema.isDefined)
assert(firstRow.schema.get.attributes.length == Constants.SEVEN)
val columnNames = List("id","first_name","last_name","email","gender","ein","postal_code")
firstRow.schema.get.attributes.foreach(a => assert(columnNames.contains(a.name)))
var count = firstRows.get.length
Iterator.continually(reader.get.next()).takeWhile(_.isDefined).foreach { rows =>
assert(rows.get.nonEmpty)
count = count + rows.get.length
}
assert(count == Constants.ONE_THOUSAND)
}
}
}
@@ -87,7 +87,7 @@ class FileManagerTests extends AnyFunSpec with Suite {
val data = "Some string that isn't very large"
val source = File.createTempFile("localFileManagerTest1", ".txt")
source.deleteOnExit()
val buffer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(source)))
buffer.write(data)
buffer.flush()
buffer.close()
