Skip to content

Commit

Permalink
Remove rdd (#188)
Browse files Browse the repository at this point in the history
* remove rdd reference in column to list

* remove rdd references
  • Loading branch information
MrPowers committed Feb 7, 2024
1 parent 3d18dc3 commit 423ea4a
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 3 deletions.
4 changes: 2 additions & 2 deletions quinn/dataframe_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def column_to_list(df: DataFrame, col_name: str) -> list[Any]:

# sparksession from df is not available in older versions of pyspark
if sys.modules["pyspark"].__version__ < "3.3.0":
return df.select(col_name).rdd.flatMap(lambda x: x).collect()
return [row[0] for row in df.select(col_name).collect()]

spark_config = df.sparkSession.sparkContext.getConf().getAll()

Expand All @@ -40,7 +40,7 @@ def column_to_list(df: DataFrame, col_name: str) -> list[Any]:
if pyarrow_valid and pandas_valid:
return df.select(col_name).toPandas()[col_name].tolist()

return df.select(col_name).rdd.flatMap(lambda x: x).collect()
return [row[0] for row in df.select(col_name).collect()]


def two_columns_to_dictionary(
Expand Down
2 changes: 1 addition & 1 deletion quinn/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def fix_nullability(field: StructField, result_dict: dict) -> None:
spark = SparkSession.getActiveSession()
spark = spark if spark is not None else SparkSession.builder.getOrCreate()

return spark.createDataFrame(output.rdd, output.schema)
return output


def flatten_struct(df: DataFrame, col_name: str, separator: str = ":") -> DataFrame:
Expand Down

0 comments on commit 423ea4a

Please sign in to comment.