Skip to content

Commit

Permalink
grass.grassdb: New Python subpackage for working with locations and m…
Browse files Browse the repository at this point in the history
…apsets (#837)

New package under grass should be a place for all functionality related to manipulating locations and mapsets. The need for this comes specifically from sharing the code between init/grass.py and wxGUI data catalog (and startup window).

This should be the place where things like "locked mapset" should be defined for Python code (whatever the implementation is, the function to check that will be here). It also aspires to be a lightweight Python API equivalent of wxGUI data catalog. With that functions related to raster maps, vector maps, and other actual data manipulations can be included (i.e., manipulations, not computations).

Besides existing functions from from init/grass.py, startup/utils.py, and datacatalog/tree.py, this includes new functions for mapset checking from (currently) unmerged #767 which needs to know ahead of time if mapset is usable for running in it.

Currently, only standalone functions are used (i.e., no classes like trees, etc.). This can change if needed.

grassdb stands for GRASS GIS Spatial Database.

A check for PERMANENT added to rename mapset.

Note the lack of treatment of SKIP_MAPSET_OWN_CHK build flag in the pure Python implementation of ownership check.

The library currently complies with Black and most of Pylint.
  • Loading branch information
wenzeslaus committed Jul 29, 2020
1 parent 939739c commit 0538cf8
Show file tree
Hide file tree
Showing 13 changed files with 438 additions and 280 deletions.
38 changes: 2 additions & 36 deletions gui/wxpython/datacatalog/tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import grass.script as gscript
from grass.script import gisenv
from grass.grassdb.data import map_exists
from grass.exceptions import CalledModuleError


Expand Down Expand Up @@ -157,41 +158,6 @@ def getLocationTree(gisdbase, location, queue, mapsets=None):
gscript.try_remove(tmp_gisrc_file)


def map_exists(name, element, env, mapset=None):
"""Check is map is present in the mapset given in the environment
:param name: name of the map
:param element: data type ('raster', 'raster_3d', and 'vector')
:param env environment created by function gscript.create_environment
"""
if not mapset:
mapset = gscript.run_command('g.mapset', flags='p', env=env).strip()
# change type to element used by find file
if element == 'raster':
element = 'cell'
elif element == 'raster_3d':
element = 'grid3'
# g.findfile returns non-zero when file was not found
# se we ignore return code and just focus on stdout
process = gscript.start_command(
'g.findfile',
flags='n',
element=element,
file=name,
mapset=mapset,
stdout=gscript.PIPE,
stderr=gscript.PIPE,
env=env)
output, errors = process.communicate()
info = gscript.parse_key_val(output, sep='=')
# file is the key questioned in grass.script.core find_file()
# return code should be equivalent to checking the output
if info['file']:
return True
else:
return False


class NameEntryDialog(TextEntryDialog):

def __init__(self, element, mapset, env, **kwargs):
Expand All @@ -206,7 +172,7 @@ def OnOK(self, event):
new = self.GetValue()
if not new:
return
if map_exists(new, self._element, self._env, self._mapset):
if map_exists(new, self._element, env=self._env, mapset=self._mapset):
dlg = wx.MessageDialog(
self,
message=_(
Expand Down
4 changes: 3 additions & 1 deletion gui/wxpython/gis_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import wx
import wx.lib.mixins.listctrl as listmix

from grass.grassdb.checks import get_lockfile_if_present

from core.gcmd import GError, RunCommand
from core.utils import GetListOfLocations, GetListOfMapsets
from startup.utils import (
get_lockfile_if_present, get_possible_database_path,
get_possible_database_path,
create_database_directory)
from startup.guiutils import (SetSessionMapset,
create_mapset_interactively,
Expand Down
2 changes: 1 addition & 1 deletion gui/wxpython/location_wizard/wizard.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
from gui_core.wrap import SpinCtrl, SearchCtrl, StaticText, \
TextCtrl, Button, CheckBox, StaticBox, NewId, ListCtrl, HyperlinkCtrl
from location_wizard.dialogs import SelectTransformDialog
from startup.utils import location_exists

from grass.grassdb.checks import location_exists
from grass.script import decode
from grass.script import core as grass
from grass.exceptions import OpenError
Expand Down
11 changes: 8 additions & 3 deletions gui/wxpython/startup/guiutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@

import grass.script as gs
from grass.script import gisenv
from grass.grassdb.checks import mapset_exists, location_exists
from grass.grassdb.create import create_mapset, get_default_mapset_name
from grass.grassdb.manage import (
delete_mapset,
delete_location,
rename_mapset,
rename_location,
)

from core import globalvar
from core.gcmd import GError, GMessage, DecodeString, RunCommand
from gui_core.dialogs import TextEntryDialog
from location_wizard.dialogs import RegionDef
from gui_core.widgets import GenericMultiValidator
from startup.utils import (create_mapset, delete_mapset, delete_location,
rename_mapset, rename_location, mapset_exists,
location_exists, get_default_mapset_name)


def SetSessionMapset(database, location, mapset):
Expand Down
87 changes: 0 additions & 87 deletions gui/wxpython/startup/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@


import os
import shutil
import tempfile
import getpass
import sys
Expand Down Expand Up @@ -88,89 +87,3 @@ def create_database_directory():
pass

return None


def get_lockfile_if_present(database, location, mapset):
"""Return path to lock if present, None otherwise
Returns the path as a string or None if nothing was found, so the
return value can be used to test if the lock is present.
"""
lock_name = ".gislock"
lockfile = os.path.join(database, location, mapset, lock_name)
if os.path.isfile(lockfile):
return lockfile
else:
return None


def create_mapset(database, location, mapset):
"""Creates a mapset in a specified location"""
location_path = os.path.join(database, location)
mapset_path = os.path.join(location_path, mapset)
# create an empty directory
os.mkdir(mapset_path)
# copy DEFAULT_WIND file and its permissions from PERMANENT
# to WIND in the new mapset
region_path1 = os.path.join(location_path, "PERMANENT", "DEFAULT_WIND")
region_path2 = os.path.join(location_path, mapset, "WIND")
shutil.copy(region_path1, region_path2)
# set permissions to u+rw,go+r (disabled; why?)
# os.chmod(os.path.join(database,location,mapset,'WIND'), 0644)


def delete_mapset(database, location, mapset):
"""Deletes a specified mapset"""
if mapset == "PERMANENT":
# TODO: translatable or not?
raise ValueError(
"Mapset PERMANENT cannot be deleted" " (whole location can be)"
)
shutil.rmtree(os.path.join(database, location, mapset))


def delete_location(database, location):
"""Deletes a specified location"""
shutil.rmtree(os.path.join(database, location))


def rename_mapset(database, location, old_name, new_name):
"""Rename mapset from *old_name* to *new_name*"""
location_path = os.path.join(database, location)
os.rename(
os.path.join(location_path, old_name), os.path.join(location_path, new_name)
)


def rename_location(database, old_name, new_name):
"""Rename location from *old_name* to *new_name*"""
os.rename(os.path.join(database, old_name), os.path.join(database, new_name))


def mapset_exists(database, location, mapset):
"""Returns True whether mapset path exists."""
location_path = os.path.join(database, location)
mapset_path = os.path.join(location_path, mapset)
if os.path.exists(mapset_path):
return True
return False


def location_exists(database, location):
"""Returns True whether location path exists."""
location_path = os.path.join(database, location)
if os.path.exists(location_path):
return True
return False


def get_default_mapset_name():
"""Returns default name for mapset."""
try:
defaultName = getpass.getuser()
defaultName.encode("ascii")
except UnicodeEncodeError:
# raise error if not ascii (not valid mapset name)
defaultName = "user"

return defaultName
167 changes: 16 additions & 151 deletions lib/init/grass.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,156 +859,6 @@ def create_location(gisdbase, location, geostring):
fatal(err.value.strip('"').strip("'").replace('\\n', os.linesep))


# TODO: distinguish between valid for getting maps and usable as current
# https://lists.osgeo.org/pipermail/grass-dev/2016-September/082317.html
# interface created according to the current usage
def is_mapset_valid(full_mapset):
"""Return True if GRASS Mapset is valid"""
# WIND is created from DEFAULT_WIND by `g.region -d` and functions
# or modules which create a new mapset. Most modules will fail if
# WIND doesn't exist (assuming that neither GRASS_REGION nor
# WIND_OVERRIDE environmental variables are set).
return os.access(os.path.join(full_mapset, "WIND"), os.R_OK)


def is_location_valid(gisdbase, location):
"""Return True if GRASS Location is valid
:param gisdbase: Path to GRASS GIS database directory
:param location: name of a Location
"""
# DEFAULT_WIND file should not be required until you do something
# that actually uses them. The check is just a heuristic; a directory
# containing a PERMANENT/DEFAULT_WIND file is probably a GRASS
# location, while a directory lacking it probably isn't.
return os.access(os.path.join(gisdbase, location,
"PERMANENT", "DEFAULT_WIND"), os.F_OK)


# basically checking location, possibly split into two functions
# (mapset one can call location one)
def get_mapset_invalid_reason(gisdbase, location, mapset):
"""Returns a message describing what is wrong with the Mapset
The goal is to provide the most suitable error message
(rather than to do a quick check).
:param gisdbase: Path to GRASS GIS database directory
:param location: name of a Location
:param mapset: name of a Mapset
:returns: translated message
"""
full_location = os.path.join(gisdbase, location)
full_mapset = os.path.join(full_location, mapset)
# first checking the location validity
# perhaps a special set of checks with different messages mentioning mapset
# will be needed instead of the same set of messages used for location
location_msg = get_location_invalid_reason(
gisdbase, location, none_for_no_reason=True
)
if location_msg:
return location_msg
# if location is valid, check mapset
elif mapset not in os.listdir(full_location):
return _("Mapset <{mapset}> doesn't exist in GRASS Location <{loc}>. "
"A new mapset can be created by '-c' switch.").format(
mapset=mapset, loc=location)
elif not os.path.isdir(full_mapset):
return _("<%s> is not a GRASS Mapset"
" because it is not a directory") % mapset
elif not os.path.isfile(os.path.join(full_mapset, 'WIND')):
return _("<%s> is not a valid GRASS Mapset"
" because it does not have a WIND file") % mapset
# based on the is_mapset_valid() function
elif not os.access(os.path.join(full_mapset, "WIND"), os.R_OK):
return _("<%s> is not a valid GRASS Mapset"
" because its WIND file is not readable") % mapset
else:
return _("Mapset <{mapset}> or Location <{location}> is"
" invalid for an unknown reason").format(
mapset=mapset, location=location)


def get_location_invalid_reason(gisdbase, location, none_for_no_reason=False):
"""Returns a message describing what is wrong with the Location
The goal is to provide the most suitable error message
(rather than to do a quick check).
By default, when no reason is found, a message about unknown reason is
returned. This applies also to the case when this function is called on
a valid location (e.g. as a part of larger investigation).
``none_for_no_reason=True`` allows the function to be used as part of other
diagnostic. When this function fails to find reason for invalidity, other
the caller can continue the investigation in their context.
:param gisdbase: Path to GRASS GIS database directory
:param location: name of a Location
:param none_for_no_reason: When True, return None when reason is unknown
:returns: translated message or None
"""
full_location = os.path.join(gisdbase, location)
full_permanent = os.path.join(full_location, 'PERMANENT')

# directory
if not os.path.exists(full_location):
return _("Location <%s> doesn't exist") % full_location
# permament mapset
elif 'PERMANENT' not in os.listdir(full_location):
return _("<%s> is not a valid GRASS Location"
" because PERMANENT Mapset is missing") % full_location
elif not os.path.isdir(full_permanent):
return _("<%s> is not a valid GRASS Location"
" because PERMANENT is not a directory") % full_location
# partially based on the is_location_valid() function
elif not os.path.isfile(os.path.join(full_permanent,
'DEFAULT_WIND')):
return _("<%s> is not a valid GRASS Location"
" because PERMANENT Mapset does not have a DEFAULT_WIND file"
" (default computational region)") % full_location
# no reason for invalidity found (might be valid)
if none_for_no_reason:
return None
else:
return _("Location <{location}> is"
" invalid for an unknown reason").format(location=full_location)


def dir_contains_location(path):
"""Return True if directory *path* contains a valid location"""
if not os.path.isdir(path):
return False
for name in os.listdir(path):
if os.path.isdir(os.path.join(path, name)):
if is_location_valid(path, name):
return True
return False


def get_location_invalid_suggestion(gisdbase, location_name):
"""Return suggestion what to do when specified location is not valid
It gives suggestion when:
* A mapset was specified instead of a location.
* A GRASS database was specified instead of a location.
"""
full_path = os.path.join(gisdbase, location_name)
# a common error is to use mapset instead of location,
# if that's the case, include that info into the message
if is_mapset_valid(full_path):
return _(
"<{loc}> looks like a mapset, not a location."
" Did you mean just <{one_dir_up}>?").format(
loc=location_name, one_dir_up=gisdbase)
# confusion about what is database and what is location
elif dir_contains_location(full_path):
return _(
"It looks like <{loc}> contains locations."
" Did you mean to specify one of them?").format(
loc=location_name)
return None


def can_create_location(gisdbase, location):
"""Checks if location can be created"""
path = os.path.join(gisdbase, location)
Expand All @@ -1027,6 +877,8 @@ def cannot_create_location_reason(gisdbase, location):
:param location: name of a Location
:returns: translated message
"""
from grass.grassdb.checks import is_location_valid

path = os.path.join(gisdbase, location)
if is_location_valid(gisdbase, location):
return _("Unable to create new location because"
Expand Down Expand Up @@ -1054,6 +906,14 @@ def set_mapset(gisrc, arg=None, geofile=None, create_new=False,
tmp_location requires tmpdir (which is used as gisdbase)
"""
from grass.grassdb.checks import (
is_mapset_valid,
is_location_valid,
get_mapset_invalid_reason,
get_location_invalid_reason,
get_location_invalid_suggestion,
mapset_exists,
)
# TODO: arg param seems to be always the mapset parameter (or a dash
# in a distant past), refactor
l = arg
Expand Down Expand Up @@ -1116,7 +976,12 @@ def set_mapset(gisrc, arg=None, geofile=None, create_new=False,
if not create_new:
# 'path' is not a valid mapset and user does not
# want to create anything new
fatal(get_mapset_invalid_reason(gisdbase, location_name, mapset))
reason = get_mapset_invalid_reason(gisdbase, location_name, mapset)
if not mapset_exists(gisdbase, location_name, mapset):
suggestion = _("A new mapset can be created using '-c' flag.")
else:
suggestion = _("Maybe you meant a different directory.")
fatal("{reason}\n{suggestion}".format(**locals()))
else:
# 'path' is not valid and the user wants to create
# mapset on the fly
Expand Down

0 comments on commit 0538cf8

Please sign in to comment.