## ArcGIS Pro, Jupyter Notebook and Databricks

This notebook demonstrates authoring a Spark job in a local Jupyter notebook in ArcGIS Pro, while the job itself executes remotely on a predefined [Databricks](https://databricks.com/) cluster.

In this example, we started a Spark cluster on [Microsoft Azure using Databricks](https://databricks.com/product/azure).  It is a small cluster consisting of:

- 1 driver machine with 14 GB of memory and 4 cores
- 2-8 worker machines, each with 14 GB of memory and 4 cores

![](media/Cluster.png)

To connect the local notebook to the cluster, we use the [Databricks Connect API](https://docs.databricks.com/dev-tools/databricks-connect.html).

### Setting Up an ArcGIS Pro Conda Environment

The following steps, run from the ArcGIS Pro Python Command Prompt, create an ArcGIS Pro conda environment that lets the local notebook execute remotely on a cluster running Databricks Runtime version 6.6.

```
proswap arcgispro-py3
conda remove --yes --all --name dbconnect
conda create --yes --name dbconnect --clone arcgispro-py3
activate dbconnect
proswap dbconnect
conda install --yes -c conda-forge untangle
pip install -U databricks-connect==6.6 databricks-cli pyarrow
git clone https://github.com/mraad/spark-esri.git
cd spark-esri
python setup.py install
```

Next, get a [personal access token](https://docs.databricks.com/dev-tools/api/latest/authentication.html) and save it somewhere safe.

And finally, [configure](https://docs.databricks.com/dev-tools/databricks-connect.html#set-up-client) the Databricks connection and test it.

```
databricks-connect configure
powershell
$env:DATABRICKS_TOKEN="xxx-xxx-xxx-xxx"
$env:SPARK_HOME="$env:LOCALAPPDATA\esri\conda\envs\dbconnect\lib\site-packages\pyspark".ToLower()

databricks-connect test
```

Note that the two environment variables above are defined inline only to perform the test.  It is **HIGHLY** advisable to define them in the System Properties, and to relaunch Pro afterwards so that they take effect.
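
The walkthrough relies on `DATABRICKS_TOKEN` and `SPARK_HOME` being visible to the Python process that runs the notebook. A quick check at the top of a notebook can confirm that before any connection is attempted. The following is a minimal sketch; the helper name `missing_env` is hypothetical and is not part of `spark_dbconnect` or Databricks Connect.

```python
import os

# Variable names taken from the configuration steps in this section.
REQUIRED = ("DATABRICKS_TOKEN", "SPARK_HOME")


def missing_env(names=REQUIRED, environ=None):
    """Return the names that are unset or empty in the given environment mapping."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


# Report the state of the current process environment.
missing = missing_env()
if missing:
    print("Not defined:", ", ".join(missing))
else:
    print("All required environment variables are defined.")
```

If a name is reported as not defined after it was added to the System Properties, the most likely cause is that Pro was not relaunched.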

### Import required modules.

In [None]:
import os

import arcpy
from spark_dbconnect import spark_start, spark_stop

### Configure and start a remote Spark instance.

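Note that the personal access token is retrieved from the `DATABRICKS_TOKEN` environment variable.  The other values can be derived from the cluster URL: the address is the scheme and host, the organization id is the value of the `o` query parameter, and the cluster id is the path segment that follows `clusters`.  For example: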

```
https://adb-2740165739887726.6.azuredatabricks.net/?o=2740165739887726#/setting/clusters/0802-221206-sols809/configuration
```
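
The derivation can also be done programmatically. The following is a minimal sketch using only the Python standard library; the function name `parse_cluster_url` is hypothetical, and it assumes the URL has the same shape as the example above (an `o` query parameter and a `clusters/<id>` segment in the fragment).

```python
from urllib.parse import parse_qs, urlsplit


def parse_cluster_url(url):
    """Split a cluster URL into (address, org_id, cluster_id)."""
    parts = urlsplit(url)
    # Workspace address: scheme plus host, without path, query, or fragment.
    address = f"{parts.scheme}://{parts.netloc}"
    # Organization id: the value of the "o" query parameter.
    org_id = parse_qs(parts.query)["o"][0]
    # Cluster id: the fragment segment that follows "clusters".
    segments = parts.fragment.strip("/").split("/")
    cluster_id = segments[segments.index("clusters") + 1]
    return address, org_id, cluster_id


url = (
    "https://adb-2740165739887726.6.azuredatabricks.net/"
    "?o=2740165739887726#/setting/clusters/0802-221206-sols809/configuration"
)
address, org_id, cluster_id = parse_cluster_url(url)
print(address, org_id, cluster_id)
```

The three values map to `spark.databricks.service.address`, `spark.databricks.service.orgId`, and `spark.databricks.service.clusterId` respectively.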

In [None]:
config = {
    "spark.databricks.service.server.enabled": True,
    "spark.databricks.service.address": "https://adb-2740165739887726.6.azuredatabricks.net",
    "spark.databricks.service.token": os.environ["DATABRICKS_TOKEN"],
    "spark.databricks.service.clusterId": "0802-221206-sols809",
    "spark.databricks.service.orgId": "2740165739887726",
    "spark.databricks.service.port": "15001"
}
spark = spark_start(config=config)

To improve speed and reduce bandwidth usage, it is best to colocate the data storage and the execution engine.

Databricks provides a [distributed file system](https://docs.databricks.com/data/databricks-file-system.html) that is accessible from any running cluster.

Using the [ExportToFile toolbox](tools/ParquetToolkit.pyt), we exported the Broadcast point features in the [Miami AIS sample geodatabase](ftp://ftp.coast.noaa.gov/pub/MSP/AIS/AIS.SampleData.zip) to a local [parquet](https://parquet.apache.org/) file.

![](media/ExportToFile.png)

We configured the [Databricks CLI](https://docs.databricks.com/dev-tools/cli/index.html#set-up-the-cli):

```
dbfs configure --token
```

And then, we uploaded the local parquet file to DBFS.

```
dbfs cp BroadcastXY.parq dbfs:/FileStore/tables
```

### Create a Spark dataframe from a parquet file.

In addition to storing the data in a columnar format, Parquet files also hold the column types, which enables the dataframe schema to be generated automatically.

In [None]:
df = spark.read.parquet("dbfs:/FileStore/tables/BroadcastXY.parq")
df.createOrReplaceTempView("v0")

### Display the dataframe schema.

In [None]:
df.printSchema()

### Perform spatial binning at 100x100 meters.

Note that because the field names contain an @ sign, they are escaped with backticks in the inner select statement.

In [None]:
cell0 = 100.0  # cell size in meters
cell1 = cell0 * 0.5  # half a cell, added to shift from the cell corner to its center

rows = spark.sql(f"""
select q*{cell0}+{cell1} x,r*{cell0}+{cell1} y,least(count(1),1000) as pop
from
(select cast(`Shape@X`/{cell0} as long) q,cast(`Shape@Y`/{cell0} as long) r from v0)
group by q,r
""")\
    .collect()
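
The arithmetic in the query can be checked locally on a handful of points without a cluster. The following is a minimal pure-Python sketch of the same steps; the function name `bin_points` and its `cap` parameter are hypothetical, and the sample coordinates are made up for illustration.

```python
from collections import Counter


def bin_points(points, cell=100.0, cap=1000):
    """Mimic the query: count points per (q, r) cell and cap each count at `cap`."""
    half = cell * 0.5
    # int() truncates toward zero, as a double-to-long cast does in Spark SQL.
    counts = Counter((int(x / cell), int(y / cell)) for x, y in points)
    # Convert each cell index back to coordinates, offset by half a cell.
    return [(q * cell + half, r * cell + half, min(n, cap))
            for (q, r), n in counts.items()]


# Made-up sample: two points share a cell, the third is alone.
sample = [(120.0, 250.0), (180.0, 299.0), (310.0, 20.0)]
for x, y, pop in sorted(bin_points(sample)):
    print(x, y, pop)
```

Because the cast truncates toward zero rather than flooring, negative coordinates are assigned to the cell index nearer to zero, so the returned x or y for such points sits one cell closer to zero than a floor-based binning would produce.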

### Create an ephemeral feature class in the TOC from the collected bins.

In [None]:
ws = "memory"
nm = "Bins"

fc = os.path.join(ws,nm)

arcpy.management.Delete(fc)

sp_ref = arcpy.SpatialReference(3857)
arcpy.management.CreateFeatureclass(ws,nm,"POINT",spatial_reference=sp_ref)
arcpy.management.AddField(fc, "POP", "LONG")

with arcpy.da.InsertCursor(fc, ["SHAPE@X","SHAPE@Y", "POP"]) as cursor:
    for row in rows:
        cursor.insertRow(row)

### Stop the Spark instance.

In [None]:
spark_stop()