
Commit

Update unsupported data types
steinitzu committed Feb 2, 2024
1 parent 2d50cc7 commit dc984df
Showing 3 changed files with 40 additions and 6 deletions.
24 changes: 24 additions & 0 deletions dlt/destinations/impl/databricks/databricks.py
@@ -19,6 +19,7 @@
from dlt.common.storages.file_storage import FileStorage
from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns
from dlt.common.schema.typing import TTableSchema, TColumnType, TSchemaTables, TTableFormat
from dlt.common.schema.utils import table_schema_has_type


from dlt.destinations.insert_job_client import InsertValuesJobClient
@@ -104,6 +105,7 @@ def from_db_type(
class DatabricksLoadJob(LoadJob, FollowupJob):
def __init__(
self,
table: TTableSchema,
file_path: str,
table_name: str,
load_id: str,
@@ -181,6 +183,27 @@ def __init__(
file_path,
"Databricks loader does not support gzip compressed JSON files. Please disable compression in the data writer configuration: https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression",
)
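# These column types cannot be loaded into Databricks from json files;
# terminate the job early with a message pointing the user at parquet.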
if table_schema_has_type(table, "decimal"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load DECIMAL type columns from json files. Switch to parquet format to load decimals.",
)
if table_schema_has_type(table, "binary"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load BINARY type columns from json files. Switch to parquet format to load byte values.",
)
if table_schema_has_type(table, "complex"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load complex columns (lists and dicts) from json files. Switch to parquet format to load complex types.",
)
if table_schema_has_type(table, "date"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load DATE type columns from json files. Switch to parquet format to load dates.",
)

source_format = "JSON"
format_options_clause = "FORMAT_OPTIONS('inferTimestamp'='true')"
# Databricks fails when trying to load empty json files, so we have to check the file size
@@ -236,6 +259,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->

if not job:
job = DatabricksLoadJob(
table,
file_path,
table["name"],
load_id,
19 changes: 13 additions & 6 deletions docs/website/docs/dlt-ecosystem/destinations/databricks.md
@@ -52,20 +52,25 @@ For more information on staging, see the [staging support](#staging-support) sec

## Supported file formats
* [insert-values](../file-formats/insert-format.md) is used by default
* [jsonl](../file-formats/jsonl.md) supported when staging is enabled. **Note**: Currently loading compressed jsonl files is not supported. `data_writer.disable_compression` should be set to `true` in dlt config
* [jsonl](../file-formats/jsonl.md) is supported when staging is enabled (see limitations below)
* [parquet](../file-formats/parquet.md) is supported when staging is enabled

The `jsonl` format has some limitations when used with Databricks:

1. Compression must be disabled to load `jsonl` files into Databricks. Set `data_writer.disable_compression` to `true` in the dlt config when using this format (a rough sketch follows this list).
2. The following data types are not supported when using the `jsonl` format with Databricks: `decimal`, `complex`, `date`, `binary`. Use `parquet` if your data contains these types.
3. The `bigint` data type with precision is not supported with the `jsonl` format.

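A minimal sketch of both workarounds, assuming dlt's standard environment-variable resolution for `data_writer.disable_compression`; the pipeline, dataset, and table names are placeholders:

```python
import os
from datetime import date

import dlt

# Assumption: this environment variable mirrors the [normalize.data_writer]
# disable_compression setting in config.toml; it must be enabled when loading
# jsonl files into Databricks.
os.environ["NORMALIZE__DATA_WRITER__DISABLE_COMPRESSION"] = "true"

# Placeholder pipeline; the filesystem staging bucket and its credentials are
# configured separately, as described in the staging section below.
pipeline = dlt.pipeline(
    pipeline_name="databricks_example",
    destination="databricks",
    staging="filesystem",
    dataset_name="example_data",
)

# This table contains a date column, which jsonl cannot carry to Databricks,
# so request parquet explicitly for the load.
info = pipeline.run(
    [{"id": 1, "created_at": date(2024, 2, 2)}],
    table_name="orders",
    loader_file_format="parquet",
)
print(info)
```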

## Staging support

Databricks supports both Amazon S3 and Azure Blob Storage as staging locations. `dlt` will upload files in `parquet` format to the staging location and will instruct Databricks to load data from there.

### Databricks and Amazon S3

Please refer to the [S3 documentation](./filesystem.md#aws-s3) to learn how to set up your bucket with the bucket_url and credentials. For s3, the dlt Databricks loader will use the AWS credentials provided for s3 to access the s3 bucket if not specified otherwise (see config options below). You can specify your s3 bucket directly in your dlt configuration:
Please refer to the [S3 documentation](./filesystem.md#aws-s3) for details on connecting your s3 bucket with the bucket_url and credentials.

To set up Databricks with s3 as a staging destination:
Example of setting up Databricks with s3 as a staging destination:

```python
import dlt
@@ -83,7 +88,9 @@ pipeline = dlt.pipeline(

### Databricks and Azure Blob Storage

Refer to the [Azure Blob Storage filesystem documentation](./filesystem.md#azure-blob-storage) for setting up your container with the bucket_url and credentials. For Azure Blob Storage, Databricks can directly load data from the storage container specified in the configuration:
Refer to the [Azure Blob Storage filesystem documentation](./filesystem.md#azure-blob-storage) for details on connecting your Azure Blob Storage container with the bucket_url and credentials.

Example of setting up Databricks with Azure as a staging destination:

```python
# Create a dlt pipeline that will load
3 changes: 3 additions & 0 deletions tests/load/pipeline/test_stage_loading.py
@@ -168,6 +168,9 @@ def test_all_data_types(destination_config: DestinationTestConfiguration) -> Non
):
# Redshift can't load fixed width binary columns from parquet
exclude_columns.append("col7_precision")
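# Databricks can't load these column types from jsonl files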
if destination_config.destination == "databricks" and destination_config.file_format == "jsonl":
exclude_types.extend(["decimal", "binary", "wei", "complex", "date"])
exclude_columns.append("col1_precision")

column_schemas, data_types = table_update_and_row(
exclude_types=exclude_types, exclude_columns=exclude_columns
