## Tutorial: using async batch jobs to run distributed data processing
This tutorial is an end-to-end (E2E) example that creates a new input table, runs an emotion classifier to detect the emotions expressed in each text, and uploads the results to an output table.

In [None]:
!pip install snowflake-snowpark-python
!pip install snowflake-ml-python
!pip install pandas

In [172]:

import os
import shutil
from pathlib import Path
import random
from snowflake.snowpark import Session
import pandas as pd


In [207]:

# Connection settings. Credentials are read from environment variables so that a
# password never has to be typed into (and saved with) the notebook. Each setting
# falls back to '', which keeps the original fill-in-the-blank placeholders working.
account = os.environ.get('SNOWFLAKE_ACCOUNT', '')  # YOUR_ACCOUNT
user = os.environ.get('SNOWFLAKE_USER', '')  # YOUR_USER
password = os.environ.get('SNOWFLAKE_PASSWORD', '')  # YOUR_PASSWORD -- set via env var, do not hardcode
role = os.environ.get('SNOWFLAKE_ROLE', '')  # YOUR_ROLE

database = os.environ.get('SNOWFLAKE_DATABASE', '')  # YOUR_DB
schema = os.environ.get('SNOWFLAKE_SCHEMA', '')  # YOUR_SCHEMA

warehouse = os.environ.get('SNOWFLAKE_WAREHOUSE', '')  # YOUR WH
image_registry = 'project_repo'  # name of the image registry that will be created
image_name = 'classifier:1'  # name of the image

# EAI used to retrieve the model. It must allow ('huggingface.co:443', 'cdn-lfs.hf.co:443').
external_access_integration = os.environ.get('SNOWFLAKE_EAI', '')
num_replicas = 10  # number of job instances (replicas) requested at submit time
job_name = 'classifier_v1'  # job name
input_table = 'CLASSIFIER_DATA_INPUT'  # input table
output_table = 'CLASSIFIER_DATA_OUTPUT'  # table to write results

compute_pool_name = 'CLASSIFIER_TEST02'
compute_pool_instance_family = 'CPU_X64_M'
compute_pool_instances = 10  # node count; the pool cell uses it as both min_nodes and max_nodes


connection_parameters = {
    "account": account,
    "user": user,
    "password": password,
    "warehouse": warehouse,
    "database": database,
    "schema": schema,
    "role": role,
    "client_session_keep_alive": True,
}

session = Session.builder.configs(connection_parameters).create()


In [None]:

# Create the external access integration (EAI) used to download the model from
# https://huggingface.co. This step is optional if a suitable EAI already exists.
# NOTE: requires the ACCOUNTADMIN role. The cell switches to ACCOUNTADMIN and always
# switches back to `role` afterwards, even if one of the statements fails.

if not external_access_integration:
    raise ValueError("Set `external_access_integration` to the name of the EAI to create.")


def _run_sql(sql):
    """Echo a SQL statement, execute it with the active session and print the result rows."""
    print(f"executing: {sql}")
    print(session.sql(sql).collect())


_run_sql(f"USE {database}.{schema}")

network_rule_sql = """
CREATE OR REPLACE NETWORK RULE hf_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('huggingface.co:443', 'cdn-lfs.hf.co:443');
"""

# IF NOT EXISTS keeps the cell re-runnable: a plain CREATE fails once the integration exists.
eai_sql = f"""
CREATE EXTERNAL ACCESS INTEGRATION IF NOT EXISTS {external_access_integration}
  ALLOWED_NETWORK_RULES = (hf_rule)
  ENABLED = true;
"""

_run_sql("use role accountadmin")
try:
    _run_sql(network_rule_sql)
    _run_sql(eai_sql)
    _run_sql(f"GRANT USAGE ON INTEGRATION {external_access_integration} TO ROLE {role};")
finally:
    # Never leave the session running as ACCOUNTADMIN for the rest of the notebook.
    _run_sql(f"use role {role}")




In [83]:

words_file = 'english_words.txt'


def get_words(filename=words_file):
    """Read a word list from ``filename``, one word per line.

    Args:
        filename: Path to a UTF-8 text file. Defaults to ``words_file``.

    Returns:
        The stripped words in file order. Blank lines are skipped so they cannot
        inject empty tokens (double spaces) into generated phrases.
    """
    # Bug fix: the original ignored `filename` and always opened the global `words_file`.
    with open(filename, encoding="utf-8") as f:
        return [word for word in (line.strip() for line in f) if word]


def get_random_phrase(words, max_len=50, rng=random):
    """Return a phrase of 1..``max_len`` words drawn with replacement from ``words``.

    Args:
        words: Non-empty sequence of candidate words.
        max_len: Maximum phrase length in words (inclusive, must be >= 1).
        rng: Source of randomness; the ``random`` module by default, or a seeded
            ``random.Random`` instance for reproducible output.

    Raises:
        ValueError: If ``words`` is empty.
    """
    if not words:
        raise ValueError("words must be a non-empty sequence")
    plen = rng.randint(1, max_len)
    return " ".join(rng.choice(words) for _ in range(plen))


def setup_table(session, full_table_name, recreate: bool = True,
                num_rows: int = 100000, batch_size=50000,
                words_filename: str = words_file, max_phrase_len: int = 50,
                seed=None):
    """Populate ``full_table_name`` with ``num_rows`` random (ID, TEXT) rows.

    Rows are generated and uploaded in batches of ``batch_size`` via
    ``session.write_pandas`` (with ``auto_create_table=True``), so the whole
    dataset never has to be held in memory at once.

    Args:
        session: Snowpark session used for the upload.
        full_table_name: Target table name.
        recreate: If True, the first batch is written with ``overwrite=True``;
            otherwise every batch is appended.
        num_rows: Total number of rows to generate; IDs are 0..num_rows-1.
        batch_size: Rows per ``write_pandas`` call (must be > 0).
        words_filename: Word-list file used to build the TEXT column.
        max_phrase_len: Maximum number of words per TEXT value.
        seed: Optional seed for reproducible TEXT values; None uses the global
            ``random`` state.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    words = get_words(words_filename)
    rng = random.Random(seed) if seed is not None else random
    # Ceiling division: the original `num_rows // batch_size + 1` scheduled an
    # extra, empty batch whenever num_rows was a multiple of batch_size.
    num_batches = (num_rows + batch_size - 1) // batch_size
    print(f"Creating table: {full_table_name}, with # of rows: {num_rows}")
    overwrite = recreate
    for batch_idx in range(num_batches):
        start = batch_idx * batch_size
        stop = min(num_rows, start + batch_size)
        batch_df = pd.DataFrame([
            {"ID": row_id, "TEXT": get_random_phrase(words, max_len=max_phrase_len, rng=rng)}
            for row_id in range(start, stop)
        ])
        session.write_pandas(batch_df, full_table_name, auto_create_table=True, overwrite=overwrite)
        print(f"Table: {full_table_name}, finished batch: {batch_idx}x{batch_size}")
        # Only the first batch may overwrite; later batches append to it.
        overwrite = False


# Generate the synthetic input data, replacing any previous contents of the table.
setup_table(session=session, full_table_name=input_table, recreate=True)




Creating table: CLASSIFIER_DATA_INPUT, with # of rows: 100000
Table: CLASSIFIER_DATA_INPUT, finished batch: 0x50000
Table: CLASSIFIER_DATA_INPUT, finished batch: 1x50000


In [None]:

# Create the compute pool that runs the job. min_nodes and max_nodes are both set
# to compute_pool_instances, which pins the pool at that size.
create_compute_pool_sql = f"""
create compute pool if not exists {compute_pool_name}
  min_nodes={compute_pool_instances}
  max_nodes={compute_pool_instances}
  instance_family={compute_pool_instance_family};
"""

pool_result = session.sql(create_compute_pool_sql).collect()
print(pool_result)



In [None]:

import os
from snowflake.ml.jobs import submit_directory

# `__file__` is not defined inside a Jupyter kernel, so fall back to the current
# working directory. That directory is assumed to contain main.py -- TODO confirm
# for your layout.
if "__file__" in globals():
    payload_dir = os.path.dirname(os.path.abspath(__file__))
else:
    payload_dir = os.getcwd()

# Fail fast locally instead of uploading a payload without its entrypoint.
if not os.path.isfile(os.path.join(payload_dir, "main.py")):
    raise FileNotFoundError(f"main.py not found in payload directory: {payload_dir}")

sql_retrieval_command = f"select * from {input_table}"

job = submit_directory(
    payload_dir,
    compute_pool_name,
    entrypoint="main.py",
    args=[
        # NOTE(review): if args are not shell-parsed, the double quotes reach main.py
        # literally -- confirm main.py strips them.
        f'--sql="{sql_retrieval_command}"',
        f'--output-table={output_table}',
        '--batch-size=512',
    ],
    stage_name="payload_stage",
    external_access_integrations=[external_access_integration],
    query_warehouse=warehouse,
    num_instances=num_replicas,  # FIXME: Coming soon, not available yet
)

print(f"Started job {job.id}")

[Row(status='CLASSIFIER_V1 successfully dropped.')]
[Row(status="Started Snowpark Container Services Job 'CLASSIFIER_V1'.")]


In [None]:

# Inspect the service backing the submitted job. Use `job.id` rather than the config
# `job_name`: `job_name` is never passed to submit_directory, so it need not name this job.
# NOTE(review): assumes `job.id` is the (fully qualified) service name -- confirm.
print(session.sql(f'DESC SERVICE {job.id}').collect())

containers = session.sql(f'SHOW SERVICE CONTAINERS IN SERVICE {job.id}').collect()

for row in containers:
    print(f"{row['service_name']}/{row['instance_id']}/{row['container_name']} - status: {row['status']}, message: {row['message']}")

print(f"Job status: {job.status}")
# Bug fix: the original iterated the int `num_replicas` directly, raising TypeError.
for i in range(num_replicas):  # TODO: We should add an API for retrieving this info from the MLJob object
    print(f"Job instance {i} status: {job.get_instance_status(i)}")  # FIXME: Coming soon, not available yet

In [None]:
# Display the logs of the submitted job.
job.show_logs()  # or print(job.get_logs())

In [None]:

# Preview a few classified rows. Limit in SQL so that only the displayed rows are
# fetched, instead of collecting the entire output table to the client.
records = session.sql(f"select * from {output_table} limit 10").collect()
for record in records:
    print(record)


# 