HACKDAY: Icon CDN (#26158)
* Move icons to connector folder

* Delete old icons

* Update upload logic

* Add icon url to definitions

* Update registry model

* Populate cdn url

* DNC butcher the pipeline

* Low hanging fruit fixes

* Fix bucket name

* Merge old and new approaches

* Fix metadata upload step

* Format

* Fix test
bnchrch committed May 25, 2023
1 parent 5707e47 commit 248bbf9
Showing 370 changed files with 255 additions and 88 deletions.
@@ -1,2 +1,3 @@
METADATA_FILE_NAME = "metadata.yaml"
ICON_FILE_NAME = "icon.svg"
METADATA_FOLDER = "metadata"
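
With ICON_FILE_NAME added to the constants, a connector's icon.svg is addressed in the bucket the same way as its metadata.yaml under the metadata/ prefix. A minimal sketch of the paths the helpers below produce, using a hypothetical connector and tag:

from metadata_service.constants import ICON_FILE_NAME, METADATA_FILE_NAME, METADATA_FOLDER

# Hypothetical connector and version, purely for illustration.
repo, tag = "airbyte/source-faker", "2.0.0"
print(f"{METADATA_FOLDER}/{repo}/{tag}/{METADATA_FILE_NAME}")   # metadata/airbyte/source-faker/2.0.0/metadata.yaml
print(f"{METADATA_FOLDER}/{repo}/latest/{ICON_FILE_NAME}")      # metadata/airbyte/source-faker/latest/icon.svg

Note that in this change the icon is only pushed under the latest prefix, even though get_icon_remote_file_path accepts any version.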
@@ -14,11 +14,11 @@
from google.oauth2 import service_account

from metadata_service.models.generated.ConnectorMetadataDefinitionV0 import ConnectorMetadataDefinitionV0
from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER
from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER, ICON_FILE_NAME
from metadata_service.validators.metadata_validator import validate_metadata_images_in_dockerhub


def get_metadata_file_path(dockerRepository: str, version: str) -> str:
def get_metadata_remote_file_path(dockerRepository: str, version: str) -> str:
"""Get the path to the metadata file for a specific version of a connector.
Args:
@@ -30,6 +30,18 @@ def get_metadata_file_path(dockerRepository: str, version: str) -> str:
return f"{METADATA_FOLDER}/{dockerRepository}/{version}/{METADATA_FILE_NAME}"


def get_icon_remote_file_path(dockerRepository: str, version: str) -> str:
"""Get the path to the icon file for a specific version of a connector.
Args:
dockerRepository (str): Name of the connector docker image.
version (str): Version of the connector.
Returns:
str: Path to the icon file.
"""
return f"{METADATA_FOLDER}/{dockerRepository}/{version}/{ICON_FILE_NAME}"


def compute_gcs_md5(file_name: str) -> str:
hash_md5 = hashlib.md5()
with open(file_name, "rb") as f:
@@ -39,20 +51,43 @@ def compute_gcs_md5(file_name: str) -> str:
return base64.b64encode(hash_md5.digest()).decode("utf8")


def _save_blob_to_gcs(blob_to_save: storage.blob.Blob, file_path: str) -> bool:
def _save_blob_to_gcs(blob_to_save: storage.blob.Blob, file_path: str, disable_cache: bool = False) -> bool:
"""Uploads a file to the bucket."""
print(f"Uploading {file_path} to {blob_to_save.name}...")

# Set Cache-Control header to no-cache to avoid caching issues
# This is IMPORTANT because if we don't set this header, the metadata file will be cached by GCS
# and the next time we try to download it, we will get the stale version
blob_to_save.cache_control = "no-cache"
if disable_cache:
blob_to_save.cache_control = "no-cache"

blob_to_save.upload_from_filename(file_path)

return True


def upload_file_if_changed(
local_file_path: Path, bucket: storage.bucket.Bucket, blob_path: str, disable_cache: bool = False
) -> Tuple[bool, str]:
local_file_md5_hash = compute_gcs_md5(local_file_path)
remote_blob = bucket.blob(blob_path)

# reload the blob to get the md5_hash
if remote_blob.exists():
remote_blob.reload()

remote_blob_md5_hash = remote_blob.md5_hash if remote_blob.exists() else None

print(f"Local {local_file_path} md5_hash: {local_file_md5_hash}")
print(f"Remote {blob_path} md5_hash: {remote_blob_md5_hash}")

if local_file_md5_hash != remote_blob_md5_hash:
uploaded = _save_blob_to_gcs(remote_blob, local_file_path, disable_cache=disable_cache)
return uploaded, remote_blob.id

return False, remote_blob.id


def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path) -> Tuple[bool, str]:
"""Upload a metadata file to a GCS bucket.
Expand All @@ -66,50 +101,33 @@ def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path) -> Tuple[
Returns:
Tuple[bool, str]: Whether the metadata file was uploaded and its blob id.
"""
uploaded = False

raw_metadata = yaml.safe_load(metadata_file_path.read_text())
metadata = ConnectorMetadataDefinitionV0.parse_obj(raw_metadata)

print("Validating that the images are on DockerHub...")
is_valid, error = validate_metadata_images_in_dockerhub(metadata)
if not is_valid:
raise ValueError(error)

service_account_info = json.loads(os.environ.get("GCS_CREDENTIALS"))
credentials = service_account.Credentials.from_service_account_info(service_account_info)
storage_client = storage.Client(credentials=credentials)
bucket = storage_client.bucket(bucket_name)

version_path = get_metadata_file_path(metadata.data.dockerRepository, metadata.data.dockerImageTag)
latest_path = get_metadata_file_path(metadata.data.dockerRepository, "latest")

version_blob = bucket.blob(version_path)
latest_blob = bucket.blob(latest_path)

# reload the blobs to get the md5_hash
if version_blob.exists():
version_blob.reload()
if latest_blob.exists():
latest_blob.reload()

metadata_file_md5_hash = compute_gcs_md5(metadata_file_path)
version_blob_md5_hash = version_blob.md5_hash if version_blob.exists() else None
latest_blob_md5_hash = latest_blob.md5_hash if latest_blob.exists() else None

print(f"Local Metadata md5_hash: {metadata_file_md5_hash}")
print(f"Current Version blob md5_hash: {version_blob_md5_hash}")
print(f"Latest blob md5_hash: {latest_blob_md5_hash}")

trigger_version_upload = metadata_file_md5_hash != version_blob_md5_hash
trigger_latest_upload = metadata_file_md5_hash != latest_blob_md5_hash

# Validate that the images are on DockerHub
if trigger_version_upload or trigger_latest_upload:
print("Validating that the images are on DockerHub...")
is_valid, error = validate_metadata_images_in_dockerhub(metadata)
if not is_valid:
raise ValueError(error)
version_path = get_metadata_remote_file_path(metadata.data.dockerRepository, metadata.data.dockerImageTag)
latest_path = get_metadata_remote_file_path(metadata.data.dockerRepository, "latest")
latest_icon_path = get_icon_remote_file_path(metadata.data.dockerRepository, "latest")

# upload if md5_hash is different
if trigger_version_upload:
uploaded = _save_blob_to_gcs(version_blob, str(metadata_file_path))
(
version_uploaded,
version_blob_id,
) = upload_file_if_changed(metadata_file_path, bucket, version_path)
latest_uploaded, _latest_blob_id = upload_file_if_changed(metadata_file_path, bucket, latest_path)

if trigger_latest_upload:
uploaded = _save_blob_to_gcs(latest_blob, str(metadata_file_path))
# Replace metadata file name with icon file name
local_icon_path = metadata_file_path.parent / ICON_FILE_NAME
if local_icon_path.exists():
upload_file_if_changed(local_icon_path, bucket, latest_icon_path)

return uploaded, version_blob.id
return version_uploaded or latest_uploaded, version_blob_id
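
The rewritten upload_metadata_to_gcs now delegates change detection to upload_file_if_changed, which compares the local file's base64-encoded MD5 (the same format GCS reports in blob.md5_hash) against the remote blob and only re-uploads on a mismatch; an icon.svg sitting next to the metadata file is pushed to the latest icon path the same way. A minimal usage sketch, assuming the module is importable as metadata_service.gcs_upload, that GCS_CREDENTIALS holds a service-account JSON string, and using a hypothetical bucket and connector path:

from pathlib import Path

from metadata_service.gcs_upload import upload_metadata_to_gcs  # assumed module path

# Hypothetical bucket and connector directory; metadata.yaml and icon.svg
# are expected to sit side by side in the connector folder.
uploaded, blob_id = upload_metadata_to_gcs(
    bucket_name="my-connector-metadata-bucket",
    metadata_file_path=Path("airbyte-integrations/connectors/source-faker/metadata.yaml"),
)
print(f"uploaded={uploaded} version_blob_id={blob_id}")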
@@ -135,7 +135,7 @@ class Data(BaseModel):
githubIssueLabel: str
maxSecondsBetweenMessages: Optional[int] = Field(
None,
description="Number of seconds allowed between 2 airbyte protocol messages. The source will timeout if this delay is reach",
description="Maximum delay between 2 airbyte protocol messages, in second. The source will timeout if this delay is reached",
)
releaseDate: Optional[date] = Field(
None,
@@ -102,6 +102,7 @@ class Config:
dockerImageTag: str
documentationUrl: str
icon: Optional[str] = None
iconUrl: Optional[str] = None
spec: Dict[str, Any]
tombstone: Optional[bool] = Field(
False,
@@ -94,6 +94,7 @@ class Config:
dockerImageTag: str
documentationUrl: str
icon: Optional[str] = None
iconUrl: Optional[str] = None
sourceType: Optional[Literal["api", "file", "database", "custom"]] = None
spec: Dict[str, Any]
tombstone: Optional[bool] = Field(
@@ -112,6 +112,7 @@ class Config:
dockerImageTag: str
documentationUrl: str
icon: Optional[str] = None
iconUrl: Optional[str] = None
sourceType: Optional[Literal["api", "file", "database", "custom"]] = None
spec: Dict[str, Any]
tombstone: Optional[bool] = Field(
@@ -152,6 +153,7 @@ class Config:
dockerImageTag: str
documentationUrl: str
icon: Optional[str] = None
iconUrl: Optional[str] = None
spec: Dict[str, Any]
tombstone: Optional[bool] = Field(
False,
@@ -26,6 +26,8 @@ properties:
type: string
icon:
type: string
iconUrl:
type: string
spec:
type: object
tombstone:
@@ -26,6 +26,8 @@ properties:
type: string
icon:
type: string
iconUrl:
type: string
sourceType:
type: string
enum:
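
Both registry schemas, like the generated Pydantic models above, gain an optional iconUrl string alongside the existing icon field. A hedged illustration of what a populated entry could look like once the CDN URL is filled in; every value here is hypothetical:

registry_entry = {
    "name": "Faker",
    "dockerRepository": "airbyte/source-faker",
    "dockerImageTag": "2.0.0",
    "documentationUrl": "https://docs.airbyte.com/integrations/sources/faker",
    "icon": "faker.svg",  # legacy relative file name
    "iconUrl": "https://<cdn-base-url>/metadata/airbyte/source-faker/latest/icon.svg",  # new CDN-backed URL
    "sourceType": "api",
    "spec": {},
}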
@@ -104,19 +104,19 @@ def test_upload_metadata_to_gcs_valid_metadata(
assert blob_id == mocks["mock_version_blob"].id

if not version_blob_exists:
mocks["mock_version_blob"].upload_from_filename.assert_called_with(str(metadata_file_path))
mocks["mock_version_blob"].upload_from_filename.assert_called_with(metadata_file_path)
assert uploaded

if not latest_blob_exists:
mocks["mock_latest_blob"].upload_from_filename.assert_called_with(str(metadata_file_path))
mocks["mock_latest_blob"].upload_from_filename.assert_called_with(metadata_file_path)
assert uploaded

if version_blob_md5_hash != local_file_md5_hash:
mocks["mock_version_blob"].upload_from_filename.assert_called_with(str(metadata_file_path))
mocks["mock_version_blob"].upload_from_filename.assert_called_with(metadata_file_path)
assert uploaded

if latest_blob_md5_hash != local_file_md5_hash:
mocks["mock_latest_blob"].upload_from_filename.assert_called_with(str(metadata_file_path))
mocks["mock_latest_blob"].upload_from_filename.assert_called_with(metadata_file_path)
assert uploaded


@@ -1,14 +1,17 @@
import pandas as pd
import numpy as np
import os
from typing import List
from dagster import Output, asset, OpExecutionContext
import yaml

from metadata_service.models.generated.ConnectorMetadataDefinitionV0 import ConnectorMetadataDefinitionV0
from metadata_service.constants import METADATA_FILE_NAME, ICON_FILE_NAME

from orchestrator.utils.object_helpers import are_values_equal, merge_values
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe
from orchestrator.models.metadata import PartialMetadataDefinition, MetadataDefinition
from orchestrator.models.metadata import PartialMetadataDefinition, MetadataDefinition, LatestMetadataEntry
from orchestrator.config import get_public_url_for_gcs_file

GROUP_NAME = "metadata"

@@ -174,14 +177,29 @@ def validate_metadata(metadata: PartialMetadataDefinition) -> tuple[bool, str]:


@asset(required_resource_keys={"latest_metadata_file_blobs"}, group_name=GROUP_NAME)
def metadata_definitions(context: OpExecutionContext) -> List[MetadataDefinition]:
def metadata_definitions(context: OpExecutionContext) -> List[LatestMetadataEntry]:
latest_metadata_file_blobs = context.resources.latest_metadata_file_blobs

metadata_definitions = []
metadata_entries = []
for blob in latest_metadata_file_blobs:
yaml_string = blob.download_as_string().decode("utf-8")
metadata_dict = yaml.safe_load(yaml_string)
metadata_def = MetadataDefinition.parse_obj(metadata_dict)
metadata_definitions.append(metadata_def)

return metadata_definitions
metadata_file_path = blob.name
icon_file_path = metadata_file_path.replace(METADATA_FILE_NAME, ICON_FILE_NAME)
icon_blob = blob.bucket.blob(icon_file_path)

icon_url = (
get_public_url_for_gcs_file(icon_blob.bucket.name, icon_blob.name, os.getenv("METADATA_CDN_BASE_URL"))
if icon_blob.exists()
else None
)

metadata_entry = LatestMetadataEntry(
metadata_definition=metadata_def,
icon_url=icon_url,
)
metadata_entries.append(metadata_entry)

return metadata_entries
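
The asset now returns LatestMetadataEntry objects that pair each parsed definition with a public icon URL. get_public_url_for_gcs_file is imported from orchestrator.config but its body is not part of this diff, so the following is only a sketch of plausible behavior, under the assumption that METADATA_CDN_BASE_URL, when set, is a full base URL fronting the bucket's objects:

from typing import Optional


def get_public_url_for_gcs_file(bucket_name: str, file_path: str, cdn_url: Optional[str] = None) -> str:
    """Sketch only: prefer the CDN host when one is configured, else fall back to the public GCS endpoint."""
    if cdn_url:
        return f"{cdn_url.rstrip('/')}/{file_path}"
    return f"https://storage.googleapis.com/{bucket_name}/{file_path}"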