In [2]:
from urllib.request import urlopen
from urllib.parse import urlencode
import urllib3
import json
import pandas as pd
from time import sleep
from io import StringIO
import os
from enum import Enum
from concurrent.futures import ThreadPoolExecutor

In [12]:
class Measure(Enum):
    """Kind of hydrological reading this notebook can download.

    The lowercase value is what ``HydrologyApi.get_filename`` embeds in the
    cached parquet filename.
    """

    LEVEL = "level"  # river level readings
    RAINFALL = "rainfall"  # rainfall readings

class HydrologyApi:
    """Client for the environment.data.gov.uk Hydrology API with a local parquet cache.

    Station lookups and incremental readings use the JSON endpoints under
    ``API_BASE_URL``. Full histories are fetched through the batch-readings
    endpoint, one station per worker thread. Results are cached as parquet
    files under ``DATA_DIR``.
    """

    API_BASE_URL = "https://environment.data.gov.uk/hydrology/"
    DATA_DIR = "data"
    # Columns of the tidy long-format frames this class produces (see the
    # batch path and update_dataframe); used for the "no data at all" frame.
    COLUMNS = ['dateTime', 'value', 'quality', 'station']
    # Suffix appended to a station notation to form the API measure id.
    MEASURE_SUFFIXES = {
        Measure.LEVEL: 'level-i-900-m-qualified',
        Measure.RAINFALL: 'rainfall-i-900',
    }
    # Shortest wait between polls of a queued batch query, so an ``eta`` of 0
    # cannot turn the polling loop into a busy-wait against the server.
    MIN_POLL_SECONDS = 1.0

    def __init__(self, max_threads):
        """Create the shared connection pool and download thread pool.

        :param max_threads: size of both the urllib3 connection pool and the
            thread pool used to fetch stations concurrently.
        """
        self.http = urllib3.PoolManager(maxsize=max_threads)
        self.thread_pool = ThreadPoolExecutor(max_workers=max_threads)

    def _measure_id(self, measure: Measure, station_id):
        """Return the API measure id ``<station_id>-<suffix>`` for ``measure``."""
        return f"{station_id}-{self.MEASURE_SUFFIXES[measure]}"

    def _get_json(self, api_url, fields=None):
        """GET ``api_url`` (``fields`` sent as query parameters) and decode the JSON body.

        :raises RuntimeError: if the server answers with anything but HTTP 200,
            rather than failing later with an opaque JSON decode error.
        """
        response = self.http.request('GET', api_url, fields=fields)
        if response.status != 200:
            raise RuntimeError(
                f"GET {api_url} failed with HTTP {response.status}: {response.data[:200]!r}")
        return json.loads(response.data.decode('utf-8'))

    def get_stations_on_river(self, river):
        """Return the stations on ``river`` as a DataFrame, one row per station.

        Callers in this class read the ``notation`` (station id) and ``label``
        (station name) columns. Goes through the shared connection pool, so no
        response object is left unclosed.
        """
        data = self._get_json(self.API_BASE_URL + 'id/stations', {'riverName': river})
        return pd.DataFrame(data['items'])

    def get_levels(self, station_id, start=None):
        """Fetch level readings for one station from the readings endpoint.

        :param station_id: station notation, as in :meth:`get_stations_on_river`.
        :param start: date-like object with ``strftime``. Its calendar day is
            sent as the ``mineq-date`` filter (presumably "on or after";
            the time of day is dropped). ``None``/``NaT`` sends no date filter.
        :returns: DataFrame built from the response's ``items``. It has no
            columns at all when there are no readings.

        NOTE(review): only a single response is read; confirm the API's
        default row limit cannot truncate a long catch-up window.
        """
        api_url = self.API_BASE_URL + \
            f"id/measures/{self._measure_id(Measure.LEVEL, station_id)}/readings"
        fields = {} if pd.isna(start) else {'mineq-date': start.strftime('%Y-%m-%d')}
        data = self._get_json(api_url, fields)
        return pd.DataFrame(data['items'])

    def _batch_request(self, api_url, query_params):
        """Run a batch-readings query, polling until its data is available.

        The endpoint replies either with the CSV itself or with a JSON status
        document. Its ``status`` is one of:

        * Pending / InProgress: poll again after the server's ``eta``. The eta
          is divided by 1000, i.e. treated as milliseconds.
        * Complete / Completed: read the CSV from ``dataUrl`` (or ``url``).
        * Failed.

        :param api_url: batch endpoint URL; it may already carry a query string.
        :param query_params: extra query parameters such as ``mineq-date``,
            URL-encoded onto ``api_url``. An empty mapping adds nothing.
        :returns: DataFrame of the CSV, or ``None`` when the CSV body is empty.
        :raises Exception: on a Failed query, an unknown status or an
            unexpected Content-Type.
        """
        if query_params:
            separator = '&' if '?' in api_url else '?'
            api_url = api_url + separator + urlencode(query_params)

        while True:
            print(f"Making request to: {api_url}")

            response = self.http.request(
                'GET',
                api_url,
                headers={'Accept-Encoding': 'gzip'},
            )
            content_type = response.headers.get('Content-Type', '')
            # Compare only the media type so parameters such as
            # "; charset=UTF-8" do not cause a false mismatch.
            media_type = content_type.split(';')[0].strip().lower()

            if media_type == 'text/csv':
                if not response.data:
                    print('Got empty CSV')
                    return None
                buffer = StringIO(response.data.decode('utf-8'))
                return pd.read_csv(buffer, low_memory=False)

            if media_type != 'application/json':
                raise Exception(f"Unexpected content type: {content_type}")

            data = json.loads(response.data.decode('utf-8'))
            status = data["status"]

            if status in ("Pending", "InProgress"):
                if status == "Pending":
                    print("Query is pending")
                    print(f"Position in queue: {data['positionInQueue']}")
                else:
                    print("Query in progress")
                eta = data["eta"] / 1000
                print(f"Estimated completion: {eta}")
                # Wait slightly past the ETA, but never less than the floor.
                sleep(max(eta * 1.1, self.MIN_POLL_SECONDS))

            elif status in ("Complete", "Completed"):
                print(f"Query completed: {data}")
                csv_url = data["dataUrl"] if "dataUrl" in data else data["url"]
                return pd.read_csv(csv_url, low_memory=False)

            elif status == "Failed":
                raise Exception(f"Query failed, response: {data}")

            else:
                raise Exception(f"Unknown status: {data['status']}")

    def _batch_get(self, measure: Measure, station_id, start_date=None):
        """Batch-download ``measure`` at ``station_id``, optionally from ``start_date`` on."""
        api_url = self.API_BASE_URL + \
            f"data/batch-readings/batch/?measure={self._measure_id(measure, station_id)}"
        return self._batch_request(
            api_url, {'mineq-date': start_date} if start_date else {})

    def batch_get_levels(self, station_id, start_date=None):
        """Batch-download level readings for one station (see :meth:`_batch_request`)."""
        return self._batch_get(Measure.LEVEL, station_id, start_date)

    def batch_get_rainfall(self, station_id, start_date=None):
        """Batch-download rainfall readings for one station (see :meth:`_batch_request`)."""
        return self._batch_get(Measure.RAINFALL, station_id, start_date)

    def batch_get_measure(self, measure: Measure, station_id, start_date=None):
        """Dispatch to the batch download method for ``measure``.

        :raises KeyError: for a measure with no batch method.
        """
        return {
            Measure.LEVEL: self.batch_get_levels,
            Measure.RAINFALL: self.batch_get_rainfall,
        }[measure](station_id, start_date)

    @staticmethod
    def _tidy_batch_frame(raw, station_name):
        """Drop unused batch CSV columns, tag rows with ``station_name`` and set compact dtypes."""
        # errors='ignore': a CSV lacking one of these columns has nothing to drop.
        tidy = raw.drop(columns=['measure', 'date', 'qcode', 'completeness'], errors='ignore')
        tidy['station'] = station_name
        tidy['station'] = tidy['station'].astype('category')
        tidy['dateTime'] = pd.to_datetime(tidy['dateTime'])
        tidy['value'] = tidy['value'].astype('float32')
        tidy['quality'] = tidy['quality'].astype('category')
        return tidy

    def batch_get_measure_on_river(self, measure: Measure, river, start_date=None):
        """Batch-download ``measure`` for every station on ``river`` concurrently.

        Each station's query runs on the thread pool. Results are tidied per
        station and combined, and duplicate (dateTime, station) rows are
        dropped, keeping the first occurrence.

        :returns: the combined DataFrame. If no station returned data, an
            empty frame with ``COLUMNS`` so downstream code can still index it.
        """
        stations = self.get_stations_on_river(river)
        station_rows = stations[['notation', 'label']].values

        futures = [
            self.thread_pool.submit(
                self.batch_get_measure, measure, station_id, start_date)
            for station_id, _ in station_rows
        ]

        frames = []
        for future, (_, station_name) in zip(futures, station_rows):
            new_data = future.result()
            if new_data is None:
                print(f"No new data for station: {station_name}")
                continue
            frames.append(self._tidy_batch_frame(new_data, station_name))

        if not frames:
            return pd.DataFrame(columns=self.COLUMNS)
        # Concatenate once instead of growing a frame inside the loop.
        return pd.concat(frames).drop_duplicates(subset=['dateTime', 'station'])

    def get_filename(self, measure: Measure, river):
        """Return the cache filename ``<river_snake_case>_<measure>_raw.parquet``."""
        return f"{river.lower().replace(' ', '_')}_{measure.value}_raw.parquet"

    def update_dataframe(self, df: pd.DataFrame, measure: Measure, river: str):
        """Append recent readings for every station on ``river`` to ``df``.

        For each station, readings are requested from the calendar day of that
        station's latest stored ``dateTime``. Stations with no stored rows are
        skipped, since there is no date to resume from. The overlap is removed
        by de-duplicating on (dateTime, station), keeping the rows already in
        ``df``.

        NOTE(review): keeping the stored rows means revised values or quality
        flags re-fetched for the overlapping day are discarded; consider
        ``keep='last'`` if revisions matter.

        :returns: a new DataFrame; ``df`` itself is not modified.
        :raises NotImplementedError: for any measure other than ``Measure.LEVEL``
            (only level readings have an incremental fetch here).
        """
        if measure != Measure.LEVEL:
            raise NotImplementedError(
                f"update_dataframe only supports {Measure.LEVEL}, got {measure}")

        frames = [df]
        for station_name, station_id in self.get_stations_on_river(river)[['label', 'notation']].values:
            print(f"Updating data for station: {station_name}")
            last = df.loc[df['station'] == station_name, 'dateTime'].max()
            if pd.isna(last):
                print(f"No existing data for station: {station_name}, skipping")
                continue

            readings = self.get_levels(station_id, last)
            if readings.empty:
                print("Got 0 new measurements")
                continue

            # .copy() so the column assignments below act on an independent frame.
            new_measurements = readings[['dateTime', 'value', 'quality']].copy()
            new_measurements['station'] = station_name
            new_measurements['station'] = new_measurements['station'].astype('category')
            new_measurements['dateTime'] = pd.to_datetime(new_measurements['dateTime'])
            new_measurements['value'] = new_measurements['value'].astype('float32')
            print(f"Got {len(new_measurements)} new measurements")
            frames.append(new_measurements)

        return pd.concat(frames).drop_duplicates(subset=['dateTime', 'station'])

    def load(self, measure: Measure, river):
        """Return up-to-date readings of ``measure`` for ``river``, refreshing the parquet cache.

        If the cache file exists it is read and its dtypes restored. Otherwise
        the full history is batch-downloaded. Either way the data is then
        topped up by :meth:`update_dataframe` and written back to the cache.

        NOTE: update_dataframe only supports ``Measure.LEVEL``, so other
        measures currently raise before the cache is written.
        """
        filepath = os.path.join(self.DATA_DIR, self.get_filename(measure, river))
        if os.path.exists(filepath):
            print(f"Loading {filepath}")
            df = pd.read_parquet(filepath)
            df['dateTime'] = pd.to_datetime(df['dateTime'])
            df['station'] = df['station'].astype('category')
            df['value'] = df['value'].astype('float32')
        else:
            print(f"Downloading {measure.value} data on: {river}")
            df = self.batch_get_measure_on_river(measure, river)
        df = self.update_dataframe(df, measure, river)
        # to_parquet does not create missing directories; on a fresh checkout
        # DATA_DIR may not exist yet.
        os.makedirs(self.DATA_DIR, exist_ok=True)
        df.to_parquet(filepath)
        return df

def process_hydrology_data(df, qualities=('Good', 'Unchecked', 'Estimated'),
                           freq='15min', limit=2):
    """Turn long-format readings into a regular wide time series, one column per station.

    Keeps only rows whose ``quality`` is in ``qualities``. It then pivots to a
    ``dateTime`` x ``station`` table of ``value``, resamples onto a regular
    ``freq`` grid, and fills short gaps by time-weighted interpolation.
    Interpolation runs in both directions and is bounded by ``limit``
    (pandas ``interpolate`` semantics).

    :param df: long-format frame with ``dateTime``, ``station``, ``value`` and
        ``quality`` columns. (dateTime, station) pairs must be unique among
        the kept rows, because ``pivot`` raises on duplicates.
    :param qualities: quality flags to keep.
    :param freq: pandas offset alias of the output grid.
    :param limit: interpolation limit passed to ``interpolate``.
    :returns: wide DataFrame indexed by ``dateTime`` with one column per station.
    """
    return (
        df[df['quality'].isin(qualities)]
        .pivot(index='dateTime', columns='station', values='value')
        .resample(freq)
        .interpolate('time', limit_direction='both', limit=limit)
    )
        

# Shared client: two pooled connections / worker threads for concurrent station downloads.
api = HydrologyApi(max_threads=2)


In [9]:
# Load cached River Wear levels (batch-downloading on the first run), top them up, and peek at the newest rows.
level_df = api.load(measure=Measure.LEVEL, river="River Wear")
level_df.tail()

Loading data/river_wear_level_raw.parquet


Updating data for station: Chester Le Street
Got 49 new measurements
Updating data for station: Witton Park
Got 49 new measurements
Updating data for station: Sunderland Bridge
Got 49 new measurements
Updating data for station: Stanhope
Got 49 new measurements
Updating data for station: Durham New Elvet Bridge
Got 49 new measurements


Unnamed: 0,dateTime,value,quality,station
1247,2023-11-02 11:00:00,1.403,Unchecked,Durham New Elvet Bridge
1248,2023-11-02 11:15:00,1.464,Unchecked,Durham New Elvet Bridge
1249,2023-11-02 11:30:00,1.502,Unchecked,Durham New Elvet Bridge
1250,2023-11-02 11:45:00,1.54,Unchecked,Durham New Elvet Bridge
1251,2023-11-02 12:00:00,1.574,Unchecked,Durham New Elvet Bridge


In [13]:
# Regular wide series, one column per station (equivalent to level_df.pipe(process_hydrology_data)).
process_hydrology_data(level_df).tail()

station,Chester Le Street,Durham New Elvet Bridge,Stanhope,Sunderland Bridge,Witton Park
dateTime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2023-11-02 11:00:00,1.303,1.403,1.097,1.236,1.357
2023-11-02 11:15:00,1.33,1.464,1.132,1.259,1.402
2023-11-02 11:30:00,1.357,1.502,1.162,1.272,1.441
2023-11-02 11:45:00,1.386,1.54,1.216,1.289,1.482
2023-11-02 12:00:00,1.416,1.574,1.276,1.308,1.538
