In [1]:
import numpy as np
import math, time
import pandas as pd
from pvlib import solarposition as solar
import datetime
from matplotlib import pyplot as plt
import pytz
from skimage import draw
from random import randint
from tqdm import tqdm
from proj_utils import timers as tm
from proj_utils import load_array_from_csv as lafc
from proj_utils import save_array_to_csv as satc

In [2]:
# Loading data for initial results
# Image size: 64x64x3
datetime_trainval = np.load('dataset/datetime_trainval.npy', allow_pickle=True)
images_trainval = np.load('dataset/images_trainval.npy', allow_pickle=True)
datetime_test = np.load('dataset/datetime_test.npy', allow_pickle=True)
images_test = np.load('dataset/images_test.npy', allow_pickle=True)
print(images_trainval.shape)

(92975, 64, 64, 3)


In [3]:
# Swapping BGR => RGB
images_trainval = images_trainval[:,:,:,::-1]
images_test = images_test[:,:,:,::-1]

In [4]:
# Returns a boolean matrix of img: a pixel is True if it is within r radius of (x,y)
def get_circle_coordinates(img, x, y, r):
    xx, yy = np.mgrid[:img.shape[0], :img.shape[1]]
    circle = (xx - x) ** 2 + (yy - y) ** 2
    circumsolar = np.logical_and(circle <= round(r**2), True)
    return circumsolar

In [5]:
# Coordinates for the Jen-Hsun Huang Engineering Center (location of the camera)
LATITUDE = 37.427940
LONGITUDE = -122.174220

# Radius of camera
R = 29

# Camera deviation angle from the due north
delta = 14

# x,y coordinates of the origin
O_x = 30
O_y = 30

# Calculate total number of pixels inside circle image
N_PIXELS_CIRCLE = np.sum(get_circle_coordinates(images_trainval[0], O_x, O_y, R))

In [6]:
# Calculate the NRBR (normalised red-blue ratio) of an image, pixel-by-pixel
# The image could be flattened (pixels, channels)
def NRBR(img):
    if len(img.shape) == 2:
        B = img[:,2]
        R = img[:,0]
    else:
        B = img[:,:,2]
        R = img[:,:,0]
    
    return (B-R)/(B+R+1e-4)

In [7]:
# Calculates the number of days from the start of the year
def day_of_year(dt):
    return (dt - datetime.datetime(dt.year, 1, 1)).days + 1

# Sun position identification algorithm
def sun_position(img, dt):
    d = day_of_year(dt)
    dt_index = pd.DatetimeIndex([dt], tz=pytz.timezone('US/Pacific'))
    # B = (360/365) * (d - 81)
    # eot = 9.87 * math.sin(2 * math.radians(B)) - 7.53 * math.cos(math.radians(B)) - 1.5 * math.sin(math.radians(B))
    # eot = solar.equation_of_time_spencer71(d)
    # hour_angle = solar.hour_angle(times=dt_index, longitude=LONGITUDE, equation_of_time=eot)
    # declination = solar.declination_spencer71(d)

    # Use get_solarposition from the pvlib library
    pos = solar.get_solarposition(dt_index, LATITUDE, LONGITUDE)
    zenith = pos.zenith[0]
    azimuth = pos.azimuth[0]
    # print(zenith, azimuth)
    
    # zenith & azimuth in radians
    # zenith = solar.solar_zenith_analytical(LATITUDE, hour_angle, declination)
    # azimuth = solar.solar_azimuth_analytical(LATITUDE, hour_angle, declination, zenith)
    # print(math.degrees(zenith), math.degrees(azimuth))

    rho = zenith / 90 * R
    theta = azimuth - delta + 90
    
    sun_x = round(O_x - rho * math.sin(math.radians(theta)))
    sun_y = round(O_y + rho * math.cos(math.radians(theta)))

    return sun_x, sun_y

In [8]:
# Load the "Clear Sky Library" (and identifies sun positions for all the images)
def load_CSL():
    # Find image indices for dates corresponding to CSL
    date_queries = set([(5,20),(8,15),(9,23),(10,22)])
    trainval_indices = []
    test_indices = []
    for i, dt in enumerate(datetime_trainval):
        date = dt.month, dt.day
        if date in date_queries:
            trainval_indices.append(i)
    for i, dt in enumerate(datetime_test):
        date = dt.month, dt.day
        if date in date_queries:
            test_indices.append(i)
    
    # Extracting CSL images
    tv_CSL = images_trainval[trainval_indices]
    test_CSL = images_test[test_indices]
    CSL = np.concatenate((tv_CSL, test_CSL), axis=0)
    tv_CSL_dt = datetime_trainval[trainval_indices]
    test_CSL_dt = datetime_test[test_indices]
    CSL_datetimes = np.concatenate((tv_CSL_dt, test_CSL_dt))

    # Determining solar position for each CSL image
    sun_positions = []
    for i, img in enumerate(CSL):
        dt = CSL_datetimes[i]
        sun_pos = sun_position(img, dt)
        sun_positions.append(sun_pos)

    return CSL, sun_positions

CSL, CSL_sun_pos = load_CSL()

# Find a clear sky image in the CSL that has the sun at position x,y
def find_CSL(x, y):
    # Get exact match
    if (x,y) in CSL_sun_pos:
        return CSL[CSL_sun_pos.index((x,y))]
    else:
        r = 1
        # Search the pixels outwards
        while r <= 10:
            for i in range(-r, r+1):
                for j in range(-r, r+1):
                    if j == -r or j == r or i == -r or i == r:
                        try:
                            return CSL[CSL_sun_pos.index((x+i, y+j))]
                        except:
                            pass
            r += 1
    raise ValueError(f'CSL image could not be found within radius {r-1} of ({x},{y}).')

In [9]:
# Calculate the cloudiness (i.e. the fraction of cloud pixels in the sky)
# Input is a boolean array: True for cloud pixels, False for sky pixels
def calculate_cloudiness(cloud_pixels):
    return np.sum(cloud_pixels) / N_PIXELS_CIRCLE

In [10]:
# Fixed Threshold Method
def fixed_threshold(NRBR):
    return calculate_cloudiness(NRBR <= 0.05)

In [11]:
# modified Threshold with Background Subtraction Method
def modified_threshold_with_BS(img, dt):
    NRBR_original = NRBR(img)
    sun_x, sun_y = sun_position(img, dt)
    clear_sky = find_CSL(sun_x, sun_y)
    delta_NRBR = np.abs(NRBR_original - NRBR(clear_sky))
    inside_circle = get_circle_coordinates(img, O_x, O_y, R)
    # True means cloud, False means sky (or background!)
    cloud_pixels = np.logical_and(delta_NRBR >= 0.175, inside_circle)
    cloudiness = calculate_cloudiness(cloud_pixels)

    if cloudiness < 0.045:
        return cloudiness
    
    elif cloudiness < 0.35:
        # Pixels outside the circumsolar area
        circumsolar = get_circle_coordinates(img, sun_x, sun_y, 7)
        outside = ~circumsolar & inside_circle
        NRBR_outside = NRBR(img[outside])
        return fixed_threshold(NRBR_outside)
    
    else:
        return fixed_threshold(NRBR_original)

In [12]:
"""
Classify the cloudiness of an image into 3 classes (sunny, cloudy, overcast)
or 2 classes (just sunny and cloudy). 3 classes is the default option as it was
found to be optimal. Returns an integer between 0 and 2 and the cloudiness value.
"""
def cloudiness_classification(img, dt, n_classes=3):
    sun_x, sun_y = sun_position(img, dt)
    cloudiness = modified_threshold_with_BS(img, dt)
    
    if n_classes == 3:
        if cloudiness <= 0.16:
            return 0, cloudiness
        elif cloudiness <= 0.595:
            return 1, cloudiness
        else:
            #print(f"cloudiness : {cloudiness}")
            # Sun Area Mean Pixel Intensity
            sun_area = get_circle_coordinates(img, sun_x, sun_y, 2)
            #print(f"sun_area.shape : {sun_area.shape}")
            sun_area_pixels = img[sun_area]
            #print(f"sun_area_pixels.shape : {sun_area_pixels.shape}")
            
            ########
            #R,G,B = sun_area_pixels[:,:,0], sun_area_pixels[:,:,1], sun_area_pixels[:,:,2]
            R,G,B = sun_area_pixels[::,0], sun_area_pixels[::,1], sun_area_pixels[::,2]
            ########
            
            I = 0.229 * R + 0.587 * G + 0.114 * B
            #print(f"I: {I}")
            #SAMPI = np.mean(I)[0]
            SAMPI = np.mean(I)
            #print(f"SAMPI: {SAMPI}")
    
            #return 1, cloudiness if SAMPI >= 195 else 2, cloudiness
            if SAMPI >= 195:
                return 1, cloudiness
            else:
                return 2, cloudiness
        
    elif n_classes == 2:
        #return 0, cloudiness if cloudiness <= 0.05 else 1, cloudiness
        if cloudiness <= 0.05:
            return 0, cloudiness
        else:
            return 1, cloudiness
        
        

# images_test classification with treshold 0.16 and 0.595
classes = ['sunny', 'cloudy', 'overcast']
test_preds = []

t0 = time.time()
for i, img in enumerate(tqdm(images_test)):
    dt = datetime_test[i]
    prediction, cloudiness = cloudiness_classification(img, dt)
    test_preds.append(classes[prediction])
    
test_pred_Sunny   = [pred for pred in test_preds if pred == 'sunny']
test_pred_Cloudy  = [pred for pred in test_preds if pred == 'cloudy']
test_pred_Overcast= [pred for pred in test_preds if pred == 'overcast']

print(f"nb sunny    : {len(test_pred_Sunny)}")
print(f"nb cloudy   : {len(test_pred_Cloudy)}")
print(f"nb overcast : {len(test_pred_Overcast)}\n")

print(f"Tot duration: {tm.to_mins(t0)} mins")

100%|██████████| 9910/9910 [02:18<00:00, 71.53it/s]

nb sunny    : 5926
nb cloudy   : 2941
nb overcast : 1043

Tot duration: 2.31 mins





<div class="alert alert-block alert-success">
    <b>Note:</b>
   paper images_test clasification: 5924(S)  2948(C)  1038(O)
</div>

In [13]:
# images_trainval classification
trainval_preds = []
t0 = time.time()
for i, img in enumerate(tqdm(images_trainval)):
    dt = datetime_trainval[i]
    prediction, cloudiness = cloudiness_classification(img, dt)
    trainval_preds.append(classes[prediction])
    
pred_Sunny   = [pred for pred in trainval_preds if pred == 'sunny']
pred_Cloudy  = [pred for pred in trainval_preds if pred == 'cloudy']
pred_Overcast= [pred for pred in trainval_preds if pred == 'overcast']

print(f"nb Sunny    : {len(pred_Sunny)}")
print(f"nb Cloudy   : {len(pred_Cloudy)}")
print(f"nb Overcast : {len(pred_Overcast)}\n")

print(f"Tot duration: {tm.to_mins(t0)} mins")

# save the classification arrays to csv file
satc.save_array_to_csv(trainval_preds, 'trainval_phys_3calsses', header=False, index=False)
satc.save_array_to_csv(test_preds, 'test_phys_3calsses', header=False, index=False)

100%|██████████| 92975/92975 [21:33<00:00, 71.88it/s]


nb Sunny    : 58642
nb Cloudy   : 27762
nb Overcast : 6571

Tot duration: 21.56 mins


In [14]:
# UNIT TESTS

assert len(lafc.load_array_from_csv('trainval_phys_3calsses', skip_header = False)) == 92975
assert len(lafc.load_array_from_csv('test_phys_3calsses', skip_header = False)) == 9910

print("Success!")

Success!
