
Commit

Merge 12ef838 into f386473
morazow committed Mar 26, 2021
2 parents f386473 + 12ef838 commit d45608d
Showing 22 changed files with 532 additions and 97 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -23,6 +23,7 @@ before_install:
- git fetch --tags
- docker pull "exasol/docker-db:$EXASOL_DOCKER_VERSION"
- docker pull localstack/localstack:0.12.5
- docker pull alluxio/alluxio:2.5.0

script:
- ./scripts/ci.sh
2 changes: 2 additions & 0 deletions README.md
@@ -56,6 +56,7 @@ however, it should be safe to run it on the newer JVM versions. This is also
| [Hadoop Azure][hadoop-azr-link] | Access support for Azure Blob Storage | Apache License 2.0 |
| [Hadoop Azure Datalake][hadoop-azrlake-link]| Access support for Azure Data Lake Store | Apache License 2.0 |
| [Hadoop Client][hadoop-client-link] | Apache Hadoop common dependencies as configuration or filesystem| Apache License 2.0 |
| [Alluxio Client][alluxio-client-link] | Alluxio filesystem API dependency | Apache License 2.0 |
| [Google Cloud Storage][gcs-connectors-link] | Access support for Google Cloud Storage | Apache License 2.0 |
| [Delta Lake Core][delta-io] | Integration support for Delta Lake format | Apache License 2.0 |
| [Apache Spark SQL][spark-sql] | Access support for Delta Lake formatted files | Apache License 2.0 |
@@ -114,6 +115,7 @@ These plugins help with project development.
[hadoop-azr-link]: https://hadoop.apache.org/docs/current/hadoop-azure/index.html
[hadoop-azrlake-link]: https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html
[hadoop-client-link]: https://github.com/apache/hadoop/tree/trunk/hadoop-client-modules
[alluxio-client-link]: https://docs.alluxio.io/os/user/stable/en/api/FS-API.html
[gcs-connectors-link]: https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage
[jdk-compatibility]: https://docs.scala-lang.org/overviews/jdk-compatibility/overview.html#running-versus-compiling
[scalatest-link]: http://www.scalatest.org/
15 changes: 13 additions & 2 deletions doc/changes/changes_1.1.0.md
@@ -1,10 +1,17 @@
# Cloud Storage Extension 1.1.0, released 2021-MM-DD
# Cloud Storage Extension 1.1.0, released 2021-02-26

Code name:
Code Name: v1.1.0 - Added support for Alluxio and HDFS filesystems

## Summary

The `1.1.0` release adds support for the Alluxio and Hadoop Distributed
Filesystem (HDFS) filesystems. We also fixed a bug in the Parquet
dictionary-encoded decimal converter and improved the documentation.

## Features

* #134: Added support for Hadoop Distributed Filesystem (HDFS) (PR #136).
* #135: Added support for Alluxio filesystem (PR #139).

## Bug Fixes

@@ -18,8 +25,12 @@ Code name:

### Runtime Dependency Updates

* Updated `org.apache.parquet:parquet-hadoop:1.11.1` to `1.12.0`

### Test Dependency Updates

* Added `org.apache.hadoop:hadoop-hdfs:3.3.0`
* Added `org.alluxio:alluxio-core-client-hdfs:2.5.0`
* Updated `org.scalatest:scalatest:3.2.3` to `3.2.6`
* Updated `org.mockito:mockito-core:3.7.7` to `3.8.0`
* Updated `org.testcontainers:localstack:1.15.1` to `1.15.2`
56 changes: 56 additions & 0 deletions doc/user_guide/user_guide.md
@@ -1066,3 +1066,59 @@ changes.
[delta-io]: https://delta.io/
[delta-storage]: https://docs.delta.io/latest/delta-storage.html
[delta-history]: https://docs.delta.io/latest/delta-utility.html#history

## Hadoop Distributed Filesystem (HDFS)

The [Hadoop Distributed File System (HDFS)][hdfs-link] is a distributed,
scalable, and portable file system written in Java for the Hadoop framework.

[hdfs-link]: https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html

When the Hadoop datanodes and the Exasol cluster are installed in the same
(virtual) network, you can access HDFS using `cloud-storage-extension`.

For import:

```sql
IMPORT INTO <schema>.<table>
FROM SCRIPT CLOUD_STORAGE_EXTENSION.IMPORT_PATH WITH
BUCKET_PATH = 'hdfs://<HDFS_PATH>/import/orc/data/*.orc'
DATA_FORMAT = 'ORC'
PARALLELISM = 'nproc()*<MULTIPLIER>';
```

For export:

```sql
EXPORT <schema>.<table>
INTO SCRIPT CLOUD_STORAGE_EXTENSION.EXPORT_PATH WITH
BUCKET_PATH = 'hdfs://<HDFS_PATH>/export/parquet/data/'
DATA_FORMAT = 'PARQUET'
PARALLELISM = 'iproc(), floor(random()*<MULTIPLIER>)';
```
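
In both statements, the `PARALLELISM` expression controls how many parallel
processes run the job: `nproc()` evaluates to the number of data nodes in the
Exasol cluster, so the import starts `nproc()*<MULTIPLIER>` parallel readers,
while the export groups rows by `iproc()` (the current node number) together
with a random factor, spreading the written files across up to `<MULTIPLIER>`
writers per node.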

Because the two clusters are assumed to share the same private network, you do
not have to create a connection object.

At the moment, it is not possible to access HDFS using `cloud-storage-extension`
if the clusters are not located in the same private network.

## Alluxio Filesystem

[Alluxio](https://docs.alluxio.io/os/user/stable/en/Overview.html) is an
open-source data orchestration technology for analytics and AI in the cloud. It
provides a filesystem API similar to that of HDFS.
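
Under the hood, `cloud-storage-extension` resolves `alluxio://` paths through
the Hadoop-compatible filesystem client shipped in `alluxio-core-client-hdfs`
(see the dependency changes below). A minimal sketch of that access pattern,
assuming a master reachable at `alluxio-main:19998` (the default master RPC
port; the host name is a placeholder):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: the Alluxio client registers a Hadoop FileSystem implementation
// for the `alluxio://` scheme, so the usual FileSystem calls work against it.
val conf = new Configuration()
val dataPath = new Path("alluxio://alluxio-main:19998/import/parquet/data/")
val fs = FileSystem.get(dataPath.toUri(), conf)
// Lists the files that a `*` glob in BUCKET_PATH would match.
fs.listStatus(dataPath).foreach(status => println(status.getPath()))
```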

You can import formatted data from Alluxio using `cloud-storage-extension`.

```sql
IMPORT INTO <schema>.<table>
FROM SCRIPT CLOUD_STORAGE_EXTENSION.IMPORT_PATH WITH
BUCKET_PATH = 'alluxio://<ALLUXIO_PATH>/import/parquet/data/*'
DATA_FORMAT = 'PARQUET'
PARALLELISM = 'nproc()*<MULTIPLIER>';
```
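
Export into Alluxio works the same way; the following sketch mirrors the HDFS
export statement above, with `<ALLUXIO_PATH>` again a placeholder for the
master address and path:

```sql
EXPORT <schema>.<table>
INTO SCRIPT CLOUD_STORAGE_EXTENSION.EXPORT_PATH WITH
BUCKET_PATH = 'alluxio://<ALLUXIO_PATH>/export/parquet/data/'
DATA_FORMAT = 'PARQUET'
PARALLELISM = 'iproc(), floor(random()*<MULTIPLIER>)';
```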

For this to work, the Alluxio and Exasol clusters must be located in the same
(virtual) network. As with HDFS, importing is not possible if they are not
within the same private network.
6 changes: 5 additions & 1 deletion project/Dependencies.scala
@@ -11,9 +11,10 @@ object Dependencies {
private val HadoopVersion = "3.3.0"
private val DeltaVersion = "0.7.0"
private val OrcVersion = "1.6.7"
private val ParquetVersion = "1.11.1"
private val ParquetVersion = "1.12.0"
private val GoogleStorageVersion = "1.9.4-hadoop3"
private val SparkSQLVersion = "3.0.1"
private val AlluxioCoreHDFSVersion = "2.5.0"

// Test dependencies versions
private val ScalaTestVersion = "3.2.6"
@@ -24,6 +25,7 @@
private val ExasolTestDBBuilderVersion = "3.1.1"
private val ExasolTestContainersVersion = "3.5.1"
private val TestContainersLocalstackVersion = "1.15.2"
private val TestContainersScalaVersion = "0.39.3"

val Resolvers: Seq[Resolver] = Seq(
"Exasol Releases" at "https://maven.exasol.com/artifactory/exasol-releases"
@@ -58,6 +60,7 @@
ExclusionRule(organization = "org.apache.zookeeper")
),
"org.apache.hadoop" % "hadoop-hdfs" % HadoopVersion,
"org.alluxio" % "alluxio-core-client-hdfs" % AlluxioCoreHDFSVersion,
"com.google.cloud.bigdataoss" % "gcs-connector" % GoogleStorageVersion
exclude ("com.google.guava", "guava")
exclude ("org.apache.httpcomponents", "httpclient"),
@@ -91,6 +94,7 @@
"com.exasol" % "test-db-builder-java" % ExasolTestDBBuilderVersion,
"com.exasol" % "hamcrest-resultset-matcher" % ExasolHamcrestMatcherVersion,
"org.hamcrest" % "hamcrest" % HamcrestVersion,
"com.dimafeng" %% "testcontainers-scala-scalatest" % TestContainersScalaVersion,
"org.testcontainers" % "localstack" % TestContainersLocalstackVersion
).map(_ % Test)

6 changes: 4 additions & 2 deletions project/Settings.scala
@@ -83,8 +83,10 @@ object Settings {
logLevel in assembly := Level.Info,
assemblyJarName in assembly := moduleName.value + "-" + version.value + ".jar",
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
case "META-INF/services/io.grpc.LoadBalancerProvider" => MergeStrategy.concat
case "META-INF/services/io.grpc.NameResolverProvider" => MergeStrategy.concat
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
},
assemblyExcludedJars in assembly := {
val cp = (fullClasspath in assembly).value
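
The two new `concat` cases preserve the gRPC service-loader files in the
assembled jar: the Alluxio client communicates with the cluster over gRPC, and
the catch-all `META-INF` rule would otherwise discard these `META-INF/services`
entries, leaving gRPC unable to discover its name-resolver and load-balancer
providers at runtime.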
136 changes: 136 additions & 0 deletions src/it/scala/com/exasol/cloudetl/AlluxioExportImportIT.scala
@@ -0,0 +1,136 @@
package com.exasol.cloudetl

import java.sql.ResultSet

import com.exasol.dbbuilder.dialects.Table
import com.exasol.matcher.ResultSetStructureMatcher.table
import com.dimafeng.testcontainers.GenericContainer
import org.hamcrest.Matcher
import org.hamcrest.MatcherAssert.assertThat

class AlluxioExportImportIT extends BaseIntegrationTest {

val ALLUXIO_IMAGE = "alluxio/alluxio:2.5.0"
val SCHEMA_NAME = "ALLUXIO_SCHEMA"

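  // Single-master Alluxio setup: master and worker run as separate containers
  // on a shared Docker network and address each other via network aliases.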
val alluxioMainContainer =
GenericContainer(ALLUXIO_IMAGE, exposedPorts = Seq(19998), command = Seq("master"))
.configure { c =>
c.withNetwork(network)
c.withNetworkAliases("alluxio-main")
c.withEnv("ALLUXIO_JAVA_OPTS", "-Dalluxio.master.hostname=alluxio-main")
c.withReuse(true)
()
}
val alluxioWorkerContainer =
GenericContainer(ALLUXIO_IMAGE, exposedPorts = Seq(29999, 30000), command = Seq("worker"))
.configure { c =>
c.withNetwork(network)
c.withNetworkAliases("alluxio-worker")
c.withEnv(
"ALLUXIO_JAVA_OPTS",
"-Dalluxio.master.hostname=alluxio-main " +
"-Dalluxio.worker.container.hostname=alluxio-worker -Dalluxio.worker.ramdisk.size=64MB"
)
      c.withSharedMemorySize(1024L * 1024 * 1024 * 1024) // Long literal: the Int product overflows to 0
c.dependsOn(alluxioMainContainer)
c.withReuse(true)
()
}

override final def beforeAll(): Unit = {
super.beforeAll()
alluxioMainContainer.start()
alluxioWorkerContainer.start()
prepareExasolDatabase(SCHEMA_NAME)
}

override final def afterAll(): Unit = {
alluxioMainContainer.stop()
alluxioWorkerContainer.stop()
super.afterAll()
}

test("alluxio filesystem export and import") {
val exportedTable = schema
.createTable("EXPORTED_ITEMS", "PRODUCT_ID", "DECIMAL(18,0)", "NAME", "VARCHAR(40)")
.insert("1", "Cat food")
.insert("2", "Toy mouse")
val importedTable = schema
.createTable("IMPORTED_ITEMS", "PRODUCT_ID", "DECIMAL(18,0)", "NAME", "VARCHAR(40)")
prepareContainers("data")
exportIntoAlluxio(exportedTable, "data")
importIntoExasol(importedTable, "data")
assertResultSet(
importedTable,
table()
.row(java.lang.Long.valueOf(1), "Cat food")
.row(java.lang.Long.valueOf(2), "Toy mouse")
.matches()
)
}

def assertResultSet(table: Table, matcher: Matcher[ResultSet]): Unit = {
val resultSet = executeQuery(
s"SELECT * FROM ${table.getFullyQualifiedName()} ORDER BY PRODUCT_ID ASC"
)
assertThat(resultSet, matcher)
resultSet.close()
}

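  // Creates and opens up the target folder in Alluxio, and makes the worker's
  // network alias resolvable from inside the Exasol container, since the data
  // itself is streamed from the Alluxio worker.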
def prepareContainers(bucket: String): Unit = {
val alluxioFsCmd = "/opt/alluxio/bin/alluxio fs"
    var execResult = alluxioMainContainer
      .execInContainer("/bin/sh", "-c", s"$alluxioFsCmd mkdir /$bucket")
    if (execResult.getExitCode() != 0) {
      throw new RuntimeException(s"Could not create '$bucket' folder in Alluxio container.")
    }
    execResult = alluxioMainContainer
      .execInContainer("/bin/sh", "-c", s"$alluxioFsCmd chmod 777 /$bucket/")
    if (execResult.getExitCode() != 0) {
      throw new RuntimeException(
        s"Could not change '$bucket' folder permissions in Alluxio container."
      )
    }
    val workerIPv4Address = getContainerIPv4Address(alluxioWorkerContainer)
    execResult = exasolContainer.execInContainer(
      "/bin/sh",
      "-c",
      s"echo '$workerIPv4Address alluxio-worker' >> /etc/hosts"
    )
    if (execResult.getExitCode() != 0) {
      throw new RuntimeException("Could not update `/etc/hosts` file in Exasol container.")
    }
}

def exportIntoAlluxio(table: Table, bucket: String): Unit =
executeStmt(
s"""|EXPORT ${table.getFullyQualifiedName()}
|INTO SCRIPT $SCHEMA_NAME.EXPORT_PATH WITH
|BUCKET_PATH = 'alluxio://${getContainerIPv4Address(alluxioMainContainer)}:19998/$bucket/'
|DATA_FORMAT = 'PARQUET'
|PARQUET_BLOCK_SIZE = '67108864'
|PARALLELISM = 'iproc()';
""".stripMargin
)

def importIntoExasol(table: Table, bucket: String): Unit =
executeStmt(
s"""|IMPORT INTO ${table.getFullyQualifiedName()}
|FROM SCRIPT $SCHEMA_NAME.IMPORT_PATH WITH
|BUCKET_PATH = 'alluxio://${getContainerIPv4Address(alluxioMainContainer)}:19998/$bucket/'
|DATA_FORMAT = 'PARQUET'
|PARALLELISM = 'nproc()';
""".stripMargin
)

private[this] def getContainerIPv4Address(container: GenericContainer): String =
container.containerInfo
.getNetworkSettings()
.getNetworks()
.values()
.iterator()
.next()
.getIpAddress()

}