Skip to content

Commit

Permalink
spark connect attempt 2
Browse files Browse the repository at this point in the history
  • Loading branch information
MrPowers committed Feb 10, 2024
1 parent 64c1be2 commit 3f0e198
Show file tree
Hide file tree
Showing 8 changed files with 912 additions and 891 deletions.
1 change: 1 addition & 0 deletions benchmarks/create_benchmark_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def save_benchmark_df(

builder = (
SparkSession.builder.appName("MyApp")
.remote("sc://localhost")
.config("spark.executor.memory", "20G")
.config("spark.driver.memory", "25G")
.config("spark.sql.shuffle.partitions", "2")
Expand Down
1 change: 1 addition & 0 deletions benchmarks/visualize_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def get_benchmark_date(benchmark_path: str) -> str:
if __name__ == "__main__":
spark = (
SparkSession.builder.appName("MyApp") # type: ignore # noqa: PGH003
.remote("sc://localhost")
.config("spark.executor.memory", "10G")
.config("spark.driver.memory", "25G")
.config("spark.sql.shuffle.partitions", "2")
Expand Down
1,740 changes: 879 additions & 861 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ build-backend = "poetry.masonry.api"
###########################################################################

[tool.poetry.dependencies]
python = ">=3.7,<4.0"
python = ">=3.8,<4.0"


###########################################################################
Expand All @@ -37,14 +37,14 @@ optional = true
optional = true

[tool.poetry.group.development.dependencies]
pyspark = ">2"
pyspark = "^3.5.0"
semver = "^3"

[tool.poetry.group.testing.dependencies]
pytest = "^7"
chispa = "0.9.4"
pytest-describe = "^2"
pyspark = ">2"
pyspark = "^3.5.0"
semver = "^3"

[tool.poetry.group.linting.dependencies]
Expand Down
37 changes: 19 additions & 18 deletions quinn/extensions/column_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,55 @@
from pyspark.sql.functions import lit, trim, when


def isFalsy(self: Column) -> Column:
def isFalsy(col: Column) -> Column:
"""Returns a Column indicating whether all values in the Column are False or NULL (**falsy**).
Each element in the resulting column is True if all the elements in the
Column are either NULL or False, or False otherwise. This is accomplished by
performing a bitwise or of the ``isNull`` condition and a literal False value and
then wrapping the result in a **when** statement.
:param self: Column object
:param col: Column object
:returns: Column object
:rtype: Column
"""
return when(self.isNull() | (self == lit(False)), True).otherwise(False)
return when(col.isNull() | (col == lit(False)), True).otherwise(False)


def isTruthy(self: Column) -> Column:
def isTruthy(col: Column) -> Column:
"""Calculates a boolean expression that is the opposite of isFalsy for the given ``Column`` col.
:param Column self: The ``Column`` to calculate the opposite of isFalsy for.
:param Column col: The ``Column`` to calculate the opposite of isFalsy for.
:returns: A ``Column`` with the results of the calculation.
:rtype: Column
"""
return ~(self.isFalsy())
return ~(col.isFalsy())


def isFalse(self: Column) -> Column:
def isFalse(col: Column) -> Column:
"""Function checks if the column is equal to False and returns the column.
:param self: Column
:param col: Column
:return: Column
:rtype: Column
"""
return self == lit(False)
return col == lit(False)


def isTrue(self: Column) -> Column:
def isTrue(col: Column) -> Column:
"""Function takes a column of type Column as an argument and returns a column of type Column.
It evaluates whether each element in the column argument is equal to True, and
if so will return True, otherwise False.
:param self: Column object
:param col: Column object
:returns: Column object
:rtype: Column
"""
return self == lit(True)
return col == lit(True)


def isNullOrBlank(self: Column) -> Column:
def isNullOrBlank(col: Column) -> Column:
r"""Returns a Boolean value which expresses whether a given column is ``null`` or contains only blank characters.
:param col: The :class:`Column` to check.
Expand All @@ -63,17 +63,17 @@ def isNullOrBlank(self: Column) -> Column:
blank characters, or ``False`` otherwise.
:rtype: Column
"""
return (self.isNull()) | (trim(self) == "")
return (col.isNull()) | (trim(col) == "")


def isNotIn(self: Column, _list: list[Any]) -> Column:
def isNotIn(col: Column, _list: list[Any]) -> Column:
"""To see if a value is not in a list of values.
:param self: Column object
:param col: Column object
:param _list: list[Any]
:rtype: Column
"""
return ~(self.isin(_list))
return ~(col.isin(_list))


def nullBetween(self: Column, lower: Column, upper: Column) -> Column:
Expand All @@ -96,7 +96,8 @@ def nullBetween(self: Column, lower: Column, upper: Column) -> Column:
)


Column.isFalsy = isFalsy
# Column.isFalsy = isFalsy
Column.isFalsy = getattr(Column, "isFalsy", isFalsy)
Column.isTruthy = isTruthy
Column.isFalse = isFalse
Column.isTrue = isTrue
Expand Down
2 changes: 1 addition & 1 deletion quinn/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def fix_nullability(field: StructField, result_dict: dict) -> None:
fix_nullability(field, result_dict)

if not hasattr(SparkSession, "getActiveSession"): # spark 2.4
spark = SparkSession.builder.getOrCreate()
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
else:
spark = SparkSession.getActiveSession()
spark = spark if spark is not None else SparkSession.builder.getOrCreate()
Expand Down
14 changes: 7 additions & 7 deletions tests/extensions/test_column_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def test_is_falsy():
("expected", BooleanType(), True),
],
)
actual_df = source_df.withColumn("is_has_stuff_falsy", F.col("has_stuff").isFalsy())
actual_df = source_df.withColumn("is_has_stuff_falsy", isFalsy(F.col("has_stuff")))
chispa.assert_column_equality(actual_df, "is_has_stuff_falsy", "expected")


Expand All @@ -27,7 +27,7 @@ def test_is_truthy():
[("has_stuff", BooleanType(), True), ("expected", BooleanType(), True)],
)
actual_df = source_df.withColumn(
"is_has_stuff_truthy", F.col("has_stuff").isTruthy()
"is_has_stuff_truthy", isTruthy(F.col("has_stuff"))
)
chispa.assert_column_equality(actual_df, "is_has_stuff_truthy", "expected")

Expand All @@ -38,7 +38,7 @@ def test_is_false():
[(True, False), (False, True), (None, None)],
[("has_stuff", BooleanType(), True), ("expected", BooleanType(), True)],
)
actual_df = source_df.withColumn("is_has_stuff_false", F.col("has_stuff").isFalse())
actual_df = source_df.withColumn("is_has_stuff_false", isFalse(F.col("has_stuff")))
chispa.assert_column_equality(actual_df, "is_has_stuff_false", "expected")


Expand All @@ -48,7 +48,7 @@ def test_is_true():
[(True, True), (False, False), (None, None)],
[("has_stuff", BooleanType(), True), ("expected", BooleanType(), True)],
)
actual_df = source_df.withColumn("is_stuff_true", F.col("has_stuff").isTrue())
actual_df = source_df.withColumn("is_stuff_true", isTrue(F.col("has_stuff")))
chispa.assert_column_equality(actual_df, "is_stuff_true", "expected")


Expand All @@ -67,7 +67,7 @@ def test_is_null_or_blank():
],
)
actual_df = source_df.withColumn(
"is_blah_null_or_blank", F.col("blah").isNullOrBlank()
"is_blah_null_or_blank", isNullOrBlank(F.col("blah"))
)
chispa.assert_column_equality(actual_df, "is_blah_null_or_blank", "expected")

Expand All @@ -87,7 +87,7 @@ def test_is_not_in():
)
bobs_hobbies = ["dancing", "snowboarding"]
actual_df = source_df.withColumn(
"is_not_bobs_hobby", F.col("fun_thing").isNotIn(bobs_hobbies)
"is_not_bobs_hobby", isNotIn(F.col("fun_thing"), bobs_hobbies)
)
chispa.assert_column_equality(actual_df, "is_not_bobs_hobby", "expected")

Expand All @@ -113,6 +113,6 @@ def test_null_between():
],
)
actual_df = source_df.withColumn(
"is_between", F.col("age").nullBetween(F.col("lower_age"), F.col("upper_age"))
"is_between", nullBetween(F.col("age"), F.col("lower_age"), F.col("upper_age"))
)
chispa.assert_column_equality(actual_df, "is_between", "expected")
2 changes: 1 addition & 1 deletion tests/spark.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("chispa").getOrCreate()
spark = SparkSession.builder.remote("sc://localhost").appName("chispa").getOrCreate()

0 comments on commit 3f0e198

Please sign in to comment.