In [6]:
import boto3
import matplotlib.pyplot as plt
import re
import os
import glob
import scipy.io
import numpy as np
import io
import tqdm
import numpy as np
import pandas as pd
from PIL import Image
from multiprocessing import Pool

# Utils

In [7]:
def image_name_formating(aws_key_object):
    """
    format the name of the object in aws in proper date
    """
    items = aws_key_object.split('/')
    return '-'.join(items[-4:-1]) +' '+items[-1][:2] +':'+ items[-1][2:4]+':00.jpg'

def day_of_the_year_computation(universal_time):
    """
    Computes the floating value for the day of the year
    :param numpy.datetime64 universal_time:
    :return: float day of year:
    """
    time_object = pd.to_datetime(universal_time)
    return time_object.dayofyear + time_object.hour/24.0 + time_object.minute/(24.0*60)


def zenith_computation(universal_time, longitude, latitude):
    """
    Computes the zenith matrix associated with longitude, latitude at a given universal time,
    reference NOAA Global Monitoring Division
    :param numpy.datetime64 universal_time: universal time
    :param numpy.ndarray longitude: array of the considered longitude
    :param numpy.ndarray latitude: array of the considered latitude
    :return:numpy.ndarray zenith: matrix of the zenith in degrees
    """
    if longitude.shape == latitude.shape:
        day_of_year = day_of_the_year_computation(universal_time)
        x = 2 * np.pi / 365 * (day_of_year - 1)  # day of year un radian, named x because heavily referenced

        eqtime = (0.000075 + 0.001868 * np.cos(x) - 0.032077 * np.sin(x) - 0.014615 * np.cos(2 * x)
                  - 0.040849 * np.sin(2 * x)) * ((24 * 60) / (2 * np.pi))

        solar_declination = 0.006918 - 0.399912 * np.cos(x) + 0.070257 * np.sin(x) \
                            - 0.006758 * np.cos(2 * x) + 0.000907 * np.sin(2 * x) \
                            - 0.002697 * np.cos(3 * x) + 0.001480 * np.sin(3 * x)

        longitude_corrected = 4*longitude
        offset = (longitude_corrected + eqtime).astype('timedelta64[m]')
        true_solar_time = offset + universal_time
        true_solar_time = [x.astype(object) for x in true_solar_time.flatten()]
        true_solar_time_minutes = [(lambda time: (time.hour*60 + time.minute + time.second/60))(time)
                                   for time in true_solar_time]

        hour_angle = np.array([(x/4 - 180)*np.pi/180 for x in true_solar_time_minutes]).reshape(latitude.shape)
        zenith = np.arccos(np.cos(solar_declination)*np.cos(latitude*np.pi/180)*np.cos(hour_angle)
                           + np.sin(solar_declination)*np.sin(latitude*np.pi/180))

        return zenith * 180 / np.pi
    else:
        return ValueError

# Constants

In [8]:
TEMP_DIR = '/tmp/images/'
os.makedirs(TEMP_DIR, exist_ok=True)
bucket_name  = 'edp.engie-digital.prod.instance1.prod.data'

coordinates = scipy.io.loadmat('geographicCoordinate.mat')
latitude = coordinates['geographicCoordinate']['latitude'][0][0]
longitude = coordinates['geographicCoordinate']['longitude'][0][0]

percentiles = (3,6)

num_partitions = 53
num_cores = 28 

# Connexion

In [9]:
s3 = boto3.resource('s3')

bucket = s3.Bucket(bucket_name)

s3_client = boto3.client('s3')

In [10]:
%%time
processed_images = [x.key for x  in bucket.objects.filter(Prefix='LCV_SATELLITE_IMAGE_24/lcv_visual_images/processed/')]

ClientError: An error occurred (InvalidAccessKeyId) when calling the ListObjects operation: The AWS Access Key Id you provided does not exist in our records.

In [6]:
archives = list(set([(x.split('/')[4],x.split('/')[6]) for x in processed_images]))
archives = [elt for elt in archives if (int(elt[1].split('.')[0][-2:])%15 == 0)]

In [14]:
archives = list(set([(x.split('/')[4],x.split('/')[6]) for x in processed_images]))

In [7]:
len(archives)

855

In [7]:
def parallelize_dataframe(list_archives, func):
    df_split = np.array_split(list_archives, num_partitions)
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

In [8]:
def compute_cloud_index_walon(archives):
    date_resultats = []
    cloud_index_a_walon = []
    for month,hour in archives:
        TEMP_DIR_LOOP = TEMP_DIR+month+hour[:-4]+'/'
        os.makedirs(TEMP_DIR_LOOP, exist_ok=True)
        #print(hour)
        bucket.download_file('LCV_SATELLITE_IMAGE_24/clearsky_ref/month_{}_hour_{}'.format(month, hour), '/tmp/clearsky_ref_month_{}_hour_{}'.format(month,hour))
        bucket.download_file('LCV_SATELLITE_IMAGE_24/overcast_ref/month_{}_hour_{}'.format(month, hour), '/tmp/overcast_ref_month_{}_hour_{}'.format(month,hour))
        clear_sky = plt.imread('/tmp/clearsky_ref_month_{}_hour_{}'.format(month,hour))
        overcast = plt.imread('/tmp/overcast_ref_month_{}_hour_{}'.format(month,hour))

        images_to_concatenate = [x for x in processed_images if x.endswith(hour)]
        [os.remove(TEMP_DIR_LOOP+x) for x in os.listdir(TEMP_DIR_LOOP)];
        image_names = [image_name_formating(name) for name in images_to_concatenate]
        [bucket.download_file(image, TEMP_DIR_LOOP+image_name) for image, image_name in zip(images_to_concatenate, image_names)];
        concatenated_images = [plt.imread(TEMP_DIR_LOOP+x) for x in os.listdir(TEMP_DIR_LOOP)]
        zenith_images = [zenith_computation(np.datetime64(image_name[:-4]), longitude, latitude)<85 
                     for image_name in image_names]


        masks = [(img > clear_sky) & ((255 - clear_sky)>=1)&zenith for img, zenith in zip(concatenated_images, zenith_images)]
        results = [np.zeros(img.shape) for img in concatenated_images]
        for mask, result,img in zip(masks, results, concatenated_images):
            result[mask] = np.divide((img[mask] - clear_sky[mask]),  overcast[mask]-clear_sky[mask])
        cloud_index = results.copy()
        for cloud, result in zip(cloud_index, results):
            cloud[result>1.2] = 0
        cloud_index_a_walon.append([cloud[527,411] for cloud in cloud_index])
        date_resultats.append(image_names)
    return pd.DataFrame({'date': date_resultats, 'cloud_index_walon':cloud_index_a_walon})

In [None]:
%%time
computed_cloud_index = parallelize_dataframe(archives, compute_cloud_index_walon)

Exception in thread Thread-21:
Traceback (most recent call last):
  File "/home/cob/anaconda3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/cob/anaconda3/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/cob/anaconda3/lib/python3.6/multiprocessing/pool.py", line 463, in _handle_results
    task = get()
  File "/home/cob/anaconda3/lib/python3.6/multiprocessing/connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
TypeError: __init__() missing 1 required positional argument: 'operation_name'



In [None]:
%%time
result = compute_cloud_index_walon(archives[0:2])

In [12]:
result

NameError: name 'result' is not defined

In [None]:
import 