Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix using .json template extension in GMP operators #9566

Merged
merged 5 commits into from
Jul 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""
This module contains Google CampaignManager operators.
"""
import json
import tempfile
import uuid
from typing import Any, Dict, List, Optional
Expand Down Expand Up @@ -298,6 +299,12 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to

def prepare_template(self) -> None:
# If .json is passed then we have to read the file
if isinstance(self.report, str) and self.report.endswith('.json'):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would work for templates passed in with a *.json extension, but doesn't cover the case where report is passed as a JSON string template. For example:

create_report = GoogleCampaignManagerInsertReportOperator(
        task_id="create_report",
        gcp_conn_id=self.config.get("gcp_connection_id"),
        profile_id=self.config.get("cm_profile_id"),
        report="""
            {
               'name': '{{ params.title }}',
               'accountId': '{{ params.account }}',
               'fileName': '{{ params.file_name }}',
               'type': 'STANDARD',
               'criteria': {
               ...""",
        params=report_params,
        dag=self.dag)

Safest approach may be to parse the json string within the execute method.

Copy link
Member Author

@turbaszek turbaszek Jun 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to documentation user should provide a dictionary so I would say that passing str doesn't have to be supported.

Copy link
Member

@mik-laj mik-laj Jun 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can create a task as below to create a task based on a JSON file from another source e.g. variable.

create_report = GoogleCampaignManagerInsertReportOperator(
        task_id="create_report",
        gcp_conn_id=self.config.get("gcp_connection_id"),
        profile_id=self.config.get("cm_profile_id"),
        report=json.loads("""
            {
               'name': '{{ params.title }}',
               'accountId': '{{ params.account }}',
               'fileName': '{{ params.file_name }}',
               'type': 'STANDARD',
               'criteria': {
               ..."""),
        params=report_params,
        dag=self.dag)

Otherwise we have ambiguity. The str type means the path to the file in one case and the contents of the file in other case.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, although you can't take advantage of Airflow's built-in macros like task_instance. Also deviates from the behavior of other templated operators (e.g. BashOperator).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can because the templates are rendered recursively.

def render_template(  # pylint: disable=too-many-return-statements
    self, content: Any, context: Dict, jinja_env: Optional[jinja2.Environment] = None,
    seen_oids: Optional[Set] = None
) -> Any:
    """
    Render a templated string. The content can be a collection holding multiple templated strings and will
    be templated recursively.

    Dispatch by type of ``content``:

    * ``str`` ending with one of ``self.template_ext`` - loaded as a template file and rendered;
    * any other ``str`` - rendered as an inline template;
    * ``XComArg`` - resolved against ``context``;
    * ``tuple`` (including named tuples), ``list``, ``dict`` values and ``set`` - each element is
      rendered recursively and a new collection of the same kind is returned;
    * anything else - its nested template fields are rendered and the same object is returned.

    :param content: Content to template. Only strings can be templated (may be inside collection).
    :type content: Any
    :param context: Dict with values to apply on templated content
    :type context: dict
    :param jinja_env: Jinja environment. Can be provided to avoid re-creating Jinja environments during
        recursion.
    :type jinja_env: jinja2.Environment
    :param seen_oids: template fields already rendered (to avoid RecursionError on circular dependencies)
    :type seen_oids: set
    :return: Templated content
    """
    if not jinja_env:
        jinja_env = self.get_template_env()

    # Imported here to avoid circular dependency
    from airflow.models.xcom_arg import XComArg

    if isinstance(content, str):
        if any(content.endswith(ext) for ext in self.template_ext):
            # Content contains a filepath
            return jinja_env.get_template(content).render(**context)
        else:
            # Plain string: treat the string itself as the template source
            return jinja_env.from_string(content).render(**context)
    elif isinstance(content, XComArg):
        return content.resolve(context)

    if isinstance(content, tuple):
        if type(content) is not tuple:  # pylint: disable=unidiomatic-typecheck
            # Special case for named tuples: rebuild through the subclass so the type is preserved
            return content.__class__(
                *(self.render_template(element, context, jinja_env) for element in content)
            )
        else:
            return tuple(self.render_template(element, context, jinja_env) for element in content)
    elif isinstance(content, list):
        return [self.render_template(element, context, jinja_env) for element in content]
    elif isinstance(content, dict):
        # Only the values are rendered; keys are kept as they are
        return {key: self.render_template(value, context, jinja_env) for key, value in content.items()}
    elif isinstance(content, set):
        return {self.render_template(element, context, jinja_env) for element in content}
    else:
        # Arbitrary object: render its nested template fields and hand back the same object
        if seen_oids is None:
            seen_oids = set()
        self._render_nested_template_fields(content, context, jinja_env, seen_oids)
        return content

The problem only occurs when you use Jinja templates to generate new structures e.g. create new array elements.

with open(self.report, 'r') as file:
self.report = json.load(file)

def execute(self, context: Dict):
hook = GoogleCampaignManagerHook(
gcp_conn_id=self.gcp_conn_id,
Expand Down Expand Up @@ -349,7 +356,6 @@ class GoogleCampaignManagerRunReportOperator(BaseOperator):
"gcp_conn_id",
"delegate_to",
)
template_ext = (".json",)

@apply_defaults
def __init__(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
This module contains Google DisplayVideo operators.
"""
import csv
import json
import shutil
import tempfile
import urllib.request
Expand Down Expand Up @@ -75,6 +76,12 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to

def prepare_template(self) -> None:
    """
    Load the request body from disk when ``body`` is a path to a ``.json`` file.

    If ``self.body`` is a string ending with ``.json`` it is treated as a file path: the file is
    parsed as JSON and ``self.body`` is replaced with the parsed object. Any other value is
    left untouched.

    :raises OSError: if the file cannot be opened
    :raises json.JSONDecodeError: if the file does not contain valid JSON
    """
    # If .json is passed then we have to read the file
    if isinstance(self.body, str) and self.body.endswith('.json'):
        # JSON is UTF-8 by specification, so do not depend on the locale's default encoding
        with open(self.body, 'r', encoding='utf-8') as file:
            self.body = json.load(file)

def execute(self, context: Dict):
hook = GoogleDisplayVideo360Hook(
gcp_conn_id=self.gcp_conn_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""
This module contains Google Search Ads operators.
"""
import json
from tempfile import NamedTemporaryFile
from typing import Any, Dict, Optional

Expand Down Expand Up @@ -70,6 +71,12 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to

def prepare_template(self) -> None:
    """
    Load the report definition from disk when ``report`` is a path to a ``.json`` file.

    If ``self.report`` is a string ending with ``.json`` it is treated as a file path: the file is
    parsed as JSON and ``self.report`` is replaced with the parsed object. Any other value is
    left untouched.

    :raises OSError: if the file cannot be opened
    :raises json.JSONDecodeError: if the file does not contain valid JSON
    """
    # If .json is passed then we have to read the file
    if isinstance(self.report, str) and self.report.endswith('.json'):
        # JSON is UTF-8 by specification, so do not depend on the locale's default encoding
        with open(self.report, 'r', encoding='utf-8') as file:
            self.report = json.load(file)

def execute(self, context: Dict):
hook = GoogleSearchAdsHook(
gcp_conn_id=self.gcp_conn_id,
Expand Down
3 changes: 2 additions & 1 deletion docs/howto/operator/gcp/campaign_manager.rst
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ Running this operator creates a new report.

You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerInsertReportOperator`
parameters which allows you to dynamically determine values.
parameters which allows you to dynamically determine values. You can provide the report definition using a
``.json`` file as this operator supports this template extension.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.

.. _howto/operator:GoogleCampaignManagerRunReportOperator:
Expand Down
3 changes: 2 additions & 1 deletion docs/howto/operator/gcp/display_video.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ To create Display&Video 360 report use

Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateReportOperator`
parameters which allow you to dynamically determine values.
parameters which allow you to dynamically determine values. You can provide the body definition using a
``.json`` file as this operator supports this template extension.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to be used by other operators.

.. _howto/operator:GoogleDisplayVideo360DeleteReportOperator:
Expand Down
3 changes: 2 additions & 1 deletion docs/howto/operator/gcp/search_ads.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ To insert a Search Ads report use the

You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsInsertReportOperator`
parameters which allows you to dynamically determine values.
parameters which allows you to dynamically determine values. You can provide the report definition using a
``.json`` file as this operator supports this template extension.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators:

.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_search_ads.py
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
from tempfile import NamedTemporaryFile
from unittest import TestCase, mock

from airflow.providers.google.marketing_platform.operators.campaign_manager import (
Expand Down Expand Up @@ -183,6 +185,23 @@ def test_execute(self, xcom_mock, mock_base_op, hook_mock):
)
xcom_mock.assert_called_once_with(None, key="report_id", value=report_id)

def test_prepare_template(self):
    """A ``.json`` path given as ``report`` is replaced by the parsed content of that file."""
    expected_report = {"key": "value"}
    with NamedTemporaryFile("w+", suffix=".json") as report_file:
        # Flush so the content is on disk when the operator opens the file by name.
        report_file.write(json.dumps(expected_report))
        report_file.flush()

        operator = GoogleCampaignManagerInsertReportOperator(
            task_id="test_task",
            api_version=API_VERSION,
            report=report_file.name,
            profile_id="PROFILE_ID",
        )
        operator.prepare_template()

        loaded = operator.report
        assert isinstance(loaded, dict)
        assert loaded == expected_report


class TestGoogleCampaignManagerRunReportOperator(TestCase):
@mock.patch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
from tempfile import NamedTemporaryFile
from typing import Optional
from unittest import TestCase, mock

Expand Down Expand Up @@ -57,6 +59,19 @@ def test_execute(self, mock_base_op, hook_mock, xcom_mock):
hook_mock.return_value.create_query.assert_called_once_with(query=body)
xcom_mock.assert_called_once_with(None, key="report_id", value=query_id)

def test_prepare_template(self):
    """A ``.json`` path given as ``body`` is replaced by the parsed content of that file."""
    body = {"key": "value"}
    with NamedTemporaryFile("w+", suffix=".json") as f:
        f.write(json.dumps(body))
        # Flush so the content is on disk when the operator opens the file by name.
        f.flush()
        # Pass the file path, not the dict: otherwise prepare_template never reads the
        # file and the assertions below hold trivially.
        op = GoogleDisplayVideo360CreateReportOperator(
            body=f.name, api_version=API_VERSION, task_id="test_task"
        )
        op.prepare_template()

        assert isinstance(op.body, dict)
        assert op.body == body


class TestGoogleDisplayVideo360DeleteReportOperator(TestCase):
@mock.patch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
from tempfile import NamedTemporaryFile
from unittest import TestCase, mock

from airflow.providers.google.marketing_platform.operators.search_ads import (
Expand All @@ -25,7 +27,7 @@
GCP_CONN_ID = "google_cloud_default"


class TestSearchAdsGenerateReportOperator(TestCase):
class TestGoogleSearchAdsInsertReportOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform."
"operators.search_ads.GoogleSearchAdsHook"
Expand All @@ -52,8 +54,21 @@ def test_execute(self, xcom_mock, mock_base_op, hook_mock):
hook_mock.return_value.insert_report.assert_called_once_with(report=report)
xcom_mock.assert_called_once_with(None, key="report_id", value=report_id)

def test_prepare_template(self):
    """A ``.json`` path given as ``report`` is replaced by the parsed content of that file."""
    report = {"key": "value"}
    with NamedTemporaryFile("w+", suffix=".json") as f:
        f.write(json.dumps(report))
        # Flush so the content is on disk when the operator opens the file by name.
        f.flush()
        # Pass the file path, not the dict: otherwise prepare_template never reads the
        # file and the assertions below hold trivially.
        op = GoogleSearchAdsInsertReportOperator(
            report=f.name, api_version=API_VERSION, task_id="test_task"
        )
        op.prepare_template()

        assert isinstance(op.report, dict)
        assert op.report == report


class TestGoogleSearchAdsDownloadReportOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform."
"operators.search_ads.NamedTemporaryFile"
Expand Down