# Spark operations
There are two main types of Spark operations: 1. Transformations and 2. Actions 

# 1. Spark Transformations
![](./image/transforms.png)

# 2. Actions
![](./image/actions.png)

# rdd.DataFrame vs pd.DataFrame

1. From list

In [1]:
my_list = [['a', 1, 2], ['b', 2, 3],['c', 3, 4]]
col_name = ['A', 'B', 'C']

In [2]:
from pyspark import SparkContext , SparkConf
from pyspark.sql import SparkSession
try:
    sc.stop()
except:
    pass
sc=SparkContext()
spark = SparkSession(sparkContext=sc)

In [3]:
import pandas as pd
# caution for the columns=
pd.DataFrame(my_list,columns= col_name)

Unnamed: 0,A,B,C
0,a,1,2
1,b,2,3
2,c,3,4


In [4]:
spark.createDataFrame(my_list, col_name).show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  1|  2|
|  b|  2|  3|
|  c|  3|  4|
+---+---+---+



Pay attentation to the parameter columns= in pd.DataFrame. Since the default value will make the list as rows.

2. From Dict


In [5]:
d = {'A': [0, 1, 0],
     'B': [1, 0, 1],
     'C': [1, 0, 0]}

pd.DataFrame(d)

Unnamed: 0,A,B,C
0,0,1,1
1,1,0,0
2,0,1,0


In [6]:
# Tedious for PySpark
import numpy as np
spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  0|  1|  1|
|  1|  0|  0|
|  0|  1|  0|
+---+---+---+



# Read CSV

In [7]:
# pd.DataFrame dp: DataFrame pandas
dp = pd.read_csv('../data/Advertising.csv')
dp.head(5)

Unnamed: 0,TV,Radio,Newspaper,Sales
0,230.1,37.8,69.2,22.1
1,44.5,39.3,45.1,10.4
2,17.2,45.9,69.3,9.3
3,151.5,41.3,58.5,18.5
4,180.8,10.8,58.4,12.9


In [8]:
#rdd.DataFrame. dp: DataFrame spark
ds = spark.read.csv(path='../data/Advertising.csv',
#                sep=',',
#                encoding='UTF-8',
#                comment=None,
               header=True,
               inferSchema=True)
ds.show(5)

+-----+-----+---------+-----+
|   TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
| 44.5| 39.3|     45.1| 10.4|
| 17.2| 45.9|     69.3|  9.3|
|151.5| 41.3|     58.5| 18.5|
|180.8| 10.8|     58.4| 12.9|
+-----+-----+---------+-----+
only showing top 5 rows



## Read Json

In [9]:
dp_json = pd.read_json("../data/data.json")
dp_json.head(2)

Unnamed: 0,id,sampling_rate,timestamp,location,sensor,sensordatavalues
0,1229659310,,2020-07-19 02:30:05,"{'id': 10332, 'latitude': '50.208', 'longitude...","{'id': 20350, 'pin': '11', 'sensor_type': {'id...","[{'id': 2649821739, 'value': '17.62', 'value_t..."
1,1229659307,,2020-07-19 02:30:05,"{'id': 494, 'latitude': '48.744', 'longitude':...","{'id': 30765, 'pin': '3', 'sensor_type': {'id'...","[{'id': 2649821734, 'value': '95808.00', 'valu..."


In [10]:
ds_json = spark.read.json('../data/data.json')
ds_json.show(2 , truncate=3)

+---+--------+-------------+------+----------------+---------+
| id|location|sampling_rate|sensor|sensordatavalues|timestamp|
+---+--------+-------------+------+----------------+---------+
|122|     [29|          nul|   [20|             [[2|      202|
|122|     [50|          nul|   [30|             [[2|      202|
+---+--------+-------------+------+----------------+---------+
only showing top 2 rows



## Column Names

In [11]:
print(dp.columns,'\n' ,ds.columns)

Index(['TV', 'Radio', 'Newspaper', 'Sales'], dtype='object') 
 ['TV', 'Radio', 'Newspaper', 'Sales']


## Data types

In [12]:
print(dp.dtypes)

TV           float64
Radio        float64
Newspaper    float64
Sales        float64
dtype: object


In [13]:
print(ds.dtypes)

[('TV', 'double'), ('Radio', 'double'), ('Newspaper', 'double'), ('Sales', 'double')]


### Fill Null

In [14]:
dp.fillna(-99)

Unnamed: 0,TV,Radio,Newspaper,Sales
0,230.1,37.8,69.2,22.1
1,44.5,39.3,45.1,10.4
2,17.2,45.9,69.3,9.3
3,151.5,41.3,58.5,18.5
4,180.8,10.8,58.4,12.9
...,...,...,...,...
195,38.2,3.7,13.8,7.6
196,94.2,4.9,8.1,9.7
197,177.0,9.3,6.4,12.8
198,283.6,42.0,66.2,25.5


In [15]:
ds.fillna(-99).show()

+-----+-----+---------+-----+
|   TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
| 44.5| 39.3|     45.1| 10.4|
| 17.2| 45.9|     69.3|  9.3|
|151.5| 41.3|     58.5| 18.5|
|180.8| 10.8|     58.4| 12.9|
|  8.7| 48.9|     75.0|  7.2|
| 57.5| 32.8|     23.5| 11.8|
|120.2| 19.6|     11.6| 13.2|
|  8.6|  2.1|      1.0|  4.8|
|199.8|  2.6|     21.2| 10.6|
| 66.1|  5.8|     24.2|  8.6|
|214.7| 24.0|      4.0| 17.4|
| 23.8| 35.1|     65.9|  9.2|
| 97.5|  7.6|      7.2|  9.7|
|204.1| 32.9|     46.0| 19.0|
|195.4| 47.7|     52.9| 22.4|
| 67.8| 36.6|    114.0| 12.5|
|281.4| 39.6|     55.8| 24.4|
| 69.2| 20.5|     18.3| 11.3|
|147.3| 23.9|     19.1| 14.6|
+-----+-----+---------+-----+
only showing top 20 rows



### Drop Null

In [27]:
my_list = [['male', 1, None], ['female', 2, 3],['male', 3, 4]]
dp = pd.DataFrame(my_list,columns=['A', 'B', 'C'])
ds = spark.createDataFrame(my_list, ['A', 'B', 'C'])
#
dp.head() , ds.show()

+------+---+----+
|     A|  B|   C|
+------+---+----+
|  male|  1|null|
|female|  2|   3|
|  male|  3|   4|
+------+---+----+



(        A  B    C
 0    male  1  NaN
 1  female  2  3.0
 2    male  3  4.0,
 None)

In [28]:
dp.C.dropna()

1    3.0
2    4.0
Name: C, dtype: float64

In [19]:
ds.na.drop()

DataFrame[A: string, B: bigint, C: bigint]

###  Replace Values

In [29]:
# caution: you need to chose specific col
dp.A.replace(['male', 'female'],[1, 0], inplace=True)
dp

Unnamed: 0,A,B,C
0,1,1,
1,0,2,3.0
2,1,3,4.0


In [30]:
#caution: Mixed type replacements are not supported
ds.na.replace(['male','female'],['1','0']).show()

+---+---+----+
|  A|  B|   C|
+---+---+----+
|  1|  1|null|
|  0|  2|   3|
|  1|  3|   4|
+---+---+----+



### Rename Columns

In [31]:
dp.columns = ['a','b','c']
dp.head(4)

Unnamed: 0,a,b,c
0,1,1,
1,0,2,3.0
2,1,3,4.0


In [32]:
ds.toDF('a','b','c').show(4)

+------+---+----+
|     a|  b|   c|
+------+---+----+
|  male|  1|null|
|female|  2|   3|
|  male|  3|   4|
+------+---+----+



### Rename one or more columns


In [33]:
mapping = {'c':'Newspaper','a':'Test'}
dp.rename(columns=mapping).head()

Unnamed: 0,Test,b,Newspaper
0,1,1,
1,0,2,3.0
2,1,3,4.0


In [34]:
new_names = [mapping.get(col,col) for col in ds.columns]
ds.toDF(*new_names).show(4)

+------+---+----+
|     A|  B|   C|
+------+---+----+
|  male|  1|null|
|female|  2|   3|
|  male|  3|   4|
+------+---+----+



In [35]:
ds.withColumnRenamed('A','C').show(4)

+------+---+----+
|     C|  B|   C|
+------+---+----+
|  male|  1|null|
|female|  2|   3|
|  male|  3|   4|
+------+---+----+



### Drop Columns

In [36]:
drop_name = ['c']
dp.drop(drop_name,axis=1).head(4)

Unnamed: 0,a,b
0,1,1
1,0,2
2,1,3


In [37]:
ds.drop(*drop_name).show(4)

+------+---+
|     A|  B|
+------+---+
|  male|  1|
|female|  2|
|  male|  3|
+------+---+



### Filter

In [38]:
dp = pd.read_csv('../data/Advertising.csv')
#
ds = spark.read.csv(path='../data/Advertising.csv',
                    header=True,
                    inferSchema=True)

In [39]:
dp[dp.Newspaper<20].head(4) 

Unnamed: 0,TV,Radio,Newspaper,Sales
7,120.2,19.6,11.6,13.2
8,8.6,2.1,1.0,4.8
11,214.7,24.0,4.0,17.4
13,97.5,7.6,7.2,9.7


In [43]:
ds[ds.Newspaper<20].show(5)

+-----+-----+---------+-----+
|   TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|120.2| 19.6|     11.6| 13.2|
|  8.6|  2.1|      1.0|  4.8|
|214.7| 24.0|      4.0| 17.4|
| 97.5|  7.6|      7.2|  9.7|
| 69.2| 20.5|     18.3| 11.3|
+-----+-----+---------+-----+
only showing top 5 rows



### Shape 

In [44]:
dp.shape

(200, 4)

In [45]:
print((ds.count(), len(ds.columns)))

(200, 4)


### With New Column

In [46]:
dp['tv_norm'] = dp.TV/sum(dp.TV)
dp.head(4)

Unnamed: 0,TV,Radio,Newspaper,Sales,tv_norm
0,230.1,37.8,69.2,22.1,0.007824
1,44.5,39.3,45.1,10.4,0.001513
2,17.2,45.9,69.3,9.3,0.000585
3,151.5,41.3,58.5,18.5,0.005152


In [47]:
from pyspark.sql import functions as F 
ds.withColumn('tv_norm', ds.TV/ds.groupBy().agg(F.sum("TV")).collect()[0][0]).show(4)

+-----+-----+---------+-----+--------------------+
|   TV|Radio|Newspaper|Sales|             tv_norm|
+-----+-----+---------+-----+--------------------+
|230.1| 37.8|     69.2| 22.1|0.007824268493802813|
| 44.5| 39.3|     45.1| 10.4|0.001513167961643...|
| 17.2| 45.9|     69.3|  9.3|5.848649200061207E-4|
|151.5| 41.3|     58.5| 18.5|0.005151571824472517|
+-----+-----+---------+-----+--------------------+
only showing top 4 rows



In [129]:
dp['cond'] = dp.apply(lambda c: 1 if ((c.TV>100)&(c.Radio<40)) else 2 if c.Sales> 10 else 3,axis=1)
#
ds.withColumn('cond',F.when((ds.TV>100)&(ds.Radio<40),1)\
                      .when(ds.Sales>10, 2)\
                      .otherwise(3)).show(4)

+-----+-----+---------+-----+----+
|   TV|Radio|Newspaper|Sales|cond|
+-----+-----+---------+-----+----+
|230.1| 37.8|     69.2| 22.1|   1|
| 44.5| 39.3|     45.1| 10.4|   2|
| 17.2| 45.9|     69.3|  9.3|   3|
|151.5| 41.3|     58.5| 18.5|   2|
+-----+-----+---------+-----+----+
only showing top 4 rows



In [48]:
dp['log_tv'] = np.log(dp.TV)
dp.head(4)
#
import pyspark.sql.functions as F
ds.withColumn('log_tv',F.log(ds.TV)).show(4)

+-----+-----+---------+-----+------------------+
|   TV|Radio|Newspaper|Sales|            log_tv|
+-----+-----+---------+-----+------------------+
|230.1| 37.8|     69.2| 22.1|  5.43851399704132|
| 44.5| 39.3|     45.1| 10.4|3.7954891891721947|
| 17.2| 45.9|     69.3|  9.3|2.8449093838194073|
|151.5| 41.3|     58.5| 18.5| 5.020585624949424|
+-----+-----+---------+-----+------------------+
only showing top 4 rows



In [49]:
dp['tv+10'] = dp.TV.apply(lambda x: x+10)
dp.head(4) 

Unnamed: 0,TV,Radio,Newspaper,Sales,tv_norm,log_tv,tv+10
0,230.1,37.8,69.2,22.1,0.007824,5.438514,240.1
1,44.5,39.3,45.1,10.4,0.001513,3.795489,54.5
2,17.2,45.9,69.3,9.3,0.000585,2.844909,27.2
3,151.5,41.3,58.5,18.5,0.005152,5.020586,161.5


In [50]:
ds.withColumn('tv+10', ds.TV+10).show(4)

+-----+-----+---------+-----+-----+
|   TV|Radio|Newspaper|Sales|tv+10|
+-----+-----+---------+-----+-----+
|230.1| 37.8|     69.2| 22.1|240.1|
| 44.5| 39.3|     45.1| 10.4| 54.5|
| 17.2| 45.9|     69.3|  9.3| 27.2|
|151.5| 41.3|     58.5| 18.5|161.5|
+-----+-----+---------+-----+-----+
only showing top 4 rows



# Join

![](./image/hMKKt.jpg)

In [51]:
leftp = pd.DataFrame({'A': ['A0', 'A1', 'A2', 'A3'],
                    'B': ['B0', 'B1', 'B2', 'B3'],
                    'C': ['C0', 'C1', 'C2', 'C3'],
                    'D': ['D0', 'D1', 'D2', 'D3']},
                    index=[0, 1, 2, 3])

rightp = pd.DataFrame({'A': ['A0', 'A1', 'A6', 'A7'],
                       'F': ['B4', 'B5', 'B6', 'B7'],
                       'G': ['C4', 'C5', 'C6', 'C7'],
                       'H': ['D4', 'D5', 'D6', 'D7']},
                       index=[4, 5, 6, 7])

lefts = spark.createDataFrame(leftp)
rights = spark.createDataFrame(rightp)

In [52]:
lefts.show() , rights.show()

+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
| A0| B0| C0| D0|
| A1| B1| C1| D1|
| A2| B2| C2| D2|
| A3| B3| C3| D3|
+---+---+---+---+

+---+---+---+---+
|  A|  F|  G|  H|
+---+---+---+---+
| A0| B4| C4| D4|
| A1| B5| C5| D5|
| A6| B6| C6| D6|
| A7| B7| C7| D7|
+---+---+---+---+



(None, None)

## Left Join

In [53]:
leftp.merge(rightp,on='A',how='left')

Unnamed: 0,A,B,C,D,F,G,H
0,A0,B0,C0,D0,B4,C4,D4
1,A1,B1,C1,D1,B5,C5,D5
2,A2,B2,C2,D2,,,
3,A3,B3,C3,D3,,,


In [54]:
lefts.join(rights,on='A',how='left').orderBy('A',ascending=True).show()

+---+---+---+---+----+----+----+
|  A|  B|  C|  D|   F|   G|   H|
+---+---+---+---+----+----+----+
| A0| B0| C0| D0|  B4|  C4|  D4|
| A1| B1| C1| D1|  B5|  C5|  D5|
| A2| B2| C2| D2|null|null|null|
| A3| B3| C3| D3|null|null|null|
+---+---+---+---+----+----+----+



## Right Join


In [140]:
leftp.merge(rightp,on='A',how='right')

Unnamed: 0,A,B,C,D,F,G,H
0,A0,B0,C0,D0,B4,C4,D4
1,A1,B1,C1,D1,B5,C5,D5
2,A6,,,,B6,C6,D6
3,A7,,,,B7,C7,D7


In [142]:
lefts.join(rights,on='A',how='right') \
     .orderBy('A',ascending=True).show()

+---+----+----+----+---+---+---+
|  A|   B|   C|   D|  F|  G|  H|
+---+----+----+----+---+---+---+
| A0|  B0|  C0|  D0| B4| C4| D4|
| A1|  B1|  C1|  D1| B5| C5| D5|
| A6|null|null|null| B6| C6| D6|
| A7|null|null|null| B7| C7| D7|
+---+----+----+----+---+---+---+



## Inner Join


In [144]:
leftp.merge(rightp,on='A',how='inner')

Unnamed: 0,A,B,C,D,F,G,H
0,A0,B0,C0,D0,B4,C4,D4
1,A1,B1,C1,D1,B5,C5,D5


In [145]:
lefts.join(rights,on='A',how='inner').orderBy('A',ascending=True).show()

+---+---+---+---+---+---+---+
|  A|  B|  C|  D|  F|  G|  H|
+---+---+---+---+---+---+---+
| A0| B0| C0| D0| B4| C4| D4|
| A1| B1| C1| D1| B5| C5| D5|
+---+---+---+---+---+---+---+



## Full Join or Outer Join

In [55]:
leftp.merge(rightp,on='A',how='outer')

Unnamed: 0,A,B,C,D,F,G,H
0,A0,B0,C0,D0,B4,C4,D4
1,A1,B1,C1,D1,B5,C5,D5
2,A2,B2,C2,D2,,,
3,A3,B3,C3,D3,,,
4,A6,,,,B6,C6,D6
5,A7,,,,B7,C7,D7


In [56]:
lefts.join(rights,on='A',how='full').orderBy('A',ascending=True).show()

+---+----+----+----+----+----+----+
|  A|   B|   C|   D|   F|   G|   H|
+---+----+----+----+----+----+----+
| A0|  B0|  C0|  D0|  B4|  C4|  D4|
| A1|  B1|  C1|  D1|  B5|  C5|  D5|
| A2|  B2|  C2|  D2|null|null|null|
| A3|  B3|  C3|  D3|null|null|null|
| A6|null|null|null|  B6|  C6|  D6|
| A7|null|null|null|  B7|  C7|  D7|
+---+----+----+----+----+----+----+



# Concat Columns

![](./image/merging_append3.png)

In [57]:
my_list = [('a', 2, 3),
           ('b', 5, 6),
           ('c', 8, 9),
           ('a', 2, 3),
           ('b', 5, 6),
           ('c', 8, 9)]
col_name = ['col1', 'col2', 'col3']
#
dp = pd.DataFrame(my_list,columns=col_name)
ds = spark.createDataFrame(my_list,schema=col_name)

In [58]:
dp['concat'] = dp.apply(lambda x:'%s%s'%(x['col1'],x['col2']),axis=1)
dp

Unnamed: 0,col1,col2,col3,concat
0,a,2,3,a2
1,b,5,6,b5
2,c,8,9,c8
3,a,2,3,a2
4,b,5,6,b5
5,c,8,9,c8


In [59]:
ds.withColumn('concat',F.concat('col1','col2')).show()

+----+----+----+------+
|col1|col2|col3|concat|
+----+----+----+------+
|   a|   2|   3|    a2|
|   b|   5|   6|    b5|
|   c|   8|   9|    c8|
|   a|   2|   3|    a2|
|   b|   5|   6|    b5|
|   c|   8|   9|    c8|
+----+----+----+------+



# GroupBy

In [60]:
dp.groupby(['col1']).agg({'col2':'min','col3':'mean'})


Unnamed: 0_level_0,col2,col3
col1,Unnamed: 1_level_1,Unnamed: 2_level_1
a,2,3
b,5,6
c,8,9


In [61]:
ds.groupBy(['col1']).agg({'col2': 'min', 'col3': 'avg'}).show()


+----+---------+---------+
|col1|min(col2)|avg(col3)|
+----+---------+---------+
|   c|        8|      9.0|
|   b|        5|      6.0|
|   a|        2|      3.0|
+----+---------+---------+



# Pivot

https://pandas.pydata.org/pandas-docs/stable/user_guide/reshaping.html

In [62]:
pd.pivot_table(dp, values='col3', index='col1', columns='col2', aggfunc=np.sum)


col2,2,5,8
col1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
a,6.0,,
b,,12.0,
c,,,18.0


In [63]:
ds.groupBy(['col1']).pivot('col2').sum('col3').show()

+----+----+----+----+
|col1|   2|   5|   8|
+----+----+----+----+
|   c|null|null|  18|
|   b|null|  12|null|
|   a|   6|null|null|
+----+----+----+----+



# Window

In [64]:
d = {'A':['a','b','c','d'],'B':['m','m','n','n'],'C':[1,2,3,6]}
dp = pd.DataFrame(d)
ds = spark.createDataFrame(dp)

In [65]:
dp['rank'] = dp.groupby('B')['C'].rank('dense',ascending=False)
dp

Unnamed: 0,A,B,C,rank
0,a,m,1,2.0
1,b,m,2,1.0
2,c,n,3,2.0
3,d,n,6,1.0


In [66]:
from pyspark.sql.window import Window
w = Window.partitionBy('B').orderBy(ds.C.desc())
ds = ds.withColumn('rank',F.rank().over(w))
ds.show(5)

+---+---+---+----+
|  A|  B|  C|rank|
+---+---+---+----+
|  b|  m|  2|   1|
|  a|  m|  1|   2|
|  d|  n|  6|   1|
|  c|  n|  3|   2|
+---+---+---+----+



# rank vs dense_rank

In [67]:
d ={'Id':[1,2,3,4,5,6],
    'Score': [4.00, 4.00, 3.85, 3.65, 3.65, 3.50]}
#
data = pd.DataFrame(d)
dp = data.copy()
dp

Unnamed: 0,Id,Score
0,1,4.0
1,2,4.0
2,3,3.85
3,4,3.65
4,5,3.65
5,6,3.5


In [68]:
ds = spark.createDataFrame(data)
ds.show()

+---+-----+
| Id|Score|
+---+-----+
|  1|  4.0|
|  2|  4.0|
|  3| 3.85|
|  4| 3.65|
|  5| 3.65|
|  6|  3.5|
+---+-----+



In [69]:
dp['Rank_dense'] = dp['Score'].rank(method='dense',ascending =False)
dp['Rank'] = dp['Score'].rank(method='min',ascending =False)
dp


Unnamed: 0,Id,Score,Rank_dense,Rank
0,1,4.0,1.0,1.0
1,2,4.0,1.0,1.0
2,3,3.85,2.0,3.0
3,4,3.65,3.0,4.0
4,5,3.65,3.0,4.0
5,6,3.5,4.0,6.0


In [70]:
#
import pyspark.sql.functions as F
from pyspark.sql.window import Window
w = Window.orderBy(ds.Score.desc())
ds = ds.withColumn('Rank_spark_dense',F.dense_rank().over(w))
ds = ds.withColumn('Rank_spark',F.rank().over(w))
ds.show()

+---+-----+----------------+----------+
| Id|Score|Rank_spark_dense|Rank_spark|
+---+-----+----------------+----------+
|  1|  4.0|               1|         1|
|  2|  4.0|               1|         1|
|  3| 3.85|               2|         3|
|  4| 3.65|               3|         4|
|  5| 3.65|               3|         4|
|  6|  3.5|               4|         6|
+---+-----+----------------+----------+

