In [None]:
import json
import os
import pathlib

# Root folder of the Google Takeout export to process.
# A raw string is used so the backslashes of the Windows path are never read
# as escape sequences (`\T`, `\G` and `\P` are invalid escapes in a normal literal).
# NOTE(review): this is a machine-specific absolute path; adjust before running elsewhere.
source_root = pathlib.Path(r'D:\TEMP\Takeout\Google Photos\Photos from 2019')

In [None]:
def get_all_files(root: pathlib.Path, recursive=False):
    """Collect the files found under ``root``.

    Args:
        root: Directory to scan.
        recursive: When True, descend into every nested sub-directory; when
            False (default), only the direct children of ``root`` are listed.

    Returns:
        A list of ``pathlib.Path`` objects, one per file found. The order
        follows ``Path.iterdir`` and is therefore not guaranteed.

    Raises:
        TypeError: If an entry is neither a file nor a directory.
    """
    files = []
    for item in root.iterdir():
        if item.is_file():
            files.append(item)
        elif item.is_dir():
            if recursive:
                # Propagate the flag; without it the walk stops one level down.
                files.extend(get_all_files(item, recursive=True))
        else:
            # Raising a bare string is itself a TypeError in Python 3; keep that
            # exception type but give it a real, formatted message.
            raise TypeError(f'Unknown type: {item}')
    return files

In [None]:
# List the entries of source_root. `recursive` is left at its default (False),
# so only files directly inside source_root are collected.
all_files = get_all_files(source_root)

In [None]:
# Display how many files were collected.
len(all_files)

In [None]:
# Distinct lower-cased file extensions present in the listing, and the same
# set without the '.json' sidecar extension.
file_types = {path.suffix.lower() for path in all_files}
media_types = file_types.difference({'.json'})

In [None]:
from dataclasses import dataclass
import re


def get_suffix_number(text):
    """Return the number inside a trailing ``(N)`` marker of ``text``.

    Surrounding whitespace is ignored when looking for the marker.

    Args:
        text: Any string, e.g. a file stem such as ``'IMG_0001(2)'``.

    Returns:
        The integer ``N`` when the stripped text ends with ``(N)``;
        otherwise ``None``.
    """
    pattern = r'(\(\d+\))$'
    found = re.search(pattern, text.strip())
    if found is None:
        return None
    # The captured group always has the form '(digits)'; drop both parentheses.
    parenthesised = found.group(1)
    return int(parenthesised[1:-1])


def is_matched_stem(partial_stem, full_stem, min_ratio=0.4):
    """Fuzzy-match a (possibly shortened) media stem against a full stem.

    ``partial_stem`` matches when it is a prefix of ``full_stem`` and covers
    at least ``min_ratio`` of its length.

    Args:
        partial_stem: Candidate prefix.
        full_stem: Stem to compare against.
        min_ratio: Minimum ``len(partial_stem) / len(full_stem)`` required
            for a match. Defaults to 0.4.

    Returns:
        True when the stems match, False otherwise.
    """
    if not full_stem.startswith(partial_stem):
        return False
    # An empty full_stem only reaches this point when partial_stem is empty
    # too; treat the two identical stems as a full (1.0) overlap instead of
    # dividing by zero.
    ratio = len(partial_stem) / len(full_stem) if full_stem else 1.0
    return ratio >= min_ratio


def stem_similarity(partial_stem, full_stem):
    """Score how much of ``full_stem`` is covered by the prefix ``partial_stem``.

    Useful for ranking several candidate stems, e.g. to tell ``xxx`` apart
    from ``xxx-COLLAGE``.

    Args:
        partial_stem: Candidate prefix.
        full_stem: Stem to compare against.

    Returns:
        0.0 when ``partial_stem`` is not a prefix of ``full_stem``; otherwise
        ``len(partial_stem) / len(full_stem)``. Two empty stems score 1.0.
    """
    if not full_stem.startswith(partial_stem):
        return 0.0
    if not full_stem:
        # Both stems are empty here; they are identical, and dividing would
        # raise ZeroDivisionError.
        return 1.0
    return len(partial_stem) / len(full_stem)


def is_matched_pair(media_info, metadata):
    """Check whether a media record and a metadata record belong together.

    The stems are compared with ``is_matched_stem``. When the metadata carries
    a non-empty ``target_ext``, the media's ``target_ext`` must also be equal.

    Args:
        media_info: Mapping with ``'target_stem'`` and ``'target_ext'`` keys.
        metadata: Mapping with ``'target_stem'`` and ``'target_ext'`` keys.

    Returns:
        False when the extensions are required to agree and do not; otherwise
        the result of the stem comparison.
    """
    expected_ext = metadata['target_ext']
    if expected_ext != '' and media_info['target_ext'] != expected_ext:
        return False
    return is_matched_stem(media_info['target_stem'], metadata['target_stem'])


def is_edited_version(target_stem, metadata_objs):
    """Decide whether a media file whose name ends with '(1)' is an edited copy.

    Only meaningful for media whose name ends with '(1)'. Such a file is either
    an edited version or a file whose name collided with another one. The
    metadata decides: if any metadata whose ``target_stem`` starts with
    ``target_stem`` has ``meta_duplicated_number == 1``, the media is a
    name duplicate; otherwise it is an edited version.

    Args:
        target_stem: Media stem with the '(1)' marker already removed.
        metadata_objs: Iterable of metadata mappings providing ``'target_stem'``
            and ``'meta_duplicated_number'``.

    Returns:
        False for a name duplicate, True for an edited version.

    Raises:
        AssertionError: If no metadata stem starts with ``target_stem``.
    """
    seen_any = False
    # Lazy filter: evaluation stops as soon as a '(1)' metadata entry is found.
    prefixed = (m for m in metadata_objs if m['target_stem'].startswith(target_stem))
    for candidate in prefixed:
        if candidate['meta_duplicated_number'] == 1:
            return False
        seen_any = True
    assert seen_any, f'{target_stem} has no matched metadata'
    return True


@dataclass
class MediaVersion:
    """String constants naming the kind of a media file.

    The attributes carry no type annotations, so ``@dataclass`` generates no
    fields; they behave as plain class-level constants.
    """
    ORIGINAL = 'original'
    EDITED = 'edited'
    DUPLICATE_NAME = 'duplicate_name'
    # Misspelled original name, kept so existing references keep working.
    DUPLICAT_NAME = DUPLICATE_NAME


def is_media_metadata(json_obj):
    """Tell whether a loaded JSON object describes a media file.

    A JSON object counts as media metadata when it has a non-null
    ``'imageViews'`` entry. Falsy but present values (``0``, ``''``) still
    count, so such metadata is not silently discarded.

    Args:
        json_obj: Mapping produced by parsing a metadata JSON file.

    Returns:
        True when ``'imageViews'`` is present and not None, False otherwise.
    """
    return json_obj.get('imageViews') is not None


def load_metadata(f):
    """Parse a UTF-8 JSON file and remember where it came from.

    Args:
        f: Path-like object exposing ``open`` (e.g. ``pathlib.Path``) that
            points to a JSON file.

    Returns:
        The parsed JSON object with an extra ``'file_path'`` key set to ``f``.
    """
    # Use a context manager so the file handle is closed right after parsing.
    with f.open(encoding='utf-8') as fp:
        json_obj = json.load(fp)
    json_obj['file_path'] = f
    return json_obj

In [None]:
# Split the listing into media files and '.json' sidecars (the suffix
# comparison is case-sensitive), then load every sidecar and keep only those
# recognised as media metadata.
media_files = [path for path in all_files if path.suffix != '.json']
metadata_files = [path for path in all_files if path.suffix == '.json']
metadata_objs = [
    obj
    for f in metadata_files
    if is_media_metadata(obj := load_metadata(f))
]

In [None]:
# Annotate every metadata object in place with:
#   - meta_duplicated_number: the trailing '(N)' of the JSON file name
#     (extension removed), or None when there is no such marker;
#   - target_stem / target_ext: the 'title' entry split into stem and extension.
for metadata in metadata_objs:
    json_path_without_ext, _ = os.path.splitext(metadata['file_path'])
    metadata['meta_duplicated_number'] = get_suffix_number(json_path_without_ext)
    title_stem, title_ext = os.path.splitext(metadata['title'])
    metadata['target_stem'] = title_stem
    metadata['target_ext'] = title_ext

In [None]:
# Build one info record per media file: its extension, its version
# (original / edited), the '(N)' duplicate number if any, the stem to look up
# in the metadata, and the file path.
media_infos = []

for media_file in media_files:
    media_stem, media_ext = os.path.splitext(media_file.name)

    # Defaults describe an unmarked, original file.
    version = MediaVersion.ORIGINAL
    duplicated_number = None
    lookup_stem = media_stem

    # '-edi' is a shortened form of '-edited' that also appears in file names.
    edit_marker = next((m for m in ('-edited', '-edi') if media_stem.endswith(m)), None)
    if edit_marker is not None:
        version = MediaVersion.EDITED
        lookup_stem = media_stem.removesuffix(edit_marker)
    else:
        suffix_number = get_suffix_number(media_stem)
        if suffix_number is not None:
            lookup_stem = media_stem.removesuffix(f'({suffix_number})')
            # '(1)' is ambiguous: it marks either an edited copy or a duplicated
            # name, so the metadata is consulted. Any other number is always a
            # duplicated name.
            if suffix_number == 1 and is_edited_version(lookup_stem, metadata_objs):
                version = MediaVersion.EDITED
            else:
                duplicated_number = suffix_number

    media_info = {
        'target_ext': media_ext,
        'media_duplicated_number': duplicated_number,
        'version': version,
        'target_stem': lookup_stem,
        'media_path': media_file,
    }

    media_infos.append(media_info)

    print(media_info)

In [None]:
# Display the loaded metadata objects for inspection.
metadata_objs

In [None]:
# Manual sanity check of is_matched_stem with one long name and two prefixes of it.
# Note that the "stem" used here still carries its '.jpg' extension.
target_stem = 'original_b6bca13d-c92d-496f-a5c4-b9b6b5401d69_IMG_20220306_215604.jpg'
true_stem = 'original_b6bca13d-c92d-496f-a5c4-b9b6b5401d69_I'
false_stem = 'original'

# true_stem covers roughly two thirds of target_stem and false_stem only about
# one ninth, so with the 0.4 ratio threshold this should display (True, False).
is_matched_stem(true_stem, target_stem), is_matched_stem(false_stem, target_stem)

In [None]:
from utils import argmax

# matched[i] collects the media infos assigned to metadata_objs[i];
# media without any candidate metadata end up in not_matched_media.
matched = [[] for _ in metadata_objs]
not_matched_media = []

# Shallow copy of the media list; not read again in this cell.
rest_media_infos = list(media_infos)

for info in media_infos:
    media_stem = info['target_stem']
    candidates = []
    for meta_idx, meta in enumerate(metadata_objs):
        # Only stems are compared here; use is_matched_pair to also check the
        # target extension (for example, image.jpg, image.png and image.mp4).
        if not is_matched_stem(media_stem, meta['target_stem']):
            continue
        # The names match; the '(N)' duplicate numbers must agree as well.
        if meta['meta_duplicated_number'] != info['media_duplicated_number']:
            print(f'found but not matched {meta}')
            continue
        print(f'matched {meta}')
        candidates.append(meta_idx)

    if candidates:
        # Several candidates may remain: keep the one picked by argmax over
        # the stem similarities.
        similarities = [stem_similarity(media_stem, metadata_objs[c]['target_stem']) for c in candidates]
        best_idx = argmax(similarities)
        matched[candidates[best_idx]].append(info)
        print(candidates, similarities, best_idx)
    else:
        print(f'{info["target_stem"]} has no matched metadata', '\n\n\n\n\n')
        not_matched_media.append(info)

In [None]:
# Display, per metadata entry, the media infos that were assigned to it.
matched

In [None]:
# Display the media infos for which no metadata candidate was found.
not_matched_media

In [None]:
# Number of media files attached to each metadata entry (same order as `matched`).
matched_len = list(map(len, matched))
matched_len

In [None]:
# Distinct per-metadata media counts that occur.
set(matched_len)

In [None]:
# Every metadata entry must have received at least one media file.
assert all(count != 0 for count in matched_len)

In [None]:
# Compare the number of matched media files with the total number of media files.
sum(matched_len), len(media_infos)

In [None]:
# Every media file must have been assigned to some metadata entry.
assert len(media_infos) == sum(matched_len)