diff --git a/digikam_db.py b/digikam_db.py index cce980d..7eee5c3 100755 --- a/digikam_db.py +++ b/digikam_db.py @@ -23,11 +23,11 @@ class DigikamDb(object): def __init__(self, file: Path): self.file = file - logging.debug("file=%s" % file) + logging.debug(f'file={file}') try: self.conn = sqlite3.connect(file) except sqlite3.OperationalError as err: - raise RuntimeError('Failed to open SQLite database from %s.' % file) from err + raise RuntimeError(f'Failed to open SQLite database from {file}.') from err if os.name == 'nt': # Windows import win32api # From the pywin32 PIP package. @@ -37,7 +37,7 @@ def __init__(self, file: Path): if serial < 0: serial = serial + (1 << 32) # Convert int32 to uint32 serial_to_mountpoints.setdefault(serial, set()).add(sdiskpart.mountpoint) - logging.debug('serial_to_mountpoints=%s' % serial_to_mountpoints) + logging.debug(f'serial_to_mountpoints={serial_to_mountpoints}') def volume_uuid_to_mountpoints(uuid: str) -> Set[str]: # On Windows, digiKam uses the serial number in hex format as the UUID: # https://invent.kde.org/frameworks/solid/-/blob/006e013d18c20cf2c98cf1776d768476978a1a63/src/solid/devices/backends/win/winstoragevolume.cpp#L57 @@ -46,7 +46,7 @@ def volume_uuid_to_mountpoints(uuid: str) -> Set[str]: dev_to_mountpoints: Dict[str, Set[str]] = {} for sdiskpart in psutil.disk_partitions(): dev_to_mountpoints.setdefault(sdiskpart.device, set()).add(sdiskpart.mountpoint) - logging.debug('dev_to_mountpoints=%s' % dev_to_mountpoints) + logging.debug(f'dev_to_mountpoints={dev_to_mountpoints}') def volume_uuid_to_mountpoints(uuid: str) -> Set[str]: # On Unix, we use a trick with realpath and /dev/disk/by-uuid' to find the main mount point. return dev_to_mountpoints[os.path.realpath(Path('/dev/disk/by-uuid') / uuid.upper())] @@ -55,9 +55,10 @@ def volume_uuid_to_mountpoints(uuid: str) -> Set[str]: for row in self.conn.cursor().execute('SELECT id, type, identifier, specificPath FROM AlbumRoots WHERE status = 0'): id, type, identifier, specific_path = row if type != 1 and type != 2 and type != 3: # 0=Undefined, 1=VolumeHardWired, 2=VolumeRemovable, 3=Network - logging.info('Skipping album %s at %s on %s because it is not recognized disk type' % (id, specific_path, identifier)) + logging.info( + f'Skipping album {id} at {specific_path} on {identifier} because it is not recognized disk type') continue - logging.debug('id=%s specific_path=%s identifier=%s' % (id, specific_path, identifier)) + logging.debug(f'id={id} specific_path={specific_path} identifier={identifier}') if identifier.startswith('volumeid:?uuid='): if specific_path.startswith('/'): specific_path = specific_path[1:] @@ -68,9 +69,9 @@ def volume_uuid_to_mountpoints(uuid: str) -> Set[str]: elif identifier.startswith('networkshareid:?mountpath='): self.album_roots[identifier[26:]] = id else: - raise ValueError('Unsupported volume type %s' % identifier) + raise ValueError(f'Unsupported volume type {identifier}') - logging.debug('album_roots=%s' % self.album_roots) + logging.debug(f'album_roots={self.album_roots}') self.person_root_tag = self._detect_person_root_tag() self.internal_tags_id = self.find_tag(0, _INTERNAL_ROOT_TAG_NAME) @@ -100,9 +101,10 @@ def find_album_by_dir(self, path: Path) -> Optional[int]: album_id = self._fetchcell('SELECT id FROM Albums WHERE albumRoot = ? AND relativePath = ?', (root_id, relative_path)) if album_id is None: - logging.warning('No digiKam Album found for %s (relative path %s) under root %s' % (path, relative_path, root_id)) + logging.warning( + f'No digiKam Album found for {path} (relative path {relative_path}) under root {root_id}') return album_id - raise ValueError('No digiKam AlbumRoot found for %s, only have %s' % (path, self.album_roots)) + raise ValueError(f'No digiKam AlbumRoot found for {path}, only have {self.album_roots}') def get_album_images(self, album_id: int) -> Dict[str, id]: """Returns a dict from filename to id in Images.""" @@ -114,9 +116,9 @@ def get_image_size(self, image_id: int) -> Tuple[int, int, int]: cur.execute('SELECT width, height, orientation FROM ImageInformation WHERE imageid = ?', (image_id,)) row = cur.fetchone() if row is None: - raise ValueError('Image with ID %s not found' % image_id) + raise ValueError(f'Image with ID {image_id} not found') if not row[0] or not row[1]: - raise ValueError('Size of image with ID %s is not in the database' % image_id) + raise ValueError(f'Size of image with ID {image_id} is not in the database') return row[0], row[1], row[2] def find_tag(self, parent_tag: int, name: str) -> Optional[int]: @@ -128,7 +130,7 @@ def find_or_create_tag(self, parent_tag: int, name: str, dry_run: bool) -> int: tag_id = self.find_tag(parent_tag, name) if tag_id: return tag_id - logging.info('Creating digiKam tag %s' % name) + logging.info(f'Creating digiKam tag {name}') if dry_run: return -1 # Pretend we created it self.conn.execute('INSERT INTO Tags (pid, name) VALUES (?, ?)', (parent_tag, name)) @@ -162,7 +164,7 @@ def find_or_create_person_tag(self, person_name: str, dry_run: bool) -> int: tag_id = self.find_person_tag(person_name) if tag_id: return tag_id - logging.info('Creating digiKam person tag %s' % person_name) + logging.info(f'Creating digiKam person tag {person_name}') if dry_run: return -1 # Pretend we created it self.conn.execute('INSERT INTO Tags (pid, name) VALUES (?, ?)', (self.person_root_tag, person_name)) @@ -187,7 +189,7 @@ def image_has_property(self, image_id: int, propname: str, value: str) -> bool: def image_has_pick_tag(self, image_id: int) -> bool: """Returns true if the given image has any of the (four) "Pick" tags.""" return self._fetchcell( - 'SELECT tagid FROM ImageTags WHERE imageid = ? AND tagid IN (%s)' % ','.join('?' * len(self.pick_tags)), + f'SELECT tagid FROM ImageTags WHERE imageid = ? AND tagid IN ({",".join("?" * len(self.pick_tags))})', (image_id,) + tuple(self.pick_tags)) is not None def add_image_tag(self, image_id: int, tag_id: int) -> bool: diff --git a/main.py b/main.py index b10ecf9..59ce478 100755 --- a/main.py +++ b/main.py @@ -24,8 +24,8 @@ def init_argparse() -> ArgumentParser: parser.add_argument('--dry_run', action='store_true') parser.add_argument('--verbose', '-v', action='count', default=0, help='Log verbosity. Pass -vv to see debug output.') - parser.add_argument('--skip_same_rect', action=argparse.BooleanOptionalAction, - help="Skip or not to skip adding face to digiKam if it already has that rectangle defined") + parser.add_argument('--skip_same_rect', action=argparse.BooleanOptionalAction, + help='Skip or not to skip adding face to digiKam if it already has that rectangle defined') return parser @@ -42,8 +42,8 @@ def main() -> None: log_handler.setLevel(30 - (10 * args.verbose)) if not args.dry_run: - backup_path = '%s.bak.%i' % (args.digikam_db, time.time()) - logging.info('Creating database backup at %s') + backup_path = f'{args.digikam_db}.bak.{time.time():d}' + logging.info(f'Creating database backup at {backup_path}') shutil.copyfile(args.digikam_db, backup_path) logging.info('Inspecting existing digiKam database') diff --git a/migrator.py b/migrator.py index 7039068..14588c9 100755 --- a/migrator.py +++ b/migrator.py @@ -36,7 +36,7 @@ def learn_contact_ids(input_dir: Path, ini_file_name: str, contact_id_2_nameset: for contact_id, value in ini['Contacts2'].items(): person_name = value.split(';')[0] if contact_id not in contact_id_2_nameset: - logging.info(f"Learned name for {contact_id}='{person_name}' from {ini_file}") + logging.info(f'Learned name for {contact_id}=\'{person_name}\' from {ini_file}') contact_id_2_nameset[contact_id] = {person_name} else: if person_name not in contact_id_2_nameset[contact_id]: @@ -48,9 +48,9 @@ def learn_contact_ids(input_dir: Path, ini_file_name: str, contact_id_2_nameset: if 'Contacts' in ini: for contact_id, value in ini['Contacts'].items(): picasa_name_hash = value.split(',')[1] - person_name = f".NoName-{picasa_name_hash}" + person_name = f'.NoName-{picasa_name_hash}' if contact_id not in contact_id_2_nameset: - logging.info(f"Learned old picasa name hash for {contact_id}='{person_name}' from {ini_file}") + logging.info(f'Learned old picasa name hash for {contact_id}=\'{person_name}\' from {ini_file}') contact_id_2_nameset[contact_id] = {person_name} def migrate_directories_under(input_root_dir: Path, @@ -85,9 +85,9 @@ def migrate_directories_under(input_root_dir: Path, for ini_file in ( _PICASA_INI_FILE , _OLD_PICASA_INI_FILE): learn_contact_ids(input_dir,ini_file,contact_id_2_nameset) - global_names = {k: "|".join(sorted(v)) for k,v in contact_id_2_nameset.items()} + global_names = {k: '|'.join(sorted(v)) for k, v in contact_id_2_nameset.items()} - logging.debug(f"global_names={global_names}") + logging.debug(f'global_names={global_names}') contact_tags_per_dir: Dict[Path, ContactTags] = {} @@ -101,9 +101,9 @@ def migrate_directories_under(input_root_dir: Path, if any([_is_photo_file(file) for file in files]): logging.warning(f'Found photos but no .ini in {dir}') else: - logging.warning(f"No photos and no .ini file in {dir}") + logging.warning(f'No photos and no .ini file in {dir}') continue - logging.debug(f"Processing {Path(dir/ini_file)}") + logging.debug(f'Processing {Path(dir / ini_file)}') contact_tags_per_dir[dir] = migrate_directory(dir, files, db, contact_tags_per_dir, global_names, dry_run=dry_run, ini_file_name=ini_file, @@ -120,9 +120,9 @@ def migrate_directory(input_dir: Path, files: List[str], db: DigikamDb, """Migrates metadata of all photo files in the given directory.""" logging.info('===========================================================================================') if input_dir.name == '.picasaoriginals': - logging.info('Skipping %s' % input_dir) + logging.info(f'Skipping {input_dir}') return {} - logging.info('Now migrating %s' % input_dir) + logging.info(f'Now migrating {input_dir}') # Find digiKam album. album_id = db.find_album_by_dir(input_dir) @@ -141,7 +141,7 @@ def migrate_directory(input_dir: Path, files: List[str], db: DigikamDb, album_to_tag = _map_albums_to_tags(ini, db, used_ini_sections, dry_run=dry_run) self_contact_to_tag = _map_contacts_to_tags(ini['Contacts2'], db, dry_run=dry_run) if 'Contacts2' in ini else {} - logging.debug('self_contact_to_tag=%s' % self_contact_to_tag) + logging.debug(f'self_contact_to_tag={self_contact_to_tag}') # Merge contacts declared in parent ini files. contact_to_tag = self_contact_to_tag.copy() @@ -165,20 +165,20 @@ def migrate_directory(input_dir: Path, files: List[str], db: DigikamDb, skip_same_rect=skip_same_rect, prioritize_global_names=prioritize_global_names, dry_run=dry_run) except Exception as e: - logging.error(f"Exception: {e}") + logging.error(f'Exception: {e}') logging.error(traceback.format_exc()) - raise RuntimeError('Error when processing %s' % (input_dir / filename)) from e + raise RuntimeError(f'Error when processing {input_dir / filename}') from e # Make sure we actually read all the data from the ini file. unused_ini_sections = set(ini.sections()) - used_ini_sections unused_photo_sections = {section for section in unused_ini_sections if _is_photo_file(section)} if unused_photo_sections: - logging.warning(('Some files have metadata in %s but are gone ' + - '(probably fine, they might have been deleted or moved elsewhere on purpose): %s') - % (ini_file, unused_photo_sections)) + logging.warning( + f'Some files have metadata in {ini_file} but are gone (probably fine, they might have been ' + + f'deleted or moved elsewhere on purpose): {unused_photo_sections}') unused_ini_sections -= unused_photo_sections if unused_ini_sections: - logging.warning('Unused INI sections in %s: %s' % (ini_file, unused_ini_sections)) + logging.warning(f'Unused INI sections in {ini_file}: {unused_ini_sections}') return self_contact_to_tag # For use in subdirectories @@ -196,9 +196,9 @@ def migrate_file(filename: str, image_id: int, ini_section: configparser.Section used_ini_keys.add('star') if db.image_has_pick_tag(image_id): logging.warning( - 'Not applying star label to %s (%s) because it already has a Pick label' % (image_id, filename)) + f'Not applying star label to {image_id} ({filename}) because it already has a Pick label') else: - logging.debug('Applying star label to %s (%s)' % (image_id, filename)) + logging.debug(f'Applying star label to {image_id} ({filename})') if not dry_run: db.star_image(image_id) @@ -206,7 +206,7 @@ def migrate_file(filename: str, image_id: int, ini_section: configparser.Section if albums: used_ini_keys.add('albums') for album_id in albums.split(','): - logging.debug('Adding album %s to image %s (%s)' % (album_id, image_id, filename)) + logging.debug(f'Adding album {album_id} to image {image_id} ({filename})') if not dry_run: db.add_image_tag(image_id, album_to_tag[album_id]) @@ -216,7 +216,7 @@ def migrate_file(filename: str, image_id: int, ini_section: configparser.Section if filename.lower().endswith('.psd'): # Note: digiKam doesn't seem to know the size of PSD files and thus also # can't place face tags on them. - logging.warning('Skipping faces on %s because of PSD format' % filename) + logging.warning(f'Skipping faces on {filename} because of PSD format') else: for face_data in faces.split(';'): migrate_face(image_id, filename, face_data, db, contact_to_tag, global_names, @@ -226,7 +226,7 @@ def migrate_file(filename: str, image_id: int, ini_section: configparser.Section unused_ini_keys = set(ini_section.keys()) - used_ini_keys if unused_ini_keys: - logging.warning('Unused INI keys for %s: %s' % (filename, unused_ini_keys)) + logging.warning(f'Unused INI keys for {filename}: {unused_ini_keys}') def migrate_face(image_id: int, @@ -254,28 +254,28 @@ def migrate_face(image_id: int, if contact_id not in global_names: # This can happen often if not using contacts.xml # Add to global - person_name = f".NoName-{contact_id}-from-rect64" + person_name = f'.NoName-{contact_id}-from-rect64' logging.info(f'Learned {person_name} from a rect64 tag belonging to {filename}') global_names[contact_id] = person_name tag_id = db.find_or_create_person_tag(global_names[contact_id], dry_run=dry_run) - contact_to_tag[contact_id] = tag_id; + contact_to_tag[contact_id] = tag_id else: # global_names (learned from contacts.xml) has higher priority - if (contact_id in global_names): + if contact_id in global_names: tag_id = db.find_or_create_person_tag(global_names[contact_id], dry_run=dry_run) - contact_to_tag[contact_id] = tag_id; - elif (contact_id in contact_to_tag): + contact_to_tag[contact_id] = tag_id + elif contact_id in contact_to_tag: tag_id = contact_to_tag[contact_id] else: - person_name = f".NoName-{contact_id}-from-rect64" + person_name = f'.NoName-{contact_id}-from-rect64' logging.info(f'Learned {person_name} from a rect64 tag belonging to {filename}') global_names[contact_id] = person_name tag_id = db.find_or_create_person_tag(person_name, dry_run=dry_run) - contact_to_tag[contact_id] = tag_id; + contact_to_tag[contact_id] = tag_id if db.image_has_tag(image_id, tag_id): logging.warning( - 'Not applying face %s (%s) to %s (%s) because it already has that face tag' % (tag_id, contact_id, image_id, filename)) + f'Not applying face {tag_id} ({contact_id}) to {image_id} ({filename}) because it already has that face tag') return # Convert the rectangle. @@ -289,7 +289,9 @@ def migrate_face(image_id: int, # and then run this script again, you end up with another rect mapped to Picasa's name -- it's a conundrum.. if db.image_has_property(image_id, _FACE_TAG_REGION_PROPERTY, digikam_rect): if skip_same_rect is None: - raise RuntimeError(f"digiKam already has face rectangle {digikam_rect} defined. Please specify what to do by running with argument --skip_same_rect or --no-skip_same_rect") + raise RuntimeError( + f'digiKam already has face rectangle {digikam_rect} defined. ' + + 'Please specify what to do by running with argument --skip_same_rect or --no-skip_same_rect') elif skip_same_rect: logging.warning( f'Not applying face {tag_id} ({contact_id}) to {image_id} ({filename}) because it already has that face rectangle') @@ -316,7 +318,7 @@ def _map_albums_to_tags( album_id = section_name[7:] section = ini[section_name] if not 'name' in section: - logging.debug('Skipping unnamed album %s' % album_id) + logging.debug(f'Skipping unnamed album {album_id}') continue assert section['name'] used_ini_sections.add(section_name) @@ -334,5 +336,5 @@ def _map_contacts_to_tags( for contact_id, value in contacts_section.items(): person_name = value.split(';')[0] result[contact_id] = db.find_or_create_person_tag(person_name, dry_run=dry_run) - logging.debug("person_name=%s contact_id=%s tag=%s" % (person_name, contact_id, result[contact_id])) + logging.debug(f'person_name={person_name} contact_id={contact_id} tag={result[contact_id]}') return result diff --git a/rect64.py b/rect64.py index cb9ec23..321fa44 100755 --- a/rect64.py +++ b/rect64.py @@ -5,7 +5,7 @@ def parse_rect64(data: str) -> Tuple[float, float, float, float]: # left, top, right, bottom """https://gist.github.com/fbuchinger/1073823#file-picasa-ini-L147-L160""" # Strip the rect64() from the outside. - assert data.startswith("rect64(") and data.endswith(")"), input + assert data.startswith('rect64(') and data.endswith(')'), input data = data[7:-1] assert len(data) >= 1 data = data.zfill(16) # Zeros in front, as Picasa abbreviates. @@ -48,7 +48,7 @@ def to_digikam_rect(image_size: Tuple[int, int, int], rect: Tuple[float, float, y1 = 1 - right y2 = 1 - left else: - raise ValueError('Unsupported orientation %s' % orientation) + raise ValueError(f'Unsupported orientation {orientation}') return ''.format( int(width * x1),