@@ -17,6 +17,7 @@ package za.co.absa.hyperdrive.driver

import java.util.UUID

import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.SparkSession
import za.co.absa.hyperdrive.ingestor.api.decoder.StreamDecoder
@@ -27,6 +28,7 @@ import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformer
import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriter
import za.co.absa.hyperdrive.shared.exceptions.{IngestionException, IngestionStartException}

import scala.collection.AbstractIterator
import scala.util.control.NonFatal

/**
@@ -70,6 +72,7 @@ object SparkIngestor

logger.info(s"STARTING ingestion from '${streamReader.getSourceName}' into '${streamWriter.getDestination}' (id = $ingestionId)")

val destinationEmptyBefore = isDestinationEmpty(spark, streamWriter.getDestination)
val ingestionQuery = try {
val inputStream = streamReader.read(spark) // gets the source stream
val configuredStreamReader = offsetManager.configureOffsets(inputStream, spark.sparkContext.hadoopConfiguration) // does offset management if any
@@ -87,6 +90,9 @@
ingestionFinalizer.finalize(ingestionQuery)
} catch {
case NonFatal(e) =>
if(destinationEmptyBefore) {
cleanupDestination(spark, streamWriter.getDestination)
}
throw new IngestionException(message = s"PROBABLY FAILED INGESTION $ingestionId. There was no error in the query plan, but something went wrong. " +
s"Pay attention to this exception since the query has been started, which might lead to duplicate data or similar issues. " +
s"The logs should have enough detail, but a possible course of action is to replay this ingestion and overwrite the destination.", e)
@@ -132,4 +138,26 @@ object SparkIngestor {
}

private def generateIngestionId: String = UUID.randomUUID().toString

private def isDestinationEmpty(spark: SparkSession, destinationDirectory: String): Boolean = {
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val destinationPath = new Path(destinationDirectory)
!fs.exists(destinationPath) || !fs.listFiles(destinationPath, true).hasNext
}

private def cleanupDestination(spark: SparkSession, destinationDirectory: String): Unit = {
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val destinationPath = new Path(destinationDirectory)

if(fs.exists(destinationPath)) {
val filesIterator = fs.listFiles(destinationPath, true)
val filesList = new AbstractIterator[LocatedFileStatus] {
override def hasNext: Boolean = filesIterator.hasNext
override def next: LocatedFileStatus = filesIterator.next
}.map(f => f.getPath).toList
logger.info(s"Deleting directory $destinationDirectory with ${filesList.size} files: ${filesList.mkString(", ")}")

fs.delete(destinationPath, true)
}
}
}
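
The cleanupDestination method above wraps Hadoop's RemoteIterator[LocatedFileStatus] (returned by fs.listFiles) in a scala.collection.AbstractIterator, because RemoteIterator is neither a java.util.Iterator nor a Scala Iterator, so no standard converter applies and collection methods such as map and toList are otherwise unavailable. A standalone sketch of the same adaptation pattern; the helper name remoteIteratorToScala is illustrative and not part of this change:

import org.apache.hadoop.fs.RemoteIterator
import scala.collection.AbstractIterator

// Adapts Hadoop's RemoteIterator (which exposes only hasNext/next)
// to a Scala Iterator so that map, filter, toList etc. become available.
def remoteIteratorToScala[T](underlying: RemoteIterator[T]): Iterator[T] =
  new AbstractIterator[T] {
    override def hasNext: Boolean = underlying.hasNext
    override def next(): T = underlying.next()
  }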
@@ -15,6 +15,9 @@

package za.co.absa.hyperdrive.driver

import java.nio.file.{Files, Paths}
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -57,6 +60,9 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSugar
streamingQuery)
when(streamReader.getSourceName).thenReturn("test-source")
when(streamWriter.getDestination).thenReturn("test-destination")
val sparkContext = mock[SparkContext]
when(sparkContext.hadoopConfiguration).thenReturn(configuration)
when(sparkSession.sparkContext).thenReturn(sparkContext)
}

behavior of SparkIngestor.getClass.getName
@@ -156,6 +162,30 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSugar
assertThrows[IngestionStartException](SparkIngestor.ingest(sparkSession, streamReader, offsetManager, streamDecoder, streamTransformer, streamWriter, ingestionFinalizer))
}

it should "delete destination directory and throw IngestionException if ingestion fails" in {
val destination = Files.createTempDirectory("test")
when(streamDecoder.decode(nullMockedDataStream)).thenReturn(dataFrame)
when(streamTransformer.transform(dataFrame)).thenReturn(dataFrame)
when(streamWriter.write(dataFrame, offsetManager)).thenReturn(streamingQuery)
when(streamingQuery.processAllAvailable()).thenThrow(classOf[NullPointerException])
when(streamWriter.getDestination).thenReturn(destination.toUri.getPath)
assertThrows[IngestionException](SparkIngestor.ingest(sparkSession, streamReader, offsetManager, streamDecoder, streamTransformer, streamWriter, ingestionFinalizer))
assert(!Files.exists(destination))
}

it should "not delete destination directory if it has not been empty before if ingestion fails" in {
val destination = Files.createTempDirectory("test")
val filepath = destination.resolve("someFile.txt")
Files.createFile(filepath)
when(streamDecoder.decode(nullMockedDataStream)).thenReturn(dataFrame)
when(streamTransformer.transform(dataFrame)).thenReturn(dataFrame)
when(streamWriter.write(dataFrame, offsetManager)).thenReturn(streamingQuery)
when(streamingQuery.processAllAvailable()).thenThrow(classOf[NullPointerException])
when(streamWriter.getDestination).thenReturn(destination.toUri.getPath)
assertThrows[IngestionException](SparkIngestor.ingest(sparkSession, streamReader, offsetManager, streamDecoder, streamTransformer, streamWriter, ingestionFinalizer))
assert(Files.exists(filepath))
}

it should "throw IngestionException if ingestion fails during execution" in {
when(streamDecoder.decode(nullMockedDataStream)).thenReturn(dataFrame)
when(streamTransformer.transform(dataFrame)).thenReturn(dataFrame)
@@ -185,11 +215,6 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSugar
}

private def prepareMocks(): Unit = {
val sparkContext = mock[SparkContext]
when(sparkContext.hadoopConfiguration).thenReturn(configuration)

when(sparkSession.sparkContext).thenReturn(sparkContext)

when(streamReader.read(sparkSession)).thenReturn(nullMockedDataStream)
when(streamReader.getSourceName).thenReturn("mocked_topic")

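The two new tests exercise the Hadoop-based cleanup against directories created with plain java.nio.file.Files. This works because FileSystem.get with an otherwise unconfigured Hadoop Configuration resolves to the local filesystem (fs.defaultFS defaults to file:///), so Hadoop Path operations and java.nio.file operations observe the same directory tree. A minimal standalone sketch of that interplay, assuming no core-site.xml on the classpath overrides the default filesystem (the LocalFsInterop object name is only for the sketch):

import java.nio.file.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object LocalFsInterop extends App {
  val dir = Files.createTempDirectory("interop")  // created via java.nio
  val fs = FileSystem.get(new Configuration())    // defaults to LocalFileSystem
  val hadoopPath = new Path(dir.toUri.getPath)

  println(fs.exists(hadoopPath))  // true: both APIs see the same directory
  fs.delete(hadoopPath, true)     // recursive delete via the Hadoop API
  println(Files.exists(dir))      // false: the deletion is visible to java.nio
}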