# OSSF demo

In this notebook, we demonstrate how **Legend** and **Morphir** models can be interpreted on **Databricks** with minimal development overhead. We source data with **Legend** and feed the resulting dataframe into a **Morphir** calculation.

Although we could query the underlying tables directly through the Spark SQL API, with its multiple joins and transformations (see below), we can instead benefit from the model created through the **Legend** framework and access its logical representation (a mapping to a Legend entity).

In [0]:
%sql
SELECT
  a.`id`,
  a.`currency`,
  a.`reporting_entity`,
  a.`product`,
  a.`sub_product`,
  a.`business_line`,
  m.`maturity_date`,
  m.`purchase_date`
FROM inflows.assets a
JOIN inflows.assets_maturity m
LIMIT 10

id,currency,reporting_entity,product,sub_product,business_line,maturity_date,purchase_date
750000,GBP,entity_1,EncumberedAssets,Currency and Coin,business_1,2022-12-15,2022-12-01
750000,GBP,entity_1,EncumberedAssets,Currency and Coin,business_1,2022-12-26,2022-12-04
750000,GBP,entity_1,EncumberedAssets,Currency and Coin,business_1,2022-12-23,2022-12-04
750000,GBP,entity_1,EncumberedAssets,Currency and Coin,business_1,2022-12-22,2022-12-04
750000,GBP,entity_1,EncumberedAssets,Currency and Coin,business_1,2022-12-01,2022-12-01
750000,GBP,entity_1,EncumberedAssets,Currency and Coin,business_1,2022-12-23,2022-12-01
750000,GBP,entity_1,EncumberedAssets,Currency and Coin,business_1,2022-12-04,2022-12-04
750000,GBP,entity_1,EncumberedAssets,Currency and Coin,business_1,2022-12-05,2022-12-01
750000,GBP,entity_1,EncumberedAssets,Currency and Coin,business_1,2022-12-30,2022-12-01
750000,GBP,entity_1,EncumberedAssets,Currency and Coin,business_1,2022-12-26,2022-12-04


## LEGEND

<img src='https://raw.githubusercontent.com/aamend/ossf_db_ms_gs/main/images/wf_1.png' width=800/>

## LEGEND ON DELTA
With our **Legend** model packaged as a JAR and included as a cluster dependency, we can easily access each of its underlying entities, create tables programmatically, or execute queries according to Legend specifications. We show below how to load our model and access a given entity (a logical model mapped to a physical table).

<img src='https://raw.githubusercontent.com/aamend/ossf_db_ms_gs/main/images/wf_2.png' width=800/>

In [0]:
%scala
import org.finos.legend.spark.LegendClasspathLoader
val legend = LegendClasspathLoader.loadResources()

In [0]:
%scala
display(legend.getEntityNames.toList.toDF("legendEntity"))

legendEntity
lcr::entities::product
lcr::services::getInflows
lcr::lakehouse::finos
lcr::entities::collateralClass
lcr::services::getInflowsWithBuckets
lcr::lakehouse::assetMapping
lcr::entities::currency
lcr::entities::asset
lcr::lakehouse::store
lcr::lakehouse::databricks


In [0]:
%scala
display(legend.getSchema("lcr::entities::asset").fields.toList.map(_.toJson).toDF("legendField"))

legendField
"{""name"":""id"",""type"":""integer"",""nullable"":false,""metadata"":{}}"
"{""name"":""currency"",""type"":""string"",""nullable"":false,""metadata"":{}}"
"{""name"":""converted"",""type"":""boolean"",""nullable"":false,""metadata"":{}}"
"{""name"":""reportingEntity"",""type"":""string"",""nullable"":false,""metadata"":{}}"
"{""name"":""product"",""type"":""string"",""nullable"":false,""metadata"":{}}"
"{""name"":""subProduct"",""type"":""string"",""nullable"":true,""metadata"":{}}"
"{""name"":""marketValue"",""type"":""long"",""nullable"":false,""metadata"":{}}"
"{""name"":""lendableValue"",""type"":""long"",""nullable"":false,""metadata"":{}}"
"{""name"":""purchaseDate"",""type"":""date"",""nullable"":false,""metadata"":{}}"
"{""name"":""maturityDate"",""type"":""date"",""nullable"":false,""metadata"":{}}"


We can access the underlying SQL code that Legend generates from Pure for Databricks SQL...

In [0]:
%scala
println(legend.generateSql("lcr::services::getInflows"))

... or execute the query directly, resulting in a dataframe with both its technical and business constraints enforced (such as multiplicity constraints or enumerations).

In [0]:
%scala
val assetMapping = legend.query("lcr::services::getInflows")
display(assetMapping.limit(10))

product,subProduct,collateralClass,marketValue,maturityDate,encumbranceType,forwardStartAmount,forwardStartBucket,treasuryControl
UnencumberedAssets,Currency and Coin,a_0_Q,1444320.0,2022-12-01,encumbrance_1,,,True
Capacity,Level 1,a_1_Q,3194640.0,2022-12-06,encumbrance_2,2970648.0,2.0,True
UnrestrictedReserveBalances,Level 2a,a_2_Q,2460240.0,2023-01-03,encumbrance_3,,,False
RestrictedReserveBalances,Level 2b,a_3_Q,3733200.0,2023-01-03,encumbrance_4,,,True
UnsettledAssetPurchases,Non-HQLA,a_4_Q,3769920.0,2022-12-28,,3350088.0,5.0,True
ForwardAssetPurchases,No Collateral Pledged,a_5_Q,1358640.0,2022-12-26,encumbrance_2,946152.0,6.0,False
EncumberedAssets,Rehypothecateable Collateral Unencumbered,s_1_Q,477360.0,2023-01-04,encumbrance_3,140760.0,7.0,True
UnencumberedAssets,Unsettled (Regular Way),s_2_Q,3206880.0,2022-12-01,encumbrance_4,2892312.0,8.0,True
Capacity,Unsettled (Forward),s_3_Q,3414960.0,2022-12-04,encumbrance_1,2959632.0,9.0,True
UnrestrictedReserveBalances,firm long,s_4_Q,1578960.0,2022-12-19,encumbrance_2,826200.0,10.0,True
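
The multiplicity and enumeration constraints mentioned above can be pictured with a minimal plain-Scala sketch. It does not use the Legend API: `ConstraintSketch`, `AssetRow`, `knownProducts` and `violations` are hypothetical names introduced for illustration, and the set of products is only a subset taken from the sample output, not the full enumeration of the model.

```scala
object ConstraintSketch {
  // Illustrative subset of product values seen in the sample output (assumption:
  // the real enumeration is defined in the Legend model and may contain more values).
  val knownProducts: Set[String] =
    Set("EncumberedAssets", "UnencumberedAssets", "Capacity")

  // Hypothetical record: `product` is mandatory (multiplicity [1]),
  // `subProduct` is optional (multiplicity [0..1]).
  final case class AssetRow(product: String, subProduct: Option[String])

  // Returns the violated constraints; an empty list means the row is valid.
  def violations(row: AssetRow): List[String] =
    if (row.product == null || row.product.isEmpty) List("product is mandatory")
    else if (!knownProducts.contains(row.product)) List(s"unknown product: ${row.product}")
    else Nil
}
```

A row with a known product and no sub-product passes both checks, whereas a missing or unknown product is reported as a violation.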


## MORPHIR
So far, we have been able to source our data through multiple JOIN operations and the necessary transformations without having to write any complex SQL code. The same data can be safely passed on to **Morphir** for rule-based decisioning and aggregations as set by regulators. Each of those rules has been validated and unit tested through the **Morphir** framework.

<img src='https://raw.githubusercontent.com/aamend/ossf_db_ms_gs/main/images/wf_3.png' width=800/>

We may notice that our output dataframe does not fully comply with **Morphir** specifications (the maturity date should be a bucket rather than a date). We can easily go back to Legend and create a new service in which all the necessary transformations are defined (and unit tested) to turn raw data into **Morphir**-ready data assets. By doing so, we comply with the "data contract" implied by the **Morphir** framework with no development overhead. An example is the `maturityBucket`, which is derived from `transactionDate` and `maturityDate`.
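
To make the idea of deriving a bucket from two dates concrete, here is a minimal plain-Scala sketch. It is an assumption, not the logic of the Legend service: it treats the bucket as the number of whole days between a reference date and the maturity date, floored at zero. `MaturityBucketSketch` and `maturityBucket` are hypothetical names.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object MaturityBucketSketch {
  // Assumed rule (not taken from the Legend model): the bucket is the number of
  // whole days from the reference date to the maturity date, floored at zero
  // for assets that have already matured.
  def maturityBucket(referenceDate: LocalDate, maturityDate: LocalDate): Long =
    math.max(0L, ChronoUnit.DAYS.between(referenceDate, maturityDate))
}
```

Under this assumption, an asset with a reference date of 2022-12-01 maturing on 2022-12-15 falls into bucket 14. In the notebook itself, the equivalent transformation is defined once in the Legend service rather than in notebook code.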

In [0]:
%scala
val inflows = legend.query("lcr::services::getInflowsWithBuckets")
display(inflows.limit(10))

product,subProduct,collateralClass,marketValue,maturityBucket,encumbranceType,forwardStartAmount,forwardStartBucket,treasuryControl
EncumberedAssets,Currency and Coin,l_8,1884960.0,9,encumbrance_1,1241136.0,21.0,True
EncumberedAssets,Currency and Coin,l_8,1884960.0,26,encumbrance_1,1241136.0,21.0,True
UnencumberedAssets,Level 1,l_9,3268080.0,20,encumbrance_2,,,False
UnencumberedAssets,Level 1,l_9,3268080.0,2,encumbrance_2,,,False
Capacity,Level 2a,l_10,1285200.0,28,encumbrance_3,910656.0,23.0,True
Capacity,Level 2a,l_10,1285200.0,33,encumbrance_3,910656.0,23.0,True
UnrestrictedReserveBalances,Level 2b,l_11,538560.0,4,encumbrance_4,269280.0,24.0,True
UnrestrictedReserveBalances,Level 2b,l_11,538560.0,22,encumbrance_4,269280.0,24.0,True
RestrictedReserveBalances,Non-HQLA,l_12,61200.0,25,encumbrance_1,,,False
RestrictedReserveBalances,Non-HQLA,l_12,61200.0,23,encumbrance_1,,,False


## MORPHIR ON SPARK

<img src='https://raw.githubusercontent.com/aamend/ossf_db_ms_gs/main/images/wf_4.png' width=800/>

In [0]:
%scala
import regulation.us.lcr.inflows.assets.{SparkJobs => Morphir}

Let's execute our first set of LCR rules on the inflows data, resulting in a new dataframe.

In [0]:
%scala
val lcr = inflows.transform(Morphir.sumToRule)
display(lcr)

Label,value
20(c)(1),7168698720.0
20(b)(1),3510236160.0
20(a)(1),14354900640.0
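
The shape of this output (one total per rule label) suggests a group-and-sum aggregation. The plain-Scala sketch below illustrates that pattern only; it is not the Morphir-generated code. It assumes each flow has already been tagged with the rule it falls under, which in practice is decided by the Morphir model. `SumToRuleSketch`, `TaggedFlow` and `sumByRule` are hypothetical names.

```scala
object SumToRuleSketch {
  // Hypothetical record: a cash flow already tagged with its rule label.
  // The actual classification logic lives in the Morphir model.
  final case class TaggedFlow(rule: String, amount: Double)

  // Group flows by rule label and sum the amounts within each group.
  def sumByRule(flows: Seq[TaggedFlow]): Map[String, Double] =
    flows.groupBy(_.rule).map { case (rule, group) =>
      rule -> group.map(_.amount).sum
    }
}
```

For instance, two flows of 100.0 and 50.0 tagged `20(a)(1)` and one flow of 25.0 tagged `20(b)(1)` yield one total per label.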


We can then safely append or overwrite the results onto a table. That table can be shared across different systems or organizations through Delta Sharing (see later) or mapped back onto Legend to be accessed by end users.

In [0]:
%scala
lcr.write.format("delta").mode("overwrite").saveAsTable("lcr.report")

## TIME TRAVEL
By persisting our reports in Delta format, we benefit from its audit capability, which allows users to travel back in time through all previous versions of a table.

<img src='https://raw.githubusercontent.com/aamend/ossf_db_ms_gs/main/images/wf_5.png' width=800/>

In [0]:
%sql
DESCRIBE HISTORY lcr.report

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2022-11-30T09:36:08.000+0000,3658755248564160,antoine.amend@databricks.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(1562452343328116),1129-000847-nuk6je0t,3.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 3, numOutputBytes -> 769)",,Databricks-Runtime/10.4.x-photon-scala2.12
3,2022-11-29T20:00:04.000+0000,3658755248564160,antoine.amend@databricks.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(1562452343328116),1129-000847-nuk6je0t,2.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 3, numOutputBytes -> 769)",,Databricks-Runtime/10.4.x-photon-scala2.12
2,2022-11-29T19:57:29.000+0000,3658755248564160,antoine.amend@databricks.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(1562452343328116),1129-000847-nuk6je0t,1.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 3, numOutputBytes -> 769)",,Databricks-Runtime/10.4.x-photon-scala2.12
1,2022-11-29T19:57:21.000+0000,3658755248564160,antoine.amend@databricks.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(1562452343328116),1129-000847-nuk6je0t,0.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 3, numOutputBytes -> 769)",,Databricks-Runtime/10.4.x-photon-scala2.12
0,2022-11-29T19:56:53.000+0000,3658755248564160,antoine.amend@databricks.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(1562452343328116),1129-000847-nuk6je0t,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 3, numOutputBytes -> 769)",,Databricks-Runtime/10.4.x-photon-scala2.12


We can access our report as it was generated at a given point in time (regardless of subsequent updates) or at a given version. This supports audit and compliance requirements whilst guaranteeing strict reproducibility of our output, since both our **Legend** and **Morphir** models are version controlled.

In [0]:
%sql
SELECT * FROM lcr.report
VERSION AS OF 2

Label,value
20(c)(1),1837909440.0
20(b)(1),861194160.0
20(a)(1),3689184960.0


In [0]:
%sql
SELECT * FROM lcr.report
TIMESTAMP AS OF '2022-11-29T19:57:29.000+0000'

Label,value
20(c)(1),1837909440.0
20(b)(1),861194160.0
20(a)(1),3689184960.0
