In [5]:
%cleanup -f

In [6]:
%%info

### Configure Spark Application using [Sparkmagic](https://github.com/jupyter-incubator/sparkmagic)

In [7]:
%%configure -f
{
    "driverMemory": "40G", 
    "conf": {
        "spark.driver.maxResultSize": "30G", 
        "spark.dynamicAllocation.enabled": "true", 
        "spark.sql.execution.arrow.enabled": "true", 
        "maximizeResourceAllocation": "true"
    }
}

In [None]:
%%help

In [8]:
from pprint import pprint
import pandas as pd
from pyspark import Row
from pyspark.sql import SparkSession, DataFrame

from cadCAD import configs
from cadCAD.utils.sys_exec import to_spark_df, to_pandas_df
from cadCAD.utils.jupyter import get_home_dir, set_write_path
from cadCAD.configuration.utils import configs_as_objs, configs_as_dataframe, configs_as_dicts
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from cadCAD.engine.execution import distributed_simulations
# from cadCAD.engine.execution import dist_simulation

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1589588649708_0021,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 0. Create / Encode System Model:

In [9]:
# Ship the distroduce package to the cluster via addPyFile so it can be
# imported, then load the cadCAD-model transformer and the regression-test
# sweep model (the System Model intervention point for RAD / integration).
user = 'jovyan'
distroduce_archive = get_home_dir(user) + "distroduce.zip"
sc.addPyFile(distroduce_archive)

from distroduce.engine import transform  # cadCAD model as a transformer
from distroduce.reggression_tests.models import sweep_config

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 1. Choose Distributed Execution Mode:
**Simulation Execution Modes:**
    `cadCAD` executes a process per System Model Configuration and a thread per System Simulation.

**Class:** `cadCAD.engine.ExecutionMode`

**Closed Source Attributes:**

* **Distributed Mode:** concurrently executes simulations (runs) on an AWS EMR cluster across multiple (n) AWS worker
instances, using multiple processes per given System Model Configuration (Example:
`cadCAD.engine.ExecutionMode().distributed`).

#### Value: 
This feature enables increased Memory, CPU speed, and storage capacity beyond users' local machines as well as the following 
capabilities. (**add Benchmark documentation here**)
* Low-latency stochastic (Monte Carlo) simulation and agent-based modeling via distributed execution on an AWS computing
cluster
* large simulated event-dataset generation and storage
* System Modeling integration into distributed data pipelines for "Big Data" transformation

##### [Open Source Attributes](https://github.com/BlockScience/cadCAD/blob/master/documentation/Simulation_Execution.md)

### 2. Create Execution Context using **Distributed** (Execution) Mode:

In [10]:
# Build a Distributed execution context whose simulation method is the
# distroduce transformer wrapped by cadCAD's distributed_simulations.
distributed_sims = distributed_simulations(transform)
exec_mode = ExecutionMode()
distributed_ctx = ExecutionContext(
    context=exec_mode.distributed,
    method=distributed_sims,
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 3. Create **Distributed** Simulation Executor:

In [11]:
# Executor over every loaded configuration, driven through the live SparkContext.
run = Executor(
    exec_context=distributed_ctx,
    configs=configs,
    spark_context=sc,
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4. *Execute **Distributed** Simulation: Produce System Event Dataset*
A Simulation execution produces a System Event Dataset and the Tensor Field applied to initial states used to create it.

In [12]:
# Run every configuration on the cluster. Unpacks the System Event Dataset as
# an RDD, the tensor fields used to generate it and, presumably, per-session
# metadata (`sessions`) — TODO confirm what execute() puts in the third slot.
rdd, tensor_fields, sessions = run.execute()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Configurations Length: 500
Execution Method: distroduce_proc
Execution Mode: remote_distributed

### RDD: Typeless
* Performance:
    - Interactive (the cell below) - 98% Improvement
    - Batch - 92% Improvement
    - Micro-Batch (near Real-Time) (TBD)

In [13]:
# Collect the full event dataset onto the driver (Master node) for local use.
# RDD.collect() already returns a list, so no extra list() copy is made — that
# copy would transiently double driver memory for large runs.
# NOTE: every record is materialised on the driver (subject to
# spark.driver.maxResultSize); skip this cell if you only need distributed output.
result: list = rdd.collect()

### Spark DataFrame:
* Typefull conversions of datasets have the shortest distributed execution time for Big Data transformation / generation
(Justification: Performance Optimization - https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/)
* Typeless conversions of datasets have a significantly shorter distributed execution time than local execution, but slightly longer than typefull conversions

##### Typefull Conversion: From RDD

In [11]:
# Typefull conversion (fastest): the genesis states are passed alongside the
# RDD, presumably so the column types are known up front — TODO confirm.
sdf: DataFrame = to_spark_df(rdd, spark, sweep_config.genesis_states)
sdf.show(n=100)

+---+---+---+---+----+----------+-------+-------------------+--------+
|run| s1| s2| s3|  s4|simulation|substep|          timestamp|timestep|
+---+---+---+---+----+----------+-------+-------------------+--------+
|  1|0.0|0.0|1.0| 1.0|         0|      0|2018-10-01 15:16:24|       0|
|  1|0.0|2.0|3.0| 4.0|         0|      1|2018-10-01 15:16:25|       1|
|  1|2.0|2.0|3.0| 4.0|         0|      2|2018-10-01 15:16:25|       1|
|  1|0.0|0.0|3.0| 4.0|         0|      3|2018-10-01 15:16:25|       1|
|  1|0.0|2.0|3.0| 7.0|         0|      1|2018-10-01 15:16:26|       2|
|  1|2.0|2.0|3.0| 7.0|         0|      2|2018-10-01 15:16:26|       2|
|  1|0.0|0.0|3.0| 7.0|         0|      3|2018-10-01 15:16:26|       2|
|  1|0.0|2.0|3.0|10.0|         0|      1|2018-10-01 15:16:27|       3|
|  1|2.0|2.0|3.0|10.0|         0|      2|2018-10-01 15:16:27|       3|
|  1|0.0|0.0|3.0|10.0|         0|      3|2018-10-01 15:16:27|       3|
|  1|0.0|2.0|3.0|13.0|         0|      1|2018-10-01 15:16:28|       4|
|  1|2

In [13]:
# Quick peek at the first two rows of the typed Spark DataFrame.
sdf.show(n=2)

+---+---+---+---+---+----------+-------+-------------------+--------+
|run| s1| s2| s3| s4|simulation|substep|          timestamp|timestep|
+---+---+---+---+---+----------+-------+-------------------+--------+
|  1|0.0|0.0|1.0|1.0|         0|      0|2018-10-01 15:16:24|       0|
|  1|0.0|2.0|3.0|4.0|         0|      1|2018-10-01 15:16:25|       1|
+---+---+---+---+---+----------+-------+-------------------+--------+
only showing top 2 rows

In [12]:
# Row count of the typed Spark DataFrame (count() is an action and runs a Spark job).
# NOTE(review): this cell is duplicated by the next one, and their saved
# outputs differ (7500500 vs 45003000), so they come from different runs — keep only one.
sdf.count()

7500500

In [12]:
# NOTE(review): duplicate of the previous count cell (same execution number
# In [12], different saved output) — delete one of them.
sdf.count()

45003000

In [68]:
# NOTE(review): the code is commented out, yet the cell still shows a stale
# output (45003000). rdd.count() is an action that re-evaluates the RDD unless
# it is cached — either restore it deliberately or delete this cell.
# rdd.count()

45003000

##### Typeless Conversion

In [14]:
# Typeless conversion (no genesis states supplied): slow, and won't succeed if
# the resulting dataset is larger than the allocated memory.
# sdf: DataFrame = to_spark_df(rdd, spark)
# sdf.show(5)

### Pandas DataFrame:

##### Typefull Conversion: From Spark

In [13]:
# Typefull pandas conversion via Spark: toPandas() collects every row of `sdf`
# onto the driver (Arrow-accelerated when spark.sql.execution.arrow.enabled is on).
pdf = sdf.toPandas()
pdf.head(n=5)

   run   s1   s2   s3   s4  simulation  substep            timestamp  timestep
0    1  0.0  0.0  1.0  1.0           0        0  2018-10-01 15:16:24         0
1    1  0.0  2.0  3.0  4.0           0        1  2018-10-01 15:16:25         1
2    1  2.0  2.0  3.0  4.0           0        2  2018-10-01 15:16:25         1
3    1  0.0  0.0  3.0  4.0           0        3  2018-10-01 15:16:25         1
4    1  0.0  2.0  3.0  7.0           0        1  2018-10-01 15:16:26         2

##### Typefull Conversion: From RDD

In [47]:
# Typefull pandas conversion directly from the RDD, using the genesis states.
# Drop any previously built `pdf` first so two large frames are not held in
# driver memory at once. Rebinding to None releases the old frame just like
# `del`, but does not raise NameError when no earlier cell created `pdf`.
pdf = None
pdf: pd.DataFrame = to_pandas_df(rdd, sweep_config.genesis_states)
pdf.head(5)

   run   s1   s2   s3   s4  simulation  substep            timestamp  timestep
0    1  0.0  0.0  1.0  1.0           0        0  2018-10-01 15:16:24         0
1    1  0.0  2.0  3.0  4.0           0        1  2018-10-01 15:16:25         1
2    1  2.0  2.0  3.0  4.0           0        2  2018-10-01 15:16:25         1
3    1  0.0  0.0  3.0  4.0           0        3  2018-10-01 15:16:25         1
4    1  0.0  2.0  3.0  7.0           0        1  2018-10-01 15:16:26         2

##### Typeless Conversion: From RDD

In [48]:
# Typeless pandas conversion from the RDD (no genesis states), so the column
# order follows the records rather than a declared layout.
# Release any earlier `pdf` first. Rebinding to None frees it like `del`, but
# is safe on a fresh kernel where `pdf` does not exist yet.
pdf = None
pdf: pd.DataFrame = to_pandas_df(rdd)
pdf.head(5)

    s1   s2   s3   s4            timestamp  simulation  run  substep  timestep
0  0.0  0.0  1.0  1.0  2018-10-01 15:16:24           0    1        0         0
1  0.0  2.0  3.0  4.0  2018-10-01 15:16:25           0    1        1         1
2  2.0  2.0  3.0  4.0  2018-10-01 15:16:25           0    1        2         1
3  0.0  0.0  3.0  4.0  2018-10-01 15:16:25           0    1        3         1
4  0.0  2.0  3.0  7.0  2018-10-01 15:16:26           0    1        1         2

### 5. Save Simulation:

#### Spark DataFrame

In [71]:
# Persist the typed Spark DataFrame as Parquet, replacing any earlier output.
file_path = set_write_path(sc, user, 'data/param_sweep_df')
sdf.write.parquet(file_path, mode="overwrite")

#### RDD

In [10]:
# Save the raw RDD as text files (saveAsTextFile writes each element's string form).
# No overwrite: saveAsTextFile fails if the target directory already exists.
# Uses the notebook's SparkContext `sc` (the original referenced an undefined `SC`).
file_path = set_write_path(sc, user, 'data/param_sweep_rdd')
rdd.saveAsTextFile(file_path)

### 6. Read Simulation:

#### Spark DataFrame

In [34]:
# Read back the Parquet dataset saved from the Spark DataFrame.
# The path is rebuilt here instead of reusing `file_path`. The RDD save cell
# reassigns `file_path` to the text-file directory, which cannot be read as Parquet.
# NOTE(review): assumes set_write_path returns the same path for the same
# arguments with no other side effects — confirm.
sdf_read_path = set_write_path(sc, user, 'data/param_sweep_df')
df = spark.read.parquet(sdf_read_path)
df.show(5)

+---+---+---+----+-------------------+----------+---+-------+--------+
| s1| s2| s3|  s4|          timestamp|simulation|run|substep|timestep|
+---+---+---+----+-------------------+----------+---+-------+--------+
|2.0|2.0|3.0|14.0|2018-10-01 15:16:33|         0|  2|      2|       9|
|0.0|0.0|3.0|14.0|2018-10-01 15:16:33|         0|  2|      3|       9|
|0.0|2.0|3.0|17.0|2018-10-01 15:16:34|         0|  2|      1|      10|
|2.0|2.0|3.0|17.0|2018-10-01 15:16:34|         0|  2|      2|      10|
|0.0|0.0|3.0|17.0|2018-10-01 15:16:34|         0|  2|      3|      10|
+---+---+---+----+-------------------+----------+---+-------+--------+
only showing top 5 rows

#### RDD

In [11]:
# Reload the RDD text dump from `file_path` (the RDD save location once that
# cell has run). Each line is the string form of one record, so this yields
# a single-column frame. collect() pulls every line onto the driver.
read_rdd = sc.textFile(file_path)
pdf_from_rdd = pd.DataFrame(read_rdd.collect())
pdf_from_rdd

                                                          0
0         {'s1': 0.0, 's2': 0.0, 's3': 1.0, 's4': 1.0, '...
1         {'s1': 0, 's2': 2, 'timestamp': '2018-10-01 15...
2         {'s1': 2, 's2': 2, 'timestamp': '2018-10-01 15...
3         {'s1': 0, 's2': 0, 'timestamp': '2018-10-01 15...
4         {'s1': 0, 's2': 2, 'timestamp': '2018-10-01 15...
...                                                     ...
45002995  {'s1': 5, 's2': 5, 'timestamp': '2018-10-01 16...
45002996  {'s1': 0, 's2': 0, 'timestamp': '2018-10-01 16...
45002997  {'s1': 0, 's2': 5, 'timestamp': '2018-10-01 16...
45002998  {'s1': 5, 's2': 5, 'timestamp': '2018-10-01 16...
45002999  {'s1': 0, 's2': 0, 'timestamp': '2018-10-01 16...

[45003000 rows x 1 columns]

#### Pandas DataFrame

### System Configurations:

#### Configurations: Returned as List of Objects

In [21]:
# Configurations as a list of Configuration objects, then the sim_config of
# the first one.
# NOTE(review): the saved output lists 2 configurations while the execution log
# reports 500, so it appears stale — re-run to refresh.
fmt_configs = configs_as_objs(configs)
pprint(fmt_configs)
print()
first_config = fmt_configs[0]
pprint(first_config.sim_config)

[<cadCAD.configuration.Configuration object at 0x116a21b90>,
 <cadCAD.configuration.Configuration object at 0x116a21c90>]

{'M': {'alpha': 1, 'beta': 2, 'gamma': 3, 'omega': 7},
 'N': 2,
 'T': range(0, 10),
 'run_id': 1,
 'simulation_id': 0}


#### Configurations: Returned as Pandas DataFrame

In [23]:
# All configurations tabulated as a pandas DataFrame, one row per configuration.
configs_df = configs_as_dataframe(configs)
configs_df

Unnamed: 0,session_id,user_id,simulation_id,run_id,sim_config,initial_state,seeds,env_processes,exogenous_states,partial_state_updates,policy_ops,kwargs
0,cadCAD_user=0_1,cadCAD_user,0,1,"{'N': 2, 'T': (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), ...","{'s1': 0.0, 's2': 0.0, 's3': 1.0, 's4': 1.0, '...","{'z': RandomState(MT19937), 'a': RandomState(M...","{'s3': [<function <lambda> at 0x116a23b00>, <f...",{},[{'policies': {'p1': <function p1m1 at 0x116a1...,[<function <lambda> at 0x116867d40>],{}
1,cadCAD_user=1_1,cadCAD_user,1,1,"{'N': 2, 'T': (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), ...","{'s1': 0.0, 's2': 0.0, 's3': 1.0, 's4': 1.0, '...","{'z': RandomState(MT19937), 'a': RandomState(M...","{'s3': [<function <lambda> at 0x116a23b00>, <f...",{},[{'policies': {'p1': <function p1m1 at 0x116a1...,[<function <lambda> at 0x116867d40>],{}


#### Configurations: Returned as List of Dictionaries

In [15]:
# Configurations as plain dictionaries; print the sim_config of the first one.
# NOTE(review): the saved output shows the whole first configuration dict
# (env_processes, initial_state, ...), not only 'sim_config' — it looks stale;
# re-run to refresh.
configs_dicts: list = configs_as_dicts(configs)
pprint(configs_dicts[0]['sim_config'])

{'env_processes': {'s3': [<function <lambda> at 0x7f8f9c99bd90>,
                          <function <lambda> at 0x7f8f9c9a11e0>],
                   's4': <function env_trigger.<locals>.trigger.<locals>.env_update at 0x7f8f9c9a12f0>},
 'exogenous_states': {},
 'initial_state': {'s1': 0.0,
                   's2': 0.0,
                   's3': 1.0,
                   's4': 1.0,
                   'timestamp': '2018-10-01 15:16:24'},
 'kwargs': {},
 'partial_state_updates': [{'policies': {'p1': <function p1m1 at 0x7f8f9c985ea0>,
                                         'p2': <function p2m1 at 0x7f8f9c985f28>},
                            'variables': {'s1': <function s1m1 at 0x7f8f9c99b268>,
                                          's2': <function s2m1 at 0x7f8f9c99b2f0>,
                                          's3': <function var_trigger.<locals>.<lambda> at 0x7f8f9c99bae8>,
                                          's4': <function var_trigger.<locals>.<lambda> at 0x7f8f9c99bea0>,
 