# Flow Generator
Similar to the Image Generator notebook, we'll use this notebook to streamline the process of generating NetCDF files that contain the two variables **flow_u** and **flow_v**.

## Importing necessary libraries and notebooks

In [1]:
import xarray as xr
import io
import os
import cv2
import imageio
import matplotlib.pyplot as plt
import matplotlib.colors as colors
import netCDF4 as nc
import numpy as np
import cartopy.crs as ccrs
import cartopy.feature as cfeature
from datetime import datetime, timedelta
from matplotlib import ticker
from matplotlib.lines import Line2D
from matplotlib.patches import FancyArrowPatch
from matplotlib.collections import LineCollection
from IPython.display import Image, display, clear_output
from PIL import Image as PILImage
from concurrent.futures import ProcessPoolExecutor

# Import the other notebooks without running their cells
from ii_Data_Manipulation import visualize_4
from iii_GOES_average import time_list, visualize_aggregate, calculate_median
from iv_Image_Processing import collect_times, crop_image, save_aggregate, binarize_image, bilateral_image, process_dates, process_directory
from vii_Flow_Analysis import haversine
from vii_Flow_NetCDF import *
from v_i_OF_Functions import *

## DeepFlow 

### *process_file_pair*
Function to parallelize the process of calculating the flow over the different pairs of images.

**Note:** this used to crash when run in parallel; it no longer does once `max_workers` is limited.

In [None]:
def process_file_pair(file_paths, destination_directory):
    """Compute DeepFlow for one pair of files and write the result to disk.

    Takes a single pair so that it can be submitted as one task to a
    ``ProcessPoolExecutor``.

    Parameters
    ----------
    file_paths : tuple
        ``(first_path, second_path)`` of the two NetCDF files to compare.
    destination_directory : str
        Directory in which the output file is written.

    Returns
    -------
    str
        Name of the written file: ``'DeepFlow_'`` followed by the basename
        of ``first_path``.
    """
    earlier_path, later_path = file_paths

    # One flow result per input variable, computed in this order.
    flow_by_variable = {
        key: calculate_deepflow(earlier_path, later_path, variable_key=key)
        for key in ("fai_anomaly", "filtered")
    }

    # The output is named after the first file of the pair.
    source_name = os.path.basename(earlier_path)
    result_name = 'DeepFlow_' + source_name

    save_flow(
        earlier_path,
        os.path.join(destination_directory, result_name),
        lat_range=None,
        lon_range=None,
        flow_vectors=flow_by_variable["fai_anomaly"],
        flow_vectors_m=None,
        flow_vectors_f=flow_by_variable["filtered"],
        mask_data=False,
    )
    print(f"Processed and saved flow data for {source_name} to {result_name}")
    return result_name  # Signals completion to the caller

In [None]:
if __name__ == '__main__':
    # Input (filtered NetCDF files) and output (DeepFlow results) locations
    source_directory = '/media/yahia/ballena/ABI/NetCDF/Atlantic/Filtered'
    destination_directory = '/media/yahia/ballena/Flow/DeepFlow'

    # Ensure the destination directory exists
    os.makedirs(destination_directory, exist_ok=True)

    # Sorted by name so that consecutive entries are consecutive dates
    files = sorted(f for f in os.listdir(source_directory) if f.endswith('.nc'))

    # Each file is paired with its successor: (day N, day N+1)
    file_pairs = [
        (os.path.join(source_directory, first), os.path.join(source_directory, second))
        for first, second in zip(files, files[1:])
    ]

    # Use about half of the cores, but never fewer than one worker:
    # os.cpu_count() may return None, and halving a single core gives 0,
    # which ProcessPoolExecutor rejects with a ValueError.
    max_workers = max(1, (os.cpu_count() or 1) // 2)

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # One task per pair of files
        futures = [executor.submit(process_file_pair, pair, destination_directory) for pair in file_pairs]

        # Collect results in submission order; result() blocks until the task is done
        for pair, future in zip(file_pairs, futures):
            try:
                print(future.result())
            except Exception as e:
                # Name the failing input so the error can be acted on
                print(f"Error processing file {os.path.basename(pair[0])}: {e}")

In [None]:
# Sequential
if __name__ == '__main__':
    # Input (filtered NetCDF files) and output (DeepFlow results) locations
    source_directory = '/media/yahia/ballena/ABI/NetCDF/Atlantic/Filtered'
    destination_directory = '/media/yahia/ballena/Flow/DeepFlow'

    # Create the output directory if it is missing
    os.makedirs(destination_directory, exist_ok=True)

    # Name-sorted list of NetCDF files, so neighbours are consecutive in time
    files = [f for f in os.listdir(source_directory) if f.endswith('.nc')]
    files.sort()

    # Walk over every (file, next file) pair
    for first_file, second_file in zip(files, files[1:]):
        first_path = os.path.join(source_directory, first_file)
        second_path = os.path.join(source_directory, second_file)

        # Flow on the 'fai_anomaly' and 'filtered' variables.
        # (A 'masked_land' flow used to be computed here; flow_vectors_m stays None.)
        flow_vectors = calculate_deepflow(first_path, second_path, variable_key="fai_anomaly")
        flow_vectors_f = calculate_deepflow(first_path, second_path, variable_key="filtered")

        # The output file is named after the first file of the pair
        output_filename = 'DeepFlow_' + first_file
        output_path = os.path.join(destination_directory, output_filename)

        save_flow(
            first_path,
            output_path,
            lat_range=None,
            lon_range=None,
            flow_vectors=flow_vectors,
            flow_vectors_m=None,
            flow_vectors_f=flow_vectors_f,
            mask_data=False,
        )
        print(f"Processed and saved flow data for {first_file} to {output_filename}")

Sequential run time for 1 month of data (flow + filtered flow): 1 hour.

## DeepFlow (Masked)

### *mask_flow*
This takes in the images for which flow is already calculated and sets flow to 0 in pixels where there is no algae detection.

In [None]:
def mask_flow(data, output_path):
    """Zero out flow values where nothing was detected, then save the dataset.

    Every data variable whose name contains ``'flow'`` is set to 0 wherever
    ``data['fai_anomaly']`` equals 0. Variables whose name contains
    ``'flow_u_f'`` or ``'flow_v_f'`` are additionally set to 0 wherever
    ``data['filtered']`` equals 0, so both masks apply to them.

    Parameters
    ----------
    data : dataset
        Opened dataset; it is modified in place.
    output_path : str
        Path of the NetCDF file to write.
    """
    detected = data['fai_anomaly'] != 0

    for name in data.data_vars:
        # Only flow variables are touched.
        if 'flow' not in name:
            continue

        field = data[name]
        # Filtered flow components get the 'filtered' mask first.
        if 'flow_u_f' in name or 'flow_v_f' in name:
            field = xr.where(data['filtered'] != 0, field, 0)

        data[name] = xr.where(detected, field, 0)

    # Write the masked dataset to a new NetCDF file
    data.to_netcdf(output_path)

In [None]:
# Masking
if __name__ == '__main__':
    # Source directory with DeepFlow data, and where the masked copies go
    source_directory = '/media/yahia/ballena/Flow/DeepFlow'
    destination_directory = '/media/yahia/ballena/Flow/DeepFlow_Masked'

    # Ensure the destination directory exists
    os.makedirs(destination_directory, exist_ok=True)

    # Process each .nc file in the source directory (sorted for a stable order)
    for filename in sorted(os.listdir(source_directory)):
        if not filename.endswith('.nc'):
            continue

        file_path = os.path.join(source_directory, filename)
        output_path = os.path.join(destination_directory, 'Masked_' + filename)

        # The context manager closes the file once the masked copy is written
        with xr.open_dataset(file_path) as data:
            mask_flow(data, output_path)

        # Report only files that were actually processed
        print(f"Masked flow data for {filename} to {output_path}")

## Farneback
This is much faster than DeepFlow.

### *process_file_pair_farneback*

In [None]:
def process_file_pair_farneback(file_paths, destination_directory):
    """Compute Farneback flow for one pair of files and write the result to disk.

    Takes a single pair so that it can be submitted as one task to a
    ``ProcessPoolExecutor``.

    Parameters
    ----------
    file_paths : tuple
        ``(first_path, second_path)`` of the two NetCDF files to compare.
    destination_directory : str
        Directory in which the output file is written.

    Returns
    -------
    str
        Name of the written file: ``'Farneback_'`` followed by the basename
        of ``first_path``.
    """
    earlier_path, later_path = file_paths

    # One flow result per input variable, computed in this order.
    flow_by_variable = {
        key: calculate_farneback(earlier_path, later_path, variable_key=key)
        for key in ("fai_anomaly", "filtered")
    }

    # The output is named after the first file of the pair.
    source_name = os.path.basename(earlier_path)
    result_name = 'Farneback_' + source_name

    save_flow(
        earlier_path,
        os.path.join(destination_directory, result_name),
        lat_range=None,
        lon_range=None,
        flow_vectors=flow_by_variable["fai_anomaly"],
        flow_vectors_m=None,
        flow_vectors_f=flow_by_variable["filtered"],
        mask_data=False,
    )
    print(f"Processed and saved flow data for {source_name} to {result_name}")
    return result_name  # Signals completion to the caller

In [None]:
if __name__ == '__main__':
    # Input (filtered NetCDF files) and output (Farneback results) locations
    source_directory = '/media/yahia/ballena/ABI/NetCDF/Atlantic/Filtered'
    destination_directory = '/media/yahia/ballena/Flow/Farneback'

    # Ensure the destination directory exists
    os.makedirs(destination_directory, exist_ok=True)

    # Sorted by name so that consecutive entries are consecutive dates
    files = sorted(f for f in os.listdir(source_directory) if f.endswith('.nc'))

    # Each file is paired with its successor: (day N, day N+1)
    file_pairs = [
        (os.path.join(source_directory, first), os.path.join(source_directory, second))
        for first, second in zip(files, files[1:])
    ]

    # Use about half of the cores, but never fewer than one worker:
    # os.cpu_count() may return None, and halving a single core gives 0,
    # which ProcessPoolExecutor rejects with a ValueError.
    max_workers = max(1, (os.cpu_count() or 1) // 2)

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # One task per pair of files
        futures = [executor.submit(process_file_pair_farneback, pair, destination_directory) for pair in file_pairs]

        # Collect results in submission order; result() blocks until the task is done
        for pair, future in zip(file_pairs, futures):
            try:
                print(future.result())
            except Exception as e:
                # Name the failing input so the error can be acted on
                print(f"Error processing file {os.path.basename(pair[0])}: {e}")

## Farneback (Masked)

In [None]:
# Masking
if __name__ == '__main__':
    # Source directory with Farneback data, and where the masked copies go
    source_directory = '/media/yahia/ballena/Flow/Farneback'
    destination_directory = '/media/yahia/ballena/Flow/Farneback_Masked'

    # Ensure the destination directory exists
    os.makedirs(destination_directory, exist_ok=True)

    # Process each .nc file in the source directory (sorted for a stable order)
    for filename in sorted(os.listdir(source_directory)):
        if not filename.endswith('.nc'):
            continue

        file_path = os.path.join(source_directory, filename)
        output_path = os.path.join(destination_directory, 'Masked_' + filename)

        # The context manager closes the file once the masked copy is written
        with xr.open_dataset(file_path) as data:
            mask_flow(data, output_path)

        # Report only files that were actually processed
        print(f"Masked flow data for {filename} to {output_path}")

## Time Series

### *combine_netcdfs*
For now, this always crashes the notebook.

In [4]:
def combine_netcdfs(source_directory, output_path, time_variable_name='time', batch_size=10):
    """Combine every NetCDF file of a directory into one file along a new dimension.

    Files are taken in name-sorted order, concatenated in batches of
    ``batch_size`` along ``time_variable_name``, and the batches are then
    concatenated together. The resulting coordinate is the integer index
    ``0 .. n_files - 1``, not a date.

    Parameters
    ----------
    source_directory : str
        Directory containing the ``.nc`` files to combine.
    output_path : str
        Path of the combined NetCDF file to write.
    time_variable_name : str, optional
        Name of the dimension along which files are concatenated.
    batch_size : int, optional
        Number of files concatenated at once; must be at least 1.

    Raises
    ------
    ValueError
        If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    # List all NetCDF files in the source directory
    nc_files = sorted(
        os.path.join(source_directory, f)
        for f in os.listdir(source_directory)
        if f.endswith('.nc')
    )
    if not nc_files:
        # Nothing to concatenate: report it instead of failing inside xr.concat
        print(f"No NetCDF files found in {source_directory}; nothing to combine.")
        return

    opened = []
    try:
        combined_batches = []
        for start in range(0, len(nc_files), batch_size):
            batch = []
            for path in nc_files[start:start + batch_size]:
                dataset = xr.open_dataset(path, chunks={})  # chunked, lazy read
                opened.append(dataset)
                batch.append(dataset)
            combined_batches.append(xr.concat(batch, dim=time_variable_name))

        # Combine all batches and index them by file position
        final_combined = xr.concat(combined_batches, dim=time_variable_name)
        final_combined[time_variable_name] = range(len(nc_files))

        # The inputs are read lazily, so the files are kept open until the
        # combined dataset has been written.
        final_combined.to_netcdf(output_path)
    finally:
        # Release every file handle, also when an error interrupts the run
        for dataset in opened:
            dataset.close()

    print(f"Combined NetCDF saved to {output_path}")

In [None]:
if __name__ == '__main__':
    # Merge the masked DeepFlow files into a single NetCDF file, 5 files per batch
    source_dir = '/media/yahia/ballena/Flow/DeepFlow_Masked'
    output_file_path = '/media/yahia/ballena/Flow/DeepFlow_Time.nc'
    combine_netcdfs(
        source_directory=source_dir,
        output_path=output_file_path,
        batch_size=5,
    )

    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
  return self.array[key]
  final_combined.to_netcdf(output_path)


### *calculate_and_concatenate_flows*
Maybe this won't crash the notebook.

In [4]:
def calculate_and_concatenate_flows(source_directory, output_path):
    """Compute DeepFlow for every consecutive pair of files and save one combined file.

    For each pair, the first file is opened as the base dataset, ``flow_u``
    and ``flow_v`` (computed on ``fai_anomaly``) are added to it, and it is
    stamped with the date parsed from its filename. All stamped datasets are
    then concatenated along ``time`` and written to ``output_path``.

    The date is read from the fourth ``_``-separated field of the filename
    (``YYYYMMDD``); files whose name does not match are reported and skipped.

    Parameters
    ----------
    source_directory : str
        Directory containing the ``.nc`` files to process.
    output_path : str
        Path of the combined NetCDF file to write.
    """
    # NOTE(review): the former xr.set_options(enable_cftimeindex=True) call was
    # dropped; xarray lists that option as a deprecated no-op - confirm against
    # the installed version.

    # Get all .nc files sorted to ensure chronological order
    files = sorted(
        os.path.join(source_directory, f)
        for f in os.listdir(source_directory)
        if f.endswith('.nc')
    )

    opened = []    # file handles to close once the output is written
    datasets = []  # time-stamped datasets to concatenate

    try:
        for first_path, second_path in zip(files, files[1:]):
            # Validate the filename first: it is cheap, while the flow
            # computation is not, and skipping here leaves no file open.
            try:
                date_str = os.path.basename(first_path).split('_')[3].split('.')[0]
                date = datetime.strptime(date_str, '%Y%m%d')
            except (IndexError, ValueError) as e:
                print(f"Failed to process {first_path}: {e}")
                continue

            # Calculate flow vectors
            flow_u, flow_v = calculate_deepflow(first_path, second_path, variable_key="fai_anomaly")

            # Load the first file as base for coordinates and other metadata (lazy, chunked)
            ds = xr.open_dataset(first_path, chunks={"latitude": "auto", "longitude": "auto"})
            opened.append(ds)
            ds['flow_u'] = (('latitude', 'longitude'), flow_u)
            ds['flow_v'] = (('latitude', 'longitude'), flow_v)

            try:
                stamped = ds.assign_coords(time=date).expand_dims('time')
            except ValueError as e:
                # Presumably the file already defines 'time' - keep skipping
                # such files as before instead of aborting the whole run.
                print(f"Failed to process {first_path}: {e}")
                continue

            datasets.append(stamped)

        # Concatenate all datasets along time dimension
        if datasets:
            combined_dataset = xr.concat(datasets, dim='time')
            combined_dataset.to_netcdf(output_path, mode='w')
            print(f"Combined NetCDF with flow data saved to {output_path}")
        else:
            print("No datasets to combine.")
    finally:
        # Release every file handle, also when an error interrupts the run
        for handle in opened:
            handle.close()

In [None]:
if __name__ == '__main__':
    # Compute flow for all filtered files and store it in one combined file
    source_directory = '/media/yahia/ballena/ABI/NetCDF/Atlantic/Filtered'
    output_path = '/media/yahia/ballena/Flow/Combined_Flow.nc'
    calculate_and_concatenate_flows(
        source_directory=source_directory,
        output_path=output_path,
    )



### *calculate_flow_for_two_days*
Still crashing, so let's try with just two days.

This works; let's try with more days.

In [4]:
def calculate_flow_for_two_days(first_path, second_path, output_path):
    """Compute DeepFlow between two files and save it as a two-step time series.

    The flow is computed once, on ``fai_anomaly``, between ``first_path`` and
    ``second_path``. The same field is stored at both time steps, as a
    placeholder. Latitude and longitude are taken from the first file.

    The two dates are parsed from the fourth ``_``-separated field of each
    filename (``YYYYMMDD``). If parsing fails, a message is printed and the
    file is written without a ``time`` coordinate.

    Parameters
    ----------
    first_path : str
        Path of the first NetCDF file; it also provides the coordinates.
    second_path : str
        Path of the second NetCDF file.
    output_path : str
        Path of the NetCDF file to write.
    """
    # Calculate the flow vectors between the two files
    flow_u, flow_v = calculate_deepflow(first_path, second_path, variable_key="fai_anomaly")

    # Parse the dates from the filenames (best effort)
    try:
        dates = [
            datetime.strptime(os.path.basename(path).split('_')[3].split('.')[0], '%Y%m%d')
            for path in (first_path, second_path)
        ]
    except (IndexError, ValueError) as e:
        print(f"Failed to parse dates from filenames: {e}")
        dates = None

    # Only the first file is needed, for its coordinates; the context manager
    # closes it once the output has been written.
    with xr.open_dataset(first_path, chunks={"latitude": "auto", "longitude": "auto"}) as ds1:
        ds_combined = xr.Dataset({
            'flow_u': (['time', 'latitude', 'longitude'], [flow_u, flow_u]),  # same field at both steps (placeholder)
            'flow_v': (['time', 'latitude', 'longitude'], [flow_v, flow_v]),
        }, coords={
            'latitude': ds1.latitude,
            'longitude': ds1.longitude
        })

        if dates is not None:
            ds_combined = ds_combined.assign_coords(time=dates)

        # Save the dataset
        ds_combined.to_netcdf(output_path)

    print(f"Flow data for two days saved to {output_path}")

In [5]:
if __name__ == '__main__':
    # Two consecutive days from the filtered dataset
    first_path = '/media/yahia/ballena/ABI/NetCDF/Atlantic/Filtered/Filtered_algae_distribution_20220701.nc'
    second_path = '/media/yahia/ballena/ABI/NetCDF/Atlantic/Filtered/Filtered_algae_distribution_20220702.nc'
    output_path = '/media/yahia/ballena/Flow/Two_Days_Flow.nc'
    calculate_flow_for_two_days(
        first_path=first_path,
        second_path=second_path,
        output_path=output_path,
    )

Flow data for two days saved to /media/yahia/ballena/Flow/Two_Days_Flow.nc


### *calculate_flow_for_n_days*

In [6]:
def calculate_flow_for_n_days(source_directory, output_path, days=10):
    """
    Calculates and concatenates flow data for a specified number of days.

    The first ``days`` files (in name-sorted order) are paired with their
    successor. For each pair, ``flow_u`` and ``flow_v`` (DeepFlow on
    ``fai_anomaly``) are added to the first file's dataset, which is stamped
    with the date parsed from its filename (fourth ``_``-separated field,
    ``YYYYMMDD``). Files whose name does not match are reported and skipped.

    Parameters:
    - source_directory (str): Directory containing NetCDF files to process.
    - output_path (str): Path to save the combined NetCDF file.
    - days (int): Number of days to process. This translates to (days-1) pairs of files.
      Values below 2 (including negative ones) leave no pair to process.
    """
    # Get all .nc files sorted to ensure chronological order
    files = sorted(
        os.path.join(source_directory, f)
        for f in os.listdir(source_directory)
        if f.endswith('.nc')
    )

    # Limit to the requested number of days. Slicing already copes with a
    # short list; clamping keeps a negative value from slicing from the end.
    files = files[:max(days, 0)]

    opened = []    # file handles to close once the output is written
    datasets = []  # time-stamped datasets to concatenate

    try:
        for first_path, second_path in zip(files, files[1:]):
            # Validate the filename first: it is cheap, while the flow
            # computation is not, and skipping here leaves no file open.
            try:
                date_str = os.path.basename(first_path).split('_')[3].split('.')[0]
                date = datetime.strptime(date_str, '%Y%m%d')
            except (IndexError, ValueError) as e:
                print(f"Failed to parse dates from filenames {first_path}: {e}")
                continue

            # Calculate flow vectors
            flow_u, flow_v = calculate_deepflow(first_path, second_path, variable_key="fai_anomaly")

            # Load the first file as base for coordinates and other metadata
            ds = xr.open_dataset(first_path, chunks={"latitude": "auto", "longitude": "auto"})
            opened.append(ds)
            ds['flow_u'] = (('latitude', 'longitude'), flow_u)
            ds['flow_v'] = (('latitude', 'longitude'), flow_v)

            try:
                stamped = ds.assign_coords(time=date).expand_dims('time')
            except ValueError as e:
                # Presumably the file already defines 'time' - keep skipping
                # such files as before instead of aborting the whole run.
                print(f"Failed to parse dates from filenames {first_path}: {e}")
                continue

            datasets.append(stamped)

        # Concatenate all datasets along time dimension
        if datasets:
            combined_dataset = xr.concat(datasets, dim='time')
            combined_dataset.to_netcdf(output_path)
            # Report what was actually written, not what was requested
            print(f"Flow data for {len(datasets)} days saved to {output_path}")
        else:
            print("No datasets to combine or an error occurred.")
    finally:
        # Release every file handle, also when an error interrupts the run
        for handle in opened:
            handle.close()



In [9]:
if __name__ == '__main__':
    # Process the first 3 days (2 pairs of files) of the filtered dataset
    source_directory = '/media/yahia/ballena/ABI/NetCDF/Atlantic/Filtered'
    output_path = '/media/yahia/ballena/Flow/Custom_Days_Flow.nc'
    days = 3
    calculate_flow_for_n_days(
        source_directory=source_directory,
        output_path=output_path,
        days=days,
    )

Flow data for 2 days saved to /media/yahia/ballena/Flow/Custom_Days_Flow.nc
