Skip to content

Commit

Permalink
add try-catch for error handling in data putting
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Dec 13, 2018
1 parent 7c11921 commit db8fefa
Showing 1 changed file with 34 additions and 26 deletions.
60 changes: 34 additions & 26 deletions aliyun/log/logclient_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,20 +385,24 @@ def pull_log_dump(client, project_name, logstore_name, from_time, to_time, file_
def copy_worker(from_client, from_project, from_logstore, shard_id, from_time, to_time,
to_client, to_project, to_logstore, batch_size=500, compress=True,
new_topic=None, new_source=None):
iter_data = from_client.pull_log(from_project, from_logstore, shard_id, from_time, to_time, batch_size=batch_size,
compress=compress)
try:
iter_data = from_client.pull_log(from_project, from_logstore, shard_id, from_time, to_time, batch_size=batch_size,
compress=compress)

count = 0
for res in iter_data:
for loggroup in res.get_loggroup_list().LogGroups:
if new_topic is not None:
loggroup.Topic = new_topic
if new_source is not None:
loggroup.Source = new_source
rtn = to_client.put_log_raw(to_project, to_logstore, loggroup, compress=compress)
count += len(loggroup.Logs)
count = 0
for res in iter_data:
for loggroup in res.get_loggroup_list().LogGroups:
if new_topic is not None:
loggroup.Topic = new_topic
if new_source is not None:
loggroup.Source = new_source
rtn = to_client.put_log_raw(to_project, to_logstore, loggroup, compress=compress)
count += len(loggroup.Logs)

return shard_id, count
return shard_id, count
except Exception as ex:
logger.error(ex)
raise


def _parse_shard_list(shard_list, current_shard_list):
Expand Down Expand Up @@ -682,20 +686,24 @@ def transform_worker(from_client, from_project, from_logstore, shard_id, from_ti
config,
to_client, to_project, to_logstore, batch_size=500, compress=True,
):
runner = Runner(config)
iter_data = from_client.pull_log(from_project, from_logstore, shard_id, from_time, to_time, batch_size=batch_size,
compress=compress)

count = 0
removed = 0
for s in iter_data:
events = s.get_flatten_logs_json(time_as_str=True)

c, r = _transform_events_to_logstore(runner, events, to_client, to_project, to_logstore)
count += c
removed += r

return shard_id, count, removed
try:
runner = Runner(config)
iter_data = from_client.pull_log(from_project, from_logstore, shard_id, from_time, to_time, batch_size=batch_size,
compress=compress)

count = 0
removed = 0
for s in iter_data:
events = s.get_flatten_logs_json(time_as_str=True)

c, r = _transform_events_to_logstore(runner, events, to_client, to_project, to_logstore)
count += c
removed += r

return shard_id, count, removed
except Exception as ex:
logger.error(ex)
raise


class TransformDataConsumer(ConsumerProcessorBase):
Expand Down

0 comments on commit db8fefa

Please sign in to comment.