Add LocalToAzureDataLakeStorageOperator (#10814)
ephraimbuddy committed Oct 5, 2020
1 parent 93475e9 commit c51016b
Showing 11 changed files with 397 additions and 10 deletions.

39 changes: 39 additions & 0 deletions airflow/providers/microsoft/azure/example_dags/example_local_to_adls.py
@@ -0,0 +1,39 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import os
from airflow import models
from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalToAzureDataLakeStorageOperator
from airflow.utils.dates import days_ago

LOCAL_FILE_PATH = os.environ.get("LOCAL_FILE_PATH", 'localfile.txt')
REMOTE_FILE_PATH = os.environ.get("REMOTE_FILE_PATH", 'remote')


with models.DAG(
"example_local_to_adls",
start_date=days_ago(1),
schedule_interval=None,
tags=['example'],
) as dag:
# [START howto_operator_local_to_adls]
upload_file = LocalToAzureDataLakeStorageOperator(
task_id='upload_task',
local_path=LOCAL_FILE_PATH,
remote_path=REMOTE_FILE_PATH,
)
# [END howto_operator_local_to_adls]
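
For comparison, a variant of the task above that exercises the optional
arguments might look like the following (a sketch only: the task id and tuning
values are illustrative, and the task would sit inside the same DAG context).

    # Hypothetical task; the values shown are illustrative, not part of the commit.
    upload_file_tuned = LocalToAzureDataLakeStorageOperator(
        task_id='upload_task_tuned',
        local_path=LOCAL_FILE_PATH,
        remote_path=REMOTE_FILE_PATH,
        overwrite=True,                  # replace matching remote files
        nthreads=16,                     # fewer threads than the default of 64
        buffersize=4194304,              # 4 MiB internal buffer (the default)
        blocksize=4194304,               # 4 MiB per API call (the default)
        azure_data_lake_conn_id='azure_data_lake_default',
    )
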
38 changes: 28 additions & 10 deletions airflow/providers/microsoft/azure/hooks/azure_data_lake.py
@@ -24,6 +24,8 @@
login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) and account_name (Account Name)
(see connection `azure_data_lake_default` for an example).
"""
from typing import Any, Optional

from azure.datalake.store import core, lib, multithread

from airflow.hooks.base_hook import BaseHook
@@ -41,13 +43,13 @@ class AzureDataLakeHook(BaseHook):
:type azure_data_lake_conn_id: str
"""

def __init__(self, azure_data_lake_conn_id='azure_data_lake_default'):
def __init__(self, azure_data_lake_conn_id: str = 'azure_data_lake_default'):
super().__init__()
self.conn_id = azure_data_lake_conn_id
self._conn = None
self.account_name = None
self._conn: Optional[core.AzureDLFileSystem] = None
self.account_name: Optional[str] = None

def get_conn(self):
def get_conn(self) -> core.AzureDLFileSystem:
"""Return a AzureDLFileSystem object."""
if not self._conn:
conn = self.get_connection(self.conn_id)
@@ -61,7 +63,7 @@ def get_conn(self):
self._conn.connect()
return self._conn

def check_for_file(self, file_path):
def check_for_file(self, file_path: str) -> bool:
"""
Check if a file exists on Azure Data Lake.
@@ -77,8 +79,15 @@ def check_for_file(self, file_path):
return False

def upload_file(
self, local_path, remote_path, nthreads=64, overwrite=True, buffersize=4194304, blocksize=4194304
):
self,
local_path: str,
remote_path: str,
nthreads: int = 64,
overwrite: bool = True,
buffersize: int = 4194304,
blocksize: int = 4194304,
**kwargs,
) -> Any:
"""
Upload a file to Azure Data Lake.
@@ -113,11 +122,19 @@ def upload_file(
overwrite=overwrite,
buffersize=buffersize,
blocksize=blocksize,
**kwargs,
)

def download_file(
self, local_path, remote_path, nthreads=64, overwrite=True, buffersize=4194304, blocksize=4194304
):
self,
local_path: str,
remote_path: str,
nthreads: int = 64,
overwrite: bool = True,
buffersize: int = 4194304,
blocksize: int = 4194304,
**kwargs,
) -> Any:
"""
Download a file from Azure Data Lake Storage.
@@ -153,9 +170,10 @@ def download_file(
overwrite=overwrite,
buffersize=buffersize,
blocksize=blocksize,
**kwargs,
)

def list(self, path):
def list(self, path: str):
"""
List files in Azure Data Lake Storage
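
The pass-through `**kwargs` added above lets callers forward extra options to
the underlying azure-datalake-store transfer helpers. A minimal sketch of
direct hook usage, assuming a configured `azure_data_lake_default` connection
(`chunksize` is a `multithread.ADLUploader` parameter, used here as an example
of a forwarded option):

    from airflow.providers.microsoft.azure.hooks.azure_data_lake import AzureDataLakeHook

    hook = AzureDataLakeHook(azure_data_lake_conn_id='azure_data_lake_default')
    if not hook.check_for_file('remote/localfile.txt'):
        # Keyword arguments beyond the named ones are forwarded to
        # multithread.ADLUploader via **kwargs.
        hook.upload_file(
            local_path='localfile.txt',
            remote_path='remote/localfile.txt',
            overwrite=True,
            chunksize=2 ** 28,
        )
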
103 changes: 103 additions & 0 deletions airflow/providers/microsoft/azure/transfers/local_to_adls.py
@@ -0,0 +1,103 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Dict, Any, Optional
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.azure_data_lake import AzureDataLakeHook
from airflow.utils.decorators import apply_defaults


class LocalToAzureDataLakeStorageOperator(BaseOperator):
"""
Upload file(s) to Azure Data Lake
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:LocalToAzureDataLakeStorageOperator`
:param local_path: local path. Can be single file, directory (in which case,
upload recursively) or glob pattern. Recursive glob patterns using `**`
are not supported
:type local_path: str
:param remote_path: Remote path to upload to; if multiple files, this is the
directory root to write within
:type remote_path: str
:param nthreads: Number of threads to use. If None, uses the number of cores.
:type nthreads: int
:param overwrite: Whether to forcibly overwrite existing files/directories.
If False and the remote path is a directory, will quit regardless of whether
any files would be overwritten. If True, only matching filenames are actually
overwritten
:type overwrite: bool
:param buffersize: Number of bytes for the internal buffer (default 4194304 = 2**22).
This block cannot be bigger than a chunk and cannot be smaller than a block
:type buffersize: int
:param blocksize: Number of bytes for a block (default 4194304 = 2**22).
Within each chunk, we write a smaller block for each API call. This block
cannot be bigger than a chunk
:type blocksize: int
:param extra_upload_options: Extra upload options to add to the hook upload method
:type extra_upload_options: dict
:param azure_data_lake_conn_id: Reference to the Azure Data Lake connection
:type azure_data_lake_conn_id: str
"""

template_fields = ("local_path", "remote_path")
ui_color = '#e4f0e8'

@apply_defaults
def __init__(
self,
*,
local_path: str,
remote_path: str,
overwrite: bool = True,
nthreads: int = 64,
buffersize: int = 4194304,
blocksize: int = 4194304,
extra_upload_options: Optional[Dict[str, Any]] = None,
azure_data_lake_conn_id: str = 'azure_data_lake_default',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.local_path = local_path
self.remote_path = remote_path
self.overwrite = overwrite
self.nthreads = nthreads
self.buffersize = buffersize
self.blocksize = blocksize
self.extra_upload_options = extra_upload_options
self.azure_data_lake_conn_id = azure_data_lake_conn_id

def execute(self, context: Dict[Any, Any]) -> None:
if '**' in self.local_path:
raise AirflowException("Recursive glob patterns using `**` are not supported")
if not self.extra_upload_options:
self.extra_upload_options = {}
hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
self.log.info('Uploading %s to %s', self.local_path, self.remote_path)
hook.upload_file(
local_path=self.local_path,
remote_path=self.remote_path,
nthreads=self.nthreads,
overwrite=self.overwrite,
buffersize=self.buffersize,
blocksize=self.blocksize,
**self.extra_upload_options,
)
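
Because `extra_upload_options` is unpacked into the `hook.upload_file` call,
any keyword the underlying uploader understands can be supplied through the
operator. A hypothetical task (the `verbose` flag is a
`multithread.ADLUploader` parameter, chosen purely for illustration):

    upload_csvs = LocalToAzureDataLakeStorageOperator(
        task_id='upload_csvs',
        local_path='data/*.csv',      # plain glob patterns are allowed; `**` is not
        remote_path='landing/csv',
        extra_upload_options={'verbose': True},
    )
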
1 change: 1 addition & 0 deletions docs/conf.py
@@ -210,6 +210,7 @@
# Templates or partials
'autoapi_templates',
'howto/operator/google/_partials',
'howto/operator/microsoft/_partials'
]

ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
1 change: 1 addition & 0 deletions docs/howto/operator/index.rst
@@ -38,6 +38,7 @@ determine what actually executes when your DAG runs.
google/index
http
kubernetes
microsoft/index
papermill
python
external_task_sensor
34 changes: 34 additions & 0 deletions docs/howto/operator/microsoft/_partials/prerequisite_tasks.rst
@@ -0,0 +1,34 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
To use these operators, you must do a few things:

* Create necessary resources using `AZURE PORTAL`_ or `AZURE CLI`_.
* Install API libraries via **pip**.

  .. code-block:: bash

      pip install 'apache-airflow[azure]'

  Detailed information is available at :doc:`/installation`.

* :doc:`Setup Connection </howto/connection/azure>`.

.. _AZURE PORTAL: https://portal.azure.com
.. _AZURE CLI: https://docs.microsoft.com/en-us/cli/azure/
26 changes: 26 additions & 0 deletions docs/howto/operator/microsoft/index.rst
@@ -0,0 +1,26 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Microsoft Operators
===================

.. toctree::
:maxdepth: 1

transfer/index
28 changes: 28 additions & 0 deletions docs/howto/operator/microsoft/transfer/index.rst
@@ -0,0 +1,28 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Microsoft Transfer Operators
===================================


.. toctree::
:maxdepth: 1
:glob:

*
58 changes: 58 additions & 0 deletions docs/howto/operator/microsoft/transfer/local_to_adls.rst
@@ -0,0 +1,58 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Upload data from Local Filesystem to Azure Data Lake
====================================================
The `Azure Data Lake <https://azure.microsoft.com/en-us/solutions/data-lake/>`__ (ADL) service makes it easy to store data of
any size, shape, and speed.
This page shows how to upload data from the local filesystem to ADL.

.. contents::
:depth: 1
:local:


Prerequisite Tasks
^^^^^^^^^^^^^^^^^^

.. include:: /howto/operator/microsoft/_partials/prerequisite_tasks.rst

.. _howto/operator:LocalToAzureDataLakeStorageOperator:

LocalToAzureDataLakeStorageOperator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:class:`~airflow.providers.microsoft.azure.transfers.local_to_adls.LocalToAzureDataLakeStorageOperator` allows you to
upload data from local filesystem to ADL.


Below is an example of using this operator to upload a file to ADL.

.. exampleinclude:: /../airflow/providers/microsoft/azure/example_dags/example_local_to_adls.py
:language: python
:dedent: 0
:start-after: [START howto_operator_local_to_adls]
:end-before: [END howto_operator_local_to_adls]
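
Both ``local_path`` and ``remote_path`` are template fields, so they can be
rendered with Jinja at run time. An illustrative sketch (this task is
hypothetical and not part of the example DAG):

.. code-block:: python

    upload_daily = LocalToAzureDataLakeStorageOperator(
        task_id="upload_daily",
        local_path="/tmp/report_{{ ds }}.csv",
        remote_path="reports/{{ ds }}/report.csv",
    )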


Reference
---------

For further information, look at:

* `Azure Data Lake Storage Documentation <https://docs.microsoft.com/en-us/azure/data-lake-store/>`__
