In [1]:
import os
import math
from PIL import Image
from PIL.ExifTags import TAGS
import pandas as pd
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import csv
from config import DIR_FOLDER, SHEET_ID, SHEET_NAME
from google_sheets_sync import read_google_sheet, get_new_rows, append_rows_to_sheet

In [2]:
def get_exif_data(image_path):
    """Extract EXIF data from an image and sanitize values."""
    try:
        img = Image.open(image_path)
        exif_data = img._getexif()
        if exif_data:
            exif = {}
            for tag_id, value in exif_data.items():
                tag_name = TAGS.get(tag_id, tag_id)
                # Sanitize data: Convert all values to strings and handle None
                if isinstance(value, bytes):
                    value = value.decode(errors='replace')  # Decode bytes
                exif[tag_name] = str(value) if value is not None else ""
            return exif
        else:
            return None
    except Exception as e:
        print(f"Error processing {image_path}: {str(e)}")
        return None
    

In [3]:
def scan_directories(root_dirs):
    """Process multiple directories in parallel. If a directory contains target files, read all files in that directory and skip its subdirectories."""
    data = []
    extensions = ['.jpg', '.jpeg', '.tiff', '.png']
    
    with ThreadPoolExecutor() as executor:
        futures = []
        for root_dir in root_dirs:  # Loop through each input directory
            for root, dirs, files in os.walk(root_dir):
                # Check if this directory contains any target files
                target_files = [file for file in files if os.path.splitext(file)[1].lower() in extensions]
                if target_files:
                    # Read all target files in this directory
                    for file in target_files:
                        image_path = os.path.join(root, file)
                        futures.append(
                            executor.submit(
                                lambda ip=image_path, fl=file, rd=root_dir: get_exif_data_with_file(ip, fl, rd)
                            )
                        )
                    # Skip subdirectories by clearing 'dirs' in-place
                    dirs.clear()
                # If no target files, continue walking into subdirectories
        
        for future in tqdm(futures, desc="Processing images"):
            exif = future.result()
            if exif:
                data.append(exif)
    return data

In [4]:
def get_exif_data_with_file(image_path, file, root_dir):
    """Wrapper to add filename/path and source directory."""
    exif = get_exif_data(image_path)
    if exif:
        exif['Filename'] = file
        exif['Filepath'] = image_path
        exif['SourceDirectory'] = root_dir  # Track which root_dir this file came from
    return exif

In [None]:
# # The following functions are fully created by AI. Just for reference that vibe coding will not always run smoothly.

# def convert_exposure_time(exposure_str):
#     """Convert any ExposureTime to strict '1/X' or 'X"' format."""
#     if not exposure_str or pd.isna(exposure_str):
#         return ""
    
#     try:
#         # Case 1: Tuple format like "(1, 16000)"
#         if exposure_str.startswith('(') and exposure_str.endswith(')'):
#             num_den = exposure_str[1:-1].replace(' ', '').split(',')
#             numerator, denominator = map(int, num_den)
#             return format_as_reciprocal(numerator, denominator)
        
#         # Case 2: Decimal format like "0.0000625"
#         if '.' in exposure_str and '/' not in exposure_str:
#             exposure_float = float(exposure_str)
#             return format_decimal(exposure_float)
        
#         # Case 3: Fraction format like "10/13" or "625/0"
#         if '/' in exposure_str:
#             numerator, denominator = map(int, exposure_str.split('/'))
#             return format_as_reciprocal(numerator, denominator)
        
#         # Case 4: Whole number like "2"
#         return f"{int(float(exposure_str))}\""
    
#     except Exception as e:
#         print(f"Failed to convert {exposure_str}: {str(e)}")
#         return ""

# def format_as_reciprocal(numerator, denominator):
#     """Force numerator=1 by calculating reciprocal (e.g., 2/5 → 1/2.5)."""
#     if denominator == 0:
#         # Handle camera-specific encodings like 625/0 → 1/16000
#         return f"1/{int(2 ** (numerator.bit_length() + 6))}"  # Empirical scaling
#     elif numerator == 1:
#         return f"1/{denominator}"
#     elif denominator == 1:
#         return f"{numerator}\""
#     else:
#         # Convert to 1/X format (e.g., 10/13 → 1/1.3, 5/16 → 1/3.2)
#         reciprocal = denominator / numerator
#         return f"1/{reciprocal:.1f}" if not reciprocal.is_integer() else f"1/{int(reciprocal)}"

# def format_decimal(exposure_float):
#     """Convert decimal to '1/X' or 'X"'."""
#     if exposure_float >= 1:
#         return f"{exposure_float}\""
#     else:
#         return f"1/{int(round(1 / exposure_float))}"

In [None]:
# # The function that works better compared to the previous one (and I wrote it myself)

def convert_exposure_time(exp, lim=100000):
    """
    Calculate the shutter speed denominator based on a decimal value.
    """
    dec = float(exp)
    if dec < 1:
        denum = lim / (dec * lim)
        if math.isclose(denum, round(denum)):
            x = int(round(denum))
            ss = f"1/{x}"
        else:
            x = round(denum, 1)
            ss = f"1/{x:.1f}"
    else:
        ss = f"{dec}\""
    return ss

In [6]:
def convert_35mm_focal_length(focal_length, camera_model):
    """
    Convert focal length to 35mm equivalent.
    """
    if pd.isna(focal_length) or not focal_length:
        return ""
    
    try:
        # Handle both string and numeric inputs
        if isinstance(focal_length, str):
            focal_length = float(focal_length)

        # Handle specific camera models with known conversion factors
        if camera_model == "TG-7":
            return int(round((25/4.5) * focal_length))
        elif camera_model in ["E-M10MarkII", "PEN-F","DC-G9M2"]:
            return int(round(focal_length *2))
        else:
            return int(round(focal_length))
    
    except Exception as e:
        print(f"Failed to convert focal length {focal_length}: {str(e)}")
        return ""

In [None]:
def split_datetime(dt):
    """
    Split EXIF DateTime into separate Date and Time columns.
    """
    if pd.isna(dt) or not dt:
        return pd.Series({'Date':'', 'Time':''})
    try:
        # EXIF DateTime format: 'YYYY:MM:DD HH:MM:SS'
        date_part, time_part = dt.split(' ')
        # Convert date to 'YYYY-MM-DD' for Excel/Sheets
        date_fmt = date_part.replace(':', '-')
        return pd.Series({'Date': date_fmt, 'Time': time_part})
    except Exception:
        return pd.Series({'Date':'', 'Time':''})
        

In [None]:
def extract_day(date_str):
    """
    Extract day of the week from 'YYYY-MM-DD' date string.
    """
    if pd.isna(date_str) or not date_str:
        return ''
    try:
        # Parse date string in 'YYYY-MM-DD' format
        dt = pd.to_datetime(date_str, format='%Y-%m-%d', errors='coerce')
        if pd.isna(dt):
            return ''
        return dt.strftime('%a')
    except Exception:
        return ''

In [None]:
def extract_foldername(folder_path):
    """
    Extract folder name from a given path.
    """
    if pd.isna(folder_path) or not folder_path:
        return ''
    try:
        cleaned_path = folder_path.replace('\\JPG','')
        return cleaned_path.split('/')[-1]  # Get the last part after splitting by '/'
    except Exception:
        return ''

In [12]:
def main():
    root_dir = DIR_FOLDER  # List of directories to scan
    output_csv = "exif_data.csv"
    
    # Collect all EXIF data
    exif_data = scan_directories(root_dir)
    
    # Convert to DataFrame and save as CSV
    if exif_data:
        df = pd.DataFrame(exif_data)
        
        # Remove NUL characters from all string/object columns
        df = df.apply(lambda x: x.str.replace('\x00', '', regex=False) if x.dtype == 'object' else x)
        
        df = df.apply(lambda x: x.str.strip() if x.dtype == 'object' else x)
        
        # Add ExposureFraction column
        df['ExposureFraction'] = df['ExposureTime'].apply(convert_exposure_time)
        
        # Add 35mm equivalent focal length column
        df['FocalLength35mm'] = df.apply(lambda x: convert_35mm_focal_length(x['FocalLength'], x['Model']), axis=1)
        
        # Add Date and Time columns from DateTime
        dt_split = df['DateTime'].apply(split_datetime)
        df = pd.concat([df, dt_split], axis=1)
        
        # Add Hour column from Time
        df['Hour'] = df['Time'].apply(lambda x: x.split(':')[0] if pd.notna(x) and x else '')
        
        # Add Day column from Date
        df['Day'] = df['Date'].apply(extract_day)
        
        # Ensure LensModel column exists
        if 'LensModel' not in df.columns:
            df['LensModel'] = ''
        
        # Set LensModel for TG-7 if missing or empty
        tg7_mask = (df['Model'] == 'TG-7') & (df['LensModel'].isna() | (df['LensModel'] == ''))
        df.loc[tg7_mask, 'LensModel'] = 'OM SYSTEM TG-7 4.5-18.0mm F2.0-4.9'
        
        # Add Foldername and Folderpath columns
        df['Folderpath'] = df['Filepath'].apply(lambda x: os.path.dirname(x) if pd.notna(x) and x else '')
        df['Foldername'] = df['Folderpath'].apply(extract_foldername)

        # Add UniqueID column as combination of Filename and Foldername
        df['UniqueID'] = df.apply(lambda x: f"{x['Filename']}_{x['Foldername']}", axis=1)
        
        # Sort by DateTime
        df = df.sort_values(by=['DateTime','Foldername'], ascending=True, na_position='last')
        
        # Columns to export
        df1 = df[["Filename","Foldername","Make","Model","DateTime","Date","Day","Time","Hour","YResolution","XResolution",
         "ExposureBiasValue","MaxApertureValue","Flash","FocalLength","FocalLength35mm",
         "ExifImageWidth","ExifImageHeight","FNumber","ISOSpeedRatings","ExposureTime","ExposureFraction",
         "LensModel","UniqueID"]]
        
        # Fix CSV escaping issues by quoting all fields and specifying an escape character
        df1.to_csv(
            output_csv,
            index=False,
            quoting=csv.QUOTE_ALL,  # Enclose all fields in quotes
            escapechar='\\'         # Use backslash to escape special chars
            # encoding='utf-8'
        )
        print(f"Data saved to {output_csv}")

        # Sync with Google Sheets
        creds_json = 'credentials.json'  # Path to your service account credentials
        old_data = read_google_sheet(SHEET_ID, SHEET_NAME, creds_json) # Read existing data from Google Sheet
        new_rows = get_new_rows(df1, old_data) # Identify new rows to append
        if not new_rows.empty:
            append_rows_to_sheet(SHEET_ID, SHEET_NAME, creds_json, new_rows)
            print(f"Appended {len(new_rows)} new rows to Google Sheet.")
    else:
        print("No EXIF data found.")
        
if __name__ == "__main__":
    main()

Processing images: 100%|██████████| 37175/37175 [06:53<00:00, 90.01it/s] 


Data saved to exif_data.csv
Appended 139 new rows to Google Sheet.
