Changes and bug fixes to support shared clusters in DBR 14.2 (#248)
* wip

* changes for release

* example notebook

* updates to handle shared spark session restrictions

* updates to handle shared sparkSession

* changes per code review

* Doc updates 032223 (#180)

* wip

Doc updates 032523 (#181)

* wip

* documentation updates

Feature build ordering improvements 2 (#189)

* wip

* improved build ordering

* reverted unnecessary changes

* updated ColumnSpecOptions description

Feature consistency fixes (#182)

* fixed use of time strings to allow both 'seconds' and 'second' etc

* id column fixes

Doc updates 100522 (#119)

* fixed reference to dbx in pull_request_template

* reverted inadvertently changed file

* release 0.2.1

* doc updates

* updates for building docs

* updated public docs

* updated sphinx version

* updated docs

* removed generated docs

* removed changes to non-doc files

* tidied up makefile

* added workflow action to update tag 'preview'

* develop branch updates

* revised unit tests to use parameterized approach

* changed utils tests to pytest

* changed changelog format

* changelog changes

* changelog updates from merge

* update to change as a result of merge of time fixes

* updates for test speed improvements

* updated tests

* fixed typo

* reverted pytest changes - separate feature

* changed partitioning to run more efficiently on github runner

* use  as query name for spark instance

* wip

* Updates to template text generation for better performance and repeatable text generation

* reverted unrelated changes

* added further coverage tests and renamed option from 'seedColumn' to 'seedColumnName' for clarity

* added further coverage test for 'seedColumnName' property

* additional test coverage

* updated tests for ILText generation

* merged changes from master

* change to test potential break in build process

* updated build process to explicitly use Python 3.8

* added explicit python version setup to build

* changes to build actions

* reverted changes to master + build action changes

* remerged repeatable feature generation

* wip

* changed table formatting in TemplateGenerator doc string

* updates from master

* updates to develop

* don't update coverage when pushing to develop

* Feature docs v34 (#197)

* wip

* doc changes only

* Update generating_column_data.rst

* Update generating_json_data.rst

* new document build

* adjusted comment banner at start of each doc file

* updated build

* reverted code comment changes

* merge Feature ordering improvements2 into develop (#198)

* wip

* improved build ordering

* reverted unnecessary changes

* reverted inadvertent merge

* Revert "Merge branch 'develop' into feature_consistency_fixes"

This reverts commit e0efc4e, reversing
changes made to a263bd9.

Feature docs v34 v2 (#192)

* wip

* doc changes only

* Update generating_column_data.rst

* Update generating_json_data.rst

* new document build

* adjusted comment banner at start of each doc file

* updated build

* reverted code comment changes

* wip

* Feature v34 (#201)

* wip

* prep for release

* Feature generate from existing data (#163)

* wip

* Remove calls to root logger. (#205)

* Release v34post1 (#206)

* wip

* hotfix release for logger fixes

* fix for root logger configuration

* Fix doc typos and minor clarification (#207)

* wip

* Feature issue 209 (#210)

* wip

* Release 0v34post2 (#211)

* wip

* Feature html formatting (#208)

* wip

* Build fixes (#213)

* wip

* updated build version

* Feature doc change generating text (#218)

* wip

* updated running of prospector

* Feature build update (#220)

* wip

* updates to handle changes in upstream pipenv package

* updated build version

* Feature struct changes (#219)

* wip

* added support for inferred types

* wip

* updates and fixes to unit tests

* wip

* updated pipfile due to upstream changes in pipenv

* additional tests

* wip

* removed old code

* Feature additional docs (#222)

* wip

* doc updates

* function doc changes only

* changes for release

* example notebook

* Feature readme updates - updates readme to note compatible Unity Catalog releases (#236)

* wip

* changes for release

* example notebook

* updated readme to note compatible unity catalog runtimes

* Feature add codeowners (#238)

* Test assign (#239)

* updates to handle shared spark session restrictions

* Update LICENSE (#246)

* updates to handle shared sparkSession

* changes per code review

---------

Co-authored-by: Marvin Schenkel <marvinschenkel@gmail.com>
Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>
3 people committed Feb 22, 2024
1 parent a6987b2 commit b0d23e9
Showing 6 changed files with 111 additions and 8 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
@@ -3,10 +3,10 @@
## Change History
All notable changes to the Databricks Labs Data Generator will be documented in this file.



#### Changed
* Updated readme to include details on which versions of Databricks runtime support Unity Catalog `shared` access mode.
* Updated code to use default parallelism of 200 when using a shared Spark session
* Updated code to use Spark's SQL function `element_at` instead of array indexing due to an incompatibility with shared clusters


### Version 0.3.5
2 changes: 1 addition & 1 deletion dbldatagen/__init__.py
@@ -26,7 +26,7 @@
from .data_generator import DataGenerator
from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_RANDOM, RANDOM_SEED_FIXED, \
RANDOM_SEED_HASH_FIELD_NAME, MIN_PYTHON_VERSION, MIN_SPARK_VERSION, \
INFER_DATATYPE
INFER_DATATYPE, SPARK_DEFAULT_PARALLELISM
from .utils import ensure, topologicalSort, mkBoundsList, coalesce_values, \
deprecated, parse_time_interval, DataGenError, split_list_matching_condition, strip_margins, \
json_value_from_path, system_time_millis
2 changes: 1 addition & 1 deletion dbldatagen/column_generation_spec.py
@@ -1085,7 +1085,7 @@ def _makeSingleGenerationExpression(self, index=None, use_pandas_optimizations=T
.astype(self.datatype))

if self.values is not None:
new_def = array([lit(x) for x in self.values])[new_def.astype(IntegerType())]
new_def = F.element_at(F.array([F.lit(x) for x in self.values]), new_def.astype(IntegerType()) + 1)
elif type(self.datatype) is StringType and self.expr is None:
new_def = self._applyPrefixSuffixExpressions(self.prefix, self.suffix, new_def)

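For reference, here is a minimal standalone sketch of the same pattern (illustrative only, not code from this repository): Spark's `element_at` uses 1-based indexing, which is why the replacement adds 1 to the generated 0-based index, and it avoids the Python-style array indexing that shared clusters may reject.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.master("local[2]").appName("element_at_demo").getOrCreate()

values = ["red", "green", "blue"]  # stand-in for a column spec's `values` list
df = spark.range(6).withColumn("ix", (F.col("id") % len(values)).cast(IntegerType()))

# Previously: F.array([F.lit(x) for x in values])[F.col("ix")]  (0-based array indexing)
# Now: element_at with a 1-based index
mapped = df.withColumn("value", F.element_at(F.array([F.lit(x) for x in values]), F.col("ix") + 1))
mapped.show()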
24 changes: 21 additions & 3 deletions dbldatagen/data_generator.py
@@ -16,7 +16,7 @@
from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, \
DEFAULT_SEED_COLUMN, SPARK_RANGE_COLUMN, MIN_SPARK_VERSION, \
OPTION_RANDOM, OPTION_RANDOM_SEED, OPTION_RANDOM_SEED_METHOD, \
INFER_DATATYPE
INFER_DATATYPE, SPARK_DEFAULT_PARALLELISM
from .html_utils import HtmlUtils
from .schema_parser import SchemaParser
from .spark_singleton import SparkSingleton
@@ -50,6 +50,9 @@ class DataGenerator:
it is recommended that you use a different name for the seed column - for example `_id`.
This may be specified by setting the `seedColumnName` attribute to `_id`
Note: in a shared spark session, the sparkContext is not available, so the default parallelism is set to 200.
We recommend passing an explicit value for `partitions` in this case.
"""

# class vars
@@ -97,9 +100,8 @@ def __init__(self, sparkSession=None, name=None, randomSeedMethod=None,
# if the active Spark session is stopped, you may end up with a valid SparkSession object but the underlying
# SparkContext will be invalid
assert sparkSession is not None, "Spark session not initialized"
assert sparkSession.sparkContext is not None, "Expecting spark session to have valid sparkContext"

self.partitions = partitions if partitions is not None else sparkSession.sparkContext.defaultParallelism
self.partitions = partitions if partitions is not None else self._getDefaultSparkParallelism(sparkSession)

# check for old versions of args
if "starting_id" in kwargs:
@@ -239,6 +241,22 @@ def _setupLogger(self):
else:
self.logger.setLevel(logging.WARNING)

@staticmethod
def _getDefaultSparkParallelism(sparkSession):
"""Get the default parallelism for a spark session, if spark session supports getting the sparkContext
:param sparkSession: spark session
:return: default parallelism
"""
try:
if sparkSession.sparkContext is not None:
return sparkSession.sparkContext.defaultParallelism
else:
return SPARK_DEFAULT_PARALLELISM
except Exception as err: # pylint: disable=broad-exception-caught
err_msg = f"Error getting default parallelism, using default setting of {SPARK_DEFAULT_PARALLELISM}"
logging.warning(err_msg)
return SPARK_DEFAULT_PARALLELISM

@classmethod
def useSeed(cls, seedVal):
""" set seed for random number generation
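Since the docstring above recommends passing an explicit `partitions` value when the sparkContext is unavailable, a typical call on a shared cluster might look like the sketch below (assuming `spark` is the active session; the column definition and sizes are illustrative only):

import dbldatagen as dg
from pyspark.sql.types import IntegerType

# On a shared cluster, pass `partitions` explicitly instead of relying on the
# 200-partition fallback returned by _getDefaultSparkParallelism
spec = (dg.DataGenerator(sparkSession=spark, name="demo_data", rows=1_000_000, partitions=8)
        .withColumn("code", IntegerType(), minValue=1, maxValue=100))
df = spec.build()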
5 changes: 4 additions & 1 deletion dbldatagen/datagen_constants.py
@@ -42,4 +42,7 @@
OPTION_RANDOM_SEED_METHOD = "randomSeedMethod"
OPTION_RANDOM_SEED = "randomSeed"

INFER_DATATYPE = "__infer__"
INFER_DATATYPE = "__infer__"

# default parallelism when sparkContext is not available
SPARK_DEFAULT_PARALLELISM = 200
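The new constant is also re-exported from the package root (see the `__init__.py` change above), so the tests below can reference it as `dg.SPARK_DEFAULT_PARALLELISM`; for example:

import dbldatagen as dg

# Fallback partition count used when the sparkContext cannot be queried
print(dg.SPARK_DEFAULT_PARALLELISM)  # 200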
82 changes: 82 additions & 0 deletions tests/test_shared_env.py
@@ -0,0 +1,82 @@
import logging
from unittest.mock import Mock, PropertyMock

import pytest
import dbldatagen as dg


@pytest.fixture(scope="class")
def setupLogging():
FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(format=FORMAT)


class TestSharedEnv:
"""Tests to simulate testing under a Unity Catalog shared environment. In a Unity Catalog shared environment with
the 14.x versions of the Databricks runtime, the sparkSession object does not support use of the sparkContext
attribute to get the default parallelism. In this case, we want to catch errors and return a default of
200 as the default number of partitions. This matches Spark's default value for `spark.sql.shuffle.partitions`.
"""
SMALL_ROW_COUNT = 100000
COLUMN_COUNT = 10

@pytest.fixture(scope="class")
def sparkSession(self, setupLogging):
spark = dg.SparkSingleton.getLocalInstance("unit tests")
return spark

@pytest.fixture(scope="class")
def sharedSparkSession(self, setupLogging):
spark = Mock(wraps=dg.SparkSingleton.getLocalInstance("unit tests"))
del spark.sparkContext
return spark

@pytest.fixture(scope="class")
def sparkSessionNullContext(self, setupLogging):

class MockSparkSession:
def __init__(self):
self.sparkContext = None

spark = MockSparkSession()
return spark

def test_getDefaultParallelism(self, sparkSession):
"""Test that the default parallelism is returned when the sparkSession object supports use of the
sparkContext attribute to get the default parallelism.
:param sparkSession: The sparkSession object to use for the test.
"""
defaultParallelism = dg.DataGenerator._getDefaultSparkParallelism(sparkSession)
assert defaultParallelism == sparkSession.sparkContext.defaultParallelism

def test_getSharedDefaultParallelism(self, sharedSparkSession):
"""Test that the default parallelism is returned when the sparkSession object supports use of the
sparkContext attribute to get the default parallelism, but that a constant is returned when the `sparkContext`
attribute is not available.
"""
defaultParallelism = dg.DataGenerator._getDefaultSparkParallelism(sharedSparkSession)
assert defaultParallelism == dg.SPARK_DEFAULT_PARALLELISM

def test_getNullContextDefaultParallelism(self, sparkSessionNullContext):
"""Test that the default parallelism is returned when the sparkSession object supports use of the
sparkContext attribute to get the default parallelism.
:param sparkSession: The sparkSession object to use for the test.
"""
defaultParallelism = dg.DataGenerator._getDefaultSparkParallelism(sparkSessionNullContext)
assert defaultParallelism == dg.SPARK_DEFAULT_PARALLELISM

def test_mocked_shared_session1(self, sharedSparkSession):
# validate that accessing the sparkContext on the shared spark session raises an exception
with pytest.raises(Exception) as excinfo:
context = sharedSparkSession.sparkContext

assert "sparkContext" in str(excinfo.value)

def test_null_context_spark_session(self, sparkSessionNullContext):
# validate that accessing the sparkContext on a spark session with a null sparkContext returns None
context = sparkSessionNullContext.sparkContext
assert context is None
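The `sharedSparkSession` fixture above leans on a documented `unittest.mock` behavior: deleting an attribute from a `Mock` makes subsequent access raise `AttributeError`, which simulates the missing `sparkContext` of a shared cluster. A quick standalone illustration:

from unittest.mock import Mock

m = Mock()
_ = m.sparkContext   # auto-created child mock
del m.sparkContext   # mark the attribute as deleted

try:
    m.sparkContext
except AttributeError as err:
    print(f"AttributeError raised as expected: {err}")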
