# Notebook for retrieving, processing, and saving daily (aggregated) UK rainfall data via the EA API

In [None]:
# importing required libraries
import numpy as np
import pandas as pd
import json
import requests
from datetime import datetime, date
import os

In [None]:
# printing current date and time
print('Date and time: ' + str(datetime.today().replace(microsecond=0)))

In [None]:
# list present working directory
#os.getcwd()

## Retrieving the data

In [None]:
# data retrieval functions - only for stations and measures via the API

# function to retrieve data on rainfall stations
def get_data_stations(parameter_name = None,
                 parameter = None,
                 qualifier = None,
                 label = None,
                 town = None,
                 catchment_name = None,
                 river_name = None,
                 station_reference = None,
                 rloi_id = None,
                 search = None,
                 lat = None,
                 long = None,
                 d = None,
                 type = None,
                 status = None):
    """Get details of rainfall monitoring stations from the EA API
    
      Query parameter details:
    
      :param parameter_name: Return only those stations which measure parameters with the given name, for example Rainfall, Water Level or Flow.
      :param parameter: Return only those stations which measure parameters with the given short form name, for example rainfall, level or flow.
      :param qualifier: Return only those stations which measure parameters with qualifier. Useful qualifiers are Stage and Downstream Stage (for stations such as weirs which measure levels at two locations), Groundwater for groundwater levels as opposed to river levels and Tidal Level for tidal levels.
      :param label: Return only those stations whose label is exactly as given.
      :param town: Return only those stations whose town is as given. Not all stations have an associated town.
      :param catchment_name: Return only those stations whose catchment name is exactly as given. Not all stations have an associated catchment area.
      :param river_name: Return only those stations whose river name is exactly as given. Not all stations have an associated river name.
      :param station_reference: Return only those stations whose reference identifier is as given. The station reference is an internal identifier used by the Environment Agency.
      :param rloi_id: Return only the station (if there is one) whose RLOIid (River Levels on the Internet identifier) matches.
      :param search: Return only those stations whose label contains the given value.
      :param lat: Return those stations whose location falls within d km of the given latitude/longitude (in WGS84 coordinates), this may be approximated by a bounding box.
      :param long: Return those stations whose location falls within d km of the given latitude/longitude (in WGS84 coordinates), this may be approximated by a bounding box.
      :param d: Return those stations whose location falls within d km of the given latitude/longitude (in WGS84 coordinates), this may be approximated by a bounding box.
      :param type: Return only those stations of the given type, where type can be one of "SingleLevel", "MultiTraceLevel", "Coastal", "Groundwater" or "Meteorological"
      :param status: Return only those stations with the given status. Can be one of "Active", "Closed" or "Suspended".
      """
    
    # URL of the UK EA API
    api_url = "https://environment.data.gov.uk/flood-monitoring/id/stations"
 
    # build a dictionary of the query parameters
    params =  {'parameterName': parameter_name,
                'parameter': parameter,
                'qualifier': qualifier,
                'label': label,
                'town': town,
                'catchmentName': catchment_name,
                'riverName': river_name,
                'stationReference': station_reference,
                'RLOIid': rloi_id,
                'search': search,
                'lat': lat,
                'long': long,
                'dist': d,
                'type': type,
                'status': status
              }
    
    # Get data on stations from the EA API
    response = requests.get(api_url, 
                            params = params)

    # ensuring that cookies are allowed
    #response = requests.get(api_url, params, cookies=response.cookies)
    
    # Extracting JSON data from the response container
    data = response.json()

    # Loading data to a Pandas data frame
    stations = pd.DataFrame(data["items"])

    return(stations)


# function to retrieve data on rainfall measures
def get_data_measures(parameter_name = None,
                     parameter = None,
                     qualifier = None,
                     station_reference = None,
                     station = None,
                     search = None):
    """Get details of measures available from the EA API

       :param parameter_name: Return only measures for parameters with the given name, for example Water Level or Flow.
       :param parameter: Return only measures for parameters with the given short form name, for example level or flow.
       :param qualifier: Return only those measures with qualifier. Useful qualifiers are Stage and Downstream Stage (for stations such as weirs which measure levels at two locations), Groundwater for groundwater levels as opposed to river levels and Tidal Level for tidal levels.
       :param station_reference: Return only those measures which are available from the station with the given reference identifier.
       :param station: Return only those measures which are available from the station with the given URI.
       :param search: Return only those measures whose label contains the given value.
    """
    api_url = "http://environment.data.gov.uk/flood-monitoring/id/measures"
    
     # build a dictionary of the query parameters
    params = {'parameterName': parameter_name,
            'parameter': parameter,
            'qualifier': qualifier,
            'stationReference': station_reference,
            'station': station,
            'search': search
            }
    
    # Get data about measures from the EA API
    response = requests.get(api_url, 
                            params)

    # ensuring that cookies are allowed
    #response = requests.get(api_url, params, cookies=response.cookies)
    
    # Extract JSON data from the response
    data = response.json()

    # Load data to a data frame
    measures = pd.DataFrame(data["items"])

    return(measures)

In [None]:
# retrieving the rainfall stations and measures data
rainfall_stations_data = get_data_stations(parameter='rainfall')
rainfall_measures_data = get_data_measures(parameter='rainfall')

# retrieving the rainfall readings archive for today
rainfall_readings_today = pd.read_csv('http://environment.data.gov.uk/flood-monitoring/data/readings.csv?parameter=rainfall&_view=full&date=' + str(date.today()))

In [None]:
# Checking the number of entries for each dataset
print(rainfall_stations_data.shape,
      rainfall_measures_data.shape,
      rainfall_readings_today.shape)

In [None]:
# Checking the number of entries for each dataset
print('Shape of station data: ' + str(rainfall_stations_data.shape))
print('Shape of measures data: ' + str(rainfall_measures_data.shape))
print("Shape of today's rainfall readings data: " + str(rainfall_readings_today.shape))

In [None]:
# checking for duplicates in the first two data frames by 'station_id' or 'stationReference' and dropping them
print('Number of duplicates in the rainfall stations data: ' + str(rainfall_stations_data[rainfall_stations_data.duplicated(['stationReference'])].shape[0]))
print('Number of duplicates in the rainfall measures data: ' + str(rainfall_measures_data[rainfall_measures_data.duplicated(['stationReference'])].shape[0]))

# dropping the duplicates and retaining the first values - this can be investigated later and rows with more entries can be
# kept to retain the most amount of information
rainfall_stations_data.drop_duplicates(subset=['stationReference'], keep='first', inplace = True)
rainfall_measures_data.drop_duplicates(subset=['stationReference'], keep='first', inplace = True)

# resetting the indices of the data frames
rainfall_stations_data.reset_index(drop = True, inplace = True)
rainfall_measures_data.reset_index(drop = True, inplace = True)

# Checking the number of entries for each dataset after dropping duplicates
print('Shape of datasets after dropping duplicates:')
print('Shape of station data: ' + str(rainfall_stations_data.shape))
print('Shape of measures data: ' + str(rainfall_measures_data.shape))

## Data cleaning and processing

In [None]:
# aggregating data by date and stationReference or station_id for all station IDs
rainfall_readings_aggregated_values = rainfall_readings_today.groupby(['date','stationReference'], as_index=False)['value'].sum()

In [None]:
# merging the 3 dataframes for rainfall stations, measures, and daily reaadings

# left join of readings with measures to keep all readings and outer join of resulting df with stations to keep
# details of non-responsive stations as well
rainfall_readings_aggregated = rainfall_readings_aggregated_values.merge(rainfall_measures_data,on='stationReference', how='left').merge(rainfall_stations_data,on='stationReference', how='outer')

# viewing the shape of the resulting df
print(rainfall_readings_aggregated.shape)

In [None]:
# further data cleaning

# dropping duplicate and redundant columns
rainfall_readings_aggregated = rainfall_readings_aggregated.drop(columns=['@id_x',
                                                              'label_x',
                                                              'measures',
                                                              'latestReading',
                                                              'notation_x',
                                                              'station',
                                                              '@id_y',
                                                              'notation_y',
                                                              # columns relevant only for river data
                                                              'catchmentName',
                                                              'dateOpened',
                                                              'riverName',
                                                              'stageScale',
                                                              'status',
                                                              'town',
                                                              'wiskiID',
                                                              'datumOffset',
                                                              'RLOIid'
                                                             ])

# adding date column

# creating date variable
rainfall_readings_aggregated['date'] = str(date.today())

# renaming columns
rainfall_readings_aggregated.rename(columns = {
                                           'value': 'reading_value',
                                           'parameter': 'parameter_id',
                                           'parameterName': 'parameter_name',
                                           'period': 'reading_period',
                                           'qualifier': 'reading_qualifier',
                                           'stationReference': 'station_id',
                                           'unit':'reading_unit_id',
                                           'unitName':'reading_unit_name',
                                           'valueType':'reading_value_type',
                                           'easting':'station_easting',
                                           'northing': 'station_northing',
                                           'gridReference': 'station_grid_reference',
                                           'label_y': 'station_type',
                                           'lat': 'station_latitude',
                                           'long':'station_longitude',
                                           'valueType':'reading_value_type',
                                           'gridReference': 'station_grid_reference'},
                              inplace = True)
                                                           
# reordering columns
rainfall_readings_aggregated = rainfall_readings_aggregated[['date',
                                                    'station_id',
                                                    'station_type',
                                                    'station_grid_reference',
                                                    'station_latitude',
                                                    'station_longitude',
                                                    'station_easting',
                                                    'station_northing',
                                                    'parameter_id',
                                                    'parameter_name',
                                                    'reading_qualifier',
                                                    'reading_value',
                                                    'reading_unit_id',
                                                    'reading_unit_name',
                                                    'reading_value_type',
                                                    'reading_period'
                                               ]]

# viewing the shape of the resulting combined df
print("Shape of the combined dataframe:" + str(rainfall_readings_aggregated.shape))

## Explanation of columns

__date__: date  
__time__: time  
__station_id__: station identifier  
__station_type__: type of station  
__station_grid_reference__: grid reference for the station, rounded to a 100m grid  
__station_latitude__: latitude coordinates of station  
__station_longitude__: longitude coordinates of station  
__station_easting__: easting coordinates of station  
__station_northing__: northing coordinates of station  
__parameter_id__: short name/id of the quantity being measured  
__parameter_name__: name of the quantity being measured  
__reading_qualifier__: a qualifier for the quantity being measured, "Tipping Bucket Raingauge" for rainfall  
__reading_value__: the value of the reading for the associated measurement  
__reading_unit_id__: unit id/url for the reading  
__reading_unit_name__: unit name for the reading  
__reading_value_type__: type of measurement, e.g., total, mean, etc.  
__reading_period__: the period between successive readings, in seconds

# Data quality checks

In [None]:
# check if there are any stations without recent rainfall measurements and list their Station IDs

# list total stations with no recent rainfall measurements
print( "Total stations with no recent rainfall measurements: " + str(rainfall_readings_aggregated['reading_value'].isna().sum()))

print('')

print('IDs of the respective stations:')

print('')

# listing their IDs
print(rainfall_readings_aggregated[rainfall_readings_aggregated['reading_value'].isna() == True]['station_id'].values)

In [None]:
# NaN value check with an assert statement
#assert len(rainfall_readings_aggregated[rainfall_readings_aggregated['reading_value'].isna() == True]['station_id'].tolist()) == 0, f"0 expected, got: {len(rainfall_readings_aggregated[rainfall_readings_aggregated['reading_value'].isna() == True]['station_id'].tolist())}"

In [None]:
# listing stations without latitude (can also be checked for longitude)
print( "Total stations without latitude coordinates: " + str(rainfall_readings_aggregated['station_latitude'].isna().sum()))

print('')

print('IDs of the respective stations:')

print('')

# listing their IDs
print(rainfall_readings_aggregated[rainfall_readings_aggregated['station_latitude'].isna() == True]['station_id'].values)

In [None]:
# check for duplicate stations by station ID on combined data frame (they could have come from the responses df)
print('Number of duplicates in the combined data frame: ' + str(rainfall_readings_aggregated[rainfall_readings_aggregated.duplicated(['station_id'])].shape[0]))

# dropping the duplicates and retaining the first values - this can be investigated later and rows with more entries can be
# kept to retain the most amount of information
rainfall_readings_aggregated.drop_duplicates(subset=['station_id'], keep='first', inplace = True)

# resetting the indices of the data frames
rainfall_readings_aggregated.reset_index(drop = True, inplace = True)

# Checking the number of entries for the dataset after dropping duplicates
print('Shape of combined data after dropping duplicates: ' + str(rainfall_readings_aggregated.shape))

# Saving processed data

In [None]:
# saving combined dataset as csv and excel files
rainfall_readings_aggregated.to_csv(f'../data/rainfall_data_aggregated_{date.today()}.csv')
rainfall_readings_aggregated.to_excel(f'../data/rainfall_data_aggregated_{date.today()}.xlsx')