# Dataset read tests

# Dask setup

In [1]:
# Sizing of the local Dask cluster created further down in this notebook.
# Keep WORKERS x MEMORY_PER_WORKER <= memory available to this session.
# Use THREADS_PER_WORKER == 1 if the workload is CPU intensive.
# The dashboard port might need to change if several Dask instances run within the same lab.
WORKERS = 2
MEMORY_PER_WORKER = "2GB"
THREADS_PER_WORKER = 1
DASHBOARD_PORT = ":8787"

## Local Dask cluster setup for analysis

* Install bokeh, spawn cluster, provide access point to dashboards
* Access jupyter hub at the address - https://jupyter.olcf.ornl.gov/hub/user-redirect/proxy/8787/status
* Or access point for the Dask jupyter extension - /proxy/8787

In [2]:
# General prerequisites we want to have loaded from the get go
!pip install bokeh



In [3]:
# Cleanup: tear down a Dask client left over from a previous run of this notebook
try:
    client.shutdown()
    client.close()
except NameError:
    # Fresh kernel: no client has been created yet, so there is nothing to clean up.
    pass
except Exception as exc:
    # Best-effort cleanup: report the problem but keep the notebook running.
    print(f"Ignoring error while shutting down the previous Dask client: {exc!r}")

In [4]:
# Setup block
import os
import pwd
import glob
import pandas as pd
from distributed import LocalCluster, Client
import dask
import dask.dataframe as dd

# Directory passed to LocalCluster as `local_directory`.
# Previously used alternative: "/gpfs/alpine/stf218/scratch/shinw/.tmp/dask-interactive"
LOCALDIR = "/tmp/dask"

In [5]:
# Worker memory policy: never spill to disk (target/spill disabled), pause a worker
# at 80% of its memory limit and let the nanny terminate it at 95%.
# The settings live under `distributed.worker.memory`; a top-level `worker.memory`
# key is not read by distributed. Each value is set through its own dotted key so
# the remaining entries of the memory section keep their defaults.
dask.config.set({
    'distributed.worker.memory.target': False,
    'distributed.worker.memory.spill': False,
    'distributed.worker.memory.pause': 0.8,
    'distributed.worker.memory.terminate': 0.95,
})

<dask.config.set at 0x7feaf949b910>

In [6]:
# Cluster creation: one process per worker, sized by the constants defined at the top
cluster = LocalCluster(
    processes=True,
    n_workers=WORKERS,
    threads_per_worker=THREADS_PER_WORKER,
    dashboard_address=DASHBOARD_PORT,
    local_directory=LOCALDIR,
    memory_limit=MEMORY_PER_WORKER,
)

client = Client(cluster)

# Derive the port from DASHBOARD_PORT (e.g. ":8787" -> "8787") so the printed
# links stay correct when the dashboard port is changed.
dashboard_port = DASHBOARD_PORT.rsplit(":", 1)[-1]
print(f"Access jupyter hub at the address - https://jupyter.olcf.ornl.gov/hub/user-redirect/proxy/{dashboard_port}/status")
print(f"Dask jupyter extension - /proxy/{dashboard_port}")
client

Access jupyter hub at the address - https://jupyter.olcf.ornl.gov/hub/user-redirect/proxy/8787/status
Dask jupyter extension - /proxy/8787


distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask/dask-worker-space/worker-0_s3ij9e', purging


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 2
Total threads: 2,Total memory: 3.73 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:37905,Workers: 2
Dashboard: http://127.0.0.1:8787/status,Total threads: 2
Started: Just now,Total memory: 3.73 GiB

0,1
Comm: tcp://127.0.0.1:39941,Total threads: 1
Dashboard: http://127.0.0.1:32775/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:43435,
Local directory: /tmp/dask/dask-worker-space/worker-jous91rh,Local directory: /tmp/dask/dask-worker-space/worker-jous91rh

0,1
Comm: tcp://127.0.0.1:39015,Total threads: 1
Dashboard: http://127.0.0.1:45263/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:43649,
Local directory: /tmp/dask/dask-worker-space/worker-ew93_xqa,Local directory: /tmp/dask/dask-worker-space/worker-ew93_xqa


# Preloading tools & libraries

In [3]:
import sys
import os, glob, datetime
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sns
from dask import dataframe as dd
from dask import bag as db
# Report the interpreter and library versions this notebook was run with
for template, version in (
    ("seaborn version: {}", sns.__version__),
    ("Python version:\n{}\n", sys.version),
    ("matplotlib version: {}", matplotlib.__version__),
    ("pandas version: {}", pd.__version__),
    ("numpy version: {}", np.__version__),
):
    print(template.format(version))

seaborn version: 0.11.2
Python version:
3.8.10 | packaged by conda-forge | (default, May 11 2021, 07:01:05) 
[GCC 9.3.0]

matplotlib version: 3.4.2
pandas version: 1.3.1
numpy version: 1.19.5


# File locations

In [2]:
# Root directory of the sample datasets (relative path); every dataset below is a sub-directory of it
DATA_BASE_PATH = "../data"

# Testing dataset I/O

## Test sample dataset A

In [11]:
# Dataset A: collect its parquet files in lexicographic order.
# NOTE(review): "a_" looks like an incomplete directory name - confirm against the data folder.
DATASET_NAME = "a_"
FILES = glob.glob(f"{DATA_BASE_PATH}/{DATASET_NAME}/*.parquet")
FILES.sort()

## Test sample dataset B

In [11]:
# Dataset B: collect its parquet files in lexicographic order
DATASET_NAME = "b_snapshot_10sec_24h"
FILES = glob.glob(f"{DATA_BASE_PATH}/{DATASET_NAME}/*.parquet")
FILES.sort()

In [None]:
%%time
# Test dataset b using Dask
ddf = dd.read_parquet(
                FILES,
                index=False,
                engine="pyarrow",
                split_row_groups=True,
                gather_statistics=True
)

df = ddf[ddf['hostname'] == 'a26n16'].compute()
df.info()

In [10]:
%%time
# Test dataset b using only Pandas
df = pd.read_parquet(FILES, engine="pyarrow",)
df = df[df['hostname'] == 'a26n16']
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 558673 entries, 0 to 25512218
Data columns (total 30 columns):
 #   Column             Non-Null Count   Dtype              
---  ------             --------------   -----              
 0   timestamp          558673 non-null  datetime64[ns, UTC]
 1   hostname           558673 non-null  object             
 2   p0_gpu0_power      558564 non-null  float32            
 3   p0_gpu1_power      558561 non-null  float32            
 4   p0_gpu2_power      558563 non-null  float32            
 5   p1_gpu0_power      558561 non-null  float32            
 6   p1_gpu1_power      558561 non-null  float32            
 7   p1_gpu2_power      558561 non-null  float32            
 8   p0_power           558564 non-null  float32            
 9   p1_power           558561 non-null  float32            
 10  gpu0_core_temp     558561 non-null  float32            
 11  gpu0_mem_temp      558514 non-null  float32            
 12  gpu1_core_temp     558561 no

## Test sample dataset D

Each parquet file in dataset D is a single-partition file that needs around 2GB of memory to load.

In [11]:
# Dataset D: collect its parquet files in lexicographic order
DATASET_NAME = "d_sample_4626_24h"
FILES = glob.glob(f"{DATA_BASE_PATH}/{DATASET_NAME}/*.parquet")
FILES.sort()

In [12]:
# Test dataset D using only Pandas: load just the first file of the dataset.
# Indexing FILES[0] fails loudly when no file was found, instead of silently
# reporting on a `df` left over from an earlier cell.
df = pd.read_parquet(FILES[0], engine="pyarrow")
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6832386 entries, 0 to 6832385
Data columns (total 71 columns):
 #   Column           Dtype              
---  ------           -----              
 0   timestamp        datetime64[ns, UTC]
 1   node_state       object             
 2   hostname         object             
 3   p0_gpu0_power    float32            
 4   p0_gpu1_power    float32            
 5   p0_gpu2_power    float32            
 6   p1_gpu0_power    float32            
 7   p1_gpu1_power    float32            
 8   p1_gpu2_power    float32            
 9   p0_power         float32            
 10  p1_power         float32            
 11  gpu0_core_temp   float32            
 12  gpu0_mem_temp    float32            
 13  gpu1_core_temp   float32            
 14  gpu1_mem_temp    float32            
 15  gpu2_core_temp   float32            
 16  gpu2_mem_temp    float32            
 17  gpu3_core_temp   float32            
 18  gpu3_mem_temp    float32            
 19  

## Test sample dataset E

In [7]:
# Dataset E: directory that holds its parquet files
DATASET_NAME = "e_full_10sec_100hosts"
DATASET_PATH = "/".join([DATA_BASE_PATH, DATASET_NAME])

In [8]:
import glob
import pandas as pd

def get_host_dataframe(
    input_path = DATASET_PATH,
    hostnames = [],
    months = ["202001", "202008", "202102", "202108", "202201"],
    sort_values=["hostname", "timestamp"],
    set_index=["hostname"],
    columns=None,
):
    """Load the parquet files of the selected months into one pandas DataFrame.

    For every prefix in ``months`` the files matching
    ``{input_path}/{month}*.parquet`` are read in sorted order, optionally
    reduced to the rows of the requested hosts, and concatenated.

    Parameters
    ----------
    input_path : str
        Directory containing the parquet files.
    hostnames : list of str
        Hosts to keep. An empty list keeps the rows of every host.
    months : list of str
        File-name prefixes selecting which files are read.
    sort_values : list of str
        Columns the merged frame is sorted by. Empty or None skips sorting.
    set_index : list of str
        Columns used as the index of the result. Empty or None keeps the
        fresh integer index.
    columns : list of str, optional
        Columns to read. "hostname" and "timestamp" are added when missing.
        The list passed by the caller is not modified. None reads all columns.

    Returns
    -------
    pandas.DataFrame
        The merged, optionally sorted and indexed data.

    Raises
    ------
    ValueError
        If no parquet file matches any of the requested months.
    """
    print(f"[reading time series for {hostnames} during {months}]")
    if columns is not None:
        # Work on a copy so the caller's list is left untouched.
        columns = list(columns)
        for required in ("hostname", "timestamp"):
            if required not in columns:
                columns.append(required)

    filter_hosts = hostnames is not None and len(hostnames) > 0

    # Iterate all the files and fetch data for only the hostnames we're interested in
    df_list = []
    for month in months:
        print(f"- reading {month}")
        files = sorted(glob.glob(f"{input_path}/{month}*.parquet"))
        for file in files:
            df = pd.read_parquet(file, engine="pyarrow", columns=columns)
            if filter_hosts:
                df = df[df['hostname'].isin(hostnames)]
            df_list.append(df)

    if not df_list:
        # pd.concat([]) would raise an opaque "No objects to concatenate".
        raise ValueError(
            f"no parquet files found in {input_path!r} for months {months}"
        )

    print("- merging dataframe")
    df = pd.concat(df_list).reset_index(drop=True)

    print(f"- sorting based on {sort_values}")
    if sort_values:
        df = df.sort_values(sort_values)
    if set_index:
        df = df.set_index(set_index)
    print("- read success")
    return df

In [9]:
# Load the power-supply input readings of two hosts across all default months
df = get_host_dataframe(
    hostnames=['f04n08', 'e34n12'],
    columns=["timestamp", "hostname", "ps0_input_power", "ps1_input_power"],
)

[reading time series for ['f04n08', 'e34n12'] during ['202001', '202008', '202102', '202108', '202201']]
- reading 202001
- reading 202008
- reading 202102
- reading 202108
- reading 202201
- merging dataframe
- sorting based on ['hostname', 'timestamp']
- read success


In [10]:
# Summarize the index, columns, dtypes and memory usage of the frame loaded above
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 2495783 entries, e34n12 to f04n08
Data columns (total 3 columns):
 #   Column           Dtype              
---  ------           -----              
 0   timestamp        datetime64[ns, UTC]
 1   ps0_input_power  float32            
 2   ps1_input_power  float32            
dtypes: datetime64[ns, UTC](1), float32(2)
memory usage: 57.1+ MB
