In [1]:
CHUNK_SIZE_BYTE = 512 * 1024 * 1024

EDGE_FILE = '/mnt/nvme2/data/papers100M/raw/papers100M-bin/raw/data.npz'
TRAINING_FILE = '/mnt/nvme2/data/papers100M/raw/papers100M-bin/split/time/train.csv'

# write destination
STREAMING_EDGE_FILE = '/mnt/nvme2/data/papers100M/preprocessed2/streaming_edges.bin'
CHUNK_INFO_FILE = '/mnt/nvme2/data/papers100M/preprocessed2/chunks.txt'
RANDOM_READ_EDGE_FILE = '/mnt/nvme2/data/papers100M/preprocessed2/random_read_edges.bin'
OFFSETS_FILE = '/mnt/nvme2/data/papers100M/preprocessed2/offsets.bin'
TRAINING_BIN_FILE = '/mnt/nvme2/data/papers100M/preprocessed2/train.bin'

SEQUENTIAL_READ_EDGE_FILE = '/mnt/nvme2/data/papers100M/preprocessed2/sequential_read_edges.bin'
SEQUENTIAL_READ_CHUNK_INFO_FILE = '/mnt/nvme2/data/papers100M/preprocessed2/sequential_read_chunks.txt'


In [2]:
import numpy as np
from tqdm.auto import tqdm
import struct
import pandas as pd

### Load from raw files

In [3]:
LOAD_FROM_RAW = True

In [4]:
def compute_degrees(edge_index, n_nodes):
    degrees = np.zeros(n_nodes, dtype=np.uint32)
    pbar = tqdm(total=edge_index[1].shape[0], desc='Computing degrees')
    left = 0
    pre_node = edge_index[1][0]
    for i, node in enumerate(edge_index[1]):
        if i % 10000000 == 0:
            pbar.update(10000000)
        
        if node != pre_node:
            degrees[pre_node] = i - left
            left = i
            pre_node = node

    degrees[edge_index[1][-1]] = edge_index[1].shape[0] - left
    pbar.close()
    return degrees

def sort_the_edges(edges, degrees):
    pbar = tqdm(total=len(degrees), desc="Sort the neighbors of each node")
    pos = 0
    for i, degree in enumerate(degrees):
        if i % 100000 == 0:
            pbar.update(100000)
        
        # assert degree >= 0, f"Node {i} has negative degree"
        # assert is_assending(edges[pos:pos+degree]), f"Edges of node {i} is not in ascending order, edges={edges[pos:pos+degree]}"
        edges[pos:pos+degree] = np.sort(edges[pos:pos+degree])

        pos += degree
    pbar.close()
    return edges

def compute_offsets(degrees):
    offsets = np.zeros(len(degrees), dtype=np.uint32)
    offsets[0] = 0
    pbar = tqdm(total=len(degrees), desc="Compute offsets")
    for i in range(len(degrees) - 1):
        offsets[i+1] = offsets[i] + degrees[i]
        if i % 1000000 == 0:
            pbar.update(1000000)
    pbar.close()
    return offsets

def load_training_file(file_path):
    df = pd.read_csv(file_path, header=None, names=['id'])
    return np.array(df['id'].values, dtype=np.uint32)

In [5]:
if LOAD_FROM_RAW:
    data = np.load(EDGE_FILE)
    n_nodes = data['num_nodes_list'][0]
    edge_index = data['edge_index']
    print('n_nodes', n_nodes, 'edge_index.shape', edge_index.shape)
    degrees = compute_degrees(edge_index, n_nodes)
    edges = np.array(edge_index[0], dtype=np.uint32)
    # edges = sort_the_edges(edges, degrees)
    offsets = compute_offsets(degrees)
    train = load_training_file(TRAINING_FILE)

n_nodes 111059956 edge_index.shape (2, 1615685872)


Computing degrees:   0%|          | 0/1615685872 [00:00<?, ?it/s]

Compute offsets:   0%|          | 0/111059956 [00:00<?, ?it/s]

In [6]:
def is_assending(arr):
    return np.all(np.diff(arr) >= 0)

### Verification of raw data that should satistfy some assumtion

In [7]:
VERIFY_RAW = False

In [8]:
if VERIFY_RAW:
    pre_node = -1
    # pbar = tqdm(total=edge_index[1].shape[0],  desc="Verifying destination nodes are in ascending order")
    # for i, node in enumerate(edge_index[1]):
    #     if i % 10000000 == 0:
    #         pbar.update(10000000)
    #     if node != pre_node:
    #         assert node > pre_node, f"Node ID {node} is not in ascending order"
    #         pre_node = node
    print(is_assending(edge_index[1]))
    # pbar.close()

    pbar = tqdm(total=len(degrees), desc="Verifying source nodes are in ascending order")
    pos = 0
    for i, degree in enumerate(degrees):
        if i % 100000 == 0:
            pbar.update(100000)
        
        assert degree >= 0, f"Node {i} has negative degree"
        assert is_assending(edges[pos:pos+degree]), f"Edges of node {i} is not in ascending order, edges={edges[pos:pos+degree]}"

        pos += degree

### Write the preprocessed data

In [9]:
ENABLE_WRITE = True

In [10]:
def analysis_chunks(degrees, chunk_size=CHUNK_SIZE_BYTE):
    max_ints = chunk_size // 4

    chunks = []
    start = 0
    current_chunk_size = 3 # total, start_id, first offsets
    pbar = tqdm(total=len(degrees), desc="Analyzing chunks")
    for i in range(len(degrees)):
        if i % 100000 == 0:
            pbar.update(100000)
        if current_chunk_size + degrees[i] + 1 > max_ints:
            chunks.append((start, i-1))
            start = i
            current_chunk_size = 3
        current_chunk_size += degrees[i] + 1
    chunks.append((start, len(degrees)-1))
    pbar.close()
    return chunks

def print_chunk_details(chunks, degrees):
    print("Chunk Details")
    print("="*12*5)
    print(f"{'start':>12}{'end':>12}{'n_nodes':>12}{'size(bytes)':>12}{'fill up':>12}")
    for chunk in chunks:
        start = chunk[0]
        end = chunk[1]
        n_nodes = sum(degrees[start:end+1])
        size = (1 + 1 + (end - start + 2) + n_nodes ) * 4
        print(f"{start:12d}{end:12d}{n_nodes:12d}{size:12d}{size/CHUNK_SIZE_BYTE*100:9.2f}%")

    print("len(chunks)", len(chunks))
    print("chunks", chunks)


def create_padded_file(filepath, chunks, degrees, edges):
    pbar = tqdm(total=len(chunks), desc="Creating padded file")
    with open(filepath, "wb") as file:
        edges_cnt = 0
        for t, chunk in enumerate(chunks):
            # write buffer
            byte_data_list = []
            # total number of nodes, little endian, unsigned int
            total_nodes = chunk[1] - chunk[0] + 1
            byte_data_list.append(struct.pack("<I", total_nodes))
            # first node id, little endian, unsigned int
            byte_data_list.append(struct.pack("<I", chunk[0]))
            # offsets, little endian, unsigned int
            cnt = total_nodes + 2 + 1
            byte_data_list.append(struct.pack("<I", cnt))
            for i in range(chunk[0], chunk[1]+1):
                cnt += degrees[i]
                byte_data_list.append(struct.pack("<I", cnt))
            # edges, little endian, unsigned int
            for i in range(chunk[0], chunk[1]+1):
                for j in range(edges_cnt, edges_cnt + degrees[i]):
                    byte_data_list.append(struct.pack("<I", edges[j]))
                edges_cnt += degrees[i]
            # padding zeros, unsigned int
            # For the last chunk, padding to make the chunk size a multiple of 512
            byte_data = b''.join(byte_data_list)
            file.write(byte_data)
            if t != len(chunks) - 1:
                bytes_to_add = CHUNK_SIZE_BYTE - len(byte_data)
            else:
                bytes_to_add = 512 - len(byte_data) % 512
            file.write(b'\x00' * bytes_to_add)
            pbar.update(1)
            print(t, len(byte_data), bytes_to_add, len(byte_data) + bytes_to_add)
            
    pbar.close()

def create_chunk_file(filepath, chunks):
    with open(filepath, "w") as fh:
        # fh.write(f"{len(chunks)}\n")
        for chunk in chunks:
            # fh.write(f"{chunk[0]} {chunk[1]}\n")
            fh.write(f"{chunk[1]}\n")

def create_training_file(filepath, train):
    train.tofile(filepath)

In [11]:
def create_seq_padded_file(filepath, chunks, degrees, edges):
    pbar = tqdm(total=len(chunks), desc="Creating padded file")
    with open(filepath, "wb") as file:
        edges_cnt = 0
        for t, chunk in enumerate(chunks):
            # write buffer
            byte_data_list = []
            # total number of nodes, little endian, unsigned int
            total_nodes = chunk[1] - chunk[0] + 1
            byte_data_list.append(struct.pack("<I", total_nodes))
            # first node id, little endian, unsigned int
            byte_data_list.append(struct.pack("<I", chunk[0]))
            # offsets, little endian, unsigned int
            for i in range(chunk[0], chunk[1]+1):
                byte_data_list.append(struct.pack("<I", degrees[i]))
                for j in range(edges_cnt, edges_cnt + degrees[i]):
                    byte_data_list.append(struct.pack("<I", edges[j]))
                edges_cnt += degrees[i]
            
            byte_data = b''.join(byte_data_list)
            file.write(byte_data)
            if t != len(chunks) - 1:
                bytes_to_add = CHUNK_SIZE_BYTE - len(byte_data)
            else:
                bytes_to_add = 512 - len(byte_data) % 512
            file.write(b'\x00' * bytes_to_add)
            pbar.update(1)
            print(t, len(byte_data), bytes_to_add, len(byte_data) + bytes_to_add)
            
    pbar.close()

In [12]:
# seq kernel

if ENABLE_WRITE:
    chunks = analysis_chunks(degrees)
    print_chunk_details(chunks, degrees)
    create_seq_padded_file(SEQUENTIAL_READ_EDGE_FILE, chunks, degrees, edges)
    create_chunk_file(SEQUENTIAL_READ_CHUNK_INFO_FILE, chunks)

Analyzing chunks:   0%|          | 0/111059956 [00:00<?, ?it/s]

Chunk Details
       start         end     n_nodes size(bytes)     fill up
           0    10521746   123695935   536870740   100.00%
    10521747    16564523   128167549   536841316    99.99%
    16564524    21486461   129295773   536870856   100.00%
    21486462    26399870   129304299   536870844   100.00%
    26399871    31310495   129306910   536870152   100.00%
    31310496    36218482   129309715   536870820   100.00%
    36218483    41128311   129307864   536870784   100.00%
    41128312    45803992   129540167   536863404   100.00%
    45803993    49503593   130518124   536870912   100.00%
    49503594    53191139   130530115   536870656   100.00%
    53191140    63352413   124056435   536870848   100.00%
    63352414    80917735   116652065   536869560   100.00%
    80917736   111059955    86000921   464572576    86.53%
len(chunks) 13
chunks [(0, 10521746), (10521747, 16564523), (16564524, 21486461), (21486462, 26399870), (26399871, 31310495), (31310496, 36218482), (36218483,

Creating padded file:   0%|          | 0/13 [00:00<?, ?it/s]

0 536870736 176 536870912
1 536841312 29600 536870912
2 536870852 60 536870912
3 536870840 72 536870912
4 536870148 764 536870912
5 536870816 96 536870912
6 536870780 132 536870912
7 536863400 7512 536870912
8 536870908 4 536870912
9 536870652 260 536870912
10 536870844 68 536870912
11 536869556 1356 536870912
12 464572572 356 464572928


In [11]:
if ENABLE_WRITE:
    chunks = analysis_chunks(degrees)
    print_chunk_details(chunks, degrees)
    create_padded_file(STREAMING_EDGE_FILE, chunks, degrees, edges)
    create_chunk_file(CHUNK_INFO_FILE, chunks)
    create_training_file(TRAINING_BIN_FILE, train)

Analyzing chunks:   0%|          | 0/111059956 [00:00<?, ?it/s]

Chunk Details
       start         end     n_nodes size(bytes)     fill up
           0    10521746   123695935   536870740   100.00%
    10521747    16564523   128167549   536841316    99.99%
    16564524    21486461   129295773   536870856   100.00%
    21486462    26399870   129304299   536870844   100.00%
    26399871    31310495   129306910   536870152   100.00%
    31310496    36218482   129309715   536870820   100.00%
    36218483    41128311   129307864   536870784   100.00%
    41128312    45803992   129540167   536863404   100.00%
    45803993    49503593   130518124   536870912   100.00%
    49503594    53191139   130530115   536870656   100.00%
    53191140    63352413   124056435   536870848   100.00%
    63352414    80917735   116652065   536869560   100.00%
    80917736   111059955    86000921   464572576    86.53%
len(chunks) 13
chunks [(0, 10521746), (10521747, 16564523), (16564524, 21486461), (21486462, 26399870), (26399871, 31310495), (31310496, 36218482), (36218483,

Creating padded file:   0%|          | 0/13 [00:00<?, ?it/s]

0 536870740 172 536870912
1 536841316 29596 536870912
2 536870856 56 536870912
3 536870844 68 536870912
4 536870152 760 536870912
5 536870820 92 536870912
6 536870784 128 536870912
7 536863404 7508 536870912
8 536870912 0 536870912
9 536870656 256 536870912
10 536870848 64 536870912
11 536869560 1352 536870912
12 464572576 352 464572928


In [12]:
CREATE_RANDOM_READ_SAMPLER_FILE = True

In [13]:
def pad_file_to_512_multiple(edges, output_file_path):
    # write the whole edges as binary file
    print(edges.dtype.byteorder)
    assert edges.dtype.byteorder != '>', "Numpy array shoule not be big endian"
    edges.tofile(output_file_path)
    padding_needed = 512 - (len(edges) * 4) % 512

    if padding_needed > 0:
         with open(output_file_path, 'ab') as file:
            file.write(b'\x00' * padding_needed)

    print(f"File {output_file_path} has been padded with {padding_needed} bytes.")
    

def create_offsets_file(offsets, output_file_path):
    offsets.tofile(output_file_path)

In [14]:
if CREATE_RANDOM_READ_SAMPLER_FILE:
    pad_file_to_512_multiple(edges, RANDOM_READ_EDGE_FILE)
    create_offsets_file(offsets, OFFSETS_FILE)

=
File /mnt/nvme2/data/papers100M/preprocessed2/random_read_edges.bin has been padded with 64 bytes.


## Preprocessing for random read sampler

In [15]:
def pad_file_to_512_multiple(file_path):
    # Open the file in binary mode
    with open(file_path, 'ab') as file:
        # Determine the current file size
        file.seek(0, 2)  # Move to the end of the file
        file_size = file.tell()
        
        # Calculate the padding needed
        padding_needed = (512 - (file_size % 512)) + 512
        
        # Append zeros to pad the file to a multiple of 512 bytes
        if padding_needed > 0:
            file.write(b'\x00' * padding_needed)
            
        print(f"File {file_path} has been padded with {padding_needed} bytes.")

# Example usage
file_path = '/mnt/nvme2/data/yahoo/preprocessed/streaming_edges.bin'
pad_file_to_512_multiple(file_path)

File /mnt/nvme2/data/yahoo/preprocessed/streaming_edges.bin has been padded with 1024 bytes.


### verification

In [16]:
# streaming_edges = np.fromfile( '/mnt/nvme2/data/papers100M/preprocessed/edges_padded.bin', dtype=np.uint32)
streaming_edges = np.fromfile( STREAMING_EDGE_FILE, dtype=np.uint32)

In [18]:
for i in range(len(streaming_edges)):
    if streaming_edges[i] == 134217728:
        print(i, streaming_edges[i])
        break

1077441427 134217728


In [17]:
pbar = tqdm(total=13)
for t in range(13):
    start_pos = t * CHUNK_SIZE_BYTE // 4
    cur_total = streaming_edges[start_pos + 0]
    start_node = streaming_edges[start_pos + 1]
    cur_offsets = streaming_edges[start_pos + 2: start_pos + 2 + cur_total + 1]
    print(t, cur_total, start_node, max(streaming_edges[start_pos + 2 + cur_total + 1:]))
    is_assending = np.all(cur_offsets[1:] >= cur_offsets[:-1])
    assert is_assending, f"Chunk {t} is not in ascending order"
    pbar.update(1)
pbar.close()

  0%|          | 0/13 [00:00<?, ?it/s]

0 10521747 0 134217728
1 6042777 10521747 134217728
2 4921938 16564524 134217728
3 4913409 21486462 134217728
4 4910625 26399871 134217728
5 4907987 31310496 134217728
6 4909829 36218483 134217728
7 4675681 41128312 134217728


KeyboardInterrupt: 

In [None]:
max(edges)

111059955