# Different methods to download data from CASA and to find information about the data.

In [None]:
import os
import pysftp
import xarray as xr
import pandas as pd
import gzip
import shutil
from netCDF4 import Dataset

In [None]:
# Connection settings used by the SFTP cells below. The credentials are blank
# here and have to be filled in before any of those cells is run.
remote_directory = "/mnt/casa-ssd-pool/casa/qpe/"  # remote root handed to the helper functions
ssh_host = ""  # SFTP host name
ssh_username = ""  # SFTP login name
ssh_pk = ""  # passed as private_key= to pysftp.Connection

In [None]:
# Local target folders for one split: <split>/ receives the decompressed files,
# <split>_gz/ the downloaded .gz archives.
day = "20180908"
split = "validation"  # one of 'train', 'validation', 'test'
local_directory = f"/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/netCDFData/{split}/"
local_directory_gz = f"/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/netCDFData/{split}_gz/"

# Create both folders up front; existing folders are left untouched.
for directory in (local_directory, local_directory_gz):
    os.makedirs(directory, exist_ok=True)

In [None]:
# import library

import os
import cv2
import numpy as np
from netCDF4 import Dataset


def move_and_resize(local_directory, new_directory):
    """
    Resize every netCDF file of one split and mirror its day-folder layout.

    Each sub-directory of ``local_directory`` is treated as one day. Every file
    inside it is passed to ``resize`` and the result is written by ``save_nc``
    to ``new_directory/<day>/`` under the original file name.

    Entries of ``local_directory`` that are not directories are skipped instead
    of aborting the whole run with ``NotADirectoryError``. Days and files are
    visited in sorted order so that repeated runs behave the same way.

    Args:
    local_directory: str
        The directory of a particular split.
    new_directory: str
        The split directory where the resized netCDF files are saved.
    """
    # List the input first so that a freshly created output folder can never
    # show up as one of the day folders.
    day_folders = sorted(os.listdir(local_directory))

    os.makedirs(new_directory, exist_ok=True)

    for day in day_folders:
        day_dir = os.path.join(local_directory, day)
        if not os.path.isdir(day_dir):
            # Not a day folder (for example a stray file next to the day folders).
            continue

        day_save_dir = os.path.join(new_directory, day)
        os.makedirs(day_save_dir, exist_ok=True)

        for filename in sorted(os.listdir(day_dir)):
            resized_rr_data = resize(os.path.join(day_dir, filename))
            save_nc(filename, day_save_dir, resized_rr_data)


def resize(file_path):
    """
    Load the ``RRdata`` field of one netCDF file and resample it to 256x256.

    Eight entries are cut from both ends of the second axis, the first slice
    along the leading axis is resized with bilinear interpolation, and a
    leading axis of length one is put back in front of the result.

    Args:
    file_path: str
        Path of the netCDF file to read.

    Returns:
        The resized frame with a leading axis of length one, i.e. (1, 256, 256)
        for a 3-D ``RRdata`` variable.
    """
    with Dataset(file_path, "r") as nc_data:
        # The slicing below treats the data as (1, 366, 350), i.e. (z0, y0, x0)
        # -- NOTE(review): confirm this layout against the source files.
        # NOTE(review): auto-masking is left at the netCDF4 default here, while
        # the verification cells call set_auto_mask(False); confirm that fill
        # values cannot leak into the interpolation.
        rr_data = nc_data.variables["RRdata"][:]

    # Trim 8 entries from both ends of axis 1 so the frame becomes square.
    trimmed_rr_data = rr_data[:, 8:-8, :]  # presumably (1, 350, 350)

    # Resize the single 2-D frame; cv2.resize takes the target size as (width, height).
    resized_rr_data = cv2.resize(trimmed_rr_data[0], (256, 256), interpolation=cv2.INTER_LINEAR)

    # Add channel dimension – 256,256 to 1, 256, 256
    return resized_rr_data[np.newaxis, :, :]


def save_nc(filename, save_dir, resized_rr_data):
    """
    Write one resized frame to ``save_dir/filename`` as a NETCDF4 file.

    The file gets the dimensions ``x0`` (256), ``y0`` (256) and ``z0`` (1), one
    float32 coordinate variable per dimension filled with placeholder values,
    and the float32 variable ``RRdata`` laid out as (z0, y0, x0).

    Args:
    filename: str
        Name of the output file inside ``save_dir``.
    save_dir: str
        Directory the file is written to.
    resized_rr_data: array-like
        Data assigned to ``RRdata``; expected to be (1, 256, 256).
    """
    # Dimension name -> (size, placeholder coordinate values). The names match
    # the structure of the original files; the values are placeholders only.
    layout = {
        "x0": (256, np.linspace(start=0, stop=1, num=256)),
        "y0": (256, np.linspace(start=0, stop=1, num=256)),
        "z0": (1, np.array([0])),
    }

    with Dataset(os.path.join(save_dir, filename), "w", format="NETCDF4") as out_file:
        for dim_name, (dim_size, _) in layout.items():
            out_file.createDimension(dim_name, dim_size)

        # One coordinate variable per dimension, named like the dimension itself.
        coord_vars = {dim_name: out_file.createVariable(dim_name, "f4", (dim_name,)) for dim_name in layout}
        rr_var = out_file.createVariable("RRdata", "f4", ("z0", "y0", "x0"))

        for dim_name, (_, placeholder) in layout.items():
            coord_vars[dim_name][:] = placeholder
        rr_var[:] = resized_rr_data


# Resize all three splits: frames are read from netCDFData/<split>/ and the
# resized copies are written to netCDFData_processed/<split>/.
train_local_directory = "/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/netCDFData/train/"
train_new_directory = "/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/netCDFData_processed/train/"
validation_local_directory = "/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/netCDFData/validation/"
validation_new_directory = "/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/netCDFData_processed/validation/"
test_local_directory = "/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/netCDFData/test/"
test_new_directory = "/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/netCDFData_processed/test/"

for source_directory, target_directory in (
    (train_local_directory, train_new_directory),  # train split
    (validation_local_directory, validation_new_directory),  # validation split
    (test_local_directory, test_new_directory),  # test split
):
    move_and_resize(source_directory, target_directory)

In [None]:
def find_info(ssh_host, ssh_username, remote_dir):
    """
    Print the day on which each split ends and the total amount of data.

    Walks the day folders below ``remote_dir`` in sorted order, counts per day
    how many ``.gz`` frames the every-5th-frame sampling selects, converts the
    running total into batches of 22 frames, and records the first day on which
    the running batch count reaches 24000 (train), 29000 (validation) and
    34000 (test). A split whose mark is never reached is reported with an
    empty end day. Everything is printed; nothing is returned.

    The private key is read from the module-level ``ssh_pk``.

    Args:
    ssh_host: str
        Host name of the SFTP server.
    ssh_username: str
        User name for the SFTP login.
    remote_dir: str
        Remote directory whose entries are the day folders.
    """
    # Running batch count at which each split ends.
    split_thresholds = {"train": 24000, "validation": 29000, "test": 34000}
    # First day on which the mark was reached; "" while it has not been reached.
    split_end_days = dict.fromkeys(split_thresholds, "")

    cnopts = pysftp.CnOpts()
    # NOTE(review): hostkeys = None switches off host key verification for this
    # connection; confirm that this is acceptable for the CASA server.
    cnopts.hostkeys = None

    with pysftp.Connection(ssh_host, username=ssh_username, private_key=ssh_pk, cnopts=cnopts) as sftp:
        sftp.chdir(remote_dir)
        day_folders = sorted(sftp.listdir())

        total_number_of_frames_downloaded = 0
        # Set before the loop so the summary also works without any day folder
        # (the name used to be unbound in that case).
        total_number_batches = 0
        offset = 0

        for day in day_folders:
            day_path = os.path.join(remote_dir, day)
            number_of_frames = sum(1 for f in sftp.listdir(day_path) if f.endswith(".gz"))

            # Same count as the length of the index list that download_split and
            # download_data build: [(x * 5 + offset) % n for x in range((n + offset) // 5)].
            # NOTE(review): for a non-zero offset that formula can wrap an index
            # back to the start of the day; it is kept as is so that these
            # counts stay in step with the download functions.
            total_number_of_frames_downloaded += (number_of_frames + offset) // 5
            total_number_batches = total_number_of_frames_downloaded // 22

            for split, threshold in split_thresholds.items():
                if total_number_batches >= threshold and not split_end_days[split]:
                    split_end_days[split] = day

            # Update the offset for the next day
            offset = (offset + number_of_frames) % 5

        # Get the Info
        for split, end_day in split_end_days.items():
            print(f"Split: {split}, End Day: {end_day}")

        print(f"Total number of frames: {total_number_of_frames_downloaded}")
        print(f"Total number of batches: {total_number_batches}")


# Print the split end days and the frame/batch totals for the remote directory.
find_info(ssh_host=ssh_host, ssh_username=ssh_username, remote_dir=remote_directory)

In [None]:
def download_split(ssh_host, ssh_username, remote_dir, local_dir_gz, local_dir, start_day, end_day):
    """
    Download and decompress the sampled frames of every day in a date range.

    The day folders below ``remote_dir`` are compared as strings: a day is
    selected when ``start_day <= day < end_day``, or when ``start_day <= day``
    if ``end_day`` is empty. For each selected day the ``.gz`` files picked by
    the index formula below are fetched into ``local_dir_gz/<day>/``,
    decompressed into ``local_dir/<day>/`` (file name without the ``.gz``
    suffix), and the fetched archive is deleted again.

    The private key is read from the module-level ``ssh_pk``.

    Args:
    ssh_host: str
        Host name of the SFTP server.
    ssh_username: str
        User name for the SFTP login.
    remote_dir: str
        Remote directory whose entries are the day folders.
    local_dir_gz: str
        Local directory that temporarily holds the downloaded archives.
    local_dir: str
        Local directory that receives the decompressed files.
    start_day: str
        First day folder to download (inclusive).
    end_day: str
        Day folder to stop at (exclusive); empty means no upper bound.

    Raises:
        ValueError: If a selected entry of ``remote_dir`` is not a directory.
    """
    cnopts = pysftp.CnOpts()
    # NOTE(review): hostkeys = None switches off host key verification for this
    # connection; confirm that this is acceptable for the CASA server.
    cnopts.hostkeys = None
    total_number_of_frames_downloaded = 0
    offset = 0  # Variable to manage the offset for downloading every 5th frame across days

    with pysftp.Connection(ssh_host, username=ssh_username, private_key=ssh_pk, cnopts=cnopts) as sftp:
        sftp.chdir(remote_dir)
        day_folders = sorted(sftp.listdir())

        if end_day:
            day_folders = [day for day in day_folders if start_day <= day < end_day]
        else:
            day_folders = [day for day in day_folders if start_day <= day]

        for day_folder in day_folders:
            remote_path_day = os.path.join(remote_dir, day_folder)
            # Validate before listing: the check used to run after listdir() had
            # already been called on the same path, so it could not take effect.
            if not sftp.isdir(remote_path_day):
                raise ValueError("The remote path is not a directory")

            files = [f for f in sftp.listdir(remote_path_day) if f.endswith(".gz")]
            number_of_frames = len(files)
            # Download every 5th frame with an offset
            # NOTE(review): for a non-zero offset the modulo can wrap an index
            # back to the start of the day; kept unchanged so the selection
            # matches what find_info counts.
            frames_to_download_indices = [
                (x * 5 + offset) % number_of_frames for x in range((number_of_frames + offset) // 5)
            ]

            print(f"Downloading: {day_folder}\nNumber of frames: {len(frames_to_download_indices)}")
            local_path_gz_day = os.path.join(local_dir_gz, day_folder)
            local_path_day = os.path.join(local_dir, day_folder)
            os.makedirs(local_path_gz_day, exist_ok=True)
            os.makedirs(local_path_day, exist_ok=True)

            for index in frames_to_download_indices:
                file_name = files[index]
                remote_path_file = os.path.join(remote_path_day, file_name)
                local_path_gz_file = os.path.join(local_path_gz_day, file_name)
                # splitext drops the trailing ".gz" of the archive name.
                local_path_file = os.path.join(local_path_day, os.path.splitext(file_name)[0])
                sftp.get(remote_path_file, local_path_gz_file)

                with gzip.open(local_path_gz_file, "rb") as f_in, open(local_path_file, "wb") as f_out:
                    shutil.copyfileobj(f_in, f_out)
                # The archive is only needed until it has been decompressed.
                os.remove(local_path_gz_file)

            total_number_of_frames_downloaded += len(frames_to_download_indices)
            # Update the offset for the next day
            offset = (offset + number_of_frames) % 5

    print(f"Total number of frames downloaded: {total_number_of_frames_downloaded}")

In [None]:
import os
import shutil


def download_and_extract_split(
    ssh_host, ssh_username, remote_directory, local_directory_base, split, start_day="", end_day=""
):
    """
    Download one split into ``local_directory_base/<split>`` and clean up.

    Creates ``<split>`` and the staging folder ``<split>_gz/`` below
    ``local_directory_base``, hands both to ``download_split`` together with
    the connection settings and the day range, and finally deletes the staging
    folder with everything in it.
    """
    extracted_dir = os.path.join(local_directory_base, split)
    staging_dir = os.path.join(local_directory_base, f"{split}_gz/")

    for directory in (extracted_dir, staging_dir):
        os.makedirs(directory, exist_ok=True)

    download_split(
        ssh_host,
        ssh_username,
        remote_directory,
        staging_dir,
        extracted_dir,
        start_day,
        end_day,
    )

    # Only reached when download_split returned without raising.
    shutil.rmtree(staging_dir)


# Download the three splits one after another into the same base directory.
# Each row is (split name, first day, end day); an empty end day means that
# there is no upper bound.
for split_name, split_start, split_end in (
    ("train", "20160301", "20210913"),  # Training split
    ("validation", "20210913", "20221221"),  # Validation split
    ("test", "20221221", ""),  # Test split
):
    download_and_extract_split(
        ssh_host,
        ssh_username,
        remote_directory,
        "/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/netCDFData/",
        split_name,
        split_start,
        split_end,
    )

In [None]:
def download_data(
    ssh_host, ssh_username, remote_dir, local_dir_gz, local_dir, start_day="20180908", all=False, number_of_days=2
):
    """
    Download and decompress the sampled frames of the first days from ``start_day`` on.

    The day folders below ``remote_dir`` are sorted and those with
    ``day >= start_day`` (string comparison) are kept. The first
    ``number_of_days`` of them are processed, or all of them when ``all`` is
    true. For each processed day the ``.gz`` files picked by the index formula
    below are fetched into ``local_dir_gz/<day>/``, decompressed into
    ``local_dir/<day>/`` (file name without the ``.gz`` suffix), and the
    fetched archive is deleted again.

    The private key is read from the module-level ``ssh_pk``.

    Args:
    ssh_host: str
        Host name of the SFTP server.
    ssh_username: str
        User name for the SFTP login.
    remote_dir: str
        Remote directory whose entries are the day folders.
    local_dir_gz: str
        Local directory that temporarily holds the downloaded archives.
    local_dir: str
        Local directory that receives the decompressed files.
    start_day: str
        First day folder to download (inclusive).
    all: bool
        When true, every day folder from ``start_day`` on is downloaded and
        ``number_of_days`` is ignored. (The name shadows the builtin; it is
        kept because callers pass it by keyword.)
    number_of_days: int
        Number of day folders to download when ``all`` is false.

    Raises:
        ValueError: If a processed entry of ``remote_dir`` is not a directory.
    """
    cnopts = pysftp.CnOpts()
    # NOTE(review): hostkeys = None switches off host key verification for this
    # connection; confirm that this is acceptable for the CASA server.
    cnopts.hostkeys = None
    total_number_of_frames_downloaded = 0
    offset = 0  # Variable to manage the offset for downloading every 5th frame across days

    with pysftp.Connection(ssh_host, username=ssh_username, private_key=ssh_pk, cnopts=cnopts) as sftp:
        sftp.chdir(remote_dir)
        day_folders = [day for day in sorted(sftp.listdir()) if day >= start_day]

        # Until today
        if all:
            number_of_days = len(day_folders)

        for i, day_folder in enumerate(day_folders):
            if i >= number_of_days:
                break

            remote_path_day = os.path.join(remote_dir, day_folder)
            # Validate before listing: the check used to run after listdir() had
            # already been called on the same path, so it could not take effect.
            if not sftp.isdir(remote_path_day):
                raise ValueError("The remote path is not a directory")

            files = [f for f in sftp.listdir(remote_path_day) if f.endswith(".gz")]
            number_of_frames = len(files)
            # Download every 5th frame with an offset
            # NOTE(review): for a non-zero offset the modulo can wrap an index
            # back to the start of the day; kept unchanged so the selection
            # matches what find_info counts.
            frames_to_download_indices = [
                (x * 5 + offset) % number_of_frames for x in range((number_of_frames + offset) // 5)
            ]

            print(f"Downloading: {day_folder}\nNumber of frames: {len(frames_to_download_indices)}")
            local_path_gz_day = os.path.join(local_dir_gz, day_folder)
            local_path_day = os.path.join(local_dir, day_folder)
            os.makedirs(local_path_gz_day, exist_ok=True)
            os.makedirs(local_path_day, exist_ok=True)

            for index in frames_to_download_indices:
                file_name = files[index]
                remote_path_file = os.path.join(remote_path_day, file_name)
                local_path_gz_file = os.path.join(local_path_gz_day, file_name)
                # splitext drops the trailing ".gz" of the archive name.
                local_path_file = os.path.join(local_path_day, os.path.splitext(file_name)[0])
                sftp.get(remote_path_file, local_path_gz_file)

                with gzip.open(local_path_gz_file, "rb") as f_in, open(local_path_file, "wb") as f_out:
                    shutil.copyfileobj(f_in, f_out)
                # The archive is only needed until it has been decompressed.
                os.remove(local_path_gz_file)

            total_number_of_frames_downloaded += len(frames_to_download_indices)
            # Update the offset for the next day
            offset = (offset + number_of_frames) % 5

    print(f"Total number of frames downloaded: {total_number_of_frames_downloaded}")

In [None]:
# Download every available day from 20230101 on into the local split folders,
# then delete the staging folder that held the .gz archives.
download_data(
    ssh_host=ssh_host,
    ssh_username=ssh_username,
    remote_dir=remote_directory,
    local_dir_gz=local_directory_gz,
    local_dir=local_directory,
    start_day="20230101",
    all=True,
)
shutil.rmtree(local_directory_gz)

In [None]:
# download a particular file

import os
import numpy as np
from netCDF4 import Dataset
import gzip
import shutil
import pysftp


# Fetch one archive, decompress it next to the download, and check whether
# netCDF4 still returns a masked array once auto-masking is switched off.
cnopts = pysftp.CnOpts()
# NOTE(review): hostkeys = None switches off host key verification for this
# connection; confirm that this is acceptable for the CASA server.
cnopts.hostkeys = None

with pysftp.Connection(ssh_host, username=ssh_username, private_key=ssh_pk, cnopts=cnopts) as sftp:
    sftp.chdir(remote_directory)
    file_path = "./20160301/20160301_053157.nc.gz"

    local_dir = "/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/netCDFData/verify/20160301"

    os.makedirs(local_dir, exist_ok=True)

    local_path_gz_file = os.path.join(local_dir, os.path.basename(file_path))
    # splitext drops the trailing ".gz", leaving the .nc file name.
    local_path_file = os.path.splitext(local_path_gz_file)[0]

    sftp.get(file_path, local_path_gz_file)

    with gzip.open(local_path_gz_file, "rb") as f_in, open(local_path_file, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    # Delete the archive only after both handles are closed, as the download
    # helpers do; it used to be removed while gzip still had it open.
    os.remove(local_path_gz_file)


with Dataset(
    local_path_file,
    "r",
) as nc_data:
    # Set auto mask to False
    nc_data.set_auto_mask(False)
    # Read the variables completely; nothing is reshaped here.
    rr_data = nc_data.variables["RRdata"][:]
    x_data = nc_data.variables["x0"][:]
    y_data = nc_data.variables["y0"][:]


if isinstance(rr_data, np.ma.MaskedArray):
    print("The data is masked")

In [None]:
# check what is the first and last day in the remote directory
def get_first_last_day(ssh_host, ssh_username, remote_dir):
    """
    Return the first and the last entry of ``remote_dir`` in sorted order.

    The private key is read from the module-level ``ssh_pk``. An empty remote
    directory leads to an ``IndexError``.
    """
    options = pysftp.CnOpts()
    options.hostkeys = None
    with pysftp.Connection(ssh_host, username=ssh_username, private_key=ssh_pk, cnopts=options) as sftp:
        sftp.chdir(remote_dir)
        ordered_days = sorted(sftp.listdir())
    return ordered_days[0], ordered_days[-1]


# Show the range of days that is available on the server.
first_day, last_day = get_first_last_day(
    ssh_host=ssh_host, ssh_username=ssh_username, remote_dir=remote_directory
)
print(first_day, last_day)

In [None]:
# To approximate the number of frames and sequences for the entire dataset for training

# NOTE(review): Config and TOTAL_FRAMES are not defined in the cells visible
# here -- confirm where they come from before running this cell.
stop_day = "20230101"
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None

with pysftp.Connection(
    Config.CASA_SSH_HOST, username=Config.CASA_SSH_USERNAME, private_key=Config.CASA_PRIVATE_KEY, cnopts=cnopts
) as sftp:
    sftp.chdir(Config.REMOTE_DIR)

    # Day folders up to and including stop_day (string comparison).
    all_days = [day for day in sftp.listdir() if day <= stop_day]

    # Collect the names of all entries of every selected day folder.
    all_files = []
    for day in all_days:
        sftp.chdir(day)
        all_files += sftp.listdir()
        sftp.chdir("..")

    # Every 5th entry counts as one frame; TOTAL_FRAMES frames form one sequence.
    number_of_frames = len(all_files) // 5
    number_of_sequences = number_of_frames // TOTAL_FRAMES

print(f"Number of frames: {number_of_frames}")
print(f"Number of sequences: {number_of_sequences}")

In [None]:
# TO DELETE THE TRAIN AND VALIDATION FOLDERS

import shutil
import traceback


def remove(path, max_attempts=5):
    """
    Delete the directory tree at ``path``, retrying a bounded number of times.

    A failed attempt prints its traceback and is retried. A path that does not
    exist (any more) counts as done. The previous version retried through
    unbounded recursion behind a bare ``except``, so any permanent failure,
    including a missing path, ended in ``RecursionError``.

    Args:
    path: str
        Directory tree to delete.
    max_attempts: int
        Maximum number of ``shutil.rmtree`` calls before giving up.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1.
        OSError: The error of the last attempt when every attempt failed.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(1, max_attempts + 1):
        if not os.path.lexists(path):
            # Nothing (left) to delete.
            return
        try:
            shutil.rmtree(path)
        except OSError:
            traceback.print_exc()
            if attempt == max_attempts:
                raise
        else:
            return


# WARNING: permanently deletes the local validation split.
remove(path="/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/netCDFData/validation")

In [None]:
# Check if the processed data contains Nans after disabling the set_auto_mask to false for the netCDF4
from netCDF4 import Dataset
import numpy as np

# Run the same two checks on the processed file first and on the file that was
# not processed second, with auto-masking switched off for both.
for nc_path in (
    "/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/netCDFData_processed/train/20160301/20160301_053157.nc",
    "/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/netCDFData/train/20160301/20160301_053157.nc",
):
    with Dataset(nc_path, "r") as nc_data:
        nc_data.set_auto_mask(False)
        # Read the variables completely; nothing is reshaped here.
        rr_data = nc_data.variables["RRdata"][:]
        x_data = nc_data.variables["x0"][:]
        y_data = nc_data.variables["y0"][:]

    # check if rr_data is MaskedArray
    if isinstance(rr_data, np.ma.MaskedArray):
        print("Masked Array")

    # check if there are any nans
    if np.isnan(rr_data).any():
        print("Nans")

In [None]:
import pandas as pd

# Load CSV file
file_path = "/work/pi_mzink_umass_edu/SPRITE/skillful_nowcasting/output/sorted_max_values.csv"
df = pd.read_csv(file_path)

# Convert 'max_value1' to numeric, coercing errors to NaN
df["max_value1"] = pd.to_numeric(df["max_value1"], errors="coerce")

# (low, high) pairs. A high of infinity means "everything above low"; this is
# used for the overall count above 128 and for the open-ended bin above 300.
ranges = [
    (128, float("inf")),
    (135, 140),
    (140, 150),
    (150, 160),
    (160, 170),
    (170, 180),
    (180, 190),
    (190, 200),
    (200, 210),
    (210, 220),
    (220, 230),
    (230, 240),
    (240, 250),
    (250, 260),
    (260, 270),
    (270, 280),
    (280, 290),
    (290, 300),
    (300, float("inf")),
]

range_counts = {}

for low, high in ranges:
    if high == float("inf"):
        filtered_df = df[df["max_value1"] > low]
        range_name = f"above {low}"
    else:
        # Half-open bin (low, high]: with both bounds exclusive, a value lying
        # exactly on a boundary (e.g. 140) was counted in no bin at all.
        filtered_df = df[(df["max_value1"] > low) & (df["max_value1"] <= high)]
        range_name = f"between {low} and {high}"

    range_counts[range_name] = len(filtered_df)

# Get the total number of rows in the CSV file (rows coerced to NaN included)
number_of_rows = len(df)

# Print the results
for range_name, count in range_counts.items():
    print(f"Number of rows {range_name}: {count}")

# The maximum value in the 'max_value1' column
max_value = df["max_value1"].max()
print(f"Maximum value: {max_value}")

print(f"Total number of rows: {number_of_rows}")