## Quickstart to run the UDFs with local pyspark

This example runs the Arrow Flight UDFs in a local PySpark environment.

In [57]:
! pip install docker pyspark numpy pandas scikit-learn==1.0.2

Defaulting to user installation because normal site-packages is not writeable


### First build the Docker image (from repo root dir)

There is a build script in the root directory that builds the images. You may need to adapt it to your needs.

To deploy on your Spark cluster, point the registry and tag (`MYREG:TAG`) at a container registry that the Spark nodes can access.

In [59]:
! MYREG="myregname" ;TAG="tag";  ./build.sh arrowflight rf10.skl $MYREG:$TAG arrowflight/Dockerfile


Using Dockerfile:  arrowflight/Dockerfile
arrowflight
[+] Building 0.9s (14/15)
 => [internal] load build definition from Dockerfile                       0.0s
 => => transferring dockerfile: 38B                                        0.0s
 => [internal] load .dockerignore                                          0.0s
 => => transferring context: 2B                                            0.0s
 => [internal] load metadata for docker.io/library/python:3.8-slim-buster  0.0s
 => [build 1/5] FROM docker.io/library/python:3.8-slim-buster              0.0s
 => [internal] load build context                                          0.0s
 => => transferring context: 382.48kB                                      0.0s
 => CACHED [build 2/5] RUN python3 -m venv /venv                           0.0s
 => CACHED [build

#### Start the container locally

This starts a single container on your current machine.

To deploy on your Spark cluster, run this command on every node. See the `prespind.sh` script in this folder (for example, `./prespind.sh arrowflight/prespin_arrow`).

In [60]:
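import docker

# Connect to the local Docker daemon using the environment's configuration
client = docker.from_env()
STARTPORT = 8815
# Run the image built above in the background, mapping container port 8815/tcp to the same host port
c = client.containers.run('myregname:tag', remove=True, detach=True, ports={f'{STARTPORT}/tcp': STARTPORT})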
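
The Arrow Flight UDF later in this notebook picks a port as `8815 + (partition_id % 3)`, which suggests up to three containers on consecutive host ports. Below is a minimal sketch of starting several containers that way. It assumes each container listens on 8815 internally, as in the single-container call above; `host_ports` and `start_containers` are hypothetical helper names, not part of the repository.

```python
def host_ports(start_port: int, count: int) -> list:
    """Return `count` consecutive host ports beginning at `start_port`."""
    return [start_port + i for i in range(count)]


def start_containers(image: str, start_port: int = 8815, count: int = 3,
                     container_port: int = 8815) -> list:
    """Start `count` detached containers, one per consecutive host port.

    Assumption: the server inside the image listens on `container_port`.
    """
    import docker  # imported lazily so host_ports() works without the SDK

    client = docker.from_env()
    containers = []
    for port in host_ports(start_port, count):
        containers.append(
            client.containers.run(
                image,
                remove=True,   # delete the container once it stops
                detach=True,   # return immediately with a Container handle
                ports={f"{container_port}/tcp": port},
            )
        )
    return containers


print(host_ports(8815, 3))  # [8815, 8816, 8817]
```

Calling `start_containers('myregname:tag')` would then return three handles that can each be stopped with `.stop()`.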

### Get the data

The main dataset is provided at https://cagevldbpub.blob.core.windows.net/data. It contains folders of Snappy-compressed Parquet files for the alphabet table at 10k, 100k, 1m, 10m, and 100m rows, which we used in all our experiments. **It is public, but you cannot access the link directly in a web browser.** The easiest way to get the full blob is with azcopy: `./azcopy copy --recursive https://cagevldbpub.blob.core.windows.net/data .`

(azcopy is available here: https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-v10)


#### Here, we'll show a quick example with the 10k-row alphabet10k table

In [61]:
! wget https://cagevldbpub.blob.core.windows.net/data/tablesraw/alphabet10k/part-00000-c6cb6e59-f9e4-4013-a243-2ab0b717e29f-c000.snappy.parquet

--2022-10-13 17:35:23--  https://cagevldbpub.blob.core.windows.net/data/tablesraw/alphabet10k/part-00000-c6cb6e59-f9e4-4013-a243-2ab0b717e29f-c000.snappy.parquet
Resolving cagevldbpub.blob.core.windows.net (cagevldbpub.blob.core.windows.net)... 20.60.153.129
Connecting to cagevldbpub.blob.core.windows.net (cagevldbpub.blob.core.windows.net)|20.60.153.129|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 306207 (299K) [application/octet-stream]
Saving to: ‘part-00000-c6cb6e59-f9e4-4013-a243-2ab0b717e29f-c000.snappy.parquet.1’


2022-10-13 17:35:23 (101 MB/s) - ‘part-00000-c6cb6e59-f9e4-4013-a243-2ab0b717e29f-c000.snappy.parquet.1’ saved [306207/306207]
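
The `wget` log above saved the file with a `.1` suffix because a copy already existed, which is easy to miss when re-running the notebook. Below is a minimal Python sketch that skips the download when the file is already present. `download_if_missing` is a hypothetical helper; the URL is the one used above.

```python
import os
import urllib.request

URL = ("https://cagevldbpub.blob.core.windows.net/data/tablesraw/alphabet10k/"
       "part-00000-c6cb6e59-f9e4-4013-a243-2ab0b717e29f-c000.snappy.parquet")


def download_if_missing(url: str, dest_dir: str = ".") -> str:
    """Download `url` into `dest_dir` unless the file is already there."""
    filename = url.rsplit("/", 1)[-1]
    path = os.path.join(dest_dir, filename)
    if not os.path.exists(path):
        urllib.request.urlretrieve(url, path)  # network access happens here
    return path
```

The returned path can be passed straight to `spark.read.parquet(...)`, for example `spark.read.parquet(download_if_missing(URL))`.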



### Start local pyspark

In [62]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

##### Load the alphabet dataset into Spark

In [63]:
parquetFile = spark.read.parquet("part-00000-c6cb6e59-f9e4-4013-a243-2ab0b717e29f-c000.snappy.parquet")

In [64]:
parquetFile.createOrReplaceTempView("alphabet10k")

In [65]:
spark.sql("show tables").show()

+--------+-----------+-----------+
|database|  tableName|isTemporary|
+--------+-----------+-----------+
|        |alphabet10k|       true|
+--------+-----------+-----------+



### Arrow Flight runs from Figure 4

In [66]:
from pyspark.sql.functions import pandas_udf, col, expr
from typing import Iterator
from pyspark.sql import SQLContext
sql_context = SQLContext(spark)
import time

###### No container  (fig4/run_nocontainer.py:51)

In [69]:
# UDF definition
@pandas_udf("long")
def udf_skl_predict(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:

    path = "models/skl/"
    # This is code "provided by the user"
    print()
    def run(model, data):
        import pickle as pkl

        # Load the pickled scikit-learn model, closing the file afterwards
        with open(model, 'rb') as f:
            loaded_model = pkl.load(f)
        print(loaded_model)
        return loaded_model.predict(data)

    # Here is the actual UDF iterator; with several input columns,
    # each batch arrives as a tuple of pd.Series (one per column)
    for args in iterator:
        # Reassemble the columns into one feature matrix
        data_unmangled = pd.concat([feature for feature in args], axis=1)
        predictions = run(path+"rf10.skl", data_unmangled.values)
        yield pd.Series(np.array(predictions))
sql_context.udf.register("PREDICT", udf_skl_predict)

22/10/13 17:35:46 WARN SimpleFunctionRegistry: The function predict replaced a previously registered function.


<function __main__.udf_skl_predict(iterator: Iterator[pandas.core.series.Series]) -> Iterator[pandas.core.series.Series]>
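
The UDF above unpickles the model for every batch. One possible refinement, shown here only as a sketch and not as the approach used in `fig4/run_nocontainer.py`, is to cache the loaded object per worker process with `functools.lru_cache`. The helper name `load_model` is hypothetical, and a plain dictionary stands in for the pickled model so the snippet runs without scikit-learn.

```python
import functools
import os
import pickle
import tempfile


@functools.lru_cache(maxsize=None)
def load_model(path: str):
    """Unpickle the file at `path` once; later calls reuse the cached object."""
    with open(path, "rb") as f:
        return pickle.load(f)


# Stand-in for a model file such as models/skl/rf10.skl (assumption: any
# picklable object behaves the same way as far as caching is concerned).
tmp_path = os.path.join(tempfile.mkdtemp(), "model.pkl")
with open(tmp_path, "wb") as f:
    pickle.dump({"name": "stand-in model"}, f)

first = load_model(tmp_path)
second = load_model(tmp_path)
print(first is second)  # True: the second call did not unpickle again
```

Inside the UDF, `run` could call `load_model(model)` instead of opening the file itself; whether this helps depends on how many batches each worker processes.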

###### Now time the no-container UDF

In [70]:
%%timeit -n 5 -r 1
start = time.time()
query = sql_context.sql("SELECT PREDICT(A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,1,2) as prediction FROM alphabet10k")

query.write.parquet(str(time.time())+'sqlout.txt')
elapse = time.time()-start
print(elapse)


https://scikit-learn.org/stable/modules/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/modules/model_persistence.html#security-maintainability-limitations
RandomForestClassifier(max_depth=10, n_estimators=10)
                                                                                

0.8194265365600586



https://scikit-learn.org/stable/modules/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/modules/model_persistence.html#security-maintainability-limitations
RandomForestClassifier(max_depth=10, n_estimators=10)


0.2438809871673584



https://scikit-learn.org/stable/modules/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/modules/model_persistence.html#security-maintainability-limitations
RandomForestClassifier(max_depth=10, n_estimators=10)


0.23055672645568848



https://scikit-learn.org/stable/modules/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/modules/model_persistence.html#security-maintainability-limitations
RandomForestClassifier(max_depth=10, n_estimators=10)


0.2583658695220947
0.23887157440185547
358 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 5 loops each)



https://scikit-learn.org/stable/modules/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/modules/model_persistence.html#security-maintainability-limitations
RandomForestClassifier(max_depth=10, n_estimators=10)


### Notice the warnings about "Trying to unpickle estimator DecisionTreeClassifier from version 0.23.1 when using version 1.0.2."
This is one of the ways containers can help: when prestored models depend on older library versions, a container can provide them.
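
When such a mismatch should stop the run instead of scrolling past in the logs, warnings raised while loading can be promoted to exceptions. Below is a minimal sketch using the standard `warnings` module. `load_strict` and `fake_loader` are hypothetical names, and `fake_loader` only imitates the `UserWarning` that scikit-learn 1.0.2 emits on a version mismatch.

```python
import warnings


def load_strict(loader, *args):
    """Call `loader(*args)`, turning any UserWarning it emits into an exception."""
    with warnings.catch_warnings():
        warnings.simplefilter("error", UserWarning)
        return loader(*args)


def fake_loader(path):
    # Imitates the message quoted above; a real loader would call pickle.load.
    warnings.warn(
        "Trying to unpickle estimator DecisionTreeClassifier from version "
        "0.23.1 when using version 1.0.2.",
        UserWarning,
    )
    return "model"


try:
    load_strict(fake_loader, "models/skl/rf10.skl")
    mismatch_detected = False
except UserWarning as exc:
    mismatch_detected = True
    print("refusing to use model:", exc)
```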

###### Arrow Flight (fig4/run_arrow.py:45)


In [71]:
# UDF definition
@pandas_udf("long")
def udf_skl_predict(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:

    import pyarrow as pa
    import pyarrow.flight as fl

    from pyspark import TaskContext
    # Use the partition id to pick one of up to three containers on ports 8815-8817
    # (only port 8815 is served by the single local container started above)
    ctx = TaskContext.get()
    partid = ctx.partitionId()
    port = 8815 + (partid % 3)

    path = 'scoreit'
    client = fl.connect("grpc://127.0.0.1:" + str(port))

    # Here is the actual UDF iterator
    for args in iterator:
        data_unmangled = pd.concat([feature for feature in args], axis=1)

        table = pa.Table.from_pandas(data_unmangled)

        # Write the data to an array on the server
        writer, _ = client.do_put(fl.FlightDescriptor.for_path(path), table.schema)
        writer.write_table(table, table.num_rows)
        writer.close()

        # Run the action that makes the server SCORE the array
        response = client.do_action(fl.Action('score', pa.allocate_buffer(0)))
        for _ in response:  # the result iterator must be consumed; this appears to be what triggers the action
            pass

        # Fetch the predictions and return them as a single pd.Series
        response = client.do_get(fl.Ticket(b'scored')).read_pandas()
        yield response.squeeze()
sql_context.udf.register("PREDICT_PYARROW", udf_skl_predict)

22/10/13 17:36:03 WARN SimpleFunctionRegistry: The function predict_pyarrow replaced a previously registered function.


<function __main__.udf_skl_predict(iterator: Iterator[pandas.core.series.Series]) -> Iterator[pandas.core.series.Series]>
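
The port arithmetic in the UDF spreads Spark partitions over containers round-robin. Below is a small sketch of that mapping, with `port_for_partition` as a hypothetical helper name. Note that with only the single local container started earlier, every partition has to reach port 8815, which corresponds to `n_containers=1`.

```python
def port_for_partition(partition_id: int, start_port: int = 8815,
                       n_containers: int = 3) -> int:
    """Map a Spark partition id to one of `n_containers` consecutive ports."""
    return start_port + (partition_id % n_containers)


# Partitions 0..5 cycle through the three ports used in the UDF above.
print([port_for_partition(p) for p in range(6)])
# [8815, 8816, 8817, 8815, 8816, 8817]

# With a single local container, every partition uses the same port.
print({port_for_partition(p, n_containers=1) for p in range(6)})
# {8815}
```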

###### Now time local Arrow Flight (PREDICT_PYARROW)

In [72]:
%%timeit -n 5 -r 1
start = time.time()
query = sql_context.sql("SELECT PREDICT_PYARROW(A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,1,2) as prediction FROM alphabet10k")

query.write.parquet(str(time.time())+'sqlout.txt')
elapse = time.time()-start
print(elapse)

0.28724074363708496
0.29674243927001953
0.2611050605773926
0.27724218368530273
0.28267979621887207
281 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 5 loops each)
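
The per-loop times printed by the two timing cells can be summarized directly. The sketch below copies the printed values (rounded to four decimals) and reports the mean with and without the first loop. The first no-container loop is noticeably slower than the rest, plausibly because of one-time warm-up costs; that explanation is an assumption, not something measured here.

```python
from statistics import mean

# Seconds per loop, copied from the outputs above and rounded.
no_container = [0.8194, 0.2439, 0.2306, 0.2584, 0.2389]
arrow_flight = [0.2872, 0.2967, 0.2611, 0.2772, 0.2827]

for name, times in [("no container", no_container), ("Arrow Flight", arrow_flight)]:
    overall_ms = mean(times) * 1000
    steady_ms = mean(times[1:]) * 1000  # drop the first loop
    print(f"{name}: mean {overall_ms:.0f} ms, mean without first loop {steady_ms:.0f} ms")
```

The overall means agree with the 358 ms and 281 ms that `%%timeit` reported. Five loops on a 10k-row table are a small sample, so the comparison is only indicative.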


### This is the basic setup for loading and running the data.
This small example uses only 10k rows; the rest of the data is available at `https://cagevldbpub.blob.core.windows.net/data` (the tables are alphabet10k, alphabet (which has 100k rows despite its name), alphabet1m, alphabet10m, and alphabet100m).

To run this on your Spark cluster, deploy the containers on all the remote nodes, and use the UDFs stored in the scripts for the figures.

#### Tear down the container

In [25]:

c.stop()
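
If a cell fails midway, the container keeps running until `c.stop()` is called. Below is a minimal sketch of wrapping the container in a context manager so that it is always stopped. `running` is a hypothetical helper, and `FakeContainer` only stands in for the object returned by `client.containers.run(...)`.

```python
from contextlib import contextmanager


@contextmanager
def running(container):
    """Yield `container` and call its stop() on exit, even after an error."""
    try:
        yield container
    finally:
        container.stop()


class FakeContainer:
    """Stand-in with the same stop() method as a docker-py Container."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


fake = FakeContainer()
try:
    with running(fake):
        raise RuntimeError("simulated failure while scoring")
except RuntimeError:
    pass
print(fake.stopped)  # True
```

Because the container in this notebook was started with `remove=True`, stopping it also removes it.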