In [1]:
import requests
import itertools
import os
import yaml
import xarray as xr
import numpy as np
from variable_mapping import variable_mapping
from datetime import datetime
import csv
import sys

In [2]:
# Run configuration: BSC variable names to process, the temporal resolution
# to keep, and the time window used to select files and output months.
variables = ["sconco3"]
resolution = "hourly"
target_start_date = datetime(year=2018, month=1, day=1, hour=0)
target_end_date = datetime(year=2018, month=12, day=31, hour=23)

In [3]:
# Working directory of the kernel; used as the base for the YAML files and
# the GHOST_standards dependency path resolved elsewhere in this notebook.
CURRENT_PATH = os.getcwd()

In [4]:
# EBAS attribute name -> name of the metadata variable written to the output.
# The key list was taken from the first dataset inspected.
# TODO: check all available metadata fields.
metadata_dict = dict([
    ('ebas_station_code', 'station_reference'),
    ('ebas_station_name', 'station_name'),
    ('ebas_station_land_use', 'land_use'),
    ('ebas_station_wmo_region', 'WMO_region'),
    ('ebas_station_latitude', 'latitude'),
    ('ebas_station_longitude', 'longitude'),
    ('ebas_station_altitude', 'altitude'),
])

# `time_coverage_resolution` attribute value -> resolution label.
coverages_dict = dict([
    ('P0000-00-00T01:00:00', 'hourly'),
    ('P0000-00-01T00:00:00', 'daily'),
    ('P0000-01-00T00:00:00', 'monthly'),
])

# Unit string from the variable mapping -> suffix used in dataset variable names.
units_dict = dict([
    ('nmol/mol', 'nmol_per_mol'),
    ('ug/m3', 'ug_per_m3'),
])

# Get ACTRIS variable mapping

In [5]:
def create_variable_mapping_file():
    """Write ``variable_mapping.yaml`` from the ``variable_mapping`` module.

    Each entry of the module-level mapping becomes one YAML entry keyed by
    the entry's ``'preferred_term'`` (with double quotes removed). The stored
    value is ``{'var': key[2], 'units': key[0]}``, i.e. elements 2 and 0 of
    the original (indexable) mapping key.

    The file is written to the current working directory. Returns ``None``.
    """
    # Import under a private alias: the notebook later rebinds the global name
    # `variable_mapping` to the parsed YAML, whose layout differs from the raw
    # module mapping. Reading the global here would break re-runs of this cell.
    from variable_mapping import variable_mapping as raw_mapping

    result = {
        value['preferred_term'].replace('"', ''): {'var': key[2], 'units': key[0]}
        for key, value in raw_mapping.items()
    }

    with open('variable_mapping.yaml', 'w') as file:
        yaml.dump(result, file, default_flow_style=False)

In [6]:
#create_variable_mapping_file()

In [7]:
# Load the ACTRIS variable mapping from variable_mapping.yaml.
# NOTE: this rebinds `variable_mapping`, replacing the object imported from
# the variable_mapping module at the top of the notebook.
with open(os.path.join(CURRENT_PATH, 'variable_mapping.yaml')) as mapping_file:
    # safe_load returns None for an empty file; fall back to an empty mapping.
    variable_mapping = yaml.safe_load(mapping_file) or {}
# Drop entries with a blank key or an empty value.
variable_mapping = {k: v for k, v in variable_mapping.items() if k.strip() and v}

# Get BSC-ACTRIS parameters dictionary

In [8]:
def create_actris_variables_file():
    """Export the ACTRIS parameter names to ``actris_variables.csv``.

    Writes one row per entry of the global ``variable_mapping``: the entry's
    key followed by its ``'var'`` value. Returns ``None``.
    """
    with open('actris_variables.csv', mode="w", newline="", encoding="utf-8") as csv_file:
        csv.writer(csv_file).writerows(
            [name, entry['var']] for name, entry in variable_mapping.items()
        )

In [9]:
#create_actris_variables_file()

In [10]:
def create_ghost_variables_file():
    """Export the GHOST standard parameters to ``ghost_variables.csv``.

    Puts the GHOST_standards 1.5 directory (relative to ``CURRENT_PATH``) on
    ``sys.path`` after removing any other GHOST_standards entry, imports
    ``standard_parameters`` from it, and writes one row per parameter:
    long name, BSC name, and the EBAS names joined with ``', '``.
    Returns ``None``.
    """
    ghost_marker = '../dependencies/GHOST_standards/'
    # Remove previously inserted GHOST_standards entries so only one version is importable.
    sys.path = [entry for entry in sys.path if ghost_marker not in entry]
    sys.path.insert(1, os.path.join(CURRENT_PATH, '../dependencies/GHOST_standards/1.5'))
    from GHOST_standards import standard_parameters

    with open('ghost_variables.csv', mode="w", newline="", encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file)
        for parameter in standard_parameters.values():
            writer.writerow([
                parameter['long_parameter_name'],
                parameter['bsc_parameter_name'],
                ', '.join(parameter['ebas_parameter_name']),
            ])

In [11]:
#create_ghost_variables_file()

In [12]:
# Build the lookup used to translate a BSC variable name into the ACTRIS
# parameter name, from the manually curated ghost_actris_variables.csv.
# Column index 1 is used as the key and column index 6 as the value.
parameters_dict = {}
with open('ghost_actris_variables.csv', mode='r', newline='', encoding='utf-8') as file:
    reader = csv.reader(file)
    for row in reader:
        # Blank or truncated lines have no column 6; skip them instead of
        # failing with IndexError.
        if len(row) <= 6:
            continue
        # Keep only rows where an ACTRIS name was filled in (manual intervention).
        # 'Preferred' is skipped as well - presumably the header cell of that
        # column; verify against the CSV.
        if len(row[6]) != 0 and row[6] != 'Preferred':
            parameters_dict[row[1]] = row[6]
parameters_dict

{'sconco3': 'ozone mass concentration',
 'sconcno2': 'nitrogen dioxide mass concentration',
 'sconcso2': 'sulfur dioxide mass concentration',
 'sconcco': 'carbon monoxide amount fraction',
 'sconcch4': 'methane amount fraction',
 'sconcglyox': 'glyoxal mass concentration',
 'sconcc2h4': 'ethene amount fraction',
 'sconcald2': 'acetaldehyde mass concentration',
 'sconcc2h6': 'ethane amount fraction',
 'sconcetoh': 'ethanol mass concentration',
 'sconcc3h6': 'propene amount fraction',
 'sconcc3h8': 'propane amount fraction',
 'sconcc4h6': '1,3-butadiene amount fraction',
 'sconcc4h8': '1-butene amount fraction',
 'sconcisop': 'isoprene amount fraction',
 'sconcc5h12': 'n-pentane amount fraction',
 'sconcc6h6': 'benzene amount fraction',
 'sconcc6h14': 'n-hexane amount fraction',
 'sconcc7h8': 'toluene',
 'sconcmpxyl': 'm/p-xylenes amount fraction',
 'sconcmxyl': 'm-xylene amount fraction',
 'sconcoxyl': 'o-xylene amount fraction',
 'sconcc9h12': '1,2,4-trimethylbenzene amount fraction',


# Get files per variable

In [13]:
def get_files_per_variable(chunk_i, variables, timeout=60):
    """Collect the OPeNDAP dataset URLs listed by the ACTRIS metadata API.

    For every variable the paginated endpoint
    ``{base_url}/{parameters_dict[var]}/page/{page}`` is requested, starting
    at page 0, until a page is empty, a non-200 status is returned, or the
    request fails. Results gathered before a failure are kept.

    Parameters
    ----------
    chunk_i : int
        Index of the chunk being processed; only used in progress messages.
    variables : sequence of str
        Variable names; each must be a key of the global ``parameters_dict``.
    timeout : float or tuple, optional
        Timeout in seconds passed to ``requests.get`` so that a stalled
        connection cannot block the notebook indefinitely.

    Returns
    -------
    dict
        ``{var: {'files': [opendap_url, ...]}}`` for every requested variable.

    Raises
    ------
    KeyError
        If a variable has no entry in ``parameters_dict``.
    """
    files_per_var = {}
    base_url = "https://prod-actris-md.nilu.no/metadata/content"
    n_variables = len(variables)
    print('Variables:', variables)
    # 1-based counter so the progress message ends at n/n rather than (n-1)/n.
    for var_i, var in enumerate(variables, start=1):
        print(f'[{chunk_i}] {var} ({var_i}/{n_variables})')
        variable_files = []
        page = 0
        while True:
            # Set up URL with pagination
            url = f"{base_url}/{parameters_dict[var]}/page/{page}"
            try:
                response = requests.get(url, timeout=timeout)
            except requests.RequestException as error:
                print(f"Error fetching page {page}: {error}")
                break

            # Stop on any non-successful response
            if response.status_code != 200:
                print(f"Error fetching page {page}. Status code: {response.status_code}")
                break

            try:
                data = response.json()
            except ValueError as error:
                # Body was not valid JSON
                print(f"Error decoding page {page}: {error}")
                break

            # An empty page marks the end of the listing
            if not data:
                break

            for item in data:
                doi = item.get("md_identification", {}).get("identifier", {}).get("pid")
                # Keep OPeNDAP distributions only, ignoring entries without a URL
                opendap_urls = [
                    protocol_dict['dataset_url']
                    for protocol_dict in item.get('md_distribution_information', [])
                    if protocol_dict.get('protocol') == 'OPeNDAP'
                    and protocol_dict.get('dataset_url')
                ]

                # Record the URLs only for entries that also carry a DOI
                if doi and opendap_urls:
                    variable_files.extend(opendap_urls)

            # Go to the next page
            page += 1

        files_per_var[var] = {'files': variable_files}

    return files_per_var

In [14]:
def get_files_per_var_list(variables):
    """Fetch the file listing for all variables, 100 variables at a time.

    The variables are split into consecutive chunks of ``chunk_size``; each
    chunk is passed to ``get_files_per_variable`` together with its index and
    the per-chunk results are merged into a single dictionary.

    Returns
    -------
    dict
        The merged ``{var: {...}}`` mapping of all chunks.
    """
    chunk_size = 100
    chunk_starts = range(0, len(variables), chunk_size)
    variable_list = list(variables)
    chunks = [variable_list[start:start + chunk_size] for start in chunk_starts]

    combined_data = {}
    for chunk_i, chunk in enumerate(chunks):
        combined_data.update(get_files_per_variable(chunk_i, chunk))
    return combined_data

In [15]:
# Fetch the dataset URL listing for every configured variable.
# The result is read as a global by create_files_info_file().
combined_data = get_files_per_var_list(variables)

Variables: ['sconco3']
[0] sconco3 (0/1)


# Get information on files

In [16]:
def create_files_info_file(variables):
    """Inspect the listed datasets and write one YAML index per resolution.

    For each variable, every URL in the global ``combined_data[var]['files']``
    is opened with xarray and its resolution (translated through
    ``coverages_dict``), ``time_coverage_start``, ``time_coverage_end`` and
    data-variable names are recorded. The records are then grouped by
    resolution and written to ``files/{var}/{resolution}/files.yaml``
    (relative to the working directory); resolutions without any dataset
    produce no file.

    Files that cannot be opened, or whose ``time_coverage_resolution`` is
    missing or not a key of ``coverages_dict``, are reported and skipped.

    Parameters
    ----------
    variables : iterable of str
        Variable names; each must be a key of ``combined_data``.

    Returns
    -------
    None
    """
    files_info = {}
    for var in variables:
        print('Variable:', var)
        files = combined_data[var]['files']
        files_info[var] = {}
        print('Total number of files:', len(files))
        for i, file in enumerate(files):
            print(f'{i} - {file}')
            try:
                ds = xr.open_dataset(file)
            except Exception:
                # Skip this URL: falling through would reuse the previous
                # file's dataset (or an undefined name) for this entry.
                print('Error opening dataset')
                continue

            # Only metadata is needed, so release the dataset right after reading it.
            with ds:
                coverage = getattr(ds, 'time_coverage_resolution', None)
                try:
                    resolution = coverages_dict[coverage]
                except (KeyError, TypeError):
                    print('Error in resolution with coverage:', coverage)
                    continue
                files_info[var][file] = {
                    'resolution': resolution,
                    'start_date': ds.time_coverage_start,
                    'end_date': ds.time_coverage_end,
                    # Separate name: do not shadow the `variables` parameter
                    'variables': list(ds.data_vars.keys()),
                }

        for resolution in coverages_dict.values():
            datasets = {
                url: data
                for url, data in files_info[var].items()
                if data["resolution"] == resolution
            }
            if datasets:
                path = f'files/{var}/{resolution}'
                os.makedirs(path, exist_ok=True)
                with open(path + '/files.yaml', 'w') as yaml_file:
                    yaml.dump(datasets, yaml_file, default_flow_style=False)

        print('Done')

In [17]:
%%time
# Open every listed dataset and write the files/{var}/{resolution}/files.yaml
# indexes; timed because each file is opened individually.
create_files_info_file(variables)

Variable: sconco3
Total number of files: 609
0 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/2P/9G/Q3/2P9G-Q3BM.nc
1 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/SY/B5/A3/SYB5-A38Q.nc
2 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/CU/EP/RX/CUEP-RXR8.nc
3 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/XY/3A/RE/XY3A-REBQ.nc
4 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/35/68/2W/3568-2W3Q.nc
5 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/NM/W2/TX/NMW2-TXB2.nc
Error opening dataset
6 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/3W/4A/23/3W4A-237S.nc
7 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/NK/5T/S8/NK5T-S833.nc
Error opening dataset
8 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/RJ/9P/SE/RJ9P-SEU6.nc
9 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/CG/NX/5W/CGNX-5WSG.nc
10 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/EH/GP/BB/EHGP-BBJX.nc
11 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/BK/7Q/E7/BK7Q-E7TT.nc
12 - https://thredds.nilu.no/thre

# Format data

In [18]:
def filter_files(var, resolution, target_start_date, target_end_date):
    """Return the dataset URLs whose time coverage overlaps a target window.

    Reads ``files/{var}/{resolution}/files.yaml`` below ``CURRENT_PATH`` and
    keeps the entries whose recorded resolution equals ``resolution`` and
    whose ``[start_date, end_date]`` interval intersects
    ``[target_start_date, target_end_date]``.

    Parameters
    ----------
    var : str
        Variable name used in the index path.
    resolution : str
        Resolution label used in the index path and to match entries.
    target_start_date, target_end_date : datetime
        Bounds of the requested window.

    Returns
    -------
    list of str
        Matching URLs, in the order they appear in the index.

    Raises
    ------
    FileNotFoundError
        If the index file for this variable/resolution does not exist.
    ValueError
        If a stored date does not match ``%Y-%m-%dT%H:%M:%S UTC``.
    """
    index_path = os.path.join(CURRENT_PATH, f'files/{var}/{resolution}/files.yaml')
    with open(index_path) as index_file:
        # safe_load returns None for an empty file
        files_info = yaml.safe_load(index_file) or {}

    date_format = "%Y-%m-%dT%H:%M:%S UTC"
    files = []
    for file, attributes in files_info.items():
        # Ignore blank keys and empty records
        if not file.strip() or not attributes:
            continue
        if attributes["resolution"] != resolution:
            continue
        start_date = datetime.strptime(attributes["start_date"], date_format)
        end_date = datetime.strptime(attributes["end_date"], date_format)
        # Standard interval-overlap test
        if start_date <= target_end_date and end_date >= target_start_date:
            files.append(file)
    return files

In [19]:
# For every configured variable: open the matching datasets, extract the
# variable of interest, stack the files along a new `station` dimension,
# attach station metadata and write one NetCDF file per month.

# Root of the output tree; files go to {output_root}/{resolution}/{var}/.
output_root = '/home/avilanov/data/providentia/obs/nonghost/actris/actris'

for var in variables:
    files = filter_files(var, resolution, target_start_date, target_end_date)
    if len(files) == 0:
        continue

    actris_parameter = parameters_dict[var]
    ebas_component = variable_mapping[actris_parameter]['var']

    print('Variable:', var, '- ACTRIS:', actris_parameter)

    # The unit lookup does not depend on the file, so resolve it once
    # before opening any remote dataset.
    unformatted_units = variable_mapping[actris_parameter]['units']
    if unformatted_units not in units_dict:
        print('Units could not be found')
        continue
    units = units_dict[unformatted_units]

    # Candidate names under which the variable may be stored in a dataset
    units_var = f'{ebas_component}_{units}'
    possible_vars = [ebas_component,
                     f'{ebas_component}_amean',
                     units_var,
                     f'{units_var}_amean']

    # Datasets to combine, plus one metadata list per EBAS key (one value per kept file)
    combined_ds_list = []
    station_metadata = {ebas_key: [] for ebas_key in metadata_dict}

    print('Total number of files:', len(files))
    for i, file in enumerate(files):
        print(i, '-', file)
        # open file
        try:
            ds = xr.open_dataset(file)
        except Exception:
            print('Error opening file')
            continue

        # Check the file's resolution without overwriting the configured
        # `resolution`, which is still needed for the output path.
        coverage = getattr(ds, 'time_coverage_resolution', None)
        if coverages_dict.get(coverage) != resolution:
            print('Error in resolution with coverage:', coverage)
            continue

        # Add a `station` dimension, using the file index as coordinate
        ds = ds.expand_dims(dim={'station': [i]})

        # Select the data for that variable only
        for possible_var in possible_vars:
            if possible_var in ds:
                ds_var = ds[possible_var]
                break
        else:
            # No candidate matched: skip the file rather than reusing the
            # variable extracted from the previous file.
            print('Variable could not be found')
            continue

        # save metadata (NaN when the attribute is absent)
        for ebas_key in metadata_dict:
            station_metadata[ebas_key].append(ds_var.attrs.get(ebas_key, np.nan))

        # remove all attributes except units
        ds_var.attrs = {key: value for key, value in ds_var.attrs.items() if key == 'units'}

        # rename variable to BSC standards
        ds_var = ds_var.to_dataset(name=var)

        # append modified dataset to list
        combined_ds_list.append(ds_var)

    # combine and create new dataset
    try:
        combined_ds = xr.concat(combined_ds_list,
                                dim='station',
                                combine_attrs='drop_conflicts')
    except Exception as error:
        print(f'Error: Datasets could not be combined - {error}')
        if 'time' in str(error):
            for item in combined_ds_list:
                print(item.time.values[0], item.time.values[1])
        continue

    # add metadata
    for key, value in station_metadata.items():
        if metadata_dict[key] in ['latitude', 'longitude']:
            value = [float(val) for val in value]
        elif metadata_dict[key] == 'altitude':
            # Values are strings with an 'm' suffix; missing ones are the
            # NaN placeholder, which has no .replace().
            value = [float(val.replace('m', '').strip()) if isinstance(val, str) else float(val)
                     for val in value]
        combined_ds[metadata_dict[key]] = xr.Variable(data=value, dims=('station'))

    # add units for lat and lon
    combined_ds.latitude.attrs['units'] = 'degrees_north'
    combined_ds.longitude.attrs['units'] = 'degrees_east'

    # save data per year and month
    path = f'{output_root}/{resolution}/{var}'
    os.makedirs(path, exist_ok=True)
    # Compare (year, month) pairs so that a window starting or ending
    # mid-month still includes that month.
    first_month = (target_start_date.year, target_start_date.month)
    last_month = (target_end_date.year, target_end_date.month)
    for year, ds_year in combined_ds.groupby('time.year'):
        for month, ds_month in ds_year.groupby('time.month'):
            if first_month <= (year, month) <= last_month:
                filename = f"{path}/{var}_{year}{month:02d}.nc"
                ds_month.to_netcdf(filename)
                print(f"Saved: {filename}")

Variable: sconco3 - ACTRIS: ozone mass concentration
Total number of files: 229
0 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/2F/57/5Y/2F57-5YWN.nc
1 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/2G/N5/BS/2GN5-BSG9.nc
2 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/2J/EC/FU/2JEC-FUND.nc
3 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/2M/DM/GF/2MDM-GFRA.nc
4 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/2Q/QE/CB/2QQE-CBAC.nc
5 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/2R/WD/A6/2RWD-A6UJ.nc
6 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/3B/4H/V7/3B4H-V7QF.nc
7 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/3D/JD/E2/3DJD-E279.nc
8 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/3E/YC/7M/3EYC-7MNN.nc
9 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/3G/C7/QN/3GC7-QNSG.nc
10 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/3H/34/NS/3H34-NS7R.nc
11 - https://thredds.nilu.no/thredds/dodsC/ebas_doi/3H/5X/DG/3H5X-DG99.nc
12 - https://thredds.nilu.no/thredds/dodsC

In [20]:
# Open one monthly file from the ACTRIS output tree (sconcno2, 201801) and display it.
# NOTE(review): the path is for sconcno2 while `variables` is configured as
# ['sconco3'] - confirm this file exists from an earlier run.
test_data = xr.open_dataset('/home/avilanov/data/providentia/obs/nonghost/actris/actris/hourly/sconcno2/sconcno2_201801.nc')
test_data

In [21]:
# Open the 201802 sconcno2 file from the ACTRIS output tree and display it.
# Rebinds `test_data`.
test_data = xr.open_dataset('/home/avilanov/data/providentia/obs/nonghost/actris/actris/hourly/sconcno2/sconcno2_201802.nc')
test_data

In [22]:
# Open a file from the nonghost/eea/eionet tree (sconcno2, 202406) and display it,
# presumably as a reference for the expected non-GHOST layout - confirm.
nonghost_data = xr.open_dataset('/home/avilanov/data/providentia/obs/nonghost/eea/eionet/hourly/sconcno2/sconcno2_202406.nc')
nonghost_data

In [23]:
# Open a file from the ghost/EEA_AQ_eReporting/1.5 tree (sconcno2, 201805) and display it,
# presumably as a reference for the GHOST layout - confirm.
ghost_data = xr.open_dataset('/home/avilanov/data/providentia/obs/ghost/EEA_AQ_eReporting/1.5/hourly/sconcno2/sconcno2_201805.nc')
ghost_data