In [1]:
import sys
sys.path.append("../")

from src.cmesrc.config import CMESRC_DB
import sqlite3

import zarr
import dask.array as da
import numcodecs
# Import delayed
from dask import delayed

from tqdm import tqdm
import os

import s3fs
from IPython.display import display

In [2]:
import os
from typing import Union

import s3fs
import zarr

AWS_ZARR_ROOT = (
    "s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr"
)


def s3_connection(path_to_zarr: os.path) -> s3fs.S3Map:
    """
    Instantiate connection to aws for a given path `path_to_zarr`
    """
    return s3fs.S3Map(
        root=path_to_zarr,
        s3=s3fs.S3FileSystem(anon=True),
        # anonymous access requires no credentials
        check=False,
    )


def load_single_aws_zarr(
    path_to_zarr: os.path,
    cache_max_single_size: int = None,
) -> Union[zarr.Array, zarr.Group]:
    """
    load zarr from s3 using LRU cache
    """
    return zarr.open(
        zarr.LRUStoreCache(
            store=s3_connection(path_to_zarr),
            max_size=cache_max_single_size,
        ),
        mode="r",
    )

In [13]:
root = load_single_aws_zarr(
    path_to_zarr=AWS_ZARR_ROOT,
    cache_max_single_size=5e8
)

In [24]:
indices = [int(i) for i in range(0,100)]

def get_zarrs(indices):
    zarrs = root[2010]["Bz"][[0,1], ...]
    return zarrs

zarrs = get_zarrs(indices)

IndexError: unsupported selection item for basic indexing; expected integer or slice, got <class 'list'>

In [23]:
zarrs.shape

(512, 512)

In [2]:
# SQLite connection

def get_z_groups():
    s3 = s3fs.S3FileSystem(anon=True)
    BASE_PATH = "s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr"
    zarrs = dict()
    for year in range(2010,2019):
        store = s3fs.S3Map(root=f"{BASE_PATH}/{year}", s3=s3, check=False)
        group = zarr.open_group(store, mode='r')
        zarrs[year] = dict()
        zarrs[year]['Bx'] = da.from_zarr(group['Bx'])
        zarrs[year]['By'] = da.from_zarr(group['By'])
        zarrs[year]['Bz'] = da.from_zarr(group['Bz'])
    return zarrs

groups = get_z_groups()

In [17]:
def get_rows_grouped_by_year(harpnum):
    con = sqlite3.connect(CMESRC_DB)
    con.execute("PRAGMA foreign_keys = ON")
    cur = con.cursor()

    cur.execute(
        f"""
        SELECT DISTINCT strftime('%Y', timestamp) FROM cutouts_for_download WHERE harpnum= ?
    """, (harpnum,))

    years = [row[0] for row in cur.fetchall()]

    rows = dict()

    for year in years:
#        cur.execute(
#            f"""
#            SELECT hpb.timestamp, hpb.x_cen, hpb.y_cen, hpbs.width, hpbs.height, i.idx
#            FROM cutouts_for_download cfd 
#            INNER JOIN harps_pixel_bbox hpb ON cfd.harpnum = hpb.harpnum AND cfd.timestamp = hpb.timestamp
#            INNER JOIN images i ON cfd.timestamp = i.timestamp
#            INNER JOIN harps_pixel_bbox_sizes hpbs ON hpb.harpnum = hpbs.harpnum
#            WHERE cfd.harpnum= ? AND strftime('%Y', cfd.timestamp) = ?
#        """, (harpnum, year))

        cur.execute("DROP TABLE IF EXISTS cutouts_for_download_temp")

        cur.execute(
            f"""
            CREATE TEMPORARY TABLE cutouts_for_download_temp AS
            SELECT hpb.timestamp, hpb.x_cen, hpb.y_cen, hpbs.width, hpbs.height, i.idx
            FROM cutouts_for_download cfd 
            INNER JOIN harps_pixel_bbox hpb ON cfd.harpnum = hpb.harpnum AND cfd.timestamp = hpb.timestamp
            INNER JOIN images i ON cfd.timestamp = i.timestamp
            INNER JOIN harps_pixel_bbox_sizes hpbs ON hpb.harpnum = hpbs.harpnum
            WHERE cfd.harpnum= ? AND strftime('%Y', cfd.timestamp) = ?
        """, (harpnum, year))

        cur.execute(
            """
            WITH cutouts_hours AS (
            SELECT *,
                strftime('%Y-%m-%d %H:00:00', timestamp) AS hour,
                ABS(julianday(timestamp) - julianday(strftime('%Y-%m-%d %H:00:00', timestamp))) * 24 * 60 * 60 AS diff
            FROM cutouts_for_download_temp
            )

            SELECT timestamp, x_cen, y_cen, width, height, idx FROM (
            SELECT *,
                RANK() OVER (PARTITION BY hour ORDER BY diff ASC) AS rank
                FROM cutouts_hours
            )
            WHERE rank = 1
            """
        )

        rows[year] = cur.fetchall()
    
    con.close()

    total_rows = sum([len(rows[year]) for year in years])

    if total_rows == 0:
        raise ValueError(f"No rows found for HARP {harpnum}")

    width, height = rows[years[0]][0][3], rows[years[0]][0][4]

    return rows, total_rows, (width, height)

# @delayed
def process_harps(harpnum, groups, extra_size=10):
    all_rows, total_rows, dims = get_rows_grouped_by_year(harpnum)

    dims = (dims[0] + extra_size, dims[1] + extra_size)

    # We also want even dimensions for simplicity

    dims = (dims[0] + dims[0] % 2, dims[1] + dims[1] % 2)

    compressor = numcodecs.Blosc(cname='zstd', clevel=3, shuffle=numcodecs.Blosc.BITSHUFFLE)

    # Create zarr array
    path = f"/home/julio/cmesrc/data/processed/images/{harpnum}"
    os.makedirs(path, exist_ok=True)

    store = zarr.DirectoryStore(path)

    zarr_array = zarr.zeros(shape=(total_rows, 3, dims[1], dims[0]), chunks=(24, 3, dims[1], dims[0]), dtype='float32', store=store, overwrite=True, compressor=compressor)

    all_timestamps = []

    for year in tqdm(sorted(all_rows.keys()), desc=f"Processing HARP {harpnum}"):
        rows = all_rows[year]
        indices = [row[-1] for row in rows]
        print("Fetching data from Zarr")
        images_bx = groups[int(year)]['Bx'][indices].compute()
        images_by = groups[int(year)]['By'][indices].compute()
        images_bz = groups[int(year)]['Bz'][indices].compute()
        print("Done fetching data from Zarr")

        for i, row in tqdm(enumerate(rows), total=len(rows), desc=f"Processing HARP {harpnum} for year {year}"):
            timestamp, x_cen, y_cen, width, height, idx = row
            width = dims[0]
            height = dims[1]

            x_min = x_cen - width // 2
            x_max = x_cen + width // 2
            y_min = y_cen - height // 2
            y_max = y_cen + height // 2

            zarr_array[i, 0, :, :] = images_bx[i, y_min:y_max, x_min:x_max]
            zarr_array[i, 1, :, :] = images_by[i, y_min:y_max, x_min:x_max]
            zarr_array[i, 2, :, :] = images_bz[i, y_min:y_max, x_min:x_max]

            all_timestamps.append(timestamp)
        
    # We also add metadata to the zarr array to know what timestamp each index corresponds to
    zarr_array.attrs['timestamps'] = all_timestamps

    return

In [18]:
process_harps(245, groups)

Processing HARP 245:   0%|          | 0/1 [00:00<?, ?it/s]

Fetching data from Zarr
Done fetching data from Zarr


Processing HARP 245 for year 2010: 100%|██████████| 181/181 [00:02<00:00, 65.45it/s]
Processing HARP 245: 100%|██████████| 1/1 [02:00<00:00, 120.15s/it]


In [14]:
# Read images from Zarr

harpnum = 5745
path = f"/home/julio/cmesrc/data/processed/cutouts/cutouts/{harpnum}"
store = zarr.DirectoryStore(path)
zarr_array = zarr.open(store, mode='r')
print(zarr_array[40][2][0])
# Print metadata

print(zarr_array.attrs.asdict())

[  2.6314554    5.4385448    1.5455878   23.556997    21.762978
  -2.031483     0.40470907   1.6578659   -3.5366678   -9.856332
  -3.5038862    5.8283033    5.7878027    5.0698333    9.224531
   3.6037123   -5.4770565    3.8657215    3.8597214   -1.2231728
   3.3949475    3.6088834   -0.19549352   1.9910722   -7.582053
  -6.879026    -7.3419857  -46.588367   -19.280237     2.677797
   4.2443314    2.450763    -3.1532462   -3.5349922  -11.317123
 -58.43108    -60.368034    -6.825081    -3.136513    -3.1382732
  -2.169882   -13.401646   -12.554778    -4.9479227   -1.0948049
  -4.858417     1.5503764   -1.4184549    2.6698248  -17.797318
  -5.602891    -4.3278117    2.7591083    1.7513428   -1.6454732
   1.1438432   -1.0551776   -0.31875342   0.19021179   0.07320985
  -2.9028883   -1.1880283   -1.4687849    4.447831    14.397117
   7.2225003    5.6291623   10.874637     2.1341665    4.2525434
   4.0970592   -0.24369588  -4.857376    -5.3090386  -10.378448
  -1.5407305    4.2016926    2.15

In [15]:
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize
import numpy as np

# Some plotting for tests

savepath = f"/home/julio/cmesrc/data/processed/images/previews/{harpnum}/"
os.makedirs(savepath, exist_ok=True)

# For normalizing we use all the sequence of the images
# So we normalize the full array

global_min_bx = np.percentile(zarr_array[:, 0, :, :], 0.5)
global_max_bx = np.percentile(zarr_array[:, 0, :, :], 99.5)

global_min_by = np.percentile(zarr_array[:, 1, :, :], 0.5)
global_max_by = np.percentile(zarr_array[:, 1, :, :], 99.5)

global_min_bz = np.percentile(zarr_array[:, 2, :, :], 0.5)
global_max_bz = np.percentile(zarr_array[:, 2, :, :], 99.5)


norm_bx = Normalize(vmin=global_min_bx, vmax=global_max_bx)
norm_by = Normalize(vmin=global_min_by, vmax=global_max_by)
norm_bz = Normalize(vmin=global_min_bz, vmax=global_max_bz)

# Let's do something better still. We make a mask by taking pixels in Bz that are larger than
# The background level of magnetic field (which we have to determine) and then we plot only
# Bx and By where the mask is True

def get_bz_mask(image):
    return np.abs(image) > np.percentile(np.abs(image), 90.0)

for i in tqdm(range(zarr_array.shape[0])):
    fig, ax = plt.subplots(1, 3, figsize=(15, 5))

    bx_image = zarr_array[i, 0, :, :]
    by_image = zarr_array[i, 1, :, :]
    bz_image = zarr_array[i, 2, :, :]

    ax[0].imshow(bx_image, origin='lower', norm=norm_bx, cmap='gray')
    ax[0].set_title("Bx")
    ax[1].imshow(by_image, origin='lower', norm=norm_by, cmap='gray')
    ax[1].set_title("By")
    ax[2].imshow(bz_image, origin='lower', norm=norm_bz, cmap='gray')
    ax[2].set_title("Bz")

    plt.savefig(f"{savepath}{i}.png")
    plt.close(fig)

100%|██████████| 173/173 [00:23<00:00,  7.38it/s]


### IGNORE

In [15]:
import sys
sys.path.append("../")

from src.cmesrc.config import CMESRC_DB

from tqdm import tqdm
import numpy as np
import os
import zarr
import s3fs
from multiprocessing import Pool, cpu_count
from functools import partial
import sqlite3
from collections import defaultdict
import dask.array as da
from dask.distributed import Client, progress
import dask

# Import delayed decorator from dask
from dask import delayed

client = Client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 16,Total memory: 14.87 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:38981,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 14.87 GiB

0,1
Comm: tcp://127.0.0.1:36439,Total threads: 4
Dashboard: http://127.0.0.1:35701/status,Memory: 3.72 GiB
Nanny: tcp://127.0.0.1:46239,
Local directory: /tmp/dask-scratch-space/worker-uz8l23eu,Local directory: /tmp/dask-scratch-space/worker-uz8l23eu

0,1
Comm: tcp://127.0.0.1:38823,Total threads: 4
Dashboard: http://127.0.0.1:42203/status,Memory: 3.72 GiB
Nanny: tcp://127.0.0.1:36261,
Local directory: /tmp/dask-scratch-space/worker-uv5yqbh_,Local directory: /tmp/dask-scratch-space/worker-uv5yqbh_

0,1
Comm: tcp://127.0.0.1:33785,Total threads: 4
Dashboard: http://127.0.0.1:42109/status,Memory: 3.72 GiB
Nanny: tcp://127.0.0.1:35755,
Local directory: /tmp/dask-scratch-space/worker-c4njmjs5,Local directory: /tmp/dask-scratch-space/worker-c4njmjs5

0,1
Comm: tcp://127.0.0.1:35635,Total threads: 4
Dashboard: http://127.0.0.1:35729/status,Memory: 3.72 GiB
Nanny: tcp://127.0.0.1:35339,
Local directory: /tmp/dask-scratch-space/worker-5sfpxnym,Local directory: /tmp/dask-scratch-space/worker-5sfpxnym


In [16]:

import sys
sys.path.append("../")

from tqdm import tqdm
import numpy as np
import os
import zarr
import s3fs
from multiprocessing import Pool, cpu_count
from functools import partial
import sqlite3
from collections import defaultdict

import argparse

parser = argparse.ArgumentParser()

parser.add_argument('-y', '--year')

args = parser.parse_args()

year = args.year

CMESRC_DB = "/home/jhc/cmesrc/cmesrc.db"
CUTOUTS_FOLDER = "/disk/solar15/jhc/cmesrc/images/"

# SQLite connection
con = sqlite3.connect(CMESRC_DB)
con.execute("PRAGMA foreign_keys = ON")
cursor = con.cursor()

# S3 File System
s3 = s3fs.S3FileSystem(anon=True)
store = s3fs.S3Map(root="s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2010", s3=s3, check=False)

# Open the Zarr store
z_group = zarr.open_group(store, mode='r')

# Create a generator to yield rows one by one
def get_year_rows(year):
    con = sqlite3.connect(CMESRC_DB)
    con.execute("PRAGMA foreign_keys = ON")
    cursor = con.cursor()

    cursor.execute(
        """
        SELECT hpb.harpnum, hpb.timestamp, hpb.x_min, hpb.x_max, hpb.y_min, hpb.y_max, i.idx
        FROM harps_pixel_bbox hpb
        INNER JOIN images i ON hpb.timestamp = i.timestamp
        WHERE hpb.timestamp LIKE ?
        """,
        (f"{year}%",)
    )

    rows = cursor.fetchall()

    bboxes = defaultdict(list)

    for row in rows:
        bboxes[row[-1]].append(row)

    return bboxes

def process_img(idx, bbox_list, year, z_group):
    start = time()
    image_bx = z_group["Bx"][idx]
    image_by = z_group["By"][idx]
    image_bz = z_group["Bz"][idx]
    end = time()

    global fetching_images
    fetching_images += (end - start)

    for bbox in bbox_list:
        harpnum, timestamp, x_min, x_max, y_min, y_max, idx = bbox
        x_min -= 20
        x_max += 20
        y_min -= 20
        y_max += 20
        img_np_bx = image_bx[y_min:y_max, x_min:x_max]
        img_np_by = image_by[y_min:y_max, x_min:x_max]
        img_np_bz = image_bz[y_min:y_max, x_min:x_max]

        # Save the cutout to the local system in the appropriate directory structure
        directory_bx = f"{CUTOUTS_FOLDER}{harpnum}/Bx"
        directory_by = f"{CUTOUTS_FOLDER}{harpnum}/By"
        directory_bz = f"{CUTOUTS_FOLDER}{harpnum}/Bz"

        os.makedirs(directory_bx, exist_ok=True)
        os.makedirs(directory_by, exist_ok=True)
        os.makedirs(directory_bz, exist_ok=True)

        np.save(f"{directory_bx}/{timestamp}.npy", img_np_bx)
        np.save(f"{directory_by}/{timestamp}.npy", img_np_by)
        np.save(f"{directory_bz}/{timestamp}.npy", img_np_bz)

        del img_np_bx, img_np_by, img_np_bz
        del directory_bx, directory_by, directory_bz

    del image_bx, image_by, image_bz

def process_chunk(start_idx, chunk_size, idxs_bbox_list, year, z_group):
    end_idx = start_idx + chunk_size
    image_bx = z_group["Bx"][start_idx:end_idx]
    image_by = z_group["By"][start_idx:end_idx]
    image_bz = z_group["Bz"][start_idx:end_idx]

    for idx, bbox_list in idxs_bbox_list:
        for bbox in bbox_list:
            harpnum, timestamp, x_min, x_max, y_min, y_max, idx = bbox

            x_min -= 20
            x_max += 20
            y_min -= 20
            y_max += 20

            img_np_bx = image_bx[y_min:y_max, x_min:x_max]
            img_np_by = image_by[y_min:y_max, x_min:x_max]
            img_np_bz = image_bz[y_min:y_max, x_min:x_max]

            # Save the cutout to the local system in the appropriate directory structure
            directory_bx = f"{CUTOUTS_FOLDER}{harpnum}/Bx"
            directory_by = f"{CUTOUTS_FOLDER}{harpnum}/By"
            directory_bz = f"{CUTOUTS_FOLDER}{harpnum}/Bz"

            os.makedirs(directory_bx, exist_ok=True)
            os.makedirs(directory_by, exist_ok=True)
            os.makedirs(directory_bz, exist_ok=True)

            np.save(f"{directory_bx}/{timestamp}.npy", img_np_bx)
            np.save(f"{directory_by}/{timestamp}.npy", img_np_by)
            np.save(f"{directory_bz}/{timestamp}.npy", img_np_bz)

            del img_np_bx, img_np_by, img_np_bz
            del directory_bx, directory_by, directory_bz

    del image_bx, image_by, image_bz

# Worker function to process a single bounding box detail
def process_year(year):
    # S3 File System
    s3 = s3fs.S3FileSystem(anon=True)
    store = s3fs.S3Map(root=f"s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/{year}", s3=s3, check=False)

    # Open the Zarr store
    print("Opening Zarr store")
    z_group = zarr.open(store, mode='r')

    print("Fetching rows")
    bboxes = get_year_rows(year)

    chunk_size = z_group["Bx"].chunks[0] * 4

    chunk_boundaries = range(0, z_group["Bx"].shape[0], chunk_size)

    print("Processing chunks")
    for start_idx in tqdm(chunk_boundaries, desc=f"Processing {year}"):
        end_idx = start_idx + chunk_size
        chunk_idxs_bbox_list = [(idx, bbox_list) for idx, bbox_list in bboxes.items() if idx >= start_idx and idx < end_idx]

        if chunk_idxs_bbox_list:
            try:
                process_chunk(start_idx, chunk_size, chunk_idxs_bbox_list, year, z_group)
            except Exception as e:
                print(f"Failed to process chunk {year}/{start_idx}-{end_idx} due to error: {e}")
                raise e

if __name__=="__main__":
	process_year(year)


usage: ipykernel_launcher.py [-h] [-y YEAR]
ipykernel_launcher.py: error: unrecognized arguments: --ip=127.0.0.1 --stdin=9018 --control=9016 --hb=9015 --Session.signature_scheme="hmac-sha256" --Session.key=b"1d06927e-5eed-48bd-a14a-10973c03e2a7" --shell=9017 --transport="tcp" --iopub=9019 --f=/home/julio/.local/share/jupyter/runtime/kernel-v2-14628lQoOEy4WRuv6.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [17]:
# SQLite connection
con = sqlite3.connect(CMESRC_DB)
con.execute("PRAGMA foreign_keys = ON")
cursor = con.cursor()

# S3 File System
s3 = s3fs.S3FileSystem(anon=True)
store = s3fs.S3Map(root="s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2015", s3=s3, check=False)

z_groups = dict()
for year in range(2010, 2019):
    print(f"s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/{year}")
    store = s3fs.S3Map(root=f"s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/{year}", s3=s3, check=False)
    z_groups[year] = zarr.open_group(store, mode='r')

# Open the Zarr store
z_group = zarr.open_group(store, mode='r')

s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2010
s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2011
s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2012
s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2013
s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2014
s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2015
s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2016
s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2017
s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2018


In [13]:
# Read zarr to dask
total = 0

for year in range(2010, 2019):
    dask_array = da.from_zarr(z_groups[year]["Bx"])
    # In GB
    total += 3 * dask_array[::5].nbytes / 1e9

In [14]:
total

218.524286976

In [4]:
from time import time

fetching_images = 0
processing_images = 0
fetching_rows = 0

# Create a generator to yield rows one by one
def get_year_rows(year):
    con = sqlite3.connect(CMESRC_DB)
    con.execute("PRAGMA foreign_keys = ON")
    cursor = con.cursor()

    start = time()
    cursor.execute(
        """
        SELECT hpb.harpnum, hpb.timestamp, hpb.x_min, hpb.x_max, hpb.y_min, hpb.y_max, i.idx
        FROM harps_pixel_bbox hpb
        INNER JOIN images i ON hpb.timestamp = i.timestamp
        WHERE hpb.timestamp LIKE ?
        """,
        (f"{year}%",)
    )
    end = time()

    global fetching_rows
    fetching_rows += (end - start)

    rows = cursor.fetchall()

    bboxes = defaultdict(list)

    for row in rows:
        bboxes[row[-1]].append(row)

    return bboxes

def process_img(idx, bbox_list, year, z_group):
    start = time()
    image_bx = z_group["Bx"][idx]
    image_by = z_group["By"][idx]
    image_bz = z_group["Bz"][idx]
    end = time()

    global fetching_images
    fetching_images += (end - start)

    start = time()
    for bbox in bbox_list:
        harpnum, timestamp, x_min, x_max, y_min, y_max, idx = bbox
        x_min -= 20
        x_max += 20
        y_min -= 20
        y_max += 20
        img_np_bx = image_bx[y_min:y_max, x_min:x_max]
        img_np_by = image_by[y_min:y_max, x_min:x_max]
        img_np_bz = image_bz[y_min:y_max, x_min:x_max]

        # Save the cutout to the local system in the appropriate directory structure
        directory_bx = f"/home/julio/cmesrc/data/raw/images/{harpnum}/Bx"
        directory_by = f"/home/julio/cmesrc/data/raw/images/{harpnum}/By"
        directory_bz = f"/home/julio/cmesrc/data/raw/images/{harpnum}/Bz"

        os.makedirs(directory_bx, exist_ok=True)
        os.makedirs(directory_by, exist_ok=True)
        os.makedirs(directory_bz, exist_ok=True)

        np.save(f"{directory_bx}/{timestamp}.npy", img_np_bx)
        np.save(f"{directory_by}/{timestamp}.npy", img_np_by)
        np.save(f"{directory_bz}/{timestamp}.npy", img_np_bz)

        del img_np_bx, img_np_by, img_np_bz
        del directory_bx, directory_by, directory_bz
    end = time()

    global processing_images
    processing_images += (end - start)

    del image_bx, image_by, image_bz

def process_chunk(start_idx, chunk_size, idxs_bbox_list, year, z_group):
    start = time()
    end_idx = start_idx + chunk_size
    image_bx = z_group["Bx"][start_idx:end_idx]
    image_by = z_group["By"][start_idx:end_idx]
    image_bz = z_group["Bz"][start_idx:end_idx]
    end = time()

    global fetching_images
    fetching_images += (end - start)

    start = time()
    for idx, bbox_list in idxs_bbox_list:
        for bbox in bbox_list:
            harpnum, timestamp, x_min, x_max, y_min, y_max, idx = bbox
            img_np_bx = image_bx[y_min:y_max, x_min:x_max]
            img_np_by = image_by[y_min:y_max, x_min:x_max]
            img_np_bz = image_bz[y_min:y_max, x_min:x_max]

            # Save the cutout to the local system in the appropriate directory structure
            directory_bx = f"/home/julio/cmesrc/data/raw/images/{harpnum}/Bx"
            directory_by = f"/home/julio/cmesrc/data/raw/images/{harpnum}/By"
            directory_bz = f"/home/julio/cmesrc/data/raw/images/{harpnum}/Bz"

            os.makedirs(directory_bx, exist_ok=True)
            os.makedirs(directory_by, exist_ok=True)
            os.makedirs(directory_bz, exist_ok=True)

            np.save(f"{directory_bx}/{timestamp}.npy", img_np_bx)
            np.save(f"{directory_by}/{timestamp}.npy", img_np_by)
            np.save(f"{directory_bz}/{timestamp}.npy", img_np_bz)

            del img_np_bx, img_np_by, img_np_bz
            del directory_bx, directory_by, directory_bz
    end = time()

    global processing_images
    processing_images += (end - start)

    del image_bx, image_by, image_bz

# Worker function to process a single bounding box detail
def process_year(year):
    # S3 File System
    s3 = s3fs.S3FileSystem(anon=True)
    store = s3fs.S3Map(root=f"s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/{year}", s3=s3, check=False)

    # Open the Zarr store
    print("Opening Zarr store")
    z_group = zarr.open(store, mode='r')

    print("Fetching rows")
    bboxes = get_year_rows(year)

    chunk_size = z_group["Bx"].chunks[0] * 4

    chunk_boundaries = range(0, z_group["Bx"].shape[0], chunk_size)

    print("Processing chunks")
    for start_idx in tqdm(chunk_boundaries, desc=f"Processing {year}"):
        end_idx = start_idx + chunk_size
        chunk_idxs_bbox_list = [(idx, bbox_list) for idx, bbox_list in bboxes.items() if idx >= start_idx and idx < end_idx]

        if chunk_idxs_bbox_list:
            try:
                process_chunk(start_idx, chunk_size, chunk_idxs_bbox_list, year, z_group)
            except Exception as e:
                print(f"Failed to process chunk {year}/{start_idx}-{end_idx} due to error: {e}")

In [6]:
process_year(2010)

Opening Zarr store
Fetching rows
Processing chunks


Processing 2010:   0%|          | 0/426 [00:00<?, ?it/s]


KeyboardInterrupt: 

In [7]:
print(f"Fetching images took {fetching_images} seconds")
print(f"Processing images took {processing_images} seconds")
print(f"Fetching rows took {fetching_rows} seconds")

Fetching images took 55.11068892478943 seconds
Processing images took 0.1564617156982422 seconds
Fetching rows took 0.0480039119720459 seconds


In [None]:
if __name__ == 

In [5]:
from time import time

fetching_images = 0
processing_images = 0
fetching_rows = 0

# Create a generator to yield rows one by one
def get_year_rows(year):
    con = sqlite3.connect(CMESRC_DB)
    con.execute("PRAGMA foreign_keys = ON")
    cursor = con.cursor()

    start = time()
    cursor.execute(
        """
        SELECT hpb.harpnum, hpb.timestamp, hpb.x_min, hpb.x_max, hpb.y_min, hpb.y_max, i.idx
        FROM harps_pixel_bbox hpb
        INNER JOIN images i ON hpb.timestamp = i.timestamp
        WHERE hpb.timestamp LIKE ?
        """,
        (f"{year}%",)
    )
    end = time()

    global fetching_rows
    fetching_rows += (end - start)

    rows = cursor.fetchall()

    bboxes = defaultdict(list)

    for row in rows:
        bboxes[row[-1]].append(row)

    return bboxes

@delayed
def process_chunk(chunk, block_info=None):
    chunk_size = 15
    chunk_start_idx = block_info[0]['chunk-location'][0] * chunk_size
    chunk_end_idx = chunk_start_idx + chunk_size
    print(chunk_start_idx, chunk_end_idx)

    for idx in range(chunk_start_idx, chunk_end_idx):
        if idx in bboxes:
            bbox_list = bboxes[idx]
            for bbox in bbox_list:
                harpnum, timestamp, x_min, x_max, y_min, y_max, idx = bbox
                img_np_bx = chunk[idx - chunk_start_idx, y_min:y_max, x_min:x_max]
                img_np_by = chunk[idx - chunk_start_idx, y_min:y_max, x_min:x_max]
                img_np_bz = chunk[idx - chunk_start_idx, y_min:y_max, x_min:x_max]

                # Save the cutout to the local system in the appropriate directory structure
                directory_bx = f"/home/julio/cmesrc/data/raw/images/{harpnum}/Bx"
                directory_by = f"/home/julio/cmesrc/data/raw/images/{harpnum}/By"
                directory_bz = f"/home/julio/cmesrc/data/raw/images/{harpnum}/Bz"

                os.makedirs(directory_bx, exist_ok=True)
                os.makedirs(directory_by, exist_ok=True)
                os.makedirs(directory_bz, exist_ok=True)

                np.save(f"{directory_bx}/{timestamp}.npy", img_np_bx)
                np.save(f"{directory_by}/{timestamp}.npy", img_np_by)
                np.save(f"{directory_bz}/{timestamp}.npy", img_np_bz)

                del img_np_bx, img_np_by, img_np_bz
                del directory_bx, directory_by, directory_bz

    del chunk

# Worker function to process a single bounding box detail
def process_year(year):
    # S3 File System
    s3 = s3fs.S3FileSystem(anon=True)
    store = s3fs.S3Map(root=f"s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/{year}", s3=s3, check=False)

    # Open the Zarr store
    print("Opening Zarr store")
    z_group = zarr.open(store, mode='r')

    print("Fetching rows")
    global bboxes
    bboxes = get_year_rows(year)

    print("Processing chunks")
    # Then we process chunk by chunk each as a future



In [6]:
process_year(2015)

Opening Zarr store
Fetching rows
Processing chunks


AttributeError: 'Array' object has no attribute 'blocks'

### IGNORE

In [1]:
import sys
sys.path.append("../")

import gcsfs
import zarr

import dask.array as da
import dask.bag as db
import numpy as np
import dask

from src.cmesrc.config import CMESRC_DB
import sqlite3
from matplotlib.colors import Normalize

from tqdm import tqdm
import gc
import matplotlib.pyplot as plt
from dask.distributed import Client
from dask.diagnostics import ProgressBar

In [2]:
client = Client(n_workers=8, threads_per_worker=2, memory_limit='2GB')
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 16,Total memory: 14.90 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:41881,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 14.90 GiB

0,1
Comm: tcp://127.0.0.1:39713,Total threads: 2
Dashboard: http://127.0.0.1:38279/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:36073,
Local directory: /tmp/dask-scratch-space/worker-prcsk8o8,Local directory: /tmp/dask-scratch-space/worker-prcsk8o8

0,1
Comm: tcp://127.0.0.1:41775,Total threads: 2
Dashboard: http://127.0.0.1:33155/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:45729,
Local directory: /tmp/dask-scratch-space/worker-rx3xy4f4,Local directory: /tmp/dask-scratch-space/worker-rx3xy4f4

0,1
Comm: tcp://127.0.0.1:38723,Total threads: 2
Dashboard: http://127.0.0.1:36159/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:39949,
Local directory: /tmp/dask-scratch-space/worker-x1aj4mv7,Local directory: /tmp/dask-scratch-space/worker-x1aj4mv7

0,1
Comm: tcp://127.0.0.1:35923,Total threads: 2
Dashboard: http://127.0.0.1:41469/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:35517,
Local directory: /tmp/dask-scratch-space/worker-24av26g_,Local directory: /tmp/dask-scratch-space/worker-24av26g_

0,1
Comm: tcp://127.0.0.1:41927,Total threads: 2
Dashboard: http://127.0.0.1:40735/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:32981,
Local directory: /tmp/dask-scratch-space/worker-0dl02o94,Local directory: /tmp/dask-scratch-space/worker-0dl02o94

0,1
Comm: tcp://127.0.0.1:43825,Total threads: 2
Dashboard: http://127.0.0.1:42887/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:38859,
Local directory: /tmp/dask-scratch-space/worker-tuampj9p,Local directory: /tmp/dask-scratch-space/worker-tuampj9p

0,1
Comm: tcp://127.0.0.1:35345,Total threads: 2
Dashboard: http://127.0.0.1:38699/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:37963,
Local directory: /tmp/dask-scratch-space/worker-u99rd1k3,Local directory: /tmp/dask-scratch-space/worker-u99rd1k3

0,1
Comm: tcp://127.0.0.1:45667,Total threads: 2
Dashboard: http://127.0.0.1:45003/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:45081,
Local directory: /tmp/dask-scratch-space/worker-dyzur2gi,Local directory: /tmp/dask-scratch-space/worker-dyzur2gi


In [3]:

import os
from typing import Union

import s3fs
import zarr

AWS_ZARR_ROOT = (
    "s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/"
)


def s3_connection(path_to_zarr: os.path) -> s3fs.S3Map:
    """
    Instantiate connection to aws for a given path `path_to_zarr`
    """
    return s3fs.S3Map(
        root=path_to_zarr,
        s3=s3fs.S3FileSystem(anon=True),
        # anonymous access requires no credentials
        check=False,
    )


def load_single_aws_zarr(
    path_to_zarr: os.path,
    cache_max_single_size: int = None,
) -> Union[zarr.Array, zarr.Group]:
    """
    load zarr from s3 using LRU cache
    """
    return zarr.open(
        zarr.LRUStoreCache(
            store=s3_connection(path_to_zarr),
            max_size=cache_max_single_size,
        ),
        mode="r",
    )

def load_single_aws_zarr_to_dask(
    path_to_zarr: os.path,
) -> da.Array:
    """
    load zarr from s3 into a Dask array
    """
    return da.from_zarr(s3_connection(path_to_zarr))

AWS_ZARR_ROOT_2010_BX = (
    "s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2010/Bx"
)

AWS_ZARR_ROOT_2010_BY = (
    "s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2010/By"
)

AWS_ZARR_ROOT_2010_BZ = (
    "s3://gov-nasa-hdrl-data1/contrib/fdl-sdoml/fdl-sdoml-v2/sdomlv2_hmi.zarr/2010/Bz"
)

chunk_size = (10, 512, 512)

root_2010_bx = load_single_aws_zarr_to_dask(
    path_to_zarr=AWS_ZARR_ROOT_2010_BX,
).rechunk(chunk_size)

root_2010_by = load_single_aws_zarr_to_dask(
    path_to_zarr=AWS_ZARR_ROOT_2010_BY,
).rechunk(chunk_size)

root_2010_bz = load_single_aws_zarr_to_dask(
    path_to_zarr=AWS_ZARR_ROOT_2010_BZ,
).rechunk(chunk_size)

root_2010_bx = client.scatter(root_2010_bx)
root_2010_by = client.scatter(root_2010_by)
root_2010_bz = client.scatter(root_2010_bz)

In [4]:
con = sqlite3.connect(CMESRC_DB)
con.execute("PRAGMA foreign_keys = ON")
cur = con.cursor()

In [5]:
# Read all the bounding boxes for the year 2010. Since we only have to
# timestamps, we need to extract the year from them

cur.execute(
    "DROP TABLE IF EXISTS harps_2010_bboxes"
)

cur.execute(
    """
    CREATE TEMPORARY TABLE IF NOT EXISTS harps_2010_bboxes AS
    SELECT hpb.timestamp, hpb.harpnum, hpb.x_min, hpb.x_max, hpb.y_min, hpb.y_max
    FROM harps_pixel_bbox hpb
    INNER JOIN harps_bbox hb ON hpb.timestamp = hb.timestamp AND hpb.harpnum = hb.harpnum
    INNER JOIN harps h ON hpb.harpnum = h.harpnum
    WHERE substr(hpb.timestamp, 1, 4) = '2010'
    AND (hb.londtmin + hb.londtmax) / 2 BETWEEN -60 AND 60
    """
)

cur.execute("SELECT COUNT(*) FROM harps_2010_bboxes")

print(f"Number of bounding boxes: {cur.fetchone()[0]}")

Number of bounding boxes: 92778


In [6]:
cur.execute("""
    CREATE TEMPORARY TABLE IF NOT EXISTS harps_2010_pixel_bboxes AS
    WITH harpnum_ranges AS (
        SELECT harpnum, 
            (MAX(strftime('%s', hb.timestamp)) - MIN(strftime('%s', hb.timestamp))) / 60 / 60 / 24 as day_range
        FROM harps_2010_bboxes hb
        GROUP BY harpnum
    )
    SELECT hb.*
    FROM harps_2010_bboxes hb
    INNER JOIN harpnum_ranges hr ON hb.harpnum = hr.harpnum
    WHERE hr.day_range > 4;
""")

cur.execute("SELECT COUNT(*) FROM harps_2010_pixel_bboxes")

print(f"Number of pixel bounding boxes: {cur.fetchone()[0]}")

Number of pixel bounding boxes: 69930


In [7]:
cur.execute("""
SELECT bb.harpnum, bb.timestamp, i.idx, bb.x_min, bb.y_min, bb.x_max, bb.y_max
FROM harps_2010_pixel_bboxes bb
INNER JOIN images i ON bb.timestamp = i.timestamp
""")

bboxes = cur.fetchall()

bboxes_bag = db.from_sequence(bboxes)

In [8]:
def process_image(bbox, bx_dask_array, by_dask_array, bz_dask_array):
    harpnum, timestamp, idx, x_min, y_min, x_max, y_max = bbox

    cutout_bx = bx_dask_array[idx, y_min:y_max, x_min:x_max]
    cutout_by = by_dask_array[idx, y_min:y_max, x_min:x_max]
    cutout_bz = bz_dask_array[idx, y_min:y_max, x_min:x_max]

    image = da.stack([cutout_bx, cutout_by, cutout_bz])

    output_dir = f"/home/julio/cmesrc/data/raw/images/{harpnum}"

    os.makedirs(output_dir, exist_ok=True)

    filename = os.path.join(output_dir, f"{timestamp}")

    da.to_npy_stack(filename, image)

    return filename

In [9]:
lazy_results = bboxes_bag.map(
    process_image,
    bx_dask_array=root_2010_bx,
    by_dask_array=root_2010_by,
    bz_dask_array=root_2010_bz,
)

root_2010_bx

In [10]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 16,Total memory: 14.90 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:41881,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 14.90 GiB

0,1
Comm: tcp://127.0.0.1:39713,Total threads: 2
Dashboard: http://127.0.0.1:38279/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:36073,
Local directory: /tmp/dask-scratch-space/worker-prcsk8o8,Local directory: /tmp/dask-scratch-space/worker-prcsk8o8

0,1
Comm: tcp://127.0.0.1:41775,Total threads: 2
Dashboard: http://127.0.0.1:33155/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:45729,
Local directory: /tmp/dask-scratch-space/worker-rx3xy4f4,Local directory: /tmp/dask-scratch-space/worker-rx3xy4f4

0,1
Comm: tcp://127.0.0.1:38723,Total threads: 2
Dashboard: http://127.0.0.1:36159/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:39949,
Local directory: /tmp/dask-scratch-space/worker-x1aj4mv7,Local directory: /tmp/dask-scratch-space/worker-x1aj4mv7

0,1
Comm: tcp://127.0.0.1:35923,Total threads: 2
Dashboard: http://127.0.0.1:41469/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:35517,
Local directory: /tmp/dask-scratch-space/worker-24av26g_,Local directory: /tmp/dask-scratch-space/worker-24av26g_

0,1
Comm: tcp://127.0.0.1:41927,Total threads: 2
Dashboard: http://127.0.0.1:40735/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:32981,
Local directory: /tmp/dask-scratch-space/worker-0dl02o94,Local directory: /tmp/dask-scratch-space/worker-0dl02o94

0,1
Comm: tcp://127.0.0.1:43825,Total threads: 2
Dashboard: http://127.0.0.1:42887/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:38859,
Local directory: /tmp/dask-scratch-space/worker-tuampj9p,Local directory: /tmp/dask-scratch-space/worker-tuampj9p

0,1
Comm: tcp://127.0.0.1:35345,Total threads: 2
Dashboard: http://127.0.0.1:38699/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:37963,
Local directory: /tmp/dask-scratch-space/worker-u99rd1k3,Local directory: /tmp/dask-scratch-space/worker-u99rd1k3

0,1
Comm: tcp://127.0.0.1:45667,Total threads: 2
Dashboard: http://127.0.0.1:45003/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:45081,
Local directory: /tmp/dask-scratch-space/worker-dyzur2gi,Local directory: /tmp/dask-scratch-space/worker-dyzur2gi


In [12]:
with ProgressBar():
    results = lazy_results.compute(client=client)

KeyboardInterrupt: 

In [16]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 16,Total memory: 14.90 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:37607,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 14.90 GiB

0,1
Comm: tcp://127.0.0.1:44907,Total threads: 2
Dashboard: http://127.0.0.1:38939/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:38971,
Local directory: /tmp/dask-scratch-space/worker-9gu34k_x,Local directory: /tmp/dask-scratch-space/worker-9gu34k_x

0,1
Comm: tcp://127.0.0.1:43847,Total threads: 2
Dashboard: http://127.0.0.1:44287/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:35197,
Local directory: /tmp/dask-scratch-space/worker-ppw_osyr,Local directory: /tmp/dask-scratch-space/worker-ppw_osyr

0,1
Comm: tcp://127.0.0.1:34151,Total threads: 2
Dashboard: http://127.0.0.1:35937/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:39629,
Local directory: /tmp/dask-scratch-space/worker-4mz38agr,Local directory: /tmp/dask-scratch-space/worker-4mz38agr

0,1
Comm: tcp://127.0.0.1:38061,Total threads: 2
Dashboard: http://127.0.0.1:38785/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:39681,
Local directory: /tmp/dask-scratch-space/worker-r_mp2nki,Local directory: /tmp/dask-scratch-space/worker-r_mp2nki

0,1
Comm: tcp://127.0.0.1:40695,Total threads: 2
Dashboard: http://127.0.0.1:39309/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:39809,
Local directory: /tmp/dask-scratch-space/worker-agolv_jh,Local directory: /tmp/dask-scratch-space/worker-agolv_jh

0,1
Comm: tcp://127.0.0.1:41875,Total threads: 2
Dashboard: http://127.0.0.1:40899/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:35627,
Local directory: /tmp/dask-scratch-space/worker-r2idg54m,Local directory: /tmp/dask-scratch-space/worker-r2idg54m

0,1
Comm: tcp://127.0.0.1:33721,Total threads: 2
Dashboard: http://127.0.0.1:44857/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:43153,
Local directory: /tmp/dask-scratch-space/worker-80hb0xhf,Local directory: /tmp/dask-scratch-space/worker-80hb0xhf

0,1
Comm: tcp://127.0.0.1:42387,Total threads: 2
Dashboard: http://127.0.0.1:42391/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:34567,
Local directory: /tmp/dask-scratch-space/worker-ye0rbn8x,Local directory: /tmp/dask-scratch-space/worker-ye0rbn8x
