In [None]:
"""
Install and configuration
(pip or pip3)
"""
# !pip install koalas
# !pip install findspark
# !pip install pyspark

In [1]:
import glob
from datetime import datetime

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [2]:
import pandas as pd
import databricks.koalas as ks

#configure spark context
import pyspark
import findspark
findspark.init()

sc = pyspark.SparkContext(appName='koalas_test')
sc

In [4]:
"""
Reading 28 files and concatenating
"""

#Pandas

print('Pandas:')
start = datetime.now()

path = r'' # use your path
all_files = glob.glob(path + "bronze/*.csv")

li = []

for filename in all_files:
    df = pd.read_csv(filename, index_col=None, header=0)
    li.append(df)

print(f'{len(li)} files imported.')
pdf = pd.concat(li, axis=0, ignore_index=True)
print(f'Shape: {pdf.shape}')
print(f"df size in memory: {round(sum(pdf.memory_usage(deep=True))/(1024*1024), 2)} MB")
pdf.head(1)

end = datetime.now()
print(f'Duration: {end-start}')
print('-----------------------------------')



#Koalas

print('Koalas:')

start = datetime.now()

path = r'' # use your path
all_files = glob.glob(path + "bronze/*.csv")

li = []

for filename in all_files:
    df = ks.read_csv(filename, index_col=None, header=0)
    li.append(df)

print(f'{len(li)} files imported.')
kdf = ks.concat(li, axis=0, ignore_index=True)
print(f'Shape: {kdf.shape}')
kdf.head(1)

end = datetime.now()
print(f'Duration: {end-start}')

'\nReading 28 files and concatenating and head()\n'

Pandas:
28 files imported.
Shape: (16778708, 32)
df size in memory: 5795.97 MB


Unnamed: 0,airline,plane,flight,origin_airport,origin_city,origin_state,destination_airport,destination_city,destination_state,departure_delay,...,nas_delay,security_delay,aircraft_delay,year,month,day,departure_scheduled_hour,departure_scheduled_minute,arrival_scheduled_hour,arrival_scheduled_minute
0,20363,N292PQ,3468,15380,35380,26,11433,31295,26,-4.0,...,0,0,0,2018,1,1,6,15,7,38


Duration: 0:01:21.633994
-----------------------------------
Koalas:
28 files imported.
Shape: (16778708, 32)


Unnamed: 0,airline,plane,flight,origin_airport,origin_city,origin_state,destination_airport,destination_city,destination_state,departure_delay,taxi_out,taxi_in,arrive_delay,cancelled,cancel_code,diverted,scheduled_travel_time,actual_travel_time,number_flights,distance,airline_delay,weather_delay,nas_delay,security_delay,aircraft_delay,year,month,day,departure_scheduled_hour,departure_scheduled_minute,arrival_scheduled_hour,arrival_scheduled_minute
0,20363,N292PQ,3468,15380,35380,26,11433,31295,26,-4.0,26,9.0,-17.0,0,No,0,83.0,70.0,1,207,0,0,0,0,0,2018,1,1,6,15,7,38


Duration: 0:01:05.293464


In [6]:
"""
Grouping data
"""

#pandas

start = datetime.now()

pdf_grouped = pdf.groupby('plane').sum()
pdf_grouped.shape

end = datetime.now()
print('-----------------------------------')
print(f'Pandas Duration: {end-start}')
print('-----------------------------------')



#Koalas

start = datetime.now()

kdf_grouped = kdf.groupby('plane').sum()
kdf_grouped.shape

end = datetime.now()
print('-----------------------------------')
print(f'Koalas Duration: {end-start}')
print('-----------------------------------')
print('-----------------------------------')

#pandas


start = datetime.now()

pdf_grouped = pdf.groupby(['plane', 'origin_city']).sum()
pdf_grouped.shape

end = datetime.now()
print('-----------------------------------')
print(f'Pandas Duration: {end-start}')
print('-----------------------------------')



#Koalas

start = datetime.now()

kdf_grouped = kdf.groupby(['plane', 'origin_city']).sum()
kdf_grouped.shape

end = datetime.now()
print('-----------------------------------')
print(f'Koalas Duration: {end-start}')
print('-----------------------------------')
print('-----------------------------------')

'\nGrouping data\n'

Pandas:


(6179, 30)

Duration: 0:00:12.997644
-----------------------------------
Koalas:


(6180, 30)

Duration: 0:01:05.955752
-----------------------------------
-----------------------------------
Pandas:


(389301, 29)

Duration: 0:00:09.096625
-----------------------------------
Koalas:


(389303, 29)

Duration: 0:01:26.354337


In [29]:
"""
Joining 2 dataframes pandas vs koalas vs pyspark
"""
airlines = list(set(pdf['airline']))
random = list(range(len(airlines)))


aux_kdf = ks.DataFrame({'airlines': airlines,
                         'random': random})

aux_pdf = pd.DataFrame({'airlines': airlines,
                         'random': random})


start = datetime.now()

joined_kdf = kdf.join(aux_kdf, lsuffix='_left', rsuffix='_right')
joined_kdf.head(1)

end = datetime.now()
print('--------------------------------')
print(f'Koalas Duration: {end-start}')
print('--------------------------------')

start = datetime.now()

joined_pdf = pdf.join(aux_pdf, lsuffix='_left', rsuffix='_right')
joined_pdf.head(1)

end = datetime.now()
print('--------------------------------')
print(f'Pandas Duration: {end-start}')
print('--------------------------------')

'\nJoining 2 dataframes pandas vs koalas vs pyspark\n'

Koalas


Unnamed: 0_level_0,airline,flight,origin_airport,origin_city,origin_state,destination_airport,destination_city,destination_state,departure_delay,taxi_out,taxi_in,arrive_delay,cancelled,cancel_code,diverted,scheduled_travel_time,actual_travel_time,number_flights,distance,airline_delay,weather_delay,nas_delay,security_delay,aircraft_delay,year,month,day,departure_scheduled_hour,departure_scheduled_minute,arrival_scheduled_hour,arrival_scheduled_minute,airlines,random
plane,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1
0,20436,874,14027,34027,12,15356,35356,34,0.0,0,0.0,0.0,1,Carrier,0,159.0,0.0,1,985,0,0,0,0,0,2018,1,1,18,10,20,49,20416,0


Duration: 0:00:16.740715
--------------------------------
Pandas


Unnamed: 0,airline,flight,origin_airport,origin_city,origin_state,destination_airport,destination_city,destination_state,departure_delay,taxi_out,...,aircraft_delay,year,month,day,departure_scheduled_hour,departure_scheduled_minute,arrival_scheduled_hour,arrival_scheduled_minute,airlines,random
0,20436,874,14027,34027,12,15356,35356,34,0.0,0,...,0,2018,1,1,18,10,20,49,,


Duration: 0:00:41.694866


In [34]:
"""
Filter
"""

print('Koalas')
start = datetime.now()
k_fill = kdf[kdf['nas_delay'] != 0]
k_fill.head(1)
end = datetime.now()

print('----------------------------------')
print(f'Koalas Duration: {end-start}')
print('----------------------------------')

print('Pandas')
start = datetime.now()
p_fill = pdf[pdf['nas_delay'] != 0]
p_fill.head(1)
end = datetime.now()
print('----------------------------------')
print(f'Pandas Duration: {end-start}')
print('----------------------------------')

'\nFilter\n'

Koalas


Unnamed: 0_level_0,airline,flight,origin_airport,origin_city,origin_state,destination_airport,destination_city,destination_state,departure_delay,taxi_out,taxi_in,arrive_delay,cancelled,cancel_code,diverted,scheduled_travel_time,actual_travel_time,number_flights,distance,airline_delay,weather_delay,nas_delay,security_delay,aircraft_delay,year,month,day,departure_scheduled_hour,departure_scheduled_minute,arrival_scheduled_hour,arrival_scheduled_minute
plane,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1
N309PQ,20363,3470,13871,33316,31,11433,31295,26,47.0,38,11.0,58.0,0,No,0,126.0,137.0,1,651,47,0,11,0,0,2018,1,1,8,15,11,21


----------------------------------
Koalas Duration: 0:00:01.698990
----------------------------------
Pandas


Unnamed: 0_level_0,airline,flight,origin_airport,origin_city,origin_state,destination_airport,destination_city,destination_state,departure_delay,taxi_out,...,nas_delay,security_delay,aircraft_delay,year,month,day,departure_scheduled_hour,departure_scheduled_minute,arrival_scheduled_hour,arrival_scheduled_minute
plane,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
N309PQ,20363,3470,13871,33316,31,11433,31295,26,47.0,38,...,11,0,0,2018,1,1,8,15,11,21


----------------------------------
Pandas Duration: 0:00:00.534770
----------------------------------


In [10]:
"""
Saving dataframe as csv format. pandas vs koalas
"""

start = datetime.now()

kdf.to_csv('kdf.csv')

end = datetime.now()
print('----------------------------------')
print(f'Koalas Duration: {end-start}')
print('----------------------------------')


start = datetime.now()

pdf.to_csv('pdf.csv')

end = datetime.now()
print('----------------------------------')
print(f'Pandas Duration: {end-start}')
print('----------------------------------')

'\nSaving dataframe as csv format. pandas vs koalas\n'

Koalas
Duration: 0:00:22.421496
------------------------
Pandas
Duration: 0:02:19.899314


In [None]:
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:0.1.0 pyspark-shell'


# https://github.com/pyMixin/DeltaLake/blob/master/Delta%20Lake%20on%20Jupyter%20Notebooks.ipynb

In [None]:
"""
Saving dataframe as delta format. pandas vs koalas


import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:0.1.0 pyspark-shell'

# https://github.com/pyMixin/DeltaLake/blob/master/Delta%20Lake%20on%20Jupyter%20Notebooks.ipynb

"""

print('Koalas')
start = datetime.now()

kdf.to_delta(path='', mode='append', partition_cols='date')

end = datetime.now()
print(f'Duration: {end-start}')

print('------------------------')

# print('Pandas')
# start = datetime.now()

# pdf.to_delta('pdf.csv')

# end = datetime.now()
# print(f'Duration: {end-start}')

In [35]:
'''
OneHotEncoding  Koalas vs Pandas vs Spark syntaxe
'''

#Koalas

start = datetime.now()

ks.get_dummies(data=kdf).head(1)

end = datetime.now()

print('----------------------------------')
print(f'Koalas Duration: {end-start}')
print('----------------------------------')



start = datetime.now()

pd.get_dummies(data=pdf).head(1)

end = datetime.now()

print('----------------------------------')
print(f'Pandas Duration: {end-start}')
print('----------------------------------')


print('Spark..')

# from pyspark.ml.feature import StringIndexer,OneHotEncoderEstimator
# indexer = StringIndexer(inputCol="Symbol", outputCol="SymbolIndex")
# df_sp_indexed = indexer.fit(df_sp).transform(df_sp)
# encoder = OneHotEncoderEstimator(inputCols=["SymbolIndex"],
#                                  outputCols=["SymbolVec"])
# model = encoder.fit(df_sp_indexed)
# df_sp_encoded = model.transform(df_sp_indexed)
# df_sp_encoded.show(1)

'\nOneHotEncoding  Koalas vs Pandas vs Spark syntaxe\n'

Unnamed: 0_level_0,airline,flight,origin_airport,origin_city,origin_state,destination_airport,destination_city,destination_state,departure_delay,taxi_out,taxi_in,arrive_delay,cancelled,diverted,scheduled_travel_time,actual_travel_time,number_flights,distance,airline_delay,weather_delay,nas_delay,security_delay,aircraft_delay,year,month,day,departure_scheduled_hour,departure_scheduled_minute,arrival_scheduled_hour,arrival_scheduled_minute,cancel_code_Carrier,cancel_code_National Air System,cancel_code_No,cancel_code_Security,cancel_code_Weather
plane,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1
N292PQ,20363,3468,15380,35380,26,11433,31295,26,-4.0,26,9.0,-17.0,0,0,83.0,70.0,1,207,0,0,0,0,0,2018,1,1,6,15,7,38,0,0,1,0,0


----------------------------------
Koalas Duration: 0:00:08.104372
----------------------------------


Unnamed: 0_level_0,airline,flight,origin_airport,origin_city,origin_state,destination_airport,destination_city,destination_state,departure_delay,taxi_out,...,day,departure_scheduled_hour,departure_scheduled_minute,arrival_scheduled_hour,arrival_scheduled_minute,cancel_code_Carrier,cancel_code_National Air System,cancel_code_No,cancel_code_Security,cancel_code_Weather
plane,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
N292PQ,20363,3468,15380,35380,26,11433,31295,26,-4.0,26,...,1,6,15,7,38,0,0,1,0,0


----------------------------------
Pandas Duration: 0:00:06.026467
----------------------------------
Spark..


### Results



| Function          | Pandas   | Koalas   |
|-------------------|----------|----------|
| read_csv + head() | 01:31.63 | 01:05.29 |
| groupby 1 col     | 00:12.99 | 01:05.95 |
| groupby 2 cols    | 00:09.09 | 01:26.35 |
| join              | 00:41.69 | 00:16.74 |
| filter            | 00:00.53 | 00:01.69 |
| one hot encoding  | 00:06.02 | 00:08.10 |
| to_csv            | 02:19.89 | 00:22.42 |

### Limitations

1. Currently, about ~70% of pandas APIs are available in Koalas. For instance, method to_list() and timestamp() are not implemented yet. 
2. For missing functions, best practice acording to Databricks is to convert the kdf to pyspark df or pandas, apply the 'function' and then convert back to kdf.. However, using kdf_to_pdf = kdf.to_pandas() it throws an error...

### Conclusions

1. Good for simple scripts to explore data with Pandas and then change to spark without changing the code
2. Still not ready to change from pandas/python to spark/koalas due to limitations
3. Most operations are slower in Koalas when running locally with small datasets
4. Delta tables hard to use in Jupyter Notebook
5. Maybe in Koalas we have to use a more sparkable approach (caching dfs, etc..) however it should be not the goal of it.. if we need to change the cript we should just use pyspark
