# Quickstart: Pandas API on Spark

This is a short introduction to pandas API on Spark, geared mainly toward new users. This notebook shows some key differences between pandas and pandas API on Spark. You can run these examples yourself in 'Live Notebook: pandas API on Spark' at [the quickstart page](https://spark.apache.org/docs/latest/api/python/getting_started/index.html).

Customarily, we import pandas API on Spark as follows:

In [1]:
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

In [2]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

## Object Creation



Creating a pandas-on-Spark Series by passing a list of values, letting pandas API on Spark create a default integer index:

In [3]:
s = ps.Series([1, 3, 5, np.nan, 6, 8])


In [4]:
s

                                                                                

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

Creating a pandas-on-Spark DataFrame by passing a dict of objects that can be converted to series-like structures:

In [5]:
psdf = ps.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])



In [6]:
psdf

Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,3,300,three
40,4,400,four
50,5,500,five
60,6,600,six


Creating a pandas DataFrame by passing a numpy array, with a datetime index and labeled columns:

In [7]:
dates = pd.date_range('20130101', periods=6)

In [8]:
dates

DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06'],
              dtype='datetime64[ns]', freq='D')

In [9]:
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))

In [10]:
pdf

Unnamed: 0,A,B,C,D
2013-01-01,-0.850689,-0.764666,-1.243188,0.501557
2013-01-02,0.219467,0.41405,0.474394,0.580563
2013-01-03,0.339819,-1.475556,0.719637,0.031728
2013-01-04,-0.387437,-0.797423,0.382872,-0.255168
2013-01-05,0.300101,-0.906621,1.403083,2.183866
2013-01-06,1.154524,1.04161,0.542439,0.873893


Now, this pandas DataFrame can be converted to a pandas-on-Spark DataFrame:

In [11]:
psdf = ps.from_pandas(pdf)



In [12]:
type(psdf)

pyspark.pandas.frame.DataFrame

It looks and behaves the same as a pandas DataFrame.

In [13]:
psdf



Unnamed: 0,A,B,C,D
2013-01-01,-0.850689,-0.764666,-1.243188,0.501557
2013-01-02,0.219467,0.41405,0.474394,0.580563
2013-01-03,0.339819,-1.475556,0.719637,0.031728
2013-01-04,-0.387437,-0.797423,0.382872,-0.255168
2013-01-05,0.300101,-0.906621,1.403083,2.183866
2013-01-06,1.154524,1.04161,0.542439,0.873893


It is also easy to create a pandas-on-Spark DataFrame from a Spark DataFrame.

Creating a Spark DataFrame from a pandas DataFrame:

In [14]:
spark = SparkSession.builder.getOrCreate()

In [15]:
sdf = spark.createDataFrame(pdf)



In [16]:
sdf.show()

+-------------------+-------------------+-------------------+-------------------+
|                  A|                  B|                  C|                  D|
+-------------------+-------------------+-------------------+-------------------+
|-0.8506886374555153| -0.764665706477574|-1.2431876040950454| 0.5015565883829702|
|  0.219466666044472|0.41405026077049156| 0.4743943740172285| 0.5805632453364462|
|0.33981903773908745|-1.4755561062029638| 0.7196368757832436|0.03172786260197081|
|-0.3874372077293841|-0.7974232745953413| 0.3828718679031604|-0.2551684761861235|
| 0.3001012966530861|-0.9066206743664363| 1.4030833059849108|  2.183865584696714|
| 1.1545240050875596| 1.0416095504796914| 0.5424389133429812| 0.8738931073508763|
+-------------------+-------------------+-------------------+-------------------+



Creating a pandas-on-Spark DataFrame from a Spark DataFrame:

In [17]:
psdf = sdf.pandas_api()

In [18]:
psdf

Unnamed: 0,A,B,C,D
0,-0.850689,-0.764666,-1.243188,0.501557
1,0.219467,0.41405,0.474394,0.580563
2,0.339819,-1.475556,0.719637,0.031728
3,-0.387437,-0.797423,0.382872,-0.255168
4,0.300101,-0.906621,1.403083,2.183866
5,1.154524,1.04161,0.542439,0.873893


The DataFrame has specific [dtypes](http://pandas.pydata.org/pandas-docs/stable/basics.html#basics-dtypes). Types that are common to both Spark and pandas are currently supported.

In [19]:
psdf.dtypes

A    float64
B    float64
C    float64
D    float64
dtype: object

Here is how to show the top rows of the frame.

Note that a Spark DataFrame does not preserve the natural order of its rows by default. Setting the `compute.ordered_head` option preserves the natural order, but the internal sort adds performance overhead.

In [20]:
psdf.head()

Unnamed: 0,A,B,C,D
0,-0.850689,-0.764666,-1.243188,0.501557
1,0.219467,0.41405,0.474394,0.580563
2,0.339819,-1.475556,0.719637,0.031728
3,-0.387437,-0.797423,0.382872,-0.255168
4,0.300101,-0.906621,1.403083,2.183866


Displaying the index, columns, and the underlying numpy data.

In [21]:
psdf.index

Int64Index([0, 1, 2, 3, 4, 5], dtype='int64')

In [22]:
psdf.columns

Index(['A', 'B', 'C', 'D'], dtype='object')

In [23]:
psdf.to_numpy()



array([[-0.85068864, -0.76466571, -1.2431876 ,  0.50155659],
       [ 0.21946667,  0.41405026,  0.47439437,  0.58056325],
       [ 0.33981904, -1.47555611,  0.71963688,  0.03172786],
       [-0.38743721, -0.79742327,  0.38287187, -0.25516848],
       [ 0.3001013 , -0.90662067,  1.40308331,  2.18386558],
       [ 1.15452401,  1.04160955,  0.54243891,  0.87389311]])

Showing a quick statistical summary of your data:

In [24]:
psdf.describe()

                                                                                

Unnamed: 0,A,B,C,D
count,6.0,6.0,6.0,6.0
mean,0.129298,-0.414768,0.379873,0.65274
std,0.687037,0.942764,0.875454,0.852206
min,-0.850689,-1.475556,-1.243188,-0.255168
25%,-0.387437,-0.906621,0.382872,0.031728
50%,0.219467,-0.797423,0.474394,0.501557
75%,0.339819,0.41405,0.719637,0.873893
max,1.154524,1.04161,1.403083,2.183866


Transposing your data

In [25]:
psdf.T



Unnamed: 0,0,1,2,3,4,5
A,-0.850689,0.219467,0.339819,-0.387437,0.300101,1.154524
B,-0.764666,0.41405,-1.475556,-0.797423,-0.906621,1.04161
C,-1.243188,0.474394,0.719637,0.382872,1.403083,0.542439
D,0.501557,0.580563,0.031728,-0.255168,2.183866,0.873893


Sorting by its index

In [26]:
psdf.sort_index(ascending=False)

Unnamed: 0,A,B,C,D
5,1.154524,1.04161,0.542439,0.873893
4,0.300101,-0.906621,1.403083,2.183866
3,-0.387437,-0.797423,0.382872,-0.255168
2,0.339819,-1.475556,0.719637,0.031728
1,0.219467,0.41405,0.474394,0.580563
0,-0.850689,-0.764666,-1.243188,0.501557


Sorting by value

In [27]:
psdf.sort_values(by='B')

Unnamed: 0,A,B,C,D
2,0.339819,-1.475556,0.719637,0.031728
4,0.300101,-0.906621,1.403083,2.183866
3,-0.387437,-0.797423,0.382872,-0.255168
0,-0.850689,-0.764666,-1.243188,0.501557
1,0.219467,0.41405,0.474394,0.580563
5,1.154524,1.04161,0.542439,0.873893


## Missing Data
Pandas API on Spark primarily uses the value `np.nan` to represent missing data. By default, missing values are excluded from computations.


In [28]:
pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])

In [29]:
pdf1.loc[dates[0]:dates[1], 'E'] = 1

In [30]:
psdf1 = ps.from_pandas(pdf1)



In [31]:
psdf1



Unnamed: 0,A,B,C,D,E
2013-01-01,-0.850689,-0.764666,-1.243188,0.501557,1.0
2013-01-02,0.219467,0.41405,0.474394,0.580563,1.0
2013-01-03,0.339819,-1.475556,0.719637,0.031728,
2013-01-04,-0.387437,-0.797423,0.382872,-0.255168,


Dropping any rows that have missing data:

In [32]:
psdf1.dropna(how='any')



Unnamed: 0,A,B,C,D,E
2013-01-01,-0.850689,-0.764666,-1.243188,0.501557,1.0
2013-01-02,0.219467,0.41405,0.474394,0.580563,1.0


Filling missing data.

In [33]:
psdf1.fillna(value=5)



Unnamed: 0,A,B,C,D,E
2013-01-01,-0.850689,-0.764666,-1.243188,0.501557,1.0
2013-01-02,0.219467,0.41405,0.474394,0.580563,1.0
2013-01-03,0.339819,-1.475556,0.719637,0.031728,5.0
2013-01-04,-0.387437,-0.797423,0.382872,-0.255168,5.0


## Operations

### Stats
Performing a descriptive statistic:

In [34]:
psdf.mean()



A    0.129298
B   -0.414768
C    0.379873
D    0.652740
dtype: float64

### Spark Configurations

Various PySpark configurations can be applied internally in pandas API on Spark.
For example, you can enable Arrow optimization to hugely speed up internal pandas conversion. See also the [PySpark Usage Guide for Pandas with Apache Arrow](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html) in the PySpark documentation.

In [35]:
prev = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")  # Keep its default value.
ps.set_option("compute.default_index_type", "distributed")  # Use the distributed default index to prevent overhead.
import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

In [36]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
%timeit ps.range(300000).to_pandas()

                                                                                

312 ms ± 114 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [37]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)
%timeit ps.range(300000).to_pandas()

767 ms ± 141 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [38]:
ps.reset_option("compute.default_index_type")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", prev)  # Set its default value back.

## Grouping
By “group by” we are referring to a process involving one or more of the following steps:

- Splitting the data into groups based on some criteria
- Applying a function to each group independently
- Combining the results into a data structure

In [39]:
psdf = ps.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
                          'foo', 'bar', 'foo', 'foo'],
                    'B': ['one', 'one', 'two', 'three',
                          'two', 'two', 'one', 'three'],
                    'C': np.random.randn(8),
                    'D': np.random.randn(8)})

In [40]:
psdf

Unnamed: 0,A,B,C,D
0,foo,one,-0.335324,1.128205
1,bar,one,0.323737,1.830269
2,foo,two,-2.86582,0.482715
3,bar,three,0.100906,-0.056906
4,foo,two,-0.52874,-0.764055
5,bar,two,-1.491642,0.534729
6,foo,one,-1.629542,0.366241
7,foo,three,1.22379,0.44825


Grouping and then applying the [sum()](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.groupby.GroupBy.sum.html) function to the resulting groups.

In [41]:
psdf.groupby('A').sum()

                                                                                

A,C,D
foo,-4.135636,1.661355
bar,-1.066998,2.308093


Grouping by multiple columns forms a hierarchical index, and again we can apply the sum function.

In [42]:
psdf.groupby(['A', 'B']).sum()

A,B,C,D
foo,one,-1.964866,1.494446
bar,one,0.323737,1.830269
foo,two,-3.39456,-0.281341
bar,three,0.100906,-0.056906
bar,two,-1.491642,0.534729
foo,three,1.22379,0.44825


## Plotting

In [43]:
pser = pd.Series(np.random.randn(1000),
                 index=pd.date_range('1/1/2000', periods=1000))

In [44]:
psser = ps.Series(pser)

In [45]:
psser = psser.cummax()

In [46]:
psser.plot()

22/09/22 11:37:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


On a DataFrame, the [plot()](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.plot.html) method is a convenience to plot all of the columns with labels:

In [47]:
pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index,
                   columns=['A', 'B', 'C', 'D'])

In [48]:
psdf = ps.from_pandas(pdf)

In [49]:
psdf = psdf.cummax()

In [50]:
psdf.plot()



For more details, see the [Plotting](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/frame.html#plotting) documentation.

## Getting data in/out

### CSV

CSV is straightforward and easy to use. See [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.to_csv.html) to write a CSV file and [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.read_csv.html) to read a CSV file.

In [51]:
psdf.to_csv('foo.csv')
ps.read_csv('foo.csv').head(10)



                                                                                

Unnamed: 0,A,B,C,D
0,0.275525,1.45874,0.019346,1.378075
1,0.275525,1.45874,1.143522,1.378075
2,0.275525,1.45874,1.143522,1.378075
3,0.275525,1.45874,1.143522,1.378075
4,0.275525,1.45874,1.143522,1.378075
5,1.481085,1.45874,1.155879,1.378075
6,1.481085,1.45874,1.242548,1.378075
7,1.481085,1.45874,1.242548,1.378075
8,1.481085,1.45874,1.242548,1.378075
9,1.481085,1.45874,1.242548,1.378075


### Parquet

Parquet is an efficient and compact file format that is fast to read and write. See [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.to_parquet.html) to write a Parquet file and [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.read_parquet.html) to read a Parquet file.

In [52]:
psdf.to_parquet('bar.parquet')
ps.read_parquet('bar.parquet').head(10)



                                                                                

Unnamed: 0,A,B,C,D
0,0.275525,1.45874,0.019346,1.378075
1,0.275525,1.45874,1.143522,1.378075
2,0.275525,1.45874,1.143522,1.378075
3,0.275525,1.45874,1.143522,1.378075
4,0.275525,1.45874,1.143522,1.378075
5,1.481085,1.45874,1.155879,1.378075
6,1.481085,1.45874,1.242548,1.378075
7,1.481085,1.45874,1.242548,1.378075
8,1.481085,1.45874,1.242548,1.378075
9,1.481085,1.45874,1.242548,1.378075


### Spark IO

In addition, pandas API on Spark fully supports Spark's various data sources, such as ORC and external data sources. See [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.to_orc.html) to write to a specified data source and [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.read_orc.html) to read from one.

In [53]:
psdf.to_spark_io('zoo.orc', format="orc")
ps.read_spark_io('zoo.orc', format="orc").head(10)



                                                                                

Unnamed: 0,A,B,C,D
0,0.275525,1.45874,0.019346,1.378075
1,0.275525,1.45874,1.143522,1.378075
2,0.275525,1.45874,1.143522,1.378075
3,0.275525,1.45874,1.143522,1.378075
4,0.275525,1.45874,1.143522,1.378075
5,1.481085,1.45874,1.155879,1.378075
6,1.481085,1.45874,1.242548,1.378075
7,1.481085,1.45874,1.242548,1.378075
8,1.481085,1.45874,1.242548,1.378075
9,1.481085,1.45874,1.242548,1.378075


See the [Input/Output](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/io.html) documentation for more details.