# RF PYTHON MPI 66 k

In [1]:
%%writefile rfpm6.py
import argparse, logging, os, sys, datetime, pandas as pd, numpy as np
from joblib import Parallel, parallel_backend, register_parallel_backend
from joblib import delayed, cpu_count
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from scipy.io import arff
import ipyparallel as ipp
from ipyparallel.joblib import IPythonParallelBackend
from time import time
# Wall-clock start; the elapsed time is printed at the end of the script.
t = time()

# Prepare the engines: connect to the ipyparallel cluster whose profile name
# is the third command-line argument.
c = ipp.Client(profile=sys.argv[3])
ncli = len(c.ids)  # number of engine ids the client reports
balanced_view = c.load_balanced_view()


def _make_ipyparallel_backend():
    """Factory handed to joblib: a backend bound to the load-balanced view."""
    return IPythonParallelBackend(view=balanced_view)


# Make the backend selectable later via parallel_backend('ipyparallel').
register_parallel_backend('ipyparallel', _make_ipyparallel_backend)

# Get & prepare data
def _load_arff(path):
    """Read an ARFF file and split it into features and the 'class' column.

    The byte flags b'N' / b'Y' are recoded to 0 / 1 in every column. Byte
    strings left in 'class' are decoded as UTF-8; entries that cannot be
    decoded keep their (already recoded) value.

    Args:
        path: location of the ARFF file.

    Returns:
        (X, y): X is the DataFrame without the 'class' column, y is the
        'class' Series.
    """
    records, _meta = arff.loadarff(path)
    frame = pd.DataFrame(records)
    frame = frame.replace(b'N', 0)
    frame = frame.replace(b'Y', 1)
    frame['class'] = frame['class'].str.decode('utf-8').fillna(frame['class'])
    return frame.drop(columns=['class']), frame['class']


def _as_frame(values, like):
    """Wrap an imputed array in a DataFrame with `like`'s columns and index."""
    return pd.DataFrame(values, columns=like.columns, index=like.index)


# argv[1] is the training ARFF file, argv[2] the test ARFF file.
X_train, y_train = _load_arff(sys.argv[1])
X_test, y_test = _load_arff(sys.argv[2])

# Fit the mean imputer on the training features only and reuse those means
# for the test features. Fitting a second imputer on the test set would fill
# its gaps with test-set statistics the trained model never saw.
imp = SimpleImputer(missing_values=np.nan, strategy='mean')
X_train = _as_frame(imp.fit_transform(X_train), X_train)
X_test = _as_frame(imp.transform(X_test), X_test)

def _report(label, y_true, y_pred):
    """Print the error percentage, sample count and Cohen's kappa for one set."""
    total = y_true.size
    # normalize=False yields the number of correctly classified samples.
    n_correct = metrics.accuracy_score(y_true, y_pred, normalize=False)
    error_pct = ((total - n_correct) / (total)) * 100
    kappa = metrics.cohen_kappa_score(y_true, y_pred)
    print(f'{label} classification error is {error_pct:.2f}% ',
          f'of {total} (kappa: {kappa:.4f})')


# Fit the forest inside the 'ipyparallel' joblib backend context.
clf = RandomForestClassifier(n_estimators=100)
with parallel_backend('ipyparallel'):
    clf.fit(X_train, y_train)

# Predictions are made outside the backend context.
y_pred_test = clf.predict(X_test)
y_pred_train = clf.predict(X_train)

_report('Trainset', y_train, y_pred_train)
_report(' Testset', y_test, y_pred_test)

# Elapsed wall-clock time since the script started, and the engine count.
t = time() - t
print(f"T: {t:.4f}  |  N: {ncli:0g}")

# Ask the cluster (engines and hub) to shut down without waiting for a reply.
c.shutdown(hub=True, block=False)

Overwriting rfpm6.py


## Copy to /scratch

In [2]:
# Copy the script to the matching directory under /scratch (current path with
# the leading /prj removed); rfpm6.srm changes into that directory to run it.
! cp rfpm6.py /scratch${PWD#/prj}

## Slurm script

In [3]:
%%writefile rfpm6.srm
#!/bin/bash
#SBATCH --job-name rfpm6       # Job name
#SBATCH --partition cpu_small  # Select partition
#SBATCH --ntasks=1             # Total tasks(CPUs)
#SBATCH --time=00:10:00        # Limit execution time
#SBATCH --exclusive            # Exclusive access to nodes

# Starts an ipyparallel controller and one engine per Slurm task, then runs
# rfpm6.py against that cluster using a profile named after the job ID.

echo '========================================'
echo '- Job ID:' $SLURM_JOB_ID
echo '- # of nodes in the job:' $SLURM_JOB_NUM_NODES
echo '- # of tasks:' $SLURM_NTASKS
echo '- Dir from which sbatch was invoked:' ${SLURM_SUBMIT_DIR##*/}
# Stop here if the directory is unavailable instead of running elsewhere.
cd "$SLURM_SUBMIT_DIR" || { echo "cannot cd to $SLURM_SUBMIT_DIR" >&2; exit 1; }
echo -n '- Nodes allocated to the job: '
nodeset -e "$SLURM_JOB_NODELIST"

# get path: RF mirrors the submit directory under /scratch (leading /prj
# removed); SCR is RF without a trailing /rf component
RF=/scratch${PWD#/prj}
SCR=${RF%/rf}
cd "$RF" || { echo "cannot cd to $RF" >&2; exit 1; }
# path to a directory which IPython will use for user data
export IPYTHONDIR="$SCR/.ipython"

# load Python environment and MPI module
source "$SCR/env2/etc/profile.d/conda.sh"
conda activate "$SCR/env2"
module load openmpi/gnu/4.0.1

echo -n '<1. starting ipython>        ' && date
# create a new ipython profile appended with the job id number
PROFILE=job_${SLURM_JOB_ID}
ipython profile create "${PROFILE}" --parallel --quiet

echo -n '<2. starting ipcontroller>   ' && date
# run ipcontroller on one core, in the background
ipcontroller --ip="*" --profile="${PROFILE}" --quiet &
# fixed pause before the engines are launched (presumably so the controller
# can finish starting -- confirm the value is long enough on this system)
sleep 10

echo -n '<3. starting srun ipengine>  ' && date
# run ipengine on each available core, in the background
srun --mpi=pmi2 -n "$SLURM_NTASKS" \
    ipengine --location="$(hostname)" --profile="${PROFILE}" --quiet &
# fixed pause before the client script starts (presumably so the engines can
# register with the controller)
sleep 25

# Executable and its arguments: train set, test set, ipyparallel profile.
# Kept as an array so each argument stays one word.
DT1="datasets/asteroid-train-66k.arff"
DT2="datasets/asteroid-test-34k.arff"
EXEC=(rfpm6.py "${DT1}" "${DT2}" "${PROFILE}")

# run
echo -n '<4. starting python script > ' && date
echo '-- output -----------------------------'
python "${EXEC[@]}"
echo '-- end --------------------------------'
echo -n '<5. quit>                    ' && date
# fixed pause before the batch script exits (rfpm6.py has just requested a
# cluster shutdown; presumably this leaves time for it to complete)
sleep 25

Overwriting rfpm6.srm


## Check

In [4]:
# Trial run on the cpu_dev partition with 96 tasks; these command-line options
# take precedence over the #SBATCH partition/ntasks values in rfpm6.srm.
! sbatch --partition cpu_dev --ntasks=96 rfpm6.srm

Submitted batch job 1377056


In [5]:
# List rfpm6 jobs on cpu_dev: submit time, start time, job ID, partition,
# node count and CPU count.
! squeue --name=rfpm6 --partition=cpu_dev --format="%.19V %.19S %.8i %.9P %.5D %.4C"

        SUBMIT_TIME          START_TIME    JOBID PARTITION NODES CPUS
2021-10-05T19:56:31 2021-10-05T19:56:32  1377056   cpu_dev     4   96


In [8]:
# Re-check the queue; an empty listing means squeue no longer reports the job.
! squeue --name=rfpm6 --partition=cpu_dev --format="%.19V %.19S %.8i %.9P %.5D %.4C"

        SUBMIT_TIME          START_TIME    JOBID PARTITION NODES CPUS


In [9]:
# Print the Slurm output file of job 1377056 from the scratch directory.
! cat /scratch${PWD#/prj}/slurm-1377056.out

- Job ID: 1377056
- # of nodes in the job: 4
- # of tasks: 96
- Dir from which sbatch was invoked: rf
- Nodes allocated to the job: sdumont1244 sdumont1245 sdumont1246 sdumont1247
<1. starting ipython>        Ter Out  5 19:56:48 -03 2021
<2. starting ipcontroller>   Ter Out  5 19:57:43 -03 2021
<3. starting srun ipengine>  Ter Out  5 19:57:53 -03 2021
<4. starting python script > Ter Out  5 19:58:18 -03 2021
-- output -----------------------------
Trainset classification error is 0.00%  of 66000 (kappa: 1.0000)
 Testset classification error is 0.00%  of 34000 (kappa: 0.9997)
T: 38.8848  |  N: 96
-- end --------------------------------
<5. quit>                    Ter Out  5 20:00:32 -03 2021


In [29]:
# Accounting for job 1377056, one row per job step: CPUs, nodes, peak memory
# (MaxRSS) and the node it occurred on, submit/start times, elapsed and CPU time.
! sacct --jobs=1377056 --format=jobname,ncpus%5,nnodes%6,maxrss,maxrssnode%13,submit,start,elapsed,cputime

   JobName NCPUS NNodes     MaxRSS    MaxRSSNode              Submit               Start    Elapsed    CPUTime 
---------- ----- ------ ---------- ------------- ------------------- ------------------- ---------- ---------- 
     rfpm6    96      4                          2021-10-05T19:56:31 2021-10-05T19:56:32   00:04:25   07:04:00 
     batch    24      1   2219516K   sdumont1244 2021-10-05T19:56:32 2021-10-05T19:56:32   00:04:25   01:46:00 
  ipengine    96      4    128284K   sdumont1244 2021-10-05T19:57:53 2021-10-05T19:57:53   00:02:40   04:16:00 


In [10]:
# Trial run on the cpu_dev partition with a single task.
! sbatch --partition cpu_dev --ntasks=1 rfpm6.srm

Submitted batch job 1377058


In [11]:
# List rfpm6 jobs on cpu_dev: submit time, start time, job ID, partition,
# node count and CPU count.
! squeue --name=rfpm6 --partition=cpu_dev --format="%.19V %.19S %.8i %.9P %.5D %.4C"

        SUBMIT_TIME          START_TIME    JOBID PARTITION NODES CPUS
2021-10-05T20:01:33 2021-10-05T20:01:34  1377058   cpu_dev     1   24


In [16]:
# Re-check the queue; an empty listing means squeue no longer reports the job.
! squeue --name=rfpm6 --partition=cpu_dev --format="%.19V %.19S %.8i %.9P %.5D %.4C"

        SUBMIT_TIME          START_TIME    JOBID PARTITION NODES CPUS


In [17]:
# Print the Slurm output file of job 1377058 from the scratch directory.
! cat /scratch${PWD#/prj}/slurm-1377058.out

- Job ID: 1377058
- # of nodes in the job: 1
- # of tasks: 1
- Dir from which sbatch was invoked: rf
- Nodes allocated to the job: sdumont1244
<1. starting ipython>        Ter Out  5 20:01:42 -03 2021
<2. starting ipcontroller>   Ter Out  5 20:02:16 -03 2021
<3. starting srun ipengine>  Ter Out  5 20:02:26 -03 2021
<4. starting python script > Ter Out  5 20:02:51 -03 2021
-- output -----------------------------
Trainset classification error is 0.00%  of 66000 (kappa: 1.0000)
 Testset classification error is 0.00%  of 34000 (kappa: 0.9997)
T: 28.1289  |  N: 1
-- end --------------------------------
<5. quit>                    Ter Out  5 20:04:09 -03 2021


In [18]:
# Accounting for job 1377058, one row per job step: CPUs, nodes, peak memory
# (MaxRSS) and the node it occurred on, start time, elapsed and CPU time.
! sacct --jobs=1377058 --format=jobname,ncpus,nnodes,maxrss,maxrssnode%13,start,elapsed,cputime

   JobName      NCPUS   NNodes     MaxRSS    MaxRSSNode               Start    Elapsed    CPUTime 
---------- ---------- -------- ---------- ------------- ------------------- ---------- ---------- 
     rfpm6         24        1                          2021-10-05T20:01:34   00:03:00   01:12:00 
     batch         24        1   1682044K   sdumont1244 2021-10-05T20:01:34   00:03:00   01:12:00 
  ipengine          1        1    334804K   sdumont1244 2021-10-05T20:02:27   00:01:42   00:01:42 


## Run

### 1 of (1, 4, 16, 24, 48, 72, 96)

In [19]:
%%bash
# Submit four identical runs of rfpm6.srm, each requesting 1 task.
for run in 1 2 3 4; do
    sbatch --ntasks=1 rfpm6.srm
done

Submitted batch job 1377061
Submitted batch job 1377062
Submitted batch job 1377063
Submitted batch job 1377064


In [None]:
%%bash
# Print the Slurm output of the four 1-task runs; the job IDs are the ones
# reported when the jobs were submitted.
for job in 1377061 1377062 1377063 1377064; do
    cat "/scratch${PWD#/prj}/slurm-${job}.out"
done

### 4 of (1, 4, 16, 24, 48, 72, 96)

In [20]:
%%bash
# Submit four identical runs of rfpm6.srm, each requesting 4 tasks.
for run in 1 2 3 4; do
    sbatch --ntasks=4 rfpm6.srm
done

Submitted batch job 1377065
Submitted batch job 1377066
Submitted batch job 1377067
Submitted batch job 1377068


In [None]:
%%bash
# Print the Slurm output of the four 4-task runs; the job IDs are the ones
# reported when the jobs were submitted.
for job in 1377065 1377066 1377067 1377068; do
    cat "/scratch${PWD#/prj}/slurm-${job}.out"
done

### 16 of (1, 4, 16, 24, 48, 72, 96)

In [21]:
%%bash
# Submit four identical runs of rfpm6.srm, each requesting 16 tasks.
for run in 1 2 3 4; do
    sbatch --ntasks=16 rfpm6.srm
done

Submitted batch job 1377069
Submitted batch job 1377070
Submitted batch job 1377071
Submitted batch job 1377072


In [None]:
%%bash
# Print the Slurm output of the four 16-task runs; the job IDs are the ones
# reported when the jobs were submitted.
for job in 1377069 1377070 1377071 1377072; do
    cat "/scratch${PWD#/prj}/slurm-${job}.out"
done

### 24 of (1, 4, 16, 24, 48, 72, 96)

In [22]:
%%bash
# Submit four identical runs of rfpm6.srm, each requesting 24 tasks.
for run in 1 2 3 4; do
    sbatch --ntasks=24 rfpm6.srm
done

Submitted batch job 1377073
Submitted batch job 1377074
Submitted batch job 1377075
Submitted batch job 1377076


In [None]:
%%bash
# Print the Slurm output of the four 24-task runs; the job IDs are the ones
# reported when the jobs were submitted.
for job in 1377073 1377074 1377075 1377076; do
    cat "/scratch${PWD#/prj}/slurm-${job}.out"
done

### 48 of (1, 4, 16, 24, 48, 72, 96)

In [23]:
%%bash
# Submit four identical runs of rfpm6.srm, each requesting 48 tasks.
for run in 1 2 3 4; do
    sbatch --ntasks=48 rfpm6.srm
done

Submitted batch job 1377077
Submitted batch job 1377078
Submitted batch job 1377079
Submitted batch job 1377080


In [None]:
%%bash
# Print the Slurm output of the four 48-task runs; the job IDs are the ones
# reported when the jobs were submitted.
for job in 1377077 1377078 1377079 1377080; do
    cat "/scratch${PWD#/prj}/slurm-${job}.out"
done

### 72 of (1, 4, 16, 24, 48, 72, 96)

In [24]:
%%bash
# Submit four identical runs of rfpm6.srm, each requesting 72 tasks.
for run in 1 2 3 4; do
    sbatch --ntasks=72 rfpm6.srm
done

Submitted batch job 1377081
Submitted batch job 1377082
Submitted batch job 1377083
Submitted batch job 1377084


In [None]:
%%bash
# Print the Slurm output of the four 72-task runs; the job IDs are the ones
# reported when the jobs were submitted.
for job in 1377081 1377082 1377083 1377084; do
    cat "/scratch${PWD#/prj}/slurm-${job}.out"
done

### 96 of (1, 4, 16, 24, 48, 72, 96)

In [25]:
%%bash
# Submit four identical runs of rfpm6.srm, each requesting 96 tasks.
for run in 1 2 3 4; do
    sbatch --ntasks=96 rfpm6.srm
done

Submitted batch job 1377085
Submitted batch job 1377086
Submitted batch job 1377087
Submitted batch job 1377088


In [None]:
%%bash
# Print the Slurm output of the four 96-task runs; the job IDs are the ones
# reported when the jobs were submitted.
for job in 1377085 1377086 1377087 1377088; do
    cat "/scratch${PWD#/prj}/slurm-${job}.out"
done

In [26]:
# Count the current user's queue entries (-h: no header line, -r: one line per
# job-array element).
! squeue -u $(whoami) -h -r | wc -l

28


In [27]:
# Count all queue entries on the cpu_small partition.
! squeue --partition=cpu_small -h -r | wc -l

394


In [28]:
# List every rfpm6 job in any partition: submit time, start time, job ID,
# partition, node count and CPU count.
! squeue --name=rfpm6 --format="%.19V %.19S %.8i %.9P %.5D %.4C"

        SUBMIT_TIME          START_TIME    JOBID PARTITION NODES CPUS
2021-10-05T20:05:38 2021-10-07T04:44:38  1377081 cpu_small     3   72
2021-10-05T20:05:38 2021-10-07T04:44:38  1377082 cpu_small     3   72
2021-10-05T20:05:38 2021-10-07T04:44:38  1377083 cpu_small     3   72
2021-10-05T20:05:39 2021-10-07T04:44:38  1377084 cpu_small     3   72
2021-10-05T20:05:42 2021-10-07T04:44:38  1377085 cpu_small     4   96
2021-10-05T20:05:42 2021-10-07T04:44:38  1377086 cpu_small     4   96
2021-10-05T20:05:42 2021-10-07T04:44:38  1377087 cpu_small     4   96
2021-10-05T20:05:42 2021-10-07T04:44:38  1377088 cpu_small     4   96
2021-10-05T20:05:35 2021-10-06T23:55:02  1377077 cpu_small     2   48
2021-10-05T20:05:35 2021-10-06T23:55:02  1377078 cpu_small     2   48
2021-10-05T20:05:35 2021-10-06T23:55:02  1377079 cpu_small     2   48
2021-10-05T20:05:35 2021-10-06T23:55:02  1377080 cpu_small     2   48
2021-10-05T20:05:22 2021-10-06T23:35:24  1377061 cpu_small     1    1
2021-10-05T20:05:22 

In [None]:
# Re-run to refresh the status of the rfpm6 jobs.
! squeue --name=rfpm6 --format="%.19V %.19S %.8i %.9P %.5D %.4C"

In [None]:
# Placeholder: replace xxxx with a job ID (or a comma-separated list of IDs)
# before running.
! sacct --jobs=xxxx --format=jobname,ncpus,nnodes,maxrss,maxrssnode%13,start,elapsed,cputime