Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add TableSerializable and mysql_serializer #459

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion databuilder/models/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@

from typing import Iterator, Union

from amundsen_rds.models import RDSModel
from amundsen_rds.models.application import Application as RDSApplication, ApplicationTable as RDSApplicationTable

from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.table_metadata import TableMetadata
from databuilder.models.table_serializable import TableSerializable


class Application(GraphSerializable):
class Application(GraphSerializable, TableSerializable):
"""
Application-table matching model (Airflow task and table)
"""
Expand Down Expand Up @@ -46,6 +50,7 @@ def __init__(self,

self._node_iter = self._create_node_iterator()
self._relation_iter = self._create_relation_iterator()
self._record_iter = self._create_record_iterator()

def create_next_node(self) -> Union[GraphNode, None]:
# creates new node
Expand All @@ -60,6 +65,12 @@ def create_next_relation(self) -> Union[GraphRelationship, None]:
except StopIteration:
return None

def create_next_record(self) -> Union[RDSModel, None]:
    """
    Return the next record from this model's record iterator.

    :return: the next item of ``self._record_iter``, or ``None`` once the
        iterator is exhausted.
    """
    # The default argument makes next() return None instead of raising StopIteration.
    return next(self._record_iter, None)

def get_table_model_key(self) -> str:
# returns formatted string for table name
return TableMetadata.TABLE_KEY_FORMAT.format(db=self.database,
Expand Down Expand Up @@ -113,3 +124,27 @@ def _create_relation_iterator(self) -> Iterator[GraphRelationship]:
attributes={}
)
yield graph_relationship

def _create_record_iterator(self) -> Iterator[RDSModel]:
    """
    Yield the relational records for this application.

    Two records are produced, in this order:

    1. an ``RDSApplication`` record keyed by ``get_application_model_key()``;
    2. an ``RDSApplicationTable`` record that pairs ``get_table_model_key()``
       with the same application key.

    :return: an iterator over the two records described above.
    """
    # Format the application id once; it is used both as the ``id`` column
    # and inside the human-readable description.
    application_id = Application.APPLICATION_ID_FORMAT.format(
        dag_id=self.dag,
        task_id=self.task
    )
    application_description = '{app_type} with id {id}'.format(
        app_type=Application.APPLICATION_TYPE,
        id=application_id
    )

    application_record = RDSApplication(
        rk=self.get_application_model_key(),
        application_url=self.application_url,
        name=Application.APPLICATION_TYPE,
        id=application_id,
        description=application_description
    )
    yield application_record

    application_table_record = RDSApplicationTable(
        rk=self.get_table_model_key(),
        application_rk=self.get_application_model_key(),
    )
    yield application_table_record
32 changes: 30 additions & 2 deletions databuilder/models/badge.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@

import re
from typing import (
Iterator, List, Optional,
Iterator, List, Optional, Union,
)

from amundsen_rds.models import RDSModel
from amundsen_rds.models.badge import Badge as RDSBadge

from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.table_serializable import TableSerializable


class Badge:
Expand All @@ -26,7 +30,7 @@ def __eq__(self, other: object) -> bool:
self.category == other.category


class BadgeMetadata(GraphSerializable):
class BadgeMetadata(GraphSerializable, TableSerializable):
"""
Badge model.
"""
Expand Down Expand Up @@ -64,6 +68,7 @@ def __init__(self,

self._node_iter = self._create_node_iterator()
self._relation_iter = self._create_relation_iterator()
self._record_iter = self._create_record_iterator()

def __repr__(self) -> str:
return f'BadgeMetadata({self.start_label!r}, {self.start_key!r})'
Expand All @@ -81,6 +86,12 @@ def create_next_relation(self) -> Optional[GraphRelationship]:
except StopIteration:
return None

def create_next_record(self) -> Union[RDSModel, None]:
    """
    Return the next badge record from ``self._record_iter``.

    :return: the next record, or ``None`` when the iterator has no more items.
    """
    # next() with a default swallows StopIteration and hands back None.
    return next(self._record_iter, None)

@staticmethod
def get_badge_key(name: str) -> str:
if not name:
Expand Down Expand Up @@ -119,6 +130,18 @@ def get_badge_relations(self) -> List[GraphRelationship]:
relations.append(relation)
return relations

def get_badge_records(self) -> List[RDSModel]:
    """
    Build one ``RDSBadge`` record for every truthy entry in ``self.badges``.

    Each record's ``rk`` comes from ``get_badge_key(badge.name)`` and its
    ``category`` is copied from the badge. Falsy entries are skipped.

    :return: the list of badge records, in the order of ``self.badges``.
    """
    return [
        RDSBadge(
            rk=self.get_badge_key(entry.name),
            category=entry.category
        )
        for entry in self.badges
        if entry
    ]

def _create_node_iterator(self) -> Iterator[GraphNode]:
"""
Create badge nodes
Expand All @@ -132,3 +155,8 @@ def _create_relation_iterator(self) -> Iterator[GraphRelationship]:
relations = self.get_badge_relations()
for relation in relations:
yield relation

def _create_record_iterator(self) -> Iterator[RDSModel]:
    """
    Yield every record returned by ``get_badge_records()``, in order.
    """
    yield from self.get_badge_records()
24 changes: 23 additions & 1 deletion databuilder/models/column_usage_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@

from typing import Iterator, Union

from amundsen_rds.models import RDSModel
from amundsen_rds.models.table import TableUsage as RDSTableUsage

from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.table_metadata import TableMetadata
from databuilder.models.table_serializable import TableSerializable
from databuilder.models.usage.usage_constants import (
READ_RELATION_COUNT_PROPERTY, READ_RELATION_TYPE, READ_REVERSE_RELATION_TYPE,
)
from databuilder.models.user import User


class ColumnUsageModel(GraphSerializable):
class ColumnUsageModel(GraphSerializable, TableSerializable):
"""
A model represents user <--> column graph model
Currently it only support to serialize to table level
Expand Down Expand Up @@ -46,6 +50,7 @@ def __init__(self,

self._node_iter = self._create_node_iterator()
self._relation_iter = self._create_relation_iterator()
self._record_iter = self._create_record_iterator()

def create_next_node(self) -> Union[GraphNode, None]:

Expand Down Expand Up @@ -82,6 +87,23 @@ def _create_relation_iterator(self) -> Iterator[GraphRelationship]:
)
yield relationship

def create_next_record(self) -> Union[RDSModel, None]:
    """
    Return the next record from ``self._record_iter``.

    :return: the next record, or ``None`` once the iterator is exhausted.
    """
    # Supplying a default turns iterator exhaustion into a None return value.
    return next(self._record_iter, None)

def _create_record_iterator(self) -> Iterator[RDSModel]:
    """
    Yield the records describing this usage entry.

    First the record returned by ``User(email=self.user_email).get_user_record()``,
    then an ``RDSTableUsage`` record carrying the user key, the table key and
    ``self.read_count``.
    """
    yield User(email=self.user_email).get_user_record()

    yield RDSTableUsage(
        user_rk=self._get_user_key(self.user_email),
        table_rk=self._get_table_key(),
        read_count=self.read_count
    )

def _get_table_key(self) -> str:
return TableMetadata.TABLE_KEY_FORMAT.format(db=self.database,
cluster=self.cluster,
Expand Down
34 changes: 33 additions & 1 deletion databuilder/models/dashboard/dashboard_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@
Any, Iterator, Optional, Union,
)

from amundsen_rds.models import RDSModel
from amundsen_rds.models.dashboard import DashboardChart as RDSDashboardChart

from databuilder.models.dashboard.dashboard_query import DashboardQuery
from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.table_serializable import TableSerializable

LOGGER = logging.getLogger(__name__)


class DashboardChart(GraphSerializable):
class DashboardChart(GraphSerializable, TableSerializable):
"""
A model that encapsulate Dashboard's charts
"""
Expand Down Expand Up @@ -47,6 +51,7 @@ def __init__(self,
self._cluster = cluster
self._node_iterator = self._create_node_iterator()
self._relation_iterator = self._create_relation_iterator()
self._record_iterator = self._create_record_iterator()

def create_next_node(self) -> Union[GraphNode, None]:
try:
Expand Down Expand Up @@ -109,6 +114,33 @@ def _get_chart_node_key(self) -> str:
chart_id=self._chart_id
)

def create_next_record(self) -> Union[RDSModel, None]:
    """
    Return the next record from ``self._record_iterator``.

    :return: the next record, or ``None`` once the iterator is exhausted.
    """
    # A default of None replaces the explicit StopIteration handling.
    return next(self._record_iterator, None)

def _create_record_iterator(self) -> Iterator[RDSModel]:
    """
    Yield a single ``RDSDashboardChart`` record for this chart.

    The record is always created with ``rk``, ``id`` and ``query_rk``.
    ``name``, ``type`` and ``url`` are assigned afterwards, and only when the
    corresponding chart attribute is truthy.
    """
    chart_record = RDSDashboardChart(
        rk=self._get_chart_node_key(),
        id=self._chart_id,
        query_rk=DashboardQuery.DASHBOARD_QUERY_KEY_FORMAT.format(
            product=self._product,
            cluster=self._cluster,
            dashboard_group_id=self._dashboard_group_id,
            dashboard_id=self._dashboard_id,
            query_id=self._query_id
        )
    )

    # (record attribute, value) pairs that are set only when the value is truthy.
    optional_fields = (
        ('name', self._chart_name),
        ('type', self._chart_type),
        ('url', self._chart_url),
    )
    for attribute, value in optional_fields:
        if value:
            setattr(chart_record, attribute, value)

    yield chart_record

def __repr__(self) -> str:
return f'DashboardChart({self._dashboard_group_id!r}, {self._dashboard_id!r}, ' \
f'{self._query_id!r}, {self._chart_id!r}, {self._chart_name!r}, {self._chart_type!r}, ' \
Expand Down
26 changes: 25 additions & 1 deletion databuilder/models/dashboard/dashboard_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@
Any, Iterator, Optional, Union,
)

from amundsen_rds.models import RDSModel
from amundsen_rds.models.dashboard import DashboardExecution as RDSDashboardExecution

from databuilder.models.dashboard.dashboard_metadata import DashboardMetadata
from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.table_serializable import TableSerializable

LOGGER = logging.getLogger(__name__)


class DashboardExecution(GraphSerializable):
class DashboardExecution(GraphSerializable, TableSerializable):
"""
A model that encapsulate Dashboard's execution timestamp in epoch and execution state
"""
Expand Down Expand Up @@ -46,6 +50,7 @@ def __init__(self,
self._cluster = cluster
self._node_iterator = self._create_node_iterator()
self._relation_iterator = self._create_relation_iterator()
self._record_iterator = self._create_record_iterator()

def create_next_node(self) -> Union[GraphNode, None]:
try:
Expand Down Expand Up @@ -87,6 +92,25 @@ def _create_relation_iterator(self) -> Iterator[GraphRelationship]:
)
yield relationship

def create_next_record(self) -> Union[RDSModel, None]:
    """
    Return the next record from ``self._record_iterator``.

    :return: the next record, or ``None`` once the iterator is exhausted.
    """
    # next() yields the fallback None rather than raising when nothing is left.
    return next(self._record_iterator, None)

def _create_record_iterator(self) -> Iterator[RDSModel]:
    """
    Yield a single ``RDSDashboardExecution`` record.

    The record carries the execution key, timestamp and state, plus the key
    of the dashboard it refers to (built from ``DashboardMetadata.DASHBOARD_KEY_FORMAT``).
    """
    execution_key = self._get_last_execution_node_key()
    dashboard_key = DashboardMetadata.DASHBOARD_KEY_FORMAT.format(
        product=self._product,
        cluster=self._cluster,
        dashboard_group=self._dashboard_group_id,
        dashboard_name=self._dashboard_id
    )
    execution_record = RDSDashboardExecution(
        rk=execution_key,
        timestamp=self._execution_timestamp,
        state=self._execution_state,
        dashboard_rk=dashboard_key
    )
    yield execution_record

def _get_last_execution_node_key(self) -> str:
return DashboardExecution.DASHBOARD_EXECUTION_KEY_FORMAT.format(
product=self._product,
Expand Down
26 changes: 25 additions & 1 deletion databuilder/models/dashboard/dashboard_last_modified.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@
Any, Iterator, Optional, Union,
)

from amundsen_rds.models import RDSModel
from amundsen_rds.models.dashboard import DashboardTimestamp as RDSDashboardTimestamp

from databuilder.models.dashboard.dashboard_metadata import DashboardMetadata
from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.table_serializable import TableSerializable
from databuilder.models.timestamp import timestamp_constants

LOGGER = logging.getLogger(__name__)


class DashboardLastModifiedTimestamp(GraphSerializable):
class DashboardLastModifiedTimestamp(GraphSerializable, TableSerializable):
"""
A model that encapsulate Dashboard's last modified timestamp in epoch
"""
Expand All @@ -38,6 +42,7 @@ def __init__(self,
self._cluster = cluster
self._node_iterator = self._create_node_iterator()
self._relation_iterator = self._create_relation_iterator()
self._record_iterator = self._create_record_iterator()

def create_next_node(self) -> Union[GraphNode, None]:
try:
Expand Down Expand Up @@ -80,6 +85,25 @@ def _create_relation_iterator(self) -> Iterator[GraphRelationship]:
)
yield relationship

def create_next_record(self) -> Union[RDSModel, None]:
    """
    Return the next record from ``self._record_iterator``.

    :return: the next record, or ``None`` once the iterator is exhausted.
    """
    # Exhaustion is reported to the caller as None via next()'s default.
    return next(self._record_iterator, None)

def _create_record_iterator(self) -> Iterator[RDSModel]:
    """
    Yield a single ``RDSDashboardTimestamp`` record.

    The record carries the last-modified key and timestamp, the name of the
    ``last_updated_timestamp`` enum member, and the key of the dashboard it
    refers to (built from ``DashboardMetadata.DASHBOARD_KEY_FORMAT``).
    """
    timestamp_key = self._get_last_modified_node_key()
    timestamp_name = timestamp_constants.TimestampName.last_updated_timestamp.name
    dashboard_key = DashboardMetadata.DASHBOARD_KEY_FORMAT.format(
        product=self._product,
        cluster=self._cluster,
        dashboard_group=self._dashboard_group_id,
        dashboard_name=self._dashboard_id
    )
    timestamp_record = RDSDashboardTimestamp(
        rk=timestamp_key,
        timestamp=self._last_modified_timestamp,
        name=timestamp_name,
        dashboard_rk=dashboard_key
    )
    yield timestamp_record

def _get_last_modified_node_key(self) -> str:
return DashboardLastModifiedTimestamp.DASHBOARD_LAST_MODIFIED_KEY_FORMAT.format(
product=self._product,
Expand Down
Loading