# Data engineering with Databricks - Real-time data ingestion for financial transactions

Building a real-time system that consumes messages from live systems is required to build reactive data applications.

Near real-time processing is key to detecting new fraud patterns and building a proactive system that offers better protection for your customers.

Ingesting, transforming and cleaning data to create clean SQL tables for our downstream users (Data Analysts and Data Scientists) is complex.

<link href="https://fonts.googleapis.com/css?family=DM Sans" rel="stylesheet"/>
<div style="width:300px; text-align: center; float: right; margin: 30px 60px 10px 10px;  font-family: 'DM Sans'">
  <div style="height: 300px; width: 300px;  display: table-cell; vertical-align: middle; border-radius: 50%; border: 25px solid #fcba33ff;">
    <div style="font-size: 70px;  color: #70c4ab; font-weight: bold">
      73%
    </div>
    <div style="color: #1b5162;padding: 0px 30px 0px 30px;">of enterprise data goes unused for analytics and decision making</div>
  </div>
  <div style="color: #bfbfbf; padding-top: 5px">Source: Forrester</div>
</div>

<br>

## <img src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/de.png" style="float:left; margin: -35px 0px 0px 0px" width="80px"> John, as a Data Engineer, spends an immense amount of time…


* Hand-coding data ingestion & transformations and dealing with technical challenges:<br>
  *Supporting streaming and batch, handling concurrent operations, small files issues, GDPR requirements, complex DAG dependencies...*<br><br>
* Building custom frameworks to enforce quality and tests<br><br>
* Building and maintaining scalable infrastructure, with observability and monitoring<br><br>
* Managing incompatible governance models from different systems
<br style="clear: both">




## Demo: build a banking database and detect fraud on transactions in real time (ms)

In this demo, we'll step into the shoes of a retail banking company processing transactions.

The business has determined that we should improve our transaction fraud system and offer better protection to our customers (retail and institutions using our payment systems). We're asked to:

* Analyse and explain current transactions: quantify fraud, understand patterns and usage
* Build a proactive system to detect fraud and serve predictions in real time (ms latencies)


### What we'll build

To do so, we'll build an end-to-end solution with the Lakehouse. To properly analyse and detect fraud, we'll mainly focus on the transactional data received by our banking system.

At a very high level, this is the flow we'll implement:

<img width="1000px" src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/fsi/fraud-detection/lakehouse-fsi-fraud-overview-1.png" />

1. Ingest and create our banking database, with tables easy to query in SQL
2. Secure data and grant read access to the Data Analyst and Data Science teams.
3. Run BI queries to analyse existing fraud
4. Build an ML model to detect fraud and deploy this model for real-time inference

As a result, we'll have all the information required to trigger alerts and ask our customers for stronger authentication if we believe there is a high fraud risk.

**A note on fraud detection in real applications** <br/>
*This demo is a simple example to showcase the Lakehouse benefits. We'll keep the data model and ML simple for the sake of the demo. Real-world applications would need more data sources, and would also have to deal with class imbalance and more advanced models. If you are interested in a more advanced discussion, reach out to your Databricks team!*

Let's see how this data can be used within the Lakehouse to analyse and reduce fraud!  


## Building a Delta Live Tables pipeline to analyze and reduce fraud in real time

In this example, we'll implement an end-to-end DLT pipeline consuming our banking transaction data. We'll use the medallion architecture, but we could build a star schema, a data vault, or any other data model.

We'll incrementally load new data with Auto Loader and enrich this information.

This information will then be used to:

* Build our DBSQL dashboard to track transactions and fraud impact.
* Train & deploy a model to detect potential fraud in real-time.

Let's implement the following flow: 
 
<img width="1200px" src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/fsi/fraud-detection/fsi-fraud-dlt-full.png"/>


*Note that we're including the ML model our [Data Scientist built]($../04-Data-Science-ML/04.1-automl-fraud-detection) using Databricks AutoML to predict fraud. We'll cover that in the next section.*

Your DLT Pipeline has been installed and started for you! Open the <a dbdemos-pipeline-id="dlt-fsi-fraud" href="#joblist/pipelines/9c690e9c-75bf-4653-a170-a21bfcfca6bb" target="_blank">Fraud detection Delta Live Table pipeline</a> to see it in action.<br/>
*(Note: The pipeline will start automatically once the initialization job is completed. This might take a few minutes. Check the installation logs for more details.)*

### 1/ Loading our data using Databricks Auto Loader (cloud_files)

<img  style="float:right; margin-left: 10px" width="600px" src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/fsi/fraud-detection/fsi-fraud-dlt-1.png"/>
  
Auto Loader allows us to efficiently ingest millions of files from cloud storage, and supports schema inference and evolution at scale.

For more details on Auto Loader, run `dbdemos.install('auto-loader')`

Let's use it in our pipeline to ingest the raw JSON & CSV data being delivered to our blob storage `/dbdemos/fsi/fraud-detection/...`. 
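
To make the idea of incremental ingestion concrete, here is a toy sketch in plain Python. It is not Auto Loader and does not use any Databricks API: the class name `IncrementalLoader` and the in-memory set standing in for checkpoint state are assumptions made purely for illustration. It only shows the principle that each poll returns files not seen before.

```python
class IncrementalLoader:
    """Toy model of incremental file discovery (illustrative, not Auto Loader)."""

    def __init__(self):
        # Stands in for the checkpoint state a real ingestion engine persists
        self._seen = set()

    def poll(self, listing):
        """Return only the files that were not returned by an earlier poll."""
        new_files = sorted(set(listing) - self._seen)
        self._seen.update(new_files)
        return new_files


loader = IncrementalLoader()
first_batch = loader.poll(["tx_001.json", "tx_002.json"])
# A later listing contains the old files plus one new file
second_batch = loader.poll(["tx_001.json", "tx_002.json", "tx_003.json"])
print(first_batch)
print(second_batch)
```

In the pipeline itself this bookkeeping is handled for us by `cloud_files`, which is why the SQL cells below contain no file-tracking logic.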


### Task 0: Complete notebook setup

In the following statements, replace "YOUR_CATALOG_HERE" with the catalog you defined in your setup. 
That should be "firstnamelastname_allianz".


**After** you have **completed** task 1 and task 2 and run the notebook, you can create a pipeline via the blue "Create Pipeline" button below. 
In the pipeline configuration, under the destination section, select Unity Catalog, then your catalog and your schema. 

In [0]:
CREATE STREAMING LIVE TABLE bronze_transactions 
  COMMENT "Historical banking transactions used to train fraud detection"
AS 
  SELECT * FROM cloud_files("/Volumes/YOUR_CATALOG_HERE/allianz_lakehouse_fsi_fraud/fraud_raw_data/transactions", "json", map("cloudFiles.maxFilesPerTrigger", "1", "cloudFiles.inferColumnTypes", "true")) 

In [0]:
CREATE STREAMING LIVE TABLE banking_customers (
  CONSTRAINT correct_schema EXPECT (_rescued_data IS NULL)
)
COMMENT "Customer data coming from CSV files, ingested incrementally with Auto Loader to support schema inference and evolution"
AS 
  SELECT * FROM cloud_files("/Volumes/YOUR_CATALOG_HERE/allianz_lakehouse_fsi_fraud/fraud_raw_data/customers", "csv", map("cloudFiles.inferColumnTypes", "true", "multiLine", "true"))

In [0]:
CREATE STREAMING LIVE TABLE country_coordinates
AS 
  SELECT * FROM cloud_files("/Volumes/YOUR_CATALOG_HERE/allianz_lakehouse_fsi_fraud/fraud_raw_data/country_code", "csv")

# **_TASK 1 Start_**

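Ingest the CSV files located at the volume path: /Volumes/"yourcatalog"/"yourschema"/fraud_raw_data/fraud_report

The silver table in Task 2 joins on `live.fraud_reports`, so name the table you create here `fraud_reports`.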

In [0]:
/* Write your code below*/


# **_TASK 1 END_**

### 2/ Enforce quality and materialize our tables for Data Analysts

<img style="float:right; margin-left: 10px" width="600px" src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/fsi/fraud-detection/fsi-fraud-dlt-2.png"/>


The next layer, often called silver, consumes **incremental** data from the bronze layer and cleans up some information:

* Clean up the codes of the countries of origin and destination (removing the "--")
* Calculate the difference between the Originating and Destination Balances.
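
Both cleanup rules are simple row-level transformations, so they can be sanity-checked outside the pipeline. The following is a minimal sketch in plain Python, assuming a record is a dict using the column names from the silver query in this notebook (`countryOrig`, `oldBalanceOrig`, and so on). The helper name `clean_transaction` and the sample values are hypothetical.

```python
import re


def clean_transaction(row: dict) -> dict:
    """Mimic the silver-layer cleanup on a single record (illustrative only)."""
    cleaned = dict(row)  # copy so the input record is left untouched
    # Remove every "--" sequence from the country codes
    cleaned["countryOrig"] = re.sub(r"--", "", row["countryOrig"])
    cleaned["countryDest"] = re.sub(r"--", "", row["countryDest"])
    # Balance movement on each side of the transaction (new minus old)
    cleaned["diffOrig"] = row["newBalanceOrig"] - row["oldBalanceOrig"]
    cleaned["diffDest"] = row["newBalanceDest"] - row["oldBalanceDest"]
    return cleaned


# Made-up sample record, not taken from the demo dataset
sample = {
    "id": "t1",
    "countryOrig": "--FRA",
    "countryDest": "USA--",
    "oldBalanceOrig": 250.0,
    "newBalanceOrig": 100.0,
    "oldBalanceDest": 0.0,
    "newBalanceDest": 150.0,
}
cleaned_sample = clean_transaction(sample)
print(cleaned_sample)
```

A negative `diffOrig` means money left the originating account, which is the expected direction for a transfer.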

We're also adding [expectations](https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-expectations.html) on different fields to enforce and track our data quality. This ensures that our dashboards are relevant and lets us easily spot potential errors caused by data anomalies.
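
DLT expectations come in three flavours: a plain `EXPECT` keeps violating rows and only records a metric, `ON VIOLATION DROP ROW` removes them, and `ON VIOLATION FAIL UPDATE` stops the update. The sketch below imitates that behaviour in plain Python so the difference is easy to see. The function `apply_expectation` and its `on_violation` values are hypothetical names for this illustration, not part of any Databricks API.

```python
def apply_expectation(rows, predicate, on_violation="warn"):
    """Return (kept_rows, violation_count) for one expectation (illustrative only).

    on_violation: "warn" keeps every row, "drop" removes violating rows,
    "fail" raises as soon as at least one row violates the predicate.
    """
    violations = [r for r in rows if not predicate(r)]
    if on_violation == "fail" and violations:
        raise ValueError(f"{len(violations)} row(s) violated the expectation")
    if on_violation == "drop":
        kept = [r for r in rows if predicate(r)]
    else:
        kept = list(rows)
    return kept, len(violations)


# Same condition as the correct_schema constraint: _rescued_data IS NULL
def has_no_rescued_data(row):
    return row["_rescued_data"] is None


customers = [
    {"id": 1, "_rescued_data": None},
    {"id": 2, "_rescued_data": '{"unexpected_col": "x"}'},
]
kept_warn, violations_warn = apply_expectation(customers, has_no_rescued_data)
kept_drop, violations_drop = apply_expectation(customers, has_no_rescued_data, "drop")
print(len(kept_warn), violations_warn)
print(len(kept_drop), violations_drop)
```

The `correct_schema` constraint on `banking_customers` uses the plain form, so rows with rescued data still land in the table while the violation count is tracked.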

For more advanced DLT capabilities, run `dbdemos.install('dlt-loans')`, or `dbdemos.install('dlt-cdc')` for a CDC/SCD Type 2 example.

These tables are clean and ready to be used by the BI team!

# **_TASK 2 Start_**


You need to ensure good data quality. 
Please implement the following in the code snippet below: 
1. correct_data should not be null **and** 
2. correct_customer_id should not be null

Leverage the code snippet below (remove the comment tags /* */ and add the constraints).




In [0]:
/* Uncomment and replace the missing parts
CREATE STREAMING LIVE TABLE silver_transactions (
  CONSTRAINT YOURCODE,
  CONSTRAINT YOURCODE
)
AS 
  SELECT * EXCEPT(countryOrig, countryDest, t._rescued_data, f._rescued_data), 
          regexp_replace(countryOrig, "\-\-", "") as countryOrig, 
          regexp_replace(countryDest, "\-\-", "") as countryDest, 
          newBalanceOrig - oldBalanceOrig as diffOrig, 
          newBalanceDest - oldBalanceDest as diffDest
FROM STREAM(live.bronze_transactions) t
  LEFT JOIN live.fraud_reports f using(id)

*/

### 3/ Aggregate and join data to create our ML features

<img style="float:right; margin-left: 10px" width="600px" src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/fsi/fraud-detection/fsi-fraud-dlt-3.png"/>

We're now ready to create the features required for Fraud detection.

We need to enrich our transaction dataset with extra information our model will use to help detect fraud.

In [0]:
CREATE LIVE TABLE gold_transactions (
  CONSTRAINT amount_decent EXPECT (amount > 10)
)
AS 
  SELECT t.* EXCEPT(countryOrig, countryDest, is_fraud), c.* EXCEPT(id, _rescued_data),
          boolean(coalesce(is_fraud, 0)) as is_fraud,
          o.alpha3_code as countryOrig, o.country as countryOrig_name, o.long_avg as countryLongOrig_long, o.lat_avg as countryLatOrig_lat,
          d.alpha3_code as countryDest, d.country as countryDest_name, d.long_avg as countryLongDest_long, d.lat_avg as countryLatDest_lat
FROM live.silver_transactions t
  INNER JOIN live.country_coordinates o ON t.countryOrig=o.alpha3_code 
  INNER JOIN live.country_coordinates d ON t.countryDest=d.alpha3_code 
  INNER JOIN live.banking_customers c ON c.id=t.customer_id 
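
The query above relies on two behaviours that are easy to overlook: every `INNER JOIN` drops transactions without a match, and `boolean(coalesce(is_fraud, 0))` turns a missing fraud report into `false`. Here is a minimal plain-Python sketch of that logic, assuming lookup keys are unique and keeping only a few output columns for brevity. The function name `build_gold` and the sample rows are hypothetical.

```python
def build_gold(transactions, countries, customers):
    """Imitate the gold-table joins on in-memory rows (illustrative only)."""
    # Assumes alpha3_code and customer id are unique, so each lookup hits one row
    countries_by_code = {c["alpha3_code"]: c for c in countries}
    customers_by_id = {c["id"]: c for c in customers}
    gold = []
    for t in transactions:
        orig = countries_by_code.get(t["countryOrig"])
        dest = countries_by_code.get(t["countryDest"])
        customer = customers_by_id.get(t["customer_id"])
        if orig is None or dest is None or customer is None:
            continue  # INNER JOIN: a missing match on any side drops the row
        gold.append({
            "id": t["id"],
            "amount": t["amount"],
            # coalesce(is_fraud, 0) then cast to boolean
            "is_fraud": bool(t.get("is_fraud") or 0),
            "countryOrig_name": orig["country"],
            "countryDest_name": dest["country"],
        })
    return gold


# Made-up rows for illustration
countries = [
    {"alpha3_code": "FRA", "country": "France"},
    {"alpha3_code": "USA", "country": "United States"},
]
customers = [{"id": "c1"}]
transactions = [
    {"id": "t1", "customer_id": "c1", "amount": 50, "countryOrig": "FRA", "countryDest": "USA", "is_fraud": None},
    {"id": "t2", "customer_id": "c1", "amount": 90, "countryOrig": "FRA", "countryDest": "XXX", "is_fraud": 1},
    {"id": "t3", "customer_id": "c9", "amount": 20, "countryOrig": "USA", "countryDest": "FRA", "is_fraud": 0},
]
gold_rows = build_gold(transactions, countries, customers)
print(gold_rows)
```

If a noticeable share of transactions disappears between silver and gold, unmatched country codes or customer ids are the first thing to check.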

## Our pipeline is now ready!

As you can see, building data pipelines with Databricks lets you focus on your business implementation while the engine handles the hard data engineering work for you.

The table is now ready for our Data Scientist to train a model detecting fraud risk.

Open the <a dbdemos-pipeline-id="dlt-fsi-fraud" href="#joblist/pipelines/9c690e9c-75bf-4653-a170-a21bfcfca6bb" target="_blank">Fraud detection Delta Live Table pipeline</a> and click on start to visualize your lineage and consume the new data incrementally!

# Next: secure and share data with Unity Catalog

Now that these tables are available in our Lakehouse, let's review how we can share them with the Data Scientists and Data Analysts teams.

Jump to the [Governance with Unity Catalog notebook]($../00-churn-introduction-lakehouse) or [Go back to the introduction]($../00-churn-introduction-lakehouse)



### Source Data

This dataset is built with PaySim, an open source banking transactions simulator.

[PaySim](https://github.com/EdgarLopezPhD/PaySim) simulates mobile money transactions based on a sample of real transactions extracted from one month of financial logs from a mobile money service implemented in an African country. 