# Loading NOAA Station Information to Database
Logan Gall

27 May 2024

In [3]:
#imports
import psycopg2 #for database connection
#for downloading data (below)
import csv
import requests
from collections import deque
import os

In [4]:
# Database connection parameters.
# Each value can be overridden with the environment variable named below so
# that real credentials do not have to be saved inside the notebook; the
# literals are only fallbacks used when the variable is unset.
dbname = os.environ.get('PGDATABASE', 'postgres')
user = os.environ.get('PGUSER', 'postgres')
password = os.environ.get('PGPASSWORD', 'Passwordd')
host = os.environ.get('PGHOST', 'localhost')

# Read the API key from a file and remove any leading/trailing whitespace.
# with open('../API_Keys/NOAA_Token.txt', 'r') as file:
#    api_key = file.read().strip()

# OR PASTE API KEY HERE (the fallback used when NOAA_TOKEN is not set in the
# environment). Surrounding whitespace is stripped, as in the file-based
# variant above.
api_key = os.environ.get('NOAA_TOKEN', "").strip()

In [6]:
# Create empty table to store NOAA API Observations
create_table_sql = """
CREATE TABLE IF NOT EXISTS noaa_api (
    uid VARCHAR(255) PRIMARY KEY,
    date TIMESTAMP,
    datatype VARCHAR(255),
    station VARCHAR(255),
    latitude NUMERIC,
    longitude NUMERIC,
    elevation VARCHAR(255),
    name VARCHAR(255),
    attributes TEXT, -- Using TEXT to accommodate commas and variable-length strings
    value NUMERIC -- NUMERIC is suitable for any kind of number, you could use FLOAT if appropriate
);
"""

# Connect to postgres DB
conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host)
try:
    # `with conn` commits the transaction when the body succeeds and rolls it
    # back when it raises; it does NOT close the connection, hence the
    # surrounding try/finally. `with conn.cursor()` closes the cursor.
    with conn:
        with conn.cursor() as cur:
            # Create the table if it doesn't exist yet
            cur.execute(create_table_sql)
finally:
    # Always release the connection, even if the statement failed
    conn.close()

## Loading weather station data to database

In [7]:
# Create empty table to store NOAA Station list
create_table_sql = """
CREATE TABLE IF NOT EXISTS noaa_station_list (
    elevation VARCHAR(255),
    mindate DATE,
    maxdate DATE,
    latitude NUMERIC,
    name VARCHAR(255),
    datacoverage NUMERIC,
    id VARCHAR(255) PRIMARY KEY,
    elevationUnit VARCHAR(50),
    longitude NUMERIC
);
"""

# Connect to postgres DB
conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host)
try:
    # `with conn` commits the transaction when the body succeeds and rolls it
    # back when it raises; it does NOT close the connection, hence the
    # surrounding try/finally. `with conn.cursor()` closes the cursor.
    with conn:
        with conn.cursor() as cur:
            # Create the table if it doesn't exist yet
            cur.execute(create_table_sql)
finally:
    # Always release the connection, even if the statement failed
    conn.close()

### Download weather station data

This code is pulled from the ETL_Download_Stack.ipynb file; it downloads the data for our weather stations.

In [None]:
# Root address of the web API; endpoint paths are appended to this string.
url = 'https://www.ncdc.noaa.gov/cdo-web/api/v2/'

# Request headers sent with every call: the API key goes in the 'token' header.
headers = dict(token=api_key)

def api_call(url, endpoint, headers, parameters, timeout=30):
    """
    Make a GET request to ``url + endpoint`` and return the decoded JSON body.

    Args:
        url (str): The base URL for the API.
        endpoint (str): The specific endpoint to access data from the API.
        headers (dict): Headers to include in the request (e.g., authorization tokens).
        parameters (dict): Query parameters to customize the request.
        timeout (float): Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the download loop indefinitely.

    Returns:
        dict: The JSON response from the API if the call is successful, None otherwise
        (network error, timeout, HTTP 4xx/5xx, or a body that is not valid JSON).

    NOTE(review): ``url`` and ``endpoint`` are concatenated verbatim, so a base
    ending in '/' combined with an endpoint starting with '/' produces '//' in
    the request path — confirm the server tolerates that.
    """
    try:
        # Debugging: show the current offset. `.get` is used because 'offset'
        # is optional; indexing would raise a KeyError that is not caught below.
        print(parameters.get('offset'))
        response = requests.get(url + endpoint, headers=headers, params=parameters, timeout=timeout)
        response.raise_for_status()  # Turn HTTP error statuses into exceptions.
        print("API Called")  # Debugging: confirm the API was called.
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # RequestException covers network issues, timeouts and 4xx/5xx errors;
        # ValueError covers JSON decoding failures raised by `response.json()`.
        print(f"API call failed: {e}")
        return None

def append_to_csv(file_path, data, fieldnames):
    """
    Add rows to the end of a CSV file, emitting the header line first when the
    file is empty (which includes the case where it was just created).

    Args:
        file_path (str): Location of the CSV file to append to.
        data (list): Row dictionaries, keyed by field name.
        fieldnames (list): Column names, in output order.
    """
    with open(file_path, mode='a', newline='') as handle:
        # In append mode the position is at the end of the file, so a position
        # of 0 means there is no content (and therefore no header) yet.
        needs_header = handle.tell() == 0
        csv_writer = csv.DictWriter(handle, fieldnames=fieldnames)
        if needs_header:
            csv_writer.writeheader()
        for record in data:
            csv_writer.writerow(record)

def append_metadata_to_csv(metadata_file_path, metadata):
    """
    Record one response's ``resultset`` metadata as a row of a CSV file.

    The columns are the keys of ``metadata['resultset']``; a header line is
    written first when the file is empty.

    Args:
        metadata_file_path (str): Location of the metadata CSV file.
        metadata (dict): Mapping that holds the row to write under 'resultset'.
    """
    with open(metadata_file_path, mode='a', newline='') as handle:
        resultset = metadata['resultset']
        # Append mode starts at the end of the file: position 0 == empty file.
        needs_header = handle.tell() == 0
        csv_writer = csv.DictWriter(handle, fieldnames=resultset.keys())
        if needs_header:
            csv_writer.writeheader()
        csv_writer.writerow(resultset)
        
def execute_api_calls_with_retries(base_url, endpoint, headers, initial_parameters, file_path, metadata_file_path, error_log_path, max_retries=3):
    """
    Download every page of an API listing, with retries, into CSV files.

    Pages are requested one after another by advancing the 'offset' query
    parameter. Each successful page is appended to ``file_path`` and its
    ``metadata['resultset']`` to ``metadata_file_path``. A request that still
    fails after ``max_retries`` retries is recorded in ``error_log_path`` and
    pagination stops there, because the next offset is only scheduled after a
    successful page.

    The CSV columns are fixed by the first page that contains results (the
    union of the keys of all its rows, in first-seen order). Keys that first
    appear on a later page are dropped with a printed warning so the file keeps
    a single consistent header.

    Args:
        base_url (str): The base URL for the API calls.
        endpoint (str): The specific endpoint to access data from the API.
        headers (dict): Headers to include in the request.
        initial_parameters (dict): Initial query parameters for the API call.
            The dict is copied; the caller's object is not modified.
        file_path (str): Path to the CSV file where results are stored.
        metadata_file_path (str): Path to the CSV file where metadata is stored.
        error_log_path (str): Path to the CSV file where failed attempts are logged.
        max_retries (int): Maximum number of retry attempts for a failed API call.

    Returns:
        None. Returns immediately, without any request, if ``file_path`` exists.
    """
    if os.path.exists(file_path):
        print(f"CSV file {file_path} already exists. Operation cancelled to prevent overwriting.")
        return  # Prevents overwriting existing data by aborting if the file already exists.

    # Work on a copy: the retry counter is stored in (and popped from) the
    # parameter dicts, which must not leak into the caller's dictionary.
    stack = deque([dict(initial_parameters)])
    failed_attempts = []  # Parameters that still failed after the last retry.
    fieldnames = None  # CSV columns, fixed by the first page that has rows.

    while stack:
        parameters = stack.pop()
        # The counter is removed so it is not sent to the API as a query parameter.
        retries = parameters.pop('retries', 0)
        data = api_call(base_url, endpoint, headers, parameters)

        if data and 'results' in data:
            results = data['results']
            if results:  # An empty page has no rows (and no first row to inspect).
                # Rows do not all carry the same keys, so collect the union
                # instead of trusting the first row only.
                page_fields = list(dict.fromkeys(key for row in results for key in row))
                if fieldnames is None:
                    fieldnames = page_fields
                else:
                    unknown = [key for key in page_fields if key not in fieldnames]
                    if unknown:
                        print(f"Warning: dropping fields missing from the CSV header: {unknown}")
                        results = [
                            {key: value for key, value in row.items() if key in fieldnames}
                            for row in results
                        ]
                append_to_csv(file_path, results, fieldnames)
            append_metadata_to_csv(metadata_file_path, data['metadata'])

            # Schedule the next page while the current one does not reach 'count'.
            resultset = data['metadata']['resultset']
            offset = parameters.get('offset', 1)
            # Presumably the response reports the page size it applied under
            # 'limit'; prefer it over the historical fallback of 10 when the
            # caller did not set a limit — TODO confirm against the API docs.
            limit = parameters.get('limit', resultset.get('limit', 10))
            if offset + limit - 1 < resultset['count']:
                next_params = parameters.copy()
                # Assign rather than `+=` so a missing 'offset' cannot raise KeyError.
                next_params['offset'] = offset + limit
                stack.append(next_params)
        else:
            # Retry logic for failed attempts.
            if retries < max_retries:
                parameters['retries'] = retries + 1  # Increment the retry counter.
                stack.append(parameters)  # Re-add the parameters to the stack for retry.
            else:
                print(f"Max retries reached for parameters: {parameters}")
                parameters['retries'] = retries  # Keep the count for the error log.
                failed_attempts.append(parameters)

    # Log failed attempts for troubleshooting. The usual columns come first;
    # any other query parameter is appended so DictWriter never rejects a row.
    if failed_attempts:
        error_fields = list(dict.fromkeys(
            ['locationid', 'limit', 'offset', 'retries']
            + [key for attempt in failed_attempts for key in attempt]
        ))
        append_to_csv(error_log_path, failed_attempts, error_fields)

In [None]:
# Example invocation: download the station list into local CSV files.

# Output locations: station rows, per-request metadata, and failed requests.
file_path = "stations.csv"
metadata_path = "stations_meta.csv"
error_log_path = "error_log.csv"

# What to request from the API.
endpoint = "/stations/"
parameters = dict(
    locationid="FIPS:27",  # Change or remove the FIPS code to download stations near a location of your choice
    limit=1000,
    offset=1,
)

execute_api_calls_with_retries(
    url,
    endpoint,
    headers,
    parameters,
    file_path,
    metadata_path,
    error_log_path,
    max_retries=3,
)

### Push weather station data to database

In [None]:
# Load station data to database
csv_file_path = './stations.csv'  # Make sure this path is correct

# CSV columns in the order the INSERT statement expects them.
station_columns = (
    'elevation', 'mindate', 'maxdate', 'latitude', 'name',
    'datacoverage', 'id', 'elevationUnit', 'longitude',
)

# Rows whose id is already stored are skipped, so the cell can be re-run.
insert_sql = """
    INSERT INTO noaa_station_list(elevation, mindate, maxdate, latitude, name, datacoverage, id, elevationUnit, longitude)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (id) DO NOTHING;
"""

# Connect to postgres DB
conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host)
try:
    # `with conn` commits once after all rows were inserted and rolls back if
    # anything raises; it does NOT close the connection, hence the try/finally.
    # `with conn.cursor()` closes the cursor.
    with conn:
        with conn.cursor() as cur:
            # Reading and inserting data from CSV
            with open(csv_file_path, newline='') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    # A missing column or an empty cell is stored as NULL: an
                    # empty string is not valid input for DATE/NUMERIC columns.
                    cur.execute(insert_sql, tuple(row.get(column) or None for column in station_columns))
finally:
    # Always release the connection, even if reading or inserting failed
    conn.close()