## Data Ingestion

Source: https://www1.ncdc.noaa.gov/pub/data/noaa/isd-lite/

Description: https://www1.ncdc.noaa.gov/pub/data/noaa/isd-lite/isd-lite-format.txt

In [None]:
from dask.delayed import delayed
from dask import compute
from dask.distributed import Client, progress
import pandas as pd
import requests
import os
import glob

In [None]:
# Base folder of the ISD-Lite archive; each year has its own sub-folder.
ROOT_URL = "https://www1.ncdc.noaa.gov/pub/data/noaa/isd-lite/"

# Station identifiers exactly as they appear in the archive file names.
STATIONS = ("722430-12960",)

# Newest year first: 2022 down to 1981 (the stop bound 1980 is excluded).
YEARS = range(2022, 1980, -1)

In [None]:
def get_source_url(year, station):
    """Return the download URL of the archive for ``station`` in ``year``.

    The archive is named ``<station>-<year>.gz`` and lives in the ``<year>/``
    folder directly below ``ROOT_URL``.
    """
    return f"{ROOT_URL}{year}/{station}-{year}.gz"
    
# Build the URL for one station/year as a quick check and display it.
# `source_url` is reused by the single-file ingest call further down.
source_url = get_source_url(2021, "722430-12960")
source_url

In [None]:
def get_target_filename(year, station):
    """Return the local path where the archive for ``station`` in ``year`` is stored.

    The result is a relative path of the form
    ``../data/<year>/<station>-<year>.gz``, i.e. one folder per year.
    """
    # Only the local path is needed here; the remote URL that used to be
    # assembled in this function was never used.
    return f"../data/{year}/{station}-{year}.gz"
# Build the local path for one station/year as a quick check and display it.
# `target_filename` is reused by the single-file ingest call further down.
target_filename = get_target_filename(2021, "722430-12960")
target_filename

In [None]:
def ingest(source_url, target_filename, timeout=60):
    """Download ``source_url`` to ``target_filename`` unless the file already exists.

    Args:
        source_url: URL fetched with an HTTP GET (redirects are followed).
        target_filename: Local path to write. Missing parent directories are
            created first.
        timeout: Seconds handed to ``requests.get`` so that a stalled server
            cannot block the run indefinitely.

    Returns:
        None. A response whose status is not 200 is skipped without writing
        anything, so nothing is left on disk for that URL.

    Raises:
        OSError: If the parent directory or the file cannot be created.
        Errors raised by ``requests.get`` are not caught and propagate.
    """
    if os.path.exists(target_filename):
        return

    directory = os.path.dirname(target_filename)
    if directory:
        # exist_ok makes this safe when another worker creates the same
        # directory concurrently, while real failures (e.g. permissions)
        # still surface instead of being swallowed.
        os.makedirs(directory, exist_ok=True)

    response = requests.get(source_url, allow_redirects=True, timeout=timeout)
    if response.status_code == 200:
        # The context manager closes the handle even if the write fails.
        with open(target_filename, "wb") as target:
            target.write(response.content)

# Try the download on a single file first, using whatever `source_url` and
# `target_filename` currently hold, before looping over every station/year.
ingest(source_url, target_filename)

In [None]:
# Parallel lists: entry i of `target_filenames` is where entry i of
# `source_urls` gets stored. Years form the outer loop, stations the inner.
source_urls = []
target_filenames = []

for year, station in ((y, s) for y in YEARS for s in STATIONS):
    source_urls += [get_source_url(year, station)]
    target_filenames += [get_target_filename(year, station)]

# Show the first three entries of each list as a spot check.
source_urls[0:3], target_filenames[0:3]

In [None]:
# Download the files one after another. ingest() returns immediately for a
# path that already exists, so re-running this cell skips earlier downloads.
# Note that the loop rebinds the notebook-level names `source_url` and
# `target_filename`.
for source_url, target_filename in zip(source_urls, target_filenames):
    ingest(source_url, target_filename)

## Transform

In [None]:
# Replace the planned list with the archives that are actually on disk
# (one sub-folder level below ../data).
# NOTE(review): recursive=True only affects patterns containing "**", which
# this pattern does not use.
target_filenames = glob.glob("../data/*/*.gz", recursive=True)
# Display the first match; this raises IndexError when nothing was found.
target_filenames[0]

In [None]:
import dask.dataframe as dd

def read_csv(filename):
    """Load ``filename`` with pandas, treating the first line as data.

    No header row is assumed, so the resulting columns are labelled with
    integers starting at 0.
    """
    frame = pd.read_csv(filename, header=None)
    return frame

# Wrap each file read in `delayed` so the reads are only described here,
# one task per archive, instead of being executed in this list comprehension.
raw_dfs = [delayed(read_csv)(fn) for fn in target_filenames]
raw_df = dd.from_delayed(raw_dfs) # raw_df is a dask dataframe built from the delayed reads
# Display the first rows as a spot check.
raw_df.head()

In [None]:
# Every row arrives as one fixed-width string in column 0. The format
# description numbers character positions from 1 and gives inclusive ranges,
# whereas Python slices count from 0 and exclude the stop index. The slices
# below are therefore [first_position - 1 : last_position].
lines = raw_df[0]

# Date/time: 4-digit year, then 2-digit month, day and hour, each preceded
# by a single separator character.
year = lines.str[0:4].astype(int)
month = lines.str[5:7].astype(int)
day = lines.str[8:10].astype(int)
hour = lines.str[11:13].astype(int)

# Measurements are right-aligned in 6-character fields. -9999 marks a missing
# value and is mapped to NaN (which keeps the column a float dtype) before
# the stored value is divided by 10.
missing = float("nan")
temperature = lines.str[13:19].astype(float).replace(-9999, missing) / 10
dewpoint = lines.str[19:25].astype(float).replace(-9999, missing) / 10
pressure = lines.str[25:31].astype(float).replace(-9999, missing) / 10

In [None]:
# Display the first parsed pressure values as a spot check.
pressure.head()

In [None]:
# Keep every label next to the series it names so the column order cannot
# drift from the data order (previously "hour" and "day" were swapped, which
# labelled the day values as hours and vice versa).
fields = {
    "year": year,
    "month": month,
    "day": day,
    "hour": hour,
    "temperature": temperature,
    "dewpoint": dewpoint,
    "pressure": pressure,
}
# zip() walks the series in lockstep, yielding one tuple per observation.
df = pd.DataFrame(zip(*fields.values()), columns=list(fields))
df.head()