## Telemetry

In [1]:
import os
import glob
import datetime
import re
from typing import Union, List, Tuple

import duckdb
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()
from tqdm import tqdm
from joblib import Parallel, delayed


sns.set(rc={'axes.facecolor':'whitesmoke'})


ModuleNotFoundError: No module named 'joblib'

In [None]:
def csv_to_parquet(file_path: str,
                   output_directory: str):
    """Convert CSV files to Parquet file format."""

    # create output file name
    output_parquet_file = os.path.join(output_directory, f"{os.path.splitext(os.path.basename(file_path))[0]}.parquet")
    
    # construct query to convert to parquet files
    sql = f"""
    COPY(
        SELECT
            *
        FROM 
            '{file_path}'
        ) TO '{output_parquet_file}' (FORMAT PARQUET);
    """
    
    # execute conversion
    duckdb.query(sql)
    
    return output_parquet_file


def read_tagging_file(tagging_file: str) -> pd.DataFrame:
    """Read in raw excel tagging file to data frame and add field for PST date time fields."""
    
    # read in tagging data
    df = pd.read_excel(tagging_file)

    # rename fields
    df.rename(columns={"rel_datetime": "tag_release_date"}, inplace=True)

    # adjust date times in tagging file to PST
    df["tag_activation_date_pst"] = pd.to_datetime(df["tag_activation_date"]) - pd.Timedelta(hours=1)
    df["tag_release_date_pst"] = pd.to_datetime(df["tag_release_date"]) - pd.Timedelta(hours=1)
    
    return df.sort_values(by="fish_id").reset_index(drop=True)


def read_beacon_file(beacon_file: str) -> pd.DataFrame:
    """Read in raw beacon file to data frame."""
    
    pass


def generate_directory_list(target_directory: str,
                            ignore_dirs: Tuple[str] = (".DS_Store", "pre_study", "zips", ".zip")) -> list:
    """Generate a clearn directory list."""
    
    return [os.path.join(target_directory, i) for i in os.listdir(target_directory) if i not in ignore_dirs]


def add_trailing_zeros(x: str, 
                       length: int):
    """Add trailing zeros to a string."""
    
    return(x + f"{'0' * (length - len(x))}")


def add_leading_zeros(x: str, 
                    length: int):
    """Add leading zeros to a string."""
    
    return(f"{'0' * (length - len(x))}" + x)


def validate_file_to_directory_match(file_list: list):
    """Validate file names to their parent directory to ensure that no errors have occurred."""

    valid_list = []
    
    for i in file_list:

        # get the source directory name of the download
        source_directory = os.path.basename(os.path.dirname(i))

        # extact the file parts to match to source directory
        target_file_parts = os.path.splitext(os.path.basename(i))
        target_file_base = target_file_parts[0].split("_")[-1]
        target_file_extension = target_file_parts[1]

        if source_directory != target_file_base:
            print(f"File '{i}' does not match the parent directory name.")
            print("Removing from inputs.  Please review.")

        else:
            valid_list.append(i)

    return valid_list


def generate_orion_import_file_list(orion_dir: str):
    """Generate a list of orion files to import after validation."""

    # generate full path lists of text and hex files in the orion directory
    text_files = glob.glob(os.path.join(orion_dir, "**/*.txt"))
    hex_files = glob.glob(os.path.join(orion_dir, "**/*.hex"))

    # validate file name to directory name match
    text_files = validate_file_to_directory_match(text_files)
    hex_files = validate_file_to_directory_match(hex_files)

    # validate to ensure a hex / text pair
    text_file_base_list = [os.path.splitext(os.path.basename(i))[0] for i in text_files]
    hex_file_base_list = [os.path.splitext(os.path.basename(i))[0] for i in hex_files]

    # files in text list not in hex list
    no_match_text_files = set(text_file_base_list) - set(hex_file_base_list)

    # files in hex list not in text list
    no_match_hex_files = set(hex_file_base_list) - set(text_file_base_list)

    if len(no_match_text_files) > 0:
        print(f"There are not hex file matches for the following text files: {no_match_text_files}")

    if len(no_match_hex_files) > 0:
        print(f"There are not text file matches for the following hex files: {no_match_hex_files}")

    # hex file size should be smaller than the text file or something may be wrong
    text_file_sizes = [os.stat(i).st_size for i in text_files]
    hex_file_sizes = [os.stat(i).st_size for i in hex_files]

    for index, i in enumerate(text_files):
        
        text_file_size = os.stat(i).st_size
        hex_file_size = os.stat(f"{os.path.splitext(i)[0]}.hex").st_size

        if hex_file_size >= text_file_size:
            print(f"WARNING:  Text file '{i}' is smaller than or equal to the hex file size (bytes).")
            print(f"Text file size: {text_file_size}, Hex file size: {hex_file_size}, Difference (hex - text): {hex_file_size - text_file_size}")
            print(f"Removing files from import.  Please review.")

            # remove files with incorrect sizes
            text_files.remove(text_files[index])
            
    return text_files


def whitespace_to_csv(input_file:str, 
                      output_dir:str) -> str:
    """Generate new output files with whitespace converted to CSV."""
    
    # extract basename from input file
    basename = os.path.splitext(os.path.basename(input_file))[0]
    
    # construct output file name
    output_file = os.path.join(output_dir, f"{basename}.csv")
    
    # open file to write
    with open(output_file, "w") as out:
        
        # read input file as string
        with open(input_file) as get:
            content = get.read()
            
            # write content replacing any whitespace with commas but keeping new lines or carriage returns
            out.write(re.sub("[^\S^\r\n]+", ",", content))
            
    return output_file


def orion_raw_to_parquet(input_file: str,
                         output_directory: str,
                         target_frequency_list: list,
                         target_code_list: list) -> str:
    """Create a parquet file for each formatted CSV."""
    
    # TODO: pass in from outside
    site_to_receiver = {1: 180, 2: 181, 3: 182, 4: 183, 5: 184, 6: 185, 7: 159, 8: 186, 9: 187}

    
    basename = os.path.basename(input_file)
    
    # extract expected site number from file name
    #  expected file name format:  "Site9_20220907.csv"
    expected_site_number = int(basename.split("_")[0].casefold().split("site")[-1])
    
    # get expected receiver id
    expected_receiver = site_to_receiver[expected_site_number]
    
    # extract file name from input file
    file_name = f"0_ORION_{os.path.splitext(basename)[0]}"
    
    # create output file name
    output_parquet_file = os.path.join(output_directory, f"{file_name}.parquet")


    sql = f"""
    COPY(
        SELECT
            concat(
                CASE
                    WHEN length(Freq::VARCHAR) = 4
                    THEN Freq::VARCHAR || '000'
                    WHEN length(Freq::VARCHAR) = 5
                    THEN Freq::VARCHAR || '00'        
                    WHEN length(Freq::VARCHAR) = 6
                    THEN Freq::VARCHAR || '0' 
                    ELSE Freq::VARCHAR 
                END
                ,'.'
                ,CASE
                    WHEN length(Code::VARCHAR) = 1
                    THEN '00' || Code::VARCHAR
                    WHEN length(Code::VARCHAR) = 2
                    THEN '0' || Code::VARCHAR  
                    ELSE Code::VARCHAR
                END
            ) AS fish_id
            ,(Date + Time) AS date_time
            ,Site AS receiver_id
            ,Power AS signal_power
            ,'{file_name}' AS file_nm
        FROM
            read_csv_auto('{input_file}')
        WHERE
            Freq IS NOT NULL
            AND Code IS NOT NULL
            AND Date IS NOT NULL
            AND Time IS NOT NULL
            AND Site IS NOT NULL
            AND Power IS NOT NULL
            AND Type IS NOT NULL 
            AND Type = 'LOTEK'
            AND Freq IN {tuple(target_frequency_list)}
            AND Code IN {tuple(target_code_list)}
            -- remove all receiver ids where not matching file name
            AND Site = {expected_receiver}
        ) TO '{output_parquet_file}' (FORMAT PARQUET);
    """
    
    # execute query
    try:
        duckdb.query(sql)
    except (duckdb.BinderException, duckdb.InvalidInputException) as error:
        print(f"ERROR:  Passing import of {input_file}. Please review.")
        pass
    
    return output_parquet_file


def generate_orion_parquet_files(orion_dir: str,
                                 target_frequency_list: List[float],
                                 target_code_list: List[int],
                                 output_directory: str) -> List[str]:
    """Generate ORION parquet files for query."""
    
    # generate the full file list to process
    file_list = tqdm(generate_orion_import_file_list(orion_dir))

    # process files
    processed_files = []
    for i in file_list:

        # convert all whitespace in Orion text files to commas
        raw_csv_file = whitespace_to_csv(i, orion_dir)

        # convert CSV file to parquet format
        parquet_file = orion_raw_to_parquet(input_file=raw_csv_file,
                                            output_directory=output_directory,
                                            target_frequency_list=target_frequency_list,
                                            target_code_list=target_code_list)
        # add processed file to output list
        processed_files.append(parquet_file)
    
    return processed_files


def mitas_raw_to_parquet(input_file: str,
                         output_directory: str,
                         target_frequency_list: List[float],
                         target_code_list: List[int],
                         daylight_savings_time_spring: str):
    """Ingest and format an input MITAS file."""
        
    # extract file name from input file
    file_name = f"1_MITAS_{os.path.splitext(os.path.basename(input_file))[0]}"
    
    # create output file name
    output_parquet_file = os.path.join(output_directory, f"{file_name}.parquet")

    sql = f"""
    COPY(
        SELECT
            concat(
                CASE
                    WHEN length(frequency::VARCHAR) = 4
                    THEN frequency::VARCHAR || '000'
                    WHEN length(frequency::VARCHAR) = 5
                    THEN frequency::VARCHAR || '00'        
                    WHEN length(frequency::VARCHAR) = 6
                    THEN frequency::VARCHAR || '0' 
                    ELSE frequency::VARCHAR 
                END
                ,'.'
                ,CASE
                    WHEN length(codeNumber::VARCHAR) = 1
                    THEN '00' || codeNumber::VARCHAR
                    WHEN length(codeNumber::VARCHAR) = 2
                    THEN '0' || codeNumber::VARCHAR  
                    ELSE codeNumber::VARCHAR
                END
            ) AS fish_id
            ,CASE
                -- daylight savings time spring adjustment
                WHEN "decodeTimeUTC-04:00" - INTERVAL 3 HOUR <= '{daylight_savings_time_spring}'
                THEN "decodeTimeUTC-04:00" - INTERVAL 4 HOUR
                ELSE "decodeTimeUTC-04:00" - INTERVAL 3 HOUR
            END AS date_time
            ,ReceiverId AS receiver_id
            ,power::INT AS signal_power
            ,'{file_name}' AS file_nm
        FROM
            '{input_file}'
        WHERE
            frequency IS NOT NULL
            AND codeNumber IS NOT NULL
            AND "decodeTimeUTC-04:00" IS NOT NULL
            AND ReceiverId IS NOT NULL
            AND power IS NOT NULL
            AND frequency IN {tuple(target_frequency_list)}
            AND codeNumber IN {tuple(target_code_list)}
        ) TO '{output_parquet_file}' (FORMAT PARQUET);
    """
    
    # fire query
    duckdb.query(sql)
    
    return output_parquet_file


def generate_mitas_parquet_files(mitas_dir: str,
                                 target_frequency_list: List[float],
                                 target_code_list: List[int],
                                 output_directory: str,
                                 daylight_savings_time_spring: str) -> List[str]:
    """Generate MITAS parquet files for use in query."""
    
    # get a list of mitas CSV files
    mitas_csv_files = glob.glob(os.path.join(mitas_dir, "*.csv"))

    processed_files = []
    for i in tqdm(mitas_csv_files):

        # convert each file to a parquet file
        output_file = mitas_raw_to_parquet(input_file=i, 
                                           output_directory=output_directory,
                                           target_frequency_list=target_frequency_list,
                                           target_code_list=target_code_list,
                                           daylight_savings_time_spring=daylight_savings_time_spring)
        
        # add processed file to output list
        processed_files.append(output_file)
        
    return processed_files


def filter_tagged_fish(df, tagging_df):
    """Only keep fish in tagging file."""
    
    n_records = df.shape[0]

    # only keep fish in tagging file
    df = df.loc[df["fish_id"].isin(tagging_df["fish_id"].unique())]

    n_dropped = n_records - df.shape[0]

    print(f"Dropped {n_dropped} records for fish not in tagging file.")
    
    return df 


def filter_release_time(df, tagging_df, time_buffer_minutes=25):
    """Only keep records greater than or equal to release time."""
    
    n_records = df.shape[0]

    # get a lookup dictionary of release date times from each fish and add time buffer to reduce initial noise
    fish_release_time_dict = (tagging_df.set_index("fish_id")["tag_release_date_pst"] + pd.Timedelta(minutes=time_buffer_minutes)).to_dict()

    # add field for release time to bound study start
    df["tag_release_date_pst"] = df["fish_id"].map(fish_release_time_dict)

    # only keep records greater than the fish release time
    df = df.loc[df["date_time"] >= df["tag_release_date_pst"]].copy()

    df.drop(columns=["tag_release_date_pst"], inplace=True)

    n_dropped = n_records - df.shape[0]

    print(f"Dropped {n_dropped} records for detections before release time.")
    
    return df


def filter_study_period(df, 
                        end_date_time):
    """Only keep records that span through the study period."""
    
    n_records = df.shape[0]

    # only keep records that account for tag life
    df = df.loc[df["date_time"] <= end_date_time]

    n_dropped = n_records - df.shape[0]

    print(f"Dropped {n_dropped} records exceeding study period date and time.")
    
    return df


def filter_tag_life(df: pd.DataFrame, 
                    tagging_df: pd.DataFrame, 
                    max_tag_life_days: float) -> pd.DataFrame:
    """Only keep records that span through tag life."""
    
    n_records = df.shape[0]
    
    # create a tag expiration date
    tagging_df["tag_expire_dt"] = tagging_df["tag_activation_date_pst"] + pd.Timedelta(days=max_tag_life_days)
    
    # create a dictionary of tax expiration datetime
    tag_expire_dict = tagging_df.set_index("fish_id")["tag_expire_dt"].to_dict()
    
    # add the expiration date to the input data frame
    df["tag_expire_dt"] = df["fish_id"].map(tag_expire_dict)

    # only keep records that account for tag life
    df = df.loc[df["date_time"] <= df["tag_expire_dt"]]

    n_dropped = n_records - df.shape[0]

    print(f"Dropped {n_dropped} records where detection datetime exceeded tag life.")
    
    return df


def filter_drop_duplicate_detections(df: pd.DataFrame) -> pd.DataFrame:
    """Drop duplicate detections by keeping the first files which are Orion files."""

    n_records = df.shape[0]

    # drop duplicates by keeping "first" which are the orion files
    df.drop_duplicates(subset=["fish_id", "date_time", "site_number", "signal_power"], 
                       keep="first", 
                       inplace=True)

    n_dropped = n_records - df.shape[0]

    print(f"Dropped {n_dropped} duplicate MITAS and ORION records.")

    return df


def filter_raw_data(target_fish_id: str,
                    glob_path: str,
                    reciever_to_detect_site_dict: dict,
                    receiver_to_site_number_dict: dict,
                    tagging_df: pd.DataFrame,
                    project_end_date: str,
                    max_tag_life_days: float) -> pd.DataFrame:
    """Apply filters to raw data and add detection site and site number.
    
    Filter 1:  only expected fish in the data 
    Filter 2:  ensures the fish times are bound by release datetime
    Filter 3:  ensures that only detections that fall into the tag life window are considered
    Filter 4:  ensure that only detections that fall into the tag life window are considered
    Filter 5:  drop duplicates occurring in MITAS and ORION; keep ORION by default
    
    """
    
    sql = f"""
    SELECT
        *
    FROM 
        '{glob_path}'
    WHERE
        fish_id = '{target_fish_id}';
    """
    
    df = duckdb.query(sql).df()
    
    if df.shape[0] == 0:
        print(f"WARNING:  There were no valid detections for fish_id:  '{target_fish_id}'")
        print(f"WARNING:  Output file will not be created.")
        
    else:
    
        # add detect site
        df["detect_site"] = df["receiver_id"].map(reciever_to_detect_site_dict)

        # change receiver id to site number
        df["site_number"] = df["receiver_id"].map(receiver_to_site_number_dict)

        # ensure that only expected fish are in the data
        df = filter_tagged_fish(df, tagging_df)

        # ensure that fish times are bound by release time
        df = filter_release_time(df, tagging_df)

        # ensure that only detections that fall into the study period window
        df = filter_study_period(df,
                                 end_date_time=project_end_date)
        
        # ensure that only detections that fall into the tag life window are considered
        df = filter_tag_life(df,
                             tagging_df,
                             max_tag_life_days=max_tag_life_days)


        # drop duplicates by keeping "first" which are the orion files
        df = filter_drop_duplicate_detections(df)
        
    return df


def generate_lag_lead_records(df: pd.DataFrame) -> pd.DataFrame:
    """Create lag and lead records for the target fish to show what happened before
    and after the target.
    
    """

    # sort data frame
    df.sort_values(by=["fish_id", "detect_site", "date_time"], inplace=True)

    # reindex dataset
    df.reset_index(inplace=True, drop=True)

    # lag
    lag_df = df.shift(periods=1)[["fish_id", "detect_site", "date_time"]]

    # lead
    lead_df = df.shift(periods=-1)[["fish_id", "detect_site", "date_time"]]

    # add to main data frame
    df["lag_fish_id"] = lag_df["fish_id"]
    df["lag_detect_site"] = lag_df["detect_site"]
    df["lag_date_time"] = lag_df["date_time"]
    df["lead_fish_id"] = lead_df["fish_id"]
    df["lead_detect_site"] = lead_df["detect_site"]
    df["lead_date_time"] = lead_df["date_time"]
    
    return df


def create_events(df, hits, seconds, detection_sites):
    
    df = df.loc[df["detect_site"].isin(detection_sites)].copy()

    # create timedelta field in seconds; set
    df['time_from_previous_hit'] = np.where(
                                    (df.fish_id == df.lag_fish_id) & (df.detect_site == df.lag_detect_site),
                                    (df.date_time - df.lag_date_time).fillna(pd.Timedelta('0 days')).values.view('<i8')/10**9,
                                    -1)

    # create block_id for each event where hits are no more than time threshold seconds apart
    df['block_id'] = ((df.time_from_previous_hit >= seconds) | (df.time_from_previous_hit < 0)).astype(int).cumsum()

    # get hit count of each block
    df['block_count'] = df.groupby(['block_id'])['block_id'].transform('count')

    # remove unneeded columns
    drop_cols = ['time_from_previous_hit', 'lead_fish_id', 'lag_fish_id', 'lead_detect_site', 'lag_detect_site',
                 'lead_date_time', 'lag_date_time']

    df.drop(drop_cols, axis=1, inplace=True)

    # only keep event blocks that meet the hits per block threshold
    df = df[df['block_count'] >= hits]
    
    # add in hits per sec claus
    df["grouping"] = f"{hits}_{seconds}"
    
    return df


def build_first_last_detection(df, detect_site_to_abbrev_dict):
    """Generate first and last detections at each detect site."""

    # generate last event time for each detection site
    events_last = df.groupby(["fish_id", "grouping", "detect_site"])["date_time"].max().reset_index()
    events_last["site_abbrev"] = events_last["detect_site"].map(detect_site_to_abbrev_dict).str.lower() + "_l"

    # generate first event time for each detection site
    events_first = df.groupby(["fish_id", "grouping", "detect_site"])["date_time"].min().reset_index()
    events_first["site_abbrev"] = events_first["detect_site"].map(detect_site_to_abbrev_dict).str.lower() + "_f"

    # combine
    events_first_last = pd.concat([events_first, events_last])

    return events_first_last.sort_values(by="detect_site")



def plot_fish_heatmap(fish_id: str,
                      df: pd.DataFrame,
                      output_directory: str,
                      detect_plot_name_dict: dict,
                      site_plot_name_dict: dict,
                      time_aggregation: str = "min",
                      plot_field: str = "detect_site",
                      plot_type: str = "power",
                      figsize: tuple = (30, 8),
                      title: str = "",
                      file_name_suffix: str = "",
                      dpi: int = 80):
    
    if plot_field == "site_number":
        map_plot_name_dict = site_plot_name_dict

        
    else:
        map_plot_name_dict = detect_plot_name_dict

    
    beacon_df[f"{plot_field}_name"] = beacon_df[plot_field].map(map_plot_name_dict)

    df[f"{plot_field}_name"] = df[plot_field].map(map_plot_name_dict)

    df["date"] = df["date_time"].dt.floor(time_aggregation).dt.strftime('%m-%d %H:%M:00')

    fig, ax = plt.subplots(figsize=figsize)
    
    if plot_type == "detection":
        
        plot_detections = df.groupby([f"{plot_field}_name", "date"]).count().reset_index().pivot(f"{plot_field}_name", "date", "fish_id")

        g = sns.heatmap(plot_detections, annot=False, cmap="vlag", ax=ax) #linewidths=0.5, ax=ax)
        g.set_title(title)
        
    elif plot_type == "power":
        
        # more negative power is weaker; closer to 0 is more powerful
        plot_power = df.groupby([f"{plot_field}_name", "date"]).max().reset_index().pivot(f"{plot_field}_name", "date", "signal_power")

        g = sns.heatmap(plot_power, annot=False, cmap="vlag", ax=ax) #linewidths=0.5, ax=ax)
        g.set_title(title)
        
    else:
        print(f"Option for 'plot_type' = '{plot_type}' is not available.  Choose either 'detection' or 'power'.")
    
    figure = g.get_figure()
    figure.savefig(os.path.join(output_directory, f"{fish_id}_{file_name_suffix}.png"), dpi=dpi, bbox_inches="tight")
    
    plt.close()


def generate(fish_id: str, 
             output_directory: str,
             parquet_raw_dir: str,
             travel_time_template_dtypes: dict,
             reciever_to_detect_site_dict: dict,
             receiver_to_site_number_dict: dict,
             signal_power_threshold_dict: dict,
             detect_site_to_abbrev_dict: dict,
             detection_site_abbrev_list: list,
             detect_plot_name_dict: dict,
             site_plot_name_dict: dict,
             tagging_df: pd.DataFrame,
             beacon_df: pd.DataFrame,
             project_end_date: str,
             max_tag_life_days: int,
             use_events: bool, 
             consider_dam_operations: bool,
             output_plot_directory: str,
             simulation: str,
             generate_plots: bool = True,
             plot_size: tuple = (60, 7),
             write_clean_data: bool = False,
             generate_clean_plots: bool = False):
    
    
    # get path to glob all raw parquet files
    glob_path = os.path.join(parquet_raw_dir, "*.parquet")


    # process    
    df = filter_raw_data(fish_id,
                         glob_path,
                         reciever_to_detect_site_dict=reciever_to_detect_site_dict,
                         receiver_to_site_number_dict=receiver_to_site_number_dict,
                         tagging_df=tagging_df,
                         project_end_date=project_end_date,
                         max_tag_life_days=max_tag_life_days)
    if write_clean_data:
        df.sort_values(by="date_time").to_csv(os.path.join(output_directory, f"{simulation}/clean/clean_by_fish_id/{fish_id}.csv"), index=False)
    
    if generate_clean_plots:
    
        # set level of time aggregation for plots
        if df.shape[0] <= 20000:
            time_aggregation = "min"
            time_designation = "minute"
        else:
            time_aggregation = "H"
            time_designation = "hour"

        print("Producing clean plots...")

        # plot heatmap of detections  
        plot_fish_heatmap(fish_id=fish_id,
                          df=df,
                          output_directory=os.path.join(output_directory, f"{simulation}/clean/clean_plots"),
                          detect_plot_name_dict=detect_plot_name_dict,
                          site_plot_name_dict=site_plot_name_dict,
                          time_aggregation=time_aggregation,
                          plot_field="site_number",
                          plot_type="detection",
                          figsize=plot_size,
                          title=f"Clean detections by site number aggregated to {time_designation}:  {fish_id}",
                          file_name_suffix="event_detections-by-site_number")

        # plot heatmap of signal power
        plot_fish_heatmap(fish_id=fish_id,
                          df=df,
                          output_directory=os.path.join(output_directory, f"{simulation}/clean/clean_plots"),
                          detect_plot_name_dict=detect_plot_name_dict,
                          site_plot_name_dict=site_plot_name_dict,
                          time_aggregation=time_aggregation,
                          plot_field="site_number",
                          plot_type="power",
                          figsize=plot_size,
                          title=f"Clean signal power per detecion by site number aggregated to {time_designation}:  {fish_id}",
                          file_name_suffix="event_signal_power-by-site_number")


        # plot heatmap of detections  
        plot_fish_heatmap(fish_id=fish_id,
                          df=df,
                          output_directory=os.path.join(output_directory, f"{simulation}/clean/clean_plots"),
                          detect_plot_name_dict=detect_plot_name_dict,
                          site_plot_name_dict=site_plot_name_dict,
                          time_aggregation=time_aggregation,
                          plot_field="detect_site",
                          plot_type="detection",
                          figsize=plot_size,
                          title=f"Clean detections by detecion site aggregated to {time_designation}:  {fish_id}",
                          file_name_suffix="event_detections-by-detect_site")

        # plot heatmap of signal power
        plot_fish_heatmap(fish_id=fish_id,
                          df=df,
                          output_directory=os.path.join(output_directory, f"{simulation}/clean/clean_plots"),
                          detect_plot_name_dict=detect_plot_name_dict,
                          site_plot_name_dict=site_plot_name_dict,
                          time_aggregation=time_aggregation,
                          plot_field="detect_site",
                          plot_type="power",
                          figsize=plot_size,
                          title=f"Clean signal power per detection by detect site aggregated to {time_designation}:  {fish_id}",
                          file_name_suffix="event_signal_power-by-detect_site")

    if use_events:

        # create lag and lead records
        df = generate_lag_lead_records(df)

        # get a list of detection sites per hits per second condition
        cond_one = beacon_df.loc[beacon_df["hits_seconds_run"] == "3_60"]["detect_site"].to_list()
        cond_two = beacon_df.loc[beacon_df["hits_seconds_run"] == "2_120"]["detect_site"].to_list()

        # create events per condition
        dfa = create_events(df, hits=3, seconds=60, detection_sites=cond_one)
        dfb = create_events(df, hits=2, seconds=120, detection_sites=cond_two)

        # merge output
        events_nops = pd.concat([dfa, dfb])

    else:

        # if not calculating events
        events_nops = df.copy()

        events_nops["grouping"] = None

    # only keep event detections that are greater than or equal to the site specific thresholds for signal power
    events_nops = events_nops.loc[events_nops["signal_power"] >= events_nops["site_number"].map(signal_power_threshold_dict)]

    # generate first and last detections per detection site
    events_nops_tt = build_first_last_detection(events_nops, detect_site_to_abbrev_dict)

    events_ops = events_nops
    events_ops_tt = events_nops_tt    
    
    if events_ops.shape[0] == 0:
        print(f"No events available.")
        generate_plots = False
        
    if generate_plots:
        print("Producing events plots...")
        
        # set level of time aggregation for plots
        if events_ops.shape[0] <= 20000:
            time_aggregation = "min"
            time_designation = "minute"
        else:
            time_aggregation = "H"
            time_designation = "hour"

        # plot heatmap of detections  
        plot_fish_heatmap(fish_id=fish_id,
                          df=events_ops,
                          output_directory=output_plot_directory,
                          detect_plot_name_dict=detect_plot_name_dict,
                          site_plot_name_dict=site_plot_name_dict,
                          time_aggregation=time_aggregation,
                          plot_field="site_number",
                          plot_type="detection",
                          figsize=plot_size,
                          title=f"Event detections by site number aggregated to {time_designation}:  {fish_id}",
                          file_name_suffix="event_detections-by-site_number")

        # plot heatmap of signal power
        plot_fish_heatmap(fish_id=fish_id,
                          df=events_ops,
                          output_directory=output_plot_directory,
                          detect_plot_name_dict=detect_plot_name_dict,
                          site_plot_name_dict=site_plot_name_dict,
                          time_aggregation=time_aggregation,
                          plot_field="site_number",
                          plot_type="power",
                          figsize=plot_size,
                          title=f"Event signal power per detecion by site number aggregated to {time_designation}:  {fish_id}",
                          file_name_suffix="event_signal_power-by-site_number")

        # plot heatmap of detections  
        plot_fish_heatmap(fish_id=fish_id,
                          df=events_ops,
                          output_directory=output_plot_directory,
                          detect_plot_name_dict=detect_plot_name_dict,
                          site_plot_name_dict=site_plot_name_dict,
                          time_aggregation=time_aggregation,
                          plot_field="detect_site",
                          plot_type="detection",
                          figsize=plot_size,
                          title=f"Event detections by detecion site aggregated to {time_designation}:  {fish_id}",
                          file_name_suffix="event_detections-by-detect_site")

        # plot heatmap of signal power
        plot_fish_heatmap(fish_id=fish_id,
                          df=events_ops,
                          output_directory=output_plot_directory,
                          detect_plot_name_dict=detect_plot_name_dict,
                          site_plot_name_dict=site_plot_name_dict,
                          time_aggregation=time_aggregation,
                          plot_field="detect_site",
                          plot_type="power",
                          figsize=plot_size,
                          title=f"Event signal power per detection by detect site aggregated to {time_designation}:  {fish_id}",
                          file_name_suffix="event_signal_power-by-detect_site")

    # build a dictionary of field with empty data
    travel_time_template_dict = {i: [] for i in travel_time_template_dtypes.keys()}

    # generate travel time template
    travel_time_template = pd.DataFrame(travel_time_template_dict).astype(travel_time_template_dtypes)

    # add fish ids to the travel time file
    travel_time_template["fish_id"] = tagging_df.index

    # set template site designation to 0 
    travel_time_template[detection_site_abbrev_list] = 0

    # generate a list of detection site first and last columns in order
    detection_site_time_columns = []
    for i in detection_site_abbrev_list:
        detection_site_time_columns.append(f"{i}_f")
        detection_site_time_columns.append(f"{i}_l")

    # sort by fish id
    travel_time_template.sort_values(by=["fish_id"], inplace=True)

    # set index to fish id
    travel_time_template.set_index("fish_id", inplace=True)
    

    if events_ops.shape[0] > 0:
        events_ops_tt_layout = pd.pivot_table(events_ops_tt,
                                               values="date_time",
                                               index=["fish_id"],
                                               columns=["site_abbrev"]).rename_axis(None, axis=1).reset_index()
    else:
        events_ops_tt_layout = pd.DataFrame({"fish_id": [fish_id], "date_time": [pd.NaT]})
    
    events_ops_tt_layout["pit_code"] = events_ops_tt_layout["fish_id"].map(pit_tag_dict)
    events_ops_tt_layout["srr"] = events_ops_tt_layout["fish_id"].map(srr_dict)
    events_ops_tt_layout["rel_name"] = events_ops_tt_layout["fish_id"].map(release_name_dict)
    events_ops_tt_layout["lot"] = events_ops_tt_layout["fish_id"].map(lot_dict)
    events_ops_tt_layout["act_datetime"] = events_ops_tt_layout["fish_id"].map(activation_dict)
    events_ops_tt_layout["release_datetime"] = events_ops_tt_layout["fish_id"].map(release_dict)
    events_ops_tt_layout["mort_xlat"] = events_ops_tt_layout["fish_id"].map(mort_dict)

    events_ops_tt_layout["hole"] = "NA"
    events_ops_tt_layout["subhole"] = "NA"
    events_ops_tt_layout["rel_weir_time_s"] = 0
    events_ops_tt_layout["pool_stage"] = ""
    events_ops_tt_layout["release_stage"] = "NA"
    events_ops_tt_layout["censor"] = ""
    events_ops_tt_layout["altered"] = ""
    events_ops_tt_layout["comments"] = ""
    
    for i in detection_site_abbrev_list:
        events_ops_tt_layout[i] = 0
        
        first = f"{i}_f"
        last = f"{i}_l"
        
        if first not in events_ops_tt_layout:
            events_ops_tt_layout[first] = pd.NaT
        
        if last not in events_ops_tt_layout:
            events_ops_tt_layout[last] = pd.NaT

    events_ops_tt_layout = events_ops_tt_layout[list(travel_time_template.reset_index().columns)].set_index("fish_id")

    # update template fields for each detection site where an event was logged regardless of whether or not the fish passed the site
    x = events_ops.set_index("fish_id")["detect_site"].map({i: detect_site_to_abbrev_dict[i].lower() for i in detect_site_to_abbrev_dict.keys()}).reset_index()

    x["detected"] = 1

    detect_designation = pd.pivot_table(x, 
                                        values="detected", 
                                        index="fish_id", 
                                        columns="detect_site", 
                                        fill_value=0).rename_axis(None, axis=1)

    # update the template data frame with the new data
    events_ops_tt_layout.update(detect_designation)

    travel_time_df = events_ops_tt_layout.copy()

    # calculate the releative weir time in seconds
    travel_time_df["rel_weir_time_s"] = (travel_time_df['tw_l'] - travel_time_df['tw_f']).dt.seconds.fillna(0)
    
    # keep the following event fields
    events_keep_fields = ['fish_id', 'date_time', 'receiver_id', 'signal_power', 'file_nm', 
                          'detect_site', 'site_number', 'block_id', 'block_count']

    return travel_time_df, events_ops[events_keep_fields]


def process_parallel(fish_id,
                     output_directory,
                     parquet_raw_dir,
                      travel_time_template_dtypes,
                      reciever_to_detect_site_dict,
                      receiver_to_site_number_dict,
                      signal_power_threshold_dict,
                      detect_site_to_abbrev_dict,
                      detection_site_abbrev_list,
                      detect_plot_name_dict,
                      site_plot_name_dict,
                      simulation,
                      tagging_df,
                      beacon_df,
                      project_end_date,
                      max_tag_life_days,
                      use_events, 
                      consider_dam_operations,
                      output_plot_directory,
                      generate_event_plots,
                      write_clean_data,
                      generate_clean_plots):
    """Wrap generate function to allow parallel processing of each fish."""
    
    ttdf, evdf = generate(fish_id=fish_id,
                          output_directory=output_directory,
                          parquet_raw_dir=parquet_raw_dir,
                          travel_time_template_dtypes=travel_time_template_dtypes,
                          reciever_to_detect_site_dict=reciever_to_detect_site_dict,
                          receiver_to_site_number_dict=receiver_to_site_number_dict,
                          signal_power_threshold_dict=site_to_power_percentile_dict,
                          detect_site_to_abbrev_dict=detect_site_to_abbrev_dict,
                          detection_site_abbrev_list=detection_site_abbrev_list,
                          detect_plot_name_dict=detect_plot_name_dict,
                          site_plot_name_dict=site_plot_name_dict,
                          simulation=simulation,
                          tagging_df=tagging_df,
                          beacon_df=beacon_df,
                          project_end_date=project_end_date,
                          max_tag_life_days=max_tag_life_days,
                          use_events=use_events, 
                          consider_dam_operations=consider_dam_operations,
                          output_plot_directory=output_plot_directory,
                          generate_plots=generate_event_plots,
                          write_clean_data=write_clean_data,
                          generate_clean_plots=generate_clean_plots)

    # save event detection by fish id CSV file
    print("Writing events by fish id...")
    evdf.sort_values(by="date_time").to_csv(os.path.join(output_directory, f"{simulation}/events/events_by_fish_id/{fish_id}.csv"), index=False)

    return ttdf




## Configuration

In [None]:
# project directory
# root_dir = "/Users/d3y010/projects/telemetry/mm_sum_2022"
root_dir = "/Users/mcca512/OneDrive - PNNL/McCann Documents/MMD Adult Study/mmd_sum_2023"

# data directory holding raw data
data_dir = os.path.join(root_dir, "data")

# mitas raw data directory
mitas_dir = os.path.join(data_dir, "mitas")

# orion raw data directory
orion_dir = os.path.join(data_dir, "orion")

# output directory
output_dir = os.path.join(data_dir, "outputs")

# directory to hold formatted raw parquet files for query
parquet_raw_dir = os.path.join(data_dir, "parquet_raw_data")

# project start date
project_start_date = "2023-06-22 08:00:00"

# project end date
project_end_date = "2023-10-17 11:15:00"

# daylight savings time spring start
daylight_savings_time_spring = "2023-03-12 02:00:00"

# maximum tag life days
max_tag_life_days = 93.4

# supporting files
beacon_file = os.path.join(root_dir, "data", "load_db", "tbl_beacons_mmd_summer_20231016.csv")
tagging_file = os.path.join(root_dir, "data", "load_db", "acttagrel_mmd_rt_20231016.xlsx")
# dam_ops_file = os.path.join(root_dir, "data", "load_db", "Hourly_dam_ops_foster_2022_final_091522.csv")

# for reproducibility of any stochastic process
np.random.seed(123)


In [5]:
print(mitas_dir)

NameError: name 'mitas_dir' is not defined

In [2]:
# for use with plot_field = detect_site
detect_plot_name_dict = {1: '01_TW',
                         2: '02_ED1',
                         3: '03_EU2',
                         4: '04_LD1',
                         5: '05_LM2',
                         6: '06_LU3',
                         7: '07_PS'}

# for use with plot field = site_number
site_plot_name_dict= {1: "order-01_TWS",
                      2: "order-02_TWN",
                      3: "order-03_ED1",
                      4: "order-04_EU2",
                      5: "order-05_LD1",
                      6: "order-06_LM2",
                      7: "order-07_LU3",
                      8: "order-08_PSN",
                      9: "order-09_PSS"}

# list of detection sites in travel order
detection_site_abbrev_list = ['tw', 'ed1', 'eu2', 'ld1', 'lm2', 'lu3', 'ps']

# dictionary of detection sites to travel time file abbreviation
detect_site_to_abbrev_dict = {1: 'tw',
                              2: 'ed1',
                              3: 'eu2',
                              4: 'ld1',
                              5: 'lm2',
                              6: 'lu3',
                              7: 'ps'}

# dictionary of data types for the travel time data frame
travel_time_template_dtypes = {
    "fish_id": str,
    "pit_code": str,
    "rel_name": str,
    "lot": int,
    "srr": str,
    "act_datetime": "datetime64[ns]",
    "release_datetime": "datetime64[ns]",
    "hole": str,
    "subhole": str,
    "tw": int,
    "ed1": int,
    "eu2": int,
    "ld1": int,
    "lm2": int,
    "lu3": int,
    "ps": int,
    "tw_f": "datetime64[ns]",
    "tw_l": "datetime64[ns]",
    "ed1_f": "datetime64[ns]",
    "ed1_l": "datetime64[ns]",
    "eu2_f": "datetime64[ns]",
    "eu2_l": "datetime64[ns]",
    "ld1_f": "datetime64[ns]",
    "ld1_l": "datetime64[ns]",
    "lm2_f": "datetime64[ns]",
    "lm2_l": "datetime64[ns]",
    "lu3_f": "datetime64[ns]",
    "lu3_l": "datetime64[ns]",
    "ps_f": "datetime64[ns]",
    "ps_l": "datetime64[ns]",
    "rel_weir_time_s": int,
    "pool_stage": str,
    "censor": int,
    "altered": int,
    "mort_xlat": int,
    "release_stage": str,
    "comments": str
}


## Read in beacon and tagging files

In [3]:
# read in beacon file
beacon_df = pd.read_csv(beacon_file)

# # build a dictionary of receiver id to detect site
reciever_to_detect_site_dict = beacon_df.set_index("receiver_id")["detect_site"].to_dict()

# construct a dictionary of receiver id to site number
receiver_to_site_number_dict = beacon_df.set_index("receiver_id")["site_number"].to_dict()

# read in tagging data
tagging_df = read_tagging_file(tagging_file)

# create a list of valid fish ids to process
fish_array = tagging_df["fish_id"].unique()

# get a dict of PIT tags per fish_id
pit_tag_dict = tagging_df.set_index("fish_id")["pit_code"].to_dict()

# get a dict of SRR per fish_id
srr_dict = tagging_df.set_index("fish_id")["srr"].to_dict()
release_name_dict = tagging_df.set_index("fish_id")["release_location_xlat"].to_dict()

# there is no lot in the tagging file; using srr as a proxy
lot_dict = tagging_df.set_index("fish_id")["srr"].to_dict()

activation_dict = tagging_df.set_index("fish_id")["tag_activation_date_pst"].to_dict()
release_dict = tagging_df.set_index("fish_id")["tag_release_date_pst"].to_dict()
mort_dict = tagging_df.set_index("fish_id")["mort_xlat"].to_dict()


NameError: name 'beacon_file' is not defined

In [6]:
# generate lists of expected frequencies and codes from tagging file
fish_id_array = tagging_df["fish_id"].values
target_frequency_list = np.unique([float(i[:7]) for i in fish_id_array])
target_code_list = np.unique([int(i[-3:]) for i in fish_id_array])

# generate a list of expected site numbers from the beacons file
target_site_number_list = beacon_df["site_number"].unique()


## Convert native input files to parquet files

Once the MITAS and ORION files have been built, they do not need to be built again unless new data is added.

In [20]:
# generate formatted raw files into parquet format for query
mitas_raw_parquet_files = generate_mitas_parquet_files(mitas_dir=mitas_dir,
                                                       target_frequency_list=target_frequency_list,
                                                       target_code_list=target_code_list, 
                                                       output_directory=parquet_raw_dir,
                                                       daylight_savings_time_spring=daylight_savings_time_spring)


100%|███████████████████████████████████████████████████████████████████████████████████| 41/41 [01:10<00:00,  1.71s/it]


In [21]:
# generate formatted raw files into parquet format for query
orion_raw_parquet_files = generate_orion_parquet_files(orion_dir=orion_dir,
                                                       target_frequency_list=target_frequency_list,
                                                       target_code_list=target_code_list, 
                                                       output_directory=parquet_raw_dir)


Text file size: 2882, Hex file size: 3584, Difference (hex - text): 702
Removing files from import.  Please review.


100%|█████████████████████████████████████████████████████████████████████████████████| 100/100 [05:01<00:00,  3.01s/it]


## Create percentile thresholds per receiver id


In [7]:
%%time

# target percentiles to evaluate for signal power threshold; 0.0 threshold is equivalent to no threshold
percentile_list = [0.8]

for percentile in percentile_list:
    
    print(f"Simulating percentile:  {percentile}")

    # all processed files
    glob_path = os.path.join(parquet_raw_dir, "*.parquet")

    # use all detections to generate a target percentile of signal power for each receiver
    sig_df = duckdb.query(f"""SELECT receiver_id, quantile(signal_power, {percentile}) as power FROM '{glob_path}' GROUP BY receiver_id""").df()

    # add detect site
    sig_df["detect_site"] = sig_df["receiver_id"].map(reciever_to_detect_site_dict)

    # change receiver id to site number
    sig_df["site_number"] = sig_df["receiver_id"].map(receiver_to_site_number_dict)

    # keep only valid receivers
    sig_df = sig_df.loc[~sig_df["site_number"].isna()]

    # convert site ids to integers to facilitate lookup
    sig_df["detect_site"] = sig_df["detect_site"].astype(int)
    sig_df["site_number"] = sig_df["site_number"].astype(int)

    # generate a dictionary of site id to its power threshold dictionary
    site_to_power_percentile_dict = sig_df.set_index("site_number")["power"].to_dict()

    # generate a data frame to display the signal power threshold per receiver
    xd = {"site_number": [], "signal_power": [], "percentile": []}
    for x in site_to_power_percentile_dict.keys():

        xd["site_number"].append(x)
        xd["signal_power"].append(site_to_power_percentile_dict[x])
        xd["percentile"].append(int(percentile * 100))

    xf = pd.DataFrame(xd).sort_values(by=["site_number"])
    print("Signal power thresholds:")
    print(xf)
    
    # construct simulation name    
    simulation = f"mm_sum_2022_{int(percentile * 100)}-percentile-power"

    # generate fish list to process
    fish_list = tagging_df["fish_id"].unique()
    
    results = Parallel(n_jobs=-2, backend="loky")(delayed(process_parallel)(fish_id=i,
                                                                            output_directory=output_dir,
                                                                              parquet_raw_dir=parquet_raw_dir,
                                                                              travel_time_template_dtypes=travel_time_template_dtypes,
                                                                              reciever_to_detect_site_dict=reciever_to_detect_site_dict,
                                                                              receiver_to_site_number_dict=receiver_to_site_number_dict,
                                                                              signal_power_threshold_dict=site_to_power_percentile_dict,
                                                                              detect_site_to_abbrev_dict=detect_site_to_abbrev_dict,
                                                                              detection_site_abbrev_list=detection_site_abbrev_list,
                                                                              detect_plot_name_dict=detect_plot_name_dict,
                                                                              site_plot_name_dict=site_plot_name_dict,
                                                                              simulation=simulation,
                                                                              tagging_df=tagging_df,
                                                                              beacon_df=beacon_df,
                                                                              project_end_date=project_end_date,
                                                                              max_tag_life_days=max_tag_life_days,
                                                                              use_events=True, 
                                                                              consider_dam_operations=False,
                                                                              output_plot_directory=os.path.join(output_dir, f"{simulation}/events/events_plots"),
                                                                              generate_event_plots=True,
                                                                              write_clean_data=False,
                                                                              generate_clean_plots=False) for i in fish_list)        

    for index, ttdf in enumerate(results):
        # add fish to travel time output
        if index == 0:
            travel_time_df = ttdf
        else:
            travel_time_df = pd.concat([travel_time_df, ttdf], axis=0)

    # save travel time file
    print("Writing travel time file...")
    travel_time_df.to_csv(os.path.join(output_dir, f"{simulation}/travel_time/travel_time.csv"), index=True)


Simulating percentile:  0.8
Signal power thresholds:
   site_number  signal_power  percentile
3            1           -70          80
1            2           -75          80
0            3           -66          80
8            4           -70          80
4            5           -90          80
2            6           -87          80
5            7           -90          80
7            8           -96          80
6            9           -78          80
Writing travel time file...
CPU times: user 6.86 s, sys: 2.41 s, total: 9.27 s
Wall time: 6min 3s
