In [2]:
import numpy as np
import pandas as pd
import json
import netCDF4 as nc
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fpq
import fastavro as avro
import uuid
import os
from time import time_ns
from enum import Enum
import matplotlib.pyplot as plt

In [3]:
# Common functions and values
def make_test_meta():
    """Build one dict of dummy experiment metadata.

    Returns:
        dict with keys, in this order:
          'uuid'   -- str of a freshly generated uuid1,
          'param1' -- the int 12,
          'param2' -- the str 'a_string',
          'param3' -- a random float in [0, 1e9) drawn from numpy's global RNG,
          'param4' -- a new list of three strings.
    """
    meta = dict(
        uuid=str(uuid.uuid1()),
        param1=12,
        param2='a_string',
        param3=np.random.rand() * 1e9,
        param4=['alist', 'of', 'strings'],
    )
    return meta
    

def make_test_data(length_of_data):
    """Generate synthetic measurement columns.

    Args:
        length_of_data: number of samples in every column.

    Returns:
        dict with keys 'time', 'vals', 'volts', 'dp', 'dr' (in that order).
        'time' is 0, 1e-9, 2e-9, ... and the other four are standard-normal
        draws from numpy's global RNG, generated in key order.
    """
    test_data = {'time': np.arange(length_of_data) * 1e-9}
    for column in ('vals', 'volts', 'dp', 'dr'):
        test_data[column] = np.random.randn(length_of_data)
    return test_data

def get_col_names():
    """Return a new list of the data column names, in the order make_test_data emits them."""
    return 'time vals volts dp dr'.split()
def round_4(val):
    """Round ``val`` to four decimal places via the built-in ``round``."""
    return round(val, ndigits=4)


In [143]:
# Avro methods
def Avro_make_meta_groups(n_to_write):
    """Create ``n_to_write`` metadata entries, keyed by their JSON text.

    Args:
        n_to_write: number of metadata dicts to generate with make_test_meta().

    Returns:
        dict mapping ``json.dumps(metadata)`` -> its 0-based index, in generation
        order. Keys are presumably unique because each metadata dict carries a
        fresh uuid1.
    """
    return {json.dumps(make_test_meta()): idx for idx in range(n_to_write)}

def Avro_get_schema():
    schema_1 = {
      "type" : "record",
      "name" : "Experiment details",
      "fields" : [ {
        "name" : "meta_exper_name",
        "type" : "string"
      }, {
        "name" : "data",
        "type" : {
          "type" : "record",
          "name" : "data",
          "fields" : [ {
            "name" : "time",
            "type" : {
              "type" : "array",
              "items" : "double"
            }
          }, {
            "name" : "vals",
            "type" : {
              "type" : "array",
              "items" : "double"
            }
          }, {
            "name" : "volts",
            "type" : {
              "type" : "array",
              "items" : "double"
            }
          }, {
            "name" : "dr",
            "type" : {
              "type" : "array",
              "items" : "double"
            }
          }, {
            "name" : "dp",
            "type" : {
              "type" : "array",
              "items" : "double"
            }
          } ]
        }
      } ]
    }
    return avro.parse_schema(schema_1)
    

def Avro_make_record(metadata, length_of_data):
    """Build a single record for the row-per-experiment schema.

    Args:
        metadata: value stored in the ``meta_exper_name`` field (the callers pass
            a JSON string).
        length_of_data: samples per column, forwarded to make_test_data().

    Returns:
        One record dict. It is NOT an iterable of records; callers wrap it in a
        list before handing it to ``avro.writer``.
    """
    data = make_test_data(length_of_data)
    return dict(meta_exper_name=metadata, data=data)

def Avro_write_n_to_file(length_of_data, n_to_write, fp = None):
    """Write ``n_to_write`` experiment records to a fresh Avro file and time it.

    Any existing file at ``fp`` is deleted first. The mapping of metadata JSON ->
    record index is stored in the file's user metadata under ``'records_meta'``.

    Args:
        length_of_data: samples per column in each experiment record.
        n_to_write: number of experiment records to write.
        fp: output path; defaults to ``'AvroTestFile1.avro'``.

    Returns:
        (elapsed_seconds, file_size_kb), where file_size_kb is bytes / 1000.
    """
    if fp is None:
        fp = 'AvroTestFile1.avro'

    if os.path.exists(fp):
        os.unlink(fp)

    then = time_ns()
    my_schema = Avro_get_schema()
    with open(fp, 'a+b') as file:
        record_mappings = Avro_make_meta_groups(n_to_write)
        custom_metadata = {'records_meta': json.dumps(record_mappings)}
        # One writer call per experiment on a file opened in 'a+b' mode; the
        # custom metadata only ends up in the header written by the first call.
        for exp_meta in record_mappings:
            # Was `make_record`, which is not defined anywhere (NameError).
            record = [Avro_make_record(exp_meta, length_of_data)]  # writer takes an iterable of records
            avro.writer(file, my_schema, record, metadata=custom_metadata)

    now = time_ns()
    file_size = os.path.getsize(fp) / 1000  # bytes -> kB
    return (now - then) * 1e-9, file_size


def Avro_load(source):
    """Read an Avro file produced by Avro_write_n_to_file and time the read.

    Args:
        source: path of the .avro file to read.

    Returns:
        (metadata_read, columns_read): seconds from the start until the header's
        ``'records_meta'`` JSON was parsed, and until every record had been
        iterated. Both are measured from the same start, so columns_read
        includes metadata_read.
    """
    then = time_ns()

    # Previously this opened the global `fp` and silently ignored `source`.
    with open(source, 'rb') as fo:
        avro_reader = avro.reader(fo)
        # Parsed only so the metadata read is part of the timing.
        record_mappings = json.loads(avro_reader.metadata['records_meta'])
        metadata_read = (time_ns() - then) * 1e-9

        # Iterate all records so every column is decoded; values are discarded.
        for record in avro_reader:
            _ = record['data']

    columns_read = (time_ns() - then) * 1e-9

    return metadata_read, columns_read



In [146]:
# Smoke test: write one experiment of 2000 samples, read it back, and report
# the write time, the two read timings and the file size.
fp = 'AvroTestFile1.avro'
n_samples, n_records = 2000, 1
writetime, filesize = Avro_write_n_to_file(n_samples, n_records, fp)
metadata_read, columns_read = Avro_load(fp)

summary = (
    f'time to write: {round_4(writetime)} s, '
    f'time to metaread {round_4(metadata_read)} s '
    f'and colread is {round_4(columns_read)} s, '
    f'size is {round_4(filesize)} KB'
)
print(summary)

time to write: 0.0051 s, time to metaread 0.0034 s and colread is 0.0112 s, size is 80.933 KB


Links to documentation/resources

Objects:
- [fastavro `writer`](https://fastavro.readthedocs.io/en/latest/writer.html)


Resources:
- [Make an Avro schema](http://avro4s-ui.landoop.com/)
- [Avro schema documentation (specification 1.11.1)](https://avro.apache.org/docs/1.11.1/specification/)
- [pandavro (pandas + Avro)](https://github.com/ynqa/pandavro)
- [Reading records into a pandas DataFrame](https://martinhynar.medium.com/quick-into-to-avro-in-python-and-how-to-make-it-pandas-dataframe-37bd0ae30ac6)



In [142]:
# Testing functions

def test_schema_1(fp='example1.avro', n_to_write=2, length_of_data=5):
    """Round-trip check for the row-per-experiment schema (schema_1 layout).

    Each Avro row holds one experiment's metadata string plus all of its data
    columns (like a JSON document). Writes ``n_to_write`` experiments, one
    ``avro.writer`` call each on a file opened in append mode. It then checks
    that the file holds one block per experiment and that every decoded
    ``time`` column has ``length_of_data`` samples.

    Args:
        fp: scratch file path; any existing file there is deleted first.
            (Previously this argument was ignored and 'example1.avro' was
            always used.)
        n_to_write: number of experiments, and therefore expected blocks.
        length_of_data: samples per column in each experiment.

    Raises:
        AssertionError: if the block count or a column length does not match.
    """
    # Avro_get_schema() has the same layout as schema_1. Using it avoids
    # depending on the schema cell further down, which is not executed before
    # this one on Restart & Run All.
    parsed_schema = Avro_get_schema()

    # Writing
    if os.path.exists(fp):
        os.unlink(fp)
    with open(fp, 'a+b') as out:
        record_mappings = Avro_make_meta_groups(n_to_write)
        custom_metadata = {'records_meta': json.dumps(record_mappings)}
        for exp_meta in record_mappings:
            records = [Avro_make_record(exp_meta, length_of_data)]  # writer takes an iterable of records
            # In append mode the metadata only lands in the header written by the first call.
            avro.writer(out, parsed_schema, records, metadata=custom_metadata)

    # Counting blocks
    with open(fp, 'rb') as fo:
        n_blocks = sum(1 for _ in avro.block_reader(fo))
    print("Total blocks: ", n_blocks)
    assert n_blocks == n_to_write, f"expected {n_to_write} blocks, got {n_blocks}"

    # Reading
    with open(fp, 'rb') as fo:
        for record in avro.reader(fo):
            n_samples = len(record['data']['time'])
            print(n_samples)
            assert n_samples == length_of_data, (
                f"expected {length_of_data} samples, got {n_samples}")

 

'''
Schema = schema_2, each row is a record 
Test to see if each experiment can be a block but it seems that block size has a limit, if the size of records in the block
is too big it splits into more blocks. It seems after 400 records the experiments splits into different blocks
'''
def check_if_blocks_work(fp):
    fp = 'example2.avro'
    parsed_schema = avro.parse_schema(schema_2)
    # Writing 
    if os.path.exists(fp):
            os.unlink(fp)
    n_to_write = 3
    length_of_data = 401
    with open(fp, 'a+b') as out:
        for i in range(n_to_write): # Each i should be a new block
            df = pd.DataFrame(make_test_data(length_of_data))
            records = df.to_dict(orient='records')
            avro.writer(out, parsed_schema, records)
    # Counting
    with open(fp, 'rb') as fo:
        avro_reader = avro.block_reader(fo)
        z = sum(1 for _ in avro_reader)
        print("Total blocks: ", z) # Should match n_to_write
    
    # Records per block
    with open(fp, 'rb') as fo:
        avro_reader = avro.block_reader(fo)
        for i, block in enumerate(avro_reader):
            print("block ", i ,"has no. records:",block.num_records)
    return

# Run both checks, each against its own scratch file.
for check, path in ((test_schema_1, 'example1.avro'),
                    (check_if_blocks_work, 'example2.avro')):
    check(path)

Total blocks:  2
5
5
Total blocks:  6
block  0 has no. records: 400
block  1 has no. records: 1
block  2 has no. records: 400
block  3 has no. records: 1
block  4 has no. records: 400
block  5 has no. records: 1


In [None]:
# Schemas
# A schema of type "record" has:
#   name:      a string naming the record (required). The Avro spec requires
#              names to match [A-Za-z_][A-Za-z0-9_]*, so no spaces are allowed
#              (the previous "Experiment details" was invalid).
#   namespace: a string that qualifies the name (optional).
#   doc:       documentation for users of the schema (optional).

# Measurement columns, in the order both schemas list them.
_SCHEMA_COLUMNS = ("time", "vals", "volts", "dr", "dp")

# schema_1: one row per experiment -- the metadata string plus a nested "data"
# record holding one array of doubles per column.
schema_1 = {
    "type": "record",
    "name": "Experiment_details",
    "fields": [
        {"name": "meta_exper_name", "type": "string"},
        {
            "name": "data",
            "type": {
                "type": "record",
                "name": "data",
                "fields": [
                    {"name": col, "type": {"type": "array", "items": "double"}}
                    for col in _SCHEMA_COLUMNS
                ],
            },
        },
    ],
}

# schema_2: one row per sample -- a single double per column.
schema_2 = {
    "type": "record",
    "name": "Experiment_details",
    "fields": [{"name": col, "type": "double"} for col in _SCHEMA_COLUMNS],
}

# parse_schema turns the dict into the parsed form that fastavro's writer consumes.
parsed_schema = avro.parse_schema(schema_2)