python/pyspark/pandas/frame.py (11 changes: 9 additions & 2 deletions)

@@ -4820,10 +4820,17 @@ def to_spark(self, index_col: Optional[Union[str, List[str]]] = None) -> SparkDataFrame:
                 "If `index_col` is not specified for `to_spark`, "
                 "the existing index is lost when converting to Spark DataFrame."
             )
-        return self.spark.frame(index_col)
+        return self._to_spark(index_col)
 
     to_spark.__doc__ = SparkFrameMethods.__doc__
 
+    def _to_spark(self, index_col: Optional[Union[str, List[str]]] = None) -> SparkDataFrame:
+        """
+        Same as `to_spark()`, but without issuing the advice log when `index_col`
+        is not specified. For internal use.
+        """
+        return self.spark.frame(index_col)
+
     def to_pandas(self) -> pd.DataFrame:
         """
         Return a pandas DataFrame.
@@ -4846,7 +4853,7 @@ def to_pandas(self) -> pd.DataFrame:
             "`to_pandas` loads all data into the driver's memory. "
             "It should only be used if the resulting pandas DataFrame is expected to be small."
         )
-        return self._internal.to_pandas_frame.copy()
+        return self._to_pandas()
 
     def _to_pandas(self) -> pd.DataFrame:
         """
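The split above keeps the advice log user-facing only: `to_spark()` still warns when `index_col` is omitted, while internal call sites switch to the silent `_to_spark()`. A minimal sketch of the user-visible behavior (assumes a running Spark session with pyspark installed):

    import pyspark.pandas as ps

    psdf = ps.DataFrame({"a": [1, 2, 3]})

    # Public API: with no `index_col`, this logs the advice that the
    # existing index is lost in the resulting Spark DataFrame.
    sdf = psdf.to_spark()

    # Passing `index_col` keeps the index as a regular column, so the
    # advice is not logged (log_advice only fires when index_col is None).
    psdf.to_spark(index_col="index").show()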
python/pyspark/pandas/indexes/base.py (16 changes: 8 additions & 8 deletions)

@@ -493,7 +493,7 @@ def to_pandas(self) -> pd.Index:
             "`to_pandas` loads all data into the driver's memory. "
             "It should only be used if the resulting pandas Index is expected to be small."
         )
-        return self._to_internal_pandas().copy()
+        return self._to_pandas()
 
     def _to_pandas(self) -> pd.Index:
         """
@@ -2429,24 +2429,24 @@ def intersection(self, other: Union[DataFrame, Series, "Index", List]) -> "Index":
             return self._psdf.head(0).index.rename(None)
         elif isinstance(other, Index):
             other_idx = other
-            spark_frame_other = other_idx.to_frame().to_spark()
+            spark_frame_other = other_idx.to_frame()._to_spark()
             keep_name = self.name == other_idx.name
         elif isinstance(other, Series):
             other_idx = Index(other)
-            spark_frame_other = other_idx.to_frame().to_spark()
+            spark_frame_other = other_idx.to_frame()._to_spark()
             keep_name = True
         elif is_list_like(other):
             other_idx = Index(other)
             if isinstance(other_idx, MultiIndex):
                 return other_idx.to_frame().head(0).index
-            spark_frame_other = other_idx.to_frame().to_spark()
+            spark_frame_other = other_idx.to_frame()._to_spark()
             keep_name = True
         else:
             raise TypeError("Input must be Index or array-like")
 
         index_fields = self._index_fields_for_union_like(other_idx, func_name="intersection")
 
-        spark_frame_self = self.to_frame(name=SPARK_DEFAULT_INDEX_NAME).to_spark()
+        spark_frame_self = self.to_frame(name=SPARK_DEFAULT_INDEX_NAME)._to_spark()
         spark_frame_intersected = spark_frame_self.intersect(spark_frame_other)
         if keep_name:
             index_names = self._internal.index_names
@@ -2517,9 +2517,9 @@ def insert(self, loc: int, item: Any) -> "Index":
         loc = 0 if loc < 0 else loc
 
         index_name = self._internal.index_spark_column_names[0]
-        sdf_before = self.to_frame(name=index_name)[:loc].to_spark()
-        sdf_middle = Index([item], dtype=self.dtype).to_frame(name=index_name).to_spark()
-        sdf_after = self.to_frame(name=index_name)[loc:].to_spark()
+        sdf_before = self.to_frame(name=index_name)[:loc]._to_spark()
+        sdf_middle = Index([item], dtype=self.dtype).to_frame(name=index_name)._to_spark()
+        sdf_after = self.to_frame(name=index_name)[loc:]._to_spark()
         sdf = sdf_before.union(sdf_middle).union(sdf_after)
 
         internal = InternalFrame(
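As the hunk above shows, `Index.insert` assembles its result by unioning three frames: the slice before `loc`, a one-row frame holding the new item, and the slice after `loc`. All three conversions are internal, hence `_to_spark()`. A small usage sketch:

    import pyspark.pandas as ps

    psidx = ps.Index([1, 2, 4, 5])
    # Internally unions to_frame()[:2], a singleton frame for 3, and
    # to_frame()[2:], each converted silently via `_to_spark()`.
    print(psidx.insert(2, 3))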
python/pyspark/pandas/indexes/multi.py (14 changes: 7 additions & 7 deletions)

@@ -351,7 +351,7 @@ def from_frame(df: DataFrame, names: Optional[List[Name]] = None) -> "MultiIndex":
         """
         if not isinstance(df, DataFrame):
             raise TypeError("Input must be a DataFrame")
-        sdf = df.to_spark()
+        sdf = df._to_spark()
 
         if names is None:
             names = df._internal.column_labels
@@ -1076,9 +1076,9 @@ def insert(self, loc: int, item: Any) -> Index:
                 )
 
         index_name: List[Label] = [(name,) for name in self._internal.index_spark_column_names]
-        sdf_before = self.to_frame(name=index_name)[:loc].to_spark()
-        sdf_middle = Index([item]).to_frame(name=index_name).to_spark()
-        sdf_after = self.to_frame(name=index_name)[loc:].to_spark()
+        sdf_before = self.to_frame(name=index_name)[:loc]._to_spark()
+        sdf_middle = Index([item]).to_frame(name=index_name)._to_spark()
+        sdf_after = self.to_frame(name=index_name)[loc:]._to_spark()
         sdf = sdf_before.union(sdf_middle).union(sdf_after)
 
         internal = InternalFrame(
@@ -1140,7 +1140,7 @@ def intersection(self, other: Union[DataFrame, Series, Index, List]) -> "MultiIndex":
         elif isinstance(other, DataFrame):
             raise ValueError("Index data must be 1-dimensional")
         elif isinstance(other, MultiIndex):
-            spark_frame_other = other.to_frame().to_spark()
+            spark_frame_other = other.to_frame()._to_spark()
             keep_name = self.names == other.names
         elif isinstance(other, Index):
             # Always returns an empty MultiIndex if `other` is Index.
@@ -1149,13 +1149,13 @@ def intersection(self, other: Union[DataFrame, Series, Index, List]) -> "MultiIndex":
             raise TypeError("other must be a MultiIndex or a list of tuples")
         else:
             other = MultiIndex.from_tuples(list(other))
-            spark_frame_other = cast(MultiIndex, other).to_frame().to_spark()
+            spark_frame_other = cast(MultiIndex, other).to_frame()._to_spark()
             keep_name = True
 
         index_fields = self._index_fields_for_union_like(other, func_name="intersection")
 
         default_name: List[Name] = [SPARK_INDEX_NAME_FORMAT(i) for i in range(self.nlevels)]
-        spark_frame_self = self.to_frame(name=default_name).to_spark()
+        spark_frame_self = self.to_frame(name=default_name)._to_spark()
         spark_frame_intersected = spark_frame_self.intersect(spark_frame_other)
         if keep_name:
             index_names = self._internal.index_names
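`MultiIndex.intersection` follows the same pattern: both sides are converted with `_to_spark()` and combined via Spark's `DataFrame.intersect`. A usage sketch (result order is not guaranteed, hence the sort):

    import pyspark.pandas as ps

    psmidx = ps.MultiIndex.from_tuples([("a", 1), ("b", 2), ("c", 3)])
    # A list of tuples is wrapped with MultiIndex.from_tuples before the
    # two sides are converted and intersected as Spark frames.
    print(psmidx.intersection([("b", 2), ("c", 3), ("d", 4)]).sort_values())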
python/pyspark/pandas/plot/core.py (2 changes: 1 addition & 1 deletion)

@@ -125,7 +125,7 @@ def prepare_hist_data(data, bins):
         data, numeric_data = NumericPlotBase.prepare_numeric_data(data)
         if is_integer(bins):
             # computes boundaries for the column
-            bins = HistogramPlotBase.get_bins(data.to_spark(), bins)
+            bins = HistogramPlotBase.get_bins(data._to_spark(), bins)
 
         return numeric_data, bins

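The histogram path is purely internal: the Spark frame is only needed to compute bin boundaries. A sketch of a call that reaches `prepare_hist_data` (assumes a plotting backend such as plotly, the pandas-on-Spark default, is installed):

    import pyspark.pandas as ps

    psser = ps.Series([1.0, 1.5, 2.0, 2.5, 3.0])
    # With an integer `bins`, get_bins derives the boundaries from the
    # Spark frame; after this change that conversion emits no advice log.
    fig = psser.plot.hist(bins=4)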
python/pyspark/pandas/series.py (2 changes: 1 addition & 1 deletion)

@@ -1585,7 +1585,7 @@ def to_pandas(self) -> pd.Series:
             "`to_pandas` loads all data into the driver's memory. "
             "It should only be used if the resulting pandas Series is expected to be small."
         )
-        return self._to_internal_pandas().copy()
+        return self._to_pandas()
 
     def _to_pandas(self) -> pd.Series:
         """
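Same pattern as frame.py: the public `to_pandas` keeps the driver-memory advice, while internal conversions go through the silent `_to_pandas`. Sketch:

    import pyspark.pandas as ps

    psser = ps.Series([1, 2, 3])
    # Public call: logs the advice that all data is loaded into the
    # driver's memory, then returns a plain pandas Series.
    pser = psser.to_pandas()
    print(type(pser))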
python/pyspark/pandas/sql_processor.py (2 changes: 1 addition & 1 deletion)

@@ -343,7 +343,7 @@ def _convert_var(self, var: Any) -> Any:
         if isinstance(var, DataFrame):
             df_id = "pandas_on_spark_" + str(id(var))
             if df_id not in self._temp_views:
-                sdf = var.to_spark()
+                sdf = var._to_spark()
                 sdf.createOrReplaceTempView(df_id)
                 self._temp_views[df_id] = sdf
             return df_id
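For context, `_convert_var` turns a pandas-on-Spark DataFrame referenced in a SQL string into a temp view named after its object id. A rough standalone equivalent of that path (`_to_spark` is the internal helper introduced in frame.py above):

    from pyspark.sql import SparkSession
    import pyspark.pandas as ps

    spark = SparkSession.builder.getOrCreate()
    psdf = ps.DataFrame({"a": [1, 2, 3]})

    # Mirror of the snippet above: deterministic view name, silent
    # conversion, and a temp view the SQL text can reference by name.
    df_id = "pandas_on_spark_" + str(id(psdf))
    psdf._to_spark().createOrReplaceTempView(df_id)
    spark.sql("SELECT a FROM {} WHERE a > 1".format(df_id)).show()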