accessors.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Spark related features. Usually, the features here are missing in pandas
but Spark has them.
"""
from abc import ABCMeta, abstractmethod
from distutils.version import LooseVersion
from typing import TYPE_CHECKING, Optional, Union, List, cast
import pyspark
from pyspark import StorageLevel
from pyspark.sql import Column, DataFrame as SparkDataFrame
from pyspark.sql.types import DataType, StructType
if TYPE_CHECKING:
import pyspark.pandas as pp # noqa: F401 (SPARK-34943)
from pyspark.pandas.base import IndexOpsMixin # noqa: F401 (SPARK-34943)
from pyspark.pandas.frame import CachedDataFrame # noqa: F401 (SPARK-34943)
class SparkIndexOpsMethods(object, metaclass=ABCMeta):
"""Spark related features. Usually, the features here are missing in pandas
but Spark has them."""
def __init__(self, data: "IndexOpsMixin"):
self._data = data
@property
def data_type(self) -> DataType:
""" Returns the data type as defined by Spark, as a Spark DataType object."""
return self._data._internal.spark_type_for(self._data._column_label)
@property
def nullable(self) -> bool:
""" Returns the nullability as defined by Spark. """
return self._data._internal.spark_column_nullable_for(self._data._column_label)
@property
def column(self) -> Column:
"""
Spark Column object representing the Series/Index.
.. note:: This Spark Column object is strictly bound to the base DataFrame the Series/Index
was derived from.
"""
return self._data._internal.spark_column_for(self._data._column_label)
def transform(self, func) -> Union["pp.Series", "pp.Index"]:
"""
Applies a function that takes and returns a Spark column. It allows you to natively
apply a Spark function and column APIs with the Spark column internally used
in the Series or Index. The output length of the Spark column should be the same as the input's.
.. note:: It requires the input and output to have the same length; therefore,
aggregate Spark functions such as count do not work.
Parameters
----------
func : function
Function to use for transforming the data by using Spark columns.
Returns
-------
Series or Index
Raises
------
ValueError : If the output from the function is not a Spark column.
Examples
--------
>>> from pyspark.sql.functions import log
>>> df = pp.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
>>> df
a b
0 1 4
1 2 5
2 3 6
>>> df.a.spark.transform(lambda c: log(c))
0 0.000000
1 0.693147
2 1.098612
Name: a, dtype: float64
>>> df.index.spark.transform(lambda c: c + 10)
Int64Index([10, 11, 12], dtype='int64')
>>> df.a.spark.transform(lambda c: c + df.b.spark.column)
0 5
1 7
2 9
Name: a, dtype: int64
"""
from pyspark.pandas import MultiIndex
if isinstance(self._data, MultiIndex):
raise NotImplementedError("MultiIndex does not support spark.transform yet.")
output = func(self._data.spark.column)
if not isinstance(output, Column):
raise ValueError(
"The output of the function [%s] should be of a "
"pyspark.sql.Column; however, got [%s]." % (func, type(output))
)
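# Wrap the returned Spark column in a new Series/Index anchored to the same internal frame,
# so the original index is kept.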
new_ser = self._data._with_new_scol(scol=output)
# Trigger the resolution so it throws an exception if anything goes wrong
# within the function, for example,
# `df1.a.spark.transform(lambda _: F.col("non-existent"))`.
new_ser._internal.to_internal_spark_frame
return new_ser
@property
@abstractmethod
def analyzed(self) -> Union["pp.Series", "pp.Index"]:
pass
class SparkSeriesMethods(SparkIndexOpsMethods):
def transform(self, func) -> "pp.Series":
return cast("pp.Series", super().transform(func))
transform.__doc__ = SparkIndexOpsMethods.transform.__doc__
def apply(self, func) -> "pp.Series":
"""
Applies a function that takes and returns a Spark column. It allows you to natively
apply a Spark function and column APIs with the Spark column internally used
in the Series or Index.
.. note:: It forces the index to be lost and a default index to be used. It is
preferred to use :meth:`Series.spark.transform` or :meth:`DataFrame.spark.apply`
with `index_col` specified.
.. note:: The input and output do not need to have the same length. However, it
internally creates a new DataFrame, which requires `compute.ops_on_diff_frames`
to be set even when operating with the same origin DataFrame; this is expensive,
whereas :meth:`Series.spark.transform` does not require it.
Parameters
----------
func : function
Function to apply against the data by using Spark columns.
Returns
-------
Series
Raises
------
ValueError : If the output from the function is not a Spark column.
Examples
--------
>>> from pyspark import pandas as pp
>>> from pyspark.sql.functions import count, lit
>>> df = pp.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
>>> df
a b
0 1 4
1 2 5
2 3 6
>>> df.a.spark.apply(lambda c: count(c))
0 3
Name: a, dtype: int64
>>> df.a.spark.apply(lambda c: c + df.b.spark.column)
0 5
1 7
2 9
Name: a, dtype: int64
"""
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.series import Series, first_series
from pyspark.pandas.internal import HIDDEN_COLUMNS
output = func(self._data.spark.column)
if not isinstance(output, Column):
raise ValueError(
"The output of the function [%s] should be of a "
"pyspark.sql.Column; however, got [%s]." % (func, type(output))
)
assert isinstance(self._data, Series)
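# Project only the function's output column from the anchor Spark frame;
# hidden internal columns are dropped.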
sdf = self._data._internal.spark_frame.drop(*HIDDEN_COLUMNS).select(output)
# Lose index.
return first_series(DataFrame(sdf)).rename(self._data.name)
@property
def analyzed(self) -> "pp.Series":
"""
Returns a new Series with the analyzed Spark DataFrame.
After multiple operations, the underlying Spark plan could grow huge
and make the Spark planner take a long time to finish the planning.
This property provides a workaround to avoid it.
.. note:: After analyzed, operations between the analyzed Series and the original one
will **NOT** work without setting a config `compute.ops_on_diff_frames` to `True`.
Returns
-------
Series
Examples
--------
>>> ser = pp.Series([1, 2, 3])
>>> ser
0 1
1 2
2 3
dtype: int64
The analyzed one should return the same value.
>>> ser.spark.analyzed
0 1
1 2
2 3
dtype: int64
However, it won't work with the same anchor Series.
>>> ser + ser.spark.analyzed
Traceback (most recent call last):
...
ValueError: ... enable 'compute.ops_on_diff_frames' option.
>>> with pp.option_context('compute.ops_on_diff_frames', True):
... (ser + ser.spark.analyzed).sort_index()
0 2
1 4
2 6
dtype: int64
"""
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.series import first_series
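# `resolved_copy` rebuilds the internal frame on top of the analyzed Spark DataFrame.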
return first_series(DataFrame(self._data._internal.resolved_copy))
class SparkIndexMethods(SparkIndexOpsMethods):
def transform(self, func) -> "pp.Index":
return cast("pp.Index", super().transform(func))
transform.__doc__ = SparkIndexOpsMethods.transform.__doc__
@property
def analyzed(self) -> "pp.Index":
"""
Returns a new Index with the analyzed Spark DataFrame.
After multiple operations, the underlying Spark plan could grow huge
and make the Spark planner take a long time to finish the planning.
This property provides a workaround to avoid it.
.. note:: After analyzed, operations between the analyzed Index and the original one
will **NOT** work without setting a config `compute.ops_on_diff_frames` to `True`.
Returns
-------
Index
Examples
--------
>>> idx = pp.Index([1, 2, 3])
>>> idx
Int64Index([1, 2, 3], dtype='int64')
The analyzed one should return the same value.
>>> idx.spark.analyzed
Int64Index([1, 2, 3], dtype='int64')
However, it won't work with the same anchor Index.
>>> idx + idx.spark.analyzed
Traceback (most recent call last):
...
ValueError: ... enable 'compute.ops_on_diff_frames' option.
>>> with pp.option_context('compute.ops_on_diff_frames', True):
... (idx + idx.spark.analyzed).sort_values()
Int64Index([2, 4, 6], dtype='int64')
"""
from pyspark.pandas.frame import DataFrame
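# Same workaround as Series.spark.analyzed: rebuild on the analyzed Spark DataFrame
# and return its index.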
return DataFrame(self._data._internal.resolved_copy).index
class SparkFrameMethods(object):
"""Spark related features. Usually, the features here are missing in pandas
but Spark has them."""
def __init__(self, frame: "pp.DataFrame"):
self._kdf = frame
def schema(self, index_col: Optional[Union[str, List[str]]] = None) -> StructType:
"""
Returns the underlying Spark schema.
Returns
-------
pyspark.sql.types.StructType
The underlying Spark schema.
Parameters
----------
index_col: str or list of str, optional, default: None
Column names to be used in Spark to represent Koalas' index. The index name
in Koalas is ignored. By default, the index is always lost.
Examples
--------
>>> df = pp.DataFrame({'a': list('abc'),
... 'b': list(range(1, 4)),
... 'c': np.arange(3, 6).astype('i1'),
... 'd': np.arange(4.0, 7.0, dtype='float64'),
... 'e': [True, False, True],
... 'f': pd.date_range('20130101', periods=3)},
... columns=['a', 'b', 'c', 'd', 'e', 'f'])
>>> df.spark.schema().simpleString()
'struct<a:string,b:bigint,c:tinyint,d:double,e:boolean,f:timestamp>'
>>> df.spark.schema(index_col='index').simpleString()
'struct<index:bigint,a:string,b:bigint,c:tinyint,d:double,e:boolean,f:timestamp>'
"""
return self.frame(index_col).schema
def print_schema(self, index_col: Optional[Union[str, List[str]]] = None) -> None:
"""
Prints out the underlying Spark schema in the tree format.
Parameters
----------
index_col: str or list of str, optional, default: None
Column names to be used in Spark to represent Koalas' index. The index name
in Koalas is ignored. By default, the index is always lost.
Returns
-------
None
Examples
--------
>>> df = pp.DataFrame({'a': list('abc'),
... 'b': list(range(1, 4)),
... 'c': np.arange(3, 6).astype('i1'),
... 'd': np.arange(4.0, 7.0, dtype='float64'),
... 'e': [True, False, True],
... 'f': pd.date_range('20130101', periods=3)},
... columns=['a', 'b', 'c', 'd', 'e', 'f'])
>>> df.spark.print_schema() # doctest: +NORMALIZE_WHITESPACE
root
|-- a: string (nullable = false)
|-- b: long (nullable = false)
|-- c: byte (nullable = false)
|-- d: double (nullable = false)
|-- e: boolean (nullable = false)
|-- f: timestamp (nullable = false)
>>> df.spark.print_schema(index_col='index') # doctest: +NORMALIZE_WHITESPACE
root
|-- index: long (nullable = false)
|-- a: string (nullable = false)
|-- b: long (nullable = false)
|-- c: byte (nullable = false)
|-- d: double (nullable = false)
|-- e: boolean (nullable = false)
|-- f: timestamp (nullable = false)
"""
self.frame(index_col).printSchema()
def frame(self, index_col: Optional[Union[str, List[str]]] = None) -> SparkDataFrame:
"""
Return the current DataFrame as a Spark DataFrame. :meth:`DataFrame.spark.frame` is an
alias of :meth:`DataFrame.to_spark`.
Parameters
----------
index_col: str or list of str, optional, default: None
Column names to be used in Spark to represent Koalas' index. The index name
in Koalas is ignored. By default, the index is always lost.
See Also
--------
DataFrame.to_spark
DataFrame.to_koalas
DataFrame.spark.frame
Examples
--------
By default, this method loses the index as below.
>>> df = pp.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6], 'c': [7, 8, 9]})
>>> df.to_spark().show() # doctest: +NORMALIZE_WHITESPACE
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 4| 7|
| 2| 5| 8|
| 3| 6| 9|
+---+---+---+
>>> df = pp.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6], 'c': [7, 8, 9]})
>>> df.spark.frame().show() # doctest: +NORMALIZE_WHITESPACE
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 4| 7|
| 2| 5| 8|
| 3| 6| 9|
+---+---+---+
If `index_col` is set, it keeps the index column as specified.
>>> df.to_spark(index_col="index").show() # doctest: +NORMALIZE_WHITESPACE
+-----+---+---+---+
|index| a| b| c|
+-----+---+---+---+
| 0| 1| 4| 7|
| 1| 2| 5| 8|
| 2| 3| 6| 9|
+-----+---+---+---+
Keeping the index column is useful when you want to call some Spark APIs and
convert it back to a Koalas DataFrame without creating a default index, which
can affect performance.
>>> spark_df = df.to_spark(index_col="index")
>>> spark_df = spark_df.filter("a == 2")
>>> spark_df.to_koalas(index_col="index") # doctest: +NORMALIZE_WHITESPACE
a b c
index
1 2 5 8
In case of multi-index, specify a list to `index_col`.
>>> new_df = df.set_index("a", append=True)
>>> new_spark_df = new_df.to_spark(index_col=["index_1", "index_2"])
>>> new_spark_df.show() # doctest: +NORMALIZE_WHITESPACE
+-------+-------+---+---+
|index_1|index_2| b| c|
+-------+-------+---+---+
| 0| 1| 4| 7|
| 1| 2| 5| 8|
| 2| 3| 6| 9|
+-------+-------+---+---+
Likewise, it can be converted back to a Koalas DataFrame.
>>> new_spark_df.to_koalas(
... index_col=["index_1", "index_2"]) # doctest: +NORMALIZE_WHITESPACE
b c
index_1 index_2
0 1 4 7
1 2 5 8
2 3 6 9
"""
from pyspark.pandas.utils import name_like_string
kdf = self._kdf
data_column_names = []
data_columns = []
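# Alias each data Spark column to its column label (or its position if the label
# is None) so the output column names match the Koalas columns.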
for i, (label, spark_column, column_name) in enumerate(
zip(
kdf._internal.column_labels,
kdf._internal.data_spark_columns,
kdf._internal.data_spark_column_names,
)
):
name = str(i) if label is None else name_like_string(label)
data_column_names.append(name)
if column_name != name:
spark_column = spark_column.alias(name)
data_columns.append(spark_column)
if index_col is None:
return kdf._internal.spark_frame.select(data_columns)
else:
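# Keep the index: validate the given names and alias the index Spark columns to them.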
if isinstance(index_col, str):
index_col = [index_col]
old_index_scols = kdf._internal.index_spark_columns
if len(index_col) != len(old_index_scols):
raise ValueError(
"length of index columns is %s; however, the length of the given "
"'index_col' is %s." % (len(old_index_scols), len(index_col))
)
if any(col in data_column_names for col in index_col):
raise ValueError("'index_col' cannot be overlapped with other columns.")
new_index_scols = [
index_scol.alias(col) for index_scol, col in zip(old_index_scols, index_col)
]
return kdf._internal.spark_frame.select(new_index_scols + data_columns)
def cache(self) -> "CachedDataFrame":
"""
Yields and caches the current DataFrame.
The Koalas DataFrame is yielded as a protected resource and its corresponding
data is cached, which gets uncached after execution goes out of the context.
If you want to specify the StorageLevel manually, use :meth:`DataFrame.spark.persist`.
See Also
--------
DataFrame.spark.persist
Examples
--------
>>> df = pp.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
... columns=['dogs', 'cats'])
>>> df
dogs cats
0 0.2 0.3
1 0.0 0.6
2 0.6 0.0
3 0.2 0.1
>>> with df.spark.cache() as cached_df:
... print(cached_df.count())
...
dogs 4
cats 4
dtype: int64
>>> df = df.spark.cache()
>>> df.to_pandas().mean(axis=1)
0 0.25
1 0.30
2 0.30
3 0.15
dtype: float64
To uncache the DataFrame, use the `unpersist` function.
>>> df.spark.unpersist()
"""
from pyspark.pandas.frame import CachedDataFrame
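# Swap in an analyzed (resolved) copy of the internal frame before caching;
# requires_same_anchor=False allows the anchor frame to change.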
self._kdf._update_internal_frame(
self._kdf._internal.resolved_copy, requires_same_anchor=False
)
return CachedDataFrame(self._kdf._internal)
def persist(
self, storage_level: StorageLevel = StorageLevel.MEMORY_AND_DISK
) -> "CachedDataFrame":
"""
Yields and caches the current DataFrame with a specific StorageLevel.
If a StorageLevel is not given, the `MEMORY_AND_DISK` level is used by default, as in PySpark.
The Koalas DataFrame is yielded as a protected resource and its corresponding
data is cached, which gets uncached after execution goes out of the context.
See Also
--------
DataFrame.spark.cache
Examples
--------
>>> import pyspark
>>> df = pp.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
... columns=['dogs', 'cats'])
>>> df
dogs cats
0 0.2 0.3
1 0.0 0.6
2 0.6 0.0
3 0.2 0.1
Set the StorageLevel to `MEMORY_ONLY`.
>>> with df.spark.persist(pyspark.StorageLevel.MEMORY_ONLY) as cached_df:
... print(cached_df.spark.storage_level)
... print(cached_df.count())
...
Memory Serialized 1x Replicated
dogs 4
cats 4
dtype: int64
Set the StorageLevel to `DISK_ONLY`.
>>> with df.spark.persist(pyspark.StorageLevel.DISK_ONLY) as cached_df:
... print(cached_df.spark.storage_level)
... print(cached_df.count())
...
Disk Serialized 1x Replicated
dogs 4
cats 4
dtype: int64
If a StorageLevel is not given, it uses `MEMORY_AND_DISK` by default.
>>> with df.spark.persist() as cached_df:
... print(cached_df.spark.storage_level)
... print(cached_df.count())
...
Disk Memory Serialized 1x Replicated
dogs 4
cats 4
dtype: int64
>>> df = df.spark.persist()
>>> df.to_pandas().mean(axis=1)
0 0.25
1 0.30
2 0.30
3 0.15
dtype: float64
To uncache the DataFrame, use the `unpersist` function.
>>> df.spark.unpersist()
"""
from pyspark.pandas.frame import CachedDataFrame
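# Same as `cache`, but the cached data uses the given storage level.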
self._kdf._update_internal_frame(
self._kdf._internal.resolved_copy, requires_same_anchor=False
)
return CachedDataFrame(self._kdf._internal, storage_level=storage_level)
def hint(self, name: str, *parameters) -> "pp.DataFrame":
"""
Specifies some hint on the current DataFrame.
Parameters
----------
name : A name of the hint.
parameters : Optional parameters.
Returns
-------
ret : DataFrame with the hint.
See Also
--------
broadcast : Marks a DataFrame as small enough for use in broadcast joins.
Examples
--------
>>> df1 = pp.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
... 'value': [1, 2, 3, 5]},
... columns=['lkey', 'value']).set_index('lkey')
>>> df2 = pp.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],
... 'value': [5, 6, 7, 8]},
... columns=['rkey', 'value']).set_index('rkey')
>>> merged = df1.merge(df2.spark.hint("broadcast"), left_index=True, right_index=True)
>>> merged.spark.explain() # doctest: +ELLIPSIS
== Physical Plan ==
...
...BroadcastHashJoin...
...
"""
from pyspark.pandas.frame import DataFrame
internal = self._kdf._internal.resolved_copy
return DataFrame(internal.with_new_sdf(internal.spark_frame.hint(name, *parameters)))
def to_table(
self,
name: str,
format: Optional[str] = None,
mode: str = "overwrite",
partition_cols: Optional[Union[str, List[str]]] = None,
index_col: Optional[Union[str, List[str]]] = None,
**options
) -> None:
"""
Write the DataFrame into a Spark table. :meth:`DataFrame.spark.to_table`
is an alias of :meth:`DataFrame.to_table`.
Parameters
----------
name : str, required
Table name in Spark.
format : string, optional
Specifies the output data source format. Some common ones are:
- 'delta'
- 'parquet'
- 'orc'
- 'json'
- 'csv'
mode : str {'append', 'overwrite', 'ignore', 'error', 'errorifexists'}, default
'overwrite'. Specifies the behavior of the save operation when the table exists
already.
- 'append': Append the new data to existing data.
- 'overwrite': Overwrite existing data.
- 'ignore': Silently ignore this operation if data already exists.
- 'error' or 'errorifexists': Throw an exception if data already exists.
partition_cols : str or list of str, optional, default None
Names of partitioning columns
index_col: str or list of str, optional, default: None
Column names to be used in Spark to represent Koalas' index. The index name
in Koalas is ignored. By default, the index is always lost.
options
Additional options passed directly to Spark.
Returns
-------
None
See Also
--------
read_table
DataFrame.to_spark_io
DataFrame.spark.to_spark_io
DataFrame.to_parquet
Examples
--------
>>> df = pp.DataFrame(dict(
... date=list(pd.date_range('2012-1-1 12:00:00', periods=3, freq='M')),
... country=['KR', 'US', 'JP'],
... code=[1, 2 ,3]), columns=['date', 'country', 'code'])
>>> df
date country code
0 2012-01-31 12:00:00 KR 1
1 2012-02-29 12:00:00 US 2
2 2012-03-31 12:00:00 JP 3
>>> df.to_table('%s.my_table' % db, partition_cols='date')
"""
if "options" in options and isinstance(options.get("options"), dict) and len(options) == 1:
options = options.get("options") # type: ignore
self._kdf.spark.frame(index_col=index_col).write.saveAsTable(
name=name, format=format, mode=mode, partitionBy=partition_cols, **options
)
def to_spark_io(
self,
path: Optional[str] = None,
format: Optional[str] = None,
mode: str = "overwrite",
partition_cols: Optional[Union[str, List[str]]] = None,
index_col: Optional[Union[str, List[str]]] = None,
**options
) -> None:
"""Write the DataFrame out to a Spark data source. :meth:`DataFrame.spark.to_spark_io`
is an alias of :meth:`DataFrame.to_spark_io`.
Parameters
----------
path : string, optional
Path to the data source.
format : string, optional
Specifies the output data source format. Some common ones are:
- 'delta'
- 'parquet'
- 'orc'
- 'json'
- 'csv'
mode : str {'append', 'overwrite', 'ignore', 'error', 'errorifexists'}, default
'overwrite'. Specifies the behavior of the save operation when data already exists.
- 'append': Append the new data to existing data.
- 'overwrite': Overwrite existing data.
- 'ignore': Silently ignore this operation if data already exists.
- 'error' or 'errorifexists': Throw an exception if data already exists.
partition_cols : str or list of str, optional
Names of partitioning columns
index_col: str or list of str, optional, default: None
Column names to be used in Spark to represent Koalas' index. The index name
in Koalas is ignored. By default, the index is always lost.
options : dict
All other options passed directly into Spark's data source.
Returns
-------
None
See Also
--------
read_spark_io
DataFrame.to_delta
DataFrame.to_parquet
DataFrame.to_table
DataFrame.to_spark_io
DataFrame.spark.to_spark_io
Examples
--------
>>> df = pp.DataFrame(dict(
... date=list(pd.date_range('2012-1-1 12:00:00', periods=3, freq='M')),
... country=['KR', 'US', 'JP'],
... code=[1, 2 ,3]), columns=['date', 'country', 'code'])
>>> df
date country code
0 2012-01-31 12:00:00 KR 1
1 2012-02-29 12:00:00 US 2
2 2012-03-31 12:00:00 JP 3
>>> df.to_spark_io(path='%s/to_spark_io/foo.json' % path, format='json')
"""
if "options" in options and isinstance(options.get("options"), dict) and len(options) == 1:
options = options.get("options") # type: ignore
self._kdf.spark.frame(index_col=index_col).write.save(
path=path, format=format, mode=mode, partitionBy=partition_cols, **options
)
def explain(self, extended: Optional[bool] = None, mode: Optional[str] = None) -> None:
"""
Prints the underlying (logical and physical) Spark plans to the console for debugging
purpose.
Parameters
----------
extended : boolean, default ``False``.
If ``False``, prints only the physical plan.
mode : string, default ``None``.
The expected output format of plans.
Returns
-------
None
Examples
--------
>>> df = pp.DataFrame({'id': range(10)})
>>> df.spark.explain() # doctest: +ELLIPSIS
== Physical Plan ==
...
>>> df.spark.explain(True) # doctest: +ELLIPSIS
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...
>>> df.spark.explain("extended") # doctest: +ELLIPSIS
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...
>>> df.spark.explain(mode="extended") # doctest: +ELLIPSIS
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...
"""
if LooseVersion(pyspark.__version__) < LooseVersion("3.0"):
if mode is not None and extended is not None:
raise Exception("extended and mode should not be set together.")
if extended is not None and isinstance(extended, str):
mode = extended
if mode is not None:
if mode == "simple":
extended = False
elif mode == "extended":
extended = True
else:
raise ValueError(
"Unknown spark.explain mode: {}. Accepted spark.explain modes are "
"'simple', 'extended'.".format(mode)
)
if extended is None:
extended = False
self._kdf._internal.to_internal_spark_frame.explain(extended)
else:
self._kdf._internal.to_internal_spark_frame.explain(extended, mode)
def apply(self, func, index_col: Optional[Union[str, List[str]]] = None) -> "pp.DataFrame":
"""
Applies a function that takes and returns a Spark DataFrame. It allows you to natively
apply a Spark function against the Spark DataFrame internally used in this DataFrame.
.. note:: set `index_col` and keep the column named as such in the output Spark
DataFrame to avoid using the default index and prevent a performance penalty.
If you omit `index_col`, a default index is used, which is potentially
expensive in general.
.. note:: it will lose column labels. This is a synonym of
``func(kdf.to_spark(index_col)).to_koalas(index_col)``.
Parameters
----------
func : function
Function to apply against the data by using a Spark DataFrame.
Returns
-------
DataFrame
Raises
------
ValueError : If the output from the function is not a Spark DataFrame.
Examples
--------
>>> kdf = pp.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
>>> kdf
a b
0 1 4
1 2 5
2 3 6
>>> kdf.spark.apply(
... lambda sdf: sdf.selectExpr("a + b as c", "index"), index_col="index")
... # doctest: +NORMALIZE_WHITESPACE
c
index
0 5
1 7
2 9
The case below ends up using the default index, which should be avoided
if possible.
>>> kdf.spark.apply(lambda sdf: sdf.groupby("a").count().sort("a"))
a count
0 1 1
1 2 1
2 3 1
"""
output = func(self.frame(index_col))
if not isinstance(output, SparkDataFrame):
raise ValueError(
"The output of the function [%s] should be of a "
"pyspark.sql.DataFrame; however, got [%s]." % (func, type(output))
)
return output.to_koalas(index_col)
def repartition(self, num_partitions: int) -> "pp.DataFrame":
"""
Returns a new DataFrame partitioned by the given partitioning expressions. The
resulting DataFrame is hash partitioned.
Parameters
----------
num_partitions : int
The target number of partitions.
Returns
-------
DataFrame
Examples
--------
>>> kdf = pp.DataFrame({"age": [5, 5, 2, 2],
... "name": ["Bob", "Bob", "Alice", "Alice"]}).set_index("age")
>>> kdf.sort_index() # doctest: +NORMALIZE_WHITESPACE
name
age
2 Alice
2 Alice
5 Bob
5 Bob
>>> new_kdf = kdf.spark.repartition(7)
>>> new_kdf.to_spark().rdd.getNumPartitions()
7
>>> new_kdf.sort_index() # doctest: +NORMALIZE_WHITESPACE
name
age
2 Alice