Merge e1a216f into 4651597

morazow committed Feb 10, 2021
2 parents 4651597 + e1a216f commit 447f004
Showing 33 changed files with 470 additions and 272 deletions.
3 changes: 3 additions & 0 deletions doc/changes/changes_1.0.0.md
@@ -7,6 +7,7 @@
* #11: Added support for importing Parquet complex (LIST, MAP) types (PR #111).
* #115: Added support for importing Orc complex (LIST, STRUCT) types (PR #116).
* #118: Added support for docker based Exasol installations (PR #119).
* #120: Added support for S3 path style access (PR #126).

## Bug Fixes

@@ -16,6 +17,8 @@
## Refactoring

* #117: Added Exasol docker containers for integration tests (PR #119).
* #114: Enabled the linter check for platform-default charsets and locales (PR #126).
* #123: Renamed the UDF script class names (PR #126).

## Documentation

32 changes: 28 additions & 4 deletions doc/user_guide/user_guide.md
@@ -132,18 +132,18 @@ Run the following SQL statements to create importer UDF scripts.
OPEN SCHEMA CLOUD_STORAGE_EXTENSION;

CREATE OR REPLACE JAVA SET SCRIPT IMPORT_PATH(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.ImportPath;
%scriptclass com.exasol.cloudetl.scriptclasses.FilesImportQueryGenerator;
%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-<VERSION>.jar;
/

CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_METADATA(...)
EMITS (filename VARCHAR(2000), partition_index VARCHAR(100)) AS
%scriptclass com.exasol.cloudetl.scriptclasses.ImportMetadata;
%scriptclass com.exasol.cloudetl.scriptclasses.FilesMetadataReader;
%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-<VERSION>.jar;
/

CREATE OR REPLACE JAVA SET SCRIPT IMPORT_FILES(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.ImportFiles;
%scriptclass com.exasol.cloudetl.scriptclasses.FilesDataImporter;
%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-<VERSION>.jar;
/
```
@@ -163,7 +163,7 @@ different deployment.
OPEN SCHEMA CLOUD_STORAGE_EXTENSION;

CREATE OR REPLACE JAVA SET SCRIPT IMPORT_PATH(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.DockerFilesQueryGenerator;
%scriptclass com.exasol.cloudetl.scriptclasses.DockerFilesImportQueryGenerator;
%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-<VERSION>.jar;
/

@@ -590,6 +590,30 @@ INTO SCRIPT CLOUD_STORAGE_EXTENSION.EXPORT_PATH WITH
PARALLELISM = 'iproc(), floor(random()*<MULTIPLIER>)';
```

### S3 Path Style Access

Amazon S3 [deprecated the path style access][s3-path-style-deprecation] to
buckets at the end of 2020. This breaks access to buckets that contain a dot
(`.`) in their names.

[s3-path-style-deprecation]: https://forums.aws.amazon.com/ann.jspa?annID=6776

To enable path style access to a bucket, set the `S3_PATH_STYLE_ACCESS`
parameter to `true`.

For example:

```
IMPORT INTO <schema>.<table>
FROM SCRIPT CLOUD_STORAGE_EXTENSION.IMPORT_PATH WITH
BUCKET_PATH = 's3a://<S3_PATH>.data.domain/import/data/*.parquet'
DATA_FORMAT = 'PARQUET'
S3_PATH_STYLE_ACCESS = 'true'
S3_ENDPOINT = 's3.<REGION>.amazonaws.com'
CONNECTION_NAME = 'S3_CONNECTION'
PARALLELISM = 'nproc()';
```
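
Internally, the extension forwards this parameter to the Hadoop S3A configuration (see the `S3Bucket` change in this commit). Below is a minimal sketch of that mapping, using a plain `Map` in place of the real `StorageProperties` class:

```scala
// Sketch only: copy the user-supplied S3_PATH_STYLE_ACCESS value into the
// Hadoop S3A configuration key that controls path style access.
import org.apache.hadoop.conf.Configuration

object PathStyleAccessSketch {
  def applyPathStyleAccess(properties: Map[String, String], conf: Configuration): Unit =
    properties.get("S3_PATH_STYLE_ACCESS").foreach { value =>
      conf.set("fs.s3a.path.style.access", value)
    }
}
```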

## Google Cloud Storage

Similar to Amazon S3, you need to have security credentials to access the Google
2 changes: 0 additions & 2 deletions project/Compilation.scala
@@ -114,7 +114,6 @@ object Compilation {
Wart.Null,
Wart.MutableDataStructures,
Wart.Overloading,
Wart.PlatformDefault,
Wart.Throw,
Wart.Var,
Wart.While
@@ -126,7 +125,6 @@
Wart.NonUnitStatements,
Wart.Null,
Wart.Overloading,
Wart.PlatformDefault,
Wart.Var
)

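With `Wart.PlatformDefault` no longer excluded, the linter now flags code that relies on the platform default charset or locale. A sketch of the flagged pattern and its fix, using hypothetical values (the file changes below apply the same fix with `UTF_8` and `ENGLISH`):

```scala
// Illustrative sketch only: the pattern the PlatformDefault lint rejects and
// the explicit-charset/locale variant it accepts.
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale.ENGLISH

object PlatformDefaultSketch {
  val codec = "snappy" // hypothetical value
  // codec.getBytes      -- flagged: uses the platform default charset
  // codec.toUpperCase   -- flagged: uses the platform default locale
  val bytes = codec.getBytes(UTF_8)      // explicit charset
  val upper = codec.toUpperCase(ENGLISH) // explicit locale
}
```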
5 changes: 4 additions & 1 deletion src/it/scala/com/exasol/cloudetl/BaseIntegrationTest.scala
@@ -115,7 +115,10 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
.language(UdfScript.Language.JAVA)
.inputType(UdfScript.InputType.SET)
.emits()
.bucketFsContent("com.exasol.cloudetl.scriptclasses.DockerFilesQueryGenerator", jarPath)
.bucketFsContent(
"com.exasol.cloudetl.scriptclasses.DockerFilesImportQueryGenerator",
jarPath
)
.build()
schema
.createUdfBuilder("IMPORT_METADATA")
7 changes: 4 additions & 3 deletions src/it/scala/com/exasol/cloudetl/DataImporterIT.scala
@@ -5,6 +5,7 @@ import java.math._
import java.nio.file.Path
import java.nio.ByteOrder
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8
import java.sql.ResultSet
import java.sql.Timestamp
import java.time._
@@ -163,7 +164,7 @@

test("imports bytes") {
AvroChecker(getBasicSchema("\"bytes\""), "VARCHAR(20)")
.withInputValues(List(ByteBuffer.wrap("hello".getBytes("UTF-8"))))
.withInputValues(List(ByteBuffer.wrap("hello".getBytes(UTF_8))))
.assertResultSet(
table()
.row("hello")
@@ -189,7 +190,7 @@
val schema = getBasicSchema("""{"type":"fixed","name":"fixed", "size":5}""")
val fixedSchema = new Schema.Parser().parse(schema).getField("column").schema()
val fixedData = new GenericData.Fixed(fixedSchema)
fixedData.bytes("fixed".getBytes("UTF-8"))
fixedData.bytes("fixed".getBytes(UTF_8))
AvroChecker(schema, "VARCHAR(20)")
.withInputValues(List(fixedData))
.assertResultSet(
@@ -576,7 +577,7 @@
unionVector.noNulls = false
// Set string type for the first row
unionVector.tags(1) = 0
unionVector.fields(1).asInstanceOf[BytesColumnVector].setVal(0, "str".getBytes("UTF-8"))
unionVector.fields(1).asInstanceOf[BytesColumnVector].setVal(0, "str".getBytes(UTF_8))
// Set int type for the second row
unionVector.tags(0) = 1
unionVector.fields(0).asInstanceOf[LongColumnVector].vector(1) = 23
4 changes: 4 additions & 0 deletions src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala
@@ -13,6 +13,7 @@ final case class S3Bucket(path: String, params: StorageProperties)
private[this] val S3_ACCESS_KEY: String = "S3_ACCESS_KEY"
private[this] val S3_SECRET_KEY: String = "S3_SECRET_KEY"
private[this] val S3_SESSION_TOKEN: String = "S3_SESSION_TOKEN"
private[this] val S3_PATH_STYLE_ACCESS: String = "S3_PATH_STYLE_ACCESS"
private[this] val S3_CHANGE_DETECTION_MODE: String = "S3_CHANGE_DETECTION_MODE"

/** @inheritdoc */
@@ -49,6 +50,9 @@
if (properties.containsKey(S3_CHANGE_DETECTION_MODE)) {
conf.set("fs.s3a.change.detection.mode", properties.getString(S3_CHANGE_DETECTION_MODE))
}
if (properties.containsKey(S3_PATH_STYLE_ACCESS)) {
conf.set("fs.s3a.path.style.access", properties.getString(S3_PATH_STYLE_ACCESS))
}

val mergedProperties = if (properties.hasNamedConnection()) {
properties.merge(S3_ACCESS_KEY)
@@ -1,5 +1,7 @@
package com.exasol.cloudetl.parquet

import java.util.Locale.ENGLISH

import com.exasol.cloudetl.storage.StorageProperties

import org.apache.parquet.hadoop.ParquetWriter
@@ -17,7 +19,7 @@ object ParquetWriteOptions {

def apply(params: StorageProperties): ParquetWriteOptions = {
val compressionCodec =
params.get("PARQUET_COMPRESSION_CODEC").getOrElse("").toUpperCase() match {
params.get("PARQUET_COMPRESSION_CODEC").getOrElse("").toUpperCase(ENGLISH) match {
case "SNAPPY" => CompressionCodecName.SNAPPY
case "GZIP" => CompressionCodecName.GZIP
case "LZO" => CompressionCodecName.LZO
@@ -2,6 +2,7 @@ package com.exasol.cloudetl.parquet

import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._

@@ -135,7 +136,7 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] {
case PrimitiveTypeName.BINARY =>
(row: Row, index: Int) =>
recordConsumer.addBinary(
Binary.fromReusedByteArray(row.getAs[String](index).getBytes)
Binary.fromReusedByteArray(row.getAs[String](index).getBytes(UTF_8))
)

case PrimitiveTypeName.INT96 =>
@@ -3,6 +3,7 @@ package com.exasol.cloudetl.parquet.converter
import java.math.BigDecimal
import java.math.BigInteger
import java.nio.ByteOrder
import java.nio.charset.StandardCharsets.UTF_8

import com.exasol.cloudetl.util.DateTimeUtil

@@ -58,7 +59,8 @@ trait ParquetConverter extends Converter
final case class ParquetPrimitiveConverter(index: Int, holder: ValueHolder)
extends PrimitiveConverter
with ParquetConverter {
override def addBinary(value: Binary): Unit = holder.put(index, new String(value.getBytes()))
override def addBinary(value: Binary): Unit =
holder.put(index, new String(value.getBytes(), UTF_8))
override def addBoolean(value: Boolean): Unit = holder.put(index, value)
override def addDouble(value: Double): Unit = holder.put(index, value)
override def addFloat(value: Float): Unit = holder.put(index, value)
@@ -11,7 +11,7 @@ object DockerFilesDataImporter {
def run(metadata: ExaMetadata, iterator: ExaIterator): Unit = {
import org.apache.hadoop.security.UserGroupInformation
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("exauser"))
ImportFiles.run(metadata, iterator)
FilesDataImporter.run(metadata, iterator)
}

}
@@ -6,15 +6,15 @@ import com.exasol.ExaMetadata
/**
* A query generator class to run inside the Exasol docker container.
*/
object DockerFilesQueryGenerator {
object DockerFilesImportQueryGenerator {

def generateSqlForImportSpec(
metadata: ExaMetadata,
importSpecification: ExaImportSpecification
): String = {
import org.apache.hadoop.security.UserGroupInformation
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("exauser"))
ImportPath.generateSqlForImportSpec(metadata, importSpecification)
FilesImportQueryGenerator.generateSqlForImportSpec(metadata, importSpecification)
}

}
@@ -11,7 +11,7 @@ object DockerFilesMetadataReader {
def run(metadata: ExaMetadata, iterator: ExaIterator): Unit = {
import org.apache.hadoop.security.UserGroupInformation
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("exauser"))
ImportMetadata.run(metadata, iterator)
FilesMetadataReader.run(metadata, iterator)
}

}
@@ -0,0 +1,62 @@
package com.exasol.cloudetl.scriptclasses

import scala.collection.mutable.ListBuffer

import com.exasol.ExaIterator
import com.exasol.ExaMetadata
import com.exasol.cloudetl.bucket.Bucket
import com.exasol.cloudetl.source._
import com.exasol.cloudetl.storage.StorageProperties
import com.exasol.common.data.Row

import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.fs.Path

/**
* An importer class that reads and imports data into the Exasol database.
*/
object FilesDataImporter extends LazyLogging {

/**
* Reads files and emits their data into Exasol iterator.
*
* @param metadata an Exasol metadata object
* @param iterator an Exasol iterator object
*/
def run(metadata: ExaMetadata, iterator: ExaIterator): Unit = {
val storageProperties = StorageProperties(iterator.getString(1), metadata)
val fileFormat = storageProperties.getFileFormat()
val bucket = Bucket(storageProperties)

val files = groupFiles(iterator, 2)
val nodeId = metadata.getNodeId
val vmId = metadata.getVmId
logger.info(s"The total number of files for node: $nodeId, vm: $vmId is '${files.size}'.")

files.foreach { file =>
logger.debug(s"Importing from file: '$file'")
val source =
Source(fileFormat, new Path(file), bucket.getConfiguration(), bucket.fileSystem)
readAndEmit(source.stream(), iterator)
source.close()
}
}

private[this] def groupFiles(
iterator: ExaIterator,
fileStartingIndex: Int
): Seq[String] = {
val files = ListBuffer[String]()
do {
files.append(iterator.getString(fileStartingIndex))
} while (iterator.next())
files.toSeq
}

private[this] def readAndEmit(rowIterator: Iterator[Row], ctx: ExaIterator): Unit =
rowIterator.foreach { row =>
val columns: Seq[Object] = row.getValues().map(_.asInstanceOf[AnyRef])
ctx.emit(columns: _*)
}

}
@@ -0,0 +1,49 @@
package com.exasol.cloudetl.scriptclasses

import com.exasol.ExaImportSpecification
import com.exasol.ExaMetadata
import com.exasol.cloudetl.bucket.Bucket
import com.exasol.cloudetl.storage.StorageProperties

/**
* A SQL query generator class that facilitates the metadata reading and
* file data importing.
*/
object FilesImportQueryGenerator {

private[this] val DEFAULT_PARALLELISM = "nproc()"

/**
* Generates an Exasol SQL for the data import query.
*
* @param metadata an Exasol metadata object
* @param importSpecification an Exasol import specification object
*/
def generateSqlForImportSpec(
metadata: ExaMetadata,
importSpecification: ExaImportSpecification
): String = {
val storageProperties = StorageProperties(importSpecification.getParameters())
val bucket = Bucket(storageProperties)
bucket.validate()

val scriptSchema = metadata.getScriptSchema()
val bucketPath = bucket.bucketPath
val parallelism = storageProperties.getParallelism(DEFAULT_PARALLELISM)
val storagePropertiesAsString = storageProperties.mkString()

s"""|SELECT
| $scriptSchema.IMPORT_FILES(
| '$bucketPath', '$storagePropertiesAsString', filename
|)
|FROM (
| SELECT $scriptSchema.IMPORT_METADATA(
| '$bucketPath', '$storagePropertiesAsString', $parallelism
| )
|)
|GROUP BY
| partition_index;
|""".stripMargin
}

}
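
For illustration only (hypothetical bucket path and placeholder property string), the statement produced by `generateSqlForImportSpec` looks roughly like the value below: the inner `IMPORT_METADATA` call assigns files to `nproc()` partitions, and the outer `IMPORT_FILES` call imports each group of files, grouped by `partition_index`.

```scala
// Hypothetical rendering of the generated import query; the bucket path and
// the serialized storage properties are placeholders.
object GeneratedQuerySketch {
  val example: String =
    """|SELECT
       |  CLOUD_STORAGE_EXTENSION.IMPORT_FILES(
       |  's3a://my-bucket/data/*.parquet', '<storage properties>', filename
       |)
       |FROM (
       |  SELECT CLOUD_STORAGE_EXTENSION.IMPORT_METADATA(
       |    's3a://my-bucket/data/*.parquet', '<storage properties>', nproc()
       |  )
       |)
       |GROUP BY
       |  partition_index;
       |""".stripMargin
}
```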