# Resusable Digital Work flow
Process GBRMPA control data into a standardised format that researchers have utilised historically. Verification functions were written in consultation with domain experts determine whether a row contains an error and is suitable for use in research. The ecological observations are assigned to the nearest site.

### Load Libraries

In [None]:
%%r
library("tools")
library("installr")
library("sets")
library("methods")
library("rio")
library("dplyr")
library("stringr")
library("fastmatch")
library("lubridate")
library("rlang")
library("purrr")
library("jsonlite")
library("sf")
library("raster")
library("terra")
library("units")
library("tidyverse")
library("tidyr")
library("lwgeom")
library("stars")
library("stringr")
library("future")
library("furrr")
library("foreach")
library("doParallel")
library("digest")

# Define Source Code Functions

In [None]:
# Format the new control data into the stardard legacy format. ONLY REQUIRED IF 
# rio::import() depreciates. 
import_data <- function(data, index=1){
  out <- tryCatch(
    {
      file_name <- tools::file_path_sans_ext(basename(data))
      
      # Opens file and stores information into dataframe. Different file types
      # require different functions to read data
      file_extension <- file_ext(data)
      if (file_extension == 'xlsx'){
        data_tibble <- read_xlsx(data, sheet = index)
        data_tibble_colnames <- colnames(data_tibble)
        data_df <- data.frame(data_tibble)
        colnames(data_df) <- data_tibble_colnames
      } else if (file_extension == 'csv'){
        data_df <- read.csv(data, header = TRUE,  encoding="UTF-8", check.names=FALSE)
      } else {
        data_df <- read.table(file=data, header=TRUE)
      }
      
      column_names <- colnames(data_df)
      column_names <- gsub("\\.", " ", column_names)
      column_names <- gsub("\\s+", " ", column_names)
      colnames(data_df) <- column_names
      
      return(data_df)
    },
    
    error=function(cond) {
      base::message(paste("Cannot read from:", data))
      base::message("Original error message:")
      base::message(cond)
    }
  ) 
  return(out)
}

# get vector of datetime formats to parse
get_datetime_parse_order <- function(){
  order <- c('%Y/%m/%d %I:%M:%S %p','%Y/%m/%d %H:%M:%S', '%d/%m/%Y %I:%M:%S %p', '%d/%m/%Y %H:%M:%S', '%Y/%m/%d %I:%M %p','%Y/%m/%d %H:%M', '%d/%m/%Y %I:%M %p','%d/%m/%Y %H:%M', '%Y-%m-%d %H:%M:%OS%z', '%Y-%m-%d %H:%M:%OS', 'ymd', 'dmy')
  return(order)
}


get_vessel_short_name <- function(string) {
  # Function to extract short name from a single string
  extract_short_name <- function(s) {
    words <- strsplit(s, "\\s+")[[1]]
    first_letters <- substr(words, 1, 1)
    result <- paste(first_letters, collapse = "")
    if (length(words) == 1) {
      result <- words
    }
    return(result)
  }
  
  # If input is a vector, apply extract_short_name to each element
  if (is.vector(string)) {
    sapply(string, extract_short_name)
  } else {
    # If input is a single string, directly apply extract_short_name
    extract_short_name(string)
  }
}

get_file_keyword <- function(string) {
  
  # Convert the input string to lowercase for case-insensitive matching
  string <- tolower(string)
  if (grepl("cull|dive", string)) {
    return("cull")
  }
  if (grepl("manta|surveillance", string)) {
    return("manta_tow")
  }
  if (grepl("rhis|survey|health", string)) {
    return("rhis")
  }
  return(NA)
}


# Function to check and append records to the Vessel table. data_df is expected 
# to have the same column names and order as the table that is being referenced
append_to_table_unique <- function(con, table_name, data_df) {
  
  #check what vessels exist already
  db_df <- dbReadTable(con, table_name, row.names = FALSE)
  unique_df <- distinct(data_df)
  data_to_apppend <- anti_join(unique_df, db_df[,-which(colnames(db_df) %in% "id")])
  if (nrow(data_to_apppend) > 0){
    append_cmd  <-  sqlAppendTable( con = con , table = table_name , values = data_to_apppend , row.names = FALSE )
    dbExecute( conn = con , statement = append_cmd )
  }
}


# get the Ids of rows that meet search criteria in search column 
get_id_by_cell <- function(con, table_name, search_column, search_term) {
  entry_id <- dbGetQuery(con, paste("SELECT id FROM ", table_name,  " WHERE ", search_column, " = '", search_term, "'", sep = ""))$id
  return(entry_id)
}


# retrieve IDs from a database table based on matching rows in an input 
# dataframe, using a left join operation 
get_id_by_row <- function(con, table_name, data_df) {
  db_df <- dbReadTable(con, table_name, row.names = FALSE)
  merged_df <- left_join(data_df, db_df %>% dplyr::select(c(colnames(data_df), id)), by = colnames(data_df))
  return(merged_df$id)
}


# get voyage dates with regex from a vector of strings. This was initially created
# to ingest dates from strings in JSON files that do not have consistent formatting.
get_voyage_dates_strings <- function(strings) {
  
  date_df <- data.frame(start_date = character(), stop_date = character(), stringsAsFactors = FALSE)
  # Extract dates from each string
  for (string in strings) {
    dates <- str_extract_all(string, "\\b\\d{2}/\\d{2}/\\d{4}\\b")[[1]]
    if (length(dates) != 2) {
      dates <- c(NA, NA)
    }
    date_df <- rbind(date_df, setNames(as.data.frame(t(dates)), c("start_date", "stop_date")))
  }
  
  date_df$start_date <- parse_date_time(date_df$start_date, orders = c('dmy', 'ymd'))
  date_df$stop_date <- parse_date_time(date_df$stop_date, orders = c('dmy', 'ymd'))
  
  return(date_df)
}


# Extract required data from Cots Control Centre databases to use as legacy dataset in reusable workflow. 
get_app_data_database <- function(con, control_data_type){
  if(control_data_type == "manta_tow"){
    query <- "
      SELECT manta_tow.date, vessel.name AS vessel_name, voyage.vessel_voyage_number, reef.reef_label, reef.name AS reef_name, manta_tow.start_latitude, manta_tow.stop_latitude, manta_tow.start_longitude, manta_tow.stop_longitude, manta_tow.distance, manta_tow.average_speed, manta_tow.cots, manta_tow.scars, manta_tow.hard_coral, manta_tow.soft_coral, manta_tow.recently_dead_coral, site.name AS site_name, manta_tow.error_flag
      FROM manta_tow
      INNER JOIN voyage ON manta_tow.voyage_id = voyage.id
      INNER JOIN vessel ON voyage.vessel_id = vessel.id
      INNER JOIN site ON manta_tow.site_id = site.id
      INNER JOIN reef ON site.reef_id = reef.id
      "
  } else if (control_data_type == "cull") {
    query <- "
      SELECT cull.date, vessel.name AS vessel_name, voyage.vessel_voyage_number, voyage.start_date, voyage.stop_date, reef.reef_label, reef.name AS reef_name, site.latitude, site.longitude, cull.average_depth, cull.less_than_fifteen_centimetres, cull.fifteen_to_twenty_five_centimetres, cull.twenty_five_to_forty_centimetres, cull.greater_than_forty_centimetres, site.name AS site_name, cull.error_flag, cull.bottom_time
      FROM cull
      INNER JOIN voyage ON cull.voyage_id = voyage.id
      INNER JOIN vessel ON voyage.vessel_id = vessel.id
      INNER JOIN site ON cull.site_id = site.id
      INNER JOIN reef ON site.reef_id = reef.id
      "
  } else {
    ### WRITE QUERY FOR RHIS ###
    query <- ""
  }
  
  result <- dbGetQuery(con, query)
  return(result)
  
}


# Parse datetimes and then extract the time component for a vector of datetimes. 
separate_date_time <- function(date_time){
  is_date_time_na <- is.na(date_time)
  date_time <- parse_date_time(date_time[!is_date_time_na], orders = get_datetime_parse_order())
  time <- format(date_time, "%H:%M:%S")
  return(time)
}

# Extract GBRMPA reef IDs from a vector of strings. 
get_reef_label <- function(names){
  reef_label_pattern <- "\\b(1[0-9]|2[0-9]|10)-\\d{3}[a-z]?\\b"
  reef_labels <- sapply(str_extract(names, reef_label_pattern), toString)
  return(reef_labels)
}

#Aggregates coordinates of ecological observations that requires several trip to
#survey the desired region. This specific iteration of the function utilises the 
#naming convention of the research format. This should be depreciated in future
#in place of a single function for research and app data.
get_start_and_end_coords_research <- function(start_lat, stop_lat, start_long, stop_long){
  output <- get_start_and_end_coords_base(start_lat, stop_lat, start_long, stop_long)
  names(output) <- c("Start Lat", "End Lat", "Start Lng", "End Lng")
  return(output)
}

#Aggregates coordinates of ecological observations that requires several trip to
#survey the desired region. This specific iteration of the function utilises the
#naming convention of the app format. This should be depreciated in future in 
#place of a single function for research and app data.
get_start_and_end_coords_app <- function(start_lat, stop_lat, start_long, stop_long){
  output <- get_start_and_end_coords_base(start_lat, stop_lat, start_long, stop_long)
  names(output) <- c("start_latitude", "stop_latitude", "start_longitude", "stop_longitude")
  return(output)
}

#Aggregates coordinates of ecological observations that requires several trip to 
#survey the desired region. This specific iteration of the function is the base for both data formats.
get_start_and_end_coords_base <- function(start_lat, stop_lat, start_long, stop_long){
  
  start_coords <- cbind(start_long, start_lat)
  start_points <- lapply(1:length(start_lat), function(i) st_point(c(start_long[i], start_lat[i])))
  start_points <- st_sfc(start_points)
  
  stop_coords <- cbind(stop_long, stop_lat)
  stop_points <- lapply(1:length(stop_lat), function(i) st_point(c(stop_long[i], stop_lat[i])))
  stop_points <- st_sfc(stop_points)
  
  distances <- st_distance(start_points,stop_points)
  if(!any(is.na(distances))){
    max_index <- which(distances == max(distances), arr.ind=TRUE)[1,]
    start_lat <-  st_coordinates(start_points[max_index[1]])[2]
    start_long <-  st_coordinates(start_points[max_index[1]])[1]
    stop_lat <-  st_coordinates(stop_points[max_index[2]])[2]
    stop_long <-  st_coordinates(stop_points[max_index[2]])[1]
  } else {
    start_lat <-  NA
    start_long <-  NA
    stop_lat <-  NA
    stop_long <-  NA
  }
  output <- list(start_lat, start_long, stop_lat, stop_long)
  return(output)
}

# Get COTS feeding scar from string decription 
get_feeding_scar_from_description <- function(scar_desctiptions){
  return(tolower(substr(scar_desctiptions, 1, 1)))
}

# Get the worst observed COTS scarring as a string from a vector observed scarring.
get_worst_case_feeding_scar <- function(scars){
  
  pattern <- "a|p|c"
  matches <- str_extract(scars, pattern)
  ordered_scars <- c("a", "p", "c")
  matches <- na.omit(matches)
  numerical_values <- match(matches, ordered_scars)
  max_vlaue <- max(numerical_values)
  return(ordered_scars[max_vlaue])
  
}

# Extract coral cover categories from strings with regex
get_coral_cover <- function(coral){
  pattern <- "(?:[1-9]\\-)|0|(?:(?:[1-9]\\+))"
  matches <- str_extract(coral, pattern)
}

# get median coral cover category for aggregating observations
get_median_coral_cover <- function(coral){
  matches <- get_coral_cover(coral)
  ordered_coral_cover <- c("0", "1-", "1+", "2-", "2+", "3-", "3+", "4-", "4+", "5-", "5+")
  matches <- na.omit(matches)
  numerical_values <- match(matches, ordered_coral_cover)
  median_value <- median(numerical_values)
  return(ordered_coral_cover[median_value])
  
}

# Which rows of a dataframe have information missing in the columns specified.
missing_reef_information <- function(data, columns, test_value = c(NA)) {
  results <- vector(mode = "logical", length = nrow(data))
  for (col_name in columns) {
    col_values <- data[[col_name]]
    results <- results | is.na(col_values) | is.null(col_values) | col_values %in% test_value
  }  
  return(results)
}


assign_missing_site_and_reef <- function(transformed_data_df, serialised_spatial_path, control_data_type){
  # Determines if the dataframe derived from app export is missing information 
  # about the site or reef. Creates a geometry collection with available coordinates
  # and extracts the the missing information from the RDS file. 
  
  crs_ <- st_crs("+proj=longlat +datum=WGS84 +no_defs") 
  if(control_data_type == "manta_tow"){
    data_sf <- get_centroids(transformed_data_df, crs_)
    cols_to_check <- c("Reef ID", "Reef", "Nearest Site")  
    Reef <- "Reef"
  } else if (control_data_type == "cull"){
    data_sf <- st_as_sf(transformed_data_df, coords=c("Longitude", "Latitude"), crs=crs_)
    cols_to_check <- c("Reef ID", "Reef Name", "Site Name") 
    Reef <- "Reef Name"
  } else {
    data_sf <- st_as_sf(transformed_data_df, coords=c("Longitude", "Latitude"), crs=crs_)
    cols_to_check <- c("Reef ID", "Reef Name") 
    Reef <- "Reef Name"
  }
  
  is_missing_reef_information <- missing_reef_information(transformed_data_df, cols_to_check, c("Finding nearest..."))
  if (!any(is_missing_reef_information)) {
    return(transformed_data_df)
  }
  
  site_regions <- readRDS(serialised_spatial_path)
  
  
  for (i in 1:length(site_regions)) {
    current_brick <- site_regions[i][[1]]
    bounding_box <- st_as_sfc(st_bbox(extent(current_brick)))
    st_crs(bounding_box) <- crs_
    is_within_extent <- lengths(st_within(data_sf$geometry, bounding_box)) > 0
    is_within_extent_and_missing_information <- is_within_extent & is_missing_reef_information
    if (any(is_within_extent_and_missing_information)){
      reef_names <- names(site_regions[i])
      if(control_data_type == "manta_tow")
        transformed_data_df$Reef[is_within_extent_and_missing_information] <- reef_names
      site_numbers <- raster::extract(current_brick, data_sf)[is_within_extent_and_missing_information]
      site_names <- site_numbers_to_names(site_numbers, reef_names)
      transformed_data_df$`Nearest Site`[is_within_extent_and_missing_information] <- site_names
    } else if (control_data_type == "cull"){
      transformed_data_df$`Reef Name`[is_within_extent_and_missing_information] <- reef_names
      site_numbers <- raster::extract(current_brick, data_sf)[is_within_extent_and_missing_information]
      site_names <- site_numbers_to_names(site_numbers, reef_names)
      transformed_data_df$`Site Name`[is_within_extent_and_missing_information] <- site_names
    } else if (control_data_type == "rhis"){
      transformed_data_df$`Reef Name`[is_within_extent_and_missing_information] <- reef_names
    }
  }
  
  return(transformed_data_df)
}


site_numbers_to_names <- function(numbers, reef_names){
  # convert site numbers to names
  
  short_hand <- substr(reef_names, start = 1, stop = 3)
  labels <- get_reef_label(reef_names)
  site_names <- paste(toupper(short_hand), labels, numbers, sep="_")
}

# aggregates cull data to the site level for a vessel voyage
aggregate_culls_site_resolution_research <- function(data_df) {
  col_names <- colnames(data_df)
  
  aggregated_data <- data_df  %>%
    group_by(Vessel, Voyage, `Reef ID`, `Site Name`) %>%
    dplyr::summarize(
      `Capture ID` = first(`Capture ID`),
      `Survey Date` = min(`Survey Date`),
      `Reef Name` = first(`Reef Name`), 
      `Vessel` = Vessel, 
      Voyage = Voyage,
      `Site Name` = `Site Name`,
      `Reef ID` = `Reef ID`,
      `Voyage Start` = min(`Voyage Start`),
      `Voyage End` = max(`Voyage End`),
      Latitude = first(Latitude),
      Longitude = first(Longitude),
      Bottomtime = sum(Bottomtime),
      `Depth (meters)` = mean(`Depth (meters)`),
      `Cohort 1 (<15cm)` = sum(`Cohort 1 (<15cm)`),
      `Cohort2 (15-25cm)` = sum(`Cohort2 (15-25cm)`),
      `Cohort3 (25-40cm)` = sum(`Cohort3 (25-40cm)`),
      `Cohort4 (>40cm)` = sum(`Cohort4 (>40cm)`),
      `Nearest Site` = `Nearest Site`,
      error_flag = as.numeric(any(as.logical(error_flag)))
    ) %>%
    dplyr::select(all_of(col_names)) %>%
    dplyr::distinct()
  
  
  return(aggregated_data)
}


aggregate_culls_site_resolution_app <- function(data_df) {
  col_names <- colnames(data_df)
  
  aggregated_data <- data_df  %>%
    group_by(vessel_name, vessel_voyage_number, reef_label, site_name) %>%
    dplyr::summarize(
      date = min(date),
      reef_name = first(reef_name), 
      vessel_name = vessel_name, 
      vessel_voyage_number = vessel_voyage_number,
      reef_label = reef_label,
      start_date = min(start_date),
      stop_date = max(stop_date),
      latitude = first(latitude),
      longitude = first(longitude),
      bottom_time = sum(bottom_time),
      average_depth = mean(average_depth),
      less_than_fifteen_centimeters = sum(less_than_fifteen_centimeters),
      fifteen_to_twenty_five_centimeters = sum(fifteen_to_twenty_five_centimeters),
      twenty_five_to_forty_centimeters = sum(twenty_five_to_forty_centimeters),
      greater_than_forty_centimeters = sum(greater_than_forty_centimeters),
      site_name = site_name,
      error_flag = as.numeric(any(as.logical(error_flag)))
    ) %>%
    dplyr::select(all_of(col_names)) %>%
    dplyr::distinct()
  
  
  return(aggregated_data)
}

  
aggregate_manta_tows_site_resolution_app <- function(data_df) {
  col_names <- colnames(data_df)
  
  aggregated_data <- data_df  %>%
    group_by(vessel_name, vessel_voyage_number, reef_label, site_name) %>%
    dplyr::summarize(
      date = min(date),
      reef_name = first(reef_name), 
      vessel_name = vessel_name, 
      vessel_voyage_number = vessel_voyage_number,
      reef_label = reef_label,
      coords = list(get_start_and_end_coords_app(start_latitude, start_longitude, stop_latitude, stop_latitude)),
      distance = sum(distance),
      average_speed = mean(average_speed),
      cots = sum(cots),
      scars =  get_worst_case_feeding_scar(scars),
      hard_coral = get_median_coral_cover(hard_coral),
      soft_coral = get_median_coral_cover(soft_coral),
      recently_dead_coral = get_median_coral_cover(recently_dead_coral),
      site_name = site_name,
      error_flag = as.numeric(any(as.logical(error_flag))),
      start_date = min(start_date),
      stop_date = min(stop_date)
    ) %>%
    unnest_wider(coords) %>%
    dplyr::select(all_of(col_names)) %>%
    dplyr::distinct()
  
  
  return(aggregated_data)
}


# across(c(Vessel, Voyage, `Reef ID`, `Nearest Site`), first),
aggregate_manta_tows_site_resolution_research <- function(data_df) {
  col_names <- colnames(data_df)
  
  aggregated_data <- data_df %>%
    group_by(Vessel, Voyage, `Reef ID`, `Nearest Site`) %>%
    dplyr::summarize(
      ID = min(ID),
      `Tow date` = min(`Tow date`),
      `Tow Time` = min(`Tow Time`),
      Reef = first(Reef), 
      Vessel = Vessel, 
      Voyage = Voyage,
      `Reef ID` = `Reef ID`,
      coords = list(get_start_and_end_coords_research(`Start Lat`, `Start Lng`, `End Lat`, `End Lng`)),
      `Distance (metres)` = sum(`Distance (metres)`),
      `Average Speed (km/h)` = mean(`Average Speed (km/h)`),
      `COTS Observed` = sum(`COTS Observed`),
      `Feeding Scars` =  get_worst_case_feeding_scar(`Feeding Scars`),
      `Hard Coral` = get_median_coral_cover(`Hard Coral`),
      `Soft Coral` = get_median_coral_cover(`Soft Coral`),
      `Recently Dead Coral` = get_median_coral_cover(`Recently Dead Coral`),
      `Nearest Site` = `Nearest Site`,
      `error_flag` = as.numeric(any(as.logical(`error_flag`)))
    ) %>%
    unnest_wider(coords) %>%
    dplyr::select(all_of(col_names)) %>%
    dplyr::distinct()

  
  return(aggregated_data)
} 


contribute_to_metadata_report <- function(key, data, parent_key=NULL, report_path=NULL){
  if(is.null(report_path)){
    report_path <- find_recent_file("Output\\reports", "Report", "json")
  }
  if (file.exists(report_path)) {
    report <- fromJSON(report_path)
  } else {
    print("report does not exist")
    return(NULL)
  }
  if(!is.null(parent_key)){
    report[[parent_key]][[key]] <- data
  } else {
    report[[key]] <- data
  }
  toJSON(report, pretty = TRUE, auto_unbox = TRUE) %>% writeLines(report_path)
}

separate_control_dataframe <- function(new_data_df, legacy_data_df, has_authorative_ID){
  if(any(grepl("ID", colnames(new_data_df)))){
    ID_col <- colnames(new_data_df)[which(grepl("ID", colnames(new_data_df)))[1]]
  } else {
    ID_col <- ""
  } 
  
  column_names <- colnames(legacy_data_df)
  verified_data_df <- data.frame(matrix(ncol = length(column_names), nrow = 0))
  perfect_duplicates <- data.frame(matrix(ncol = length(column_names), nrow = 0))
  errors_df <- data.frame(matrix(ncol = length(column_names), nrow = 0))
  colnames(verified_data_df) <- column_names 
  colnames(perfect_duplicates) <- column_names 
  
  # If there is a unique ID then perfect duplicates can easily be removed.
  if(has_authorative_ID){
    
    # Find close matches of rows left without an ID and no perfect duplicates. 
    # Rows with a single close match will be considered a discrepancy and then
    # the ID checked against the all the IDs in legacy_data_df to ensure it does 
    # not already exist.
    
    # Determine additional columns required by the new data format and remove 
    # from comparison
    required_columns <- intersect(configuration$mappings$new_fields$field, names(new_data_df))
    
    # save original dataframes for legacy and new data so it can be manipulated
    # without loosing data. Columns will be removed that are not going to be  
    # compared for similarity. A unique identifier for each row will be created 
    # based on data in the row and compared. 
    temp_legacy_df <- legacy_data_df[,-which(colnames(legacy_data_df) %in% required_columns)]
    temp_new_df <- new_data_df[,-which(colnames(new_data_df) %in% required_columns)]
    
    # Create a unique identifier for each row in legacy_data_df and new_data_df
    temp_legacy_df$Identifier <- apply(temp_legacy_df, 1, function(row) paste(row, collapse = "_"))
    temp_new_df$Identifier <- apply(temp_new_df, 1, function(row) paste(row, collapse = "_"))
    
    #find perfect duplicates and add to verified data df
    perfect_duplicates <- legacy_data_df[temp_legacy_df$Identifier %in% temp_new_df$Identifier, ]
    
    # remove identifier columns
    temp_legacy_df$Identifier <- NULL
    temp_new_df$Identifier <- NULL
    
    # find new entries based on whether the ID is present in both dataframes 
    new_entries <- anti_join(new_data_df, legacy_data_df, by=ID_col)
    
    # Determine discrepancies and store both versions of the entries
    non_discrepancy_ids <- c(perfect_duplicates[[ID_col]], new_entries[[ID_col]])
    discrepancies_new <- new_data_df[!(new_data_df[[ID_col]] %in% non_discrepancy_ids),]
    discrepancies_legacy <- legacy_data_df[!(legacy_data_df[[ID_col]] %in% non_discrepancy_ids),]
    discrepancies_legacy <- discrepancies_legacy[discrepancies_legacy[[ID_col]] %in% discrepancies_new[[ID_col]], ]
    verified_discrepancies <- compare_discrepancies_directly(discrepancies_new, discrepancies_legacy)
    
  } else {
    
    # Determine additional columns required by the new data format and remove 
    # from comparison. Even when "Nearest Site" exists in the legacy version we 
    # do not want to compare it to see if it has changed. It is recalculated 
    # every single workflow run through based on the available KML file.
    required_columns <- intersect(c(configuration$mappings$new_fields$field, ID_col), names(new_data_df))
    
    # find close matching rows (distance of two) based on all columns except ID. ID is not 
    # because it will always be null if the data is exported from powerBI. 
    distance <- 3
    
    base::message("Finding close matches...")
    temp_new_df <- new_data_df[ , -which(names(new_data_df) %in% required_columns)]
    temp_legacy_df <- legacy_data_df[ , -which(names(legacy_data_df) %in% required_columns)]
    close_match_rows <- matrix_close_matches_vectorised(temp_legacy_df, temp_new_df, distance)
    
    # There can be many to many perfect matches. This means that there will be 
    # multiple indices referring to the same row for perfect duplicates. 
    # unique() should be utilised when subsetting the input dataframes.
    
    if(nrow(close_match_rows) > 0){
      base::message("Close matches found.")
      separated_close_matches <- vectorised_separate_close_matches(close_match_rows)
      perfect_duplicates <- new_data_df[unique(separated_close_matches$perfect[,2]),]
      new_entries_i <- unique(c(separated_close_matches$discrepancies[,2],separated_close_matches$perfect[,2]))
        
      # This will contain any new entries and any rows that could not be separated
      new_entries <- new_data_df[-new_entries_i,]

      if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
        # Append the warning to an existing matrix 
        new_info_df <- data.frame(
          ID = ifelse(ID_col != "", new_data_df[[ID_col]][new_entries_i],NA),
          index = new_entries_i,
          message = "New Entry to the dataset. Not present in previous dataset."
        )
        duplicate_info_df <- data.frame(
          ID = ifelse(ID_col != "", new_data_df[[ID_col]][separated_close_matches$perfect[,2]],NA),
          index = separated_close_matches$perfect[,2],
          message = "Perfect duplicate. This entry was present in previous dataset."
        )
        discpreancies_info_df <- data.frame(
          ID = ifelse(ID_col != "", new_data_df[[ID_col]][separated_close_matches$discrepancies[,2]],NA),
          index = separated_close_matches$discrepancies[,2],
          message = "Discrepancy. This row has changed since the last dataset"
        )
        contribute_to_metadata_report("New Entry", new_info_df, parent_key = "Data")
        contribute_to_metadata_report("Perfect Duplicate", duplicate_info_df, parent_key = "Data")
        contribute_to_metadata_report("Discrepancies", discpreancies_info_df, parent_key = "Data")
      }

      verified_discrepancies <- compare_discrepancies(new_data_df, legacy_data_df, separated_close_matches$discrepancies)

    } else {
      base::message("No close matches found treating entire dataset as new")
      new_entries <- new_data_df
    }
    
  }
  
  # Given that it is not possible to definitively know if a change / discrepancy 
  # was intentional or not both new and change entries will pass through the 
  # same validation checks and if passed will be accepted as usable and assumed to be . If failed, 
  # assumed to be a QA change. If failed,  the data will be flagged. Failed 
  # discrepancies will check the original legacy entry, which if failed will 
  # be left as is. 
  
  verified_data_df <- rbind(verified_data_df, perfect_duplicates)
  verified_data_df <- rbind(verified_data_df, verified_discrepancies)
  verified_data_df <- rbind(verified_data_df, new_entries)
  
  # merge the verified dataset
  return(verified_data_df)
  
}


separate_new_control_app_data <- function(new_data_df, legacy_data_df){
  new_data_df <- data.frame(new_data_df, check.names = FALSE)
  legacy_data_df <- data.frame(legacy_data_df, check.names = FALSE)
  
  column_names <- colnames(legacy_data_df)
  verified_data_df <- data.frame(matrix(ncol = length(column_names), nrow = 0))
  colnames(verified_data_df) <- column_names 
  
  if(any(grepl("ID", colnames(new_data_df)))){
    temp_new_df <- new_data_df[ , -which(grepl("ID", colnames(new_data_df)))]
    temp_legacy_df <- legacy_data_df[ , -which(grepl("ID", colnames(legacy_data_df)))]
  } else {
    temp_new_df <- new_data_df
    temp_legacy_df <- legacy_data_df
  } 
  
  # find close matching rows (distance of three) based on all columns except ID. ID is not 
  # because it will always be null if the data is exported from powerBI. 
  distance <- 3
 
  close_match_rows <- matrix_close_matches_vectorised(temp_legacy_df, temp_new_df, distance)
  
  # There can be many to many perfect matches. This means that there will be 
  # multiple indices referring to the same row for perfect duplicates. 
  # unique() should be utilised when subsetting the input dataframes.
  if(nrow(close_match_rows) > 0){
    separated_close_matches <- vectorised_separate_close_matches(close_match_rows)
    print(head(separated_close_matches))
    perfect_duplicates <- new_data_df[unique(separated_close_matches$perfect[,2]),]
    new_entries_i <- unique(c(separated_close_matches$discrepancies[,2],separated_close_matches$perfect[,2]))
    
    # This will contain any new entries and any rows that could not be separated
    new_entries <- new_data_df[-new_entries_i,]
  } else {
    new_entries <- new_data_df
  }
  # Given that it is not possible to definitively know if a change / discrepancy 
  # was intentional or not both new and change entries will pass through the 
  # same validation checks and if passed will be accepted as usable and assumed to be . If failed, 
  # assumed to be a QA change. If failed,  the data will be flagged. Failed 
  # discrepancies will check the original legacy entry, which if failed will 
  # be left as is. 
  
  verified_data_df <- rbind(verified_data_df, new_entries)
  return(verified_data_df)
  
}


flag_duplicates <- function(new_data_df){
  # New entries need to be checked for duplicates. If there is more than one
  # duplicate it can be assumed to be an error and the error flag set. This 
  # will use a similar identifier new_entries df. This will only flag the 
  # duplicate versions of the row as an error as it still contains new 
  # information there has just been multiple instances of data entry. 
  # Additionally no new entry should be a duplicate of any "perfect duplicate" 
  # as legitimate duplicates would come from the same source and uploaded at 
  # the same time.
  
  new_data_df$Identifier <- apply(new_data_df[,2:ncol(new_data_df)], 1, function(row) paste(row, collapse = "_"))
  duplicates <- duplicated(new_data_df$Identifier)
  counts <- ave(duplicates, new_data_df$Identifier, FUN = sum)
  is_duplicate <- (counts >= 2 & duplicates)
  new_data_df$error_flag <- ifelse(is_duplicate, 1, new_data_df$error_flag)
  new_data_df$Identifier <- NULL
  
  if (any(is_duplicate)) {
    grandparent <- as.character(sys.call(sys.parent()))[1]
    parent <- as.character(match.call())[1]
    warning <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have been flagged as duplicates", 
                     toString(new_data_df[is_duplicate, 1]), "and the following indexes", toString((1:nrow(new_data_df))[is_duplicate]))
    base::message(warning)
    if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
      # Append the warning to an existing matrix 
      warnings <- data.frame(
        ID = new_data_df[is_duplicate, 1],
        index = which(is_duplicate),
        message = "flagged as duplicates"
      )
      contribute_to_metadata_report("Duplicates", warnings, parent_key = "Warning")
    }
  }
  return(new_data_df)
}


compare_discrepancies <- function(new_data_df, legacy_data_df, discrepancies){
  # compare the rows identified as discrepancies from the new and legacy 
  # dataframes. Most changes should be QA and either still meet the requirements 
  # or now meet the requirements and hence will not be flagged as an error. 
  # However a minor number of cases a row is mistakingly changed to no longer be
  # useable, when this occurs the original row will be used implace of the new 
  # one. 
  
  legacy_error_flag <- legacy_data_df[discrepancies[,1], "error_flag"]
  new_error_flag <- new_data_df[discrepancies[,2], "error_flag"]
  
  is_legacy_rows <- ifelse((new_error_flag == 1) & (legacy_error_flag == 0), 1, 0)
  is_legacy_rows <- as.logical(is_legacy_rows)
  output <- rbind(new_data_df[discrepancies[!is_legacy_rows,2],], legacy_data_df[discrepancies[is_legacy_rows,1],])
  return(output)
  
}

compare_discrepancies_directly <- function(new_data_df, legacy_data_df){
  # compare the rows identified as discrepancies from the new and legacy 
  # dataframes. Most changes should be QA and either still meet the requirements 
  # or now meet the requirements and hence will not be flagged as an error. 
  # However a minor number of cases a row is mistakingly changed to no longer be
  # useable, when this occurs the original row will be used implace of the new 
  # one. 
  
  legacy_error_flag <- legacy_data_df[, "error_flag"]
  new_error_flag <- new_data_df[, "error_flag"]
  
  is_legacy_rows <- ifelse((new_error_flag == 1) & (legacy_error_flag == 0), 1, 0)
  is_legacy_rows <- as.logical(is_legacy_rows)
  output <- rbind(new_data_df[!is_legacy_rows,], legacy_data_df[is_legacy_rows,])
  return(output)
  
}

vectorised_separate_close_matches <- function(close_match_rows){
  # Separate the close matching rows with a vectorized process. Duplicates in 
  # columns of `close_match_rows` indicates that there are multiple close 
  # matches between a row(s) in dataframe and and row(s) in y. This process 
  # utilised logical checks on vectors or matrices so Boolean operations can be 
  # utilsied to separate the rows. Although this does reduce readability, the 
  # reduced computational time of two or three orders of magnitude was deemed a 
  # worthy trade off. 
  
  # Order of matches handled
  # 1) Handle rows with one-one close matches 
  # 2) Handle rows with many-many perfect matches 
  # 3) Handle rows with one-many perfect matches
  # 4) Handle rows with one-many nonperfect matches
  # 5) Handle rows with many-many nonperfect matches
  # 6) Handle rows with one-one and one-many nonperfect matches
  
  # This order MATTERS. This is NOT a definitive process and therefore the order
  # needs to maximize the probability that a row from the new data set is 
  # matched with one from the previous data set. One to one matches and
  # many-many perfect matches are the most likely to be correct and therefore 
  # are removed first. It is important that the next matches handled are
  # one-many. This is to ensure a match is found for the "one", as its most 
  # likely match is one of the many with the smallest distance. Any rows with 
  # those indices can then be removed to prevent double handling. Once many-many 
  # rows have been handled, one-one or one-many relationships may have been 
  # formed and therefore can be handled repetitively until all matches have been 
  # found or no more can be found
  
  
  ### ---------- 
  #Initialise variables 
  
  x_df_col <- 1
  y_df_col <- 2
  distance <- max(close_match_rows[,3])
  
  # initialise variables to store indices of rows after seperation
  discrepancies_indices <<- matrix(,ncol=3, nrow =0)
  perfect_duplicate_indices <<- matrix(,ncol=3, nrow =0)
  error_indices <<- matrix(,ncol=3, nrow =0)
  mistake_duplicates <- matrix(,ncol=3, nrow =0)
  one_to_many_e <- matrix(,ncol=3, nrow =0)
  
  ### ---------- 
  #one-one close matches
  
  close_match_rows_updated <- find_one_to_one_matches(close_match_rows)
  
  ### ---------- 
  #one-many perfect matches
  
  x_updated_dup_indices <- (duplicated(close_match_rows_updated[,x_df_col])|duplicated(close_match_rows_updated[,x_df_col], fromLast=TRUE))
  y_updated_dup_indices <- (duplicated(close_match_rows_updated[,y_df_col])|duplicated(close_match_rows_updated[,y_df_col], fromLast=TRUE))
  
  # condition finds rows in `close_match_rows_updated` where only one column is a duplicate
  perfect_one_to_many <- !(y_updated_dup_indices & x_updated_dup_indices) & (y_updated_dup_indices | x_updated_dup_indices) & (close_match_rows_updated[,3] == 0)
  
  # Only select one match for a row
  perfect_one_to_many_matches <- close_match_rows_updated[perfect_one_to_many,, drop = FALSE]
  x_indices <- perfect_one_to_many_matches[,x_df_col]
  y_indices <- perfect_one_to_many_matches[,y_df_col]
  x_dup_indices <- (duplicated(x_indices)|duplicated(x_indices, fromLast=TRUE))
  y_dup_indices <- (duplicated(y_indices)|duplicated(y_indices, fromLast=TRUE))
  dup_indices <- (y_dup_indices | x_dup_indices)
  non_dup_indices <- !dup_indices
  
  perfect_duplicate_indices <- rbind(perfect_duplicate_indices, perfect_one_to_many_matches[non_dup_indices,])
  error_indices <- rbind(error_indices, perfect_one_to_many_matches[dup_indices,])
  
  # remove checked rows
  close_match_rows_updated <- close_match_rows_updated[-unique(c(which(close_match_rows_updated[,x_df_col] %fin% x_indices), which(close_match_rows_updated[,y_df_col] %fin% y_indices))),]
  
  ### ---------- 
  #Re-check one-one close matches
  close_match_rows_updated <- find_one_to_one_matches(close_match_rows_updated)
  
  ### ---------- 
  #many-many perfect matches
  
  # initialize variables for many to many separation
  one_to_one_indices <- c()
  one_to_many_indices <- c()
  many_to_many_indices<- c()
  mistake_duplicates_indices <- c()
  
  # Boolean logic to subset many-many perfect matches from the `close_match_rows`
  # matrix
  is_perfect_match <- close_match_rows_updated[,3] == 0
  x_updated_dup_indices <- (duplicated(close_match_rows_updated[,x_df_col])|duplicated(close_match_rows_updated[,x_df_col], fromLast=TRUE))
  y_updated_dup_indices <- (duplicated(close_match_rows_updated[,y_df_col])|duplicated(close_match_rows_updated[,y_df_col], fromLast=TRUE))
  many_to_many <- (y_updated_dup_indices & x_updated_dup_indices) 
  many_to_many_with_perfect_match <- many_to_many & is_perfect_match
  
  # many-many with perfect matches have three possible scenarios. 
  # x_df_col) A single perfect matching row with multiple close matches
  # 2) Two perfect matching rows (This is considered coincidental and both are
  #     treated as unique)
  # 3) more than two perfect matching rows (This is considered a mistake and 
  #     flagged)
  # All scenarios need to be checked against.
  
  # update the relevant matrices if matches are found. The indices with 
  # multiple perfect matches may also have other matches that are nonperfect 
  # and therefore need to be filtered again to ensure only perfect is 
  # considered
  
  if(any(many_to_many_with_perfect_match)){
    
    many_to_many_with_perfect_match_entries <- close_match_rows_updated[many_to_many_with_perfect_match,, drop = FALSE]
    
    x_dup_indices <- (duplicated(many_to_many_with_perfect_match_entries[,x_df_col])|duplicated(many_to_many_with_perfect_match_entries[,x_df_col], fromLast=TRUE))
    y_dup_indices <- (duplicated(many_to_many_with_perfect_match_entries[,y_df_col])|duplicated(many_to_many_with_perfect_match_entries[,y_df_col], fromLast=TRUE))
    
    
    many_to_many_i <- x_dup_indices & y_dup_indices
    one_to_many_i <- !(x_dup_indices & y_dup_indices) & (y_dup_indices | x_dup_indices)
    one_to_one_i <- !(x_dup_indices | y_dup_indices)
    
    # argument , drop = FALSE keeps as a matrix even if it is only a single row. 
    many_to_many_e <- many_to_many_with_perfect_match_entries[many_to_many_i,, drop = FALSE]
    one_to_many_e <- many_to_many_with_perfect_match_entries[one_to_many_i,, drop = FALSE]
    one_to_one_e <- many_to_many_with_perfect_match_entries[one_to_one_i,, drop = FALSE]
    
    # separate groups of many to many matches. There should not be one to many 
    # relationship of perfect matches, if there is there are a combination of 
    # discrepancies which will not be able to be definitively determined and 
    # therefore will be set as new entries. 
    
    if(nrow(many_to_many_e) > 0){
      
      my_df_colm_split <- lapply(split(many_to_many_e[,x_df_col:y_df_col], many_to_many_e[,y_df_col]), matrix, ncol=y_df_col)
      
      groups <- replicate(length(my_df_colm_split), vector(), simplify = FALSE)
      group <- 0
      output_rec_group <- rec_group(my_df_colm_split, groups, group)
      
      # Now that the many to many perfect matches are grouped, if they are
      # genuine perfect matches with no mistakes there will be as many duplicate
      # vectors as their are matching rows. Any with more than 2 matching rows 
      # as previously discussed will be removed and considered mistake 
      # duplicates. A pair of perfect duplicates will be considered a 
      # coincidence and kept 
      
      perfect_matching_many_to_many_rows <- (duplicated(output_rec_group)|duplicated(output_rec_group, fromLast=TRUE))
      too_many_matches <- unlist(lapply(output_rec_group, function(x) length(x) > 2))
      too_many_matches_indices <- unique(unlist(output_rec_group[too_many_matches]))
      
      too_few_matches <- unlist(lapply(output_rec_group, function(x) length(x) < 2))
      too_few_matches_indices <- unique(unlist(output_rec_group[too_few_matches]))
      
      new_many_to_many <- unique(many_to_many_e[,y_df_col])
      incorrect_many_to_many <- new_many_to_many[new_many_to_many %fin% too_few_matches_indices | new_many_to_many %fin% too_many_matches_indices]
      correct_many_to_many <- new_many_to_many[!(new_many_to_many %fin% too_few_matches_indices | new_many_to_many %fin% too_many_matches_indices)]
      
      
      mistake_duplicate_manye_indices <- which(many_to_many_e[,y_df_col] %fin% incorrect_many_to_many)
      mistake_duplicates <- many_to_many_e[mistake_duplicate_manye_indices,]
      mistake_duplicates_indices <- unique(c(which(close_match_rows_updated[,y_df_col] %fin% mistake_duplicates[,y_df_col]), which(close_match_rows_updated[,x_df_col] %fin% mistake_duplicates[,x_df_col])))
      many_to_many_e <- many_to_many_e[-mistake_duplicate_manye_indices,]
      many_to_many_indices <- unique(c(which(close_match_rows_updated[,y_df_col] %fin% many_to_many_e[,y_df_col]), which(close_match_rows_updated[,x_df_col] %fin% many_to_many_e[,x_df_col])))
      
    }
    
    if(nrow(one_to_many_e) > 0){ 
      one_to_many_indices <- unique(c(which(close_match_rows_updated[,y_df_col] %fin% one_to_many_e[,y_df_col]), which(close_match_rows_updated[,x_df_col] %fin% one_to_many_e[,x_df_col])))
    
      }
        
    one_to_one_indices <- unique(c(which(close_match_rows_updated[,y_df_col] %fin% one_to_one_e[,y_df_col]), which(close_match_rows_updated[,x_df_col] %fin% one_to_one_e[,x_df_col])))
    
    perfect_duplicate_indices <- rbind(perfect_duplicate_indices, one_to_one_e, many_to_many_e)
    error_indices <- rbind(error_indices, mistake_duplicates, one_to_many_e)
    
    # remove rows that have already been handled to prevent double handling.
    close_match_rows_updated <- close_match_rows_updated[-unique(c(one_to_one_indices, one_to_many_indices, many_to_many_indices, mistake_duplicates_indices)),]
    
  }
  
  ### ---------- 
  #Re-check one-one close matches
  close_match_rows_updated <- find_one_to_one_matches(close_match_rows_updated)
  
  ### ---------- 
  #one-many nonperfect matches
  
  # Iterates starting with matches of the closest distance to ensure they are 
  # given priority. 
  for(i in 1:distance){
    x_dup_indices <- (duplicated(close_match_rows_updated[,x_df_col])|duplicated(close_match_rows_updated[,x_df_col], fromLast=TRUE))
    y_dup_indices <- (duplicated(close_match_rows_updated[,y_df_col])|duplicated(close_match_rows_updated[,y_df_col], fromLast=TRUE))
    
    # condition finds rows in `close_match_rows_updated` where only one column is a duplicate
    one_to_many <- !(y_dup_indices & x_dup_indices) & (y_dup_indices | x_dup_indices) & close_match_rows_updated[,3] == i
    
    # Check that the same row is not being matched to multiple. If they are 
    # them and they will be handled at the end by being treated as new rows
    matches <- close_match_rows_updated[one_to_many,]
    if(!is.null(nrow(matches))){
      match_x_dup_indices <- (duplicated(matches[,x_df_col])|duplicated(matches[,x_df_col], fromLast=TRUE))
      match_y_dup_indices <- (duplicated(matches[,y_df_col])|duplicated(matches[,y_df_col], fromLast=TRUE))
      discrepancies_indices <- rbind(discrepancies_indices, matches[!(match_y_dup_indices | match_x_dup_indices),])
      error_indices <- rbind(error_indices, matches[(match_y_dup_indices | match_x_dup_indices),])
      
      # remove checked rows and any that have already been matched. 
      close_match_rows_updated <- close_match_rows_updated[-unique(c(which(close_match_rows_updated[,x_df_col] %fin% matches[,x_df_col]), which(close_match_rows_updated[,y_df_col] %fin% matches[,y_df_col]))),]
    }
  }
  
  ### ---------- 
  #Re-check one-one close matches
  close_match_rows_updated <- find_one_to_one_matches(close_match_rows_updated)
  
  ### ---------- 
  #many-many nonperfect matches
  
  x_dup_indices <- (duplicated(close_match_rows_updated[,x_df_col])|duplicated(close_match_rows_updated[,x_df_col], fromLast=TRUE))
  y_dup_indices <- (duplicated(close_match_rows_updated[,y_df_col])|duplicated(close_match_rows_updated[,y_df_col], fromLast=TRUE))
  
  
  # Only many-to-many non perfect matches are left and need to be handled. 
  for(i in 1:distance){
    if(nrow(close_match_rows_updated) > 0){
      many_to_man_with_nonperfect_match <- y_dup_indices & x_dup_indices & (close_match_rows_updated[,3] == i)
      many_to_many_with_nonperfect_match_entries <- close_match_rows_updated[many_to_man_with_nonperfect_match,]
      
      x_dup_indices <- (duplicated(many_to_many_with_nonperfect_match_entries[,x_df_col])|duplicated(many_to_many_with_nonperfect_match_entries[,x_df_col], fromLast=TRUE))
      y_dup_indices <- (duplicated(many_to_many_with_nonperfect_match_entries[,y_df_col])|duplicated(many_to_many_with_nonperfect_match_entries[,y_df_col], fromLast=TRUE))
      
      
      many_to_many_i <- x_dup_indices & y_dup_indices
      one_to_many_i <- !(y_updated_dup_indices & x_updated_dup_indices) & (y_updated_dup_indices | x_updated_dup_indices)
      one_to_one_i <- !(y_updated_dup_indices | x_updated_dup_indices)
      
      
      many_to_many_e <- many_to_many_with_nonperfect_match_entries[many_to_many_i,,drop = FALSE]
      one_to_many_e <- many_to_many_with_nonperfect_match_entries[one_to_many_i,,drop = FALSE]
      one_to_one_e <- many_to_many_with_nonperfect_match_entries[one_to_one_i,,drop = FALSE]
      
      if(sum(nrow(many_to_many_e), nrow(one_to_many_e), nrow(one_to_one_e)) > 0){
        
        
        many_to_many_indices <- unique(c(which(close_match_rows_updated[,y_df_col] %fin% many_to_many_e[,y_df_col]), which(close_match_rows_updated[,x_df_col] %fin% many_to_many_e[,x_df_col])))
        one_to_many_indices <- unique(c(which(close_match_rows_updated[,y_df_col] %fin% one_to_many_e[,y_df_col]), which(close_match_rows_updated[,x_df_col] %fin% one_to_many_e[,x_df_col])))
        one_to_one_indices <- unique(c(which(close_match_rows_updated[,y_df_col] %fin% one_to_one_e[,y_df_col]), which(close_match_rows_updated[,x_df_col] %fin% one_to_one_e[,x_df_col])))
        
        perfect_duplicate_indices <- rbind(perfect_duplicate_indices, one_to_one_e)
        error_indices <- rbind(error_indices, one_to_many_e, many_to_many_e)
        
        # remove rows that have already been handled to prevent double handling.
        close_match_rows_updated <- close_match_rows_updated[-unique(c(one_to_one_indices, one_to_many_indices, many_to_many_indices)),]
      }
    } else{
      next
    }
  }
  ### ---------- 
  #one-one and one-many nonperfect matches 
  
  # Handling one to many relationship or many to many with discrepancies may  
  # have left a number of one to one or one to many rows. This will be iterative 
  # until none remain or no further matches can be found. Anything left will be assigned to  
  # new entry to ensure that it is processed correctly.
  is_unmatched <- nrow(close_match_rows_updated) > 0 
  while(is_unmatched){
    count <- nrow(close_match_rows_updated)
    
    close_match_rows_updated <- find_one_to_one_matches(close_match_rows)
    
    # condition finds rows in `close_match_rows_updated` where only one column is a duplicate
    for(i in 1:distance){
      if(nrow(close_match_rows_updated) == 0){
        break
      }
      x_dup_indices <- (duplicated(close_match_rows_updated[,x_df_col])|duplicated(close_match_rows_updated[,x_df_col], fromLast=TRUE))
      y_dup_indices <- (duplicated(close_match_rows_updated[,y_df_col])|duplicated(close_match_rows_updated[,y_df_col], fromLast=TRUE))
      one_to_many <- !(y_dup_indices & x_dup_indices) & (y_dup_indices | x_dup_indices) & close_match_rows_updated[,3] == i
      
      # Check that the same row is not being matched to multiple. If they are 
      # them and they will be handled at the end by being treated as new rows
      matches <- close_match_rows_updated[one_to_many,]
      if(nrow(matches) > 0){
        match_x_dup_indices <- (duplicated(matches[,x_df_col])|duplicated(matches[,x_df_col], fromLast=TRUE))
        match_y_dup_indices <- (duplicated(matches[,y_df_col])|duplicated(matches[,y_df_col], fromLast=TRUE))
        discrepancies_indices <- rbind(discrepancies_indices, matches[!(match_y_dup_indices | match_x_dup_indices),])
        error_indices <- rbind(error_indices, matches[(match_y_dup_indices | match_x_dup_indices),])
        
        # remove checked rows and any that have already been matched. 
        close_match_rows_updated <- close_match_rows_updated[-unique(c(which(close_match_rows_updated[,x_df_col] %fin% matches[,x_df_col]), which(close_match_rows_updated[,y_df_col] %fin% matches[,y_df_col]))),]
        
      }
      is_unmatched <- !((count - nrow(close_match_rows_updated) == 0) | nrow(close_match_rows_updated) == 0)
      
    }
  }
  # check for mistakes 
  is_mistake_present <- check_for_mistake(control_data_type)
  
  output <- list(discrepancies_indices, perfect_duplicate_indices, error_indices)
  names(output) <- c("discrepancies", "perfect", "error") 
  return(output)
}

check_for_mistake <- function(control_data_type){
  x_df_col <- 1
  y_df_col <- 2
  
  # Check that there are no duplicates between data frames
  elements_count_x <- table(c(unique(discrepancies_indices[,x_df_col]),unique(perfect_duplicate_indices[,x_df_col]),unique(error_indices[,x_df_col])))
  common_x_values <- names(elements_count_x[elements_count_x >= 2])
  
  elements_count_y <- table(c(unique(discrepancies_indices[,y_df_col]),unique(perfect_duplicate_indices[,y_df_col]),unique(error_indices[,y_df_col])))
  common_y_values <- names(elements_count_y[elements_count_y >= 2])
  
  if (any(common_y_values == 0)| any(common_x_values == 0)) {
    grandparent <- as.character(sys.call(sys.parent()))[1]
    parent <- as.character(match.call())[1]
    warning <- paste("Warning in", parent , "within", grandparent, "- The rows were not correctly separated and the following row indexes were labeled as a `Perfect Duplicate`, `Discrepancy` and/or an `Error`:", 
                     "New Data Indices (Y):", toString(common_y_values), "Legacy Data Indices (X):", toString(common_x_values))
    base::message(warning)
    
    if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
      # Append the warning to an existing matrix 
      warnings <- data.frame(
        index_new = common_y_values,
        index_legacy = common_x_values,
        message = "Row with listed indices were incorrectly separated"
      )
      contribute_to_metadata_report("Separation", warnings, parent_key = "Warning")
    }
    
  }
}


# Run all verification functions on data sets. All verification functions are 
# within try catch to ensure that a fatal error will not break the workflow. 
verify_entries <- function(data_df, configuration){
  control_data_type <- configuration$metadata$control_data_type
  base::message("HEREREREEEEEEEEEEEEEEEEEE")
  data_df <- verify_integers_positive(data_df)
  base::message("HEREREREEEEEEEEEEEEEEEEEE")
  data_df <- verify_reef(data_df)
  base::message("HEREREREEEEEEEEEEEEEEEEEE")
  data_df <- verify_percentages(data_df)
  base::message("HEREREREEEEEEEEEEEEEEEEEE")
  #verify long and lat separately
  data_df <- verify_lat_lng(data_df, max_val=160, min_val=138, columns=c("Longitude", "Start Lng", "End Lng"))
  data_df <- verify_lat_lng(data_df, max_val=-5, min_val=-32, columns=c("Latitude", "Start Lat", "End Lat"))
  base::message("HEREREREEEEEEEEEEEEEEEEEE")
  if (control_data_type == "manta_tow") {
    data_df <- verify_tow_date(data_df)
    data_df <- verify_coral_cover(data_df)
    data_df <- verify_scar(data_df)
  } else if (control_data_type == "cull") {
    data_df <- verify_voyage_dates(data_df)
  } else if (control_data_type == "RHISS") {
    data_df <- verify_RHISS(data_df)
  } 
  data_df <- verify_na_null(data_df, configuration)
  data_df <- verify_available_columns(data_df, configuration)
  data_df$error_flag <- as.integer(data_df$error_flag)
  return(data_df)
}


verify_available_columns <- function(data_df, configuration){
  
  tryCatch({
    transformations <- configuration$mappings$transformations
    nonexempt_cols <- transformations[transformations$verify_na_exempt == FALSE, "target_field"]
    is_nonexempt_cols_available <- !all(nonexempt_cols %in% colnames(data_df))
    if (is_nonexempt_cols_available){
      data_df[["error_flag"]] <- as.integer(is_nonexempt_cols_available)
    }
    
    if (is_nonexempt_cols_available) {
      errors <- data.frame(
        verification_function = "is_nonexempt_cols_available",
        message = as.character(e)
      )
      contribute_to_metadata_report("is_nonexempt_cols_available", errors, parent_key = "Columns missing")
    }
  }, error = function(e){
    errors <- data.frame(
      verification_function = "verify_available_columns",
      message = as.character(e)
    )
    contribute_to_metadata_report("verify_available_columns", errors, parent_key = "Error In Verification")
    data_df$error_flag <<- 1
  })

  return(data_df)
}


verify_lat_lng <- function(data_df, max_val, min_val, columns){
  tryCatch({
    for (col in columns) {
      if (col %in% colnames(data_df)) {
        values <- data_df[[col]]
        out_of_range <- values < min_val | values > max_val
        
        # Set any NA values to TRUE as the check was unable to be completed 
        # correctly
        out_of_range <- ifelse(is.na(out_of_range), FALSE, out_of_range)
        data_df[["error_flag"]] <- as.integer(data_df[["error_flag"]] | out_of_range)
        
        if (any(out_of_range)) {
          grandparent <- as.character(sys.call(sys.parent()))[1]
          parent <- as.character(match.call())[1]
          warning <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have inappropriate LatLong coordinates:", 
                           toString(data_df[out_of_range, 1]), "and the following indexes", toString((1:nrow(data_df))[out_of_range]))
          base::message(warning)
          
          if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
            # Append the warning to an existing matrix 
            warnings <- data.frame(
              ID = data_df[out_of_range, 1],
              index = which(out_of_range),
              message = "Inappropriate Lat Long values"
            )
            contribute_to_metadata_report("Coordinates", warnings, parent_key = "Warning")
          }
        }
      }
    }
  }, error = function(e){
    errors <- data.frame(
      verification_function = "verify_lat_lng",
      message = as.character(e)
    )
    contribute_to_metadata_report("verify_lat_lng", errors, parent_key = "Error In Verification")
    data_df$error_flag <<- 1
  })
  return(data_df)
}


verify_scar <- function(data_df) {
  tryCatch({
    # check that columns in RHISS data contain expected values according to metadata
    valid_scar <- c("a", "p", "c")
    
    col_names <- colnames(data_df)
    col_names <- tolower(col_names)
    search_word <- "scar"
    matching_columns <- col_names[grepl(search_word, col_names)]
    
    for (col in matching_columns) {
      check_valid_scar <- data_df[[col]] %in% valid_scar
      is_not_valid <- ifelse(is.na(check_valid_scar), TRUE, check_valid_scar)
      is_not_valid <- !is_not_valid
      data_df[["error_flag"]] <- as.integer(data_df[["error_flag"]] | is_not_valid)
      
      if (any(is_not_valid )) {
        grandparent <- as.character(sys.call(sys.parent()))[1]
        parent <- as.character(match.call())[1]
        warning <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have invalid COTS scar:",
                         toString(data_df[is_not_valid , 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[is_not_valid]))
        base::message(warning)
        
        
        if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
          # Append the warning to an existing matrix 
          warnings <- data.frame(
            ID = data_df[is_not_valid, 1],
            index = which(is_not_valid),
            message = "Invalid COT Scar"
          )
          contribute_to_metadata_report(col, warnings, parent_key = "Warning")
        }
        
      }
    }
    
  }, error = function(e){
    errors <- data.frame(
      verification_function = "verify_scar",
      message = as.character(e)
    )
    contribute_to_metadata_report("verify_scar", errors, parent_key = "Error In Verification")
    data_df$error_flag <<- 1
  })
  return(data_df)
}

verify_tow_date <- function(data_df){
  tryCatch({
    # Approximate a tow date based on vessel and voyage if it does not exist. 
    
    tow_date <- data_df[["Tow date"]]
    incomplete_dates <- unique(which(is.na(tow_date)))
    if (length(incomplete_dates) > 0){
      incomplete_date_rows <- data_df[incomplete_dates,]
      vessel_voyage <- unique(incomplete_date_rows[,which(names(incomplete_date_rows) %in% c("Vessel", "Voyage"))])
      for(i in 1:nrow(vessel_voyage)){
        data_df_filtered <- data_df[(data_df$Vessel == vessel_voyage[i,1]) & (data_df$Voyage == vessel_voyage[i,2]),]
        is_any_voyage_date_correct <- any(!is.na(data_df_filtered$`Tow date`))
        if(is_any_voyage_date_correct){
          correct_date <- data_df_filtered[!(is.na(data_df_filtered$`Tow date`) | is.null(data_df_filtered$`Tow date`)),][1,"Tow date"]
          data_df[(data_df$Vessel == vessel_voyage[i,1]) & (data_df$Voyage == vessel_voyage[i,2]) & (is.na(data_df$`Tow date`)), "Tow date"] <- correct_date
        } else if(!is_any_voyage_date_correct & !is_any_survey_date_correct) {
          data_df[(data_df$Vessel == vessel_voyage[i,1]) & (data_df$Voyage == vessel_voyage[i,2]) & (is.na(data_df$`Tow date`)), c("error_flag")] <- 1
          
        }
      }
      
    }
    
    
    
    post_estimation_tow_dates <- data_df[["Tow date"]]
    na_present <- (is.na(post_estimation_tow_dates) | is.null(post_estimation_tow_dates)) & (is.na(tow_date) | is.null(tow_date))
    dated_estimated <- !(is.na(post_estimation_tow_dates) | is.null(post_estimation_tow_dates)) & (is.na(tow_date) | is.null(tow_date))
    
    if (any(dated_estimated | na_present)) {
      grandparent <- as.character(sys.call(sys.parent()))[1]
      parent <- as.character(match.call())[1]
      warning <- c()
      if (any(dated_estimated)) {
        warning1 <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have their tow date estimated from their vessel",
                          toString(data_df[dated_estimated , 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[dated_estimated]))
        base::message(warning1)
      }
      if (any(na_present)) {
        warning2 <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have no tow date",
                          toString(data_df[na_present , 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[na_present]))
        base::message(warning2)
      }  
      
      
      if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
        # Append the warning to an existing matrix 
        warnings <- data.frame(
          ID = data_df[dated_estimated, 1],
          index = which(dated_estimated),
          message = "Invalid Tow Date. Date was successfully estimated."
        )
        contribute_to_metadata_report("Estimated Tow Date", warnings, parent_key = "Warning")
        warnings <- data.frame(
          ID = data_df[na_present, 1],
          index = which(na_present),
          message = "Invalid Tow Date. "
        )
        contribute_to_metadata_report("Invalid Tow Date", warnings, parent_key = "Warning")
      }
      
    }
  }, error = function(e){
    errors <- data.frame(
      verification_function = "verify_tow_date",
      message = as.character(e)
    )
    contribute_to_metadata_report("verify_tow_date", errors, parent_key = "Error In Verification")
    data_df$error_flag <<- 1
  })
  return(data_df)
}


verify_RHISS <- function(data_df) {
  tryCatch({
    # check that columns in RHISS data contain expected values according to metadata
    valid_tide <- c("L", "M", "H")
    check_tide <- data_df$`Tide` %in% valid_tide
    check_tide <- ifelse(is.na(check_tide), TRUE, check_tide)
    
    cols <- c("Slime Height (cm)", "Entangled/Mat-Like Height (cm)", "Filamentous Height (cm)", "Leafy/Fleshy Height (cm)", "Tree/Bush-Like Height (cm)")
    valid_macroalgae <- c(">25cm", "1 to 3cm", "None", "4 to 25cm")
    available_cols <- cols %in% colnames(data_df)
    if (sum(available_cols) >= 2){
      cols <- cols[available_cols]
      is_valid_macroalgae <- apply(data_df[, cols], 2, function(x) !(x %in% valid_macroalgae))
      is_valid_macroalgae <- ifelse(is.na(is_valid_macroalgae), FALSE, is_valid_macroalgae)
      check_macroalgae <- rowSums(is_valid_macroalgae) > 0
    } else if (sum(available_cols) == 1) {
      cols <- cols[available_cols]
      is_valid_macroalgae <- !(data_df[,cols] %in% valid_macroalgae)
      is_valid_macroalgae <- ifelse(is.na(is_valid_macroalgae), FALSE, is_valid_macroalgae)
      check_macroalgae <- rowSums(is_valid_macroalgae) > 0
    } else {
      check_macroalgae <- 0
    }
  
    
    valid_descriptive_bleach_severity <- c("Totally bleached white", "pale/fluoro (very light or yellowish)", "None", "Bleached only on upper surface", "Pale (very light)/Focal bleaching", "Totally bleached white/fluoro", "Recently dead coral lightly covered in algae")
    bcols <- c("Mushroom Bleach Severity", "Massive Bleach Severity", "Encrusting Bleach Severity", "Vase/Foliose Bleach Severity", "Plate/Table Bleach Severity", "Bushy Bleach Severity", "Branching Bleach Severity")
    available_cols <- bcols %in% colnames(data_df)
    if (sum(available_cols) >= 2){
      bcols <- bcols[available_cols]
      is_valid_descriptive_bleach_severity <- apply(data_df[, bcols], 2, function(x) !(x %in% valid_descriptive_bleach_severity))
      is_valid_descriptive_bleach_severity <- ifelse(is.na(is_valid_descriptive_bleach_severity), FALSE, is_valid_descriptive_bleach_severity)
      check_descriptive_bleach_severity <- rowSums(is_valid_macroalgae) > 0
    } else if (sum(available_cols) == 1) {
      bcols <- bcols[available_cols]
      is_valid_descriptive_bleach_severity <- !(data_df[,bcols] %in% valid_descriptive_bleach_severity)
      is_valid_descriptive_bleach_severity <- ifelse(is.na(is_valid_descriptive_bleach_severity), FALSE, is_valid_descriptive_bleach_severity)
      check_descriptive_bleach_severity <- rowSums(is_valid_descriptive_bleach_severity) > 0
    } else {
      check_descriptive_bleach_severity <- 0
    }
    
    if (all(c("Bleached Average Severity Index (calculated via matrix)") %in% colnames(data_df))){
      bleached_severity <- data_df$`Bleached Average Severity Index (calculated via matrix)`
      check_bleach_severity <- bleached_severity >= 1 & bleached_severity <= 8
      check_bleach_severity <- ifelse(is.na(check_bleach_severity), TRUE, check_bleach_severity)
    } else {
      check_bleach_severity <- 1
    }
  
    
    check <- !check_tide | check_macroalgae | !check_bleach_severity | check_descriptive_bleach_severity
    if (any(check)) {
      grandparent <- as.character(sys.call(sys.parent()))[1]
      parent <- as.character(match.call())[1]
      if (any(!check_tide)) {
        warning1 <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have invalid tide values:",
                          toString(data_df[!check_tide , 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[!check_tide]))
        
      }
      if (any(check_macroalgae)) {
        warning2 <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have invalid macroalgae values:",
                          toString(data_df[check_macroalgae, 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[check_macroalgae]))
      }
      if (any(!check_bleach_severity)) {
        warning3 <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have invalid bleach severity values:",
                          toString(data_df[!check_bleach_severity, 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[!check_bleach_severity]))
      }
      if (any(check_descriptive_bleach_severity)) {
        warning4 <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have invalid descriptive beach severity:",
                          toString(data_df[check_descriptive_bleach_severity , 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[check_descriptive_bleach_severity ]))
      }
      
      if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
        # Append the warning to an existing matrix 
        warnings <- data.frame(
          ID = data_df[!check_tide, 1],
          index = which(!check_tide),
          message = "Invalid tide value"
        )
        contribute_to_metadata_report("Tide", warnings, parent_key = "Warning")
        
        warnings <- data.frame(
          ID = data_df[check_macroalgae, 1],
          index = which(check_macroalgae),
          message = "Invalid macroalgae"
        )
        contribute_to_metadata_report("Macroalgae", warnings, parent_key = "Warning")
        
        warnings <- data.frame(
          ID = data_df[!check_bleach_severity, 1],
          index = which(!check_bleach_severity),
          message = "Invalid Bleach Severity"
        )
        contribute_to_metadata_report("Bleach Severity", warnings, parent_key = "Warning")
        
        warnings <- data.frame(
          ID = data_df[check_descriptive_bleach_severity, 1],
          index = which(check_descriptive_bleach_severity),
          message = "Invalid Bleach Severity Description"
        )
        contribute_to_metadata_report("Bleach Severity Description", warnings, parent_key = "Warning")
      }
      
    }
    data_df[["error_flag"]] <- as.integer(data_df[["error_flag"]] | check)
    
  }, error = function(e){
    errors <- data.frame(
      verification_function = "verify_RHISS",
      message = as.character(e)
    )
    contribute_to_metadata_report("verify_RHISS", errors, parent_key = "Error In Verification")
    data_df$error_flag <<- 1
  })
  return(data_df)
}


update_path <- function(path) {
  # Replace backslashes with forward slashes
  fixed_path <- gsub("\\\\", "/", path)
  # Reduce multiple forward slashes to a single one
  normalized_path <- gsub("/{2,}", "/", fixed_path)
  return(normalized_path)
}


verify_percentages <- function(data_df) {
  tryCatch({
    # Check that all percentages in a row are between 0 and 100
    perc_cols <- grep("%", colnames(data_df))
    if(length(perc_cols) > 0){
      perc_cols_vals <- data_df[, perc_cols]
      col_check <- apply(perc_cols_vals, 2, function(x) x < 0 | x > 100)
      col_check <- ifelse(is.na(col_check), FALSE, col_check)
      check <- rowSums(col_check) > 0
      data_df[["error_flag"]] <- as.integer(data_df[["error_flag"]] | check)
      if (any(check)) {
        grandparent <- as.character(sys.call(sys.parent()))[1]
        parent <- as.character(match.call())[1]
        warning <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have percentages in an invalid format:",
                         toString(data_df[check , 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[check]))
        base::message(warning)
        
        if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
          # Append the warning to an existing matrix 
          warnings <- data.frame(
            ID = data_df[check, 1],
            index = which(check),
            message = "Invalid Percentages"
          )
          contribute_to_metadata_report("Percentages", warnings, parent_key = "Warning")
          
        }
        
      }
    }
  }, error = function(e){
    errors <- data.frame(
      verification_function = "verify_percentages",
      message = as.character(e)
    )
    contribute_to_metadata_report("verify_percentages", errors, parent_key = "Error In Verification")
    data_df$error_flag <<- 1
  })
  return(data_df)
}


verify_na_null <- function(data_df, configuration) {
  tryCatch({
    # check if there are any values that are NA or NULL and flag those rows as an 
    # error. This does not include new additional columns as they are assigned a
    # default value at the end of the verification process or ID column.

    transformations <- configuration$mappings$transformations
    new_fields <- configuration$mappings$new_fields
    
    nonexempt_existing_cols <- transformations[transformations$verify_na_exempt == FALSE, "target_field"]
    nonexempt_new_cols <- new_fields[new_fields$verify_na_exempt == FALSE, "field"]
    nonexempt_cols <- c(nonexempt_new_cols, nonexempt_existing_cols)
    nonexempt_cols <- nonexempt_cols[nonexempt_cols %in% colnames(data_df)]
    nonexempt_df <- data_df[,nonexempt_cols]
    na_present <- apply(nonexempt_df, 2, function(x) is.na(x) | is.null(x) | x == "")
    na_present <- ifelse(is.na(na_present), TRUE, na_present)
    check <- rowSums(na_present) > 0
    data_df[,"error_flag"] <- as.integer(data_df[,"error_flag"] | check)
    if (any(check)) {
      grandparent <- as.character(sys.call(sys.parent()))[1]
      parent <- as.character(match.call())[1]
      warning <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have missing data:",
                       toString(data_df[check , 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[check]))
      base::message(warning)
      
      if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
        # Append the warning to an existing matrix 
        warnings <- data.frame(
          ID = data_df[check, 1],
          index = which(check),
          message = "NA or Null values present"
        )
        contribute_to_metadata_report("NA|NULL", warnings, parent_key = "Warning")
        
      }
      
    }
    return(data_df)
  }, error = function(e){
    errors <- data.frame(
      verification_function = "verify_na_null",
      message = as.character(e)
    )
    contribute_to_metadata_report("verify_na_null", errors, parent_key = "Error In Verification")
    data_df$error_flag <<- 1
  })
  return(data_df)
}

verify_integers_positive <- function(data_df) {
  tryCatch({
    # R function that verifys all integers are positive values as they represent 
    # real quantities. Note: Whole numbers are not integers, they must be declared
    # as such. All relevant columns were set as integers in the set_data_type 
    # function
    
    is_integer <- sapply(data_df[1,],is.integer)
    if(any(is_integer)){
      check <- apply(data_df[,is_integer, drop = FALSE], 1, function(row) any(row < 0))
      check <- ifelse(is.na(check), FALSE, check)
      data_df[, "error_flag"] <- as.integer(data_df[, "error_flag"] | check)
      
      if (any(check)) {
        grandparent <- as.character(sys.call(sys.parent()))[1]
        parent <- as.character(match.call())[1]
        warning <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have non-positive integer values:",
                         toString(data_df[check , 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[check]))
        base::message(warning)
        
        
        if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
          # Append the warning to an existing matrix 
          warnings <- data.frame(
            ID = data_df[check, 1],
            index = which(check),
            message = "Non-positive integers present"
          )
          contribute_to_metadata_report("Integers", warnings, parent_key = "Warning")
          
        }
      }
    }
  }, error = function(e){
    errors <- data.frame(
      verification_function = "verify_integers_positive",
      message = as.character(e)
    )
    contribute_to_metadata_report("verify_integers_positive", errors, parent_key = "Error In Verification")
    data_df$error_flag <<- 1
  })
  return(data_df)
}

remove_leading_spaces <- function(data_df) {
  # R function that removes leading and trailing spaces from all entries in a 
  # data frame column
  cols <- colnames(data_df)
  for (col_name in cols) {
    is_character <- is.character(data_df[[col_name]])
    if(any(is_character)){
      data_df[is_character, col_name] <- gsub("^\\s+|\\s+$", "", data_df[is_character, col_name])
    } 
  }
  return(data_df)
}


verify_coral_cover <- function(data_df) {
  tryCatch({
    accepted_values <- c("1-", "2-", "3-", "4-", "5-", "1+", "2+", "3+", "4+", "5+", "0")
    
    # Identify possible corrections based on common mistakes
    possible_corrections <- data.frame(
      Old_Value = c("-1", "-2", "-3", "-4", "-5","+1", "+2", "+3", "+4", "+5"),
      New_Value = c("1-", "2-", "3-", "4-", "5-", "1+", "2+", "3+", "4+", "5+")
    )
    
    col_names <- colnames(data_df)
    
    # Convert column names to lowercase
    col_names <- tolower(col_names)
    
    # Iterate through the column names containing "coral"
    for (col in col_names) {
      if (grepl("coral", col)) {
        # Substitute similar characters for correct ones
        data_df[[col]] <- gsub("—|–|−", "-", data_df[[col]])
       
        # Loop through possible corrections and replace values
        for (i in 1:nrow(possible_corrections)) {
          old_value <- possible_corrections$Old_Value[i]
          new_value <- possible_corrections$New_Value[i]
          data_df[[col]][data_df[[col]] == old_value] <- new_value
        }
        
        check <- data_df[[col]] %in% accepted_values
        check <- ifelse(is.na(check), TRUE, check)
        check <- !check
        
        data_df[["error_flag"]] <- as.integer(data_df[["error_flag"]] | check)
        
        if (any(check)) {
          grandparent <- as.character(sys.call(sys.parent()))[1]
          parent <- as.character(match.call())[1]
          warning <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have invalid coral cover values:",
                           toString(data_df[check , 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[check]))
          base::message(warning)
          
          
          if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
            # Append the warning to an existing matrix 
            warnings <- data.frame(
              ID = data_df[check, 1],
              index = which(check),
              message = "Invalid coral cover values"
            )
            contribute_to_metadata_report(col, warnings, parent_key = "Warning")
          }
        }
      }
    }
  }, error = function(e){
    errors <- data.frame(
      verification_function = "verify_coral_cover",
      message = as.character(e)
    )
    contribute_to_metadata_report("verify_coral_cover", errors, parent_key = "Error In Verification")
    data_df$error_flag <<- 1
  })
  return(data_df)
}

verify_reef <- function(data_df){
  tryCatch({
    # Check that the reef ID is in one of the correct standard formats with regex.
    # Look for most similar reef ID if it is not. Am not checking for a match 
    # because that would mean no new reefs would be accepted and I believe that 
    # the reef input is restricted to existing reefs so it is unlikley to be a typo
    reef_id_col_to_find <- c("reef id", "reef label", "reef_id", "reef_label")
    col_names <- colnames(data_df)
    col_names_lower <- tolower(col_names)
    is_col_present <- sapply(reef_id_col_to_find, function(string) grepl(string, col_names_lower))
    reef_id_col_index <- which(rowSums(is_col_present) > 0)
    reef_id_cols <- col_names[reef_id_col_index]
    for (reef_id_col in reef_id_cols){
      reef_id <- data_df[[reef_id_col]]
      correct_reef_id_format <- grepl("^(1[0-9]|2[0-9]|10)-\\d{3}[a-z]?$", reef_id)
      data_df[,"error_flag"] <- as.integer(data_df[,"error_flag"] | !correct_reef_id_format)
      if (any(!correct_reef_id_format)) {
        grandparent <- as.character(sys.call(sys.parent()))[1]
        parent <- as.character(match.call())[1]
        warning <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have invalid Reef IDs:",
                         toString(data_df[!correct_reef_id_format , 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[!correct_reef_id_format]))
        base::message(warning)
        
        if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
          # Append the warning to an existing matrix 
          warnings <- data.frame(
            ID = data_df[!correct_reef_id_format, 1],
            index = which(!correct_reef_id_format),
            message = "Invalid reef ID"
          )
          contribute_to_metadata_report(reef_id_col, warnings, parent_key = "Warning")
        }
      }
    }
    
  }, error = function(e){
    errors <- data.frame(
      verification_function = "verify_reef",
      message = as.character(e)
    )
    contribute_to_metadata_report("verify_reef", errors, parent_key = "Error In Verification")
    data_df$error_flag <<- 1
  })
  return(data_df)
}

verify_voyage_dates <- function(data_df){
  tryCatch({
    # Check that voyage dates of observation are within in voyage dates and that 
    # none of the dates are NA. If Voyage dates are NA set start and end to min 
    # and max observation date. Check that voyage dates associated with a vessels 
    # voyage are unique (There should only be on departure and return date)
    voyage_start <- data_df[["Voyage Start"]]
    voyage_end <- data_df[["Voyage End"]]
    
    incomplete_dates <- unique(c(which(is.na(voyage_end)), which(is.na(voyage_start))))
    if (length(incomplete_dates) > 0){
      incomplete_date_rows <- data_df[incomplete_dates,]
      vessel_voyage <- unique(incomplete_date_rows[,which(names(incomplete_date_rows) %in% c("Vessel", "Voyage"))])
      for(i in 1:nrow(vessel_voyage)){
        data_df_filtered <- data_df[(data_df$Vessel == vessel_voyage[i,1]) & (data_df$Voyage == vessel_voyage[i,2]),]
        is_any_voyage_date_correct <- any(!is.na(data_df_filtered$`Voyage Start`) & !is.na(data_df_filtered$`Voyage End`))
        is_any_survey_date_correct <- any(!is.na(data_df_filtered$`Survey Date`))
        if(is_any_voyage_date_correct){
          correct_dates <- data_df_filtered[!is.na(data_df_filtered),][1,c("Voyage Start", "Voyage End")]
          data_df[(data_df$Vessel == vessel_voyage[i,1]) & (data_df$Voyage == vessel_voyage[i,2]) & (is.na(data_df$`Voyage Start`)), "Voyage Start"] <- correct_dates[1]
          data_df[(data_df$Vessel == vessel_voyage[i,1]) & (data_df$Voyage == vessel_voyage[i,2]) & (is.na(data_df$`Voyage End`)), "Voyage End"] <- correct_dates[2]
        } else if(!is_any_voyage_date_correct & is_any_survey_date_correct){
          incomplete_date_rows_filtered <- incomplete_date_rows[(incomplete_date_rows$Vessel == vessel_voyage[i,1]) & (incomplete_date_rows$Voyage == vessel_voyage[i,2]),]
          estimate_start <- min(incomplete_date_rows_filtered$`Survey Date`)
          estimate_end <- min(incomplete_date_rows_filtered$`Survey Date`)
          data_df[(data_df$Vessel == vessel_voyage[i,1]) & (data_df$Voyage == vessel_voyage[i,2]) & (is.na(data_df$`Voyage Start`)), c("Voyage Start")] <-  estimate_start
          data_df[(data_df$Vessel == vessel_voyage[i,1]) & (data_df$Voyage == vessel_voyage[i,2]) & (is.na(data_df$`Voyage End`)), c("Voyage End")] <-  estimate_end
        } else if(!is_any_voyage_date_correct & !is_any_survey_date_correct) {
          data_df[(data_df$Vessel == vessel_voyage[i,1]) & (data_df$Voyage == vessel_voyage[i,2]) & (is.na(data_df$`Voyage Start`) | is.na(data_df$`Voyage End`)), "error_flag"] <- 1
        }
      }
      
    }
    
    
    post_voyage_start <- data_df[["Voyage Start"]]
    post_voyage_end <- data_df[["Voyage End"]]
    na_present <- ((is.na(post_voyage_start) | is.null(post_voyage_start)) | (is.na(post_voyage_end) | is.null(post_voyage_end))) & ((is.na(voyage_start) | is.null(voyage_start)) | (is.na(voyage_end) | is.null(voyage_end)))
    dated_estimated <- !((is.na(post_voyage_start) | is.null(post_voyage_start)) | (is.na(post_voyage_end) | is.null(post_voyage_end))) & ((is.na(voyage_start) | is.null(voyage_start)) | (is.na(voyage_end) | is.null(voyage_end)))
    
    if (any(dated_estimated | na_present)) {
      grandparent <- as.character(sys.call(sys.parent()))[1]
      parent <- as.character(match.call())[1]
      if (any(dated_estimated) & !any(na_present)) {
        warning1 <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have their voyage dates estimated from their vessel",
                          toString(data_df[dated_estimated , 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[dated_estimated]))
        base::message(warning1)
      }
      if (any(na_present)) {
        warning2 <- paste("Warning in", parent , "within", grandparent, "- The rows with the following IDs have voyage dates",
                          toString(data_df[na_present , 1]), "Their respective row indexes are:", toString((1:nrow(data_df))[na_present]))
        base::message(warning2)
      }  
      
      if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
        # Append the warning to an existing matrix 
        warnings <- data.frame(
          ID = data_df[dated_estimated, 1],
          index = which(dated_estimated),
          message = "Invalid Voyage Date. Date was successfully estimated."
        )
        contribute_to_metadata_report("Estimated Voyage Date", warnings, parent_key = "Warning")
        warnings <- data.frame(
          ID = data_df[na_present, 1],
          index = which(na_present),
          message = "Invalid Voyage Date. "
        )
        contribute_to_metadata_report("Invalid Voyage Date", warnings, parent_key = "Warning")
      }
      
    }
    
    survey_date_error <- (data_df$`Survey Date` < data_df$`Voyage Start` | data_df$`Survey Date` > data_df$`Voyage End`)
    data_df$error_flag <- data_df$error_flag | survey_date_error
    
    if(any(survey_date_error)){
      if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
        # Append the warning to an existing matrix 
        warnings <- data.frame(
          ID = data_df[survey_date_error, 1],
          index = which(survey_date_error),
          message = "Survey date outside voyage date range"
        )
        contribute_to_metadata_report("Survey Date", warnings, parent_key = "Warning")
      }
    }
    
    vessel_voyage <- unique(data_df[,which(names(data_df) %in% c("Vessel", "Voyage"))])
    for (i in 1:nrow(vessel_voyage)){
      filtered_data_df <- data_df[(data_df$Vessel == vessel_voyage[i,1]) & (data_df$Voyage == vessel_voyage[i,2]),]
      start_dates <- unique(filtered_data_df$`Voyage Start`) 
      end_dates <- unique(filtered_data_df$`Voyage End`)
      if(length(start_dates) > 1){
        mf_start <- names(sort(table(start_dates), decreasing = TRUE)[1])
        data_df[(data_df$Vessel == vessel_voyage[i,1]) & (data_df$Voyage == vessel_voyage[i,2]), "Voyage Start"] <- mf_start
      }
      if(length(end_dates) > 1){
        mf_end <- names(sort(table(end_dates), decreasing = TRUE)[1])
        data_df[(data_df$Vessel == vessel_voyage[i,1]) & (data_df$Voyage == vessel_voyage[i,2]), "Voyage End"] <- mf_end
      }
    }
  }, error = function(e){
    errors <- data.frame(
      verification_function = "verify_voyage_dates",
      message = as.character(e)
    )
    contribute_to_metadata_report("verify_voyage_dates", errors, parent_key = "Error In Verification")
    data_df$error_flag <<- 1
  })
  return(data_df)
}

find_one_to_one_matches <- function(close_match_rows){
  
  x_df_col <- 1
  y_df_col <- 2
  
  # Determine duplicates of the row indices.
  x_dup_indices <- (duplicated(close_match_rows[,x_df_col])|duplicated(close_match_rows[,x_df_col], fromLast=TRUE))
  y_dup_indices <- (duplicated(close_match_rows[,y_df_col])|duplicated(close_match_rows[,y_df_col], fromLast=TRUE))
  dup_indices <- y_dup_indices|x_dup_indices
  non_dup_indices <- !dup_indices
  
  # Add rows matches with only a single to relevant matrix.
  perfect_duplicate_indices <<- rbind(perfect_duplicate_indices, close_match_rows[non_dup_indices & close_match_rows[,3] == 0,])
  discrepancies_indices <<- rbind(discrepancies_indices, close_match_rows[non_dup_indices & close_match_rows[,3] > 0,])
  
  # remove rows that have already been handled to prevent double handling.
  close_match_rows_updated <- close_match_rows[dup_indices,]
  return(close_match_rows_updated)
}


rec_group <- function(m2m_split, groups, group){
  # That is recursive with the purpose of grouping sets of matching rows so that 
  # it can be determined whether or not they are mistakes or coincidental 
  # perfect matches.  
  group <- group + 1
  stack <- m2m_split[[group]][,1]
  names <- names(m2m_split)
  for(i in 1:length(names)){
    if(identical(stack,m2m_split[[names[i]]][,1])){
      groups[[group]] <- c(groups[[group]], m2m_split[[names[i]]][1,2])
    }
    
  }
  if(length(m2m_split) != group){
    return(rec_group(m2m_split, groups, group))
  } else {
    return(groups)
  }
}

# Re-write base operator %in% faster
`%fin%` <- function(x, table) {
  stopifnot(require(fastmatch))
  fmatch(x, table, nomatch = 0L) > 0L
}

matrix_close_matches_vectorised <- function(x, y, distance){
  # Find list of all close matches between rows in x and y within a specified 
  # distance. This distance is the number of non perfect column matches within
  # a row. returns a the indices of the rows matched and the distance from
  # perfect. (X_index, Y_index, Distance). Pre-allocating memory for the matrix
  # is not definitive and needs to assume worst case scenario or maximum 
  # allocation possible. However, this requires to much memory, and therefore 
  # will dynamically allocate memory as needed even though this is slower.
  
  #Pre-allocate variables and memory 
  x_rows <- nrow(x)
  x_cols <- ncol(x)
  y_rows <- nrow(y)
  
  match_indices <- matrix(nrow = 0, ncol = 3)
  
  # Iterate through each row in matrix/dataframe x and evaluate for each value 
  # in the row whether it matches the column of y. This vector of logical values
  # are then appeneded to the `matches` matrix. After Iterating over every 
  # column there will be a matrix of size (y_rows, x_cols). A perfect matching 
  # row in y will have a corresponding row in `matches` exclusively containing 
  # TRUE. Given TRUE = 1, a row sum can be utilised to determine the distance 
  # from a perfect match. Can then use a customised vectorised function to 
  # append matches to match_indices. 
  matches <- matrix(data=NA, nrow=y_rows, ncol=x_cols)
  for(z in 1:x_rows){ 
    for(i in 1:x_cols){
      matches[,i] <- x[z,i] == y[,i]
    }
    matches <- !(matches)
    num_matches <- rowSums(matches, dims = 1)
    num_matches <- ifelse(num_matches <= distance, num_matches, NA)
    nonNA <- which(!is.na(num_matches))
    nonNAvalues <- num_matches[nonNA]
    if(length(nonNAvalues) >= 1){
      match <- store_index_vec(nonNAvalues, nonNA, z)
      match <- t(match)
      match_indices <- rbind(match_indices, match)
    }
  }
  match_indices <- na.omit(match_indices)
  return(match_indices)
}


store_index <- function(nonNAvalues, nonNA, z){
  match_indices <- c(z, nonNA, nonNAvalues)
  return(match_indices)
}
store_index_vec <- base::Vectorize(store_index)

set_data_type <- function(data_df, mapping){
  # sets the data_type of each column of any data frame input based on
  # configuration file
  
  output_df <- data_df
  for (i in seq_len(nrow(mapping))) {
    column_name <- mapping$field[i]
    data_type <- mapping$data_type[i]
    if (!(column_name %in% colnames(data_df))){
      next
    }
    # Convert the column to the specified data type
    if(tolower(data_type) == "date"){
      # NA will cause all dates to note parse. Remove NA from parsing but 
      # then include them in the dataframe afterwards. 
      datetimes <- data_df[[column_name]]
      datetimes_available <- datetimes[!is.na(datetimes)]
      dates_available <- parse_date_time(datetimes_available, orders = get_datetime_parse_order())
      dates_available <- as.character(date(dates_available))
      datetimes[!is.na(datetimes)] <- dates_available
      output_df[[column_name]] <- datetimes
    } else if (tolower(data_type) == "time") {
      time <- as.POSIXct(data_df[[column_name]], format = "%H:%M:%S")
      output_df[[column_name]] <- format(time, '%H:%M:%S')
    } else if (tolower(data_type) == "datetime") {
      output_df[[column_name]] <- as.character(parse_date_time(data_df[[column_name]], orders = get_datetime_parse_order()))
    } else {
      output_df[[column_name]] <- as(data_df[[column_name]], tolower(data_type))
    }
  }
  
  # Identify which rows have been coerced to NA and listed in metadata report. 
  original_has_na <- apply(data_df, 2, function(x) is.na(x))
  conversion_has_na <- apply(output_df, 2, function(x) is.na(x))
  coerced_na <- rowSums(original_has_na) < rowSums(conversion_has_na)
  
  if(any(coerced_na)){
    
    grandparent <- as.character(sys.call(sys.parent()))[1]
    parent <- as.character(match.call())[1]
    indexes <- (1:length(coerced_na))
    warning <- paste("Warning in", parent , "within", grandparent, "- Incorrect data type present. The rows with the following IDs have had value(s) coerced to NA:",
                     paste(data_df[coerced_na, 1], collapse = ", "), "and the following indexes", paste(indexes[coerced_na], collapse = ", "))
    base::message(warning)
    
    if (exists("contribute_to_metadata_report") && is.function(contribute_to_metadata_report)) {
      # Append the warning to an existing matrix 
      warnings <- data.frame(
        ID = data_df[coerced_na, 1],
        index = which(coerced_na),
        message = "Incorrect data type present. Values coerced to NA"
      )
      contribute_to_metadata_report("Data Type", warnings, parent_key = "Warning")
    }
  }
  
  # remove trailing and leading spaces from strings for comparison. 
  output_df <- remove_leading_spaces(output_df)
  return(output_df)
}

update_config_file <- function(data_df, configuration_path, new_mappings_to_add=c()) {
  
  configuration <- fromJSON(configuration_path)
  data_colnames <- colnames(data_df)
  expected_source_names <- configuration$mappings$transformations$source_field
  new_json_data <- configuration
  
  if (!all(data_colnames %in% expected_source_names)) {
    warning <- "Column names in 'data_df' do not match the expected source names. New json configuration file will be created with most appropriate mapping. Please check after process is complete."
    warning(warning)
    closest_matches <- get_closest_matches(data_colnames, expected_source_names)
    # Replace the original source values in new_json_data with closest matches
    
    old_values <- c()
    new_values <- c()
    for (i in seq_len(ncol(closest_matches))) {
      new_input <- closest_matches[1, i]
      closest_match <- closest_matches[2, i]
      index <- which(new_json_data$mappings$transformations$source_field %in% closest_match)
      if (!is.na(index) ) {
        new_json_data$mappings$transformations$source_field[index] <- new_input
        old_values <- c(old_values, closest_matches)
        new_values <- c(new_values, new_input)
      }
    }
    warnings <- data.frame(
      old_values = old_values,
      new_values = new_values,
      message = "Mapping has been updated"
    )
    contribute_to_metadata_report("configuration Mapping Update", warnings, parent_key = "Warning")
  }
  
  if(length(new_mappings_to_add) > 0){
    max_position <- base::max(configuration$mappings$transformations$position, configuration$mappings$new_fields$position)
    closest_matches <- get_closest_matches(new_mappings_to_add, data_colnames)
    contribute_to_metadata_report("New configuration Mappings", paste(closest_match, "updated to", new_input), parent_key = "Warning")
    for(i in 1:length(new_mappings_to_add)){
      source_field <- closest_matches[2,i]
      target_field <- closest_matches[2,i]
      position <- max_position + i
    }
    warnings <- data.frame(
      field_name = closest_matches[2,i],
      message = "Mappings have been created for the following field names"
    )
    contribute_to_metadata_report("configuration Mapping Update", warnings, parent_key = "Warning")
  }
  
  if (!all(data_colnames %in% expected_source_names) || (length(new_mappings_to_add) > 0)) {
    json_data <- toJSON(new_json_data, pretty = TRUE)
    directory <- dirname(configuration_path)
    tryCatch({
      writeLines(json_data, file.path(directory, paste(configuration$metadata$control_data_type, "_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".json", sep = "")))
    }, error = function(e){
      writeLines(json_data, file.path(paste(configuration$metadata$control_data_type, "_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".json", sep = "")))
    })
    
  }
  
  if(length(new_mappings_to_add) > 0){
    tryCatch({
      configuration <- fromJSON(file.path(directory, paste(configuration$metadata$control_data_type, "_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".json", sep = "")))
    }, error = function(e){
      tryCatch({
        configuration <<- fromJSON(file.path(paste(configuration$metadata$control_data_type, "_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".json", sep = "")))
      })
    })
  }
  
  return(configuration)
  
}


map_new_fields <- function(data_df, transformed_df, new_fields){
  for (i in seq_len(nrow(new_fields))) {
    new_field <- new_fields$field[i]
    default_value <- new_fields$default[i]
    
    # Call function written as string in configuration file
    if(new_fields$function_call[i]){
      default_value <- NA
    }
    
    position <- new_fields$position[i]
    transformed_df[, position] <- default_value
    colnames(transformed_df)[position] <- new_field
  }
  return(transformed_df)
}  

# Get the default value of new columns required in the dataframe from the
# configuration file.The default value can be a function in the form of string 
# that will be executed if the default value is dependent on other columns. 
get_new_field_default_values <- function(new_data_df, transformed_data_df, new_fields){
  new_fields <- new_fields[new_fields$function_call == TRUE,]
  for (i in seq_len(nrow(new_fields))) {
        default_value <- new_fields$default[i]
        expression <- gsub("\\{TRANSFORMED_DATAFRAME\\}", "transformed_data_df", default_value)
        expression <- gsub("\\{RAW_DATAFRAME\\}", "new_data_df", expression)
        default_value <- eval(parse(text = expression))
        position <- new_fields$position[i]
        transformed_data_df[, position] <- default_value
  }
  return(transformed_data_df)
}


map_all_fields <- function(data_df, transformed_df, mappings){
  closest_matches <- get_closest_matches(colnames(data_df), mappings$source_field)
  for (i in seq_len(ncol(closest_matches))) {
    index <- match(closest_matches[2,i], mappings$source_field)
    position <- mappings$position[index]
    mapped_name <- mappings$target_field[index]
    colnames(transformed_df)[position] <- mapped_name 
    transformed_df[, position] <- data_df[[closest_matches[1,i]]]
  }
  return(transformed_df)
  
}


map_data_structure <- function(data_df, mappings, new_fields){
  
  transformed_df <- data.frame(matrix(ncol = nrow(mappings) + nrow(new_fields), nrow = nrow(data_df)))
  transformed_df <- map_new_fields(data_df, transformed_df, new_fields)
  transformed_df <- map_all_fields(data_df, transformed_df, mappings)
  transformed_df <- get_new_field_default_values(data_df, transformed_df, new_fields)
  transformed_df <- transformed_df[, colSums(is.na(transformed_df)) < nrow(transformed_df)]
  return(transformed_df)
}


get_closest_matches <- function(sources, targets){
  
  # perform swap to ensure that sources is longer than targets
  is_targets_longer <- length(targets) > length(sources)
  if(is_targets_longer){
    temp <- targets
    targets <- sources
    sources <- temp
  }  
  
  transformed_sources <- matrix(0, nrow = 2, ncol = min(length(sources), length(targets)))
  levenshtein_distances <- adist(sources , targets)
  smallest_source_distances <- apply(levenshtein_distances, 1, min)
  smallest_source_indices <- apply(levenshtein_distances, 2, which.min)
  transformed_sources[1,] <- sources[smallest_source_indices]
  transformed_sources[2,] <- targets
  
  if(is_targets_longer){
    transformed_sources[c(1, 2), ] <- transformed_sources[c(2, 1), ]
  }  
  return(transformed_sources)
}

extract_dates <- function(input){
  input <- sapply(input, basename)
  dates <- str_extract(input, "\\d{8}_\\d{6}")
  dates <- ifelse(is.na(dates), str_extract(input, "\\d{4}_\\d{1,2}_\\d{1,2}_\\d{1,2}_\\d{1,2}(?:_\\d{1,2}(?:_\\d{1,2})?)?_[APMapm]{2}"), dates)
  dates <- ifelse(is.na(dates), str_extract(input, "\\d{8}"), dates)
  dates <- ifelse(is.na(dates), str_extract(input, "\\d{6}"), dates)
  date_objects <- parse_date_time(dates, orders = c("Ymd_HMS", "Y_m_d_H_M_%p", "Y_m_d_H_M", "Y_m_d_H_M_S_%p", "Ymd", "ymd"))
  return(date_objects)
}

find_recent_file <- function(directory_path, keyword, file_extension) {
  # Get a list of files in the directory
  tryCatch({
    files <- list.files(file.path(getwd(),directory_path), pattern = paste0(keyword, ".*\\.", file_extension), full.names = TRUE)
    if (length(files) == 0) {
      cat("No matching files found.\n")
      return(NULL)
    }
    file_infos <- lapply(files, file.info)
    creation_times <- sapply(file_infos, function(info) info$mtime)
    most_recent_index <- which.max(creation_times)
    return(files[most_recent_index])
  }, error = function(e) {
    print(paste("Failed to find recent file: ",  conditionMessage(e)))
  })
}

save_spatial_as_raster <- function(output_path, serialized_spatial_path){
  tryCatch({
    site_regions <- readRDS(serialized_spatial_path)
    for(i in 1:length(site_regions)){
      file_name <- names(site_regions[i])
      modified_file_name <- gsub("/", "_", file_name)
      if (file.info(output_path)$isdir) {
        file.remove(output_path)
        output_path <- dirname(output_path)
      }
      writeRaster(site_regions[[i]], filename = file.path(output_path, paste(modified_file_name,".tif", sep=""), format = "GTiff", overwrite = TRUE))
    }
  }, error = function(e) {
    cat("An error occurred while saving", modified_file_name, "\n")
  })
}

get_spatial_differences <- function(kml_data, previous_kml_data){
  spatial_differences <- list()

    # Extract Reef IDs from each list
  reef_ids <- get_reef_label(names(kml_data))
  reef_ids_previous <- get_reef_label(names(previous_kml_data))
  
  # Add any reefs that did not exist in the previous kml but exist in the 
  # current kml
  indices_not_in_previous <- which(!reef_ids %in% reef_ids_previous)
  if(length(indices_not_in_previous) > 0){
    for (i in 1:length(indices_not_in_previous)) {
      name <- names(kml_data)[[i]]
      spatial_differences[[name]] <- kml_data[[i]]
    }
  }

  # Iterate through all IDS that exist in the new KML file. Any that have 
  # changed or do not exist in the previous data set will be added to the new 
  # one. It is intentional that cull sites that have been removed are not 
  # included, as a result of the general philosophy outlined in the documentation
  for (reef_id in reef_ids) {
    index_reefs <- which(reef_ids == reef_id)
    index_reefs_previous <- which(reef_ids_previous == reef_id)
    
    if (length(index_reefs) > 0 && length(index_reefs_previous) > 0) {
      reef <- kml_data[[index_reefs]]
      reef_name <- names(kml_data)[[index_reefs]]
      reef_previous <- previous_kml_data[[index_reefs_previous]]
      reef_name_previous <- names(previous_kml_data)[[index_reefs_previous]]
      
      # If comparison fails or does not indicate that reefs are identical then 
      # add reef to list of changed reefs.
      is_unchanged <- FALSE
      tryCatch({
        if (all(dim(reef) == dim(reef_previous))) {
          is_unchanged <- (all(reef_previous == reef)) && (reef_name_previous == reef_name)
        }

      }, error = function(e) {
        print(paste("Error: ",  conditionMessage(e)))
      },
      warning = function(w) {
        print(paste("Warning: ",  conditionMessage(w)))
      })
      
      if(!is_unchanged){
        name <- names(kml_data)[[index_reefs]]
        spatial_differences[[name]] <- kml_data[[index_reefs]]
      }
    }
  }
  return(spatial_differences)
}


compute_checksum <- function(data) {
  digest(data, algo = "md5", serialize = TRUE)
}


assign_nearest_site_method_c <- function(data_df, kml_path, keyword, kml_path_previous=NULL, serialised_raster_path=NULL, spatial_output_path=NULL, raster_size=0.0005, x_closest=1, is_standardised=0, save_spatial_as_raster=0){
  # Assign nearest sites to manta tows with method developed by Cameron Fletcher
  
  # Acquire the directory to store raster outputs and the most recent spatial 
  # file that was saved as an R binary
  # if loading data fails calculate site regions
  load_site_rasters_failed <- TRUE
  if(!is.null(serialised_raster_path)){
    if(file.info(serialised_raster_path)$isdir){
      print("Invalid path to serialized spatial data. Must be a file not a directory")
      spatial_file <- NULL
      spatial_directory <- serialised_raster_path
    } else {
      spatial_file <- serialised_raster_path
    }

    tryCatch({
      base::message("Loading previously saved raster data ...")
      site_regions <- readRDS(spatial_file)
      crs <- projection(site_regions[[1]])
      load_site_rasters_failed <- FALSE
      base::message("Loaded data successfully")
    }, error = function(e) {
      print(paste("Error site regions could not be loaded. Site regions will be calculated instead.", conditionMessage(e)))
    })
  }
  
  base::message("Loading kml file...")
  kml_layers <- st_layers(kml_path)
  layer_names_vec <- unlist(kml_layers["name"])
  kml_data <- NULL
  tryCatch({
    plan(multisession)
    future_layers <- future_map(layer_names_vec, ~ st_read(kml_path, layer = .x))
    kml_data <- setNames(future_layers, layer_names_vec)
  }, error = function(e) {
    print(paste("Error KML not loaded with parrallel processing", conditionMessage(e)))
    kml_data <<- setNames(lapply(layer_names_vec, function(i)  st_read(kml_path, layer = i)), layer_names_vec)
    return(kml_data)
  })
  kml_data <<- kml_data
  base::message("Loaded KML file successfully")
  base::message("Compute checksum for kml")
  crs <- projection(kml_data[[1]])
  sf_use_s2(FALSE)
  checksum <- compute_checksum(kml_data)
  
  
  # compare two kml files and return the geometry collections that have been 
  # updated 
  calculate_site_rasters <- TRUE
  update_kml <- FALSE
  if(!is.null(kml_path_previous) && !load_site_rasters_failed){
    previous_kml_layers <- st_layers(kml_path_previous)
    previous_layer_names_vec <- unlist(previous_kml_layers["name"])
    previous_kml_data <- setNames(lapply(previous_layer_names_vec, function(i)  st_read(kml_path_previous, layer = i)), previous_layer_names_vec)
    previous_kml_data <<- previous_kml_data
    previous_crs <- projection(previous_kml_data[[1]])
    previous_checksum <- compute_checksum(previous_kml_data)
    if(checksum != previous_checksum){
      if(previous_crs == crs){
        kml_data_to_update <- get_spatial_differences(kml_data, previous_kml_data)
        tryCatch({
          if(length(kml_data_to_update) == 0){
            calculate_site_rasters <- FALSE
          } else {
            update_kml <- TRUE
          }
        }, error = function(e) {
          update_kml <<- FALSE
        })
      } 
    } else {
      base::message("Checksum determined current and previous KML data are identical")
      calculate_site_rasters <- FALSE
    }
  }
  
  if(calculate_site_rasters){
    if(!update_kml | load_site_rasters_failed){
      
      
      tryCatch({
        base::message("Simplifying kml polygons...")
        kml_data_simplified <- simplify_geometry_list_rdp(kml_data)
        base::message("Simplified kml polygons successfully")
        
      }, error = function(e) {
        print(paste("Error Simplifying kml polygons", conditionMessage(e)))
        kml_data_simplified <<- kml_data
      })
      site_regions <- assign_raster_pixel_to_sites_original(kml_data_simplified, layer_names_vec, crs, raster_size, x_closest, is_standardised)
    
    } else {
      base::message("Updating raster pixels for reefs that have changed since last process date...")
      
      tryCatch({
        base::message("Simplifying kml polygons...")
        kml_data_simplified <- simplify_geometry_list_rdp(kml_data_to_update)
        base::message("Simplified kml polygons successfully")
      }, error = function(e) {
        print(paste("Error Simplifying kml polygons", conditionMessage(e)))
        kml_data_simplified <<- kml_data_to_update
      })
      
      tryCatch({
        base::message("Number of Rasters to Update: ", length(kml_data_simplified))
        updated_layer_names_vec <- names(kml_data_simplified)
        updated_site_regions <- assign_raster_pixel_to_sites_original(kml_data_simplified, updated_layer_names_vec, crs, raster_size, x_closest, is_standardised)
        
        site_regions_reef_ids <- get_reef_label(names(kml_data))
        updated_site_regions_reef_ids <- get_reef_label(names(kml_data_to_update))
        
        index_reefs <- match(updated_site_regions_reef_ids, site_regions_reef_ids)
        site_regions <- site_regions[-index_reefs]
        site_regions <- c(site_regions, updated_site_regions)
        site_regions <- site_regions[order(names(site_regions))]
      }, error = function(e) {
        print(paste("Error updating updating raster pixels for reefs. Will create new raster", conditionMessage(e)))
        
        tryCatch({
          base::message("Simplifying kml polygons...")
          kml_data_simplified <- simplify_geometry_list_rdp(kml_data)
          base::message("Simplified kml polygons successfully")
        }, error = function(e) {
          print(paste("Error Simplifying kml polygons", conditionMessage(e)))
          kml_data_simplified <<- kml_data
        })
        site_regions <<- assign_raster_pixel_to_sites_original(kml_data_simplified, layer_names_vec, crs, raster_size, x_closest, is_standardised)
      })

    }
  }
  
  base::message("Saving raster data as serialised binary file...")
  tryCatch({
    if (!dir.exists(spatial_output_path)) {
      dir.create(spatial_output_path, recursive = TRUE)
    }
    saveRDS(site_regions, file.path(spatial_output_path, paste("site_regions_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".rds", sep = "")))
    contribute_to_metadata_report("output", file.path(getwd(), paste(keyword,"_site_regions_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".rds", sep = "")))
  }, error = function(e) {
    print(paste("Error saving site regions as rds - Data saved in source directory", conditionMessage(e)))
    saveRDS(site_regions, paste(keyword,"_site_regions_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".rds", sep = ""))
    contribute_to_metadata_report("output", file.path(getwd(), paste(keyword,"_site_regions_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".rds", sep = "")))
  })
  
  tryCatch({
    if(save_spatial_as_raster == 1 & !is.null(spatial_output_path)){
      base::message("Saving raster data as gtiff files...")
      spatial_file <- file.path(spatial_output_path, paste("site_regions_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".rds", sep = ""))
      raster_output <- file.path(spatial_output_path, "rasters")
      if (!dir.exists(raster_output)) {
        dir.create(raster_output, recursive = TRUE)
      }
      save_spatial_as_raster(raster_output, spatial_file)
    }
  }, error = function(e) {
    print(paste("Error saving as rasters", conditionMessage(e)))
  })
  
  
  base::message("Assigning sites to data...")
  data_df <- get_centroids(data_df, crs)
  updated_pts <- data_df
  for(i in 1:length(site_regions)){
    is_contained <- sapply(data_df$`Reef ID`, function(str) grepl(str, names(site_regions[i])))
    if(any(is_contained) == FALSE){
      next
    }
    reef_pts <- data_df[is_contained,]
    nearest_site_manta_data <- raster::extract(site_regions[[i]], reef_pts)
    updated_pts[is_contained, c("Nearest Site")] <- nearest_site_manta_data
  }
  updated_pts <- st_drop_geometry(updated_pts)
  is_nearest_site_has_error <- is.na(updated_pts$`Nearest Site`) | ifelse(is.na(updated_pts$`Nearest Site` == -1),FALSE,updated_pts$`Nearest Site` == -1)
  updated_pts[,"error_flag"] <- as.integer(updated_pts[,"error_flag"] | is_nearest_site_has_error)
  
  return(updated_pts)
}

get_centroids <- function(data_df, crs, precision=0){
  # Determine the centroid coordinates create geospatial points or create 
  # geospatial points if only a single set of coordinates exist for an 
  # observation
  
  if (all(c("Start Lat", "End Lat", "End Lng", "Start Lng") %in% colnames(data_df))){
    data_df <- data_df %>%
      mutate(
        mean_lat = (`Start Lat` + `End Lat`) / 2,
        mean_long = (`Start Lng` + `End Lng`) / 2
      )
    
    if(precision != 0){
      data_df <- data_df %>%
        mutate_at(vars(`Start Lat`, `Start Lng`, `End Lat`, `End Lng`, `mean_lat`, `mean_long`), ~ round(., precision))
    }
  } else {
    return(data_df)
  }
  
  # Can't create pts that are NA, use 0 as place holder.
  # the st_as_sf function by default removes these columns.
  data_df$mean_lat[is.na(data_df$mean_lat)] <- 0
  data_df$mean_long[is.na(data_df$mean_long)] <- 0
  pts <- st_as_sf(data_df, coords=c("mean_long", "mean_lat"), crs=crs)
  return(pts)
}


assign_raster_pixel_to_sites_multithread <- function(kml_data, layer_names_vec, crs, raster_size, x_closest=1, is_standardised=0){
  # Produces same result as assign_raster_pixel_to_sites in a parallel manner
  
  if(is_standardised){
    expanded_extent <- standardise_extents(kml_data)
  } else {
    expanded_extent <- list()
    for(i in 1:length(kml_data)) {
      expanded_extent[[i]] <- extent(kml_data[[i]])
    }
  }
  
  # Define the increase amount in both x and y directions. 
  increase_amount <- 0.003
  expanded_bboxs <- setNames(lapply(expanded_extent, function(i) {
    
    original_extent <- i
    # Increase the extent by the specified amount
    expanded_bboxs <- extent(original_extent[1] - increase_amount,
                             original_extent[2] + increase_amount,
                             original_extent[3] - increase_amount,
                             original_extent[4] + increase_amount)
    
  }), layer_names_vec)
  
  # Create an empty raster based on the bounding box, cell size, and projection
  rasters <- create_raster_templates(expanded_bboxs, layer_names_vec, crs, raster_size)
  
  # The original script in mathmatica utilised a function "RegionDistance". This 
  # finds the Euclidean distance between the polygon and a point and does not
  # consider the curviture of the earth. 
  
  site_regions <- foreach(i = 1:length(kml_data), .combine = "c",  .export = c("assign_raster_pixel_to_sites_single", "rasterToPoints", "st_polygon", "raster", "values", "as.matrix", "st_multipoint", "st_sfc", "st_cast", "st_crs<-", "st_distance", "drop_units", "site_names_to_numbers", "xth_smallest")) %dopar% {
    assign_raster_pixel_to_sites_single_reef(rasters[[i]], kml_data[[i]], crs, x_closest)
  }
  site_regions <- setNames(site_regions, layer_names_vec)
  return(site_regions)
}

assign_raster_pixel_to_sites_single_reef <- function(raster, site_poly, crs, x_closest){
  
  raster::values(raster) <- 1
  pixel_coords <- rasterToPoints(raster)
  pixel_coords <- pixel_coords[,1:2]
  raster_points <- pixel_coords |> as.matrix() |> st_multipoint() |> st_sfc() |> st_cast('POINT')
  st_crs(raster_points) <- crs
  
  site_names <- site_poly$Name
  site_numbers <- site_names_to_numbers(site_names)
  
  distances <- st_distance(site_poly, raster_points)
  
  if (dim(distances)[1] != 1){
    distances <- apply(distances, 2, as.numeric)
    min_distances_list <- apply(distances, 2, xth_smallest, x_values=x_closest)
    min_distances_df <- do.call(rbind, min_distances_list)
    min_distance_site_numbers <- site_numbers[as.vector(min_distances_df$`Nearest Site`)]
    min_distances <- min_distances_df$`Distance to Site`
  } else {
    min_distances <- drop_units(distances)
    min_distance_site_numbers <- rep(site_numbers, length(min_distances))
  }
  
  is_within_required_distance <- min_distances > 300
  min_distance_site_numbers[is_within_required_distance] <- -1
  raster::values(raster) <- min_distance_site_numbers
  names(raster) <- c("Nearest Site")
  
  return(raster)
}

assign_raster_pixel_to_sites <- function(kml_data, layer_names_vec, crs, raster_size, x_closest=1, is_standardised=0){
  site_regions <- NULL
  tryCatch({
    library(foreach)
    library(doParallel)
    
    cl <- makeCluster(detectCores())
    registerDoParallel(cl)
    
    site_regions <- assign_raster_pixel_to_sites_multithread(kml_data, layer_names_vec, crs, raster_size, x_closest=1, is_standardised=0)
    
  }, error = function(e) {
    base::message("Failed to perform assigning of sites with multithreading. Performing without...")
    site_regions <<- assign_raster_pixel_to_sites_original(kml_data, layer_names_vec, crs, raster_size, x_closest=1, is_standardised=0)
  })
  return(site_regions)
}


assign_raster_pixel_to_sites_original <- function(kml_data, layer_names_vec, crs, raster_size, x_closest=1, is_standardised=0){
  # This is a method of assigning sites to manta tows that was initially 
  # implemented in Mathmatica by Dr Cameron Fletcher. A set of rasters are 
  # created slightly larger than the bounding box of each layer in the KML file.
  # The rasters are convereted into a matrix of geospatial "POINTS" to compare 
  # the distance between each point and the sites on the corresponding layer. 
  # Each raster pixel will then be assigned a value corresponding with the 
  # nearest site (geodesic distance) 
  
  if(is_standardised){
    expanded_extent <- standardise_extents(kml_data)
  } else {
    expanded_extent <- list()
    for(i in 1:length(kml_data)) {
      expanded_extent[[i]] <- extent(kml_data[[i]])
    }
  }
  
  # Define the increase amount in both x and y directions. 
  increase_amount <- 0.003
  expanded_bboxs <- setNames(lapply(expanded_extent, function(i) {
    
    original_extent <- i
    # Increase the extent by the specified amount
    expanded_bboxs <- extent(original_extent[1] - increase_amount,
                             original_extent[2] + increase_amount,
                             original_extent[3] - increase_amount,
                             original_extent[4] + increase_amount)
    
  }), layer_names_vec)
  
  # Create an empty raster based on the bounding box, cell size, and projection
  rasters <- create_raster_templates(expanded_bboxs, layer_names_vec, crs, raster_size)
  
  # The original script in mathmatica utilised a function "RegionDistance". This 
  # finds the Euclidean distance between the polygon and a point and does not
  # consider the curviture of the earth. 
  site_regions <- setNames(vector("list", length=length(rasters)),layer_names_vec)
  
  for (i in seq_along(rasters)) {
    raster <- rasters[[i]]
    site_poly <- kml_data[[i]]
    
    # set all values of the raster layer so every pixel can be exported to a 
    # dataframe and then converted to points 
    values(raster) <- 1
    pixel_coords <- rasterToPoints(raster)
    pixel_coords <- pixel_coords[,1:2]
    raster_points <- pixel_coords |> as.matrix() |> st_multipoint() |> st_sfc() |> st_cast('POINT')
    st_crs(raster_points) <- crs
    
    # Get site numbers
    site_names <- site_poly$Name
    site_numbers <- site_names_to_numbers(site_names)
    
    # Can determine the Euclidean and geodesic distance (st_distance)
    distances <- st_distance(site_poly, raster_points)
    if (dim(distances)[1] != 1){
      
      distances <- apply(distances, 2, as.numeric)
      min_distances_list <- apply(distances, 2, xth_smallest, x_values=x_closest)
      min_distances_df <- do.call(rbind, min_distances_list)
      min_distance_site_numbers <- site_numbers[as.vector(min_distances_df$`Nearest Site`)]
      min_distances <- min_distances_df$`Distance to Site`
      
    } else {
      min_distances <- drop_units(distances)
      min_distance_site_numbers <- rep(site_numbers, length(min_distances))
    }
    
    # Set site numbers to NA if they are more than 300m away.
    is_within_required_distance <- min_distances > 300
    min_distance_site_numbers[is_within_required_distance] <- -1
    values(raster) <- min_distance_site_numbers
    names(raster) <- c("Nearest Site")
    site_regions[[i]] <- raster
  }
  
  return(site_regions)
}

site_names_to_numbers <- function(site_names){
  return(as.numeric(sub(".+_(\\d+).*", "\\1", site_names)))
}


simplify_shp_polyogns_rdp <- function(shapefile){
  # simplify polygons stored in a shapefile with the Ramer-Douglas-Peucker 
  # algorithm
  
  reef_geometries <- shapefile_filtered[,"geometry"]
  simplified_shapefile_filtered <- shapefile_filtered
  reef_geometries_updated <- reef_geometries
  for(i in 1:nrow(reef_geometries)){
    # A vast majority of reef_geometries at this level are polygons but 
    # occasionally they are geometrycollections and require iteration. 
    
    site_polygon <- reef_geometries[i,1][[1]][[1]]
    polygon_points <- site_polygon[[1]]
    approx_polygon_points <- polygon_rdp(polygon_points) 
    site_polygon[[1]] <- approx_polygon_points
    reef_geometries[i,1][[1]][[1]] <- site_polygon
    
  }
  simplified_shapefile_filtered <- st_drop_geometry(simplified_shapefile_filtered)
  simplified_shapefile_filtered <- st_as_sf(simplified_shapefile_filtered, geometry = reef_geometries$geometry)
  
  return(simplified_shapefile_filtered)
}


simplify_geometry_list_rdp <- function(kml_data){
  # simplify all polygons in a list that was retrieved from the kml file 
  # the Ramer-Douglas-Peucker algorithm
  
  simplified_kml_data <- kml_data
  for (j in 1:length(kml_data)){
    reef_geometries <- kml_data[[j]][["geometry"]]
    simplified_kml_data[[j]][["geometry"]] <- simplify_geometry_rec(reef_geometries)
    
  }
  return(simplified_kml_data)
}

# This is a recursive function that takes a geometry and recursively finds and 
# simplifies polygons with the Ramer-Douglas-Peucker algorithm.
simplify_geometry_rec <- function(geometry){
  for (i in 1:length(geometry)){
    polygon <- geometry[[i]]
    geometry_type <- NULL
    tryCatch({
      geometry_type <- st_geometry_type(polygon)
    }, error = function(e) {
      geometry_type <<- "LIST"
    })
    
    if(geometry_type %in% c("GEOMETRYCOLLECTION", "MULTIPOLYGON")){
      polygon <- simplify_geometry_rec(polygon)
    } else if (!(geometry_type %in% c("POINT"))){
      polygon_points <- polygon[[1]]
      approx_polygon_points <- polygon_rdp(polygon_points)
      polygon[[1]] <- approx_polygon_points
    }
    geometry[[i]] <- polygon
  }
  return(geometry)
}


polygon_rdp <- function(polygon_points, epsilon=0.00001) {
  # adaptation of the Ramer-Douglas-Peucker algorithm. The original algorithm 
  # was developed for a use with a line not a ploygon. Remove the last point 
  # temporarily, perform the algorithm and then return the value.
  
  line_points <- polygon_points[1:nrow(polygon_points)-1,]
  approx_line_points <- rdp(line_points, epsilon)
  polygon <- rbind(approx_line_points, approx_line_points[1,])
  return(polygon)
}

rdp <- function(points, epsilon=0.00001) {
  # Ramer-Douglas-Peucker algorithm s
  if (nrow(points) <= 2) {
    return(points)
  }
  
  dmax <- 0
  index <- 0
  end <- nrow(points)
  
  # Find the point with the maximum distance
  for (i in 2:(end - 1)) {
    d <- perpendicularDistance(points[i,], points[1,], points[end,])
    if (d > dmax) {
      index <- i
      dmax <- d
    }
  }
  
  
  result <- matrix(nrow = 0, ncol = ncol(points))
  # If max distance is greater than epsilon, recursively simplify
  if (dmax > epsilon) {
    recursive1 <- rdp(points[1:index,], epsilon)
    recursive2 <- rdp(points[(index):end,], epsilon)
    result <- rbind(result, rbind(recursive1[1:nrow(recursive1) - 1,], recursive2))
  } else {
    result <- rbind(points[1,], points[end,])
  }
  
  return(result)
}

# Calculate perpendicular distance of a point p from a line segment AB
perpendicularDistance <- function(p, A, B) {
  numerator <- abs((B[2] - A[2]) * p[1] - (B[1] - A[1]) * p[2] + B[1] * A[2] - B[2] * A[1])
  denominator <- sqrt((B[2] - A[2])^2 + (B[1] - A[1])^2)
  result <- (numerator / denominator)
  return(result)
}


find_largest_extent <- function(kml_data){
  num_geometries <- length(kml_data)
  
  # Preallocate the data frame
  result <- data.frame(
    x_difference = numeric(num_geometries),
    y_difference = numeric(num_geometries)
  )
  
  for (i in 1:num_geometries) {
    bbox <- extent(kml_data[[i]])
    diff_x <- bbox@xmax - bbox@xmin
    diff_y <- bbox@ymax - bbox@ymin
    result[i, "x_difference"] <- diff_x
    result[i, "y_difference"] <- diff_y
  }
  
  colnames(result) <- c("x_difference", "y_difference")
  largest_extent <- c(max(result$x_difference),max(result$y_difference))
  names(largest_extent) <- c("x_difference", "y_difference")
  
  return(largest_extent)
}

standardise_extents <- function(kml_data){
  largest_extent <- find_largest_extent(kml_data)
  adjusted_extents <- list()
  for(i in 1:length(kml_data)) {
    extent <- extent(kml_data[[i]])
    diff_x <- extent@xmax - extent@xmin
    diff_y <- extent@ymax - extent@ymin
    largest_extent_centered <- extent(extent@xmin + diff_x/2 - largest_extent[1]/2,
                                      extent@xmax - diff_x/2 + largest_extent[1]/2,
                                      extent@ymin + diff_y/2 - largest_extent[2]/2,
                                      extent@ymax - diff_y/2 + largest_extent[2]/2
    )
    adjusted_extents[[i]] <- largest_extent_centered
  }
  return(adjusted_extents)
}


create_raster_templates <- function(extents, layer_names_vec, crs, raster_size=150){
  
  # rasterise the bounding boxes 
  # res <- 0.0005
  # NN input requires standard image size. 570x550 is approximately a resolution of 0.00045
  if (raster_size > 1){
    y_pixel <- raster_size
    x_pixel <- raster_size
    rasters <- setNames(lapply(extents, function(i) {
      raster(ext = i, ncols=x_pixel, nrows=y_pixel, crs = crs)
    }), layer_names_vec)
  } else {
    rasters <- setNames(lapply(extents, function(i) {
      raster(ext = i, resolution=raster_size, crs = crs)
    }), layer_names_vec)
  }
  return(rasters)
}


rasterise_sites <- function(kml_data, is_standardised=1, raster_size=150){
  if (is_standardised == 1){
    extent_data <- standardise_extents(kml_data)
    location <- "standardised_extent"
  } else {
    extent_data <- kml_data
    location <- "varying_extent"
  }
  
  # res <- 0.0005
  # NN input requires standard image size. 570x550 is approximately a resolution of 0.00045
  y_pixel <- raster_size
  x_pixel <- raster_size
  for(i in 1:length(kml_data)){
    site_names <- kml_data[[i]]$Name
    site_numbers <- site_names_to_numbers(site_names)
    kml_data[[i]]$site_number <- site_numbers
    site_raster <- st_rasterize(kml_data[[i]], st_as_stars(st_bbox(extent_data[[i]]), field = "site_number", nx = x_pixel, ny = y_pixel))
    file_name <- names(kml_data[i])
    modified_file_name <- gsub("/", "_", file_name)
    site_raster <- as(site_raster, "Raster")
    writeRaster(site_raster, filename = paste("CNN\\", raster_size, "\\", location,  "\\sites_as_rasters\\", modified_file_name, sep=""), format = "GTiff", overwrite = TRUE)
    site_raster <- NA
  }
}


rasterise_sites_reef_encoded <- function(kml_data, layer_names_vec, is_standardised=1, raster_size=150){
  
  if (is_standardised == 1){
    extent_data <- standardise_extents(kml_data)
    location <- "standardised_extent"
  } else {
    extent_data <- kml_data
    location <- "varying_extent"
  }
  # res <- 0.0005
  # NN input requires standard image size. 570x550 is approximately a resolution of 0.00045
  y_pixel <- raster_size
  x_pixel <- raster_size
  for(i in 1:length(kml_data)){
    site_names <- kml_data[[i]]$Name
    site_numbers <- site_names_to_numbers(site_names)
    reef_numbers <- as.numeric(gsub("[^0-9.]", "", layer_names_vec[[i]]))
    encoded_reef_site_numbers <- as.numeric(sapply(site_numbers, function (x) paste(reef_numbers, x, sep = "")))
    kml_data[[i]]$site_number <- encoded_reef_site_numbers
    site_raster <- st_rasterize(kml_data[[i]], st_as_stars(st_bbox(extent_data[[i]]), field = "site_number", nx = x_pixel, ny = y_pixel))
    file_name <- names(kml_data[i])
    modified_file_name <- gsub("/", "_", file_name)
    site_raster <- as(site_raster, "Raster")
    writeRaster(site_raster, filename = paste("CNN\\", raster_size, "\\", location, "\\sites_as_rasters_encoded\\", modified_file_name, sep=""), format = "GTiff", overwrite = TRUE)
    site_raster <- NA
  }
}

xth_smallest <- function(x, x_values) {
  # Function to find the xth smallest value in a vector without sorting. This 
  # allows for the second closest sites etc to be determined. Likely unnecessary 
  #in production was used for testing purposes
  
  sorted_uniques <- sort(x)
  xth_smallest_values <- sorted_uniques[x_values]
  xth_smallest_indices <- which(x %in% xth_smallest_values)
  if(length(xth_smallest_indices > 1)){
    xth_smallest_indices <- xth_smallest_indices[1]
  }
  output <- data.frame(matrix(ncol = (2*length(x_values))))
  colnames(output) <- c("Nearest Site", "Distance to Site")
  output[1,] <- c(xth_smallest_indices, xth_smallest_values)
  return(output)
  
}


send_error_email <- function(oauth_path, to_email, content, subject = "Fatal Error in CCIP Control Data Workflow") {
  tryCatch({
    if (file.info(oauth_path)$isdir) {
      files <- list.files(oauth_path, full.names = TRUE)
      if (length(files) > 0) {
        file_info <- file.info(files)
        oauth_path <- files[which.max(file_info$mtime)]
      }
    }
    auth <- fromJSON(oauth_path)
    gmail_auth(scope = "compose", id = auth$installled$client_id, secret = auth$installled$client_secret)
    # Format email with the error message
    error_message <- paste("Error in R Script:\n", conditionMessage(content))
    mime <- create_mime(
      To(to_email),
      Subject(subject),
      body = error_message
    )
    
    # Send the email
    send_message(mime)
    cat("Error email sent.\n")
  }, error = function(e) {
    print("Error email failed.\n")
  })
}




# Define Main

In [None]:
# Process GBRMPA control data into a standardised format that researchers have utilised
# historically. Verification functions were written in consultation with domain experts 
# determine whether a row contains an error and is suitable for use in research. 
# The ecological observations are assigned to the nearest site.
main <- function(new_path, configuration_path = NULL, kml_path = NULL, leg_path = NULL) {
## `new_path` = Path to the file containing the new control data. This new file should 
# contain the entire historical dataset exported from GBRMPA EOTR database
## `configuration_path` = Path to JSON configuration file utilised to specify desired 
# setting. This has a specific format and should remain consistent with historically 
# devloped config files. 
## `kml_path` = Path to kml spatial file containing the GBRMPA cull sites. Polygons 
# within this file are the basis for determining the nearest site. 
## `leg_path` = Path to legacy control data. This is a CSV contain data previously 
# processed by this workflow. It is used as a point of comparison and is optional. 

  
  tryCatch({

    ### Initialize Required Variables -------------------------------------------------------------
    # Get the keyword from the new input file used to identify what type of control 
    # data it is. 
    keyword <- get_file_keyword(new_path) 

    # When a configuration file is not specified specifically with a path then the 
    # most recent config file with the keyword is utilsied 
    if (is.null(configuration_path)) {
      configuration_path <- find_recent_file("configuration_files/", paste("research_",keyword, sep=""), "json")
      configuration <- fromJSON(configuration_path)
    }

    # If parameters are not provided the workflow checks the expected location for 
    # the most recent file that meets the requirements. 
    most_recent_kml_path <- find_recent_file(configuration$metadata$input_directory$spatial_data, "Sites", "kml")
    most_recent_leg_path <- find_recent_file(configuration$metadata$output_directory$control_data_unaggregated, configuration$metadata$control_data_type, "csv")

    # The workflow accesses the report and serialised raster data generated by the 
    # previous instance of this workflow. This information can be utilised later to
    # reduce the number of spatial operations.
    most_recent_report_path <- find_recent_file(configuration$metadata$output_directory$reports, configuration$metadata$control_data_type, "json")
    serialised_spatial_path <- find_recent_file(configuration$metadata$output_directory$spatial_data, "site_regions", "rds")
    separate_data <- configuration$metadata$separate_data

    
    previous_kml_path <- NULL
    if(!is.null(most_recent_report_path)){
      previous_report <- fromJSON(most_recent_report_path)
      previous_kml_path <- previous_report$inputs$kml_path
    } 

    # Attempt to use legacy data where possible. 
    # The workflow "is_new" if there is no evidence that the workflow has be run 
    # previously. 
    is_new <- 0
    is_legacy_data_available <- 1
    if (is.null(leg_path)) {
      leg_path <- most_recent_leg_path
      if(is.null(most_recent_leg_path)){
        is_new <- 1
        is_legacy_data_available <- 0
      } else {
        leg_path <- most_recent_leg_path
      }
    } 
    
    if (is.null(kml_path)) {
      kml_path <- most_recent_kml_path
    }

    ### Import Data -------------------------------------------------------------
    # import new and legacy data. 
    # A legacy dataset with no error_flag column indicates it is new becuase R implementation 
    # of this workflow exports an error flag column, whereas the mathmatica implementation 
    # did not
    new_data_df <- rio::import(new_path)
    if(is_legacy_data_available){
      legacy_df <- rio::import(leg_path)
      if("error_flag" %in% colnames(legacy_df)){
        is_new <- 0
      } else {
        is_new <- 1
        legacy_df["error_flag"] <- 0
      }
    }
    
    # Check if the new data has an authoritative ID. All rows of a database export 
    # will have one and no rows from a powerBI export will. There should be no 
    # scenario where only a portion of rows have IDs. Even if an ID is present it 
    # should be ensured that it is authoritative before altering the configuration 
    # files to preference the use of the ID for separation rather than checking 
    # for differences manually. 
    
    has_authorative_ID <-  !any(is.na(new_data_df[[configuration$metadata$ID_col]])) & configuration$metadata$is_ID_preferred
    assign("has_authorative_ID", has_authorative_ID, envir = .GlobalEnv) 


    ### Setup Report -------------------------------------------------------------
    
    # create list to export as json file with metadata about workflow
    metadata_json_output <- list()    
    
    # timestamp workflow 
    metadata_json_output[["timestamp"]] = Sys.time()
    
    # record inputs to workflow
    metadata_json_output[["inputs"]] = list(
      kml_path = kml_path,
      new_path = new_path,
      leg_path = leg_path, 
      configuration_path = configuration_path,
      serialised_spatial_path = serialised_spatial_path
    )
    
    # record inputs to workflow
    metadata_json_output[["decisions"]] = list(
      has_authorative_ID = has_authorative_ID,
      is_legacy_data_available = is_legacy_data_available,
      is_new = is_new
    )
    
    # save metadata json file 
    json_data <- toJSON(metadata_json_output, pretty = TRUE)
    writeLines(json_data, file.path(getwd(), configuration$metadata$output_directory$reports, paste(configuration$metadata$control_data_type, "_Report_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".json", sep = "")))
    

    ### Transform Data -------------------------------------------------------------
    # Transform the new incoming data into the desired structure. Create new 
    # columns that are required but not present in the incoming dataset and 
    # populate them with defaul values that can be constant or derived from 
    # other columns in the dataset.
    transformed_data_df <- map_data_structure(new_data_df, configuration$mappings$transformations, configuration$mappings$new_fields)

    # Set the data type of both datasets to ensure that any operations or 
    # comparisons are executed in the expected manner. Rows that fail to parse 
    # indicate a fundamental flaw in the dataset and should be addressed by the
    # providers of the dataset.
    if(is_legacy_data_available){
      legacy_df <- set_data_type(legacy_df, configuration$mappings$data_type_mappings)
    }
    formatted_data_df <- set_data_type(transformed_data_df, configuration$mappings$data_type_mappings) 

    # This function can help automate generating new config file versions
    # adjusting to changes in the expected input. Currently has minor bugs 
    # and is optional but helpful
    # configuration <- update_config_file(new_data_df, configuration_path)


    ### Verify Data -------------------------------------------------------------
    # verification functions flag whether a row is deemed to have an error. 
    # Although there may be situations where subsets of these rows can still 
    # be utilised it provides a quick way to filter out data not considered 
    # ideal for research purposes. This puts the onus on the researcher to 
    # justify their inclusion of error flagged data.
    verified_data_df <- verify_entries(formatted_data_df, configuration)
    if(is_new && is_legacy_data_available){
      legacy_df <- verify_entries(legacy_df, configuration) 
    }
    
    # flag non-genuine duplicates that are mistakes. There are possible 
    # instances where two perfect matches are not duplicates but rather two 
    # identical valid data points (predominantly seen in cull data). 
    # Typically errorous data will be 3 or more duplications and as a result, 
    # this is the only situation where duplicates are flagged as errors. 
    verified_data_df <- flag_duplicates(verified_data_df)

    ### Assign Sites to Data -------------------------------------------------------------
    # This process determines the site number where manta tow and culls were performed.
    # Although cull sites have the site name included in the dataset, including the 
    # site number makes it easier for researchers to compare the datasets.
    tryCatch({
      if(configuration$metadata$assign_sites){
        if(configuration$metadata$control_data_type == "manta_tow"){

          # Assigns sites to manta tows with method similar to intial mathmatica 
          # implementation.
          verified_data_df <- assign_nearest_site_method_c(verified_data_df, kml_path, configuration$metadata$control_data_type, previous_kml_path, serialised_spatial_path, configuration$metadata$output_directory$spatial_data, raster_size=0.0005, x_closest=1, is_standardised=0, save_spatial_as_raster=0)
        } else {
          verified_data_df$`Nearest Site` <- site_names_to_numbers(verified_data_df$`Site Name`)
        }
        verified_data_df$`Nearest Site` <- ifelse(is.na(verified_data_df$`Nearest Site`), -1, verified_data_df$`Nearest Site`)
      }
    }, error = function(e) {
      print(paste("Error assigning sites:", conditionMessage(e)))
    })


    ### Separate Data -------------------------------------------------------------
    # Datasets are provided as a single large tabular file which contain all control 
    # data from the inception of the COTS control program to present date. Consequently, 
    # a large portion of the rows will have been processed previously and will be seen 
    # in the legacy dataset. This provides the oppirtunity to seperate the new dataset 
    # into sections to perform further error checking. 

    # The three sections that the data from "new_path" can be split into are, new, perfect
    # match and discrepancy. New is data that this workflow has never seen before. perfect 
    # match is data that is present in the legacy data set and has therefore been processed 
    # previously by this workflow. Discrepancy is a row deemed to have been processed
    # previously but has has minor changes since which could be either quality assurance or 
    # a mistake. 

    # Separating the data can provide the oppirtunity to identify and prevent discrepancies  
    # that transition from having no flagged error in the legacy dataset to a flagged error 
    # as this is likely a mistake. This process can be time consuming and is up to the 
    # user to determine if the trade off is beneficial for their applications. 
    tryCatch({
      if(is_legacy_data_available & separate_data){
        verified_data_df <- separate_control_dataframe(verified_data_df, legacy_df, has_authorative_ID)
      }
    }, error = function(e) {
      print(paste("Error seperating control data. All data has been treated as new entries.", conditionMessage(e)))
    })


    ### Save & Aggregate Data -------------------------------------------------------------
    
    # Save unaggrgated workflow with specific naming convension output
    tryCatch({
      if (!dir.exists(configuration$metadata$output_directory$control_data_unaggregated)) {
        dir.create(configuration$metadata$output_directory$control_data_unaggregated, recursive = TRUE)
      }
      
      output_directory <- configuration$metadata$output_directory$control_data_unaggregated
      data_type <- configuration$metadata$control_data_type
      timestamp <- format(Sys.time(), "%Y%m%d_%H%M%S")
      file_name <- paste(data_type, "_", timestamp, ".csv", sep = "")
      output_path <- file.path(output_directory, file_name)
      write.csv(verified_data_df, output_path, row.names = FALSE)
      
    }, error = function(e) {
      print(paste("Error saving data - Data saved in source directory", conditionMessage(e)))
      write.csv(verified_data_df, paste(configuration$metadata$control_data_type,"_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".csv", sep = ""), row.names = FALSE)
  
    })

    # Aggregate the cull an manta tow data 
    if(configuration$metadata$control_data_type == "manta_tow"){
      verified_aggregated_df <- aggregate_manta_tows_site_resolution_research(verified_data_df)  
    } else if (configuration$metadata$control_data_type == "cull") {
      verified_aggregated_df <- aggregate_culls_site_resolution_research(verified_data_df) 
    }

    # Save aggregated data
    tryCatch({
      if (!dir.exists(configuration$metadata$output_directory$control_data_aggregated)) {
        dir.create(configuration$metadata$output_directory$control_data_aggregated, recursive = TRUE)
      }
      
      output_directory <- configuration$metadata$output_directory$control_data_aggregated
      data_type <- configuration$metadata$control_data_type
      timestamp <- format(Sys.time(), "%Y%m%d_%H%M%S")
      file_name <- paste(data_type, "_", timestamp, ".csv", sep = "")
      output_path <- file.path(output_directory, file_name)
      write.csv(verified_aggregated_df, output_path, row.names = FALSE)
      
    }, error = function(e) {
      print(paste("Error saving data - Data saved in source directory", conditionMessage(e)))
      write.csv(verified_aggregated_df, paste(configuration$metadata$control_data_type,"_", format(Sys.time(), "%Y%m%d_%H%M%S"), ".csv", sep = ""), row.names = FALSE)
      
    })
    
    # Reset
    new_path <- NULL
    configuration_path <- NULL
    kml_path <- NULL
    leg_path <- NULL
    
  }, error = function(e) {
    print(paste("Critical Error in workflow could not be resolved:", conditionMessage(e)))
  })
}


### Execute Workflow

In [None]:

configuration_path <- NULL
kml_path <- NULL
leg_path <- NULL

main(new_path, configuration_path = NULL, kml_path = NULL, leg_path = NULL)