In [None]:
"""
Train and classify from multisource (or project) using featurevector files provided on S3.

"""
import json
import boto3
from botocore.exceptions import ClientError, BotoCoreError
import os
import pickle
import pandas as pd
from datetime import datetime, timedelta
import boto3
import io
import csv
import json
import logging
import traceback
from operator import itemgetter
from botocore.exceptions import NoCredentialsError, ClientError
from pathlib import Path
from spacer import config
from spacer.data_classes import ImageLabels
from scripts.docker import runtimes
from spacer.tasks import process_job, classify_image, extract_features, train_classifier, classify_features
from spacer.storage import load_image, store_image
from spacer.messages import JobMsg, DataLocation, ExtractFeaturesMsg
from spacer.extract_features import EfficientNetExtractor
from spacer.messages import (
    DataLocation,
    ExtractFeaturesMsg, 
    ExtractFeaturesReturnMsg, 
    TrainClassifierMsg, 
    TrainClassifierReturnMsg, 
    ClassifyFeaturesMsg, 
    ClassifyImageMsg, 
    ClassifyReturnMsg, JobMsg, JobReturnMsg
)

from spacer.tasks import classify_features, extract_features, train_classifier

# Read the AWS settings stored in the local secrets.json file into `secrets`.
secrets = json.loads(Path('secrets.json').read_text())

## Assign the secrets to boto3


In [None]:
# Build the S3 client used by every later cell.
# Each handler prints a short diagnosis and then re-raises: without `s3_client`
# the rest of the notebook cannot run, and swallowing the error here would only
# surface later as an unrelated NameError.
try:
    # Load the AWS settings (region and access keys) from secrets.json.
    with open('secrets.json', 'r') as f:
        secrets = json.load(f)

    # Create the client using the credentials from secrets.json.
    s3_client = boto3.client(
        's3',
        region_name=secrets['AWS_REGION'],
        aws_access_key_id=secrets['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=secrets['AWS_SECRET_ACCESS_KEY']
    )
except (ClientError, BotoCoreError) as e:
    print(f"An AWS error occurred: {e}")
    raise
except json.JSONDecodeError as e:
    print(f"Error reading secrets.json: {e}")
    raise
except IOError as e:
    print(f"File error: {e}")
    raise
except KeyError as e:
    # One of the expected entries is absent from secrets.json.
    print(f"Missing key in secrets.json: {e}")
    raise
except Exception as e:
    print(f"An unexpected error occurred: {e}")
    raise

## Coralnet

For the MVP, let's use three smaller sources and store them in a list called `sources`:

- `s1970`
- `s2083`
- `s2170`


In [None]:
# CoralNet source IDs used for this run.
sources = [
    's1970',
    's2083',
    's2170',
]

## Find the annotations.csv files for the chosen sources

-   Create the key value from the source


In [None]:
# S3 key of the annotations file for every chosen source:
# 'coralnet_public_features/{source}/annotations.csv'
chosen_sources = [
    f'coralnet_public_features/{source}/annotations.csv'
    for source in sources
]

Use S3 to download the annotations.csv files for the chosen sources - check if they exist in the bucket first


In [None]:
# Check whether each annotations key is present in the S3 bucket.
# This is a best-effort report: nothing is raised, the outcome is only printed.
bucketname = 'coralnet-mermaid-share'
for source in chosen_sources:
    try:
        s3_client.head_object(Bucket=bucketname, Key=source)
        print(f"{source} exists in the bucket.")
    except ClientError as e:
        # Only a not-found style error code means the object is missing; other
        # codes (e.g. access denied) say nothing about whether the object exists.
        error_code = e.response.get('Error', {}).get('Code', '')
        if error_code in ('404', 'NoSuchKey', 'NotFound'):
            print(f"{source} does not exist in the bucket. Error: {e}")
        else:
            print(f"Could not check {source} in the bucket. Error: {e}")
    except Exception as e:
        # Non-AWS failures (network, configuration) are reported separately as well.
        print(f"Could not check {source} in the bucket. Error: {e}")

## Append annotations

-   The following code will append the annotations.csv files for the chosen sources into one dataframe called `appended_df` in chunks that you can specify. In the future for larger sources we may need to consider memory efficient ways of combining the annotations.csv files.

Using the pandas-only option for now. Other options include:

- dask
- SQLite
- pyarrow/parquet


In [None]:
def read_csv_in_chunks(bucketname, key, chunksize=10000):
    """
    Stream a CSV object from S3 and yield it as pandas DataFrames.

    The object is fetched with the notebook-level `s3_client` and read line by
    line. The first non-empty line is used as the header; every following
    `chunksize` lines are parsed together with that header into one DataFrame.
    A final, shorter DataFrame is yielded for any remaining lines.

    Each yielded DataFrame gets an extra 'source_id' column holding `key`.
    """

    def _to_frame(header_line, data_lines):
        # Re-attach the header so each chunk parses as a standalone CSV.
        csv_text = header_line + '\n' + '\n'.join(data_lines)
        frame = pd.read_csv(io.StringIO(csv_text))
        frame['source_id'] = key  # Record which S3 key the rows came from
        return frame

    s3_object = s3_client.get_object(Bucket=bucketname, Key=key)
    header_line = None
    pending = []
    for raw_line in s3_object['Body'].iter_lines():
        if not header_line:
            header_line = raw_line.decode('utf-8')
            continue
        pending.append(raw_line.decode('utf-8'))
        if len(pending) == chunksize:
            yield _to_frame(header_line, pending)
            pending = []
    if pending:
        yield _to_frame(header_line, pending)


Read each key in `chosen_sources` and append its rows to the `appended_df` dataframe.


In [None]:
# Combine the annotations of every chosen source into one DataFrame.
# Chunks are collected in a list and concatenated once at the end: this avoids
# re-copying the growing frame on every iteration, and it ensures that no chunk
# is added twice (seeding the frame with the first chunk and then concatenating
# that same chunk again would duplicate its rows).
annotation_chunks = []
expected_columns = None

for key in chosen_sources:
    for chunk in read_csv_in_chunks(bucketname, key, chunksize=10000):
        if expected_columns is None:
            # The first chunk read defines the column layout all files must share.
            expected_columns = chunk.columns
        elif not chunk.columns.equals(expected_columns):
            raise ValueError(f"Inconsistent data structure in file: {key}")
        annotation_chunks.append(chunk)

# Fall back to an empty frame when nothing was read at all.
if annotation_chunks:
    appended_df = pd.concat(annotation_chunks, ignore_index=True)
else:
    appended_df = pd.DataFrame()

In [None]:
# Create a key to download the features from S3 given:
# 'coralnet_public_features/{source_id}/features/i{image_id}'
# E.g. 'coralnet_public_features/s1073/features/i84392.featurevector'
# Where source_id column currently looks like 'coralnet_public_features/s1073/annotations.csv'
# and image_id column currently looks like '84392'
# and the features are stored in the features directory

def format_featurevector_key(image_id, source_id):
    """
    Build the S3 key of an image's feature vector file.

    `source_id` is a slash-separated path such as
    'coralnet_public_features/s1073/annotations.csv'; its second component
    ('s1073') is used as the source directory. `image_id` is converted to a
    string and wrapped as 'i{image_id}.featurevector'.

    Returns e.g. 'coralnet_public_features/s1073/features/i84392.featurevector'.
    """
    # Second path component of the annotations key is the source directory.
    source_dir = source_id.split('/')[1]
    file_name = f"i{image_id!s}.featurevector"
    return f"coralnet_public_features/{source_dir}/features/{file_name}"

Apply the format_featurevector_key function to the appended_df dataframe to create a new column called key.


In [None]:
# Add a 'key' column holding the feature vector S3 key for every annotation row.
def _feature_key_for_row(row):
    """Return the feature vector key built from a row's 'Image ID' and 'source_id'."""
    return format_featurevector_key(row['Image ID'], row['source_id'])


appended_df['key'] = appended_df.apply(_feature_key_for_row, axis=1)

## Training Data

-   This is a basic implementation on how to split the train and val labels by a 7:1 ratio. However, we may want to consider other ways of splitting the data in the future.


In [None]:
# CoralNet convention: 7 parts training labels to 1 part validation labels.
total_labels = len(appended_df)
train_size = int(total_labels * 7 / 8)  # number of rows that go to training

# Positional split (no shuffling): the first `train_size` rows are used for
# training and the remaining rows for validation.
train_labels_data, val_labels_data = (
    appended_df.iloc[:train_size],
    appended_df.iloc[train_size:],
)

-   To work with the `ImageLabels` class, we need to convert the train_labels_data and val_labels_data to the required format which is represented as a dictionary of lists of tuples. The dictionary keys are the image keys, and the values are lists of tuples of the form (row, column, label_id).


In [None]:
# Reshape the training rows into a mapping of
# feature key -> [(row, column, label_id), ...], one tuple per annotated point.
# Note: this rebinds train_labels_data from a DataFrame to a dict.
train_labels_data = {
    f"{key}": list(map(tuple, group[['Row', 'Column', 'Label ID']].values))
    for key, group in train_labels_data.groupby('key')
}
train_labels_data

In [None]:
# Reshape the validation rows into a mapping of
# feature key -> [(row, column, label_id), ...], one tuple per annotated point.
# Note: this rebinds val_labels_data from a DataFrame to a dict.
val_labels_data = {
    f"{key}": list(map(tuple, group[['Row', 'Column', 'Label ID']].values))
    for key, group in val_labels_data.groupby('key')
}

## Use spacer to create train classifier msg


In [None]:
# Training request for spacer: an MLP trained for 10 epochs with the 'minibatch'
# trainer. Feature vectors are read from the S3 bucket; the trained model and
# the validation results are written to the current working directory.
# NOTE(review): confirm that DataLocation accepts a `bucketname` keyword in the
# installed spacer version.
train_msg = TrainClassifierMsg(
    job_token='mulitest',
    trainer_name='minibatch',
    nbr_epochs=10,
    clf_type='MLP',
    # Labels keyed by feature vector key (a subset of the available sources).
    train_labels=ImageLabels(data=train_labels_data),
    val_labels=ImageLabels(data=val_labels_data),
    # Empty key: only the bucket is specified for the features location.
    features_loc=DataLocation('s3', bucketname=bucketname, key=''),
    previous_model_locs=[],
    model_loc=DataLocation('filesystem', f"{Path.cwd()}/classifier_test.pkl"),
    valresult_loc=DataLocation('filesystem', f"{Path.cwd()}/valresult_test.json"),
)

In [None]:
# Run the spacer training task described by train_msg. The returned message is
# read by later cells for acc, ref_accs and runtime.
return_msg = train_classifier(train_msg)

In [None]:
# Paths, in the current working directory, of the trained classifier and of the
# validation results.
classifier_filepath = Path.cwd().joinpath('classifier_test.pkl')
valresult_filepath = Path.cwd().joinpath('valresult_test.json')

In [None]:
# Reference accuracies after each epoch, as percentages with one decimal place.
ref_accs_str = ", ".join(f"{100*acc:.1f}" for acc in return_msg.ref_accs)

# Print the training summary, one item per line.
print(
    "------------------------------",
    f"Classifier stored at: {classifier_filepath}",
    f"New model's accuracy: {100*return_msg.acc:.1f}%",
    "New model's accuracy progression (calculated on part of train_labels)"
    f" after each epoch of training: {ref_accs_str}",
    "Evaluation results:",
    sep="\n",
)

# Load the validation results written by the training task; later cells read
# its 'classes', 'gt', 'est' and 'scores' entries.
with open(valresult_filepath) as valresult_file:
    valresult = json.load(valresult_file)

------------------------------------------------------------------------

## Create matching labels between coralnet and mermaid


In [None]:
# Load the CoralNet/MERMAID label table and keep one row per unique
# (classes, shortcode) pair.
label_shortcode = (
    pd.read_csv('coral_net_mermaid_labels.csv')
    # 'ID' becomes 'classes'; 'Default short code' becomes 'shortcode'.
    .rename(columns={'ID': 'classes', 'Default short code': 'shortcode'})
    .loc[:, ['classes', 'shortcode']]
    .drop_duplicates()
)

In [None]:
# Convert the label_shortcode DataFrame to a dictionary: label ID -> short code.
# (This mapping was referenced below but never created, so the cell raised a
# NameError on a fresh kernel.)
label_ids_to_shortcodes = dict(
    zip(label_shortcode['classes'], label_shortcode['shortcode'])
)

# Short code for every class in valresult['classes'], in the same order.
# Label IDs without a matching default short code are reported as 'Unknown'.
label_list = [
    label_ids_to_shortcodes.get(label_id, 'Unknown')
    for label_id in valresult['classes']
]

In [None]:
# valresult['gt'] and valresult['est'] hold indices into label_list; print the
# actual and predicted short code and the confidence for every validation point.
validation_points = zip(valresult['gt'], valresult['est'], valresult['scores'])
for gt_index, est_index, confidence in validation_points:
    actual_code = label_list[gt_index]
    predicted_code = label_list[est_index]
    print(f"Actual = {actual_code}, Predicted = {predicted_code}, Confidence = {100*confidence:.1f}%")

print(f"Train time: {return_msg.runtime:.1f} s")

In [None]:
# From chosen_prj, get the features for the source from the features directory from the full Key path
# E.g. coralnet_public_features/s1073/features/i84392.featurevector
# Use Regex to get the features
# NOTE(review): `chosen_prj` is not defined in any cell visible in this notebook, and
# `feature_files` is not read by the cells that follow. Confirm where `chosen_prj`
# comes from, or whether this cell is stale.
# Keeps the rows whose 'Key' matches the feature vector path pattern.
feature_files = chosen_prj[chosen_prj['Key'].str.contains(r'coralnet_public_features/.*/features/.*\.featurevector$')]


## For each feature file, create a classify features message


In [None]:
# Classify every feature vector file with the trained classifier and print the
# top-scoring labels for each point.

# Number of highest-scoring labels printed per point. The name was referenced
# but never defined in the notebook; 5 is an assumed default, adjust as needed.
TOP_SCORES_PER_POINT = 5

# Label ID (as a string) -> short code, built from the label table. This mapping
# was also referenced without being defined.
label_ids_to_codes = {
    str(label_id): shortcode
    for label_id, shortcode in zip(label_shortcode['classes'], label_shortcode['shortcode'])
}

messages = []
# appended_df has one row per annotated point, so the same key repeats many
# times; de-duplicate so each feature file is classified only once.
for key in appended_df['key'].unique():
    message = ClassifyFeaturesMsg(
        job_token=key,
        # NOTE(review): confirm that DataLocation accepts a `bucketname` keyword
        # in the installed spacer version.
        feature_loc=DataLocation('s3', key=key, bucketname=bucketname),
        # Pass the path as a string, as the training message does for its
        # filesystem locations.
        classifier_loc=DataLocation('filesystem', str(classifier_filepath)),
    )
    messages.append(message)
    # Use a separate name so the training result in `return_msg` is not overwritten.
    classify_return_msg = classify_features(message)
    print("------------------------------")
    print(f"Classification result for {key}:")

    label_ids = classify_return_msg.classes
    for row, col, scores in classify_return_msg.scores:
        # Pair every label with its score and sort by score, highest first.
        top_scores = sorted(
            zip(label_ids, scores), key=itemgetter(1), reverse=True)
        top_scores_str = ", ".join([
            f"{label_ids_to_codes.get(str(label_id), 'Unknown')} = {100*score:.1f}%"
            for label_id, score in top_scores[:TOP_SCORES_PER_POINT]
        ])
        print(f"- Row {row}, column {col}: {top_scores_str}")

    print(f"Classification time: {classify_return_msg.runtime:.1f} s")

print("------------------------------")
print("Clear the output dir before rerunning this script.")

In [None]:
# Classify once more using `message`, which at this point holds the last
# ClassifyFeaturesMsg built by the loop in the previous cell.
return_message = classify_features(message)