From 1ae1700a7cf7cc3dab85991dbd5c927e91d33e3f Mon Sep 17 00:00:00 2001 From: Khalid Mammadov Date: Mon, 8 Aug 2022 17:18:11 +0100 Subject: [PATCH 1/2] Add doc string to DataFrame union and unionAll --- python/pyspark/sql/dataframe.py | 62 +++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 41ac701a332ac..7775c37a8bdbd 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -2508,6 +2508,36 @@ def union(self, other: "DataFrame") -> "DataFrame": (that does deduplication of elements), use this function followed by :func:`distinct`. Also as standard in SQL, this function resolves columns by position (not by name). + + .. versionadded:: 2.0.0 + + Parameters + ---------- + other : :class:`DataFrame` + Another :class:`DataFrame` that needs to be unioned + + Returns + ------- + :class:`DataFrame` + + Examples + -------- + >>> df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"]) + >>> df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"]) + >>> df1.union(df2).show() + +----+----+----+ + |col0|col1|col2| + +----+----+----+ + | 1| 2| 3| + | 4| 5| 6| + +----+----+----+ + >>> df1.union(df1).show() + +----+----+----+ + |col0|col1|col2| + +----+----+----+ + | 1| 2| 3| + | 1| 2| 3| + +----+----+----+ """ return DataFrame(self._jdf.union(other._jdf), self.sparkSession) @@ -2520,6 +2550,38 @@ def unionAll(self, other: "DataFrame") -> "DataFrame": (that does deduplication of elements), use this function followed by :func:`distinct`. Also as standard in SQL, this function resolves columns by position (not by name). + + Note: This is identical to :func:`union` function + + .. 
versionadded:: 1.3.0 + + Parameters + ---------- + other : :class:`DataFrame` + Another :class:`DataFrame` that needs to be unioned + + Returns + ------- + :class:`DataFrame` + + Examples + -------- + >>> df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"]) + >>> df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"]) + >>> df1.unionAll(df2).show() + +----+----+----+ + |col0|col1|col2| + +----+----+----+ + | 1| 2| 3| + | 4| 5| 6| + +----+----+----+ + >>> df1.unionAll(df1).show() + +----+----+----+ + |col0|col1|col2| + +----+----+----+ + | 1| 2| 3| + | 1| 2| 3| + +----+----+----+ """ return self.union(other) From 967eaded72414bac751fc21a5ca4776c94993bfd Mon Sep 17 00:00:00 2001 From: Khalid Mammadov Date: Sat, 13 Aug 2022 19:12:34 +0100 Subject: [PATCH 2/2] Add missing docstring info --- python/pyspark/sql/dataframe.py | 759 ++++++++++++++++++++++++++++++-- 1 file changed, 725 insertions(+), 34 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 7775c37a8bdbd..3dd612a747c6b 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -159,6 +159,10 @@ def sparkSession(self) -> "SparkSession": .. versionadded:: 3.3.0 + Returns + ------- + :class:`SparkSession` + Examples -------- >>> df = spark.range(1) @@ -170,7 +174,20 @@ def sparkSession(self) -> "SparkSession": @property # type: ignore[misc] @since(1.3) def rdd(self) -> "RDD[Row]": - """Returns the content as an :class:`pyspark.RDD` of :class:`Row`.""" + """Returns the content as an :class:`pyspark.RDD` of :class:`Row`. + + .. 
versionadded:: 1.3.0 + + Returns + ------- + :class:`RDD` + + Examples + -------- + >>> df = spark.range(1) + >>> type(df.rdd) + + """ if self._lazy_rdd is None: jrdd = self._jdf.javaToPython() self._lazy_rdd = RDD( @@ -181,13 +198,48 @@ def rdd(self) -> "RDD[Row]": @property # type: ignore[misc] @since("1.3.1") def na(self) -> "DataFrameNaFunctions": - """Returns a :class:`DataFrameNaFunctions` for handling missing values.""" + """Returns a :class:`DataFrameNaFunctions` for handling missing values. + + .. versionadded:: 1.3.1 + + Returns + ------- + :class:`DataFrameNaFunctions` + + Examples + -------- + >>> df = spark.sql("select 1 as c1, int(null) as c2") + >>> type(df.na) + + >>> df.na.fill(2).show() + +---+---+ + | c1| c2| + +---+---+ + | 1| 2| + +---+---+ + """ return DataFrameNaFunctions(self) @property # type: ignore[misc] @since(1.4) def stat(self) -> "DataFrameStatFunctions": - """Returns a :class:`DataFrameStatFunctions` for statistic functions.""" + """Returns a :class:`DataFrameStatFunctions` for statistic functions. + + .. versionadded:: 1.4.0 + + Returns + ------- + :class:`DataFrameStatFunctions` + + Examples + -------- + >>> import pyspark.sql.functions as f + >>> df = spark.range(3).withColumn("c", f.expr("id+1")) + >>> type(df.stat) + + >>> df.stat.corr("id", "c") + 1.0 + """ return DataFrameStatFunctions(self) def toJSON(self, use_unicode: bool = True) -> RDD[str]: @@ -197,6 +249,15 @@ def toJSON(self, use_unicode: bool = True) -> RDD[str]: .. versionadded:: 1.3.0 + Parameters + ---------- + use_unicode : bool, optional (default: True) + Whether to convert to unicode or not. + + Returns + ------- + :class:`RDD` + Examples -------- >>> df.toJSON().first() @@ -239,6 +300,15 @@ def createTempView(self, name: str) -> None: .. versionadded:: 2.0.0 + Parameters + ---------- + name : str + Name of the view. 
+ + Returns + ------- + None + Examples -------- >>> df.createTempView("people") @@ -263,6 +333,15 @@ def createOrReplaceTempView(self, name: str) -> None: .. versionadded:: 2.0.0 + Parameters + ---------- + name : str + Name of the view. + + Returns + ------- + None + Examples -------- >>> df.createOrReplaceTempView("people") @@ -286,6 +365,15 @@ def createGlobalTempView(self, name: str) -> None: .. versionadded:: 2.1.0 + Parameters + ---------- + name : str + Name of the view. + + Returns + ------- + None + Examples -------- >>> df.createGlobalTempView("people") @@ -309,6 +397,15 @@ def createOrReplaceGlobalTempView(self, name: str) -> None: .. versionadded:: 2.2.0 + Parameters + ---------- + name : str + Name of the view. + + Returns + ------- + None + Examples -------- >>> df.createOrReplaceGlobalTempView("people") @@ -334,6 +431,12 @@ def write(self) -> DataFrameWriter: Returns ------- :class:`DataFrameWriter` + + Examples + -------- + >>> type(df.write) + + >>> df.write.saveAsTable("tab2") """ return DataFrameWriter(self) @@ -352,6 +455,13 @@ def writeStream(self) -> DataStreamWriter: Returns ------- :class:`DataStreamWriter` + + Examples + -------- + >>> df = spark.readStream.format("rate").load() + >>> dsw = df.writeStream + >>> dsw.option("checkpointLocation", "/tmp/c").toTable("tab3") # doctest: +ELLIPSIS + """ return DataStreamWriter(self) @@ -361,6 +471,10 @@ def schema(self) -> StructType: .. versionadded:: 1.3.0 + Returns + ------- + :class:`StructType` + Examples -------- >>> df.schema @@ -381,6 +495,10 @@ def printSchema(self) -> None: .. versionadded:: 1.3.0 + Returns + ------- + None + Examples -------- >>> df.printSchema() @@ -417,6 +535,10 @@ def explain( .. versionchanged:: 3.0.0 Added optional argument `mode` to specify the expected output format of plans. + Returns + ------- + None + Examples -------- >>> df.explain() @@ -493,6 +615,15 @@ def exceptAll(self, other: "DataFrame") -> "DataFrame": .. 
versionadded:: 2.4.0 + Parameters + ---------- + other : :class:`DataFrame` + The other :class:`DataFrame` to compare to. + + Returns + ------- + :class:`DataFrame` + Examples -------- >>> df1 = spark.createDataFrame( @@ -516,6 +647,16 @@ def exceptAll(self, other: "DataFrame") -> "DataFrame": def isLocal(self) -> bool: """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally (without any Spark executors). + + Returns + ------- + bool + + Examples + -------- + >>> df = spark.sql("SHOW TABLES") + >>> df.isLocal() + True """ return self._jdf.isLocal() @@ -533,6 +674,17 @@ def isStreaming(self) -> bool: Notes ----- This API is evolving. + + Returns + ------- + bool + Whether it's streaming DataFrame or not. + + Examples + -------- + >>> df = spark.readStream.format("rate").load() + >>> df.isStreaming + True """ return self._jdf.isStreaming() @@ -541,6 +693,11 @@ def isEmpty(self) -> bool: .. versionadded:: 3.3.0 + Returns + ------- + bool + Whether it's empty DataFrame or not. + Examples -------- >>> df_empty = spark.createDataFrame([], 'a STRING') @@ -569,6 +726,10 @@ def show(self, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = If set to ``True``, print output rows vertically (one line per column value). + Returns + ------- + None + Examples -------- >>> df @@ -673,7 +834,18 @@ def checkpoint(self, eager: bool = True) -> "DataFrame": Parameters ---------- eager : bool, optional - Whether to checkpoint this :class:`DataFrame` immediately + Whether to checkpoint this :class:`DataFrame` immediately (default True) + + Returns + ------- + :class:`DataFrame` + Checkpointed DataFrame. 
+ + Examples + -------- + >>> spark.sparkContext.setCheckpointDir("/tmp/bb") + >>> df.checkpoint(False) + DataFrame[age: int, name: string] Notes ----- @@ -693,7 +865,17 @@ def localCheckpoint(self, eager: bool = True) -> "DataFrame": Parameters ---------- eager : bool, optional - Whether to checkpoint this :class:`DataFrame` immediately + Whether to checkpoint this :class:`DataFrame` immediately (default True) + + Returns + ------- + :class:`DataFrame` + Checkpointed DataFrame. + + Examples + -------- + >>> df.localCheckpoint(False) + DataFrame[age: int, name: string] Notes ----- @@ -729,15 +911,22 @@ def withWatermark(self, eventTime: str, delayThreshold: str) -> "DataFrame": latest record that has been processed in the form of an interval (e.g. "1 minute" or "5 hours"). - Notes - ----- - This API is evolving. + Returns + ------- + :class:`DataFrame` + Watermarked DataFrame + Examples + -------- >>> from pyspark.sql.functions import timestamp_seconds >>> sdf.select( ... 'name', ... timestamp_seconds(sdf.time).alias('time')).withWatermark('time', '10 minutes') DataFrame[name: string, time: timestamp] + + Notes + ----- + This API is evolving. """ if not eventTime or type(eventTime) is not str: raise TypeError("eventTime should be provided as a string") @@ -763,6 +952,7 @@ def hint( Returns ------- :class:`DataFrame` + Hinted DataFrame Examples -------- @@ -796,6 +986,11 @@ def count(self) -> int: .. versionadded:: 1.3.0 + Returns + ------- + int + Number of rows. + Examples -------- >>> df.count() @@ -808,6 +1003,11 @@ def collect(self) -> List[Row]: .. versionadded:: 1.3.0 + Returns + ------- + list + List of rows. + Examples -------- >>> df.collect() @@ -831,6 +1031,11 @@ def toLocalIterator(self, prefetchPartitions: bool = False) -> Iterator[Row]: prefetchPartitions : bool, optional If Spark should pre-fetch the next partition before it is needed. + Returns + ------- + Iterator + Iterator of rows. 
+ Examples -------- >>> list(df.toLocalIterator()) [Row(age=2, name='Alice'), Row(age=5, name='Bob')] @@ -845,6 +1050,17 @@ def limit(self, num: int) -> "DataFrame": .. versionadded:: 1.3.0 + Parameters + ---------- + num : int + Number of records to return. Will return this number of records + or whatever number is available. + + Returns + ------- + :class:`DataFrame` + Subset of the records + Examples -------- >>> df.limit(1).collect() [Row(age=2, name='Alice')] @@ -860,6 +1076,17 @@ def take(self, num: int) -> List[Row]: .. versionadded:: 1.3.0 + Parameters + ---------- + num : int + Number of records to return. Will return this number of records + or whatever number is available. + + Returns + ------- + list + List of rows + Examples -------- >>> df.take(2) [Row(age=2, name='Alice'), Row(age=5, name='Bob')] @@ -876,6 +1103,17 @@ def tail(self, num: int) -> List[Row]: .. versionadded:: 3.0.0 + Parameters + ---------- + num : int + Number of records to return. Will return this number of records + or whatever number is available. + + Returns + ------- + list + List of rows + Examples -------- >>> df.tail(1) [Row(age=5, name='Bob')] @@ -892,6 +1130,16 @@ def foreach(self, f: Callable[[Row], None]) -> None: .. versionadded:: 1.3.0 + Parameters + ---------- + f : function + A function that accepts one parameter which will + receive each row to process. + + Returns + ------- + None + Examples -------- >>> def f(person): @@ -907,6 +1155,16 @@ def foreachPartition(self, f: Callable[[Iterator[Row]], None]) -> None: .. versionadded:: 1.3.0 + Parameters + ---------- + f : function + A function that accepts one parameter which will receive + each partition to process. + + Returns + ------- + None + Examples -------- >>> def f(people): @@ -924,6 +1182,17 @@ def cache(self) -> "DataFrame": Notes ----- The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0. + + Returns + ------- + :class:`DataFrame` + Cached DataFrame.
+ + Examples + -------- + >>> df = spark.range(1) + >>> df.cache() + DataFrame[id: bigint] """ self.is_cached = True self._jdf.cache() @@ -943,6 +1212,25 @@ def persist( Notes ----- The default storage level has changed to `MEMORY_AND_DISK_DESER` to match Scala in 3.0. + + Parameters + ---------- + storageLevel : :class:`StorageLevel` + Storage level to set for persistence. Default is MEMORY_AND_DISK_DESER. + + Returns + ------- + :class:`DataFrame` + Persisted DataFrame. + + Examples + -------- + >>> df = spark.range(1) + >>> df.persist() + DataFrame[id: bigint] + >>> from pyspark.storagelevel import StorageLevel + >>> df.persist(StorageLevel.DISK_ONLY) + DataFrame[id: bigint] """ self.is_cached = True javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel) @@ -955,6 +1243,11 @@ def storageLevel(self) -> StorageLevel: .. versionadded:: 2.1.0 + Returns + ------- + :class:`StorageLevel` + Currently defined storage level. + Examples -------- >>> df.storageLevel @@ -983,6 +1276,27 @@ def unpersist(self, blocking: bool = False) -> "DataFrame": Notes ----- `blocking` default has changed to ``False`` to match Scala in 2.0. + + Parameters + ---------- + blocking : bool + Whether to block until all blocks are deleted. + + Returns + ------- + :class:`DataFrame` + Unpersisted DataFrame. + + Examples + -------- + >>> df = spark.range(1) + >>> df.persist() + DataFrame[id: bigint] + >>> df.unpersist() + DataFrame[id: bigint] + >>> df = spark.range(1) + >>> df.unpersist(True) + DataFrame[id: bigint] """ self.is_cached = False self._jdf.unpersist(blocking) @@ -1012,6 +1326,10 @@ def coalesce(self, numPartitions: int) -> "DataFrame": numPartitions : int specify the target number of partitions + Returns + ------- + :class:`DataFrame` + Examples -------- >>> df.coalesce(1).rdd.getNumPartitions() @@ -1049,6 +1367,11 @@ def repartition( # type: ignore[misc] Added optional arguments to specify the partitioning columns. 
Also made numPartitions optional if partitioning columns are specified. + Returns + ------- + :class:`DataFrame` + Repartitioned DataFrame. + Examples -------- >>> df.repartition(10).rdd.getNumPartitions() @@ -1129,6 +1452,11 @@ def repartitionByRange( # type: ignore[misc] cols : str or :class:`Column` partitioning columns. + Returns + ------- + :class:`DataFrame` + Repartitioned DataFrame. + Notes ----- Due to performance reasons this method uses sampling to estimate the ranges. @@ -1177,6 +1505,11 @@ def distinct(self) -> "DataFrame": .. versionadded:: 1.3.0 + Returns + ------- + :class:`DataFrame` + DataFrame with distinct records. + Examples -------- >>> df.distinct().count() @@ -1216,6 +1549,11 @@ def sample( # type: ignore[misc] seed : int, optional Seed for sampling (default a random seed). + Returns + ------- + :class:`DataFrame` + Sampled rows from given DataFrame. + Notes ----- This is not guaranteed to provide exactly the fraction specified of the total @@ -1349,6 +1687,11 @@ def randomSplit(self, weights: List[float], seed: Optional[int] = None) -> List[ seed : int, optional The seed for sampling. + Returns + ------- + list + List of DataFrames. + Examples -------- >>> splits = df4.randomSplit([1.0, 2.0], 24) @@ -1373,6 +1716,11 @@ def dtypes(self) -> List[Tuple[str, str]]: .. versionadded:: 1.3.0 + Returns + ------- + list + List of columns as tuple pairs. + Examples -------- >>> df.dtypes @@ -1386,6 +1734,11 @@ def columns(self) -> List[str]: .. versionadded:: 1.3.0 + Returns + ------- + list + List of column names. + Examples -------- >>> df.columns @@ -1405,6 +1758,10 @@ def colRegex(self, colName: str) -> Column: colName : str string, column name specified as a regex. + Returns + ------- + :class:`Column` + Examples -------- >>> df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "Col2"]) @@ -1452,6 +1809,11 @@ def to(self, schema: StructType) -> "DataFrame": schema : :class:`StructType` Specified schema. 
+ Returns + ------- + :class:`DataFrame` + Reconciled DataFrame. + Examples -------- >>> df = spark.createDataFrame([("a", 1)], ["i", "j"]) @@ -1482,6 +1844,11 @@ def alias(self, alias: str) -> "DataFrame": alias : str an alias name to be set for the :class:`DataFrame`. + Returns + ------- + :class:`DataFrame` + Aliased DataFrame. + Examples -------- >>> from pyspark.sql.functions import * @@ -1505,6 +1872,11 @@ def crossJoin(self, other: "DataFrame") -> "DataFrame": other : :class:`DataFrame` Right side of the cartesian product. + Returns + ------- + :class:`DataFrame` + Joined DataFrame. + Examples -------- >>> df.select("age", "name").collect() @@ -1544,6 +1916,11 @@ def join( ``right``, ``rightouter``, ``right_outer``, ``semi``, ``leftsemi``, ``left_semi``, ``anti``, ``leftanti`` and ``left_anti``. + Returns + ------- + :class:`DataFrame` + Joined DataFrame. + Examples -------- The following performs a full outer join between ``df1`` and ``df2``. @@ -1729,6 +2106,11 @@ def sortWithinPartitions( Sort ascending vs. descending. Specify list for multiple sort orders. If a list is specified, length of the list must equal length of the `cols`. + Returns + ------- + :class:`DataFrame` + DataFrame sorted by partitions. + Examples -------- >>> df.sortWithinPartitions("age", ascending=False).show() @@ -1761,6 +2143,11 @@ def sort( Sort ascending vs. descending. Specify list for multiple sort orders. If a list is specified, length of the list must equal length of the `cols`. + Returns + ------- + :class:`DataFrame` + Sorted DataFrame. + Examples -------- >>> df.sort(df.age.desc()).collect() @@ -1838,6 +2225,16 @@ def describe(self, *cols: Union[str, List[str]]) -> "DataFrame": Use summary for expanded statistics and control over which statistics to compute. + Parameters + ---------- + cols : str, list, optional + Column name or list of column names to describe by (default All columns). 
+ + Returns + ------- + :class:`DataFrame` + A new DataFrame that describes (provides statistics) given DataFrame. + Examples -------- >>> df = spark.createDataFrame( @@ -1895,6 +2292,16 @@ def summary(self, *statistics: str) -> "DataFrame": guarantee about the backward compatibility of the schema of the resulting :class:`DataFrame`. + Parameters + ---------- + statistics : str, optional + Column names to calculate statistics by (default All columns). + + Returns + ------- + :class:`DataFrame` + A new DataFrame that provides statistics for the given DataFrame. + Examples -------- >>> df = spark.createDataFrame( @@ -1980,6 +2387,10 @@ def first(self) -> Optional[Row]: .. versionadded:: 1.3.0 + Returns + ------- + First Row if DataFrame is not empty, otherwise None. + Examples -------- >>> df.first() @@ -2029,10 +2440,22 @@ def __getattr__(self, name: str) -> Column: .. versionadded:: 1.3.0 + Parameters + ---------- + name : str + Column name to return as :class:`Column`. + + Returns + ------- + :class:`Column` + Requested column. + Examples -------- >>> df.select(df.age).collect() [Row(age=2), Row(age=5)] + >>> df["age"] + Column<'age'> """ if name not in self.columns: raise AttributeError( @@ -2061,6 +2484,11 @@ def select(self, *cols: "ColumnOrName") -> "DataFrame": # type: ignore[misc] If one of the column names is '*', that column is expanded to include all columns in the current :class:`DataFrame`. + Returns + ------- + :class:`DataFrame` + A DataFrame with subset (or all) of columns. + Examples -------- >>> df.select('*').collect() @@ -2088,6 +2516,11 @@ def selectExpr(self, *expr: Union[str, List[str]]) -> "DataFrame": .. versionadded:: 1.3.0 + Returns + ------- + :class:`DataFrame` + A DataFrame with new/old columns transformed by expressions. 
+ Examples -------- >>> df.selectExpr("age * 2", "abs(age)").collect() @@ -2111,6 +2544,11 @@ def filter(self, condition: "ColumnOrName") -> "DataFrame": a :class:`Column` of :class:`types.BooleanType` or a string of SQL expression. + Returns + ------- + :class:`DataFrame` + Filtered DataFrame. + Examples -------- >>> df.filter(df.age > 3).collect() @@ -2152,7 +2590,13 @@ def groupBy(self, *cols: "ColumnOrName") -> "GroupedData": # type: ignore[misc] ---------- cols : list, str or :class:`Column` columns to group by. - Each element should be a column name (string) or an expression (:class:`Column`). + Each element should be a column name (string) or an expression (:class:`Column`) + or list of them. + + Returns + ------- + :class:`GroupedData` + Grouped data by given columns. Examples -------- @@ -2185,6 +2629,18 @@ def rollup(self, *cols: "ColumnOrName") -> "GroupedData": # type: ignore[misc] .. versionadded:: 1.4.0 + Parameters + ---------- + cols : list, str or :class:`Column` + Columns to roll-up by. + Each element should be a column name (string) or an expression (:class:`Column`) + or list of them. + + Returns + ------- + :class:`GroupedData` + Rolled-up data by given columns. + Examples -------- >>> df.rollup("name", df.age).count().orderBy("name", "age").show() @@ -2218,6 +2674,18 @@ def cube(self, *cols: "ColumnOrName") -> "GroupedData": # type: ignore[misc] .. versionadded:: 1.4.0 + Parameters + ---------- + cols : list, str or :class:`Column` + columns to create cube by. + Each element should be a column name (string) or an expression (:class:`Column`) + or list of them. + + Returns + ------- + :class:`GroupedData` + Cube of the data by given columns. + Examples -------- >>> df.cube("name", df.age).count().orderBy("name", "age").show() @@ -2281,7 +2749,7 @@ def unpivot( Returns ------- - DataFrame + :class:`DataFrame` Unpivoted DataFrame. Examples @@ -2365,7 +2833,7 @@ def melt( Returns ------- - DataFrame + :class:`DataFrame` Unpivoted DataFrame. 
See Also -------- DataFrame.unpivot Alias for unpivot. @@ -2380,6 +2848,16 @@ def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame": .. versionadded:: 1.3.0 + Parameters + ---------- + exprs : :class:`Column` or dict of key and value strings + Columns or expressions to aggregate DataFrame by. + + Returns + ------- + :class:`DataFrame` + Aggregated DataFrame. + Examples -------- >>> df.agg({"age": "max"}).collect() [Row(max(age)=5)] @@ -2509,7 +2987,11 @@ def union(self, other: "DataFrame") -> "DataFrame": Also as standard in SQL, this function resolves columns by position (not by name). - .. versionadded:: 2.0.0 + .. versionadded:: 2.0.0 + + See Also + -------- + DataFrame.unionAll Parameters ---------- other : :class:`DataFrame` @@ -2538,6 +3020,7 @@ def union(self, other: "DataFrame") -> "DataFrame": | 1| 2| 3| | 1| 2| 3| +----+----+----+ + """ return DataFrame(self._jdf.union(other._jdf), self.sparkSession) @@ -2551,37 +3034,24 @@ def unionAll(self, other: "DataFrame") -> "DataFrame": Also as standard in SQL, this function resolves columns by position (not by name). - Note: This is identical to :func:`union` function + :func:`unionAll` is an alias to :func:`union` .. versionadded:: 1.3.0 + See Also + -------- + DataFrame.union + Parameters ---------- other : :class:`DataFrame` - Another :class:`DataFrame` that needs to be unioned + Another :class:`DataFrame` that needs to be combined Returns ------- :class:`DataFrame` + Combined DataFrame - Examples - -------- - >>> df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"]) - >>> df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"]) - >>> df1.unionAll(df2).show() - +----+----+----+ - |col0|col1|col2| - +----+----+----+ - | 1| 2| 3| - | 4| 5| 6| - +----+----+----+ - >>> df1.unionAll(df1).show() - +----+----+----+ - |col0|col1|col2| - +----+----+----+ - | 1| 2| 3| - | 1| 2| 3| - +----+----+----+ + """ return self.union(other) @@ -2594,6 +3064,16 @@ def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) -> .. 
versionadded:: 2.3.0 + Parameters + ---------- + other : :class:`DataFrame` + Another :class:`DataFrame` that needs to be combined. + + Returns + ------- + :class:`DataFrame` + Combined DataFrame. + Examples -------- The difference between this function and :func:`union` is that this function @@ -2634,8 +3114,32 @@ def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) -> def intersect(self, other: "DataFrame") -> "DataFrame": """Return a new :class:`DataFrame` containing rows only in both this :class:`DataFrame` and another :class:`DataFrame`. + Note that any duplicates are removed. To preserve duplicates + use :func:`intersectAll`. This is equivalent to `INTERSECT` in SQL. + + Parameters + ---------- + other : :class:`DataFrame` + Another :class:`DataFrame` that needs to be combined. + + Returns + ------- + :class:`DataFrame` + Combined DataFrame. + + Examples + -------- + >>> df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"]) + >>> df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"]) + >>> df1.intersect(df2).show() + +---+---+ + | C1| C2| + +---+---+ + | b| 3| + | a| 1| + +---+---+ """ return DataFrame(self._jdf.intersect(other._jdf), self.sparkSession) @@ -2648,6 +3152,16 @@ def intersectAll(self, other: "DataFrame") -> "DataFrame": .. versionadded:: 2.4.0 + Parameters + ---------- + other : :class:`DataFrame` + Another :class:`DataFrame` that needs to be combined. + + Returns + ------- + :class:`DataFrame` + Combined DataFrame. + Examples -------- >>> df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"]) @@ -2661,7 +3175,6 @@ def intersectAll(self, other: "DataFrame") -> "DataFrame": | a| 1| | b| 3| +---+---+ - """ return DataFrame(self._jdf.intersectAll(other._jdf), self.sparkSession) @@ -2672,6 +3185,27 @@ def subtract(self, other: "DataFrame") -> "DataFrame": This is equivalent to `EXCEPT DISTINCT` in SQL. 
+ Parameters + ---------- + other : :class:`DataFrame` + Another :class:`DataFrame` that needs to be subtracted. + + Returns + ------- + :class:`DataFrame` + Subtracted DataFrame. + + Examples + -------- + >>> df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"]) + >>> df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"]) + + >>> df1.subtract(df2).show() + +---+---+ + | C1| C2| + +---+---+ + | c| 4| + +---+---+ """ return DataFrame(getattr(self._jdf, "except")(other._jdf), self.sparkSession) @@ -2689,6 +3223,16 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame": .. versionadded:: 1.4.0 + Parameters + ---------- + subset : List of column names, optional + List of columns to use for duplicate comparison (default All columns). + + Returns + ------- + :class:`DataFrame` + DataFrame without duplicates. + Examples -------- >>> from pyspark.sql import Row @@ -2744,6 +3288,11 @@ def dropna( subset : str, tuple or list, optional optional list of column names to consider. + Returns + ------- + :class:`DataFrame` + DataFrame with null only rows excluded. + Examples -------- >>> df4.na.drop().show() @@ -2803,6 +3352,11 @@ def fillna( For example, if `value` is a string, and subset contains a non-string column, then the non-string column is simply ignored. + Returns + ------- + :class:`DataFrame` + DataFrame with replaced null values. + Examples -------- >>> df4.na.fill(50).show() @@ -2929,6 +3483,11 @@ def replace( # type: ignore[misc] For example, if `value` is a string, and subset contains a non-string column, then the non-string column is simply ignored. + Returns + ------- + :class:`DataFrame` + DataFrame with replaced values. + Examples -------- >>> df4.na.replace(10, 20).show() @@ -3185,6 +3744,21 @@ def corr(self, col1: str, col2: str, method: Optional[str] = None) -> float: The name of the second column method : str, optional The correlation method. 
Currently only supports "pearson" + + Returns + ------- + float + Pearson Correlation Coefficient of two columns. + + Examples + -------- + >>> df = spark.createDataFrame([(1, 12), (10, 1), (19, 8)], ["c1", "c2"]) + >>> df.corr("c1", "c2") + -0.3592106040535498 + >>> df = spark.createDataFrame([(11, 12), (10, 11), (9, 10)], ["small", "bigger"]) + >>> df.corr("small", "bigger") + 1.0 + """ if not isinstance(col1, str): raise TypeError("col1 should be a string.") @@ -3212,6 +3786,21 @@ def cov(self, col1: str, col2: str) -> float: The name of the first column col2 : str The name of the second column + + Returns + ------- + float + Covariance of two columns. + + Examples + -------- + >>> df = spark.createDataFrame([(1, 12), (10, 1), (19, 8)], ["c1", "c2"]) + >>> df.cov("c1", "c2") + -18.0 + >>> df = spark.createDataFrame([(11, 12), (10, 11), (9, 10)], ["small", "bigger"]) + >>> df.cov("small", "bigger") + 1.0 + """ if not isinstance(col1, str): raise TypeError("col1 should be a string.") @@ -3239,6 +3828,24 @@ def crosstab(self, col1: str, col2: str) -> "DataFrame": col2 : str The name of the second column. Distinct items will make the column names of the :class:`DataFrame`. + + Returns + ------- + :class:`DataFrame` + Frequency matrix of two columns. + + Examples + -------- + >>> df = spark.createDataFrame([(1, 11), (1, 11), (3, 10), (4, 8), (4, 8)], ["c1", "c2"]) + >>> df.crosstab("c1", "c2").sort("c1_c2").show() + +-----+---+---+---+ + |c1_c2| 10| 11| 8| + +-----+---+---+---+ + | 1| 0| 2| 0| + | 3| 1| 0| 0| + | 4| 0| 0| 2| + +-----+---+---+---+ + """ if not isinstance(col1, str): raise TypeError("col1 should be a string.") @@ -3266,6 +3873,21 @@ def freqItems( The frequency with which to consider an item 'frequent'. Default is 1%. The support must be greater than 1e-4. + Returns + ------- + :class:`DataFrame` + DataFrame with frequent items. 
+ + Examples + -------- + >>> df = spark.createDataFrame([(1, 11), (1, 11), (3, 10), (4, 8), (4, 8)], ["c1", "c2"]) + >>> df.freqItems(["c1", "c2"]).show() # doctest: +SKIP + +------------+------------+ + |c1_freqItems|c2_freqItems| + +------------+------------+ + | [4, 1, 3]| [8, 11, 10]| + +------------+------------+ + Notes ----- This function is meant for exploratory data analysis, as we make no @@ -3298,6 +3920,11 @@ def withColumns(self, *colsMap: Dict[str, Column]) -> "DataFrame": colsMap : dict a dict of column name and :class:`Column`. Currently, only single map is supported. + Returns + ------- + :class:`DataFrame` + DataFrame with new or replaced columns. + Examples -------- >>> df.withColumns({'age2': df.age + 2, 'age3': df.age + 3}).collect() @@ -3335,6 +3962,11 @@ def withColumn(self, colName: str, col: Column) -> "DataFrame": col : :class:`Column` a :class:`Column` expression for the new column. + Returns + ------- + :class:`DataFrame` + DataFrame with new or replaced column. + Notes ----- This method introduces a projection internally. Therefore, calling it multiple @@ -3365,6 +3997,11 @@ def withColumnRenamed(self, existing: str, new: str) -> "DataFrame": new : str string, new name of the column. + Returns + ------- + :class:`DataFrame` + DataFrame with renamed column. + Examples -------- >>> df.withColumnRenamed('age', 'age2').collect() @@ -3384,6 +4021,11 @@ def withMetadata(self, columnName: str, metadata: Dict[str, Any]) -> "DataFrame" metadata : dict dict, new metadata to be assigned to df.schema[columnName].metadata + Returns + ------- + :class:`DataFrame` + DataFrame with updated metadata column. + Examples -------- >>> df_meta = df.withMetadata('age', {'foo': 'bar'}) @@ -3406,7 +4048,7 @@ def drop(self, *cols: str) -> "DataFrame": ... def drop(self, *cols: "ColumnOrName") -> "DataFrame": # type: ignore[misc] - """Returns a new :class:`DataFrame` that drops the specified column. 
+ """Returns a new :class:`DataFrame` without specified columns. This is a no-op if schema doesn't contain the given column name(s). .. versionadded:: 1.4.0 @@ -3416,6 +4058,11 @@ def drop(self, *cols: "ColumnOrName") -> "DataFrame": # type: ignore[misc] cols: str or :class:`Column` a name of the column, or the :class:`Column` to drop + Returns + ------- + :class:`DataFrame` + DataFrame without given columns. + Examples -------- >>> df.drop('age').collect() @@ -3457,6 +4104,11 @@ def toDF(self, *cols: "ColumnOrName") -> "DataFrame": cols : str new column names + Returns + ------- + :class:`DataFrame` + DataFrame with new column names. + Examples -------- >>> df.toDF('f1', 'f2').collect() @@ -3483,6 +4135,11 @@ def transform(self, func: Callable[..., "DataFrame"], *args: Any, **kwargs: Any) .. versionadded:: 3.3.0 + Returns + ------- + :class:`DataFrame` + Transformed DataFrame. + Examples -------- >>> from pyspark.sql.functions import col @@ -3533,6 +4190,16 @@ def sameSemantics(self, other: "DataFrame") -> bool: This API is a developer API. + Parameters + ---------- + other : :class:`DataFrame` + The other DataFrame to compare against. + + Returns + ------- + bool + Whether these two DataFrames are similar. + Examples -------- >>> df1 = spark.range(10) @@ -3561,6 +4228,11 @@ def semanticHash(self) -> int: This API is a developer API. + Returns + ------- + int + Hash value. + Examples -------- >>> spark.range(10).selectExpr("id as col0").semanticHash() # doctest: +SKIP @@ -3579,6 +4251,11 @@ def inputFiles(self) -> List[str]: .. versionadded:: 3.1.0 + Returns + ------- + list + List of file paths. + Examples -------- >>> df = spark.read.load("examples/src/main/resources/people.json", format="json") @@ -3614,6 +4291,16 @@ def writeTo(self, table: str) -> DataFrameWriterV2: .. versionadded:: 3.1.0 + Parameters + ---------- + table : str + Target table name to write to. 
+ + Returns + ------- + :class:`DataFrameWriterV2` + DataFrameWriterV2 to use further to specify how to save the data + Examples -------- >>> df.writeTo("catalog.db.table").append() # doctest: +SKIP @@ -3650,6 +4337,10 @@ def pandas_api( index_col: str or list of str, optional, default: None Index column of table in Spark. + Returns + ------- + :class:`PandasOnSparkDataFrame` + See Also -------- pyspark.pandas.frame.DataFrame.to_spark