In [1]:
import os
import csv
import requests
from datetime import datetime
import pandas as pd
import json
from pathlib import Path
import numpy as np
import geopandas as gpd
import time



In [2]:
# Project root relative to this notebook, and the folder that receives the fish data.
proj_dir = Path("..", "..", "..", "..")
data_dir = Path(proj_dir, "data/insitu/fish")

In [3]:
# Location of the station metadata table inside the project tree.
stations_metadata_path = Path(proj_dir, "data/insitu/metadata/stations.csv")

# The site table is read from the notebook's working directory; the station
# metadata is read from the project data folder.
fish_count_sites = pd.read_csv("site_table.csv")
stations_metadata = pd.read_csv(stations_metadata_path)

In [4]:
# list(lat_lon_dict.keys())

In [5]:
# Sites of the fish-count table that have no "DART_<abbrev>" row in the station metadata.
known_station_ids = stations_metadata["station_ID"].values
missing_locations = []
for abbrev in fish_count_sites["Abbrev"]:
    if f"DART_{abbrev}" not in known_station_ids:
        missing_locations.append(abbrev)

# Coordinates keyed by site abbreviation; used when a station has to be added to the metadata.
lat_lon_dict = {
    site: {"lat": latitude, "lon": longitude}
    for site, latitude, longitude in [
        ("LYL", 45.7163626949042, -121.2595910431273),
        ("PRO", 46.213277560318176, -119.77283276031208),
        ("ROZ", 46.74919299784255, -120.46569546028712),
        ("TUM", 47.61688013975446, -120.72316429020638),
        ("ZOS", 48.933696826389344, -119.41963260251167),
    ]
}

In [6]:
# Maps each column name of the raw DART download to the standard THORR
# parameter name; several raw columns may share one standard name.
data_key = {
    "Chin": "chinook",
    "JChin": "chinook",
    "Stlhd": "steelhead",
    "WStlhd": "steelhead",
    "Sock": "sockeye",
    "Coho": "coho",
    "JCoho": "coho",
    "Shad": "shad",
    "Lmpry": "lamprey",
    "BTrout": "bull_trout",
    "Chum": "chum",
    "Pink": "pink",
    "TempC": "avg_temp(C)",
}



In [7]:
# format the url for the request
def format_url(proj: str, startDate: str, endDate: str):
    """Builds the DART adult daily csv request url for one project and date range

    The year of the query is taken from the start date; only the month and day
    of the end date are used, so both dates are expected to lie in the same year.

    Args:
        proj (str): abbreviated form of the project name
        startDate (str): first day of the query [YYYY-MM-DD]
        endDate (str): last day of the query [YYYY-MM-DD]
    Returns:
        url (str): request url with the year, project and month/day limits filled in
    """
    date_format = "%Y-%m-%d"
    begin = datetime.strptime(startDate, date_format)
    finish = datetime.strptime(endDate, date_format)

    # month and day are written without zero padding, separated by an encoded "/"
    return (
        "https://www.cbr.washington.edu/dart/cs/php/rpt/adult_daily.php"
        f"?outputFormat=csv&year={begin.year}&proj={proj}&span=no"
        f"&startdate={begin.month}%2F{begin.day}"
        f"&enddate={finish.month}%2F{finish.day}"
    )

In [8]:
# get the data from the url and convert it to a csv
def get_data(proj: str, startDate: str, endDate: str, path: str):
    """Downloads the DART adult daily data of a project into a single raw csv

    The date range is requested one calendar year at a time and the yearly
    responses are appended to ``<path>/raw/dart/DART_<PROJ>.csv``. The file is
    opened in write mode, so a previous download of the project is replaced.
    Years whose response is empty or an HTML page are skipped, each response is
    cut at its first line starting with 'Notes:', and only the first response
    that is written keeps its header row.

    Args:
        proj (str): abbreviated form of the project name (upper-cased before use)
        startDate (str): start date of the query [YYYY-MM-DD]
        endDate (str): end date of the query [YYYY-MM-DD]
        path (str): directory that contains the 'raw/dart' folder for the csv file
    Returns:
        None
    Raises:
        requests.ConnectionError, requests.Timeout: if a request still fails on
            the last attempt
    """
    max_attempts = 3  # total tries per yearly request
    request_timeout = 120  # seconds; without a timeout a stalled server blocks forever

    # capitalize the project name
    proj = proj.upper()

    # get start year and end year
    startYear = datetime.strptime(startDate, "%Y-%m-%d").year
    endYear = datetime.strptime(endDate, "%Y-%m-%d").year

    header_written = False

    # create a csv file for the data by adding all the data from each year
    with open(
        os.path.join(path, "raw/dart", "DART_{}.csv".format(proj)), "w", newline=""
    ) as csvfile:
        writer = csv.writer(csvfile, delimiter=",")
        for year in range(startYear, endYear + 1):
            # clip the first and last year to the requested dates; years in
            # between are requested in full
            year_start = startDate if year == startYear else "{}-01-01".format(year)
            year_end = endDate if year == endYear else "{}-12-31".format(year)
            url = format_url(proj, year_start, year_end)

            # request the data; on a connection problem sleep and try again
            for attempt in range(1, max_attempts + 1):
                try:
                    response = requests.get(url, timeout=request_timeout)
                    break
                except (requests.ConnectionError, requests.Timeout):
                    if attempt == max_attempts:
                        raise
                    time.sleep(np.random.randint(20, 60))

            data = response.text.splitlines()

            # an empty body has no first line to inspect and nothing to write
            if not data:
                continue

            # an HTML page is returned instead of csv when there is no data
            first_line = data[0].lstrip().lower()
            if first_line.startswith(("<!doctype html", "<html")):
                continue

            # take off the lines from the line that begins with 'Notes:'
            for i, line in enumerate(data):
                if line.startswith("Notes:"):
                    data = data[:i]
                    break

            # nothing but notes: do not count this as the header-carrying response
            if not data:
                continue

            # write the data to the csv file but don't repeat the header row
            rows = data[1:] if header_written else data
            writer.writerows(csv.reader(rows))
            header_written = True

In [9]:
# postprocess the downloaded data
def postprocess_data(proj: str, path: str, conditions_path = None):
    """Converts the raw DART download of a project to the standard column layout

    Reads ``<path>/raw/dart/DART_<PROJ>.csv``, removes duplicate rows (the raw
    file is rewritten without them), maps the raw columns to the standard names
    used in ``data_key``, merges the result with an existing processed file and
    saves it to ``<path>/processed/DART_<PROJ>.csv``. When a temperature column
    remains and ``conditions_path`` is given, the temperature is also merged
    into ``<conditions_path>/processed/DART_<PROJ>.csv``.

    Args:
        proj (str): abbreviated form of the project name (upper-cased before use,
            so every file of a project shares the same name)
        path (str): directory that contains the 'raw/dart' and 'processed' folders
        conditions_path (str, optional): directory of the conditions dataset;
            when None the conditions file is not touched
    Returns:
        pandas.Index: column labels of the saved processed table
    """
    # the raw file is always written with an upper-case project name, so use
    # the same spelling for the processed and conditions files
    proj = proj.upper()
    file_name = "DART_{}.csv".format(proj)
    raw_file = os.path.join(path, "raw/dart", file_name)
    processed_file = os.path.join(path, "processed", file_name)

    # if processed data exists, read it in
    if os.path.exists(processed_file):
        df_existing = pd.read_csv(processed_file)
        df_existing["date"] = pd.to_datetime(df_existing["date"])
    else:
        df_existing = pd.DataFrame()

    # drop duplicate rows, store the cleaned raw file and read it back in
    # (kept as a round trip through the file, as the original does)
    df = pd.read_csv(raw_file).drop_duplicates()
    df.to_csv(raw_file, index=False)
    df = pd.read_csv(raw_file)

    # dict.fromkeys removes repeated names but keeps their order; a set would
    # give a column order that can change from one Python session to the next
    standard_columns = ["date"] + list(dict.fromkeys(data_key.values()))
    new_df = pd.DataFrame(columns=standard_columns)
    new_df["date"] = pd.to_datetime(df["Date"])

    # counts split over two raw columns are summed; zero is stored as missing
    new_df["chinook"] = (df["Chin"].fillna(0) + df["JChin"].fillna(0)).replace(0, np.nan)
    new_df["steelhead"] = (df["Stlhd"].fillna(0) + df["WStlhd"].fillna(0)).replace(0, np.nan)
    new_df["sockeye"] = df["Sock"].replace(0, np.nan)
    new_df["coho"] = (df["Coho"].fillna(0) + df["JCoho"].fillna(0)).replace(0, np.nan)
    new_df["shad"] = df["Shad"].replace(0, np.nan)
    new_df["lamprey"] = df["Lmpry"].replace(0, np.nan)
    new_df["bull_trout"] = df["BTrout"].replace(0, np.nan)
    new_df["chum"] = df["Chum"].replace(0, np.nan)
    new_df["pink"] = df["Pink"].replace(0, np.nan)
    new_df["avg_temp(C)"] = df["TempC"]

    # merge with existing data; for a repeated date the existing row is kept
    if not df_existing.empty:
        new_df = pd.concat([df_existing, new_df], ignore_index=True)
        new_df = new_df.drop_duplicates(subset=["date"])

    # drop null columns
    new_df = new_df.dropna(axis=1, how="all")

    # save the data
    new_df.to_csv(processed_file, index=False)

    # the conditions dataset is only updated when a location for it was given
    if "avg_temp(C)" in new_df.columns and conditions_path is not None:
        conditions_dir = os.path.join(conditions_path, "processed")
        os.makedirs(conditions_dir, exist_ok=True)
        conditions_file = os.path.join(conditions_dir, f"DART_{proj}.csv")
        temperature = new_df[["date", "avg_temp(C)"]]

        # load conditions dataset if it exists
        if os.path.exists(conditions_file):
            existing_conditions = pd.read_csv(conditions_file)
            existing_conditions["date"] = pd.to_datetime(existing_conditions["date"])
        else:
            existing_conditions = pd.DataFrame()

        # merge the data with existing data
        if not existing_conditions.empty:
            existing_conditions = existing_conditions.merge(
                temperature, on="date", how="outer"
            )
        else:
            existing_conditions = temperature

        # both sides had a temperature column: keep the existing value and
        # fall back to the new one where the existing value is missing
        if "avg_temp(C)_x" in existing_conditions.columns:
            existing_temp = existing_conditions["avg_temp(C)_x"]
            existing_conditions["avg_temp(C)"] = existing_temp.where(
                existing_temp.notnull(), existing_conditions["avg_temp(C)_y"]
            )
            existing_conditions = existing_conditions.drop(
                columns=["avg_temp(C)_x", "avg_temp(C)_y"]
            )

        # save the data
        existing_conditions.to_csv(conditions_file, index=False)

    return new_df.columns

In [10]:
# Query window: from the start of 1939 up to the day the notebook is run
# (a fixed string such as "2024-09-18" can be used instead to pin the end date).
startDate = "1939-01-01"
endDate = f"{datetime.today():%Y-%m-%d}"

In [11]:
# Make sure the output folders exist: the data folder itself plus the
# folders for the raw downloads and for the processed tables.
required_dirs = (
    data_dir,
    os.path.join(data_dir, "raw/dart"),
    os.path.join(data_dir, "processed"),
)
for directory in required_dirs:
    os.makedirs(directory, exist_ok=True)

In [12]:
# Download, postprocess and register every site of the fish-count table.
# Other site selections that can be used instead:
#   for p in list(lat_lon_dict.keys()):
#   for p in ['ZOS',]:
for p in fish_count_sites["Abbrev"]:
    get_data(p, startDate, endDate, data_dir)
    parameters = postprocess_data(
        p, data_dir, conditions_path=Path(proj_dir, "data/insitu/conditions")
    )

    # one upper-case spelling is used for every lookup of this site
    site_abbrev = p.upper()
    station_ID = "DART_" + site_abbrev

    # add the station to the metadata if it is not listed yet
    if station_ID not in stations_metadata["station_ID"].values:
        if site_abbrev not in lat_lon_dict:
            # same exception type as a plain dict lookup, but it says what is missing
            raise KeyError(
                f"{station_ID} is not in the station metadata and has no "
                "coordinates in lat_lon_dict"
            )
        lat = lat_lon_dict[site_abbrev]["lat"]
        lon = lat_lon_dict[site_abbrev]["lon"]

        site_rows = fish_count_sites[fish_count_sites["Abbrev"] == site_abbrev]
        project = site_rows["Project/Dam"].values[0]
        river = site_rows["River"].values[0]
        description = f"{project} - {river}"

        stations_metadata = pd.concat(
            [
                stations_metadata,
                pd.DataFrame(
                    {
                        "station_ID": [station_ID],
                        "id_at_source": [site_abbrev],
                        "available_data": ["{}"],
                        "source_URL": ['{"url" : []}'],
                        "description": [description],
                        "latitude": [lat],
                        "longitude": [lon],
                        "site_params": ["{}"],
                    }
                ),
            ],
            ignore_index=True,
        )

    # rows of this station; computed after the table may have grown
    is_station = stations_metadata["station_ID"] == station_ID

    # update source url
    dart_query_url = "https://www.cbr.washington.edu/dart/query/adult_daily"
    source_url = json.loads(stations_metadata.loc[is_station, "source_URL"].values[0])
    if dart_query_url not in source_url["url"]:
        source_url["url"].append(dart_query_url)
        stations_metadata.loc[is_station, "source_URL"] = json.dumps(source_url)

    # update the available data
    available_data = json.loads(
        stations_metadata.loc[is_station, "available_data"].values[0]
    )
    fish_params = available_data.setdefault("fish", [])

    # temperature is listed under "conditions", everything else under "fish";
    # the date column is skipped by name instead of by position
    for param in parameters:
        if param == "date":
            continue
        if param == "avg_temp(C)":
            condition_params = available_data.setdefault("conditions", [])
            if param not in condition_params:
                condition_params.append(param)
        elif param not in fish_params:
            fish_params.append(param)

    # update the metadata
    stations_metadata.loc[is_station, "available_data"] = json.dumps(available_data)

    # save the metadata after every site so finished sites are kept if a later one fails
    stations_metadata.to_csv(stations_metadata_path, index=False)

    print(f"Finished processing {p} data")
    # optional pause between sites (disabled):
    # time.sleep(np.random.randint(30, 60))

Finished processing BON data
Finished processing IHR data
Finished processing JDA data
Finished processing LGS data
Finished processing LMN data
Finished processing LWG data
Finished processing LYL data
Finished processing MCN data
Finished processing PRD data
Finished processing PRO data
Finished processing RIS data
Finished processing ROZ data
Finished processing RRH data
Finished processing TDA data
Finished processing TUM data
Finished processing WAN data
Finished processing WEL data
Finished processing WFF data
Finished processing ZOS data


In [13]:
# record when, why and by whom the metadata was last updated
metadata_status = {
    "last_updated": pd.Timestamp.now().strftime("%Y-%m-%d %H:%M:%S"),
    "update_message": "Updated the metadata to include the new DART stations and fish data",
    "last_updated_by": "George Darkwah",
    "last_updated_by_email": "gdarkwah@uw.edu",
}

# save metadata
# The folder is spelled "data" (lower case) like every other path in this
# notebook; "Data" only resolves on case-insensitive file systems.
# NOTE(review): the content is JSON although the file name ends in .csv; the
# name is kept so that existing readers of the file still find it.
with open(Path(proj_dir, "data/insitu/metadata/metadata_status.csv"), "w") as f:
    json.dump(metadata_status, f, indent=4)