# Calculate precipitation sum from GeoTiff radar images

Finnish Meteorological Institute is providing radar images as GeoTiff in S3 bucket. The data is free and openly available with CC4BY license.

This is a short and simple example how to calculate accumulated precipitation amount from GeoTiff radar images of precipitation rate (rr) with 5 minute interval. The example is ment to illustrate how to use the data with python.  

First we install necessary libraries. Rasterio (https://github.com/mapbox/rasterio) is used to fetch the data and pyproj (https://pypi.org/project/pyproj/) to calculate coordinate transformations.

In [None]:
!pip install rasterio[s3]

In [1]:
import rasterio as rio
from datetime import datetime, timedelta
from pyproj import Proj, Transformer

Small helper function to handle time steps.

In [2]:
def roundTime(dt, roundTo=300):
   """
   Round a datetime object to any time lapse in seconds
   
   dt      : datetime.datetime object
             DateTime to round
   roundTo : int
             Closest number of seconds to round to, default 1 minute. 
   
   Returns
   -------
   datetime.datetime object
            Modified datetime   
   """
   seconds = (dt.replace(tzinfo=None) - dt.min).seconds
   rounding = (seconds+roundTo/2) // roundTo * roundTo
   return dt + timedelta(0,rounding-seconds,-dt.microsecond)

Following function opens the geotiff image from S3 bucket and sample pixel value from requested coordinates. Note that images are in EPSG:3067 projection so we need to reproject the coordinates. 

Finally, pixel values are converted to millimeters. Conversions are documented here: https://en.ilmatieteenlaitos.fi/open-data-manual-radar-data.

In [3]:
def process_timestep(bucket, key, x, y, mode='rr', timestep=300):
    """ Process time step
    
    bucket   : str 
               S3 bucket name (where radar images exist)
    key      : str
               Radar image key in the bucket
    lat      : float
               Latitude of the point of interest
    lon      : float
               Longitude of the point of interest    
    mode     : str
               Defines how pixel values are converted to millimeters
    timestep : int
               Time step of radar images (used in pixel --> mm conversion)

    Returns
    -------
    float 
         Precipitation amount in millimeters
    """    
    with rio.open('s3://{}/{}'.format(bucket, key)) as src:
        for val in src.sample([(x, y)]):
            if mode == 'rr':
                # From https://en.ilmatieteenlaitos.fi/open-data-manual-radar-data
                return val[0]*.01*timestep/3600
            elif mode == 'dbz':
                # From https://en.wikipedia.org/wiki/DBZ_(meteorology)
                dbz = 0.5 * val[0] - 32
                return (10**(dbz/10)/200)**(5/8)*timestep/3600
            else:
                raise Exception('mode not implemented')
        
    return 0

Last, we define a function to go through requested time range

In [4]:
def process_time_range(bucket, starttime, endtime, lat, lon, timestep, mode = 'rr', filename_start='FIN-DBZ-3067-250M/', filename_end='_FIN-DBZ-3067-250M.tif'):
    """ Process time range 
    bucket    : str 
                S3 bucket name (where radar images exist)
    starttime : str
                Start time of the time range in format %Y-%m-%d %H:%M:%S
    endtime   : str
                End time of the time range in format %Y-%m-%d %H:%M:%S                
    lat       : float
                Latitude of the point of interest
    lon       : float
                Longitude of the point of interest    
    timestep  : int
                Time step of radar images
    mode      : str
                Defines how pixel values are converted to millimeters
    filename_start : str
                     For example path of the image. Images are assumed to found with name 
                     filename_start+timestamp+filename_end.  
    filename_start : str
                 Images are assumed to found with name filename_start+timestamp+filename_end.  
                      
    Returns
    -------
    float 
         Precipitation amount in millimeters
    
    """
    
    start = roundTime(datetime.strptime(starttime, '%Y-%m-%d %H:%M:%S'))
    end = roundTime(datetime.strptime(endtime, '%Y-%m-%d %H:%M:%S'))    
    time_it = start    
    
    transformer = Transformer.from_crs("epsg:4326", "epsg:3067")
    x,y = transformer.transform(lat, lon)
    
    prec_sum = 0
    while time_it <= end:
        key = '{year}/{month}/{day}/{prefix}{stamp}{suffix}'.format(year=time_it.strftime('%Y'),
                                                                    month=time_it.strftime('%m'),
                                                                    day=time_it.strftime('%d'),
                                                                    stamp=time_it.strftime('%Y%m%d%H%M'),
                                                                    prefix=filename_start,
                                                                    suffix=filename_end)
        prec_sum += process_timestep(bucket, key, x, y, timestep=timestep)

        if time_it.minute == 0: print('{:.2f}'.format(prec_sum), end='')
        else: print('.', end='')
            
        time_it +=  timedelta(seconds=timestep)

    print('')
    return prec_sum

And finally we run everything:

In [15]:
bucket = 'fmi-opendata-radar-geotiff'
starttime = '2020-09-06 00:00:00'
endtime = '2020-09-06 23:59:00'
lat = 63.1593
lon = 29.8346

In [16]:
%%time
prec_sum = process_time_range(bucket, starttime, endtime, lat, lon, 300, 'dbz')
print('Precipitation sum at {},{} (Koli, Finland) on 11th July 2020 is {:.2f} mm'.format(lat,lon,prec_sum))

0.06...........0.10...........0.17...........0.17...........0.17...........0.17...........0.17...........0.17...........0.17...........0.17...........0.17...........0.17...........0.17...........0.17...........0.17...........0.17...........0.17...........0.17...........0.17...........0.21...........0.24...........0.56...........0.91...........1.32...........1.67
Precipitation sum at 63.1593,29.8346 (Koli, Finland) on 11th July 2020 is 1.67 mm
CPU times: user 8.6 s, sys: 389 ms, total: 8.99 s
Wall time: 1min 53s


## Dask version

As the above computation contains almost 300 geotiff images to process, it does take some time. If we want to calculate precipitation sum over longer period or do some other heavy computation, it may be good idea to distritbute the calculation. Following example shows how this can achieved using Dask (https://docs.dask.org/en/latest/). 

First we need to install the envinronment. This example does not cover installing dask but all assets used to setup the dask cluster are included in the repository. A good tutorial for setting up the cluster can be found at: https://towardsdatascience.com/serverless-distributed-data-pre-processing-using-dask-amazon-ecs-and-python-part-1-a6108c728cc4

To use the cluster, we need several libraries to be installed. Following commands install the require libraries. Versions of each libraries are defined as they need to mach with the cluster.

In [None]:
!pip uninstall -y enum34
!pip install blosc==1.8.3
!pip install --upgrade joblib==0.16 dask[dataframe]==2.25.0 distributed==2.25.0 lz4==3.1.0 cloudpickle==1.6.0 msgpack numpy==1.19.1 toolz tornado blosc==1.7.0

Next we import necessary libraries and initialize dask client

In [17]:
from dask.distributed import Client
from dask import delayed
import dask

In [18]:
client = Client('Dask-Scheduler.local-dask:8786')
client

0,1
Client  Scheduler: tcp://Dask-Scheduler.local-dask:8786  Dashboard: http://Dask-Scheduler.local-dask:8787/status,Cluster  Workers: 2  Cores: 8  Memory: 34.36 GB


Now we can define necessary variables. Here we use the same values with previous example.

In [19]:
bucket = 'fmi-opendata-radar-geotiff'
lat = 63.1593
lon = 29.8346
timestep=300
filename_start='FIN-DBZ-3067-250M/'
filename_end='_FIN-DBZ-3067-250M.tif'
starttime = datetime.strptime('2020-09-06 00:00:00', "%Y-%m-%d %H:%M:%S")
endtime = datetime.strptime('2020-09-06 23:59:00', "%Y-%m-%d %H:%M:%S")

Computing is also almost similar to local version. We go through the time steps but instead of directly calculating the value, we define the calculation and call dask to compute all at once. Dask handles the distribution and gathering results.

In [21]:
%%time
transformer = Transformer.from_crs("epsg:4326", "epsg:3067")
x,y = transformer.transform(lat, lon)

dfs, df = [], []
time_it = starttime
while time_it <= endtime:
    key = '{year}/{month}/{day}/{prefix}{stamp}{suffix}'.format(year=time_it.strftime('%Y'),
                                                                month=time_it.strftime('%m'),
                                                                day=time_it.strftime('%d'),
                                                                stamp=time_it.strftime('%Y%m%d%H%M'),
                                                                prefix=filename_start,
                                                                suffix=filename_end)
    dfs.append(delayed(process_timestep)(bucket, key, x, y, timestep=timestep, mode='dbz'))
    time_it +=  timedelta(seconds=timestep)

df = dask.compute(*dfs)

print('Precipitation sum at {},{} (Koli, Finland) on between {} - {} is {:.2f} mm'.format(lat,
                                                                                          lon,
                                                                                          starttime.strftime('%dth %b %Y'), 
                                                                                          endtime.strftime('%dth %b %Y'), 
                                                                                          sum(df)))

Precipitation sum at 63.1593,29.8346 (Koli, Finland) on between 06th Sep 2020 - 06th Sep 2020 is 0.08 mm
CPU times: user 693 ms, sys: 33.1 ms, total: 726 ms
Wall time: 28.8 s
