Feature/examples (#1)
* add save/load functionality for SparkDataset (including handling of square brackets in column names) 

* add test for save/load SparkDataset

* add feature-processing-only and simple optuna examples

---------

Co-authored-by: fonhorst <fonhorst@alipoov.nb@gmail.com>
fonhorst and fonhorst committed May 19, 2023
1 parent 3dff63a commit bd23c6e
Showing 5 changed files with 287 additions and 27 deletions.
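The headline change is a save/load round trip for SparkDataset that also works around parquet's restrictions on '(' and ')' in column names. Below is a minimal sketch of how the new API is meant to be used, based on the methods and the unit test added in this commit; the local session setup, path, task type and column names are illustrative, not taken from the repository.

from lightautoml.dataset.roles import NumericRole
from pyspark.sql import SparkSession

from sparklightautoml.dataset.base import SparkDataset
from sparklightautoml.dataset.persistence import PlainCachePersistenceManager
from sparklightautoml.tasks.base import SparkTask

spark = SparkSession.builder.master("local[2]").getOrCreate()

# a toy frame with a parenthesised feature name, the case the bracket handling covers
df = spark.createDataFrame([
    {SparkDataset.ID_COLUMN: i, "oof__(A__B)": float(i), "my_target": i % 2}
    for i in range(10)
])

ds = SparkDataset(
    data=df,
    task=SparkTask("reg"),
    target="my_target",
    roles={"oof__(A__B)": NumericRole()},
)
ds.save("/tmp/demo__features.dataset", save_mode="overwrite")

loaded = SparkDataset.load(
    path="/tmp/demo__features.dataset",
    persistence_manager=PlainCachePersistenceManager(),
)
assert loaded.features == ds.features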
35 changes: 35 additions & 0 deletions examples/spark/feature-processing-only.py
@@ -0,0 +1,35 @@
from examples.spark.examples_utils import get_spark_session, get_dataset_attrs
from sparklightautoml.pipelines.features.lgb_pipeline import SparkLGBAdvancedPipeline, SparkLGBSimpleFeatures
from sparklightautoml.pipelines.features.linear_pipeline import SparkLinearFeatures
from sparklightautoml.reader.base import SparkToSparkReader
from sparklightautoml.tasks.base import SparkTask


feature_pipelines = {
    "linear": SparkLinearFeatures(),
    "lgb_simple": SparkLGBSimpleFeatures(),
    "lgb_adv": SparkLGBAdvancedPipeline()
}


if __name__ == "__main__":
    spark = get_spark_session()

    # settings and data
    cv = 5
    feat_pipe = "lgb_adv"  # linear, lgb_simple or lgb_adv
    dataset_name = "lama_test_dataset"
    path, task_type, roles, dtype = get_dataset_attrs(dataset_name)
    df = spark.read.csv(path, header=True)

    task = SparkTask(name=task_type)
    reader = SparkToSparkReader(task=task, cv=cv, advanced_roles=False)
    feature_pipe = feature_pipelines.get(feat_pipe, None)

    assert feature_pipe, f"Unsupported feat pipe {feat_pipe}"

    ds = reader.fit_read(train_data=df, roles=roles)
    ds = feature_pipe.fit_transform(ds)

    # save processed data
    ds.save(f"/tmp/{dataset_name}__{feat_pipe}__features.dataset")
97 changes: 97 additions & 0 deletions examples/spark/optuna_simple.py
@@ -0,0 +1,97 @@
import logging
import pickle
from logging import config
from typing import Tuple, Union, Callable

import optuna
from lightautoml.ml_algo.tuning.optuna import OptunaTuner, TunableAlgo
from lightautoml.ml_algo.utils import tune_and_fit_predict
from lightautoml.validation.base import TrainValidIterator
from pyspark.sql import functions as sf

from examples.spark.examples_utils import get_spark_session
from sparklightautoml.dataset.base import SparkDataset
from sparklightautoml.dataset.persistence import PlainCachePersistenceManager
from sparklightautoml.ml_algo.boost_lgbm import SparkBoostLGBM
from sparklightautoml.utils import logging_config, VERBOSE_LOGGING_FORMAT
from sparklightautoml.validation.iterators import SparkHoldoutIterator, SparkFoldsIterator

logging.config.dictConfig(logging_config(level=logging.DEBUG, log_filename='/tmp/slama.log'))
logging.basicConfig(level=logging.DEBUG, format=VERBOSE_LOGGING_FORMAT)
logger = logging.getLogger(__name__)


class ProgressReportingOptunaTuner(OptunaTuner):
    def _get_objective(self, ml_algo: TunableAlgo, estimated_n_trials: int, train_valid_iterator: TrainValidIterator) \
            -> Callable[[optuna.trial.Trial], Union[float, int]]:
        obj_func = super()._get_objective(ml_algo, estimated_n_trials, train_valid_iterator)

        def func(*args, **kwargs):
            obj_score = obj_func(*args, **kwargs)
            logger.info(f"Objective score: {obj_score}")
            return obj_score

        return func


def train_test_split(dataset: SparkDataset, test_slice_or_fold_num: Union[float, int] = 0.2) \
        -> Tuple[SparkDataset, SparkDataset]:

    if isinstance(test_slice_or_fold_num, float):
        assert 0 <= test_slice_or_fold_num <= 1
        train, test = dataset.data.randomSplit([1 - test_slice_or_fold_num, test_slice_or_fold_num])
    else:
        train = dataset.data.where(sf.col(dataset.folds_column) != test_slice_or_fold_num)
        test = dataset.data.where(sf.col(dataset.folds_column) == test_slice_or_fold_num)

    train_dataset, test_dataset = dataset.empty(), dataset.empty()
    train_dataset.set_data(train, dataset.features, roles=dataset.roles)
    test_dataset.set_data(test, dataset.features, roles=dataset.roles)

    return train_dataset, test_dataset


if __name__ == "__main__":
    spark = get_spark_session()

    feat_pipe = "lgb_adv"  # linear, lgb_simple or lgb_adv
    dataset_name = "lama_test_dataset"

    # load and prepare data
    ds = SparkDataset.load(
        path=f"/tmp/{dataset_name}__{feat_pipe}__features.dataset",
        persistence_manager=PlainCachePersistenceManager()
    )
    train_ds, test_ds = train_test_split(ds, test_slice_or_fold_num=4)

    # create main entities
    iterator = SparkFoldsIterator(train_ds).convert_to_holdout_iterator()
    tuner = ProgressReportingOptunaTuner(n_trials=101, timeout=3000)
    ml_algo = SparkBoostLGBM()
    score = ds.task.get_dataset_metric()

    # fit and predict
    model, oof_preds = tune_and_fit_predict(ml_algo, tuner, iterator)
    test_preds = ml_algo.predict(test_ds)

    # reporting trials
    # TODO: reporting to mlflow
    # TODO: quality curves on different datasets
    with open("/tmp/trials.pickle", "wb") as f:
        pickle.dump(tuner.study.trials, f)

    # estimate oof and test metrics
    oof_metric_value = score(oof_preds.data.select(
        SparkDataset.ID_COLUMN,
        sf.col(ds.target_column).alias('target'),
        sf.col(ml_algo.prediction_feature).alias('prediction')
    ))

    test_metric_value = score(test_preds.data.select(
        SparkDataset.ID_COLUMN,
        sf.col(ds.target_column).alias('target'),
        sf.col(ml_algo.prediction_feature).alias('prediction')
    ))

    print(f"OOF metric: {oof_metric_value}")
    print(f"Test metric: {test_metric_value}")
81 changes: 58 additions & 23 deletions sparklightautoml/dataset/base.py
@@ -1,5 +1,7 @@
import functools
import logging
import os
import pickle
import uuid
import warnings
from abc import ABC, abstractmethod
@@ -29,7 +31,7 @@

from sparklightautoml import VALIDATION_COLUMN
from sparklightautoml.dataset.roles import NumericVectorOrArrayRole
from sparklightautoml.utils import SparkDataFrame
from sparklightautoml.utils import SparkDataFrame, create_directory

logger = logging.getLogger(__name__)

@@ -76,6 +78,28 @@ def _set_col(data: Any, k: int, val: Any):

    ID_COLUMN = "_id"

    @classmethod
    def load(cls,
             path: str,
             file_format: str = 'parquet',
             options: Optional[Dict[str, Any]] = None,
             persistence_manager: Optional['PersistenceManager'] = None) -> 'SparkDataset':
        metadata_file_path = os.path.join(path, f"metadata.{file_format}")
        file_path = os.path.join(path, f"data.{file_format}")
        options = options or dict()
        spark = SparkSession.getActiveSession()

        # reading metadata
        metadata_df = spark.read.format(file_format).options(**options).load(metadata_file_path)
        metadata = pickle.loads(metadata_df.select('metadata').first().asDict()['metadata'])

        # reading data
        data_df = spark.read.format(file_format).options(**options).load(file_path)
        name_fixed_cols = (sf.col(c).alias(c.replace('[', '(').replace(']', ')')) for c in data_df.columns)
        data_df = data_df.select(*name_fixed_cols)

        return SparkDataset(data=data_df, persistence_manager=persistence_manager, **metadata)

    # TODO: SLAMA - implement filling dependencies
    @classmethod
    def concatenate(
@@ -133,30 +157,11 @@ def __init__(self,
                 bucketized: bool = False,
                 dependencies: Optional[List[Dependency]] = None,
                 name: Optional[str] = None,
                 target: Optional[str] = None,
                 folds: Optional[str] = None,
                 **kwargs: Any):

        if "target" in kwargs:
            assert isinstance(kwargs["target"], str), "Target should be a str representing column name"
            self._target_column: str = kwargs["target"]
        else:
            self._target_column = None

        self._folds_column = None
        if "folds" in kwargs and kwargs["folds"] is not None:
            assert isinstance(kwargs["folds"], str), "Folds should be a str representing column name"
            self._folds_column: str = kwargs["folds"]
        else:
            self._folds_column = None

        self._validate_dataframe(data)

        self._data = None

        # columns that can be transferred intact across all transformations
        # in the pipeline
        base_service_columns = {self.ID_COLUMN, self.target_column, self.folds_column, VALIDATION_COLUMN}
        self._service_columns: Set[str] = base_service_columns

        roles = roles if roles else dict()

        # currently only target is supported
@@ -166,15 +171,20 @@
                if roles[f].name == r:
                    roles[f] = DropRole()

        self._data = None
        self._bucketized = bucketized
        self._roles = None

        self._uid = str(uuid.uuid4())
        self._persistence_manager = persistence_manager
        self._dependencies = dependencies
        self._frozen = False
        self._name = name
        self._is_persisted = False
        self._target_column = target
        self._folds_column = folds
        # columns that can be transferred intact across all transformations
        # in the pipeline
        self._service_columns: Set[str] = {self.ID_COLUMN, target, folds, VALIDATION_COLUMN}

        super().__init__(data, list(roles.keys()), roles, task, **kwargs)

@@ -490,6 +500,31 @@ def freeze(self) -> 'SparkDataset':
        ds.set_data(self.data, self.features, self.roles, frozen=True)
        return ds

    def save(self, path: str, save_mode: str = 'error', file_format: str = 'parquet', options: Optional[Dict[str, Any]] = None):
        metadata_file_path = os.path.join(path, f"metadata.{file_format}")
        file_path = os.path.join(path, f"data.{file_format}")
        options = options or dict()

        # prepare metadata of the dataset
        metadata = {
            "name": self.name,
            "roles": self.roles,
            "task": self.task,
            "target": self.target_column,
            "folds": self.folds_column,
        }
        metadata_str = pickle.dumps(metadata)
        metadata_df = self.spark_session.createDataFrame([{"metadata": metadata_str}])

        # create directory that will store data and metadata as separate files of dataframes
        create_directory(path, spark=self.spark_session, exists_ok=(save_mode in ['overwrite', 'append']))

        # writing dataframes
        metadata_df.write.format(file_format).mode(save_mode).options(**options).save(metadata_file_path)
        # fix name of columns: parquet cannot have columns with '(' or ')' in the name
        name_fixed_cols = (sf.col(c).alias(c.replace('(', '[').replace(')', ']')) for c in self.data.columns)
        self.data.select(*name_fixed_cols).write.format(file_format).mode(save_mode).options(**options).save(file_path)

    def to_pandas(self) -> PandasDataset:
        data, target_data, folds_data, roles = self._materialize_to_pandas()

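A small illustration of the column-name handling used by save() and load() above: parquet cannot store column names containing '(' or ')', so they are swapped to square brackets on write and swapped back on read. The feature name below is made up for the sketch.

# hypothetical feature name of the kind SLAMA's pipelines produce
original = "scaler__logodds__oof__inter__(CODE_GENDER__EMERGENCYSTATE_MODE)"

stored = original.replace('(', '[').replace(')', ']')    # what save() writes to parquet
restored = stored.replace('[', '(').replace(']', ')')    # what load() gives back

assert '(' not in stored and ')' not in stored
assert restored == original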
21 changes: 17 additions & 4 deletions sparklightautoml/utils.py
@@ -9,18 +9,17 @@
from typing import Optional, Tuple, Dict, List, cast

import pyspark
from py4j.java_gateway import java_import
from pyspark import RDD
from pyspark.ml import Transformer, Estimator
from pyspark.ml.common import inherit_doc
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCols, HasOutputCols
from pyspark.ml.pipeline import PipelineModel, PipelineSharedReadWrite, PipelineModelReader
from pyspark.ml.util import DefaultParamsWritable, DefaultParamsReadable, MLReadable, MLWritable, MLWriter, \
from pyspark.ml.pipeline import PipelineModel, PipelineSharedReadWrite
from pyspark.ml.util import DefaultParamsWritable, DefaultParamsReadable, MLWriter, \
    DefaultParamsWriter, MLReader, DefaultParamsReader
from pyspark.sql import SparkSession

from sparklightautoml.mlwriters import CommonPickleMLWritable, CommonPickleMLReadable

VERBOSE_LOGGING_FORMAT = "%(asctime)s %(levelname)s %(module)s %(filename)s:%(lineno)d %(message)s"

logger = logging.getLogger(__name__)
@@ -444,3 +443,17 @@ def JobGroup(group_id: str, description: str, spark: SparkSession):
    sc.setJobGroup(group_id, description)
    yield
    sc._jsc.clearJobGroup()


# noinspection PyProtectedMember,PyUnresolvedReferences
def create_directory(path: str, spark: SparkSession, exists_ok: bool = False):
    java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
    java_import(spark._jvm, 'org.apache.hadoop.fs.FileSystem')

    jpath = spark._jvm.Path(path)
    fs = spark._jvm.FileSystem.get(spark._jsc.hadoopConfiguration())

    if not fs.exists(jpath):
        fs.mkdirs(jpath)
    elif not exists_ok:
        raise FileExistsError(f"The path already exists: {path}")
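A minimal usage sketch for the new create_directory helper. The path is illustrative; it assumes an active Spark session and creates the directory on whatever Hadoop-compatible filesystem that session is configured for.

from pyspark.sql import SparkSession

from sparklightautoml.utils import create_directory

spark = SparkSession.getActiveSession()  # assumes a session has already been started
# create the dataset directory up front; tolerate it already existing
create_directory("/tmp/some_dataset_dir.dataset", spark=spark, exists_ok=True)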
80 changes: 80 additions & 0 deletions tests/spark/unit/test_spark_dataset.py
@@ -0,0 +1,80 @@
import os
import shutil
from typing import Optional

import numpy as np
from lightautoml.dataset.roles import NumericRole
from lightautoml.tasks import Task
from pandas.testing import assert_frame_equal
from pyspark.sql import SparkSession

from sparklightautoml.dataset.base import SparkDataset
from sparklightautoml.tasks.base import SparkTask
from . import spark as spark_sess

spark = spark_sess


def compare_tasks(task_a: Optional[Task], task_b: Optional[Task]):
    assert (task_a and task_b) or (not task_a and not task_b)
    assert task_a.name == task_b.name
    assert task_a.metric_name == task_b.metric_name
    assert task_a.greater_is_better == task_b.greater_is_better


def compare_dfs(dataset_a: SparkDataset, dataset_b: SparkDataset):
    assert dataset_a.data.schema == dataset_b.data.schema

    # checking data
    df_a = dataset_a.data.orderBy(SparkDataset.ID_COLUMN).toPandas()
    df_b = dataset_b.data.orderBy(SparkDataset.ID_COLUMN).toPandas()
    assert_frame_equal(df_a, df_b)


def test_spark_dataset_save_load(spark: SparkSession):
    path = "/tmp/test_slama_ds.dataset"

    # cleanup
    if os.path.exists(path):
        shutil.rmtree(path)

    # creating test data
    df = spark.createDataFrame([{
        SparkDataset.ID_COLUMN: i,
        "a": i + 1,
        "b": i * 10 + 1,
        "this_is_target": 0,
        "this_is_fold": 0,
        "scaler__fillnamed__fillinf__logodds__oof__inter__(CODE_GENDER__EMERGENCYSTATE_MODE)": 12.0
    } for i in range(10)])

    ds = SparkDataset(
        data=df,
        task=SparkTask("reg"),
        target="this_is_target",
        folds="this_is_fold",
        roles={
            "a": NumericRole(dtype=np.int32),
            "b": NumericRole(dtype=np.int32),
            "scaler__fillnamed__fillinf__logodds__oof__inter__(CODE_GENDER__EMERGENCYSTATE_MODE)": NumericRole()
        }
    )

    ds.save(path=path)
    loaded_ds = SparkDataset.load(path=path)

    # checking metadata
    assert loaded_ds.uid
    assert loaded_ds.uid != ds.uid
    assert loaded_ds.name == ds.name
    assert loaded_ds.target_column == ds.target_column
    assert loaded_ds.folds_column == ds.folds_column
    assert loaded_ds.service_columns == ds.service_columns
    assert loaded_ds.features == ds.features
    assert loaded_ds.roles == ds.roles
    compare_tasks(loaded_ds.task, ds.task)
    compare_dfs(loaded_ds, ds)

    # cleanup
    if os.path.exists(path):
        shutil.rmtree(path)
