In [14]:
!pip install dask[dataframe]



In [15]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
import time as time

## Reading Large Dataset using pandas and dask

In [16]:
pd.DataFrame({
            'A': np.random.randint(0,100,size=20000000),
            'B': np.random.randint(0,100,size=20000000),
            'C': np.random.randint(0,100,size=20000000)
}).to_csv('dataset.csv',index=False)

In [17]:
start_time = time.time()
df = pd.read_csv('dataset.csv')
pandas_time = time.time() - start_time
print(f"Pandas: shape ={df.shape}, time ={pandas_time} seconds")

Pandas: shape =(20000000, 3), time =1.7752459049224854 seconds


In [18]:
# Read the same file using desk
start_time = time.time()
dask_df = pd.read_csv('dataset.csv')
dask_time = time.time() - start_time
print(f"Dask: shape ={dask_df.shape}, time ={dask_time} seconds")

Dask: shape =(20000000, 3), time =1.721172571182251 seconds


In [19]:
df.head()

Unnamed: 0,A,B,C
0,58,31,7
1,40,39,74
2,26,83,28
3,37,10,64
4,20,25,63


In [20]:
dask_df.head()

Unnamed: 0,A,B,C
0,58,31,7
1,40,39,74
2,26,83,28
3,37,10,64
4,20,25,63


## Group By and Aggregation using pandas and dask

In [21]:
# Time the group by operation using Pandas
start_time = time.time()
pandas_groupby = df.groupby(['A','B']).agg({'C':'sum'})
pandas_time = time.time() - start_time
print(f"Pandas: Time ={pandas_time} seconds")

Pandas: Time =0.6233129501342773 seconds


In [22]:
# Time the group by operation using Dask
start_time = time.time()
dask_groupby = dask_df.groupby(['A','B']).agg({'C':'sum'})
dask_time = time.time() - start_time
print(f"Dask: Time ={dask_time} seconds")

Dask: Time =0.617121696472168 seconds


## Merging Datasets using pandas and dask

In [23]:
# Merge using Pandas
start_time = time.time()
marged_pandas = pd.merge(df,df)
pandas_time = time.time() - start_time
print(f"Pandas: Time ={pandas_time} seconds")

Pandas: Time =35.56597948074341 seconds


In [34]:
# Merge using Dask
start_time = time.time()
marged_dask = dd.merge(dask_df,dask_df)
dask_time = time.time() - start_time
print(f"Pandas: Time ={dask_time} seconds")

Pandas: Time =0.6841371059417725 seconds


## Filtering Data using pandas and dask

In [25]:
# Filtering using Pandas
start_time = time.time()
selected_pandas = df[df['A'] > 5000000]
pandas_time = time.time() - start_time
print(f"Pandas: Time ={pandas_time} seconds")

Pandas: Time =0.047921180725097656 seconds


In [33]:
# Filtering using Dask
start_time = time.time()
selected_dask = dask_df[dask_df['A'] > 5000000]
dask_time = time.time() - start_time
print(f"Dask: Time ={dask_time} seconds")

Dask: Time =0.017808198928833008 seconds


## Apply function in pandas and dask

In [27]:
# function to perform Apply on
def my_function(x):
    return x * 2

In [28]:
# Apply using Pandas
start_time = time.time()
applied_pandas = df['A'].apply(my_function)
pandas_time = time.time() - start_time
print(f"Pandas: Time ={pandas_time} seconds")

Pandas: Time =3.3092613220214844 seconds


In [32]:
# Apply using Dask
start_time = time.time()
applied_dask = dask_df['A'].map(my_function)
dask_time = time.time() - start_time
print(f"Dask: Time ={dask_time} seconds")

Dask: Time =3.285322904586792 seconds


## Distibuted Computing

In [43]:
# Distibuted Computing using Dask
dask_df = dd.from_pandas(df, npartitions=24)
start_time = time.time()
distibuted_dask = dask_df['A'].map_partitions(my_function)
dask_time = time.time() - start_time
print(f"Dask: Time ={dask_time} seconds")

Dask: Time =0.019426584243774414 seconds
