## Embeddings for Weather Data

An embedding is a low-dimensional vector representation of a (typically) high-dimensional feature that preserves the feature's semantic meaning in such a way that similar features are close together in the embedding space.

In this notebook, we use autoencoders to create embeddings for HRRR images. We can then use the embeddings to search for "similar" weather patterns.

In [None]:
!apt-get -y install libeccodes0

In [None]:
%pip install -q cfgrib xarray

In [None]:
import apache_beam as beam
print(beam.__version__)

### Reading HRRR data and converting to TensorFlow Records

HRRR data comes as Grib2 files in the public `gs://high-resolution-rapid-refresh` bucket on Cloud Storage.

In [None]:
!gsutil ls -l gs://high-resolution-rapid-refresh/hrrr.20200811/conus/hrrr.*.wrfsfcf00*

In [None]:
FILENAME="gs://high-resolution-rapid-refresh/hrrr.20200811/conus/hrrr.t18z.wrfsfcf06.grib2"   # derecho in the Midwest
!gsutil ls -l {FILENAME}

In [None]:
import xarray as xr
import tensorflow as tf
import tempfile
import cfgrib

# cfgrib reads from a local path, so copy the Grib2 file into a scratch directory first,
# then list every dataset cfgrib splits the file into.
with tempfile.TemporaryDirectory() as tmpdirname:
    TMPFILE = f"{tmpdirname}/read_grib"
    tf.io.gfile.copy(FILENAME, TMPFILE, overwrite=True)
    ds = cfgrib.open_datasets(TMPFILE)
    print(ds)

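`cfgrib.open_datasets` splits the file into several datasets. To open just one of them with `xr.open_dataset`, we have to pass a `filter_by_keys` that selects a single `typeOfLevel`, for example one of the following (note that composite reflectivity, `refc`, which we use below, is found under `typeOfLevel='atmosphere'`):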
```
    filter_by_keys={'typeOfLevel': 'unknown'}
    filter_by_keys={'typeOfLevel': 'cloudTop'}
    filter_by_keys={'typeOfLevel': 'surface'}
    filter_by_keys={'typeOfLevel': 'heightAboveGround'}
    filter_by_keys={'typeOfLevel': 'isothermal'}
    filter_by_keys={'typeOfLevel': 'isobaricInhPa'}
    filter_by_keys={'typeOfLevel': 'pressureFromGroundLayer'}
    filter_by_keys={'typeOfLevel': 'sigmaLayer'}
    filter_by_keys={'typeOfLevel': 'meanSea'}
    filter_by_keys={'typeOfLevel': 'heightAboveGroundLayer'}
    filter_by_keys={'typeOfLevel': 'sigma'}
    filter_by_keys={'typeOfLevel': 'depthBelowLand'}
    filter_by_keys={'typeOfLevel': 'isobaricLayer'}
    filter_by_keys={'typeOfLevel': 'cloudBase'}
    filter_by_keys={'typeOfLevel': 'nominalTop'}
    filter_by_keys={'typeOfLevel': 'isothermZero'}
    filter_by_keys={'typeOfLevel': 'adiabaticCondensation'}
```

In [None]:
import xarray as xr
import tensorflow as tf
import tempfile
import cfgrib
import numpy as np

# Open only the instantaneous 'atmosphere'-level fields; composite reflectivity ('refc') lives there.
# (Other levels work the same way, e.g. typeOfLevel='surface' holds 'prate' and 'crain'.)
with tempfile.TemporaryDirectory() as tmpdirname:
    TMPFILE = "{}/read_grib".format(tmpdirname)
    tf.io.gfile.copy(FILENAME, TMPFILE, overwrite=True)
    with xr.open_dataset(TMPFILE, engine='cfgrib',
                         backend_kwargs={'filter_by_keys': {'typeOfLevel': 'atmosphere', 'stepType': 'instant'}}) as ds:
        # load() pulls values and coordinates into memory so `refc` stays usable in later cells,
        # after the dataset is closed and the temporary file is deleted.
        refc = ds.data_vars['refc'].load()
    refc.plot()
    print(np.array([refc.sizes['y'], refc.sizes['x']]))
    print(refc.time.data)
    print(refc.valid_time.data)

In [None]:
# Sanity check: the pipeline stores timestamps via str(refc.time.data), so confirm that yields a plain str.
print(type(str(refc.time.data)))

In [None]:
import numpy as np

def _array_feature(value, min_value, max_value):
    """Flatten an ndarray, sanitize it, and wrap it as a float_list tf.train.Feature.

    Non-finite entries are replaced by np.nan_to_num, then every value is clipped
    to the closed interval [min_value, max_value].
    """
    cleaned = np.clip(np.nan_to_num(value.flatten()), min_value, max_value)
    return tf.train.Feature(float_list=tf.train.FloatList(value=cleaned))

def create_tfrecord(filename):
    """Return a serialized tf.train.Example holding the composite reflectivity of one HRRR file.

    filename: path readable by tf.io.gfile (e.g. a gs:// URL) to a Grib2 file.
    The 'ref' feature is the flattened 'refc' grid with non-finite values replaced
    (np.nan_to_num) and clipped to [0, 60].
    """
    with tempfile.TemporaryDirectory() as tmpdirname:
        tmpfile = "{}/read_grib".format(tmpdirname)
        tf.io.gfile.copy(filename, tmpfile, overwrite=True)
        # Close the dataset before the temporary directory is removed.
        with xr.open_dataset(tmpfile, engine='cfgrib',
                             backend_kwargs={'filter_by_keys': {'typeOfLevel': 'atmosphere', 'stepType': 'instant'}}) as ds:
            # create a TF Record with the raw data
            tfexample = tf.train.Example(
                features=tf.train.Features(
                    feature={
                        'ref': _array_feature(ds.data_vars['refc'].data, min_value=0, max_value=60),
                    }))
    return tfexample.SerializeToString()

# Smoke test: serialize the single file chosen above and show the byte length plus the first 16 bytes.
s = create_tfrecord(FILENAME)
print(len(s), s[:16])

In [None]:
from datetime import datetime, timedelta
def generate_filenames(startdate: str, enddate: str):
    """Yield the GCS path of the hourly HRRR f00 Grib2 file for every hour in [startdate, enddate).

    startdate, enddate: 'YYYYMMDD' strings. enddate is exclusive, matching the
    --enddate option of wxsearch/hrrr_to_tfrecord.py, so one day yields 24 paths.
    """
    start_dt = datetime.strptime(startdate, '%Y%m%d')
    end_dt = datetime.strptime(enddate, '%Y%m%d')
    dt = start_dt
    while dt < end_dt:  # exclusive end; `<=` would also emit 00z of enddate
        # gs://high-resolution-rapid-refresh/hrrr.20200811/conus/hrrr.t04z.wrfsfcf00.grib2
        f = '{}/hrrr.{:4}{:02}{:02}/conus/hrrr.t{:02}z.wrfsfcf00.grib2'.format(
                'gs://high-resolution-rapid-refresh',
                dt.year, dt.month, dt.day, dt.hour)
        dt = dt + timedelta(hours=1)
        yield f
        
def generate_shuffled_filenames(startdate: str, enddate: str):
    """Return the paths produced by generate_filenames as a list in random order.

    Shuffling keeps consecutive, highly correlated hours from landing in the same batch of records.
    """
    paths = list(generate_filenames(startdate, enddate))
    np.random.shuffle(paths)  # shuffles in place using numpy's global RNG
    return paths

# Preview the shuffled list of hourly HRRR file paths for a small sample date range.
print(generate_shuffled_filenames('20190915', '20190917'))

## Write a Beam pipeline

In [None]:
%%writefile wxsearch/hrrr_to_tfrecord.py

import os
import xarray as xr
import tensorflow as tf
import tempfile
import cfgrib
from datetime import datetime, timedelta
import numpy as np
import argparse
import logging
import shutil
import subprocess
import apache_beam as beam
import random

def _array_feature(value, min_value, max_value):
    """Wrapper for inserting ndarray float features into Example proto.

    Accepts a numpy array or an eager tf.Tensor. The values are flattened, non-finite
    entries are replaced via np.nan_to_num, and the result is clipped to
    [min_value, max_value] before being wrapped in a float_list Feature.
    """
    if isinstance(value, type(tf.constant(0))):  # eager tensor -> numpy array
        value = value.numpy()
    value = np.nan_to_num(value.flatten())  # nan, -inf, +inf to numbers
    value = np.clip(value, min_value, max_value)  # clip to valid range
    if value.size:  # np.min/np.max raise on an empty array
        logging.info('Range of image values {} to {}'.format(np.min(value), np.max(value)))
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))

def _bytes_feature(value):
    """Wrap a single bytes value (or eager tensor holding one) in a bytes_list tf.train.Feature."""
    # BytesList won't unpack a string from an EagerTensor, so extract the raw value first.
    raw = value.numpy() if isinstance(value, type(tf.constant(0))) else value
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[raw]))

def _string_feature(value):
    """Encode a Python str as UTF-8 and store it as a single-element bytes_list Feature."""
    encoded = value.encode('utf-8')
    return _bytes_feature(encoded)

def create_tfrecord(filename):
    """Convert one HRRR Grib2 file into a serialized tf.train.Example.

    The file is copied to a local temporary directory (presumably because cfgrib
    cannot read gs:// paths directly) and the instantaneous 'atmosphere'-level
    'refc' field is read. Features written:
      size       -- float [ny, nx] of the refc grid
      ref        -- flattened refc, non-finite values replaced and clipped to [0, 60]
      time       -- str() of refc.time
      valid_time -- str() of refc.valid_time
    """
    # logging (unlike print) is captured in worker logs when running on Beam/Dataflow
    logging.info('Creating TF record from {}'.format(filename))
    with tempfile.TemporaryDirectory() as tmpdirname:
        tmpfile = os.path.join(tmpdirname, 'read_grib')
        tf.io.gfile.copy(filename, tmpfile, overwrite=True)
        # Close the dataset before the temporary directory (and the file) is removed.
        with xr.open_dataset(tmpfile, engine='cfgrib',
                             backend_kwargs={'filter_by_keys': {'typeOfLevel': 'atmosphere', 'stepType': 'instant'}}) as ds:
            refc = ds.data_vars['refc']
            size = np.array([refc.sizes['y'] * 1.0, refc.sizes['x'] * 1.0])
            # create a TF Record with the raw data
            tfexample = tf.train.Example(
                features=tf.train.Features(
                    feature={
                        'size': tf.train.Feature(float_list=tf.train.FloatList(value=size)),
                        'ref': _array_feature(refc.data, min_value=0, max_value=60),
                        'time': _string_feature(str(refc.time.data)),
                        'valid_time': _string_feature(str(refc.valid_time.data)),
                    }))
    return tfexample.SerializeToString()

def generate_filenames(startdate: str, enddate: str):
    """Yield one HRRR f00 file path per hour from startdate (inclusive) to enddate (exclusive).

    Dates are 'YYYYMMDD' strings; each path looks like
    gs://high-resolution-rapid-refresh/hrrr.20200811/conus/hrrr.t04z.wrfsfcf00.grib2
    """
    first = datetime.strptime(startdate, '%Y%m%d')
    stop = datetime.strptime(enddate, '%Y%m%d')
    logging.info('Hourly records from {} to {}'.format(first, stop))
    template = ('{root}/hrrr.{t.year:4}{t.month:02}{t.day:02}'
                '/conus/hrrr.t{t.hour:02}z.wrfsfcf00.grib2')
    step = timedelta(hours=1)
    hour = first
    while hour < stop:
        yield template.format(root='gs://high-resolution-rapid-refresh', t=hour)
        hour += step
                 
def generate_shuffled_filenames(startdate: str, enddate: str):
    """Return every hourly HRRR path in [startdate, enddate) as a list in random order.

    Shuffling keeps consecutive, highly correlated hours from landing in the same batch of records.
    """
    paths = list(generate_filenames(startdate, enddate))
    np.random.shuffle(paths)  # shuffles in place using numpy's global RNG
    return paths

def run_job(options):
    """Build and run the pipeline: HRRR Grib2 paths -> serialized Examples -> TFRecord files.

    options: dict of settings; 'runner', 'startdate', 'enddate' and 'outdir' are read
    here, and the whole dict is also forwarded to PipelineOptions.
    """
    pipeline_opts = beam.pipeline.PipelineOptions(flags=[], **options)
    with beam.Pipeline(options['runner'], options=pipeline_opts) as p:
        hrrr_files = generate_shuffled_filenames(options['startdate'], options['enddate'])
        output_prefix = os.path.join(options['outdir'], 'tfrecord')
        # create examples, then write them out as tfrecords
        _ = (p
             | 'hrrr_files' >> beam.Create(hrrr_files)
             | 'create_tfr' >> beam.Map(create_tfrecord)
             | 'write_tfr' >> beam.io.tfrecordio.WriteToTFRecord(output_prefix))


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Convert hourly HRRR Grib2 files into TFRecords of composite reflectivity')
    parser.add_argument(
        '--project',
        default='',
        help='Specify GCP project to bill to run on cloud')
    parser.add_argument(
        '--outdir', required=True, help='output dir. could be local or on GCS')
    parser.add_argument(
        '--startdate',
        type=str,
        required=True,
        help='eg 20200915')
    parser.add_argument(
        '--enddate',
        type=str,
        required=True,
        help='eg 20200916 -- this is exclusive')

    # parse command-line args and add the pipeline settings Beam/Dataflow need
    logging.basicConfig(level=logging.INFO)
    options = vars(parser.parse_args())
    outdir = options['outdir']
    options.update({
        'staging_location':
            os.path.join(outdir, 'tmp', 'staging'),
        'temp_location':
            os.path.join(outdir, 'tmp'),
        'job_name':
            'wxsearch-' + datetime.now().strftime('%y%m%d-%H%M%S'),
        'teardown_policy':
            'TEARDOWN_ALWAYS',
        'max_num_workers':
            20,
        'machine_type':
            'n1-standard-8',
        'region':
            'us-central1',
        'setup_file':
            os.path.join(os.path.dirname(os.path.abspath(__file__)), './setup.py'),
        'save_main_session':
            True,
        # 'sdk_location':
        #    './local/beam/sdks/python/dist/apache-beam-2.12.0.tar.gz'
    })

    if not options['project']:
        print('Launching local job ... hang on')
        # local run: start from an empty output directory
        shutil.rmtree(outdir, ignore_errors=True)
        os.makedirs(outdir)
        options['runner'] = 'DirectRunner'
    else:
        print('Launching Dataflow job {} ... hang on'.format(options['job_name']))
        # Best-effort removal of previous output: a missing directory (non-zero exit)
        # or a missing gsutil binary is logged, not fatal. Argument list avoids
        # breaking paths that contain spaces.
        try:
            subprocess.check_call(['gsutil', '-m', 'rm', '-r', outdir])
        except (subprocess.CalledProcessError, OSError) as err:
            logging.warning('Could not clear {}: {}'.format(outdir, err))
        options['runner'] = 'DataflowRunner'

    run_job(options)

In [None]:
%run -m wxsearch.hrrr_to_tfrecord -- --startdate 20190915 --enddate 20190916 --project ai-analytics-solutions --outdir gs://ai-analytics-solutions-kfpdemo/wxsearch
# --outdir tmp


In [None]:
# try reading what was written out
import tensorflow as tf

def read_dataset(pattern):
    """Return a prefetching TFRecordDataset over every file matching `pattern` (local or gs://).

    Raises FileNotFoundError when nothing matches, instead of silently returning an
    empty dataset whose first next() would fail with an opaque StopIteration.
    """
    filenames = tf.io.gfile.glob(pattern)
    if not filenames:
        raise FileNotFoundError('No TFRecord files match {}'.format(pattern))
    ds = tf.data.TFRecordDataset(filenames)  # defaults: no compression, sequential reads
    return ds.prefetch(tf.data.experimental.AUTOTUNE)

# Must match the --outdir passed to hrrr_to_tfrecord above
# (use 'tmp/tfrecord*' if the pipeline was run with --outdir tmp).
TFRECORD_PATTERN = 'gs://ai-analytics-solutions-kfpdemo/wxsearch/tfrecord*'
ds = read_dataset(TFRECORD_PATTERN)
tfexample = next(iter(ds))
parsed = tf.train.Example.FromString(tfexample.numpy())
print(parsed.features.feature['size'])
print(parsed.features.feature['valid_time'])

Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License