In [5]:
from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(name="daskmlcluster",
                      image='ghcr.io/dask/dask:2023.7.0-py3.10',
                      n_workers=2,
                      resources={"requests": {"memory": "0.5Gi"}, "limits": {"memory": "1.5Gi"}},
                      env={
                       'EXTRA_PIP_PACKAGES':"dask-sql==2023.8.0 dask-ml==2023.3.24 scikit-learn==1.3.0"
                           }
                     )
cluster

Output()

0,1
Dashboard: http://localhost:52460/status,Workers: 2
Total threads: 24,Total memory: 3.00 GiB

0,1
Comm: tcp://10.244.0.18:8786,Workers: 2
Dashboard: http://10.244.0.18:8787/status,Total threads: 24
Started: 1 hour ago,Total memory: 3.00 GiB

0,1
Comm: tcp://10.244.0.19:44399,Total threads: 12
Dashboard: http://10.244.0.19:8788/status,Memory: 1.50 GiB
Nanny: tcp://10.244.0.19:45881,
Local directory: /tmp/dask-scratch-space/worker-5dev4ko0,Local directory: /tmp/dask-scratch-space/worker-5dev4ko0

0,1
Comm: tcp://10.244.0.20:39599,Total threads: 12
Dashboard: http://10.244.0.20:8788/status,Memory: 1.50 GiB
Nanny: tcp://10.244.0.20:39815,
Local directory: /tmp/dask-scratch-space/worker-at3xikij,Local directory: /tmp/dask-scratch-space/worker-at3xikij


In [4]:
from dask.distributed import Client
# Connect Dask to the cluster
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_kubernetes.KubeCluster
Dashboard: http://localhost:64687/status,

0,1
Dashboard: http://localhost:64687/status,Workers: 2
Total threads: 24,Total memory: 3.00 GiB

0,1
Comm: tcp://10.244.0.18:8786,Workers: 2
Dashboard: http://10.244.0.18:8787/status,Total threads: 24
Started: 31 minutes ago,Total memory: 3.00 GiB

0,1
Comm: tcp://10.244.0.19:44399,Total threads: 12
Dashboard: http://10.244.0.19:8788/status,Memory: 1.50 GiB
Nanny: tcp://10.244.0.19:45881,
Local directory: /tmp/dask-scratch-space/worker-5dev4ko0,Local directory: /tmp/dask-scratch-space/worker-5dev4ko0

0,1
Comm: tcp://10.244.0.20:39599,Total threads: 12
Dashboard: http://10.244.0.20:8788/status,Memory: 1.50 GiB
Nanny: tcp://10.244.0.20:39815,
Local directory: /tmp/dask-scratch-space/worker-at3xikij,Local directory: /tmp/dask-scratch-space/worker-at3xikij


In [31]:
from dask_sql import Context
c = Context()  # Python equivalent to a SQL database

In [32]:
c.sql("""CREATE OR REPLACE TABLE iris WITH (
    location = 'https://datahub.io/machine-learning/iris/r/iris.csv',
    persist = True)"""
)



In [33]:
c.sql("""SHOW TABLES FROM root""").compute()

Unnamed: 0,Table
0,iris


In [34]:
c.sql("""SELECT * From iris""").compute()

Unnamed: 0,sepallength,sepalwidth,petallength,petalwidth,class
0,5.1,3.5,1.4,0.2,Iris-setosa
1,4.9,3.0,1.4,0.2,Iris-setosa
2,4.7,3.2,1.3,0.2,Iris-setosa
3,4.6,3.1,1.5,0.2,Iris-setosa
4,5.0,3.6,1.4,0.2,Iris-setosa
...,...,...,...,...,...
145,6.7,3.0,5.2,2.3,Iris-virginica
146,6.3,2.5,5.0,1.9,Iris-virginica
147,6.5,3.0,5.2,2.0,Iris-virginica
148,6.2,3.4,5.4,2.3,Iris-virginica


In [35]:
c.sql("SELECT DISTINCT class FROM iris").compute()

Unnamed: 0,class
0,Iris-setosa
1,Iris-versicolor
2,Iris-virginica


In [37]:
c.sql("""
    CREATE MODEL sql_model WITH (
        model_class = 'sklearn.ensemble.GradientBoostingClassifier',
        wrap_predict = True,
        target_column = 'target'
    ) AS (
        SELECT sepallength, sepalwidth, petallength, petalwidth, 
            CASE
                WHEN class = 'Iris-setosa' THEN 0
                WHEN class = 'Iris-versicolor' THEN 1
                WHEN class = 'Iris-virginica' THEN 2
            END AS target
        FROM iris
        LIMIT 100
    )
""")

In [38]:
c.sql("""SHOW MODELS""").compute()

Unnamed: 0,Models
0,sql_model


In [41]:
c.sql("""
    SELECT * FROM PREDICT (
        MODEL sql_model,
        SELECT sepallength, sepalwidth, petallength, petalwidth, 
        CASE
            WHEN class = 'Iris-setosa' THEN 0
            WHEN class = 'Iris-versicolor' THEN 1
            WHEN class = 'Iris-virginica' THEN 2
        END AS actual 
        FROM iris
        OFFSET 50
    )
""").compute()

Unnamed: 0,sepallength,sepalwidth,petallength,petalwidth,actual,target
50,7.0,3.2,4.7,1.4,1.0,1.0
51,6.4,3.2,4.5,1.5,1.0,1.0
52,6.9,3.1,4.9,1.5,1.0,1.0
53,5.5,2.3,4.0,1.3,1.0,1.0
54,6.5,2.8,4.6,1.5,1.0,1.0
...,...,...,...,...,...,...
145,6.7,3.0,5.2,2.3,2.0,1.0
146,6.3,2.5,5.0,1.9,2.0,1.0
147,6.5,3.0,5.2,2.0,2.0,1.0
148,6.2,3.4,5.4,2.3,2.0,1.0


In [42]:
query = """
CREATE EXPERIMENT my_exp WITH (
   model_class = 'sklearn.ensemble.GradientBoostingClassifier',
   experiment_class = 'sklearn.model_selection.GridSearchCV',
   tune_parameters = (n_estimators = ARRAY [16, 32, 2],
                   learning_rate = ARRAY [0.1,0.01,0.001],
                  max_depth = ARRAY [3,4,5,10]
                  ),
   target_column = 'target'
   ) AS (
      SELECT sepallength, sepalwidth, petallength, petalwidth,
            CASE
                WHEN class = 'Iris-setosa' THEN 0
                WHEN class = 'Iris-versicolor' THEN 1
                WHEN class = 'Iris-virginica' THEN 2
            END AS target
        FROM iris
        LIMIT 100
   )
"""
result1 = c.sql(query)

INFO:dask_sql.physical.rel.custom.create_experiment:{'n_estimators': [16, 32, 2], 'learning_rate': [0.1, 0.01, 0.001], 'max_depth': [3, 4, 5, 10]}
INFO:dask_sql.physical.rel.custom.create_experiment:{}


In [43]:
sorted_r = result1.sort_values(by='mean_test_score', ascending=False)
sorted_r.compute().head()

Unnamed: 0,sepallength,sepalwidth,petallength,petalwidth,target,param_max_depth,param_n_estimators,params,split0_test_score,split1_test_score,split2_test_score,split3_test_score,split4_test_score,mean_test_score,std_test_score,rank_test_score,model_class
0,0.079544,0.020265,0.107238,0.028677,0.1,3,16,"{'learning_rate': 0.1, 'max_depth': 3, 'n_esti...",1.0,1.0,1.0,1.0,1.0,1.0,0.0,1,sklearn.ensemble.GradientBoostingClassifier
1,0.076631,0.023495,0.111938,0.009267,0.1,3,32,"{'learning_rate': 0.1, 'max_depth': 3, 'n_esti...",1.0,1.0,1.0,1.0,1.0,1.0,0.0,1,sklearn.ensemble.GradientBoostingClassifier
20,0.060919,0.010588,0.129812,0.004805,0.01,5,2,"{'learning_rate': 0.01, 'max_depth': 5, 'n_est...",1.0,1.0,1.0,1.0,1.0,1.0,0.0,1,sklearn.ensemble.GradientBoostingClassifier
21,0.076836,0.018038,0.09881,0.014934,0.01,10,16,"{'learning_rate': 0.01, 'max_depth': 10, 'n_es...",1.0,1.0,1.0,1.0,1.0,1.0,0.0,1,sklearn.ensemble.GradientBoostingClassifier
22,0.084052,0.016099,0.133651,0.051007,0.01,10,32,"{'learning_rate': 0.01, 'max_depth': 10, 'n_es...",1.0,1.0,1.0,1.0,1.0,1.0,0.0,1,sklearn.ensemble.GradientBoostingClassifier


In [44]:
c.sql("""SHOW MODELS""").compute()

Unnamed: 0,Models
0,sql_model
1,my_exp


In [64]:
c.sql("""
    SELECT * FROM PREDICT (
        MODEL my_exp,
        SELECT sepallength, sepalwidth, petallength, petalwidth,
        CASE
            WHEN class = 'Iris-setosa' THEN 0
            WHEN class = 'Iris-versicolor' THEN 1
            WHEN class = 'Iris-virginica' THEN 2
        END AS actual
        FROM iris
        OFFSET 50
    )
""").compute()



Unnamed: 0,sepallength,sepalwidth,petallength,petalwidth,actual,target
50,7.0,3.2,4.7,1.4,1.0,1.0
51,6.4,3.2,4.5,1.5,1.0,1.0
52,6.9,3.1,4.9,1.5,1.0,1.0
53,5.5,2.3,4.0,1.3,1.0,1.0
54,6.5,2.8,4.6,1.5,1.0,1.0
...,...,...,...,...,...,...
145,6.7,3.0,5.2,2.3,2.0,1.0
146,6.3,2.5,5.0,1.9,2.0,1.0
147,6.5,3.0,5.2,2.0,2.0,1.0
148,6.2,3.4,5.4,2.3,2.0,1.0


In [49]:
iris_shuffled = c.sql("""
    SELECT *
    FROM iris
    shuffle
""").compute()

In [54]:
iris_shuffled.tail(50)

Unnamed: 0,sepallength,sepalwidth,petallength,petalwidth,class
100,6.3,3.3,6.0,2.5,Iris-virginica
101,5.8,2.7,5.1,1.9,Iris-virginica
102,7.1,3.0,5.9,2.1,Iris-virginica
103,6.3,2.9,5.6,1.8,Iris-virginica
104,6.5,3.0,5.8,2.2,Iris-virginica
105,7.6,3.0,6.6,2.1,Iris-virginica
106,4.9,2.5,4.5,1.7,Iris-virginica
107,7.3,2.9,6.3,1.8,Iris-virginica
108,6.7,2.5,5.8,1.8,Iris-virginica
109,7.2,3.6,6.1,2.5,Iris-virginica


In [51]:
c.sql("""SHOW TABLES FROM root""").compute()

Unnamed: 0,Table
0,iris
1,f7dbc522-de90-4757-bf43-efa9c9bc3bc0
2,330acd7f-7bda-4450-97d7-85a98679bf20


In [12]:
client.close()
cluster.close()