# Pandas on Spark

## Object Creation - Series

In [None]:
import numpy as np
import pandas as pd
import pyspark.pandas as ps

In [None]:
# Create pandas series

pser = pd.Series([2,7,8,np.nan, 9, 3])

In [None]:
print(pser)

0    2.0
1    7.0
2    8.0
3    NaN
4    9.0
5    3.0
dtype: float64


In [None]:
type(pser)

pandas.core.series.Series

In [None]:
# Create pandas on spark series

psser = ps.Series([2,7,8,np.nan, 9, 3])

In [None]:
psser

0    2.0
1    7.0
2    8.0
3    NaN
4    9.0
5    3.0
dtype: float64

In [None]:
type(psser)

pyspark.pandas.series.Series

In [None]:
# Create pandas on spark series by passing a pandas series
psser_1 = ps.Series(pser)

In [None]:
type(psser_1)

pyspark.pandas.series.Series

In [None]:
# Create pandas on spark series from a pandas series using from_pandas method
psser_2 = ps.from_pandas(pser)

In [None]:
type(psser_2)

pyspark.pandas.series.Series

In [None]:
# Sort_index() methos to sort values
print(psser.sort_index())

0    2.0
1    7.0
2    8.0
3    NaN
4    9.0
5    3.0
dtype: float64


## Object Creation - Dataframe

In [None]:
my_dict = {"A": np.random.rand(5),
           "B": np.random.rand(5)}

In [None]:
my_dict

{'A': array([0.28979721, 0.79305657, 0.50397567, 0.53451766, 0.92259389]),
 'B': array([0.78733222, 0.02793944, 0.80501529, 0.58557883, 0.73433594])}

In [None]:
pdf = pd.DataFrame(my_dict)

In [None]:
pdf

Unnamed: 0,A,B
0,0.289797,0.787332
1,0.793057,0.027939
2,0.503976,0.805015
3,0.534518,0.585579
4,0.922594,0.734336


In [None]:
type(pdf)

pandas.core.frame.DataFrame

In [None]:
# Create pandas-on-spark dataframe
psdf = ps.DataFrame(my_dict)

In [None]:
psdf

Unnamed: 0,A,B
0,0.289797,0.787332
1,0.793057,0.027939
2,0.503976,0.805015
3,0.534518,0.585579
4,0.922594,0.734336


In [None]:
type(psdf)

pyspark.pandas.frame.DataFrame

In [None]:
# Create pandas-on-spark dataframe by passing a pandas dataframe
psdf_1 = ps.DataFrame(pdf)
psdf_2 = ps.from_pandas(pdf)

In [None]:
psdf_1

Unnamed: 0,A,B
0,0.289797,0.787332
1,0.793057,0.027939
2,0.503976,0.805015
3,0.534518,0.585579
4,0.922594,0.734336


In [None]:
psdf_2

Unnamed: 0,A,B
0,0.289797,0.787332
1,0.793057,0.027939
2,0.503976,0.805015
3,0.534518,0.585579
4,0.922594,0.734336


In [None]:
# sort_index() on pandas dataframe
print(psdf_1.sort_index())

          A         B
0  0.289797  0.787332
1  0.793057  0.027939
2  0.503976  0.805015
3  0.534518  0.585579
4  0.922594  0.734336


In [None]:
# sort_index() on pandas dataframe
print(psdf_2.sort_index())

          A         B
0  0.289797  0.787332
1  0.793057  0.027939
2  0.503976  0.805015
3  0.534518  0.585579
4  0.922594  0.734336


## Summary Statistics

In [None]:
psser.describe()

count    5.000000
mean     5.800000
std      3.114482
min      2.000000
25%      3.000000
50%      7.000000
75%      8.000000
max      9.000000
dtype: float64

In [None]:
psdf.describe()

Unnamed: 0,A,B
count,5.0,5.0
mean,0.608788,0.58804
std,0.250343,0.324776
min,0.289797,0.027939
25%,0.503976,0.585579
50%,0.534518,0.734336
75%,0.793057,0.787332
max,0.922594,0.805015


In [None]:
# Sort values on series
psser.sort_values()

0    2.0
5    3.0
1    7.0
2    8.0
4    9.0
3    NaN
dtype: float64

In [None]:
psdf

Unnamed: 0,A,B
0,0.289797,0.787332
1,0.793057,0.027939
2,0.503976,0.805015
3,0.534518,0.585579
4,0.922594,0.734336


In [None]:
# Sort value son dataframe : pass column name on which osrting to be done
psdf.sort_values(by='B')

Unnamed: 0,A,B
1,0.793057,0.027939
3,0.534518,0.585579
4,0.922594,0.734336
0,0.289797,0.787332
2,0.503976,0.805015


In [None]:
# transpose method
psdf.transpose()

Unnamed: 0,0,1,2,3,4
A,0.289797,0.793057,0.503976,0.534518,0.922594
B,0.787332,0.027939,0.805015,0.585579,0.734336


In [None]:
ps.get_option('compute.max_rows')

1000

This shows the maximum number of rows displayed

In [None]:
ps.set_option('compute.max_rows',2000)

In [None]:
ps.get_option('compute.max_rows')

2000

## Selection

In [None]:
psdf['A']

0    0.289797
1    0.793057
2    0.503976
3    0.534518
4    0.922594
Name: A, dtype: float64

In [None]:
psdf[['A','B']]

Unnamed: 0,A,B
0,0.289797,0.787332
1,0.793057,0.027939
2,0.503976,0.805015
3,0.534518,0.585579
4,0.922594,0.734336


In [None]:
psdf.A

0    0.289797
1    0.793057
2    0.503976
3    0.534518
4    0.922594
Name: A, dtype: float64

In [None]:
psdf.loc[0:2]

Unnamed: 0,A,B
0,0.289797,0.787332
1,0.793057,0.027939
2,0.503976,0.805015


Both indexes are inclusive

In [None]:
# Slicing
psdf.iloc[0:2,0:1]

Unnamed: 0,A
0,0.289797
1,0.793057


in iloc end index is exclusive

In [None]:
# for creating new column from another series
from pyspark.pandas.config import set_option, reset_option
set_option("compute.ops_on_diff_frames", True)
psdf["C"] = psser

# reset to default to avoid potentially expensive operation in future
reset_option("compute.ops_on_diff_frames")
print(psdf)

          A         B    C
0  0.289797  0.787332  2.0
1  0.793057  0.027939  7.0
2  0.503976  0.805015  8.0
3  0.534518  0.585579  NaN
4  0.922594  0.734336  9.0


## Applying pandas function with pandas-on-spark object

In [None]:
# Cummulative sum
psdf.apply(np.cumsum)

Unnamed: 0,A,B,C
0,0.289797,0.787332,2.0
1,1.082854,0.815272,9.0
2,1.586829,1.620287,17.0
3,2.121347,2.205866,
4,3.043941,2.940202,26.0


In [None]:
# Cummulative sum column wise
psdf.apply(np.cumsum, axis=1)

Unnamed: 0,A,B,C
0,0.289797,1.077129,3.077129
1,0.793057,0.820996,7.820996
2,0.503976,1.308991,9.308991
3,0.534518,1.120096,
4,0.922594,1.65693,10.65693


In [None]:
# applying lambda functions
psdf.apply(lambda x : x ** 2)

Unnamed: 0,A,B,C
0,0.083982,0.619892,4.0
1,0.628939,0.000781,49.0
2,0.253991,0.64805,64.0
3,0.285709,0.342903,
4,0.851179,0.539249,81.0


In [None]:
def square(x) -> ps.Series[np.float64]:
    return x ** 2

In [None]:
psdf.apply(lambda x : square(x))

Unnamed: 0,A,B,C
0,0.083982,0.619892,4.0
1,0.628939,0.000781,49.0
2,0.253991,0.64805,64.0
3,0.285709,0.342903,
4,0.851179,0.539249,81.0


In [None]:
psdf.apply(square)

Unnamed: 0,A,B,C
0,0.083982,0.619892,4.0
1,0.628939,0.000781,49.0
2,0.253991,0.64805,64.0
3,0.285709,0.342903,
4,0.851179,0.539249,81.0


In [None]:
# range function
psdf_5 = ps.DataFrame({'A':range(1000)})

In [None]:
print(psdf_5)

       A
0      0
1      1
2      2
3      3
4      4
5      5
6      6
7      7
8      8
9      9
10    10
11    11
12    12
13    13
14    14
15    15
16    16
17    17
18    18
19    19
20    20
21    21
22    22
23    23
24    24
25    25
26    26
27    27
28    28
29    29
30    30
31    31
32    32
33    33
34    34
35    35
36    36
37    37
38    38
39    39
40    40
41    41
42    42
43    43
44    44
45    45
46    46
47    47
48    48
49    49
50    50
51    51
52    52
53    53
54    54
55    55
56    56
57    57
58    58
59    59
60    60
61    61
62    62
63    63
64    64
65    65
66    66
67    67
68    68
69    69
70    70
71    71
72    72
73    73
74    74
75    75
76    76
77    77
78    78
79    79
80    80
81    81
82    82
83    83
84    84
85    85
86    86
87    87
88    88
89    89
90    90
91    91
92    92
93    93
94    94
95    95
96    96
97    97
98    98
99    99
100  100
101  101
102  102
103  103
104  104
105  105
106  106
107  107
108  108
109  109
1

In [None]:
len(psdf_5)

1000

In [None]:
psdf_5.apply(lambda col : col.max())

A    999
dtype: int64

Short_limit is 1000

In [None]:
psdf.apply(lambda col:col.max())

A    0.922594
B    0.805015
C    9.000000
dtype: float64

In [None]:
psdf_5 = ps.DataFrame({'A':range(1200)})
psdf_5.apply(lambda col:col.max())

A    1199
dtype: int64

In [None]:
# ps.set_option("compute.shortcut_limit",1200)

## Grouping data

In [None]:
psdf.groupby("C").sum()



Unnamed: 0_level_0,A,B
C,Unnamed: 1_level_1,Unnamed: 2_level_1
2.0,0.289797,0.787332
7.0,0.793057,0.027939
8.0,0.503976,0.805015
9.0,0.922594,0.734336


For multiple columns pass them as list

## Plotting

In [None]:
%matplotlib inline

In [None]:
# bar plot
 
speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]
index = ['snail', 'pig', 'elephant',
         'rabbit', 'giraffe', 'coyote', 'horse']
         
psdf = ps.DataFrame({'speed': speed,
                     'lifespan': lifespan}, index=index)
psdf.plot.bar()

In [None]:
psdf

Unnamed: 0,speed,lifespan
snail,0.1,2.0
pig,17.5,8.0
elephant,40.0,70.0
rabbit,48.0,1.5
giraffe,52.0,25.0
coyote,69.0,12.0
horse,88.0,28.0


In [None]:
# horizontal bar plot
psdf.plot.barh()

In [None]:
#  pie chart
 
psdf = ps.DataFrame({'mass': [0.330, 4.87, 5.97],
                     'radius': [2439.7, 6051.8, 6378.1]},
                    index=['Mercury', 'Venus', 'Earth'])
psdf.plot.pie(y='mass')

In [None]:

# area plot
 
psdf = ps.DataFrame({
    'sales': [3, 2, 3, 9, 10, 6, 3],
    'signups': [5, 5, 6, 12, 14, 13, 9],
    'visits': [20, 42, 28, 62, 81, 50, 90],
}, index=pd.date_range(start='2019/08/15', end='2020/03/09',
                       freq='M'))
psdf.plot.area()

In [None]:

# line plot
 
psdf = ps.DataFrame({'rabbit': [20, 18, 489, 675, 1776],
                     'horse': [4, 25, 281, 600, 1900]},
                    index=[1990, 1997, 2003, 2009, 2014])
psdf.plot.line()


In [None]:
# Histogram
 
pdf = pd.DataFrame(
    np.random.randint(1, 7, 6000),
    columns=['one'])
pdf['two'] = pdf['one'] + np.random.randint(1, 7, 6000)
psdf = ps.from_pandas(pdf)
psdf.plot.hist(bins=12, alpha=0.5)

In [None]:

# scatter plot
 
psdf = ps.DataFrame([[5.1, 3.5, 0], [4.9, 3.0, 0], [7.0, 3.2, 1],
                    [6.4, 3.2, 1], [5.9, 3.0, 2]],
                   columns=['length', 'width', 'species'])
psdf.plot.scatter(x='length',
                  y='width',
                  c='species')

# Missing Functionalities and Workarounds in pandas API on Spark

## Directly use pandas APIs through type conversion

In [None]:
psdf

Unnamed: 0,length,width,species
0,5.1,3.5,0
1,4.9,3.0,0
2,7.0,3.2,1
3,6.4,3.2,1
4,5.9,3.0,2


In [None]:
psidx = psdf.index

In [None]:
psidx

Int64Index([0, 1, 2, 3, 4], dtype='int64')

In [None]:
type(psidx)

pyspark.pandas.indexes.numeric.Int64Index

In [None]:
index_list = psidx.to_list()

In [None]:
index_list

[0, 1, 2, 3, 4]

## Native Support for pandas Objects

In [None]:
psdf = ps.DataFrame({'A': 1.,
                     'B': pd.Timestamp('20130102'),
                     'C': pd.Series(1, index=list(range(4)), dtype='float32'),
                     'D': np.array([3] * 4, dtype='int32'),
                     'F': 'foo'})

In [None]:
psdf

Unnamed: 0,A,B,C,D,F
0,1.0,2013-01-02,1.0,3,foo
1,1.0,2013-01-02,1.0,3,foo
2,1.0,2013-01-02,1.0,3,foo
3,1.0,2013-01-02,1.0,3,foo


## Distributed execution for pandas functions

In [None]:
i = pd.date_range('2018-04-09', periods=2000, freq='1D1min')
ts = ps.DataFrame({'A': ['timestamp']}, index=i)

In [None]:
print(ts)

                             A
2018-04-09 00:00:00  timestamp
2018-04-10 00:01:00  timestamp
2018-04-11 00:02:00  timestamp
2018-04-12 00:03:00  timestamp
2018-04-13 00:04:00  timestamp
2018-04-14 00:05:00  timestamp
2018-04-15 00:06:00  timestamp
2018-04-16 00:07:00  timestamp
2018-04-17 00:08:00  timestamp
2018-04-18 00:09:00  timestamp
2018-04-19 00:10:00  timestamp
2018-04-20 00:11:00  timestamp
2018-04-21 00:12:00  timestamp
2018-04-22 00:13:00  timestamp
2018-04-23 00:14:00  timestamp
2018-04-24 00:15:00  timestamp
2018-04-25 00:16:00  timestamp
2018-04-26 00:17:00  timestamp
2018-04-27 00:18:00  timestamp
2018-04-28 00:19:00  timestamp
2018-04-29 00:20:00  timestamp
2018-04-30 00:21:00  timestamp
2018-05-01 00:22:00  timestamp
2018-05-02 00:23:00  timestamp
2018-05-03 00:24:00  timestamp
2018-05-04 00:25:00  timestamp
2018-05-05 00:26:00  timestamp
2018-05-06 00:27:00  timestamp
2018-05-07 00:28:00  timestamp
2018-05-08 00:29:00  timestamp
2018-05-09 00:30:00  timestamp
2018-05-

In [None]:
ts.between_time('0:15', '0:16')

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m/databricks/spark/python/pyspark/pandas/typedef/typehints.py:338[0m, in [0;36mpandas_on_spark_type[0;34m(tpe)[0m
[1;32m    337[0m     dtype [38;5;241m=[39m pandas_dtype(tpe)
[0;32m--> 338[0m     spark_type [38;5;241m=[39m [43mas_spark_type[49m[43m([49m[43mdtype[49m[43m)[49m
[1;32m    339[0m [38;5;28;01mexcept[39;00m [38;5;167;01mTypeError[39;00m:

File [0;32m/databricks/spark/python/pyspark/pandas/typedef/typehints.py:248[0m, in [0;36mas_spark_type[0;34m(tpe, raise_error, prefer_timestamp_ntz)[0m
[1;32m    247[0m [38;5;28;01mif[39;00m raise_error:
[0;32m--> 248[0m     [38;5;28;01mraise[39;00m [38;5;167;01mTypeError[39;00m([38;5;124m"[39m[38;5;124mType [39m[38;5;132;01m%s[39;00m[38;5;124m was not understood.[39m[38;5;124m"[39m [38;5;241m%[39m tpe)
[1;

In [None]:
ts.to_pandas().between_time('0:15', '0:16')

Unnamed: 0,A
2018-04-24 00:15:00,timestamp
2018-04-25 00:16:00,timestamp
2022-04-04 00:15:00,timestamp
2022-04-05 00:16:00,timestamp


In [None]:
# applying function to pandas on spark dataframe in batch wise manner
ts.pandas_on_spark.apply_batch(func=lambda pdf: pdf.between_time('0:15', '0:16'))


Unnamed: 0,A
2018-04-24 00:15:00,timestamp
2018-04-25 00:16:00,timestamp
2022-04-04 00:15:00,timestamp
2022-04-05 00:16:00,timestamp


## Using SQL in pandas API on Spark

In [None]:
psdf = ps.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                     'rabbit': [20, 18, 489, 675, 1776],
                     'horse': [4, 25, 281, 600, 1900]})

In [None]:
pdf = pd.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'sheep': [22, 50, 121, 445, 791],
                    'chicken': [250, 326, 589, 1241, 2118]})

In [None]:
psdf

Unnamed: 0,year,rabbit,horse
0,1990,20,4
1,1997,18,25
2,2003,489,281
3,2009,675,600
4,2014,1776,1900


In [None]:
pdf

Unnamed: 0,year,sheep,chicken
0,1990,22,250
1,1997,50,326
2,2003,121,589
3,2009,445,1241
4,2014,791,2118


In [None]:
ps.sql("SELECT * FROM {psdf} WHERE rabbit > 100", psdf=psdf)

Unnamed: 0,year,rabbit,horse
0,2003,489,281
1,2009,675,600
2,2014,1776,1900


In [None]:
ps.sql('''
       SELECT ps.rabbit, pd.chicken
       FROM {psdf} ps INNER JOIN {pdf} pd
       ON ps.year=pd.year
       ORDER BY ps.rabbit, pd.chicken''', psdf=psdf, pdf=pdf)

Unnamed: 0,rabbit,chicken
0,18,326
1,20,250
2,489,589
3,675,1241
4,1776,2118


# Working with PySpark

## Conversion from and to PySpark DataFrame

In [None]:
# Creating a pandas-on-spark DataFrame
psdf = ps.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})

In [None]:
psdf

Unnamed: 0,A,B
0,1,10
1,2,20
2,3,30
3,4,40
4,5,50


In [None]:
type(psdf)

pyspark.pandas.frame.DataFrame

In [None]:

# Converting pandas-on-spark DataFrame to Spark DataFrame
sdf = psdf.to_spark()

In [None]:
sdf

DataFrame[A: bigint, B: bigint]

In [None]:
sdf.show()

+---+---+
|  A|  B|
+---+---+
|  1| 10|
|  2| 20|
|  3| 30|
|  4| 40|
|  5| 50|
+---+---+



In [None]:
type(sdf)

pyspark.sql.dataframe.DataFrame

In [None]:
psdf_2 = sdf.to_pandas_on_spark()


DataFrame.to_pandas_on_spark is deprecated. Use DataFrame.pandas_api instead.



In [None]:
type(psdf_2)

pyspark.pandas.frame.DataFrame

In [None]:
psdf_3 = sdf.pandas_api()

In [None]:
psdf_3

Unnamed: 0,A,B
0,1,10
1,2,20
2,3,30
3,4,40
4,5,50


In [None]:
type(psdf_3)

pyspark.pandas.frame.DataFrame

## Checking Spark execution plans

In [None]:
# option_context : Context Manager from pandas on spark library
# used to set temporary options for pandas-on-spark dataframe
from pyspark.pandas import option_context
 
with option_context(
        "compute.ops_on_diff_frames", True,
        "compute.default_index_type", 'distributed'):
    df = ps.range(10) + ps.range(10)
    df.spark.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [coalesce(__this___index_level_0__#2415L, __that___index_level_0__#2423L) AS __index_level_0__#2428L, (__this_id#2416L + __that_id#2424L) AS id#2458L]
   +- SortMergeJoin [__this___index_level_0__#2415L], [__that___index_level_0__#2423L], FullOuter
      :- Sort [__this___index_level_0__#2415L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(__this___index_level_0__#2415L, 200), ENSURE_REQUIREMENTS, [plan_id=3831]
      :     +- Project [__index_level_0__#2392L AS __this___index_level_0__#2415L, id#2390L AS __this_id#2416L]
      :        +- Project [distributed_index() AS __index_level_0__#2392L, id#2390L]
      :           +- Range (0, 10, step=1, splits=8)
      +- Sort [__that___index_level_0__#2423L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(__that___index_level_0__#2423L, 200), ENSURE_REQUIREMENTS, [plan_id=3832]
            +- Project [__index_level_0__#2403L AS __that___inde

In [None]:

with option_context(
        "compute.ops_on_diff_frames", False,
        "compute.default_index_type", 'distributed'):
    df = ps.range(10)
    df = df + df
    df.spark.explain()

== Physical Plan ==
*(1) Project [__index_level_0__#2463L, (id#2461L + id#2461L) AS id#2475L]
+- *(1) Project [distributed_index() AS __index_level_0__#2463L, id#2461L]
   +- *(1) Range (0, 10, step=1, splits=8)




## Caching DataFrames

In [None]:

with option_context("compute.default_index_type", 'distributed'):
    df = ps.range(10)
    new_df = (df + df).spark.cache()  # `(df + df)` is cached here as `df`
    new_df.spark.explain()

== Physical Plan ==
InMemoryTableScan [__index_level_0__#2480L, id#2492L]
   +- InMemoryRelation [__index_level_0__#2480L, id#2492L, __natural_order__#2483L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [__index_level_0__#2480L, (id#2478L + id#2478L) AS id#2492L, __natural_order__#2483L]
            +- *(1) Project [__index_level_0__#2480L, id#2478L, monotonically_increasing_id() AS __natural_order__#2483L]
               +- *(1) Project [distributed_index() AS __index_level_0__#2480L, id#2478L]
                  +- *(1) Range (0, 10, step=1, splits=8)




In [None]:
new_df.spark.unpersist()

In [None]:
with (df + df).spark.cache() as df:
    df.spark.explain()

== Physical Plan ==
InMemoryTableScan [__index_level_0__#2480L, id#2572L]
   +- InMemoryRelation [__index_level_0__#2480L, id#2572L, __natural_order__#2483L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [__index_level_0__#2480L, (id#2478L + id#2478L) AS id#2572L, __natural_order__#2483L]
            +- *(1) Project [__index_level_0__#2480L, id#2478L, monotonically_increasing_id() AS __natural_order__#2483L]
               +- *(1) Project [distributed_index() AS __index_level_0__#2480L, id#2478L]
                  +- *(1) Range (0, 10, step=1, splits=8)


