In [1]:
# Suppress warnings
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
from pandarallel import pandarallel
import multiprocessing as mp
import numpy as np
from datetime import datetime, timedelta
import os
import pystac
import pystac_client
import odc
from tqdm import tqdm
from pystac_client import Client
from pystac.extensions.eo import EOExtension as eo
from odc.stac import stac_load
import planetary_computer as pc
# The Planetary Computer subscription key is a secret and must not be committed
# with the notebook: read it from the PC_SDK_SUBSCRIPTION_KEY environment
# variable instead. The key that used to be hardcoded on this line should be
# treated as leaked and rotated.
# NOTE(review): when the variable is unset no key is registered, so requests
# are presumably made anonymously — confirm that this is acceptable.
_pc_subscription_key = os.environ.get('PC_SDK_SUBSCRIPTION_KEY')
if _pc_subscription_key:
    pc.settings.set_subscription_key(_pc_subscription_key)

In [2]:
# Locations of the train/test CSV files, relative to the notebook directory.
train_path, test_path = '../data/train.csv', '../data/test.csv'

# One DataFrame per split; save_data reads the 'Longitude', 'Latitude' and
# 'Date of Harvest' columns of each row.
train_df = pd.read_csv(filepath_or_buffer=train_path)
test_df = pd.read_csv(filepath_or_buffer=test_path)

In [3]:
def save_data(row, path, history=120, resolution=10, surrounding_box=0.1, num_images=20):
    """Save the latest Sentinel-2 L2A acquisitions around one field as ``.npy`` files.

    A square bounding box of side ``surrounding_box`` is centred on the row's
    coordinates, the Planetary Computer STAC catalog is searched over the
    ``history`` days preceding the harvest date, and every band of the last
    ``num_images`` time steps of the loaded dataset is written to
    ``{path}/{longitude}_{latitude}_{date}_{band}.npy`` (every ``.`` in the
    file stem is replaced by ``-``).

    Args:
        row: Mapping-like record providing ``'Longitude'``, ``'Latitude'`` and
            ``'Date of Harvest'`` (a ``'%d-%m-%Y'`` string).
        path: Existing directory that receives the ``.npy`` files.
        history: Number of days before the harvest date to include in the search.
        resolution: Target pixel size; it is divided by 111320 to obtain the
            EPSG:4326 resolution handed to ``stac_load`` (presumably metres
            per pixel converted to degrees — confirm).
        surrounding_box: Side length of the bounding box, in the same units as
            the coordinates.
        num_images: Maximum number of time steps to save, counted from the end
            of the dataset's time axis. Fewer are saved when fewer are available.

    Returns:
        None. Nothing is written when the search returns no items.
    """
    longitude = row['Longitude']
    latitude = row['Latitude']
    half_box = surrounding_box / 2
    bbox = [longitude - half_box, latitude - half_box,
            longitude + half_box, latitude + half_box]

    # Search window: from `history` days before the harvest up to the harvest date.
    harvest_datetime = datetime.strptime(row['Date of Harvest'], '%d-%m-%Y')
    sowing_datetime = harvest_datetime - timedelta(days=history)
    time_period = f'{sowing_datetime.strftime("%Y-%m-%d")}/{harvest_datetime.strftime("%Y-%m-%d")}'

    catalog = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")
    search = catalog.search(collections=["sentinel-2-l2a"], bbox=bbox, datetime=time_period)
    items = list(search.get_all_items())
    if not items:
        # Nothing to load: avoid handing an empty item list to stac_load.
        print(f'No items found for {longitude}, {latitude} in {time_period}')
        return

    scale = resolution / 111320.0
    bands = ['red', 'green', 'blue', 'nir', 'rededge', 'B05', 'B06', 'B07', 'SCL']

    data = stac_load(
        items,
        bands=bands,
        crs="EPSG:4326",
        resolution=scale,
        chunks={"x": 2048, "y": 2048},
        dtype="uint16",
        patch_url=pc.sign,
        bbox=bbox
    )

    # The search may yield fewer time steps than requested; clamping keeps the
    # negative index below from running past the start of the time axis.
    available_images = min(num_images, len(data.time))

    for i in range(1, available_images + 1):
        date = np.datetime_as_string(data.time[-i].values, unit='D')

        for band in bands:
            # Dots (from the coordinates) are replaced so that the only '.'
            # in the final name is the one before the extension.
            file_name = f'{longitude}_{latitude}_{date}_{band}'.replace('.', '-')
            array = data[band][-i].to_numpy()
            np.save(os.path.join(path, f'{file_name}.npy'), array)
            print(file_name)

In [5]:
pandarallel.initialize(progress_bar=True)

# Output directories for the downloaded arrays, one per split. Note that these
# names are rebound here from the CSV locations to the raw-data directories.
train_path = '../data/raw/train'
test_path = '../data/raw/test'

# NOTE(review): the recorded output of this cell shows macOS fork-safety
# errors from the pandarallel workers, after which the run was interrupted —
# confirm that forked workers are usable on the target platform.
for split_df, split_path in ((train_df, train_path), (test_df, test_path)):
    # makedirs also creates the intermediate '../data/raw' directory.
    os.makedirs(split_path, exist_ok=True)
    # save_data returns None for every row, so the resulting Series is
    # discarded rather than rendered as cell output.
    _ = split_df.parallel_apply(lambda row: save_data(row, split_path), axis=1)

INFO: Pandarallel will run on 4 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.


VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=140), Label(value='0 / 140'))), HB…

The process has forked and you cannot use this CoreFoundation functionality safely. You MUST exec().
Break on __THE_PROCESS_HAS_FORKED_AND_YOU_CANNOT_USE_THIS_COREFOUNDATION_FUNCTIONALITY___YOU_MUST_EXEC__() to debug.
The process has forked and you cannot use this CoreFoundation functionality safely. You MUST exec().
Break on __THE_PROCESS_HAS_FORKED_AND_YOU_CANNOT_USE_THIS_COREFOUNDATION_FUNCTIONALITY___YOU_MUST_EXEC__() to debug.
The process has forked and you cannot use this CoreFoundation functionality safely. You MUST exec().
Break on __THE_PROCESS_HAS_FORKED_AND_YOU_CANNOT_USE_THIS_COREFOUNDATION_FUNCTIONALITY___YOU_MUST_EXEC__() to debug.
The process has forked and you cannot use this CoreFoundation functionality safely. You MUST exec().
Break on __THE_PROCESS_HAS_FORKED_AND_YOU_CANNOT_USE_THIS_COREFOUNDATION_FUNCTIONALITY___YOU_MUST_EXEC__() to debug.
Process ForkPoolWorker-7:
Process ForkPoolWorker-6:
Process ForkPoolWorker-8:
Process ForkPoolWorker-9:
Traceback (most recent c

KeyboardInterrupt: 

  File "/Users/admin/opt/anaconda3/envs/crop-forecasting/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/admin/opt/anaconda3/envs/crop-forecasting/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/admin/opt/anaconda3/envs/crop-forecasting/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/Users/admin/opt/anaconda3/envs/crop-forecasting/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/Users/admin/opt/anaconda3/envs/crop-forecasting/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/Users/admin/opt/anaconda3/envs/crop-forecasting/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/Users/admin/opt/anaconda3/envs/crop-forecasting/lib/python3.8/multiprocessing/queues.py", line 355, in get
    with self._rlock:
