In [1]:
import pandas as pd
# import matplotlib.pyplot as plt
# from scipy.interpolate import interp1d
import numpy as np
import pandas as pd
import os
import dask.dataframe as ddf
import re

import sys
sys.path.append("..")

from channeling_lib import AWS_file_loader, load_path

# Station names used in this notebook (presumably the weather stations read via
# AWS_file_loader — TODO confirm).
stations_str = [
    'Tom Joad',
    'Rosanna',
    'Bette Davis',
    'Layla',
    'Mrs Robinson',
]

# TinyTag_str = ['CEB_1', 'CEB_2', 'CEB_3', 'CEB_4', 'CEB_5', 'TH1', 'TH2', 'TH3', 'TH5', 'TH6', 'TH8', 'TT1', 'TT2', 'TT3', 'TT4', 'TT5', 'TT6', 'TT7', 'TT9', 'TT12', 'TT13', 'TT14', 'TT15', 'TT16', 'TT17', 'TT18']

In [2]:
# Folder that contains one sub-folder per TinyTag logger
path = f"{load_path()}RawData/TinyTag/"

# Instrument textbook: one CSV table with the metadata of every instrument
instrument_textbook_name = f"{load_path()}instrument_textbook_BLcourse_spring2025(BL instruments).csv"
instrument_textbook_data = pd.read_csv(
    instrument_textbook_name,
    encoding='latin1',
)

# instrument_textbook_data['Station name']

In [3]:
# Step 1: Shorten TinyTag station names to "<prefix><number>"; other names are kept as-is
tinytag_name_pattern = re.compile(r"(TT|TH|CEB)(\d+).*")

shortened_names = []
for station_name in instrument_textbook_data['Station name']:
    if tinytag_name_pattern.match(station_name):
        shortened_names.append(tinytag_name_pattern.sub(r"\1\2", station_name))
    else:
        shortened_names.append(station_name)
instrument_textbook_data['Formatted Name'] = shortened_names

# Step 2: Collect every formatted name that occurs more than once
duplicates = instrument_textbook_data['Formatted Name'].value_counts()
duplicate_names = duplicates.loc[duplicates.gt(1)].index

# Step 3: Within each group of duplicates, tag the row with the smallest
# "Th height (m)" as "_low" and the row with the largest as "_high".
# Any other rows of the group keep the plain name.
for name in duplicate_names:
    is_same_name = instrument_textbook_data['Formatted Name'] == name
    heights = instrument_textbook_data.loc[is_same_name, 'Th height (m)']

    # Both row labels are looked up before any renaming takes place
    min_index = heights.idxmin()
    max_index = heights.idxmax()

    instrument_textbook_data.loc[min_index, 'Formatted Name'] = f"{name}_low"
    instrument_textbook_data.loc[max_index, 'Formatted Name'] = f"{name}_high"

# Step 4: Plain Python list of the final formatted names, in table order
formatted_names = list(instrument_textbook_data['Formatted Name'])

# formatted_names



In [4]:
# Move 'Formatted Name' to the front of the table under the header
# 'Formatted Station Name'.
#
# The move is guarded so that this cell can be executed more than once:
# DataFrame.insert raises ValueError when the target column already exists,
# and DataFrame.pop raises KeyError once the source column has been moved.
if 'Formatted Name' in instrument_textbook_data.columns:
    if 'Formatted Station Name' in instrument_textbook_data.columns:
        # A previous run already created the target column; replace it with
        # the freshly computed names instead of failing on the insert.
        instrument_textbook_data.drop(columns=['Formatted Station Name'], inplace=True)
    instrument_textbook_data.insert(0, 'Formatted Station Name', instrument_textbook_data.pop('Formatted Name'))
elif 'Formatted Station Name' not in instrument_textbook_data.columns:
    # Neither the source nor the target column is present: fail loudly, as
    # the unguarded pop would have done.
    raise KeyError("'Formatted Name'")



# Loading start and end times of Tinytag for calibration

In [5]:
# TinyTag_str = instrument_textbook_data['Station name'].tolist()
# Keep only the formatted names that belong to TinyTag loggers
tinytag_prefixes = ("TT", "TH", "CEB")
TinyTag_str = [
    station_name
    for station_name in formatted_names
    if station_name.startswith(tinytag_prefixes)
]


# TinyTag_str

In [6]:
# instrument_textbook_data

In [7]:
# Spot check: setup time of the first row named 'TT12_low', parsed as a timestamp
is_tt12_low = instrument_textbook_data['Formatted Station Name'] == 'TT12_low'
pd.to_datetime(instrument_textbook_data.loc[is_tt12_low, 'Setup time (UTC)'].values[0])

Timestamp('2025-01-26 13:56:00')

In [14]:
# Per-station deployment times taken from the instrument textbook
manual_times = {}

for station in TinyTag_str:
    # Every value below comes from the first textbook row with this formatted name
    station_rows = instrument_textbook_data[instrument_textbook_data['Formatted Station Name'] == station]

    setup_raw = station_rows['Setup time (UTC)'].values[0]
    maintenance_start_raw = station_rows['Maintenance start time (UTC)'].values[0]
    maintenance_duration_raw = station_rows['Maintenance duration (minutes)'].values[0]
    retrieval_raw = station_rows['Retrieval time (UTC)'].values[0]

    # A missing maintenance duration is stored as 0 minutes
    if pd.isna(maintenance_duration_raw):
        maintenance_minutes = 0
    else:
        maintenance_minutes = int(maintenance_duration_raw)

    manual_times[station] = dict(
        setup_time=pd.to_datetime(setup_raw),
        maintenance_start_time=pd.to_datetime(maintenance_start_raw),
        maintenance_duration=maintenance_minutes,
        retrieval_time=pd.to_datetime(retrieval_raw),
    )

# manual_times

## Filter data based on the start, end, and duration times

In [16]:
def filter_data_based_on_time(df, setup_time, maintenance_start_time, maintenance_duration, retrieval_time, is_second_file=False, margin_minutes=0):
    """
    Keep only the rows of ``df`` that fall inside the deployment window of the
    first or the second dataset of a logger.

    The window is inclusive at both ends:

    * first dataset:  ``setup_time`` to ``maintenance_start_time``
    * second dataset: ``maintenance_start_time + maintenance_duration`` to
      ``retrieval_time``

    :param df: DataFrame with the raw data; its index is compared against the
        window limits.
    :param setup_time: Setup time (pd.Timestamp).
    :param maintenance_start_time: Start of the maintenance (pd.Timestamp).
    :param maintenance_duration: Duration of the maintenance in minutes.
        A missing value (None / NaN) is treated as 0 minutes.
    :param retrieval_time: Retrieval time (pd.Timestamp).
    :param is_second_file: True for the second dataset, False (default) for
        the first dataset.
    :param margin_minutes: Minutes trimmed from both ends of the window (the
        start is moved later, the end earlier). The default of 0 applies the
        window exactly as given.
    :return: The filtered DataFrame.
    """
    if is_second_file:
        # Second dataset: from the end of the maintenance until retrieval.
        # A missing duration must not crash int(); it counts as 0 minutes.
        duration_minutes = 0 if pd.isna(maintenance_duration) else int(maintenance_duration)
        start_time = maintenance_start_time + pd.Timedelta(minutes=duration_minutes)
        end_time = retrieval_time
    else:
        # First dataset: from setup until the start of the maintenance
        start_time = setup_time
        end_time = maintenance_start_time

    if margin_minutes:
        # Optional safety margin to drop samples taken while handling the logger
        margin = pd.Timedelta(minutes=margin_minutes)
        start_time = start_time + margin
        end_time = end_time - margin

    # Keep the rows whose index lies inside the (inclusive) window
    filtered_df = df[(df.index >= start_time) & (df.index <= end_time)]
    return filtered_df

# Loading TinyTag Calibration data

The calibration data is stored in the following format:

- CEB_i_calibration_data

- THi_calibration_data

- TTi_calibration_data

Here, i is the number of the TinyTag, i.e. the same number that is used for the raw files.

In [9]:
#function from unis github
def read_Tinytag(filename, sensor):
    '''
    Load Tinytag logger output into one time-indexed table.

    Parameters
    ----------
    filename : str
        Path to the file(s) to read. To read several files at once, pass a
        string containing UNIX-style wildcards.
    sensor : str
        One of "TT", "TH" or "CEB"; selects the column names given to the
        measurement columns.

    Returns
    -------
    df : pandas dataframe
        Indexed by "TIMESTAMP". The "RECORD" column is kept as read. Every
        other column, read as "<number> <unit>" text, is replaced by a float
        column named "<column>_<unit>" (the unit "°C" is written as "degC").
    '''

    import dask.dataframe as ddf

    # Measurement columns that follow RECORD and TIMESTAMP for each sensor type
    measurement_columns = {
        "TT": ["T_black", "T_white"],
        "TH": ["T", "RH"],
        "CEB": ["T"],
    }

    if sensor in measurement_columns:
        # The first 5 lines of each file are skipped; the second column holds the time stamp
        df = ddf.read_csv(
            filename,
            delimiter="\t",
            skiprows=5,
            parse_dates=[1],
            date_format="%d %b %Y %H:%M:%S",
            names=["RECORD", "TIMESTAMP"] + measurement_columns[sensor],
            encoding="ISO-8859-1",
        )
    else:
        assert False, 'Sensortype of Tinytag not known. Should be one of "TT", "TH" or "CEB".'

    # Materialise the dask frame as a pandas frame indexed by time
    df = df.compute().set_index("TIMESTAMP")

    for column in [name for name in df.columns if name != "RECORD"]:
        readings = df[column]

        # Each entry looks like "<number> <unit>": keep the number as float ...
        values = [float(entry.split(" ")[0]) for entry in readings]
        # ... and take the unit from the first entry of the column
        unit = readings.iloc[0].split(" ")[1]
        if unit == "°C":
            unit = "degC"

        # Append the numeric column, then remove the text column it replaces
        df[f"{column}_{unit}"] = values
        df = df.drop(columns=column)

    return df

In [10]:
# All CEB_i folders inside the TinyTag directory
folders = [
    entry
    for entry in os.listdir(path)
    if entry.startswith('CEB_') and os.path.isdir(os.path.join(path, entry))
]

# Calibration tables, keyed "<folder>_calibration_data"
calibration_data = {}

for folder in folders:
    folder_path = os.path.join(path, folder)
    calibration_prefix = folder + "_calibration_"

    for file in os.listdir(folder_path):
        # Only "CEB_i_calibration_*.txt" files are calibration files
        if not (file.startswith(calibration_prefix) and file.endswith(".txt")):
            continue

        # One entry per folder: if several files match, the last one listed is kept
        calibration_data[f"{folder}_calibration_data"] = read_Tinytag(os.path.join(folder_path, file), 'CEB')

# Expose every table as a notebook-level variable named after its key
globals().update(calibration_data)

In [11]:
# TH: all THi folders inside the TinyTag directory
folders = [
    entry
    for entry in os.listdir(path)
    if entry.startswith('TH') and os.path.isdir(os.path.join(path, entry))
]

# Calibration tables, keyed "<folder>_calibration_data"
calibration_data = {}

for folder in folders:
    folder_path = os.path.join(path, folder)
    calibration_prefix = folder + "_calibration_"

    for file in os.listdir(folder_path):
        # Only "THi_calibration_*.txt" files are calibration files
        if not (file.startswith(calibration_prefix) and file.endswith(".txt")):
            continue

        # One entry per folder: if several files match, the last one listed is kept
        calibration_data[f"{folder}_calibration_data"] = read_Tinytag(os.path.join(folder_path, file), 'TH')

# Expose every table as a notebook-level variable named after its key
globals().update(calibration_data)

In [12]:
# TT: all TTi folders inside the TinyTag directory
folders = [
    entry
    for entry in os.listdir(path)
    if entry.startswith('TT') and os.path.isdir(os.path.join(path, entry))
]

# Calibration tables, keyed "<folder>_calibration_data"
calibration_data = {}

for folder in folders:
    folder_path = os.path.join(path, folder)
    calibration_prefix = folder + "_calibration_"

    for file in os.listdir(folder_path):
        # Only "TTi_calibration_*.txt" files are calibration files
        if not (file.startswith(calibration_prefix) and file.endswith(".txt")):
            continue

        # One entry per folder: if several files match, the last one listed is kept
        calibration_data[f"{folder}_calibration_data"] = read_Tinytag(os.path.join(folder_path, file), 'TT')

# Expose every table as a notebook-level variable named after its key
globals().update(calibration_data)

# Loading TinyTag data

In [13]:
# All CEB_i folders inside the TinyTag directory
folders = [
    entry
    for entry in os.listdir(path)
    if entry.startswith('CEB_') and os.path.isdir(os.path.join(path, entry))
]

# NOTE(review): this cell reads the NON-calibration files, yet it stores them
# in `calibration_data` under "<folder>_calibration_data". The notebook-level
# variables of that name created by the CEB calibration cell are therefore
# replaced by these tables — confirm that this is intended.
calibration_data = {}

for folder in folders:
    folder_path = os.path.join(path, folder)

    for file in os.listdir(folder_path):
        # Skip everything that is not a .txt file or that is a calibration file
        if not file.endswith(".txt") or "_calibration_" in file:
            continue

        # One entry per folder: if several files match, the last one listed is kept
        calibration_data[f"{folder}_calibration_data"] = read_Tinytag(os.path.join(folder_path, file), 'CEB')

# Expose every table as a notebook-level variable named after its key
globals().update(calibration_data)