In [1]:
import getpass
from urllib.parse import urljoin, urlparse

import boto3
import numpy as np
import requests
from bs4 import BeautifulSoup
import tqdm

def check_s3_prefix_exists(bucket_name, s3_prefix, source_id, specific_file = "annotations.csv"):
    s3 = boto3.client("s3")
    prefix = f"{s3_prefix}/s{source_id}/{specific_file}"

    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, MaxKeys=1)

    if "Contents" in response:
        # print(f"Prefix exists: {prefix}")
        return True
    else:
        # print(f"Prefix does not exist: {prefix}")
        return False

In [2]:
url = "https://coralnet.ucsd.edu/source/about/"

resp = requests.get(url, timeout=50)
resp.raise_for_status()

soup = BeautifulSoup(resp.text, "html.parser")
anchors = soup.find_all("a", href=True)

links = sorted(
    {
        urljoin(url, a["href"])
        for a in anchors
        if urlparse(urljoin(url, a["href"])).scheme in ("http", "https")
    }
)

print("Found", len(links), "links on the page.")
source_links = [link for link in links if "/source/" in link]
print("Found", len(source_links), "links on the page.")
all_coralnet_sources = sorted({int(link.split("/")[-2]) for link in source_links})

Found 1796 links on the page.
Found 1786 links on the page.


In [3]:
# Configuration - Where to save the downloaded CoralNet images
bucket_name = "dev-datamermaid-sm-sources"
prefix = "coralnet-public-images"

In [4]:
data = [] 
for i, source_id in tqdm.tqdm(enumerate(all_coralnet_sources)):
    # print(i, "Source ID", source_id)
    labelset_flag = check_s3_prefix_exists(
        bucket_name=bucket_name, s3_prefix=prefix, source_id=source_id, specific_file="labelset.csv"
    )
    metadata_flag = check_s3_prefix_exists(
        bucket_name=bucket_name, s3_prefix=prefix, source_id=source_id, specific_file="metadata.csv"
    )
    annotations_flag = check_s3_prefix_exists(
        bucket_name=bucket_name, s3_prefix=prefix, source_id=source_id, specific_file="annotations.csv"
    )
    image_list_flag = check_s3_prefix_exists(
        bucket_name=bucket_name, s3_prefix=prefix, source_id=source_id, specific_file="image_list.csv"
    )
    data.append([i, source_id, labelset_flag, metadata_flag, annotations_flag, image_list_flag]) 

0it [00:00, ?it/s]

1786it [07:34,  3.93it/s]


In [5]:
import pandas as pd 
source_status = pd.DataFrame(data, columns = ["idx", "source_id", "labelset", "metadata", "annotations", "image_list"])
source_status.head()
source_ids = source_status[(source_status["annotations"]==True)*(source_status["image_list"]==True)]["source_id"].values
source_ids_start = source_ids[:50]

In [6]:
source_ids_start

array([  23,   57,   69,   70,   82,  109,  155,  172,  173,  174,  258,
        290,  299,  307,  350,  428,  450,  466,  503,  546,  580,  616,
        620,  648,  683,  747,  793,  800,  841,  842,  843,  921,  958,
       1073, 1076, 1162, 1184, 1189, 1212, 1264, 1265, 1266, 1268, 1269,
       1270, 1271, 1272, 1273, 1274, 1276])

In [7]:
source_bucket = "dev-datamermaid-sm-sources"
source_s3_prefix = "coralnet-public-images"

In [8]:
s3 = boto3.client("s3")
import io

In [9]:
import time

In [None]:
# start_time = time.time()
# for i, source_id in tqdm.tqdm(enumerate(source_ids_start)):
#     if i == 0:
#         df_annotations = pd.read_csv(
#             f"s3://{source_bucket}/{source_s3_prefix}/s{source_id}/annotations.csv"
#         )
#         df_images = pd.read_csv(
#             f"s3://{source_bucket}/{source_s3_prefix}/s{source_id}/image_list.csv"  # Perhaps this is unnecessary and can just use tha annotations as in Mermaid
#         )
#         df_images["Name"] = df_images["Name"].apply(
#             lambda x: x.replace(" - Confirmed", "")
#         )
#         df_images["image_id"] = df_images["Image Page"].apply(
#             lambda x: x.replace("/image/", "").replace("/view/", "")
#         )
#         df_annotations = pd.merge(
#             df_annotations,
#             df_images,
#             left_on="Name",
#             right_on="Name",
#             how="left",
#             suffixes=("", "_y"),
#         )
#         df_annotations["source_id"] = source_id
#     else:
#         obj = s3.get_object(
#             Bucket=source_bucket,
#             Key=f"{source_s3_prefix}/s{source_id}/annotations.csv",
#         )
#         df_tmp = pd.read_csv(io.BytesIO(obj["Body"].read()))

#         obj = s3.get_object(
#             Bucket=source_bucket,
#             Key=f"{source_s3_prefix}/s{source_id}/image_list.csv",
#         )
#         df_images = pd.read_csv(io.BytesIO(obj["Body"].read()))

#         df_images["Name"] = df_images["Name"].apply(
#             lambda x: x.replace(" - Confirmed", "")
#         )
#         df_images["image_id"] = df_images["Image Page"].apply(
#             lambda x: x.replace("/image/", "").replace("/view/", "")
#         )
#         df_tmp = pd.merge(
#             df_tmp,
#             df_images,
#             left_on="Name",
#             right_on="Name",
#             how="left",
#             suffixes=("", "_y"),
#         )
#         df_tmp["source_id"] = source_id
#         df_annotations = pd.concat(
#             [df_annotations, df_tmp], ignore_index=True
#         )
# end_time = time.time()
# print("Time taken (seconds):", end_time - start_time)

  df_annotations = pd.concat(
  df_annotations = pd.concat(
  df_tmp = pd.read_csv(io.BytesIO(obj["Body"].read()))
  df_annotations = pd.concat(
  df_tmp = pd.read_csv(io.BytesIO(obj["Body"].read()))
  df_tmp = pd.read_csv(io.BytesIO(obj["Body"].read()))
  df_tmp = pd.read_csv(io.BytesIO(obj["Body"].read()))
  df_tmp = pd.read_csv(io.BytesIO(obj["Body"].read()))
  df_annotations = pd.concat(
50it [03:58,  4.76s/it]

Time taken (seconds): 238.16918087005615





In [None]:
# start_time = time.time()
# df_annotation_list = []
# for i, source_id in tqdm.tqdm(enumerate(source_ids_start)):
#     df_annotations = pd.read_csv(
#         f"s3://{source_bucket}/{source_s3_prefix}/s{source_id}/annotations.csv"
#     )
#     df_images = pd.read_csv(
#         f"s3://{source_bucket}/{source_s3_prefix}/s{source_id}/image_list.csv"  # Perhaps this is unnecessary and can just use tha annotations as in Mermaid
#     )
#     df_images["Name"] = df_images["Name"].apply(
#         lambda x: x.replace(" - Confirmed", "")
#     )
#     df_images["image_id"] = df_images["Image Page"].apply(
#         lambda x: x.replace("/image/", "").replace("/view/", "")
#     )
#     df_annotations = pd.merge(
#         df_annotations,
#         df_images,
#         left_on="Name",
#         right_on="Name",
#         how="left",
#         suffixes=("", "_y"),
#     )
#     df_annotations["source_id"] = source_id
#     df_annotation_list.append(df_annotations)

# df_annotations = pd.concat(
#     df_annotation_list, ignore_index=True
# )
# end_time = time.time()
# print("Time taken (seconds):", end_time - start_time)

  df_annotations = pd.read_csv(
  df_annotations = pd.read_csv(
  df_annotations = pd.read_csv(
  df_annotations = pd.read_csv(
  df_annotations = pd.read_csv(
50it [00:13,  3.75it/s]
  df_annotations = pd.concat(


Time taken (seconds): 15.807878017425537


In [10]:
start_time = time.time()
df_annotation_list = []
for i, source_id in tqdm.tqdm(enumerate(source_ids)):
    df_annotations = pd.read_csv(
        f"s3://{source_bucket}/{source_s3_prefix}/s{source_id}/annotations.csv", low_memory=False
    )
    df_images = pd.read_csv(
        f"s3://{source_bucket}/{source_s3_prefix}/s{source_id}/image_list.csv", low_memory=False  # Perhaps this is unnecessary and can just use tha annotations as in Mermaid
    )
    df_images["Name"] = df_images["Name"].apply(
        lambda x: x.replace(" - Confirmed", "")
    )
    df_images["image_id"] = df_images["Image Page"].apply(
        lambda x: x.replace("/image/", "").replace("/view/", "")
    )
    df_annotations = pd.merge(
        df_annotations,
        df_images,
        left_on="Name",
        right_on="Name",
        how="left",
        suffixes=("", "_y"),
    )
    df_annotations["source_id"] = source_id
    df_annotation_list.append(df_annotations[["source_id", "image_id", "Row", "Column", "Label ID"]])

df_annotations = pd.concat(
    df_annotation_list, ignore_index=True
)
end_time = time.time()
print("Time taken (seconds):", end_time - start_time)

1427it [04:13,  5.62it/s]


Time taken (seconds): 257.25816226005554


In [11]:
df_annotations = df_annotations.rename(
    columns={
        "image_id": "image_id",
        "Row": "row",
        "Column": "col",
        "Label ID": "coralnet_id",
    }
)

In [12]:
df_annotations.shape

(21326860, 5)

In [13]:
df_annotations.memory_usage(deep=True).sum() / (1024 ** 3)  # in GB

3.2886506663635373

In [15]:
source_s3_prefix

'coralnet-public-images'

In [16]:
source_bucket

'dev-datamermaid-sm-sources'

In [17]:
import s3fs

path = f"s3://{source_bucket}/coralnet_annotations_30112025.parquet"

df_annotations.to_parquet(path, engine="pyarrow", index=False)

In [None]:

except Exception:
    # fallback: open S3 file with s3fs and write to file-like object

    fs = s3fs.S3FileSystem()
    with fs.open(path, "wb") as f:
        df_annotations.to_parquet(f, engine="pyarrow", index=False)

print("Saved:", path)

In [24]:
fs = s3fs.S3FileSystem(session=s3)

In [40]:
df_annotations

Unnamed: 0_level_0,Name,Date,Site,Depth,Transect,Metermark,Aux5,Height (cm),Latitude,Longitude,Depth.1,Camera,Photographer,Water quality,Strobes,Framing gear used,White balance card,Comments,Row,Column,Label code,Label ID,Annotator,Date annotated
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1
,string,string,string,float64,string,int64,float64,int64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,int64,int64,string,int64,string,string
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [48]:
start_time = time.time()
df_annotation_list = []
for i, source_id in tqdm.tqdm(enumerate(source_ids_start)):
    df_annotations = dd.read_csv(
        f"s3://{source_bucket}/{source_s3_prefix}/s{source_id}/annotations.csv"
    ).compute()
    df_images = dd.read_csv(
        f"s3://{source_bucket}/{source_s3_prefix}/s{source_id}/image_list.csv"  # Perhaps this is unnecessary and can just use tha annotations as in Mermaid
    ).compute()
    df_images["Name"] = df_images["Name"].apply(
        lambda x: x.replace(" - Confirmed", "")
    )
    df_images["image_id"] = df_images["Image Page"].apply(
        lambda x: x.replace("/image/", "").replace("/view/", "")
    )
    df_annotations = dd.merge(
        df_annotations,
        df_images,
        left_on="Name",
        right_on="Name",
        how="left",
        suffixes=("", "_y"),
    )
    df_annotations["source_id"] = source_id
    df_annotation_list.append(df_annotations[["Image Page", "Row", "Column", "Label ID"]])

df_annotations = dd.concat(
    df_annotation_list, ignore_index=True
)

# convert Dask DataFrame to pandas DataFrame
df_annotations = df_annotations

# create image_id from "Image Page" (same logic as earlier)
df_annotations["image_id"] = (
    df_annotations["Image Page"]
    .str.replace("/image/", "", regex=False)
    .str.replace("/view/", "", regex=False)
)

# keep the relevant columns (optional)
df_annotations = df_annotations[["image_id", "Row", "Column", "Label ID"]]
end_time = time.time()
print("Time taken (seconds):", end_time - start_time)

11it [00:05,  1.99it/s]


ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+--------------+--------+----------+
| Column       | Found  | Expected |
+--------------+--------+----------+
| Photographer | object | float64  |
+--------------+--------+----------+

The following columns also raised exceptions on conversion:

- Photographer
  ValueError("could not convert string to float: 'KO'")

Usually this is due to dask's dtype inference failing, and
*may* be fixed by specifying dtypes manually by adding:

dtype={'Photographer': 'object'}

to the call to `read_csv`/`read_table`.

In [None]:
delayed_dfs = [delayed(pd.read_csv)(fs.open(p, mode='rb')) for p in paths]
ddf = dd.from_delayed(delayed_dfs)

In [27]:
import dask.dataframe as dd

bucket = "dev-datamermaid-sm-sources"
source_id = 109
prefix = "coralnet-public-images"

path = f"s3://{bucket}/{prefix}/s{source_id}/annotations.csv"
ddf = dd.read_csv(path)   # blocksize controls partitioning
# inspect
ddf.head()

Unnamed: 0,Name,Date,Site,Transect Depth,Transect Number,Photo Number,Aux5,Height (cm),Latitude,Longitude,...,Strobes,Framing gear used,White balance card,Comments,Row,Column,Label code,Label ID,Annotator,Date annotated
0,APT_100_D_01_2013-07-24.jpg,2013-07-24,APT,100.0,D,1.0,,50,,,...,,,,,357,371,Turf,82,mbogeberg,2014-02-12 21:42:05+00:00
1,APT_100_D_01_2013-07-24.jpg,2013-07-24,APT,100.0,D,1.0,,50,,,...,,,,,569,613,Turf,82,mbogeberg,2014-02-12 21:42:05+00:00
2,APT_100_D_01_2013-07-24.jpg,2013-07-24,APT,100.0,D,1.0,,50,,,...,,,,,480,392,Turf,82,mbogeberg,2014-02-12 21:42:05+00:00
3,APT_100_D_01_2013-07-24.jpg,2013-07-24,APT,100.0,D,1.0,,50,,,...,,,,,418,202,Turf,82,mbogeberg,2014-02-12 21:42:05+00:00
4,APT_100_D_01_2013-07-24.jpg,2013-07-24,APT,100.0,D,1.0,,50,,,...,,,,,487,599,Turf,82,mbogeberg,2014-02-12 21:42:05+00:00


In [33]:
type(ddf)

dask.dataframe.dask_expr._collection.DataFrame

In [18]:
# Create an S3FileSystem instance with the credentials from secrets.json
s3 = s3fs.S3FileSystem()

In [17]:
df_tmp = dd.read_csv(f"{source_s3_prefix}/s{source_id}/annotations.csv")

FileNotFoundError: An error occurred while calling the read_csv method registered to the pandas backend.
Original Message: [Errno 2] No such file or directory: '/home/sagemaker-user/mermaid-segmentation/nbs/coralnet-public-images/s1301/annotations.csv'

In [None]:
dfs = []



    # Use s3 to access S3 objects
    bucket_name = 'coralnet-mermaid-share'
    missing_sources = []

    for source in chosen_sources:
        if s3.exists(f'{bucket_name}/{source}'):
            
            print(f"{source} exists in the bucket.")
            df = dd.read_csv(f's3://{bucket_name}/{source}', storage_options={'key': secrets['AWS_ACCESS_KEY_ID'], 'secret': secrets['AWS_SECRET_ACCESS_KEY']})

            df['source_id'] = source.split('/')[1]
            dfs.append(df)
        else:
            missing_sources.append(source)
            print(f"{source} does not exist in the bucket.")
except (ClientError, BotoCoreError) as e:
    print(f"An AWS error occurred: {e}")
except json.JSONDecodeError as e:
    print(f"Error reading secrets.json: {e}")
except IOError as e:
    print(f"File error: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")