## Import Basic Libraries

In [1]:
import numpy as np
import pandas as pd
import glob
import sys
import h5py
#from netCDF4 import Dataset
from datetime import datetime
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from scipy.spatial import cKDTree

import pyarrow as pa
import pyarrow.parquet as pq

from functools import reduce
import operator
import gc

In [2]:
# plot settings
plt.rc('font', family='serif') 
plt.rc('font', serif='Times New Roman') 
plt.rcParams.update({'font.size': 16})
plt.rcParams['mathtext.fontset'] = 'stix'

## Define SparkSession and sparkContext

In [5]:
# This is for the KISTI env.
"""
spark = SparkSession \
    .builder \
    .master("spark://spark-ms-0.spark-ms.default.svc.cluster.local:7077") \
    .config("spark.driver.memory", "20g") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.maxResultSize", "8g")\
    .config("spark.hadoop.fs.s3a.access.key", "spark")\
    .config("spark.hadoop.fs.s3a.secret.key", "kistispark")\
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio-service.default.svc.cluster.local:9000")\
    .config("spark.hadoop.fs.s3a.path.style.access", True)\
    .appName("test") \
    .getOrCreate()

sc = spark.sparkContext
"""

'\nspark = SparkSession     .builder     .master("spark://spark-ms-0.spark-ms.default.svc.cluster.local:7077")     .config("spark.driver.memory", "20g")     .config("spark.executor.memory", "16g")     .config("spark.driver.maxResultSize", "8g")    .config("spark.hadoop.fs.s3a.access.key", "spark")    .config("spark.hadoop.fs.s3a.secret.key", "kistispark")    .config("spark.hadoop.fs.s3a.endpoint", "http://minio-service.default.svc.cluster.local:9000")    .config("spark.hadoop.fs.s3a.path.style.access", True)    .appName("test")     .getOrCreate()\n\nsc = spark.sparkContext'

In [3]:
# PySpark packages
from pyspark import SparkContext   
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark import Row
from pyspark.sql.window import Window as W


spark = SparkSession.builder \
    .master("yarn") \
    .appName("spark-shell") \
    .config("spark.driver.maxResultSize", "32g") \
    .config("spark.driver.memory", "32g") \
    .config("spark.executor.memory", "14g") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.instances", "60") \
    .getOrCreate()


sc = spark.sparkContext
sc.setCheckpointDir("hdfs://spark00:54310/tmp/checkpoints")

spark.conf.set("spark.sql.debug.maxToStringFields", 500)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [4]:
sc.getConf().getAll()[:10]

[('spark.driver.memory', '32g'),
 ('spark.driver.appUIAddress', 'http://spark00:4040'),
 ('spark.driver.maxResultSize', '32g'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://spark04:8088/proxy/application_1689144870884_0095'),
 ('spark.master', 'yarn'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.executor.cores', '2'),
 ('spark.app.id', 'application_1689144870884_0095'),
 ('spark.serializer.objectStreamReset', '100')]

## Read h5 files 

In [5]:
h5dir = '/mnt/raid5/shong/oco2/'

In [6]:
flist = !ls /mnt/raid5/shong/oco2/

In [7]:
flist[:5]

['diff.txt',
 'file.txt',
 'file_e.txt',
 'oco2_L1bScGL_37218a_210701_B10206r_210817230313.h5',
 'oco2_L1bScGL_37219a_210701_B10206r_210818001332.h5']

In [8]:
h5list = flist[3:]

In [9]:
numh5list = len(h5list)
print(numh5list)

654


In [10]:
h5dir+h5list[0]

'/mnt/raid5/shong/oco2/oco2_L1bScGL_37218a_210701_B10206r_210817230313.h5'

In [11]:
h5py.is_hdf5(h5dir+h5list[0])

True

In [12]:
try:
    h5f = h5py.File(h5dir+h5list[0], "r")
except IOError as e:
    print("Error opening HDF5 file:", str(e))
# Don't forget f.close() when done! 

## Explore the `h5`

In [13]:
h5f.keys()

<KeysViewHDF5 ['Dimensions', 'FootprintGeometry', 'FrameConfiguration', 'FrameGeometry', 'FrameHeader', 'FrameTemperatures', 'InstrumentHeader', 'Metadata', 'RadianceClockingCorrection', 'Shapes', 'SliceMeasurements', 'SoundingGeometry', 'SoundingMeasurements', 'SpikeEOF']>

In [14]:
for key in h5f.keys():
    item = h5f[key]
    
    if isinstance(item, h5py.Group):  # Check if it's a group
        print(f"Group: {key}")
    elif isinstance(item, h5py.Dataset):  # Check if it's a dataset
        print(f"Dataset: {key}, dtype: {item.dtype}")
    else:
        print(f"Unknown item: {key}")

Group: Dimensions
Group: FootprintGeometry
Group: FrameConfiguration
Group: FrameGeometry
Group: FrameHeader
Group: FrameTemperatures
Group: InstrumentHeader
Group: Metadata
Group: RadianceClockingCorrection
Group: Shapes
Group: SliceMeasurements
Group: SoundingGeometry
Group: SoundingMeasurements
Group: SpikeEOF


In [15]:
h5f['FootprintGeometry'].keys()

<KeysViewHDF5 ['footprint_altitude', 'footprint_altitude_uncert', 'footprint_aspect', 'footprint_azimuth', 'footprint_land_fraction', 'footprint_latitude', 'footprint_latitude_geoid', 'footprint_longitude', 'footprint_longitude_geoid', 'footprint_los_surface_bidirectional_angle', 'footprint_o2_qual_flag', 'footprint_plane_fit_quality', 'footprint_polarization_angle', 'footprint_slope', 'footprint_solar_azimuth', 'footprint_solar_surface_bidirectional_angle', 'footprint_solar_zenith', 'footprint_stokes_coefficients', 'footprint_strong_co2_qual_flag', 'footprint_surface_roughness', 'footprint_time_string', 'footprint_time_tai93', 'footprint_vertex_altitude', 'footprint_vertex_latitude', 'footprint_vertex_longitude', 'footprint_weak_co2_qual_flag', 'footprint_zenith']>

In [16]:
for key in h5f['FootprintGeometry'].keys():
    item = h5f['FootprintGeometry'][key]
    
    if isinstance(item, h5py.Group):  # Check if it's a group
        print(f"Group: {key}")
    elif isinstance(item, h5py.Dataset):  # Check if it's a dataset
        print(f"Dataset: {key}, dtype: {item.dtype}, shape: {item.shape}")
    else:
        print(f"Unknown item: {key}")

Dataset: footprint_altitude, dtype: float32, shape: (8878, 8, 3)
Dataset: footprint_altitude_uncert, dtype: float32, shape: (8878, 8, 3)
Dataset: footprint_aspect, dtype: float32, shape: (8878, 8, 3)
Dataset: footprint_azimuth, dtype: float32, shape: (8878, 8, 3)
Dataset: footprint_land_fraction, dtype: float32, shape: (8878, 8, 3)
Dataset: footprint_latitude, dtype: float32, shape: (8878, 8, 3)
Dataset: footprint_latitude_geoid, dtype: float32, shape: (8878, 8, 3)
Dataset: footprint_longitude, dtype: float32, shape: (8878, 8, 3)
Dataset: footprint_longitude_geoid, dtype: float32, shape: (8878, 8, 3)
Dataset: footprint_los_surface_bidirectional_angle, dtype: float32, shape: (8878, 8, 3)
Dataset: footprint_o2_qual_flag, dtype: uint16, shape: (8878, 8)
Dataset: footprint_plane_fit_quality, dtype: float32, shape: (8878, 8, 3)
Dataset: footprint_polarization_angle, dtype: float32, shape: (8878, 8, 3)
Dataset: footprint_slope, dtype: float32, shape: (8878, 8, 3)
Dataset: footprint_solar_azi

In [17]:
#leng = h5f['FootprintGeometry/footprint_altitude'].shape[0]
#print(leng)

In [18]:
h5f['SoundingMeasurements'].keys()

<KeysViewHDF5 ['rad_continuum_o2', 'rad_continuum_strong_co2', 'rad_continuum_weak_co2', 'radiance_o2', 'radiance_strong_co2', 'radiance_weak_co2', 'snr_o2_l1b', 'snr_strong_co2_l1b', 'snr_weak_co2_l1b']>

In [19]:
for key in h5f['SoundingMeasurements'].keys():
    item = h5f['SoundingMeasurements'][key]
    
    if isinstance(item, h5py.Group):  # Check if it's a group
        print(f"Group: {key}")
    elif isinstance(item, h5py.Dataset):  # Check if it's a dataset
        print(f"Dataset: {key}, dtype: {item.dtype}, shape: {item.shape}")
    else:
        print(f"Unknown item: {key}")

Dataset: rad_continuum_o2, dtype: float32, shape: (8878, 8)
Dataset: rad_continuum_strong_co2, dtype: float32, shape: (8878, 8)
Dataset: rad_continuum_weak_co2, dtype: float32, shape: (8878, 8)
Dataset: radiance_o2, dtype: float32, shape: (8878, 8, 1016)
Dataset: radiance_strong_co2, dtype: float32, shape: (8878, 8, 1016)
Dataset: radiance_weak_co2, dtype: float32, shape: (8878, 8, 1016)
Dataset: snr_o2_l1b, dtype: float32, shape: (8878, 8)
Dataset: snr_strong_co2_l1b, dtype: float32, shape: (8878, 8)
Dataset: snr_weak_co2_l1b, dtype: float32, shape: (8878, 8)


## Save selected features as a parquet

In [20]:
schema = T.StructType([\
                       T.StructField('filename',T.StringType(), True),\
                       T.StructField('channel_ind',T.IntegerType(), True),\
                       T.StructField('pix_ind',T.IntegerType(), True),\
                       T.StructField('row_ind',T.IntegerType(), True),\
                       T.StructField('altitude',T.FloatType(), True),\
                       T.StructField('longitude',T.FloatType(), True),\
                       T.StructField('latitude',T.FloatType(), True),\
                       T.StructField('aspect',T.FloatType(), True),\
                       T.StructField('slope',T.FloatType(), True),\
                       T.StructField('sol_az',T.FloatType(), True),\
                       T.StructField('sol_zn',T.FloatType(), True),\
                       T.StructField('fo_az',T.FloatType(), True),\
                       T.StructField('fo_zn',T.FloatType(), True),\
                       T.StructField('flag',T.IntegerType(), True),\
                       T.StructField('snr',T.FloatType(), True),\
                       T.StructField('continuum',T.FloatType(), True),\
                       T.StructField('time_str',T.StringType(), True),\
                       T.StructField('spectrum',T.ArrayType(T.DoubleType(),True), True)\
                      ])

#### Extracting useful info 

In [21]:
numdata, numpix, numchannel = h5f['FootprintGeometry/footprint_altitude'].shape
print(numdata)
print(numpix)
print(numchannel)

8878
8
3


> 8878 x 8 x 3 three dimesional data

In [22]:
%%time
filename_list = [h5list[0] for i in range(numdata*numpix*numchannel)] 

CPU times: user 8.61 ms, sys: 697 µs, total: 9.3 ms
Wall time: 9.3 ms


In [23]:
len(filename_list)

213072

In [24]:
channel_ind_list =[]
pixel_ind_list =[]
row_ind_list =[]
flag_list =[]
snr_list = []
cont_list= []
time_list=[]
times =[]

In [25]:
####################### generate indices
for k in range(numdata) :
        for j in range(numpix)   :
            for i in range(numchannel):
                channel_ind_list.append(i)
                pixel_ind_list.append(j)
                row_ind_list.append(k)

In [26]:
################## General info 
alti_list = h5f['FootprintGeometry/footprint_altitude'][:,:,:].flatten().tolist()
lon_list = h5f['FootprintGeometry/footprint_longitude'][:,:,:].flatten().tolist()
lat_list = h5f['FootprintGeometry/footprint_latitude'][:,:,:].flatten().tolist()

aspect_list = h5f['FootprintGeometry/footprint_aspect'][:,:,:].flatten().tolist()
slope_list  = h5f['FootprintGeometry/footprint_slope'][:,:,:].flatten().tolist()
surf_list   = h5f['FootprintGeometry/footprint_surface_roughness'][:,:,:].flatten().tolist()

sol_az_list = h5f['FootprintGeometry/footprint_solar_azimuth'][:,:,:].flatten().tolist()
sol_zn_list = h5f['FootprintGeometry/footprint_solar_zenith'][:,:,:].flatten().tolist()
fo_az_list  = h5f['FootprintGeometry/footprint_azimuth'][:,:,:].flatten().tolist()
fo_zn_list  = h5f['FootprintGeometry/footprint_zenith'][:,:,:].flatten().tolist()
times       = h5f['FootprintGeometry/footprint_time_string'][:,:,:].flatten().tolist()
time_list   = [time.decode() for time in times]

In [27]:
####################### merge info
o2_flg = h5f['FootprintGeometry/footprint_o2_qual_flag'][:,:].flatten().tolist()
snr_o2 = h5f['/SoundingMeasurements/snr_o2_l1b'][:,:].flatten().tolist()
o2_cont= h5f['/SoundingMeasurements/rad_continuum_o2'][:,:].flatten().tolist()
flag_list.append(o2_flg)
snr_list.append(snr_o2)
cont_list.append(o2_cont)

wco2_flg = h5f['FootprintGeometry/footprint_weak_co2_qual_flag'][:,:].flatten().tolist()
snr_wco2 = h5f['/SoundingMeasurements/snr_weak_co2_l1b'][:,:].flatten().tolist()
wco2_cont= h5f['/SoundingMeasurements/rad_continuum_weak_co2'][:,:].flatten().tolist()
flag_list.append(wco2_flg)
snr_list.append(snr_wco2)
cont_list.append(wco2_cont)

sco2_flg = h5f['FootprintGeometry/footprint_strong_co2_qual_flag'][:,:].flatten().tolist()
snr_sco2 = h5f['/SoundingMeasurements/snr_strong_co2_l1b'][:,:].flatten().tolist()
sco2_cont= h5f['/SoundingMeasurements/rad_continuum_strong_co2'][:,:].flatten().tolist()
flag_list.append(sco2_flg)
snr_list.append(snr_sco2)
cont_list.append(sco2_cont)

flag_list =reduce(operator.concat, flag_list)
snr_list =reduce(operator.concat, snr_list)
cont_list =reduce(operator.concat, cont_list)

In [28]:
################## Spectra info
o2   = h5f['/SoundingMeasurements/radiance_o2'][:,:,:].reshape(-1,1016).tolist()
wco2 = h5f['/SoundingMeasurements/radiance_weak_co2'][:,:,:].reshape(-1,1016).tolist()
sco2 = h5f['/SoundingMeasurements/radiance_strong_co2'][:,:,:].reshape(-1,1016).tolist()

In [29]:
temp_spec = list(map(list, zip(o2, wco2, sco2)))
all_spec_list = [x for sublist in temp_spec for x in sublist]

#### Wrap up the extracted info 

In [30]:
%%time
sparkdf = spark.createDataFrame(zip(filename_list,channel_ind_list,pixel_ind_list,row_ind_list,\
                                    alti_list,lon_list,lat_list,aspect_list,slope_list, \
                                    sol_az_list,sol_zn_list,fo_az_list,fo_zn_list, \
                                    flag_list,snr_list,cont_list,time_list,all_spec_list),schema)

CPU times: user 51.2 s, sys: 806 ms, total: 52 s
Wall time: 53.8 s


In [31]:
outdir = 'hdfs://spark00:54310/user/shong/data/parquet/oco2/'

In [32]:
outname = outdir+h5list[0].replace("h5","parquet.snappy")
print(outname)

hdfs://spark00:54310/user/shong/data/parquet/oco2/oco2_L1bScGL_37218a_210701_B10206r_210817230313.parquet.snappy


In [33]:
%%time
sparkdf.write.option("compression", "snappy") \
    .mode("overwrite") \
    .save(outname)

CPU times: user 8.2 ms, sys: 3.22 ms, total: 11.4 ms
Wall time: 28.4 s


### Check up the parquet

In [34]:
sparkdf.printSchema()

root
 |-- filename: string (nullable = true)
 |-- channel_ind: integer (nullable = true)
 |-- pix_ind: integer (nullable = true)
 |-- row_ind: integer (nullable = true)
 |-- altitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- aspect: float (nullable = true)
 |-- slope: float (nullable = true)
 |-- sol_az: float (nullable = true)
 |-- sol_zn: float (nullable = true)
 |-- fo_az: float (nullable = true)
 |-- fo_zn: float (nullable = true)
 |-- flag: integer (nullable = true)
 |-- snr: float (nullable = true)
 |-- continuum: float (nullable = true)
 |-- time_str: string (nullable = true)
 |-- spectrum: array (nullable = true)
 |    |-- element: double (containsNull = true)



In [35]:
%%time
print(sparkdf.count())

213072
CPU times: user 8.66 ms, sys: 140 µs, total: 8.8 ms
Wall time: 18.5 s


In [36]:
%%time
sparkdf.limit(2).toPandas().transpose()

CPU times: user 13.8 ms, sys: 4.38 ms, total: 18.2 ms
Wall time: 18.2 s


Unnamed: 0,0,1
filename,oco2_L1bScGL_37218a_210701_B10206r_21081723031...,oco2_L1bScGL_37218a_210701_B10206r_21081723031...
channel_ind,0,1
pix_ind,0,0
row_ind,3712,3712
altitude,0.0,0.0
longitude,-172.583115,-172.584061
latitude,13.294443,13.2956
aspect,0.0,0.0
slope,0.0,0.0
sol_az,300.463837,300.462219


In [39]:
sparkdf.select(['altitude','longitude','latitude','aspect','slope','sol_az','sol_zn', \
                'fo_az','fo_zn','flag','snr','continuum']).show(3)

+--------+----------+----------+------+-----+---------+--------+---------+---------+----+---------+------------+
|altitude| longitude|  latitude|aspect|slope|   sol_az|  sol_zn|    fo_az|    fo_zn|flag|      snr|   continuum|
+--------+----------+----------+------+-----+---------+--------+---------+---------+----+---------+------------+
|     0.0|-157.18796|-54.093674|   0.0|  0.0| 332.2954|81.40759|152.66422|59.156822|   0|345.16867| 8.220378E19|
|     0.0|-157.19017|-54.089634|   0.0|  0.0| 332.2971|81.40341| 152.6702|59.171177|   0|314.94962| 6.252769E19|
|     0.0|-157.18686|-54.090748|   0.0|  0.0|332.29425|81.40529|152.67673| 59.16697|   0|434.39273|1.1830203E20|
+--------+----------+----------+------+-----+---------+--------+---------+---------+----+---------+------------+
only showing top 3 rows



In [40]:
%%time
sparkdf.select(['altitude','longitude','latitude','aspect','slope','sol_az','sol_zn', \
                'fo_az','fo_zn','flag','snr','continuum']).describe().toPandas().set_index('summary').transpose()

CPU times: user 14.1 ms, sys: 4.66 ms, total: 18.8 ms
Wall time: 18.8 s


summary,count,mean,stddev,min,max
altitude,213072,-3916.6610661485047,62628.40698157672,-999999.0,1069.8132
longitude,213072,-43.4179519220024,150.52536453702382,-179.99902,179.99986
latitude,213072,23.50426827051629,42.67004694109275,-54.093674,79.85018
aspect,213072,-3922.70090362916,62627.96895201564,-999999.0,359.98965
slope,213072,-3937.4064771711537,62627.018680349945,-999999.0,88.91238
sol_az,213072,240.77608360943069,90.1262493103798,47.548046,333.2926
sol_zn,213072,44.826667950095526,20.06552954185553,16.159397,81.40759
fo_az,213072,152.3926359179914,98.19703122966196,0.00013626867,359.9971
fo_zn,213072,34.759277050122044,15.077191883426975,12.505527,60.078445
flag,213072,1.183862731846512,17.348864907539664,0.0,256.0
