# Converted Fast File Standardization

This step is to be completed after following the CardConvert of the protocol outlined in Protocol.md

The first thing that this notebook does is take the output files from CardConvert and process them into a more uniform format for EddyPro.

In EddyPro, the following things need to be consistent across time:
* Data column order
* Number of header lines
* Number of "text" columns to be ignored
* Seamless boundaries between files

However, EddyPro can handle the following, which will be addressed later:
* Changes in instrument location and orientation
* Changes in instrument type
* Changes in acquisiton frequency and file size

The program will also combine data from the same site. For example, at the BB-NF site, "Fast" data is collected at both a 3m tower and at a 17m tower. This data will have to be combined.

In [1]:
from pathlib import Path
import re
import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import trange, tqdm
from scipy import stats

## User inputs

Please input the following information in the next cell:
* path to the directory containing the "Converted" files of interest
* the time-span represented by each file, in minutes (usually 30)
* the acquisition frequency of the files, in Hz (usually 10)
* the start date of the timeseries, in yyyy-mm-dd HH:MM format. This should be the timestamp of the **first** line of the **first** file of interest, rounded **down** to the nearest half-hour.
* the end date of the timeseries, in yyyy-mm-dd HH:MM format. This should be the timestamp of the **first** line of the **last** file of interest, rounded **down** to the nearest half-hour.
* The name of the site (either BB-SF, BB-NF or BB-UF, or something else that the system recognizes). This will point the program towards the correct metadata file containing information on instruments and history for the site.

The program will use this information to correctly format the "Fast" files.

In [28]:
# User input: Change me!

converted_dir = "/Volumes/TempData/Bretfeld Mario/Chimney-Park-Reprocessing-Sandbox/Alex Work/Bad/Chimney/EC Processing/BB-NF/Fast/3m/Converted"
file_length_in = 30  # minutes
file_length_out = 30  # minutes. Must be a divisor of file_length_in. Keep at 30/30 for simplicity.
acq_freq = 10  # Hz
start_time = "2021-03-06 15:30"  # yyyy-mm-dd HH:MM
end_time = "2021-03-07 11:00"#"2021-03-30 11:00"
site_name = "BB-NF"

## Processing steps
Run, but do not edit!

For each time stamp in a given file, create an "ideal" file index, then merge the two, keeping everything in the "ideal" file and populating the other with nans.

In [26]:
class process_from_raw():
    def __init__(self, converted_dir, file_length_in, file_length_out, acq_freq, start_time, end_time, site_name):
        self.start_time = pd.to_datetime(start_time)
        self.end_time = pd.to_datetime(end_time)

        # convert acq freq to interval
        self.acq_freq = acq_freq
        self.acq_period = pd.Timedelta(f'{1000//self.acq_freq} ms')
    
        self.file_length = file_length
        self.file_length_out = file_length_out
        self.n_records = self.file_length*self.acq_freq*60
        
        self.site_name = site_name

        # convert files and directories to path objects and create an output directory
        self.converted_path = Path(converted_dir)
        self.out_path = self.converted_path / ".." / "Standardized"
        if not self.out_path.exists():
            self.out_path.mkdir()
        return
    
    def get_timestamp_from_fn(self, fn):
        """given a file, this will extract the timestamp in its name"""
        file_id = Path(fn).name.split('Hz')[1]
        file_start_str = "".join(re.split("_|\.", file_id)[1:-1])
        file_start_ts = pd.to_datetime(file_start_str, format="%Y%m%d%H%M")
        return file_start_ts

    def get_fn_from_timestamp(self, file_start_ts, fns_lst):
        """given a timestamp, this will find the exact file name it's associated with"""
        file_start_str = (f'{file_start_ts.year:04d}_' + 
                          f'{file_start_ts.month:02d}_' + 
                          f'{file_start_ts.day:02d}_' + 
                          f'{file_start_ts.hour:02d}{file_start_ts.minute:02d}')

        # if the file exists
        for i, fn in enumerate(fns_lst):
            if file_start_str in str(fn):
                return fns_lst[i]

        # if not
        return
    

    
    def find_files(self):
        """find all the raw data files that the user wants to process"""
        
        start_time, end_time, file_length, site_name, converted_path = self.start_time, self.end_time, self.file_length, self.site_name, self.converted_path
        
        desired_file_tss = pd.date_range(start_time, end_time, freq=f'{file_length} min')
        fns = list(converted_path.glob("TOA5*.dat"))  # list all files

        file_tss = []
        for fn in fns:
            fts = self.get_timestamp_from_fn(fn)
            if (fts >= start_time and fts <= end_time):
                file_tss.append(self.get_timestamp_from_fn(fn))

        file_tss = sorted(file_tss)
        fns_sorted = [self.get_fn_from_timestamp(fts, fns) for fts in file_tss]  # use that to sort the file names
        fns = fns_sorted.copy()
        file_ts_dict = {ts:fn for ts, fn in zip(file_tss, fns)}  # also a dict is nice I guess
        n_files_converted = len(fns)
        

        self.desired_file_tss, self.file_tss, self.fns, self.file_ts_dict, self.n_files_converted = desired_file_tss, file_tss, fns, file_ts_dict, n_files_converted
        
        return
    
    def metadata_template(self):
        """create a blank template to store raw file metadata in"""
        n_files_converted = self.n_files_converted
        rawfile_metadata = pd.DataFrame(
            data=np.zeros((n_files_converted, 8), dtype=str),
            columns=['Encoding', 'Station_name', 'Datalogger_model', 'Datalogger_serial_number', 'Datalogger_OS_version', 'Datalogger_program_name', 'Datalogger_program_signature', 'Table_name']
        )
        rawfile_metadata['File_name'] = self.fns
        rawfile_metadata['TIMESTAMP'] = self.file_tss
        rawfile_metadata['Start_fn'] = np.NAN
        rawfile_metadata.set_index('TIMESTAMP', inplace=True)

        self.rawfile_metadata = rawfile_metadata
        
        return
    
    def summary_template(self, header):
        # initialize summary statistics array: [TIMESTAMP, RECORD, Ux_Max, Ux_Min, Ux_Std, ... rho_c_Max, ...]
        summary_header = []
        for colname in header[2:]:
            summary_header.append(colname + '_Max')
            summary_header.append(colname + '_Min')
            summary_header.append(colname + '_Std')
            summary_header.append(colname + '_Mean')
            summary_header.append(colname + '_NANPct')
            summary_header.append(colname + '_Skw')
            summary_header.append(colname + '_Krt')
        summary_data = np.empty((len(self.desired_file_tss), len(summary_header)))
        return summary_data, summary_header
        
    
    def get_header(self, site_name):
        """each site has a specially formatted header associated with it"""
        if site_name == 'BB-NF3':
            return ["TIMESTAMP","RECORD","Ux_CSAT3B","Uy_CSAT3B","Uz_CSAT3B","Ts_CSAT3B","rho_c_LI7500","rho_v_LI7500","P_LI7500","DIAG_LI7500"]
        elif site_name == 'BB-NF17':
            return ["TIMESTAMP","RECORD","Ux_CSAT3_17m","Uy_CSAT3_17m","Uz_CSAT3_17m","Ts_CSAT3_17m","Ux_CSAT3_7m","Uy_CSAT3_7m","Uz_CSAT3_7m","Ts_CSAT3_7m","rho_c_LI7500","rho_v_LI7500","P_LI7500","DIAG_LI7500"]
        
    def write_out(self, dfts, dat):
        # generate output file name: yyyy_mm_dd_HHMM.csv
        
        # number of out files to write
        out_per_in = self.file_length//self.file_length_out
        # number of entries per out file
        n_records_out = self.n_records//out_per_in
        # timestamps of out files
        out_tss = pd.date_range(dfts, periods=out_per_in, freq=f'{file_length_out} min')
        
        # generate file names and write to out dir
        for i, ots in zip(range(0, self.n_records, n_records_out), out_tss):
            ots_str = re.sub('-| ', '_', str(ots))
            ots_str = re.sub(':', '', ots_str)[:-2]
            out_fn = self.out_path / f'{ots_str}.csv'
            out_dat = dat.iloc[i:i + n_records_out]
            dat.to_csv(out_fn, sep=',', na_rep='NAN', index=False, index_label=False, line_terminator='\n')
        
    def standardize_fast_files(self):
        """reads in raw fast files, and combines/standardizes them to be continuous. Re-writes the files. Also computes diagnostic/summary statistics on the data"""

        fns, file_tss, file_ts_dict, desired_file_tss, acq_period, n_records, out_path, file_length = self.fns, self.file_tss, self.file_ts_dict, self.desired_file_tss, self.acq_period, self.n_records, self.out_path, self.file_length
        
        # each site has a unique header
        header = self.get_header(self.site_name)
        
        # initialize summary data
        summary_data, summary_header = self.summary_template(header)
        
        # we'll be popping file names off of fns and file_tss, so we'll create temporary copies of them first
        fns_temp, file_tss_temp = fns.copy(), file_tss.copy()
        
        # start processing files by desired start timestamp:
        # for each file time window (usually every half-hour), find all the converted raw files that will fit into that time window.
        # then, if there are any holes in the timeseries defined by those raw files, fill the missing timestamps and then fill the missing data
        # with NANs. Write the final file to a csv and report file summary statistics.
        for idfts, dfts in enumerate(tqdm(desired_file_tss)):
            # generate the desired time index of the input file (start 100ms after file timestamp)
            desired_time_index = pd.date_range(dfts + acq_period, periods=n_records, freq=acq_period)

            # create an empty array containing the full desired index
            dat = pd.DataFrame(desired_time_index, columns=['TIMESTAMP'])
            dat.set_index('TIMESTAMP', inplace=True)

            # search for files within the range covered by this file and load into an array indexed by timestamp
            next_file_ts = dfts + pd.Timedelta(f'{file_length} Min')

            next_fns, next_file_tss = [], []
            # when the list empties, the while loop throws in index error.
            try:
                while file_tss_temp[0] < next_file_ts:
                    next_fns.append(fns_temp.pop(0))
                    next_file_tss.append(file_tss_temp.pop(0))
            except IndexError as err:
                pass
            
            # combine the found files
            # if no valid files are found, just make a null dataframe
            if next_fns == []:
                rawdat = pd.DataFrame(np.full((1, len(header)), np.nan), columns = header)
                rawdat['TIMESTAMP'] = desired_time_index[0]
                rawdat.set_index('TIMESTAMP', inplace=True)
            # otherwise proceed as normal
            else:
                for i, fn, ts in zip(range(len(next_fns)), next_fns, next_file_tss):
                    if i == 0:
                        rawdat = pd.read_csv(fn, sep=',', header=[0], skiprows=[0, 2, 3], na_values = ['NAN', '-4400906'])
                        rawdat['TIMESTAMP'] = pd.to_datetime(rawdat['TIMESTAMP'], format='%Y-%m-%d %H:%M:%S.%f')
                        rawdat.set_index('TIMESTAMP', inplace=True)
                    else:
                        rawdat_tmp = pd.read_csv(fn, sep=',', header=[0], skiprows=[0, 2, 3], na_values = ['NAN', '-4400906'])
                        rawdat_tmp['TIMESTAMP'] = pd.to_datetime(rawdat_tmp['TIMESTAMP'], format='%Y-%m-%d %H:%M:%S.%f')
                        rawdat_tmp.set_index('TIMESTAMP', inplace=True)
                        rawdat = pd.concat([rawdat, rawdat_tmp])
                        rawdat = rawdat.merge(rawdat_tmp, how='outer', left_index=True, right_index=True)

                    # get the metadata
                    dfts_str = re.sub('-| ', '_', str(dfts))
                    dfts_str = re.sub(':', '', dfts_str)[:-2]
                    desired_fn = out_path / f'{dfts_str}.csv'
                    with open(fn) as f: self.rawfile_metadata.loc[ts] = f.readline()[1:-2].split('","') + [fn, desired_fn]

            # merge raw files into complete array
            dat = dat.merge(rawdat, how='outer', left_index=True, right_index=True, sort=True)
            self.write_out(dfts, dat)
            
            
            # write summary stats
            for icolname, colname in enumerate(dat.columns[2:]):
                summary_row = np.array([
                    np.nanmax(dat[colname]),
                    np.nanmin(dat[colname]),
                    np.nanstd(dat[colname]),
                    np.nanmean(dat[colname]),
                    stats.skew(dat[colname], nan_policy='omit'),
                    stats.kurtosis(dat[colname], nan_policy='omit'),
                    100*np.sum(np.where(np.isnan(dat[colname])))/self.n_records
                ])
                summary_data[idfts, icolname*7:icolname*7 + 7] = summary_row
        
        self.summary = pd.DataFrame(summary_data, columns=summary_header)
        self.summary['TIMESTAMP'] = desired_file_tss
        self.summary.set_index('TIMESTAMP', inplace=True)
        
        return
    
    def process(self):
        self.find_files()
        self.metadata_template()
        self.standardize_fast_files()

In [27]:
dirs = [
    "/Volumes/TempData/Bretfeld Mario/Chimney-Park-Reprocessing-Sandbox/Alex Work/Bad/Chimney/EC Processing/BB-NF/Fast/17m/Converted",
    "/Volumes/TempData/Bretfeld Mario/Chimney-Park-Reprocessing-Sandbox/Alex Work/Bad/Chimney/EC Processing/BB-NF/Fast/3m/Converted"
]

file_length_in = 30  # minutes
file_length_out = 30  # minutes
acq_freq = 10  # Hz
start_times = ["2021-03-11 00:00", "2021-03-06 15:30"]  # yyyy-mm-dd HH:MM
end_times = ["2021-03-11 11:00", "2021-03-07 11:00"]  # 3/30
site_names = ["BB-NF17", "BB-NF3"]


for converted_dir, start_time, end_time, site_name in zip(dirs, start_times, end_times, site_names):
    processing_engine = process_from_raw(converted_dir, file_length_in, file_length_out, acq_freq, start_time, end_time, site_name)
    processing_engine.process()
    summary = processing_engine.summary
    summary.to_csv(processing_engine.converted_path / '../Converted_Summary.csv')
    meta = processing_engine.rawfile_metadata
    meta.to_csv(processing_engine.converted_path / '../Converted_Metadata.csv')

100%|██████████| 23/23 [00:18<00:00,  1.22it/s]
100%|██████████| 40/40 [00:23<00:00,  1.69it/s]


Initialize the processing engine

# ToDo
0. Aggregate data from every site simultaneously and automate gap filling
1. Implement processing for slow files
2. Combine data between BBSF 4m/7m and between BBNF 3m/17m
3. Adapt to site changes detailed in the site_changes.txt file when processing data and headers
4. Adapt to mistakes in logger code
5. Gap fill between neighboring sites for things like Precip, airtemp, etc.
6. Apply calibrations

Extra: try to find ideal in/out size ratio
* 30min/30min: 3it/s
* 60min/30min:
* 120min/30min:
* 1440min/30min:
