In [1]:
import numpy as np
import pandas as pd
import pyspark.pandas as ps




In [2]:
# Create a pandas Series
pser = pd.Series([1, 3, 5, np.nan, 6, 8]) 
# Create a pandas-on-Spark Series
psser = ps.Series([1, 3, 5, np.nan, 6, 8])
# Create a pandas-on-Spark Series by passing a pandas Series
psser = ps.Series(pser)
psser = ps.from_pandas(pser)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/27 09:11:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
pser

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

In [4]:
psser

                                                                                

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

In [5]:
psser.sort_index()

                                                                                

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

In [6]:
# Create a pandas DataFrame
pdf = pd.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})
# Create a pandas-on-Spark DataFrame
psdf = ps.DataFrame({'A': np.random.rand(5),
                     'B': np.random.rand(5)})
# Create a pandas-on-Spark DataFrame by passing a pandas DataFrame
psdf = ps.DataFrame(pdf)
psdf = ps.from_pandas(pdf)


In [7]:
pdf

Unnamed: 0,A,B
0,0.552198,0.77133
1,0.836591,0.386184
2,0.347079,0.501955
3,0.3773,0.132316
4,0.124253,0.205489


In [8]:
psdf.sort_index()

Unnamed: 0,A,B
0,0.552198,0.77133
1,0.836591,0.386184
2,0.347079,0.501955
3,0.3773,0.132316
4,0.124253,0.205489


In [9]:
psdf.head(2)

Unnamed: 0,A,B
0,0.552198,0.77133
1,0.836591,0.386184


In [10]:
psdf.describe()

                                                                                

Unnamed: 0,A,B
count,5.0,5.0
mean,0.447484,0.399455
std,0.26545,0.253949
min,0.124253,0.132316
25%,0.347079,0.205489
50%,0.3773,0.386184
75%,0.552198,0.501955
max,0.836591,0.77133


In [11]:
psdf.sort_values(by='B')

Unnamed: 0,A,B
3,0.3773,0.132316
4,0.124253,0.205489
1,0.836591,0.386184
2,0.347079,0.501955
0,0.552198,0.77133


In [12]:
psdf.transpose()

Unnamed: 0,0,1,2,3,4
A,0.552198,0.836591,0.347079,0.3773,0.124253
B,0.77133,0.386184,0.501955,0.132316,0.205489


In [13]:
ps.get_option('compute.max_rows')

1000

In [14]:
ps.set_option('compute.max_rows', 2000)
ps.get_option('compute.max_rows')

2000

In [15]:
psdf['A']  # or psdf.A


0    0.552198
1    0.836591
2    0.347079
3    0.377300
4    0.124253
Name: A, dtype: float64

In [16]:
psdf[['A', 'B']]

Unnamed: 0,A,B
0,0.552198,0.77133
1,0.836591,0.386184
2,0.347079,0.501955
3,0.3773,0.132316
4,0.124253,0.205489


In [17]:
psdf.loc[1:2]

Unnamed: 0,A,B
1,0.836591,0.386184
2,0.347079,0.501955


In [18]:
psdf.iloc[:3, 1:2]

Unnamed: 0,B
0,0.77133
1,0.386184
2,0.501955


In [19]:
from pyspark.pandas.config import set_option, reset_option
set_option("compute.ops_on_diff_frames", True)
psdf['C'] = psser
# Reset to default to avoid potential expensive operation in the future
reset_option("compute.ops_on_diff_frames")
psdf

Unnamed: 0,A,B,C
0,0.552198,0.77133,1.0
1,0.836591,0.386184,3.0
2,0.347079,0.501955,5.0
3,0.3773,0.132316,
4,0.124253,0.205489,6.0


In [20]:
psdf.apply(np.cumsum)



Unnamed: 0,A,B,C
0,0.552198,0.77133,1.0
1,1.388789,1.157514,4.0
2,1.735868,1.659469,9.0
3,2.113168,1.791785,
4,2.237421,1.997274,15.0


In [21]:
psdf.apply(np.cumsum, axis=1)




Unnamed: 0,A,B,C
0,0.552198,1.323528,2.323528
1,0.836591,1.222775,4.222775
2,0.347079,0.849034,5.849034
3,0.3773,0.509616,
4,0.124253,0.329742,6.329742


In [22]:
psdf.apply(lambda x: x ** 2)



Unnamed: 0,A,B,C
0,0.304923,0.59495,1.0
1,0.699885,0.149138,9.0
2,0.120464,0.251959,25.0
3,0.142355,0.017508,
4,0.015439,0.042226,36.0


In [23]:
psdf.groupby('A').sum()



Unnamed: 0_level_0,B,C
A,Unnamed: 1_level_1,Unnamed: 2_level_1
0.836591,0.386184,3.0
0.552198,0.77133,1.0
0.347079,0.501955,5.0
0.124253,0.205489,6.0
0.3773,0.132316,


In [24]:
psdf.groupby(['A', 'B']).sum()



Unnamed: 0_level_0,Unnamed: 1_level_0,C
A,B,Unnamed: 2_level_1
0.347079,0.501955,5.0
0.836591,0.386184,3.0
0.124253,0.205489,6.0
0.3773,0.132316,
0.552198,0.77133,1.0


In [25]:
# This is needed for visualizing plot on notebook
%matplotlib inline

speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]
index = ['snail', 'pig', 'elephant',
         'rabbit', 'giraffe', 'coyote', 'horse']
psdf = ps.DataFrame({'speed': speed,
                     'lifespan': lifespan}, index=index)
psdf.plot.bar()

In [26]:
psdf = ps.DataFrame({'mass': [0.330, 4.87, 5.97],
                     'radius': [2439.7, 6051.8, 6378.1]},
                    index=['Mercury', 'Venus', 'Earth'])
psdf.plot.pie(y='mass')

In [27]:
i = pd.date_range('2018-04-09', periods=2000, freq='1D1min')
ts = ps.DataFrame({'A': ['timestamp']}, index=i)

In [28]:
ts.to_pandas().between_time('0:15', '0:16')


`to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.



Unnamed: 0,A
2018-04-24 00:15:00,timestamp
2018-04-25 00:16:00,timestamp
2022-04-04 00:15:00,timestamp
2022-04-05 00:16:00,timestamp


In [30]:
# Using SQL in pandas API on Spark

psdf = ps.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                     'pig': [20, 18, 489, 675, 1776],
                     'horse': [4, 25, 281, 600, 1900]})


ps.sql("SELECT * FROM {psdf} WHERE pig > 100", psdf = psdf)

                                                                                

Unnamed: 0,year,pig,horse
0,2003,489,281
1,2009,675,600
2,2014,1776,1900


In [31]:
# Conversion from and to PySpark DataFrame
psdf = ps.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})
sdf = psdf.to_spark()
type(sdf)


If `index_col` is not specified for `to_spark`, the existing index is lost when converting to Spark DataFrame.



pyspark.sql.dataframe.DataFrame

In [32]:
sdf.show()

+---+---+
|  A|  B|
+---+---+
|  1| 10|
|  2| 20|
|  3| 30|
|  4| 40|
|  5| 50|
+---+---+



In [33]:
from pyspark.pandas import option_context
with option_context(
        "compute.default_index_type", "distributed-sequence"):
    psdf = sdf.to_pandas_on_spark()
print(type(psdf))
psdf



DataFrame.to_pandas_on_spark is deprecated. Use DataFrame.pandas_api instead.



<class 'pyspark.pandas.frame.DataFrame'>


Unnamed: 0,A,B
0,1,10
1,2,20
2,3,30
3,4,40
4,5,50


In [34]:
# Checking Spark execution plans

from pyspark.pandas import option_context
 
with option_context(
        "compute.ops_on_diff_frames", True,
        "compute.default_index_type", 'distributed'):
    df = ps.range(10) + ps.range(10)
    df.spark.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [CASE WHEN isnotnull(__this___index_level_0__#1190L) THEN __this___index_level_0__#1190L ELSE __that___index_level_0__#1198L END AS __index_level_0__#1203L, (__this_id#1191L + __that_id#1199L) AS id#1233L]
   +- SortMergeJoin [__this___index_level_0__#1190L], [__that___index_level_0__#1198L], FullOuter
      :- Sort [__this___index_level_0__#1190L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(__this___index_level_0__#1190L, 200), ENSURE_REQUIREMENTS, [plan_id=1343]
      :     +- Project [__index_level_0__#1167L AS __this___index_level_0__#1190L, id#1165L AS __this_id#1191L]
      :        +- Project [distributed_index() AS __index_level_0__#1167L, id#1165L]
      :           +- Range (0, 10, step=1, splits=8)
      +- Sort [__that___index_level_0__#1198L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(__that___index_level_0__#1198L, 200), ENSURE_REQUIREMENTS, [plan_id=1344]
         

In [35]:
with option_context(
        "compute.ops_on_diff_frames", False,
        "compute.default_index_type", 'distributed'):
    df = ps.range(10)
    df = df + df
    df.spark.explain()

== Physical Plan ==
*(1) Project [__index_level_0__#1238L, (id#1236L + id#1236L) AS id#1250L]
+- *(1) Project [distributed_index() AS __index_level_0__#1238L, id#1236L]
   +- *(1) Range (0, 10, step=1, splits=8)




In [36]:
# Caching DataFrames

with option_context("compute.default_index_type", 'distributed'):
    df = ps.range(10)
    new_df = (df + df).spark.cache()  # `(df + df)` is cached here as `df`
    new_df.spark.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan [__index_level_0__#1255L, id#1267L]
      +- InMemoryRelation [__index_level_0__#1255L, id#1267L, __natural_order__#1258L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [__index_level_0__#1255L, (id#1253L + id#1253L) AS id#1267L, __natural_order__#1258L]
               +- *(1) Project [__index_level_0__#1255L, id#1253L, monotonically_increasing_id() AS __natural_order__#1258L]
                  +- *(1) Project [distributed_index() AS __index_level_0__#1255L, id#1253L]
                     +- *(1) Range (0, 10, step=1, splits=8)




In [38]:
new_df.spark.unpersist()

with (df + df).spark.cache() as df:
    df.spark.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan [__index_level_0__#1255L, id#1336L]
      +- InMemoryRelation [__index_level_0__#1255L, id#1336L, __natural_order__#1258L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [__index_level_0__#1255L, (id#1253L + id#1253L) AS id#1336L, __natural_order__#1258L]
               +- *(1) Project [__index_level_0__#1255L, id#1253L, monotonically_increasing_id() AS __natural_order__#1258L]
                  +- *(1) Project [distributed_index() AS __index_level_0__#1255L, id#1253L]
                     +- *(1) Range (0, 10, step=1, splits=8)


