Skip to content

Commit

Permalink
build(ingest): support flake8 6.0.0 (datahub-project#6540)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and cccs-Dustin committed Feb 1, 2023
1 parent 115d71d commit 3addeb0
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 6 deletions.
Expand Up @@ -631,7 +631,6 @@ def get_upstream_lineage_info(
return None

def test_capability(self, project_id: str) -> None:
lineage_metadata: Dict[str, Set[str]]
if self.config.use_exported_bigquery_audit_metadata:
bigquery_client: BigQueryClient = BigQueryClient(project=project_id)
entries = self._get_exported_bigquery_audit_metadata(
Expand Down
Expand Up @@ -4,7 +4,7 @@
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Iterable, List, MutableMapping, Optional, Set, Union, cast
from typing import Any, Dict, Iterable, List, MutableMapping, Optional, Union, cast

import cachetools
from google.cloud.bigquery import Client as BigQueryClient
Expand Down Expand Up @@ -443,7 +443,6 @@ def _extract_operational_meta(
) -> Optional[OperationalDataMeta]:
# If we don't have Query object that means this is a queryless read operation or a read operation which was not executed as JOB
# https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.TableDataRead.Reason/
operation_meta: OperationalDataMeta
if not event.query_event and event.read_event:
return OperationalDataMeta(
statement_type=OperationTypeClass.CUSTOM,
Expand Down Expand Up @@ -839,7 +838,6 @@ def _get_parsed_bigquery_log_events(
)

def test_capability(self, project_id: str) -> None:
lineage_metadata: Dict[str, Set[str]]
for entry in self._get_parsed_bigquery_log_events(project_id, limit=1):
logger.debug(f"Connection test got one {entry}")
return
Expand Up @@ -141,7 +141,8 @@ def get_schemas_from_confluent_ref_protobuf(
if schema_seen is None:
schema_seen = set()

for schema_ref in schema.references: # type: SchemaReference
schema_ref: SchemaReference
for schema_ref in schema.references:
ref_subject: str = schema_ref["subject"]
if ref_subject in schema_seen:
continue
Expand Down
Expand Up @@ -83,7 +83,6 @@ def execute(self, context: Any) -> bool:
else:
raise Exception(f"urn parameter has invalid type {type(self.urn)}")

partition: Optional[str]
for urn in urns:
self.log.info(f"Checking if dataset {self.urn} is ready to be consumed")
ret = self.circuit_breaker.is_circuit_breaker_active(
Expand Down

0 comments on commit 3addeb0

Please sign in to comment.