In [179]:
import json
import uuid
import time
import boto3
import sagemaker
import pandas as pd

from datetime import datetime
from botocore.exceptions import ClientError 
from sklearn.model_selection import train_test_split
from sagemaker.feature_store.feature_group import FeatureGroup, FeatureDefinition, FeatureTypeEnum

In [160]:
# One SageMaker session backs every AWS client below, so S3, the SageMaker
# control plane and the Feature Store runtime all use the same boto3 session
# instead of mixing in boto3's implicit default session for S3 only.
session = sagemaker.Session()

s3 = session.boto_session.client('s3')
sagemaker_client = session.boto_session.client('sagemaker')
feature_store_client = session.boto_session.client('sagemaker-featurestore-runtime')

# Bucket used for the raw images, the train/test copies and the feature store data.
bucket_name = 'wildfires'
# Role ARN handed to SageMaker when the feature group is created.
sm_role = sagemaker.get_execution_role()

In [161]:
def update_fs_version(version):
    """Persist the feature-store version number to S3.

    The value is written as text to a scratch file, which is then uploaded
    through the SageMaker session under the ``feature-store`` key prefix of
    the configured bucket.

    Args:
        version: Version number to store; it is converted with ``str``.
    """
    scratch_path = "/tmp/fs_version.txt"
    version_text = str(version)
    with open(scratch_path, mode="w") as handle:
        handle.write(version_text)

    # The uploaded object keeps the scratch file's base name under the prefix.
    session.upload_data(path=scratch_path, bucket=bucket_name, key_prefix="feature-store")
    print("FS version updated")


def get_fs_last_vesrion():
    """Return the last feature-store version number recorded in S3.

    Downloads ``feature-store/fs_version.txt`` from the configured bucket and
    parses its first line as an integer.

    Returns:
        int: The stored version, or 0 when the version file does not exist yet.

    Raises:
        ClientError: For any S3 failure other than "object not found"
            (for example access denied), so that a transient or permission
            problem is not mistaken for "no version yet" and the counter is
            not silently restarted at 0.
        ValueError: If the file's first line is not an integer.
    """
    version_file_path = "feature-store/fs_version.txt"
    local_file_path = "/tmp/fs_version.txt"
    try:
        s3.download_file(bucket_name, version_file_path, local_file_path)
    except ClientError as error:
        error_code = str(error.response.get("Error", {}).get("Code", ""))
        # Only a missing object means "first run"; everything else is a real failure.
        # NOTE(review): S3 may report a missing key as 403 when the caller lacks
        # s3:ListBucket; in that setup the first run would raise here instead of
        # starting at 0 - confirm the execution role's permissions.
        if error_code not in ("404", "NoSuchKey", "NotFound"):
            raise
        print("No version file founded")
        return 0

    print(f"Version file successfully loaded: {local_file_path}")
    with open(local_file_path) as file:
        version = file.readline().strip()
    return int(version)


def wait_for_group_created(feature_group_name, timeout=900, poll_interval=5):
    """Block until the feature group reaches the ``Created`` status.

    Polls ``describe_feature_group`` until creation finishes, fails, or the
    timeout elapses.

    Args:
        feature_group_name: Name of the feature group to poll.
        timeout: Maximum number of seconds to wait; ``None`` waits indefinitely.
        poll_interval: Seconds to sleep between two status checks.

    Raises:
        TimeoutError: If the group is still ``Creating`` after ``timeout`` seconds.
        Exception: If creation failed, or the group is in a status other than
            ``Creating``/``Created`` (waiting on it would otherwise never end).
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        response = sagemaker_client.describe_feature_group(FeatureGroupName=feature_group_name)
        status = response['FeatureGroupStatus']
        if status == 'Created':
            print('Feature group created successfully.')
            return
        if status == 'CreateFailed':
            # Use .get so a response without FailureReason cannot hide the
            # real failure behind a KeyError.
            reason = response.get('FailureReason', 'no failure reason reported')
            raise Exception(f'Failed to create feature group: {reason}')
        if status != 'Creating':
            # Any other status (e.g. a deletion in progress) is not a creation
            # in flight, so polling for "Created" would loop forever.
            raise Exception(
                f'Feature group {feature_group_name} is in unexpected status {status!r}'
            )
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f'Feature group {feature_group_name} was not created within {timeout} seconds'
            )
        print('Waiting for feature group to be created...')
        time.sleep(poll_interval)

In [162]:
# Each run creates a new, versioned feature group: last stored version + 1.
fs_version = get_fs_last_vesrion() + 1
feature_group_name = f"wildfire-feature-group-v{fs_version}"

# Define feature definitions
feature_definitions = [
    FeatureDefinition('image_id', FeatureTypeEnum.STRING),        # record identifier
    FeatureDefinition('image_location', FeatureTypeEnum.STRING),  # s3:// URI of the image
    FeatureDefinition('label', FeatureTypeEnum.INTEGRAL),
    FeatureDefinition('image_type', FeatureTypeEnum.STRING),
    FeatureDefinition('event_time', FeatureTypeEnum.STRING),      # timestamp kept as a string
    FeatureDefinition('data_purpose', FeatureTypeEnum.STRING),    # train or test
]

# Create the feature group
feature_group = FeatureGroup(
    name=feature_group_name,
    sagemaker_session=session,
    feature_definitions=feature_definitions
)

# Run feature group create
feature_group.create(
    s3_uri=f's3://{bucket_name}/feature-store',
    description=f'Feature group for storing fire image features. Version v{fs_version}',
    # Tags are given in the explicit Key/Value list form, which is the shape the
    # SageMaker API itself uses, rather than relying on the SDK to convert a dict.
    tags=[{'Key': 'Version', 'Value': f"v{fs_version}"}],
    record_identifier_name='image_id',
    event_time_feature_name='event_time',
    role_arn=sm_role
)

# The version file is bumped as soon as the create request has been accepted,
# i.e. before the group is confirmed as Created.
# NOTE(review): presumably intentional so that a failed group's name is never
# reused on the next run - confirm.
update_fs_version(fs_version)
wait_for_group_created(feature_group_name)

No version file founded
FS version updated
Waiting for feature group to be created...
Waiting for feature group to be created...
Waiting for feature group to be created...
Feature group created successfully.


In [182]:
def list_s3_objects(prefix):
    """Return the keys of all objects in the bucket that start with ``prefix``.

    A single ``list_objects_v2`` call returns only one page of results, so the
    listing is driven by a paginator to avoid silently truncating large
    prefixes.

    Args:
        prefix: Key prefix to list, e.g. ``'data/raw_data/fire_images/'``.

    Returns:
        list[str]: Object keys in listing order; empty if nothing matches.
        Keys ending in ``/`` (zero-byte "folder" placeholders) are skipped
        because they are not files.
    """
    paginator = s3.get_paginator('list_objects_v2')
    keys = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # 'Contents' is absent from a page that holds no objects.
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('/'):
                continue
            keys.append(key)
    return keys

def generate_metadata(image_list, label):
    """Build one metadata record per image key.

    Args:
        image_list: Iterable of S3 object keys (relative to the bucket).
        label: Label stored in every generated record.

    Returns:
        list[dict]: One dict per key with ``image_id`` (fresh UUID4 string),
        ``image_location`` (``s3://`` URI), ``label``, ``image_type`` (file
        extension without the dot, or ``''`` when the file name has none) and
        ``event_time`` (UTC ISO-8601 timestamp ending in ``Z``).
        ``data_purpose`` is not set here; ``update_metadata`` adds it.
    """
    # Local import: the file's import cell only brings in `datetime` itself.
    from datetime import timezone

    metadata = []
    for image_location in image_list:
        # Take the extension from the file name only, so a dot in a "directory"
        # part of the key (or no dot at all) cannot leak path segments into
        # image_type.
        file_name = image_location.rsplit('/', 1)[-1]
        _, dot, extension = file_name.rpartition('.')
        image_type = extension if dot else ''

        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the "+00:00" offset is rewritten to keep the original "...Z" format.
        event_time = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')

        metadata.append({
            'image_id': str(uuid.uuid4()),
            'image_location': f's3://{bucket_name}/{image_location}',
            'label': label,
            'image_type': image_type,
            'event_time': event_time,
        })
    return metadata


def update_metadata(metadata, data_purpose):
    """Tag each record with its split and copy its image into that split's folder.

    For every record the image is copied to
    ``data/<data_purpose>/<image_id>.<image_type>`` in the configured bucket,
    and the record's ``image_location`` is repointed at the copy.

    The record dicts are modified in place; the same list object is returned.

    Args:
        metadata: List of record dicts holding ``image_id``, ``image_location``
            (an ``s3://`` URI) and ``image_type``.
        data_purpose: Value stored under ``data_purpose`` and used as the
            destination folder name.

    Returns:
        The ``metadata`` list that was passed in.
    """
    for entry in metadata:
        entry['data_purpose'] = data_purpose

        # Split "s3://bucket/key" into its bucket and key parts.
        source_uri = entry['image_location']
        source_bucket, source_key = source_uri.replace('s3://', '').split('/', 1)
        new_image_location = f"data/{data_purpose}/{entry['image_id']}.{entry['image_type']}"

        s3.copy({'Bucket': source_bucket, 'Key': source_key}, bucket_name, new_image_location)
        print(f'Copyied {source_uri} to {new_image_location}')
        entry['image_location'] = f's3://{bucket_name}/{new_image_location}'

    return metadata

In [None]:
# Collect the raw image keys of each class
fire_images = list_s3_objects('data/raw_data/fire_images/')
non_fire_images = list_s3_objects('data/raw_data/non_fire_images/')

# One metadata record per image: label 1 for fire images, 0 for non-fire images
fire_metadata = generate_metadata(fire_images, 1)
non_fire_metadata = generate_metadata(non_fire_images, 0)
all_metadata = [*fire_metadata, *non_fire_metadata]

# 80/20 split, stratified on the label, with a fixed seed for reproducibility
train_metadata, test_metadata = train_test_split(
    all_metadata,
    test_size=0.2,
    stratify=[record['label'] for record in all_metadata],
    random_state=42,
)

# Copy each image into its split folder and tag the records accordingly
train_metadata = update_metadata(train_metadata, 'train')
test_metadata = update_metadata(test_metadata, 'test')
train_metadata[:5]

Copying s3://wildfires/data/raw_data/fire_images/fire.7.png to data/train/e5588b37-0b8f-40a4-a6c3-81fe2b47e528.png
Copying s3://wildfires/data/raw_data/fire_images/fire.593.png to data/train/4180b266-3b5a-4d8e-92da-b624a147f22c.png
Copying s3://wildfires/data/raw_data/fire_images/fire.445.png to data/train/8b718937-722f-4d81-a85e-7eae07f723f7.png
Copying s3://wildfires/data/raw_data/fire_images/fire.322.png to data/train/4ff18563-a026-42c7-8fbd-14349854a498.png
Copying s3://wildfires/data/raw_data/non_fire_images/non_fire.147.png to data/train/af4565d7-0df7-4627-90c0-41b5e89a01f6.png
Copying s3://wildfires/data/raw_data/fire_images/fire.575.png to data/train/cc2e4a38-3ca2-4d8b-850b-bc7491a4d354.png
Copying s3://wildfires/data/raw_data/fire_images/fire.467.png to data/train/0ebf8b47-64a5-44f4-923b-82696fa2a0f1.png
Copying s3://wildfires/data/raw_data/fire_images/fire.492.png to data/train/d973f52f-a1f5-41eb-a3b4-f0684d4cfd3e.png
Copying s3://wildfires/data/raw_data/fire_images/fire.157.

In [184]:
# Convert metadata to DataFrame
def convert_to_df(metadata):
    """Return a pandas DataFrame built from the given metadata records."""
    frame = pd.DataFrame(data=metadata)
    return frame


# Build the frame from the lists returned by update_metadata, which are the
# records carrying 'data_purpose' and the rewritten 'image_location'.
# all_metadata only has those values because update_metadata mutates the shared
# dicts in place; depending on that side effect is fragile.
df = convert_to_df(train_metadata + test_metadata)

In [185]:
# Ingest data into the feature group: every row of df is written as one record.
# The call is asked to wait for completion (wait=True) with max_workers=3.
# NOTE(review): as the last expression of the cell this also displays the
# returned ingestion manager's repr - confirm whether that output is wanted.
feature_group.ingest(data_frame=df, max_workers=3, wait=True)

IngestionManagerPandas(feature_group_name='wildfire-feature-group-v1', feature_definitions={'image_id': {'FeatureName': 'image_id', 'FeatureType': 'String'}, 'image_location': {'FeatureName': 'image_location', 'FeatureType': 'String'}, 'label': {'FeatureName': 'label', 'FeatureType': 'Integral'}, 'image_type': {'FeatureName': 'image_type', 'FeatureType': 'String'}, 'event_time': {'FeatureName': 'event_time', 'FeatureType': 'String'}, 'data_purpose': {'FeatureName': 'data_purpose', 'FeatureType': 'String'}}, sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7fd4e43d01f0>, sagemaker_session=<sagemaker.session.Session object at 0x7fd4e4999a50>, max_workers=3, max_processes=1, profile_name=None, _async_result=<multiprocess.pool.MapResult object at 0x7fd4e7497130>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])