### A simplified use case of an IBOR (Investment Book of Record) solution
The scenario is as follows:
* Each time an event (transaction) is received, both the cash and asset positions are impacted
* The transaction value is added to or subtracted from the cash position for each event
* The transaction value is added to or subtracted from the asset position for each event
* The portfolio is valuated after each event
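
The scenario can be sketched in plain Python, without Spark. This is only an illustration: the `Portfolio` class and the `apply_event` function are hypothetical names, and the sign convention (positive quantity for a buy, negative for a sell) is an assumption, since the events in this notebook carry only a `quantity` field.

```python
from dataclasses import dataclass


@dataclass
class Portfolio:
    cash: float
    asset: float


def apply_event(ptf: Portfolio, quantity: int, price: float) -> Portfolio:
    """Return the portfolio after one transaction.

    Assumption: a positive quantity is a buy, a negative quantity is a sell.
    """
    amount = quantity * price
    # A buy moves value from cash to asset; a sell moves it back.
    return Portfolio(cash=ptf.cash - amount, asset=ptf.asset + amount)


ptf = Portfolio(cash=1_000_000, asset=0)
ptf = apply_event(ptf, quantity=100, price=200)   # buy 100 units
ptf = apply_event(ptf, quantity=-40, price=200)   # sell 40 units
print(ptf)
```

Each event changes both positions by the same amount in opposite directions, which is what the Spark code in the rest of the notebook computes for every timestamp.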

### Create the connection settings to connect to Azure Event Hubs

In [3]:
conf = {}
connectionString = "replace with your connection string key;EntityPath=replace with the name of your event hub"
conf["eventhubs.connectionString"] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
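
For reference, an Event Hubs connection string is a semicolon-separated list of key/value pairs. The sketch below assembles one from its parts; `build_connection_string` is a hypothetical helper and all values are placeholders, so treat it as an illustration of the expected shape rather than as part of the connector API.

```python
def build_connection_string(namespace: str, key_name: str, key: str, event_hub: str) -> str:
    """Assemble an Event Hubs connection string from its parts (hypothetical helper)."""
    return (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
        f"SharedAccessKeyName={key_name};"
        f"SharedAccessKey={key};"
        f"EntityPath={event_hub}"
    )


# Placeholder values only; a real key should come from a secret store, not from source code.
example = build_connection_string("my-namespace", "my-policy", "<key>", "transactions")
print(example)
```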


### Create the DataFrame for consuming events from Azure Event Hubs

In [5]:
transactionsStreamDf = spark.readStream.format("eventhubs").options(**conf).load()

### Schema creation for the events received

In [7]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
import pyspark.sql.functions as F

# Schema of the JSON payload carried in each event body
schema = StructType([
  StructField("ptf_id", IntegerType(), True),
  StructField("security_id", StringType(), True),
  StructField("quantity", IntegerType(), True),
  StructField("transaction_date", DateType(), True)
])
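
To make the schema concrete, here is a sketch of an event body that would match it, parsed in plain Python. The payload is hypothetical: the field names follow the schema above, but the values are made up, and the ISO date format is an assumption.

```python
import json
from datetime import date

# Hypothetical event body; Event Hubs delivers it as bytes, which is why the
# notebook casts the "body" column to a string before parsing it.
body = b'{"ptf_id": 1, "security_id": "Security XYZ", "quantity": 100, "transaction_date": "2020-08-15"}'

event = json.loads(body.decode("utf-8"))
# Convert the date string into a date object, mirroring the DateType field.
event["transaction_date"] = date.fromisoformat(event["transaction_date"])
print(event)
```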

### Transform the received events and enrich them with the current time

In [9]:
transactionEventParsedDf = transactionsStreamDf.select(F.from_json(F.col("body").cast("string"),schema).alias("message")).withColumn("timestamp",F.current_timestamp())


Check the schema structure

In [11]:
transactionEventParsedDf.schema

Run the following magic command, which deletes the streaming checkpoint directory, if you want to re-run everything from scratch

In [13]:

%fs rm -r /ibor/transactions/checkpoints/json-events


### Start to stream

In [15]:
transactionEventParsedDf.writeStream.format("delta").outputMode("append").option("checkpointLocation","/ibor/transactions/checkpoints/json-events").queryName("Ibor").start("/ibor/transactions")

Events will be saved in a Delta table

In [1]:
# Create a Delta table on top of the location the stream writes to
spark.sql("CREATE TABLE transactions USING DELTA LOCATION '/ibor/transactions'")

The table can be queried with SQL by using the `%sql` magic command

In [19]:
%sql select * from transactions

message,timestamp


### Portfolio creation
The portfolio is created with the following initial values:
* 1,000,000 ==> cash
* 0 ==> asset

### Position creation
Positions are created from the events received.
<br> The position quantity is updated for each transaction and accounts for all previous transactions.

### Price reference
In order to valuate the portfolio, a mock reference price is created.

In [21]:
dfPortfolio = spark.createDataFrame([(1,"cash",1000000,"USD"),(1,"asset",0,"USD")],["ptf_id","type_position","value","currency"])

In [22]:
dfPortfolio.show()

Create a DataFrame from the events saved in the Delta table

In [24]:
dfTransactions = spark.sql("select message.*, timestamp from transactions")

In [25]:
dfTransactions.show(truncate = False)

To display a snapshot of the portfolio for each event (that is, at a specific point in time), previous events must be taken into account so that the portfolio is valuated correctly.
<br> A window function allows us to do this.

In [27]:
from pyspark.sql import Window

window = Window.partitionBy("ptf_id").orderBy("timestamp").rangeBetween(Window.unboundedPreceding, Window.currentRow)

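From the transactions received, create a positions DataFrame that tracks how the quantity of a specific security changes over time. Note that the window partitions by `ptf_id` only, so this assumes a single security per portfolio, as in this example.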

In [29]:
dfPositions = dfTransactions.withColumn("quantity", F.sum("quantity").over(window))

In [30]:
dfPositions.show(truncate=False)
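
What the window computes can be sketched in plain Python as a running total. The quantities below are hypothetical and are assumed to be already ordered by timestamp, with a positive value for a buy and a negative value for a sell.

```python
from itertools import accumulate

# Hypothetical transaction quantities for one portfolio, ordered by timestamp.
quantities = [100, 50, -30, 200]

# Same idea as F.sum("quantity").over(window): each row receives the sum of
# all rows up to and including itself.
running_position = list(accumulate(quantities))
print(running_position)  # [100, 150, 120, 320]
```

One difference from this sketch: because the window uses `rangeBetween`, rows that share exactly the same timestamp are treated as peers and all receive the same cumulative total.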

Join the portfolio and positions DataFrames so that the portfolio can be valuated

In [32]:
iborDf = dfPortfolio.alias("ptf").join(dfPositions.alias("positions"),dfPortfolio.ptf_id == dfPositions.ptf_id)

Each event will impact a cash and an asset position

In [34]:
iborDf.show(truncate = False)

In order to valuate the portfolio for each event, we create mock reference price data

In [36]:
priceReferenceDF = spark.createDataFrame([("Security XYZ","200","USD")],["security_id", "price", "currency"])

In [37]:
priceReferenceDF.show()

The asset position is impacted positively (addition) by a "buy" event and negatively (subtraction) by a "sell" event

In [39]:
from pyspark.sql.functions import col,lit
# Asset value = initial asset value + cumulative quantity * reference price
iborWithAssetDf = (
    iborDf.where("ptf.type_position == 'asset'")
    .join(priceReferenceDF)
    .withColumn("value", col("value") + (col("quantity") * col("price")))
    .select("ptf.ptf_id", "type_position", "value", "timestamp")
)

For each event, the position value is updated

In [41]:
iborWithAssetDf.show(truncate = False)

The logic is similar for the cash position. The only difference is that a "buy" event impacts the cash position negatively and a "sell" event impacts it positively.

In [43]:
iborWithCashDf = iborDf.where("ptf.type_position == 'cash'").join(priceReferenceDF).withColumn("type_positions",lit("cash")).withColumn("value",col("value") - (col("quantity") * col("price"))).withColumn("total_positions",col("quantity")).select("ptf.ptf_id","type_positions","value","timestamp")

In [44]:
iborWithCashDf.show(truncate = False)

The portfolio can now be valuated after each event.
<br> To do so, we union iborWithAssetDf and iborWithCashDf so that the cash and asset positions are available for each timestamp.

In [46]:
iborRealTimeDf = iborWithCashDf.union(iborWithAssetDf)
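
Once the cash and asset rows sit in one DataFrame, a total portfolio value per timestamp is the sum of both positions. The plain-Python sketch below illustrates that aggregation on hypothetical rows shaped like `iborRealTimeDf`; the values and the timestamp labels `t1` and `t2` are made up.

```python
from collections import defaultdict

# Hypothetical rows: (ptf_id, type_positions, value, timestamp)
rows = [
    (1, "cash", 980000.0, "t1"),
    (1, "cash", 970000.0, "t2"),
    (1, "asset", 20000.0, "t1"),
    (1, "asset", 30000.0, "t2"),
]

# Sum cash and asset values per portfolio and timestamp.
totals = defaultdict(float)
for ptf_id, _position_type, value, ts in rows:
    totals[(ptf_id, ts)] += value

print(dict(totals))
```

With a single fixed mock price, the total stays at the initial 1,000,000 because every transaction moves the same amount between cash and asset. The total would only vary if the reference price changed over time.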

### Portfolio Valuation

In [48]:
display(iborRealTimeDf)

ptf_id,type_positions,value,timestamp
1,cash,556000.0,2020-08-15T18:38:48.881+0000
1,cash,554000.0,2020-08-15T18:38:33.102+0000
1,cash,550000.0,2020-08-15T18:38:14.856+0000
1,cash,540000.0,2020-08-15T18:37:56.597+0000
1,cash,520000.0,2020-08-15T18:37:26.118+0000
1,cash,510000.0,2020-08-15T18:37:09.108+0000
1,cash,450000.0,2020-08-15T18:36:49.512+0000
1,cash,350000.0,2020-08-15T18:36:28.367+0000
1,cash,390000.0,2020-08-15T18:36:07.093+0000
1,cash,430000.0,2020-08-15T18:35:43.995+0000


Stop the stream

In [50]:
for s in spark.streams.active:
    s.stop()