Remove replace BQ source with view (#139)
Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
khorshuheng committed May 4, 2022
1 parent 6f43304 commit d22817c
Showing 1 changed file with 1 addition and 84 deletions.
python/feast_spark/pyspark/launcher.py: 85 changes (1 addition, 84 deletions)
@@ -220,17 +220,10 @@ def start_historical_feature_retrieval_job(
 ) -> RetrievalJob:
     launcher = resolve_launcher(client.config)
     feature_sources = [
-        _source_to_argument(
-            replace_bq_table_with_joined_view(feature_table, entity_source),
-            client.config,
-        )
+        _source_to_argument(feature_table.batch_source, client.config,)
         for feature_table in feature_tables
     ]
 
-    extra_packages = []
-    if output_format == "tfrecord":
-        extra_packages.append("com.linkedin.sparktfrecord:spark-tfrecord_2.12:0.3.0")
-
     return launcher.historical_feature_retrieval(
         RetrievalJobParameters(
             project=project,
@@ -243,41 +236,11 @@ def start_historical_feature_retrieval_job(
                 for feature_table in feature_tables
             ],
             destination={"format": output_format, "path": output_path},
-            extra_packages=extra_packages,
             checkpoint_path=client.config.get(opt.CHECKPOINT_PATH),
         )
     )
 
 
-def replace_bq_table_with_joined_view(
-    feature_table: FeatureTable, entity_source: Union[FileSource, BigQuerySource],
-) -> Union[FileSource, BigQuerySource]:
-    """
-    Applies optimization to historical retrieval. Instead of pulling all data from Batch Source,
-    with this optimization we join feature values & entities on Data Warehouse side (improving data locality).
-    Several conditions should be met to enable this optimization:
-    * entities are staged to BigQuery
-    * feature values are in in BigQuery
-    * Entity columns are not mapped (ToDo: fix this limitation)
-    :return: replacement for feature source
-    """
-    if not isinstance(feature_table.batch_source, BigQuerySource):
-        return feature_table.batch_source
-
-    if not isinstance(entity_source, BigQuerySource):
-        return feature_table.batch_source
-
-    if any(
-        entity in feature_table.batch_source.field_mapping
-        for entity in feature_table.entities
-    ):
-        return feature_table.batch_source
-
-    return create_bq_view_of_joined_features_and_entities(
-        feature_table.batch_source, entity_source, feature_table.entities,
-    )
-
-
 def table_reference_from_string(table_ref: str):
     """
     Parses reference string with format "{project}:{dataset}.{table}" into bigquery.TableReference
@@ -291,52 +254,6 @@ def table_reference_from_string(table_ref: str):
     )
 
 
-def create_bq_view_of_joined_features_and_entities(
-    source: BigQuerySource, entity_source: BigQuerySource, entity_names: List[str]
-) -> BigQuerySource:
-    """
-    Creates BQ view that joins tables from `source` and `entity_source` with join key derived from `entity_names`.
-    Returns BigQuerySource with reference to created view. The BQ view will be created in the same BQ dataset as `entity_source`.
-    """
-    from google.cloud import bigquery
-
-    bq_client = bigquery.Client()
-
-    source_ref = table_reference_from_string(source.bigquery_options.table_ref)
-    entities_ref = table_reference_from_string(entity_source.bigquery_options.table_ref)
-
-    destination_ref = bigquery.TableReference(
-        bigquery.DatasetReference(entities_ref.project, entities_ref.dataset_id),
-        f"_view_{source_ref.table_id}_{datetime.now():%Y%m%d%H%M%s}",
-    )
-
-    view = bigquery.Table(destination_ref)
-
-    join_template = """
-SELECT source.* FROM
-`{entities.project}.{entities.dataset_id}.{entities.table_id}` entities
-JOIN
-`{source.project}.{source.dataset_id}.{source.table_id}` source
-ON
-({entity_key})"""
-
-    view.view_query = join_template.format(
-        entities=entities_ref,
-        source=source_ref,
-        entity_key=" AND ".join([f"source.{e} = entities.{e}" for e in entity_names]),
-    )
-    view.expires = datetime.now() + timedelta(days=1)
-    bq_client.create_table(view)
-
-    return BigQuerySource(
-        event_timestamp_column=source.event_timestamp_column,
-        created_timestamp_column=source.created_timestamp_column,
-        table_ref=f"{view.project}:{view.dataset_id}.{view.table_id}",
-        field_mapping=source.field_mapping,
-        date_partition_column=source.date_partition_column,
-    )
-
-
 def start_offline_to_online_ingestion(
     client: "Client",
     project: str,
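For context on what this commit removes: the deleted create_bq_view_of_joined_features_and_entities helper rendered a join query from join_template and registered it as a temporary BigQuery view. Below is a minimal sketch that reproduces just the query rendering, using the template verbatim from the removed code; the project, dataset, table, and entity names are hypothetical examples, and no BigQuery client or network call is involved.

# Sketch of the query the removed helper generated as a view definition.
# Table references and the "driver_id" entity below are made-up examples.
from google.cloud import bigquery

join_template = """
SELECT source.* FROM
`{entities.project}.{entities.dataset_id}.{entities.table_id}` entities
JOIN
`{source.project}.{source.dataset_id}.{source.table_id}` source
ON
({entity_key})"""

entities_ref = bigquery.TableReference(
    bigquery.DatasetReference("my-project", "staging"), "entity_rows"
)
source_ref = bigquery.TableReference(
    bigquery.DatasetReference("my-project", "features"), "driver_stats"
)
entity_names = ["driver_id"]

# Same .format() call as the removed view.view_query assignment.
view_query = join_template.format(
    entities=entities_ref,
    source=source_ref,
    entity_key=" AND ".join([f"source.{e} = entities.{e}" for e in entity_names]),
)
print(view_query)

With this commit applied, start_historical_feature_retrieval_job passes each feature_table.batch_source to _source_to_argument unchanged, so historical retrieval no longer creates these warehouse-side join views (which previously expired after one day).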

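The table_reference_from_string helper survives this commit; per its docstring it parses references of the form "{project}:{dataset}.{table}" into a bigquery.TableReference. Its body is collapsed in the diff above, so the following is a rough, hypothetical re-implementation for illustration only, assuming a single colon and then a single dot as separators; the real helper may parse or validate differently.

# Hypothetical illustration of the "{project}:{dataset}.{table}" format;
# not the actual helper body, which is collapsed in the diff above.
from google.cloud import bigquery

def parse_table_ref(table_ref: str) -> bigquery.TableReference:
    project, rest = table_ref.split(":", 1)    # "{project}" before the colon
    dataset_id, table_id = rest.split(".", 1)  # "{dataset}.{table}" after it
    return bigquery.TableReference(
        bigquery.DatasetReference(project, dataset_id), table_id
    )

ref = parse_table_ref("my-project:my_dataset.my_table")
assert (ref.project, ref.dataset_id, ref.table_id) == (
    "my-project", "my_dataset", "my_table"
)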