In [None]:
%%capture
%pip install -U 'rockfish[labs]' -f 'https://docs142.rockfish.ai/packages/index.html'

In [None]:
import io
import rockfish as rf
import rockfish.actions as ra

## Fill missing values
1. Fill with a specified value
2. Fill with the previous value in the column
3. Fill with the next value in the column
4. Fill with the column mean
5. Fill with the column median


In [None]:
# Create a small in-memory CSV dataset with columns a, b, c.
# Column "a" is left empty in the third data row, giving one missing value
# for the fill examples to act on.
data = b"""\
a,b,c
1,2,3
4,5,6
,7,8
9,0,1
"""

# Parse the CSV bytes into a Rockfish dataset (first argument is presumably
# the dataset name).
dataset = rf.Dataset.from_csv("nulls", io.BytesIO(data))


### 1. Fill missing values with a specified value

In [None]:
# Show the input data; column "a" has a missing value in row 2.
dataset.to_pandas()

Unnamed: 0,a,b,c
0,1.0,2,3
1,4.0,5,6
2,,7,8
3,9.0,0,1


In [None]:
# Connection object passed to every workflow start and dataset download in
# this notebook.
conn = rf.Connection.local()

In [None]:
# Configure a transform that replaces nulls in column "a" with the constant 42.
fill_col = "a"
fill_value = 42
fill_null = ra.Transform(
    {
        "function": {"fill_null": [fill_col, fill_value]},
    }
)

In [None]:
save = rf.actions.DatasetSave(name="fill_value_dataset")
builder = rf.WorkflowBuilder.local()
builder.add_dataset(dataset)
builder.add_action(fill_null, parents=[dataset])
builder.add_action(save, parents=[fill_null])
workflow = await builder.start(conn)
print(f"Workflow: {workflow.id()}")

Workflow: 6e74dca2-d76a-49a6-adb6-ff9600670b75


In [None]:
new_dataset = None
async for sds in workflow.datasets():
    new_dataset = await sds.to_local(conn)
new_dataset.to_pandas()

Unnamed: 0,a,b,c
0,1,2,3
1,4,5,6
2,42,7,8
3,9,0,1


### 2. Fill missing values with the previous value in the column

In [None]:
# Show the original input again before applying the forward fill.
dataset.to_pandas()

Unnamed: 0,a,b,c
0,1.0,2,3
1,4.0,5,6
2,,7,8
3,9.0,0,1


In [None]:
# Configure a transform that fills nulls in column "a" with the previous
# value in that column.
fill_col = "a"
fill_null = ra.Transform(
    {
        "function": {"fill_null_forward": [fill_col]},
    }
)

In [None]:
save = rf.actions.DatasetSave(name="fill_null_forward_dataset")
builder = rf.WorkflowBuilder()
builder.add_dataset(dataset)
builder.add_action(fill_null, parents=[dataset])
builder.add_action(save, parents=[fill_null])
workflow = await builder.start(conn)
print(f"Workflow: {workflow.id()}")

Workflow: 29ac804f-46e1-463b-a2e8-7c0338192fd4


In [None]:
new_dataset = None
async for sds in workflow.datasets():
    new_dataset = await sds.to_local(conn)
new_dataset.to_pandas()

Unnamed: 0,a,b,c
0,1,2,3
1,4,5,6
2,4,7,8
3,9,0,1


### 3. Fill missing values with the next value in the column

In [None]:
# Show the original input again before applying the backward fill.
dataset.to_pandas()

Unnamed: 0,a,b,c
0,1.0,2,3
1,4.0,5,6
2,,7,8
3,9.0,0,1


In [None]:
# Configure a transform that fills nulls in column "a" with the next value
# in that column.
fill_col = "a"
fill_null = ra.Transform(
    {
        "function": {"fill_null_backward": [fill_col]},
    }
)

In [None]:
save = rf.actions.DatasetSave(name="fill_null_backward_dataset")
builder = rf.WorkflowBuilder()
builder.add_dataset(dataset)
builder.add_action(fill_null, parents=[dataset])
builder.add_action(save, parents=[fill_null])
workflow = await builder.start(conn)
print(f"Workflow: {workflow.id()}")

Workflow: 3820b4b8-1a7c-45e9-ab1a-0b772a77f55f


In [None]:
new_dataset = None
async for sds in workflow.datasets():
    new_dataset = await sds.to_local(conn)
new_dataset.to_pandas()

Unnamed: 0,a,b,c
0,1,2,3
1,4,5,6
2,9,7,8
3,9,0,1


### 4. Fill missing values with the mean of the column

In [None]:
# Show the original input again before applying the mean fill.
dataset.to_pandas()

Unnamed: 0,a,b,c
0,1.0,2,3
1,4.0,5,6
2,,7,8
3,9.0,0,1


In [None]:
# Configure a transform that fills nulls in column "a" with an aggregate of
# that column; the aggregate used here is the mean.
fill_method = "mean"
fill_col = "a"
fill_null = ra.Transform(
    {
        "function": {"fill_null_aggregation": [fill_col, fill_method]},
    }
)

In [None]:
save = rf.actions.DatasetSave(name="fill_mean_dataset")
builder = rf.WorkflowBuilder()
builder.add_dataset(dataset)
builder.add_action(fill_null, parents=[dataset])
builder.add_action(save, parents=[fill_null])
workflow = await builder.start(conn)
print(f"Workflow: {workflow.id()}")

Workflow: f15386cc-efa8-4a42-bed8-005d8dd06222


In [None]:
new_dataset = None
async for sds in workflow.datasets():
    new_dataset = await sds.to_local(conn)
new_dataset.to_pandas()

Unnamed: 0,a,b,c
0,1.0,2,3
1,4.0,5,6
2,4.666667,7,8
3,9.0,0,1


### 5. Fill missing values with the median of the column

In [None]:
# Show the original input again before applying the median fill.
dataset.to_pandas()

Unnamed: 0,a,b,c
0,1.0,2,3
1,4.0,5,6
2,,7,8
3,9.0,0,1


In [None]:
# Configure a transform that fills nulls in column "a" with an aggregate of
# that column; the aggregate used here is the median.
fill_method = "median"
fill_col = "a"
fill_null = ra.Transform(
    {
        "function": {"fill_null_aggregation": [fill_col, fill_method]},
    }
)

In [None]:
save = rf.actions.DatasetSave(name="fill_median_dataset")
builder = rf.WorkflowBuilder()
builder.add_dataset(dataset)
builder.add_action(fill_null, parents=[dataset])
builder.add_action(save, parents=[fill_null])
workflow = await builder.start(conn)
print(f"Workflow: {workflow.id()}")

Workflow: 444a5246-7018-4b80-b739-44cd2feff297


In [None]:
new_dataset = None
async for sds in workflow.datasets():
    new_dataset = await sds.to_local(conn)
new_dataset.to_pandas()

Unnamed: 0,a,b,c
0,1.0,2,3
1,4.0,5,6
2,4.0,7,8
3,9.0,0,1


## Append a new column for the transformed field

Add a new column that holds the result of filling the missing values with the specified value, while the original column (including its missing values) is left unchanged.

In [None]:
# Show the original input again before appending the filled column.
dataset.to_pandas()

Unnamed: 0,a,b,c
0,1.0,2,3
1,4.0,5,6
2,,7,8
3,9.0,0,1


In [None]:
# Configure an Apply action that fills nulls in column "a" with 42 and writes
# the result to a new column "new_a" instead of overwriting "a".
fill_col = "a"
fill_value = 42
new_col_name = "new_a"
fill_null = ra.Apply(
    {
        "function": {"fill_null": [fill_col, fill_value]},
        "append_field": new_col_name,
    }
)

In [None]:
save = ra.DatasetSave({"name": "new_column_filled_dataset"})
builder = rf.WorkflowBuilder()
builder.add_dataset(dataset)
builder.add_action(fill_null, parents=[dataset])
builder.add_action(save, parents=[fill_null])
workflow = await builder.start(conn)

print(f"Workflow: {workflow.id()}")

Workflow: ec7c53f5-2e2f-4a7d-878c-c4efe69c5a07


In [None]:
new_dataset = None
async for sds in workflow.datasets():
    new_dataset = await sds.to_local(conn)
new_dataset.to_pandas()

Unnamed: 0,a,b,c,new_a
0,1.0,2,3,1
1,4.0,5,6,4
2,,7,8,42
3,9.0,0,1,9
