# Creating a spark cluster on demand

We're going to look at how to create a spark cluster from the notebook using the oshinko REST server and then run the notebook app against it. The cluster configuration that we'll pass in has options set to turn on metrics reporting and the elastic worker daemon.


## Generating a random cluster name

We're going to create an ephemeral cluster, so we aren't too concerned with the cluster name. We just use a name with a random four-character suffix.


In [None]:
import random
import string

clustername = "cluster-" + ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.digits) for _ in range(4))
print("The cluster will be named " + clustername)

## Finding the IP address of the oshinko-rest server

Since the oshinko-rest pod was already running in the project when we launched the notebook, the host and port for the oshinko-rest service can be found in the local environment.

In [None]:
import os

oshinko_server = os.environ.get("OSHINKO_REST_SERVICE_HOST") + ":" + os.environ.get("OSHINKO_REST_SERVICE_PORT")
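
If the notebook is launched in a project where the oshinko-rest service is not present, the lookup above fails with a `TypeError`, because `os.environ.get` returns `None` for a missing variable. A minimal sketch of a more defensive lookup follows. The helper name `service_address` is hypothetical, and the address in the example is made up.

```python
import os

def service_address(prefix, env=None):
    """Build "host:port" from the <PREFIX>_SERVICE_HOST and <PREFIX>_SERVICE_PORT variables."""
    env = os.environ if env is None else env
    host = env.get(prefix + "_SERVICE_HOST")
    port = env.get(prefix + "_SERVICE_PORT")
    if not host or not port:
        raise RuntimeError("%s service variables are not set in this environment" % prefix)
    return "%s:%s" % (host, port)

# Example with an explicit mapping (made-up values) instead of the real environment
example_env = {"OSHINKO_REST_SERVICE_HOST": "172.30.0.10",
               "OSHINKO_REST_SERVICE_PORT": "8080"}
print(service_address("OSHINKO_REST", example_env))
```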

## Sending a cluster creation request to oshinko

The next step is to send a request to oshinko to create the cluster.

### The cluster configuration object

Rather than set all the options inline, we'll use a previously created cluster configuration to tell oshinko how to configure the cluster. The configuration is stored in a Kubernetes object called a *configmap* and its JSON representation looks like this:
```json
{
    "kind": "ConfigMap",
    "apiVersion": "v1",
    "metadata": {
        "name": "clusterconfig",
        "creationTimestamp": null
    },
    "data": {
        "metrics.enable": "true",
        "scorpionstare.enable": "true",
        "sparkimage": "docker.io/manyangled/var-spark-worker:latest",
        "sparkmasterconfig": "masterconfig",
        "sparkworkerconfig": "workerconfig"
    }
}
```

Notice that we've enabled metrics and the elastic worker daemon (scorpion stare) and specified our custom spark image which contains our pre-loaded data. We also see that the names of two other configmaps, *masterconfig* and *workerconfig*, are used to provide the spark configuration for the master and workers. These other configmaps contain spark configuration files that will be written to `$SPARK_HOME/conf` in the master and worker containers.

One additional detail -- since we did not specify *workercount* here, we will get the default number of workers (1).
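
To make the default concrete, here is a small sketch that reads options from a trimmed copy of the JSON above and falls back to one worker when *workercount* is absent. It only illustrates the lookup on the client side. How oshinko itself resolves defaults is not shown in this notebook, and the helper name `worker_count` is hypothetical.

```python
import json

configmap_json = '''
{
    "kind": "ConfigMap",
    "apiVersion": "v1",
    "metadata": {"name": "clusterconfig"},
    "data": {
        "metrics.enable": "true",
        "scorpionstare.enable": "true",
        "sparkmasterconfig": "masterconfig",
        "sparkworkerconfig": "workerconfig"
    }
}
'''

def worker_count(configmap, default=1):
    # ConfigMap data values are strings, so convert explicitly
    return int(configmap["data"].get("workercount", default))

configmap = json.loads(configmap_json)
print(worker_count(configmap))
```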

### Making the request and parsing the response

To create the cluster, we simply post a request to oshinko giving the cluster name and the configuration.
The response from the server will tell us things like how many workers were created and what the URL is for the spark master. In this case, we also want the host and port for the metrics server which by convention will always be the name of the cluster with a suffix of *-metrics* and port 8000 respectively.

In [None]:
import requests

r = requests.post("http://%s/clusters" % oshinko_server,
                      json={"name": clustername, 
                            "config": {"name": "clusterconfig"}
                           }
                 )
json = r.json()
desired_workers = json['cluster']['config']['workerCount']
spark_master = json["cluster"]["masterUrl"]
metrics_server = clustername + "-metrics:8000"
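
The cell above assumes that the request succeeded and that the response has the expected shape. A hedged sketch of stricter parsing is shown below. The `sample` dictionary is a made-up response containing only the two fields this notebook reads, and `parse_cluster_response` is a hypothetical helper.

```python
def parse_cluster_response(body):
    """Return (worker_count, master_url) from a decoded oshinko response body."""
    try:
        cluster = body["cluster"]
        return cluster["config"]["workerCount"], cluster["masterUrl"]
    except (KeyError, TypeError):
        raise ValueError("unexpected response from oshinko: %r" % (body,))

# Made-up response containing only the fields used in this notebook
sample = {"cluster": {"masterUrl": "spark://cluster-ab12:7077",
                      "config": {"workerCount": 1}}}
workers, master = parse_cluster_response(sample)
print(workers, master)
```

In the notebook itself, calling `r.raise_for_status()` before `r.json()` would also surface HTTP errors early.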

## Let's get fancy -- using the metrics server to monitor worker startup

Since we know we've enabled metrics for use by the elastic worker daemon, we can go ahead and use the same metrics server to wait for the workers to register with the master. Here we poll the metrics server until *master.aliveWorkers* is equal to the number of workers that oshinko told us it created. When all of the expected workers have registered with the master we'll drop out of the loop.

In [None]:
import sys
import time

def latest_metric_value(metrics_ip, metric):
    recent_query = "http://%s/render" % metrics_ip
    
    # We may actually try to contact the metrics service before it's up, so catch connection exceptions here
    try:
        qr = requests.get(recent_query, params={"target": metric, "format": "json", "from": "-1min"})
    except requests.exceptions.RequestException:
        return None
    
    if qr.status_code != 200:
        sys.stderr.write("query code error: %d\n" % (qr.status_code))
        return None
    qj = qr.json()
    if len(qj) != 1: return None
    dp = [e[0] for e in qj[0]["datapoints"] if e[0] is not None]
    if len(dp) < 1: return None
    return dp[-1]

print("Waiting for workers", end="")
cnt = 0
while True:
    workers = latest_metric_value(metrics_server, "master.aliveWorkers")
    if workers is not None and workers >= desired_workers:
        break
    if cnt % 3 == 0:
        print("...",end="")
    time.sleep(1)
    cnt += 1
print()
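
The loop above waits forever if a worker never registers. One possible variation, sketched here with a hypothetical `wait_for` helper, gives up after a deadline. The probe is passed in as a function so that the sketch runs without a metrics server.

```python
import time

def wait_for(probe, target, timeout=120.0, interval=1.0):
    """Poll probe() until it returns a value >= target or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        value = probe()
        if value is not None and value >= target:
            return True
        time.sleep(interval)
    return False

# Stand-in probe: reports no data twice, then one live worker
readings = iter([None, None, 1.0])
print(wait_for(lambda: next(readings), 1, timeout=5.0, interval=0.01))
```

In the notebook, the probe would be `lambda: latest_metric_value(metrics_server, "master.aliveWorkers")` and the target would be `desired_workers`.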

## Turning on metrics reporting for the spark driver (our notebook)

In order for the elastic worker daemon to function, the spark driver must also report metrics. We must tell spark what kind of metrics to send, how often, and where to send them. The metrics host by convention will be the cluster name plus a suffix of *-carbon* and it will listen on port 2003. Here we write out the required config to `$SPARK_HOME/conf`.

In [None]:
import textwrap

spark_conf = "%s/conf/" % os.environ.get("SPARK_HOME")

metrics_properties = textwrap.dedent('''
    *.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
    *.sink.graphite.host=%s
    *.sink.graphite.port=2003
    *.sink.graphite.period=5
    *.sink.graphite.unit=seconds

    # Enable JvmSource for instance driver
    driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource'''
) % (clustername + "-carbon")

with open(spark_conf + "metrics.properties", "w") as myfile:
    myfile.write(metrics_properties[1:])
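
One way to make the generated file easier to inspect and test is to build it from a list of lines before writing it. This is an alternative sketch rather than the notebook's method. `render_metrics_properties` is a hypothetical helper, the cluster name in the example is made up, and the property names are the ones used above.

```python
def render_metrics_properties(graphite_host, port=2003, period_seconds=5):
    """Return the text of a metrics.properties file that reports to a Graphite sink."""
    lines = [
        "*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink",
        "*.sink.graphite.host=%s" % graphite_host,
        "*.sink.graphite.port=%d" % port,
        "*.sink.graphite.period=%d" % period_seconds,
        "*.sink.graphite.unit=seconds",
        "",
        "# Enable JvmSource for instance driver",
        "driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource",
    ]
    return "\n".join(lines) + "\n"

# Made-up cluster name, following the "-carbon" suffix convention described above
text = render_metrics_properties("cluster-ab12" + "-carbon")
print(text)
```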

## Turning on dynamic allocation in the driver

In order for the elastic worker to receive requests for extra executors, dynamic allocation must also be enabled. We set that here and write out the configuration to `$SPARK_HOME/conf`.

In [None]:
spark_defaults = textwrap.dedent('''
    spark.dynamicAllocation.enabled true
    spark.shuffle.service.enabled true
    spark.dynamicAllocation.executorIdleTimeout 300s
    spark.dynamicAllocation.schedulerBacklogTimeout 1s
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
    spark.dynamicAllocation.maxExecutors 4'''
)

with open(spark_conf + "spark-defaults.conf", "w") as myfile:
    myfile.write(spark_defaults[1:])

## On to the application

At this point the spark cluster has been created and both the cluster and the driver are configured for elastic workers. All that remains is to use the *spark_master* value from oshinko to build the spark context and run the application. When the application is done, we'll tell oshinko to delete the cluster.

# Value-at-risk calculations

The basic idea behind the value-at-risk calculation is that we're going to look at the historical returns of a portfolio of securities and run many simulations to determine the range of returns we can expect from these.  We can then predict, over a given time horizon, what our expected loss is at a given probability, e.g., we might say that there is less than a 10% chance that the portfolio will lose more than $1,000,000.

Note that this is a didactic example and consequently makes some simplifying assumptions about the composition of the portfolio (i.e., only long positions in common stocks, so no options, dividends, or short selling) and the behavior of the market (i.e., day-to-day return percentages are normally-distributed and independent).  Do not use this code to guide actual investment decisions!
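
To make the "less than a 10% chance" statement concrete, the sketch below estimates the probability of losing more than a threshold from a list of simulated outcomes. The numbers are made up for illustration and `probability_of_loss_beyond` is a hypothetical helper, not part of the simulation that follows.

```python
def probability_of_loss_beyond(outcomes, threshold):
    """Fraction of simulated gains/losses that lose more than `threshold` dollars."""
    worse = sum(1 for x in outcomes if x < -threshold)
    return worse / len(outcomes)

# Made-up simulated outcomes in dollars (negative means a loss)
outcomes = [-1500000, -400000, -100000, 50000, 200000,
            350000, 500000, 700000, 900000, 1200000]
print(probability_of_loss_beyond(outcomes, 1000000))  # 0.1
```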

## Basic setup

Here we import the `pyspark` module and set up a `SparkSession`.

In [None]:
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder.master(spark_master).getOrCreate()

## Loading the data

We'll start by loading the data (from the WikiEOD dataset of freely-available stock closing prices).

In [None]:
df = spark.read.load("/data/wikieod.parquet")

## Calculating historical returns

We'll use Spark's windowing functions over data frames to determine the percentage change in each security from the previous close to each day's close.  Basically, we'll add a column to our data frame that represents the percentage change from the previous day's close (that is, `lag("close", 1)` when partitioned by ticker symbol and ordered by date) to the current day's close (that is, `col("close")`).

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import lag, col, avg, variance

ddf = df.select("ticker", "date", "close").withColumn("change", (col("close") / lag("close", 1).over(Window.partitionBy("ticker").orderBy(df["date"])) - 1.0) * 100)
ddf.show(10)
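
For readers less familiar with window functions, here is a plain-Python sketch of the same calculation on a few made-up rows. It groups by ticker, sorts by date, and compares each close with the previous one. The first row for each ticker has no previous close, so its change is `None`, just as `lag` yields a null there. The helper name `percent_changes` is hypothetical.

```python
from collections import defaultdict

def percent_changes(rows):
    """rows: iterable of (ticker, date, close). Returns (ticker, date, close, change) tuples."""
    by_ticker = defaultdict(list)
    for ticker, date, close in rows:
        by_ticker[ticker].append((date, close))
    out = []
    for ticker, series in by_ticker.items():
        previous = None
        for date, close in sorted(series):  # order by date within each ticker
            change = None if previous is None else (close / previous - 1.0) * 100
            out.append((ticker, date, close, change))
            previous = close
    return out

# Made-up closing prices, deliberately out of date order
rows = [("AAA", "2016-01-05", 110.0),
        ("AAA", "2016-01-04", 100.0),
        ("AAA", "2016-01-06", 99.0)]
for r in percent_changes(rows):
    print(r)
```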

## Characterizing expected return distributions

With the range of changes now available, we can calculate our expected returns for each security.  Since this is a simple example, we'll assume that returns are normal (in the real world, you'd want to use a distribution with heavier tails or a more sophisticated technique altogether).  We can calculate the parameters for each security's distribution as follows:

In [None]:
from pyspark.sql.functions import sqrt
mv = ddf.groupBy("ticker").agg(avg("change").alias("mean"), sqrt(variance("change")).alias("stddev"))
mv.show(10)
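
Spark's `variance` function computes the sample variance (it is an alias for `var_samp`), so the standard deviation above uses an n - 1 denominator. A small stdlib sketch on made-up daily changes shows the equivalent calculation for a single ticker:

```python
import statistics

# Made-up daily percentage changes for one ticker
changes = [1.0, -2.0, 0.5, 3.0, -1.5]

mean = statistics.mean(changes)
stddev = statistics.stdev(changes)  # sample standard deviation (n - 1 denominator)
print(mean, stddev)
```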

Since there are only about 3,000 ticker symbols in our data set, we can easily collect these in driver memory for use in our simulation:

In [None]:
dist_map = mv.rdd.map(lambda r: (r[0], (r[1], r[2]))).collectAsMap()
dist_map["RHT"]

## Getting current security prices

We'll now identify the latest price for each security in our dataset:

In [None]:
from pyspark.sql.functions import first
priceDF = ddf.orderBy("date", ascending=False).groupBy("ticker").agg(first("close").alias("price"), first("date").alias("date"))
priceDF.show(10)

prices = priceDF.rdd.map(lambda r: (r[0], r[1])).collectAsMap()
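
The intent of the cell above is "the most recent close for each ticker". A plain-Python sketch of that intent on made-up rows is shown below, with a hypothetical `latest_prices` helper. Note that Spark's documentation describes `first` as non-deterministic after a shuffle, so if exact results matter it may be safer to select the maximum date per ticker explicitly, as this sketch does.

```python
def latest_prices(rows):
    """rows: iterable of (ticker, date, close) with sortable dates. Returns {ticker: close}."""
    latest = {}
    for ticker, date, close in rows:
        # Keep the row with the greatest date seen so far for each ticker
        if ticker not in latest or date > latest[ticker][0]:
            latest[ticker] = (date, close)
    return {ticker: close for ticker, (date, close) in latest.items()}

# Made-up rows
rows = [("AAA", "2016-01-04", 100.0),
        ("AAA", "2016-01-06", 99.0),
        ("BBB", "2016-01-05", 20.0),
        ("AAA", "2016-01-05", 110.0)]
print(latest_prices(rows))
```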

## Setting up a simulation

We'll now define our simulation.  This involves three steps: 

1.  We'll start by generating a random portfolio of securities (a map from ticker symbols to values); then, 
2.  we'll decide how many simulations to run and generate a random seed for each; and, finally
3.  we'll actually run the simulation for a given number of trading days, updating the value of our portfolio with random returns sampled from the distribution of historical returns.

Generating the random portfolio is pretty straightforward:

In [None]:
from random import randint, seed

def random_portfolio(symbols):
    result = {}
    for s in symbols:
        result[s] = prices[s] * (randint(1, 1000) * 11)
    return result

def portfolio_value(pf):
    return sum([v for v in pf.values()])

seed(0xdea110c8)

portfolio = random_portfolio(ddf.select("ticker").distinct().sample(True, 0.01, 0xdea110c8).rdd.map(lambda r: r[0]).collect())

As is generating a collection of random seeds:

In [None]:
def seeds(count):
    # Parenthesize the shift: `1 << 32 - 1` parses as `1 << 31` because `-` binds tighter than `<<`
    return [randint(0, (1 << 32) - 1) for i in range(count)]

We'll define a single step of our simulation (and, subsequently, a whole run of the simulation) next:

In [None]:
def simstep(pf, params, prng):
    def daily_return(sym):
        mean, stddev = params[sym]
        change = (prng.normalvariate(mean, stddev) + 100) / 100.0
        return change
    return dict([(s, daily_return(s) * v) for s, v in pf.items()])

def simulate(seed, pf, params, days):
    from random import Random
    prng = Random()
    prng.seed(seed)
    pf = pf.copy()
    
    for day in range(days):
        pf = simstep(pf, params, prng)
    return pf
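
Seeding a private `Random` instance per run is what makes each simulation reproducible, which we rely on later when we replay selected seeds. The self-contained sketch below uses a made-up two-stock portfolio and hypothetical `toy_`-prefixed names so that it does not clobber the notebook's functions. It applies the same update rule as `simstep`.

```python
from random import Random

def toy_simstep(pf, params, prng):
    # Scale each position by (100 + sampled percentage change) / 100
    return {s: ((prng.normalvariate(*params[s]) + 100) / 100.0) * v for s, v in pf.items()}

def toy_simulate(seed, pf, params, days):
    prng = Random(seed)  # private generator, independent of the global random state
    pf = pf.copy()       # leave the caller's portfolio untouched
    for _ in range(days):
        pf = toy_simstep(pf, params, prng)
    return pf

# Made-up portfolio values and (mean, stddev) parameters in percent per day
toy_portfolio = {"AAA": 10000.0, "BBB": 5000.0}
toy_params = {"AAA": (0.05, 1.5), "BBB": (0.02, 2.0)}

run_a = toy_simulate(42, toy_portfolio, toy_params, 5)
run_b = toy_simulate(42, toy_portfolio, toy_params, 5)
print(run_a == run_b)  # True: the same seed replays the same walk
```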

Now we have everything we need to actually run the simulation.  For each seed, Spark will simulate 5 days of activity on our portfolio and return the total dollar value of our gain or loss at the end of the period.

In [None]:
sc = spark.sparkContext
seed_rdd = sc.parallelize(seeds(10000))
bparams = sc.broadcast(dist_map)
bpf = sc.broadcast(portfolio)
initial_value = portfolio_value(portfolio)

results = seed_rdd.map(lambda s: portfolio_value(simulate(s, bpf.value, bparams.value, 5)) - initial_value)

In [None]:
simulated_results = list(zip(results.collect(), seed_rdd.collect()))
simulated_values = [v for (v, _) in simulated_results]
simulated_values.sort()

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'svg'

import matplotlib
import numpy as np
import matplotlib.pyplot as plt

_ = plt.hist(simulated_values, bins=25)


In [None]:
xvals = [float(i) / len(simulated_values) for i in range(len(simulated_values))]
_ = plt.plot(xvals, simulated_values)

Since the market historically trends up, we have a better than even chance of not losing money in our simulation.  We can see the 5% value at risk over our time horizon.

In [None]:
simulated_values[int(len(simulated_values) * 0.05)]
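
The indexing expression above picks an empirical quantile from the sorted outcomes: the value below which roughly 5% of the simulated results fall. As a small self-contained sketch with made-up numbers (the helper name `value_at_risk` is hypothetical):

```python
def value_at_risk(outcomes, level):
    """Return the outcome at the given lower-tail level (0 <= level < 1)."""
    ordered = sorted(outcomes)
    return ordered[int(len(ordered) * level)]

# 100 made-up outcomes: -50, -49, ..., 49
outcomes = list(range(-50, 50))
print(value_at_risk(outcomes, 0.05))  # -45
```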

## Visualizing random walks

Finally, let's look at some example simulation runs to see how the portfolio value changes over time.  We'll take the runs with the best and worst returns and also the runs at each decile.  To visualize our simulation, we'll need a slightly different `simulate` function:

In [None]:
def simulate_with_history(seed, pf, params, days):
    from random import Random
    prng = Random()
    prng.seed(seed)
    pf = pf.copy()
    values = [portfolio_value(pf)]
    
    for day in range(days):
        pf = simstep(pf, params, prng)
        values.append(portfolio_value(pf))
    return values

We'll now repeat the simulation on eleven of our seeds from the earlier run, collecting history for each:

In [None]:
simulated_results.sort()
eleven_results = [simulated_results[int((len(simulated_results) - 1) * i / 10)] for i in range(11)]
eleven_seeds = sc.parallelize([seed for (_, seed) in eleven_results])
walks = eleven_seeds.map(lambda s: simulate_with_history(s, bpf.value, bparams.value, 5))

walk_results = walks.collect()
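
The index arithmetic used to pick the eleven runs can be checked in isolation. This sketch wraps it in a hypothetical `decile_indices` helper and shows which positions it selects from 10,000 sorted results: the worst run, the best run, and one run near each decile in between.

```python
def decile_indices(n):
    """Indices of the minimum, the maximum, and the nine deciles in between for n sorted items."""
    return [int((n - 1) * i / 10) for i in range(11)]

print(decile_indices(10000))
# [0, 999, 1999, 2999, 3999, 4999, 5999, 6999, 7999, 8999, 9999]
```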

In [None]:
_ = plt.plot([list(c) for c in zip(*walk_results)])
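
When `plt.plot` is given a two-dimensional list, it draws one line per column, so the `zip(*walk_results)` call transposes the data from one row per walk to one row per day. A tiny sketch with made-up values shows the reshaping:

```python
# Two made-up walks, each with three daily portfolio values
walk_results_demo = [[100, 101, 103],
                     [100, 98, 97]]

# Transpose: one row per day, one column per walk
by_day = [list(c) for c in zip(*walk_results_demo)]
print(by_day)  # [[100, 100], [101, 98], [103, 97]]
```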

## Deleting our ephemeral cluster

If we do not want to keep the cluster around for other applications, we can use oshinko to delete it.

In [None]:
import requests

r = requests.delete("http://%s/clusters/%s" % (oshinko_server, clustername))
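if r.status_code == 204:
    print("Cluster %s has been deleted" % clustername)
else:
    # Report anything else rather than failing silently
    print("Unexpected status %d when deleting cluster %s" % (r.status_code, clustername))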
