Skip to content
Embulk (https://github.com/embulk/embulk/) output plugin to dump records as Apache Parquet (https://parquet.apache.org/) files on S3.
Branch: develop
Clone or download
Latest commit c731102 Jul 17, 2019

README.md

S3 Parquet output plugin for Embulk

Embulk output plugin to dump records as Apache Parquet files on S3.

Overview

  • Plugin type: output
  • Load all or nothing: no
  • Resume supported: no
  • Cleanup supported: yes

Configuration

  • bucket: s3 bucket name (string, required)
  • path_prefix: prefix of target keys (string, optional)
  • sequence_format: format of the sequence number of the output files (string, default: "%03d.%02d.")
    • sequence_format formats task index and sequence number in a task.
  • file_ext: path suffix of the output files (string, default: "parquet")
  • compression_codec: compression codec for parquet file ("uncompressed","snappy","gzip","lzo","brotli","lz4" or "zstd", default: "uncompressed")
  • default_timestamp_format: default timestamp format (string, default: "%Y-%m-%d %H:%M:%S.%6N %z")
  • default_timezone: default timezone (string, default: "UTC")
  • column_options: a map whose keys are name of columns, and values are configuration with following parameters (optional)
    • timezone: timezone if type of this column is timestamp. If not set, default_timezone is used. (string, optional)
    • format: timestamp format if type of this column is timestamp. If not set, default_timestamp_format: is used. (string, optional)
  • canned_acl: grants one of canned ACLs for created objects (string, default: private)
  • block_size: The block size is the size of a row group being buffered in memory. This limits the memory usage when writing. Larger values will improve the I/O when reading but consume more memory when writing. (int, default: 134217728 (128MB))
  • page_size: The page size is for compression. When reading, each page can be decompressed independently. A block is composed of pages. The page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. (int, default: 1048576 (1MB))
  • max_padding_size: The max size (bytes) to write as padding and the min size of a row group (int, default: 8388608 (8MB))
  • enable_dictionary_encoding: The boolean value is to enable/disable dictionary encoding. (boolean, default: true)
  • auth_method: name of mechanism to authenticate requests ("basic", "env", "instance", "profile", "properties", "anonymous", or "session", default: "default")
    • "basic": uses access_key_id and secret_access_key to authenticate.
    • "env": uses AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY) environment variables.
    • "instance": uses EC2 instance profile or attached ECS task role.
    • "profile": uses credentials written in a file. Format of the file is as following, where [...] is a name of profile.
      [default]
      aws_access_key_id=YOUR_ACCESS_KEY_ID
      aws_secret_access_key=YOUR_SECRET_ACCESS_KEY
      
      [profile2]
      ...
      
    • "properties": uses aws.accessKeyId and aws.secretKey Java system properties.
    • "anonymous": uses anonymous access. This auth method can access only public files.
    • "session": uses temporary-generated access_key_id, secret_access_key and session_token.
    • "assume_role": uses temporary-generated credentials by assuming role_arn role.
    • "default": uses AWS SDK's default strategy to look up available credentials from runtime environment. This method behaves like the combination of the following methods.
      1. "env"
      2. "properties"
      3. "profile"
      4. "instance"
  • profile_file: path to a profiles file. this is optionally used when auth_method is "profile". (string, default: given by AWS_CREDENTIAL_PROFILES_FILE environment variable, or ~/.aws/credentials).
  • profile_name: name of a profile. this is optionally used when auth_method is "profile". (string, default: "default")
  • access_key_id: aws access key id. this is required when auth_method is "basic" or "session". (string, optional)
  • secret_access_key: aws secret access key. this is required when auth_method is "basic" or "session". (string, optional)
  • session_token: aws session token. this is required when auth_method is "session". (string, optional)
  • role_arn: arn of the role to assume. this is required for auth_method is "assume_role". (string, optional)
  • role_session_name: an identifier for the assumed role session. this is required when auth_method is "assume_role". (string, optional)
  • role_external_id: a unique identifier that is used by third parties when assuming roles in their customers' accounts. this is optionally used for auth_method: "assume_role". (string, optional)
  • role_session_duration_seconds: duration, in seconds, of the role session. this is optionally used for auth_method: "assume_role". (int, optional)
  • scope_down_policy: an iam policy in json format. this is optionally used for auth_method: "assume_role". (string, optional)
  • catalog: Register a table if this option is specified (optional)
    • catalog_id: glue data catalog id if you use a catalog different from account/region default catalog. (string, optional)
    • database: The name of the database (string, required)
    • table: The name of the table (string, required)
    • column_options: a key-value pairs where key is a column name and value is options for the column. (string to options map, default: {})
      • type: type of a column when this plugin creates new tables (e.g. STRING, BIGINT) (string, default: depends on input column type. BIGINT if input column type is long, BOOLEAN if boolean, DOUBLE if double, STRING if string, STRING if timestamp, STRING if json)
    • operation_if_exists: operation if the table already exist. Available operations are "delete" and "skip" (string, default: "delete")
  • endpoint: The AWS Service endpoint (string, optional)
  • region: The AWS region (string, optional)
  • http_proxy: Indicate whether using when accessing AWS via http proxy. (optional)
    • host proxy host (string, required)
    • port proxy port (int, optional)
    • protocol proxy protocol (string, default: "https")
    • user proxy user (string, optional)
    • password proxy password (string, optional)
  • buffer_dir: buffer directory for parquet files to be uploaded on S3 (string, default: Create a Temporary Directory)

Example

out:
  type: s3_parquet
  bucket: my-bucket
  path_prefix: path/to/my-obj.
  file_ext: snappy.parquet
  compression_codec: snappy
  default_timezone: Asia/Tokyo
  canned_acl: bucket-owner-full-control

Note

  • The current implementation does not support LogicalTypes. I will implement them later as column_options. So, currently timestamp type and json type are stored as UTF-8 String. Please be careful.

Development

Run example:

$ ./gradlew classpath
$ embulk run example/config.yml -Ilib

Run test:

$ ./gradlew test

Build

$ ./gradlew gem  # -t to watch change of files and rebuild continuously

Release gem:

Fix build.gradle, then

$ ./gradlew gemPush

ChangeLog

CHANGELOG.md

You can’t perform that action at this time.