<h1> About this notebook </h1>

This notebook demonstrates Databricks notebooks working with the Azure Cognitive Services entity recognition API to detect personally identifiable information (PII).

The following Azure services are used in this demo:
1. Azure Data Lake Storage (ADLS Gen2) - stores the files to be scanned for PII.
2. Azure Databricks - a one-stop shop that runs the program code, transforms data, and interacts with the other Azure services to deliver a complete end-to-end solution.
3. Azure SQL DB - a metadata store where Databricks writes job information such as start/end time and the name of the file being processed.
4. Azure Cosmos DB - stores the output of Azure Cognitive Services when PII is detected. When no PII is detected, the output is discarded.
5. Azure Cognitive Services Text Analytics API - its Named Entity Recognition (PII) endpoint is used to detect PII.
<br/>




**To run this notebook in your subscription, replace the placeholder values (text in `<>` and strings that begin with "Replace this with") with your own account keys and credentials.**

<h4> Install Python packages required for this notebook </h4>


Install the Python packages that this notebook requires.

**You only need to run this step once per cluster; rerun it after the cluster restarts.**

In [3]:
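# Install packages (notebook-scoped)
# Note - installation of packages can be automated, e.g. by attaching them as cluster libraries

dbutils.library.installPyPI("requests")
dbutils.library.installPyPI("simplejson")
dbutils.library.installPyPI("pyodbc")
# The Cosmos DB code below uses the azure-cosmos 3.x API (CosmosClient with a masterKey dict, CreateItem),
# which was removed in 4.x, so pin a 3.x release
dbutils.library.installPyPI("azure-cosmos", version="3.1.2")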

<h4> Install ODBC drivers for Microsoft SQL Server on Databricks cluster </h4>

We use the pyodbc package to connect to Azure SQL DB. This step installs the Microsoft ODBC Driver 17 for SQL Server, which pyodbc needs in order to connect.

**You only need to run this step once per cluster; rerun it after the cluster restarts.**

In [5]:
%sh

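# %sh runs on the driver node only, which is where pyodbc connects from in this notebook.
# Change "16.04" below to the Ubuntu release used by your Databricks Runtime.
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
curl https://packages.microsoft.com/config/ubuntu/16.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
apt-get update
# -y is required because a %sh cell cannot answer interactive prompts
ACCEPT_EULA=Y apt-get install -y msodbcsql17
apt-get -y install unixodbc-dev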

In [6]:
# Import modules required to run demo code in this notebook
import os 
import requests
import simplejson as json
import io
import datetime
import pyodbc
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.errors as errors
import azure.cosmos.http_constants as http_constants


In [7]:
# Helper functions that create a batch ID and a date/timestamp string, used to tag each workload

def dtStr():
    # Current local time as "YYYY-MM-DD HH:MM:SS", used for job log entries
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def batchStr():
    # Create a unique batch ID (timestamp down to microseconds) to identify the workload
    return datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")


In [8]:
# Batch ID - unique identifier used to tag each workload

batchId=batchStr()
print("Batch ID for this workload is "+batchId)

In [9]:
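# Set variables for the Text Analytics API
# Note: for production implementations, reference Azure Key Vault instead of storing passwords/keys in plain text inside this notebook
# Replace the endpoint with your own resource's endpoint. v3.0-preview.1 is a preview API version;
# check the Text Analytics API reference for the current version of the /entities/recognition/pii route.
apiEndpoint = "https://eastus.cognitiveservices.azure.com/text/analytics/v3.0-preview.1/entities/recognition/pii"
subKey = "<Azure Text Analytics API key>"
language = 'en'
# Document ID sent with each request; the API expects document IDs as strings
id = "1"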
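As the Key Vault comment above suggests, keys should not be embedded in the notebook. A minimal sketch, assuming you have created a Databricks secret scope (for example one backed by Azure Key Vault): the scope, key, and environment variable names are hypothetical, and the helper falls back to an environment variable when `dbutils` is not defined, such as when testing outside Databricks.

```python
import os


def get_secret(scope: str, key: str, env_var: str) -> str:
    """Return a secret from a Databricks secret scope, or from an environment
    variable when dbutils is unavailable (e.g. outside a Databricks notebook)."""
    try:
        # dbutils is only defined inside Databricks notebooks
        return dbutils.secrets.get(scope=scope, key=key)  # noqa: F821
    except NameError:
        value = os.environ.get(env_var)
        if value is None:
            raise KeyError(f"Secret {scope}/{key} not found and {env_var} is not set")
        return value


# Hypothetical names - replace with your own secret scope, key, and variable
# subKey = get_secret("pii-demo-kv", "text-analytics-key", "TEXT_ANALYTICS_KEY")
```

The same helper could supply `datalakeAccountKey`, `dbPass`, and the Cosmos DB `key` used in the cells below.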

In [10]:
# For this demo, we have pre-configured an ADLS instance and copied a few demo files there

dataLakeAccountDNS = "Replace this with your Storage Account name" + ".blob.core.windows.net"
datalakeAccountKey = "Replace this with your Storage Key"
dataLakeConf = "fs.azure.account.key." + dataLakeAccountDNS
dataLakeExtraConfig = {dataLakeConf:datalakeAccountKey}
dataLakeContainer = "Replace this with your container name"
rawContainer = "wasbs://"+dataLakeContainer+"@" + dataLakeAccountDNS

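# This is the mount point that Databricks uses to access files in ADLS
rawMountPoint = "/mnt/raw"
# dbutils.fs.mount fails if the mount point already exists, so only mount when it is not mounted yet
if not any(m.mountPoint == rawMountPoint for m in dbutils.fs.mounts()):
    dbutils.fs.mount(source = rawContainer, mount_point = rawMountPoint, extra_configs = dataLakeExtraConfig)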

In [11]:
# List the files on the ADLS mount
# These are the files that we will be processing

# Full path name = /dbfs (local file API prefix) + mount point + directory
inDir="/dbfs/mnt/raw/inputfiles"
for f in os.listdir(inDir):
    print(f)
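`inDir` carries the `/dbfs` prefix because Python's `os` and `open` reach DBFS through the local file API on the driver, while `dbutils.fs` (as with `rawMountPoint` above) takes DBFS paths such as `/mnt/raw` or `dbfs:/mnt/raw`. A small helper, hypothetical and not part of any Databricks library, can derive one form from the other so the two stay consistent:

```python
def to_local_path(dbfs_path: str) -> str:
    """Convert a DBFS path (e.g. "dbfs:/mnt/raw" or "/mnt/raw") to the local
    file API path used by Python's os/open on Databricks (e.g. "/dbfs/mnt/raw")."""
    # Drop the optional "dbfs:" scheme
    if dbfs_path.startswith("dbfs:"):
        dbfs_path = dbfs_path[len("dbfs:"):]
    # Make sure the path is absolute
    if not dbfs_path.startswith("/"):
        dbfs_path = "/" + dbfs_path
    # Leave paths that already use the /dbfs prefix unchanged
    if dbfs_path == "/dbfs" or dbfs_path.startswith("/dbfs/"):
        return dbfs_path
    return "/dbfs" + dbfs_path


# Example: derive inDir from the mount point instead of hard-coding it
# inDir = to_local_path(rawMountPoint + "/inputfiles")   # "/dbfs/mnt/raw/inputfiles"
```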


In [12]:
# Azure SQL DB details. We use this database to log job details.
# For production implementations, do not store the database password in clear text.
# Note - add the Databricks cluster IP address to the Azure SQL DB server firewall rules

dbServerName = "Replace this with your Azure SQL DB server name (<server>.database.windows.net)"
dbName = "Replace this with the database name"
dbUser = "Replace this with the database user"
dbPass = "Replace this with the password for the database user"
driver = 'ODBC Driver 17 for SQL Server'
# The SQL Server ODBC driver takes the port as "server,port" (Azure SQL DB listens on 1433),
# and Azure SQL DB requires an encrypted connection
cnxn = pyodbc.connect('DRIVER={' + driver + '};SERVER=tcp:' + dbServerName + ',1433;DATABASE=' + dbName +
                      ';UID=' + dbUser + ';PWD=' + dbPass + ';Encrypt=yes;TrustServerCertificate=no')
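The job-log statements in the processing loop insert four values (batch ID, file name, status, timestamp) into `dbo.joblogs`, but the table definition is not part of this notebook. The sketch below assumes a four-column table with hypothetical column names and shows a parameterized insert helper. It uses Python's built-in `sqlite3` purely as a stand-in so it runs anywhere; `sqlite3` and `pyodbc` both accept `?` placeholders, so the same `log_job` function should work with the `cnxn` connection above and the table name `dbo.joblogs`.

```python
import datetime
import sqlite3


def log_job(conn, table: str, batch_id: str, file_name: str, status: str) -> None:
    """Insert one job-log row using ? placeholders (supported by sqlite3 and pyodbc).

    The table name is trusted configuration, so it is formatted into the SQL;
    the values are passed as parameters, so a quote in a file name cannot break the statement.
    """
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    cursor = conn.cursor()
    cursor.execute(f"insert into {table} values (?, ?, ?, ?)",
                   (batch_id, file_name, status, timestamp))
    conn.commit()


# Stand-in demo with an in-memory SQLite database; column names are hypothetical.
demo = sqlite3.connect(":memory:")
demo.execute("create table joblogs (batchid text, filename text, status text, logtime text)")
log_job(demo, "joblogs", "20200101120000000000", "/dbfs/mnt/raw/inputfiles/o'brien.txt", "Started")
log_job(demo, "joblogs", "20200101120000000000", "/dbfs/mnt/raw/inputfiles/o'brien.txt", "Completed")
print(demo.execute("select status from joblogs order by rowid").fetchall())
```

With string concatenation, the apostrophe in `o'brien.txt` would end the SQL string literal early; with parameters it is stored as-is.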

In [13]:
# Azure Cosmos DB is used for storing output of Text Analytics where PII is detected.
# For this demo, we deployed an instance of Azure Cosmos DB separately. 
# Please do not store keys in clear text inside a notebook for production purposes. Please use Azure Key Vault for storing keys.

url="Replace this with Azure Cosmos DB URL"
key="Replace this with Azure Cosmos DB key"
database_id = "Replace this with Cosmos database name"
collection_id = "Replace this with Cosmos container"
database_link = 'dbs/' + database_id
collection_link = database_link + '/colls/' + collection_id
# CosmosClient(url, {'masterKey': key}) and CreateItem belong to the azure-cosmos 3.x API
client=cosmos_client.CosmosClient(url,{'masterKey': key})

<h4> This part of the notebook loops through all the files stored in the ADLS Gen2 directory that we mounted on the Databricks cluster.</h4>
For each file, Databricks writes entries to Azure SQL DB marking the start and completion of processing.
This step also calls the Text Analytics API and, if PII is detected, stores the API output in Cosmos DB.

**Note on whether or not to batch input datasets that have to be parsed by Text Analytics API**<br/>
You can either batch the data from all files into a single Text Analytics API call or call the API once per file. Batching reduces the number of calls but is subject to the service's limits on documents per request and document size.
For this demo, the code calls the API once per input file. <br/>
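If you choose to batch, one approach is to send several files as separate documents in one request, each with its own string ID, and to start a new request whenever a per-request document limit is reached. A minimal sketch: `build_batches` and its `max_docs` parameter are hypothetical, so set `max_docs` from the current Text Analytics limits for your API version and pricing tier. Individual document size limits are not enforced here.

```python
from typing import Dict, List, Tuple


def build_batches(files: List[Tuple[str, str]], max_docs: int,
                  language: str = "en") -> Tuple[List[dict], Dict[str, str]]:
    """Group (file_name, text) pairs into Text Analytics request bodies.

    Returns the request bodies and a map from document ID to file name,
    so each document in a response can be traced back to its input file.
    """
    if max_docs < 1:
        raise ValueError("max_docs must be at least 1")
    bodies: List[dict] = []
    id_to_file: Dict[str, str] = {}
    for start in range(0, len(files), max_docs):
        documents = []
        for offset, (file_name, text) in enumerate(files[start:start + max_docs]):
            doc_id = str(start + offset + 1)  # string IDs, unique across all batches
            id_to_file[doc_id] = file_name
            documents.append({"language": language, "id": doc_id, "text": text})
        bodies.append({"documents": documents})
    return bodies, id_to_file


# Usage sketch: post each body with requests.post(apiEndpoint, headers=headers, json=body),
# then map every returned document back to its file with id_to_file[doc["id"]].
```

Keeping the ID-to-file map is what lets the batched variant still tag each Cosmos DB entry and job log row with the right file name.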

In [15]:


inDir="/dbfs/mnt/raw/inputfiles"
headers = {"Ocp-Apim-Subscription-Key": subKey}
# Parameterized insert: values are passed separately, so quotes in file names cannot break the SQL
insertSql = "insert into dbo.joblogs values (?, ?, ?, ?)"

# Loop through the directory and send the content of each file to the Text Analytics API
# as a single document (one API call per file)
for file in os.listdir(inDir):
    fullFileName = inDir + "/" + file
    print("Processing file .. " + fullFileName)

    # Insert job status in Azure SQL DB when file processing starts
    cursor = cnxn.cursor()
    cursor.execute(insertSql, batchId, fullFileName, 'Started', dtStr())
    cnxn.commit()

    # Read the whole file into a string (the file is closed automatically)
    with open(fullFileName, "r") as readFile:
        strObj = readFile.read()

    # Create the JSON structure expected by the Text Analytics API (document IDs are strings)
    jsonDoc = {"documents": [{"language": language, "id": str(id), "text": strObj}]}

    # Call the API and stop on HTTP errors (invalid key, throttling, etc.)
    response = requests.post(apiEndpoint, headers=headers, json=jsonDoc)
    response.raise_for_status()
    entities = response.json()
    # print(json.dumps(entities, indent=4, sort_keys=True))

    # Documents the service could not process are reported under "errors" instead of "documents";
    # skip the file, leaving its 'Started' entry without a matching 'Completed' entry
    if not entities.get('documents'):
        print("Text Analytics returned errors: " + json.dumps(entities.get('errors')))
        continue

    # If PII is detected by the API, write the output to the Cosmos DB container; otherwise ignore it
    if len(entities['documents'][0]['entities']) != 0:
        print("potential pii present")
        print('Writing API results to Cosmos DB ... ')
        # Add batch ID and file name to the JSON document before it is written to Cosmos DB
        entities['batchid'] = batchId
        entities['input data'] = fullFileName
        # Write the document to the Cosmos DB collection using the SQL API
        client.CreateItem(collection_link, entities)

    # Insert job status into Azure SQL DB (same entry whether or not PII was found)
    cursor.execute(insertSql, batchId, fullFileName, 'Completed', dtStr())
    cnxn.commit()