Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pybind/mgr/progress: Disregard unreported pgs #40480

Merged
merged 1 commit into from Jun 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion qa/tasks/mgr/test_progress.py
Expand Up @@ -14,7 +14,7 @@ class TestProgress(MgrTestCase):

# How long we expect to wait at most between taking an OSD out
# and seeing the progress event pop up.
EVENT_CREATION_PERIOD = 5
EVENT_CREATION_PERIOD = 15

WRITE_PERIOD = 30

Expand Down
50 changes: 31 additions & 19 deletions src/pybind/mgr/progress/module.py
Expand Up @@ -12,7 +12,7 @@
import datetime
import uuid
import time

import logging
import json


Expand Down Expand Up @@ -62,7 +62,6 @@ def add_to_ceph_s(self):
# type: () -> bool
return self._add_to_ceph_s


@property
def progress(self):
# type: () -> float
Expand Down Expand Up @@ -180,6 +179,7 @@ def to_json(self):
d["failure_message"] = self._failure_message
return d


class GlobalRecoveryEvent(Event):
"""
An event whose completion is determined by active+clean/total_pg_num
Expand All @@ -194,20 +194,28 @@ def __init__(self, message, refs, add_to_ceph_s, start_epoch, active_clean_num):
self._active_clean_num = active_clean_num
self._refresh()

def global_event_update_progress(self, pg_dump):
# type: (Dict) -> None
def global_event_update_progress(self, pg_dump, log):
# type: (Dict, logging.Logger) -> None
"Update progress of Global Recovery Event"

pgs = pg_dump['pg_stats']
new_active_clean_num = 0
for pg in pgs:
skipped_pgs = 0

if int(pg['reported_epoch']) < int(self._start_epoch):
continue
for pg in pgs:
# Disregard PGs that are not being reported
# if the states are active+clean. Since it is
# possible that some pgs might not have any movement
# even before the start of the event.

state = pg['state']

states = state.split("+")
if pg['reported_epoch'] < self._start_epoch:
if "active" in states and "clean" in states:
log.debug("Skipping pg {0} since reported_epoch {1} < start_epoch {2}"
.format(pg['pgid'], pg['reported_epoch'], self._start_epoch))
skipped_pgs += 1
continue

if "active" in states and "clean" in states:
new_active_clean_num += 1
Expand All @@ -218,10 +226,11 @@ def global_event_update_progress(self, pg_dump):
# the progress
try:
# Might be that total_pg_num is 0
self._progress = float(new_active_clean_num) / total_pg_num
self._progress = float(new_active_clean_num) / (total_pg_num - skipped_pgs)
except ZeroDivisionError:
self._progress = 0.0

log.debug("Updated progress to %s", self.summary())
self._refresh()

@property
Expand Down Expand Up @@ -301,7 +310,7 @@ def pg_update(self, raw_pg_stats, pg_ready, log):
# FIXME: far more fields getting pythonized than we really care about
# Sanity check to see if there are any missing PGs and to assign
# empty array and dictionary if there hasn't been any recovery
pg_to_state = dict([(p['pgid'], p) for p in raw_pg_stats['pg_stats']]) # type: Dict[str, Any]
pg_to_state = dict((p['pgid'], p) for p in raw_pg_stats['pg_stats']) # type: Dict[str, Any]
if self._original_bytes_recovered is None:
self._original_bytes_recovered = {}
missing_pgs = []
Expand Down Expand Up @@ -451,7 +460,7 @@ def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)

self._events = {} # type: Dict[str, Union[RemoteEvent, PgRecoveryEvent, GlobalRecoveryEvent]]
self._completed_events = [] # type: List[GhostEvent]
self._completed_events = [] # type: List[GhostEvent]
kamoltat marked this conversation as resolved.
Show resolved Hide resolved

self._old_osd_map = None # type: Optional[OSDMap]

Expand Down Expand Up @@ -538,7 +547,6 @@ def _osd_in_out(self, old_map, old_dump, new_map, osd_id, marked):
self.log.warning("{0} PGs affected by osd.{1} being marked {2}".format(
len(affected_pgs), osd_id, marked))


# In the case of the osd coming back in, we might need to cancel
# previous recovery event for that osd
if marked == "in":
Expand Down Expand Up @@ -596,7 +604,8 @@ def _pg_state_changed(self, pg_dump):
active_clean_num = 0
for pg in pgs:
state = pg['state']

# TODO right here we can keep track of epoch as well
# and parse it to global_event_update_progress()
states = state.split("+")

if "active" in states and "clean" in states:
Expand All @@ -607,12 +616,15 @@ def _pg_state_changed(self, pg_dump):
except ZeroDivisionError:
return
if progress < 1.0:
self.log.warning(("Starting Global Recovery Event,"
"%d pgs not in active + clean state"),
total_pg_num - active_clean_num)
ev = GlobalRecoveryEvent("Global Recovery Event",
refs=[("global","")],
refs=[("global", "")],
add_to_ceph_s=True,
start_epoch=self.get_osdmap().get_epoch(),
active_clean_num=active_clean_num)
ev.global_event_update_progress(pg_dump)
ev.global_event_update_progress(self.get('pg_stats'), self.log)
self._events[ev.id] = ev

def notify(self, notify_type, notify_data):
Expand All @@ -625,9 +637,9 @@ def notify(self, notify_type, notify_data):
assert old_osdmap
assert self._latest_osdmap

self.log.info("Processing OSDMap change {0}..{1}".format(
old_osdmap.get_epoch(), self._latest_osdmap.get_epoch()
))
self.log.info(("Processing OSDMap change %d..%d"),
old_osdmap.get_epoch(), self._latest_osdmap.get_epoch())

self._osdmap_changed(old_osdmap, self._latest_osdmap)
elif notify_type == "pg_summary":
# if there are no events we will skip this here to avoid
Expand All @@ -647,7 +659,7 @@ def notify(self, notify_type, notify_data):
self.maybe_complete(ev)
elif isinstance(ev, GlobalRecoveryEvent):
global_event = True
ev.global_event_update_progress(data)
ev.global_event_update_progress(data, self.log)
self.maybe_complete(ev)

if not global_event:
Expand Down