Acxiom#345 JDBC DataRow(Reader|Writer)s are now working.
dafreels committed Mar 27, 2023
1 parent de37e9b commit a0ea9da
Showing 13 changed files with 436 additions and 67 deletions.
@@ -1,6 +1,8 @@
package com.acxiom.metalus.connectors

import com.acxiom.metalus.{Credential, PipelineContext}
import com.acxiom.metalus.sql.Row
import com.acxiom.metalus.utils.DriverUtils
import com.acxiom.metalus.{Constants, Credential, PipelineContext}

trait Connector {
def name: String
@@ -2,6 +2,7 @@ package com.acxiom.metalus.connectors

import com.acxiom.metalus.{Constants, PipelineException}
import com.acxiom.metalus.sql.{Row, Schema}
import com.acxiom.metalus.utils.DriverUtils

/**
* Represents a stream of data.
@@ -29,6 +30,25 @@ trait DataRowReader extends DataRowStream {
* @return A list of rows or None if the end of the stream has been reached.
*/
def next(): Option[List[Row]]

protected def readDataWindow(properties: DataStreamOptions, rowFunc: (List[Row], Int) => List[Row]): Option[List[Row]] = {
try {
val rows = Range(Constants.ZERO, properties.rowBufferSize).foldLeft(List[Row]()) { (list, index) => rowFunc(list, index) }
if (rows.isEmpty) {
None
} else if (rows.length < properties.rowBufferSize) {
if (rows.nonEmpty) {
Some(rows)
} else {
None
}
} else {
Some(rows)
}
} catch {
case t: Throwable => throw DriverUtils.buildPipelineException(Some(s"Unable to read data: ${t.getMessage}"), Some(t), None)
}
}
}
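
The new readDataWindow helper centralizes the buffering and error handling each reader previously duplicated: rowFunc is applied rowBufferSize times to accumulate a window, and None is returned only when nothing was read, signaling end of stream. A minimal sketch of a custom reader built on it (the in-memory iterator and the open/close overrides are illustrative, assuming the DataRowStream members used by the readers in this commit); CSVFileDataRowReader below is refactored onto the same helper:

case class IteratorDataRowReader(data: Iterator[Row], properties: DataStreamOptions) extends DataRowReader {
  // Accumulate at most properties.rowBufferSize rows per window; if the
  // iterator runs out mid-window, the shorter list is still returned.
  override def next(): Option[List[Row]] =
    readDataWindow(properties, (list, _) => if (data.hasNext) list :+ data.next() else list)

  override def open(): Unit = {}
  override def close(): Unit = {}
}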

/**
@@ -101,31 +101,14 @@ case class CSVFileDataRowReader(fileManager: FileManager, properties: DataStream
}
}

override def next(): Option[List[Row]] = {
try {
val rows = Range(Constants.ZERO, properties.rowBufferSize).foldLeft(List[Row]()) { (list, index) =>
val line = Option(inputStreamReader.readLine())
if (line.isDefined) {
list :+ Row(csvParser.parseLine(line.get), schema, Some(line))
} else {
list
}
}
if (rows.isEmpty) {
None
} else if (rows.length < properties.rowBufferSize) {
if (rows.nonEmpty) {
Some(rows)
} else {
None
}
} else {
Some(rows)
}
} catch {
case t: Throwable => throw DriverUtils.buildPipelineException(Some(s"Unable to read data: ${t.getMessage}"), Some(t), None)
override def next(): Option[List[Row]] = readDataWindow(properties, (list, index) => {
val line = Option(inputStreamReader.readLine())
if (line.isDefined) {
list :+ Row(csvParser.parseLine(line.get), schema, Some(line))
} else {
list
}
}
})

override def close(): Unit = {}

@@ -1,9 +1,13 @@
package com.acxiom.metalus.connectors.jdbc

import com.acxiom.metalus.{Credential, PipelineContext, PipelineException, UserNameCredential}
import com.acxiom.metalus.connectors.DataConnector
import com.acxiom.metalus.sql.DataReferenceOrigin
import com.acxiom.metalus.sql.jdbc.{BasicJDBCDataReference, JDBCDataReference}
import com.acxiom.metalus.connectors.{DataConnector, DataRowReader, DataRowWriter, DataStreamOptions}
import com.acxiom.metalus.sql.jdbc.{BasicJDBCDataReference, JDBCDataReference, JDBCUtils}
import com.acxiom.metalus.sql.{Attribute, AttributeType, DataReferenceOrigin, Row, Schema}
import com.acxiom.metalus.utils.DriverUtils
import com.acxiom.metalus.{Constants, Credential, PipelineContext, PipelineException, UserNameCredential}

import java.sql.ResultSetMetaData
import java.util.Date

case class JDBCDataConnector(url: String,
name: String,
@@ -30,4 +34,169 @@ case class JDBCDataConnector(url: String,
}.getOrElse(Map())
BasicJDBCDataReference(() => dbtable, url, info, DataReferenceOrigin(this, Some(info)), pipelineContext)
}

/**
* Returns a DataRowReader or None. The reader can be used to window data from the connector.
*
* @param properties Optional properties required by the reader.
* @param pipelineContext The current PipelineContext
* @return Returns a DataRowReader or None.
*/
override def getReader(properties: Option[DataStreamOptions], pipelineContext: PipelineContext): Option[DataRowReader] = {
if (properties.isEmpty || (!properties.get.options.contains("dbtable") && !properties.get.options.contains("sql"))) {
throw DriverUtils.buildPipelineException(Some("Either the dbtable or sql property is required!"), None, Some(pipelineContext))
}
val dbTable = properties.get.options.get("dbtable")
val sql = properties.get.options.get("sql")
val finalSql = if (sql.isEmpty) {
dbTable.get
} else {
sql.get.toString
}
Some(JDBCDataRowReader(this, properties.get.copy(options = properties.get.options + ("dbtable" -> finalSql)), pipelineContext))
}

/**
* Returns a DataRowWriter or None. The writer can be used to window data to the connector.
*
* @param properties Optional properties required by the writer.
* @param pipelineContext The current PipelineContext
* @return Returns a DataRowWriter or None.
*/
override def getWriter(properties: Option[DataStreamOptions], pipelineContext: PipelineContext): Option[DataRowWriter] = {
if (properties.isEmpty || !properties.get.options.contains("dbtable")) {
throw DriverUtils.buildPipelineException(Some("The dbtable property is required!"), None, Some(pipelineContext))
}
Some(JDBCDataRowWriter(this, properties.get.options("dbtable").toString, properties.get, pipelineContext))
}
}
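
A hypothetical wiring of the new entry points follows; the H2 URL, table name, and the two trailing constructor arguments (not visible in this hunk) are illustrative, and pipelineContext is assumed to be in scope:

val connector = JDBCDataConnector("jdbc:h2:mem:example", "example-connector", None, None)
val options = DataStreamOptions(schema = None,
  options = Map("dbtable" -> "CUSTOMERS"), rowBufferSize = 100)
// getReader requires either the dbtable or sql option; getWriter requires dbtable.
val reader = connector.getReader(Some(options), pipelineContext)
val writer = connector.getWriter(Some(options), pipelineContext)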

private case class JDBCDataRowReader(connector: JDBCDataConnector,
properties: DataStreamOptions,
pipelineContext: PipelineContext) extends DataRowReader {
private lazy val dataRef = connector.createDataReference(Some(properties.options), pipelineContext).asInstanceOf[BasicJDBCDataReference]
private lazy val result = dataRef.execute
private lazy val resultSet = result.resultSet.get
private lazy val schema = {
val metadata = resultSet.getMetaData
val count = metadata.getColumnCount
Schema(Range(Constants.ONE, count + Constants.ONE).map(index => {
Attribute(metadata.getColumnName(index), AttributeType(metadata.getColumnTypeName(index)),
Some(metadata.isNullable(index) match {
case ResultSetMetaData.columnNoNulls => false
case _ => true
}), None)
}))
}
/**
* Fetches the next set of rows from the stream. An empty list indicates the stream is open but no data was available
* while None indicates the stream is closed and no more data is available.
*
* @return A list of rows or None if the end of the stream has been reached.
*/
override def next(): Option[List[Row]] = readDataWindow(properties, (list, index) => {
if (resultSet.next()) {
val cols = schema.attributes.map(col => {
col.dataType.baseType.toLowerCase match{
case "int" | "integer" | "bigint" => resultSet.getInt(col.name)
case "long" => resultSet.getLong(col.name)
case "float" => resultSet.getFloat(col.name)
case "bigdecimal" => resultSet.getBigDecimal(col.name)
case "double" => resultSet.getDouble(col.name)
case "boolean" => resultSet.getBoolean(col.name)
case "byte" => resultSet.getByte(col.name)
case "date" => resultSet.getDate(col.name)
case "string" => resultSet.getString(col.name)
case _ => resultSet.getObject(col.name)
}
}).toArray
list :+ Row(cols, Some(schema), None)
} else {
list
}
})

/**
* Closes the stream.
*/
override def close(): Unit = {
resultSet.close()
result.connection.foreach(c => c.close())
}

/**
* Opens the stream for processing.
*/
override def open(): Unit = {}
}
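
Because next() yields Some(window) per fetch and None once the ResultSet is exhausted, draining a reader is a short loop. A sketch, reusing the reader obtained above:

reader.foreach { r =>
  r.open()
  Iterator.continually(r.next())
    .takeWhile(_.isDefined)        // None marks the end of the stream
    .flatMap(_.getOrElse(Nil))     // flatten windows into a stream of rows
    .foreach(row => println(row.columns.mkString("|")))
  r.close()                        // closes both the ResultSet and the connection
}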

private case class JDBCDataRowWriter(connector: JDBCDataConnector,
tableName: String,
properties: DataStreamOptions,
pipelineContext: PipelineContext) extends DataRowWriter {

private lazy val connection = JDBCUtils.createConnection(connector.url, properties.options.mapValues(_.toString).toMap[String, String])

/**
* Prepares the provided rows and pushes to the stream. The format of the data will be determined by the
* implementation.
*
* @param rows A list of Row objects.
* @throws PipelineException - will be thrown if this call cannot be completed.
*/
override def process(rows: List[Row]): Unit = {
if (rows.nonEmpty) {
val row = rows.head
val schema = getSchema(row)
val sqlValues = schema.attributes.foldLeft((List[String](), List[String]()))((sql, a) => {
(sql._1 :+ a.name, sql._2 :+ "?")
})
val finalSql = s"INSERT INTO $tableName (${sqlValues._1.mkString(",")}) VALUES (${sqlValues._2.mkString(",")})"
val statement = connection.prepareStatement(finalSql)
rows.foreach { row =>
row.columns.zipWithIndex.foreach { value =>
val index = value._2 + 1
value._1 match {
case i: Int => statement.setInt(index, i)
case i: Integer => statement.setInt(index, i)
case l: Long => statement.setLong(index, l)
case f: Float => statement.setFloat(index, f)
case d: BigDecimal => statement.setBigDecimal(index, d.bigDecimal)
case d: Double => statement.setDouble(index, d)
case b: Boolean => statement.setBoolean(index, b)
case b: Byte => statement.setByte(index, b)
case d: Date => statement.setDate(index, new java.sql.Date(d.getTime))
case s: String => statement.setString(index, s)
case _ => statement.setObject(index, value._1)
}
}
statement.executeUpdate()
}
statement.close()
}
}

private def getSchema(row: Row) = {
if (properties.schema.isDefined) {
properties.schema.get
} else if (properties.options.contains("schema")) {
properties.options("schema").asInstanceOf[Schema]
} else if (row.schema.isDefined) {
row.schema.get
} else {
throw DriverUtils.buildPipelineException(Some("A valid Schema is required!"), None, Some(pipelineContext))
}
}

/**
* Closes the stream.
*/
override def close(): Unit = {
connection.close()
}

/**
* Opens the stream for processing.
*/
override def open(): Unit = {}
}
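
A sketch of a write, reusing the writer obtained above and the Attribute/AttributeType/Schema constructors as used by the reader; values in each Row are positional, so they must follow the schema's attribute order:

val schema = Schema(Seq(
  Attribute("ID", AttributeType("int"), Some(false), None),
  Attribute("NAME", AttributeType("string"), Some(true), None)))
writer.foreach { w =>
  w.open()
  // One INSERT ... VALUES (?, ?) statement is prepared per batch, derived
  // from the first row's schema, then executed once per row.
  w.process(List(
    Row(Array(1, "alice"), Some(schema), None),
    Row(Array(2, "bob"), Some(schema), None)))
  w.close()
}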
@@ -3,8 +3,7 @@ package com.acxiom.metalus.sql.jdbc
import com.acxiom.metalus.PipelineContext
import com.acxiom.metalus.sql._

import java.sql.{Connection, DriverManager, ResultSet}
import java.util.Properties
import java.sql.{Connection, ResultSet}
import scala.collection.immutable.Queue
import scala.util.{Failure, Success, Try}

@@ -16,25 +15,6 @@ trait JDBCDataReference[T] extends SqlBuildingDataReference[T] {
def uri: String

def properties: Map[String, String]

protected def createConnection(): Connection = {
val props = new Properties()
properties.foreach(entry => props.put(entry._1, entry._2))
DriverManager.getConnection(uri, props)
}

protected implicit class ResultSetImplicits(rs: ResultSet) {
def map[R](func: Int => R): Iterator[R] = Iterator.from(0).takeWhile(_ => rs.next()).map(func)

def toList: List[Map[String, Any]] = {
val columns = (1 to rs.getMetaData.getColumnCount).map(rs.getMetaData.getColumnName)
new Iterator[Map[String, Any]] {
def hasNext: Boolean = rs.next()

def next(): Map[String, Any] = columns.map(c => c -> rs.getObject(c)).toMap
}.toList
}
}
}

final case class BasicJDBCDataReference(baseExpression: () => String,
@@ -50,7 +30,7 @@ final case class BasicJDBCDataReference(baseExpression: () => String,
override def initialReference: String = baseExpression()

override def execute: JDBCResult = {
Try(createConnection()).flatMap { connection =>
Try(JDBCUtils.createConnection(uri, properties)).flatMap { connection =>
val stmt = connection.createStatement()
val sql = logicalPlan.lastOption match {
case Some(_: Select | _: CreateAs) => toSql
@@ -60,11 +40,13 @@ final case class BasicJDBCDataReference(baseExpression: () => String,
s"SELECT * FROM $ref"
}
val res = Try(stmt.execute(sql)).map {
case true => JDBCResult(Some(stmt.getResultSet.toList), None)
case false => JDBCResult(None, Some(stmt.getUpdateCount))
case true => JDBCResult(Some(stmt.getResultSet), None, Some(connection))
case false =>
val updateCount = stmt.getUpdateCount
stmt.close()
connection.close()
JDBCResult(None, Some(updateCount), None)
}
stmt.close()
connection.close()
res
} match {
case Success(rs) => rs
Expand All @@ -82,4 +64,4 @@ final case class BasicJDBCDataReference(baseExpression: () => String,
}
}

final case class JDBCResult(resultSet: Option[List[Map[String, Any]]], count: Option[Int])
final case class JDBCResult(resultSet: Option[ResultSet], count: Option[Int], connection: Option[Connection])
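
JDBCResult previously materialized every row into a List[Map[String, Any]]; it now carries the live ResultSet and its Connection so callers such as JDBCDataRowReader can stream large results. Ownership moves to the caller, as this sketch shows (dataReference stands for any BasicJDBCDataReference):

val result = dataReference.execute
result.resultSet.foreach { rs =>
  while (rs.next()) println(rs.getString(1))
  rs.close()
}
// For queries the connection is left open for streaming; close it when done.
// For updates it is already closed and only count is populated.
result.connection.foreach(_.close())
result.count.foreach(c => println(s"updated $c rows"))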
@@ -0,0 +1,12 @@
package com.acxiom.metalus.sql.jdbc

import java.sql.{Connection, DriverManager}
import java.util.Properties

object JDBCUtils {
def createConnection(uri: String, properties: Map[String, String]): Connection = {
val props = new Properties()
properties.foreach(entry => props.put(entry._1, entry._2))
DriverManager.getConnection(uri, props)
}
}
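
The helper simply copies the map into java.util.Properties and delegates to DriverManager, so arbitrary driver options can be passed through; for example (URL and credentials illustrative):

val conn = JDBCUtils.createConnection("jdbc:h2:mem:example",
  Map("user" -> "sa", "password" -> ""))
try {
  val stmt = conn.createStatement()
  // ... execute statements ...
  stmt.close()
} finally {
  conn.close()
}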
@@ -0,0 +1,18 @@
package com.acxiom.metalus.sql

import java.sql.ResultSet

package object jdbc {
implicit class ResultSetImplicits(rs: ResultSet) {
def map[R](func: Int => R): Iterator[R] = Iterator.from(0).takeWhile(_ => rs.next()).map(func)

def toList: List[Map[String, Any]] = {
val columns = (1 to rs.getMetaData.getColumnCount).map(rs.getMetaData.getColumnName)
new Iterator[Map[String, Any]] {
def hasNext: Boolean = rs.next()

def next(): Map[String, Any] = columns.map(c => c -> rs.getObject(c)).toMap
}.toList
}
}
}
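
The ResultSet implicits moved out of JDBCDataReference into this package object so any consumer can import them. A sketch (the statement and query are illustrative); note that both map and toList advance the underlying ResultSet, so each result can be traversed only once:

import com.acxiom.metalus.sql.jdbc._

val rs = stmt.executeQuery("SELECT ID, NAME FROM CUSTOMERS")
// Materializes the remaining rows as column-name -> value maps.
val rows: List[Map[String, Any]] = rs.toList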
