In [1]:
#Las librerias de os y glob se utiliza para el manejo de las rutas de los archivos guardados
import os
import glob

#netCDF se utiliza para procesar los archivos *.nc
import netCDF4 as nc

#librerias de manipulación de datos generales
import numpy as np
import pandas as pd
import xray

#se importa esto debido a los warnings que generan utilizar
#versiones antiguas de los modulos (pero que son necesarias)
import warnings
warnings.filterwarnings('ignore')

#librerias de visualización de gráficos e imagenes
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style="ticks")

#Se utiliza datetime para obtener el timestamp de la generación
#de imagenes de la temperatura oceanica
from datetime import datetime

#Librerias de Apache Spark
import findspark
import pyspark

  from pandas.tslib import OutOfBoundsDatetime


In [2]:
#Se define el contexto de spark que se utiliza para
#las paralelizaciones de tareas
findspark.init()

sc = pyspark.SparkContext()

In [3]:
#Se toma todas las rutas de los archivos pertenecientes al dataset de oceanos almacenados
dataset = list(glob.glob("/archive/data/*_1hrly_prog.nc"))
dataset.sort()

In [4]:
print(dataset)

['/archive/data/rtofs_glo_2ds_f000_1hrly_prog.nc', '/archive/data/rtofs_glo_2ds_f001_1hrly_prog.nc', '/archive/data/rtofs_glo_2ds_f002_1hrly_prog.nc', '/archive/data/rtofs_glo_2ds_f003_1hrly_prog.nc', '/archive/data/rtofs_glo_2ds_f004_1hrly_prog.nc', '/archive/data/rtofs_glo_2ds_f005_1hrly_prog.nc', '/archive/data/rtofs_glo_2ds_f006_1hrly_prog.nc', '/archive/data/rtofs_glo_2ds_f007_1hrly_prog.nc']


In [5]:
#Función que se encarga de procesar los datos oceanográficos (archivos .nc)
#que genera las visualizaciones de la temperatura del oceano
def save_plot_img(fn):
    data = nc.Dataset(os.path.join(fn))
    sst = data.variables['sst']
    ds = xray.open_dataset(fn, decode_times=True)
    sst = ds.sst.values.ravel()
    sst_masked = sst[~np.isnan(sst)]
    
    plt.figure(figsize=(7,7))
    
    ## Masking a numpy array with multiple logical criteria:
    # sst_between_-10_5 = sst[(sst > -10) & (sst < 5)]
    now = datetime.now()
    timestamp = datetime.timestamp(now)
    plt.imshow(ds.sst[0,::-100,::100])
    plt.savefig('T_ocean_low_res'+str(timestamp)+'.jpg')
    plt.imshow(ds.sst[0,::-1,...])
    plt.savefig('T_ocean_high_res'+str(timestamp)+'.jpg')

In [6]:
#Se prepara la paralelización de las tareas
parallel_plots = sc.parallelize(dataset).map(save_plot_img)

In [7]:
#Se ejecuta esta paralelización
parallel_plots.collect()

[None, None, None, None, None, None, None, None]