Skip to content

Commit

Permalink
Merge pull request #89 from tubular/imp/dataframe-breakpoint-for-testing
Browse files Browse the repository at this point in the history
IMP add a dataframe breakpoint method for testing
  • Loading branch information
srstrickland committed May 20, 2022
2 parents 89d2f1f + 125a2ff commit 015dbc8
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions sparkly/testing.py
Expand Up @@ -32,6 +32,7 @@
import warnings

from pyspark.context import SparkContext
from pyspark.sql import DataFrame
from pyspark.sql import types as T

from sparkly import SparklySession
Expand Down Expand Up @@ -97,6 +98,34 @@ def _ensure_gateway_is_down():
os.environ.pop('PYSPARK_GATEWAY_SECRET', None)


def dataframe_breakpoint(self, break_function=None):
"""Injected method for DataFrame class during testing
User may supply their own break_function which takes a single parameter, df
Example:
output_df = (
input_df
.withColumn(...)
.where(...)
.groupBy(...)
.agg(...)
.breakpoint() # will bring up the pdb debugger
.select(...)
.breakpoint(lambda df: df.show(10, False)) # will print 10 rows to console
...
)
"""

def pdb_breakpoint(df):
import pdb
pdb.set_trace()

break_function = break_function or pdb_breakpoint
break_function(self)
return self


class SparklyTest(TestCase):
"""Base test for spark scrip tests.
Expand Down Expand Up @@ -158,6 +187,9 @@ def setUpClass(cls):

cls._init_session()

# define a `df.breakpoint()` for testing:
DataFrame.breakpoint = dataframe_breakpoint

for fixture in cls.class_fixtures:
fixture.setup_data()

Expand All @@ -178,6 +210,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
cls.spark.stop()
delattr(DataFrame, 'breakpoint')
_ensure_gateway_is_down()
super(SparklyTest, cls).tearDownClass()

Expand Down

0 comments on commit 015dbc8

Please sign in to comment.