# Data Ingestion

Inserting data into btrdb involves five core steps:
1. Data loading 
2. Data manipulation
3. Stream creation
4. Inserting values
5. Updating metadata

This tutorial walks through each of these steps using a USGS dataset measuring the Earth's geomagnetic field. We'll use the USGS web service to request and load the data, and then convert it into a format that can be inserted into the database.

In [1]:
import btrdb
import urllib.request
import json

In [2]:
db = btrdb.connect()

## Data loading

In [3]:
url = 'https://geomag.usgs.gov/ws/data/?id=USGS&format=json&sampling_period=1&starttime=2021-10-25T00:26:54.177Z&endtime=2021-10-25T12:26:54.177Z'
data_str = urllib.request.urlopen(url).read()
data_dict = json.loads(data_str)

In [4]:
metadata = data_dict['metadata']
timestamps = data_dict['times']
stream_data = data_dict['values']
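The cell above relies on the response having three top-level keys: `metadata`, `times`, and `values`. As a minimal sketch of that layout, the snippet below parses a small hand-written payload and checks that every series has one value per timestamp. The sample values and the `check_payload` helper are hypothetical, and the layout itself is inferred from how this tutorial indexes `data_dict`, not taken from the USGS documentation.

```python
import json

# Hypothetical, hand-written payload that mimics the layout this tutorial relies on.
sample_str = """
{
  "metadata": {"status": 200},
  "times": ["2021-10-25T00:26:54.000Z", "2021-10-25T00:26:55.000Z"],
  "values": [
    {"id": "X", "metadata": {"element": "X"}, "values": [20000.1, 20000.2]},
    {"id": "Y", "metadata": {"element": "Y"}, "values": [150.5, null]}
  ]
}
"""


def check_payload(payload):
    """Return the stream ids after checking each series lines up with the timestamps."""
    n_times = len(payload["times"])
    ids = []
    for series in payload["values"]:
        n_values = len(series["values"])
        if n_values != n_times:
            raise ValueError(
                f"stream {series['id']} has {n_values} values, expected {n_times}"
            )
        ids.append(series["id"])
    return ids


sample = json.loads(sample_str)
stream_ids = check_payload(sample)
print(stream_ids)
```

Running a check like this before creating any streams makes a truncated or malformed response fail early, rather than partway through insertion.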

## Data manipulation

### Generating a UUID for each stream

Here, we'll use Python's `uuid` library to generate a new UUID for each of the streams we want to create.

In [5]:
import uuid

stream_uuids = [uuid.uuid4() for _ in range(len(stream_data))]
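Because `uuid.uuid4()` is random, re-running the cell above produces a fresh set of UUIDs each time. If repeatable identifiers are preferable, one alternative (not what this tutorial uses) is to derive each UUID from the collection path and stream name with `uuid.uuid5`. The `stable_uuid` helper and the stream names below are hypothetical and only illustrate the idea.

```python
import uuid

# Hypothetical collection path and stream names, used only for illustration.
collection = "zz/insertion/example/usgs"
names = ["X", "Y", "Z", "F"]


def stable_uuid(collection, name):
    """Derive a repeatable UUID from the collection path and stream name."""
    return uuid.uuid5(uuid.NAMESPACE_URL, f"{collection}/{name}")


# The same inputs always map to the same UUID, across runs and machines.
stable_uuids = {name: stable_uuid(collection, name) for name in names}
for name, value in stable_uuids.items():
    print(name, value)
```

With deterministic UUIDs, a second run of the notebook refers to the same streams rather than attempting to create new ones.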

### Define information about each stream (i.e., `collection`, `annotations` and `tags`)

In [6]:
collection = 'zz/insertion/example/usgs'

stream_names = []
annotations_dict = {}
tags_dict = {}
for stream in stream_data:
    name = stream['id']
    stream_names.append(name)
    annotations_dict[name] = {**metadata, **stream['metadata']}
    tags_dict[name] = {'name': name, 'unit': 'nanoteslas'}

### Convert timestamps to nanoseconds

In [7]:
from btrdb.utils.timez import datetime_to_ns
from datetime import datetime

In [8]:
to_datetime = lambda t: datetime.strptime(t, '%Y-%m-%dT%H:%M:%S.%fZ')
ns_times = [datetime_to_ns(to_datetime(t)) for t in timestamps]
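To make the conversion concrete, here is a standard-library sketch that turns one of these timestamp strings into integer nanoseconds since the Unix epoch. It assumes the timestamps are in UTC, as the trailing `Z` suggests. The `iso_to_ns` helper is hypothetical, and its result should match `datetime_to_ns` only if that function also treats naive datetimes as UTC, which is an assumption here.

```python
import calendar
from datetime import datetime


def iso_to_ns(t):
    """Convert a UTC timestamp such as '2021-10-25T00:26:54.177Z' to integer nanoseconds."""
    dt = datetime.strptime(t, "%Y-%m-%dT%H:%M:%S.%fZ")
    # timegm interprets the time tuple as UTC; integer math avoids float rounding.
    return calendar.timegm(dt.timetuple()) * 10**9 + dt.microsecond * 1000


print(iso_to_ns("1970-01-01T00:00:01.500Z"))  # 1500000000
```

Keeping the arithmetic in integers matters at nanosecond resolution, since a float timestamp cannot represent every nanosecond of a present-day epoch value exactly.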

## Stream creation

In [9]:
stream_objects = []
for i, name in enumerate(stream_names):
    try:
        # To create a stream, we must specify various parameters
        stream = db.create(stream_uuids[i], # UUID
                           collection, # collection
                           tags=tags_dict[name], # tags
                           annotations=annotations_dict[name] # annotations
                          )
        
        stream_objects.append(stream)
        
    # If a stream already exists with the UUID, trying to create it again will raise an error
    except Exception as e:
        if str(e) == 'a stream already exists with uuid %s'%(stream_uuids[i]):
            # Here we'll query the database to retrieve the stream 
            #    so we can then insert data into it
            stream = db.stream_from_uuid(stream_uuids[i])
            stream_objects.append(stream)
        else:
            # Any other failure is unexpected, so re-raise it with its original traceback
            raise


## Inserting values

Each point in the database is a time-value pair. Database queries return a list of tuples of the form `[(time1, value1), (time2, value2), ...]`, and data insertions follow the same structure.

### Merge policies

When inserting data, you can specify a merge policy that controls how points with duplicate timestamps are handled. Valid merge policies are:
- `'never'`: the default, no points are merged
- `'equal'`: points are deduplicated if the time and value are equal
- `'retain'`: if two points have the same timestamp, the old one is kept
- `'replace'`: if two points have the same timestamp, the new one is kept
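The differences between the policies are easiest to see on a small example. The snippet below is a toy model in plain Python that follows the descriptions in the list above. It is not BTrDB's implementation: the `merge_points` function is hypothetical, and `'never'` is modeled as simply keeping every point.

```python
def merge_points(existing, incoming, policy):
    """Toy model of the merge policies, applied to lists of (time, value) tuples."""
    if policy == "never":
        # No merging: every point is kept, duplicates included.
        return sorted(existing + incoming)
    if policy == "equal":
        # Drop an incoming point only if an identical (time, value) pair exists.
        seen = set(existing)
        merged = list(existing)
        for point in incoming:
            if point not in seen:
                merged.append(point)
                seen.add(point)
        return sorted(merged)
    if policy == "retain":
        # On a timestamp collision the old value wins.
        by_time = dict(incoming)
        by_time.update(dict(existing))
        return sorted(by_time.items())
    if policy == "replace":
        # On a timestamp collision the new value wins.
        by_time = dict(existing)
        by_time.update(dict(incoming))
        return sorted(by_time.items())
    raise ValueError(f"unknown merge policy: {policy}")


existing = [(1, 10.0), (2, 20.0)]
incoming = [(1, 10.0), (2, 25.0), (3, 30.0)]
for policy in ("never", "equal", "retain", "replace"):
    print(policy, merge_points(existing, incoming, policy))
```

In this model, `'replace'` is the policy that makes re-running an insertion cell idempotent, which is why it suits a tutorial that may be executed more than once.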

In [10]:
for stream, data in zip(stream_objects, stream_data):
    points = list(zip(ns_times, data['values']))
    stream.insert(points, merge='replace')
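JSON `null` values become `None` in Python, and a `None` would likely not be accepted as a numeric point value. Whether this particular request returns any gaps is not something the tutorial states, so the following is only a sketch: a hypothetical `build_points` helper that pairs timestamps with values and drops missing samples before insertion.

```python
def build_points(times_ns, values):
    """Pair timestamps with values, skipping samples whose value is None."""
    return [(t, float(v)) for t, v in zip(times_ns, values) if v is not None]


# Small made-up inputs: the second sample is missing.
points = build_points([1, 2, 3], [10.0, None, 30])
print(points)  # [(1, 10.0), (3, 30.0)]
```

Dropping the gap keeps the remaining points aligned with their own timestamps, because the filtering happens after the pairs are formed.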

## Updating metadata

Metadata often needs to be added, updated, or further refined after a stream has been created. This can be done using `stream.update()`.

Here, we'll add a URL indicating where the data originated. Note that the raw metadata we inserted previously already contains a 'url' field, but its value was set to 'null'.

In [11]:
streams = db.streams_in_collection(collection)
annotations, _ = streams[0].annotations()

annotations

{'url': 'null',
 'channel': 'X',
 'station': 'USGS',
 'element': 'X',
 'network': 'NT',
 'status': '200',
 'location': 'A0',
 'generated': '2021-12-10T21:39:06Z',
 'intermagnet': '{"imo": {"iaga_code": "USGS", "name": "USGS", "coordinates": [254.764, 40.137, 1682.0]}, "reported_orientation": "XYZF", "sensor_orientation": "HDZF", "data_type": "adjusted", "sampling_period": 1, "digital_sampling_rate": 0.01}'}

In [12]:
for stream in streams:
    new_annotations = {'url': url}
    stream.update(annotations=new_annotations, replace=False)

Here we can query the stream's annotations again to confirm that the url has been stored.

In [13]:
stream.annotations()[0]['url']

'https://geomag.usgs.gov/ws/data/?id=USGS&format=json&sampling_period=1&starttime=2021-10-25T00:26:54.177Z&endtime=2021-10-25T12:26:54.177Z'

### A note on the keyword `replace`

Setting `replace=False` above allowed us to insert/update a single metadata field, without touching metadata fields that were not in the `new_annotations` dictionary.

If the intended behavior is to completely overhaul the metadata that was previously in place, you can do so by setting `replace=True`.
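The two modes can be pictured with ordinary dictionaries. The snippet below is a plain-Python model of the behavior described above, not the library's code: `apply_update` is a hypothetical helper, and the annotation values are abridged from this tutorial's example output.

```python
def apply_update(current, new, replace):
    """Model of an annotation update: merge into, or fully replace, the current fields."""
    if replace:
        # replace=True: only the new fields survive.
        return dict(new)
    # replace=False: new fields are added or overwritten, the rest are untouched.
    return {**current, **new}


current = {"url": "null", "station": "USGS", "network": "NT"}
new = {"url": "https://geomag.usgs.gov/ws/data/"}

merged = apply_update(current, new, replace=False)
replaced = apply_update(current, new, replace=True)
print(merged)
print(replaced)
```

The practical consequence is that with `replace=True` the new dictionary must contain every field you want to keep, not just the ones that changed.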

Here, we'll go through an example that creates a separate metadata field for each of the parameters stored in the `intermagnet` metadata field (see example above).

In [14]:
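for stream in streams:
    old_annotations, _ = stream.annotations()
    intermagnet = old_annotations.pop('intermagnet')
    # If the value comes back as a JSON string (as in the output above), decode it first
    if isinstance(intermagnet, str):
        intermagnet = json.loads(intermagnet)
    imo = intermagnet.pop('imo')
    # Promote the nested fields to top-level annotations
    new_annotations = {**old_annotations, **intermagnet, **imo}

    stream.update(annotations=new_annotations, replace=True)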
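Merging dictionaries with `**` silently overwrites any key that appears more than once. As a sketch of a more defensive variant, the hypothetical `flatten_intermagnet` helper below does the same flattening without a database connection and raises an error if a promoted field would collide with an existing annotation. The sample annotations are abridged from the example output earlier in this section.

```python
import json

# Annotation values abridged from the example output earlier in this section.
old_annotations = {
    "url": "null",
    "station": "USGS",
    "intermagnet": '{"imo": {"iaga_code": "USGS", "name": "USGS"}, '
                   '"data_type": "adjusted", "sampling_period": 1}',
}


def flatten_intermagnet(annotations):
    """Return a copy with the nested intermagnet fields promoted to the top level."""
    flat = dict(annotations)
    intermagnet = flat.pop("intermagnet")
    # Accept either a JSON string or an already-decoded dictionary.
    if isinstance(intermagnet, str):
        intermagnet = json.loads(intermagnet)
    else:
        intermagnet = dict(intermagnet)
    imo = intermagnet.pop("imo")
    for part in (intermagnet, imo):
        clash = flat.keys() & part.keys()
        if clash:
            raise KeyError(f"duplicate annotation keys: {sorted(clash)}")
        flat.update(part)
    return flat


flat = flatten_intermagnet(old_annotations)
print(flat)
```

Because the helper works on a copy, the original annotations remain available if the update needs to be retried.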

In [15]:
stream.annotations()

({'location': 'A0',
  'reported_orientation': 'XYZF',
  'coordinates': [254.764, 40.137, 1682.0],
  'iaga_code': 'USGS',
  'channel': 'F',
  'url': 'https://geomag.usgs.gov/ws/data/?id=USGS&format=json&sampling_period=1&starttime=2021-10-25T00:26:54.177Z&endtime=2021-10-25T12:26:54.177Z',
  'station': 'USGS',
  'data_type': 'adjusted',
  'name': 'USGS',
  'element': 'F',
  'sensor_orientation': 'HDZF',
  'digital_sampling_rate': 0.01,
  'generated': '2021-12-10T21:39:06Z',
  'network': 'NT',
  'status': 200,
  'sampling_period': 1},
 3)

## Starting over

If you make a mistake, you can start from a clean state by obliterating the streams we just created. Note that this permanently deletes the streams and their data.

In [16]:
for stream in db.streams_in_collection(collection):
    stream.obliterate()