Skip to content

Commit

Permalink
Add a Dataset class for accessing the data files of the examples
Browse files Browse the repository at this point in the history
  • Loading branch information
fonhorst committed May 22, 2023
1 parent ac9ae98 commit be51f35
Show file tree
Hide file tree
Showing 16 changed files with 165 additions and 169 deletions.
14 changes: 7 additions & 7 deletions examples/spark/automl_feature_scores.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import logging.config

from examples_utils import get_spark_session, prepare_test_and_train, get_dataset_attrs
from examples_utils import get_spark_session, prepare_test_and_train, get_dataset
from sparklightautoml.automl.presets.tabular_presets import SparkTabularAutoML
from sparklightautoml.tasks.base import SparkTask
from sparklightautoml.utils import log_exec_timer, logging_config, VERBOSE_LOGGING_FORMAT

logging.config.dictConfig(logging_config(level=logging.INFO, log_filename='/tmp/slama.log'))
logging.config.dictConfig(logging_config(log_filename='/tmp/slama.log'))
logging.basicConfig(level=logging.DEBUG, format=VERBOSE_LOGGING_FORMAT)
logger = logging.getLogger(__name__)

Expand All @@ -20,11 +20,11 @@
cv = 2
use_algos = [["linear_l2"]]
dataset_name = "used_cars_dataset"
path, task_type, roles, dtype = get_dataset_attrs(dataset_name)
dataset = get_dataset(dataset_name)

with log_exec_timer("spark-lama training") as train_timer:
task = SparkTask(task_type)
train_data, test_data = prepare_test_and_train(spark, path, seed)
task = SparkTask(dataset.task_type)
train_data, test_data = prepare_test_and_train(dataset, seed)

test_data_dropped = test_data

Expand All @@ -40,7 +40,7 @@

oof_predictions = automl.fit_predict(
train_data,
roles=roles
roles=dataset.roles
).persist()

logger.info("Predicting on out of fold")
Expand All @@ -50,7 +50,7 @@

logger.info(f"score for out-of-fold predictions: {metric_value}")

feature_scores = automl.get_feature_scores(calc_method="fast", data=test_data_dropped, silent=False)
feature_scores = automl.get_feature_scores(data=test_data_dropped, silent=False)

print(feature_scores)

Expand Down
14 changes: 7 additions & 7 deletions examples/spark/automl_multiclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pyspark.sql import functions as sf

from examples_utils import get_persistence_manager, BUCKET_NUMS
from examples_utils import get_spark_session, prepare_test_and_train, get_dataset_attrs
from examples_utils import get_spark_session, prepare_test_and_train, get_dataset
from sparklightautoml.automl.presets.tabular_presets import SparkTabularAutoML
from sparklightautoml.dataset.base import SparkDataset
from sparklightautoml.tasks.base import SparkTask
Expand All @@ -29,12 +29,12 @@
cv = 2
use_algos = [["lgb"]]
dataset_name = "ipums_97"
path, task_type, roles, dtype = get_dataset_attrs(dataset_name)
dataset = get_dataset(dataset_name)

train_data, test_data = prepare_test_and_train(spark, path, seed)
train_data, test_data = prepare_test_and_train(dataset, seed)

with log_exec_timer("spark-lama training") as train_timer:
task = SparkTask(task_type)
task = SparkTask(dataset.task_type)

automl = SparkTabularAutoML(
spark=spark,
Expand All @@ -46,7 +46,7 @@
tuning_params={'fit_on_holdout': True, 'max_tuning_iter': 10, 'max_tuning_time': 3600}
)

preds = automl.fit_predict(train_data, roles, persistence_manager=persistence_manager).persist()
preds = automl.fit_predict(train_data, dataset.roles, persistence_manager=persistence_manager).persist()

logger.info("Predicting on out of fold")

Expand All @@ -69,7 +69,7 @@
score = task.get_dataset_metric()
expected_metric_value = score(te_pred.select(
SparkDataset.ID_COLUMN,
sf.col(roles['target']).alias('target'),
sf.col(dataset.roles['target']).alias('target'),
sf.col(pred_column).alias('prediction')
))

Expand All @@ -88,7 +88,7 @@
score = task.get_dataset_metric()
actual_metric_value = score(te_pred.select(
SparkDataset.ID_COLUMN,
sf.col(roles['target']).alias('target'),
sf.col(dataset.roles['target']).alias('target'),
sf.col(pred_column).alias('prediction')
))
logger.info(f"score for test predictions via loaded pipeline: {actual_metric_value}")
Expand Down
133 changes: 67 additions & 66 deletions examples/spark/examples_utils.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,35 @@
import os
import inspect
import pickle
from dataclasses import dataclass
from enum import Enum
from typing import Tuple, Optional, cast, Any, Dict
import os
from dataclasses import dataclass, field
from typing import Tuple, Optional, Any, Dict

from pyspark import SparkContext
from pyspark.sql import SparkSession

from sparklightautoml.dataset.base import SparkDataset
from sparklightautoml.utils import SparkDataFrame
from sparklightautoml.dataset import persistence

from sparklightautoml.utils import SparkDataFrame

# Default bucket count for Spark operations in the examples
# (imported by example scripts; exact use not visible here — TODO confirm).
BUCKET_NUMS = 16
# Name of the environment variable selecting the persistence manager
# implementation (presumably read by get_persistence_manager; verify against caller).
PERSISTENCE_MANAGER_ENV_VAR = "PERSISTENCE_MANAGER"
# Root location (as a file:// URL) under which the example data files live;
# ds_path() resolves relative dataset paths against it.
BASE_DATASETS_PATH = "file:///opt/spark_data/"


@dataclass(frozen=True)
class Dataset:
    """Descriptor of one example dataset: where its file lives, which task
    it belongs to, and how Spark should read it."""

    # Spark-readable location of the data file.
    path: str
    # Task type name, e.g. "binary", "reg" or "multiclass".
    task_type: str
    # Column roles, e.g. {"target": ..., "drop": [...]}.
    roles: Dict[str, Any]
    # Optional explicit column dtypes (also passed to pandas.read_csv in the
    # LAMA example).
    dtype: Dict[str, str] = field(default_factory=dict)
    # Spark datasource format of the file.
    file_format: str = 'csv'
    # Reader options forwarded to Spark.
    file_format_options: Dict[str, Any] = field(default_factory=lambda: {"header": True, "escape": "\""})

    def load(self) -> SparkDataFrame:
        """Read the dataset with the currently active Spark session."""
        session = SparkSession.getActiveSession()
        reader = session.read.format(self.file_format).options(**self.file_format_options)
        return reader.load(self.path)


def ds_path(rel_path: str) -> str:
    """Resolve *rel_path* against the shared example-data root
    ``BASE_DATASETS_PATH`` and return the full path."""
    full_path = os.path.join(BASE_DATASETS_PATH, rel_path)
    return full_path


used_cars_params = {
Expand All @@ -37,82 +52,68 @@
}

# Registry of example datasets, keyed by the name that the example scripts
# pass to get_dataset().  Relative paths are resolved via ds_path().
DATASETS = {
    "used_cars_dataset": Dataset(
        path=ds_path("small_used_cars_data.csv"),
        **used_cars_params
    ),

    "used_cars_dataset_1x": Dataset(
        path=ds_path("derivative_datasets/1x_dataset.csv"),
        **used_cars_params
    ),

    "used_cars_dataset_4x": Dataset(
        path=ds_path("derivative_datasets/4x_dataset.csv"),
        **used_cars_params
    ),

    "lama_test_dataset": Dataset(
        path=ds_path("sampled_app_train.csv"),
        task_type="binary",
        roles={"target": "TARGET", "drop": ["SK_ID_CURR"]}
    ),

    # https://www.openml.org/d/4549
    "buzz_dataset": Dataset(
        path=ds_path("Buzzinsocialmedia_Twitter_25k.csv"),
        # Restored from the pre-refactoring entry: this dataset is a
        # regression task on the "Annotation" column.  The "binary"/"TARGET"
        # values were copy-pasted from lama_test_dataset by mistake.
        task_type="reg",
        roles={"target": "Annotation"}
    ),

    # https://www.openml.org/d/734
    "ailerons_dataset": Dataset(
        path=ds_path("ailerons.csv"),
        task_type="binary",
        roles={"target": "binaryClass"}
    ),

    # https://www.openml.org/d/382
    "ipums_97": Dataset(
        path=ds_path("ipums_97.csv"),
        task_type="multiclass",
        roles={"target": "movedin"}
    ),

    "company_bankruptcy_dataset": Dataset(
        path=ds_path("company_bankruptcy_prediction_data.csv"),
        task_type="binary",
        roles={"target": "Bankrupt?"}
    )
}


def get_dataset_attrs(name: str):
return (
DATASETS[name]['path'],
DATASETS[name]['task_type'],
DATASETS[name]['roles'],
# to assure that LAMA correctly interprets certain columns as categorical
DATASETS[name].get('dtype', dict()),
)
def get_dataset(name: str) -> Dataset:
    """Return the dataset descriptor registered under *name* in ``DATASETS``.

    Raises:
        KeyError: if *name* is not a known dataset.  An explicit raise is
            used instead of ``assert`` so the check survives ``python -O``.
    """
    if name not in DATASETS:
        raise KeyError(f"Unknown dataset: {name}. Known datasets: {list(DATASETS.keys())}")
    return DATASETS[name]


def prepare_test_and_train(
spark: SparkSession,
path: str,
dataset: Dataset,
seed: int,
test_size: float = 0.2,
file_format: str = 'csv',
file_format_options: Optional[Dict[str, Any]] = None
test_size: float = 0.2
) -> Tuple[SparkDataFrame, SparkDataFrame]:
assert 0 <= test_size <= 1

spark = SparkSession.getActiveSession()

execs = int(spark.conf.get('spark.executor.instances', '1'))
cores = int(spark.conf.get('spark.executor.cores', '8'))

file_format_options = file_format_options \
or ({"header": True, "escape": "\""} if file_format == 'csv' else dict())
data = spark.read.format(file_format).options(**file_format_options).load(path)
data = dataset.load()

data = data.repartition(execs * cores).cache()
data.write.mode('overwrite').format('noop').save()
Expand Down
10 changes: 5 additions & 5 deletions examples/spark/feature-processing-only.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from examples.spark.examples_utils import get_spark_session, get_dataset_attrs
from examples.spark.examples_utils import get_spark_session, get_dataset
from sparklightautoml.pipelines.features.lgb_pipeline import SparkLGBAdvancedPipeline, SparkLGBSimpleFeatures
from sparklightautoml.pipelines.features.linear_pipeline import SparkLinearFeatures
from sparklightautoml.reader.base import SparkToSparkReader
Expand All @@ -19,16 +19,16 @@
cv = 5
feat_pipe = "lgb_adv" # linear, lgb_simple or lgb_adv
dataset_name = "lama_test_dataset"
path, task_type, roles, dtype = get_dataset_attrs(dataset_name)
df = spark.read.csv(path, header=True)
dataset = get_dataset(dataset_name)
df = spark.read.csv(dataset.path, header=True)

task = SparkTask(name=task_type)
task = SparkTask(name=dataset.task_type)
reader = SparkToSparkReader(task=task, cv=cv, advanced_roles=False)
feature_pipe = feature_pipelines.get(feat_pipe, None)

assert feature_pipe, f"Unsupported feat pipe {feat_pipe}"

ds = reader.fit_read(train_data=df, roles=roles)
ds = reader.fit_read(train_data=df, roles=dataset.roles)
ds = feature_pipe.fit_transform(ds)

# save processed data
Expand Down
16 changes: 8 additions & 8 deletions examples/spark/lama-tabular-preset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
import uuid

import pandas as pd
from lightautoml.automl.presets.tabular_presets import TabularAutoML
from lightautoml.tasks import Task
from sklearn.model_selection import train_test_split

from examples_utils import get_dataset_attrs
from lightautoml.automl.presets.tabular_presets import TabularAutoML
from examples_utils import get_dataset
from sparklightautoml.utils import log_exec_timer, logging_config, VERBOSE_LOGGING_FORMAT
from lightautoml.tasks import Task

logging.config.dictConfig(logging_config(level=logging.INFO, log_filename='/tmp/slama.log'))
logging.basicConfig(level=logging.DEBUG, format=VERBOSE_LOGGING_FORMAT)
Expand All @@ -27,13 +27,13 @@ def main(dataset_name: str, seed: int):
# 3. use_algos = [["lgb", "linear_l2"], ["lgb"]]
use_algos = [["lgb", "linear_l2"], ["lgb"]]

path, task_type, roles, dtype = get_dataset_attrs(dataset_name)
dataset = get_dataset(dataset_name)

with log_exec_timer("LAMA") as train_timer:
data = pd.read_csv(path, dtype=dtype)
data = pd.read_csv(dataset.path, dtype=dataset.dtype)
train_data, test_data = train_test_split(data, test_size=0.2, random_state=seed)

task = Task(task_type)
task = Task(dataset.task_type)

num_threads = 8
automl = TabularAutoML(
Expand All @@ -49,7 +49,7 @@ def main(dataset_name: str, seed: int):

oof_predictions = automl.fit_predict(
train_data,
roles=roles
roles=dataset.roles
)

logger.info("Predicting on out of fold")
Expand All @@ -61,7 +61,7 @@ def main(dataset_name: str, seed: int):

with log_exec_timer() as predict_timer:
te_pred = automl.predict(test_data)
te_pred.target = test_data[roles['target']]
te_pred.target = test_data[dataset.roles['target']]

score = task.get_dataset_metric()
test_metric_value = score(te_pred)
Expand Down
14 changes: 7 additions & 7 deletions examples/spark/lgbm-parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pyspark.sql import functions as sf
from synapse.ml.lightgbm import LightGBMClassifier, LightGBMRegressor

from examples.spark.examples_utils import get_spark_session, get_dataset_attrs, prepare_test_and_train
from examples.spark.examples_utils import get_spark_session, get_dataset, prepare_test_and_train
from sparklightautoml.dataset.base import SparkDataset
from sparklightautoml.pipelines.features.lgb_pipeline import SparkLGBSimpleFeatures
from sparklightautoml.reader.base import SparkToSparkReader
Expand Down Expand Up @@ -71,17 +71,17 @@ def prepare_dataset(self, force=True):
f"Removing existing files because force is set to True")
shutil.rmtree(self.base_dataset_path)

path, task_type, roles, dtype = get_dataset_attrs(self.dataset_name)
dataset = get_dataset(self.dataset_name)

train_df, test_df = prepare_test_and_train(self.spark, path, self.seed)
train_df, test_df = prepare_test_and_train(dataset, self.seed)

task = SparkTask(task_type)
task = SparkTask(dataset.task_type)

sreader = SparkToSparkReader(task=task, cv=self.cv, advanced_roles=False)
spark_features_pipeline = SparkLGBSimpleFeatures()

# prepare train
train_sdataset = sreader.fit_read(train_df, roles=roles)
train_sdataset = sreader.fit_read(train_df, roles=dataset.roles)
train_sdataset = spark_features_pipeline.fit_transform(train_sdataset)

# prepare test
Expand All @@ -95,8 +95,8 @@ def prepare_dataset(self, force=True):

metadata = {
"roles": train_sdataset.roles,
"task_type": task_type,
"target": roles["target"]
"task_type": dataset.task_type,
"target": dataset.roles["target"]
}

with open(self.metadata_path, "wb") as f:
Expand Down

0 comments on commit be51f35

Please sign in to comment.