# Legend Delta

[![DBR](https://img.shields.io/badge/DBR-10.4 LTS-red?logo=databricks)](.)
[![FINOS - Incubating](https://cdn.jsdelivr.net/gh/finos/contrib-toolbox@master/images/badge-incubating.svg)](https://finosfoundation.atlassian.net/wiki/display/FINOS/Incubating)

In addition to the JDBC connectivity to Databricks provided by the [legend-engine](https://github.com/finos/legend-engine/tree/master/docs/databricks) itself,
this project helps organizations define data models that can be converted into efficient data pipelines, ensuring that the data being queried
is of high quality and availability. Raw data can be ingested as a stream or in batch and processed in line with the business semantics
defined in the Legend interface. The domain-specific language defined in Legend Studio can be interpreted as a series of Spark SQL operations,
helping analysts create [Delta Lake](https://delta.io/) tables that not only guarantee the schema definition but also comply
with the expectations, derivations and constraints defined by business analysts.

___


<img src='https://github.com/finos-labs/legend-delta/raw/main/images/legend-delta-workflow.png' width=30%>

___

<antoine.amend@databricks.com>

## Legend model
A Legend project can be loaded from the classpath or from a directory as follows:

In [0]:
%scala
import org.finos.legend.spark.LegendClasspathLoader
val legend = LegendClasspathLoader.loadResources()

All available Legend entities are retrieved and can be listed as follows:

In [0]:
%scala
val entities = legend.getEntityNames
display(entities.toSeq.toDF("pure"))

| pure |
| --- |
| databricks::mapping::developer_delta |
| databricks::entity::person |
| databricks::entity::sme |
| databricks::mapping::employee_developer |
| databricks::entity::employee |
| databricks::entity::developer |
| databricks::table::employee |
| databricks::mapping::employee_delta |
| databricks::table::developer |


## Legend schema
We can create the Spark schema for any Legend entity of type `Class`.
The process recursively walks through each of its underlying fields and enums, as well as any nested properties and supertypes.

In [0]:
%scala
val schema = legend.getSchema("databricks::entity::employee")
display(schema.fields.map(s => s.toDDL).toSeq.toDF("field"))

| field |
| --- |
| firstName STRING NOT NULL COMMENT 'Person first name' |
| lastName STRING NOT NULL COMMENT 'Person last name' |
| birthDate DATE NOT NULL COMMENT 'Person birth date' |
| gender STRING COMMENT 'Person gender' |
| id INT NOT NULL COMMENT 'Unique identifier of a databricks employee' |
| sme STRING COMMENT 'Programming skill that person truly masters' |
| joinedDate DATE NOT NULL COMMENT 'When did that person join Databricks' |
| highFives INT COMMENT 'How many high fives did that person get' |
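
The recursion through supertypes described above can be pictured with a minimal sketch in plain Scala. The `PureProperty` and `PureClass` types and the `SchemaSketch` object are hypothetical stand-ins introduced for illustration only; they are not the types used by legend-delta, and nested properties are left out for brevity. The sketch also assumes that `employee` inherits from `person`, which the field comments above suggest.

```scala
// Hypothetical, simplified stand-ins for Legend classes and properties;
// these are NOT the types used by legend-delta itself.
final case class PureProperty(name: String, sqlType: String, mandatory: Boolean, doc: String)
final case class PureClass(
  name: String,
  properties: Seq[PureProperty],
  superTypes: Seq[PureClass] = Seq.empty
)

object SchemaSketch {

  // Collect inherited properties first, then the properties declared on the class itself
  def allProperties(c: PureClass): Seq[PureProperty] =
    c.superTypes.flatMap(allProperties) ++ c.properties

  // Render one property as a DDL fragment, mirroring the format of the output above
  def toDDL(p: PureProperty): String = {
    val nullability = if (p.mandatory) " NOT NULL" else ""
    s"${p.name} ${p.sqlType}$nullability COMMENT '${p.doc}'"
  }

  def main(args: Array[String]): Unit = {
    val person = PureClass("person", Seq(
      PureProperty("firstName", "STRING", mandatory = true, doc = "Person first name"),
      PureProperty("gender", "STRING", mandatory = false, doc = "Person gender")
    ))
    val employee = PureClass("employee", Seq(
      PureProperty("id", "INT", mandatory = true, doc = "Unique identifier of a databricks employee")
    ), superTypes = Seq(person))

    // Inherited fields are printed before the fields declared on employee
    allProperties(employee).map(toDDL).foreach(println)
  }
}
```

Listing inherited properties first is a choice made here to match the field order of the output above.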


## Legend expectations
Given the `multiplicity` of each property, we can detect whether a field is optional and whether a list has the right number of elements.
Given an `enumeration`, we check that values are consistent with it.
These checks are considered **technical expectations** and are converted into SQL constraints.

In [0]:
%scala
val expectations = legend.getExpectations("databricks::entity::employee")
display(expectations.toSeq.toDF("name", "expectation"))

| name | expectation |
| --- | --- |
| [birthDate] is mandatory | birthDate IS NOT NULL |
| [sme] not allowed value | sme IS NULL OR sme IN ('Scala', 'Python', 'C', 'Java', 'R', 'SQL') |
| [id] is mandatory | id IS NOT NULL |
| [joinedDate] is mandatory | joinedDate IS NOT NULL |
| [firstName] is mandatory | firstName IS NOT NULL |
| [lastName] is mandatory | lastName IS NOT NULL |
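
As a rough illustration of how multiplicity and enumeration information could translate into the constraints listed above, here is a minimal sketch in plain Scala. `ModelProperty` and `TechnicalExpectations` are hypothetical names, not part of legend-delta, and the name and wording of the list size rule are assumptions, since the output above does not show one.

```scala
// Hypothetical, simplified view of a Legend property; not the library's own model.
final case class ModelProperty(
  name: String,
  lowerBound: Int,                        // e.g. 1 for [1], 0 for [0..1]
  upperBound: Option[Int],                // None stands for an unbounded list, i.e. [*]
  allowedValues: Seq[String] = Seq.empty  // enumeration values, empty for non-enum types
)

object TechnicalExpectations {

  // Returns (name, SQL expression) pairs in the same shape as the output above
  def derive(p: ModelProperty): Seq[(String, String)] = {
    val isList = p.upperBound.forall(_ > 1)

    val mandatory =
      if (!isList && p.lowerBound >= 1)
        Seq(s"[${p.name}] is mandatory" -> s"${p.name} IS NOT NULL")
      else Seq.empty

    // The name and wording of this size rule are assumptions made for illustration
    val size =
      if (isList) {
        val upper = p.upperBound.map(u => s" AND SIZE(${p.name}) <= $u").getOrElse("")
        Seq(s"[${p.name}] has invalid size" -> s"SIZE(${p.name}) >= ${p.lowerBound}$upper")
      } else Seq.empty

    // Only single-valued enums are handled; values are assumed to contain no single quotes
    val allowed =
      if (!isList && p.allowedValues.nonEmpty) {
        val values = p.allowedValues.map(v => s"'$v'").mkString(", ")
        Seq(s"[${p.name}] not allowed value" -> s"${p.name} IS NULL OR ${p.name} IN ($values)")
      } else Seq.empty

    mandatory ++ size ++ allowed
  }
}
```

For instance, `TechnicalExpectations.derive(ModelProperty("birthDate", 1, Some(1)))` yields the single pair `[birthDate] is mandatory` / `birthDate IS NOT NULL`, while an optional property without an enumeration yields no rule at all.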


In addition to the rules derived from the schema itself, we also support the conversion of **business expectations**
from the PURE language to SQL expressions. To do so, we generate a Legend
execution plan against a Databricks runtime, and therefore operate on a relational Legend `mapping` rather
than on pure entities of type `Class`.

In [0]:
%scala
val expectations = legend.getExpectations("databricks::mapping::employee_delta")
display(expectations.toSeq.toDF("name", "expectation"))

| name | expectation |
| --- | --- |
| [birthDate] is mandatory | birth_date IS NOT NULL |
| [sme] not allowed value | (sme IS NULL OR sme IN ('Scala', 'Python', 'C', 'Java', 'R', 'SQL')) |
| [id] is mandatory | id IS NOT NULL |
| [joinedDate] is mandatory | joined_date IS NOT NULL |
| [firstName] is mandatory | first_name IS NOT NULL |
| [high five] should be positive | (high_fives IS NOT NULL AND high_fives > 0) |
| [age] should be > 21 | year(joined_date) - year(birth_date) > 21 |
| [lastName] is mandatory | last_name IS NOT NULL |
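
To make the arithmetic behind the `[age] should be > 21` rule concrete, the following plain Scala sketch reproduces the same year-based comparison with `java.time.LocalDate`. `AgeRuleSketch` is a hypothetical helper and the two pairs of dates are illustrative values. Note that the expression compares calendar years only, so it approximates the age at joining rather than computing it to the day.

```scala
import java.time.LocalDate

object AgeRuleSketch {

  // Same arithmetic as year(joined_date) - year(birth_date) > 21
  def passes(birthDate: LocalDate, joinedDate: LocalDate): Boolean =
    joinedDate.getYear - birthDate.getYear > 21

  def main(args: Array[String]): Unit = {
    // 2015 - 1989 = 26, so the rule holds
    println(passes(LocalDate.parse("1989-02-19"), LocalDate.parse("2015-12-05")))
    // 2015 - 1998 = 17, so the rule is violated
    println(passes(LocalDate.parse("1998-02-08"), LocalDate.parse("2015-01-14")))
  }
}
```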


## Legend transformations
In addition to business expectations, we transform raw entities into their
desired states and target tables. Note that relational transformations in Legend only support direct mappings
(no PURE operations or derived properties) and are therefore easily enforced through the `.withColumnRenamed` syntax.

In [0]:
%scala
val transformations = legend.getTransformations("databricks::mapping::employee_delta")
display(transformations.toSeq.toDF("column", "columnRenamed"))

| column | columnRenamed |
| --- | --- |
| highFives | high_fives |
| joinedDate | joined_date |
| lastName | last_name |
| firstName | first_name |
| birthDate | birth_date |
| id | id |
| sme | sme |
| gender | gender |
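
A direct mapping such as the one above can be applied to a DataFrame by folding over the pairs with Spark's `withColumnRenamed`. The sketch below shows one way to do it; `RenameSketch` is a hypothetical helper, the mapping is assumed to be available as a `Map[String, String]`, and this is not necessarily how `legendTransform` is implemented. It requires Spark on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object RenameSketch {

  // Apply each (source -> target) pair in turn; identical names such as id -> id are no-ops
  def rename(df: DataFrame, mapping: Map[String, String]): DataFrame =
    mapping.foldLeft(df) { case (acc, (from, to)) => acc.withColumnRenamed(from, to) }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; in a Databricks notebook `spark` already exists
    val spark = SparkSession.builder().master("local[1]").appName("rename-sketch").getOrCreate()
    import spark.implicits._

    val raw = Seq(("Levey", 282)).toDF("firstName", "highFives")
    val renamed = rename(raw, Map("firstName" -> "first_name", "highFives" -> "high_fives"))
    renamed.printSchema()
    spark.stop()
  }
}
```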


## Legend tables
In order to query our validated entities from the Legend interface, we can easily create the target state table. This table contains a placeholder for the constraints that each record violates (the `legend` field below).

In [0]:
%scala
val tableName = legend.createTable("databricks::mapping::employee_delta")
display(sql(s"DESCRIBE EXTENDED $tableName"))

| col_name | data_type | comment |
| --- | --- | --- |
| first_name | string | Person first name |
| last_name | string | Person last name |
| birth_date | date | Person birth date |
| gender | string | Person gender |
| id | int | Unique identifier of a databricks employee |
| sme | string | Programming skill that person truly masters |
| joined_date | date | When did that person join Databricks |
| high_fives | int | How many high fives did that person get |
| legend | array | LEGEND VALIDATION FIELD |


# Example
In this scenario, we read raw JSON files that we schematize, transform, validate and persist to a Delta table. The resulting table contains records that have been checked both syntactically and semantically, with any violation recorded alongside the record.

In [0]:
%fs
head /FileStore/antoine.amend@databricks.com/legend/employee.json

In [0]:
%scala
import org.finos.legend.spark._

val df = spark
  .read
  .format("json")
  .legendSchema("databricks::entity::employee")
  .load("/FileStore/antoine.amend@databricks.com/legend")

display(df)

| firstName | lastName | birthDate | gender | id | sme | joinedDate | highFives |
| --- | --- | --- | --- | --- | --- | --- | --- |
| Levey | Storck | 1989-02-19 | M | | C | 2015-12-05 | 282 |
| Maria | O'Gorman | 1987-08-14 | M | 2.0 | Python | 2017-03-03 | 299 |
| Evvy | Lepoidevin | 1970-10-04 | M | 3.0 | C | 2020-11-02 | 182 |
| Georges | Jotcham | 1973-11-26 | F | 4.0 | Scala | 2020-09-14 | 229 |
| Doroteya | Wadhams | 1987-03-11 | N | 5.0 | Scala | 2019-02-11 | 78 |
| Mia | Millgate | 1988-08-01 | F | 6.0 | Python | 2017-04-13 | 146 |
| Celene | Calverley | 1979-07-15 | N | 7.0 | Python | 2021-06-03 | 69 |
| Richie | Di Matteo | 1980-05-18 | F | 8.0 | Python | 2014-08-23 | 167 |
| Ignaz | Kurth | 1987-01-10 | F | | Python | 2014-02-01 | 199 |
| Anthia | Duck | 1998-02-08 | F | 10.0 | Python | 2015-01-14 | 277 |


In [0]:
%scala
val transformations = legend.getTransformations("databricks::mapping::employee_delta")
val transformedDf = df.legendTransform(transformations)
display(transformedDf)

| first_name | last_name | birth_date | gender | id | sme | joined_date | high_fives |
| --- | --- | --- | --- | --- | --- | --- | --- |
| Levey | Storck | 1989-02-19 | M | | C | 2015-12-05 | 282 |
| Maria | O'Gorman | 1987-08-14 | M | 2.0 | Python | 2017-03-03 | 299 |
| Evvy | Lepoidevin | 1970-10-04 | M | 3.0 | C | 2020-11-02 | 182 |
| Georges | Jotcham | 1973-11-26 | F | 4.0 | Scala | 2020-09-14 | 229 |
| Doroteya | Wadhams | 1987-03-11 | N | 5.0 | Scala | 2019-02-11 | 78 |
| Mia | Millgate | 1988-08-01 | F | 6.0 | Python | 2017-04-13 | 146 |
| Celene | Calverley | 1979-07-15 | N | 7.0 | Python | 2021-06-03 | 69 |
| Richie | Di Matteo | 1980-05-18 | F | 8.0 | Python | 2014-08-23 | 167 |
| Ignaz | Kurth | 1987-01-10 | F | | Python | 2014-02-01 | 199 |
| Anthia | Duck | 1998-02-08 | F | 10.0 | Python | 2015-01-14 | 277 |


In [0]:
%scala
val expectations = legend.getExpectations("databricks::mapping::employee_delta")
val validatedDf = transformedDf.legendValidate(expectations)
display(validatedDf)

| first_name | last_name | birth_date | gender | id | sme | joined_date | high_fives | legend |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| Levey | Storck | 1989-02-19 | M | | C | 2015-12-05 | 282 | List([id] is mandatory) |
| Maria | O'Gorman | 1987-08-14 | M | 2.0 | Python | 2017-03-03 | 299 | List() |
| Evvy | Lepoidevin | 1970-10-04 | M | 3.0 | C | 2020-11-02 | 182 | List() |
| Georges | Jotcham | 1973-11-26 | F | 4.0 | Scala | 2020-09-14 | 229 | List() |
| Doroteya | Wadhams | 1987-03-11 | N | 5.0 | Scala | 2019-02-11 | 78 | List() |
| Mia | Millgate | 1988-08-01 | F | 6.0 | Python | 2017-04-13 | 146 | List() |
| Celene | Calverley | 1979-07-15 | N | 7.0 | Python | 2021-06-03 | 69 | List() |
| Richie | Di Matteo | 1980-05-18 | F | 8.0 | Python | 2014-08-23 | 167 | List() |
| Ignaz | Kurth | 1987-01-10 | F | | Python | 2014-02-01 | 199 | List([id] is mandatory) |
| Anthia | Duck | 1998-02-08 | F | 10.0 | Python | 2015-01-14 | 277 | List([age] should be > 21) |
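
A `legend` column like the one above could be computed by evaluating every SQL expectation per row and keeping the names of those that fail. The sketch below is one possible approach using standard Spark SQL functions (Spark 3.0 or later for `filter`); `ValidateSketch` is a hypothetical helper, the expectations are assumed to be available as a `Map[String, String]` of name to SQL expression, and this is not necessarily how `legendValidate` works internally.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{array, coalesce, expr, filter, lit, when}

object ValidateSketch {

  // Adds a `legend` column listing the names of the expectations each row violates
  def validate(df: DataFrame, expectations: Map[String, String]): DataFrame = {
    val violations: Seq[Column] = expectations.toSeq.map { case (name, sql) =>
      // A NULL result is treated as a failure here; this is a design assumption
      val passed = coalesce(expr(sql), lit(false))
      when(passed, lit(null).cast("string")).otherwise(lit(name))
    }
    // Keep only the names of failed expectations
    df.withColumn("legend", filter(array(violations: _*), v => v.isNotNull))
  }
}
```

In a notebook this could be called as `ValidateSketch.validate(transformedDf, Map("[id] is mandatory" -> "id IS NOT NULL"))`. Storing the names of failed rules rather than dropping records keeps every row available for later auditing.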


In [0]:
%scala
val tableName = legend.getTable("databricks::mapping::employee_delta")
validatedDf.write.format("delta").mode("append").saveAsTable(tableName)

In [0]:
%scala
import org.apache.spark.sql.functions._
display(spark.read.table(tableName))

| first_name | last_name | birth_date | gender | id | sme | joined_date | high_fives | legend |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| Levey | Storck | 1989-02-19 | M | | C | 2015-12-05 | 282 | List([id] is mandatory) |
| Maria | O'Gorman | 1987-08-14 | M | 2.0 | Python | 2017-03-03 | 299 | List() |
| Evvy | Lepoidevin | 1970-10-04 | M | 3.0 | C | 2020-11-02 | 182 | List() |
| Georges | Jotcham | 1973-11-26 | F | 4.0 | Scala | 2020-09-14 | 229 | List() |
| Doroteya | Wadhams | 1987-03-11 | N | 5.0 | Scala | 2019-02-11 | 78 | List() |
| Mia | Millgate | 1988-08-01 | F | 6.0 | Python | 2017-04-13 | 146 | List() |
| Celene | Calverley | 1979-07-15 | N | 7.0 | Python | 2021-06-03 | 69 | List() |
| Richie | Di Matteo | 1980-05-18 | F | 8.0 | Python | 2014-08-23 | 167 | List() |
| Ignaz | Kurth | 1987-01-10 | F | | Python | 2014-02-01 | 199 | List([id] is mandatory) |
| Anthia | Duck | 1998-02-08 | F | 10.0 | Python | 2015-01-14 | 277 | List([age] should be > 21) |


## Continuous monitoring
With new rules available from Legend Studio, we can easily validate an entire Legend table, resulting in a new audited Delta version.

In [0]:
%scala
val mappingName = "databricks::mapping::employee_delta"
val tableName = legend.getTable(mappingName)
val expectations = legend.getExpectations(mappingName)
legend.validateTable(mappingName)

In [0]:
%scala
display(sql(s"DESCRIBE HISTORY $tableName"))

| version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2 | 2022-04-11T04:38:44.893+0000 | 999271697022884 | antoine.amend@databricks.com | UPDATE | Map() | | List(3591277798165548) | 0410-051949-9j5ul4vr | 1.0 | WriteSerializable | False | Map(numRemovedFiles -> 1, numCopiedRows -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 1759, scanTimeMs -> 251, numAddedFiles -> 1, numUpdatedRows -> 1000, rewriteTimeMs -> 1501) | | Databricks-Runtime/10.4.x-scala2.12 |
| 1 | 2022-04-11T04:37:16.143+0000 | 999271697022884 | antoine.amend@databricks.com | WRITE | Map(mode -> Append, partitionBy -> []) | | List(3591277798165548) | 0410-051949-9j5ul4vr | 0.0 | WriteSerializable | True | Map(numFiles -> 1, numOutputRows -> 1000, numOutputBytes -> 33930) | | Databricks-Runtime/10.4.x-scala2.12 |
| 0 | 2022-04-11T04:36:34.988+0000 | 999271697022884 | antoine.amend@databricks.com | CREATE TABLE | Map(isManaged -> true, description -> by Legend-Delta from PURE entity [databricks::entity::employee], partitionBy -> [], properties -> {}) | | List(3591277798165548) | 0410-051949-9j5ul4vr | | WriteSerializable | True | Map() | | Databricks-Runtime/10.4.x-scala2.12 |


In [0]:
%scala
import org.apache.spark.sql.functions._

display(
  spark
    .read
    .table(tableName)
    .withColumn("constraint", explode(col("legend")))
    .groupBy("constraint")
    .count()
    .withColumnRenamed("count", "violations")
)

| constraint | violations |
| --- | --- |
| [id] is mandatory | 2 |
| [age] should be > 21 | 105 |
| [sme] not allowed value | 134 |
[sme] not allowed value,134
