In [None]:
import os
import json
import time
import shutil
from datetime import datetime

In [None]:
def get_data_into_raw(endpoint):
    """
    Fetch data from the specified endpoint and save it, in its original format, in the raw layer.

    Two metadata fields (``extract_time`` and ``source``) are added to the payload before it is
    written. Every call creates a new file in the raw layer; the file name carries the same
    timestamp that is stored in ``extract_time``.

    Parameters:
    - endpoint (str): The URL endpoint to fetch data from.

    Returns:
    - None

    Raises:
    - Any exception raised by ``get_data``, by opening the output file or by JSON serialisation
      propagates unchanged to the caller.
    """
    # Assumes get_data returns a dict-like, JSON-serialisable object -- TODO confirm.
    data = get_data(endpoint)

    # Take ONE timestamp so the file name and the extract_time field always agree.
    current_time = datetime.now()
    data['extract_time'] = current_time.timestamp()
    data['source'] = 'exchange rates api'

    # str(datetime) omits the fractional part when microsecond == 0, which yields names of
    # varying width that no longer sort chronologically. A fixed-width format produces the
    # same text as str() in every other case, so existing files keep their relative order.
    # NOTE(review): the name still contains ':' and ' ', which Windows file systems reject.
    file_stamp = current_time.strftime('%Y-%m-%d %H:%M:%S.%f')

    with open(f'../../data/raw/raw_rates_{file_stamp}.json', 'w') as json_file:
        json.dump(data, json_file)

In [None]:
def get_data_into_structured(latest_file):
    """
    Read one raw JSON file, transform it, enforce schema and append it to the structured layer.

    Metadata columns ``base``, ``source_file``, ``extract_time`` and ``current`` are added to the
    new batch. A crude SCD2 is kept through ``current``: rows of the batch being loaded hold 1,
    and every row that was already in the structured file is reset to 0.

    The structured file is rewritten through a temporary file that is swapped in only after the
    write succeeded, so a failure while writing leaves the previous structured data untouched.

    Parameters:
    - latest_file (str): Path to the raw-layer file to load.

    Returns:
    - None

    Raises:
    - Any exception from reading/parsing the files, from ``get_rates_dataframe`` or from writing
      the output propagates unchanged (e.g. ``KeyError`` if the raw payload has no ``'base'``).
    """
    with open(latest_file, "r") as file:
        json_data = json.load(file)

    base_currency = json_data['base']
    new_rates_df = get_rates_dataframe(json_data)

    # NOTE(review): this is the time of structuring, not the extract_time stored in the raw
    # payload -- confirm that is the intended meaning of the column.
    extract_time = datetime.now().timestamp()

    # current = 1 marks the records of the batch being loaded.
    new_rates_df = new_rates_df.assign(
        base=base_currency,
        source_file=latest_file,
        extract_time=extract_time,
        current=1,
    )

    # If structured has previously been processed, every existing record becomes historical.
    structured_path = '../../data/structured/structured_rates.json'
    if os.path.exists(structured_path):
        with open(structured_path, 'r') as file:
            old_rates_df = pd.DataFrame(json.load(file))
        old_rates_df['current'] = 0
        combined_df = pd.concat([old_rates_df, new_rates_df], ignore_index=True)
    else:
        combined_df = new_rates_df

    # Write next to the target and swap it in, instead of deleting the old file first:
    # deleting up front would lose the whole history if the write failed.
    tmp_path = f'{structured_path}.tmp'
    combined_df.to_json(tmp_path, orient='records')
    os.replace(tmp_path, structured_path)

In [None]:
def get_data_into_curated():
    """
    Aggregate the structured layer into the curated summary file.

    Reads ``structured_rates.json``, keeps the rows flagged ``current == 1`` and writes the
    mean / min / max ``rate`` per (``currency``, ``base``) pair to ``summary_rates.json``.
    Does nothing when the structured file does not exist.

    The summary is computed first and then swapped in through a temporary file, so a failure
    during aggregation or writing leaves any previous summary untouched.

    Parameters:
    - None

    Returns:
    - None

    Raises:
    - Any exception from reading/parsing the structured file, from the aggregation (e.g.
      ``KeyError`` when an expected column is missing) or from writing propagates unchanged.
    """
    structured_path = '../../data/structured/structured_rates.json'
    if not os.path.exists(structured_path):
        return

    with open(structured_path, 'r') as file:
        structured_rates_df = pd.DataFrame(json.load(file))

    # Only the latest batch takes part in the summary.
    # NOTE(review): if a batch holds one row per currency/base, mean, min and max coincide --
    # confirm whether historical rows were meant to be aggregated as well.
    latest_df = structured_rates_df[structured_rates_df['current'] == 1]

    statistics_df = latest_df.groupby(['currency', 'base'], as_index=False).agg(
        mean_rate=('rate', 'mean'),
        worst_rate=('rate', 'min'),
        best_rate=('rate', 'max')
    )

    # Replace the old summary only once the new one is fully written.
    summary_path = '../../data/curated/summary_rates.json'
    tmp_path = f'{summary_path}.tmp'
    statistics_df.to_json(tmp_path, orient='records')
    os.replace(tmp_path, summary_path)

In [None]:
def run_pipeline(endpoint):
    """
    Orchestrate the end-to-end pipeline: source API -> raw -> structured -> curated.

    After the API response has been written to the raw layer, the most recently modified
    ``.json`` file in that layer is loaded into the structured layer and the curated summary
    is rebuilt.

    Parameters:
    - endpoint (str): The URL endpoint to fetch data from.

    Returns:
    - None

    Raises:
    - Any exception raised by the individual pipeline stages or by listing the raw directory
      propagates unchanged to the caller.
    """
    # Call rates api and process data into Raw layer
    get_data_into_raw(endpoint)

    # Find the latest file in the raw layer. Only regular .json files are candidates, so stray
    # entries (notebook checkpoint folders, placeholder files, ...) can never be picked.
    directory = '../../data/raw/'
    candidates = [
        path
        for path in (os.path.join(directory, name) for name in os.listdir(directory))
        if path.endswith('.json') and os.path.isfile(path)
    ]
    if not candidates:
        return None

    latest_file = max(candidates, key=os.path.getmtime)

    # Get raw data, transform & save into Structured layer
    get_data_into_structured(latest_file)

    # Aggregate Structured data and save into Curated layer
    get_data_into_curated()

In [None]:
def rebuild_structured_and_curated():
    """
    Rebuild the structured and curated layers from the raw layer.

    Intended for disaster recovery, provided the raw layer itself is intact: every ``.json``
    file in the structured and curated directories is deleted, then each raw ``.json`` file is
    replayed into the structured layer in ascending file-name order, and finally the curated
    summary is regenerated.

    Parameters:
    - None

    Returns:
    - None

    Raises:
    - Any exception from listing/removing files or from the replayed pipeline stages
      propagates unchanged to the caller.
    """
    # Delete the .json files from Structured & Curated layers
    for directory in ['../../data/structured/', '../../data/curated/']:
        for filename in os.listdir(directory):
            if filename.endswith(".json"):
                os.remove(os.path.join(directory, filename))

    # Replay only the .json files of the Raw layer, in ascending name order. Other entries
    # (checkpoint folders, placeholder files, ...) are skipped, matching the .json filter
    # used for the deletions above.
    raw_directory = '../../data/raw/'
    raw_files = sorted(
        filename for filename in os.listdir(raw_directory) if filename.endswith('.json')
    )

    for filename in raw_files:
        get_data_into_structured(f'{raw_directory}{filename}')

    # Get data into Curated layer from newly reconstructed Structured layer
    get_data_into_curated()