In [1]:
%load_ext autoreload
%autoreload 2

from pathlib import Path

# Create mllib pipeline
from dspipes import Pipelines, MllibPipelines, RumblePipelines

### Write the Rumble query scripts (one per dataset, pipeline and model)

In [None]:
MODELS = ['logistic', 'RandomForest', 'LinearSVC', 'NB']
ALL_PIPELINES = [0, 1, 3, 5]
DATASETS = ['Criteo', 'YFCC']
QUERY_DIR = Path("./rumbleML_scripts")

# Generate one RumbleML query file per (dataset, pipeline, model) combination,
# named query_{pipeline}_{model}_{dataset}.rumble -- the same names the
# run_all_experiments.sh generator references in its --query-path arguments.
# Create the output directory up front; writing into a missing directory fails.
QUERY_DIR.mkdir(parents=True, exist_ok=True)
for d in DATASETS:
    for i in ALL_PIPELINES:
        for m in MODELS:
            print(d, i, m)
            program = RumblePipelines.create_rumble_program(f"pipe_{i}", clf_mode=m, dataset=d)
            # write_text opens, writes and closes the file, so no handle is left
            # open if the write raises.
            (QUERY_DIR / f"query_{i}_{m}_{d}.rumble").write_text(program)

### Write the Rumble ablation query scripts (one per dataset, pipeline, model and value of `k`)

In [5]:
MODELS = ['logistic', 'RandomForest', 'LinearSVC', 'NB']
ALL_PIPELINES = [0, 1, 3, 5]
DATASETS = ['Criteo', 'YFCC']
ALL_K = [1000, 10000, 100000]
QUERY_DIR_K = Path("./rumbleML_scripts_k")

# For every k, generate one query file per (dataset, pipeline, model),
# named query_{pipeline}_{model}_{dataset}_{k}.rumble.
# Create the output directory up front; writing into a missing directory fails.
QUERY_DIR_K.mkdir(parents=True, exist_ok=True)
for k in ALL_K:
    for d in DATASETS:
        for i in ALL_PIPELINES:
            for m in MODELS:
                # Include k so progress lines for different k values can be told apart.
                print(d, i, m, k)
                # k is passed as a string, exactly as before.
                # NOTE(review): confirm whether create_rumble_program also accepts an int.
                program = RumblePipelines.create_rumble_program(
                    f"pipe_{i}", clf_mode=m, dataset=d, k=str(k)
                )
                # write_text opens, writes and closes the file in one call.
                (QUERY_DIR_K / f"query_{i}_{m}_{d}_{k}.rumble").write_text(program)

Criteo 0 logistic
oh
Criteo 0 RandomForest
oh
Criteo 0 LinearSVC
oh
Criteo 0 NB
oh
Criteo 1 logistic
oh
Criteo 1 RandomForest
oh
Criteo 1 LinearSVC
oh
Criteo 1 NB
oh
Criteo 3 logistic
oh
Criteo 3 RandomForest
oh
Criteo 3 LinearSVC
oh
Criteo 3 NB
oh
Criteo 5 logistic
oh
Criteo 5 RandomForest
oh
Criteo 5 LinearSVC
oh
Criteo 5 NB
oh
YFCC 0 logistic
YFCC 0 RandomForest
YFCC 0 LinearSVC
YFCC 0 NB
YFCC 1 logistic
YFCC 1 RandomForest
YFCC 1 LinearSVC
YFCC 1 NB
YFCC 3 logistic
YFCC 3 RandomForest
YFCC 3 LinearSVC
YFCC 3 NB
YFCC 5 logistic
YFCC 5 RandomForest
YFCC 5 LinearSVC
YFCC 5 NB
Criteo 0 logistic
oh
Criteo 0 RandomForest
oh
Criteo 0 LinearSVC
oh
Criteo 0 NB
oh
Criteo 1 logistic
oh
Criteo 1 RandomForest
oh
Criteo 1 LinearSVC
oh
Criteo 1 NB
oh
Criteo 3 logistic
oh
Criteo 3 RandomForest
oh
Criteo 3 LinearSVC
oh
Criteo 3 NB
oh
Criteo 5 logistic
oh
Criteo 5 RandomForest
oh
Criteo 5 LinearSVC
oh
Criteo 5 NB
oh
YFCC 0 logistic
YFCC 0 RandomForest
YFCC 0 LinearSVC
YFCC 0 NB
YFCC 1 logistic
YFCC 

In [2]:
# Spot-check one generated query (pipeline 0, random forest, Criteo); the
# returned program text is the cell's displayed output.
RumblePipelines.create_rumble_program(
    "pipe_0",
    clf_mode="RandomForest",
    dataset="Criteo",
)


Hi!


'let $training-data := parquet-file("s3://rumbleml-data/output/output.parquet/")\nlet $test-data := parquet-file("s3://rumbleml-data/criteo.kaggle2014.test.parquet")\nlet $vector-assembler_2 := get-transformer("VectorAssembler", {"inputCols" : ["features"], "outputCol" : "transformedFeatures"})\nlet $randomforest := get-estimator("RandomForestClassifier", {"featuresCol": "transformedFeatures", "numTrees": 5})\nlet $pipeline := get-estimator("Pipeline", {"stages": [$vector-assembler_2, $randomforest]})\nlet $pip := $pipeline($training-data, {})\nlet $prediction := $pip($test-data, {})\nlet $total := 6042135\nreturn count($prediction[$$.label eq $$.prediction]) div $total'

### Create the bash script that runs all experiments

In [10]:
MODELS = ['logistic', 'RandomForest', 'LinearSVC', 'NB']
ALL_PIPELINES = [0, 1, 3, 5]
DATASETS = ['Criteo', 'YFCC']

# Spark resources shared by the Rumble run and the plain-Spark run of each experiment.
SPARK_OPTS = ("--conf spark.dynamicAllocation.enabled=false --num-executors 4 "
              "--executor-cores 2 --executor-memory 19g")
RUMBLE_S3_DIR = "s3://rumbleml-data/rumble_experiments"
RUMBLE_JAR = f"{RUMBLE_S3_DIR}/rumbledb-1.16.0.jar"
# "a" appends to any existing script, so re-running this cell duplicates every
# experiment in it; set to "w" to regenerate the script from scratch.
SCRIPT_MODE = "a"

# For each (dataset, pipeline, model), write an echo banner, a timed Rumble run of
# the matching query file, and a timed run of run_spark.py with the same settings.
# The with-block closes the script even if a write raises part-way through the loop.
with open("./run_all_experiments.sh", SCRIPT_MODE) as script:
    for d in DATASETS:
        for i in ALL_PIPELINES:
            for m in MODELS:
                name = f"Experiment {i} {m} {d}"
                script.write(
                    f"echo '{name}'\n"
                    f"time spark-submit --name '{name} Rumble' {SPARK_OPTS} {RUMBLE_JAR} "
                    f"--show-error-info yes --query-path '{RUMBLE_S3_DIR}/query_{i}_{m}_{d}.rumble'\n"
                    f"time spark-submit --name '{name} Spark' {SPARK_OPTS} "
                    f"run_spark.py -m {m} -p {i} -d {d}\n"
                )


In [6]:
ALL_K = [1000, 10000]
ALL_PIPELINES = [0, 1]
DATASETS = ['Criteo', 'YFCC']
# Models run for Criteo; every other dataset uses OTHER_MODELS.
CRITEO_MODELS = ['logistic', 'LinearSVC', 'NB']
OTHER_MODELS = ['logistic', 'RandomForest', 'LinearSVC']

SPARK_OPTS_K = ("--driver-memory 10G --conf spark.dynamicAllocation.enabled=false "
                "--num-executors 4 --executor-cores 2 --executor-memory 19g")
RUMBLE_JAR_K = "s3://rumbleml-data/rumbledb-1.16.2-jar-with-deactivatable-optimizations.jar"
# Rumble runs per experiment, in the order they are written:
# (run label used in --name, extra flags inserted before --show-error-info).
RUMBLE_VARIANTS = [
    ("3", "--data-frame-execution-mode-detection no --native-sql-predicates no "),
    ("2", "--native-sql-predicates no "),
    ("1", ""),  # no optimisation flags overridden
]
# "a" appends to any existing script, so re-running this cell duplicates every
# experiment in it; set to "w" to regenerate the script from scratch.
SCRIPT_MODE = "a"

# The with-block closes the script even if a write raises part-way through the loop.
with open("./run_all_experiments_k.sh", SCRIPT_MODE) as script:
    for k in ALL_K:
        for d in DATASETS:
            MODELS = CRITEO_MODELS if d == 'Criteo' else OTHER_MODELS
            for i in ALL_PIPELINES:
                for m in MODELS:
                    query = f"s3://rumbleml-data/rumble_experiments/query_{i}_{m}_{d}_{k}.rumble"
                    commands = [f"echo 'Experiment {i} {m} {d} {k}'\n"]
                    for run, flags in RUMBLE_VARIANTS:
                        commands.append(
                            f"time spark-submit --name 'Experiment {i} {m} {d} {k} Rumble {run}' "
                            f"{SPARK_OPTS_K} {RUMBLE_JAR_K} --materialization-cap 100000 {flags}"
                            f"--show-error-info yes --query-path '{query}'\n"
                        )
                    script.write("".join(commands))