Include parameter comments
jorge-sanchez-2020 committed Jan 12, 2021
1 parent ca1978d commit 5380a21
Showing 2 changed files with 40 additions and 16 deletions.
44 changes: 32 additions & 12 deletions cognite/replicator/replication.py
@@ -146,13 +146,32 @@ def new_metadata(
metadata["_replicatedInternalId"] = obj.id
return metadata


def restore_fields(
-dst_obj : Union[Asset, Event, FileMetadata, TimeSeries],
-dst_obj_dump : Dict[str, Any],
-exclude_fields : List[str]
-)-> Union[Asset, Event, FileMetadata, TimeSeries]:
-replicator_metadata_fields = ["_replicatedSource","_replicatedTime", "_replicatedInternalId"]
+dst_obj: Union[Asset, Event, FileMetadata, TimeSeries], dst_obj_dump: Dict[str, Any], exclude_fields: List[str]
+) -> Union[Asset, Event, FileMetadata, TimeSeries]:
"""
Restore the objects data according to exclude_fields defined.
Fields not restored
- **_replicatedSource**: The name of the project this object is replicated from.
- **_replicatedTime**: The timestamp of when the object was replicated, all objects created/updated in the same
execution will have the same timestamp.
- **_replicatedInternalId**: The internal id of the source object that the destination object
is being replicated from.
Args:
dst_obj: new object .
dst_obj_dump: previous object.
exclude_fields: List of fields: Only support name, description, metadata and metadata.customfield.
Returns:
Object restored.
"""

replicator_metadata_fields = ["_replicatedSource", "_replicatedTime", "_replicatedInternalId"]

for key in exclude_fields:
if key == "name":
dst_obj.name = dst_obj_dump["name"]
@@ -169,12 +188,13 @@ def restore_fields(
dst_obj.metadata = {**dst_obj.metadata, **replicator_metadata}

elif "metadata." in key:
-metadata_key = key[key.index(".") + 1:]
-if not(metadata_key in replicator_metadata_fields):
+metadata_key = key[key.index(".") + 1 :]
+if not (metadata_key in replicator_metadata_fields):
dst_obj.metadata[metadata_key] = dst_obj_dump["metadata"][metadata_key]

return dst_obj


def make_objects_batch(
src_objects: List[Union[Asset, Event, FileMetadata, TimeSeries]],
src_id_dst_map: Dict[int, Union[Asset, Event, FileMetadata, TimeSeries]],
@@ -185,7 +205,7 @@ def make_objects_batch(
replicated_runtime: int,
depth: Optional[int] = None,
src_filter: Optional[List[Union[Event, FileMetadata, TimeSeries]]] = None,
-exclude_fields:Optional[List[str]] = None,
+exclude_fields: Optional[List[str]] = None,
) -> Tuple[
List[Union[Asset, Event, FileMetadata, TimeSeries]],
List[Union[Asset, Event, FileMetadata, TimeSeries]],
@@ -206,6 +226,7 @@ def make_objects_batch(
depth: The depth of the asset within the asset hierarchy, only used for making assets.
src_filter: List of event/timeseries/files in the destination.
Will be used for comparison if the current event/timeseries/files were not copied by the replicator.
exclude_fields: List of fields to exclude from the update; only name, description, metadata and metadata.customfield are supported.
Returns:
create_objects: A list of all the new objects to be posted to CDF.
update_objects: A list of all the updated objects to be updated in CDF.
@@ -232,7 +253,7 @@ def make_objects_batch(
dst_obj = update(src_obj, dst_obj, src_dst_ids_assets, project_src, replicated_runtime, **kwargs)

if exclude_fields:
-dst_obj = restore_fields(dst_obj,dst_obj_dump,exclude_fields)
+dst_obj = restore_fields(dst_obj, dst_obj_dump, exclude_fields)

update_objects.append(dst_obj)
else:
@@ -290,7 +311,7 @@ def thread(
replicated_runtime: int,
client: CogniteClient,
src_filter: Optional[List[Union[Event, FileMetadata, TimeSeries]]] = None,
-exclude_fields:Optional[List[str]] = None,
+exclude_fields: Optional[List[str]] = None,
):
"""
Split up objects to replicate them in batches and thread each batch.
@@ -343,7 +364,6 @@ def thread(
)
)


for count, t in enumerate(threads, start=1):
t.start()
logging.info(f"Started thread: {count}")
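For context, here is a minimal usage sketch of the restore_fields path, assuming cognite-sdk and cognite-replicator are installed; the TimeSeries values, project names, and the metadata.owner field are made up for illustration:

from cognite.client.data_classes import TimeSeries
from cognite.replicator.replication import restore_fields

# Destination object as it looks after update(): name, description and metadata
# have been overwritten with the source values plus the replicator bookkeeping keys.
dst_obj = TimeSeries(
    name="source name",
    description="source description",
    metadata={
        "owner": "source-team",
        "_replicatedSource": "source-project",
        "_replicatedTime": "1610000000000",
        "_replicatedInternalId": "42",
    },
)

# Dump of the destination object taken before the update.
dst_obj_dump = {
    "name": "destination name",
    "description": "destination description",
    "metadata": {"owner": "destination-team"},
}

# Restore the destination's own name and metadata.owner; the _replicated* keys are never restored.
restored = restore_fields(dst_obj, dst_obj_dump, ["name", "metadata.owner"])
assert restored.name == "destination name"
assert restored.metadata["owner"] == "destination-team"
assert restored.metadata["_replicatedSource"] == "source-project"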
12 changes: 8 additions & 4 deletions cognite/replicator/time_series.py
@@ -9,7 +9,9 @@
from cognite.client.exceptions import CogniteNotFoundError

from . import replication
-#import replication
+# import replication


def create_time_series(
src_ts: TimeSeries, src_dst_ids_assets: Dict[int, int], project_src: str, runtime: int
@@ -94,7 +96,7 @@ def copy_ts(
client: CogniteClient,
src_filter: List[TimeSeries],
jobs: queue.Queue = None,
-exclude_fields:Optional[List[str]] = None,
+exclude_fields: Optional[List[str]] = None,
):
"""
Creates/updates time series objects and then attempts to create and update these time series in the destination.
@@ -107,6 +109,7 @@
runtime: The timestamp to be used in the new replicated metadata.
client: The client corresponding to the destination project.
src_filter: List of timeseries in the destination; will be used for comparison if the current timeseries were not copied by the replicator
exclude_fields: List of fields to exclude from the update; only name, description, metadata and metadata.customfield are supported.
"""

if jobs:
Expand Down Expand Up @@ -193,6 +196,7 @@ def replicate(
skip_nonasset: If a time series has no associated assets, do not replicate it
target_external_ids: List of specific time series external ids to replicate
exclude_pattern: Regex pattern; time series whose names match will not be replicated
exclude_fields: List of fields to exclude from the update; only name, description, metadata and metadata.customfield are supported.
"""
project_src = client_src.config.project
project_dst = client_dst.config.project
Expand Down Expand Up @@ -252,7 +256,7 @@ def filter_fn(ts):
replicated_runtime=replicated_runtime,
client=client_dst,
src_filter=ts_dst,
-exclude_fields=exclude_fields
+exclude_fields=exclude_fields,
)
else:
copy_ts(
@@ -263,7 +267,7 @@
runtime=replicated_runtime,
client=client_dst,
src_filter=ts_dst,
-exclude_fields=exclude_fields
+exclude_fields=exclude_fields,
)

logging.info(
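The new exclude_fields parameter is threaded from replicate through copy_ts and make_objects_batch down to restore_fields. A minimal call sketch, assuming the remaining replicate parameters keep their defaults (the full signature is not shown in this diff) and using placeholder credentials and a hypothetical metadata.owner field:

from cognite.client import CogniteClient
from cognite.replicator import time_series

# Placeholder clients; API keys, project names and client_name are made up.
client_src = CogniteClient(api_key="SRC_API_KEY", project="source-project", client_name="replicator-example")
client_dst = CogniteClient(api_key="DST_API_KEY", project="destination-project", client_name="replicator-example")

# When an existing destination time series is updated, keep its own name and
# its metadata.owner value instead of overwriting them with the source values.
time_series.replicate(
    client_src,
    client_dst,
    exclude_fields=["name", "metadata.owner"],
)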
