4 changes: 4 additions & 0 deletions UPDATING.md
@@ -690,6 +690,10 @@ Dataflow job labeling is now supported in Dataflow{Java,Python}Operator with a default
"airflow-version" label, please upgrade your google-cloud-dataflow or apache-beam version
to 2.2.0 or greater.

### Google Cloud Storage Hook

The `GoogleCloudStorageDownloadOperator` can either write to a supplied `filename` or push the content of the file to XCom via `store_to_xcom_key`; the two options are mutually exclusive.
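
A minimal usage sketch of the two modes, with hypothetical bucket, object, and task names (parameter names follow the operator's docstring in this change):

```python
from airflow.contrib.operators.gcs_download_operator import GoogleCloudStorageDownloadOperator

# Write the object to a local file ...
download_to_file = GoogleCloudStorageDownloadOperator(
    task_id="download_to_file",
    bucket="my-bucket",            # hypothetical bucket
    object="data/report.csv",      # hypothetical object
    filename="/tmp/report.csv",
)

# ... or push its content to XCom instead (setting both raises a ValueError).
download_to_xcom = GoogleCloudStorageDownloadOperator(
    task_id="download_to_xcom",
    bucket="my-bucket",
    object="data/report.csv",
    store_to_xcom_key="report_csv",
)
```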

### BigQuery Hooks and Operator

The `bql` parameter passed to `BigQueryOperator` and `BigQueryBaseCursor.run_query` has been deprecated and renamed to `sql` for consistency purposes. Using `bql` will still work (and raise a `DeprecationWarning`), but is no longer
12 changes: 9 additions & 3 deletions airflow/contrib/hooks/gcs_hook.py
@@ -160,7 +160,12 @@ def rewrite(self, source_bucket, source_object, destination_bucket,

def download(self, bucket_name, object_name, filename=None):
"""
Get a file from Google Cloud Storage.
Downloads a file from Google Cloud Storage.

When no filename is supplied, the hook loads the file into memory and returns its
content. When a filename is supplied, it writes the file to the specified location and
returns the path to that file. For files larger than the available memory, writing to
a file is recommended.

:param bucket_name: The bucket to fetch from.
:type bucket_name: str
@@ -176,8 +181,9 @@ def download(self, bucket_name, object_name, filename=None):
if filename:
blob.download_to_filename(filename)
self.log.info('File downloaded to %s', filename)

return blob.download_as_string()
return filename
else:
return blob.download_as_string()

def upload(self, bucket_name, object_name, filename,
mime_type='application/octet-stream', gzip=False):
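
A short sketch of the reworked return behaviour of `download`, assuming hypothetical bucket and object names and the default GCP connection:

```python
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

hook = GoogleCloudStorageHook(google_cloud_storage_conn_id="google_cloud_default")

# With a filename, the object is written to disk and the path is returned.
path = hook.download(
    bucket_name="my-bucket",        # hypothetical bucket
    object_name="data/report.csv",  # hypothetical object
    filename="/tmp/report.csv",
)

# Without a filename, the object is loaded into memory and returned as bytes.
content = hook.download(bucket_name="my-bucket", object_name="data/report.csv")
```
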
17 changes: 14 additions & 3 deletions airflow/contrib/operators/gcs_download_operator.py
@@ -34,6 +34,10 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
"""
Downloads a file from Google Cloud Storage.

If a filename is supplied, the operator writes the file to the specified location. Alternatively,
the ``store_to_xcom_key`` parameter can be set to push the file content to XCom under that key.
When the file size exceeds the maximum XCom size, it is recommended to write to a file.

:param bucket: The Google cloud storage bucket where the object is. (templated)
:type bucket: str
:param object: The name of the object to download in the Google cloud
@@ -81,6 +85,9 @@ def __init__(self,
else:
raise TypeError("__init__() missing 1 required positional argument: 'object_name'")

if filename is not None and store_to_xcom_key is not None:
raise ValueError("Only one of filename or store_to_xcom_key can be set")

if google_cloud_storage_conn_id:
warnings.warn(
"The google_cloud_storage_conn_id parameter has been deprecated. You should pass "
@@ -102,13 +109,17 @@ def execute(self, context):
google_cloud_storage_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to
)
file_bytes = hook.download(bucket_name=self.bucket,
object_name=self.object,
filename=self.filename)

if self.store_to_xcom_key:
file_bytes = hook.download(bucket_name=self.bucket,
object_name=self.object)
if sys.getsizeof(file_bytes) < MAX_XCOM_SIZE:
context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes)
else:
raise AirflowException(
'The size of the downloaded file is too large to push to XCom!'
)
else:
hook.download(bucket_name=self.bucket,
object_name=self.object,
filename=self.filename)
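
For the XCom path, a downstream task can pull the content under the same key; a minimal sketch with hypothetical task ids (the `MAX_XCOM_SIZE` check above rejects oversized payloads before pushing):

```python
from airflow.operators.python_operator import PythonOperator


def process_report(**context):
    # Pull the bytes pushed by the download task under its store_to_xcom_key.
    file_bytes = context["ti"].xcom_pull(
        task_ids="download_to_xcom",  # hypothetical upstream task id
        key="report_csv",             # must match store_to_xcom_key
    )
    print("downloaded %d bytes" % len(file_bytes))


process_report_task = PythonOperator(
    task_id="process_report",
    python_callable=process_report,
    provide_context=True,  # required on Airflow 1.10.x to receive the context kwargs
)
```
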
5 changes: 5 additions & 0 deletions airflow/contrib/operators/kubernetes_pod_operator.py
@@ -102,6 +102,8 @@ class KubernetesPodOperator(BaseOperator):
:type pod_runtime_info_envs: list[airflow.kubernetes.models.pod_runtime_info_env.PodRuntimeInfoEnv]
:param dnspolicy: Specify a dnspolicy for the pod
:type dnspolicy: str
:param schedulername: Specify a schedulername for the pod
:type schedulername: str
:param full_pod_spec: The complete podSpec
:type full_pod_spec: kubernetes.client.models.V1Pod
"""
@@ -133,6 +135,7 @@ def execute(self, context):
configmaps=self.configmaps,
security_context=self.security_context,
dnspolicy=self.dnspolicy,
schedulername=self.schedulername,
resources=self.resources,
pod=self.full_pod_spec,
).gen_pod()
@@ -199,6 +202,7 @@ def __init__(self,
security_context=None,
pod_runtime_info_envs=None,
dnspolicy=None,
schedulername=None,
full_pod_spec=None,
*args,
**kwargs):
@@ -239,4 +243,5 @@ def __init__(self,
self.security_context = security_context or {}
self.pod_runtime_info_envs = pod_runtime_info_envs or []
self.dnspolicy = dnspolicy
self.schedulername = schedulername
self.full_pod_spec = full_pod_spec
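
A minimal sketch of passing the new `schedulername` parameter; the image, namespace, and scheduler name below are hypothetical:

```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

run_on_custom_scheduler = KubernetesPodOperator(
    task_id="run_on_custom_scheduler",
    name="custom-scheduled-pod",
    namespace="default",
    image="python:3.7-slim",
    cmds=["python", "-c", "print('scheduled by a custom scheduler')"],
    schedulername="my-custom-scheduler",  # hypothetical scheduler registered in the cluster
)
```
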
2 changes: 1 addition & 1 deletion airflow/example_dags/example_xcom.py
@@ -40,7 +40,7 @@ def push(**kwargs):
kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def push_by_returning():
def push_by_returning(**kwargs):
"""Pushes an XCom without a specific target, just by returning it"""
return value_2
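
A value returned this way is stored under XCom's default `return_value` key, so a puller can read it by task id alone; a sketch assuming the task id matches the function name:

```python
def puller(**kwargs):
    # Returned values are regular XComs and can be pulled without naming a key
    # (the default key is 'return_value').
    pulled = kwargs['ti'].xcom_pull(task_ids='push_by_returning')
    print(pulled)
```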

222 changes: 222 additions & 0 deletions airflow/gcp/example_dags/example_bigquery.py
@@ -0,0 +1,222 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG for Google BigQuery service
"""
import os
from urllib.parse import urlparse

import airflow
from airflow import models
from airflow.contrib.operators.bigquery_get_data import BigQueryGetDataOperator
from airflow.contrib.operators.bigquery_operator import (
BigQueryOperator,
BigQueryCreateEmptyTableOperator,
BigQueryCreateEmptyDatasetOperator,
BigQueryGetDatasetOperator,
BigQueryPatchDatasetOperator,
BigQueryUpdateDatasetOperator,
BigQueryDeleteDatasetOperator,
BigQueryCreateExternalTableOperator,
)
from airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator
from airflow.contrib.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.operators.bash_operator import BashOperator

# 0x06012c8cf97BEaD5deAe237070F9587f8E7A266d = CryptoKitties contract address
WALLET_ADDRESS = os.environ.get("GCP_ETH_WALLET_ADDRESS", "0x06012c8cf97BEaD5deAe237070F9587f8E7A266d")

default_args = {"start_date": airflow.utils.dates.days_ago(1)}

MOST_VALUABLE_INCOMING_TRANSACTIONS = """
SELECT
value, to_address
FROM
`bigquery-public-data.ethereum_blockchain.transactions`
WHERE
1 = 1
AND DATE(block_timestamp) = "{{ ds }}"
AND to_address = LOWER(@to_address)
ORDER BY value DESC
LIMIT 1000
"""

MOST_ACTIVE_PLAYERS = """
SELECT
COUNT(from_address)
, from_address
FROM
`bigquery-public-data.ethereum_blockchain.transactions`
WHERE
1 = 1
AND DATE(block_timestamp) = "{{ ds }}"
AND to_address = LOWER(@to_address)
GROUP BY from_address
ORDER BY COUNT(from_address) DESC
LIMIT 1000
"""

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")

DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset")

DATA_SAMPLE_GCS_URL = os.environ.get(
"GCP_BIGQUERY_DATA_GCS_URL", "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
)

DATA_SAMPLE_GCS_URL_PARTS = urlparse(DATA_SAMPLE_GCS_URL)
DATA_SAMPLE_GCS_BUCKET_NAME = DATA_SAMPLE_GCS_URL_PARTS.netloc
DATA_SAMPLE_GCS_OBJECT_NAME = DATA_SAMPLE_GCS_URL_PARTS.path[1:]

DATA_EXPORT_BUCKET_NAME = os.environ.get("GCP_BIGQUERY_EXPORT_BUCKET_NAME", "test-bigquery-sample-data")


with models.DAG(
"example_bigquery", default_args=default_args, schedule_interval=None # Override to match your needs
) as dag:

execute_query = BigQueryOperator(
task_id="execute-query",
sql=MOST_VALUABLE_INCOMING_TRANSACTIONS,
use_legacy_sql=False,
query_params=[
{
"name": "to_address",
"parameterType": {"type": "STRING"},
"parameterValue": {"value": WALLET_ADDRESS},
}
],
)

bigquery_execute_multi_query = BigQueryOperator(
task_id="execute-multi-query",
sql=[MOST_VALUABLE_INCOMING_TRANSACTIONS, MOST_ACTIVE_PLAYERS],
use_legacy_sql=False,
query_params=[
{
"name": "to_address",
"parameterType": {"type": "STRING"},
"parameterValue": {"value": WALLET_ADDRESS},
}
],
)

execute_query_save = BigQueryOperator(
task_id="execute-query-save",
sql=MOST_VALUABLE_INCOMING_TRANSACTIONS,
use_legacy_sql=False,
destination_dataset_table="{}.save_query_result".format(DATASET_NAME),
query_params=[
{
"name": "to_address",
"parameterType": {"type": "STRING"},
"parameterValue": {"value": WALLET_ADDRESS},
}
],
)

get_data = BigQueryGetDataOperator(
task_id="get-data",
dataset_id=DATASET_NAME,
table_id="save_query_result",
max_results="10",
selected_fields="value,to_address",
)

get_data_result = BashOperator(
task_id="get-data-result", bash_command="echo \"{{ task_instance.xcom_pull('get-data') }}\""
)

create_external_table = BigQueryCreateExternalTableOperator(
task_id="create-external-table",
bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
destination_project_dataset_table="{}.external_table".format(DATASET_NAME),
skip_leading_rows=1,
schema_fields=[{"name": "name", "type": "STRING"}, {"name": "post_abbr", "type": "STRING"}],
)

execute_query_external_table = BigQueryOperator(
task_id="execute-query-external-table",
destination_dataset_table="{}.selected_data_from_external_table".format(DATASET_NAME),
sql='SELECT * FROM `{}.external_table` WHERE name LIKE "W%"'.format(DATASET_NAME),
use_legacy_sql=False,
)

copy_from_selected_data = BigQueryToBigQueryOperator(
task_id="copy-from-selected-data",
source_project_dataset_tables="{}.selected_data_from_external_table".format(DATASET_NAME),
destination_project_dataset_table="{}.copy_of_selected_data_from_external_table".format(DATASET_NAME),
)

bigquery_to_gcs = BigQueryToCloudStorageOperator(
task_id="bigquery-to-gcs",
source_project_dataset_table="{}.selected_data_from_external_table".format(DATASET_NAME),
destination_cloud_storage_uris=["gs://{}/export-bigquery.csv".format(DATA_EXPORT_BUCKET_NAME)],
)

create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)

create_table = BigQueryCreateEmptyTableOperator(
task_id="create-table",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)

delete_table = BigQueryTableDeleteOperator(
task_id="delete-table", deletion_dataset_table="{}.test_table".format(DATASET_NAME)
)

get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)

get_dataset_result = BashOperator(
task_id="get-dataset-result",
bash_command="echo \"{{ task_instance.xcom_pull('get-dataset')['id'] }}\"",
)

patch_dataset = BigQueryPatchDatasetOperator(
task_id="patch-dataset",
dataset_id=DATASET_NAME,
dataset_resource={"friendlyName": "Patchet Dataset", "description": "Patched dataset"},
)

update_dataset = BigQueryUpdateDatasetOperator(
task_id="update-dataset", dataset_id=DATASET_NAME, dataset_resource={"description": "Updated dataset"}
)

delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete-dataset", dataset_id=DATASET_NAME, delete_contents=True
)

create_dataset >> execute_query_save >> delete_dataset
create_dataset >> create_table >> delete_dataset
create_dataset >> get_dataset >> delete_dataset
create_dataset >> patch_dataset >> update_dataset >> delete_dataset
execute_query_save >> get_data >> get_dataset_result
get_data >> delete_dataset
create_dataset >> create_external_table >> execute_query_external_table >> \
copy_from_selected_data >> delete_dataset
execute_query_external_table >> bigquery_to_gcs >> delete_dataset
create_table >> delete_table >> delete_dataset