
# Quickstart: pandas API on Spark (ps)

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 4.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=87cc05e60b19cf4d0249af72354d71eb6d0cb4ddaaaa4b147ddd01ab597c696e
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession



In [4]:
s = ps.Series([1, 3, 5, np.nan, 6, 8])

In [5]:
s

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

In [6]:
psdf = ps.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])
psdf

    a    b      c
10  1  100    one
20  2  200    two
30  3  300  three
40  4  400   four
50  5  500   five
60  6  600    six


In [8]:
dates = pd.date_range('20130101', periods=6)
dates

DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06'],
              dtype='datetime64[ns]', freq='D')

In [10]:
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
pdf

                   A         B         C         D
2013-01-01 -0.710646 -0.238013 -0.133868  1.017392
2013-01-02 -1.235203 -0.753679  0.049116 -0.545689
2013-01-03  2.290906 -0.411470  1.637401  0.251176
2013-01-04 -0.190660  0.086062 -0.557570 -0.576190
2013-01-05 -0.847742  1.708555  0.986235 -0.464991
2013-01-06 -0.064988  0.359567  1.097156 -1.095670


Now this pandas DataFrame can be converted to a pandas-on-Spark DataFrame with ps.from_pandas().

In [12]:
psdf = ps.from_pandas(pdf)
type(psdf)

In [13]:
psdf

                   A         B         C         D
2013-01-01 -0.710646 -0.238013 -0.133868  1.017392
2013-01-02 -1.235203 -0.753679  0.049116 -0.545689
2013-01-03  2.290906 -0.411470  1.637401  0.251176
2013-01-04 -0.190660  0.086062 -0.557570 -0.576190
2013-01-05 -0.847742  1.708555  0.986235 -0.464991
2013-01-06 -0.064988  0.359567  1.097156 -1.095670


In [16]:
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(pdf)
sdf.show()

+--------------------+--------------------+--------------------+--------------------+
|                   A|                   B|                   C|                   D|
+--------------------+--------------------+--------------------+--------------------+
| -0.7106461322502922|-0.23801303626323553|-0.13386788958201673|  1.0173916850987053|
| -1.2352026806744127| -0.7536794214856011|  0.0491155464004922| -0.5456890409276018|
|  2.2909058646216787|-0.41147007381343065|  1.6374008431202287|   0.251175753287909|
| -0.1906595547377152| 0.08606185922064907| -0.5575703952658586| -0.5761896758913374|
| -0.8477418189464767|  1.7085548887130435|  0.9862345222163094|-0.46499135760551924|
|-0.06498812770578065|  0.3595667744264233|  1.0971562413818174|  -1.095669505974754|
+--------------------+--------------------+--------------------+--------------------+



Create a pandas-on-Spark DataFrame from a Spark DataFrame.


In [18]:
psdf = sdf.pandas_api()
psdf

          A         B         C         D
0 -0.710646 -0.238013 -0.133868  1.017392
1 -1.235203 -0.753679  0.049116 -0.545689
2  2.290906 -0.411470  1.637401  0.251176
3 -0.190660  0.086062 -0.557570 -0.576190
4 -0.847742  1.708555  0.986235 -0.464991
5 -0.064988  0.359567  1.097156 -1.095670


In [19]:
psdf.dtypes

A    float64
B    float64
C    float64
D    float64
dtype: object

Here is how to show the top rows of the frame.

Note that data in a Spark DataFrame does not preserve its natural order by default. The natural order can be preserved by setting the compute.ordered_head option, but this adds performance overhead because the data must be sorted internally.
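To see why head() can return different rows without that option, here is a conceptual sketch in plain Python with no Spark involved. It models a frame as a list of partitions whose results arrive in an arbitrary order. The hypothetical ordered_head helper tags each row with its (partition id, offset) and sorts on that tag before taking rows. This only illustrates the idea of a natural-order sort and is not pandas-on-Spark's implementation. In the notebook itself, the option is enabled with ps.set_option("compute.ordered_head", True).

```python
from typing import List, Sequence


def unordered_head(partitions: Sequence[Sequence[int]], n: int,
                   arrival_order: Sequence[int]) -> List[int]:
    """Take the first n rows in whatever order the partitions happen to arrive."""
    rows = [row for pid in arrival_order for row in partitions[pid]]
    return rows[:n]


def ordered_head(partitions: Sequence[Sequence[int]], n: int,
                 arrival_order: Sequence[int]) -> List[int]:
    """Restore the natural order before taking n rows.

    Each row is tagged with (partition id, offset within partition). Sorting on
    that tag undoes the arbitrary arrival order, at the cost of an extra sort.
    """
    tagged = [((pid, offset), row)
              for pid in arrival_order
              for offset, row in enumerate(partitions[pid])]
    tagged.sort(key=lambda item: item[0])
    return [row for _, row in tagged[:n]]


# Three hypothetical partitions holding rows 0..5 in their natural order.
partitions = [[0, 1], [2, 3], [4, 5]]
arrival = [2, 0, 1]  # partition 2 happens to finish first

print(unordered_head(partitions, 3, arrival))  # [4, 5, 0]
print(ordered_head(partitions, 3, arrival))    # [0, 1, 2]
```

The extra sort in ordered_head illustrates the performance overhead that the note above warns about.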

In [20]:
psdf.head()

          A         B         C         D
0 -0.710646 -0.238013 -0.133868  1.017392
1 -1.235203 -0.753679  0.049116 -0.545689
2  2.290906 -0.411470  1.637401  0.251176
3 -0.190660  0.086062 -0.557570 -0.576190
4 -0.847742  1.708555  0.986235 -0.464991


In [21]:
psdf.index

Int64Index([0, 1, 2, 3, 4, 5], dtype='int64')

In [22]:
psdf.columns

Index(['A', 'B', 'C', 'D'], dtype='object')

In [23]:
psdf.to_numpy()



array([[-0.71064613, -0.23801304, -0.13386789,  1.01739169],
       [-1.23520268, -0.75367942,  0.04911555, -0.54568904],
       [ 2.29090586, -0.41147007,  1.63740084,  0.25117575],
       [-0.19065955,  0.08606186, -0.5575704 , -0.57618968],
       [-0.84774182,  1.70855489,  0.98623452, -0.46499136],
       [-0.06498813,  0.35956677,  1.09715624, -1.09566951]])

In [24]:
psdf.describe()

              A         B         C         D
count  6.000000  6.000000  6.000000  6.000000
mean  -0.126389  0.125170  0.513078 -0.235662
std    1.260348  0.866699  0.849613  0.749858
min   -1.235203 -0.753679 -0.557570 -1.095670
25%   -0.847742 -0.411470 -0.133868 -0.576190
50%   -0.710646 -0.238013  0.049116 -0.545689
75%   -0.064988  0.359567  1.097156  0.251176
max    2.290906  1.708555  1.637401  1.017392


Transposing your data

In [25]:
psdf.T

          0         1         2         3         4         5
A -0.710646 -1.235203  2.290906 -0.190660 -0.847742 -0.064988
B -0.238013 -0.753679 -0.411470  0.086062  1.708555  0.359567
C -0.133868  0.049116  1.637401 -0.557570  0.986235  1.097156
D  1.017392 -0.545689  0.251176 -0.576190 -0.464991 -1.095670
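Transposing a six-row frame is cheap. According to the pandas-on-Spark documentation, however, transpose() is guarded by the compute.max_rows option (1000 by default) and raises a ValueError when the frame is longer than that, because transposing is expensive on distributed data. The plain-Python sketch below applies the same kind of guard. transpose_rows is a hypothetical helper, and the limit is passed in explicitly rather than read from the option.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def transpose_rows(rows: Sequence[Sequence[T]], max_rows: int = 1000) -> List[List[T]]:
    """Swap rows and columns, refusing inputs longer than max_rows."""
    if len(rows) > max_rows:
        raise ValueError(f"{len(rows)} rows exceeds the limit of {max_rows}")
    # zip(*rows) yields one tuple per original column.
    return [list(column) for column in zip(*rows)]


print(transpose_rows([[1, 2], [3, 4], [5, 6]]))  # [[1, 3, 5], [2, 4, 6]]
```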


Sorting by its index

In [28]:
psdf.sort_index(ascending=False)

          A         B         C         D
5 -0.064988  0.359567  1.097156 -1.095670
4 -0.847742  1.708555  0.986235 -0.464991
3 -0.190660  0.086062 -0.557570 -0.576190
2  2.290906 -0.411470  1.637401  0.251176
1 -1.235203 -0.753679  0.049116 -0.545689
0 -0.710646 -0.238013 -0.133868  1.017392


Sorting by value

In [29]:
psdf.sort_values(by='B')

          A         B         C         D
1 -1.235203 -0.753679  0.049116 -0.545689
2  2.290906 -0.411470  1.637401  0.251176
0 -0.710646 -0.238013 -0.133868  1.017392
3 -0.190660  0.086062 -0.557570 -0.576190
5 -0.064988  0.359567  1.097156 -1.095670
4 -0.847742  1.708555  0.986235 -0.464991


Missing Data
The pandas API on Spark primarily uses the value np.nan to represent missing data. By default, missing values are not included in computations.
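As a plain-Python illustration of "not included in computations", the hypothetical nan_mean helper below drops NaN values before averaging. That is what skipna=True, the default for reductions such as mean(), does. Applied to the values of the Series s created earlier, it averages only the five non-missing numbers.

```python
import math
from typing import Iterable


def nan_mean(values: Iterable[float], skipna: bool = True) -> float:
    """Average the values, ignoring NaN entries unless skipna is False."""
    values = list(values)
    if skipna:
        values = [v for v in values if not math.isnan(v)]
    if not values:
        return math.nan  # nothing left to average
    # With skipna=False, any NaN propagates through sum() and the result is NaN.
    return sum(values) / len(values)


s_values = [1, 3, 5, math.nan, 6, 8]  # same values as the Series s above
print(nan_mean(s_values))                # 4.6
print(nan_mean(s_values, skipna=False))  # nan
```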

In [38]:
pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])
pdf1

                   A         B         C         D    E
2013-01-01 -0.710646 -0.238013 -0.133868  1.017392  NaN
2013-01-02 -1.235203 -0.753679  0.049116 -0.545689  NaN
2013-01-03  2.290906 -0.411470  1.637401  0.251176  NaN
2013-01-04 -0.190660  0.086062 -0.557570 -0.576190  NaN


In [39]:
pdf1.loc[dates[0]:dates[1], 'E'] = 1
pdf1

                   A         B         C         D    E
2013-01-01 -0.710646 -0.238013 -0.133868  1.017392  1.0
2013-01-02 -1.235203 -0.753679  0.049116 -0.545689  1.0
2013-01-03  2.290906 -0.411470  1.637401  0.251176  NaN
2013-01-04 -0.190660  0.086062 -0.557570 -0.576190  NaN


In [40]:
psdf1 = ps.from_pandas(pdf1)
psdf1

                   A         B         C         D    E
2013-01-01 -0.710646 -0.238013 -0.133868  1.017392  1.0
2013-01-02 -1.235203 -0.753679  0.049116 -0.545689  1.0
2013-01-03  2.290906 -0.411470  1.637401  0.251176  NaN
2013-01-04 -0.190660  0.086062 -0.557570 -0.576190  NaN


To drop any rows that have missing data.

In [41]:
psdf1.dropna(how='any')

                   A         B         C         D    E
2013-01-01 -0.710646 -0.238013 -0.133868  1.017392  1.0
2013-01-02 -1.235203 -0.753679  0.049116 -0.545689  1.0


In [42]:
psdf1.fillna(value=5)

                   A         B         C         D    E
2013-01-01 -0.710646 -0.238013 -0.133868  1.017392  1.0
2013-01-02 -1.235203 -0.753679  0.049116 -0.545689  1.0
2013-01-03  2.290906 -0.411470  1.637401  0.251176  5.0
2013-01-04 -0.190660  0.086062 -0.557570 -0.576190  5.0
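The row-level rules behind dropna(how='any') and fillna(value=5) are simple. The plain-Python sketch below works on rows stored as dicts and uses three hypothetical helpers: is_missing, drop_missing and fill_missing. It also covers how='all', which pandas accepts as well and which drops a row only when every value in it is missing. The sample values are rounded from the frame above.

```python
import math
from typing import Any, Dict, List

Row = Dict[str, Any]


def is_missing(value: Any) -> bool:
    """Treat None and float NaN as missing."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def drop_missing(rows: List[Row], how: str = "any") -> List[Row]:
    """Drop rows with any missing value (how='any') or only all-missing rows (how='all')."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    check = any if how == "any" else all
    return [row for row in rows if not check(is_missing(v) for v in row.values())]


def fill_missing(rows: List[Row], value: Any) -> List[Row]:
    """Return copies of the rows with every missing value replaced by value."""
    return [{k: (value if is_missing(v) else v) for k, v in row.items()} for row in rows]


rows = [{"A": -0.71, "E": 1.0}, {"A": 2.29, "E": math.nan}]
print(drop_missing(rows))        # [{'A': -0.71, 'E': 1.0}]
print(fill_missing(rows, 5)[1])  # {'A': 2.29, 'E': 5}
```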


Operations

Stats
Performing a descriptive statistic:


In [43]:
psdf.mean()

A   -0.126389
B    0.125170
C    0.513078
D   -0.235662
dtype: float64

In [45]:
psdf.median()

A   -0.710646
B   -0.238013
C    0.049116
D   -0.545689
dtype: float64
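Unlike pandas, pandas-on-Spark computes median() with Spark's approximate percentile, which can be tuned through its accuracy parameter. The result is an actual data value rather than the average of the two middle values. For column A above, psdf.median() gives -0.710646, while the exact median of the six values is about -0.450653. The nearest-rank rule below is a plain-Python stand-in, not Spark's algorithm. On these six values it reproduces the median and also the 25%, 50% and 75% rows of describe() above. nearest_rank_percentile is a hypothetical helper.

```python
import math
import statistics
from typing import Sequence


def nearest_rank_percentile(values: Sequence[float], p: float) -> float:
    """Smallest value v such that at least a fraction p of the values are <= v."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered)))  # 1-based rank
    return ordered[rank - 1]


column_a = [-0.710646, -1.235203, 2.290906, -0.19066, -0.847742, -0.064988]

print(nearest_rank_percentile(column_a, 0.5))   # -0.710646, as psdf.median() shows
print(nearest_rank_percentile(column_a, 0.25))  # -0.847742, the 25% row of describe()
print(statistics.median(column_a))              # exact median: mean of the two middle values
```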

Spark Configurations
Various PySpark configurations can also be applied internally in the pandas API on Spark. For example, enabling Arrow optimization can greatly speed up conversions between Spark and pandas. In the timings below, converting 300,000 rows took about 0.4 s with Arrow and about 2.3 s without it. See also the "PySpark Usage Guide for Pandas with Apache Arrow" in the PySpark documentation.

In [46]:
prev = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")  # Save the current value so it can be restored later.
ps.set_option("compute.default_index_type", "distributed")  # Use the distributed default index to avoid overhead.
import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

In [47]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
%timeit ps.range(300000).to_pandas()

395 ms ± 78.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [48]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)
%timeit ps.range(300000).to_pandas()

2.27 s ± 422 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [49]:
ps.reset_option("compute.default_index_type")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", prev)  # Restore the saved value.
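The save, set and restore steps in the cells above can be bundled into a context manager, so the previous value is restored even if the timed code raises an exception. This is a sketch and not a PySpark API. temporary_conf and the DictConf stand-in are hypothetical names, and the helper only assumes an object with get(key) and set(key, value) methods, which spark.conf provides. For pandas-on-Spark options such as compute.default_index_type, ps.option_context already implements this pattern.

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator


@contextmanager
def temporary_conf(conf: Any, key: str, value: Any) -> Iterator[Any]:
    """Temporarily set key to value on conf, restoring the previous value on exit."""
    previous = conf.get(key)
    conf.set(key, value)
    try:
        yield conf
    finally:
        # Runs both on normal exit and when the with-block raises.
        conf.set(key, previous)


class DictConf:
    """Hypothetical in-memory stand-in with the get/set shape of spark.conf."""

    def __init__(self, values: Dict[str, Any]) -> None:
        self._values = dict(values)

    def get(self, key: str) -> Any:
        return self._values[key]

    def set(self, key: str, value: Any) -> None:
        self._values[key] = value


ARROW_KEY = "spark.sql.execution.arrow.pyspark.enabled"
conf = DictConf({ARROW_KEY: "false"})
with temporary_conf(conf, ARROW_KEY, True):
    print(conf.get(ARROW_KEY))  # True
print(conf.get(ARROW_KEY))      # false
```

With a live session, the same helper could wrap the timing cell by passing spark.conf in place of the DictConf instance.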

Grouping
By “group by” we are referring to a process involving one or more of the following steps:

Splitting the data into groups based on some criteria
Applying a function to each group independently
Combining the results into a data structure
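The three steps can be sketched in plain Python for a single key column and a sum, using a hypothetical group_sum helper. The keys match column A of the frame below. The values are made-up integers so that the sums are easy to check.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Sequence


def group_sum(keys: Sequence[Hashable], values: Sequence[float]) -> Dict[Hashable, float]:
    """Split values by key, apply sum to each group, and combine into a dict."""
    groups: Dict[Hashable, List[float]] = defaultdict(list)
    for key, value in zip(keys, values):  # split
        groups[key].append(value)
    return {key: sum(members) for key, members in groups.items()}  # apply + combine


keys = ['foo', 'bar', 'foo', 'bar', 'foo', 'bar', 'foo', 'foo']
values = [1, 2, 3, 4, 5, 6, 7, 8]
print(group_sum(keys, values))  # {'foo': 24, 'bar': 12}
```

On Spark the groups are computed in parallel across partitions. As the multi-key result further below shows, the output rows are therefore not necessarily sorted by key.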

In [51]:
psdf = ps.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
                          'foo', 'bar', 'foo', 'foo'],
                    'B': ['one', 'one', 'two', 'three',
                          'two', 'two', 'one', 'three'],
                    'C': np.random.randn(8),
                    'D': np.random.randn(8)})

psdf

     A      B         C         D
0  foo    one  1.585321  0.190254
1  bar    one  0.109417  1.133938
2  foo    two  0.213035  0.728148
3  bar  three  0.397821 -0.492254
4  foo    two -1.608526  0.824263
5  bar    two  0.241676  1.927333
6  foo    one -0.274076  1.139541
7  foo  three  0.167225 -1.205695


In [52]:
psdf.groupby('A').sum()

            C         D
A
bar  0.748914  2.569016
foo  0.082978  1.676511


In [53]:
psdf.groupby(['A', 'B']).sum()

                  C         D
A   B
foo one    1.311244  1.329795
    two   -1.395492  1.552410
bar three  0.397821 -0.492254
    one    0.109417  1.133938
foo three  0.167225 -1.205695
bar two    0.241676  1.927333


Plotting

In [54]:
pser = pd.Series(np.random.randn(1000),
                 index=pd.date_range('1/1/2000', periods=1000))

In [56]:
psser = ps.Series(pser)
psser = psser.cummax()
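cummax() replaces each value with the largest value seen so far, so the series plotted next never decreases. The plain-Python sketch below uses itertools.accumulate and a hypothetical running_max helper. It assumes there are no missing values: pandas-on-Spark's cummax skips NaN by default, and this sketch does not handle NaN.

```python
from itertools import accumulate
from typing import List, Sequence


def running_max(values: Sequence[float]) -> List[float]:
    """Cumulative maximum: element i is max(values[0..i])."""
    return list(accumulate(values, max))


print(running_max([3, 1, 4, 1, 5, 9, 2, 6]))  # [3, 3, 4, 4, 5, 9, 9, 9]
```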

In [57]:
psser.plot()

In [59]:
pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index,
                   columns=['A', 'B', 'C', 'D'])
psdf = ps.from_pandas(pdf)
psdf.plot()

Getting data in/out

CSV
CSV is straightforward and easy to use. See DataFrame.to_csv for writing a CSV file and read_csv for reading one.

In [60]:
psdf.to_csv('foo.csv')
ps.read_csv('foo.csv').head(10)

          A         B         C         D
0  1.180809 -1.195816  0.134608 -0.859188
1 -0.198112  0.220830  1.793432  0.769389
2  0.240737  0.435504  0.579333 -0.092616
3  1.040581  2.068318 -0.443578 -0.275009
4 -0.072669 -1.368435 -1.148132  0.520837
5 -0.461521 -0.838564 -0.179730  0.065438
6  2.330541  0.981681 -0.561768 -0.378735
7  0.125638  1.739995  0.628500 -0.074183
8  0.305586 -0.850718 -0.202722  0.119100
9 -1.006386  0.307909  1.985064 -1.089465
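The frame read back above has a default 0 to 9 index instead of the dates it was written with. pandas-on-Spark's to_csv drops the index unless it is named through the index_col parameter, and read_csv needs the same index_col to restore it. When given a path, to_csv also writes a directory of part files rather than a single file. The round trip can be sketched with Python's csv module. write_frame and read_frame are hypothetical helpers, not pandas-on-Spark functions.

```python
import csv
import io
from typing import Dict, List, Optional, Sequence, Tuple


def write_frame(index: Sequence[str], columns: Sequence[str],
                rows: Sequence[Sequence[float]], index_col: Optional[str] = None) -> str:
    """Serialize rows as CSV text; the index is written only if index_col is given."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(([index_col] if index_col else []) + list(columns))
    for label, row in zip(index, rows):
        writer.writerow(([label] if index_col else []) + list(row))
    return buf.getvalue()


def read_frame(text: str, index_col: Optional[str] = None) -> Tuple[List, List[Dict[str, str]]]:
    """Parse CSV text; without index_col, rows get a fresh 0..n-1 index."""
    records = list(csv.DictReader(io.StringIO(text)))
    if index_col is None:
        return list(range(len(records))), records
    return [rec.pop(index_col) for rec in records], records


labels = ["2000-01-01", "2000-01-02"]
lost = write_frame(labels, ["A"], [[1.5], [-0.5]])
kept = write_frame(labels, ["A"], [[1.5], [-0.5]], index_col="index")
print(read_frame(lost)[0])                     # [0, 1]
print(read_frame(kept, index_col="index")[0])  # ['2000-01-01', '2000-01-02']
```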


Parquet
Parquet is an efficient, compact columnar file format that is fast to read and write. See DataFrame.to_parquet for writing a Parquet file and read_parquet for reading one.

In [61]:
psdf.to_parquet('bar.parquet')
ps.read_parquet('bar.parquet').head(10)

          A         B         C         D
0  0.413107 -1.626197  0.584137 -0.643141
1 -1.986895 -0.517836  0.785005  1.497941
2  0.532283 -0.051592 -0.667433 -0.008909
3  0.581009 -0.562544 -1.460757  1.245051
4  0.046276  0.675619 -0.878365 -0.705454
5 -0.255342 -0.672944  0.989106  0.716248
6  0.764227  0.530370  0.344629  0.187689
7 -0.893746  1.114970 -1.564545 -1.573821
8  1.946784 -2.221978 -1.664886 -2.257437
9  0.332921  0.524989  0.004543  1.768795


Spark IO
In addition, the pandas API on Spark fully supports Spark's various data sources, such as ORC, as well as external data sources. See DataFrame.to_spark_io for writing to a specified data source and read_spark_io for reading from it.

In [63]:
psdf.to_spark_io('zoo.orc', format="orc")
ps.read_spark_io('zoo.orc', format="orc").head(10)

          A         B         C         D
0  1.180809 -1.195816  0.134608 -0.859188
1 -0.198112  0.220830  1.793432  0.769389
2  0.240737  0.435504  0.579333 -0.092616
3  1.040581  2.068318 -0.443578 -0.275009
4 -0.072669 -1.368435 -1.148132  0.520837
5 -0.461521 -0.838564 -0.179730  0.065438
6  2.330541  0.981681 -0.561768 -0.378735
7  0.125638  1.739995  0.628500 -0.074183
8  0.305586 -0.850718 -0.202722  0.119100
9 -1.006386  0.307909  1.985064 -1.089465
