Skip to content

Commit

Permalink
refactor: Add a level of record abstraction (#380)
Browse files Browse the repository at this point in the history
* move away from dicts and more structured types

* Add a abstraction layer between the databuilder records and neo4j

Signed-off-by: Andrew <andrjc4@vt.edu>
  • Loading branch information
AndrewCiambrone committed Nov 5, 2020
1 parent 093b3d6 commit 414e825
Show file tree
Hide file tree
Showing 54 changed files with 1,507 additions and 1,118 deletions.
31 changes: 16 additions & 15 deletions databuilder/loader/file_system_neo4j_csv_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@

from databuilder.job.base_job import Job
from databuilder.loader.base_loader import Loader
from databuilder.models.neo4j_csv_serde import NODE_LABEL, \
RELATION_START_LABEL, RELATION_END_LABEL, RELATION_TYPE
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.utils.closer import Closer
from databuilder.serializers import neo4_serializer


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,7 +89,7 @@ def _delete_dir() -> None:
# Directory should be deleted after publish is finished
Job.closer.register(_delete_dir)

def load(self, csv_serializable: Neo4jCsvSerializable) -> None:
def load(self, csv_serializable: GraphSerializable) -> None:
"""
Writes Neo4jCsvSerializable into CSV files.
There are multiple CSV files that this method writes.
Expand All @@ -107,23 +106,25 @@ def load(self, csv_serializable: Neo4jCsvSerializable) -> None:
:return:
"""

node_dict = csv_serializable.next_node()
while node_dict:
key = (node_dict[NODE_LABEL], len(node_dict))
node = csv_serializable.next_node()
while node:
node_dict = neo4_serializer.serialize_node(node)
key = (node.label, len(node_dict))
file_suffix = '{}_{}'.format(*key)
node_writer = self._get_writer(node_dict,
self._node_file_mapping,
key,
self._node_dir,
file_suffix)
node_writer.writerow(node_dict)
node_dict = csv_serializable.next_node()

relation_dict = csv_serializable.next_relation()
while relation_dict:
key2 = (relation_dict[RELATION_START_LABEL],
relation_dict[RELATION_END_LABEL],
relation_dict[RELATION_TYPE],
node = csv_serializable.next_node()

relation = csv_serializable.next_relation()
while relation:
relation_dict = neo4_serializer.serialize_relationship(relation)
key2 = (relation.start_label,
relation.end_label,
relation.type,
len(relation_dict))

file_suffix = '{}_{}_{}'.format(key2[0], key2[1], key2[2])
Expand All @@ -133,7 +134,7 @@ def load(self, csv_serializable: Neo4jCsvSerializable) -> None:
self._relation_dir,
file_suffix)
relation_writer.writerow(relation_dict)
relation_dict = csv_serializable.next_relation()
relation = csv_serializable.next_relation()

def _get_writer(self,
csv_record_dict: Dict[str, Any],
Expand Down
69 changes: 38 additions & 31 deletions databuilder/models/application.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

from typing import Any, Dict, List, Union
from typing import List, Union

from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
from databuilder.models.graph_serializable import GraphSerializable

from databuilder.models.table_metadata import TableMetadata
from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship


class Application(Neo4jCsvSerializable):
class Application(GraphSerializable):
"""
Application-table matching model (Airflow task and table)
"""
Expand Down Expand Up @@ -48,14 +48,14 @@ def __init__(self,
self._node_iter = iter(self.create_nodes())
self._relation_iter = iter(self.create_relation())

def create_next_node(self) -> Union[Dict[str, Any], None]:
def create_next_node(self) -> Union[GraphNode, None]:
# creates new node
try:
return next(self._node_iter)
except StopIteration:
return None

def create_next_relation(self) -> Union[Dict[str, Any], None]:
def create_next_relation(self) -> Union[GraphRelationship, None]:
try:
return next(self._relation_iter)
except StopIteration:
Expand All @@ -74,40 +74,47 @@ def get_application_model_key(self) -> str:
dag=self.dag,
task=self.task)

def create_nodes(self) -> List[Dict[str, Any]]:
def create_nodes(self) -> List[GraphNode]:
"""
Create a list of Neo4j node records
:return:
"""
results = []

results.append({
NODE_KEY: self.get_application_model_key(),
NODE_LABEL: Application.APPLICATION_LABEL,
Application.APPLICATION_URL_NAME: self.application_url,
Application.APPLICATION_NAME: Application.APPLICATION_TYPE,
Application.APPLICATION_DESCRIPTION:
'{app_type} with id {id}'.format(app_type=Application.APPLICATION_TYPE,
id=Application.APPLICATION_ID_FORMAT.format(dag_id=self.dag,
task_id=self.task)),
Application.APPLICATION_ID: Application.APPLICATION_ID_FORMAT.format(dag_id=self.dag,
task_id=self.task)
})
application_description = '{app_type} with id {id}'.format(
app_type=Application.APPLICATION_TYPE,
id=Application.APPLICATION_ID_FORMAT.format(dag_id=self.dag, task_id=self.task)
)
application_id = Application.APPLICATION_ID_FORMAT.format(
dag_id=self.dag,
task_id=self.task
)
application_node = GraphNode(
key=self.get_application_model_key(),
label=Application.APPLICATION_LABEL,
attributes={
Application.APPLICATION_URL_NAME: self.application_url,
Application.APPLICATION_NAME: Application.APPLICATION_TYPE,
Application.APPLICATION_DESCRIPTION: application_description,
Application.APPLICATION_ID: application_id
}
)
results.append(application_node)

return results

def create_relation(self) -> List[Dict[str, Any]]:
def create_relation(self) -> List[GraphRelationship]:
"""
Create a list of relations between application and table nodes
:return:
"""
results = [{
RELATION_START_KEY: self.get_table_model_key(),
RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
RELATION_END_KEY: self.get_application_model_key(),
RELATION_END_LABEL: Application.APPLICATION_LABEL,
RELATION_TYPE: Application.TABLE_APPLICATION_RELATION_TYPE,
RELATION_REVERSE_TYPE: Application.APPLICATION_TABLE_RELATION_TYPE
}]

graph_relationship = GraphRelationship(
start_key=self.get_table_model_key(),
start_label=TableMetadata.TABLE_NODE_LABEL,
end_key=self.get_application_model_key(),
end_label=Application.APPLICATION_LABEL,
type=Application.TABLE_APPLICATION_RELATION_TYPE,
reverse_type=Application.APPLICATION_TABLE_RELATION_TYPE,
attributes={}
)
results = [graph_relationship]
return results
49 changes: 27 additions & 22 deletions databuilder/models/badge.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

from typing import Any, Dict, List, Optional
from typing import List, Optional
import re

from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship


class Badge:
Expand All @@ -19,7 +19,7 @@ def __repr__(self) -> str:
self.category)


class BadgeMetadata(Neo4jCsvSerializable):
class BadgeMetadata(GraphSerializable):
"""
Badge model.
"""
Expand Down Expand Up @@ -62,14 +62,14 @@ def __repr__(self) -> str:
return 'BadgeMetadata({!r}, {!r})'.format(self.start_label,
self.start_key)

def create_next_node(self) -> Optional[Dict[str, Any]]:
def create_next_node(self) -> Optional[GraphNode]:
# return the string representation of the data
try:
return next(self._node_iter)
except StopIteration:
return None

def create_next_relation(self) -> Optional[Dict[str, Any]]:
def create_next_relation(self) -> Optional[GraphRelationship]:
try:
return next(self._relation_iter)
except StopIteration:
Expand All @@ -84,30 +84,35 @@ def get_badge_key(name: str) -> str:
def get_metadata_model_key(self) -> str:
return self.start_key

def create_nodes(self) -> List[Dict[str, Any]]:
def create_nodes(self) -> List[GraphNode]:
"""
Create a list of Neo4j node records
:return:
"""
results = []
for badge in self.badges:
if badge:
results.append({
NODE_KEY: self.get_badge_key(badge.name),
NODE_LABEL: self.BADGE_NODE_LABEL,
self.BADGE_CATEGORY: badge.category
})
node = GraphNode(
key=self.get_badge_key(badge.name),
label=self.BADGE_NODE_LABEL,
attributes={
self.BADGE_CATEGORY: badge.category
}
)
results.append(node)
return results

def create_relation(self) -> List[Dict[str, Any]]:
def create_relation(self) -> List[GraphRelationship]:
results = []
for badge in self.badges:
results.append({
RELATION_START_LABEL: self.start_label,
RELATION_END_LABEL: self.BADGE_NODE_LABEL,
RELATION_START_KEY: self.start_key,
RELATION_END_KEY: self.get_badge_key(badge.name),
RELATION_TYPE: self.BADGE_RELATION_TYPE,
RELATION_REVERSE_TYPE: self.INVERSE_BADGE_RELATION_TYPE,
})
relation = GraphRelationship(
start_label=self.start_label,
end_label=self.BADGE_NODE_LABEL,
start_key=self.start_key,
end_key=self.get_badge_key(badge.name),
type=self.BADGE_RELATION_TYPE,
reverse_type=self.INVERSE_BADGE_RELATION_TYPE,
attributes={}
)
results.append(relation)
return results
41 changes: 21 additions & 20 deletions databuilder/models/column_usage_model.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

from typing import Union, Dict, Any, Iterable, List
from typing import Union, Iterable, List

from databuilder.models.neo4j_csv_serde import (
Neo4jCsvSerializable, RELATION_START_KEY, RELATION_END_KEY,
RELATION_START_LABEL, RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
)
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.usage.usage_constants import (
READ_RELATION_TYPE, READ_REVERSE_RELATION_TYPE, READ_RELATION_COUNT_PROPERTY
)
from databuilder.models.table_metadata import TableMetadata
from databuilder.models.user import User
from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship


class ColumnUsageModel(Neo4jCsvSerializable):
class ColumnUsageModel(GraphSerializable):

"""
A model represents user <--> column graph model
Expand Down Expand Up @@ -49,38 +48,40 @@ def __init__(self,
self._node_iter = iter(self.create_nodes())
self._relation_iter = iter(self.create_relation())

def create_next_node(self) -> Union[Dict[str, Any], None]:
def create_next_node(self) -> Union[GraphNode, None]:

try:
return next(self._node_iter)
except StopIteration:
return None

def create_nodes(self) -> List[Dict[str, Any]]:
def create_nodes(self) -> List[GraphNode]:
"""
Create a list of Neo4j node records
:return:
"""

return User(email=self.user_email).create_nodes()

def create_next_relation(self) -> Union[Dict[str, Any], None]:

def create_next_relation(self) -> Union[GraphRelationship, None]:
try:
return next(self._relation_iter)
except StopIteration:
return None

def create_relation(self) -> Iterable[Any]:
return [{
RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
RELATION_END_LABEL: User.USER_NODE_LABEL,
RELATION_START_KEY: self._get_table_key(),
RELATION_END_KEY: self._get_user_key(self.user_email),
RELATION_TYPE: ColumnUsageModel.TABLE_USER_RELATION_TYPE,
RELATION_REVERSE_TYPE: ColumnUsageModel.USER_TABLE_RELATION_TYPE,
ColumnUsageModel.READ_RELATION_COUNT: self.read_count
}]
def create_relation(self) -> Iterable[GraphRelationship]:
relationship = GraphRelationship(
start_key=self._get_table_key(),
start_label=TableMetadata.TABLE_NODE_LABEL,
end_key=self._get_user_key(self.user_email),
end_label=User.USER_NODE_LABEL,
type=ColumnUsageModel.TABLE_USER_RELATION_TYPE,
reverse_type=ColumnUsageModel.USER_TABLE_RELATION_TYPE,
attributes={
ColumnUsageModel.READ_RELATION_COUNT: self.read_count
}
)
return [relationship]

def _get_table_key(self) -> str:
return TableMetadata.TABLE_KEY_FORMAT.format(db=self.database,
Expand Down

0 comments on commit 414e825

Please sign in to comment.