# Catalog Raw Images

This notebook catalogs and documents all the raw images collected. Rerunning this notebook will update the catalog in S3.

In [1]:
import os
import pandas as pd
import deltalake as dl
import boto3
import json
import s3fs

# Resolve AWS credentials from the local 'default' profile.
session = boto3.Session(profile_name='default')
credentials = session.get_credentials()
if credentials is None:
    # Fail with a readable message instead of an AttributeError on None below.
    raise RuntimeError("No AWS credentials could be resolved for profile 'default'")
# Freeze once so key, secret and token all come from the same snapshot.
credentials = credentials.get_frozen_credentials()

# Options passed to deltalake for every table read/write in this notebook.
storage_options = {
    'AWS_REGION': 'us-west-1',
    'AWS_ACCESS_KEY_ID': credentials.access_key,
    'AWS_SECRET_ACCESS_KEY': credentials.secret_key,
    'AWS_S3_ALLOW_UNSAFE_RENAME': 'true'
}
# Temporary credentials (SSO, assumed roles, MFA sessions) are only valid
# together with their session token; static keys have no token, so the entry
# is added only when one exists.
if credentials.token:
    storage_options['AWS_SESSION_TOKEN'] = credentials.token

# Filesystem handle used to list and open the raw image objects.
s3 = s3fs.S3FileSystem(
    anon=False,
    # NOTE(review): use_ssl=False sends image data over plain HTTP; confirm this
    # is intentional (e.g. a local proxy) or switch to the s3fs default (True).
    use_ssl=False,
    key=storage_options['AWS_ACCESS_KEY_ID'],
    secret=storage_options['AWS_SECRET_ACCESS_KEY'],
    token=credentials.token,
    client_kwargs={
        'region_name': storage_options['AWS_REGION']
    }
)

dl_table_path = 's3a://coffee-dataset/lake/raw_images_v2'

# Current contents of the catalog, loaded as a pandas DataFrame for reference.
dtable = dl.DeltaTable(
    table_uri=dl_table_path,
    storage_options=storage_options
).to_pandas()

dtable

[90m[[0m2024-10-23T19:43:45Z [33mWARN [0m aws_config::imds::region[90m][0m failed to load region from IMDS err=failed to load IMDS session token: dispatch failure: timeout: error trying to connect: HTTP connect timeout occurred after 1s: HTTP connect timeout occurred after 1s: timed out (FailedToLoadToken(FailedToLoadToken { source: DispatchFailure(DispatchFailure { source: ConnectorError { kind: Timeout, source: hyper::Error(Connect, HttpTimeoutError { kind: "HTTP connect", duration: 1s }), connection: Unknown } }) }))
[90m[[0m2024-10-23T19:43:46Z [33mWARN [0m aws_config::imds::region[90m][0m failed to load region from IMDS err=failed to load IMDS session token: dispatch failure: timeout: error trying to connect: HTTP connect timeout occurred after 1s: HTTP connect timeout occurred after 1s: timed out (FailedToLoadToken(FailedToLoadToken { source: DispatchFailure(DispatchFailure { source: ConnectorError { kind: Timeout, source: hyper::Error(Connect, HttpTimeoutError { kind

Unnamed: 0,exif_data,image_path
0,"{""imagewidth"": 4032.0, ""imagelength"": 3024.0, ...",coffee-dataset/raw_images/alteri_farms/2024042...
1,"{""imagewidth"": 4032.0, ""imagelength"": 3024.0, ...",coffee-dataset/raw_images/alteri_farms/2024042...
2,"{""imagewidth"": 4032.0, ""imagelength"": 3024.0, ...",coffee-dataset/raw_images/alteri_farms/2024042...
3,"{""imagewidth"": 4032.0, ""imagelength"": 3024.0, ...",coffee-dataset/raw_images/alteri_farms/2024042...
4,"{""imagewidth"": 4032.0, ""imagelength"": 3024.0, ...",coffee-dataset/raw_images/alteri_farms/2024042...
...,...,...
2707,{},coffee-dataset/raw_images/fivver_fred/17063856...
2708,{},coffee-dataset/raw_images/fivver_fred/17063901...
2709,{},coffee-dataset/raw_images/fivver_fred/17063902...
2710,{},coffee-dataset/raw_images/fivver_fred/17063903...


In [2]:
# Lower-case file extensions (without the dot) that count as images.
extensions = ['jpg', 'jpeg', 'png', ]

root_dir = 'coffee-dataset/raw_images/'

# Walk one level below root_dir and collect every object whose extension
# is in the allow-list above.
image_paths = []
for entry in s3.ls(root_dir):
    # Skip the prefix itself if the listing returns it.
    if entry != root_dir:
        image_paths += [
            key for key in s3.ls(entry)
            if key.rsplit('.', 1)[-1].lower() in extensions
        ]

idf = pd.DataFrame(image_paths, columns=['image_path'])

# Filtering out images already present in the delta table is currently disabled:
#idf = idf[~idf['image_path'].isin(dtable['image_path'])]

idf

Unnamed: 0,image_path
0,coffee-dataset/raw_images/alteri_farms/2024042...
1,coffee-dataset/raw_images/alteri_farms/2024042...
2,coffee-dataset/raw_images/alteri_farms/2024042...
3,coffee-dataset/raw_images/alteri_farms/2024042...
4,coffee-dataset/raw_images/alteri_farms/2024042...
...,...
2707,coffee-dataset/raw_images/off_road_ditch_farm/...
2708,coffee-dataset/raw_images/off_road_ditch_farm/...
2709,coffee-dataset/raw_images/off_road_ditch_farm/...
2710,coffee-dataset/raw_images/off_road_ditch_farm/...


In [31]:
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
from PIL import Image
from PIL.ExifTags import TAGS as EXIF_TAGS

def parse_exif_data(row):
    """Read the raw EXIF tags of one catalogued image stored in S3.

    NOTE: a later cell defines a different function under this same name
    (it normalises an EXIF dict); after that cell runs, this loader is shadowed.

    Args:
        row: Mapping-like record (for example a DataFrame row) holding the
            object's location under the 'image_path' key.

    Returns:
        dict: {'image_path': <path>, 'exif_data': <dict>} where 'exif_data'
        maps EXIF tag names to their raw values. Tag ids missing from
        EXIF_TAGS all map to the key None. If the image cannot be opened or
        parsed, 'exif_data' is an empty dict (best effort).

    Raises:
        KeyError: if `row` has no 'image_path' entry.
    """
    import logging

    # Looked up outside the try block so the fallback below can always refer to
    # it; previously a failing lookup made the handler itself crash with
    # UnboundLocalError, hiding the real problem.
    image_path = row['image_path']

    try:
        with s3.open(image_path, 'rb') as f:
            image = Image.open(f)
            # _getexif() must run while the file handle is still open.
            exif_data = image._getexif() or {}
            exif_tags = {EXIF_TAGS.get(tag): value for tag, value in exif_data.items()}
    except Exception:
        # Best effort: one unreadable image must not abort the whole catalog
        # run, but leave a trace for anyone debugging empty EXIF rows.
        logging.getLogger(__name__).debug(
            'Could not read EXIF data for %s', image_path, exc_info=True
        )
        return { 'image_path': image_path, 'exif_data': {} }

    return { 'image_path': image_path, 'exif_data': exif_tags }


# Fetch EXIF data for every listed image concurrently.
# Both the executor and the progress bar are context-managed so the worker
# threads are shut down and the bar is closed even if a task raises or the
# cell is interrupted; rerunning the cell no longer leaks 32 threads each time.
with ThreadPoolExecutor(max_workers=32) as pool:
    with tqdm(total=len(idf), desc='Loading EXIF data', unit='image') as loader:
        futures = [pool.submit(parse_exif_data, row) for _, row in idf.iterrows()]

        # Results are collected in completion order, not in idf order.
        results = []
        for future in as_completed(futures):
            results.append(future.result())
            loader.update(1)

Loading EXIF data: 100%|██████████| 2712/2712 [09:51<00:00,  4.58image/s]


In [32]:
import datetime

def parse_gps(gps_info):
    """Convert a raw EXIF GPS block into decimal coordinates.

    Args:
        gps_info: Mapping keyed by numeric EXIF GPS tag id. Tags read here:
            1 GPSLatitudeRef, 2 GPSLatitude, 3 GPSLongitudeRef, 4 GPSLongitude,
            5 GPSAltitudeRef, 6 GPSAltitude, 31 GPSHPositioningError.
            Latitude/longitude are expected as (degrees, minutes, seconds).

    Returns:
        dict: Any subset of the keys below, depending on which tags exist:
            'latitude' / 'longitude': signed decimal degrees (negative for
                S / W); only set when both coordinates are present.
            'gps_error': horizontal positioning error; only set together
                with the coordinates.
            'altitude': altitude as a float, negative when GPSAltitudeRef
                marks it as below sea level.
    """
    gps = {}

    lat_ref = gps_info.get(1)
    lat = gps_info.get(2)
    lon_ref = gps_info.get(3)
    lon = gps_info.get(4)
    gps_error = gps_info.get(31)

    # TODO: GPSDateStamp (29) and GPSTimeStamp (7) are not converted to a
    # timestamp yet.

    # convert the latitude and longitude to decimal
    if lat is not None and lon is not None:
        latitude = lat[0] + lat[1]/60 + lat[2]/3600
        longitude = lon[0] + lon[1]/60 + lon[2]/3600

        if lat_ref == 'S':
            latitude *= -1

        if lon_ref == 'W':
            longitude *= -1

        gps['latitude'] = float(latitude)
        gps['longitude'] = float(longitude)

        if gps_error is not None:
            gps['gps_error'] = float(gps_error)

    # parse altitude information from the GPS data
    altitude = gps_info.get(6)
    if altitude is not None:
        altitude = float(altitude)
        # GPSAltitudeRef == 1 means the magnitude is below sea level. The tag
        # is a single byte, so accept both the int and the raw-bytes form.
        if gps_info.get(5) in (1, b'\x01'):
            altitude = -altitude
        gps['altitude'] = altitude

    return gps

In [39]:
from PIL import TiffImagePlugin
import re

def _to_jsonable(value):
    """Recursively convert one EXIF value into something json.dumps accepts."""
    # Plain JSON scalars pass through untouched.
    if value is None or isinstance(value, (str, int, float)):
        return value
    if isinstance(value, bytes):
        return value.decode('utf-8', errors='ignore')
    # Rationals and bytes can sit inside tuples, lists or nested dicts;
    # converting only the top level would leave them to break json.dumps.
    if isinstance(value, tuple):
        return tuple(_to_jsonable(item) for item in value)
    if isinstance(value, list):
        return [_to_jsonable(item) for item in value]
    if isinstance(value, dict):
        return {k: _to_jsonable(v) for k, v in value.items()}
    if isinstance(value, TiffImagePlugin.IFDRational):
        return float(value)
    return value


def parse_exif_data(exif_data):
    """Normalise a raw EXIF tag dict into a flat, JSON-friendly dict.

    NOTE: this definition shadows the earlier `parse_exif_data(row)` loader
    defined in a previous cell.

    Args:
        exif_data: Mapping of EXIF tag name -> raw value, or None. Entries
            whose key is None are dropped.

    Returns:
        dict: Keys are lower-cased tag names. The 'GPSInfo' entry (matched
        case-insensitively) is replaced by the lower-cased keys returned by
        parse_gps(). For every other value, rationals become floats and
        bytes become text (undecodable bytes are ignored), including inside
        nested tuples, lists and dicts. Returns {} when exif_data is None.
    """
    new_data = {}

    if exif_data is None:
        return {}

    for key in exif_data:
        if key is None:
            continue

        if key.lower() == 'gpsinfo':
            gps_info = parse_gps(exif_data[key])
            for k in gps_info:
                new_data[k.lower()] = gps_info[k]
        else:
            new_data[key.lower()] = _to_jsonable(exif_data[key])

    return new_data



# One record per image, as returned by the EXIF loading step.
exif_records = pd.DataFrame(results)

# Normalise each raw EXIF dict and store it as a JSON string.
exif_records['exif_data'] = [
    json.dumps(parse_exif_data(raw_exif))
    for raw_exif in exif_records['exif_data']
]

# Keep only the catalog columns, in the order the delta table uses.
jdf = exif_records[['image_path', 'exif_data']]

jdf

Unnamed: 0,image_path,exif_data
0,coffee-dataset/raw_images/alteri_farms/2024042...,"{""imagewidth"": 4032, ""imagelength"": 3024, ""lat..."
1,coffee-dataset/raw_images/alteri_farms/2024042...,"{""imagewidth"": 4032, ""imagelength"": 3024, ""lat..."
2,coffee-dataset/raw_images/alteri_farms/2024042...,"{""imagewidth"": 4032, ""imagelength"": 3024, ""lat..."
3,coffee-dataset/raw_images/alteri_farms/2024042...,"{""imagewidth"": 4032, ""imagelength"": 3024, ""lat..."
4,coffee-dataset/raw_images/alteri_farms/2024042...,"{""imagewidth"": 4032, ""imagelength"": 3024, ""res..."
...,...,...
2707,coffee-dataset/raw_images/off_road_ditch_farm/...,"{""imagewidth"": 4032, ""imagelength"": 3024, ""lat..."
2708,coffee-dataset/raw_images/off_road_ditch_farm/...,"{""imagewidth"": 4032, ""imagelength"": 3024, ""lat..."
2709,coffee-dataset/raw_images/off_road_ditch_farm/...,"{""imagewidth"": 4032, ""imagelength"": 3024, ""lat..."
2710,coffee-dataset/raw_images/off_road_ditch_farm/...,"{""imagewidth"": 4032, ""imagelength"": 3024, ""lat..."


In [40]:
# Commit-level metadata describing this catalog; read back from the table
# history below.
catalog_metadata = {
    'catalog_name': 'Raw Images Catalog',
    'catalog_description': 'Catalogs the raw_images folder in the bucket by parsing it out by folder name and includes the EXIF data.',
}

# Replace the table contents and its schema with the freshly built catalog.
dl.write_deltalake(
    table_or_uri=dl_table_path,
    data=jdf,
    mode='overwrite',
    schema_mode='overwrite',
    storage_options=storage_options,
    custom_metadata=catalog_metadata
)

# Re-open the table and pull the catalog_* entries out of the latest commit.
table = dl.DeltaTable(
    table_uri=dl_table_path,
    storage_options=storage_options
)
history = table.history(1)[0]
catalog_params = {}
for key, value in history.items():
    if key.startswith('catalog_'):
        catalog_params[key] = value
catalog_params

  dl.write_deltalake(


{'catalog_description': 'Catalogs the raw_images folder in the bucket by parsing it out by folder name and includes the EXIF data.',
 'catalog_name': 'Raw Images Catalog'}