## Migration from pandas to Koalas

### Object creation

In [0]:
import numpy as np
import pandas as pd
import databricks.koalas as ks

In [0]:
# Create a pandas Series
pser = pd.Series([1, 3, 5, np.nan, 6, 8]) 
# Create a Koalas Series
kser = ks.Series([1, 3, 5, np.nan, 6, 8])
# Create a Koalas Series by passing a pandas Series
kser = ks.Series(pser)
kser = ks.from_pandas(pser)

In [0]:
pser

In [0]:
kser

In [0]:
kser.sort_index()

In [0]:
# Create a pandas DataFrame
pdf = pd.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})
# Create a Koalas DataFrame
kdf = ks.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})
# Create a Koalas DataFrame by passing a pandas DataFrame
kdf = ks.DataFrame(pdf)
kdf = ks.from_pandas(pdf)

In [0]:
pdf

Unnamed: 0,A,B
0,0.415024,0.716808
1,0.265086,0.888355
2,0.512275,0.999517
3,0.627645,0.351122
4,0.577378,0.814657


In [0]:
kdf.sort_index()

Unnamed: 0,A,B
0,0.415024,0.716808
1,0.265086,0.888355
2,0.512275,0.999517
3,0.627645,0.351122
4,0.577378,0.814657


### Viewing data

In [0]:
kdf.head(2)

Unnamed: 0,A,B
0,0.415024,0.716808
1,0.265086,0.888355


In [0]:
kdf.describe()

Unnamed: 0,A,B
count,5.0,5.0
mean,0.479482,0.754092
std,0.143815,0.247842
min,0.265086,0.351122
25%,0.415024,0.716808
50%,0.512275,0.814657
75%,0.577378,0.888355
max,0.627645,0.999517


In [0]:
kdf.sort_values(by='B')

Unnamed: 0,A,B
3,0.627645,0.351122
0,0.415024,0.716808
4,0.577378,0.814657
1,0.265086,0.888355
2,0.512275,0.999517


In [0]:
kdf.transpose()

Unnamed: 0,0,1,2,3,4
A,0.415024,0.265086,0.512275,0.627645,0.577378
B,0.716808,0.888355,0.999517,0.351122,0.814657


In [0]:
from databricks.koalas.config import set_option, get_option
ks.get_option('compute.max_rows')

In [0]:
ks.set_option('compute.max_rows', 2000)
ks.get_option('compute.max_rows')

### Selection

In [0]:
kdf['A']  # or kdf.A

In [0]:
kdf[['A', 'B']]

Unnamed: 0,A,B
0,0.415024,0.716808
1,0.265086,0.888355
2,0.512275,0.999517
3,0.627645,0.351122
4,0.577378,0.814657


In [0]:
kdf.loc[1:2]

Unnamed: 0,A,B
1,0.265086,0.888355
2,0.512275,0.999517


In [0]:
kdf.iloc[:3, 1:2]

Unnamed: 0,B
0,0.716808
1,0.888355
2,0.999517


In [0]:
kser = ks.Series([100, 200, 300, 400, 500], index=[0, 1, 2, 3, 4])
# The below commented line will fail since Koalas disallows adding columns coming from
# different DataFrames or Series to a Koalas DataFrame as adding columns requires
# join operations which are generally expensive.
# This operation can be enabled by setting compute.ops_on_diff_frames to True.
# If you want to know about more detail, See the following blog post.
# https://databricks.com/blog/2020/03/31/10-minutes-from-pandas-to-koalas-on-apache-spark.html
# kdf['C'] = kser

In [0]:
# Those are needed for managing options
from databricks.koalas.config import set_option, reset_option
set_option("compute.ops_on_diff_frames", True)
kdf['C'] = kser
# Reset to default to avoid potential expensive operation in the future
reset_option("compute.ops_on_diff_frames")
kdf

Unnamed: 0,A,B,C
0,0.415024,0.716808,100
1,0.265086,0.888355,200
2,0.512275,0.999517,300
3,0.627645,0.351122,400
4,0.577378,0.814657,500


### Applying Python function with Koalas object

In [0]:
kdf.apply(np.cumsum)

Unnamed: 0,A,B,C
0,0.415024,0.716808,100
1,0.68011,1.605163,300
2,1.192385,2.60468,600
3,1.82003,2.955802,1000
4,2.397408,3.770459,1500


In [0]:
kdf.apply(np.cumsum, axis=1)

Unnamed: 0,A,B,C
0,0.415024,1.131832,101.131832
1,0.265086,1.153442,201.153442
2,0.512275,1.511791,301.511791
3,0.627645,0.978767,400.978767
4,0.577378,1.392036,501.392036


In [0]:
kdf.apply(lambda x: x ** 2)

Unnamed: 0,A,B,C
0,0.172245,0.513813,10000
1,0.070271,0.789175,40000
2,0.262425,0.999033,90000
3,0.393938,0.123287,160000
4,0.333366,0.663666,250000


In [0]:
def square(x) -> ks.Series[np.float64]:
    return x ** 2

In [0]:
kdf.apply(square)

Unnamed: 0,A,B,C
0,0.172245,0.513813,10000.0
1,0.070271,0.789175,40000.0
2,0.262425,0.999033,90000.0
3,0.393938,0.123287,160000.0
4,0.333366,0.663666,250000.0


In [0]:
# Working properly since size of data <= compute.shortcut_limit (1000)
ks.DataFrame({'A': range(1000)}).apply(lambda col: col.max())

In [0]:
# Not working properly since size of data > compute.shortcut_limit (1000)
ks.DataFrame({'A': range(1001)}).apply(lambda col: col.max())

In [0]:
ks.set_option('compute.shortcut_limit', 1001)
ks.DataFrame({'A': range(1001)}).apply(lambda col: col.max())

### Grouping Data

In [0]:
kdf.groupby('A').sum()

Unnamed: 0_level_0,B,C
A,Unnamed: 1_level_1,Unnamed: 2_level_1
0.512275,0.999517,300
0.415024,0.716808,100
0.265086,0.888355,200
0.577378,0.814657,500
0.627645,0.351122,400


In [0]:
kdf.groupby(['A', 'B']).sum()

Unnamed: 0_level_0,Unnamed: 1_level_0,C
A,B,Unnamed: 2_level_1
0.415024,0.716808,100
0.627645,0.351122,400
0.265086,0.888355,200
0.512275,0.999517,300
0.577378,0.814657,500


### Plotting

In [0]:
# This is needed for visualizing plot on notebook
%matplotlib inline

In [0]:
speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]
index = ['snail', 'pig', 'elephant',
         'rabbit', 'giraffe', 'coyote', 'horse']
kdf = ks.DataFrame({'speed': speed,
                   'lifespan': lifespan}, index=index)
kdf.plot.bar()

In [0]:
kdf.plot.barh()

In [0]:
kdf = ks.DataFrame({'mass': [0.330, 4.87, 5.97],
                    'radius': [2439.7, 6051.8, 6378.1]},
                   index=['Mercury', 'Venus', 'Earth'])
kdf.plot.pie(y='mass')

In [0]:
kdf = ks.DataFrame({
    'sales': [3, 2, 3, 9, 10, 6, 3],
    'signups': [5, 5, 6, 12, 14, 13, 9],
    'visits': [20, 42, 28, 62, 81, 50, 90],
}, index=pd.date_range(start='2019/08/15', end='2020/03/09',
                       freq='M'))
kdf.plot.area()

In [0]:
kdf = ks.DataFrame({'pig': [20, 18, 489, 675, 1776],
                    'horse': [4, 25, 281, 600, 1900]},
                   index=[1990, 1997, 2003, 2009, 2014])
kdf.plot.line()

In [0]:
kdf = pd.DataFrame(
    np.random.randint(1, 7, 6000),
    columns=['one'])
kdf['two'] = kdf['one'] + np.random.randint(1, 7, 6000)
kdf = ks.from_pandas(kdf)
kdf.plot.hist(bins=12, alpha=0.5)

In [0]:
kdf = ks.DataFrame([[5.1, 3.5, 0], [4.9, 3.0, 0], [7.0, 3.2, 1],
                    [6.4, 3.2, 1], [5.9, 3.0, 2]],
                   columns=['length', 'width', 'species'])
kdf.plot.scatter(x='length',
                 y='width',
                 c='species')

## Missing Functionalities and Workarounds in Koalas

### Directly use pandas APIs through type conversion

In [0]:
kidx = kdf.index

In [0]:
# Index.to_list() raises PandasNotImplementedError.
# Koalas does not support this because it requires collecting all data into the client
# (driver node) side. A simple workaround is to convert to pandas using to_pandas().
# If you want to know about more detail, See the following blog post.
# https://databricks.com/blog/2020/03/31/10-minutes-from-pandas-to-koalas-on-apache-spark.html
# kidx.to_list()

In [0]:
kidx.to_pandas().to_list()

### Native Support for pandas Objects

In [0]:
kdf = ks.DataFrame({'A': 1.,
                    'B': pd.Timestamp('20130102'),
                    'C': pd.Series(1, index=list(range(4)), dtype='float32'),
                    'D': np.array([3] * 4, dtype='int32'),
                    'F': 'foo'})

In [0]:
kdf

Unnamed: 0,A,B,C,D,F
0,1.0,2013-01-02,1.0,3,foo
1,1.0,2013-01-02,1.0,3,foo
2,1.0,2013-01-02,1.0,3,foo
3,1.0,2013-01-02,1.0,3,foo


### Distributed execution for pandas functions

In [0]:
i = pd.date_range('2018-04-09', periods=2000, freq='1D1min')
ts = ks.DataFrame({'A': ['timestamp']}, index=i)

# DataFrame.between_time() is not yet implemented in Koalas.
# A simple workaround is to convert to a pandas DataFrame using to_pandas(),
# and then applying the function.
# If you want to know about more detail, See the following blog post.
# https://databricks.com/blog/2020/03/31/10-minutes-from-pandas-to-koalas-on-apache-spark.html
# ts.between_time('0:15', '0:16')

In [0]:
ts.to_pandas().between_time('0:15', '0:16')

Unnamed: 0,A
2018-04-24 00:15:00,timestamp
2018-04-25 00:16:00,timestamp
2022-04-04 00:15:00,timestamp
2022-04-05 00:16:00,timestamp


In [0]:
ts.map_in_pandas(func=lambda pdf: pdf.between_time('0:15', '0:16'))

Unnamed: 0,A
2018-04-24 00:15:00,timestamp
2018-04-25 00:16:00,timestamp
2022-04-04 00:15:00,timestamp
2022-04-05 00:16:00,timestamp


### Using SQL in Koalas

In [0]:
kdf = ks.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'pig': [20, 18, 489, 675, 1776],
                    'horse': [4, 25, 281, 600, 1900]})

In [0]:
ks.sql("SELECT * FROM {kdf} WHERE pig > 100")

Unnamed: 0,year,pig,horse
0,2003,489,281
1,2009,675,600
2,2014,1776,1900


In [0]:
pdf = pd.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'sheep': [22, 50, 121, 445, 791],
                    'chicken': [250, 326, 589, 1241, 2118]})

In [0]:
ks.sql('''
    SELECT ks.pig, pd.chicken
    FROM {kdf} ks INNER JOIN {pdf} pd
    ON ks.year = pd.year
    ORDER BY ks.pig, pd.chicken''')

Unnamed: 0,pig,chicken
0,18,326
1,20,250
2,489,589
3,675,1241
4,1776,2118


## Working with PySpark

### Conversion from and to PySpark DataFrame

In [0]:
kdf = ks.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})
sdf = kdf.to_spark()
type(sdf)

In [0]:
sdf.show()

In [0]:
from databricks.koalas import option_context
with option_context(
        "compute.default_index_type", "distributed-sequence"):
    kdf = sdf.to_koalas()
type(kdf)

In [0]:
kdf

Unnamed: 0,A,B
4,5,50
0,1,10
1,2,20
3,4,40
2,3,30


In [0]:
sdf.to_koalas(index_col='A')

Unnamed: 0_level_0,B
A,Unnamed: 1_level_1
2,20
4,40
1,10
3,30
5,50


### Checking Spark execution plans

In [0]:
from databricks.koalas import option_context

with option_context(
        "compute.ops_on_diff_frames", True,
        "compute.default_index_type", 'distributed'):
    df = ks.range(10) + ks.range(10)
    df.explain()

In [0]:
with option_context(
        "compute.ops_on_diff_frames", False,
        "compute.default_index_type", 'distributed'):
    df = ks.range(10)
    df = df + df
    df.explain()

### Caching DataFrames

In [0]:
with option_context("compute.default_index_type", 'distributed'):
    df = ks.range(10)
    new_df = (df + df).cache()  # `(df + df)` is cached here as `df`
    new_df.explain()

In [0]:
new_df.unpersist()

In [0]:
with (df + df).cache() as df:
    df.explain()