# B-Pipe Smart Partitioning
Since B-Pipe data cannot be load balanced across multiple servers, one may need to explicitly split queries into individual requests, where each machine handles a different set of securities. However, some securities are overrepresented, resulting in suboptimal partitioning: while some machines will struggle to cope with the volume of trades, others will remain mostly idle. But what if we knew in advance the expected volume of trades and could split requests accordingly? Can we minimize variance across multiple partitions to ensure a more scalable and cost-effective solution?

For the purpose of this exercise, let's load synthetic trade volumes for 900 random securities we want to pull from B-Pipe.

In [0]:
import pandas as pd
import json
securities_stats = pd.read_csv('data/security_volume.csv')
bpipe_securities = json.dumps(securities_stats.security.to_list())
display(securities_stats)

As expected, some securities are over-represented: volumes follow a power-law distribution, better visualized on the log scale below. Such a distribution makes partitioning tricky when distributing requests across multiple executors. During our observation window, the vast majority of securities resulted in ~10,000 trades, whilst some were observed more than 10 million times.

In [0]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import colors

x = securities_stats['volume']
_, bins = np.histogram(np.log10(x + 1), bins='auto')
fig, axs = plt.subplots(1,1,figsize=(10, 7),tight_layout=True,dpi=100)

# Remove axes splines
for s in ['top', 'bottom', 'left', 'right']:
    axs.spines[s].set_visible(False)

# Remove x, y ticks
axs.xaxis.set_ticks_position('none')
axs.yaxis.set_ticks_position('none')
   
# Add padding between axes and labels
axs.xaxis.set_tick_params(pad = 5)
axs.yaxis.set_tick_params(pad = 10)

# Compute histogram
N, bins, patches = axs.hist(x, bins=10**bins, rwidth=0.9)

# Setting color
fracs = ((N**(1 / 5)) / N.max())
norm = colors.Normalize(fracs.min(), fracs.max())
for thisfrac, thispatch in zip(fracs, patches):
    color = plt.cm.viridis(norm(thisfrac))
    thispatch.set_facecolor(color)

# Adding extra features   
plt.xlabel("trades")
plt.ylabel("securities")
plt.title('Volume of trades for securities (log scale)')

# Plot
plt.gca().set_xscale("log")
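A quick way to put a number on this skew is to measure how much of the total volume the busiest securities account for. The sketch below uses plain Python; `top_k_share` is a hypothetical helper, not part of the original notebook, and the volumes are made up for illustration. On the real data one could pass `securities_stats['volume'].tolist()` instead.

```python
def top_k_share(volumes, k):
    """Fraction of total trade volume carried by the k busiest securities."""
    total = sum(volumes)
    if total == 0:
        return 0.0
    # keep the k largest volumes and compare them with the overall total
    busiest = sorted(volumes, reverse=True)[:k]
    return sum(busiest) / total


# Hypothetical volumes: four "typical" securities and one very active one
example = [10_000, 12_000, 9_000, 11_000, 10_000_000]
print(f"{top_k_share(example, 1):.3f}")  # prints 0.996
```

With a power-law shaped distribution, a handful of securities can dominate the total, which is exactly what makes a naive split uneven.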

Let's start by randomly assigning each security to a partition. This gives us a baseline for the actual over- and under-utilization of each machine.

In [0]:
num_partitions = 10
# assign each security to a random partition in [0, num_partitions)
securities_stats['partition'] = np.random.randint(0, num_partitions, size=len(securities_stats))

The graph below exhibits a clear imbalance of input data. Whilst some machines will be over-utilized, others will sit mostly idle, resulting in a non-scalable and cost-inefficient approach.

In [0]:
import numpy as np
import matplotlib.pyplot as plt

def trades_by_partition(df, file=None, dpi=100, y_lim=None):

  x = df.index
  y = df['sum']
  mu = np.average(y)
  sigma = np.std(y)
  
  fig, axs = plt.subplots(1,1,figsize=(15, 7),tight_layout=True,dpi=dpi)

  # Remove axes splines
  for s in ['top', 'bottom', 'left', 'right']:
      axs.spines[s].set_visible(False)

  # Remove x, y ticks
  axs.xaxis.set_ticks_position('none')
  axs.yaxis.set_ticks_position('none')

  # Add padding between axes and labels
  axs.xaxis.set_tick_params(pad = 5)
  axs.yaxis.set_tick_params(pad = 10)
  
  # Plot
  axs.bar(x, y, color = 'green', alpha = 0.5)
  # dashed line at the mean volume, spanning the partitions actually plotted
  axs.hlines(mu, xmin=min(x) - 0.5, xmax=max(x) + 0.5, color='coral', ls='--', lw=0.8)
  if y_lim:
    axs.set_ylim(0, y_lim)

  plt.xlabel("Partition Id")
  plt.ylabel("Volume of trades")
  plt.title(r"Volume of trades per partition: $\mu={:.2f}$ $\sigma={:.2f}$".format(mu, sigma))
  
  if file:
    plt.savefig(file)
  plt.show()

In [0]:
grouped_partitions = securities_stats.groupby('partition')['volume'].agg(['sum','count'])
trades_by_partition(grouped_partitions)
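The plot title reports the standard deviation. Since the busiest machine is the one most likely to fall behind, another way to read the same imbalance is the ratio between the most loaded partition and the average load. The stdlib sketch below reproduces a seeded random baseline on hypothetical volumes; `partition_totals` and `peak_to_average` are illustrative helpers, not part of the notebook.

```python
import random


def partition_totals(volumes, assignment, num_partitions):
    """Sum volumes per partition, keeping empty partitions at 0."""
    totals = [0] * num_partitions
    for volume, partition in zip(volumes, assignment):
        totals[partition] += volume
    return totals


def peak_to_average(totals):
    """Busiest partition relative to the mean load (1.0 means perfectly balanced)."""
    mean = sum(totals) / len(totals)
    return max(totals) / mean if mean else 0.0


rng = random.Random(42)  # seeded so the random baseline is reproducible
volumes = [10_000] * 95 + [1_000_000] * 5  # hypothetical skewed volumes
assignment = [rng.randrange(10) for _ in volumes]
totals = partition_totals(volumes, assignment, num_partitions=10)
print(f"peak-to-average ratio: {peak_to_average(totals):.2f}")
```

A ratio well above 1.0 means one machine receives several times the average load, regardless of how the remaining partitions are distributed.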

## Optimize partitions
Optimal partitioning can be learned for our requested securities following ML best practices. Leveraging mlflow + [hyperopt](https://docs.databricks.com/machine-learning/automl-hyperparam-tuning/index.html), we can group different sets of securities together to minimize variance across partitions, so that each and every machine we spin up is worth its cost.

In [0]:
securities = securities_stats.set_index('security')['volume'].to_dict()

We define our objective function (the loss we want to minimize) as the variance of the total volume of trades across partitions.
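One subtlety: `groupby('partition')` only returns partitions that received at least one security, so an assignment that piles every security onto a single partition yields a single group and therefore a variance of zero, the best possible loss. The stdlib sketch below (hypothetical helper `partition_variance`, not part of the notebook) counts empty partitions as zero volume so that this degenerate assignment is penalized instead.

```python
from statistics import pvariance


def partition_variance(volumes, assignment, num_partitions):
    """Population variance of total volume per partition, empty partitions included."""
    totals = [0] * num_partitions
    for volume, partition in zip(volumes, assignment):
        totals[partition] += volume
    return pvariance(totals)


volumes = [100, 100, 100, 100]
# everything on partition 0: totals are [400, 0]
print(partition_variance(volumes, [0, 0, 0, 0], num_partitions=2))
# balanced split: totals are [200, 200]
print(partition_variance(volumes, [0, 0, 1, 1], num_partitions=2))
```

The first call reports a variance of 40,000 while the balanced split reports 0, which is the ordering the optimizer should see.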

In [0]:
import pandas as pd
import numpy as np
from hyperopt import hp, fmin, tpe, SparkTrials, STATUS_OK, space_eval
import mlflow

def objective_function(partitions):
  
  # retrieve volume of trades by security, tagged with its suggested partition
  df = pd.DataFrame([[partitions[key], securities[key]] for key in securities.keys()], columns=['partition', 'volume'])
  
  # total volume per partition; empty partitions count as 0 so that
  # piling every security onto one machine is not rewarded with zero variance
  volume_by_partition = (
    df.groupby('partition')['volume'].sum()
      .reindex(range(num_partitions), fill_value=0)
  )
  var = float(np.var(volume_by_partition))

  # our objective is to minimize variance of volume of trade by partition
  return {'loss': var, 'status': STATUS_OK}

... and define our search space, where each security can be assigned to any one of our 10 partitions.

In [0]:
search_space = {}
for security in securities.keys():
    search_space[security] = hp.randint(security, num_partitions)
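It helps to keep the size of this search space in mind. Each of the 900 securities independently picks one of 10 partitions, so the number of possible assignments is a simple power; the arithmetic below (plain Python, values taken from this notebook) shows that any `max_evals` budget explores only a vanishing fraction of it, so the outcome is best read as an improved assignment rather than a provably optimal one.

```python
import math

num_securities = 900
num_partitions = 10

# each security independently picks one of num_partitions values
num_assignments = num_partitions ** num_securities
# log10 of the count, computed without converting the huge integer to float
exponent = num_securities * math.log10(num_partitions)
print(f"about 10^{exponent:.0f} possible assignments")
```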

We track progress using mlflow and run trials in parallel using SparkTrials.

In [0]:
with mlflow.start_run(run_name='bpipe_partitioning', nested=True) as run:
  
  argmin = fmin(
    fn=objective_function,
    space=search_space,
    algo=tpe.suggest, 
    max_evals=10, # we can limit the number of combinations to evaluate
    trials=SparkTrials(parallelism=10),  # we have 10 workers available for this task
    verbose=True
  )
  
  run_id = run.info.run_id

Finally, after `max_evals` evaluations, we can access the best set of parameters (i.e. the ideal partition number) for each security we want to ingest through B-Pipe. Whilst some high-volume securities may deserve their own executors, others may be bundled together to ensure effective parallelism.

In [0]:
best_params = space_eval(search_space, argmin)

In [0]:
distributed_securities = pd.DataFrame(
  [[key, best_params[key], securities[key]] for key in securities.keys()],
  columns=['security', 'partition', 'volume']
)
distributed_securities.index = list(securities.keys())
display(distributed_securities.head(10))
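As a point of comparison (a swapped-in technique, not part of the original approach), the classic Longest Processing Time (LPT) greedy heuristic from multiprocessor scheduling captures the same intuition that high-volume securities end up on their own executors. It targets the load of the busiest partition rather than variance directly. The sketch below is stdlib-only; `lpt_partitioning` and the tickers are hypothetical.

```python
import heapq


def lpt_partitioning(volumes, num_partitions):
    """Greedy Longest-Processing-Time assignment.

    Securities are visited from largest to smallest volume and each one is
    placed on the partition with the smallest running total.
    Returns a dict {security: partition_id}.
    """
    # min-heap of (current total volume, partition id)
    heap = [(0, p) for p in range(num_partitions)]
    heapq.heapify(heap)
    assignment = {}
    for security, volume in sorted(volumes.items(), key=lambda kv: kv[1], reverse=True):
        total, partition = heapq.heappop(heap)  # least loaded partition so far
        assignment[security] = partition
        heapq.heappush(heap, (total + volume, partition))
    return assignment


# Hypothetical tickers and volumes, for illustration only
volumes = {"AAA": 9_000_000, "BBB": 4_000_000, "CCC": 3_000_000,
           "DDD": 1_000_000, "EEE": 500_000}
print(lpt_partitioning(volumes, num_partitions=2))
# prints {'AAA': 0, 'BBB': 1, 'CCC': 1, 'DDD': 1, 'EEE': 1}
```

Because `securities` in this notebook is already a `{security: volume}` dict, the returned mapping has the same shape as `best_params` and could be scored with `objective_function` to compare it against the hyperopt result.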

Although it is not obvious at a glance, our optimization **reduced our initial variance by 60%** (i.e. the imbalance) in expected throughput across all our 10 partitions.

In [0]:
optimized_partitions = distributed_securities.groupby('partition')['volume'].agg(['sum','count'])
trades_by_partition(optimized_partitions)
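The reduction quoted above is the relative change in variance between the two groupings. Assuming per-partition totals such as `grouped_partitions['sum']` and `optimized_partitions['sum']` converted to lists, it could be computed as sketched below. `np.var` defaults to a population variance, which matches `statistics.pvariance`; `variance_reduction` is a hypothetical helper and the totals are made up.

```python
from statistics import pvariance


def variance_reduction(initial_totals, optimized_totals):
    """Relative drop in variance of per-partition volume (0.6 means 60% lower)."""
    initial = pvariance(initial_totals)
    # guard against a baseline that is already perfectly balanced
    return 1 - pvariance(optimized_totals) / initial if initial else 0.0


# Hypothetical per-partition totals, for illustration only
print(f"{variance_reduction([10, 30, 20], [18, 22, 20]):.0%}")  # prints 96%
```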

## Partition reference data
Our Spark wrapper can accept our partitioning logic through the `partitions` option (a list of partition ids as a JSON array). In this case, we explicitly indicate which security will be read from which partition, using the following Spark options:


```
.option("securities", "['sec1', 'sec2']")
.option("partitions", "[1,2]")
```
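Reading the example above, the two options appear to be paired by position: the i-th partition id applies to the i-th security. Under that assumption, both JSON arrays should be built from one ordering of the same mapping. The sketch below is stdlib-only, and `bpipe_options` is a hypothetical helper rather than part of the wrapper's API.

```python
import json


def bpipe_options(assignment):
    """Turn {security: partition} into the two aligned JSON arrays.

    The i-th entry of "partitions" is the partition that reads the i-th
    entry of "securities", so both lists come from the same ordering.
    """
    securities = sorted(assignment)  # deterministic ordering
    # int() guards against NumPy integer types, which json.dumps cannot serialize
    partitions = [int(assignment[s]) for s in securities]
    return json.dumps(securities), json.dumps(partitions)


securities_json, partitions_json = bpipe_options({"sec2": 2, "sec1": 1})
print(securities_json)  # prints ["sec1", "sec2"]
print(partitions_json)  # prints [1, 2]
```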

In [0]:
import json
bpipe_securities = json.dumps(distributed_securities.security.to_list())
bpipe_partitions = json.dumps(distributed_securities.partition.to_list())

In [0]:
ref_data = (
  spark
    .read
    .format("//blp/refdata")
    .option("serviceName", "ReferenceDataRequest")
    .option("serviceHost", "127.0.0.1")
    .option("servicePort", 8954)
    .option("correlationId", 999)
    .option("fields", "['BID','ASK']")
    # our list of securities
    .option("securities", bpipe_securities)
    # our smart partitioning
    .option("partitions", bpipe_partitions) 
    .load()
)

display(ref_data)

In this notebook, we showed how to efficiently read B-Pipe data and leverage the distributed nature of Spark to create a resilient and cost-efficient ingestion framework that feeds into downstream Spark / Delta based applications.