Skip to content

Commit

Permalink
introducing integrity
Browse files Browse the repository at this point in the history
  • Loading branch information
dparalen committed Apr 20, 2018
1 parent 13c49cd commit 1c4ae8a
Show file tree
Hide file tree
Showing 4 changed files with 391 additions and 0 deletions.
Empty file.
202 changes: 202 additions & 0 deletions server/pulp/integrity/generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import functools
import os
from os import path

from pulp.server import util
from pulp.server.db import model
from pulp.plugins.util import verification
import validator

from pulp_rpm.plugins.distributors.yum import configuration as yum_config


class MissingStoragePath(validator.ValidationError):
def __init__(self, unit, exc=None):
msg = 'Unit %(u)s is missing (or unreadable) from the storage path: %(p)s.' % {
'u': unit,
'p': unit.storage_path,
}
if exc:
msg += ' Original error: %s.' % exc
super(MissingStoragePath, self).__init__(msg)


class InvalidStoragePathSize(validator.ValidationError):
def __init__(self, unit):
msg = (
'Unit %(u)s, storage path %(p)s has invalid size: %(s)d. '
'Expected: %(e)d.'
) % {
'u': unit,
'p': unit.storage_path,
's': path.getsize(unit.storage_path),
'e': unit.size,
}
super(InvalidStoragePathSize, self).__init__(msg)


class InvalidStoragePathChecksum(validator.ValidationError):
def __init__(self, unit):
msg = 'Unit %(u)s, storage path %(p)s has an invalid checksum.' % {
'u': unit,
'p': unit.storage_path,
}
super(InvalidStoragePathChecksum, self).__init__(msg)


class BrokenSymlink(validator.ValidationError):
def __init__(self, unit, repo, link):
msg = 'Unit %(u)s, has a broken symlink %(l)s in repo %(r)s.' % {
'u': unit,
'l': link,
'r': repo.repo_id,
}
super(BrokenSymlink, self).__init__(msg)


class DownloadedFileContentUnitValidator(validator.MultiValidator):
def applicable(self, unit):
return (
super(DownloadedFileContentUnitValidator, self).applicable(unit) and
isinstance(unit, model.FileContentUnit) and
unit.downloaded
)


class ExistenceValidator(DownloadedFileContentUnitValidator):
@validator.MultiValidator.affects_repositories
def validate(self, unit, *args):
if not path.exists(unit.storage_path):
raise MissingStoragePath(unit)


class SizeValidator(DownloadedFileContentUnitValidator):
@validator.MultiValidator.affects_repositories
def validate(self, unit, *args):
try:
unit.verify_size(unit.storage_path)
except IOError as exc:
raise MissingStoragePath(unit, exc=exc)
except verification.VerificationException as exc:
raise InvalidStoragePathSize(unit)


class ChecksumValidator(DownloadedFileContentUnitValidator):
@validator.MultiValidator.affects_repositories
def validate(self, unit, *args):
try:
with open(unit.storage_path, 'rb') as fd:
checksums = util.calculate_checksums(fd, [unit.checksumtype])
except IOError as exc:
raise MissingStoragePath(unit, exc=exc)
if unit.checksum != checksums[unit.checksumtype]:
raise InvalidStoragePathChecksum(unit)


class DarkContentValidator(validator.Validator):
# FIXME(milan): POC; instantiating this is going to take considerable time
# and memory; couple of dozens of megabytes most likely but is supposed to be used
# once throughout the validation

# provides: Existence validation, DarkContent validation
# requires: None

def applicable(self, unit):
return (
super(DarkContentValidator, self).applicable(unit) and
isinstance(unit, model.FileContentUnit) and
unit.downloaded
)

def __init__(self):
super(DarkContentValidator, self).__init__()
self.paths = set()
# FIXME(milan): where is this path defined?
for dirpath, dirnames, filenames in os.walk('/var/lib/pulp/content'):
for filename in filenames:
self.paths.add(path.join(dirpath, filename))

def validate(self, unit, *args):
try:
self.paths.remove(unit.storage_path)
except KeyError as exc:
# FIXME(milan): double remove?
raise MissingStoragePath(unit, exc=exc)

@property
def results(self):
for storage_path in self.paths:
yield validator.DarkPath(self, storage_path)


class BrokenSymlinksValidator(validator.MultiValidator):
def __init__(self):
# the amount of repositories&distributors is small compared to the amount of units
self.repo_cache = {}
self.current_repositories = None
self.http_publish_dir = yum_config.get_http_publish_dir()
self.https_publish_dir = yum_config.get_https_publish_dir()

def set_current_repositories(self, unit):
self.current_repositories = unit.get_repositories()

def get_distributors(self):
for repository in self.current_repositories:
self.repo_cache[repository] = {}
try:
distributor = self.repo_cache[repository]['distributor']
except KeyError:
self.repo_cache[repository]['distributor'] = model.Distributor.objects.get(
repo_id=repository.repo_id,
distributor_type_id='yum_distributor'
)
distributor = self.repo_cache[repository]['distributor']
yield distributor, repository

def applicable(self, unit):
# applicable to published repositories only
self.set_current_repositories(unit)
return isinstance(unit, model.FileContentUnit)

def check_link(self, unit, repository, link, check_func):
return (
check_func(link) and validator.ValidationSuccess(self, unit) or
validator.ValidationFailure(self, unit, repository,
BrokenSymlink(unit, repository, link))
)

def validate(self, unit, *args):
check_func = (
unit.downloaded and
functools.partial(os.path.samefile, unit.storage_path) or
os.path.lexists
)
symlink_name = unit.get_symlink_name()
for distributor, repository in self.get_distributors():
if not distributor.last_publish:
yield validator.ValidationNotApplicable(unit, self)
continue
if distributor.config.get('http', False):
link = os.paht.join(self.http_publish_dir, distributor.repo_id, symlink_name)
yield self.check_link(unit, repository, link, check_func)
if distributor.config.get('https', False):
link = os.path.join(self.https_publish_dir, distributor.repo_id, symlink_name)
yield self.check_link(unit, repository, link, check_func)


class AffectedRepositoriesValidator(validator.CumulativeValidator):
def __init__(self):
self.repositories = set()

def validate(self, unit, validation):
for result in validation.results:
if not result:
try:
self.repositories.add(result.repository)
except AttributeError:
continue

@property
def results(self):
for repository in self.repositories:
yield validator.BrokenRepository(self, repository)
55 changes: 55 additions & 0 deletions server/pulp/integrity/integrity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from generic import ChecksumValidator, DarkContentValidator, ExistenceValidator, SizeValidator, AffectedRepositoriesValidator, BrokenSymlinksValidator
from validator import Validation, ValidationFailure

from pulp.plugins.loader import manager
from pulp.server.db.connection import initialize

import pulp.server.db.model as model

import json


def default(thing):
if isinstance(thing, model.Repository):
return thing.repo_id
return str(thing)


def result_to_json(resut):
return json.dumps(result._asdict(), default=default)


if __name__ == '__main__':
initialize('pulp_database')
ev = ExistenceValidator()
dcv = DarkContentValidator()
arv = AffectedRepositoriesValidator()
sv = SizeValidator()
cv = ChecksumValidator()
pm = manager.PluginManager()
bsv = BrokenSymlinksValidator()
report = {
'unit_failures': [],
'dark_matter': [],
'broken_repos': [],
}

validators_struct = [[[[[ev], dcv], sv], cv], bsv, arv]

for modelname in pm.unit_models:
for unit in pm.unit_models[modelname].objects:
validation = Validation.from_iterable(validators_struct)
# always check for affected repositories
validation.operation = lambda x: bool(len(x))
# calculate unit validation results
validation(unit)
for result in validation.results:
if not bool(result):
report['unit_failures'].append(result._asdict())

for result in dcv.results:
report['dark_matter'].append(result._asdict())
for result in arv.results:
report['broken_repos'].append(result._asdict())

print(json.dumps(report, default=default, indent=True))
134 changes: 134 additions & 0 deletions server/pulp/integrity/validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from collections import namedtuple
import functools

class ValidationError(Exception):
"""A generic validation error."""

# TODO(milan): might require some form of inheritance

ValidationFailure = namedtuple('ValidationFailure', ('validator', 'unit', 'repository', 'error'))
# this is False in the bool context
ValidationFailure.__nonzero__ = staticmethod(lambda: False)
ValidationNotApplicable = namedtuple('ValidationNotApplicable', ('validator', 'unit'))
ValidationSuccess = namedtuple('ValidationSuccess', ('validator', 'unit'))
DarkPath = namedtuple('DarkPath', ('validator', 'path'))
DarkPath.__nonzero__ = staticmethod(lambda: False)
BrokenRepository = namedtuple('BrokenRepository', ('validator', 'repository'))
BrokenRepository.__nonzero__ = staticmethod(lambda: False)


class Validator(object):
# Applicable to everything explicitly
applicable = staticmethod(lambda unused: True)

def __call__(self, unit, validation):
"""Perform a sanitized validation.
Appends a result to validation.results.
:param unit: a content unit to validate
:type unit: pulp.server.db.model.FileContentUnit
"""
if not self.applicable(unit):
validation.results.append(ValidationNotApplicable(self, unit))

try:
self.validate(unit, validation)
except ValidationError as exc:
validation.results.append(ValidationFailure(self, unit, exc))
else:
validation.results.append(ValidationSuccess(self, unit))

def validate(self, unused):
"""To be overridden."""
pass

def __repr__(self):
return type(self).__name__

__reduce__ = __repr__


class CumulativeValidator(Validator):
@property
def results(self):
raise NotImplementedError('TODO')


class MultiValidator(Validator):
def __call__(self, unit, validation):
"""Perform a sanitized validation.
Extends the valiation.results with calculated results.
:param unit: a content unit to validate
:type unit: pulp.server.db.model.FileContentUnit
"""
if not self.applicable(unit):
validation.results.append(ValidationNotApplicable(self, unit))
return

for result in self.validate(unit, validation):
validation.results.append(result)

@staticmethod
def affects_repositories(func):
def inner(self, unit, *args, **kwargs):
try:
func(self, unit, *args, **kwargs)
except ValidationError as exc:
for repository in unit.get_repositories():
yield ValidationFailure(self, unit, repository, exc)
else:
yield ValidationSuccess(self, unit)
finally:
raise StopIteration()
return inner


class Validation(object):
"""Chain&nest validators."""
def __init__(self, children=None, validators=None, operation=all):
self.children = children or []
self.validators = validators or []
self._visited = False
# all the children share a common, flat results list eventually
# the order of results is given by the order of the children walk
self.results = None
self.operation = operation

@classmethod
def from_iterable(cls, iterable):
validators = []
children = []
for item in iterable:
try:
# add a child validation
children.append(cls.from_iterable(item))
except TypeError:
# add a leaf (validator)
validators.append(item)
return cls(validators=validators, children=children)

def __iter__(self):
# A DFS children iteration
if self._visited:
raise StopIteration('Already visited')
self._visited = True

for child in self.children:
yield child

def __call__(self, unit, results=None):
if results is None:
results = []

self.results = results
for validation in self:
validation(unit, results)
# fast forward in case of nested errors
if not self.operation(self.results):
return self.results

for validator in self.validators:
validator(unit, self)

0 comments on commit 1c4ae8a

Please sign in to comment.