In [None]:
!pip install torch==2.3.0 --index-url https://download.pytorch.org/whl/cpu
!pip install dgl==2.3.0 -f https://data.dgl.ai/wheels/torch-2.3/repo.html

In [None]:
!pip install graphstorm

In [3]:
import os
os.environ["DGL_SKIP_GRAPHBOLT"] = "1"
import dgl

DGL backend not selected or invalid.  Assuming PyTorch for now.


Setting the default backend to "pytorch". You can change it in the ~/.dgl/config.json file or export the DGLBACKEND environment variable.  Valid options are: pytorch, mxnet, tensorflow (all lowercase)


################################################################################
The 'datapipes', 'dataloader2' modules are deprecated and will be removed in a
future torchdata release! Please see https://github.com/pytorch/data/issues/1196
to learn more and leave feedback.
################################################################################

  from .autonotebook import tqdm as notebook_tqdm
  from pkg_resources import parse_version


In [4]:
import graphstorm
print("GraphStorm:", graphstorm.__version__)

GraphStorm: 0.5.0post1


In [5]:
import boto3
import json
import pandas as pd
import gzip
from io import BytesIO
from collections import defaultdict

In [None]:
# Initialize S3 client
s3 = boto3.client('s3')
bucket_name = 'crowd-raw'
prefix = 'raw/year=2025/month=11/day=30/hour=09'  # S3 folder containing JSONs

# Configuration
RSSI_THRESHOLD = -40
MIN_STRONG_CONNECTIONS = 3

def list_gz_files(bucket, prefix):
    files = []
    paginator = s3.get_paginator('list_objects_v2')
    
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        if 'Contents' in page:
            for obj in page['Contents']:
                key = obj['Key']
                if key.endswith('.gz'):
                    files.append(key)
    
    return files

print("Listing files from S3...")
gz_files = list_gz_files(bucket_name, prefix)
print(f"Found {len(gz_files)} .gz files")

# Initialize data structures
node_connections = defaultdict(list)
nodes = set()
edges = []

# Process each file
for idx, file_key in enumerate(gz_files, 1):
    print(f"Processing {idx}/{len(gz_files)}: {file_key}")
    
    # Download and decompress
    obj = s3.get_object(Bucket=bucket_name, Key=file_key)
    
    # Decompress gzip content
    with gzip.GzipFile(fileobj=BytesIO(obj['Body'].read())) as gzipfile:
        content = gzipfile.read().decode('utf-8')
    
    # Process each line
    for line in content.strip().split('\n'):
        if not line:
            continue
        try:
            data = json.loads(line)
            anchor_id = data['anchor_id']
            timestamp = data['timestamp']
            nodes.add(anchor_id)
            
            for peer in data['peers']:
                peer_id = peer['id']
                rssi = peer['rssi']
                nodes.add(peer_id)
                
                edges.append({
                    'source': anchor_id,
                    'destination': peer_id,
                    'rssi': rssi,
                    'timestamp': timestamp
                })
                
                if rssi >= RSSI_THRESHOLD:
                    node_connections[anchor_id].append(peer_id)
                    node_connections[peer_id].append(anchor_id)
        except Exception as e:
            continue

# Create labels
node_strong_counts = {node: len(set(node_connections[node])) for node in nodes}

nodes_df = pd.DataFrame({
    'node_id': list(nodes),
    'node_type': ['anchor'] * len(nodes),
    'strong_connection_count': [node_strong_counts.get(node, 0) for node in nodes],
    'label': [1 if node_strong_counts.get(node, 0) >= MIN_STRONG_CONNECTIONS else 0 
              for node in nodes]
})

edges_df = pd.DataFrame(edges)

print(f"\nResults:")
print(f"Nodes: {len(nodes_df)}")
print(f"Edges: {len(edges_df)}")
print(f"Label distribution:\n{nodes_df['label'].value_counts()}")

# Save
nodes_df.to_csv('nodes_with_labels.csv', index=False)
edges_df.to_csv('edges.csv', index=False)

# Upload to S3
s3.upload_file('nodes_with_labels.csv', bucket_name, 'processed/nodes_with_labels.csv')
s3.upload_file('edges.csv', bucket_name, 'processed/edges.csv')

print("\nDone! Uploaded to s3://crowd-raw/processed/")


In [21]:
import pandas as pd

# Rename columns to GraphStorm format
nodes_df = pd.read_csv('nodes_with_labels.csv')
nodes_df = nodes_df.rename(columns={'node_id': '~id'})
nodes_df.to_csv('nodes_formatted.csv', index=False)
print("✓ Created nodes_formatted.csv")

edges_df = pd.read_csv('edges.csv')
edges_df = edges_df.rename(columns={'source': '~from', 'destination': '~to'})
edges_df.to_csv('edges_formatted.csv', index=False)
print("✓ Created edges_formatted.csv")

✓ Created nodes_formatted.csv
✓ Created edges_formatted.csv


In [None]:
import pandas as pd
import numpy as np

# Load data
nodes_df = pd.read_csv('nodes_formatted.csv')
edges_df = pd.read_csv('edges_formatted.csv')

# Calculate additional features per anchor
anchor_features = []

for anchor_id in nodes_df['node_id']:
    # Edges involving this anchor
    outgoing = edges_df[edges_df['~from'] == anchor_id]
    incoming = edges_df[edges_df['~to'] == anchor_id]
    all_edges = pd.concat([outgoing, incoming])
    
    # Feature 1: Strong connection count
    strong_connections = len(all_edges[all_edges['rssi'] > -40])  # Adjust threshold
    
    # Feature 2: Average RSSI (stronger signal = closer proximity)
    avg_rssi = all_edges['rssi'].mean() if len(all_edges) > 0 else -100
    
    # Feature 3: Max RSSI (closest device)
    max_rssi = all_edges['rssi'].max() if len(all_edges) > 0 else -100
    
    # Feature 4: RSSI variance (density uniformity)
    rssi_std = all_edges['rssi'].std() if len(all_edges) > 1 else 0
    
    # Feature 5: Total connection count
    total_connections = len(all_edges)
    
    # Feature 6: Recent connections (time-based)
    if len(all_edges) > 0:
        max_time = all_edges['timestamp'].max()
        recent = all_edges[all_edges['timestamp'] > (max_time - 5000)]  # Last 5 seconds
        recent_count = len(recent)
    else:
        recent_count = 0
    
    anchor_features.append({
        'node_id': anchor_id,
        'strong_connection_count': strong_connections,
        'avg_rssi': avg_rssi,
        'max_rssi': max_rssi,
        'rssi_std': rssi_std,
        'total_connections': total_connections,
        'recent_connections': recent_count
    })

# Create enhanced feature dataframe
enhanced_nodes = pd.DataFrame(anchor_features)

# Merge with original labels
enhanced_nodes = enhanced_nodes.merge(
    nodes_df[['node_id', 'node_type', 'label']], 
    on='node_id'
)

# Normalize features (important for neural networks)
from sklearn.preprocessing import StandardScaler

feature_cols = ['strong_connection_count', 'avg_rssi', 'max_rssi', 
                'rssi_std', 'total_connections', 'recent_connections']

scaler = StandardScaler()
enhanced_nodes[feature_cols] = scaler.fit_transform(enhanced_nodes[feature_cols])

# Save
enhanced_nodes.to_csv('nodes_enhanced.csv', index=False)
print(f"✓ Created enhanced features for {len(enhanced_nodes)} nodes")
print("\nFeature summary:")
print(enhanced_nodes[feature_cols].describe())

✓ Created enhanced features for 5 nodes

Feature summary:
       strong_connection_count      avg_rssi      max_rssi      rssi_std  \
count             5.000000e+00  5.000000e+00  5.000000e+00  5.000000e+00   
mean              8.881784e-17 -1.776357e-16 -3.108624e-16 -1.332268e-16   
std               1.118034e+00  1.118034e+00  1.118034e+00  1.118034e+00   
min              -1.307971e+00 -1.388506e+00 -1.568929e+00 -1.584021e+00   
25%              -9.622871e-01 -1.037579e+00 -5.883484e-01 -5.180613e-01   
50%               1.350578e-01  7.337971e-01  6.537205e-02  1.249152e-02   
75%               8.867190e-01  7.341151e-01  1.045953e+00  9.049025e-01   
max               1.248481e+00  9.581734e-01  1.045953e+00  1.184688e+00   

       total_connections  recent_connections  
count       5.000000e+00        5.000000e+00  
mean        8.881784e-17        4.440892e-17  
std         1.118034e+00        1.118034e+00  
min        -9.259579e-01       -1.012739e+00  
25%        -8.143967e-

In [46]:
print(pd.read_csv("nodes_formatted.csv"))

  node_id node_type  strong_connection_count  label
0     WB5    anchor                        2      0
1     WB1    anchor                        3      1
2     WB4    anchor                        1      0
3     WB3    anchor                        0      0
4     WB2    anchor                        2      0


In [86]:
# construct graph
!python3 -m graphstorm.gconstruct.construct_graph \
     --conf-file config.json \
     --output-dir gs_output \
     --graph-name my_graph \
     --num-parts 1 


/bin/bash: switchml: line 1: syntax error: unexpected end of file
/bin/bash: error importing function definition for `switchml'
/bin/bash: module: line 1: syntax error: unexpected end of file
/bin/bash: error importing function definition for `module'
  from pkg_resources import parse_version
INFO:root:Parsing config file as GConstruct config
INFO:root:The graph has 1 node types and 1 edge types.
INFO:root:Node type _N has 5 nodes
INFO:root:Edge type ('_N', '_E', '_N') has 1377 edges
INFO:root:Node type _N has features: ['strong_connection_count', 'avg_rssi', 'max_rssi', 'rssi_std', 'total_connections', 'recent_connections', 'train_mask', 'val_mask', 'test_mask', 'label'].
INFO:root:Train/val/test on _N with mask train_mask, val_mask, test_mask: 3, 1, 1
INFO:root:Note: Custom train, validate, test mask information for nodes are not collected.
INFO:root:Edge type ('_N', '_E', '_N') has features: ['rssi', 'timestamp'].
Converting to homogeneous graph takes 0.001s, peak mem: 1.475 GB
Save

In [87]:
# train graph
!python3 -m graphstorm.run.gs_node_classification \
    --part-config gs_output/my_graph.json \
    --num-trainers 1 \
    --num-servers 1 \
    --cf ./train_config.yaml \
    --save-model-path ./models \
    --node-feat-name _N:strong_connection_count _N:avg_rssi _N:max_rssi _N:rssi_std _N:total_connections _N:recent_connections \
    --edge-feat-name "_N,_E,_N:rssi,timestamp"

# !python3 -m graphstorm.run.gs_node_classification \
#      --part-config gs_output/my_graph.json \
#      --num-trainers 1 \
#      --num-servers 1 \
#      --cf ./train_config.yaml \
#      --node-feat-name anchor:strong_connection_count

/bin/bash: switchml: line 1: syntax error: unexpected end of file
/bin/bash: error importing function definition for `switchml'
/bin/bash: module: line 1: syntax error: unexpected end of file
/bin/bash: error importing function definition for `module'
  from pkg_resources import parse_version
/bin/sh: switchml: line 1: syntax error: unexpected end of file
/bin/sh: error importing function definition for `switchml'
/bin/sh: module: line 1: syntax error: unexpected end of file
/bin/sh: error importing function definition for `module'
/bin/sh: switchml: line 1: syntax error: unexpected end of file
/bin/sh: error importing function definition for `switchml'
/bin/sh: module: line 1: syntax error: unexpected end of file
/bin/sh: error importing function definition for `module'
and will be removed in future. Use torchrun.
Note that --use-env is set by default in torchrun.
If your script expects `--local-rank` argument to be set, please
change it to read from `os.environ['LOCAL_RANK']` instead

In [77]:
!python3 -m graphstorm.run.gs_node_classification \
  --inference \
  --part-config gs_output/my_graph.json \
  --cf ./models/GRAPHSTORM_RUNTIME_UPDATED_TRAINING_CONFIG.yaml \
  --restore-model-path ./models/epoch-49 \
  --save-prediction-path ./predictions \
  --node-feat-name _N:strong_connection_count _N:avg_rssi _N:max_rssi _N:rssi_std _N:total_connections _N:recent_connections \
  --edge-feat-name _N,_E,_N:rssi,timestamp \
  --num-trainers 1

/bin/bash: switchml: line 1: syntax error: unexpected end of file
/bin/bash: error importing function definition for `switchml'
/bin/bash: module: line 1: syntax error: unexpected end of file
/bin/bash: error importing function definition for `module'
  from pkg_resources import parse_version
/bin/sh: switchml: line 1: syntax error: unexpected end of file
/bin/sh: error importing function definition for `switchml'
/bin/sh: module: line 1: syntax error: unexpected end of file
/bin/sh: error importing function definition for `module'
/bin/sh: switchml: line 1: syntax error: unexpected end of file
/bin/sh: error importing function definition for `switchml'
/bin/sh: module: line 1: syntax error: unexpected end of file
/bin/sh: error importing function definition for `module'
and will be removed in future. Use torchrun.
Note that --use-env is set by default in torchrun.
If your script expects `--local-rank` argument to be set, please
change it to read from `os.environ['LOCAL_RANK']` instead

In [18]:
import json

# Load the partition config to see what node types exist
with open('gs_output/my_graph.json', 'r') as f:
    config = json.load(f)
    
print("Node types in your graph:")
print(config.get('ntypes', []))
print("\nEdge types in your graph:")
print(config.get('etypes', []))

Node types in your graph:
{'_N': 0}

Edge types in your graph:
{'_N:_E:_N': 0}


In [98]:
!python graphstorm/sagemaker/launch/launch_realtime_endpoint.py \
  --image-uri 274744451761.dkr.ecr.ap-southeast-1.amazonaws.com/graphstorm-inference \
  --role arn:aws:iam::274744451761:role/service-role/AmazonSageMaker-ExecutionRole-20251130T233961 \
  --region ap-southeast-1 \
  --restore-model-path ./models/epoch-49 \
  --model-yaml-config-file train_config.yaml \
  --graph-json-config-file config.json \
  --infer-task-type node_classification \
  --upload-tarfile-s3 s3://crowd-raw/graphstorm_models/ \
  --model-name my-graphstorm-model

/bin/bash: switchml: line 1: syntax error: unexpected end of file
/bin/bash: error importing function definition for `switchml'
/bin/bash: module: line 1: syntax error: unexpected end of file
/bin/bash: error importing function definition for `module'
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
Waiting for endpoint 'my-graphstorm-model-Endpoint-2025-12-01-06-06-00' to be in service in ap-southeast-1 region...
Endpoint named 'my-graphstorm-model-Endpoint-2025-12-01-06-06-00' has been successfully created, and ready to be invoked!


In [102]:
import boto3, json

runtime = boto3.client("sagemaker-runtime", region_name="ap-southeast-1")
endpoint_name = "my-graphstorm-model-Endpoint-2025-12-01-06-06-00"

payload = {
  "version": "gs-realtime-v0.1",
  "gml_task": "node_classification",
  "graph": {
    "nodes": [
      {
        "node_type": "anchor",
        "node_id": "node_123",
        "features": {
          "strong_connection_count": 3,
          "avg_rssi": -65.2,
          "max_rssi": -55,
          "rssi_std": 5.1,
          "total_connections": 10,
          "recent_connections": 2
        }
      }
    ],
    "edges": [
      {
        "edge_type": "anchor:connects:anchor",
        "src_id": "node_123",
        "dst_id": "node_456",
        "features": {
          "rssi": -70,
          "timestamp": 1735700000
        }
      }
    ]
  },
  "targets": [
    {
      "node_type": "anchor",
      "node_id": "node_123"
    }
  ]
}
  # your graph + features JSON
resp = runtime.invoke_endpoint(EndpointName=endpoint_name,
                               ContentType="application/json",
                               Body=json.dumps(payload))
result = json.loads(resp["Body"].read())
print(result)


ReadTimeoutError: Read timeout on endpoint URL: "https://runtime.sagemaker.ap-southeast-1.amazonaws.com/endpoints/my-graphstorm-model-Endpoint-2025-12-01-06-06-00/invocations"