In [1]:
# Import the needed modules
import numpy as np
import pandas as pd
import os,random,pickle,time,glob,sys, multiprocessing
from tqdm.auto import tqdm
from collections import OrderedDict
from engineering_functions import mean_per_band_per_field

In [2]:
# Set the directory and the chunks in which the larger fields are splitted 
DATA_DIR = '../data'
MAX_CHUNKS = int(1)
DIR_BANDS = f'{DATA_DIR}/train/bands-raw/' 
# Load the data frame and add the path information of the npz objects for each field to the data frame
df = pd.read_pickle(f'{DATA_DIR}/train/field_meta_train.pkl')
df['path'] = DIR_BANDS+df.field_id.astype(str)+'.npz'

In [3]:
df

Unnamed: 0,field_id,tile_id,label,dates,path
0,1,2171,4,"[2017-04-01T00:00:00.000000000, 2017-04-11T00:...",../data/train/bands-raw/1.npz
1,2,1703,7,"[2017-04-01T00:00:00.000000000, 2017-04-11T00:...",../data/train/bands-raw/2.npz
2,3,2214,6,"[2017-04-01T00:00:00.000000000, 2017-04-11T00:...",../data/train/bands-raw/3.npz
3,4,2526,8,"[2017-04-01T00:00:00.000000000, 2017-04-11T00:...",../data/train/bands-raw/4.npz
4,6,544,4,"[2017-04-01T00:00:00.000000000, 2017-04-11T00:...",../data/train/bands-raw/6.npz
...,...,...,...,...,...
87087,122731,2298,4,"[2017-04-01T00:00:00.000000000, 2017-04-11T00:...",../data/train/bands-raw/122731.npz
87088,122732,2225,5,"[2017-04-04T00:00:00.000000000, 2017-04-14T00:...",../data/train/bands-raw/122732.npz
87089,122733,1986,2,"[2017-04-01T00:00:00.000000000, 2017-04-11T00:...",../data/train/bands-raw/122733.npz
87090,122735,997,3,"[2017-04-01T00:00:00.000000000, 2017-04-04T00:...",../data/train/bands-raw/122735.npz


In [4]:
# Create a sorted dataframe by the field ids
field_ids = sorted(df.field_id.unique())
print(f'extracting data from {len(df.field_id.unique())}')

# Check the number of CPU cores
num_processes = multiprocessing.cpu_count()
print(f'processesing on : {num_processes} cpus')

# Create a pool of processes equal to the number of cores
pool = multiprocessing.Pool(num_processes)
# Calculate the number of fields each core must process
field_per_process = len(df.field_id.unique()) / num_processes
# Create the a number of field id batches equal to the number of cores
batches = []
for num_process in range(1, num_processes + 1):
    start_index = (num_process - 1) * field_per_process + 1
    end_index = num_process * field_per_process
    start_index = int(start_index)
    end_index = int(end_index)
    sublist = df.field_id.unique()[start_index - 1:end_index]
    batches.append((sublist,))
    print(f"Task # {num_process} process tiles {len(sublist)}")

extracting data from 87092
processesing on : 8 cpus
Task # 1 process tiles 10886
Task # 2 process tiles 10887
Task # 3 process tiles 10886
Task # 4 process tiles 10887
Task # 5 process tiles 10886
Task # 6 process tiles 10887
Task # 7 process tiles 10886
Task # 8 process tiles 10887


In [None]:
batch[0].tolist()

In [24]:
len(df)

87092

In [28]:
field_ids = []
labels = []
sub_ids = []
dates = []
features = []

for batch in batches:
    for _,row in tqdm(df.iloc[batch[0].tolist()].iterrows(),total=len(df.iloc[batch[0].tolist()])):
        bands = np.load(row.path)['arr_0']

        n = bands.shape[0]
        n_dates = bands.shape[2]
        num_chunks = MAX_CHUNKS
        if n<MAX_CHUNKS*2 or MAX_CHUNKS==1:
            num_chunks = 1
            mean = np.mean(bands,axis=0)
        else:
            bands = np.array_split(bands,num_chunks)
            mean = [np.mean(x,axis=0) for x in bands]
            mean = np.concatenate(mean,axis=1)

        feature = np.concatenate([mean]).transpose(1,0)
        features.append(feature)

        field_id = np.repeat(row.field_id,len(features))
        field_ids.append(field_id)
        label = np.repeat(row.label,len(features))
        labels.append(label)
        
        ids = [i for i in range(num_chunks)]
        dts = [str(d)[:10] for d in row.dates]
        dts = np.tile(dts,num_chunks)
        dates.append(dts)
        ids = np.repeat([ids],len(row.dates))
        sub_ids.append(ids)

100%|██████████| 10886/10886 [00:36<00:00, 297.82it/s]
100%|██████████| 10887/10887 [00:35<00:00, 310.48it/s]
 12%|█▏        | 1265/10886 [00:04<00:31, 304.39it/s]


error: Error -3 while decompressing data: invalid block type

In [38]:
np.load(row.path)['arr_0']

error: Error -3 while decompressing data: invalid block type

In [None]:
all_features = np.concatenate(features)
all_field_ids = np.concatenate(field_ids)
all_labels = np.concatenate(labels)

all_sub_ids = np.concatenate(sub_ids)
all_dates = np.concatenate(dates)

cols = ['B01', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08','B8A', 'B09', 'B11', 'B12', 'CLM']
df_data = pd.DataFrame(all_features,columns=cols)
df_data.insert(0,'field_id',all_field_ids)
df_data.insert(1,'sub_id',all_sub_ids)
df_data.insert(2,'date',all_dates)
df_data.insert(3,'label',all_labels)

df_data['field_id'] = df_data['field_id'].astype(np.int32)
df_data['sub_id'] = df_data['sub_id'].astype(np.int8)
fn = f'{DATA_DIR}/s2_train_dxc{MAX_CHUNKS}_bands.h5'
df_data.to_hdf(fn,key='df')
print(f'saved data to {fn}')