# Save data from weather stations as pandas dataframe
- Downloaded files on measured hourly precipitation from [CEDA](https://data.ceda.ac.uk/badc/ukmo-midas-open/data/uk-hourly-rain-obs/dataset-version-202207). 
- Here, each file is loaded into a dataframe and duplicates within a single file are removed.
- Files are concatenated into a multi-index dataframe with the latitude and longitude of the weather station as the two indices.

Info on weather station data:
- `prcp_amt` gives the hourly observed total rainfall in mm, to the nearest 0.2 mm
- `prcp_dur` gives duration of rainfall within that hour to nearest 6min (rarely noted in files)

In [None]:
# Load IPython's autoreload extension (mode 2) so that edits to imported
# libraries are picked up without having to reload the package by hand.
%load_ext autoreload
%autoreload 2

In [None]:
import os
import re
import glob
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import logging
import tqdm
import a2.utils

# Configure the root logger so that records at INFO level and above are shown.
logging.basicConfig(level=logging.INFO)

In [None]:
# Collect every station CSV below `start_dir` whose path contains "_qcv-1_",
# skipping the per-station "capability.csv" and "station-metadata.csv" files.
files = []
start_dir = "/home/kristian/Projects/a2/data/weather_stations/dap.ceda.ac.uk/"
# glob patterns have no negative look-ahead, so unwanted files are removed by
# the name filter below instead of by the pattern itself.
pattern = r"*.csv"

for dirpath, _, _ in os.walk(start_dir):
    files.extend(glob.glob(os.path.join(dirpath, pattern)))
files = [f for f in files if "capability.csv" not in f and "station-metadata.csv" not in f and "_qcv-1_" in f]
print(f"{len(files)} files in total")
# Display one sample path as a quick visual check
# (raises IndexError when fewer than 183 files were found).
files[182]

In [None]:
def assert_arrays_equal(array1, array2):
    """Raise ``ValueError`` unless both arrays are equal according to ``np.array_equal``.

    Before raising, every element pair that differs (pairs are formed with
    ``zip``, so only up to the shorter length) is printed to ease debugging.
    Returns ``None`` when the arrays match.
    """
    if np.array_equal(array1, array2):
        return
    # report the individual mismatches first, then fail with the full arrays
    for a1, a2 in zip(array1, array2):
        if a1 != a2:
            print(f"{a1=} != {a2=}")
    raise ValueError(f"{array1=} not same as expected: {array2=}!")


def location_from_header(header):
    """Extract the station's latitude and longitude from the file header.

    Args:
        header: Iterable of header strings; they are joined and searched for
            an entry of the form ``location,G,<latitude>,<longitude>``.

    Returns:
        Tuple ``(latitude, longitude)`` as floats.

    Raises:
        ValueError: If the header does not contain exactly one location entry.
    """
    s = "".join(header)
    # Raw string avoids the invalid "\d" escape; the decimal point is escaped
    # and the fractional part is optional so whole-number coordinates parse too.
    r = re.findall(r"location,G,(-?\d+(?:\.\d+)?),(-?\d+(?:\.\d+)?)", s)
    if len(r) != 1:
        raise ValueError(f"Location not correctly extracted from header: {s=}, extracted: {r=}")
    latitude, longitude = (float(value) for value in r[0])
    return latitude, longitude


def station_name_from_header(header):
    """Build the station label ``<county>_<station>`` from the file header.

    The joined header is searched for the ``observation_station,G,...`` and
    ``historic_county_name,G,...`` lines; the first hit of each is used.
    An ``IndexError`` is raised when either line is missing.
    """
    joined = "".join(header)
    station_hits = re.findall("observation_station,G,(.+)\n", joined)
    station = station_hits[0]
    county_hits = re.findall("historic_county_name,G,(.+)\n", joined)
    county = county_hits[0]
    return f"{county}_{station}"


def load_single_csv(
    filename,
    only_hourly=True,
    check_duplicate=True,
    check_domain=True,
    keep_max_tp=True,
):
    """Load one station CSV file into a cleaned dataframe.

    Steps, in order: read the table (skipping the first 61 lines), validate the
    column names and `id_type`, drop a trailing "end data" row, parse the time
    columns, drop rows without any precipitation information, optionally filter
    by `ob_hour_count` and `met_domain_name`, optionally resolve duplicates, and
    finally attach station metadata taken from the file header.

    Args:
        filename: Path of the CSV file to load.
        only_hourly: If true, keep only rows with `ob_hour_count == 1`.
        check_duplicate: If true, resolve rows sharing `ob_end_time` and
            `ob_hour_count`; a `ValueError` is raised if any remain afterwards.
        check_domain: If true, keep only rows with `met_domain_name == "SREW"`.
        keep_max_tp: Only used when `check_duplicate` is true. Among rows with
            the same `ob_end_time`/`ob_hour_count`, keep the one sorted last by
            `prcp_amt` (NaN sorts first).

    Returns:
        Dataframe with the original columns plus `latitude`, `longitude` and
        `station_name`. The same three values and the raw header are also
        stored in `df.attrs`.

    Raises:
        ValueError: On NaN `ob_end_time` values, unexpected column names,
            `id_type` values other than "RAIN", unresolved duplicates, or when
            the state-indicator check below triggers.
    """
    # For description of columns see https://artefacts.ceda.ac.uk/badc_datadocs/ukmo-midas/RH_Table.html
    # Or header of the csv files
    # Documentation for CEDA can be found here http://cedadocs.ceda.ac.uk/1492/
    # NOTE(review): presumably the first 61 lines hold file metadata (60 of them are read
    # again as header at the end of this function) -- confirm against the file format.
    df = pd.read_csv(filename, skiprows=61)
    if df["ob_end_time"].isnull().sum():
        raise ValueError(f"Nan time values in {filename}!")
    # expected column layout; any deviation aborts the load
    column_names = np.array(
        [
            "ob_end_time",
            "id",
            "id_type",
            "ob_hour_count",
            "version_num",
            "met_domain_name",
            "src_id",
            "rec_st_ind",
            "prcp_amt",
            "prcp_dur",
            "prcp_amt_q",
            "prcp_dur_q",
            "prcp_amt_j",
            "meto_stmp_time",
            "midas_stmp_etime",
        ],
        dtype=object,
    )
    assert_arrays_equal(column_names, df.columns.values)
    # a trailing "end data" marker row is not an observation
    if df["ob_end_time"].values[-1] == "end data":
        df.drop(df.tail(1).index, inplace=True)
    if not np.all(df["id_type"].values == "RAIN"):
        raise ValueError(
            f"{filename}:\nUnexpected value in df['id_type']==RAIN, but found {[x for x in df['id_type'].values if x != 'RAIN']}"
        )
    df["ob_end_time"] = pd.to_datetime(df["ob_end_time"])
    df["meto_stmp_time"] = pd.to_datetime(df["meto_stmp_time"])
    # dropping rows when neither duration of precipitation `prcp_dur` nor amount of precipitation `prcp_amt` is available
    df = df.drop(df[df["prcp_dur"].isnull() & df["prcp_amt"].isnull()].index).reset_index(drop=True)
    if only_hourly:
        df = df.loc[df["ob_hour_count"] == 1].reset_index(drop=True)
    if check_domain:
        mask = df["met_domain_name"] != "SREW"
        counts_non_srew = mask.sum()
        if counts_non_srew:
            # Note, SSER or SAMOS also possible but shouldn't be relevant for our domain (hourly rain water: SREW)
            mask_not = ~mask
            # logging.info(f"Removing {counts_non_srew}/{df.shape[0]} non-SREW domains (found: {np.unique(df['met_domain_name'].loc[mask])}, time: {df['ob_end_time'].loc[mask].min()}-{df['ob_end_time'].loc[mask].max()}) from dataframe!")
            df = df.loc[mask_not].reset_index(drop=True)
    if check_duplicate:
        # drop measurements of different devices (marked by different `id` which are stored at different times possibly `meto_stmp_time`/`midas_stmp_etime`) but with same results
        df = df.drop_duplicates(
            subset=[x for x in df.columns.values if x not in ["id", "meto_stmp_time", "midas_stmp_etime"]],
            keep="first",
        ).reset_index(drop=True)
        # If rows share `ob_end_time`/`ob_hour_count` but no two rows also share the
        # state indicator `rec_st_ind`, drop the duplicated rows whose indicator is not 1011.
        if (
            df.loc[df.duplicated(subset=["ob_end_time", "ob_hour_count"])].shape[0]
            and not df.loc[
                df.duplicated(
                    subset=["ob_end_time", "ob_hour_count", "rec_st_ind"],
                    keep=False,
                )
            ].shape[0]
        ):
            df = df.drop(
                df[
                    df.duplicated(subset=["ob_end_time", "ob_hour_count"], keep=False) & (df["rec_st_ind"] != 1011)
                ].index
            ).reset_index(drop=True)
        # Remaining duplicates share `ob_end_time`/`ob_hour_count` but differ in measured values.
        # Sort by `prcp_amt` (NaN values come first) and drop every row that is followed by another
        # row with the same `ob_end_time`/`ob_hour_count`, unless it is an exact duplicate of a later
        # row in all columns other than 'id', 'meto_stmp_time', 'midas_stmp_etime'.
        # Effectively the row with the highest `prcp_amt` is retained.
        if keep_max_tp:
            df = df.sort_values("prcp_amt", na_position="first")
            df = df.drop(
                df[
                    ~df.duplicated(
                        subset=[x for x in df.columns.values if x not in ["id", "meto_stmp_time", "midas_stmp_etime"]],
                        keep="last",
                    )
                    & df.duplicated(subset=["ob_end_time", "ob_hour_count"], keep="last")
                ].index
            ).reset_index(drop=True)
        # anything still duplicated at this point could not be resolved automatically
        if df.loc[df.duplicated(subset=["ob_end_time", "ob_hour_count"])].shape[0]:
            raise ValueError(
                f"{filename}:\nDuplicate columns with the same `ob_end_time` and `ob_hour_count`: {df.loc[df.duplicated(subset=['ob_end_time', 'ob_hour_count'], keep=False)]}"
            )
    # only retain data with status code 1011 -> "Normal ingestion of observation at creation"
    # NOTE(review): `.all()` is evaluated on the selected non-1011 indicator values themselves, so it
    # is True whenever they are all non-zero (or when there are none). The error below is therefore
    # only raised if a non-1011 indicator equals 0 -- confirm whether `(... != 1011).any()` was intended.
    if not df["rec_st_ind"].loc[df["rec_st_ind"] != 1011].all():
        # see https://dap.ceda.ac.uk/badc/ukmo-midas/metadata/doc/state_indicators.html
        raise ValueError(
            f"{filename}:\nRecords found that do not have QC=1, found State indicators{df['rec_st_ind'].value_counts()}"
        )
    # station metadata is parsed from the header returned by the project helper
    # (presumably the first 60 lines of the file -- verify against `get_header`)
    df.attrs["header"] = a2.utils.file_handling.get_header(filename, 60)
    df.attrs["latitude"], df.attrs["longitude"] = location_from_header(df.attrs["header"])
    df.attrs["station_name"] = station_name_from_header(df.attrs["header"])
    # repeat the metadata as columns so it is still present after concatenating several files
    df["latitude"] = df.attrs["latitude"]
    df["longitude"] = df.attrs["longitude"]
    df["station_name"] = df.attrs["station_name"]
    return df


def load_csv_multi_index(filename, only_hourly=True, check_duplicate=True, check_domain=True):
    """Load a single station file by delegating to ``load_single_csv``.

    The three flags are forwarded unchanged and the resulting dataframe is
    returned as is; despite the name, no multi-index is set here.
    """
    options = {
        "only_hourly": only_hourly,
        "check_duplicate": check_duplicate,
        "check_domain": check_domain,
    }
    return load_single_csv(filename, **options)


def unique_coordinates(df, coordinates=None):
    """Return the unique combinations of the given coordinate columns.

    ``coordinates`` defaults to ``["latitude", "longitude"]``. The result is a
    2-D array with one row per coordinate column and one column per unique
    combination, as produced by ``np.unique(..., axis=1)``.
    """
    column_names = ["latitude", "longitude"] if coordinates is None else coordinates
    stacked = np.array([df[name].values for name in column_names])
    return np.unique(stacked, axis=1)


def load_all_files(files):
    """Load every file in ``files`` and concatenate the results into one dataframe.

    Each file is read with ``load_csv_multi_index`` (a tqdm progress bar is
    shown); the concatenated frame gets a fresh consecutive index.
    """
    frames = []
    for path in tqdm.tqdm(files):
        frames.append(load_csv_multi_index(path))
    combined = pd.concat(frames)
    return combined.reset_index(drop=True)

In [None]:
def load_weather_stations(filename):
    """Read a saved weather-station CSV and index it by (latitude, longitude).

    ``latitude``, ``longitude`` and ``prcp_amt`` are read as floats and
    ``ob_end_time`` is parsed as datetime. All columns of the file are loaded.
    """
    read_options = {
        "dtype": {"latitude": float, "longitude": float, "prcp_amt": float},
        "parse_dates": ["ob_end_time"],
    }
    table = pd.read_csv(filename, **read_options)
    return table.set_index(["latitude", "longitude"])

In [None]:
# Load every discovered station file and concatenate them into one dataframe.
df = load_all_files(files)

In [None]:
def check_for_duplicates_coord_based(df):
    """Print, per station location, all rows that share an ``ob_end_time``.

    Locations are the unique (latitude, longitude) pairs of ``df``. Nothing is
    printed for locations without duplicated times; nothing is returned.
    """
    latitudes, longitudes = unique_coordinates(df)
    for latitude, longitude in zip(latitudes, longitudes):
        coordinate_mask = (df.latitude == latitude) & (df.longitude == longitude)
        repeated_time = df.loc[coordinate_mask].duplicated(subset=["ob_end_time"], keep=False)
        duplicates = df.loc[coordinate_mask & repeated_time]
        if not duplicates.shape[0]:
            continue
        print(f"{latitude=}, {longitude=}")
        print(duplicates.sort_values("ob_end_time"))


def drop_duplicates_same_location(df):
    """Keep one row per station location and ``ob_end_time``.

    Duplicates include entries where "prcp_amt"/"prcp_dur" differ; the row with
    the larger ``prcp_amt`` is retained (NaN counts as smallest).

    Args:
        df: Dataframe with ``latitude``, ``longitude``, ``ob_end_time`` and
            ``prcp_amt`` columns.

    Returns:
        New dataframe sorted by ``prcp_amt`` (NaN first) with a fresh
        consecutive index and at most one row per (latitude, longitude,
        ob_end_time). Rows with a missing latitude or longitude are never
        dropped, because they cannot be matched to a station.
    """
    station_keys = ["latitude", "longitude"]
    # One stable sort instead of re-sorting the whole frame for every station:
    # after it, the last row of each duplicate group carries the largest amount.
    df = df.sort_values("prcp_amt", na_position="first", kind="stable")
    has_location = df[station_keys].notna().all(axis=1)
    superseded = has_location & df.duplicated(subset=station_keys + ["ob_end_time"], keep="last")
    df = df.loc[~superseded].reset_index(drop=True)
    # NOTE(review): this diagnostic lists rows sharing `ob_end_time` across ALL stations,
    # not per location -- confirm whether a per-station check was intended.
    print(f"{df.loc[df.duplicated(subset=['ob_end_time'], keep=False)].sort_values('ob_end_time')=}")
    return df

In [None]:
# De-duplicate on a copy so that the concatenated `df` itself is not replaced.
df_unique = drop_duplicates_same_location(df.copy())

In [None]:
# Persist the de-duplicated observations.
# NOTE(review): the dataframe index is written as well (no `index=False`), so reading the file
# back presumably yields an extra unnamed column -- confirm this is intended.
df_unique.to_csv("../../data/weather_stations/weather_stations_hourly_rainfall_uk.csv")

In [None]:
# Shell escape: long listing of the reduced 2017-2020 file (it is written by a later cell).
!ls -tlrh '../../data/weather_stations/weather_stations_hourly_rainfall_uk_2017-2020_reduced.csv'

In [None]:
# Reload the saved observations; the result is indexed by (latitude, longitude).
df = load_weather_stations(
    "../../data/weather_stations/weather_stations_hourly_rainfall_uk.csv",
)

In [None]:
# Turn the (latitude, longitude) index back into ordinary columns, as the checks below expect.
df = df.reset_index()

In [None]:
def check_for_duplicates_coord_based(df):
    """Print, per station location, all rows that share an ``ob_end_time``.

    Locations are the unique (latitude, longitude) pairs of ``df``. Nothing is
    printed for locations without duplicated times; nothing is returned.
    """
    latitudes, longitudes = unique_coordinates(df)
    for latitude, longitude in zip(latitudes, longitudes):
        coordinate_mask = (df.latitude == latitude) & (df.longitude == longitude)
        repeated_time = df.loc[coordinate_mask].duplicated(subset=["ob_end_time"], keep=False)
        duplicates = df.loc[coordinate_mask & repeated_time]
        if not duplicates.shape[0]:
            continue
        print(f"{latitude=}, {longitude=}")
        print(duplicates.sort_values("ob_end_time"))

In [None]:
# Print any rows of the reloaded dataset that share station location and `ob_end_time`.
check_for_duplicates_coord_based(df)

In [None]:
# Load the previously saved 2017-2020 subset and display it.
df_1720 = load_weather_stations("../../data/weather_stations/weather_stations_hourly_rainfall_uk_2017-2020_reduced.csv")
df_1720

In [None]:
# The loaded subset is indexed by (latitude, longitude); the check needs them as columns.
check_for_duplicates_coord_based(df_1720.reset_index())

In [None]:
# Build the 2017-2020 subset from the full dataset and save it.
# NOTE(review): both bounds are strict, so rows stamped exactly 2017-01-01 00:00:00 or
# 2021-01-01 00:00:00 are excluded. If `ob_end_time` marks the END of the observed hour,
# the last hour of 2020 is dropped -- confirm this is intended.
# NOTE(review): the index is written to the file as well (no `index=False`).
df_1720 = df.loc[
    (df["ob_end_time"] > np.datetime64("2017-01-01 00:00:00"))
    & (df["ob_end_time"] < np.datetime64("2021-01-01 00:00:00"))
].reset_index(drop=True)
df_1720.to_csv("../../data/weather_stations/weather_stations_hourly_rainfall_uk_2017-2020_reduced.csv")

In [None]:
# Print any same-station duplicates left in the freshly built 2017-2020 subset.
check_for_duplicates_coord_based(df_1720)

In [None]:
# Select all rows whose station name contains "norfolk_hemsby".
hemsby = df.loc[df["station_name"].str.contains("norfolk_hemsby")]

In [None]:
# Shape (rows, columns) of the records belonging to the station named "norfolk_hemsby-trial".
df.loc[df["station_name"] == "norfolk_hemsby-trial"].shape

In [None]:
# Hemsby rows that share an `ob_end_time` with another row but whose combination of
# time, amount and duration is not repeated, i.e. duplicates with conflicting measurements.
hemsby.loc[
    hemsby.duplicated(subset=["ob_end_time"], keep=False)
    & ~hemsby.duplicated(subset=["ob_end_time", "prcp_amt", "prcp_dur"], keep=False)
].sort_values("ob_end_time")

In [None]:
def remove_duplicates_same_site(df):
    """Return a copy of ``df`` with at most one row per ``ob_end_time``.

    Rows repeating the same (``ob_end_time``, ``prcp_amt``, ``prcp_dur``) are
    collapsed to their first occurrence. If several rows still share an
    ``ob_end_time`` afterwards, the one with the largest ``prcp_amt`` is kept
    (NaN counts as smallest).

    Args:
        df: Dataframe of a single site with ``ob_end_time``, ``prcp_amt`` and
            ``prcp_dur`` columns. It is not modified.

    Returns:
        New dataframe sorted by ``ob_end_time``. The previous index of ``df`` is
        kept as ordinary column(s), as produced by ``reset_index()``.
    """
    # `reset_index()` returns a new frame, so the caller's dataframe stays untouched
    df = df.reset_index()
    df = df.drop_duplicates(subset=["ob_end_time", "prcp_amt", "prcp_dur"], keep="first")
    df = df.sort_values("ob_end_time").reset_index(drop=True)
    n_conflicting = int(df.duplicated(subset=["ob_end_time"], keep=False).sum())
    if n_conflicting:
        # keep maximum values: ascending sort with NaN first, then keep the last row per time
        logging.info("Have %d remaining duplicates, keep max value.", n_conflicting)
        df = df.sort_values("prcp_amt", na_position="first")
        df = df.drop_duplicates(subset=["ob_end_time"], keep="last").reset_index(drop=True)
    duplicate_times = df.loc[df.duplicated(subset=["ob_end_time"], keep=False)]
    print(f"{duplicate_times.shape[0]} values are duplicates")
    return df.sort_values("ob_end_time")


# De-duplicate the Hemsby records on a copy, then inspect the rows left at one example timestamp.
hemsby_r = remove_duplicates_same_site(hemsby.copy())
hemsby_r.loc[hemsby_r["ob_end_time"] == np.datetime64("2000-07-31 04:00:00")]

In [None]:
# All Hemsby rows whose `ob_end_time` occurs more than once, ordered by time.
hemsby.loc[hemsby.duplicated(subset=["ob_end_time"], keep=False)].sort_values("ob_end_time")

In [None]:
# All Hemsby rows whose time, amount and duration are repeated by another row, ordered by time.
hemsby.loc[hemsby.duplicated(subset=["ob_end_time", "prcp_amt", "prcp_dur"], keep=False)].sort_values("ob_end_time")

In [None]:
# Write the current `df` to the same path that was loaded from earlier in the notebook.
# NOTE(review): the index is written as well (no `index=False`); together with columns
# read from a previously saved index this may accumulate unnamed columns -- confirm.
df.to_csv("../../data/weather_stations/weather_stations_hourly_rainfall_uk.csv")

In [None]:
# Histogram of the precipitation amounts, using 100 evenly spaced bin edges between 0 and 10.
df["prcp_amt"].plot.hist(bins=np.linspace(0, 10, 100))