# Working with intake-spark catalogs

Intake-spark catalogs, like any other intake catalog, include metadata and the list of all available files in a dataset. They are designed to work with very large datasets such as CMIP6 and store complete metadata, including licences, information about variables, etc. For this reason, the list of files is not a regular CSV/pandas table; it is saved in Parquet files.

This very short tutorial is inspired by NCI's dataset workshop and materials. 

To work with Spark, we need to create a session, which allows us to read the Parquet files in the catalog.

In [1]:
from intake_spark.base import SparkHolder

spark_config = {
    "spark.executor.memory": "2g",  
    "spark.driver.memory": "4g",  
    "spark.executor.instances": "2", 
}

# [('catalog', )] is a placeholder indicating which intake catalogs to register
# or initialize (you could extend this to use actual catalogs).
h = SparkHolder(True, [('catalog', )], spark_config)
h.setup()  # Sets up and starts the Spark session using the configuration above.
session = h.session[0]  # The first Spark session created by SparkHolder.

# Make column-name matching case-sensitive in Spark SQL.
session.conf.set("spark.sql.caseSensitive", "true")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/09 15:10:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Now we can load a catalog. You need access to the project that hosts it. In this case, we'll work with ERA5-Land, located in `zz93`.

In [2]:
import intake
catalog = intake.open_catalog("/g/data/zz93/catalog/yml/driver.yml")
df = catalog.mydata.to_spark()
print(type(df))

<class 'pyspark.sql.classic.dataframe.DataFrame'>


                                                                                

A PySpark dataframe is similar to a pandas dataframe but optimized for big data processing. Let's see how to navigate this dataframe to search for and filter the files we want to read.

In [3]:
df.show()

+--------------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+
|          attributes|          dimensions|             file_md|file_type|            file_uri|              format|           variables|
+--------------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+
|{CF-1.6, 2020-10-...|{{1801, false}, {...|{2022-01-26 20:21...|        f|/g/data/zz93/era5...|NC_FORMAT_NETCDF4...|{NULL, NULL, NULL...|
|{CF-1.6, 2022-03-...|{{1801, false}, {...|{2022-06-28 04:30...|        f|/g/data/zz93/era5...|NC_FORMAT_NETCDF4...|{NULL, NULL, NULL...|
|{CF-1.6, 2022-03-...|{{1801, false}, {...|{2022-06-28 04:30...|        f|/g/data/zz93/era5...|NC_FORMAT_NETCDF4...|{NULL, NULL, NULL...|
|{CF-1.6, 2020-10-...|{{1801, false}, {...|{2022-01-26 20:21...|        f|/g/data/zz93/era5...|NC_FORMAT_NETCDF4...|{NULL, NULL, NULL...|
|{CF-1.6, 2020-10-...|{{1801, fals

                                                                                

It has only a few columns, but each one contains a lot of information.

In [7]:
df.columns

['attributes',
 'dimensions',
 'file_md',
 'file_type',
 'file_uri',
 'format',
 'variables']

In [12]:
df.select("file_uri").distinct().show()

+--------------------+
|            file_uri|
+--------------------+
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
|/g/data/zz93/era5...|
+--------------------+
only showing top 20 rows


Another way to inspect the columns is the `.printSchema()` method. The schema is the structure of the dataframe, and the information is usually organised in nested levels, like a tree.

In [4]:
df.select("dimensions").printSchema()

root
 |-- dimensions: struct (nullable = true)
 |    |-- latitude: struct (nullable = true)
 |    |    |-- length: long (nullable = true)
 |    |    |-- unlimited: boolean (nullable = true)
 |    |-- longitude: struct (nullable = true)
 |    |    |-- length: long (nullable = true)
 |    |    |-- unlimited: boolean (nullable = true)
 |    |-- time: struct (nullable = true)
 |    |    |-- length: long (nullable = true)
 |    |    |-- unlimited: boolean (nullable = true)
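
The tree above can also be walked programmatically to list every nested field as a dotted path, which is the form `df.select()` accepts. The following is a minimal sketch, assuming the schema is available in the dictionary form returned by `df.schema.jsonValue()`; the helper name `leaf_paths` and the hand-written `example_schema` are hypothetical and only mirror part of the `dimensions` column.

```python
def leaf_paths(node, prefix=""):
    """Yield (dotted_path, type_name) for each leaf field of a schema dict.

    `node` is expected to follow the layout of StructType.jsonValue():
    {"type": "struct", "fields": [{"name": ..., "type": ...}, ...]}.
    """
    for field in node["fields"]:
        path = f"{prefix}.{field['name']}" if prefix else field["name"]
        ftype = field["type"]
        if isinstance(ftype, dict) and ftype.get("type") == "struct":
            # Nested struct: recurse one level deeper.
            yield from leaf_paths(ftype, path)
        elif isinstance(ftype, dict):
            # Other complex types (arrays, maps) are reported by kind only.
            yield (path, ftype["type"])
        else:
            # Primitive types are plain strings such as "long" or "boolean".
            yield (path, ftype)


def _dimension():
    # Hypothetical stand-in for one entry of the `dimensions` struct.
    return {"type": "struct", "fields": [
        {"name": "length", "type": "long", "nullable": True, "metadata": {}},
        {"name": "unlimited", "type": "boolean", "nullable": True, "metadata": {}},
    ]}


# Hand-written example covering two of the three dimensions shown above.
example_schema = {"type": "struct", "fields": [
    {"name": "dimensions", "nullable": True, "metadata": {}, "type": {
        "type": "struct", "fields": [
            {"name": "latitude", "type": _dimension(), "nullable": True, "metadata": {}},
            {"name": "time", "type": _dimension(), "nullable": True, "metadata": {}},
        ]}},
]}

paths = list(leaf_paths(example_schema))
for path, type_name in paths:
    print(path, type_name)
```

With a live session, `leaf_paths(df.schema.jsonValue())` should give the same kind of listing for the whole catalog, and any of the resulting paths (for example `dimensions.time.length`) can be passed to `df.select()`.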



In [19]:
df.select("dimensions.time").distinct().show(truncate=False)

+------------+
|time        |
+------------+
|{24, false} |
|{743, false}|
|{25, false} |
|{672, false}|
|{720, false}|
|{744, false}|
|{1, false}  |
|{696, false}|
+------------+
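
The lengths 672, 696, 720 and 744 in this output match months of 28, 29, 30 and 31 days sampled hourly. A minimal sketch of that arithmetic with the standard library follows. Reading these files as hourly is an assumption based on the numbers alone, and the values 1, 24, 25 and 743 are not explained by it; the helper name `hourly_steps` is hypothetical.

```python
import calendar


def hourly_steps(year, month):
    """Number of hourly time steps in a calendar month (days x 24)."""
    days_in_month = calendar.monthrange(year, month)[1]
    return days_in_month * 24


# Hourly step count for each possible month length.
steps_by_days = {days: days * 24 for days in (28, 29, 30, 31)}
print(steps_by_days)           # {28: 672, 29: 696, 30: 720, 31: 744}
print(hourly_steps(1959, 12))  # 744
```

Comparing `dimensions.time.length` against the expected value for a file's month is one possible way to spot incomplete files, under the same assumption.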



In [22]:
df.select("variables.d2m").distinct().show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|d2m                                                                                                                                                                         |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{{-32767, 2.54442, 2 metre dewpoint temperature, -32767, 1.41827, K}, {[5, 361, 720], NC_CHUNKED}, {5, true, true}, [time, latitude, longitude], NC_ENDIAN_LITTLE, NC_SHORT}|
|{{-32767, 2.52901, 2 metre dewpoint temperature, -32767, 1.41069, K}, {[5, 361, 720], NC_CHUNKED}, {5, true, true}, [time, latitude, longitude], NC_ENDIAN_LITTLE, NC_SHORT}|
|{{-32767, 2.51008, 2 metre dewpoint temperature, -32767, 1.46302, K}, {[5, 361, 720], NC_CHUNKED}, {5, true, true}, [time, l

In [36]:
df.select("variables.d2m.attributes.long_name",
          "file_uri").distinct().show(truncate=False)


+----------------------------+--------------------------------------------------------------------------------------------------+
|long_name                   |file_uri                                                                                          |
+----------------------------+--------------------------------------------------------------------------------------------------+
|2 metre dewpoint temperature|/g/data/zz93/era5-land/monthly-averaged-by-hour/2d/1959/2d_era5-land_mnth_sfc_19591201-19591231.nc|
|2 metre dewpoint temperature|/g/data/zz93/era5-land/monthly-averaged-by-hour/2d/1983/2d_era5-land_mnth_sfc_19830601-19830630.nc|
|2 metre dewpoint temperature|/g/data/zz93/era5-land/monthly-averaged-by-hour/2d/1953/2d_era5-land_mnth_sfc_19530601-19530630.nc|
|2 metre dewpoint temperature|/g/data/zz93/era5-land/monthly-averaged-by-hour/2d/1969/2d_era5-land_mnth_sfc_19690101-19690131.nc|
|2 metre dewpoint temperature|/g/data/zz93/era5-land/monthly-averaged-by-hour/2d/1964/2d_e

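With that table we can create a pandas dataframe and, for example, parse the time range from each file name to find the files we want to read. Note that files that do not contain `d2m` are also returned, with an empty `long_name`.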

In [55]:
import pandas as pd

d2m_data = df.select("variables.d2m.attributes.long_name",
          "file_uri").distinct().toPandas()

d2m_data['time_range'] = d2m_data['file_uri'].str.extract(r'(\d{8}-\d{8})\.nc$')
d2m_data[['ini_date', 'end_date']] = d2m_data['time_range'].str.split('-', expand=True)


d2m_data['ini_date'] = pd.to_datetime(d2m_data['ini_date'], format='%Y%m%d')
d2m_data['end_date'] = pd.to_datetime(d2m_data['end_date'], format='%Y%m%d')

d2m_data

| | long_name | file_uri | time_range | ini_date | end_date |
|---|---|---|---|---|---|
| 0 | 2 metre dewpoint temperature | /g/data/zz93/era5-land/monthly-averaged-by-hou... | 19591201-19591231 | 1959-12-01 | 1959-12-31 |
| 1 | 2 metre dewpoint temperature | /g/data/zz93/era5-land/monthly-averaged-by-hou... | 19830601-19830630 | 1983-06-01 | 1983-06-30 |
| 2 | 2 metre dewpoint temperature | /g/data/zz93/era5-land/monthly-averaged-by-hou... | 19530601-19530630 | 1953-06-01 | 1953-06-30 |
| 3 | 2 metre dewpoint temperature | /g/data/zz93/era5-land/monthly-averaged-by-hou... | 19690101-19690131 | 1969-01-01 | 1969-01-31 |
| 4 | 2 metre dewpoint temperature | /g/data/zz93/era5-land/monthly-averaged-by-hou... | 19640901-19640930 | 1964-09-01 | 1964-09-30 |
| ... | ... | ... | ... | ... | ... |
| 135513 | | /g/data/zz93/era5-land/monthly-averaged/rsn/20... | 20100501-20100531 | 2010-05-01 | 2010-05-31 |
| 135514 | | /g/data/zz93/era5-land/monthly-averaged/rsn/19... | 19780101-19780131 | 1978-01-01 | 1978-01-31 |
| 135515 | | /g/data/zz93/era5-land/monthly-averaged/rsn/19... | 19870801-19870831 | 1987-08-01 | 1987-08-31 |
| 135516 | | /g/data/zz93/era5-land/monthly-averaged/stl1/1... | 19661201-19661231 | 1966-12-01 | 1966-12-31 |
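
Once the start and end dates are parsed, choosing the files for a period comes down to an interval-overlap test. The sketch below shows that test in plain Python on three of the paths listed earlier. The helper names `parse_range` and `files_overlapping` are hypothetical, and the regular expression is the same one used in the pandas cell.

```python
import re
from datetime import date, datetime

# Same pattern as in the pandas cell: YYYYMMDD-YYYYMMDD right before ".nc".
RANGE_RE = re.compile(r"(\d{8})-(\d{8})\.nc$")


def parse_range(path):
    """Return (start, end) dates parsed from a file name, or None if absent."""
    match = RANGE_RE.search(path)
    if match is None:
        return None
    start, end = (datetime.strptime(s, "%Y%m%d").date() for s in match.groups())
    return start, end


def files_overlapping(paths, start, end):
    """Keep the paths whose date range overlaps the period [start, end]."""
    selected = []
    for path in paths:
        file_range = parse_range(path)
        # Two intervals overlap when each one starts before the other ends.
        if file_range and file_range[0] <= end and file_range[1] >= start:
            selected.append(path)
    return selected


paths = [
    "/g/data/zz93/era5-land/monthly-averaged-by-hour/2d/1959/2d_era5-land_mnth_sfc_19591201-19591231.nc",
    "/g/data/zz93/era5-land/monthly-averaged-by-hour/2d/1983/2d_era5-land_mnth_sfc_19830601-19830630.nc",
    "/g/data/zz93/era5-land/monthly-averaged-by-hour/2d/1969/2d_era5-land_mnth_sfc_19690101-19690131.nc",
]

# Files covering any part of 1969-01-01 to 1983-12-31.
selected = files_overlapping(paths, date(1969, 1, 1), date(1983, 12, 31))
for path in selected:
    print(path)
```

On the pandas table, the same test could be written as a boolean mask, for example `(d2m_data['ini_date'] <= end) & (d2m_data['end_date'] >= start)`, after dropping the rows whose `long_name` is empty.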


In [5]:
session.stop()