# Deep Learning Toolkit for Splunk - Spark MLLib FP Growth Frequent Pattern Mining

This notebook contains a barebone example workflow how to work on custom containerized code that seamlessly interfaces with the Deep Learning Toolkit for Splunk.

Note: By default every time you save this notebook the cells are exported into a python module which is then invoked by Splunk MLTK commands like <code> | fit ... | apply ... | summary </code>. Please read the Model Development Guide in the Deep Learning Toolkit app for more information.

## Stage 0 - import libraries
At stage 0 we define all imports necessary to run our subsequent code depending on various libraries.

In [2]:
# this definition exposes all python module imports that should be available in all subsequent commands
import sys
import json
import pandas as pd
import numpy as np
from random import random
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth
# ...
# global constants
MODEL_DIRECTORY = "/srv/app/model/data/"

In [3]:
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
print("numpy version: " + np.__version__)
print("pandas version: " + pd.__version__)
SparkSession

numpy version: 1.18.4
pandas version: 1.0.3


pyspark.sql.session.SparkSession

## Stage 1 - get a data sample from Splunk
In Splunk run a search to pipe a dataset into your notebook environment. Note: mode=stage is used in the | fit command to do this.

| inputlookup supermarket.csv <br>
| stats values(product_id) as basket by customer_id<br>
| fit MLTKContainer mode=stage algo=spark_fp_growth min_support=0.1 min_confidence=0.0 min_items=2 limit=100 basket from customer_id into app:frequent_items <br>


After you run this search your data set sample is available as a csv inside the container to develop your model. The name is taken from the into keyword ("barebone_model" in the example above) or set to "default" if no into keyword is present. This step is intended to work with a subset of your data to create your custom model.

In [4]:
# this cell is not executed from MLTK and should only be used for staging data into the notebook environment
def stage(name):
    with open("data/"+name+".csv", 'r') as f:
        df = pd.read_csv(f)
    with open("data/"+name+".json", 'r') as f:
        param = json.load(f)
    return df, param

In [5]:
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
df, param = stage("frequent_items")
print(param)
df

{'options': {'params': {'mode': 'stage', 'algo': 'spark_fp_growth', 'min_support': '0.6', 'min_confidence': '0.1', 'min_items': '2', 'max_items': '2', 'limit': '100'}, 'args': ['basket', 'customer_id'], 'target_variable': ['basket'], 'feature_variables': ['customer_id'], 'model_name': 'frequent_items', 'algo_name': 'MLTKContainer', 'mlspl_limits': {'disabled': False, 'handle_new_cat': 'default', 'max_distinct_cat_values': '1000', 'max_distinct_cat_values_for_classifiers': '1000', 'max_distinct_cat_values_for_scoring': '1000', 'max_fit_time': '6000', 'max_inputs': '100000000', 'max_memory_usage_mb': '4000', 'max_model_size_mb': '150', 'max_score_time': '6000', 'streaming_apply': '0', 'use_sampling': '1'}, 'kfold_cv': None}, 'feature_variables': ['customer_id'], 'target_variables': ['basket']}


Unnamed: 0,customer_id,basket
0,u1,p112 p1174 p1249 p1251 p1252 p1572 p1595 p1602...
1,u10,p1231 p1238 p1245 p1246 p126 p1270 p1415 p1418...
2,u100,p112 p1972 p218 p2756 p2854 p3049 p307 p3114 p...
3,u102,p1749 p1944 p2649 p2778 p3065 p3122 p3557 p505...
4,u103,p1002 p1111 p1113 p1121 p1125 p1127 p1142 p115...
...,...,...
211,u91,p1 p1012 p1115 p1121 p1143 p1144 p1160 p1211 p...
212,u92,p1114 p112 p1726 p1733 p2097 p2831 p2878 p2897...
213,u93,p1036 p1116 p112 p1150 p1192 p1215 p150 p1595 ...
214,u96,p1000 p1267 p147 p1674 p1705 p1706 p1729 p176 ...


In [6]:
print(param['options']['model_name'])

frequent_items


In [7]:
print(param['feature_variables'])
print(param['target_variables'][0])

['customer_id']
basket


## Stage 2 - create and initialize a model

In [8]:
# initialize your model
# available inputs: data and parameters
# returns the model object which will be used as a reference to call fit, apply and summary subsequently
def init(df,param):
    model = {}
    appName = "fp_growth_spark_model"
    if 'options' in param:
        if 'model_name' in param['options']: 
            appName = param['options']['model_name']
    sparkConf = SparkConf().setAll([('spark.executor.memory', '1g'), ('spark.executor.cores', '1'), ('spark.cores.max', '4'), ('spark.driver.memory','4g'), ('spark.driver.maxResultSize','4g')])
    spark = SparkSession.builder.appName(appName).config(conf=sparkConf).getOrCreate()
    model['spark'] = spark
    return model

In [9]:
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
model = init(df,param)
print(model)

{'spark': <pyspark.sql.session.SparkSession object at 0x7f6c94070450>}


In [10]:
model['spark']

In [11]:
str(model['spark'].sparkContext.getConf().getAll())

"[('spark.app.name', 'frequent_items'), ('spark.driver.memory', '4g'), ('spark.app.id', 'local-1592389830151'), ('spark.executor.id', 'driver'), ('spark.driver.port', '46473'), ('spark.cores.max', '4'), ('spark.executor.memory', '1g'), ('spark.rdd.compress', 'True'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 'local[*]'), ('spark.executor.cores', '1'), ('spark.submit.deployMode', 'client'), ('spark.ui.showConsoleProgress', 'true'), ('spark.driver.host', '5e5ceddf3bb3'), ('spark.driver.maxResultSize', '4g')]"

## Stage 3 - fit the model

In [12]:
# train your model
# returns a fit info json object and may modify the model object
def fit(model,df,param):
    spark = model['spark']
    sc = spark.sparkContext
    feature_variables = param['feature_variables']
    target_variable = param['target_variables'][0]

    min_support = 0.10
    min_confidence = 0.10
    if 'options' in param:
        if 'params' in param['options']:
            if 'min_support' in param['options']['params']:
                min_support = float(param['options']['params']['min_support'])
            if 'min_confidence' in param['options']['params']:
                min_confidence = float(param['options']['params']['min_confidence'])

    df['_items'] = df[target_variable].map(lambda l: l.split(' '))
    sdf = spark.createDataFrame(df)

    model['fpgrowth'] = FPGrowth(itemsCol='_items', minSupport=min_support, minConfidence=min_confidence)
    model['model'] = model['fpgrowth'].fit(sdf)

    info = {"message": "model trained"}
    return info

In [13]:
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
print(fit(model,df,param))

{'message': 'model trained'}


In [None]:
model['model'].freqItemsets.show()

In [None]:
model['model'].associationRules.show()

## Stage 4 - apply the model

In [14]:
# apply your model
# returns the calculated results
def apply(model,df,param):
    limit = 10
    min_items = 2
    max_items = 2
    if 'options' in param:
        if 'params' in param['options']:
            if 'limit' in param['options']['params']:
                limit = int(param['options']['params']['limit'])
            if 'min_items' in param['options']['params']:
                min_items = int(param['options']['params']['min_items'])
            if 'max_items' in param['options']['params']:
                max_items = int(param['options']['params']['max_items'])

    spark = model['spark']
    freqItems = model['model'].freqItemsets
    freqItems.createOrReplaceTempView("frequentItems")
    results = spark.sql("select items, freq from frequentItems where size(items) between "+str(min_items)+" and "+str(max_items)+" order by freq desc limit "+str(limit))
    result = results.toPandas()
    return result

In [15]:
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
print(apply(model,df,param))

             items  freq
0    [p2098, p112]   187
1    [p2096, p112]   183
2    [p1669, p112]   182
3    [p2358, p112]   179
4    [p3041, p112]   177
..             ...   ...
95  [p2785, p3041]   153
96   [p710, p2785]   153
97  [p3122, p2358]   153
98   [p710, p1930]   153
99  [p2033, p1234]   153

[100 rows x 2 columns]


## Stage 5 - save the model

In [None]:
# save model to name in expected convention "<algo_name>_<model_name>"
def save(model,name):
    #import shutil
    #from pathlib import Path
    #if Path(MODEL_DIRECTORY + name).is_dir():
    #    shutil.rmtree(MODEL_DIRECTORY + name)
    #model['model'].save(model['spark'].sparkContext, MODEL_DIRECTORY + name)
    return model

In [None]:
save(model,'spark_fp_growth_test')

## Stage 6 - load the model

In [None]:
# load model from name in expected convention "<algo_name>_<model_name>"
def load(name):
    model = init({},{})
    #model['model'] = GradientBoostedTreesModel.load(model['spark'].sparkContext, MODEL_DIRECTORY + name)
    return model

In [None]:
model = load('spark_fp_growth_test')
model

## Stage 7 - provide a summary of the model

In [None]:
# return a model summary
def summary(model=None):
    returns = {"spark": "no model"}
    if model:
        returns = {"spark_info": str(model['spark'].sparkContext.getConf().getAll()) }
    return returns

In [None]:
summary(model)

## End of Stages
All subsequent cells are not tagged and can be used for further freeform code