    - Create a daily-updated data archive of observed meteorology:
        Stakeholders are Salient's Machine Learning team and our customers
        Duration limit to complete the task is a 2 hour timeframe, enforced on the honor system
        Deadline to submit an answer is 1 week after receipt of this email
    - For now, the archive will contain 3 different observed met station WBAN codes:
        14739 (Boston), 23169 (Las Vegas), 94846 (Chicago)
        Eventually, this system must scale to handle all >100k GHCNd stations
    - Get data from NCEI, example for Boston:
        https://www.ncei.noaa.gov/data/global-historical-climatology-network-daily/access/USW000014739.csv
    - Output is a zarr archive:
        Coordinates:   ghcn_id & time, chunked at your discretion
        Data variables: precip (mm/day),  tmax (°C), tmin (°C)
            The source data calls it "prcp", so you'll have to change it
    - Write functions:
        build_ghcnd_archive that establishes a fresh archive from scratch
        update_ghcnd_archive that updates the archive each day
    - Answer questions with 1-3 sentences:
        How would you orchestrate this system to run at scale?
        What major risks would this system face?
        What are the next set of enhancements you would add?
        How would you improve the clarity of this assignment?
    - Reply here to send your answer as zipped (.py | .ipynb) & .pdf
        PDF must contain a print statement that shows the archive contents

In [1]:
import io
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import requests
import xarray as xr

In [2]:
base_url = "https://www.ncei.noaa.gov/data/global-historical-climatology-network-daily/access/{station}.csv"

stations = ['USW00014739', 'USW00023169', 'USW00094846']


In [3]:

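def ghcnd_https_download(station_id):
    """Download one station's CSV from NCEI and return a tidy DataFrame.

    Raises RuntimeError if the request fails, so callers can skip the station
    instead of receiving an error string where a DataFrame is expected.
    """
    url = base_url.format(station=station_id)
    try:
        # Timeout (seconds) applies to connecting and to each read
        response = requests.get(url, timeout=30)
        # Raise on HTTP error codes (404, 500, ...) instead of parsing an error page
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Failed to download {station_id}: {e}") from e

    # Parse only the columns the archive needs; attribute (flag) columns and
    # other elements are skipped entirely
    wanted = {'STATION', 'DATE', 'PRCP', 'TMAX', 'TMIN'}
    raw_data_frame = pd.read_csv(io.StringIO(response.text),
                                 usecols=lambda col: col in wanted,
                                 parse_dates=['DATE'])
    return data_transform(raw_data_frame)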

        
    

In [4]:
def data_transform(data_frame):
    """Index by (station, date), rename to archive names, and convert units."""
    df = data_frame.set_index(['STATION', 'DATE'])
    df = df.rename(columns={'PRCP': 'precip', 'TMAX': 'tmax', 'TMIN': 'tmin'})

    # Keep only the archive variables; reindex adds an all-NaN column for any
    # element a station does not report
    df = df.reindex(columns=['precip', 'tmax', 'tmin'])

    # Source values are in tenths: tenths of mm -> mm, tenths of °C -> °C
    return df / 10

In [5]:
def download_files(stations, max_workers=5):
    """Download stations in parallel and combine them into one Dataset."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        tasks = {executor.submit(ghcnd_https_download, station): station
                 for station in stations}
        for future in as_completed(tasks):
            station = tasks[future]
            try:
                results.append(future.result())
            except Exception as e:
                # Skip the failed station, but report why it failed
                print(f'Error downloading {station}: {e}')

    if not results:
        raise RuntimeError('No stations were downloaded successfully')

    combined_data = pd.concat(results)
    # The (STATION, DATE) MultiIndex becomes the two dataset dimensions
    ds = combined_data.to_xarray()
    ds = ds.rename({'STATION': 'ghcnd_id', 'DATE': 'time'})

    # One chunk per station holding its full record (adjust as needed)
    ds = ds.chunk({'ghcnd_id': 1, 'time': -1})
    return ds
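The chunking above is left open ("adjust as needed"), so here is some arithmetic to help size it for the 100k-station case. This is a minimal sketch assuming float64 variables and a target of roughly 100 MB per chunk. The target is an assumption, and the helper names are hypothetical.

```python
import math

def chunk_nbytes(chunk_shape, itemsize=8):
    """Bytes in one chunk of the given shape (float64 -> 8 bytes per value)."""
    return math.prod(chunk_shape) * itemsize

def stations_per_chunk(n_times, target_mb=100.0, itemsize=8):
    """Largest number of full-record stations that fits in ~target_mb."""
    per_station = chunk_nbytes((1, n_times), itemsize)
    return max(1, int(target_mb * 1_000_000 // per_station))

def n_chunks(n_stations, station_chunk):
    """Chunks per variable when stations are grouped station_chunk at a time."""
    return math.ceil(n_stations / station_chunk)

n_times = 32364  # length of the time axis in the printed archive
print(chunk_nbytes((1, n_times)))   # 258912 bytes, about 259 kB per chunk
print(stations_per_chunk(n_times))  # 386 stations per ~100 MB chunk
print(n_chunks(100_000, 1), n_chunks(100_000, 386))  # 100000 260
```

With one station per chunk, 100k stations would mean 100,000 small chunks per variable. Grouping about 386 stations per chunk brings that down to 260. Note also that with `time: -1`, every daily update rewrites every chunk. Chunking time into fixed blocks (for example, a year) would instead confine daily appends to the most recent blocks.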

    


In [6]:
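def build_ghcnd_archive(output_path, stations):
    """Download every station from scratch and write a fresh Zarr archive."""
    all_stations_ds = download_files(stations)
    # mode='w' replaces any existing store at output_path
    all_stations_ds.to_zarr(output_path, mode='w')
    return all_stations_ds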
    

In [10]:
archive = build_ghcnd_archive('archive.zarr', stations)



In [8]:
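import os

def update_ghcnd_archive(archive_path, stations, lookback_days=30):
    """Refresh the archive with the latest NCEI data; meant to run once per day."""
    if not os.path.exists(archive_path):
        # Nothing to update yet, so fall back to a full build
        print(f'No archive found at {archive_path}; building a new one')
        return build_ghcnd_archive(archive_path, stations)

    # Load into memory so the store can be safely overwritten below
    ds = xr.open_zarr(archive_path).load()
    last_archived = pd.Timestamp(ds['time'].values[-1])

    # Each NCEI file holds a station's full history, so re-download it and keep
    # only a trailing window (30 days by default, an arbitrary choice) so that
    # late-reported or revised recent values are picked up as well
    new_ds = download_files(stations)
    window_start = last_archived - pd.Timedelta(days=lookback_days)
    new_ds = new_ds.sel(time=slice(window_start, None)).load()

    # Fresh values win inside the window; the archive supplies everything
    # else, including stations whose download failed today
    ds = new_ds.combine_first(ds)

    # Drop on-disk encodings (e.g. chunk sizes) inherited from the old store
    for var in ds.variables.values():
        var.encoding = {}

    ds = ds.chunk({'ghcnd_id': 1, 'time': -1})
    # Rewriting the whole store is fine for 3 stations but will not scale
    ds.to_zarr(archive_path, mode='w')
    return ds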

In [9]:
update_ghcnd_archive('archive.zarr', stations)



In [12]:
print(archive)

<xarray.Dataset> Size: 3MB
Dimensions:   (ghcnd_id: 3, time: 32364)
Coordinates:
  * ghcnd_id  (ghcnd_id) object 24B 'USW00014739' 'USW00023169' 'USW00094846'
  * time      (time) datetime64[ns] 259kB 1936-01-01 1936-01-02 ... 2024-08-09
Data variables:
    precip    (ghcnd_id, time) float64 777kB dask.array<chunksize=(1, 32364), meta=np.ndarray>
    tmax      (ghcnd_id, time) float64 777kB dask.array<chunksize=(1, 32364), meta=np.ndarray>
    tmin      (ghcnd_id, time) float64 777kB dask.array<chunksize=(1, 32364), meta=np.ndarray>


**How would you orchestrate this system to run at scale?**

I would run the initial build on cloud infrastructure such as AWS or Google Cloud, where many station files can be downloaded in parallel, and write the archive to object storage (S3 or its equivalent). Daily updates are the expensive part, because every station's full CSV is re-downloaded and the whole store is rewritten. Since the CSV is plain text sorted by date, it should be possible to fetch only the newest rows of each file instead. A scheduler would trigger the update every day, with a monitoring dashboard to confirm that all 100k+ stations download and update without issue.
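Fetching "only the newest rows" could use an HTTP `Range` request. This sketch rests on several assumptions:

- NCEI's server honors `Range`, which I have not verified.
- Files only grow by appending rows, since revisions to earlier rows would shift byte offsets.
- The previous run recorded each file's byte size and header.

The request starts slightly before the old end of file, drops the partial first line, and keeps rows dated after the last archived date. All function names here are hypothetical.

```python
import csv
import io
import urllib.request

def tail_offset(previous_size, overlap=1024):
    """Start a little before the old end of file, so the first (partial)
    line received is guaranteed to be data we already have."""
    return max(previous_size - overlap, 0)

def fetch_tail(url, offset, timeout=30):
    """GET bytes from `offset` to end of file via an HTTP Range request."""
    req = urllib.request.Request(url, headers={"Range": f"bytes={offset}-"})
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        # 206 Partial Content means the Range header was honored;
        # a plain 200 would mean the server sent the whole file instead
        if resp.status != 206:
            raise RuntimeError(f"Range not honored (HTTP {resp.status})")
        return resp.read()

def parse_tail(tail, header, after_date):
    """Return rows from `tail` (bytes from the CSV) dated after after_date.

    Everything up to the first newline is dropped because the slice usually
    starts mid-row; ISO dates ("YYYY-MM-DD") compare correctly as strings.
    """
    text = tail.decode("utf-8")
    first_newline = text.find("\n")
    if first_newline == -1:
        return []
    reader = csv.DictReader(io.StringIO(text[first_newline + 1:]), fieldnames=header)
    return [row for row in reader if row["DATE"] > after_date]
```

The `header` argument would be the column list from the file's first line, saved during the initial build. If `fetch_tail` raises, the update can fall back to downloading the full file.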

**What major risks would this system face?**

As written, the system is brittle to changes in the URL scheme, the CSV columns, or anything else that alters how the data is structured, and with little error checking, such changes would break it. It would also fail on network problems or an unreachable URL. The timeout on the GET request keeps a stalled connection from hanging the run, but a timed-out request is never retried. Finally, there is no quality control, and the GHCN-d quality flags are discarded on import, so errors in the NCEI source data would propagate into downstream analysis.
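Transient network failures could be made recoverable by retrying each download with exponential backoff. This is a minimal sketch assuming the wrapped function raises on failure rather than returning an error value. The helper name `fetch_with_retry` and its defaults are hypothetical.

```python
import random
import time

def fetch_with_retry(fetch, key, attempts=4, base_delay=1.0,
                     retry_on=(Exception,), sleep=time.sleep):
    """Call fetch(key), retrying failures with exponential backoff plus jitter.

    The last exception is re-raised once all attempts are used, so the caller
    sees a loud failure rather than a silently missing station.
    """
    for attempt in range(attempts):
        try:
            return fetch(key)
        except retry_on:
            if attempt == attempts - 1:
                raise
            # Waits ~1 s, ~2 s, ~4 s, ... plus up to 1 s of random jitter
            sleep(base_delay * 2 ** attempt + random.uniform(0, 1))
```

In the notebook, this could wrap the per-station download, for example `executor.submit(fetch_with_retry, ghcnd_https_download, station)`. Narrowing `retry_on` to network errors would avoid retrying bugs that will never succeed.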

**What are the next set of enhancements you would add?**

I would add significantly more error checking so that the risks above fail loudly instead of silently. I would add QA/QC before building the archive so that bad data does not poison our ML/AI pipelines. Finally, I would improve performance by updating the archive incrementally and by tuning the data's chunking and compression.
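A first step toward QA/QC could be a per-observation plausibility check. This is an illustrative sketch, not GHCN-d's own QC procedure. The thresholds are assumptions that loosely bracket world-record extremes, and the function name is hypothetical.

```python
import math

def qc_flags(precip, tmax, tmin, temp_range=(-90.0, 60.0), max_precip=2000.0):
    """Return QC problems for one station-day (precip in mm, temps in °C).

    Missing values (None or NaN) are skipped rather than flagged.
    """
    def present(x):
        return x is not None and not math.isnan(x)

    problems = []
    if present(precip) and not 0.0 <= precip <= max_precip:
        problems.append(f"precip {precip} mm outside [0, {max_precip}]")
    for name, value in (("tmax", tmax), ("tmin", tmin)):
        if present(value) and not temp_range[0] <= value <= temp_range[1]:
            problems.append(f"{name} {value} °C outside {temp_range}")
    # A daily maximum below the daily minimum is physically inconsistent
    if present(tmax) and present(tmin) and tmax < tmin:
        problems.append(f"tmax {tmax} °C below tmin {tmin} °C")
    return problems
```

On the archive itself, the same rules would be applied vectorized (e.g. `ds['tmax'] < ds['tmin']`) rather than row by row. Flagged values could be masked, or logged for review, before the archive is written.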

**How would you improve the clarity of this assignment?**

I was tripped up by the daily update function: I normally schedule daily jobs with cron, which falls outside the scope of the assignment. It would be clearer to say "write a function that, when run once a day, updates the archive with new data".