add parquet data writer #403

Merged (33 commits) on Jun 21, 2023
Commits
8353095  add simple parquet data writer with test (sh-rp, Jun 13, 2023)
7a5c27a  more work on parquet writer (sh-rp, Jun 14, 2023)
12431c7  enable parquet for big query (sh-rp, Jun 14, 2023)
e80cb4d  move tests and pyarrow imports (sh-rp, Jun 14, 2023)
654c977  start rotation tests (sh-rp, Jun 14, 2023)
e1bb088  test file rotation (sh-rp, Jun 15, 2023)
3d5073c  fix linting (sh-rp, Jun 15, 2023)
ce8ab9f  propagate file loader type to normalize methods (sh-rp, Jun 15, 2023)
c6dbb8f  bigquery parquet loading (sh-rp, Jun 15, 2023)
ff4a8ed  fix comments (sh-rp, Jun 15, 2023)
9a3e07f  move bigquery default back to jsonl (sh-rp, Jun 15, 2023)
c723469  fixes jsonl default in filesystem: normalize tests (rudolfix, Jun 15, 2023)
1cccb14  fixes the pipeline info cli command output (rudolfix, Jun 15, 2023)
aa61349  put sql back into bigquer caps (sh-rp, Jun 16, 2023)
351324b  add biquery parquet result testing (sh-rp, Jun 16, 2023)
9f03539  add config spec (sh-rp, Jun 16, 2023)
fcf2820  some pr fixes (sh-rp, Jun 16, 2023)
ad73d6b  make non compression support explicit in parquet writer (sh-rp, Jun 19, 2023)
3ed0e2a  further pr fixes (sh-rp, Jun 19, 2023)
8227652  fix tests (sh-rp, Jun 19, 2023)
e735971  temporarily remove extra flag from pyarrow in pyproject toml (sh-rp, Jun 19, 2023)
ac1338c  add bigquery extra (sh-rp, Jun 19, 2023)
5cd8f5f  some dependency fixes for tests (sh-rp, Jun 19, 2023)
ca8334f  create proper extra (sh-rp, Jun 19, 2023)
98cf9dd  move default destination caps for bigquery back to jsonl (sh-rp, Jun 19, 2023)
fc3ac52  fix source format detection in bigquery loader (sh-rp, Jun 19, 2023)
ed90517  pr fixes (sh-rp, Jun 20, 2023)
afe16e5  small pr fix (sh-rp, Jun 20, 2023)
c5692c2  Merge branch 'devel' into d#/389-add-parquet-data-writer (rudolfix, Jun 20, 2023)
0f62cb3  updates poetry.lock (rudolfix, Jun 20, 2023)
fb1b82d  makes random dataset name in compression test to run in parallel (rudolfix, Jun 20, 2023)
a84c7ad  resolves the potgres/redshift/snowflake credentials when parsing nati… (rudolfix, Jun 20, 2023)
2732ca3  adds proper handling of password vs private key fields in snowflake c… (rudolfix, Jun 20, 2023)
2 changes: 1 addition & 1 deletion .github/workflows/test_airflow.yml
@@ -41,7 +41,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-airflow-runner

- name: Install dependencies
-run: poetry install --no-interaction --with airflow -E duckdb
+run: poetry install --no-interaction --with airflow -E duckdb -E pyarrow

- run: |
poetry run pytest tests/helpers/airflow_tests
2 changes: 1 addition & 1 deletion .github/workflows/test_common.yml
@@ -61,7 +61,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
-run: poetry install --no-interaction -E duckdb
+run: poetry install --no-interaction -E duckdb -E pyarrow

# - name: Install self
# run: poetry install --no-interaction
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_bigquery.yml
@@ -70,7 +70,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
-run: poetry install --no-interaction -E bigquery --with providers
+run: poetry install --no-interaction -E bigquery --with providers -E pyarrow

# - name: Install self
# run: poetry install --no-interaction
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -74,7 +74,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
-run: poetry install --no-interaction -E redshift -E duckdb -E gs -E s3
+run: poetry install --no-interaction -E redshift -E duckdb -E gs -E s3 -E pyarrow

# - name: Install self
# run: poetry install --no-interaction
2 changes: 1 addition & 1 deletion dlt/cli/pipeline_command.py
@@ -87,7 +87,7 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
if p.default_schema_name is None:
fmt.warning("This pipeline does not have a default schema")
else:
-is_single_schema = len(p.schema_names)
+is_single_schema = len(p.schema_names) == 1
for schema_name in p.schema_names:
fmt.echo("Resources in schema: %s" % fmt.bold(schema_name))
schema = p.schemas[schema_name]
2 changes: 1 addition & 1 deletion dlt/common/data_writers/buffered.py
@@ -87,7 +87,7 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> Non
if self.file_max_bytes and self._file.tell() >= self.file_max_bytes:
self._rotate_file()
# rotate on max items
-if self.file_max_items and self._writer.items_count >= self.file_max_items:
+elif self.file_max_items and self._writer.items_count >= self.file_max_items:
self._rotate_file()

def close(self) -> None:

Review comment (Collaborator), on the elif line:
> yeah you've found the bug we could not pinpoint with @z3z1ma for quite some time 🚀 #370
> obviously double rotation will not work
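For context on this fix, a minimal sketch of the guard with simplified names (not the real buffered writer): the elif makes the two thresholds mutually exclusive for a single write, so one written item can no longer trigger two rotations in a row.

def maybe_rotate(writer) -> None:
    # simplified illustration: at most one rotation per written item
    if writer.file_max_bytes and writer.file_size >= writer.file_max_bytes:
        writer.rotate()
    elif writer.file_max_items and writer.items_count >= writer.file_max_items:
        writer.rotate()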
72 changes: 70 additions & 2 deletions dlt/common/data_writers/writers.py
@@ -1,13 +1,15 @@
import abc

# import jsonlines
from dataclasses import dataclass
-from typing import Any, Dict, Sequence, IO, Type
+from typing import Any, Dict, Sequence, IO, Type, Optional, List, cast

from dlt.common import json
from dlt.common.typing import StrAny
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.destination import TLoaderFileFormat, DestinationCapabilitiesContext

from dlt.common.configuration import with_config, known_sections, configspec
from dlt.common.configuration.specs import BaseConfiguration

@dataclass
class TFileFormatSpec:
@@ -67,6 +69,8 @@ def class_factory(file_format: TLoaderFileFormat) -> Type["DataWriter"]:
return JsonlListPUAEncodeWriter
elif file_format == "insert_values":
return InsertValuesWriter
elif file_format == "parquet":
return ParquetDataWriter # type: ignore
else:
raise ValueError(file_format)

@@ -173,3 +177,67 @@ def data_format(cls) -> TFileFormatSpec:
supports_compression=True,
requires_destination_capabilities=True,
)
return TFileFormatSpec("insert_values", "insert_values", False, False, requires_destination_capabilities=True)


@configspec
class ParquetDataWriterConfiguration(BaseConfiguration):
flavor: str = "spark"
version: str = "2.4"
data_page_size: int = 1024 * 1024
__section__: str = known_sections.DATA_WRITER

Review comment (rudolfix, Collaborator, Jun 19, 2023), on the config spec:
> cool!
> let's add a section:
> __section__ = known_sections.DATA_WRITER
> then "data_writer" is always mandatory and it will blend in with generic settings in buffered

class ParquetDataWriter(DataWriter):

@with_config(spec=ParquetDataWriterConfiguration)
def __init__(self,
f: IO[Any],
caps: DestinationCapabilitiesContext = None,
*,
flavor: str = "spark",
version: str = "2.4",
data_page_size: int = 1024 * 1024
) -> None:
super().__init__(f, caps)
from dlt.common.libs.pyarrow import pyarrow

self.writer: Optional[pyarrow.parquet.ParquetWriter] = None
self.schema: Optional[pyarrow.Schema] = None
self.complex_indices: List[str] = None
self.parquet_flavor = flavor
self.parquet_version = version
self.parquet_data_page_size = data_page_size

def write_header(self, columns_schema: TTableSchemaColumns) -> None:
from dlt.common.libs.pyarrow import pyarrow, get_py_arrow_datatype

# build schema
self.schema = pyarrow.schema([pyarrow.field(name, get_py_arrow_datatype(schema_item["data_type"]), nullable=schema_item["nullable"]) for name, schema_item in columns_schema.items()])
# find row items that are of the complex type (could be abstracted out for use in other writers?)
self.complex_indices = [i for i, field in columns_schema.items() if field["data_type"] == "complex"]
self.writer = pyarrow.parquet.ParquetWriter(self._f, self.schema, flavor=self.parquet_flavor, version=self.parquet_version, data_page_size=self.parquet_data_page_size)


def write_data(self, rows: Sequence[Any]) -> None:
super().write_data(rows)
from dlt.common.libs.pyarrow import pyarrow

# replace complex types with json
for key in self.complex_indices:
for row in rows:
if key in row:
row[key] = json.dumps(row[key]) if row[key] else row[key]

table = pyarrow.Table.from_pylist(rows, schema=self.schema)
# Write
self.writer.write_table(table)

def write_footer(self) -> None:
self.writer.close()
self.writer = None


@classmethod
def data_format(cls) -> TFileFormatSpec:
return TFileFormatSpec("parquet", "parquet", True, False, requires_destination_capabilities=True, supports_compression=False)
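Following the review note about known_sections.DATA_WRITER above, the flavor, version and data_page_size fields should become resolvable through dlt's config providers once the section is set. A hypothetical sketch using environment variables; the SECTION__FIELD spelling follows dlt's usual convention, but the exact section path is an assumption here.

import os

# hypothetical: values picked up by ParquetDataWriterConfiguration under the
# "data_writer" section; verify the exact path against your dlt version
os.environ["DATA_WRITER__FLAVOR"] = "spark"
os.environ["DATA_WRITER__VERSION"] = "2.4"
os.environ["DATA_WRITER__DATA_PAGE_SIZE"] = str(1024 * 1024)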
4 changes: 2 additions & 2 deletions dlt/common/destination/capabilities.py
@@ -11,7 +11,7 @@
# puae-jsonl - internal extract -> normalize format bases on jsonl
# insert_values - insert SQL statements
# sql - any sql statement
-TLoaderFileFormat = Literal["jsonl", "puae-jsonl", "insert_values", "sql"]
+TLoaderFileFormat = Literal["jsonl", "puae-jsonl", "insert_values", "sql", "parquet"]


@configspec(init=True)
@@ -38,7 +38,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
def generic_capabilities(preferred_loader_file_format: TLoaderFileFormat = None) -> "DestinationCapabilitiesContext":
caps = DestinationCapabilitiesContext()
caps.preferred_loader_file_format = preferred_loader_file_format
-caps.supported_loader_file_formats = ["jsonl", "insert_values"]
+caps.supported_loader_file_formats = ["jsonl", "insert_values", "parquet"]
caps.escape_identifier = identity
caps.escape_literal = serialize_value
caps.max_identifier_length = 65536
Empty file added: dlt/common/libs/__init__.py
33 changes: 33 additions & 0 deletions dlt/common/libs/pyarrow.py
@@ -0,0 +1,33 @@
from dlt.common.exceptions import MissingDependencyException
from typing import Any

try:
import pyarrow
import pyarrow.parquet
except ImportError:
raise MissingDependencyException("DLT parquet Helpers", ["parquet"], "DLT Helpers for for parquet.")


def get_py_arrow_datatype(column_type: str) -> Any:
if column_type == "text":
return pyarrow.string()
elif column_type == "double":
return pyarrow.float64()
elif column_type == "bool":
return pyarrow.bool_()
elif column_type == "timestamp":
return pyarrow.timestamp('ms')
elif column_type == "bigint":
return pyarrow.int64()
elif column_type == "binary":
return pyarrow.binary()
elif column_type == "complex":
return pyarrow.string()
elif column_type == "decimal":
return pyarrow.decimal128(38, 18)
elif column_type == "wei":
return pyarrow.decimal128(38, 0)
elif column_type == "date":
return pyarrow.date32()
else:
raise ValueError(column_type)
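A small usage sketch of the mapping above, mirroring how ParquetDataWriter.write_header builds its Arrow schema; the column definitions are illustrative.

from dlt.common.libs.pyarrow import pyarrow, get_py_arrow_datatype

columns = {
    "id": {"data_type": "bigint", "nullable": False},
    "amount": {"data_type": "decimal", "nullable": True},
    "payload": {"data_type": "complex", "nullable": True},  # written as a JSON string
}
schema = pyarrow.schema(
    [
        pyarrow.field(name, get_py_arrow_datatype(col["data_type"]), nullable=col["nullable"])
        for name, col in columns.items()
    ]
)
print(schema)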
6 changes: 4 additions & 2 deletions dlt/common/pipeline.py
@@ -22,6 +22,7 @@
from dlt.common.storages.load_storage import LoadPackageInfo
from dlt.common.typing import DictStrAny, REPattern
from dlt.common.jsonpath import delete_matches, TAnyJsonPath
from dlt.common.data_writers.writers import TLoaderFileFormat


class ExtractInfo(NamedTuple):
@@ -179,8 +180,9 @@ def run(
write_disposition: TWriteDisposition = None,
columns: Sequence[TColumnSchema] = None,
primary_key: TColumnKey = None,
-schema: Schema = None
-) -> LoadInfo:
+schema: Schema = None,
+loader_file_format: TLoaderFileFormat = None
+) -> LoadInfo:
...

def _set_context(self, is_active: bool) -> None:
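With loader_file_format exposed on run(), a caller can request parquet for a single load. A rough sketch assuming a destination that lists parquet in its capabilities; pipeline, dataset and table names are placeholders.

import dlt

pipeline = dlt.pipeline(pipeline_name="parquet_demo", destination="bigquery", dataset_name="demo")
info = pipeline.run(
    [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}],
    table_name="users",
    loader_file_format="parquet",  # overrides the destination's preferred format
)
print(info)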
2 changes: 1 addition & 1 deletion dlt/destinations/bigquery/__init__.py
@@ -18,7 +18,7 @@ def _configure(config: BigQueryClientConfiguration = config.value) -> BigQueryCl
def capabilities() -> DestinationCapabilitiesContext:
caps = DestinationCapabilitiesContext()
caps.preferred_loader_file_format = "jsonl"
-caps.supported_loader_file_formats = ["jsonl", "sql"]
+caps.supported_loader_file_formats = ["jsonl", "sql", "parquet"]
caps.escape_identifier = escape_bigquery_identifier
caps.escape_literal = None
caps.max_identifier_length = 1024
8 changes: 7 additions & 1 deletion dlt/destinations/bigquery/bigquery.py
@@ -228,14 +228,20 @@ def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]
def _create_load_job(self, table_name: str, write_disposition: TWriteDisposition, file_path: str) -> bigquery.LoadJob:
# append to table for merge loads (append to stage) and regular appends
bq_wd = bigquery.WriteDisposition.WRITE_TRUNCATE if write_disposition == "replace" else bigquery.WriteDisposition.WRITE_APPEND

# choose correct source format
source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
if file_path.endswith("parquet"):
source_format = bigquery.SourceFormat.PARQUET

# if merge then load to staging
with self.sql_client.with_staging_dataset(write_disposition == "merge"):
job_id = BigQueryLoadJob.get_job_id_from_file_path(file_path)
job_config = bigquery.LoadJobConfig(
autodetect=False,
write_disposition=bq_wd,
create_disposition=bigquery.CreateDisposition.CREATE_NEVER,
-source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
+source_format=source_format,
ignore_unknown_values=False,
max_bad_records=0)
with open(file_path, "rb") as f:
2 changes: 1 addition & 1 deletion dlt/destinations/filesystem/__init__.py
@@ -15,7 +15,7 @@ def _configure(config: FilesystemClientConfiguration = config.value) -> Filesyst


def capabilities() -> DestinationCapabilitiesContext:
-return DestinationCapabilitiesContext.generic_capabilities("jsonl")
+return DestinationCapabilitiesContext.generic_capabilities("parquet")


def client(schema: Schema, initial_config: DestinationClientDwhConfiguration = config.value) -> JobClientBase:
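Because the filesystem destination now advertises parquet as its preferred format, a plain run should produce parquet files without any extra arguments. A sketch assuming bucket_url and credentials are configured elsewhere; names are placeholders.

import dlt

pipeline = dlt.pipeline(pipeline_name="fs_demo", destination="filesystem", dataset_name="demo")
# no loader_file_format given: the destination's preferred format (now parquet) is used
pipeline.run([{"id": 1}, {"id": 2}], table_name="events")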
2 changes: 2 additions & 0 deletions dlt/destinations/postgres/configuration.py
@@ -20,6 +20,8 @@ class PostgresCredentials(ConnectionStringCredentials):
def parse_native_representation(self, native_value: Any) -> None:
super().parse_native_representation(native_value)
self.connect_timeout = int(self.query.get("connect_timeout", self.connect_timeout))
if not self.is_partial():
self.resolve()

def on_resolved(self) -> None:
self.database = self.database.lower()
15 changes: 11 additions & 4 deletions dlt/destinations/snowflake/configuration.py
@@ -6,6 +6,7 @@
from dlt.common.exceptions import MissingDependencyException
from dlt.common.typing import TSecretStrValue
from dlt.common.configuration.specs import ConnectionStringCredentials
from dlt.common.configuration.exceptions import ConfigurationValueError
from dlt.common.configuration import configspec
from dlt.common.destination.reference import DestinationClientDwhConfiguration

@@ -46,10 +47,16 @@ class SnowflakeCredentials(ConnectionStringCredentials):

def parse_native_representation(self, native_value: Any) -> None:
super().parse_native_representation(native_value)
-if 'warehouse' in self.query:
-    self.warehouse = self.query['warehouse']
-if 'role' in self.query:
-    self.role = self.query['role']
+self.warehouse = self.query.get('warehouse')
+self.role = self.query.get('role')
+self.private_key = self.query.get('private_key')  # type: ignore
+self.private_key_passphrase = self.query.get('private_key_passphrase')  # type: ignore
+if not self.is_partial() and (self.password or self.private_key):
+    self.resolve()

def on_resolved(self) -> None:
if not self.password and not self.private_key:
raise ConfigurationValueError("Please specify password or private_key. SnowflakeCredentials supports password and private key authentication and one of those must be specified.")

def to_url(self) -> URL:
query = dict(self.query or {})
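For reference, the two shapes of native value the parser now accepts, password based and key-pair based. All values below are placeholders and the expected private key encoding is not shown here.

# placeholders only; warehouse and role are read from the query string as parsed above
password_dsn = (
    "snowflake://loader:strong_password@my_account/my_database"
    "?warehouse=COMPUTE_WH&role=LOADER_ROLE"
)
key_pair_dsn = (
    "snowflake://loader@my_account/my_database"
    "?private_key=<private-key>&private_key_passphrase=<passphrase>"
)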
13 changes: 0 additions & 13 deletions dlt/helpers/parquet.py

This file was deleted.

10 changes: 5 additions & 5 deletions dlt/normalize/normalize.py
@@ -42,15 +42,15 @@ def __init__(self, collector: Collector = NULL_COLLECTOR, schema_storage: Schema
self.schema_storage: SchemaStorage = None

# setup storages
-self.create_storages(config.destination_capabilities.preferred_loader_file_format)
+self.create_storages()
# create schema storage with give type
self.schema_storage = schema_storage or SchemaStorage(self.config._schema_storage_config, makedirs=True)

-def create_storages(self, loader_file_format: TLoaderFileFormat) -> None:
+def create_storages(self) -> None:
# pass initial normalize storage config embedded in normalize config
self.normalize_storage = NormalizeStorage(True, config=self.config._normalize_storage_config)
# normalize saves in preferred format but can read all supported formats
-self.load_storage = LoadStorage(True, loader_file_format, LoadStorage.ALL_SUPPORTED_FILE_FORMATS, config=self.config._load_storage_config)
+self.load_storage = LoadStorage(True, self.config.destination_capabilities.preferred_loader_file_format, LoadStorage.ALL_SUPPORTED_FILE_FORMATS, config=self.config._load_storage_config)


@staticmethod
@@ -70,7 +70,7 @@ def w_normalize_files(
destination_caps: DestinationCapabilitiesContext,
stored_schema: TStoredSchema,
load_id: str,
-extracted_items_files: Sequence[str]
+extracted_items_files: Sequence[str],
) -> TWorkerRV:

schema_updates: List[TSchemaUpdate] = []
@@ -230,7 +230,7 @@ def map_single(self, schema: Schema, load_id: str, files: Sequence[str]) -> TMap
self.config.destination_capabilities,
schema.to_dict(),
load_id,
-files
+files,
)
self.update_schema(schema, result[0])
self.collector.update("Files", len(result[2]))
1 change: 1 addition & 0 deletions dlt/pipeline/__init__.py
@@ -9,6 +9,7 @@
from dlt.common.configuration.inject import get_orig_args, last_config
from dlt.common.destination.reference import DestinationReference, TDestinationReferenceArg
from dlt.common.pipeline import LoadInfo, PipelineContext, get_dlt_pipelines_dir
from dlt.common.data_writers import TLoaderFileFormat
from dlt.pipeline.configuration import PipelineConfiguration, ensure_correct_pipeline_kwargs
from dlt.pipeline.pipeline import Pipeline

Review comment (Collaborator), on the TLoaderFileFormat import:
> do we need it here?
2 changes: 2 additions & 0 deletions dlt/pipeline/configuration.py
@@ -4,6 +4,7 @@
from dlt.common.configuration.specs import RunConfiguration, BaseConfiguration
from dlt.common.typing import AnyFun, TSecretValue
from dlt.common.utils import digest256
from dlt.common.data_writers import TLoaderFileFormat



@@ -12,6 +13,7 @@ class PipelineConfiguration(BaseConfiguration):
pipeline_name: Optional[str] = None
pipelines_dir: Optional[str] = None
destination_name: Optional[str] = None
loader_file_format: Optional[TLoaderFileFormat] = None
dataset_name: Optional[str] = None
pipeline_salt: Optional[TSecretValue] = None
restore_from_destination: bool = True