GEOMESA-3074 Pass through original file in ConvertToGeoFile (#110)
* GEOMESA-3074 Pass through original file in ConvertToGeoFile

* Send files that don't have any success to failure
* Fix ingest counts across files
* Make append writer lazy on modifying writes
elahrvivaz committed Jul 19, 2021
1 parent 568b1be commit 138417a
Showing 9 changed files with 131 additions and 121 deletions.
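
The hunks below route the untouched input flow file to Relationships.OriginalRelationship, whose declaration is not part of this diff. A minimal sketch of how such a relationship is typically declared with NiFi's Relationship.Builder (the value name and description here are assumptions, mirroring the existing success/failure relationships):

    import org.apache.nifi.processor.Relationship

    // hypothetical declaration, not part of this commit; mirrors the existing
    // SuccessRelationship/FailureRelationship defined elsewhere in the project
    val OriginalRelationship: Relationship =
      new Relationship.Builder()
        .name("original")
        .description("The unmodified input flow file, passed through after a successful conversion")
        .build()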
@@ -704,4 +704,45 @@ class PutGeoMesaAccumuloTest extends LazyLogging {
runner.shutdown()
}
}

@Test
def testIngestCounts(): Unit = {
val catalog = s"${root}Counts"
val runner = TestRunners.newTestRunner(new PutGeoMesaAccumulo())
try {
dsParams.foreach { case (k, v) => runner.setProperty(k, v) }
runner.setProperty(AccumuloDataStoreParams.CatalogParam.key, catalog)
runner.setProperty(FeatureTypeProcessor.Properties.SftNameKey, "example")
runner.setProperty(ConvertInputProcessor.Properties.ConverterNameKey, "example-csv")

var i = 0
while (i < 3) {
runner.enqueue(getClass.getClassLoader.getResourceAsStream("example.csv"))
i += 1
}
runner.run()
runner.assertTransferCount(Relationships.SuccessRelationship, i)
runner.assertTransferCount(Relationships.FailureRelationship, 0)
while (i > 0) {
i -= 1
val output = runner.getFlowFilesForRelationship(Relationships.SuccessRelationship).get(i)
output.assertAttributeEquals(org.geomesa.nifi.datastore.processor.Attributes.IngestSuccessCount, "3")
output.assertAttributeEquals(org.geomesa.nifi.datastore.processor.Attributes.IngestFailureCount, "0")
}
} finally {
runner.shutdown()
}

val ds = DataStoreFinder.getDataStore((dsParams + (AccumuloDataStoreParams.CatalogParam.key -> catalog)).asJava)
Assert.assertNotNull(ds)
try {
val sft = ds.getSchema("example")
Assert.assertNotNull(sft)
val features = SelfClosingIterator(ds.getFeatureSource("example").getFeatures.features()).toList
logger.debug(features.mkString(";"))
Assert.assertEquals(3, features.length)
} finally {
ds.dispose()
}
}
}
@@ -49,11 +49,14 @@ class ConvertToGeoFile extends ConvertInputProcessor {
import ConvertToGeoFile.FlowFileExportStream
import ConvertToGeoFile.Properties.{GzipLevel, IncludeHeaders, OutputFormat}

override protected def getShips: Seq[Relationship] =
super.getShips ++ Seq(Relationships.OriginalRelationship)

override protected def getTertiaryProperties: Seq[PropertyDescriptor] =
super.getTertiaryProperties ++ Seq(OutputFormat, GzipLevel, IncludeHeaders)

override def onTrigger(context: ProcessContext, session: ProcessSession): Unit = {
val input = session.get()
var input = session.get()
if (input == null) {
return
}
@@ -128,17 +131,25 @@ class ConvertToGeoFile extends ConvertInputProcessor {
}
}

if (result != null) {
output = session.putAttribute(output, "geomesa.convert.successes", result.success.toString)
output = session.putAttribute(output, "geomesa.convert.failures", result.failure.toString)
}

val basename =
Option(input.getAttribute("filename")).map(FilenameUtils.getBaseName).getOrElse(UUID.randomUUID().toString)
output = session.putAttribute(output, "filename", s"$basename.${format.extensions.head}")
output = session.removeAttribute(output, "mime.type")

val attributes = new java.util.HashMap[String, String](2)
attributes.put("geomesa.convert.successes", result.success.toString)
attributes.put("geomesa.convert.failures", result.failure.toString)

session.transfer(output, Relationships.SuccessRelationship)
session.remove(input)
output = session.putAllAttributes(output, attributes)
input = session.putAllAttributes(input, attributes)

if (result.success > 0L) {
session.transfer(output, Relationships.SuccessRelationship)
session.transfer(input, Relationships.OriginalRelationship)
} else {
session.remove(output)
session.transfer(input, Relationships.FailureRelationship)
}
} catch {
case NonFatal(e) =>
logger.error(s"Error converting file: ${e.getMessage}", e)
@@ -15,25 +15,24 @@ import org.apache.nifi.processor._
import org.geomesa.nifi.datastore.processor.CompatibilityMode.CompatibilityMode
import org.geomesa.nifi.datastore.processor.mixins.ConvertInputProcessor.ConverterCallback
import org.geomesa.nifi.datastore.processor.mixins.DataStoreIngestProcessor.FeatureWriters
import org.geomesa.nifi.datastore.processor.mixins.DataStoreIngestProcessor.FeatureWriters.SimpleWriter
import org.geomesa.nifi.datastore.processor.mixins.{ConvertInputProcessor, FeatureTypeIngestProcessor}
import org.geomesa.nifi.datastore.processor.mixins.{ConvertInputProcessor, DataStoreIngestProcessor}
import org.geotools.data._
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.opengis.feature.simple.SimpleFeature

import scala.util.control.NonFatal

/**
* Converter ingest processor for geotools data stores
*/
@CapabilityDescription("Convert and ingest data files into GeoMesa")
trait ConverterIngestProcessor extends FeatureTypeIngestProcessor with ConvertInputProcessor {
trait ConverterIngestProcessor extends DataStoreIngestProcessor with ConvertInputProcessor {

override protected def createIngest(
context: ProcessContext,
dataStore: DataStore,
writers: FeatureWriters,
mode: CompatibilityMode): IngestProcessor = {
new ConverterIngest(context, dataStore, writers, mode)
new ConverterIngest(dataStore, writers, mode)
}

/**
@@ -43,27 +42,33 @@ trait ConverterIngestProcessor extends FeatureTypeIngestProcessor with ConvertInputProcessor {
* @param writers feature writers
* @param mode schema compatibility mode
*/
class ConverterIngest(context: ProcessContext, store: DataStore, writers: FeatureWriters, mode: CompatibilityMode)
extends IngestProcessorWithSchema(store, writers, mode) {
class ConverterIngest(store: DataStore, writers: FeatureWriters, mode: CompatibilityMode)
extends IngestProcessor(store, writers, mode) {

override protected def ingest(
override def ingest(
context: ProcessContext,
session: ProcessSession,
file: FlowFile,
name: String,
sft: SimpleFeatureType,
writer: SimpleWriter): IngestResult = {
val callback = new ConverterCallback() {
override def apply(features: Iterator[SimpleFeature]): Long = {
var failed = 0L
features.foreach { feature =>
try { writer.apply(feature) } catch {
case NonFatal(e) => logError(feature, e); failed += 1
flowFileName: String): IngestResult = {
val sft = loadFeatureType(context, file)
checkSchema(sft)
val writer = writers.borrowWriter(sft.getTypeName, file)
try {
val callback = new ConverterCallback() {
override def apply(features: Iterator[SimpleFeature]): Long = {
var failed = 0L
features.foreach { feature =>
try { writer.apply(feature) } catch {
case NonFatal(e) => logError(feature, e); failed += 1
}
}
failed
}
failed
}
convert(context, session, file, sft, callback)
} finally {
writers.returnWriter(writer)
}
convert(context, session, file, sft, callback)
}
}
}
@@ -39,16 +39,14 @@ abstract class BaseProcessor extends AbstractProcessor {

override protected def init(context: ProcessorInitializationContext): Unit = {
relationships.clear()
relationships.add(Relationships.SuccessRelationship)
relationships.add(Relationships.FailureRelationship)
getShips.foreach(relationships.add)

reloadDescriptors()

logger.info(s"Props are ${descriptors.asScala.mkString(", ")}")
logger.info(s"Relationships are ${relationships.asScala.mkString(", ")}")
}


@OnAdded // reload on add to pick up any sft/converter classpath changes
def reloadDescriptors(): Unit = {
descriptors.clear()
@@ -59,6 +57,10 @@ abstract class BaseProcessor extends AbstractProcessor {
getServiceProperties.foreach(descriptors.add)
}

protected def getShips: Seq[Relationship] =
Seq(Relationships.SuccessRelationship, Relationships.FailureRelationship)


/**
* Get the main processor params - these will come first in the UI
*
@@ -13,6 +13,7 @@ package mixins
import java.io.InputStream
import java.util.Collections

import com.codahale.metrics.Counter
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool, GenericObjectPoolConfig}
@@ -120,7 +121,8 @@ trait ConvertInputProcessor extends FeatureTypeProcessor {
inputFile
}
}
val ec = converter.createEvaluationContext(globalParams)
// create new counters so we don't use any shared state
val ec = converter.createEvaluationContext(globalParams, new Counter(), new Counter())
session.read(file, new InputStreamCallback {
override def process(in: InputStream): Unit = {
WithClose(converter.process(in, ec)) { iter =>
@@ -121,7 +121,11 @@ trait DataStoreIngestProcessor extends DataStoreProcessor {
output = session.putAttribute(output, Attributes.IngestFailureCount, result.failure.toString)
logger.debug(
s"Ingested file ${fullName(output)} with ${result.success} successes and ${result.failure} failures")
session.transfer(output, Relationships.SuccessRelationship)
if (result.success > 0L) {
session.transfer(output, Relationships.SuccessRelationship)
} else {
session.transfer(output, Relationships.FailureRelationship)
}
} catch {
case NonFatal(e) =>
logger.error(s"Error processing file ${fullName(file)}:", e)
@@ -174,11 +178,12 @@ trait DataStoreIngestProcessor extends DataStoreProcessor {
abstract class IngestProcessor(store: DataStore, writers: FeatureWriters, mode: CompatibilityMode)
extends Closeable {

private val schemaCheckCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).build(
new CacheLoader[SimpleFeatureType, Try[Unit]]() {
override def load(sft: SimpleFeatureType): Try[Unit] = Try(doCheckSchema(sft))
}
)
private val schemaCheckCache =
Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).build[SimpleFeatureType, Try[Unit]](
new CacheLoader[SimpleFeatureType, Try[Unit]]() {
override def load(sft: SimpleFeatureType): Try[Unit] = Try(doCheckSchema(sft))
}
)

/**
* Ingest a flow file
@@ -336,8 +341,13 @@ object DataStoreIngestProcessor {
override def close(): Unit = CloseWithLogging(writer)
}

case class ModifyWriter(ds: DataStore, typeName: String, attribute: Option[String], append: SimpleWriter)
class ModifyWriter(val typeName: String, ds: DataStore, attribute: Option[String], appender: => SimpleWriter)
extends SimpleWriter with LazyLogging {

private var append: SimpleWriter = _

def appendWriter: Option[SimpleWriter] = Option(append)

override def apply(f: SimpleFeature): Unit = {
val filter = attribute match {
case None => FilterHelper.ff.id(FilterHelper.ff.featureId(f.getID))
@@ -354,11 +364,15 @@ object DataStoreIngestProcessor {
logger.warn(s"Filter '${ECQL.toCQL(filter)}' matched multiple records, only updating the first one")
}
} else {
if (append == null) {
append = appender
}
append.apply(f)
}
}
}
override def close(): Unit = append.close()

override def close(): Unit = {}
}
}

@@ -370,7 +384,7 @@ object DataStoreIngestProcessor {
*/
class PooledWriters(ds: DataStore, timeout: Long) extends FeatureWriters {

private val cache = Caffeine.newBuilder().build(
private val cache = Caffeine.newBuilder().build[String, ObjectPool[SimpleWriter]](
new CacheLoader[String, ObjectPool[SimpleWriter]] {
override def load(key: String): ObjectPool[SimpleWriter] = {
val factory = new BasePooledObjectFactory[SimpleWriter] {
@@ -428,12 +442,9 @@ object DataStoreIngestProcessor {
*/
class ModifyWriters(ds: DataStore, attribute: Option[String], appender: FeatureWriters) extends FeatureWriters {
override def borrowWriter(typeName: String, file: FlowFile): SimpleWriter =
ModifyWriter(ds, typeName, attribute, appender.borrowWriter(typeName, file))
override def returnWriter(writer: SimpleWriter): Unit = {
val ModifyWriter(_, _, _, append) = writer
appender.returnWriter(append)
CloseWithLogging(writer)
}
new ModifyWriter(typeName, ds, attribute, appender.borrowWriter(typeName, file))
override def returnWriter(writer: SimpleWriter): Unit =
writer.asInstanceOf[ModifyWriter].appendWriter.foreach(appender.returnWriter)
override def invalidate(typeName: String): Unit = appender.invalidate(typeName)
override def close(): Unit = CloseWithLogging(appender) // note: also disposes of the datastore
}
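
Because the appender parameter of the new ModifyWriter is passed by name, ModifyWriters only pays for an append writer when a feature actually falls through to the append path. A minimal usage sketch (ds, appender, file and the two features are assumed to be in scope):

    val writer = new ModifyWriter("example", ds, None, appender.borrowWriter("example", file))
    writer.apply(existingFeature) // id filter matches: updated in place, no append writer created
    writer.apply(newFeature)      // no match: borrowWriter runs here, on first use
    writer.appendWriter.foreach(appender.returnWriter) // returned only if it was ever created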
@@ -451,7 +462,7 @@ object DataStoreIngestProcessor {
override def borrowWriter(typeName: String, file: FlowFile): SimpleWriter = {
lazy val append = appender.borrowWriter(typeName, file)
lazy val modify =
ModifyWriter(ds, typeName, Option(attribute.evaluateAttributeExpressions(file).getValue), append)
new ModifyWriter(typeName, ds, Option(attribute.evaluateAttributeExpressions(file).getValue), append)
mode.evaluateAttributeExpressions(file).getValue match {
case null | "" => append
case m if m.equalsIgnoreCase(FeatureWriters.Append) => append
Expand All @@ -461,11 +472,8 @@ object DataStoreIngestProcessor {
}
override def returnWriter(writer: SimpleWriter): Unit = {
writer match {
case m: ModifyWriter =>
appender.returnWriter(m.append)
CloseWithLogging(m)
case _ =>
appender.returnWriter(writer)
case m: ModifyWriter => m.appendWriter.foreach(appender.returnWriter)
case _ => appender.returnWriter(writer)
}
}
override def invalidate(typeName: String): Unit = appender.invalidate(typeName)

This file was deleted.
