## How DataFrames (DF) and DataPipes (DP) can work together

In [1]:
from importlib import reload
import torch
reload(torch)
from torch.utils.data import IterDataPipe

In [2]:
# Example IterDataPipe
class ExampleIterPipe(IterDataPipe):
    """Minimal IterDataPipe that emits the integers 0, 1, ..., range - 1."""

    def __init__(self, range = 20):
        # Exclusive upper bound of the emitted sequence.
        self.range = range

    def __iter__(self):
        # `range` here is the builtin; the stored bound is read from the instance.
        yield from range(self.range)

def get_dataframes_pipe(range = 10, dataframe_size = 7):
    """Build a DataFrames pipe over the pairs (n, n % 3) for n in 0..range-1.

    The pair elements become the columns 'i' and 'j'; `dataframe_size` is
    forwarded unchanged to `to_dataframes_pipe`.
    """
    numbers = ExampleIterPipe(range = range)
    pairs = numbers.map(lambda n: (n, n % 3))
    return pairs.to_dataframes_pipe(columns = ['i', 'j'], dataframe_size = dataframe_size)

def get_regular_pipe(range = 10):
    """Build a regular DataPipe yielding the tuples (n, n % 3) for n in 0..range-1."""
    numbers = ExampleIterPipe(range = range)
    return numbers.map(lambda n: (n, n % 3))


No matter how the DataFrames are composed internally, iterating over a DF Pipe yields single rows to the user. This is similar to a regular DataPipe.

In [3]:
# Iterating the DataFrames pipe directly yields one single-row DataFrame per element.
print('DataFrames Pipe')
frames_pipe = get_dataframes_pipe()
for row in frames_pipe:
    print(row)

# The regular pipe yields the same records as plain (i, j) tuples.
print('Regular DataPipe')
records_pipe = get_regular_pipe()
for record in records_pipe:
    print(record)

DataFrames Pipe
   i  j
0  0  0
   i  j
1  1  1
   i  j
2  2  2
   i  j
3  3  0
   i  j
4  4  1
   i  j
5  5  2
   i  j
6  6  0
   i  j
0  7  1
   i  j
1  8  2
   i  j
2  9  0
Regular DataPipe
(0, 0)
(1, 1)
(2, 2)
(3, 0)
(4, 1)
(5, 2)
(6, 0)
(7, 1)
(8, 2)
(9, 0)


You can iterate over the raw DataFrames using `raw_iterator`.

In [4]:
# raw_iterator() yields whole DataFrame chunks instead of single rows.
frames_pipe = get_dataframes_pipe()
for chunk in frames_pipe.raw_iterator():
    print(chunk)

   i  j
0  0  0
1  1  1
2  2  2
3  3  0
4  4  1
5  5  2
6  6  0
   i  j
0  7  1
1  8  2
2  9  0


Operations over a DF Pipe are captured.

In [5]:
# Column arithmetic on the pipe is captured as a list of operations;
# ops_str() renders the captured steps as text.
traced_pipe = get_dataframes_pipe(dataframe_size = 3)
traced_pipe['y'] = traced_pipe.i * 100 + traced_pipe.j - 2.7
print(traced_pipe.ops_str())


var_4 = input_var_3.i * 100
var_5 = var_4 + input_var_3.j
var_6 = var_5 - 2.7
input_var_3["y"] = var_6


Captured operations are executed on `__next__` calls of the constructed DataPipe.

In [6]:
# Add the derived column 'y', then print every DataFrame chunk of the pipe.
pipe_with_y = get_dataframes_pipe(dataframe_size = 3)
pipe_with_y['y'] = pipe_with_y.i * 100 + pipe_with_y.j - 2.7
for chunk in pipe_with_y.raw_iterator():
    print(chunk)

   i  j      y
0  0  0   -2.7
1  1  1   98.3
2  2  2  199.3
   i  j      y
0  3  0  297.3
1  4  1  398.3
2  5  2  499.3
   i  j      y
0  6  0  597.3
1  7  1  698.3
2  8  2  799.3
   i  j      y
0  9  0  897.3


`shuffle` on a DataFramePipe affects rows individually.

In [7]:
# Shuffle the DataFrames pipe after adding the derived column 'y'.
shuffled_frames = get_dataframes_pipe(dataframe_size = 3)
shuffled_frames['y'] = shuffled_frames.i * 100 + shuffled_frames.j - 2.7
shuffled_frames = shuffled_frames.shuffle()
for chunk in shuffled_frames.raw_iterator():
    print(chunk)

# The same shuffle applied to a regular DataPipe, for comparison.
shuffled_records = get_regular_pipe().shuffle()
for record in shuffled_records:
    print(record)

   i  j      y
2  2  2  199.3
0  3  0  297.3
1  4  1  398.3
   i  j      y
2  5  2  499.3
1  7  1  698.3
2  8  2  799.3
   i  j      y
0  6  0  597.3
0  0  0   -2.7
1  1  1   98.3
   i  j      y
0  9  0  897.3
(9, 0)
(0, 0)
(8, 2)
(1, 1)
(5, 2)
(2, 2)
(4, 1)
(6, 0)
(7, 1)
(3, 0)


You can continue mixing DF and DP operations

In [8]:
# Interleave DataFrame-style operations with DataPipe-style operations.
mixed_pipe = get_dataframes_pipe(dataframe_size = 3)
mixed_pipe['y'] = mixed_pipe.i * 100 + mixed_pipe.j - 2.7   # DF: derived column
mixed_pipe = mixed_pipe.shuffle()                           # DP: shuffle rows
mixed_pipe = mixed_pipe - 17                                # DF: subtract 17 from every column
mixed_pipe['y'] = mixed_pipe.y * 10000                      # DF: rescale 'y'
for chunk in mixed_pipe.raw_iterator():
    print(chunk)

    i   j          y
0 -14 -17  2803000.0
1 -13 -16  3813000.0
0 -17 -17  -197000.0
    i   j          y
2  -9 -15  7823000.0
0  -8 -17  8803000.0
2 -12 -15  4823000.0
    i   j          y
0 -11 -17  5803000.0
2 -15 -15  1823000.0
1 -16 -16   813000.0
    i   j          y
1 -10 -16  6813000.0


Batching combines everything into a `list`; it is possible to nest `list`s. A list may contain any number of DataFrames as long as the total number of rows equals the batch size.

In [9]:
# Shuffle the DataFrames pipe, then group its rows into batches of 2.
dp = get_dataframes_pipe(dataframe_size = 3)
dp = dp.shuffle()
dp = dp.batch(2)
for batch in dp:
    print(batch)

# This is similar to batching a regular DataPipe.
dp = get_regular_pipe()
dp = dp.shuffle()
dp = dp.batch(2)
for batch in dp:
    print(batch)

[   i  j
0  3  0
1  4  1]
[   i  j
2  5  2
0  9  0]
[   i  j
0  0  0
0  6  0]
[   i  j
1  7  1
2  2  2]
[   i  j
2  8  2
1  1  1]
[(1, 1), (0, 0)]
[(6, 0), (2, 2)]
[(5, 2), (9, 0)]
[(4, 1), (8, 2)]
[(3, 0), (7, 1)]


`concat` should work only on DFs with the same schema; this code should produce an error (note that the output below shows it currently does not).

In [10]:
# dp0 keeps the columns (i, j); dp additionally gets 'y', so the two schemas differ.
dp0 = get_dataframes_pipe(range = 8, dataframe_size = 4)
dp = get_dataframes_pipe(range = 6, dataframe_size = 3)
dp['y'] = dp.i * 100 + dp.j - 2.7
dp = dp.concat(dp0)
for frame in dp.raw_iterator():
    print(frame)

   i  j      y
0  0  0   -2.7
1  1  1   98.3
2  2  2  199.3
   i  j      y
0  3  0  297.3
1  4  1  398.3
2  5  2  499.3
   i  j
0  0  0
   i  j
1  1  1
   i  j
2  2  2
   i  j
3  3  0
   i  j
0  4  1
   i  j
1  5  2
   i  j
2  6  0
   i  j
3  7  1


`unbatch` of a `list` of DataFrames works similarly to a regular unbatch.
Note: DataFrame sizes might change.

In [11]:
nested_pipe = get_dataframes_pipe(range = 18, dataframe_size = 3)
nested_pipe['y'] = nested_pipe.i * 100 + nested_pipe.j - 2.7
# Apply three levels of batching, then undo all three levels at once.
batched = nested_pipe.batch(5).batch(3).batch(1)
nested_pipe = batched.unbatch(unbatch_level = 3)

# Known issue noted by the author: unbatching does not detect the DF type.
nested_pipe['z'] = nested_pipe.y - 100

for chunk in nested_pipe.raw_iterator():
    print(chunk)

   i  j      y      z
0  0  0   -2.7 -102.7
1  1  1   98.3   -1.7
2  2  2  199.3   99.3
0  3  0  297.3  197.3
1  4  1  398.3  298.3
   i  j      y      z
2  5  2  499.3  399.3
0  6  0  597.3  497.3
1  7  1  698.3  598.3
2  8  2  799.3  699.3
0  9  0  897.3  797.3
    i  j       y       z
1  10  1   998.3   898.3
2  11  2  1099.3   999.3
0  12  0  1197.3  1097.3
1  13  1  1298.3  1198.3
2  14  2  1399.3  1299.3
    i  j       y       z
0  15  0  1497.3  1397.3
1  16  1  1598.3  1498.3
2  17  2  1699.3  1599.3


`map` is applied to individual rows; the `nesting_level` argument is used to penetrate batching.

In [12]:
# DataFrames pipe: shift every row, batch by 5, then scale the rows inside each batch.
shifted_frames = get_dataframes_pipe(range = 10, dataframe_size = 3).map(lambda row: row + 1111)
scaled_frames = shifted_frames.batch(5).map(lambda row: row / 1000, nesting_level = 1)

for batch in scaled_frames:
    print(batch)

# Classic DataPipe elements are handled on row level in the same way;
# here only the first tuple element is transformed.
shifted_records = get_regular_pipe(range = 10).map(lambda rec: (rec[0] + 1111, rec[1]))
scaled_records = shifted_records.batch(5).map(lambda rec: (rec[0] / 1000, rec[1]), nesting_level = 1)

for batch in scaled_records:
    print(batch)



[       i      j
0  1.111  1.111
1  1.112  1.112
2  1.113  1.113
0  1.114  1.111
1  1.115  1.112]
[       i      j
2  1.116  1.113
0  1.117  1.111
1  1.118  1.112
2  1.119  1.113
0  1.120  1.111]
[(1.111, 0), (1.112, 1), (1.113, 2), (1.114, 0), (1.115, 1)]
[(1.116, 2), (1.117, 0), (1.118, 1), (1.119, 2), (1.12, 0)]


`filter` is applied to individual rows; the `nesting_level` argument is used to penetrate batching.

In [13]:
# DataFrames pipe: keep rows with i > 5, batch by 5, then keep rows with i < 13 inside batches.
kept_frames = get_dataframes_pipe(range = 30, dataframe_size = 3).filter(lambda row: row.i > 5)
kept_frames = kept_frames.batch(5).filter(lambda row: row.i < 13, nesting_level = 1)

for batch in kept_frames:
    print(batch)

# Classic DataPipe elements are filtered on row level in the same way.
kept_records = get_regular_pipe(range = 30).filter(lambda rec: rec[0] > 5)
kept_records = kept_records.batch(5).filter(lambda rec: rec[0] < 13, nesting_level = 1)

for batch in kept_records:
    print(batch)

[    i  j
0   6  0
1   7  1
2   8  2
0   9  0
1  10  1]
[    i  j
2  11  2
0  12  0]
[(6, 0), (7, 1), (8, 2), (9, 0), (10, 1)]
[(11, 2), (12, 0)]
