Skip to content
This repository was archived by the owner on Oct 31, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
from typing import Dict, Sequence, Tuple, Type, Union
import pkg_resources

import google.api_core.client_options as ClientOptions # type: ignore
from google.api_core.client_options import ClientOptions # type: ignore
from google.api_core import exceptions as core_exceptions # type: ignore
from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore
from google.auth import credentials as ga_credentials # type: ignore
from google.oauth2 import service_account # type: ignore

OptionalRetry = Union[retries.Retry, object]

from google.cloud.dataflow_v1beta3.types import jobs
from google.cloud.dataflow_v1beta3.types import templates
from .transports.base import FlexTemplatesServiceTransport, DEFAULT_CLIENT_INFO
Expand Down Expand Up @@ -164,16 +166,16 @@ def __init__(

async def launch_flex_template(
self,
request: templates.LaunchFlexTemplateRequest = None,
request: Union[templates.LaunchFlexTemplateRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> templates.LaunchFlexTemplateResponse:
r"""Launch a job with a FlexTemplate.

Args:
request (:class:`google.cloud.dataflow_v1beta3.types.LaunchFlexTemplateRequest`):
request (Union[google.cloud.dataflow_v1beta3.types.LaunchFlexTemplateRequest, dict]):
The request object. A request to launch a Cloud Dataflow
job from a FlexTemplate.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from google.auth.exceptions import MutualTLSChannelError # type: ignore
from google.oauth2 import service_account # type: ignore

OptionalRetry = Union[retries.Retry, object]

from google.cloud.dataflow_v1beta3.types import jobs
from google.cloud.dataflow_v1beta3.types import templates
from .transports.base import FlexTemplatesServiceTransport, DEFAULT_CLIENT_INFO
Expand Down Expand Up @@ -339,7 +341,7 @@ def launch_flex_template(
self,
request: Union[templates.LaunchFlexTemplateRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> templates.LaunchFlexTemplateResponse:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#
import abc
from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
import packaging.version
import pkg_resources

import google.auth # type: ignore
Expand All @@ -37,15 +36,6 @@
except pkg_resources.DistributionNotFound:
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo()

try:
# google.auth.__version__ was added in 1.26.0
_GOOGLE_AUTH_VERSION = google.auth.__version__
except AttributeError:
try: # try pkg_resources if it is available
_GOOGLE_AUTH_VERSION = pkg_resources.get_distribution("google-auth").version
except pkg_resources.DistributionNotFound: # pragma: NO COVER
_GOOGLE_AUTH_VERSION = None


class FlexTemplatesServiceTransport(abc.ABC):
"""Abstract transport class for FlexTemplatesService."""
Expand Down Expand Up @@ -100,7 +90,7 @@ def __init__(
host += ":443"
self._host = host

scopes_kwargs = self._get_scopes_kwargs(self._host, scopes)
scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}

# Save the scopes.
self._scopes = scopes
Expand Down Expand Up @@ -133,29 +123,6 @@ def __init__(
# Save the credentials.
self._credentials = credentials

# TODO(busunkim): This method is in the base transport
# to avoid duplicating code across the transport classes. These functions
# should be deleted once the minimum required versions of google-auth is increased.

# TODO: Remove this function once google-auth >= 1.25.0 is required
@classmethod
def _get_scopes_kwargs(
cls, host: str, scopes: Optional[Sequence[str]]
) -> Dict[str, Optional[Sequence[str]]]:
"""Returns scopes kwargs to pass to google-auth methods depending on the google-auth version"""

scopes_kwargs = {}

if _GOOGLE_AUTH_VERSION and (
packaging.version.parse(_GOOGLE_AUTH_VERSION)
>= packaging.version.parse("1.25.0")
):
scopes_kwargs = {"scopes": scopes, "default_scopes": cls.AUTH_SCOPES}
else:
scopes_kwargs = {"scopes": scopes or cls.AUTH_SCOPES}

return scopes_kwargs

def _prep_wrapped_messages(self, client_info):
# Precompute the wrapped methods.
self._wrapped_methods = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from google.api_core import grpc_helpers_async # type: ignore
from google.auth import credentials as ga_credentials # type: ignore
from google.auth.transport.grpc import SslCredentials # type: ignore
import packaging.version

import grpc # type: ignore
from grpc.experimental import aio # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
from typing import Dict, Sequence, Tuple, Type, Union
import pkg_resources

import google.api_core.client_options as ClientOptions # type: ignore
from google.api_core.client_options import ClientOptions # type: ignore
from google.api_core import exceptions as core_exceptions # type: ignore
from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore
from google.auth import credentials as ga_credentials # type: ignore
from google.oauth2 import service_account # type: ignore

OptionalRetry = Union[retries.Retry, object]

from google.cloud.dataflow_v1beta3.services.jobs_v1_beta3 import pagers
from google.cloud.dataflow_v1beta3.types import environment
from google.cloud.dataflow_v1beta3.types import jobs
Expand Down Expand Up @@ -164,9 +166,9 @@ def __init__(

async def create_job(
self,
request: jobs.CreateJobRequest = None,
request: Union[jobs.CreateJobRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> jobs.Job:
Expand All @@ -179,7 +181,7 @@ async def create_job(
will always start in ``us-central1``.

Args:
request (:class:`google.cloud.dataflow_v1beta3.types.CreateJobRequest`):
request (Union[google.cloud.dataflow_v1beta3.types.CreateJobRequest, dict]):
The request object. Request to create a Cloud Dataflow
job.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
Expand Down Expand Up @@ -213,9 +215,9 @@ async def create_job(

async def get_job(
self,
request: jobs.GetJobRequest = None,
request: Union[jobs.GetJobRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> jobs.Job:
Expand All @@ -228,7 +230,7 @@ async def get_job(
get the state of jobs that are running in ``us-central1``.

Args:
request (:class:`google.cloud.dataflow_v1beta3.types.GetJobRequest`):
request (Union[google.cloud.dataflow_v1beta3.types.GetJobRequest, dict]):
The request object. Request to get the state of a Cloud
Dataflow job.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
Expand Down Expand Up @@ -262,9 +264,9 @@ async def get_job(

async def update_job(
self,
request: jobs.UpdateJobRequest = None,
request: Union[jobs.UpdateJobRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> jobs.Job:
Expand All @@ -278,7 +280,7 @@ async def update_job(
``us-central1``.

Args:
request (:class:`google.cloud.dataflow_v1beta3.types.UpdateJobRequest`):
request (Union[google.cloud.dataflow_v1beta3.types.UpdateJobRequest, dict]):
The request object. Request to update a Cloud Dataflow
job.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
Expand Down Expand Up @@ -312,9 +314,9 @@ async def update_job(

async def list_jobs(
self,
request: jobs.ListJobsRequest = None,
request: Union[jobs.ListJobsRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.ListJobsAsyncPager:
Expand All @@ -329,7 +331,7 @@ async def list_jobs(
running in ``us-central1``.

Args:
request (:class:`google.cloud.dataflow_v1beta3.types.ListJobsRequest`):
request (Union[google.cloud.dataflow_v1beta3.types.ListJobsRequest, dict]):
The request object. Request to list Cloud Dataflow jobs.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
Expand Down Expand Up @@ -378,16 +380,16 @@ async def list_jobs(

async def aggregated_list_jobs(
self,
request: jobs.ListJobsRequest = None,
request: Union[jobs.ListJobsRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.AggregatedListJobsAsyncPager:
r"""List the jobs of a project across all regions.

Args:
request (:class:`google.cloud.dataflow_v1beta3.types.ListJobsRequest`):
request (Union[google.cloud.dataflow_v1beta3.types.ListJobsRequest, dict]):
The request object. Request to list Cloud Dataflow jobs.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
Expand Down Expand Up @@ -436,17 +438,17 @@ async def aggregated_list_jobs(

async def check_active_jobs(
self,
request: jobs.CheckActiveJobsRequest = None,
request: Union[jobs.CheckActiveJobsRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> jobs.CheckActiveJobsResponse:
r"""Check for existence of active jobs in the given
project across all regions.

Args:
request (:class:`google.cloud.dataflow_v1beta3.types.CheckActiveJobsRequest`):
request (Union[google.cloud.dataflow_v1beta3.types.CheckActiveJobsRequest, dict]):
The request object. Request to check is active jobs
exists for a project
retry (google.api_core.retry.Retry): Designation of what errors, if any,
Expand Down Expand Up @@ -478,16 +480,16 @@ async def check_active_jobs(

async def snapshot_job(
self,
request: jobs.SnapshotJobRequest = None,
request: Union[jobs.SnapshotJobRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> snapshots.Snapshot:
r"""Snapshot the state of a streaming job.

Args:
request (:class:`google.cloud.dataflow_v1beta3.types.SnapshotJobRequest`):
request (Union[google.cloud.dataflow_v1beta3.types.SnapshotJobRequest, dict]):
The request object. Request to create a snapshot of a
job.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
Expand Down
16 changes: 9 additions & 7 deletions google/cloud/dataflow_v1beta3/services/jobs_v1_beta3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from google.auth.exceptions import MutualTLSChannelError # type: ignore
from google.oauth2 import service_account # type: ignore

OptionalRetry = Union[retries.Retry, object]

from google.cloud.dataflow_v1beta3.services.jobs_v1_beta3 import pagers
from google.cloud.dataflow_v1beta3.types import environment
from google.cloud.dataflow_v1beta3.types import jobs
Expand Down Expand Up @@ -340,7 +342,7 @@ def create_job(
self,
request: Union[jobs.CreateJobRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> jobs.Job:
Expand Down Expand Up @@ -390,7 +392,7 @@ def get_job(
self,
request: Union[jobs.GetJobRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> jobs.Job:
Expand Down Expand Up @@ -440,7 +442,7 @@ def update_job(
self,
request: Union[jobs.UpdateJobRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> jobs.Job:
Expand Down Expand Up @@ -491,7 +493,7 @@ def list_jobs(
self,
request: Union[jobs.ListJobsRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.ListJobsPager:
Expand Down Expand Up @@ -558,7 +560,7 @@ def aggregated_list_jobs(
self,
request: Union[jobs.ListJobsRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.AggregatedListJobsPager:
Expand Down Expand Up @@ -617,7 +619,7 @@ def check_active_jobs(
self,
request: Union[jobs.CheckActiveJobsRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> jobs.CheckActiveJobsResponse:
Expand Down Expand Up @@ -660,7 +662,7 @@ def snapshot_job(
self,
request: Union[jobs.SnapshotJobRequest, dict] = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> snapshots.Snapshot:
Expand Down
Loading