In [None]:
import os
import sys
module_path = os.path.abspath(os.path.join(".."))
if module_path not in sys.path:
    sys.path.append(module_path)

In [None]:
import numpy as np
from utils.logger import PrettyLogger
from utils.date import str2date, int2date_delta, date2str
from utils.io_func import save_to_npy, load_from_tiff
from config import (
    START_V_I, START_H_I, SIDE_LEN, INTRPL_START_DATE_STR, INTRPL_END_DATE_STR
)

In [None]:
logger = PrettyLogger()

In [None]:
SITE = "Site_A"
YEAR = "2015"
DATA_DIR = "../data/{}/ARD/{}/".format(SITE, YEAR)
OUTPUT_DIR = "./out/{}/ARD/cropped_interpolated/{}/".format(SITE, YEAR)
AVAI_PATH = os.path.join(OUTPUT_DIR, "availability.npy")
FILTER_BAND_PATH = os.path.join(OUTPUT_DIR, "filter_band.npy")
INTERPOLATED_PATH = os.path.join(OUTPUT_DIR, "interpolated.npy")
FINAL_OUTOUT_FILEPATH = "./out/{}/x-{}.npy".format(SITE, YEAR)

# ARD observation input, cropping and filtering

In [None]:
# link the filenames to date
date_filename_dict = {}
for filename in sorted(os.listdir(DATA_DIR)):
    date = str2date(filename[15:23])
    if (
        date >= str2date("{}{}".format(YEAR, INTRPL_START_DATE_STR))
        and date <= str2date("{}{}".format(YEAR, INTRPL_END_DATE_STR))
    ):
        if date not in date_filename_dict.keys():
            date_filename_dict[date] = []
        date_filename_dict[date].append(filename)

In [None]:
# read ARD images, crop ARD images and detect invalid values
raw_dates = sorted(date_filename_dict.keys())
availability = np.zeros((SIDE_LEN, SIDE_LEN, len(raw_dates)))
valid = np.zeros((SIDE_LEN, SIDE_LEN, len(raw_dates), 6))

to_fill = np.vectorize(lambda x: int("{:011b}".format(x)[-1], 2))
to_clear = np.vectorize(lambda x: int("{:011b}".format(x)[-2], 2))
to_cloud_shadow = np.vectorize(lambda x: int("{:011b}".format(x)[-4], 2))
to_cloud = np.vectorize(lambda x: int("{:011b}".format(x)[-6], 2))
for i, date in enumerate(raw_dates):
    logger.info("Loading: {}/{}".format(i+1, len(raw_dates)), date2str(date))
    sr_bands = []
    for filename in date_filename_dict[date]:
        band = load_from_tiff(os.path.join(DATA_DIR, filename))[
            START_V_I:START_V_I+SIDE_LEN, START_H_I:START_H_I+SIDE_LEN
        ]
        if filename[-11:-4] != "PIXELQA":
            sr_bands.append(band)
        else:
            qa_band = band
    sr_bands = np.array(sr_bands).transpose((1, 2, 0))

    flag_sr_range = ((sr_bands >= 0) & (sr_bands <= 10000)).all(axis=2)
    fill_band = to_fill(qa_band)
    flag_fill = (fill_band == 0)
    clear_band = to_clear(qa_band)
    flag_clear = (clear_band == 1)
    cloud_shadow_band = to_cloud_shadow(qa_band)
    flag_cloud_shadow = (cloud_shadow_band == 0)
    cloud_band = to_cloud(qa_band)
    flag_cloud = (cloud_band == 0)
    flag = flag_sr_range*flag_fill*flag_clear*flag_cloud_shadow*flag_cloud

    availability[:, :, i] = flag

    # make invalid observations zero, only for the convenience of debugging
    valid[:, :, i, :] = sr_bands
    valid[:, :, i, :] = valid[:, :, i, :]*(flag.reshape(*flag.shape, 1))

save_to_npy(availability, AVAI_PATH)

In [None]:
"""
========== PIXEL FILTER METHOD BY AVAILABILITY ==========
If the number of valid observations after May 15 >= 7,
the pixel will be included in the dataset, otherwise it will be excluded.
"""

index4filter = raw_dates.index(list(filter(
    lambda x: x > str2date("{}0515".format(YEAR)), raw_dates
))[0])
filter_band = availability[:, :, index4filter:].sum(axis=2) >= 7
logger.info("Validity percentage ({} {}): {:.4f}".format(
    SITE, YEAR,
    filter_band.sum()/(filter_band.shape[0]*filter_band.shape[1])
))
save_to_npy(filter_band, FILTER_BAND_PATH)

# Temporal interpolation

In [None]:
# prepare target dates for interpolation
intrpl_start_date = str2date("{}{}".format(YEAR, INTRPL_START_DATE_STR))
intrpl_end_date = str2date("{}{}".format(YEAR, INTRPL_END_DATE_STR))
intrpl_delta_days = list(range(
    0, (intrpl_end_date - intrpl_start_date).days + 1, 7
))
intrpl_dates = [
    int2date_delta(intrpl_delta_day) + intrpl_start_date
    for intrpl_delta_day in intrpl_delta_days
]

In [None]:
'''
========== INTERPOLATION METHOD ==========
situation I (normal): d_1, d_2*, target, d_3*, d_4, ...
situation II (close to the start date): target, d_1*, d_2*, d_3, ...
situation III (close to the end date): d_1, d_2, ..., d_(-2), d_(-1), target
'''

interpolated = np.zeros((SIDE_LEN, SIDE_LEN, len(intrpl_dates), 6))
for intrpl_date_index, intrpl_date in enumerate(intrpl_dates):
    logger.info("Interpolating: {}/{} {} ".format(
        intrpl_date_index + 1, len(intrpl_dates), date2str(intrpl_date))
    )
    # descending/ascending order for searching the nearest day before/after
    before_dates = list(filter(lambda x: x <= intrpl_date, raw_dates))[::-1]
    after_dates = list(filter(lambda x: x >= intrpl_date, raw_dates))

    for i in range(SIDE_LEN):
        for j in range(SIDE_LEN):

            # filter invalid pixel
            if not filter_band[i, j]:
                continue

            # situation I
            d_1 = None
            for nearest_before_index, before_date in enumerate(before_dates):
                before_date_raw_index = raw_dates.index(before_date)
                if availability[i, j][before_date_raw_index]:
                    d_1 = before_date
                    date_raw_index_1 = before_date_raw_index
                    break
            d_2 = None
            for nearest_after_index, after_date in enumerate(after_dates):
                after_date_raw_index = raw_dates.index(after_date)
                if availability[i, j][after_date_raw_index]:
                    d_2 = after_date
                    date_raw_index_2 = after_date_raw_index
                    break

            # situation II: search the second nearest after date
            if not d_1:
                for after_date in after_dates[nearest_after_index+1:]:
                    after_date_raw_index = raw_dates.index(after_date)
                    if availability[i, j][after_date_raw_index]:
                        d_1 = after_date
                        date_raw_index_1 = after_date_raw_index
                        break

            # situation III: search the second nearest before date
            if not d_2:
                for before_date in before_dates[nearest_before_index+1:]:
                    before_date_raw_index = raw_dates.index(before_date)
                    if availability[i, j][before_date_raw_index]:
                        d_2 = before_date
                        date_raw_index_2 = before_date_raw_index
                        break

            interpolated[i][j][intrpl_date_index] = [np.interp(
                (intrpl_date-d_1).days,
                [0, (d_2-d_1).days],
                [valid[i, j, date_raw_index_1, band_index],
                    valid[i, j, date_raw_index_2, band_index]]
            ) for band_index in range(6)]

save_to_npy(interpolated, INTERPOLATED_PATH)
x = interpolated[filter_band]
save_to_npy(x, FINAL_OUTOUT_FILEPATH)