### fetchAPI

In [1]:
import boto3
import json
import requests

# Shared S3 client, created once at import time and reused by every
# fetch_and_upload_data call in this module.
s3_client = boto3.client("s3")
# NOTE(review): LOCAL_FILE_SYS is not referenced by any visible code here.
LOCAL_FILE_SYS = "/tmp"
# Default destination bucket for the raw JSON "data lake" (used by data_lake).
S3_LAKE_BUCKET = "openfisheries-lake"
# Base URL of the OpenFisheries landings API; endpoint paths are appended to it.
API_LINK = "https://www.openfisheries.org/api/landings/"
# Upper bound on successful per-item fetches, applied separately to species
# and to countries in data_lake.
LIMIT_ROWS = 15

def fetch_and_upload_data(api_url, s3_key, S3_BUCKET, timeout=30):
    """Download JSON from *api_url* and store it verbatim in S3.

    Args:
        api_url: Endpoint to GET.
        s3_key: Object key to write in *S3_BUCKET*.
        S3_BUCKET: Destination bucket name.
        timeout: Seconds to wait for the HTTP server; without a timeout a
            stalled connection would block until the Lambda itself is killed.

    Returns:
        The decoded JSON payload, or ``None`` when the request fails, the
        server answers with an HTTP error status, or the body is not valid
        JSON. Errors from the S3 upload are not caught and propagate.
    """
    try:
        response = requests.get(api_url, timeout=timeout)
        # Without this, a 404/500 error body would be decoded and uploaded to
        # the lake as if it were real data.
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decode errors on requests versions whose
        # JSONDecodeError does not derive from RequestException.
        print(f"Failed to fetch {s3_key} from {api_url}: {e}")
        return None

    s3_client.put_object(Body=json.dumps(data), Bucket=S3_BUCKET, Key=s3_key)
    return data  # Return the fetched data

def data_lake(S3_BUCKET=S3_LAKE_BUCKET, limit=None):
    """Mirror OpenFisheries species and country landings into S3.

    First fetches and stores the ``species.json`` and ``countries.json``
    index lists. If both succeed, it then fetches ``species/<a3_code>.json``
    and ``countries/<iso3c>.json`` for the listed items. It stops each kind
    once *limit* fetches of that kind have succeeded.

    Args:
        S3_BUCKET: Destination bucket (defaults to ``S3_LAKE_BUCKET``).
        limit: Maximum successful fetches per kind; ``None`` means
            ``LIMIT_ROWS``.
    """
    if limit is None:
        limit = LIMIT_ROWS

    species_data = fetch_and_upload_data(f"{API_LINK}species.json", "species.json", S3_BUCKET)
    countries_data = fetch_and_upload_data(f"{API_LINK}countries.json", "countries.json", S3_BUCKET)

    if species_data is None:
        print('Species list not found.')
        return
    if countries_data is None:
        print('Countries list not found.')
        return

    _upload_items(species_data, "a3_code", "species", S3_BUCKET, limit)
    _upload_items(countries_data, "iso3c", "countries", S3_BUCKET, limit)


def _upload_items(items, id_field, folder, bucket, limit):
    """Fetch and store ``<folder>/<id>.json`` per item until *limit* succeed."""
    successful = 0
    for item in items:
        # Checking before fetching means a non-positive limit fetches nothing.
        if successful >= limit:
            break
        item_id = item.get(id_field)
        if not item_id:
            continue
        fetched = fetch_and_upload_data(
            f"{API_LINK}{folder}/{item_id}.json", f"{folder}/{item_id}.json", bucket
        )
        # Falsy payloads (None on failure, or an empty list) do not count.
        if fetched:
            successful += 1
            
def lambda_handler(event, context):
    """AWS Lambda entry point: run the lake ingestion with its defaults.

    *event* and *context* are part of the Lambda signature but are unused.
    """
    data_lake()
    return None


### Wrangler

In [2]:
import boto3
import pandas as pd
from io import StringIO
import json

def process_s3_data(input_bucket, input_folder, output_column, s3_client=None):
    """Load every JSON object under ``<input_folder>/`` into one DataFrame.

    Each ``*.json`` object is parsed with ``pd.read_json`` and tagged in
    *output_column* with its file stem (``species/ABC.json`` -> ``ABC``).
    Any S3 or parsing error propagates to the caller.

    Args:
        input_bucket: S3 bucket to read from.
        input_folder: Key prefix to scan, without the trailing slash.
        output_column: Column that receives each object's file stem.
        s3_client: Optional S3 client to use; a new ``boto3`` client is
            created when omitted.

    Returns:
        All rows with a fresh 0..n-1 index, or an empty DataFrame when no
        JSON object was found.
    """
    s3 = s3_client if s3_client is not None else boto3.client('s3')
    frames = []

    # A single list_objects call returns at most 1,000 keys; paginate so
    # larger prefixes are read completely.
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=input_bucket, Prefix=f'{input_folder}/'):
        for obj in page.get('Contents', []):
            file_key = obj['Key']
            # Skip "folder" placeholder keys and anything that is not JSON.
            if not file_key.endswith('.json'):
                continue
            body = s3.get_object(Bucket=input_bucket, Key=file_key)['Body']
            temp_df = pd.read_json(StringIO(body.read().decode('utf-8')))
            temp_df[output_column] = file_key.rsplit('/', 1)[-1][:-len('.json')]
            frames.append(temp_df)

    # Concatenate once instead of re-copying the accumulated frame per object.
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)
    
def clean_data(df):
    """Drop incomplete rows, then repeated rows, mutating *df* in place.

    The same DataFrame object is returned so callers can rebind the result.
    """
    for step in ('dropna', 'drop_duplicates'):
        getattr(df, step)(inplace=True)
    return df
    
def to_warehouse(df, folder_name, output_bucket):
    """Store *df* as ``<folder_name>.csv`` at the root of *output_bucket*.

    The CSV is written without the DataFrame index. Errors are not caught.
    Returns a confirmation message naming the bucket.
    """
    client = boto3.client('s3')
    payload = df.to_csv(index=False)
    client.put_object(
        Bucket=output_bucket,
        Key=folder_name + '.csv',
        Body=payload,
    )
    return f'CSV file created successfully in bucket: {output_bucket}'

def lambda_handler(event, context):
    """AWS Lambda entry point: rebuild the species and country fact tables.

    Each table is built from the raw JSON objects in the lake bucket, cleaned,
    and written to the warehouse bucket as CSV. *event* and *context* are
    unused.
    """
    lake, warehouse = 'openfisheries-lake', 'openfisheries-warehouse'

    # (lake folder, column tagged with each file's stem, warehouse file name)
    jobs = (
        ('species', 'a3_code', 'fact_species'),
        ('countries', 'iso3c', 'fact_countries'),
    )
    for folder, id_column, table in jobs:
        frame = clean_data(process_s3_data(lake, folder, id_column))
        to_warehouse(frame, table, warehouse)

    return None

### Wrangler with logger

In [3]:
import boto3
import pandas as pd
from io import StringIO
import json
import logging

# Configure logging: take the root logger and set its level to INFO so the
# info/error calls in this module pass the level filter.
# NOTE(review): output still depends on a handler being attached (presumably
# by the Lambda runtime); outside Lambda, logging.basicConfig may be needed.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def process_s3_data(input_bucket, input_folder, output_column, s3_client=None):
    """Load every JSON object under ``<input_folder>/`` into one DataFrame.

    Each ``*.json`` object is parsed with ``pd.read_json`` and tagged in
    *output_column* with its file stem (``species/ABC.json`` -> ``ABC``).

    Errors are logged rather than raised. An object that cannot be fetched
    or parsed is skipped, and the remaining objects are still loaded. A
    listing failure ends the scan but still returns what was loaded before it.

    Args:
        input_bucket: S3 bucket to read from.
        input_folder: Key prefix to scan, without the trailing slash.
        output_column: Column that receives each object's file stem.
        s3_client: Optional S3 client to use; a new ``boto3`` client is
            created when omitted.

    Returns:
        The loaded rows with a fresh 0..n-1 index, or an empty DataFrame when
        nothing was loaded.
    """
    s3 = s3_client if s3_client is not None else boto3.client('s3')
    frames = []

    try:
        # A single list_objects call returns at most 1,000 keys; paginate so
        # larger prefixes are read completely.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=input_bucket, Prefix=f'{input_folder}/'):
            for obj in page.get('Contents', []):
                file_key = obj['Key']
                # Skip "folder" placeholder keys and anything that is not JSON.
                if not file_key.endswith('.json'):
                    continue
                try:
                    body = s3.get_object(Bucket=input_bucket, Key=file_key)['Body']
                    temp_df = pd.read_json(StringIO(body.read().decode('utf-8')))
                except Exception as e:
                    # One bad object should not drop every object after it.
                    logger.exception("Error processing S3 object %s: %s", file_key, e)
                    continue
                temp_df[output_column] = file_key.rsplit('/', 1)[-1][:-len('.json')]
                frames.append(temp_df)
    except Exception as e:
        logger.exception("Error processing S3 data: %s", e)

    # Concatenate once instead of re-copying the accumulated frame per object.
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

def clean_data(df):
    """Remove rows with missing values, then duplicate rows, in place.

    Any exception raised by either step is logged and swallowed. The object
    that was passed in is returned either way, possibly only partly cleaned.
    """
    steps = ('dropna', 'drop_duplicates')
    try:
        for step in steps:
            getattr(df, step)(inplace=True)
    except Exception as e:
        logger.error(f"Error cleaning data: {str(e)}")
    return df

def to_warehouse(df, folder_name, output_bucket):
    """Upload *df* as CSV to ``data/<folder_name>.csv`` in *output_bucket*.

    The CSV is written without the DataFrame index. Success is logged at INFO
    level. Any error (client creation, serialisation or upload) is logged and
    swallowed. Returns ``None`` in every case.
    """
    try:
        client = boto3.client('s3')
        body = df.to_csv(index=False)
        client.put_object(
            Bucket=output_bucket,
            Key=f"data/{folder_name}.csv",
            Body=body,
        )
        logger.info(f'CSV file created successfully in bucket: {output_bucket}')
    except Exception as e:
        logger.error(f"Error uploading CSV to warehouse: {str(e)}")
        
def parse_csv(bucket, key, destination_bucket):
    """Convert the lake's ``<key>.json`` into ``data/dim_<key>.csv``.

    Despite the name, the source object is JSON. It is read from *bucket*,
    loaded with ``orient='records'``, and written as CSV without the index to
    *destination_bucket*. Failures after the S3 client is created are logged
    and swallowed. The function returns ``None``.
    """
    s3 = boto3.client('s3')
    try:
        raw = s3.get_object(Bucket=bucket, Key=f'{key}.json')['Body'].read()
        records = pd.read_json(StringIO(raw.decode('utf-8')), orient='records')
        s3.put_object(
            Bucket=destination_bucket,
            Key=f'data/dim_{key}.csv',
            Body=records.to_csv(index=False),
        )
    except Exception as e:
        logger.error(f"Error parsing and uploading CSV: {str(e)}")

def lambda_handler(event, context):
    """AWS Lambda entry point for the warehouse build (logging variant).

    First writes the species and countries dimension CSVs, then rebuilds the
    species and country fact tables. Returns a dict with ``statusCode`` 200
    when the fact-table steps finish, or 500 if an exception escapes them.
    *event* and *context* are unused.
    """
    lake = 'openfisheries-lake'
    warehouse = 'openfisheries-warehouse'

    for dimension in ('species', 'countries'):
        parse_csv(lake, dimension, warehouse)

    # NOTE(review): the helpers below log and swallow most of their own
    # errors, so a 200 response does not guarantee every table was written.
    fact_jobs = (
        ('species', 'a3_code', 'fact_species'),
        ('countries', 'iso3c', 'fact_countries'),
    )
    try:
        for folder, id_column, table in fact_jobs:
            cleaned = clean_data(process_s3_data(lake, folder, id_column))
            to_warehouse(cleaned, table, warehouse)
    except Exception as e:
        logger.error(f"Error in lambda_handler: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps('Error during processing. Check logs for details.')
        }

    return {
        'statusCode': 200,
        'body': json.dumps('Processing completed successfully!')
    }