Skip to content

Commit

Permalink
Ignore hidden or metadata files.
Browse files Browse the repository at this point in the history
Fixes #35.
  • Loading branch information
morazow committed Feb 8, 2021
1 parent 012321b commit b0962a5
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 61 deletions.
52 changes: 50 additions & 2 deletions src/it/scala/com/exasol/cloudetl/BaseIntegrationTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ import java.nio.file.Paths

import com.exasol.containers.ExasolContainer
import com.exasol.dbbuilder.dialects.Column
import com.exasol.dbbuilder.dialects.Table
import com.exasol.dbbuilder.dialects.exasol.ExasolObjectFactory
import com.exasol.dbbuilder.dialects.exasol.ExasolSchema
import com.exasol.dbbuilder.dialects.exasol.udf.UdfScript

import org.apache.hadoop.fs.{Path => HPath}
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import org.testcontainers.utility.DockerImageName
import org.testcontainers.containers.localstack.LocalStackContainer
import org.testcontainers.containers.localstack.LocalStackContainer.Service.S3
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

Expand All @@ -24,11 +30,12 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {

val exasolContainer = new ExasolContainer(getExasolDockerImageVersion())
val s3Container = new LocalStackContainer(DEFAULT_LOCALSTACK_DOCKER_IMAGE)
.withServices(LocalStackContainer.Service.S3)
.withServices(S3)
.withReuse(true)
val assembledJarName = getAssembledJarName()

var schema: ExasolSchema = _
var s3: AmazonS3 = _

def startContainers(): Unit = {
exasolContainer.start()
Expand All @@ -44,14 +51,55 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
uploadJarToBucket()
}

def prepareS3Client(): Unit =
s3 = AmazonS3ClientBuilder
.standard()
.withPathStyleAccessEnabled(true)
.withEndpointConfiguration(s3Container.getEndpointConfiguration(S3))
.withCredentials(s3Container.getDefaultCredentialsProvider())
.disableChunkedEncoding()
.build()

def executeStmt(sql: String): Unit = {
getConnection().createStatement().execute(sql)
()
}

def query(sql: String): java.sql.ResultSet =
def executeQuery(sql: String): java.sql.ResultSet =
getConnection().createStatement().executeQuery(sql)

def importIntoExasol(
schemaName: String,
table: Table,
bucket: String,
file: String,
dataFormat: String
): Unit = {
val s3Endpoint = s3Container
.getEndpointConfiguration(S3)
.getServiceEndpoint()
.replaceAll("127.0.0.1", "172.17.0.1")
executeStmt(
s"""|IMPORT INTO ${table.getFullyQualifiedName()}
|FROM SCRIPT $schemaName.IMPORT_PATH WITH
|BUCKET_PATH = 's3a://$bucket/$file'
|DATA_FORMAT = '$dataFormat'
|S3_ENDPOINT = '$s3Endpoint'
|S3_CHANGE_DETECTION_MODE = 'none'
|CONNECTION_NAME = 'S3_CONNECTION'
|PARALLELISM = 'nproc()';
""".stripMargin
)
}

def uploadFileToS3(bucket: String, file: HPath): Unit = {
s3.createBucket(new CreateBucketRequest(bucket))
val request = new PutObjectRequest(bucket, file.getName(), new File(file.toUri()))
s3.putObject(request)
Thread.sleep(3 * 1000)
()
}

private[this] def getAssembledJarName(): String = {
val jarDir = findFileOrDirectory("target", JAR_DIRECTORY_PATTERN)
findFileOrDirectory("target/" + jarDir, JAR_NAME_PATTERN)
Expand Down
50 changes: 8 additions & 42 deletions src/it/scala/com/exasol/cloudetl/DataImporterIT.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ import com.exasol.matcher.CellMatcherFactory
import com.exasol.matcher.ResultSetStructureMatcher.table
import com.exasol.matcher.TypeMatchMode._

import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import org.apache.avro.file.DataFileWriter
import org.apache.avro._
import org.apache.avro.generic._
Expand All @@ -35,7 +32,6 @@ import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema._
import org.hamcrest.Matcher
import org.hamcrest.MatcherAssert.assertThat
import org.testcontainers.containers.localstack.LocalStackContainer.Service.S3
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite

Expand All @@ -48,18 +44,11 @@ class DataImporterIT extends BaseIntegrationTest {
val schemaName = "DATA_SCHEMA"
val bucketName: String = "databucket"
var conf: Configuration = _
var s3: AmazonS3 = _

override final def beforeAll(): Unit = {
startContainers()
prepareExasolDatabase(schemaName)
s3 = AmazonS3ClientBuilder
.standard()
.withPathStyleAccessEnabled(true)
.withEndpointConfiguration(s3Container.getEndpointConfiguration(S3))
.withCredentials(s3Container.getDefaultCredentialsProvider())
.disableChunkedEncoding()
.build()
prepareS3Client()
conf = new Configuration
}

Expand Down Expand Up @@ -1009,9 +998,13 @@ class DataImporterIT extends BaseIntegrationTest {
abstract class AbstractChecker(exaColumnType: String) {

def withResultSet(block: ResultSet => Unit): this.type = {
uploadParquetFile()
importIntoExasol()
val rs = query(s"SELECT * FROM ${getTableName()}")
uploadFileToS3(bucketName, path)
val table = schema
.createTableBuilder(tableName)
.column("COLUMN", exaColumnType)
.build()
importIntoExasol(schemaName, table, bucketName, path.getName(), dataFormat)
val rs = executeQuery(s"SELECT * FROM ${getTableName()}")
block(rs)
rs.close()
this
Expand All @@ -1021,33 +1014,6 @@ class DataImporterIT extends BaseIntegrationTest {
withResultSet(assertThat(_, matcher))
()
}

private[this] def uploadParquetFile(): Unit = {
s3.createBucket(new CreateBucketRequest(bucketName))
val request = new PutObjectRequest(bucketName, path.getName(), new File(path.toUri()))
s3.putObject(request)
Thread.sleep(1 * 1000)
()
}

private[this] def importIntoExasol(): Unit = {
val table = schema.createTableBuilder(tableName).column("COLUMN", exaColumnType).build()
val s3Endpoint = s3Container
.getEndpointConfiguration(S3)
.getServiceEndpoint()
.replaceAll("127.0.0.1", "172.17.0.1")
executeStmt(
s"""|IMPORT INTO ${table.getFullyQualifiedName()}
|FROM SCRIPT $schemaName.IMPORT_PATH WITH
|BUCKET_PATH = 's3a://$bucketName/${path.getName()}'
|DATA_FORMAT = '$dataFormat'
|S3_ENDPOINT = '$s3Endpoint'
|S3_CHANGE_DETECTION_MODE = 'none'
|CONNECTION_NAME = 'S3_CONNECTION'
|PARALLELISM = 'nproc()';
""".stripMargin
)
}
}

case class OrcChecker(orcColumn: String, exaColumn: String)
Expand Down
56 changes: 56 additions & 0 deletions src/it/scala/com/exasol/cloudetl/FilesMetadataReaderIT.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.exasol.cloudetl

import java.nio.file.Files
import java.nio.file.Path

import com.exasol.matcher.ResultSetStructureMatcher.table

import org.apache.hadoop.fs.{Path => HPath}
import org.hamcrest.MatcherAssert.assertThat
import org.scalatest.BeforeAndAfterEach

class FilesMetadataReaderIT
extends BaseIntegrationTest
with BeforeAndAfterEach
with TestFileManager {

val schemaName = "DATA_SCHEMA"
val tableName = "DATA_TABLE"
val bucketName: String = "databucket"
var outputDirectory: Path = _

override final def beforeAll(): Unit = {
startContainers()
prepareExasolDatabase(schemaName)
prepareS3Client()
}

override final def beforeEach(): Unit =
outputDirectory = createTemporaryFolder(s"data-files-")

override final def afterEach(): Unit =
deletePathFiles(outputDirectory)

def getTableName(): String = s""""$schemaName"."$tableName""""

test("filters hidden and metadata files") {
val hidden = Seq("_SUCCESS", ".METADATA").map(outputDirectory.resolve(_))
hidden.foreach { case f => Files.createFile(f) }

val file1 = new HPath(outputDirectory.toUri.toString, "_SUCCESS")
uploadFileToS3(bucketName, file1)
val file2 = new HPath(outputDirectory.toUri.toString, ".METADATA")
uploadFileToS3(bucketName, file2)

val exasolTable = schema
.createTableBuilder(tableName)
.column("COLUMN", "VARCHAR(5)")
.build()
importIntoExasol(schemaName, exasolTable, bucketName, "*", "parquet")

val resultSet = executeQuery(s"SELECT * FROM ${getTableName()}")
assertThat(resultSet, table("VARCHAR").matches())
resultSet.close()
}

}
4 changes: 2 additions & 2 deletions src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package com.exasol.cloudetl.bucket

import scala.collection.JavaConverters._

import com.exasol.cloudetl.filesystem.FileSystemManager
import com.exasol.cloudetl.storage.FileFormat
import com.exasol.cloudetl.storage.StorageProperties
import com.exasol.cloudetl.util.FileSystemUtil

import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -72,7 +72,7 @@ abstract class Bucket extends LazyLogging {
*/
final def getPaths(): Seq[Path] = properties.getFileFormat() match {
case FileFormat.DELTA => getPathsFromDeltaLog()
case _ => FileSystemUtil.globWithPattern(bucketPath, fileSystem)
case _ => FileSystemManager.globWithPattern(bucketPath, fileSystem)
}

private[this] def getPathsFromDeltaLog(): Seq[Path] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.exasol.cloudetl.util
package com.exasol.cloudetl.filesystem

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object FileSystemUtil {
object FileSystemManager {

def globWithLocal(path: java.nio.file.Path, fs: FileSystem): Seq[Path] =
globWithPattern(path.toAbsolutePath.toUri.getRawPath, fs)
Expand All @@ -12,7 +12,7 @@ object FileSystemUtil {
glob(new Path(pattern), fs)

def glob(path: Path, fs: FileSystem): Seq[Path] = {
val opt = Option(fs.globStatus(path))
val opt = Option(fs.globStatus(path, HiddenFilesFilter))
opt.fold(Seq.empty[Path])(_.toSeq.map(_.getPath))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.exasol.cloudetl.filesystem

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.PathFilter

object HiddenFilesFilter extends PathFilter {

private[this] val PREFIX_DOT = "."
private[this] val PREFIX_UNDERSCORE = "_"

override def accept(p: Path): Boolean = {
val name = p.getName()
!name.startsWith(PREFIX_DOT) && !name.startsWith(PREFIX_UNDERSCORE)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object ImportMetadata extends LazyLogging {

val storageProperties = StorageProperties(iterator.getString(1), metadata)
val bucket = Bucket(storageProperties)
val paths = bucket.getPaths().filter(p => !p.getName().startsWith("_"))
val paths = bucket.getPaths()
logger.info(s"Total number of files: ${paths.size} in bucket path: $bucketPath")

paths.zipWithIndex.foreach {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.exasol.cloudetl.util
package com.exasol.cloudetl.filesystem

import java.nio.file.{Path => FPath}
import java.nio.file.Files
Expand All @@ -9,20 +9,23 @@ import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite

class FileSystemUtilTest extends AnyFunSuite with BeforeAndAfterEach {
class FileSystemManagerTest extends AnyFunSuite with BeforeAndAfterEach {

private[this] var temporaryDirectory: FPath = _
private[this] var files: Seq[FPath] = _
private[this] var hidden: Seq[FPath] = _

override final def beforeEach(): Unit = {
temporaryDirectory = Files.createTempDirectory("tempdir")
files = Seq("a", "b", "c", "a.parquet", "b.parquet").map(temporaryDirectory.resolve(_))
files.foreach { case f => Files.createFile(f) }
hidden = Seq("_SUCCESS", ".hidden").map(temporaryDirectory.resolve(_))
(files ++ hidden).foreach { case f => Files.createFile(f) }
()
}

override final def afterEach(): Unit = {
files.foreach(Files.deleteIfExists)
files.foreach(Files.deleteIfExists(_))
hidden.foreach(Files.deleteIfExists(_))
Files.deleteIfExists(temporaryDirectory)
()
}
Expand All @@ -31,19 +34,19 @@ class FileSystemUtilTest extends AnyFunSuite with BeforeAndAfterEach {
val fs = FileSystem.get(new Configuration())
val expectedPaths = files.map(f => new Path(s"file:${f.toUri.getRawPath}"))
val pathPattern = s"${temporaryDirectory.toUri.getRawPath}/*"
assert(FileSystemUtil.globWithPattern(pathPattern, fs).toSet === expectedPaths.toSet)
assert(FileSystemManager.globWithPattern(pathPattern, fs).toSet === expectedPaths.toSet)
}

test("globWithPattern returns paths from a pattern with file extensions") {
val fs = FileSystem.get(new Configuration())
val pathPattern = s"${temporaryDirectory.toUri.getRawPath}/*"
val pathsWithExtensions = FileSystemUtil.globWithPattern(s"$pathPattern.parquet", fs)
val pathsWithExtensions = FileSystemManager.globWithPattern(s"$pathPattern.parquet", fs)
assert(pathsWithExtensions.map(_.toUri.getRawPath).forall(_.contains("parquet")))
}

test("glob returns empty sequence if no path exists") {
val fileSystem = FileSystem.get(new Configuration())
assert(FileSystemUtil.glob(new Path("emptyPath"), fileSystem) === Seq.empty[Path])
assert(FileSystemManager.glob(new Path("emptyPath"), fileSystem) === Seq.empty[Path])
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package com.exasol.cloudetl.source
import java.nio.file.Path
import java.nio.file.Paths

import com.exasol.cloudetl.filesystem.FileSystemManager
import com.exasol.cloudetl.storage.FileFormat
import com.exasol.cloudetl.util.FileSystemUtil

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
Expand Down Expand Up @@ -37,7 +37,7 @@ class AbstractSourceTest extends AnyFunSuite with BeforeAndAfterEach {
Source(FileFormat(fileFormat), filePath, conf, fileSystem)

final def getRecordsCount(filePath: Path): Int = {
val globbedFilePath = FileSystemUtil.globWithLocal(filePath, fileSystem)
val globbedFilePath = FileSystemManager.globWithLocal(filePath, fileSystem)
globbedFilePath.map { file =>
val src = getSource(file)
val cnt = src.stream().size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.exasol.cloudetl.source

import java.nio.file.Paths

import com.exasol.cloudetl.util.FileSystemUtil
import com.exasol.cloudetl.filesystem.FileSystemManager

import org.apache.parquet.schema.MessageTypeParser

Expand All @@ -24,7 +24,7 @@ class ParquetSourceTest extends AbstractSourceTest {
""".stripMargin)

val filePattern = Paths.get(s"$resourceDir/sales_pos*.parquet")
val globbedFilePath = FileSystemUtil.globWithLocal(filePattern, getFileSystem())
val globbedFilePath = FileSystemManager.globWithLocal(filePattern, getFileSystem())
globbedFilePath.foreach { file =>
val schema = ParquetSource(file, getConf(), getFileSystem()).getSchema()
assert(schema.isDefined)
Expand Down

0 comments on commit b0962a5

Please sign in to comment.