Skip to content

Commit

Permalink
Merged master changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sgeulette committed Feb 2, 2024
2 parents 50788ad + 6e694b2 commit 45140d8
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 43 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
Expand Up @@ -8,6 +8,10 @@ Changelog
[gbastien]
- Added `security.setup_app` to be used in run scripts.
[sgeulette]
- Added `setup.load_xml_tool_only_from_package` to load only main tool xml file.
[sgeulette]
- Added `setup.test_remove_gs_step` to remove a generic setup step.
[sgeulette]

0.80 (2023-12-11)
-----------------
Expand Down
100 changes: 79 additions & 21 deletions src/imio/helpers/setup.py
@@ -1,17 +1,55 @@
# -*- coding: utf-8 -*-

from plone import api
from plone.api.exc import InvalidParameterError
from plone.dexterity.fti import DexterityFTI
from Products.GenericSetup.interfaces import IBody
from Products.GenericSetup.utils import importObjects
from zope.component import queryMultiAdapter

import logging


logger = logging.getLogger('imio.helpers.setup')


def load_type_from_package(type_name, profile_id, purge_actions=False):
"""Loads a portal_type from his xml definition.
:param type_name: portal_type id
:param profile_id: package profile id
:param purge_actions: empties type actions
:return: status as boolean
"""
types_tool = api.portal.get_tool('portal_types')
portal_type = types_tool.get(type_name)
if portal_type is None:
logger.error("Cannot find '{}' portal_type name in portal".format(type_name))
return False
ps_tool = api.portal.get_tool('portal_setup')
try:
context = ps_tool._getImportContext(profile_id, True)
except KeyError:
logger.error("Cannot find '{}' profile id".format(profile_id))
return False

# special case for DX FTI, _should_purge is set to False or it fails when purging
if isinstance(portal_type, DexterityFTI):
context._should_purge = False

if purge_actions:
# remove actions so it is reloaded in correct order
portal_type._actions = ()

# ps_tool.applyContext(context) # necessary ?
importObjects(portal_type, 'types/', context)
if portal_type._p_changed is False:
logger.error("Could not update '{}' using profile '{}'".format(type_name, profile_id))
return False
return True


def load_workflow_from_package(wkf_name, profile_id):
"""Updates a workflow from an xml definition.
"""Loads a workflow from his xml definition.
:param wkf_name: workflow id
:param profile_id: package profile id
:return: status as boolean
Expand All @@ -35,35 +73,55 @@ def load_workflow_from_package(wkf_name, profile_id):
return True


def load_type_from_package(type_name, profile_id, purge_actions=False):
"""Updates a portal_type from an xml definition.
:param type_name: portal_type id
def load_xml_tool_only_from_package(tool_name, profile_id):
"""Loads a tool from his xml definition.
:param tool_name: tool id
:param profile_id: package profile id
:return: status as boolean
"""
types_tool = api.portal.get_tool('portal_types')
portal_type = types_tool.get(type_name)
if portal_type is None:
logger.error("Cannot find '{}' portal_type name in portal".format(type_name))
raise NotImplementedError
try:
tool = api.portal.get_tool(tool_name)
except InvalidParameterError:
logger.error("Cannot find '{}' tool name in portal".format(tool_name))
return False
ps_tool = api.portal.get_tool('portal_setup')
try:
context = ps_tool._getImportContext(profile_id, True)
context = ps_tool._getImportContext(profile_id, False) # do not purge !
except KeyError:
logger.error("Cannot find '{}' profile id".format(profile_id))
return False

# special case for DX FTI, _should_purge is set to False or it fails when purging
if isinstance(portal_type, DexterityFTI):
context._should_purge = False

if purge_actions:
# remove actions so it is reloaded in correct order
portal_type._actions = ()

# ps_tool.applyContext(context) # necessary ?
importObjects(portal_type, 'types/', context)
if portal_type._p_changed is False:
logger.error("Could not update '{}' using profile '{}'".format(type_name, profile_id))
importer = queryMultiAdapter((tool, context), IBody)
path = tool_name.replace(' ', '_')
__traceback_info__ = path
if importer:
if importer.name:
path = importer.name
filename = '%s%s' % (path, importer.suffix)
body = context.readDataFile(filename)
if body is not None:
importer.filename = filename # for error reporting
importer.body = body
if tool._p_changed is False:
logger.error("Could not update '{}' using profile '{}'".format(tool_name, profile_id))
return False
return True


def remove_gs_step(step_id, registry='_import_registry'):
"""Remove a step from a generic setup registry.
:param step_id: import step id
:param registry: registry name (default: _import_registry)
:return: status as boolean
"""
ps_tool = api.portal.get_tool('portal_setup')
if not hasattr(ps_tool, registry):
logger.error("Cannot find '{}' registry in portal_setup".format(registry))
return False
registry = getattr(ps_tool, registry)
if step_id in registry.listSteps():
registry.unregisterStep(step_id)
ps_tool._p_changed = True
return True
return False
82 changes: 60 additions & 22 deletions src/imio/helpers/tests/test_setup.py
@@ -1,9 +1,13 @@
# -*- coding: utf-8 -*-
from imio.helpers.setup import load_type_from_package
from imio.helpers.setup import load_workflow_from_package
from imio.helpers.setup import load_xml_tool_only_from_package
from imio.helpers.setup import remove_gs_step
from imio.helpers.testing import IntegrationTestCase
from plone import api

import unittest


class TestSetupModule(IntegrationTestCase):
"""
Expand All @@ -13,28 +17,6 @@ class TestSetupModule(IntegrationTestCase):
def setUp(self):
self.portal = self.layer['portal']

def test_load_workflow_from_package(self):
wkf_tool = api.portal.get_tool('portal_workflow')
wkf_obj = wkf_tool.get('intranet_workflow')
states = wkf_obj.states
self.assertIn('internal', states)
states.deleteStates(['internal'])
self.assertNotIn('internal', states)
self.assertTrue(load_workflow_from_package(
'intranet_workflow', 'profile-Products.CMFPlone:plone'))
wkf_obj = wkf_tool.get('intranet_workflow')
states = wkf_obj.states
self.assertIn('internal', states)
# not found WF
self.assertFalse(load_workflow_from_package(
'intranet_workflow2', 'profile-Products.CMFPlone:plone'))
# not found profile_id
self.assertFalse(load_workflow_from_package(
'intranet_workflow', 'profile-Products.CMFPlone:plone2'))
# WF not managed by given profile_id
self.assertFalse(load_workflow_from_package(
'comment_review_workflow', 'profile-Products.CMFPlone:plone'))

def test_load_type_from_package(self):
types_tool = api.portal.get_tool('portal_types')
portal_type = types_tool.get('Discussion Item')
Expand All @@ -60,3 +42,59 @@ def test_load_type_from_package(self):
# with purge_actions=True
self.assertTrue(load_type_from_package(
'testingtype', 'profile-imio.helpers:testing', purge_actions=True))

def test_load_workflow_from_package(self):
wkf_tool = api.portal.get_tool('portal_workflow')
wkf_obj = wkf_tool.get('intranet_workflow')
states = wkf_obj.states
self.assertIn('internal', states)
states.deleteStates(['internal'])
self.assertNotIn('internal', states)
self.assertTrue(load_workflow_from_package(
'intranet_workflow', 'profile-Products.CMFPlone:plone'))
wkf_obj = wkf_tool.get('intranet_workflow')
states = wkf_obj.states
self.assertIn('internal', states)
# not found WF
self.assertFalse(load_workflow_from_package(
'intranet_workflow2', 'profile-Products.CMFPlone:plone'))
# not found profile_id
self.assertFalse(load_workflow_from_package(
'intranet_workflow', 'profile-Products.CMFPlone:plone2'))
# WF not managed by given profile_id
self.assertFalse(load_workflow_from_package(
'comment_review_workflow', 'profile-Products.CMFPlone:plone'))

@unittest.skip('Not working')
def test_load_xml_tool_only_from_package(self):
wkf_tool = api.portal.get_tool('portal_workflow')
wkf_obj = wkf_tool.get('intranet_workflow')
states = wkf_obj.states
self.assertIn('internal', states)
self.assertTupleEqual(wkf_tool.getChainForPortalType('Image'), ())
self.assertTupleEqual(wkf_tool.getChainForPortalType('Document'), ('intranet_workflow',))
# some changes
states.deleteStates(['internal'])
self.assertNotIn('internal', states)
wkf_tool.setChainForPortalTypes(('Image',), ('one_state_workflow',))
wkf_tool.setChainForPortalTypes(('Document',), ('one_state_workflow',))
self.assertTupleEqual(wkf_tool.getChainForPortalType('Image'), ('one_state_workflow',))
self.assertTupleEqual(wkf_tool.getChainForPortalType('Document'), ('one_state_workflow',))
# we reload default config
self.assertTrue(load_xml_tool_only_from_package('portal_workflow', 'profile-Products.CMFPlone:plone'))
self.assertTupleEqual(wkf_tool.getChainForPortalType('Image'), ()) # chain is reset
self.assertTupleEqual(wkf_tool.getChainForPortalType('Document'), ('one_state_workflow',)) # not in xml
wkf_obj = wkf_tool.get('intranet_workflow')
states = wkf_obj.states
self.assertNotIn('internal', states) # be sure it's not recursive
# not found tool
self.assertFalse(load_xml_tool_only_from_package('portal_workflow2', 'profile-Products.CMFPlone:plone'))
# not found profile_id
self.assertFalse(load_xml_tool_only_from_package('portal_workflow', 'profile-Products.CMFPlone:plone2'))

def test_remove_gs_step(self):
ps_tool = api.portal.get_tool('portal_setup')
step_id = u'mockmailhost-various'
self.assertIn(step_id, ps_tool.getSortedImportSteps())
self.assertTrue(remove_gs_step(step_id))
self.assertNotIn(step_id, ps_tool.getSortedImportSteps())
1 change: 1 addition & 0 deletions test-4.3.cfg
Expand Up @@ -48,6 +48,7 @@ ipdb = 0.13.11
iw.debug = 0.3
pdbpp = 0.10.3
plone.versioncheck = 1.7.0
soupsieve = 1.9.2

# Required by:
# ipdb
Expand Down

0 comments on commit 45140d8

Please sign in to comment.