# T-prime calculation with job-queue

In [1]:
%load_ext autoreload
# Re-import edited local modules (e.g. `methods.py`) automatically before each cell executes.
%autoreload 2

In [2]:
import os
import dask
import numpy as np
import xarray as xr
import pandas as pd
import seaborn as sns
import cartopy.crs as ccrs
import matplotlib.pyplot as plt
from pathlib import Path

from methods import (t_prime_calculation, 
                         dask_data_to_xarray,
                         area_calculation_real_area,
                         dists_of_lat_eff,
                         temp_ref)

# To use dask across the cluster
from dask_jobqueue import SLURMCluster
from dask.distributed import Client


import matplotlib
# Global default font size for every figure in this notebook.
matplotlib.rcParams['font.size'] = 15

### Set up cluster
Use `dask-jobqueue` to run all jobs on the cluster. Cluster settings are read from `~/.config/dask/jobqueue.yaml`; print `cluster.job_script()` to inspect the SLURM job script generated from that configuration.

In [3]:
# Build the SLURM-backed cluster (worker settings come from the jobqueue config file)
# and show the batch script each worker job is submitted with.
cluster = SLURMCluster(nanny=False)
worker_job_script = cluster.job_script()
print(worker_job_script)

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p broadwl
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=19G
#SBATCH -t 00:30:00
#SBATCH --account=geos39650
#SBATCH --output=dask_worker.out
#SBATCH --error=dask_worker.err

/scratch/midway2/ivanhigueram/reanalysis_env/bin/python -m distributed.cli.dask_worker tcp://172.25.220.71:46816 --nthreads 4 --memory-limit 20.00GB --name name --no-nanny --death-timeout 60 --local-directory $SCRATCH --interface ib0



In [4]:
# Attach a distributed client to the SLURM cluster so later dask work runs on its workers.
client = Client(address=cluster)

In [5]:
# Ask the cluster for 10 workers (submitted as SLURM jobs).
n_requested_workers = 10
cluster.scale(n_requested_workers)

In [6]:
# Display the client summary: scheduler address, dashboard link, workers, cores and memory.
client

0,1
Client  Scheduler: tcp://172.25.220.71:45905  Dashboard: http://172.25.220.71:8787/status,Cluster  Workers: 10  Cores: 40  Memory: 200.00 GB


### Or test code on a node if you are in `sinteractive`

If you don't want to submit jobs, or simply want to check out a quick computation, you can start the client on the node you are logged into. This gives your parallel operations all of that node's resources.

**WARNING**: System administrators do not like users running heavy computations on the cluster's log-in nodes; use `sinteractive` or batch jobs for anything substantial.

In [4]:
#client.upload_file('dask-worker-space/methods.py')

from methods import (t_prime_calculation, 
                         dask_data_to_xarray,
                         area_calculation_real_area,
                         dists_of_lat_eff,
                         temp_ref)

# Weekly Reanalysis Data - Main (reshape `xarray`)



Using the `xarray` library, let's analyze the reanalysis product. Instead of reshaping (or re-indexing) our data, we will extract the pure values and use `pandas` to pivot everything.

### Test with raw files 

In [7]:
# Directory holding the raw `.nc` files (presumably the files requested through the CDS API).
ERA_RAW_DIR = Path('/project2/geos39650/jet_stream/cdsapi_requested_files')
# `Path.rglob` returns a one-shot generator. Materialize it here so that re-running the
# next cell (which consumes it) does not silently produce an empty file list.
path_to_year_era = list(ERA_RAW_DIR.rglob('*.nc'))

In [8]:
# Materialize the file listing, then order it by the integer token that is second-to-last
# in the underscore-separated file name (presumably the year — confirm against the file naming).
test_paths = [*path_to_year_era]
test_paths_sorted = sorted(
    test_paths,
    key=lambda nc_file: int(nc_file.name.split('_')[-2]),
)

In [10]:
from src.analyzer import preprocesser

# Open all sorted files as one dataset: each file is passed through `preprocesser`,
# files are opened in parallel, and the pieces are combined by their coordinates.
xarray_list_daily = xr.open_mfdataset(
    test_paths_sorted,
    preprocess=preprocesser,
    parallel=True,
    combine='by_coords',
)

In [12]:
# Keep only latitudes above 20; `drop=True` removes the excluded latitude labels
# instead of filling them with NaN.
north_of_20 = xarray_list_daily.latitude > 20
xarray_list_daily = xarray_list_daily.where(north_of_20, drop=True)

In [13]:
# Display the dask-backed dataset summary: shape, chunking, task count and dtype.
xarray_list_daily

Unnamed: 0,Array,Chunk
Bytes,6.22 GB,293.53 MB
Shape,"(3857, 280, 1440)","(182, 280, 1440)"
Count,354 Tasks,23 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 6.22 GB 293.53 MB Shape (3857, 280, 1440) (182, 280, 1440) Count 354 Tasks 23 Chunks Type float32 numpy.ndarray",1440  280  3857,

Unnamed: 0,Array,Chunk
Bytes,6.22 GB,293.53 MB
Shape,"(3857, 280, 1440)","(182, 280, 1440)"
Count,354 Tasks,23 Chunks
Type,float32,numpy.ndarray


In [3]:
import os
from dask.distributed import Client

# Connect to an already-running scheduler (e.g. one started by hand inside an
# `sinteractive` session). The address can be overridden with the
# DASK_SCHEDULER_ADDRESS environment variable; it defaults to the previous hard-coded value.
# NOTE(review): this rebinds `client`, replacing any SLURM-backed client created earlier
# in the notebook — run only one of the two set-up paths.
SCHEDULER_ADDRESS = os.environ.get('DASK_SCHEDULER_ADDRESS', 'tcp://127.0.0.1:42471')
client = Client(SCHEDULER_ADDRESS)

# Send the local helper module to every worker so code referencing `methods` can run there.
client.upload_file('methods.py')

# Last expression of the cell, so the client summary is actually displayed.
client

In [5]:
import time
from datetime import datetime
from dateutil.relativedelta import relativedelta

# Single-period run of the t-prime pipeline on whichever `client` is currently
# connected (see the set-up cells above). Per-period SLURM clusters are created
# programmatically in the multi-period loop further down.

# Parameters for this run
START_DATE = datetime(1980, 12, 1)
END_DATE = datetime(1983, 3, 1)
RESAMPLE_TIME = False
DATA_DIR = Path('/project2/geos39650/jet_stream/data')

# Define time ranges (as 'YYYY-MM-DD' strings for label-based slicing)
start_year = START_DATE.strftime('%Y-%m-%d')
end_year = END_DATE.strftime('%Y-%m-%d')
print(f'Processing: {start_year} to {end_year}')

# Open file and slice to the desired time range
print('Open File and start processing')
# Chunk one time step at a time.
xarray_list_daily = xr.open_dataset(DATA_DIR / 'df_lat_20_1D.nc',
                                    chunks={'time': 1})
# Label-based slice: both endpoints are included.
xarray_list_daily = xarray_list_daily.sel(time=slice(start_year, end_year))

# Calculate t-prime for the desired time range
print('Start t-prime calculation')
t_df = xarray_list_daily.to_dask_dataframe(dim_order=['time', 'latitude', 'longitude'])
test_ultimate = t_prime_calculation(t_df,
                                    resample_time=RESAMPLE_TIME,
                                    grouping_time_interval='1w',
                                    build_buckets=True,
                                    cut_interval=2)

# Save results to NetCDF
print('Saving file to netcdf')
t_ref_test = dask_data_to_xarray(df=test_ultimate,
                                 dims=['time', 'latitude', 'longitude'],
                                 target_variable='t_prime')

# Derive the tag from the flag actually passed above: the file name used to say
# "resample" even though resample_time=False.
resample_tag = 'resample' if RESAMPLE_TIME else 'no_resample'
t_ref_test.to_netcdf(DATA_DIR / f't_ref_test_blobs_{resample_tag}_{start_year}_{end_year}.nc4')

Processing: 1980-12-01 to 1983-03-01
Open File and start processing
Start t-prime calulation


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  pdf_lat_effs='eff_lat_deg')


Saving file to netcdf


In [4]:
import time
from datetime import datetime
from dateutil.relativedelta import relativedelta

# Run the t-prime pipeline over 1980-2020 in five-year periods, each on its own
# short-lived SLURM cluster. A period runs from 1 December of `year` to 1 March of
# `year + PERIOD_YEARS`; `.sel(time=slice(...))` includes both endpoints.
# NOTE(review): consecutive periods skip 2 March - 30 November of 1985, 1990, ...
# (1980-12-01..1985-03-01 is followed by 1985-12-01..1990-03-01) — confirm this gap
# is intended.
FIRST_YEAR, LAST_YEAR, PERIOD_YEARS = 1980, 2020, 5
N_WORKERS = 30
WORKER_WAIT_TIMEOUT_S = 60   # previously a fixed sleep; now an upper bound
WORKER_POLL_INTERVAL_S = 5
RESAMPLE_TIME = False
DATA_DIR = Path('/project2/geos39650/jet_stream/data')

for year in range(FIRST_YEAR, LAST_YEAR, PERIOD_YEARS):
    # Define time ranges
    start_year = datetime(year, 12, 1).strftime('%Y-%m-%d')
    end_year = datetime(year + PERIOD_YEARS, 3, 1).strftime('%Y-%m-%d')
    print(f'Processing: {start_year} to {end_year}')

    # Set up infrastructure. Everything after the cluster is created runs inside
    # try/finally so the client and cluster are closed even if a step raises.
    cluster = SLURMCluster(nanny=False)
    client = None
    try:
        print(cluster.job_script())
        client = Client(cluster)
        cluster.scale(N_WORKERS)

        # Wait until all requested workers have joined, but no longer than
        # WORKER_WAIT_TIMEOUT_S; proceed with whatever is up at that point.
        deadline = time.monotonic() + WORKER_WAIT_TIMEOUT_S
        while (len(client.scheduler_info()['workers']) < N_WORKERS
               and time.monotonic() < deadline):
            time.sleep(WORKER_POLL_INTERVAL_S)
        print(client)

        # Open file and slice to the desired time range
        print('Open File and start processing')
        xarray_list_daily = xr.open_dataset(DATA_DIR / 'df_lat_20_1D.nc',
                                            chunks={'time': 1})
        xarray_list_daily = xarray_list_daily.sel(time=slice(start_year, end_year))

        # Calculate t-prime for the desired time range
        print('Start t-prime calculation')
        t_df = xarray_list_daily.to_dask_dataframe(dim_order=['time', 'latitude', 'longitude'])
        test_ultimate = t_prime_calculation(t_df,
                                            resample_time=RESAMPLE_TIME,
                                            grouping_time_interval='1w',
                                            build_buckets=True,
                                            cut_interval=2)

        # Save results to NetCDF; the file-name tag follows the resample flag used above.
        print('Saving file to netcdf')
        t_ref_test = dask_data_to_xarray(df=test_ultimate,
                                         dims=['time', 'latitude', 'longitude'],
                                         target_variable='t_prime')
        resample_tag = 'resample' if RESAMPLE_TIME else 'no_resample'
        t_ref_test.to_netcdf(
            DATA_DIR / f't_prime_test_blobs_{resample_tag}_{start_year}_{end_year}.nc4')
    finally:
        # Close connections and free resources
        if client is not None:
            client.close()
        cluster.close()

Processing: 1980-12-01 to 1985-03-01
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p broadwl
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=19G
#SBATCH -t 00:30:00
#SBATCH --qos=covid-19
#SBATCH --account=covid-19
#SBATCH --output=dask_worker.out
#SBATCH --error=dask_worker.err

/scratch/midway2/ivanhigueram/reanalysis_env/bin/python -m distributed.cli.dask_worker tcp://172.25.220.71:37923 --nthreads 4 --memory-limit 20.00GB --name name --no-nanny --death-timeout 60 --local-directory $SCRATCH --interface ib0

<Client: 'tcp://172.25.220.71:37923' processes=30 threads=120, memory=600.00 GB>
Open File and start processing
Start t-prime calulation


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  pdf_lat_effs='eff_lat_deg')


Saving file to netcdf
Processing: 1985-12-01 to 1990-03-01
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p broadwl
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=19G
#SBATCH -t 00:30:00
#SBATCH --qos=covid-19
#SBATCH --account=covid-19
#SBATCH --output=dask_worker.out
#SBATCH --error=dask_worker.err

/scratch/midway2/ivanhigueram/reanalysis_env/bin/python -m distributed.cli.dask_worker tcp://172.25.220.71:40833 --nthreads 4 --memory-limit 20.00GB --name name --no-nanny --death-timeout 60 --local-directory $SCRATCH --interface ib0

<Client: 'tcp://172.25.220.71:40833' processes=20 threads=80, memory=400.00 GB>
Open File and start processing
Start t-prime calulation


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  pdf_lat_effs='eff_lat_deg')


Saving file to netcdf
Processing: 1990-12-01 to 1995-03-01
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p broadwl
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=19G
#SBATCH -t 00:30:00
#SBATCH --qos=covid-19
#SBATCH --account=covid-19
#SBATCH --output=dask_worker.out
#SBATCH --error=dask_worker.err

/scratch/midway2/ivanhigueram/reanalysis_env/bin/python -m distributed.cli.dask_worker tcp://172.25.220.71:33027 --nthreads 4 --memory-limit 20.00GB --name name --no-nanny --death-timeout 60 --local-directory $SCRATCH --interface ib0

<Client: 'tcp://172.25.220.71:33027' processes=30 threads=120, memory=600.00 GB>
Open File and start processing
Start t-prime calulation


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  pdf_lat_effs='eff_lat_deg')


Saving file to netcdf
Processing: 1995-12-01 to 2000-03-01
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p broadwl
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=19G
#SBATCH -t 00:30:00
#SBATCH --qos=covid-19
#SBATCH --account=covid-19
#SBATCH --output=dask_worker.out
#SBATCH --error=dask_worker.err

/scratch/midway2/ivanhigueram/reanalysis_env/bin/python -m distributed.cli.dask_worker tcp://172.25.220.71:43071 --nthreads 4 --memory-limit 20.00GB --name name --no-nanny --death-timeout 60 --local-directory $SCRATCH --interface ib0

<Client: 'tcp://172.25.220.71:43071' processes=30 threads=120, memory=600.00 GB>
Open File and start processing
Start t-prime calulation


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  pdf_lat_effs='eff_lat_deg')


Saving file to netcdf
Processing: 2000-12-01 to 2005-03-01
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p broadwl
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=19G
#SBATCH -t 00:30:00
#SBATCH --qos=covid-19
#SBATCH --account=covid-19
#SBATCH --output=dask_worker.out
#SBATCH --error=dask_worker.err

/scratch/midway2/ivanhigueram/reanalysis_env/bin/python -m distributed.cli.dask_worker tcp://172.25.220.71:37133 --nthreads 4 --memory-limit 20.00GB --name name --no-nanny --death-timeout 60 --local-directory $SCRATCH --interface ib0

<Client: 'tcp://172.25.220.71:37133' processes=30 threads=120, memory=600.00 GB>
Open File and start processing
Start t-prime calulation


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  pdf_lat_effs='eff_lat_deg')


Saving file to netcdf
Processing: 2005-12-01 to 2010-03-01
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p broadwl
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=19G
#SBATCH -t 00:30:00
#SBATCH --qos=covid-19
#SBATCH --account=covid-19
#SBATCH --output=dask_worker.out
#SBATCH --error=dask_worker.err

/scratch/midway2/ivanhigueram/reanalysis_env/bin/python -m distributed.cli.dask_worker tcp://172.25.220.71:39490 --nthreads 4 --memory-limit 20.00GB --name name --no-nanny --death-timeout 60 --local-directory $SCRATCH --interface ib0

<Client: 'tcp://172.25.220.71:39490' processes=30 threads=120, memory=600.00 GB>
Open File and start processing
Start t-prime calulation


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  pdf_lat_effs='eff_lat_deg')


Saving file to netcdf
Processing: 2010-12-01 to 2015-03-01
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p broadwl
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=19G
#SBATCH -t 00:30:00
#SBATCH --qos=covid-19
#SBATCH --account=covid-19
#SBATCH --output=dask_worker.out
#SBATCH --error=dask_worker.err

/scratch/midway2/ivanhigueram/reanalysis_env/bin/python -m distributed.cli.dask_worker tcp://172.25.220.71:37726 --nthreads 4 --memory-limit 20.00GB --name name --no-nanny --death-timeout 60 --local-directory $SCRATCH --interface ib0

<Client: 'tcp://172.25.220.71:37726' processes=29 threads=116, memory=580.00 GB>
Open File and start processing
Start t-prime calulation


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  pdf_lat_effs='eff_lat_deg')


Saving file to netcdf
Processing: 2015-12-01 to 2020-03-01
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p broadwl
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=19G
#SBATCH -t 00:30:00
#SBATCH --qos=covid-19
#SBATCH --account=covid-19
#SBATCH --output=dask_worker.out
#SBATCH --error=dask_worker.err

/scratch/midway2/ivanhigueram/reanalysis_env/bin/python -m distributed.cli.dask_worker tcp://172.25.220.71:44976 --nthreads 4 --memory-limit 20.00GB --name name --no-nanny --death-timeout 60 --local-directory $SCRATCH --interface ib0

<Client: 'tcp://172.25.220.71:44976' processes=30 threads=120, memory=600.00 GB>
Open File and start processing
Start t-prime calulation


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  pdf_lat_effs='eff_lat_deg')


Saving file to netcdf
