# Scoring the Accident Images

In this notebook, we'll score incoming images of damaged vehicles as they flow into the system, producing a damage severity score with the model that we previously trained and promoted to production.

The end results are stored in the `accident_images` Delta table.

*Note: we could also have these transformations available in a Delta Live Table. Open [02.2-EXTRA-Batch-Scoring-DLT]($./EXTRA-DLT-inference/02.2-EXTRA-Batch-Scoring-DLT) to see how it's done.*



### A cluster has been created for this demo
To run this demo, just select the cluster `dbdemos-lakehouse-fsi-smart-claims-thomas_hass` from the dropdown menu ([open cluster configuration](https://adb-984752964297111.11.azuredatabricks.net/#setting/clusters/0802-065525-9zzx4cdb/configuration)). <br />
*Note: If the cluster was deleted after 30 days, you can re-create it with `dbdemos.create_cluster('lakehouse-fsi-smart-claims')` or re-install the demo: `dbdemos.install('lakehouse-fsi-smart-claims')`*

In [0]:
%pip install mlflow==2.20.2

In [0]:
%run ../_resources/00-setup

In [0]:
from mlflow.store.artifact.models_artifact_repo import ModelsArtifactRepository
import mlflow
# Use the Unity Catalog model registry
mlflow.set_registry_uri("databricks-uc")
# Download the model's requirements.txt from the remote registry
requirements_path = ModelsArtifactRepository(f"models:/{catalog}.{db}.dbdemos_claims_damage_level@prod").download_artifacts(artifact_path="requirements.txt") 

In [0]:
%pip install -r $requirements_path
dbutils.library.restartPython()

In [0]:
%run ../_resources/00-setup $reset_all_data=false

## Incrementally ingest the raw incoming images

New images typically land in cloud storage (S3/ADLS/GCS), which is made accessible within Unity Catalog through Volumes.

Let's start by ingesting them and saving them as a Delta Lake table.
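
The ingestion cell that follows derives an `image_name` column from each file's `path` with a regular expression. As a rough illustration of what that extraction does, here is a plain-Python sketch using a pattern of the same shape, with the dot before `jpg` escaped. The helper name `extract_image_name` and the sample paths are hypothetical, and returning an empty string on no match is meant to mirror how Spark's `regexp_extract` behaves.

```python
import re

# Same shape as the pattern in the ingestion cell, with the dot escaped
# so that only a literal ".jpg" suffix is accepted.
IMAGE_NAME_PATTERN = re.compile(r".*/(.*?\.jpg)")


def extract_image_name(path: str) -> str:
    """Return the trailing '<name>.jpg' of a path, or '' when nothing matches."""
    match = IMAGE_NAME_PATTERN.search(path)
    return match.group(1) if match else ""


# Hypothetical paths, for illustration only
print(extract_image_name("dbfs:/Volumes/main/claims/raw/Accidents/images/10.jpg"))
print(extract_image_name("dbfs:/Volumes/main/claims/raw/Accidents/images/notes.txt"))
```

Files that do not end in `.jpg` get an empty `image_name`, so they would simply not match any metadata row in a later join on that column.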

In [0]:
volume_path = f"/Volumes/{catalog}/{db}/{volume_name}"

(spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "binaryFile")
            .option("cloudFiles.schemaLocation", f"{volume_path}/checkpoint/images_shema")
            .load(f"{volume_path}/Accidents/images")
            .withColumn("image_name", F.regexp_extract(F.col("path"), r".*/(.*?\.jpg)", 1))
      .writeStream
            .option("checkpointLocation", f"{volume_path}/checkpoint/images")
            .trigger(availableNow=True)
            .table("raw_accident_image")).awaitTermination()

display(spark.table("raw_accident_image").limit(10))

In [0]:
(spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.schemaLocation", f"{volume_path}/checkpoint/images_m_shema")
            .load(f"{volume_path}/Accidents/metadata")
      .writeStream
            .option("checkpointLocation", f"{volume_path}/checkpoint/images_m")
            .trigger(availableNow=True)
            .table("raw_accident_metadata")).awaitTermination()
            
display(spark.table("raw_accident_metadata").limit(10))

## Score the damaged vehicle image to determine damage severity

Our claim images are now added to our tables and easily accessible. Let's load our model from Unity Catalog to score the damage. 
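
The cell that follows references the model through a URI of the form `models:/<catalog>.<schema>.<model>@<alias>`, where `prod` is the alias assigned when the model was promoted. The sketch below only shows how such a URI is assembled; the helper `uc_model_uri` and the catalog and schema values are hypothetical placeholders, not names taken from this demo's setup.

```python
def uc_model_uri(catalog: str, schema: str, model_name: str, alias: str) -> str:
    """Build an MLflow model URI that points at a Unity Catalog model alias."""
    for part in (catalog, schema, model_name, alias):
        if not part:
            raise ValueError("catalog, schema, model name and alias must be non-empty")
    # Unity Catalog models use a three-level name: catalog.schema.model
    return f"models:/{catalog}.{schema}.{model_name}@{alias}"


# "main" and "smart_claims" are placeholder values for illustration
print(uc_model_uri("main", "smart_claims", "dbdemos_claims_damage_level", "prod"))
```

Referencing an alias rather than a version number means the scoring code does not change when a new model version is promoted.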

In [0]:
import mlflow
model_name = "dbdemos_claims_damage_level"

mlflow.set_registry_uri('databricks-uc')

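# Load the model from Unity Catalog as a Spark UDF
predict_damage_udf = mlflow.pyfunc.spark_udf(spark, model_uri=f"models:/{catalog}.{db}.{model_name}@prod")
# Register the UDF so it can also be called from SQL
spark.udf.register("predict_damage", predict_damage_udf)
# Input column names the model expects, read from its logged signature
columns = predict_damage_udf.metadata.get_input_schema().input_names()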

## Test inferences

In [0]:
%sql 
SELECT image_name, predict_damage(content) as damage_prediction, content FROM raw_accident_image LIMIT 10

In [0]:
raw_images = (spark.read.table("raw_accident_image")
                   .withColumn("damage_prediction", predict_damage_udf(*columns)))

# Only process 1k claims so the demo runs faster
metadata = spark.table("raw_accident_metadata").orderBy(F.rand()).limit(1000)

raw_images.join(metadata, on="image_name").write.mode('overwrite').saveAsTable("accident_images")

In [0]:
display(spark.table("accident_images").limit(10))


## Real time inference

While this use case relies on batch inference (consuming new data incrementally from a stream), we could also deploy our model behind a [Serverless model endpoint](#mlflow/endpoints).

Images can be sent to the endpoint as base64-encoded data. For more details on how to do that, you can run `dbdemos.install('computer-vision-pcb')`.
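
As a minimal sketch of the base64 step, the snippet below encodes raw image bytes and wraps them in a JSON body using the `dataframe_records` layout. The helper `build_scoring_payload` is hypothetical, and it is an assumption that a served version of this model would expect the encoded image in a `content` column; check the model's signature and the `computer-vision-pcb` demo before relying on it.

```python
import base64
import json


def build_scoring_payload(image_bytes: bytes, column: str = "content") -> str:
    """Encode image bytes as base64 and wrap them in a JSON request body."""
    # base64 turns arbitrary bytes into ASCII text that is safe inside JSON
    encoded = base64.b64encode(image_bytes).decode("ascii")
    # Assumption: the endpoint reads one record per image from `column`
    return json.dumps({"dataframe_records": [{column: encoded}]})


# Placeholder bytes standing in for a real JPEG file
fake_image = b"\xff\xd8\xff\xe0 not a real JPEG"
payload = build_scoring_payload(fake_image)

# Round trip: decoding the payload gives back the original bytes
decoded = base64.b64decode(json.loads(payload)["dataframe_records"][0]["content"])
print(decoded == fake_image)
```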

# Add telematics and accident data to the claims & policy data

Telematics data is joined with claims and policy data to monitor the driver's behavior before the accident. The end results are stored in the `claim_policy_accident` Delta table.
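
The SQL cell that follows performs an inner join on `claim_no` and drops `chassis_no` and `claim_no` from the accident side so the resulting table has no duplicate column names. The plain-Python sketch below mimics that logic on a few in-memory rows; the function name, the `speed` column, and all sample values are hypothetical and only serve to show the shape of the result.

```python
def join_claims_with_accidents(claims, accidents, key="claim_no",
                               drop=("chassis_no", "claim_no")):
    """Inner-join two lists of dict rows on `key`, dropping `drop` from the right side."""
    accidents_by_key = {}
    for row in accidents:
        accidents_by_key.setdefault(row[key], []).append(row)

    joined = []
    for claim in claims:
        # Claims without a matching accident row are skipped (inner join)
        for accident in accidents_by_key.get(claim[key], []):
            extra = {k: v for k, v in accident.items() if k not in drop}
            joined.append({**claim, **extra})
    return joined


# Made-up rows for illustration
claims = [
    {"claim_no": "C1", "chassis_no": "X1", "speed": 72},
    {"claim_no": "C2", "chassis_no": "X2", "speed": 35},
]
accidents = [
    {"claim_no": "C1", "chassis_no": "X1", "image_name": "10.jpg",
     "damage_prediction": "major"},
]

result = join_claims_with_accidents(claims, accidents)
print(result)
```

Only claim `C1` appears in the output, since `C2` has no scored accident image to join against.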

In [0]:
%sql
CREATE OR REPLACE TABLE claim_policy_accident AS 
  SELECT
    t.*,
    a.* EXCEPT (chassis_no, claim_no)
  FROM
    claim_policy_telematics t
    JOIN accident_images a USING(claim_no)


## Conclusion

In this notebook, we demonstrated how to <b> retrieve the model </b> from Unity Catalog, run inference to <b> score </b> new image data, and persist the results back into Delta tables.

<b>Telematics data, accident image data, claims & policy data</b> are all joined together to provide a 360 view of the accident scene, so the claims investigation officer can <b>reconstruct the scene</b> and decide what to do next, e.g. release funds, authorize the car for repairs, approve a rental loaner car, or send the claim for further investigation.

Open notebook [02.3-Dynamic-Rule-Engine]($./02.3-Dynamic-Rule-Engine) to see how dynamic rules can be implemented to start processing our claims faster based on this information.