In [17]:
import os

import cudf
import cupy as cp
import dask.array as da
import dask.dataframe as dd
import dask_cudf
import numpy as np
import pandas as pd

# Seed CuPy's global random state up front; note that none of the cells shown here draw random numbers.
cp.random.seed(12)


# Dask

## DataFrame

### Creating a Dask Object

In [17]:
# Hourly timestamps covering all of 2024.
# ISO 8601 (YYYY-MM-DD) strings are used so that neither bound depends on
# pandas guessing between day-first and month-first ordering.
index = pd.date_range(start = "2024-01-01 00:00", end = "2024-12-31 23:59", freq = "1h")
df = pd.DataFrame(
    {
        # One increasing integer per timestamp.
        "a" : np.arange(len(index)),
        # A scalar value is broadcast to every row, so each cell of "b"
        # holds the same 1200-character string.
        # NOTE(review): if one letter per row was intended, pass a list of len(index) instead.
        "b" : "zyts" * 300,
    },
    index = index,
)
# Wrap the pandas frame as a Dask DataFrame split into 10 partitions.
ddf = dd.from_pandas(df, npartitions = 10)

In [None]:
ddf.divisions

# `divisions` holds npartitions + 1 index values that mark the partition boundaries.
# Partition i covers the half-open index range [divisions[i], divisions[i + 1]);
# the last partition also includes its upper bound (here the final timestamp of the index).
# e.g. partition 0 spans [Timestamp('2024-01-01 00:00:00'), Timestamp('2024-02-06 15:00:00'))

(Timestamp('2024-01-01 00:00:00'),
 Timestamp('2024-02-06 15:00:00'),
 Timestamp('2024-03-14 06:00:00'),
 Timestamp('2024-04-19 21:00:00'),
 Timestamp('2024-05-26 12:00:00'),
 Timestamp('2024-07-02 02:00:00'),
 Timestamp('2024-08-07 16:00:00'),
 Timestamp('2024-09-13 06:00:00'),
 Timestamp('2024-10-19 20:00:00'),
 Timestamp('2024-11-25 10:00:00'),
 Timestamp('2024-12-31 23:00:00'))

In [54]:
# Pull out the first partition once and reuse it for both printouts.
first_partition = ddf.partitions[0]

# Printing the lazy object shows only its structure (dtypes and divisions), not the rows.
print(first_partition)
print("\n")  # blank separator; print() appends its own newline on top of the "\n"
# .head(10) materialises the first 10 rows of that partition.
print(f"First 10 rows of partition 0: \n {first_partition.head(10)}")

Dask DataFrame Structure:
                         a       b
npartitions=1                     
2024-01-01 00:00:00  int64  string
2024-02-06 15:00:00    ...     ...
Dask Name: partitions, 2 expressions
Expr=Partitions(frame=df, partitions=[0])


First 10 rows of partition 0: 
                      a                                                  b
2024-01-01 00:00:00  0  zytszytszytszytszytszytszytszytszytszytszytszy...
2024-01-01 01:00:00  1  zytszytszytszytszytszytszytszytszytszytszytszy...
2024-01-01 02:00:00  2  zytszytszytszytszytszytszytszytszytszytszytszy...
2024-01-01 03:00:00  3  zytszytszytszytszytszytszytszytszytszytszytszy...
2024-01-01 04:00:00  4  zytszytszytszytszytszytszytszytszytszytszytszy...
2024-01-01 05:00:00  5  zytszytszytszytszytszytszytszytszytszytszytszy...
2024-01-01 06:00:00  6  zytszytszytszytszytszytszytszytszytszytszytszy...
2024-01-01 07:00:00  7  zytszytszytszytszytszytszytszytszytszytszytszy...
2024-01-01 08:00:00  8  zytszytszytszytszytszytszytszyt

### Indexing

In [58]:
# Selecting one column returns a lazy Dask Series: only its structure (dtype and divisions) is displayed.
ddf["a"]

Dask Series Structure:
npartitions=10
2024-01-01 00:00:00    int64
2024-02-06 15:00:00      ...
                       ...  
2024-11-25 10:00:00      ...
2024-12-31 23:00:00      ...
Dask Name: getitem, 2 expressions
Expr=df['a']

In [73]:
# for row based indexing use:
print(ddf.loc["2024-06-01" : "2024-06-13"])

print("\n")
# for column based indexing use:
print(ddf.iloc[:, 0])
# iloc does not support row indexing

Dask DataFrame Structure:
                                   a       b
npartitions=1                               
2024-06-01 00:00:00.000000000  int64  string
2024-06-13 23:59:59.999999999    ...     ...
Dask Name: loc, 2 expressions
Expr=LocSlice(frame=df, iindexer=slice(Timestamp('2024-06-01 00:00:00'), Timestamp('2024-06-13 23:59:59.999999999'), None))


Dask Series Structure:
npartitions=10
2024-01-01 00:00:00    int64
2024-02-06 15:00:00      ...
                       ...  
2024-11-25 10:00:00      ...
2024-12-31 23:00:00      ...
Dask Name: getitem, 2 expressions
Expr=df['a']


In [74]:
# Shown as a bare last expression instead of via print(); the lazy frame still displays only dtypes and divisions, no row values.
ddf.loc["2024-06-01" : "2024-06-13"]

Unnamed: 0_level_0,a,b
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
2024-06-01 00:00:00.000000000,int64,string
2024-06-13 23:59:59.999999999,...,...


### Computation

In [None]:
ddf.loc["2024-06-01" : "2024-06-13"].compute()

# Compare this with the same selection without .compute() in the previous cell.
# A Dask object is evaluated lazily, so displaying it only shows its structure
# (dtypes in place of the actual "a" and "b" values).
# Calling .compute() runs the work and returns the full, concrete result.

Unnamed: 0,a,b
2024-06-01 00:00:00,3648,zytszytszytszytszytszytszytszytszytszytszytszy...
2024-06-01 01:00:00,3649,zytszytszytszytszytszytszytszytszytszytszytszy...
2024-06-01 02:00:00,3650,zytszytszytszytszytszytszytszytszytszytszytszy...
2024-06-01 03:00:00,3651,zytszytszytszytszytszytszytszytszytszytszytszy...
2024-06-01 04:00:00,3652,zytszytszytszytszytszytszytszytszytszytszytszy...
...,...,...
2024-06-13 19:00:00,3955,zytszytszytszytszytszytszytszytszytszytszytszy...
2024-06-13 20:00:00,3956,zytszytszytszytszytszytszytszytszytszytszytszy...
2024-06-13 21:00:00,3957,zytszytszytszytszytszytszytszytszytszytszytszy...
2024-06-13 22:00:00,3958,zytszytszytszytszytszytszytszytszytszytszytszy...


In [None]:
# Build the lazy reduction once and reuse it for both printouts.
mean_of_a = ddf["a"].mean()

# Printing the lazy scalar shows the pending expression, not the value.
print(mean_of_a)
# .compute() triggers the computation and yields the actual number.
print(mean_of_a.compute())



<dask_expr.expr.Scalar: expr=df['a'].mean(), dtype=float64>
4391.5


## Array

### Creating a Dask Array

In [None]:
# NumPy array holding 0 .. 999_999 laid out as 5_000 rows;
# -1 lets NumPy infer the column count (1_000_000 // 5_000 == 200).
data = np.arange(1_000_000).reshape(5_000, -1)

# Break the (5_000, 200) array into chunks of shape (500, 100):
# 10 chunks along the rows x 2 along the columns = 20 chunks in total.
a = da.from_array(data, chunks = (500, 100))
a


Unnamed: 0,Array,Chunk
Bytes,7.63 MiB,390.62 kiB
Shape,"(5000, 200)","(500, 100)"
Dask graph,20 chunks in 1 graph layer,20 chunks in 1 graph layer
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 7.63 MiB 390.62 kiB Shape (5000, 200) (500, 100) Dask graph 20 chunks in 1 graph layer Data type int64 numpy.ndarray",200  5000,

Unnamed: 0,Array,Chunk
Bytes,7.63 MiB,390.62 kiB
Shape,"(5000, 200)","(500, 100)"
Dask graph,20 chunks in 1 graph layer,20 chunks in 1 graph layer
Data type,int64 numpy.ndarray,int64 numpy.ndarray


In [None]:
a.chunks

# Shows how the array is broken down: one tuple per axis,
# listing the size of every chunk along that axis.
#   axis 0 (rows)    -> ten chunks of 500 rows each
#   axis 1 (columns) -> two chunks of 100 columns each

((500, 500, 500, 500, 500, 500, 500, 500, 500, 500), (100, 100))

In [None]:
a.blocks[7, 0]

# Access a single chunk by its block position: block-row 7, block-column 0 (zero-based).
# The result is itself a lazy Dask array made of that one (500, 100) chunk.

Unnamed: 0,Array,Chunk
Bytes,390.62 kiB,390.62 kiB
Shape,"(500, 100)","(500, 100)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 390.62 kiB 390.62 kiB Shape (500, 100) (500, 100) Dask graph 1 chunks in 2 graph layers Data type int64 numpy.ndarray",100  500,

Unnamed: 0,Array,Chunk
Bytes,390.62 kiB,390.62 kiB
Shape,"(500, 100)","(500, 100)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray


### Indexing

In [14]:
# NumPy-style indexing stays lazy: the first 50 rows of column 50, giving a 1-D Dask array of shape (50,).
a[:50, 50]

Unnamed: 0,Array,Chunk
Bytes,400 B,400 B
Shape,"(50,)","(50,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 400 B 400 B Shape (50,) (50,) Dask graph 1 chunks in 2 graph layers Data type int64 numpy.ndarray",50  1,

Unnamed: 0,Array,Chunk
Bytes,400 B,400 B
Shape,"(50,)","(50,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray


### Computation

In [15]:
# .compute() evaluates the lazy slice and returns the concrete array of values.
a[:50, 50].compute()

array([  50,  250,  450,  650,  850, 1050, 1250, 1450, 1650, 1850, 2050,
       2250, 2450, 2650, 2850, 3050, 3250, 3450, 3650, 3850, 4050, 4250,
       4450, 4650, 4850, 5050, 5250, 5450, 5650, 5850, 6050, 6250, 6450,
       6650, 6850, 7050, 7250, 7450, 7650, 7850, 8050, 8250, 8450, 8650,
       8850, 9050, 9250, 9450, 9650, 9850])

# cuDF

In [22]:
# Small GPU-backed Series with an explicit 32-bit integer dtype;
# the None entry is kept as a missing value (<NA>) rather than forcing a float dtype.
values = [1, 2, 3, None, 4]
s = cudf.Series(values, dtype = np.int32)
s

0       1
1       2
2       3
3    <NA>
4       4
dtype: int32

In [None]:
ds = dask_cudf.from_cudf(s, npartitions = 2)
ds

# Splits the cuDF Series `s` into 2 partitions.
# The result is lazy, so only its structure (dtype and divisions) is displayed.

Dask Series Structure:
npartitions=2
0    int32
3      ...
4      ...
Dask Name: frompandas, 1 expression
Expr=df

In [38]:
ds.head(n = 5, npartitions = 2)
# Looks at the first 2 partitions and returns the first 5 elements found there.

# ds.head(n = 5, npartitions = 1)
# would look at the first partition only.

# ds.head(n = 5, npartitions = -1)
# would look at all partitions and return the first 5 elements.

0       1
1       2
2       3
3    <NA>
4       4
dtype: int32