#134: Added support for HDFS (#136)
morazow committed Mar 17, 2021
1 parent ece3518 commit 433ce2b
Showing 18 changed files with 181 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -17,7 +17,7 @@ scala:

env:
- EXASOL_DOCKER_VERSION="6.2.12-d1"
- EXASOL_DOCKER_VERSION="7.0.6"
- EXASOL_DOCKER_VERSION="7.0.8"

before_install:
- git fetch --tags
2 changes: 1 addition & 1 deletion build.sbt
@@ -18,7 +18,7 @@ lazy val root =
project
.in(file("."))
.settings(moduleName := "cloud-storage-extension")
.settings(version := "1.0.0")
.settings(version := "1.1.0")
.settings(orgSettings)
.settings(buildSettings)
.settings(Settings.projectSettings(scalaVersion))
1 change: 1 addition & 0 deletions doc/changes/changelog.md
@@ -1,5 +1,6 @@
# Releases

* [1.1.0](changes_1.1.0.md)
* [1.0.0](changes_1.0.0.md)
* [0.9.0](changes_0.9.0.md)
* [0.8.0](changes_0.8.0.md)
25 changes: 25 additions & 0 deletions doc/changes/changes_1.1.0.md
@@ -0,0 +1,25 @@
# Cloud Storage Extension 1.1.0, released 2021-MM-DD

## Features

* #134: Added support for Hadoop Distributed Filesystem (HDFS) (PR #136)

## Documentation

* #131: Added access privilege to connection object (PR #132).

## Dependency Updates

### Runtime Dependency Updates

### Test Dependency Updates

* Updated `org.scalatest:scalatest:3.2.3` to `3.2.6`
* Updated `org.mockito:mockito-core:3.7.7` to `3.8.0`
* Updated `org.testcontainers:localstack:1.15.1` to `1.15.2`
* Updated `com.exasol:test-db-builder-java:3.0.0` to `3.1.1`
* Updated `com.exasol:exasol-testcontainers:3.5.0` to `3.5.1`

### Plugin Updates

* Updated `com.timushev.sbt:sbt-updates:0.5.1` to `0.5.2`
26 changes: 24 additions & 2 deletions doc/user_guide/user_guide.md
@@ -275,12 +275,12 @@ Run these statements to create export UDF scripts:
OPEN SCHEMA CLOUD_STORAGE_EXTENSION;

CREATE OR REPLACE JAVA SET SCRIPT EXPORT_PATH(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.ExportPath;
%scriptclass com.exasol.cloudetl.scriptclasses.TableExportQueryGenerator;

allipatev (Contributor) commented on Mar 19, 2021:

@morazow: this new class name does not work with cloud-storage-extension 1.0.0 and raises
F-UDF-CL-LIB-1126: F-UDF-CL-SL-JAVA-1006: F-UDF-CL-SL-JAVA-1026: com.exasol.ExaCompilationException: F-UDF-CL-SL-JAVA-1066: The main script class defined via %scriptclass cannot be found: com.exasol.cloudetl.scriptclasses.TableExportQueryGenerator
Is this expected?

morazow (Author) commented on Mar 19, 2021:

Hello @allipatev,

Thanks for the info, but it is not released yet. Or did you build from master?

Once it is released, it should work.

morazow (Author) commented on Mar 19, 2021:

To build the jar, you can clone the repository and run:

./sbtx assembly

The assembled jar can be found in the target/scala-2.12/stripped/ folder.

allipatev (Contributor) commented on Mar 19, 2021:

Hello @morazow,
Ah, so I managed to fall between the documentation update and the release publication?
So the class name changes apply only to 1.1.0, right?

morazow (Author) commented on Mar 19, 2021:

Yes, that is correct.

%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-<VERSION>.jar;
/

CREATE OR REPLACE JAVA SET SCRIPT EXPORT_TABLE(...) EMITS (ROWS_AFFECTED INT) AS
%scriptclass com.exasol.cloudetl.scriptclasses.ExportTable;
%scriptclass com.exasol.cloudetl.scriptclasses.TableDataExporter;

allipatev (Contributor) commented on Mar 19, 2021:

@morazow: this new class name does not work with cloud-storage-extension 1.0.0 and raises
F-UDF-CL-LIB-1127: F-UDF-CL-SL-JAVA-1002: F-UDF-CL-SL-JAVA-1013: com.exasol.ExaCompilationException: F-UDF.CL.SL.JAVA-1165: F-UDF.CL.SL.JAVA-1072: The main script class defined via %scriptclass cannot be found: com.exasol.cloudetl.scriptclasses.TableDataExporter
Is this expected?

%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-<VERSION>.jar;
/
```
@@ -291,6 +291,28 @@ UDF and it will call the `EXPORT_TABLE` script internally.
Make sure you replace `<BUCKET>` with your bucket name and `<VERSION>` with
the jar version.

#### Setup Export UDF Scripts in Docker

As with the import scripts, the export UDF scripts require a slightly
different deployment for Exasol Docker installations.

```sql
OPEN SCHEMA CLOUD_STORAGE_EXTENSION;

CREATE OR REPLACE JAVA SET SCRIPT EXPORT_PATH(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.DockerTableExportQueryGenerator;
%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-<VERSION>.jar;
/

CREATE OR REPLACE JAVA SET SCRIPT EXPORT_TABLE(...) EMITS (ROWS_AFFECTED INT) AS
%scriptclass com.exasol.cloudetl.scriptclasses.DockerTableDataExporter;
%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-<VERSION>.jar;
/
```

Please note that the Docker deployment uses different class names for the
`%scriptclass` parameter.
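
For illustration, a subsequent export to HDFS might then look like the
following sketch (the table name, HDFS host, and port are placeholders;
the HDFS bucket does not require any additional properties):

```sql
EXPORT RETAIL.SALES_POSITIONS
INTO SCRIPT CLOUD_STORAGE_EXTENSION.EXPORT_PATH WITH
  BUCKET_PATH = 'hdfs://namenode:8020/data/export/sales_positions/'
  DATA_FORMAT = 'PARQUET'
  PARALLELISM = 'iproc()';
```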

## Prepare an Exasol Table for Import

To store the imported data, you need to create a table inside the Exasol
11 changes: 6 additions & 5 deletions project/Dependencies.scala
@@ -16,14 +16,14 @@ object Dependencies {
private val SparkSQLVersion = "3.0.1"

// Test dependencies versions
private val ScalaTestVersion = "3.2.3"
private val ScalaTestVersion = "3.2.6"
private val ScalaTestPlusVersion = "1.0.0-M2"
private val MockitoCoreVersion = "3.7.7"
private val MockitoCoreVersion = "3.8.0"
private val HamcrestVersion = "2.2"
private val ExasolHamcrestMatcherVersion = "1.4.0"
private val ExasolTestDBBuilderVersion = "3.0.0"
private val ExasolTestContainersVersion = "3.5.0"
private val TestContainersLocalstackVersion = "1.15.1"
private val ExasolTestDBBuilderVersion = "3.1.1"
private val ExasolTestContainersVersion = "3.5.1"
private val TestContainersLocalstackVersion = "1.15.2"

val Resolvers: Seq[Resolver] = Seq(
"Exasol Releases" at "https://maven.exasol.com/artifactory/exasol-releases"
@@ -57,6 +57,7 @@
ExclusionRule(organization = "org.apache.curator"),
ExclusionRule(organization = "org.apache.zookeeper")
),
"org.apache.hadoop" % "hadoop-hdfs" % HadoopVersion,
"com.google.cloud.bigdataoss" % "gcs-connector" % GoogleStorageVersion
exclude ("com.google.guava", "guava")
exclude ("org.apache.httpcomponents", "httpclient"),
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -27,7 +27,7 @@ addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.2.7")
// Adds a `dependencyUpdates` task to check Maven repositories for
// dependency updates
// http://github.com/rtimush/sbt-updates
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.1")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.2")

// Adds a `scalafmt` task for automatic source code formatting
// https://github.com/lucidsoftware/neo-sbt-scalafmt
2 changes: 1 addition & 1 deletion project/project/plugins.sbt
@@ -4,4 +4,4 @@ addSbtPlugin("com.lucidchart" % "sbt-scalafmt-coursier" % "1.16")

// Used to get updates for plugins
// see https://github.com/rtimush/sbt-updates/issues/10
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.0")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.2")
2 changes: 1 addition & 1 deletion src/it/scala/com/exasol/cloudetl/BaseIntegrationTest.scala
@@ -24,7 +24,7 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
private[this] val JAR_DIRECTORY_PATTERN = "scala-"
private[this] val JAR_NAME_PATTERN = "cloud-storage-extension-"

private[this] val DEFAULT_EXASOL_DOCKER_IMAGE = "7.0.6"
private[this] val DEFAULT_EXASOL_DOCKER_IMAGE = "7.0.8"
private[this] val DEFAULT_LOCALSTACK_DOCKER_IMAGE =
DockerImageName.parse("localstack/localstack:0.12.5")

1 change: 1 addition & 0 deletions src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
@@ -142,6 +142,7 @@ object Bucket extends LazyLogging {
case "abfs" | "abfss" => AzureAbfsBucket(path, storageProperties)
case "adl" => AzureAdlsBucket(path, storageProperties)
case "wasb" | "wasbs" => AzureBlobBucket(path, storageProperties)
case "hdfs" => HDFSBucket(path, storageProperties)
case "file" => LocalBucket(path, storageProperties)
case _ =>
throw new IllegalArgumentException(s"Unsupported path scheme $scheme!")
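
With this dispatch, `hdfs://` paths now resolve to the new `HDFSBucket`. As a
hedged sketch (host, port, path, and table names are placeholders), an import
from HDFS would then look like:

```sql
IMPORT INTO RETAIL.SALES_POSITIONS
FROM SCRIPT CLOUD_STORAGE_EXTENSION.IMPORT_PATH WITH
  BUCKET_PATH = 'hdfs://namenode:8020/data/import/sales_positions/*'
  DATA_FORMAT = 'PARQUET'
  PARALLELISM = 'nproc()';
```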
31 changes: 31 additions & 0 deletions src/main/scala/com/exasol/cloudetl/bucket/HDFSBucket.scala
@@ -0,0 +1,31 @@
package com.exasol.cloudetl.bucket

import com.exasol.cloudetl.storage.StorageProperties

import org.apache.hadoop.conf.Configuration

/** A [[Bucket]] implementation for the HDFS filesystem */
final case class HDFSBucket(path: String, params: StorageProperties) extends Bucket {

/** @inheritdoc */
override val bucketPath: String = path

/** @inheritdoc */
override val properties: StorageProperties = params

/** Returns the list of required property keys; the HDFS filesystem requires none. */
override def getRequiredProperties(): Seq[String] = Seq.empty[String]

/** @inheritdoc */
override def validate(): Unit =
validateRequiredProperties()

/** @inheritdoc */
override def getConfiguration(): Configuration = {
validate()
val conf = new Configuration()
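// Pin the hdfs:// scheme to Hadoop's DistributedFileSystem. This is likely
// needed because ServiceLoader-based filesystem discovery can fail in
// assembled (fat) jars where META-INF/services entries are merged away,
// which surfaces as "No FileSystem for scheme: hdfs".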
conf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName())
conf
}

}
17 changes: 17 additions & 0 deletions src/main/scala/com/exasol/cloudetl/scriptclasses/DockerTableDataExporter.scala
@@ -0,0 +1,17 @@
package com.exasol.cloudetl.scriptclasses

import com.exasol.ExaIterator
import com.exasol.ExaMetadata

/**
 * A table data exporter class to run inside the Exasol Docker container.
*/
object DockerTableDataExporter {

def run(metadata: ExaMetadata, iterator: ExaIterator): Unit = {
import org.apache.hadoop.security.UserGroupInformation
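// Hadoop's UserGroupInformation requires a resolvable login user; the UDF
// process user inside the Exasol Docker container may not exist at the OS
// level, so an explicit remote user is set before delegating the run.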
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("exauser"))
TableDataExporter.run(metadata, iterator)
}

}
20 changes: 20 additions & 0 deletions src/main/scala/com/exasol/cloudetl/scriptclasses/DockerTableExportQueryGenerator.scala
@@ -0,0 +1,20 @@
package com.exasol.cloudetl.scriptclasses

import com.exasol.ExaExportSpecification
import com.exasol.ExaMetadata

/**
 * An export query generator class to run inside the Exasol Docker container.
*/
object DockerTableExportQueryGenerator {

def generateSqlForExportSpec(
metadata: ExaMetadata,
exportSpecification: ExaExportSpecification
): String = {
import org.apache.hadoop.security.UserGroupInformation
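// Same workaround as in DockerTableDataExporter: set an explicit Hadoop
// login user, since the container's process user may not be resolvable.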
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("exauser"))
TableExportQueryGenerator.generateSqlForExportSpec(metadata, exportSpecification)
}

}
src/main/scala/com/exasol/cloudetl/scriptclasses/TableDataExporter.scala (renamed from ExportTable.scala)
@@ -13,8 +13,17 @@ import com.exasol.common.data.Row

import com.typesafe.scalalogging.LazyLogging

object ExportTable extends LazyLogging {
/**
* A data exporter class that exports table data into a filesystem.
*/
object TableDataExporter extends LazyLogging {

/**
 * Reads the table data and saves it to an external filesystem.
*
* @param metadata an Exasol metadata object
* @param iterator an Exasol iterator object
*/
def run(metadata: ExaMetadata, iterator: ExaIterator): Unit = {
val storageProperties = StorageProperties(iterator.getString(1), metadata)
val bucket = Bucket(storageProperties)
src/main/scala/com/exasol/cloudetl/scriptclasses/TableExportQueryGenerator.scala (renamed from ExportPath.scala)
@@ -9,8 +9,17 @@ import com.exasol.cloudetl.storage.StorageProperties

import org.apache.hadoop.fs.Path

object ExportPath {
/**
 * A SQL query generator that calls the exporter classes.
*/
object TableExportQueryGenerator {

/**
 * Generates an Exasol SQL statement for the data export query.
*
* @param metadata an Exasol metadata object
* @param exportSpecification an Exasol export specification object
*/
def generateSqlForExportSpec(
metadata: ExaMetadata,
exportSpec: ExaExportSpecification
@@ -24,7 +33,6 @@ object ExportPath {
val parallelism = storageProperties.getParallelism("iproc()")
val storagePropertiesStr = storageProperties.mkString()
val scriptSchema = metadata.getScriptSchema

val srcColumns = getSourceColumns(exportSpec)
val srcColumnsStr = srcColumns.mkString(".")

17 changes: 17 additions & 0 deletions src/test/scala/com/exasol/cloudetl/bucket/HDFSBucketTest.scala
@@ -0,0 +1,17 @@
package com.exasol.cloudetl.bucket

class HDFSBucketTest extends AbstractBucketTest {

test("apply sets correct configuration") {
val properties = Map(
PATH -> "hdfs://dir/path",
FORMAT -> "orc"
)
val exaMetadata = mockConnectionInfo("", "")
val bucket = getBucket(properties, exaMetadata)
val expectedFileSystemName = classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName()
assert(bucket.isInstanceOf[HDFSBucket])
assert(bucket.getConfiguration().get("fs.hdfs.impl") === expectedFileSystemName)
}

}
src/test/scala/com/exasol/cloudetl/scriptclasses/TableDataExporterTest.scala (renamed from ExportTableTest.scala)
@@ -15,7 +15,7 @@ import org.mockito.ExtraMockito
import org.mockito.Mockito._
import org.scalatest.BeforeAndAfterEach

class ExportTableTest
class TableDataExporterTest
extends StorageTest
with BeforeAndAfterEach
with DataRecords
@@ -98,7 +98,7 @@ class ExportTableTest
}

test("run exports table rows") {
ExportTable.run(metadata, iterator)
TableDataExporter.run(metadata, iterator)

verify(metadata, times(1)).getInputColumnCount
for { idx <- 3 to 10 } {
@@ -119,7 +119,7 @@
}

test("imports exported rows from a path") {
ExportTable.run(metadata, iterator)
TableDataExporter.run(metadata, iterator)

val properties = Map("BUCKET_PATH" -> testResourceDir, "DATA_FORMAT" -> "PARQUET")
val importIter = mockExasolIterator(properties)
@@ -133,7 +133,7 @@
}

test("export creates file without compression extension if compression codec is not set") {
ExportTable.run(metadata, iterator)
TableDataExporter.run(metadata, iterator)
assert(Files.exists(outputPath) === true)
assert(Files.list(outputPath).count() === 2)
checkExportFileExtensions(outputPath, "")
@@ -142,7 +142,7 @@
test("export creates file with compression extension if compression codec is set") {
val properties = defaultProperties ++ Map("PARQUET_COMPRESSION_CODEC" -> "SNAPPY")
iterator = createMockedIterator(outputPath.toUri.toString, properties)
ExportTable.run(metadata, iterator)
TableDataExporter.run(metadata, iterator)
assert(Files.exists(outputPath) === true)
assert(Files.list(outputPath).count() === 2)
checkExportFileExtensions(outputPath, ".snappy")
src/test/scala/com/exasol/cloudetl/scriptclasses/TableExportQueryGeneratorTest.scala (renamed from ExportPathTest.scala)
@@ -8,7 +8,7 @@ import com.exasol.cloudetl.storage.StorageProperties

import org.mockito.Mockito._

class ExportPathTest extends PathTest {
class TableExportQueryGeneratorTest extends PathTest {

test("generateSqlForExportSpec returns SQL statement") {
when(metadata.getScriptSchema()).thenReturn(schema)
@@ -31,7 +31,10 @@
| iproc();
|""".stripMargin

assert(ExportPath.generateSqlForExportSpec(metadata, exportSpec) === expectedSQLStatement)
assert(
TableExportQueryGenerator
.generateSqlForExportSpec(metadata, exportSpec) === expectedSQLStatement
)
verify(metadata, atLeastOnce).getScriptSchema
verify(exportSpec, times(1)).getParameters
verify(exportSpec, times(1)).getSourceColumnNames
@@ -43,7 +46,7 @@
when(exportSpec.getParameters()).thenReturn(newProperties.asJava)

val thrown = intercept[IllegalArgumentException] {
ExportPath.generateSqlForExportSpec(metadata, exportSpec)
TableExportQueryGenerator.generateSqlForExportSpec(metadata, exportSpec)
}
assert(thrown.getMessage === "Please provide a value for the S3_ENDPOINT property!")
verify(exportSpec, times(1)).getParameters
@@ -57,7 +60,7 @@
when(exportSpec.getSourceColumnNames).thenReturn(srcCols.asJava)

val thrown = intercept[RuntimeException] {
ExportPath.generateSqlForExportSpec(metadata, exportSpec)
TableExportQueryGenerator.generateSqlForExportSpec(metadata, exportSpec)
}
assert(thrown.getMessage === "Could not parse the column name from 'tbl.c_int.integer'!")
verify(metadata, atLeastOnce).getScriptSchema
@@ -79,7 +82,7 @@
)
when(metadata.getScriptSchema()).thenReturn(schema)
when(exportSpec.getParameters()).thenReturn(newProperties.asJava)
ExportPath.generateSqlForExportSpec(metadata, exportSpec)
TableExportQueryGenerator.generateSqlForExportSpec(metadata, exportSpec)
assert(Files.exists(bucketPath) === true)
assert(Files.list(bucketPath).findAny().isPresent() === true)
assert(Files.list(bucketPath).count() === 3)
@@ -97,7 +100,7 @@
)
when(metadata.getScriptSchema()).thenReturn(schema)
when(exportSpec.getParameters()).thenReturn(newProperties.asJava)
ExportPath.generateSqlForExportSpec(metadata, exportSpec)
TableExportQueryGenerator.generateSqlForExportSpec(metadata, exportSpec)
assert(Files.exists(bucketPath) === false)
}

