In [None]:
from ftplib import FTP, error_perm, error_temp
from pathlib import Path
from itertools import product


from tqdm import tqdm
import s3fs
from cloudside.asos import MetarParser
from cloudside.asos import FIVEMIN, _find_reset_time, _process_precip

from dask import bag
from distributed import Client
from coiled import Cluster

# %% Create a cluster and client with coiled
# A named Coiled cluster with 20 workers; the widget is disabled in the call below.
cluster = Cluster(
    name="asos-mirror",
    n_workers=20,
    package_sync=True,
    show_widget=False,
)
# Connect a distributed client to that cluster; it is used later via client.map/gather.
client = Client(cluster)
client

## B - With Dask

* (Same) Function to list all of the files available for a given station-year (e.g., 2020 at KPDX)
* (Slightly Different) Function to transfer individual files from FTP to S3
* (Different) Function to process the precip data of a dataframe

In [None]:
# Module-level S3 filesystem handle built with default arguments;
# transfer() below uses it for its exists/size/open calls.
fs = s3fs.S3FileSystem()

def get_all_files(station, year):
    """List the five-minute ASOS files on the NCEI FTP server for one station-year.

    Logs in with ftplib's default user and the module-level ``email`` as the
    password, then issues an NLST for ``/pub/data/asos-fivemin/6401-{year}/``
    filtered by ``*{station}*``.

    Parameters
    ----------
    station : str
        Station identifier matched as a substring of the remote file names
        (e.g. ``"KPDX"``).
    year : int or str
        Year interpolated into the ``6401-{year}`` directory name.

    Returns
    -------
    list of str
        The names returned by the server. An empty list is returned when the
        server answers the listing with a permanent error (``error_perm``);
        the error is printed rather than raised.
    """
    # Bind the result up front so the error path below still has a value to
    # return; previously an error_perm left the name undefined and the
    # function died with UnboundLocalError.
    dat_files = []
    with FTP("ftp.ncei.noaa.gov") as ftp:
        ftp.login(passwd=email)
        try:
            dat_files = ftp.nlst(f"/pub/data/asos-fivemin/6401-{year}/*{station}*")
        except error_perm as e:
            # Best-effort: report the server's reply and fall through with [].
            print(e)
    return dat_files


def transfer(dat_in, dat_out_uri):
    """Copy one text file from the NCEI FTP server to S3.

    Objects that already exist in S3 with a non-zero size are left alone.
    Otherwise the file is downloaded completely into memory first and only
    written to S3 once the download succeeded, so a failed or interrupted
    download never leaves an empty or truncated object behind (a truncated
    object would be reported as "exists already" on the next run).

    Reads the module-level ``fs`` (S3 filesystem) and ``email`` (FTP password).

    Parameters
    ----------
    dat_in : str
        Remote path on the FTP server, used verbatim in the ``RETR`` command.
    dat_out_uri : str
        Destination path passed to ``fs.exists`` / ``fs.size`` / ``fs.open``.

    Returns
    -------
    dict
        ``{"source", "file", "status", "size"}`` where ``status`` is
        ``"transferred"``, ``"exists already"``, or the text of the exception
        raised during the download (in which case ``size`` is ``None``).

    Notes
    -----
    As before, errors from the FTP login or from writing to S3 are not caught
    here and propagate to the caller.
    """

    def report(status, size):
        # Single place that defines the shape of the returned record.
        return {
            "source": dat_in,
            "file": dat_out_uri,
            "status": status,
            "size": size,
        }

    # don't try to transfer something that is already in S3
    if fs.exists(dat_out_uri) and fs.size(dat_out_uri) != 0:
        return report("exists already", fs.size(dat_out_uri))

    # Download first, buffering the lines locally. retrlines() strips the
    # line terminator from each line before handing it to the callback.
    lines = []
    with FTP("ftp.ncei.noaa.gov") as ftp:
        ftp.login(passwd=email)
        try:
            ftp.retrlines(f"RETR {dat_in}", lines.append)
        except Exception as e:
            # Nothing has been written to S3 yet, so there is nothing to clean up.
            return report(str(e), None)

    # Upload only after a complete download, restoring one "\n" per line.
    with fs.open(dat_out_uri, "w") as dat_out:
        dat_out.write("".join(f"{line}\n" for line in lines))

    # Ask for the size only after the writer is closed; while it is open the
    # data may still sit in the write buffer and the object may not exist yet.
    return report("transferred", fs.size(dat_out_uri))


def precip_process(df):
    """Regularize one frame to the FIVEMIN frequency and add a ``precip`` column.

    Steps, in order:

    1. group by ``"datetime"`` and keep the last row of each group,
    2. sort by the resulting index,
    3. resample at ``FIVEMIN`` with ``asfreq()``,
    4. derive ``precip`` from the ``"raw_precipitation"`` column using the
       cloudside helpers ``_find_reset_time`` and ``_process_precip``.

    NOTE(review): the exact semantics of the two cloudside helpers are not
    visible here -- presumably they detect the accumulator reset time and
    convert the running total into per-interval depths; confirm in cloudside.

    Parameters
    ----------
    df : DataFrame
        Must be groupable by ``"datetime"`` and contain ``"raw_precipitation"``.

    Returns
    -------
    DataFrame
        The resampled frame with an additional ``precip`` column.
    """
    precip_col = "raw_precipitation"

    last_per_timestamp = df.groupby("datetime").last()
    regular = last_per_timestamp.sort_index().resample(FIVEMIN).asfreq()

    reset_time = _find_reset_time(regular[precip_col])
    processed = _process_precip(regular, reset_time, precip_col)
    return regular.assign(precip=processed)

In [7]:
%%time
# %% set up the data we're going to download
# Address passed as the FTP password by get_all_files() and transfer().
email = "paul+faa@coiled.io"
stations = [
    "KATL", "KPDX", "KLAS",
    "KBHM", "KLGA", "KSEA",
    "KORD", "KLAX", "KSFO",
]
years = list(range(2016, 2022))  # 2016 through 2021 inclusive

# One FTP listing per (station, year) pair, fanned out through a dask bag.
station_years = product(stations, years)
all_files = (
    bag.from_sequence(station_years)
    .map(lambda pair: get_all_files(*pair))
    .compute()
)  # returns a list of file lists

# Collapse the per-station-year lists into one flat list of remote paths.
flattened = [remote for file_list in all_files for remote in file_list]

# Pair every remote file with its destination: same base name under s3_base.
s3_base = "oss-shared-scratch/paul-scratch/faa-scratch-demo/raw"
in_out = [
    dict(dat_in=remote, dat_out_uri=f"{s3_base}/{Path(remote).name}")
    for remote in flattened
]

# %%
# Submit one transfer task per file (each task may be retried up to 5 times),
# then block until every status record has come back.
status_futures = client.map(
    lambda job: transfer(**job),
    in_out,
    retries=5,
)
statuses = client.gather(status_futures)

# Read every .dat file under s3_base as text, parse each line with MetarParser
# (non-strict), turn the records into a dataframe and post-process the
# precipitation partition by partition.
parsed = (
    bag.read_text(f"s3://{s3_base}/*.dat")
    .map(lambda line: MetarParser(line, strict=False).asos_dict(null_sky_as=None))
    .to_dataframe()
    .map_partitions(precip_process)
)

# Keep rows whose temperature is greater than -25, then report the min/max of
# temperature and wind speed per station.
# NOTE(review): -25 looks like a sanity cutoff for bad readings -- confirm units/intent.
summary = (
    parsed.loc[lambda frame: frame["temperature"].gt(-25)]
    .groupby(by="station")[["temperature", "wind_speed"]]
    .agg(["min", "max"])
)

summary.compute()

CPU times: user 1.61 s, sys: 422 ms, total: 2.03 s
Wall time: 4min 54s


Unnamed: 0_level_0,temperature,temperature,wind_speed,wind_speed
Unnamed: 0_level_1,min,max,min,max
station,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
KATL,-10.6,37.8,0.0,39.0
KBHM,-17.2,39.4,0.0,35.0
KLAS,-11.7,46.7,0.0,39.0
KLAX,3.9,40.0,0.0,37.0
KLGA,-17.2,38.3,0.0,39.0
KORD,-24.4,36.1,0.0,60.0
KPDX,-11.7,46.7,0.0,94.0
KSEA,-8.3,42.2,0.0,38.0
KSFO,2.2,40.0,0.0,45.0


In [None]:
# Release resources: close the distributed client first, then the cluster
# it was connected to.
client.close()
cluster.close()