In [None]:
from prefect import task, flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner
from pathlib import Path
from pprint import pprint
import glob
import pandas as pd
from tqdm import tqdm
import matplotlib

In [None]:
@task()
def find_years_to_prep(data_dir) -> list:
    """
    Lists the uploaded paths that have not been prepped yet
    - Recursively searches `data_dir` for '*___complete' and '*___prepped' marker files
    - Strips the marker text from each matching path
    - Returns the paths that have a '___complete' marker but no '___prepped' marker
    - The order of the returned list is not defined (it is built from a set)
    """
    def _marked_paths(marker: str) -> set:
        # Paths carrying the given marker, with the marker text removed.
        hits = glob.glob(f"{data_dir}/**/*{marker}", recursive=True)
        return {hit.replace(marker, "") for hit in hits}

    return list(_marked_paths("___complete") - _marked_paths("___prepped"))

In [None]:
@task()
def open_csv_station_index(filename: Path|str) -> pd.DataFrame:
    """
    Loads a csv file and indexes it by station id
    - Reads `filename` with pandas (low_memory=False)
    - Replaces NaN in 'LATITUDE', 'LONGITUDE' and 'ELEVATION' with the string 'missing'
    - Returns the dataframe indexed by the 'STATION' column
    """
    spatial_cols = ['LATITUDE', 'LONGITUDE', 'ELEVATION']
    df = pd.read_csv(filename, low_memory=False)
    df[spatial_cols] = df[spatial_cols].fillna('missing')
    return df.set_index('STATION')

In [None]:
@task()
def remove_missing_spatial(df: pd.DataFrame) -> pd.DataFrame:
    """
    Removes Records with Missing Spatial Data
    - A spatial value counts as missing when it is NaN or the string 'missing'
    - Returns the rows of `df` where 'LATITUDE', 'LONGITUDE' and 'ELEVATION' are all present;
      every column of `df` is kept and the input dataframe is not modified
    - Why: This is 100x (guestimate) faster than saving a separate csv with this information removed. Quicker to just
           run this function when a dataframe with 100% clean spatial data is required
    """
    spatial = df[['LATITUDE', 'LONGITUDE', 'ELEVATION']]
    # Build a row mask instead of a chained assignment (`df = df[cols] = ...`), which would
    # rebind `df` to the three spatial columns and drop every other column from the result.
    has_all_spatial = (spatial.notna() & (spatial != 'missing')).all(axis=1)
    return df[has_all_spatial]

In [None]:
@task()
def find_missing_lat_long(station_indexed_df: pd.DataFrame, filename: Path|str) -> None:
    """
    Saves the records missing latitude or longitude (or both) to a csv
    NOTE: Assumes dataframe has had 'nan' replaced with 'missing' string already.
    - Output is written next to `filename` as '<year>_missing_lat_long.csv', where <year> is
      the first four characters of the file name
    - Returns nothing; the csv file is the only output
    """
    # FIND RECORDS MISSING LATITUDE OR LONGITUDE (or both)
    df = station_indexed_df
    missing = df[(df['LATITUDE'] == 'missing') | (df['LONGITUDE'] == 'missing')]
    missing = missing.reset_index()  # turn the index back into a regular column before saving

    filename = Path(filename)  # accepts either a str or a Path
    year = filename.name[:4]  # separated from f-string for clarity
    missing.to_csv(filename.parent / f"{year}_missing_lat_long.csv")

In [None]:
@task()
def find_missing_elevation(station_indexed_df: pd.DataFrame, filename: Path|str) -> None:
    """
    Saves the records missing only elevation to a csv
    NOTE: Assumes dataframe has had 'nan' replaced with 'missing' string already.
    - Keeps records where latitude and longitude are present but elevation is 'missing'
    - Output is written next to `filename` as '<year>_missing_only_elevation.csv', where <year> is
      the first four characters of the file name
    - Returns nothing; the csv file is the only output
    """
    # FIND RECORDS MISSING ONLY ELEVATION
    # - these could likely still be used
    # - may also be able to pull elevation from else where based on latitude and longitude
    df = station_indexed_df
    has_lat_long = (df['LATITUDE'] != 'missing') & (df['LONGITUDE'] != 'missing')
    only_elevation_missing = df[has_lat_long & (df['ELEVATION'] == 'missing')]
    only_elevation_missing = only_elevation_missing.reset_index()  # turn the index back into a regular column

    filename = Path(filename)  # accepts either a str or a Path
    year = filename.name[:4]  # separated from f-string for clarity
    only_elevation_missing.to_csv(filename.parent / f"{year}_missing_only_elevation.csv")

In [None]:
@task()
def confirm_consistent_spatial_data(station_indexed_df: pd.DataFrame, filename: Path|str) -> None:
    """
    Confirms each station has a single combination of spatial values
    NOTE: Assumes dataframe has had 'nan' replaced with 'missing' string already.
    - Counts the distinct ('LATITUDE', 'LONGITUDE', 'ELEVATION') combinations per 'STATION'
    - Data is treated as consistent when that count equals the number of unique index values
      (the dataframe is expected to be indexed by 'STATION')
    - Returns nothing when the data is consistent
    - Raises ValueError naming `filename` when it is not
    """
    df = station_indexed_df
    combos_per_station = df.groupby('STATION')[['LATITUDE', 'LONGITUDE', 'ELEVATION']].value_counts()
    if len(combos_per_station) != len(df.index.unique()):
        # TODO: This branch has never been triggered by real data so far, so it is untested.
        raise ValueError(f'Spatial data for one or more station ids is not consistent in: {filename}')

In [None]:
@task()
def data_clean_complete(filename):
    """
    Marks `filename` as prepped by writing an empty '<filename>___prepped' marker file
    - In the prefect flow this should "wait_for" missing_lat_long, missing_elevation, and
      confirm_consistent to all be successfully completed
    """
    # Writing an empty string creates the marker, or truncates it if it already exists.
    Path(f"{filename}___prepped").write_text("")

In [None]:
@flow(task_runner=SequentialTaskRunner())
def main():
    """
    Cleans every uploaded file that has not been prepped yet
    - Looks under 'local_data/global-summary-of-the-day-archive' for paths still to prep
    - For each one: loads '<year>_full.csv', saves the missing lat/long and missing elevation
      reports, and checks the spatial data is consistent
    - Writes the '___prepped' marker after those three tasks
    """
    logger = get_run_logger()
    data_dir = Path('local_data').resolve() / 'global-summary-of-the-day-archive'

    to_prep = find_years_to_prep(data_dir)

    # NOTE(review): `.wait().result()` assumes the task call returns a future-like object;
    # confirm this against the installed prefect version.
    process_years = sorted(to_prep.wait().result())
    for filename in process_years:
        marker_base = Path(filename)
        # '<parent>/<first four characters of the name>' followed by '_full.csv'
        file_to_prep = f"{marker_base.parent / marker_base.name[:4]}_full.csv"
        logger.info(f"STARTING {Path(file_to_prep).name}")
        station_idx_df = open_csv_station_index(file_to_prep)
        missing_lat_long = find_missing_lat_long(station_idx_df, file_to_prep)
        missing_elevation = find_missing_elevation(station_idx_df, file_to_prep)
        consistent = confirm_consistent_spatial_data(station_idx_df, file_to_prep)
        # The marker task is given the three checks as `wait_for` dependencies.
        data_clean_complete(filename, wait_for=[missing_lat_long, missing_elevation, consistent])

# Run the flow when this cell/script is executed.
main()