# 🚚 Distributed Dataframes

## Dask Dataframe

https://docs.dask.org/en/latest/dataframe.html

In [1]:
import webbrowser

import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client as DaskClient
from sklearn import datasets

In [2]:
# load boston dataset from sklearn
boston = datasets.load_boston()

# load boston dataset as pandas dataframe
df = pd.DataFrame(boston.data, columns=boston.feature_names)
df.head()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0,0.00632,18.0,2.31,0.0,0.538,6.575,65.2,4.09,1.0,296.0,15.3,396.9,4.98
1,0.02731,0.0,7.07,0.0,0.469,6.421,78.9,4.9671,2.0,242.0,17.8,396.9,9.14
2,0.02729,0.0,7.07,0.0,0.469,7.185,61.1,4.9671,2.0,242.0,17.8,392.83,4.03
3,0.03237,0.0,2.18,0.0,0.458,6.998,45.8,6.0622,3.0,222.0,18.7,394.63,2.94
4,0.06905,0.0,2.18,0.0,0.458,7.147,54.2,6.0622,3.0,222.0,18.7,396.9,5.33


In [3]:
# create Dask scheduler, checking whether it already exists or not from the local runtime for this notebook
if "daskclient" not in locals():
    daskclient = DaskClient()
print("Dask Scheduler Address: " + str(daskclient.scheduler_info()["address"]))

Dask Scheduler Address: tcp://127.0.0.1:59514


In [4]:
# open a webbrowser to view dask status dynamically
webbrowser.open("http://127.0.0.1:8787/status")

True

In [5]:
# load the pandas df into dask dataframe
dask_df = dd.from_pandas(data=df, npartitions=3)
dask_df.head()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0,0.00632,18.0,2.31,0.0,0.538,6.575,65.2,4.09,1.0,296.0,15.3,396.9,4.98
1,0.02731,0.0,7.07,0.0,0.469,6.421,78.9,4.9671,2.0,242.0,17.8,396.9,9.14
2,0.02729,0.0,7.07,0.0,0.469,7.185,61.1,4.9671,2.0,242.0,17.8,392.83,4.03
3,0.03237,0.0,2.18,0.0,0.458,6.998,45.8,6.0622,3.0,222.0,18.7,394.63,2.94
4,0.06905,0.0,2.18,0.0,0.458,7.147,54.2,6.0622,3.0,222.0,18.7,396.9,5.33


In [6]:
ddf = ddf.apply(
    lambda x: (((x ** 10) / 0.345) ** 0.55) / 0.01,
    axis=1,
    meta={
        "CRIM": "float64",
        "ZN": "float64",
        "INDUS": "float64",
        "CHAS": "float64",
        "NOX": "float64",
        "RM": "float64",
        "AGE": "float64",
        "DIS": "float64",
        "RAD": "float64",
        "TAX": "float64",
        "PTRATIO": "float64",
        "B": "float64",
        "LSTAT": "float64",
    },
)
ddf.head()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0,1.439274e-10,1439456000.0,17950.04,0.0,5.936111,5657521.0,1708289000000.0,415602.1,179.555825,7019463000000000.0,588846900.0,3.523267e+16,1227329.0
1,4.507844e-07,0.0,8433469.0,0.0,2.790295,4966085.0,4876666000000.0,1209945.0,8125.769035,2318377000000000.0,1353660000.0,3.523267e+16,34626100.0
2,4.489717e-07,0.0,8433469.0,0.0,2.790295,9216091.0,1195160000000.0,1209945.0,8125.769035,2318377000000000.0,1353660000.0,3.329086e+16,383157.4
3,1.148112e-06,0.0,13053.0,0.0,2.448836,7971796.0,244883600000.0,3619645.0,75572.954068,1442583000000000.0,1775531000.0,3.413855e+16,67625.4
4,7.406274e-05,0.0,13053.0,0.0,2.448836,8951180.0,618294800000.0,3619645.0,75572.954068,1442583000000000.0,1775531000.0,3.523267e+16,1783196.0


## Spark Dataframe

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?#pyspark.sql.DataFrame

In [7]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import FloatType
from sklearn import datasets

In [8]:
# load boston dataset from sklearn
boston = datasets.load_boston()

# load boston dataset as pandas dataframe
df = pd.DataFrame(boston.data, columns=boston.feature_names)
df.head()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0,0.00632,18.0,2.31,0.0,0.538,6.575,65.2,4.09,1.0,296.0,15.3,396.9,4.98
1,0.02731,0.0,7.07,0.0,0.469,6.421,78.9,4.9671,2.0,242.0,17.8,396.9,9.14
2,0.02729,0.0,7.07,0.0,0.469,7.185,61.1,4.9671,2.0,242.0,17.8,392.83,4.03
3,0.03237,0.0,2.18,0.0,0.458,6.998,45.8,6.0622,3.0,222.0,18.7,394.63,2.94
4,0.06905,0.0,2.18,0.0,0.458,7.147,54.2,6.0622,3.0,222.0,18.7,396.9,5.33


In [9]:
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df).toDF(
    "CRIM",
    "ZN",
    "INDUS",
    "CHAS",
    "NOX",
    "RM",
    "AGE",
    "DIS",
    "RAD",
    "TAX",
    "PTRATIO",
    "B",
    "LSTAT",
)
sdf.show(n=5, truncate=False)

+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+
|CRIM   |ZN  |INDUS|CHAS|NOX  |RM   |AGE |DIS   |RAD|TAX  |PTRATIO|B     |LSTAT|
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+
|0.00632|18.0|2.31 |0.0 |0.538|6.575|65.2|4.09  |1.0|296.0|15.3   |396.9 |4.98 |
|0.02731|0.0 |7.07 |0.0 |0.469|6.421|78.9|4.9671|2.0|242.0|17.8   |396.9 |9.14 |
|0.02729|0.0 |7.07 |0.0 |0.469|7.185|61.1|4.9671|2.0|242.0|17.8   |392.83|4.03 |
|0.03237|0.0 |2.18 |0.0 |0.458|6.998|45.8|6.0622|3.0|222.0|18.7   |394.63|2.94 |
|0.06905|0.0 |2.18 |0.0 |0.458|7.147|54.2|6.0622|3.0|222.0|18.7   |396.9 |5.33 |
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+
only showing top 5 rows



In [10]:
udf = UserDefinedFunction(lambda x: (((x ** 10) / 0.345) ** 0.55) / 0.01, FloatType())
sdf = sdf.select(*[udf(column).alias(column) for column in sdf.columns])
sdf.show(
    n=5, truncate=False,
)

+-------------+------------+---------+----+---------+---------+-------------+---------+---------+-------------+------------+-------------+---------+
|CRIM         |ZN          |INDUS    |CHAS|NOX      |RM       |AGE          |DIS      |RAD      |TAX          |PTRATIO     |B            |LSTAT    |
+-------------+------------+---------+----+---------+---------+-------------+---------+---------+-------------+------------+-------------+---------+
|1.4392738E-10|1.43945562E9|17950.04 |0.0 |5.936111 |5657521.0|1.70828877E12|415602.06|179.55583|7.0194632E15 |5.8884685E8 |3.5232669E16 |1227328.6|
|4.5078437E-7 |0.0         |8433469.0|0.0 |2.7902951|4966085.5|4.8766659E12 |1209944.5|8125.769 |2.31837664E15|1.35365978E9|3.5232669E16 |3.46261E7|
|4.489717E-7  |0.0         |8433469.0|0.0 |2.7902951|9216091.0|1.19516024E12|1209944.5|8125.769 |2.31837664E15|1.35365978E9|3.32908627E16|383157.44|
|1.1481122E-6 |0.0         |13052.996|0.0 |2.4488356|7971795.5|2.44883554E11|3619644.5|75572.95 |1.4425831

## Mars Dataframe

https://docs.pymars.org/en/latest/getting_started/dataframe.html

In [15]:
import mars.dataframe as md
import pandas as pd
from sklearn import datasets

In [12]:
# load boston dataset from sklearn
boston = datasets.load_boston()

# load boston dataset as pandas dataframe
df = pd.DataFrame(boston.data, columns=boston.feature_names)
df.head()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0,0.00632,18.0,2.31,0.0,0.538,6.575,65.2,4.09,1.0,296.0,15.3,396.9,4.98
1,0.02731,0.0,7.07,0.0,0.469,6.421,78.9,4.9671,2.0,242.0,17.8,396.9,9.14
2,0.02729,0.0,7.07,0.0,0.469,7.185,61.1,4.9671,2.0,242.0,17.8,392.83,4.03
3,0.03237,0.0,2.18,0.0,0.458,6.998,45.8,6.0622,3.0,222.0,18.7,394.63,2.94
4,0.06905,0.0,2.18,0.0,0.458,7.147,54.2,6.0622,3.0,222.0,18.7,396.9,5.33


In [13]:
mdf = md.DataFrame(df)
mdf.head().execute()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0,0.00632,18.0,2.31,0.0,0.538,6.575,65.2,4.09,1.0,296.0,15.3,396.9,4.98
1,0.02731,0.0,7.07,0.0,0.469,6.421,78.9,4.9671,2.0,242.0,17.8,396.9,9.14
2,0.02729,0.0,7.07,0.0,0.469,7.185,61.1,4.9671,2.0,242.0,17.8,392.83,4.03
3,0.03237,0.0,2.18,0.0,0.458,6.998,45.8,6.0622,3.0,222.0,18.7,394.63,2.94
4,0.06905,0.0,2.18,0.0,0.458,7.147,54.2,6.0622,3.0,222.0,18.7,396.9,5.33


In [14]:
mdf = mdf.apply(lambda x: (((x ** 10) / 0.345) ** 0.55) / 0.01).execute()
mdf.head().execute()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0,1.439274e-10,1439456000.0,17950.04,0.0,5.936111,5657521.0,1708289000000.0,415602.1,179.555825,7019463000000000.0,588846900.0,3.523267e+16,1227329.0
1,4.507844e-07,0.0,8433469.0,0.0,2.790295,4966085.0,4876666000000.0,1209945.0,8125.769035,2318377000000000.0,1353660000.0,3.523267e+16,34626100.0
2,4.489717e-07,0.0,8433469.0,0.0,2.790295,9216091.0,1195160000000.0,1209945.0,8125.769035,2318377000000000.0,1353660000.0,3.329086e+16,383157.4
3,1.148112e-06,0.0,13053.0,0.0,2.448836,7971796.0,244883600000.0,3619645.0,75572.954068,1442583000000000.0,1775531000.0,3.413855e+16,67625.4
4,7.406274e-05,0.0,13053.0,0.0,2.448836,8951180.0,618294800000.0,3619645.0,75572.954068,1442583000000000.0,1775531000.0,3.523267e+16,1783196.0
