In [39]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession
import warnings

#Supress Warnings
warnings.filterwarnings('ignore')

In [11]:
# Reuse the active SparkSession if one exists (pyspark.pandas may already have
# started one); otherwise start a new one with default settings.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [40]:
!pip install plotly

Collecting plotly
  Downloading plotly-5.14.1-py2.py3-none-any.whl (15.3 MB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m15.3/15.3 MB[0m [31m215.5 kB/s[0m eta [36m0:00:00[0mm eta [36m0:00:01[0m[36m0:00:02[0m
[?25hCollecting tenacity>=6.2.0
  Using cached tenacity-8.2.2-py3-none-any.whl (24 kB)
Installing collected packages: tenacity, plotly
Successfully installed plotly-5.14.1 tenacity-8.2.2


Data Creation

In [2]:
# Pandas-on-Spark Series; the np.nan entries make its dtype float64.
s = ps.Series(
    [1, 2, 3, np.nan, 5, 6, np.nan, np.nan, 9]
)
s

23/04/29 17:05:11 WARN Utils: Your hostname, drice resolves to a loopback address: 127.0.1.1; using 192.168.37.109 instead (on interface wlp0s20f3)
23/04/29 17:05:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/29 17:05:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


0    1.0
1    2.0
2    3.0
3    NaN
4    5.0
5    6.0
6    NaN
7    NaN
8    9.0
dtype: float64

In [3]:
# Pandas-on-Spark DataFrame built from a column dict, with an explicit integer index.
psdf = ps.DataFrame(
    data={
        'color': ['red', 'black', 'grey'],
        'num': [2, 3, 1],
        'amount': [500, 250, 450],
    },
    index=[10, 20, 30],
)
psdf

Unnamed: 0,color,num,amount
10,red,2,500
20,black,3,250
30,grey,1,450


In [17]:
# Pandas DataFrame of standard-normal values, indexed by 6 consecutive days.
# The generator is seeded so "Restart & Run All" reproduces the same numbers.
# Unseeded, every re-run of this cell changed the values seen by all downstream cells.
dates = pd.date_range('20230427', periods=6)

rng = np.random.default_rng(42)
pdf = pd.DataFrame(rng.standard_normal((6, 4)), index=dates, columns=['a', 'b', 'c', 'd'])
pdf

Unnamed: 0,a,b,c,d
2023-04-27,2.167529,0.326194,-1.250979,0.412802
2023-04-28,0.056763,-1.344504,-0.076532,0.374034
2023-04-29,0.124202,1.034034,0.992243,0.934321
2023-04-30,-1.260844,-0.370832,-1.697845,0.31661
2023-05-01,-0.070268,0.196291,-1.456757,-0.080044
2023-05-02,0.761721,0.116136,-0.739587,-0.577202


In [18]:
# Confirm that `pdf` is a plain (single-machine) pandas DataFrame.
pdf.__class__

pandas.core.frame.DataFrame

In [10]:
# Convert the pandas DataFrame to a pandas-on-Spark DataFrame and show its class.
psdf = pdf.pipe(ps.from_pandas)
psdf.__class__

pyspark.pandas.frame.DataFrame

In [12]:
# Spark DataFrame created directly from the pandas DataFrame.
# NOTE: Spark DataFrames have no index, so pdf's DatetimeIndex is dropped here and only
# the data columns survive. Use pdf.reset_index() first if the dates are needed in Spark.
sdf = spark.createDataFrame(pdf)
# truncate=False prints full values; the default cuts cell contents at 20 characters.
sdf.show(truncate=False)

+-------------------+--------------------+--------------------+--------------------+
|                  a|                   b|                   c|                   d|
+-------------------+--------------------+--------------------+--------------------+
| 0.5729831029425533|  1.3489285409081861|  1.5154434698008488| -0.6178254012241593|
|-0.7975902700368354| -1.6572149114165604|-0.34125942710383905|-0.13144213452225117|
| 1.6234076363228758| -1.4323482744248401|  1.2803604577481897| -0.5769625090168358|
| 1.5534433844209343|  1.0533760042915037|  0.1162169603230015|-0.47302950755810896|
| 0.4864140789474442|-0.00256273151833...| -2.7259954375956035|  0.5815014787701946|
| 0.6591687989897485| -2.1133715648503393|-0.07425886857544525|  0.2403754556812712|
+-------------------+--------------------+--------------------+--------------------+



In [16]:
# Pandas-on-Spark view of the Spark DataFrame. No index column is named, so a default
# index is attached; the original dates are not recovered.
psdf = sdf.pandas_api(index_col=None)
psdf.dtypes

a    float64
b    float64
c    float64
d    float64
dtype: object

In [20]:
# Per-column summary statistics: count, mean, std, min, the quartiles listed here, max.
psdf.describe(percentiles=[0.25, 0.5, 0.75])

23/04/29 18:09:45 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,a,b,c,d
count,6.0,6.0,6.0,6.0
mean,0.682971,-0.467199,-0.038249,-0.162897
std,0.881533,1.475391,1.516156,0.488332
min,-0.79759,-2.113372,-2.725995,-0.617825
25%,0.486414,-1.657215,-0.341259,-0.576963
50%,0.572983,-1.432348,-0.074259,-0.47303
75%,1.553443,1.053376,1.28036,0.240375
max,1.623408,1.348929,1.515443,0.581501


In [21]:
# Transpose: the data columns become rows and the index values become columns.
psdf.transpose()

Unnamed: 0,0,1,2,3,4,5
a,0.572983,-0.79759,1.623408,1.553443,0.486414,0.659169
b,1.348929,-1.657215,-1.432348,1.053376,-0.002563,-2.113372
c,1.515443,-0.341259,1.28036,0.116217,-2.725995,-0.074259
d,-0.617825,-0.131442,-0.576963,-0.47303,0.581501,0.240375


Missing Data

In [25]:
# Add a column 'E' containing missing values (np.nan).
# `assign` returns a new DataFrame and leaves `pdf` untouched. Writing `new_pdf = pdf`
# followed by `new_pdf['E'] = ...` would alias the two names and mutate `pdf` too.
new_pdf = pdf.assign(E=[np.nan, 8, 9, np.nan, 0.25684, np.nan])
new_psdf = ps.from_pandas(new_pdf)
new_psdf

Unnamed: 0,a,b,c,d,E
2023-04-27,2.167529,0.326194,-1.250979,0.412802,
2023-04-28,0.056763,-1.344504,-0.076532,0.374034,8.0
2023-04-29,0.124202,1.034034,0.992243,0.934321,9.0
2023-04-30,-1.260844,-0.370832,-1.697845,0.31661,
2023-05-01,-0.070268,0.196291,-1.456757,-0.080044,0.25684
2023-05-02,0.761721,0.116136,-0.739587,-0.577202,


In [30]:
# Drop every row that has a missing value in any column (how='any' is the default).
new_psdf.dropna()


Unnamed: 0,a,b,c,d,E
2023-04-28,0.056763,-1.344504,-0.076532,0.374034,8.0
2023-04-29,0.124202,1.034034,0.992243,0.934321,9.0
2023-05-01,-0.070268,0.196291,-1.456757,-0.080044,0.25684


In [31]:
# Fill missing values with 0 (returns a new frame; new_psdf is unchanged).
new_psdf.fillna(0)

Unnamed: 0,a,b,c,d,E
2023-04-27,2.167529,0.326194,-1.250979,0.412802,0.0
2023-04-28,0.056763,-1.344504,-0.076532,0.374034,8.0
2023-04-29,0.124202,1.034034,0.992243,0.934321,9.0
2023-04-30,-1.260844,-0.370832,-1.697845,0.31661,0.0
2023-05-01,-0.070268,0.196291,-1.456757,-0.080044,0.25684
2023-05-02,0.761721,0.116136,-0.739587,-0.577202,0.0


Plotting

In [46]:
# Plot data: 1000 rows of standard-normal noise in four columns over a daily date index.
# The index is built here rather than borrowed from another object, so the cell runs
# on a fresh kernel. The generator is seeded so the plot is reproducible.
plot_rng = np.random.default_rng(0)
pdf = pd.DataFrame(
    plot_rng.standard_normal((1000, 4)),
    index=pd.date_range('1/1/2000', periods=1000),
    columns=['A', 'B', 'C', 'D'],
)

In [47]:
# Wrap the pandas plot data as a pandas-on-Spark DataFrame.
psdf = ps.DataFrame(pdf)


In [48]:
# Running maximum down each column (NaNs skipped). This is a window over all rows with
# no partition, which is presumably what the "No Partition Defined" WindowExec warnings report.
psdf = psdf.cummax(skipna=True)

In [49]:
# Line plot of the running maxima; the returned figure is the cell's displayed output.
psdf.plot(kind='line')

23/04/29 19:07:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/29 19:07:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/29 19:07:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/29 19:07:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/29 19:07:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
