# Getting Started

## Contents
```
1. Reading Halos
2. Processing Data
3. Saving the results
4. Beyond the basics
4.1. Taking a reduced random sample
4.2. Available redshits
4.3. Inspecting the halos dataframe
4.4. Getting a summary of basic statistics about a DataFrame in Spark
4.5. Spark Tips
4.6. Calculating the median
4.7. Additional notes
```

Before you start, remember to import this:

In [6]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, expr

## 1. Reading Halos

We start loading all the halos:

In [7]:
%%time
all_halos = spark.read.parquet('/user/csiaafpr/RockstarExtendedParquet/')

CPU times: user 3.03 ms, sys: 2.84 ms, total: 5.87 ms
Wall time: 22.2 s


But before we proceed with our anlysis it is always important to **restrict them to the redshift we are interested in**:

In [8]:
halos = all_halos.where(col('redshift') == 0)

**The list of availabe redshifts can be found here:
[UCHUU Snapshot Redshift correspondences](http://www.skiesanduniverses.org/resources/Uchuu_snapshot_redshift_scalefactor.txt)**

We can also select several redshifts:

In [11]:
halos = all_halos.where((col('redshift') == 1.54) | (col('redshift') == 0.49))

Or even a range of redshits:

In [13]:
halos = all_halos.where((col('redshift') < 1.54) & (col('redshift') > 0.49))

If you are familiar with SQL you can also express the condition as a SQL predicate:

In [14]:
halos = all_halos.where('redshift > 0.10 and redshift < 0.50')

Select HOST halos for `Z=0` (redshift=0) in the Mvir range `cmass_min` - `cmass_max`

In [23]:
cmass_min = 2.00e15
cmass_max = 2.03e15

hosts = all_halos.where((col('redshift') == 0.49)
                    & (col('pid') == -1)
                    & (col('Mvir') > cmass_min)
                    & (col('Mvir') < cmass_max))

In the most generic case, we can even restrict the halos selected indicating additional conditions they must fullfil and selecting just part of the columns of the dataframe, including additional computed ones and taking just a random sample of the halos:

In [32]:
halos = (all_halos.where((col('redshift') == 1.54)
                    & (col('pid') == -1)
                    & (col('Mvir') > 1.0e14)
                    & (col('Mvir') < 1.3e14)
                    & (col('Xoff')/col('Rvir') < 0.05)
                    & (col('Spin') < 0.03))
              .select('id', 'x', 'y', 'z', 'vx', 'vy', 'vz', 'Mvir', 
                      'Rvir', expr('Rvir/Rs_Klypin'))
              .sample(0.08))

Count the number of of halos we have selected:

In [36]:
%%time
halos.count()

CPU times: user 5 ms, sys: 1.21 ms, total: 6.21 ms
Wall time: 26 s


31

Show two of the halos in our selection (keep in mind that it will have to compute the additional columns so this will take some time):

In [35]:
%%time
halos.show(2)

+--------------+------------------+-------+-------+------+------+-------+--------+-------+------------------+
|            id|                 x|      y|      z|    vx|    vy|     vz|    Mvir|   Rvir|(Rvir / Rs_Klypin)|
+--------------+------------------+-------+-------+------+------+-------+--------+-------+------------------+
| 4226292660622|2.8652900000000003|623.168|388.151|-41.45|-352.1|-256.26|1.013E14| 1140.7|  6.73086568361922|
|81020308687922|           563.068|1758.17|1832.21|-13.99| 91.62| -99.75|1.135E14|1184.58|  6.28584467132214|
+--------------+------------------+-------+-------+------+------+-------+--------+-------+------------------+
only showing top 2 rows

CPU times: user 6.52 ms, sys: 1.64 ms, total: 8.16 ms
Wall time: 37.4 s


## 2. Processing the data

To get inspiration about how to process the data and do some actual computations you can look at the computing_subhalo_clusters.ipynb notebook.

## 3. Saving the results

In case you want to save the resulting dataframe you use:

In [27]:
halos.write.parquet('halos')

**Parquet is the recommended format to save data.**

You can later load again the dataframe from the stored data simply executing:

In [None]:
halos = spark.read.parquet('halos')

The results will be stored in HDFS. 

You can download them to your HOME directory and from there transfer them by SCP to your PC (just keep in mind that the size of the resulting dataframe is reasonable for this):

```
hdfs dfs -get hosts
```

If you prefer, you can also save the results using less efficient formats like CSV or JSON.

In [28]:
halos.write.csv('halos-csv')

In [29]:
halos.write.json('halos-json')

# 4. Beyond the basics

## Taking a reduced random sample

Sometimes it is useful to take a reduced random sample of the data for experimenting before moving to all the data. For example in this case we will take 1% of the selected halos:

In [None]:
sample = halos.sample(0.01)

## Available redshifts

The correspondence between the snapshot number and the redshift can be found here:
[UCHUU Snapshot Redshift correspondences](http://www.skiesanduniverses.org/resources/Uchuu_snapshot_redshift_scalefactor.txt)

To look a the list of redshifts currently available in the Hadoop cluster you can run (keep in mind that the list will be growing until we upload the 50 epochs):

In [20]:
all_halos.select('redshift').distinct().show(50)

+--------+
|redshift|
+--------+
|     0.0|
|    3.93|
|    1.54|
|    4.27|
|     0.7|
|     2.3|
|   0.022|
|    9.47|
|    2.03|
|    3.61|
|    1.65|
|    7.76|
|    0.63|
|    2.78|
|    3.31|
|    0.19|
|    1.22|
|    4.63|
|   11.51|
|    2.46|
|    0.49|
|     1.9|
|    0.14|
|    5.73|
|    7.02|
|    0.86|
|    0.78|
|    0.56|
|    2.95|
|    6.34|
|    1.32|
|    0.36|
|   13.96|
|    0.94|
|    1.12|
|    1.03|
|    0.25|
|    1.77|
|    0.43|
|     0.3|
|    8.58|
|   12.69|
|   10.44|
|   0.093|
|    5.16|
|    3.13|
|   0.045|
+--------+



## Inspecting the halos dataframe

To see the fields included in the dataframe:

In [22]:
all_halos.printSchema()

root
 |-- A_x: double (nullable = true)
 |-- A_x_500c: double (nullable = true)
 |-- A_y: double (nullable = true)
 |-- A_y_500c: double (nullable = true)
 |-- A_z: double (nullable = true)
 |-- A_z_500c: double (nullable = true)
 |-- Acc_Log_Vmax_1_Tdyn: double (nullable = true)
 |-- Acc_Log_Vmax_Inst: double (nullable = true)
 |-- Acc_Rate_100Myr: double (nullable = true)
 |-- Acc_Rate_1_Tdyn: double (nullable = true)
 |-- Acc_Rate_2_Tdyn: double (nullable = true)
 |-- Acc_Rate_Inst: double (nullable = true)
 |-- Acc_Rate_Mpeak: double (nullable = true)
 |-- Acc_Scale: double (nullable = true)
 |-- Breadth_first_ID: long (nullable = true)
 |-- Depth_first_ID: long (nullable = true)
 |-- First_Acc_Mvir: double (nullable = true)
 |-- First_Acc_Scale: double (nullable = true)
 |-- First_Acc_Vmax: double (nullable = true)
 |-- Future_merger_MMP_ID: long (nullable = true)
 |-- Halfmass_Radius: double (nullable = true)
 |-- Halfmass_Scale: double (nullable = true)
 |-- Jx: double (nullable

## Getting a summary of basic statistics about a DataFrame in Spark

In [37]:
%%time
hosts.select('Mvir', 'Rvir', 'CvirKly').summary().show(truncate=False)

+-------+--------------------+------------------+------------------+
|summary|Mvir                |Rvir              |CvirKly           |
+-------+--------------------+------------------+------------------+
|count  |4                   |4                 |4                 |
|mean   |2.018E15            |2569.4049999999997|5.424931600046005 |
|stddev |6.164414002968972E12|2.5905533514418204|0.7664338349708684|
|min    |2.014E15            |2567.76           |4.371386643604742 |
|25%    |2.014E15            |2567.76           |4.371386643604742 |
|50%    |2.014E15            |2567.77           |5.387601362598065 |
|75%    |2.017E15            |2568.88           |5.803175291640688 |
|max    |2.027E15            |2573.21           |6.137563102340523 |
+-------+--------------------+------------------+------------------+

CPU times: user 5.23 ms, sys: 716 µs, total: 5.95 ms
Wall time: 8.77 s


## Getting a summary of statistics about a DataFrame in Pandas

In [39]:
%%time
hosts_pdf.describe()

CPU times: user 13.2 ms, sys: 7 µs, total: 13.2 ms
Wall time: 11.8 ms


Unnamed: 0,Mvir,Rvir,CvirKly
count,4.0,4.0,4.0
mean,2018000000000000.0,2569.405,5.424932
std,6164414000000.0,2.590553,0.766434
min,2014000000000000.0,2567.76,4.371387
25%,2014000000000000.0,2567.7675,5.133548
50%,2015500000000000.0,2568.325,5.595388
75%,2019500000000000.0,2569.9625,5.886772
max,2027000000000000.0,2573.21,6.137563


## Spark Tips

### Caching data

We can optimize performance asking spark to cache the dataframe in memory if we are going to perform several operations on the same dataframe, but use it carefully because you can ran out of memory.

We have serveral options:
- halos_0.cache(): to keep it in memory
- halos_0.persist(): to keep it in memory or disk

And when done we have to run:
- halos_0.unpersist()

### Using pyspark.sql.functions

It is also convenient, when we do not know the name of the function to apply, to use the `F.xxx` alternative notation, in this way we can automplete available functions (using `Tab`) and show help (using `Shift+Tab`):

In [16]:
%%time
halos_0.select(F.avg('Mvir')).show(truncate=False)

+------------------+
|         avg(Mvir)|
+------------------+
|8.4807751816282E10|
+------------------+

CPU times: user 1.6 ms, sys: 2.75 ms, total: 4.36 ms
Wall time: 5.31 s


### Creating a Python list of hosts ids

In [11]:
idhost = [row.id for row in distinct.select('id').collect()]

In [12]:
idhost

[3814028194986, 101876720636316, 139500640393732, 214679739377433]

## Calculating the median

In [74]:
hosts = hosts.withColumn('CvirKly', expr('Rvir/Rs_Klypin'))

### Alternative 1: using spark approxQuantile relativeError 1

NOTE: The third argument of `approxQuantile` indicates the `relativeError` between 0 and 1. 0 means we want to compute the exact quantile, this is much more expensive computationally.

Calculate the median of `Mvir`:

In [32]:
%%time
quantiles = hosts.approxQuantile('Mvir', [0.5], 1)
Mvir = quantiles[0]

CPU times: user 4.83 ms, sys: 3.4 ms, total: 8.23 ms
Wall time: 2.14 s


In [33]:
%%time
quantiles = hosts.approxQuantile('Rvir', [0.5], 1)
Rvir = quantiles[0]

CPU times: user 6.21 ms, sys: 462 µs, total: 6.67 ms
Wall time: 1.78 s


In [34]:
%%time
quantiles = hosts.approxQuantile('CvirKly', [0.5], 1)
CvirKly = quantiles[0]

CPU times: user 7.39 ms, sys: 1.85 ms, total: 9.24 ms
Wall time: 1.86 s


In [35]:
print('Properties Mean host:','Mvir median =', Mvir, 'Rvir =', Rvir, 'CvirKly =', CvirKly)

Properties Mean host: Mvir median = 2014000000000000.0 Rvir = 2567.76 CvirKly = 4.371386643604742


### Alternative 2: using spark approxQuantile full precision

In [14]:
%%time
quantiles = hosts.approxQuantile('Mvir', [0.5], 0)
Mvir = quantiles[0]

CPU times: user 6.54 ms, sys: 3.94 ms, total: 10.5 ms
Wall time: 2.56 s


NOTE: The third argument of `approxQuantile` indicates the `relativeError` between 0 and 1. 0 means we want to compute the exact quantile, this is much more expensive computationally.

In [15]:
%%time
quantiles = hosts.approxQuantile('Rvir', [0.5], 0)
Rvir = quantiles[0]

CPU times: user 5.16 ms, sys: 4.53 ms, total: 9.69 ms
Wall time: 2.09 s


In [19]:
%%time
quantiles = hosts.approxQuantile('CvirKly', [0.5], 0)
CvirKly = quantiles[0]

CPU times: user 6.96 ms, sys: 1.2 ms, total: 8.15 ms
Wall time: 1.99 s


In [20]:
print('Properties Mean host:','Mvir median =', Mvir, 'Rvir =', Rvir, 'CvirKly =', CvirKly)

Properties Mean host: Mvir median = 2014000000000000.0 Rvir = 2567.77 CvirKly = 5.387601362598065


### Alternative 3: using python3 statitistics package

In [21]:
Mvirs = [row.Mvir for row in hosts.select('Mvir').collect()]
Rvirs = [row.Rvir for row in hosts.select('Rvir').collect()]
CvirKlys = [row.CvirKly for row in hosts.select('CvirKly').collect()]

In [22]:
from statistics import median

In [23]:
%%time
Mvir_median = median(Mvirs)
Rvir_median = median(Rvirs)
CvirKly_median = median(CvirKlys)

CPU times: user 13 µs, sys: 3 µs, total: 16 µs
Wall time: 19.8 µs


In [28]:
print('Properties Mean host:','Mvir median =', Mvir_median, 'Rvir =', Rvir_median, 'CvirKly =', CvirKly_median)

Properties Mean host: Mvir median = 2015500000000000.0 Rvir = 2568.325 CvirKly = 5.595388327119377


### Alternative 4: using pandas

In [41]:
%%time
hosts_pdf = hosts.select('Mvir', 'Rvir', 'CvirKly').toPandas()

CPU times: user 7.36 ms, sys: 1.56 ms, total: 8.92 ms
Wall time: 1.19 s


In [36]:
%%time
Mvir_median_pandas = hosts_pdf['Mvir'].median()
Rvir_median_pandas = hosts_pdf['Rvir'].median()
CvirKly_median_pandas = hosts_pdf['CvirKly'].median()

CPU times: user 531 µs, sys: 0 ns, total: 531 µs
Wall time: 468 µs


In [31]:
print('Properties Mean host:','Mvir median =', Mvir_median_pandas, 'Rvir =', Rvir_median_pandas, 'CvirKly =', CvirKly_median_pandas)

Properties Mean host: Mvir median = 2015500000000000.0 Rvir = 2568.325 CvirKly = 5.595388327119377


# Additional Notes

## Pipeline
In general the programs are divided in:
- Phase 1: extracting & transforming data: selecting data and adding additional columns with some computations. We can dump the selected data to HDFS in parquet format if we want to reuse it.
- Phase 2: calculate some statistics in the extracted data
- Phase 3: saving the results

If we have custom python code we want to use we can parallelize the data processing using sc.parallelize().

## Ready to use programs with parameters
There are ready to use programs for specific calculations, in that case you only have specify the required parameters for the program and then launch it using:

    spark-submit compute-subhalo-clusters.py --z 0 --min_mass 3e10 --max_mass 10e10

## Conversion optimizations applied when transforming from HDF5 to Parquet

The original hdf5 files have been splitted into smaller files of 512MB so each one has one parquet row group.

Optimizations performed:
- The parquet row group size is 512MB
- Each file has been uploaded to hdfs with a blocksize of 512MB.
- The halos are `partitioned` by redshift

The correspondence between the snapshot number and the redshift can be found here:
[UCHUU Snapshot Redshift correspondences](http://www.skiesanduniverses.org/resources/Uchuu_snapshot_redshift_scalefactor.txt)