Skip to content

Commit

Permalink
Migrate to single-quoted f-string literals everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
Philipp91 committed Aug 14, 2022
1 parent c80f8c3 commit c941174
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 53 deletions.
32 changes: 17 additions & 15 deletions digikam_db.py
Expand Up @@ -23,11 +23,11 @@ class DigikamDb(object):

def __init__(self, file: Path):
self.file = file
logging.debug("file=%s" % file)
logging.debug(f'file={file}')
try:
self.conn = sqlite3.connect(file)
except sqlite3.OperationalError as err:
raise RuntimeError('Failed to open SQLite database from %s.' % file) from err
raise RuntimeError(f'Failed to open SQLite database from {file}.') from err

if os.name == 'nt': # Windows
import win32api # From the pywin32 PIP package.
Expand All @@ -37,7 +37,7 @@ def __init__(self, file: Path):
if serial < 0:
serial = serial + (1 << 32) # Convert int32 to uint32
serial_to_mountpoints.setdefault(serial, set()).add(sdiskpart.mountpoint)
logging.debug('serial_to_mountpoints=%s' % serial_to_mountpoints)
logging.debug(f'serial_to_mountpoints={serial_to_mountpoints}')
def volume_uuid_to_mountpoints(uuid: str) -> Set[str]:
# On Windows, digiKam uses the serial number in hex format as the UUID:
# https://invent.kde.org/frameworks/solid/-/blob/006e013d18c20cf2c98cf1776d768476978a1a63/src/solid/devices/backends/win/winstoragevolume.cpp#L57
Expand All @@ -46,7 +46,7 @@ def volume_uuid_to_mountpoints(uuid: str) -> Set[str]:
dev_to_mountpoints: Dict[str, Set[str]] = {}
for sdiskpart in psutil.disk_partitions():
dev_to_mountpoints.setdefault(sdiskpart.device, set()).add(sdiskpart.mountpoint)
logging.debug('dev_to_mountpoints=%s' % dev_to_mountpoints)
logging.debug(f'dev_to_mountpoints={dev_to_mountpoints}')
def volume_uuid_to_mountpoints(uuid: str) -> Set[str]:
# On Unix, we use a trick with realpath and /dev/disk/by-uuid' to find the main mount point.
return dev_to_mountpoints[os.path.realpath(Path('/dev/disk/by-uuid') / uuid.upper())]
Expand All @@ -55,9 +55,10 @@ def volume_uuid_to_mountpoints(uuid: str) -> Set[str]:
for row in self.conn.cursor().execute('SELECT id, type, identifier, specificPath FROM AlbumRoots WHERE status = 0'):
id, type, identifier, specific_path = row
if type != 1 and type != 2 and type != 3: # 0=Undefined, 1=VolumeHardWired, 2=VolumeRemovable, 3=Network
logging.info('Skipping album %s at %s on %s because it is not recognized disk type' % (id, specific_path, identifier))
logging.info(
f'Skipping album {id} at {specific_path} on {identifier} because it is not recognized disk type')
continue
logging.debug('id=%s specific_path=%s identifier=%s' % (id, specific_path, identifier))
logging.debug(f'id={id} specific_path={specific_path} identifier={identifier}')
if identifier.startswith('volumeid:?uuid='):
if specific_path.startswith('/'):
specific_path = specific_path[1:]
Expand All @@ -68,9 +69,9 @@ def volume_uuid_to_mountpoints(uuid: str) -> Set[str]:
elif identifier.startswith('networkshareid:?mountpath='):
self.album_roots[identifier[26:]] = id
else:
raise ValueError('Unsupported volume type %s' % identifier)
raise ValueError(f'Unsupported volume type {identifier}')

logging.debug('album_roots=%s' % self.album_roots)
logging.debug(f'album_roots={self.album_roots}')

self.person_root_tag = self._detect_person_root_tag()
self.internal_tags_id = self.find_tag(0, _INTERNAL_ROOT_TAG_NAME)
Expand Down Expand Up @@ -100,9 +101,10 @@ def find_album_by_dir(self, path: Path) -> Optional[int]:
album_id = self._fetchcell('SELECT id FROM Albums WHERE albumRoot = ? AND relativePath = ?',
(root_id, relative_path))
if album_id is None:
logging.warning('No digiKam Album found for %s (relative path %s) under root %s' % (path, relative_path, root_id))
logging.warning(
f'No digiKam Album found for {path} (relative path {relative_path}) under root {root_id}')
return album_id
raise ValueError('No digiKam AlbumRoot found for %s, only have %s' % (path, self.album_roots))
raise ValueError(f'No digiKam AlbumRoot found for {path}, only have {self.album_roots}')

def get_album_images(self, album_id: int) -> Dict[str, id]:
"""Returns a dict from filename to id in Images."""
Expand All @@ -114,9 +116,9 @@ def get_image_size(self, image_id: int) -> Tuple[int, int, int]:
cur.execute('SELECT width, height, orientation FROM ImageInformation WHERE imageid = ?', (image_id,))
row = cur.fetchone()
if row is None:
raise ValueError('Image with ID %s not found' % image_id)
raise ValueError(f'Image with ID {image_id} not found')
if not row[0] or not row[1]:
raise ValueError('Size of image with ID %s is not in the database' % image_id)
raise ValueError(f'Size of image with ID {image_id} is not in the database')
return row[0], row[1], row[2]

def find_tag(self, parent_tag: int, name: str) -> Optional[int]:
Expand All @@ -128,7 +130,7 @@ def find_or_create_tag(self, parent_tag: int, name: str, dry_run: bool) -> int:
tag_id = self.find_tag(parent_tag, name)
if tag_id:
return tag_id
logging.info('Creating digiKam tag %s' % name)
logging.info(f'Creating digiKam tag {name}')
if dry_run:
return -1 # Pretend we created it
self.conn.execute('INSERT INTO Tags (pid, name) VALUES (?, ?)', (parent_tag, name))
Expand Down Expand Up @@ -162,7 +164,7 @@ def find_or_create_person_tag(self, person_name: str, dry_run: bool) -> int:
tag_id = self.find_person_tag(person_name)
if tag_id:
return tag_id
logging.info('Creating digiKam person tag %s' % person_name)
logging.info(f'Creating digiKam person tag {person_name}')
if dry_run:
return -1 # Pretend we created it
self.conn.execute('INSERT INTO Tags (pid, name) VALUES (?, ?)', (self.person_root_tag, person_name))
Expand All @@ -187,7 +189,7 @@ def image_has_property(self, image_id: int, propname: str, value: str) -> bool:
def image_has_pick_tag(self, image_id: int) -> bool:
"""Returns true if the given image has any of the (four) "Pick" tags."""
return self._fetchcell(
'SELECT tagid FROM ImageTags WHERE imageid = ? AND tagid IN (%s)' % ','.join('?' * len(self.pick_tags)),
f'SELECT tagid FROM ImageTags WHERE imageid = ? AND tagid IN ({",".join("?" * len(self.pick_tags))})',
(image_id,) + tuple(self.pick_tags)) is not None

def add_image_tag(self, image_id: int, tag_id: int) -> bool:
Expand Down
8 changes: 4 additions & 4 deletions main.py
Expand Up @@ -24,8 +24,8 @@ def init_argparse() -> ArgumentParser:
parser.add_argument('--dry_run', action='store_true')
parser.add_argument('--verbose', '-v', action='count', default=0,
help='Log verbosity. Pass -vv to see debug output.')
parser.add_argument('--skip_same_rect', action=argparse.BooleanOptionalAction,
help="Skip or not to skip adding face to digiKam if it already has that rectangle defined")
parser.add_argument('--skip_same_rect', action=argparse.BooleanOptionalAction,
help='Skip or not to skip adding face to digiKam if it already has that rectangle defined')
return parser


Expand All @@ -42,8 +42,8 @@ def main() -> None:
log_handler.setLevel(30 - (10 * args.verbose))

if not args.dry_run:
backup_path = '%s.bak.%i' % (args.digikam_db, time.time())
logging.info('Creating database backup at %s')
backup_path = f'{args.digikam_db}.bak.{time.time():d}'
logging.info(f'Creating database backup at {backup_path}')
shutil.copyfile(args.digikam_db, backup_path)

logging.info('Inspecting existing digiKam database')
Expand Down
66 changes: 34 additions & 32 deletions migrator.py
Expand Up @@ -36,7 +36,7 @@ def learn_contact_ids(input_dir: Path, ini_file_name: str, contact_id_2_nameset:
for contact_id, value in ini['Contacts2'].items():
person_name = value.split(';')[0]
if contact_id not in contact_id_2_nameset:
logging.info(f"Learned name for {contact_id}='{person_name}' from {ini_file}")
logging.info(f'Learned name for {contact_id}=\'{person_name}\' from {ini_file}')
contact_id_2_nameset[contact_id] = {person_name}
else:
if person_name not in contact_id_2_nameset[contact_id]:
Expand All @@ -48,9 +48,9 @@ def learn_contact_ids(input_dir: Path, ini_file_name: str, contact_id_2_nameset:
if 'Contacts' in ini:
for contact_id, value in ini['Contacts'].items():
picasa_name_hash = value.split(',')[1]
person_name = f".NoName-{picasa_name_hash}"
person_name = f'.NoName-{picasa_name_hash}'
if contact_id not in contact_id_2_nameset:
logging.info(f"Learned old picasa name hash for {contact_id}='{person_name}' from {ini_file}")
logging.info(f'Learned old picasa name hash for {contact_id}=\'{person_name}\' from {ini_file}')
contact_id_2_nameset[contact_id] = {person_name}

def migrate_directories_under(input_root_dir: Path,
Expand Down Expand Up @@ -85,9 +85,9 @@ def migrate_directories_under(input_root_dir: Path,
for ini_file in ( _PICASA_INI_FILE , _OLD_PICASA_INI_FILE):
learn_contact_ids(input_dir,ini_file,contact_id_2_nameset)

global_names = {k: "|".join(sorted(v)) for k,v in contact_id_2_nameset.items()}
global_names = {k: '|'.join(sorted(v)) for k, v in contact_id_2_nameset.items()}

logging.debug(f"global_names={global_names}")
logging.debug(f'global_names={global_names}')

contact_tags_per_dir: Dict[Path, ContactTags] = {}

Expand All @@ -101,9 +101,9 @@ def migrate_directories_under(input_root_dir: Path,
if any([_is_photo_file(file) for file in files]):
logging.warning(f'Found photos but no .ini in {dir}')
else:
logging.warning(f"No photos and no .ini file in {dir}")
logging.warning(f'No photos and no .ini file in {dir}')
continue
logging.debug(f"Processing {Path(dir/ini_file)}")
logging.debug(f'Processing {Path(dir / ini_file)}')
contact_tags_per_dir[dir] = migrate_directory(dir, files, db,
contact_tags_per_dir, global_names,
dry_run=dry_run, ini_file_name=ini_file,
Expand All @@ -120,9 +120,9 @@ def migrate_directory(input_dir: Path, files: List[str], db: DigikamDb,
"""Migrates metadata of all photo files in the given directory."""
logging.info('===========================================================================================')
if input_dir.name == '.picasaoriginals':
logging.info('Skipping %s' % input_dir)
logging.info(f'Skipping {input_dir}')
return {}
logging.info('Now migrating %s' % input_dir)
logging.info(f'Now migrating {input_dir}')

# Find digiKam album.
album_id = db.find_album_by_dir(input_dir)
Expand All @@ -141,7 +141,7 @@ def migrate_directory(input_dir: Path, files: List[str], db: DigikamDb,
album_to_tag = _map_albums_to_tags(ini, db, used_ini_sections, dry_run=dry_run)

self_contact_to_tag = _map_contacts_to_tags(ini['Contacts2'], db, dry_run=dry_run) if 'Contacts2' in ini else {}
logging.debug('self_contact_to_tag=%s' % self_contact_to_tag)
logging.debug(f'self_contact_to_tag={self_contact_to_tag}')

# Merge contacts declared in parent ini files.
contact_to_tag = self_contact_to_tag.copy()
Expand All @@ -165,20 +165,20 @@ def migrate_directory(input_dir: Path, files: List[str], db: DigikamDb,
skip_same_rect=skip_same_rect, prioritize_global_names=prioritize_global_names,
dry_run=dry_run)
except Exception as e:
logging.error(f"Exception: {e}")
logging.error(f'Exception: {e}')
logging.error(traceback.format_exc())
raise RuntimeError('Error when processing %s' % (input_dir / filename)) from e
raise RuntimeError(f'Error when processing {input_dir / filename}') from e

# Make sure we actually read all the data from the ini file.
unused_ini_sections = set(ini.sections()) - used_ini_sections
unused_photo_sections = {section for section in unused_ini_sections if _is_photo_file(section)}
if unused_photo_sections:
logging.warning(('Some files have metadata in %s but are gone ' +
'(probably fine, they might have been deleted or moved elsewhere on purpose): %s')
% (ini_file, unused_photo_sections))
logging.warning(
f'Some files have metadata in {ini_file} but are gone (probably fine, they might have been ' +
f'deleted or moved elsewhere on purpose): {unused_photo_sections}')
unused_ini_sections -= unused_photo_sections
if unused_ini_sections:
logging.warning('Unused INI sections in %s: %s' % (ini_file, unused_ini_sections))
logging.warning(f'Unused INI sections in {ini_file}: {unused_ini_sections}')

return self_contact_to_tag # For use in subdirectories

Expand All @@ -196,17 +196,17 @@ def migrate_file(filename: str, image_id: int, ini_section: configparser.Section
used_ini_keys.add('star')
if db.image_has_pick_tag(image_id):
logging.warning(
'Not applying star label to %s (%s) because it already has a Pick label' % (image_id, filename))
f'Not applying star label to {image_id} ({filename}) because it already has a Pick label')
else:
logging.debug('Applying star label to %s (%s)' % (image_id, filename))
logging.debug(f'Applying star label to {image_id} ({filename})')
if not dry_run:
db.star_image(image_id)

albums = ini_section.get('albums')
if albums:
used_ini_keys.add('albums')
for album_id in albums.split(','):
logging.debug('Adding album %s to image %s (%s)' % (album_id, image_id, filename))
logging.debug(f'Adding album {album_id} to image {image_id} ({filename})')
if not dry_run:
db.add_image_tag(image_id, album_to_tag[album_id])

Expand All @@ -216,7 +216,7 @@ def migrate_file(filename: str, image_id: int, ini_section: configparser.Section
if filename.lower().endswith('.psd'):
# Note: digiKam doesn't seem to know the size of PSD files and thus also
# can't place face tags on them.
logging.warning('Skipping faces on %s because of PSD format' % filename)
logging.warning(f'Skipping faces on {filename} because of PSD format')
else:
for face_data in faces.split(';'):
migrate_face(image_id, filename, face_data, db, contact_to_tag, global_names,
Expand All @@ -226,7 +226,7 @@ def migrate_file(filename: str, image_id: int, ini_section: configparser.Section

unused_ini_keys = set(ini_section.keys()) - used_ini_keys
if unused_ini_keys:
logging.warning('Unused INI keys for %s: %s' % (filename, unused_ini_keys))
logging.warning(f'Unused INI keys for {filename}: {unused_ini_keys}')


def migrate_face(image_id: int,
Expand Down Expand Up @@ -254,28 +254,28 @@ def migrate_face(image_id: int,
if contact_id not in global_names:
# This can happen often if not using contacts.xml
# Add to global
person_name = f".NoName-{contact_id}-from-rect64"
person_name = f'.NoName-{contact_id}-from-rect64'
logging.info(f'Learned {person_name} from a rect64 tag belonging to {filename}')
global_names[contact_id] = person_name
tag_id = db.find_or_create_person_tag(global_names[contact_id], dry_run=dry_run)
contact_to_tag[contact_id] = tag_id;
contact_to_tag[contact_id] = tag_id
else:
# global_names (learned from contacts.xml) has higher priority
if (contact_id in global_names):
if contact_id in global_names:
tag_id = db.find_or_create_person_tag(global_names[contact_id], dry_run=dry_run)
contact_to_tag[contact_id] = tag_id;
elif (contact_id in contact_to_tag):
contact_to_tag[contact_id] = tag_id
elif contact_id in contact_to_tag:
tag_id = contact_to_tag[contact_id]
else:
person_name = f".NoName-{contact_id}-from-rect64"
person_name = f'.NoName-{contact_id}-from-rect64'
logging.info(f'Learned {person_name} from a rect64 tag belonging to {filename}')
global_names[contact_id] = person_name
tag_id = db.find_or_create_person_tag(person_name, dry_run=dry_run)
contact_to_tag[contact_id] = tag_id;
contact_to_tag[contact_id] = tag_id

if db.image_has_tag(image_id, tag_id):
logging.warning(
'Not applying face %s (%s) to %s (%s) because it already has that face tag' % (tag_id, contact_id, image_id, filename))
f'Not applying face {tag_id} ({contact_id}) to {image_id} ({filename}) because it already has that face tag')
return

# Convert the rectangle.
Expand All @@ -289,7 +289,9 @@ def migrate_face(image_id: int,
# and then run this script again, you end up with another rect mapped to Picasa's name -- it's a conundrum..
if db.image_has_property(image_id, _FACE_TAG_REGION_PROPERTY, digikam_rect):
if skip_same_rect is None:
raise RuntimeError(f"digiKam already has face rectangle {digikam_rect} defined. Please specify what to do by running with argument --skip_same_rect or --no-skip_same_rect")
raise RuntimeError(
f'digiKam already has face rectangle {digikam_rect} defined. ' +
'Please specify what to do by running with argument --skip_same_rect or --no-skip_same_rect')
elif skip_same_rect:
logging.warning(
f'Not applying face {tag_id} ({contact_id}) to {image_id} ({filename}) because it already has that face rectangle')
Expand All @@ -316,7 +318,7 @@ def _map_albums_to_tags(
album_id = section_name[7:]
section = ini[section_name]
if not 'name' in section:
logging.debug('Skipping unnamed album %s' % album_id)
logging.debug(f'Skipping unnamed album {album_id}')
continue
assert section['name']
used_ini_sections.add(section_name)
Expand All @@ -334,5 +336,5 @@ def _map_contacts_to_tags(
for contact_id, value in contacts_section.items():
person_name = value.split(';')[0]
result[contact_id] = db.find_or_create_person_tag(person_name, dry_run=dry_run)
logging.debug("person_name=%s contact_id=%s tag=%s" % (person_name, contact_id, result[contact_id]))
logging.debug(f'person_name={person_name} contact_id={contact_id} tag={result[contact_id]}')
return result
4 changes: 2 additions & 2 deletions rect64.py
Expand Up @@ -5,7 +5,7 @@
def parse_rect64(data: str) -> Tuple[float, float, float, float]: # left, top, right, bottom
"""https://gist.github.com/fbuchinger/1073823#file-picasa-ini-L147-L160"""
# Strip the rect64() from the outside.
assert data.startswith("rect64(") and data.endswith(")"), input
assert data.startswith('rect64(') and data.endswith(')'), input
data = data[7:-1]
assert len(data) >= 1
data = data.zfill(16) # Zeros in front, as Picasa abbreviates.
Expand Down Expand Up @@ -48,7 +48,7 @@ def to_digikam_rect(image_size: Tuple[int, int, int], rect: Tuple[float, float,
y1 = 1 - right
y2 = 1 - left
else:
raise ValueError('Unsupported orientation %s' % orientation)
raise ValueError(f'Unsupported orientation {orientation}')

return '<rect x="{:d}" y="{:d}" width="{:d}" height="{:d}"/>'.format(
int(width * x1),
Expand Down

0 comments on commit c941174

Please sign in to comment.