## Imports

In [24]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [25]:
%aimport

Modules to reload:
all-except-skipped

Modules to skip:



In [26]:
import json
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
from dotenv import find_dotenv
from pathlib import Path
import sys

project_home = Path(find_dotenv()).parent
sys.path.append(str(project_home))

from constants import *


## Data Pipline

### Snowpark Session

In [27]:
with open(project_home / 'config/creds.json', 'r') as ff:
    conn_param=json.load(ff)

session = Session.builder.configs(conn_param).create() 


### Database, Schema & resource creation

In [28]:
session.sql(f"create database if not exists {SNOWPATROL_DB}").collect()
session.sql(f"create schema if not exists {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}").collect()
session.sql(f"create stage if not exists {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE}").collect()
session.sql(f"create stage if not exists {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DEPS_STAGE}").collect()
session.sql(f"create stage if not exists {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{MODELS_STAGE}").collect()
session.sql(f"create stage if not exists {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{OBJECT_STAGE}").collect()

# This File Format uses a special option available for CSV schema inference
session.sql(f"create file format if not exists {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.ff_csv_with_header type=CSV parse_header=TRUE field_optionally_enclosed_by='\"'").collect()

[Row(status='FF_CSV_WITH_HEADER already exists, statement succeeded.')]

### Sample data upload

In [29]:
data_dir = project_home / "data" 
session.sql(f"PUT file://{project_home}/data/obfuscated/* @{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE} overwrite=True auto_compress=True").collect()
session.sql(f"PUT file://{project_home}/data/generated/whitelisted_users.csv @{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE} overwrite=True auto_compress=True").collect()
session.sql(f"PUT file://{project_home}/data/generated/sample_okta_logs.csv @{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE} overwrite=True auto_compress=True").collect()
session.sql(f"PUT file://{project_home}/data/generated/monitored_apps.csv @{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE} overwrite=True auto_compress=True").collect()



[Row(source='monitored_apps.csv', target='monitored_apps.csv.gz', source_size=45, target_size=80, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]

In [30]:

session.sql(f"PUT file://{project_home}/src/python/snowpatrol/train.py @{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DEPS_STAGE} overwrite=True auto_compress=False").collect()
session.sql(f"PUT file://{project_home}/src/python/snowpatrol/constants.py @{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DEPS_STAGE} overwrite=True auto_compress=False").collect()

[Row(source='constants.py', target='constants.py', source_size=558, target_size=560, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

### Table creation to hold source data - FP data

> Note: Uses Schema detection on CSV

In [31]:
session.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_MONITORED_APPS}
    USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
      FROM TABLE(
        INFER_SCHEMA(
          LOCATION=>'@{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE}/monitored_apps.csv',
          FILE_FORMAT=>'snowpatrol.main.ff_csv_with_header',
          IGNORE_CASE => TRUE
        )
      ))
    """
).collect()

session.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_OKTA_USERS}
    USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
      FROM TABLE(
        INFER_SCHEMA(
          LOCATION=>'@{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE}/sample_okta_logs.csv',
          FILE_FORMAT=>'snowpatrol.main.ff_csv_with_header',
          IGNORE_CASE => TRUE
        )
      ))
    """
).collect()

session.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_APP_LOGS}
    USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
      FROM TABLE(
        INFER_SCHEMA(
          LOCATION=>'@{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE}/SnowPatrol_validation_data_app1 _.csv',
          FILE_FORMAT=>'snowpatrol.main.ff_csv_with_header',
          IGNORE_CASE => TRUE
        )
      ))
    """
).collect()

session.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_WORK_DAYS}
    USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
      FROM TABLE(
        INFER_SCHEMA(
          LOCATION=>'@{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE}/SnowPatrol_Working_Days.csv',
          FILE_FORMAT=>'snowpatrol.main.ff_csv_with_header',
          IGNORE_CASE => TRUE
        )
      ))
    """
).collect()

session.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_EMPLOYEE_METADATA}
    USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
      FROM TABLE(
        INFER_SCHEMA(
          LOCATION=>'@{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE}/SnowPatrol_validation_emp_details_app1.csv',
          FILE_FORMAT=>'snowpatrol.main.ff_csv_with_header',
          IGNORE_CASE => TRUE
        )
      ))
    """
).collect()

session.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_WHITELISTED_USERS}
    USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
      FROM TABLE(
        INFER_SCHEMA(
          LOCATION=>'@{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE}/whitelisted_users.csv',
          FILE_FORMAT=>'snowpatrol.main.ff_csv_with_header',
          IGNORE_CASE => TRUE
        )
      ))
    """
).collect()




[Row(status='WHITELISTED_USERS already exists, statement succeeded.')]

### Copying data into respective source tables

In [32]:
session.sql(f"TRUNCATE TABLE {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_MONITORED_APPS}").collect()
session.sql(
    f""" 
    copy into {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_MONITORED_APPS}
    from @{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE}
    files = ('monitored_apps.csv.gz') 
    file_format = (type = CSV skip_header=1 field_optionally_enclosed_by='"')
""").collect()

session.sql(f"TRUNCATE TABLE {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_OKTA_USERS}").collect()
session.sql(
    f""" 
    copy into {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_OKTA_USERS}
    from @{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE}
    files = ('sample_okta_logs.csv.gz') 
    file_format = (type = CSV skip_header=1 field_optionally_enclosed_by='"')
""").collect()

session.sql(f"TRUNCATE TABLE {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_APP_LOGS}").collect()

session.sql(
    f""" 
    copy into {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_APP_LOGS}
    from @{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE}
    files = ('SnowPatrol_validation_data_app1 _.csv.gz', 'SnowPatrol_validation_data_app2 _.csv.gz','SnowPatrol_validation_data_app3 _.csv.gz')
    file_format = (type = CSV skip_header=1 field_optionally_enclosed_by='"')
""").collect()

# Sample work days table - company schedule of whether each day is a work day or not
session.sql(f"TRUNCATE TABLE {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_WORK_DAYS}").collect()

session.sql(
    f""" 
    copy into {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_WORK_DAYS}
    from @{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE}
    files = ('SnowPatrol_Working_Days.csv.gz')
    file_format = (type = CSV skip_header=1 field_optionally_enclosed_by='"')
""").collect()

session.sql(f"TRUNCATE TABLE {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_EMPLOYEE_METADATA}").collect()

session.sql(
    f""" 
    copy into {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_EMPLOYEE_METADATA}
    from @{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE}
    files = ('SnowPatrol_validation_app2_emp_details.csv.gz', 'SnowPatrol_validation_emp_details_app1.csv.gz','SnowPatrol_validation_emp_details_app3.csv.gz')
    file_format = (type = CSV skip_header=1 field_optionally_enclosed_by='"')
""").collect()

session.sql(
    f""" 
    copy into {SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{TBL_WHITELISTED_USERS}
    from @{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DATA_STAGE}
    files = ('whitelisted_users.csv.gz') 
    file_format = (type = CSV skip_header=1 field_optionally_enclosed_by='"')
""").collect()

[Row(file='data/whitelisted_users.csv.gz', status='LOADED', rows_parsed=1, rows_loaded=1, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]

### Local utility functions

In [33]:
session.sql("use database SNOWPATROL_DB")

<snowflake.snowpark.dataframe.DataFrame at 0x1c99f135ba0>

In [34]:
session.udf.register_from_file(
      file_path="train.py"
      , func_name="contains_anyof"
      , name=f"{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.udf_contains_anyof"
      , is_permanent=True
      , packages = ["snowflake-snowpark-python"]
      , imports=[f"@{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DEPS_STAGE}/constants.py"]
      , stage_location=f"{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{OBJECT_STAGE}"
      , replace=True
)

SnowparkSQLException: (1304): 01ae83d1-3200-da5c-0005-d0a200013076: 090105 (22000): Cannot perform SELECT. This session does not have a current database. Call 'USE DATABASE', or use a qualified name.

### Model Training

In [None]:
session.sproc.register_from_file(
    file_path = f"@{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DEPS_STAGE}/train.py"
    ,func_name = "run_model_today"
    ,name=f"{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.run_model_today"
    ,input_types=[T.IntegerType(),T.IntegerType(),T.FloatType(),T.BooleanType(),T.BooleanType(),T.BooleanType(), T.BooleanType()]
    ,return_type=T.VariantType()
    ,is_permanent=True
    ,replace=True
    ,stage_location=f"@{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{OBJECT_STAGE}"
    ,packages=['snowflake-snowpark-python','pandas','scikit-learn==1.2.1','joblib==1.1.1', 'numpy']
    ,imports=[f"@{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.{DEPS_STAGE}/constants.py"]
)

SnowparkSQLException: (1304): 01ae83c4-3200-da5c-0005-d0a20001300e: 090105 (22000): Cannot perform SELECT. This session does not have a current database. Call 'USE DATABASE', or use a qualified name.

## License usage probablity prediction with revocation decision
Local trigger

In [None]:
# %%time
# from train import run_model_today
# results = session.call('run_model_today', 1,45,0.5,False,False,False,False)
# display(results)

In [None]:
# license_prediction_feature_set = session.table(f"{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.license_prediction_feature_set")
# license_usage_probability = license_prediction_feature_set.select(
#     F.col("session_user")
#     , F.col("title")
#     , F.col("department")
#     , F.col("division")
#     , F.col("work_days_since_last_login")
#     ,F.col("training_date").alias("model_trained_on")
#     ,F.col("cutoff_date")
#     , F.call_udf(f"{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.udf_predict_login_probability", *[F.col(c) for c in ["weighted_authentications_per_day","work_days_since_last_login", 'authentications_per_day', 'log_work_days_since_last_login']]).alias("probability_no_login")
# )
# license_revocation_decision = license_usage_probability.with_column("revoke_access", F.iff(F.col("probability_no_login") > 0.5, 1, 0))

# license_revocation_decision.write.mode("overwrite").save_as_table(f"{SNOWPATROL_DB}.{SNOWPATROL_SCHEMA}.license_revocation_decision")