In [1]:
# Variables
# Notebook-wide configuration: data lake location of the scored output and the
# Cosmos DB database/containers used further down.

# Storage account name for the Synapse WS storage account - starts with "synsa".
# NOTE(review): the value on this line was truncated in the exported notebook
# (only a trailing `lmi"` fragment survived), which left the cell unparseable.
# TODO: replace the placeholder below with your workspace storage account name.
storage_acct_name = "synsa<unique-suffix>"
storage_container_name = "workspace"
storage_path_scored = "lab-data/scored/"

# Cosmos DB
cosmos_db_database = "ContosoAuto"
cosmos_db_container_metadata = "metadata"
cosmos_db_container_maintenance = "maintenance"

# Synapse linked service pointing to Cosmos DB Analytical Store - this is where we get the source data
synapse_cosmos_db_linked_service = "CosmosDbIoTLab"

StatementMeta(spark1, 27, 3, Finished, Available)



In [2]:
# ABFSS URI of the folder holding the scored output. The container comes from
# storage_container_name (instead of a repeated "workspace" literal) so that
# changing the configuration cell is enough to retarget the read.
sa_uri = "abfss://" + storage_container_name + "@" + storage_acct_name + ".dfs.core.windows.net/" + storage_path_scored

StatementMeta(spark1, 27, 4, Finished, Available)



In [3]:
# Load the scored records stored as Parquet under sa_uri.
scored_maintenance_df = spark.read.format("parquet").load(sa_uri)

StatementMeta(spark1, 27, 5, Finished, Available)



In [4]:
# Sanity check: how many scored rows were loaded, and with which columns/types.
scored_row_count = scored_maintenance_df.count()
print(scored_row_count)

scored_maintenance_df.printSchema()

StatementMeta(spark1, 27, 6, Finished, Available)

110
root
 |-- vin: string (nullable = true)
 |-- tripEnded: timestamp (nullable = true)
 |-- tripDurationMinutes: double (nullable = true)
 |-- batteryAgeDays: long (nullable = true)
 |-- batteryRatedCycles: long (nullable = true)
 |-- lifetimeBatteryCyclesUsed: double (nullable = true)
 |-- maint_needed: long (nullable = true)

In [5]:
# Preview the scored rows (show()'s defaults written out: first 20 rows, long cells truncated).
scored_maintenance_df.show(n=20, truncate=True)

StatementMeta(spark1, 27, 7, Finished, Available)

+-----------------+-------------------+-------------------+--------------+------------------+-------------------------+------------+
|              vin|          tripEnded|tripDurationMinutes|batteryAgeDays|batteryRatedCycles|lifetimeBatteryCyclesUsed|maint_needed|
+-----------------+-------------------+-------------------+--------------+------------------+-------------------------+------------+
|T8DNDN5UDCWL7M72H|2020-07-09 00:00:00|              19.75|           112|               200|        12.56852760294626|           0|
|0ZGVI20GIS84M1B4D|2020-07-09 00:00:00|          33.433333|           664|               200|        73.11092063018555|           0|
|9K3NPUOHFCGGMDO9G|2020-07-09 00:00:00|          20.816667|           275|               200|       29.630602915264014|           0|
|V5V483U7H0713ZAQ0|2020-07-09 00:00:00|          23.866667|           814|               200|       103.54608638921663|           0|
|ZUM234INX6MPOJ1D6|2020-07-09 00:00:00|              36.95|          

# Write scored maintenance records back to the Cosmos DB maintenance container

In [6]:
# Retrieve the Cosmos DB endpoint and account key from the Synapse linked service
import sys
import re

from pyspark.sql import SparkSession

# NOTE: despite its name, `sc` is a SparkSession here (not a SparkContext); it is
# only used to reach the JVM-side Synapse TokenLibrary.
sc = SparkSession.builder.getOrCreate()
token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary

connection_string = token_library.getConnectionString(synapse_cosmos_db_linked_service)

# Expected shape: AccountEndpoint=<url>;Database=<name>;AccountKey="<key>";
matchObj = re.match( r'AccountEndpoint=(.*);Database=(.*);AccountKey="(.*)";', connection_string, re.M|re.I)
if matchObj is None:
    # Fail with an actionable error instead of an AttributeError on None.
    # The connection string is deliberately not included: it contains the account key.
    raise ValueError(
        "Connection string for linked service '" + synapse_cosmos_db_linked_service
        + "' does not match the expected AccountEndpoint=...;Database=...;AccountKey=\"...\"; format"
    )

# Group 2 (the database name) is not needed; the database comes from the config cell.
endpoint = matchObj.group(1)
masterkey = matchObj.group(3)

StatementMeta(spark1, 27, 8, Finished, Available)



In [7]:
# Connector options for reading the maintenance container.
read_config_maintenance = dict(
    Endpoint=endpoint,
    Masterkey=masterkey,
    Database=cosmos_db_database,
    Collection=cosmos_db_container_maintenance,
)

StatementMeta(spark1, 27, 9, Finished, Available)



In [8]:
# Load whatever maintenance documents are already stored in Cosmos DB (there may be none).
existing_maintenance_df = (
    spark.read
    .format("com.microsoft.azure.cosmosdb.spark")
    .options(**read_config_maintenance)
    .load()
)

StatementMeta(spark1, 27, 10, Finished, Available)



In [9]:
# Report how many maintenance documents already exist, then preview them.
existing_row_count = existing_maintenance_df.count()
print(existing_row_count)

existing_maintenance_df.show()

StatementMeta(spark1, 27, 11, Finished, Available)

110
+------------+--------------------+--------------------+--------------------+----------+--------------+------------------+--------------------+-------------------------+------------+-------------------+-------------+-----------------+
|_attachments|               _etag|                _rid|               _self|       _ts|batteryAgeDays|batteryRatedCycles|                  id|lifetimeBatteryCyclesUsed|maint_needed|tripDurationMinutes|    tripEnded|              vin|
+------------+--------------------+--------------------+--------------------+----------+--------------+------------------+--------------------+-------------------------+------------+-------------------+-------------+-----------------+
|attachments/|"02008e6b-0000-01...|VmxNAK-WzjYBAAAAA...|dbs/VmxNAA==/coll...|1598321933|           506|               200|a4cc7b11-c2aa-4a7...|        65.37857016784706|           0|          36.616667|1594252800000|TNZWN4KCBEAFU2PY4|
|attachments/|"02009e6b-0000-01...|VmxNAK-WzjYCAAAAA...|

In [10]:
# If we had existing maintenance records from Cosmos DB, join them to the batch predictions on VIN. This gives each
# scored row the id of the document already stored for that VIN, so the write updates it instead of inserting a
# redundant document for the same VIN.
#
# The join is a LEFT join: an inner join would silently drop every scored VIN that has no document yet as soon as
# the container holds at least one record. Scored rows without a match get a freshly generated id so they are
# written as new documents. NOTE: uuid() is re-evaluated on every Spark action, so the ids displayed by a later
# show() for such new rows are not necessarily the ones that end up being written.
#
# If there are no maintenance records at all, we do not join, so we will not pass an id field, which means
# Cosmos DB will auto-generate it and insert it with the new document.
from pyspark.sql import functions as F

# head(1) only needs to fetch a single row to tell "empty" from "non-empty"; count() would scan every document.
if len(existing_maintenance_df.head(1)) > 0:
    maintenance_records_to_write_df = scored_maintenance_df\
        .join(existing_maintenance_df, scored_maintenance_df.vin == existing_maintenance_df.vin, "left")\
        .select(
            scored_maintenance_df["*"],
            F.coalesce(existing_maintenance_df["id"], F.expr("uuid()")).alias("id"))
else:
    maintenance_records_to_write_df = scored_maintenance_df

StatementMeta(spark1, 27, 12, Finished, Available)



In [11]:
# Check the number of records about to be written, then preview them.
records_to_write_count = maintenance_records_to_write_df.count()
print(records_to_write_count)

maintenance_records_to_write_df.show()

StatementMeta(spark1, 27, 13, Finished, Available)

110
+-----------------+-------------------+-------------------+--------------+------------------+-------------------------+------------+--------------------+
|              vin|          tripEnded|tripDurationMinutes|batteryAgeDays|batteryRatedCycles|lifetimeBatteryCyclesUsed|maint_needed|                  id|
+-----------------+-------------------+-------------------+--------------+------------------+-------------------------+------------+--------------------+
|TNZWN4KCBEAFU2PY4|2020-07-09 00:00:00|          36.616667|           506|               200|        65.37857016784706|           0|a4cc7b11-c2aa-4a7...|
|QZUP2Q4EHOE069K1X|2020-07-09 00:00:00|          39.666667|           422|               200|        70.42915592317448|           0|6e5c0455-ab6b-473...|
|Q8GLGAECVVBV4YDL5|2020-07-09 00:00:00|              39.55|            42|               200|        4.733837810416631|           0|24e225b4-04a2-402...|
|KMXVZZGIYQNL8CW3B|2020-07-09 00:00:00|          38.133333|           12

In [12]:
# Connector options for writing to the maintenance container, with the "Upsert" option set to "true".
write_config_maintenance = dict(
    Endpoint=endpoint,
    Masterkey=masterkey,
    Database=cosmos_db_database,
    Collection=cosmos_db_container_maintenance,
    Upsert="true",
)

StatementMeta(spark1, 27, 14, Finished, Available)



In [13]:
# Write the prepared records to the Cosmos DB maintenance container using write_config_maintenance.
(
    maintenance_records_to_write_df.write
    .mode("overwrite")
    .format("com.microsoft.azure.cosmosdb.spark")
    .options(**write_config_maintenance)
    .save()
)

StatementMeta(spark1, 27, 15, Finished, Available)

