# Reading in citizen science gauge data (1-5 min)

### Notes
- Intense QC code read in, not sure how easy that will be, I had to clone it off Github and change line 10 of code as got an error with a package, proabbly due to Python versions. Think the package needs updating on Github but not sure who is still in charge of maintaining it. Need ETCCDI data (available in example data for Intense QC)


### What does the code do
- Reads in Urban Observatory rain gauge data
- Reads in National Green Infrastructure Facility rain gauge data
- Reads in Acomb Flood Group rain gauge data

### What does the code need to do?
1. Download the data
    - Try API
    - Sort out problems with wrong accumulation methods
    - Save data
2. Quality control data
    - Use intense-qc code
    - Remove failed gauges
    - Remove failed observations
    - Save Qc'ed data
    
### Outputs format
- `\root` folder path 
    - `\UO` folder path (1-5 minute rain gauge data)
        - `<station-id>_<eastings>_<northings>.csv` - individual (varying resolution) gauge data for station `<station-id>`
        - `qc` folder path (quality controlled rain gauge data)
            - `<station-id>_<eastings>_<northings>.csv` - individual (15 minute resolution) quality controlled rain gauge data for station `<station-id>`
    - `\NGIF` folder path (1 minute rain gauge data)
        - `<station-id>_<eastings>_<northings>.csv` - individual (varying resolution) gauge data for station `<station-id>`
        - `qc` folder path (quality controlled rain gauge data)
            - `<station-id>_<eastings>_<northings>.csv` - individual (15 minute resolution) quality controlled rain gauge data for station `<station-id>`
    - `\CS` folder path (citizen science rain gauge data)
        - `<station-id>_<eastings>_<northings>.csv` - individual (varying resolution) gauge data for station `<station-id>`
        - `qc` folder path (quality controlled rain gauge data)
            - `<station-id>_<eastings>_<northings>.csv` - individual (15 minute resolution) quality controlled rain gauge data for station `<station-id>`

In [8]:
# Import relevent packages
import pandas as pd
import numpy as np
import requests
from os.path import join, exists, split
import os
from datetime import datetime
import io

In [9]:
##### Inputs to change
start_date = "2023-06-20"
end_date = "2023-06-30"

out_path = r"C:\Users\Amy\OneDrive - Newcastle University (1)\Documents\PYRAMID\data\realtime"

# Bounding box for data 
e_l, n_l, e_u, n_u = [355000, 534000, 440000, 609000]
#lon_l, lat_l, lon_u, lat_u = [-2.6771176, 54.702623, -1.3749203, 55.361917]
bbox = [e_l, e_u, n_l, n_u]

# Quality control test data path
static_data_path = join(out_path, "static")
intense_path = join(static_data_path, "intense-qc")
etccdi_data_path = join(join(intense_path, "tests"), "etccdi_data")

gauge_paths = []

In [10]:
def get_gauge_flags(data, loc):
    '''Function to apply wokring Intense QC hourly checks on gauge data. 
    Args: data is the gauge data (pd.Series), loc is the gauge location (eastings, northings)
    Returns: flags as a dictionary {gauge flagged, years flagged, obs flagged}
    '''
    eastings, northings = loc
    # convert coordinates to lat/lon
    transformer = Transformer.from_crs("epsg:27700", "epsg:4326")
    latitude, longitude = transformer.transform(eastings, northings)

    # create gauge object
    rain_gauge = gauge.Gauge(
        station_id,
        path_to_original_data="",
        latitude=latitude,
        longitude=longitude,
        original_timestep="15min",
        original_units="mm/h",
        new_units="mm/h",
        new_timestep="1h",
        data=data
    )
    rain_gauge.get_info()

    # create qc object 
    test = qc.Qc(
        gauge=rain_gauge,
        etccdi_data_folder=etccdi_data_path
    )

    # checks that don't work on existing data
    """
    test.check_percentiles()
    test.check_k_largest()
    test.check_intermittency()
    test.change_in_min_val_check() 
    test.cwd_check() # missing function in utils file???
    test.change_in_min_val_check() # Change in minimum value check, homogeneity check to see if the resolution of the data has changed. Change flag, flag years
    test.find_neighbours("hourly") # frequency: must be either hourly, daily or monthly, Names or names and paths of neighbouring stations
    # conditions are: must be within 50km, at least 3 years overlap, select the closest 10, don't have three years of data
    # check_hourly_neighbours(), check_daily_neighbours(), check_monthly_neighbours()
    test.get_flags() # runs all checks, fails at find_neighbours()
    """

    ### run checks ###

    flagged_sdii = 0
    sdii_thresh = 100 # just arbitrary atm
    if any(np.array(test.get_sdii()) > sdii_thresh): # Simple precipitation intensity index, SDII from ETCCDI and from gauge values (sdii_gridded, sdii_gauge), not sure how to use this
        flagged_sdii = 1
    
    # Flag data if any of these don't return 0:
    gauge_checks = [
        test.check_days_of_week(), # Checks if proportions of rainfall in each day is significantly different
        test.check_break_point(), # Pettitt breakpoint check
        flagged_sdii
    ]
    
    flagged_gauge = sum(gauge_checks) != 0
    #if flagged_gauge:
    #    print("Gauge", np.array(["days of week", "break point", "sdii"])[np.array(gauge_checks) == 1])
    #    print(test.check_break_point())

    # Flag individual data observations if don't return 0:
    obs_checks = pd.DataFrame(index=rain_gauge.data.index)
    obs_checks["world_record"] = test.world_record_check_ts() # Checks if and to what degree the world record has been exceeded by each rainfall value, 4, 3, 2 or 1 if exceeded by > 1.5x, 1.33x, 1.22x or 0x respectively and 0 if not exceeded for each value
    obs_checks["rx1day"] = test.rx1day_check_ts() # Checks hourly values against maximum 1-day precipitation, Magnitudes of exceedance for each day
    obs_checks["cdd"] = test.cdd_check() # ETCCDI provide an index for maximum length of dry spell. Look for suspicious number of consecutive dry hours recorded. Consecutive Dry Days: Maximum length of dry spell, maximum number of consecutive days with RR < 1mm. Magnitudes of exceedence of the length of longest dry period
    obs_checks["daily_accums"] = test.daily_accums_check() # Check daily accumulations. Suspect daily accumulations flagged where a recorded rainfall amount at these times is preceded by 23 hours with no rain. A threshold of 2x the mean wet day amount for the corresponding month is applied to increase thechance of identifying accumulated values at the expense of genuine, moderate events
    obs_checks["monthly_accums"] = test.monthly_accums_check() # Check monthly accumulations. Flags month prior to high value
    obs_checks["streaks"] = test.streaks_check() # Streaks: This is where you see the same value repeated in a run. Currently this records streaks of 2hrs in a row or more over 2 x Monthly mean rainfall. It is considered to be unlikely that you would see even 2 consecutive large rainfall amounts. For this code I have substituted the monthly mean rainfall for SDII as I want the thresholds to be independent of the rainfall time series as the global dataset is of highly variable quality.
    flagged_obs = obs_checks.index[obs_checks.sum(1) > 0]

    # Flags each individual year:
    year_flags = np.array([
        test.r99ptot_check_annual(), # Check against R99pTOT: R99pTOT. Annual total PRCP when RR > 99p. Magnitudes of exceedance for yearly 99th percentiles
        test.prcptot_check_annual() # check against annual total: PRCPTOT. Annual total precipitation in wet days. Magnitudes of exceedance for yearly totals
    ])

    flagged_years = np.arange(rain_gauge.data.index.min().year, rain_gauge.data.index.max().year + 1)[year_flags.sum(0) < 0]

    return({"gauge" : flagged_gauge, "years" : flagged_years, "obs" : flagged_obs})

In [11]:
### NGIF rain gauges

ngif_outpath = join(out_path, "NGIF")
if not exists(ngif_outpath):
    os.mkdir(ngif_outpath)
    
#ngif_15min_outpath = join(ngif_outpath, "15min")
#if not exists(ngif_15min_outpath):
#    os.mkdir(ngif_15min_outpath)
    
# Get list of rainfall stations
easting = "424038"
northing = "564414"
sensor_loc = "Ensemble E Pit Gauge"
sensor_id = "Pit rain gauge"
sensor = sensor_loc.replace(" ", "-")

# convert timestamp
start_time = pd.to_datetime(start_date)
end_time = pd.to_datetime(end_date)

dates = pd.date_range(start_time, end_time)
dates = dates.format("%f")[1:]

tabs = []

for i in range(len(dates) - 1):
    path = "https://ngif.newcastle.ac.uk/download/Ensemble%20E/Pit%20Rain%20Gauge%23%401m/" + dates[i] + "/" + dates[i + 1]
    try:
        r = requests.get(path, verify=False)
        tab = pd.read_csv(io.StringIO(r.text))
        if tab.shape[0] > 0:
            tabs.append(tab)
    except:
        print("Failed", dates[i])
        
tabs = pd.concat(tabs, ignore_index=True)

high_res = 0.2 * pd.Series(tabs[tabs.columns[1]].values, index = pd.to_datetime(tabs.time))
high_res.index = high_res.index.tz_localize(None)
high_res = high_res.sort_index()
high_res.to_csv(join(ngif_outpath, sensor + "_" + str(easting) + "_" + str(northing) + ".csv"))
gauge_paths.append(join(ngif_outpath, sensor + "_" + str(easting) + "_" + str(northing) + ".csv"))

# Accumulate up
#ngif_gauge_data = high_res.resample(str(60*15) + "s").sum() * 4 # gives mm/h every 15 min
#ngif_gauge_data.to_csv(join(ngif_15min_outpath, sensor + "_" + str(easting) + "_" + str(northing) + ".csv"))




Failed 2023-06-23




Failed 2023-06-24




Failed 2023-06-25




Failed 2023-06-26




Failed 2023-06-27




In [12]:
### Urban Observatory gauge data (including CS Acomb data)

uo_outpath = join(out_path, "UO")
if not exists(uo_outpath):
    os.mkdir(uo_outpath)

cs_outpath = join(out_path, "CS")
if not exists(cs_outpath):
    os.mkdir(cs_outpath)

#cs_15min_outpath = join(cs_outpath, "15min")
#if not exists(cs_15min_outpath):
#    os.mkdir(cs_15min_outpath)

#uo_15min_outpath = join(uo_outpath, "15min")
#if not exists(uo_15min_outpath):
#    os.mkdir(uo_15min_outpath)

# Get list of rainfall stations

# convert timestamp
start_time = pd.to_datetime(start_date)
end_time = pd.to_datetime(end_date)

api_date_string_format = "%Y%m%d%H%M%S"

# get sensors
sensor_params = dict(
    variable='Rainfall')

r = requests.get('http://uoweb3.ncl.ac.uk/api/v1.1/sensors/csv/', sensor_params)
sensor_info = pd.read_csv(io.StringIO(r.text))

# Function to fix dogdy accumulation that is done by UO maintenence
def rescale(data):
    rescaled = pd.Series(np.nan, index=data.index)
    d = data.dropna()
    diffs = d.diff()
    cond = (diffs > 0) & (~np.isnan(diffs))
    rescaled.loc[diffs[cond].index] = diffs.loc[diffs[cond].index]
    return rescaled

for sensor in sensor_info["Sensor Name"]:

    # get sensor data
    data_params = dict(
        data_variable='Rainfall',  # variable=Daily%20Accumulation%20Rainfall%2CRainfall
        starttime=pd.to_datetime(start_date).strftime(api_date_string_format),
        endtime=pd.to_datetime(end_date).strftime(api_date_string_format)
    )

    path = 'http://uoweb3.ncl.ac.uk/api/v1.1/sensors/{sensor_name}/data/csv/'
    path = path.replace("{sensor_name}", sensor)

    r = requests.get(path, data_params)
    if r.status_code == 200:
        try:
            data = pd.read_csv(io.StringIO(r.text))
            
            if len(data) > 0:
                transformer = Transformer.from_crs("epsg:4326", "epsg:27700")
                easting, northing = transformer.transform(data["Sensor Centroid Latitude"].iloc[0],
                                                          data["Sensor Centroid Longitude"].iloc[0])
                
                data_df = pd.Series(data.Value.values, index=pd.to_datetime(data.Timestamp))
                rescaled = pd.DataFrame(index=data_df.index)
                rescaled[sensor] = np.nan
                
                if "ACOMB" in sensor:
                    
                    rescaled.loc[: "2022-05-17", sensor] = data_df["0"].loc[: "2022-05-17"]
                    rescaled.loc["2022-05-17" :, sensor] = rescale(data_df["0"].loc["2022-05-17" :])
                    rescaled.to_csv(join(cs_outpath, sensor + "_" + str(easting) + "_" + str(northing) + ".csv"))
                    gauge_paths.append(join(cs_outpath, sensor + "_" + str(easting) + "_" + str(northing) + ".csv"))

                    # Accumulate up
                    #cs_gauge_data = rescaled.resample(str(60*15) + "s").sum() * 4 # gives mm/h every 15 min
                    #cs_gauge_data.to_csv(join(cs_15min_outpath, sensor + "_" + str(easting) + "_" + str(northing) + ".csv"))
                    gauge_paths.append(join(cs_15min_outpath, sensor + "_" + str(easting) + "_" + str(northing) + ".csv"))
                else:
                    if "FS_" not in sensor:
                        rescaled[sensor] = rescale(data_df["0"])
                    else:
                        rescaled[sensor] = data_df["0"]
                    
                    rescaled.to_csv(join(uo_outpath, sensor + "_" + str(easting) + "_" + str(northing) + ".csv"))
                    gauge_paths.append(join(uo_outpath, sensor + "_" + str(easting) + "_" + str(northing) + ".csv"))

                    # Accumulate up
                    #uo_gauge_data = rescaled.resample(str(60*15) + "s").sum() * 4 # gives mm/h every 15 min
                    #uo_gauge_data[sensor].to_csv(join(uo_15min_outpath, sensor + "_" + str(easting) + "_" + str(northing) + ".csv"))
        except:
            print("Not worked.")

KeyboardInterrupt: 

In [37]:
# Quality control gauge data

# Import relevent packages
os.sys.path.append(r"C:\Users\Amy\OneDrive - Newcastle University (1)\Documents\Jupyter\intense-qc")
from intense import gauge, qc, utils
from pyproj import Transformer

for path in gauge_paths:

    qc_output_path = join(split(path)[0], "qc")
    if not exists(qc_output_path):
        os.mkdir(qc_output_path)

    output_file = join(qc_output_path, split(path)[1])

    data_raw = pd.read_csv(path, index_col=0, parse_dates=True).iloc[:, 0]
    data_raw = data_raw.sort_index()
    res_min = pd.Series(data_raw.dropna().index).diff().median().seconds / 60

    data_15min = data_raw.resample(str(60*15) + "s").sum() / 4 # gives mm/h every 15 min
    try:
        if data.index.dtype == "datetime64[ns, UTC]":
            data_1h = data_15min.resample("1h").mean()
        else:
            data_1h = data_15min.resample("1h").mean().tz_localize('UTC')
    except:
        print(name, "not read in.")

    station_id, eastings, northings = split(path)[1].split(".")[0].split("_")

    data = data_1h.dropna()
    loc = (eastings, northings)

    flags = get_gauge_flags(data, loc)

    # only include gauge data that is not flagged
    if not flags["gauge"]:
        # currenly ignoring year flag as not enough data

        # remove flagged observationns
        if len(flags["obs"]) > 0:
            cond = [idx in flags["obs"] for idx in data_15min.index.round("1h")]
            data_15min.loc[cond] = np.nan

        data_15min.tz_localize('UTC').to_csv(output_file)

  df3 = df1.append(df2)
  cond = [idx in flags["obs"] for idx in data_15min.index.round("1h")]
