From b29fdcca1f13bfe01fe6e4c319d706637751656d Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Mon, 8 Jan 2024 17:21:24 +0800
Subject: [PATCH 1/3] init

---
 .../source/reference/pyspark.pandas/frame.rst |   1 +
 python/pyspark/pandas/frame.py                | 150 ++++++++++++++++++
 python/pyspark/pandas/missing/frame.py        |   1 -
 3 files changed, 151 insertions(+), 1 deletion(-)

diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst b/python/docs/source/reference/pyspark.pandas/frame.rst
index 12cf6e7db12fc..d94e35ce14f52 100644
--- a/python/docs/source/reference/pyspark.pandas/frame.rst
+++ b/python/docs/source/reference/pyspark.pandas/frame.rst
@@ -261,6 +261,7 @@ Time series-related
 .. autosummary::
    :toctree: api/
 
+   DataFrame.asfreq
    DataFrame.resample
    DataFrame.shift
    DataFrame.first_valid_index
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index a7edac5509b14..83af2b6140ee8 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -21,6 +21,7 @@
 from collections import defaultdict, namedtuple
 from collections.abc import Mapping
 import re
+import uuid
 import warnings
 import inspect
 import json
@@ -9344,6 +9345,155 @@ def sample(
         else:
             return DataFrame(self._internal.with_new_sdf(sdf))
 
+    def asfreq(
+        self,
+        freq: str,
+        method: Optional[str] = None,
+        how: Optional[str] = None,
+        normalize: bool = False,
+        fill_value: Optional[Any] = None,
+    ) -> "DataFrame":
+        """
+        Convert time series to specified frequency.
+
+        Returns the original data conformed to a new index with the specified
+        frequency.
+
+        If the index of this DataFrame is a :class:`~pandas.PeriodIndex`, the new index
+        is the result of transforming the original index with
+        :meth:`PeriodIndex.asfreq <pandas.PeriodIndex.asfreq>` (so the original index
+        will map one-to-one to the new index).
+
+        Otherwise, the new index will be equivalent to ``pd.date_range(start, end,
+        freq=freq)`` where ``start`` and ``end`` are, respectively, the first and
+        last entries in the original index (see :func:`pandas.date_range`). The
+        values corresponding to any timesteps in the new index which were not present
+        in the original index will be null (``NaN``), unless a method for filling
+        such unknowns is provided (see the ``method`` parameter below).
+
+        The :meth:`resample` method is more appropriate if an operation on each group of
+        timesteps (such as an aggregate) is necessary to represent the data at the new
+        frequency.
+
+        .. versionadded:: 4.0.0
+
+        Parameters
+        ----------
+        freq : DateOffset or str
+            Frequency DateOffset or string.
+        method : {'backfill'/'bfill', 'pad'/'ffill'}, default None
+            Method to use for filling holes in the reindexed DataFrame (note this
+            does not fill NaNs that already were present):
+
+            * 'pad' / 'ffill': propagate last valid observation forward to next
+              valid
+            * 'backfill' / 'bfill': use NEXT valid observation to fill.
+        how : {'start', 'end'}, default end
+            For PeriodIndex only (see PeriodIndex.asfreq).
+        normalize : bool, default False
+            Whether to reset output index to midnight.
+        fill_value : scalar, optional
+            Value to use for missing values, applied during upsampling (note
+            this does not fill NaNs that already were present).
+
+        Returns
+        -------
+        DataFrame
+            DataFrame object reindexed to the specified frequency.
+
+        See Also
+        --------
+        reindex : Conform DataFrame to new index with optional filling logic.
+
+        Notes
+        -----
+        To learn more about the frequency strings, please see `this link
+        <https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`__.
+
+        Examples
+        --------
+        Start by creating a series with 4 one-minute timestamps.
+
+        >>> index = pd.date_range('1/1/2000', periods=4, freq='min')
+        >>> series = pd.Series([0.0, None, 2.0, 3.0], index=index)
+        >>> pdf = pd.DataFrame({'s': series})
+        >>> psdf = ps.from_pandas(pdf)
+        >>> psdf
+                               s
+        2000-01-01 00:00:00  0.0
+        2000-01-01 00:01:00  NaN
+        2000-01-01 00:02:00  2.0
+        2000-01-01 00:03:00  3.0
+
+        Upsample the series into 30 second bins.
+
+        >>> psdf.asfreq(freq='30s')
+                               s
+        2000-01-01 00:00:00  0.0
+        2000-01-01 00:00:30  NaN
+        2000-01-01 00:01:00  NaN
+        2000-01-01 00:01:30  NaN
+        2000-01-01 00:02:00  2.0
+        2000-01-01 00:02:30  NaN
+        2000-01-01 00:03:00  3.0
+
+        Upsample again, providing a ``fill_value``.
+
+        >>> psdf.asfreq(freq='30s', fill_value=9.0)
+                               s
+        2000-01-01 00:00:00  0.0
+        2000-01-01 00:00:30  9.0
+        2000-01-01 00:01:00  NaN
+        2000-01-01 00:01:30  9.0
+        2000-01-01 00:02:00  2.0
+        2000-01-01 00:02:30  9.0
+        2000-01-01 00:03:00  3.0
+
+        Upsample again, providing a ``method``.
+
+        >>> psdf.asfreq(freq='30s', method='bfill')
+                               s
+        2000-01-01 00:00:00  0.0
+        2000-01-01 00:00:30  NaN
+        2000-01-01 00:01:00  NaN
+        2000-01-01 00:01:30  2.0
+        2000-01-01 00:02:00  2.0
+        2000-01-01 00:02:30  3.0
+        2000-01-01 00:03:00  3.0
+        """
+        log_advice(
+            "`frame.asfreq` loads partial data into the driver's memory to infer the schema, "
+            "and loads all data into one executor's memory to compute. "
+            "It should only be used if the pandas DataFrame is expected to be small."
+        )
+        tmp_df = self.copy()
+
+        uid = str(uuid.uuid4()).replace("-", "")
+        tmp_agg_column_name = f"__tmp_aggregate_col_for_frame_asfreq_{uid}__"
+        tmp_idx_column_name = f"__tmp_index_col_for_frame_asfreq_{uid}__"
+
+        tmp_df[tmp_agg_column_name] = 0
+        tmp_df[tmp_idx_column_name] = tmp_df.index
+
+        def asfreq_compute(df):
+            return (
+                df.drop(columns=[tmp_agg_column_name])
+                .set_index(tmp_idx_column_name)
+                .asfreq(
+                    freq=freq,
+                    method=method,
+                    how=how,
+                    normalize=normalize,
+                    fill_value=fill_value,
+                )
+            )
+
+        result_df = tmp_df.groupby(tmp_agg_column_name).apply(asfreq_compute)
+        result_df = result_df.droplevel(tmp_agg_column_name)
+        result_df.index.names = self.index.names
+
+        return result_df
+
     def astype(self, dtype: Union[str, Dtype, Dict[Name, Union[str, Dtype]]]) -> "DataFrame":
         """
         Cast a pandas-on-Spark object to a specified dtype ``dtype``.
diff --git a/python/pyspark/pandas/missing/frame.py b/python/pyspark/pandas/missing/frame.py
index 5ba81c81b36f6..a93f8e07bae59 100644
--- a/python/pyspark/pandas/missing/frame.py
+++ b/python/pyspark/pandas/missing/frame.py
@@ -34,7 +34,6 @@ class MissingPandasLikeDataFrame:
     # Documentation path: `python/docs/source/reference/pyspark.pandas/`.
 
     # Functions
-    asfreq = _unsupported_function("asfreq")
     asof = _unsupported_function("asof")
     combine = _unsupported_function("combine")
     compare = _unsupported_function("compare")

From 16746ad9e69101b6fd1fe5d0add68c532c366fe2 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Wed, 10 Jan 2024 21:37:14 +0800
Subject: [PATCH 2/3] refine

---
 python/pyspark/pandas/frame.py | 39 ++++++++++++++++++++-------------------
 1 file changed, 20 insertions(+), 19 deletions(-)

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 83af2b6140ee8..aa22d1290ea68 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -9466,33 +9466,34 @@ def asfreq(
             "and loads all data into one executor's memory to compute. "
             "It should only be used if the pandas DataFrame is expected to be small."
         )
-        tmp_df = self.copy()
+        input_df = self.copy()
 
         uid = str(uuid.uuid4()).replace("-", "")
         tmp_agg_column_name = f"__tmp_aggregate_col_for_frame_asfreq_{uid}__"
         tmp_idx_column_name = f"__tmp_index_col_for_frame_asfreq_{uid}__"
 
-        tmp_df[tmp_agg_column_name] = 0
-        tmp_df[tmp_idx_column_name] = tmp_df.index
-
-        def asfreq_compute(df):
-            return (
-                df.drop(columns=[tmp_agg_column_name])
-                .set_index(tmp_idx_column_name)
-                .asfreq(
-                    freq=freq,
-                    method=method,
-                    how=how,
-                    normalize=normalize,
-                    fill_value=fill_value,
-                )
+        input_df[tmp_agg_column_name] = 0
+        input_df[tmp_idx_column_name] = input_df.index
+
+        def asfreq_compute(pdf: pd.DataFrame):  # type: ignore[no-untyped-def]
+            pdf = pdf.drop(columns=[tmp_agg_column_name])
+            pdf = pdf.set_index(tmp_idx_column_name, drop=True)
+            pdf = pdf.sort_index()
+            pdf = pdf.asfreq(
+                freq=freq,
+                method=method,
+                how=how,
+                normalize=normalize,
+                fill_value=fill_value,
             )
+            pdf[tmp_idx_column_name] = pdf.index
+            return pdf.reset_index(drop=True)
 
-        result_df = tmp_df.groupby(tmp_agg_column_name).apply(asfreq_compute)
-        result_df = result_df.droplevel(tmp_agg_column_name)
-        result_df.index.names = self.index.names
+        output_df = input_df.groupby(tmp_agg_column_name).apply(asfreq_compute)
+        output_df = output_df.set_index(tmp_idx_column_name)
+        output_df.index.names = self.index.names
 
-        return result_df
+        return output_df  # type: ignore[return-value]
 
     def astype(self, dtype: Union[str, Dtype, Dict[Name, Union[str, Dtype]]]) -> "DataFrame":
         """

From 02e0679ba9c5fe98b5ba4934590afb857da61341 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Thu, 11 Jan 2024 08:53:03 +0800
Subject: [PATCH 3/3] lint

---
 python/pyspark/pandas/frame.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index aa22d1290ea68..ef1e4cb42c516 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -9479,7 +9479,7 @@ def asfreq_compute(pdf: pd.DataFrame):  # type: ignore[no-untyped-def]
             pdf = pdf.drop(columns=[tmp_agg_column_name])
             pdf = pdf.set_index(tmp_idx_column_name, drop=True)
             pdf = pdf.sort_index()
-            pdf = pdf.asfreq(
+            pdf = pdf.asfreq(  # type: ignore[assignment]
                 freq=freq,
                 method=method,
                 how=how,
@@ -9493,7 +9493,7 @@ def asfreq_compute(pdf: pd.DataFrame):  # type: ignore[no-untyped-def]
         output_df = output_df.set_index(tmp_idx_column_name)
         output_df.index.names = self.index.names
 
-        return output_df  # type: ignore[return-value]
+        return output_df
 
     def astype(self, dtype: Union[str, Dtype, Dict[Name, Union[str, Dtype]]]) -> "DataFrame":
         """
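
Reviewer note: a minimal usage sketch of the new ``DataFrame.asfreq`` once the
series applies. The sample data and frequencies below mirror the doctest and
are illustrative, not part of the patch; a Spark session is created implicitly
on first pyspark.pandas use.

    import pandas as pd
    import pyspark.pandas as ps

    # Four one-minute timestamps with one pre-existing NaN, as in the doctest.
    pdf = pd.DataFrame(
        {"s": [0.0, None, 2.0, 3.0]},
        index=pd.date_range("1/1/2000", periods=4, freq="min"),
    )
    psdf = ps.from_pandas(pdf)

    # Upsample to 30-second bins: timesteps introduced by the new index are NaN.
    print(psdf.asfreq("30s"))

    # `method` fills only the newly created timesteps, never pre-existing NaNs.
    print(psdf.asfreq("30s", method="ffill"))

    # `fill_value` likewise applies only to timesteps introduced by upsampling.
    print(psdf.asfreq("30s", fill_value=9.0))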