Source azure blob storage: certification #37504

Merged · 20 commits · Apr 29, 2024

Commits (20)
c400532
Source Azure Blob Storage: add unit tests
artem1205 Apr 22, 2024
42af8a4
Source Azure Blob Storage: ref
artem1205 Apr 22, 2024
842abcb
Source Azure Blob Storage: add test for stream reader
artem1205 Apr 22, 2024
bdcfeb5
Source Azure Blob Storage: ref
artem1205 Apr 22, 2024
3e0fe50
Source Azure Blob Storage: add freezegun && update deps
artem1205 Apr 22, 2024
20e2eb9
Source Azure Blob Storage: add tests
artem1205 Apr 22, 2024
5b966ff
Source Azure Blob Storage: ref legacy config migration
artem1205 Apr 22, 2024
956a4d6
Source Azure Blob Storage: ref legacy config migration
artem1205 Apr 22, 2024
58a78fb
Source Azure Blob Storage: format
artem1205 Apr 22, 2024
04df48b
Source Azure Blob Storage: bump version
artem1205 Apr 22, 2024
c355125
Source Azure Blob Storage: fix test
artem1205 Apr 22, 2024
7f45551
Source Azure Blob Storage: reorder spec
artem1205 Apr 23, 2024
3515425
Source Azure Blob Storage: reorder spec
artem1205 Apr 23, 2024
6894bc6
Source Azure Blob Storage: Add Prerequisites
artem1205 Apr 23, 2024
4a69e4e
Source Azure Blob Storage: update metadata
artem1205 Apr 23, 2024
5f859e0
Source Azure Blob Storage: update docs
artem1205 Apr 23, 2024
c0789cf
Source Azure Blob Storage: add error handler
artem1205 Apr 23, 2024
24ac101
Merge remote-tracking branch 'refs/remotes/origin/master' into artem1…
artem1205 Apr 28, 2024
643d774
Source Azure Blob Storage: add unit test
artem1205 Apr 28, 2024
5a929a9
Source Azure Blob Storage: set maxSecondsBetweenMessages
artem1205 Apr 29, 2024
Files changed
@@ -351,18 +351,11 @@
          "required": ["name", "format"]
        }
      },
-      "azure_blob_storage_account_name": {
-        "title": "Azure Blob Storage account name",
-        "description": "The account's name of the Azure Blob Storage.",
-        "examples": ["airbyte5storage"],
-        "order": 2,
-        "type": "string"
-      },
      "credentials": {
        "title": "Authentication",
        "description": "Credentials for connecting to the Azure Blob Storage",
        "type": "object",
-        "order": 3,
+        "order": 2,
        "oneOf": [
          {
            "title": "Authenticate via Oauth2",
@@ -434,6 +427,13 @@
          }
        ]
      },
+      "azure_blob_storage_account_name": {
+        "title": "Azure Blob Storage account name",
+        "description": "The account's name of the Azure Blob Storage.",
+        "examples": ["airbyte5storage"],
+        "order": 3,
+        "type": "string"
+      },
      "azure_blob_storage_container_name": {
        "title": "Azure blob storage container (Bucket) Name",
        "description": "The name of the Azure blob storage container.",
@@ -451,8 +451,8 @@
    },
    "required": [
      "streams",
-      "azure_blob_storage_account_name",
      "credentials",
+      "azure_blob_storage_account_name",
      "azure_blob_storage_container_name"
    ]
  },
@@ -1,18 +1,19 @@
data:
  ab_internal:
-    ql: 100
-    sl: 100
+    ql: 400
+    sl: 200
  connectorBuildOptions:
    baseImage: docker.io/airbyte/python-connector-base:1.2.0@sha256:c22a9d97464b69d6ef01898edf3f8612dc11614f05a84984451dde195f337db9
  connectorSubtype: file
  connectorType: source
  definitionId: fdaaba68-4875-4ed9-8fcd-4ae1e0a25093
-  dockerImageTag: 0.4.1
+  dockerImageTag: 0.4.2
  dockerRepository: airbyte/source-azure-blob-storage
  documentationUrl: https://docs.airbyte.com/integrations/sources/azure-blob-storage
  githubIssueLabel: source-azure-blob-storage
  icon: azureblobstorage.svg
  license: MIT
+  maxSecondsBetweenMessages: 1
  name: Azure Blob Storage
  remoteRegistries:
    pypi:
@@ -23,8 +24,8 @@ data:
      enabled: true
    oss:
      enabled: true
-  releaseStage: alpha
-  supportLevel: community
+  releaseStage: generally_available
+  supportLevel: certified
  tags:
    - language:python
    - cdk:python-file-based
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
-version = "0.4.1"
+version = "0.4.2"
name = "source-azure-blob-storage"
description = "Source implementation for Azure Blob Storage."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -2,9 +2,8 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

-from typing import Any, Mapping
+from typing import Any

-from airbyte_cdk.config_observation import emit_configuration_as_airbyte_control_message
from airbyte_cdk.sources.declarative.models import OAuthConfigSpecification
from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
from airbyte_protocol.models import AdvancedAuth, ConnectorSpecification
@@ -59,17 +59,17 @@ class SourceAzureBlobStorageSpec(AbstractFileBasedSpec):
    def documentation_url(cls) -> AnyUrl:
        return AnyUrl("https://docs.airbyte.com/integrations/sources/azure-blob-storage", scheme="https")

-    azure_blob_storage_account_name: str = Field(
-        title="Azure Blob Storage account name",
-        description="The account's name of the Azure Blob Storage.",
-        examples=["airbyte5storage"],
-        order=2,
-    )
    credentials: Union[Oauth2, StorageAccountKey] = Field(
        title="Authentication",
        description="Credentials for connecting to the Azure Blob Storage",
        discriminator="auth_type",
        type="object",
-        order=3,
+        order=2,
    )
+    azure_blob_storage_account_name: str = Field(
+        title="Azure Blob Storage account name",
+        description="The account's name of the Azure Blob Storage.",
+        examples=["airbyte5storage"],
+        order=3,
+    )
    azure_blob_storage_container_name: str = Field(
@@ -8,7 +8,10 @@
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator
+from airbyte_cdk.utils import AirbyteTracedException
+from airbyte_protocol.models import FailureType
from azure.core.credentials import AccessToken
+from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import BlobServiceClient, ContainerClient
from smart_open import open

@@ -80,10 +83,13 @@ def get_matching_files(
    ) -> Iterable[RemoteFile]:
        prefixes = [prefix] if prefix else self.get_prefixes_from_globs(globs)
        prefixes = prefixes or [None]
-        for prefix in prefixes:
-            for blob in self.azure_container_client.list_blobs(name_starts_with=prefix):
-                remote_file = RemoteFile(uri=blob.name, last_modified=blob.last_modified.astimezone(pytz.utc).replace(tzinfo=None))
-                yield from self.filter_files_by_globs_and_start_date([remote_file], globs)
+        try:
+            for prefix in prefixes:
+                for blob in self.azure_container_client.list_blobs(name_starts_with=prefix):
+                    remote_file = RemoteFile(uri=blob.name, last_modified=blob.last_modified.astimezone(pytz.utc).replace(tzinfo=None))
+                    yield from self.filter_files_by_globs_and_start_date([remote_file], globs)
+        except ResourceNotFoundError as e:
+            raise AirbyteTracedException(failure_type=FailureType.config_error, internal_message=e.message, message=e.reason or e.message)

    def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> IOBase:
        try:
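
For readers skimming the diff: the hunk above wraps blob listing in a try/except so that the Azure SDK's ResourceNotFoundError (for example, a container that does not exist) is re-raised as an AirbyteTracedException tagged as a config error, which the platform surfaces as a user-fixable problem rather than a system failure. A standalone sketch of the same pattern (the helper name is illustrative):

```python
# Sketch of the error-translation pattern introduced above; the helper
# name is illustrative. The imports match those added in the diff.
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_protocol.models import FailureType
from azure.core.exceptions import ResourceNotFoundError


def list_blobs_or_raise(container_client, prefix=None):
    """List blobs, converting a missing container into a config error."""
    try:
        return list(container_client.list_blobs(name_starts_with=prefix))
    except ResourceNotFoundError as e:
        # FailureType.config_error tells the platform the user's settings
        # are wrong (e.g. a nonexistent container), not that Airbyte failed.
        raise AirbyteTracedException(
            failure_type=FailureType.config_error,
            internal_message=e.message,
            message=e.reason or e.message,
        )
```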
@@ -0,0 +1,19 @@
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+
+
+import dpath.util
+from source_azure_blob_storage import SourceAzureBlobStorageSpec
+
+
+def test_spec():
+    config = SourceAzureBlobStorageSpec(
+        azure_blob_storage_endpoint="https://teststorage.blob.core.windows.net",
+        azure_blob_storage_account_name="account1",
+        azure_blob_storage_container_name="airbyte-source-azure-blob-storage-test",
+        credentials={"auth_type": "storage_account_key", "azure_blob_storage_account_key": "key1"},
+        streams=[],
+        start_date="2024-01-01T00:00:00.000000Z",
+    )
+
+    assert config.documentation_url() == "https://docs.airbyte.com/integrations/sources/azure-blob-storage"
+    assert len(dpath.util.get(config.schema(), "properties/streams/items/properties/format/oneOf/4/properties/processing/oneOf")) == 1
docs/integrations/sources/azure-blob-storage.md (58 changes: 45 additions & 13 deletions)
@@ -6,6 +6,33 @@ This page contains the setup guide and reference information for the Azure Blob
Cloud storage may incur egress costs. Egress refers to data that is transferred out of the cloud storage system, such as when you download files or access them from a different location. For more information, see the [Azure Blob Storage pricing guide](https://azure.microsoft.com/en-us/pricing/details/storage/blobs/).
:::

+## Prerequisites
+
+- Tenant ID of the Microsoft Azure Application user
+- Azure Blob Storage account name
+- Azure blob storage container (Bucket) Name
+
+<details>
+<summary>
+Minimum permissions (role [Storage Blob Data Reader](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles/storage#storage-blob-data-reader)):
+</summary>
+```json
+[
+  {
+    "actions": [
+      "Microsoft.Storage/storageAccounts/blobServices/containers/read",
+      "Microsoft.Storage/storageAccounts/blobServices/generateUserDelegationKey/action"
+    ],
+    "notActions": [],
+    "dataActions": [
+      "Microsoft.Storage/storageAccounts/blobServices/containers/blobs/read"
+    ],
+    "notDataActions": []
+  }
+]
+```
+</details>
+
## Setup guide

### Step 1: Set up Azure Blob Storage
@@ -20,7 +47,7 @@ to use role [Storage Blob Data Reader](https://learn.microsoft.com/en-gb/azure/s

<details>
<summary>
-Follow this steps to setup IAM role:
+Follow these steps to set up an IAM role:
</summary>

1. Go to Azure portal, select the Storage (or Container) you'd like to sync from and get to Access Control(IAM) -> Role Assignment ![Access Control (IAM)](../../.gitbook/assets/source/azure-blob-storage/access_control_iam.png)
@@ -38,19 +65,19 @@ Follow this steps to setup IAM role:
2. In the left navigation bar, click **Sources**. In the top-right corner, click **+ New source**.
3. Find and select **Azure Blob Storage** from the list of available sources.
4. Enter the name of your Azure **Account**.
-5. Click **Authenticate your Azure Blob Storage account**.
+5. Enter your Tenant ID and click **Authenticate your Azure Blob Storage account**.
6. Log in and authorize the Azure Blob Storage account.
7. Enter the name of the **Container** containing your files to replicate.
8. Add a stream
1. Write the **File Type**
2. In the **Format** box, use the dropdown menu to select the format of the files you'd like to replicate. The supported formats are **CSV**, **Parquet**, **Avro** and **JSONL**. Toggling the **Optional fields** button within the **Format** box will allow you to enter additional configurations based on the selected format. For a detailed breakdown of these settings, refer to the [File Format section](#file-format-settings) below.
3. Give a **Name** to the stream
-   4. (Optional) - If you want to enforce a specific schema, you can enter a **Input schema**. By default, this value is set to `{}` and will automatically infer the schema from the file\(s\) you are replicating. For details on providing a custom schema, refer to the [User Schema section](#user-schema).
+   4. (Optional) If you want to enforce a specific schema, you can enter an **Input schema**. By default, this value is set to `{}` and will automatically infer the schema from the file\(s\) you are replicating. For details on providing a custom schema, refer to the [User Schema section](#user-schema).
5. Optionally, enter the **Globs** which dictates which files to be synced. This is a regular expression that allows Airbyte to pattern match the specific files to replicate. If you are replicating all the files within your bucket, use `**` as the pattern. For more precise pattern matching options, refer to the [Path Patterns section](#path-patterns) below.
9. (Optional) Enter the endpoint to use for the data replication.
10. (Optional) Enter the desired start date from which to begin replicating data.
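
Taken together, the steps above produce a source configuration. A minimal sketch of what such a configuration might look like, using the field names from the connector spec in this PR (the account, container, key, and stream values are placeholders, and the `filetype` key is assumed from file-based connector conventions):

```python
# Illustrative config only; all values are placeholders, not working credentials.
config = {
    "azure_blob_storage_account_name": "airbyte5storage",
    "azure_blob_storage_container_name": "airbyte-container",
    "credentials": {
        "auth_type": "storage_account_key",
        "azure_blob_storage_account_key": "<storage-account-key>",
    },
    "streams": [
        {
            "name": "users",                # "name" and "format" are required
            "format": {"filetype": "csv"},  # "filetype" key is assumed
            "globs": ["**/*.csv"],          # see Path Patterns below
        }
    ],
    "start_date": "2024-01-01T00:00:00.000000Z",
}
```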

-## Supported sync modes
+## Supported Streams

The Azure Blob Storage source connector supports the following [sync modes](https://docs.airbyte.com/cloud/core-concepts#connection-sync-modes):

Expand All @@ -63,7 +90,7 @@ The Azure Blob Storage source connector supports the following [sync modes](http
| Replicate Multiple Streams \(distinct tables\) | Yes |
| Namespaces | No |

-## File Compressions
+### File Compressions

| Compression | Supported? |
|:------------|:-----------|
@@ -76,7 +103,7 @@ The Azure Blob Storage source connector supports the following [sync modes](http

Please let us know any specific compressions you'd like to see support for next!

-## Path Patterns
+### Path Patterns

\(tl;dr -> path pattern syntax using [wcmatch.glob](https://facelessuser.github.io/wcmatch/glob/). GLOBSTAR and SPLIT flags are enabled.\)

@@ -126,7 +153,7 @@ We want to pick up part1.csv, part2.csv and part3.csv \(excluding another_part1.

As you can probably tell, there are many ways to achieve the same goal with path patterns. We recommend using a pattern that ensures clarity and is robust against future additions to the directory structure.
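
For illustration, the matching behavior can be reproduced with the wcmatch library directly. A minimal sketch, assuming `wcmatch` is installed, with the same GLOBSTAR and SPLIT flags the connector enables:

```python
# Sketch: glob semantics as described above, via wcmatch.
from wcmatch import glob

FLAGS = glob.GLOBSTAR | glob.SPLIT

# GLOBSTAR lets `**` span any number of directories, including none.
assert glob.globmatch("myFolder/part1.csv", "**/*.csv", flags=FLAGS)
assert glob.globmatch("myFolder/2024/part2.csv", "myFolder/**/*.csv", flags=FLAGS)

# SPLIT allows alternatives in one pattern string, separated by `|`.
assert glob.globmatch("part3.csv", "part1.csv|part3.csv", flags=FLAGS)

# Patterns match the whole path, so a bare prefix is not enough.
assert not glob.globmatch("another_part1.csv", "part*.csv", flags=FLAGS)
```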

-## User Schema
+### User Schema

Providing a schema allows for more control over the output of this stream. Without a provided schema, columns and datatypes will be inferred from the first created file in the bucket matching your path pattern and suffix. This will probably be fine in most cases but there may be situations you want to enforce a schema instead, e.g.:

@@ -150,9 +177,9 @@ For example:
- `{"id": "integer", "location": "string", "longitude": "number", "latitude": "number"}`
- `{"username": "string", "friends": "array", "information": "object"}`

-## File Format Settings
+### File Format Settings

-### CSV
+#### CSV

Since CSV files are effectively plain text, providing specific reader options is often required for correct parsing of the files. These settings are applied when a CSV is created or exported so please ensure that this process happens consistently over time.

@@ -180,24 +207,24 @@ Leaving this field blank (default option) will disallow escaping.
- **True Values**: A set of case-sensitive strings that should be interpreted as true values.
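
As an illustration, these options end up in the stream's `format` block. A sketch, assuming snake_case keys that mirror the option labels above (the key names are assumptions for illustration, not taken from the connector's spec):

```python
# Illustrative CSV format block; key names are assumed, values are examples.
csv_format = {
    "filetype": "csv",
    "delimiter": ",",         # single-character column separator
    "quote_char": '"',        # wraps values that contain the delimiter
    "escape_char": "\\",      # omit to disallow escaping (the default)
    "encoding": "utf8",
    "true_values": ["true", "yes", "1"],  # case-sensitive truthy strings
}
```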


-### Parquet
+#### Parquet

Apache Parquet is a column-oriented data storage format of the Apache Hadoop ecosystem. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk. At the moment, partitioned parquet datasets are unsupported. The following settings are available:

- **Convert Decimal Fields to Floats**: Whether to convert decimal fields to floats. There is a loss of precision when converting decimals to floats, so this is not recommended.

-### Avro
+#### Avro

The Avro parser uses the [Fastavro library](https://fastavro.readthedocs.io/en/latest/). The following settings are available:
- **Convert Double Fields to Strings**: Whether to convert double fields to strings. This is recommended if you have decimal numbers with a high degree of precision because there can be a loss of precision when handling floating point numbers.

-### JSONL
+#### JSONL

There are currently no options for JSONL parsing.

<FieldAnchor field="streams.0.format[unstructured],streams.1.format[unstructured],streams.2.format[unstructured]">

-### Document File Type Format (Experimental)
+#### Document File Type Format (Experimental)

:::warning
The Document File Type Format is currently an experimental feature and not subject to SLAs. Use at your own risk.
@@ -213,10 +240,15 @@ This connector utilizes the open source [Unstructured](https://unstructured-io.g

</FieldAnchor>

+## Performance considerations
+
+The Azure Blob Storage connector should not encounter any [Microsoft API limitations](https://learn.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage) under normal usage.
+
## Changelog

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------|
+| 0.4.2 | 2024-04-23 | [37504](https://github.com/airbytehq/airbyte/pull/37504) | Update specification |
| 0.4.1 | 2024-04-22 | [37467](https://github.com/airbytehq/airbyte/pull/37467) | Fix start date filter |
| 0.4.0 | 2024-04-05 | [36825](https://github.com/airbytehq/airbyte/pull/36825) | Add oauth 2.0 support |
| 0.3.6 | 2024-04-03 | [36542](https://github.com/airbytehq/airbyte/pull/36542) | Use Latest CDK; add integration tests |