In [1]:
from pyspark.sql.types import *
from handyspark import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pandas as pd
import numpy as np

In [2]:
#####################
# Create SparkSession
#####################
sc, sql_sc = [None, None]
try:
    sc = SparkSession \
        .builder \
        .appName("spark-flo") \
        .config("spark.executor.memory", "30g") \
        .config("spark.driver.memory", "30g") \
        .config("spark.driver.allowMultipleContexts", "false") \
        .enableHiveSupport() \
        .getOrCreate()

    sql_sc = SQLContext(sc)
except SparkSessionError:
    print("Spark Session Failed to initialize.")
    pass

# Create spark dataframe from pandas dataframe

In [3]:
pdf1 = pd.DataFrame({'name':['Doraemon','Hachi','Shinchan','Felix'],'score':[24,68,21,49],'kind':['Cat','Bee','Human','Cat']})
pdf2 = pd.DataFrame({'name':['Nobita','Doraemon','Dorami','Ratatouile'],'score':[60,21,45,30],'kind':['Human','Cat','Cat','Mouse'],})

In [4]:
pdf1

Unnamed: 0,name,score,kind
0,Doraemon,24,Cat
1,Hachi,68,Bee
2,Shinchan,21,Human
3,Felix,49,Cat


In [5]:
pdf2

Unnamed: 0,name,score,kind
0,Nobita,60,Human
1,Doraemon,21,Cat
2,Dorami,45,Cat
3,Ratatouile,30,Mouse


In [6]:
sdf1 = sc.createDataFrame(pdf1)
sdf2 = sc.createDataFrame(pdf2)

In [7]:
sdf1.show()

+--------+-----+-----+
|    name|score| kind|
+--------+-----+-----+
|Doraemon|   24|  Cat|
|   Hachi|   68|  Bee|
|Shinchan|   21|Human|
|   Felix|   49|  Cat|
+--------+-----+-----+



In [8]:
sdf2.show()

+----------+-----+-----+
|      name|score| kind|
+----------+-----+-----+
|    Nobita|   60|Human|
|  Doraemon|   21|  Cat|
|    Dorami|   45|  Cat|
|Ratatouile|   30|Mouse|
+----------+-----+-----+



# Joins

### pandas

In [9]:
pdf1.set_index('name').join(pdf2.set_index('name'),how='left',lsuffix='_pdf1').reset_index()

Unnamed: 0,name,score_pdf1,kind_pdf1,score,kind
0,Doraemon,24,Cat,21.0,Cat
1,Hachi,68,Bee,,
2,Shinchan,21,Human,,
3,Felix,49,Cat,,


In [10]:
pdf1.merge(pdf2,on='name',how='left',suffixes=['_pdf1','_pdf2'])

Unnamed: 0,name,score_pdf1,kind_pdf1,score_pdf2,kind_pdf2
0,Doraemon,24,Cat,21.0,Cat
1,Hachi,68,Bee,,
2,Shinchan,21,Human,,
3,Felix,49,Cat,,


### sql

In [11]:
sql_sc.registerDataFrameAsTable(sdf1,'table1')
sql_sc.registerDataFrameAsTable(sdf2,'table2')

In [12]:
sql_sc.sql("""    
select *
from table1
left join table2 on table1.name = table2.name
""").collect()

[Row(name='Felix', score=49, kind='Cat', name=None, score=None, kind=None),
 Row(name='Shinchan', score=21, kind='Human', name=None, score=None, kind=None),
 Row(name='Doraemon', score=24, kind='Cat', name='Doraemon', score=21, kind='Cat'),
 Row(name='Hachi', score=68, kind='Bee', name=None, score=None, kind=None)]

In [13]:
sql_sc.sql("""    
select *
from table1
left join table2 on table1.name = table2.name
""").show()

+--------+-----+-----+--------+-----+----+
|    name|score| kind|    name|score|kind|
+--------+-----+-----+--------+-----+----+
|   Felix|   49|  Cat|    null| null|null|
|Shinchan|   21|Human|    null| null|null|
|Doraemon|   24|  Cat|Doraemon|   21| Cat|
|   Hachi|   68|  Bee|    null| null|null|
+--------+-----+-----+--------+-----+----+



### spark

In [14]:
t1 = sdf1.alias('t1_alias')
t2 = sdf2.alias('t2_alias')

In [15]:
t1.join(t2, t1.name == t2.name, how='left').collect()

[Row(name='Felix', score=49, kind='Cat', name=None, score=None, kind=None),
 Row(name='Shinchan', score=21, kind='Human', name=None, score=None, kind=None),
 Row(name='Doraemon', score=24, kind='Cat', name='Doraemon', score=21, kind='Cat'),
 Row(name='Hachi', score=68, kind='Bee', name=None, score=None, kind=None)]

In [16]:
t1.join(t2, t1.name == t2.name, how='left').show()

+--------+-----+-----+--------+-----+----+
|    name|score| kind|    name|score|kind|
+--------+-----+-----+--------+-----+----+
|   Felix|   49|  Cat|    null| null|null|
|Shinchan|   21|Human|    null| null|null|
|Doraemon|   24|  Cat|Doraemon|   21| Cat|
|   Hachi|   68|  Bee|    null| null|null|
+--------+-----+-----+--------+-----+----+



In [17]:
sdf1.join(sdf2, sdf1.name == sdf2.name, how='left').show()

+--------+-----+-----+--------+-----+----+
|    name|score| kind|    name|score|kind|
+--------+-----+-----+--------+-----+----+
|   Felix|   49|  Cat|    null| null|null|
|Shinchan|   21|Human|    null| null|null|
|Doraemon|   24|  Cat|Doraemon|   21| Cat|
|   Hachi|   68|  Bee|    null| null|null|
+--------+-----+-----+--------+-----+----+



# Group By

### pandas

#### count

In [18]:
pdf1['kind'].value_counts()

Cat      2
Bee      1
Human    1
Name: kind, dtype: int64

#### average

In [19]:
pdf1['score'].mean()

40.5

### sql 

#### count

In [20]:
sql_sc.sql("""    
select kind, count(*) as count_kind
from table1
group by kind
""").show()

+-----+----------+
| kind|count_kind|
+-----+----------+
|  Cat|         2|
|  Bee|         1|
|Human|         1|
+-----+----------+



#### Average

In [21]:
sql_sc.sql("""    
select avg(score) as score_avg
from table1
""").show()

+---------+
|score_avg|
+---------+
|     40.5|
+---------+



### spark

#### count

In [22]:
sdf1.groupBy('kind').count().show()

+-----+-----+
| kind|count|
+-----+-----+
|  Cat|    2|
|  Bee|    1|
|Human|    1|
+-----+-----+



In [23]:
sdf1.groupBy('kind').agg(count('kind').alias('count_kind')).show()

+-----+----------+
| kind|count_kind|
+-----+----------+
|  Cat|         2|
|  Bee|         1|
|Human|         1|
+-----+----------+



#### average

In [24]:
sdf1.select(mean(col('score')).alias('score_avg')).show()

+---------+
|score_avg|
+---------+
|     40.5|
+---------+



# Order by

### pandas

### sql

In [25]:
sql_sc.sql("""
select *
from table1
order by score
""").show()

+--------+-----+-----+
|    name|score| kind|
+--------+-----+-----+
|Shinchan|   21|Human|
|Doraemon|   24|  Cat|
|   Felix|   49|  Cat|
|   Hachi|   68|  Bee|
+--------+-----+-----+



### spark

In [26]:
sdf1.orderBy('score').show()

+--------+-----+-----+
|    name|score| kind|
+--------+-----+-----+
|Shinchan|   21|Human|
|Doraemon|   24|  Cat|
|   Felix|   49|  Cat|
|   Hachi|   68|  Bee|
+--------+-----+-----+



# Filter using where 

### pandas

In [27]:
pdf1[pdf1['score']<30]

Unnamed: 0,name,score,kind
0,Doraemon,24,Cat
2,Shinchan,21,Human


### sql

In [28]:
sql_sc.sql("""
select *
from table1
where score < 30
""").show()

+--------+-----+-----+
|    name|score| kind|
+--------+-----+-----+
|Doraemon|   24|  Cat|
|Shinchan|   21|Human|
+--------+-----+-----+



### spark

In [29]:
sdf1.where(sdf1.score < 30).show()

+--------+-----+-----+
|    name|score| kind|
+--------+-----+-----+
|Doraemon|   24|  Cat|
|Shinchan|   21|Human|
+--------+-----+-----+



# Filter using like

### pandas

### sql

In [30]:
sql_sc.sql("""
select *
from table1
where lower(name) like 'dora%'
""").show()

+--------+-----+----+
|    name|score|kind|
+--------+-----+----+
|Doraemon|   24| Cat|
+--------+-----+----+



### spark

In [31]:
sdf1.where(lower(col('name')).like("dora%")).show()

+--------+-----+----+
|    name|score|kind|
+--------+-----+----+
|Doraemon|   24| Cat|
+--------+-----+----+



# Count distinct values in a column

### pandas

In [32]:
pdf1['kind'].unique()

array(['Cat', 'Bee', 'Human'], dtype=object)

In [33]:
pdf1['kind'].nunique()

3

### sql

In [34]:
sql_sc.sql("""
select distinct kind
from table1
""").show()

+-----+
| kind|
+-----+
|  Cat|
|  Bee|
|Human|
+-----+



In [35]:
sql_sc.sql("""
select count(distinct kind) as count_unique_kind
from table1
""").show()

+-----------------+
|count_unique_kind|
+-----------------+
|                3|
+-----------------+



### spark

In [36]:
sdf1.select("kind").distinct().show()

+-----+
| kind|
+-----+
|  Cat|
|  Bee|
|Human|
+-----+



In [37]:
sdf1.select(countDistinct("kind").alias('count_unique_kind')).alias('count_kind').show()

+-----------------+
|count_unique_kind|
+-----------------+
|                3|
+-----------------+



# Count null rows per column

### pandas

In [38]:
pdf1.describe()

Unnamed: 0,score
count,4.0
mean,40.5
std,22.218611
min,21.0
25%,23.25
50%,36.5
75%,53.75
max,68.0


In [39]:
pdf1.isnull().sum()

name     0
score    0
kind     0
dtype: int64

### sql

In [40]:
sql_sc.sql("""
describe table1
""").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|    name|   string|   null|
|   score|   bigint|   null|
|    kind|   string|   null|
+--------+---------+-------+



In [41]:
sql_sc.sql("""
select
sum(case when name is null then 1 else 0 end) as count_null_name,
sum(case when score is null then 1 else 0 end) as count_null_score,
sum(case when kind is null then 1 else 0 end) as count_null_kind
from table1
""").show()

+---------------+----------------+---------------+
|count_null_name|count_null_score|count_null_kind|
+---------------+----------------+---------------+
|              0|               0|              0|
+---------------+----------------+---------------+



### spark

In [42]:
sdf1.describe().show()

+-------+--------+------------------+-----+
|summary|    name|             score| kind|
+-------+--------+------------------+-----+
|  count|       4|                 4|    4|
|   mean|    null|              40.5| null|
| stddev|    null|22.218610817660647| null|
|    min|Doraemon|                21|  Bee|
|    max|Shinchan|                68|Human|
+-------+--------+------------------+-----+



In [43]:
sdf1.select([count(when(isnan(c), c)).alias('count_null_'+c) for c in sdf1.columns]).show()

+---------------+----------------+---------------+
|count_null_name|count_null_score|count_null_kind|
+---------------+----------------+---------------+
|              0|               0|              0|
+---------------+----------------+---------------+

