### bigplanet Serial vs. Parallel Example

David Fleming, Feb 2017

---

In this notebook, I'll extract data with bigplanet in both serial and parallel modes to show that, from the user's standpoint, the two are equivalent.

**Serial**

---

Extracting data in serial uses only one core and stores all of the simulation results in a single hdf5 file.

**Parallel**

---

Extracting data in parallel uses all available cores on a machine (or on a node, if using bigplanet on a cluster) and produces one hdf5 file per core. bigplanet handles the bookkeeping of reading from multiple hdf5 files under the hood, so the API is consistent between serial and parallel modes.
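One way to picture the parallel layout is as a split of simulation numbers across workers, plus a lookup from a global simulation number to a (file, local index) pair. The sketch below only illustrates that idea. `split_indices` and `locate` are hypothetical helpers that I made up for this example, not bigplanet functions, and bigplanet's actual bookkeeping may well differ.

```python
import numpy as np

def split_indices(n_sims, n_workers):
    """Split simulation numbers 0..n_sims-1 into n_workers contiguous chunks."""
    return np.array_split(np.arange(n_sims), n_workers)

def locate(sim, chunks):
    """Return (file_number, local_index) for a global simulation number."""
    for file_number, chunk in enumerate(chunks):
        # Skip empty chunks (possible when there are more workers than sims)
        if len(chunk) and chunk[0] <= sim <= chunk[-1]:
            return file_number, int(sim - chunk[0])
    raise IndexError("simulation %d is out of range" % sim)

# 625 simulations spread over 4 workers, as in the run shown in this notebook
chunks = split_indices(625, 4)
print([len(c) for c in chunks])  # [157, 156, 156, 156]
print(locate(399, chunks))       # (2, 86)
```

With a lookup like this, a call that asks for simulation 399 never needs to know which file holds it, which is the kind of consistency the serial and parallel modes aim for.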

In [1]:
# Render plots inline in the notebook
%matplotlib inline

from __future__ import print_function, division, absolute_import

# Imports
import numpy as np
import os
from bigplanet import data_extraction as de

**Input definitions**

---

Define the parameters required to extract the simulation data.

In [2]:
### Load in the data ###

# Define the root directory where all simulation subdirectories are located
src = "/Users/dflemin3/Desktop/GM_run/Data"

# Define where you want the hdf5 files to live
hdf5_loc = "/Users/dflemin3/Desktop/GM_run/"

# How you wish the data to be ordered (grid for grid simulation suites, none otherwise)
order = "none"

# Format of the data (default, no need to set this)
fmt = "hdf5"

# Ignore simulations that halted at some point?
remove_halts = False

# Any bodies whose output you wish to ignore?
skip_body = ["primary.in"]

# Any parameters not in a body's .forward files that you want?
var_from_log = {"secondary" : ["Mass"], "cbp" : ["Mass"]}

# Any parameters from an input file?
var_from_infile = None # Used just like var_from_log

# Optional kwarg: extract_data_hdf5 prints a progress message
# every `cadence` simulations (cadence must be an int)
cadence = 100

# Compression algorithm to use
compression = None #"gzip"

# Path to the hdf5 dataset(s)
dataset_parallel = os.path.join(hdf5_loc,"Parallel/simulation")
dataset_serial = os.path.join(hdf5_loc,"Serial/simulation")

**Extract the data!**

---

Now extract the simulation data in serial or in parallel by toggling the ```parallel``` flag.

In [3]:
# In serial!

# Use all processors? Best if used on a cluster (one node with many cores)
parallel = False

data_serial = de.extract_data_hdf5(src=src, dataset=dataset_serial, order=order, remove_halts=remove_halts,
                           skip_body=skip_body, compression=compression, var_from_infile=var_from_infile,
                           cadence=cadence, parallel=parallel)
print(data_serial)

Creating hdf5 dataset /Users/dflemin3/Desktop/GM_run/Serial/simulation:
Finding simulation subdirectories in /Users/dflemin3/Desktop/GM_run/Data ordered by none.
Skipped primary.in.
Infiles: ['cbp.in', 'secondary.in']
Data Columns: {'cbp.in': [u'Time', u'LongP', u'SemimajorAxis', u'Eccentricity', u'dIncBinary'], 'secondary.in': [u'Time', u'Radius', u'SemimajorAxis', u'Eccentricity', u'RotRate', u'MeanMotion']}
Simulations processed so far: 100
Simulations processed so far: 200
Simulations processed so far: 300
Simulations processed so far: 400
Simulations processed so far: 500
Simulations processed so far: 600
Name: /Users/dflemin3/Desktop/GM_run/Serial/simulation.hdf5. Size: 625. Order: none


In [4]:
# In parallel!

# Use all processors? Best if used on a cluster
parallel = True

data_parallel = de.extract_data_hdf5(src=src, dataset=dataset_parallel, order=order, remove_halts=remove_halts,
                           skip_body=skip_body, compression=compression, var_from_infile=var_from_infile,
                           cadence=cadence, parallel=parallel)
print(data_parallel)

Creating hdf5 dataset /Users/dflemin3/Desktop/GM_run/Parallel/simulation:
Finding simulation subdirectories in /Users/dflemin3/Desktop/GM_run/Data ordered by none.
Skipped primary.in.
Infiles: ['cbp.in', 'secondary.in']
Data Columns: {'cbp.in': [u'Time', u'LongP', u'SemimajorAxis', u'Eccentricity', u'dIncBinary'], 'secondary.in': [u'Time', u'Radius', u'SemimajorAxis', u'Eccentricity', u'RotRate', u'MeanMotion']}
Simulations processed so far: 100
Simulations processed so far: 100
Simulations processed so far: 100
Simulations processed so far: 100
Name: [u'/Users/dflemin3/Desktop/GM_run/Parallel/simulation_0.hdf5', u'/Users/dflemin3/Desktop/GM_run/Parallel/simulation_1.hdf5', u'/Users/dflemin3/Desktop/GM_run/Parallel/simulation_2.hdf5', u'/Users/dflemin3/Desktop/GM_run/Parallel/simulation_3.hdf5']. Size: 625. Order: none
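The parallel log repeats "Simulations processed so far: 100" four times, while the serial log counts up to 600. A plausible reading, which I'm assuming here rather than quoting from the bigplanet source, is that each worker keeps its own counter: with 625 simulations over 4 workers, each worker handles about 156 simulations and so passes the `cadence = 100` mark exactly once. The hypothetical helper `progress_messages` below just works through that arithmetic.

```python
def progress_messages(n_sims, cadence):
    """Counts at which a single worker would report progress."""
    return [count for count in range(1, n_sims + 1) if count % cadence == 0]

# Serial: one worker sees all 625 simulations
serial = progress_messages(625, 100)
print(serial)    # [100, 200, 300, 400, 500, 600]

# Parallel: assumed per-worker shares of the 625 simulations
parallel = [progress_messages(n, 100) for n in (157, 156, 156, 156)]
print(parallel)  # [[100], [100], [100], [100]]
```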


**Dataset object methods**

---

The dataset object has some useful methods and attributes.  Let's test them out for both the parallel and the serial datasets to ensure they return the same value.

In [5]:
print(data_serial.input_files)
print(data_parallel.input_files)

['cbp.in', 'secondary.in']
['cbp.in', 'secondary.in']


In [6]:
print(data_serial.data_cols)
print(data_parallel.data_cols)

[u'cbp_Time' u'cbp_LongP' u'cbp_SemimajorAxis' u'cbp_Eccentricity'
 u'cbp_dIncBinary' u'secondary_Time' u'secondary_Radius'
 u'secondary_SemimajorAxis' u'secondary_Eccentricity' u'secondary_RotRate'
 u'secondary_MeanMotion']
[u'cbp_Time' u'cbp_LongP' u'cbp_SemimajorAxis' u'cbp_Eccentricity'
 u'cbp_dIncBinary' u'secondary_Time' u'secondary_Radius'
 u'secondary_SemimajorAxis' u'secondary_Eccentricity' u'secondary_RotRate'
 u'secondary_MeanMotion']


In [7]:
# Given a simulation number, what is the simulation's name?
print(data_serial.sim_name(25))
print(data_parallel.sim_name(25))

GM_simbe0_ba1_cfe0_ca0
GM_simbe0_ba1_cfe0_ca0


In [8]:
# How is the data ordered?
print(data_serial.order)
print(data_parallel.order)

none
none


In [9]:
# What is the size of the dataset (number of simulations)?
print(data_serial.size)
print(data_parallel.size)

625
625


In [10]:
# What's the path to the actual hdf5 file?
print(data_serial.data)
print(data_parallel.data)

/Users/dflemin3/Desktop/GM_run/Serial/simulation.hdf5
[u'/Users/dflemin3/Desktop/GM_run/Parallel/simulation_0.hdf5', u'/Users/dflemin3/Desktop/GM_run/Parallel/simulation_1.hdf5', u'/Users/dflemin3/Desktop/GM_run/Parallel/simulation_2.hdf5', u'/Users/dflemin3/Desktop/GM_run/Parallel/simulation_3.hdf5']


**Accessing simulation data**

---

We've successfully processed and stored the data, so now let's access it using the Dataset ```get``` method. We'll use both the serial and parallel Dataset objects to ensure identical results.

In [11]:
# Pick some random simulation: do serial and parallel yield the same result?
time_s, ecc_s = data_serial.get(399,"secondary",["Time","Eccentricity"])
print("Time:",time_s[0:10],"...")
print("Ecc:",ecc_s[0:10],"...")

time_p, ecc_p = data_parallel.get(399,"secondary",["Time","Eccentricity"])
print("Time:",time_p[0:10],"...")
print("Ecc:",ecc_p[0:10],"...")

# Are they the same array?
print(np.allclose(ecc_s,ecc_p))

Time: [       0.  1000000.  2000000.  3000000.  4000000.  5000000.  6000000.
  7000000.  8000000.  9000000.] ...
Ecc: [ 0.22750001  0.21699999  0.198797    0.194611    0.192672    0.19154
  0.190805    0.190299    0.18993799  0.189674  ] ...
Time: [       0.  1000000.  2000000.  3000000.  4000000.  5000000.  6000000.
  7000000.  8000000.  9000000.] ...
Ecc: [ 0.22750001  0.21699999  0.198797    0.194611    0.192672    0.19154
  0.190805    0.190299    0.18993799  0.189674  ] ...
True


In [12]:
# Now just access one variable
semi_s = data_serial.get(222,"secondary","SemimajorAxis")
semi_p = data_parallel.get(222,"secondary","SemimajorAxis")
print(np.allclose(semi_s,semi_p))

True
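Note that `np.allclose` compares within a tolerance (by default `rtol=1e-05` and `atol=1e-08`), so it reports `True` for arrays that differ by tiny amounts. If you want to check that the serial and parallel arrays are bit-for-bit identical, `np.array_equal` is the stricter test. The arrays below are made-up values for illustration, not bigplanet output.

```python
import numpy as np

a = np.array([0.22750001, 0.21699999, 0.198797])
b = a + 1.0e-9  # perturb every element slightly

close = np.allclose(a, b)               # tolerant comparison
exact = np.array_equal(a, b)            # exact element-wise comparison
identical = np.array_equal(a, a.copy())

print(close)      # True
print(exact)      # False
print(identical)  # True
```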


In [13]:
# Overkill: Loop through all sims for several parameters and ensure that
# serial and parallel yield the same result for every one of them
for ii in range(data_serial.size):
    # Serial
    semi_s, ecc_s = data_serial.get(ii,"secondary",["SemimajorAxis","Eccentricity"])

    # Parallel
    semi_p, ecc_p = data_parallel.get(ii,"secondary",["SemimajorAxis","Eccentricity"])
    
    # Are they the same array?
    res = np.allclose(ecc_s,ecc_p) and np.allclose(semi_s,semi_p)
    print(ii,res)

0 True
1 True
2 True
3 True
4 True
5 True
6 True
7 True
8 True
9 True
10 True
11 True
12 True
13 True
14 True
15 True
16 True
17 True
18 True
19 True
20 True
21 True
22 True
23 True
24 True
25 True
26 True
27 True
28 True
29 True
30 True
31 True
32 True
33 True
34 True
35 True
36 True
37 True
38 True
39 True
40 True
41 True
42 True
43 True
44 True
45 True
46 True
47 True
48 True
49 True
50 True
51 True
52 True
53 True
54 True
55 True
56 True
57 True
58 True
59 True
60 True
61 True
62 True
63 True
64 True
65 True
66 True
67 True
68 True
69 True
70 True
71 True
72 True
73 True
74 True
75 True
76 True
77 True
78 True
79 True
80 True
81 True
82 True
83 True
84 True
85 True
86 True
87 True
88 True
89 True
90 True
91 True
92 True
93 True
94 True
95 True
96 True
97 True
98 True
99 True
100 True
101 True
102 True
103 True
104 True
105 True
106 True
107 True
108 True
109 True
110 True
111 True
112 True
113 True
114 True
115 True
116 True
117 True
118 True
119 True
120 True
121 True
122 True
123
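Printing one line per simulation gets long quickly. A more compact variant collects only the simulation numbers that disagree and prints that list, which is empty when everything matches. The sketch below uses `FakeDataset`, a hypothetical stand-in that mimics only the `size` attribute and the `get(sim, body, keys)` call used above, so that it runs without any simulation data. `find_mismatches` is likewise my own helper, not part of bigplanet.

```python
import numpy as np

class FakeDataset(object):
    """Hypothetical stand-in exposing only `size` and `get`."""
    def __init__(self, arrays):
        self.arrays = arrays
        self.size = len(arrays)

    def get(self, sim, body, keys):
        # `body` is ignored here; return one array per requested key
        return [self.arrays[sim][key] for key in keys]

def find_mismatches(data_a, data_b, body, keys):
    """Return the simulation numbers whose arrays differ between two datasets."""
    bad = []
    for ii in range(data_a.size):
        vals_a = data_a.get(ii, body, keys)
        vals_b = data_b.get(ii, body, keys)
        if not all(np.allclose(a, b) for a, b in zip(vals_a, vals_b)):
            bad.append(ii)
    return bad

keys = ["SemimajorAxis", "Eccentricity"]
sims = [{"SemimajorAxis": np.full(3, 0.1 * ii),
         "Eccentricity": np.linspace(0.0, 0.2, 3)} for ii in range(5)]

same = FakeDataset(sims)
mismatched = FakeDataset([dict(s) for s in sims])
mismatched.arrays[3]["Eccentricity"] = np.zeros(3)  # corrupt one simulation

print(find_mismatches(same, same, "secondary", keys))        # []
print(find_mismatches(same, mismatched, "secondary", keys))  # [3]
```

With the real objects, the equivalent call would be `find_mismatches(data_serial, data_parallel, "secondary", keys)`, assuming `get` behaves as it does in the cells above.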

**Aggregating the data**

---

Suppose we want to process the simulation results once more, this time using the aggregate_data function to build a table of all the simulation initial conditions for machine learning or some other task. Below I show that it proceeds identically for both the serial and parallel modes!

In [14]:
# Trivial function to return initial eccentricity
# This function assumes an hdf5 format
def trivial(data,sim,body,key=None,fmt="hdf5"):
    # Check out the get function in action!
    return data.get(sim,body,key)[0]
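To see what `trivial` returns without a real dataset, it can be run against a stand-in object. `StubData` below is hypothetical and mimics only the `get(sim, body, key)` call. The eccentricity values are made up, and how aggregate_data itself invokes the function is not shown here.

```python
import numpy as np

class StubData(object):
    """Hypothetical stand-in for a dataset: one array per (body, key) pair."""
    def __init__(self, table):
        self.table = table

    def get(self, sim, body, key):
        # `sim` is ignored in this stub; every simulation shares one array
        return self.table[(body, key)]

def trivial(data, sim, body, key=None, fmt="hdf5"):
    # Same body as the notebook's function: first element of the requested array
    return data.get(sim, body, key)[0]

stub = StubData({("cbp", "Eccentricity"): np.array([0.05, 0.04, 0.03])})
kw = {"key": "Eccentricity"}

init_ecc = trivial(stub, 0, "cbp", **kw)
print(init_ecc)  # 0.05
```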

In [15]:
# Define the bodies and body variables to extract using a dictionary
bodies = {'cbp': ['Eccentricity', 'SemimajorAxis'],'secondary': ['SemimajorAxis','Eccentricity']}

# Define the new value (dataframe column) to produce for a given body.  The new column
# and how to calculate it are given as a dictionary for each body.

# New column for the cbp is "InitEcc" and is calculated using the function "trivial"
new_cols = {"cbp" : {"InitEcc" : trivial}}

# Define any keyword arguments trivial might need
kw = {"key" : "Eccentricity"}

In [16]:
# Serial
df_s = de.aggregate_data(data_serial, bodies=bodies, ind=0, funcs={"cbp" : {"SemimajorAxis" : np.mean}},
                    new_cols=new_cols,cache=os.path.join(hdf5_loc,"Serial","trivial_cache.pkl"),fmt=fmt,**kw)

Caching data at /Users/dflemin3/Desktop/GM_run/Serial/trivial_cache.pkl


In [17]:
# Parallel
df_p = de.aggregate_data(data_parallel, bodies=bodies, ind=0, funcs={"cbp" : {"SemimajorAxis" : np.mean}},
                    new_cols=new_cols,cache=os.path.join(hdf5_loc,"Parallel","trivial_cache.pkl"),fmt=fmt,**kw)

Caching data at /Users/dflemin3/Desktop/GM_run/Parallel/trivial_cache.pkl


**Are the two dataframes equal?**

In [18]:
print(df_s.equals(df_p))

True
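`DataFrame.equals` requires exactly equal values and gives only a yes/no answer. If it ever returned `False`, `pandas.testing.assert_frame_equal` would be a handy follow-up: by default it compares floats within a tolerance, and on failure it reports which column differs. The frames and column names below are made up for illustration and are not the output of aggregate_data.

```python
import pandas as pd
from pandas.testing import assert_frame_equal

df_a = pd.DataFrame({"cbp_InitEcc": [0.05, 0.10],
                     "cbp_SemimajorAxis": [1.0, 1.5]})
df_b = df_a.copy()
df_b.loc[1, "cbp_SemimajorAxis"] = 1.5 + 1.0e-12  # tiny float difference

# Exact comparison: the tiny difference is enough to fail
strictly_equal = df_a.equals(df_b)
print(strictly_equal)  # False

# Tolerant comparison: passes silently with the default settings
assert_frame_equal(df_a, df_b)

# Exact comparison with a diagnostic message naming the offending column
try:
    assert_frame_equal(df_a, df_b, check_exact=True)
    exact_failed = False
except AssertionError as err:
    exact_failed = True
    print(err)
```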
