# OpenStack Swift storage of the project

## OpenStack directory structure for OCO-2

We do not store the original OCO-2 files from NASA.

* /datasets/oco-2/emissions/ contains all the potential sources of emissions: factories, power plants, cities...
* /datasets/oco-2/soudings/ contains CSV files of the raw features extracted from the NASA NC4 files.
* /datasets/oco-2/peaks-detected/ contains all the peaks found in the satellite orbit data.
* /datasets/oco-2/peaks-detected-details/ contains one JSON file with the full data for each detected peak.

In [None]:
# default_exp datasets

## Module Installation

In [None]:
#!pip install --user python-swiftclient python-keystoneclient --upgrade

In [None]:
#hide
# Put these at the top of every notebook, to get automatic reloading and inline plotting
%reload_ext autoreload
%autoreload 2
#%matplotlib inline
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

## Class Datasets
Using a config file for credentials

In [None]:
#export
import swiftclient
import json
import glob
import os
import pandas as pd
from fastprogress.fastprogress import master_bar, progress_bar

class Datasets:
    """
    Utility class to access the Open Stack Swift storage of the project.

    The connection settings are read from the ``swift_storage`` section of a
    JSON config file. The keys used by this class are ``user``, ``key``,
    ``auth_url``, ``tenant_name``, ``auth_version``, ``options`` and
    ``base_url``.
    """
    config = None # Dict configuration
    conn = None # swiftclient.Connection object
    container_name = 'oco2'
    # Fallback values written by get_dataframe() when the column is missing.
    default_tcwv = 25
    default_surface_pressure = 979

    def __init__(self, config_file):
        """
        Constructor. Loads the config file and opens the Swift connection.
        :param config_file: str, Path to the JSON config file.
        :return:
        """
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(config_file, encoding='utf-8') as json_data_file:
            self.config = json.load(json_data_file)
        self.conn = self.swift_con()

    def swift_con(self, config=None):
        """
        Connect to Open Stack Swift and store the connection in ``self.conn``.
        :param config: dict, Config dictionary. Defaults to ``self.config``.
        :return: swiftclient.Connection
        """
        if config is None:
            config = self.config
        storage = config['swift_storage']
        self.conn = swiftclient.Connection(user=storage['user'],
                                           key=storage['key'],
                                           authurl=storage['auth_url'],
                                           os_options=storage['options'],
                                           tenant_name=storage['tenant_name'],
                                           auth_version=storage['auth_version'])
        return self.conn

    def upload(self, mask=r'c:\datasets\*.csv', prefix="/Trash/", content_type='text/csv', recursive=False):
        """
        Upload files to Open Stack Swift.
        Each file is stored as ``prefix`` + the file's base name, so the local
        directory layout is not reproduced on the destination.
        :param mask: str, Glob mask for searching files to upload.
        :param prefix: str, Prefix in destination. Useful to mimic folders.
        :param content_type: str, Content type on the destination.
        :param recursive: boolean, To allow search in sub-folder.
        :return:
        """
        master_progress_bar = master_bar([0])
        # Iterate the master bar once before attaching the child bar
        # (presumably needed by fastprogress to set up its display - TODO confirm).
        for _ in master_progress_bar:
            pass

        for file_path in progress_bar(glob.glob(mask, recursive=recursive), parent=master_progress_bar):
            upload_to = prefix + os.path.basename(file_path)
            with open(file_path, 'rb') as one_file:
                self.conn.put_object(self.container_name, upload_to,
                                     contents=one_file.read(),
                                     content_type=content_type)

    def get_files_urls(self, prefix, pattern=""):
        """
        Retrieve the list of files filtered by the given parameters.
        :param prefix: str, Mandatory to avoid retrieving too many files.
        :param pattern: str, Keep only the files whose name contains this text. Complementary to prefix.
        :return: list of str, URL of each matching file
        """
        base_url = self.config['swift_storage']['base_url']
        objects = self.get_container(self.container_name, prefix=prefix)
        return [base_url + data['name'] for data in objects if pattern in data['name']]

    def delete_files(self, prefix="/Trash/", pattern='', dry_run=True):
        """
        Delete the files matching the given parameters.
        :param prefix: str, Only the files under this prefix are considered.
        :param pattern: str, Only the files whose name contains this text are deleted. The default '' matches every file.
        :param dry_run: boolean, When True (default) nothing is deleted.
        :return:
        """
        if dry_run:
            print('Nothing will be deleted. Use dry_run=False to delete.')
        master_progress_bar = master_bar([0])
        # Same master bar initialisation as in upload().
        for _ in master_progress_bar:
            pass
        objects = self.get_container(self.container_name, prefix=prefix)
        for data in progress_bar(objects, parent=master_progress_bar):
            file_name = data['name']
            if pattern in file_name and not dry_run:
                self.conn.delete_object(self.container_name, file_name)

    def get_containers(self):
        """
        List the containers of the account.
        :return: second element of the ``get_account()`` response (one entry per container)
        """
        return self.conn.get_account()[1]

    def get_container(self, container_name='oco2', prefix='/datasets/oco-2/'):
        """
        List the objects of a container.
        :param container_name: str, Name of the container.
        :param prefix: str, Only the objects under this prefix are listed.
        :return: second element of the ``get_container()`` response (one entry per object)
        """
        return self.conn.get_container(container_name, prefix=prefix, full_listing=True)[1]

    def get_url_from_sounding_id(self, sounding_id):
        """
        Build the URL of the JSON file holding the detailed data of one peak.
        :param sounding_id: str or int, Sounding id of the peak.
        :return: str, URL of the file
        """
        base_url = self.config['swift_storage']['base_url']
        # The f-string formats int ids as well as str ids.
        return f'{base_url}/datasets/oco-2/peaks-detected-details/peak_data-si_{sounding_id}.json'

    def get_dataframe(self, url):
        """
        Read the url of a file and load it with Pandas.
        The columns 'tcwv' and 'surface_pressure' are added with a constant
        default value when the file does not provide them.
        :param url: str, URL of the file to load. The format is chosen from the extension (csv, xz, bz2 or json).
        :return: DataFrame
        :raises ValueError: when the extension is not supported.
        """
        # TODO : Switch to GeoPandas ?
        extension = url.split('.')[-1].lower()
        if extension in ('csv', 'xz', 'bz2'):
            df = pd.read_csv(url, sep=';')
            # A single column means ';' was not the separator: retry with ','.
            # Very bad because we load it twice !
            if len(df.columns) == 1:
                df = pd.read_csv(url, sep=',')
        elif extension == 'json':
            df = pd.read_json(url)
        else:
            raise ValueError(f"Unsupported file extension '{extension}' for url: {url}")
        if 'tcwv' not in df.columns:
            df['tcwv'] = self.default_tcwv
        if 'surface_pressure' not in df.columns:
            df['surface_pressure'] = self.default_surface_pressure
        return df

    def get_gaussian_param(self, sounding_id, df_all_peak):
        """
        Extract the gaussian parameters of one peak.
        :param sounding_id: Sounding id of the peak, compared to the 'sounding_id' column.
        :param df_all_peak: DataFrame, List of detected peaks.
        :return: dict with the keys slope, intercept, amplitude, sigma, delta and R.
                 Every value is 1 when the sounding_id is not found.
        """
        param_names = ('slope', 'intercept', 'amplitude', 'sigma', 'delta', 'R')
        df_param = df_all_peak.query("sounding_id==@sounding_id")
        if len(df_param) < 1:
            print('ERROR : sounding_id not found in dataframe !')
            return {name: 1 for name in param_names}
        # Read the first matching row by position, so a scalar is returned
        # even when the index holds duplicate labels.
        return {name: df_param[name].iloc[0] for name in param_names}

# Examples

## Connection

In [None]:
# Path to the JSON config file; Datasets reads its 'swift_storage' section.
config = './configs/config.json'
# Loads the config and opens the Swift connection right away.
datasets = Datasets(config)

## Get a dataset

### Level 2 sounding value from OCO-2 satellite

In [None]:
# List the sounding files whose name contains '141'.
url=datasets.get_files_urls(pattern='141', prefix='/datasets/oco-2/soudings/')
print(url)
# Load the first file of the list into a DataFrame.
df=datasets.get_dataframe(url[0])
df.head(3)

['https://storage.gra.cloud.ovh.net/v1/AUTH_2aaacef8e88a4ca897bb93b984bd04dd/oco2//datasets/oco-2/soudings/oco2_1410.csv.xz', 'https://storage.gra.cloud.ovh.net/v1/AUTH_2aaacef8e88a4ca897bb93b984bd04dd/oco2//datasets/oco-2/soudings/oco2_1411.csv.xz', 'https://storage.gra.cloud.ovh.net/v1/AUTH_2aaacef8e88a4ca897bb93b984bd04dd/oco2//datasets/oco-2/soudings/oco2_1412.csv.xz']


Unnamed: 0,sounding_id,latitude,longitude,xco2,xco2_uncert,orbit,windspeed_u,windspeed_v,surface_pressure_apriori,surface_pressure,altitude,land_water_indicator,land_fraction,tcwv
0,2014100202182405,-40.574512,176.451126,398.752686,0.395292,1337,6.98932,-5.809829,978.123352,974.571838,179.613159,0.0,100.0,25
1,2014100202182432,-40.533489,176.476425,397.972046,0.428197,1337,6.823709,-5.639187,976.780273,972.596436,191.66597,0.0,100.0,25
2,2014100202182433,-40.540665,176.465958,397.367462,0.408428,1337,6.896306,-5.694703,964.528748,963.135864,298.211029,0.0,100.0,25


### List of detected peaks

In [None]:
# Take the first file under peaks-detected/ whose name contains 'result'.
url=datasets.get_files_urls(prefix='/datasets/oco-2/peaks-detected/', pattern='result')[0]
print(url)
df=datasets.get_dataframe(url)
df.head(3)

https://storage.gra.cloud.ovh.net/v1/AUTH_2aaacef8e88a4ca897bb93b984bd04dd/oco2//datasets/oco-2/peaks-detected/result_for_oco2_1401.csv


Unnamed: 0,sounding_id,latitude,longitude,orbit,slope,intercept,amplitude,sigma,delta,R,windspeed_u,windspeed_v,surface_pressure,tcwv,gCO2_per_s,ktCO2_per_h
0,2016040409591476,-23.483047,55.751904,9351,0.003248,401.098627,5.750227,22.380889,0.102499,0.52431,-4.077072,2.13369,1020.505371,22.41131,321587.8,1.157716
1,2016040409592308,-23.023375,55.623322,9351,-0.000606,401.051925,23.130956,25.464959,0.362377,0.548597,-4.163719,2.552373,1020.905945,19.716457,1297459.0,4.670853
2,2016040409592636,-22.838486,55.613483,9351,-0.003021,401.035206,14.325352,21.964555,0.260191,0.53932,-4.23905,2.482829,1019.659485,19.703201,822116.7,2.95962


### Level 2 sounding value for a peak

In [None]:
# df is the peaks table loaded in the previous cell.
df.info()
# Sounding id of the first detected peak.
sounding_id = int(df.iloc[0].sounding_id)
sounding_id
str(sounding_id)
# get_url_from_sounding_id concatenates strings, hence the str() conversion.
url=datasets.get_url_from_sounding_id(str(sounding_id))
print(url)
# NOTE(review): the recorded output of this cell ends with "HTTP Error 404" for
# this URL - confirm that the per-peak JSON file exists on the storage.
df=datasets.get_dataframe(url)
df.head(3)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 214 entries, 0 to 213
Data columns (total 16 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   sounding_id       214 non-null    int64  
 1   latitude          214 non-null    float64
 2   longitude         214 non-null    float64
 3   orbit             214 non-null    int64  
 4   slope             214 non-null    float64
 5   intercept         214 non-null    float64
 6   amplitude         214 non-null    float64
 7   sigma             214 non-null    float64
 8   delta             214 non-null    float64
 9   R                 214 non-null    float64
 10  windspeed_u       214 non-null    float64
 11  windspeed_v       214 non-null    float64
 12  surface_pressure  214 non-null    float64
 13  tcwv              214 non-null    float64
 14  gCO2_per_s        214 non-null    float64
 15  ktCO2_per_h       214 non-null    float64
dtypes: float64(14), int64(2)
memory usage: 26.9 

2016040409591476

'2016040409591476'

https://storage.gra.cloud.ovh.net/v1/AUTH_2aaacef8e88a4ca897bb93b984bd04dd/oco2//datasets/oco-2/peaks-detected-details/peak_data-si_2016040409591476.json


HTTPError: HTTP Error 404: Not Found

## Get containers names

In [None]:
# Print the name of every container returned for the account.
for container in datasets.get_containers():
    print('Container name:', container['name'])

Container name: oco2


## List files

In [None]:
# URLs of the files under /map/ whose name contains 'html'.
datasets.get_files_urls(prefix='/map/', pattern='html')

['https://storage.gra.cloud.ovh.net/v1/AUTH_2aaacef8e88a4ca897bb93b984bd04dd/oco2//map/peaks_and_sources.html']

### Get files objects

In [None]:
# Raw listing of the objects under peaks-detected/ (not just their URLs).
objects = datasets.get_container('oco2', prefix='/datasets/oco-2/peaks-detected/')
print(f'Number of file : {len(objects)}')
for data in objects:
    # Show name, size in bytes and last modification date of the matching files.
    if 'oco2_1504' in data['name']:
        print('{0}\t{1}\t{2}'.format(data['name'], data['bytes'], data['last_modified']))


Number of file : 74
/datasets/oco-2/peaks-detected/result_for_oco2_1504.csv	390108	2020-05-28T05:26:06.974830


## Upload files

In [None]:
# Upload every Markdown file of the parent directory under the /Trash/ prefix.
# NOTE(review): 'text/text' does not look like a registered MIME type - confirm
# whether 'text/plain' or 'text/markdown' was intended.
datasets.upload(mask='../*.md', prefix="/Trash/",content_type='text/text')
# datasets.upload("/media/data-nvme/dev/datasets/OCO2/csv/*.csv", "/datasets/oco-2/peaks-detected/", 'text/csv')
# datasets.upload("/media/data-nvme/dev/datasets/OCO2/csv/*.json", "/datasets/oco-2/peaks-detected-details/", 'application/json')

In [None]:
# List everything under /Trash/ (the default pattern '' matches every name).
datasets.get_files_urls('/Trash/')

['https://storage.gra.cloud.ovh.net/v1/AUTH_2aaacef8e88a4ca897bb93b984bd04dd/oco2//Trash/CONTRIBUTING.md',
 'https://storage.gra.cloud.ovh.net/v1/AUTH_2aaacef8e88a4ca897bb93b984bd04dd/oco2//Trash/README-old.md',
 'https://storage.gra.cloud.ovh.net/v1/AUTH_2aaacef8e88a4ca897bb93b984bd04dd/oco2//Trash/README.md']

### Upload HTML
Setting the content type to 'text/html' allows the file to be displayed by browsers, without downloading it.

In [None]:
#datasets.upload("chemin/peaks_and_sources.html", "/Trash/", 'text/html')

## Delete files

In [None]:
# dry_run=False really deletes; with the default pattern '' every object under
# /Trash/ is removed.
datasets.delete_files("/Trash/", dry_run=False)

In [None]:
# nbdev export step; the recorded output below lists the converted notebooks.
from nbdev.export import notebook2script
notebook2script()

Converted 03_25_OCO2_Data_Exploration.ipynb.
Converted 04_01_OCO2_Work_Base.ipynb.
Converted 04_04_OCO2_China_Peaks.ipynb.
Converted 04_15_OCO2_Laiwu_Peak_Detection.ipynb.
Converted CO2_emissions_Inventory_data.ipynb.
Converted Christian-datasets-Distances.ipynb.
Converted Find_Peaks_with_LSTM_autoencoders.ipynb.
Converted Laiwu_Plume-more_data.ipynb.
Converted Laiwu_Plume-more_data_CD_exploration_selection_peaks.ipynb.
Converted Laiwu_Plume.ipynb.
Converted Untitled.ipynb.
Converted WIP_OCO2_Capture.ipynb.
Converted WIP_OCO2_Peaks_Wind.ipynb.
Converted WIP_OCO2_Peaks_Wind_Visualization.ipynb.
Converted find_peak_bco_test.ipynb.
Converted index.ipynb.
Converted oco2peak-datasets.ipynb.
Converted oco2peak-find_peak.ipynb.
Converted oco2peak-map.ipynb.
Converted oco2peak-nc4_convert.ipynb.
Converted oco2peak-swift_utils.ipynb.
Converted oco2peak_find_source.ipynb.
Converted show_map.ipynb.
Converted view_peak.ipynb.
