In [1]:
import json
import os
import subprocess
from dataclasses import dataclass
from datetime import timedelta
from typing import List, Optional
from urllib.parse import quote

import osxphotos
import psycopg
from multiprocess import Pool
from osxphotos import PhotoInfo, PhotosDB
from PIL import Image
from pillow_heif import register_heif_opener
from psycopg.rows import namedtuple_row

register_heif_opener()

In [35]:
@dataclass
class ThumbnailDef:
    """Describes one thumbnail variant to generate for an exported image."""
    # Edge length of the bounding box; passed as both width and height to Image.thumbnail().
    max_size: int
    # Suffix appended to the file stem of the generated thumbnail: "<stem>_<name_modifier><ext>".
    name_modifier: str

# Thumbnail variants, each bounded by a square of `max_size`.
large = ThumbnailDef(max_size=1600, name_modifier='large')
medium = ThumbnailDef(max_size=400, name_modifier='medium')
small = ThumbnailDef(max_size=120, name_modifier='small')

# The thumbnailing loops shrink one image object in place, size after size,
# so this list must stay ordered from largest to smallest.
sizes = [large, medium, small]
name_modifiers = [definition.name_modifier for definition in sizes]

In [3]:
# Display the configured thumbnail definitions, in processing order.
sizes

[ThumbnailDef(max_size=1600, name_modifier='large'),
 ThumbnailDef(max_size=400, name_modifier='medium'),
 ThumbnailDef(max_size=120, name_modifier='small')]

In [4]:
@dataclass
class PostgesConfig:
    """Connection settings for the PostgreSQL database.

    The field names double as the keys expected in the JSON config file, since
    instances are built with ``PostgesConfig(**config_dict)``.
    """
    hostname: str
    database: str
    username: str
    password: str

    def connection_info(self) -> str:
        """Return a ``postgresql://`` connection URI for these settings.

        The username and password are percent-encoded so that characters with a
        special meaning in a URI (``@``, ``:``, ``/``, ...) cannot corrupt it.
        They must therefore be stored as plain text, not pre-encoded.
        Hostname and database are inserted verbatim.
        """
        username = quote(self.username, safe='')
        password = quote(self.password, safe='')
        return f'postgresql://{username}:{password}@{self.hostname}/{self.database}'

In [21]:
# Read the connection settings from the JSON config file and wrap them in a PostgesConfig.
with open('./test.postgres_config', 'r') as config_file:
    config_dict = json.load(config_file)
config = PostgesConfig(**config_dict)

In [6]:
# Fetch every game; namedtuple_row exposes the selected columns as attributes on each row.
with psycopg.connect(config.connection_info()) as connection, \
        connection.cursor(row_factory=namedtuple_row) as cursor:
    games_q = cursor.execute(
        'SELECT "Id", "Name", "Date", "ScheduledTime", "StartTime", "EndTime" FROM "Games"'
    )
    games = games_q.fetchall()

In [7]:
# Inspect the first fetched game row.
games[0]

Row(Id=638, Name='4/6/24 Los Angeles Dodgers at Chicago Cubs', Date=datetime.date(2024, 4, 6), ScheduledTime=datetime.datetime(2024, 4, 6, 20, 5, 8, tzinfo=zoneinfo.ZoneInfo(key='Etc/UTC')), StartTime=datetime.datetime(2024, 4, 6, 20, 5, 27, tzinfo=zoneinfo.ZoneInfo(key='Etc/UTC')), EndTime=datetime.datetime(2024, 4, 6, 22, 50, 39, tzinfo=zoneinfo.ZoneInfo(key='Etc/UTC')))

In [8]:

def get_photos_for_game(photosdb: PhotosDB, game: psycopg.rows.Row) -> List[PhotoInfo]:
    """Return the photos dated within a window around a game.

    The window runs from 3 hours before ``game.StartTime`` to 2 hours after
    ``game.EndTime`` and is passed to ``photosdb.photos`` as
    ``from_date`` / ``to_date``.
    """
    lead_in = timedelta(hours=3)
    wind_down = timedelta(hours=2)
    return photosdb.photos(
        from_date=game.StartTime - lead_in,
        to_date=game.EndTime + wind_down,
    )

In [9]:
# Open the Photos library through osxphotos; no library path is passed, so osxphotos picks the library.
photosdb = osxphotos.PhotosDB()

In [10]:
# Sample data set: the photos falling in the time window around the first game.
test = get_photos_for_game(photosdb, games[0])

In [11]:
# Number of photos found for the sample game.
len(test)

40

In [12]:
# Place name recorded on the first sample photo.
test[0].place.name

'Wrigley Field, Chicago, Chicagoland, United States'

In [13]:
def temp_dir(photo: PhotoInfo) -> str:
    """Return the export directory for ``photo``, creating it if necessary.

    The directory is ``./exported/<YYYY-MM-DD>/<uuid>``, built from the photo's
    ``date`` and ``uuid``.
    """
    out_dir = f'./exported/{photo.date:%Y-%m-%d}/{photo.uuid}'
    # exist_ok avoids the check-then-create race of a separate os.path.exists() test.
    os.makedirs(out_dir, exist_ok=True)
    return out_dir

In [14]:
def thumbnail_photo(path: str):
    """Write one thumbnail per entry of ``sizes`` next to the image at ``path``.

    Each thumbnail is saved in the source file's directory as
    ``<stem>_<name_modifier><ext>``, keeping the source extension.

    Returns ``None`` on success. On failure the caught exception is returned
    (not raised), so the caller must inspect the result.
    """
    try:
        directory = os.path.dirname(path)
        stem, ext = os.path.splitext(os.path.basename(path))
        # The context manager closes the source file even when a save fails.
        with Image.open(path) as image:
            for size in sizes:
                # thumbnail() shrinks the image in place, so each pass starts from
                # the previous result; `sizes` is ordered large to small for this.
                image.thumbnail((size.max_size, size.max_size))
                image.save(os.path.join(directory, f"{stem}_{size.name_modifier}{ext}"))
    except Exception as e:
        return e
    

In [15]:
def thumbnail_video(path: str):
    """Extract the first frame of the video at ``path`` and thumbnail it.

    ffmpeg writes the frame to ``<stem>.jpeg`` beside the video; one
    ``<stem>_<name_modifier>.jpeg`` per entry of ``sizes`` is then saved in
    the same directory.

    Returns ``None`` on success. On failure the caught exception is returned
    (not raised), so the caller must inspect the result.
    """
    try:
        directory = os.path.dirname(path)
        stem, _ = os.path.splitext(os.path.basename(path))
        ext = '.jpeg'
        frame_path = os.path.join(directory, f"{stem}{ext}")
        # Global options (-y, -hide_banner, -loglevel) go before the input/output
        # so ffmpeg does not treat them as trailing options. check=True turns a
        # failed extraction into CalledProcessError instead of carrying on with a
        # missing or stale frame file.
        subprocess.run(
            ['ffmpeg', '-y', '-hide_banner', '-loglevel', 'error',
             '-i', path, '-ss', '00:00:00.000', '-vframes', '1', frame_path],
            check=True,
        )
        # The context manager closes the frame file even when a save fails.
        with Image.open(frame_path) as image:
            for size in sizes:
                # thumbnail() shrinks the image in place, so each pass starts from
                # the previous result; `sizes` is ordered large to small for this.
                image.thumbnail((size.max_size, size.max_size))
                image.save(os.path.join(directory, f"{stem}_{size.name_modifier}{ext}"))
    except Exception as e:
        return e

In [16]:
def export(photo: PhotoInfo):
    """Export ``photo`` into its export directory and return the first exported path.

    The export is requested with ``live_photo=True``, ``export_as_hardlink=True``
    and ``overwrite=True``.
    """
    exported = photo.export(
        temp_dir(photo),
        live_photo=True,
        export_as_hardlink=True,
        overwrite=True,
    )
    return exported[0]


def thumbnail(params: tuple[str, bool]):
    """Dispatch a ``(path, ismovie)`` pair to the video or photo thumbnailer.

    Returns whatever the chosen thumbnailer returns.
    """
    path, ismovie = params
    make_thumbnails = thumbnail_video if ismovie else thumbnail_photo
    return make_thumbnails(path)

In [17]:
# Subset of the sample photos that are movies.
movies = [p for p in test if p.ismovie]

In [18]:
def export_many(photos: list[PhotoInfo]):
    """Export each photo and return ``(exported_path, ismovie)`` pairs in input order."""
    return [(export(photo), photo.ismovie) for photo in photos]


def thumbnail_many(params: list[tuple[str, bool]], processes: int = 12):
    """Generate thumbnails for many exported files in parallel.

    Args:
        params: ``(path, ismovie)`` pairs, one per exported file.
        processes: Upper bound on the number of worker processes (default 12).

    Returns:
        One entry per input, in input order, holding whatever ``thumbnail``
        returned for that file. An empty input yields an empty list.
    """
    params = list(params)
    if not params:
        # Nothing to do; skip starting worker processes.
        return []
    # Never start more workers than there are files to process.
    with Pool(min(processes, len(params))) as pool:
        return pool.map(thumbnail, params)

In [19]:
# Export the sample photos; `results` holds one (path, ismovie) pair per photo.
results = export_many(test)

In [20]:
# Thumbnail every exported file; each output entry is None or the exception returned for that file.
thumbnail_many(results)

[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]

In [42]:
def resource_type(photo: PhotoInfo) -> int:
    """Map a photo to its numeric resource-type code.

    Checked in this order: movie -> 4, live photo -> 3, photo -> 2, anything else -> 0.
    """
    if photo.ismovie:
        return 4
    if photo.live_photo:
        return 3
    if photo.isphoto:
        return 2
    return 0
    
def get_params(gameId: int, photo: PhotoInfo) -> tuple:
    """Build the parameter tuple for inserting ``photo`` as a "RemoteResource" row.

    The values follow the column order of the insert statement in
    ``import_resources``.
    """
    return (
        photo.uuid,
        photo.date,
        photo.original_filename,
        gameId,
        'MediaResource',
        resource_type(photo),
        photo.favorite,
    )

def get_name_modifier(name: str) -> Optional[str]:
    """Return the thumbnail suffix of a file stem, or ``None`` if it has none.

    A stem carries a modifier when it ends in ``_<modifier>`` and ``<modifier>``
    is listed in ``name_modifiers``.
    """
    # rpartition requires an actual "_" separator, so a stem that merely equals
    # a modifier (no underscore at all) is not mistaken for a generated thumbnail.
    _, separator, suffix = name.rpartition('_')
    if separator and suffix in name_modifiers:
        return suffix
    return None

def get_file_purpose(name_modifier: Optional[str], ext: str, photo: PhotoInfo) -> int:
    """Classify an exported file as a thumbnail (2) or an original (1).

    A file counts as a thumbnail when its stem carries a size modifier, or when
    it is a ``.jpeg`` file belonging to a movie.
    """
    is_movie_jpeg = photo.ismovie and ext == '.jpeg'
    if is_movie_jpeg or name_modifier is not None:
        return 2  # Thumbnail
    return 1  # Original

def get_file_params(resourceId: int, photo: PhotoInfo) -> list[tuple]:
    """Build "RemoteFile" insert parameters for every entry in the photo's export directory.

    Returns one ``(resourceId, purpose, name_modifier, extension)`` tuple per
    directory entry, in ``os.listdir`` order.
    """
    rows = []
    for entry in os.listdir(temp_dir(photo)):
        stem, ext = os.path.splitext(os.path.basename(entry))
        modifier = get_name_modifier(stem)
        purpose = get_file_purpose(modifier, ext, photo)
        rows.append((resourceId, purpose, modifier, ext))
    return rows
    

def import_files(resourceId: int, photo: PhotoInfo, cursor):
    """Insert one "RemoteFile" row per exported file of ``photo``.

    The statements are executed on the given ``cursor``; nothing is committed here.
    """
    insert_file_sql = """
        INSERT INTO "RemoteFile"("ResourceId", 
                                    "Purpose", 
                                    "NameModifier", 
                                    "Extension") 
        VALUES (%s, %s, %s, %s)
    """
    file_rows = get_file_params(resourceId, photo)
    for file_row in file_rows:
        cursor.execute(insert_file_sql, file_row)

def import_resources(gameId: int, photos: list[PhotoInfo]):
    """Insert a "RemoteResource" row, plus its "RemoteFile" rows, for each photo.

    All inserts run on one connection built from the module-level ``config``
    and are committed together after every photo has been processed.
    Returns nothing.
    """
    insert_resource_sql = """
        INSERT INTO "RemoteResource"("AssetIdentifier", 
                                    "DateTime", 
                                    "OriginalFileName", 
                                    "GameId", 
                                    "Discriminator", 
                                    "ResourceType", 
                                    "Favorite") 
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        RETURNING "Id"
    """
    with psycopg.connect(config.connection_info()) as connection:
        with connection.cursor() as cursor:
            for photo in photos:
                cursor.execute(insert_resource_sql, get_params(gameId, photo))
                # RETURNING "Id" yields the new row's id as the only column.
                resource_id = cursor.fetchone()[0]
                import_files(resource_id, photo, cursor)

        connection.commit()

In [40]:
# Preview the file rows built for the second movie, using 5 as the resource id.
get_file_params(5, movies[1])

[(5, 2, None, '.jpeg'),
 (5, 2, 'medium', '.jpeg'),
 (5, 2, 'large', '.jpeg'),
 (5, 1, None, '.MOV'),
 (5, 2, 'small', '.jpeg')]

In [43]:
# import_resources() has no return value, so its result is not assigned: binding it
# to `results` would replace the exported-file list that the thumbnail cell uses with None.
# NOTE(review): game id 5 is hard-coded although `test` was fetched for games[0] - confirm.
import_resources(5, test)