## Objective:
- Import required libraries data pre-processing; numpy, pyarrow
- Convert Entire OCO2/OCO3 netCDF files to parquet format
- Locate the path for the multiple files to convert into parquet
- Additional columns like `verticies` of carbon footprints are extracted for polygon shape visualization

### Why Parquet format?
Parquet is an open source file format built to handle flat columnar storage data formats. Parquet operates well with complex data in large volumes.It is known for its both performant data compression and its ability to handle a wide variety of encoding types. 

- Parquet deploys Google's record-shredding and assembly algorithm that can address complex data structures within data storage. Some Parquet benefits include:

    * Fast queries that can fetch specific column values without reading full row data
    * Highly efficient column-wise compression
    * High compatibility with with OLAP
    
### Benifits of using Parquet format over CSV
- HOW IS PARQUET DIFFERENT FROM CSV?
    - While CSV is simple and the most widely used data format (Excel, Google Sheets), there are several distinct advantages for Parquet, including:
    - Parquet is column oriented and CSV is row oriented. Row-oriented formats are optimized for OLTP workloads while column-oriented formats are better suited for analytical workloads.
    - Column-oriented databases such as AWS Redshift Spectrum bill by the amount data scanned per query
    - Therefore, converting CSV to Parquet with partitioning and compression lowers overall costs and improves performance

[source: Snowflake](https://www.snowflake.com/guides/what-parquet#:~:text=Parquet%20is%20an%20open%20source,wide%20variety%20of%20encoding%20types.)

In [1]:
# !pip install pyarrow

In [2]:
import os
import numpy as np
import pandas as pd
import pyarrow as pa
import netCDF4 as nc

from pyarrow import parquet as parq
from datetime import datetime

In [3]:
import gc

In [6]:
## Example: creating a parquet file using table using the array
arr= np.arange(1.0, 20.0)
pa_table= pa.table({"float_data": arr})
parq.write_table(pa_table, 'test_table_data.parquet')

### NETCDF files
- PATH: downloaded netCDF files from the source(Earthdata search)

### Direcotry with all files
- this example shows files read from a single dir. for year 20021

In [8]:
path_= "../../../Cluster_machine/OCO2/B_11_new_version/2021/"

In [9]:
file_path_= []

for root, dirs, files in os.walk(path_):
    
    for filename in files:
        #print(os.path.join(root, filename))
        
        # Append the files into list
        file_path_.append(os.path.join(root, filename))
        
file_path_[:3]

['../../../Cluster_machine/OCO2/B_11_new_version/2021/01\\01\\LtCO2\\oco2_LtCO2_210101_B11014Ar_220729005635s.nc4',
 '../../../Cluster_machine/OCO2/B_11_new_version/2021/01\\02\\LtCO2\\oco2_LtCO2_210102_B11014Ar_220729005715s.nc4',
 '../../../Cluster_machine/OCO2/B_11_new_version/2021/01\\03\\LtCO2\\oco2_LtCO2_210103_B11014Ar_220729005741s.nc4']

In [10]:
file_names= file_path_

## conv dateTime

In [11]:
def conv_date(d):
    return datetime.strptime(str(d), '%Y%m%d%H%M%S%f')

In [12]:
%%time
countFiles=0

lon_list= []
lat_list= []
xco2_list= []
qual_flag_list= []
lat=[]
lon=[]
dateTime= []

for j in file_names:
    if j.endswith(".nc4"):
        var_= nc.Dataset(j)
        lon_list.append(np.array(var_.variables['vertex_longitude'][:]).tolist())
        lat_list.append(np.array(var_.variables['vertex_latitude'][:]).tolist())
        xco2_list.append(np.array(var_.variables['xco2'][:]).tolist())
        qual_flag_list.append(np.array(var_.variables['xco2_quality_flag'][:]).tolist())
        
        # DateTIme formating
        dateTime.append(np.array(var_.variables['sounding_id'][:].tolist()))

        # lat and long without vert
        lat.append(np.array(var_.variables['latitude'][:].tolist()))
        lon.append(np.array(var_.variables['longitude'][:].tolist()))

#print('\nTotalFiles: ', countFiles)

Wall time: 3min 25s


In [13]:
%%time
lon_list_a= [element for sublist in lon_list for element in sublist]
lat_list_a= [element for sublist in lat_list for element in sublist]
xco2_list_a= [element for sublist in xco2_list for element in sublist]
lon_a= [element for sublist in lon for element in sublist]
lat_a= [element for sublist in lat for element in sublist]
xco2_qual_flag= [ element for sublist in qual_flag_list for element in sublist]
dateTime_list= [ element for sublist in dateTime for element in sublist]

Wall time: 40.3 s


### Transformation to parquet format
- Creating table format

In [14]:
%%time
df_oco3= pa.table({
    'Latitude_vertices': lat_list_a,
    'Longitude_vertices': lon_list_a,
    'Latitude': lat_a,
    'Longitude': lon_a,
    'Xco2': xco2_list_a,
    'quality_flag': xco2_qual_flag,
    'DateTime': dateTime_list
})

Wall time: 3min 13s


## Table from parquet

In [15]:
# 'oco3_20222_parq.parquet'
#file_name= input("File Name")
file_name= "oco2_2021_parq.parquet"

In [16]:
%%time
parq.write_table(df_oco3, file_name)

Wall time: 36.2 s


# Pre-processing the original file
- CONVERT the DATETIME -> Date and Month
- Filter by quality flag
    - Good Quality-> 0

In [4]:
%%time
df_parq= pd.read_parquet("oco2_2021_parq.parquet", engine="pyarrow")
df_parq.head(2)

Wall time: 2min 18s


Unnamed: 0,Latitude_vertices,Longitude_vertices,Latitude,Longitude,Xco2,quality_flag,DateTime
0,"[-84.71208953857422, -84.71642303466797, -84.7...","[-51.488616943359375, -51.684295654296875, -51...",-84.725594,-51.603241,409.039856,1,2021010100041302
1,"[-84.9544906616211, -84.9550552368164, -84.976...","[-64.50091552734375, -64.71295166015625, -64.8...",-84.965218,-64.651619,418.155731,1,2021010100043437


## Quality Flag filtering:

In [5]:
%%time
df_qual_= df_parq[df_parq["quality_flag"]==0]
del df_parq
gc.collect()

Wall time: 8.8 s


18

In [6]:
#len(df_parq), 
len(df_qual_)

33052890

# Convert the DATETIME format

In [7]:
%%time
df_qual_["DateTime"]= pd.to_datetime(df_qual_["DateTime"], format="%Y%m%d%H%M%S%f")

Wall time: 7min 20s


In [8]:
df_qual_= df_qual_.reset_index()
df_qual_=df_qual_.drop(columns=["index"])
df_qual_

Unnamed: 0,Latitude_vertices,Longitude_vertices,Latitude,Longitude,Xco2,quality_flag,DateTime
0,"[-63.28132247924805, -63.26362991333008, -63.2...","[-147.940185546875, -147.949951171875, -147.98...",-63.273125,-147.962006,409.814789,0,2021-01-01 00:12:44.060
1,"[-63.28230285644531, -63.26461410522461, -63.2...","[-147.97442626953125, -147.98416137695312, -14...",-63.274101,-147.996048,409.309540,0,2021-01-01 00:12:44.070
2,"[-63.28309631347656, -63.26540756225586, -63.2...","[-148.0076904296875, -148.01739501953125, -148...",-63.274967,-148.030212,409.366913,0,2021-01-01 00:12:44.080
3,"[-63.2584342956543, -63.2407341003418, -63.242...","[-147.8147735595703, -147.82464599609375, -147...",-63.250282,-147.836777,410.673309,0,2021-01-01 00:12:44.320
4,"[-63.25991439819336, -63.242218017578125, -63....","[-147.84823608398438, -147.8580780029297, -147...",-63.251755,-147.870346,409.633270,0,2021-01-01 00:12:44.330
...,...,...,...,...,...,...,...
33052885,"[-5.621041774749756, -5.602428913116455, -5.61...","[-156.93154907226562, -156.9356689453125, -156...",-5.617089,-156.932739,415.088226,0,2021-12-31 23:59:52.370
33052886,"[-5.631556034088135, -5.61294412612915, -5.624...","[-156.93019104003906, -156.934326171875, -156....",-5.627943,-156.931335,414.612823,0,2021-12-31 23:59:52.380
33052887,"[-5.612905979156494, -5.5942816734313965, -5.6...","[-156.934326171875, -156.93846130371094, -156....",-5.609292,-156.935471,414.930511,0,2021-12-31 23:59:52.780
33052888,"[-5.5837321281433105, -5.565078258514404, -5.5...","[-156.9398193359375, -156.94398498535156, -156...",-5.579757,-156.941040,413.658966,0,2021-12-31 23:59:53.070


## Get month column

In [9]:
df_qual_["Month"]= df_qual_["DateTime"].dt.month

In [10]:
df_qual_["Month"].unique()

array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12], dtype=int64)

In [11]:
%%time
df_qual_.to_parquet("oco2_2021_parq_dateTime.parquet", engine="pyarrow")

del df_qual_
gc.collect()

Wall time: 2min 14s


0