# Data pre-processing for Azure Data Explorer

<img src="https://github.com/Azure/azure-kusto-spark/raw/master/kusto_spark.png" style="border: 1px solid #aaa; border-radius: 10px 10px 10px 10px; box-shadow: 5px 5px 5px #aaa"/>

We often see customer scenarios where historical data has to be migrated to Azure Data Explorer (ADX). Although ADX has very powerful data-transformation capabilities via [update policies](https://docs.microsoft.com/azure/data-explorer/kusto/management/updatepolicy), some data engineering tasks must be done upfront. This is the case when the original data structure is too complex, or when single data elements are too big and hit ADX limits such as 1 MB for dynamic columns or a maximum ingest file size of 1 GB for uncompressed data (see also [Comparing ingestion methods and tools](https://docs.microsoft.com/azure/data-explorer/ingest-data-overview#comparing-ingestion-methods-and-tools)).

Let's think about an Industrial Internet-of-Things (IIoT) use case where you get data from several production lines. In each production line, several devices read humidity, pressure, etc. The following example shows a scenario where a one-to-many relationship is implemented within an array. This can produce very large columns (with millions of device readings per production line) that exceed the 1 MB limit for dynamic columns in Azure Data Explorer.
In this case you need to do some pre-processing.
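
To get a feeling for when an array column becomes a problem, the following plain-Python sketch estimates the serialized size of a measurement array and compares it with the limit. The field names and the reading itself are hypothetical, and treating 1 MB as 1024 * 1024 bytes of compact JSON is an assumption made for illustration, not the exact accounting ADX uses.

```python
import json

# Assumption: the 1 MB limit for a dynamic value is taken as 1024 * 1024 bytes.
DYNAMIC_LIMIT_BYTES = 1024 * 1024


def serialized_size(value):
    """Return the size in bytes of a value serialized as compact UTF-8 JSON."""
    return len(json.dumps(value, separators=(",", ":")).encode("utf-8"))


def exceeds_dynamic_limit(value, limit=DYNAMIC_LIMIT_BYTES):
    """True if the serialized value is larger than the assumed limit."""
    return serialized_size(value) > limit


# Hypothetical device reading; the real schema may differ.
reading = {"deviceId": "device-0001", "humidity": 41.5, "pressure": 1013.2}

small = [reading] * 10        # a handful of readings per production line
large = [reading] * 100_000   # many readings packed into one array

print(exceeds_dynamic_limit(small))
print(exceeds_dynamic_limit(large))
```

A check like this on a sample of the source data can indicate whether the array needs to be split into rows before ingestion.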


The data has already been uploaded to Azure Storage. You will start by reading the JSON data into a dataframe:

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *


# Azure storage access info, in this case demo data
blob_account_name = 'kustosamplefiles'      # replace with your blob name
blob_container_name = 'synapsework'         # replace with your container name
blob_relative_path =  ''                    # replace with your relative folder path

In [None]:
# Allow Spark to access the blob storage remotely
inputpath = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),'')
print('Remote blob path: ' + inputpath)

In [None]:
# Primary storage info, here we also write to the Azure Data Lake Store
account_name = 'your-accountname'        # fill in your primary account name
container_name = 'your-container-name'   # fill in your container name
relative_path = 'your-path'              # fill in your relative folder path

outputpath = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path)
print('Target storage account path: ' + outputpath)

The notebook has a parameter, IngestDate, which is used to set the extentsCreationTime. Alternatively, you can make use of a partitioning policy.

In [None]:
IngestDate = "2021-08-06T00:00:00.000Z"
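
Because the parameter is passed in as a string, it can help to validate it before it is used further down. The sketch below is plain Python and assumes the parameter always follows the `YYYY-MM-DDTHH:MM:SS.fffZ` pattern shown above; the helper name `parse_ingest_date` is hypothetical.

```python
from datetime import datetime, timezone


def parse_ingest_date(value):
    """Parse a UTC timestamp such as 2021-08-06T00:00:00.000Z.

    Raises ValueError if the string does not match the assumed pattern.
    """
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    # The trailing Z denotes UTC, so attach the UTC time zone explicitly.
    return parsed.replace(tzinfo=timezone.utc)


ingest_date = parse_ingest_date("2021-08-06T00:00:00.000Z")
print(ingest_date.isoformat())
```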

In [None]:
# read the json file

df = spark.read.format("json").load(inputpath)

The dataframe has some complex datatypes. Examining the datatypes shows the measurement column, which is an array of structs holding the measurement data per deviceId:

In [None]:
df.dtypes

Displaying the dataframe will show you the data:

In [None]:
display(df)

We see that the dataframe has some complex datatypes. The only thing we want to change here is to get rid of the array, so that the resulting dataset has one row for every entry in the measurement array.

*How can we achieve this?*

PySpark SQL has some very powerful functions for transforming complex datatypes. We will make use of the [explode function](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.explode.html). In this case, explode("measurement") gives us a resulting dataframe with one row per array element. Finally, we only have to drop the original measurement column (it holds the original structure):

In [None]:
from pyspark.sql.functions import *

df_explode = df.select("*", explode("measurement").alias("device")).drop("measurement")
df_explode.dtypes
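
To make the effect of explode more tangible, here is a plain-Python sketch of the same idea on a list of dictionaries. It is an illustration only, not how Spark implements it; the sample rows and the helper `explode_rows` are hypothetical. Like Spark's explode, it produces no output row for a null or empty array.

```python
def explode_rows(rows, array_field, alias):
    """Return one output row per element of each row's array_field.

    The array field itself is dropped and the element is stored under alias.
    Rows whose array is missing, None or empty produce no output rows.
    """
    exploded = []
    for row in rows:
        for element in row.get(array_field) or []:
            new_row = {k: v for k, v in row.items() if k != array_field}
            new_row[alias] = element
            exploded.append(new_row)
    return exploded


# Hypothetical input: one row per production line, readings nested in an array.
rows = [
    {"ProductionLineId": "line-1",
     "measurement": [{"deviceId": "d1", "humidity": 40.1},
                     {"deviceId": "d2", "humidity": 42.7}]},
    {"ProductionLineId": "line-2",
     "measurement": [{"deviceId": "d3", "humidity": 39.5}]},
    {"ProductionLineId": "line-3", "measurement": []},
]

for row in explode_rows(rows, "measurement", "device"):
    print(row)
```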

With this, we have already done the necessary data transformation with one line of code. Let's do some final "prettifying".
As we are already pre-processing the data and want to get rid of the complex data types, we select the struct elements to get a simplified table:

In [None]:
display(df_explode)

In a real-world scenario, the filtering should happen on a partitioned column. Here we filter only to demonstrate explicitly setting the extentsCreationTime configuration.

In [None]:
df_all_in_column = df_explode.select("header.*", "device.header.*", "device.*", "ProdLineData.*").drop("header")
df_all_in_column = df_all_in_column.filter(df_all_in_column.enqueuedTime[0:10]==IngestDate[0:10])
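
The filter compares only the first ten characters of both values, that is the `YYYY-MM-DD` part. The plain-Python sketch below shows the same comparison on hypothetical rows. It assumes that enqueuedTime is an ISO 8601 string in the same time zone as IngestDate; a pure prefix comparison does not account for time zone offsets.

```python
# Notebook parameter, as defined earlier in the notebook.
IngestDate = "2021-08-06T00:00:00.000Z"


def same_ingest_day(enqueued_time, ingest_date=IngestDate):
    """Compare only the leading YYYY-MM-DD part of two ISO 8601 strings."""
    return enqueued_time[:10] == ingest_date[:10]


# Hypothetical rows; the real data has more columns.
rows = [
    {"deviceId": "d1", "enqueuedTime": "2021-08-06T10:15:00.000Z"},
    {"deviceId": "d2", "enqueuedTime": "2021-08-07T00:00:01.000Z"},
]

kept = [r for r in rows if same_ingest_day(r["enqueuedTime"])]
print([r["deviceId"] for r in kept])
```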

In [None]:
display(df_all_in_column)

We are setting the extentsCreationTime to the notebook-parameter *IngestDate*. For other ingestion properties see [here](https://github.com/Azure/azure-kusto-spark/blob/master/samples/src/main/python/pyKusto.py).

In [None]:
extentsCreationTime = sc._jvm.org.joda.time.DateTime.parse(IngestDate)
sp = sc._jvm.com.microsoft.kusto.spark.datasink.SparkIngestionProperties(
        False, None, None, None, None, extentsCreationTime, None, None)

Finally, we write the resulting dataframe back to Azure Data Explorer. The prerequisites for doing this in Synapse Analytics are:
* a linked service has been created (detailed setup steps are in the [documentation](https://docs.microsoft.com/azure/synapse-analytics/quickstart-connect-azure-data-explorer))
* the target table has been created in the target database (`.create table measurement (ProductionLineId : string, deviceId:string, enqueuedTime:datetime, humidity:real, humidity_unit:string, temperature:real, temperature_unit:string,  pressure:real, pressure_unit:string, reading : dynamic)`)
* the credential accessing ADX has sufficient permissions (add the ingestor and viewer roles)

In [None]:
df_all_in_column.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<your linked service to ADX>") \
    .option("kustoDatabase", "<your ADX database>") \
    .option("kustoTable", "<your ADX table>") \
    .option("sparkIngestionPropertiesJson", sp.toString()) \
    .mode("Append") \
    .save()

You might also consider writing the data to Azure Storage (this might also make sense as an intermediate staging step for more complex transformation pipelines):

In [None]:
df_all_in_column.write.mode('overwrite').json(outputpath) 