125 changes: 82 additions & 43 deletions packages/hub/scripts/debug-xet.ts
@@ -10,17 +10,22 @@ import { existsSync } from "node:fs";

/**
* This script debugs xet uploads by capturing all network data locally
* It takes a local file, repo, and token, then uploads while saving:
* It takes one or more local files, repo, and token, then uploads while saving:
* - Dedup shards as dedup_[chunk_hash]_shard.bin
* - Uploaded xorbs as uploaded_xorb_1.bin, uploaded_xorb_2.bin, etc.
* - Uploaded shards as uploaded_shard_1.bin, uploaded_shard_2.bin, etc.
*
* Normal mode: Captures all upload data to upload_[filename]/ directory
* Normal mode: Captures all upload data to upload_[filename]/ directory (single file) or multiple-files/ directory (multiple files)
* Replay mode: Validates upload data matches previously captured local files
*
* Usage:
* Single file:
* pnpm --filter hub debug-xet -f <local_file> -t <write_token> -r <xet_repo>
* pnpm --filter hub debug-xet -f <local_file> -t <write_token> -r <xet_repo> --replay
*
* Multiple files (comma-separated):
* pnpm --filter hub debug-xet -f <file1,file2,file3> -t <write_token> -r <xet_repo>
* pnpm --filter hub debug-xet -f <file1,file2,file3> -t <write_token> -r <xet_repo> --replay
*/

interface DebugFetchStats {
@@ -182,32 +187,34 @@ function createDebugFetch(args: { debugDir: string; replay?: boolean }): {
};
}

async function* createFileSource(filepath: string): AsyncGenerator<{
async function* createMultiFileSource(filepaths: string[]): AsyncGenerator<{
content: Blob;
path: string;
sha256: string;
}> {
const filename = basename(filepath);
console.log(`Processing ${filename}...`);

const blob: Blob = await FileBlob.create(filepath);

// Calculate sha256
console.log(`Calculating SHA256 for ${filename}...`);
const sha256Iterator = sha256(blob, { useWebWorker: false });
let res: IteratorResult<number, string>;
do {
res = await sha256Iterator.next();
} while (!res.done);
const sha256Hash = res.value;

console.log(`SHA256 for ${filename}: ${sha256Hash}`);

yield {
content: blob,
path: filename,
sha256: sha256Hash,
};
for (const filepath of filepaths) {
const filename = basename(filepath);
console.log(`Processing ${filename}...`);

const blob: Blob = await FileBlob.create(filepath);

// Calculate sha256
console.log(`Calculating SHA256 for ${filename}...`);
const sha256Iterator = sha256(blob, { useWebWorker: false });
let res: IteratorResult<number, string>;
do {
res = await sha256Iterator.next();
} while (!res.done);
const sha256Hash = res.value;

console.log(`SHA256 for ${filename}: ${sha256Hash}`);

yield {
content: blob,
path: filename,
sha256: sha256Hash,
};
}
}

async function main() {
@@ -233,20 +240,27 @@ async function main() {
});

if (!args.token || !args.repo || !args.file) {
console.error("Usage: pnpm --filter hub debug-xet -f <local_file> -t <write_token> -r <xet_repo>");
console.error("Usage: pnpm --filter hub debug-xet -f <file1,file2,file3> -t <write_token> -r <xet_repo>");
console.error("Example: pnpm --filter hub debug-xet -f ./model.bin -t hf_... -r myuser/myrepo");
console.error("Example: pnpm --filter hub debug-xet -f ./model1.bin,./model2.bin -t hf_... -r myuser/myrepo");
console.error("Options:");
console.error(" --replay Use local dedup info instead of remote");
process.exit(1);
}

if (!existsSync(args.file)) {
console.error(`❌ File ${args.file} does not exist`);
process.exit(1);
// Parse comma-separated file paths
const filePaths = args.file.split(",").map((f) => f.trim());

// Validate all files exist
for (const filePath of filePaths) {
if (!existsSync(filePath)) {
console.error(`❌ File ${filePath} does not exist`);
process.exit(1);
}
}

const filename = basename(args.file);
const debugDir = `upload_${filename}`;
// Determine debug directory name
const debugDir = filePaths.length > 1 ? "multiple-files" : `upload_${basename(filePaths[0])}`;

// Handle debug directory based on mode
if (args.replay) {
@@ -288,20 +302,30 @@ async function main() {
rev: "main",
};

console.log(`\n=== Starting debug upload for ${filename} ===`);
console.log(
`\n=== Starting debug upload for ${filePaths.length > 1 ? `${filePaths.length} files` : basename(filePaths[0])} ===`
);
if (args.replay) {
console.log("🔄 Replay mode: Using local dedup info when available");
}

// Get file stats
const fileStats = await stat(args.file);
console.log(`📄 File size: ${(fileStats.size / 1024 / 1024).toFixed(2)} MB`);
// Get total file stats
let totalSize = 0;
for (const filePath of filePaths) {
const fileStats = await stat(filePath);
totalSize += fileStats.size;
console.log(`📄 ${basename(filePath)}: ${(fileStats.size / 1_000_000).toFixed(2)} MB`);
}
console.log(`📊 Total size: ${(totalSize / 1_000_000).toFixed(2)} MB`);

// Process file through uploadShards
const fileSource = createFileSource(args.file);
// Process files through uploadShards
const fileSource = createMultiFileSource(filePaths);

let dedupRatio = 0;
let fileSha256 = "";
const processedFiles: Array<{
path: string;
sha256: string;
dedupRatio: number;
}> = [];

for await (const event of uploadShards(fileSource, uploadParams)) {
switch (event.event) {
@@ -310,8 +334,11 @@
console.log(` SHA256: ${event.sha256}`);
console.log(` Dedup ratio: ${(event.dedupRatio * 100).toFixed(2)}%`);

dedupRatio = event.dedupRatio;
fileSha256 = event.sha256;
processedFiles.push({
path: event.path,
sha256: event.sha256,
dedupRatio: event.dedupRatio,
});
break;
}

@@ -327,9 +354,21 @@

console.log("\n=== DEBUG UPLOAD RESULTS ===");
console.log(`📁 Debug directory: ${debugDir}`);
console.log(`📄 Original file: ${filename} (${(fileStats.size / 1024 / 1024).toFixed(2)} MB)`);
console.log(`🔒 SHA256: ${fileSha256}`);
console.log(`📊 Deduplication: ${(dedupRatio * 100).toFixed(2)}%`);
console.log(`📄 Processed files: ${processedFiles.length}`);
console.log(`📊 Total size: ${(totalSize / 1024 / 1024).toFixed(2)} MB`);

// Show details for each file
for (const file of processedFiles) {
console.log(`\n🔒 ${file.path}:`);
console.log(` SHA256: ${file.sha256}`);
console.log(` Deduplication: ${(file.dedupRatio * 100).toFixed(2)}%`);
}

// Calculate average dedup ratio
const avgDedupRatio =
processedFiles.length > 0 ? processedFiles.reduce((sum, f) => sum + f.dedupRatio, 0) / processedFiles.length : 0;

console.log(`\n📊 Average deduplication: ${(avgDedupRatio * 100).toFixed(2)}%`);
console.log(`📤 Network calls:`);
console.log(` - ${stats.xorbCount} xorb uploads`);
console.log(` - ${stats.shardCount} shard uploads`);
8 changes: 8 additions & 0 deletions packages/hub/src/utils/ChunkCache.ts
@@ -85,6 +85,14 @@ export class ChunkCache {
};
}

updateChunkIndex(hash: string, chunkIndex: number): void {
const index = this.map.get(hash);
if (index === undefined) {
throw new Error(`Chunk not found in cache: ${hash}`);
}
this.chunkIndices[index] = chunkIndex;
}

removeChunkFromCache(hash: string): void {
this.map.delete(hash);
}
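The new updateChunkIndex method lets the backtracking pass re-point a chunk that is already cached at its new position once earlier chunks have been dropped from the in-progress xorb, instead of removing and re-adding it. Below is a minimal sketch of the intended call pattern, reusing the existing addChunkToCache/getChunk API; the hash, xorb id, indexes and hmac key are purely illustrative, and the import path assumes the snippet sits next to ChunkCache.ts:

import { ChunkCache } from "./ChunkCache";

const cache = new ChunkCache();
// Illustrative: a chunk of the current xorb (id 0), cached at position 2.
cache.addChunkToCache("chunk-hash", 0, 2, "hmac-key");

// After backtracking erases the chunks before it, the surviving chunk moves
// to position 0, so the existing cache entry is re-pointed in place.
cache.updateChunkIndex("chunk-hash", 0);
console.log(cache.getChunk("chunk-hash", null)); // -> { xorbIndex: 0, chunkIndex: 0 }

// Unknown hashes are rejected instead of being silently inserted.
try {
	cache.updateChunkIndex("missing-hash", 5);
} catch (err) {
	console.error((err as Error).message); // "Chunk not found in cache: missing-hash"
}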
93 changes: 93 additions & 0 deletions packages/hub/src/utils/createXorb.spec.ts
@@ -0,0 +1,93 @@
import { describe, expect, it } from "vitest";
import { backtrackDedup, CurrentXorbInfo } from "./createXorbs";
import type { ShardData } from "./shardParser";
import { ChunkCache } from "./ChunkCache";

describe("createXorb", () => {
describe("backtrackDedup", () => {
it("should update cache info for chunks that go back due to previous chunks being erased", () => {
const xorb = new CurrentXorbInfo();

const chunkMetadata = [
{
xorbId: xorb.id,
chunkIndex: 0,
length: 101,
},
{
xorbId: xorb.id,
chunkIndex: 1,
length: 101,
},
];
xorb.chunks = [
{
hash: "chunk1",
length: 101,
offset: 0,
},
{
hash: "chunk2",
length: 101,
offset: 101,
},
];
const shardData: ShardData = {
hmacKey: "shard1",
xorbs: [
{
hash: "remoteXorb1",
chunks: [
{
hash: "chunk0:shard1",
startOffset: 0,
unpackedLength: 100,
},
{
hash: "chunk1:shard1",
startOffset: 100,
unpackedLength: 101,
},
],
},
],
};

const computeHmac = (hash: string, key: string) => {
return hash + ":" + key;
};

const chunkCache = new ChunkCache();
let chunkIndex = 0;
for (const chunk of xorb.chunks) {
chunkCache.addChunkToCache(chunk.hash, xorb.id, chunkIndex++, shardData.hmacKey);
}
let xorbIndex = 0;
for (const xorb of shardData.xorbs) {
xorbIndex--;
for (let i = 0; i < xorb.chunks.length; i++) {
chunkCache.addChunkToCache(xorb.chunks[i].hash, xorbIndex, i, shardData.hmacKey);
}
}
const dedup = backtrackDedup(xorb, computeHmac, shardData, chunkCache, chunkMetadata, 0);
expect(dedup).toBe(101);
expect(xorb.chunks).toEqual([{ hash: "chunk2", length: 101, offset: 0 }]);
expect(chunkMetadata).toEqual([
{
xorbId: -1,
chunkIndex: 1,
length: 101,
},
{
xorbId: 0,
chunkIndex: 0,
length: 101,
},
]);
// chunk1 should use remote hash now
expect(chunkCache.getChunk("chunk1", computeHmac)).toEqual({ xorbIndex: -1, chunkIndex: 1 });
// The xorb index for chunk2 should be 0 now that the previous chunk was erased from the xorb
expect(chunkCache.getChunk("chunk2", computeHmac)).toEqual({ xorbIndex: 0, chunkIndex: 0 });
});
});
});
12 changes: 8 additions & 4 deletions packages/hub/src/utils/createXorbs.ts
@@ -33,7 +33,7 @@ interface XorbEvent {
}>;
}

class CurrentXorbInfo {
export class CurrentXorbInfo {
id: number;
offset: number;
chunks: Array<{ hash: string; length: number; offset: number }>;
@@ -192,7 +192,6 @@ export async function* createXorbs(
}
let chunkIndex = xorb.chunks.length;
let chunkXorbId = xorbId;
fileChunks.push({ hash: chunk.hash, length: chunk.length });

// Remove chunks from source data
const chunkToCopy = removeChunkFromSourceData(sourceChunks, chunk.length);
@@ -361,14 +360,14 @@
}
}

function backtrackDedup(
export function backtrackDedup(
xorb: CurrentXorbInfo,
computeHmac: (hash: string, key: string) => string,
shardData: ShardData,
chunkCache: ChunkCache,
chunkMetadata: { xorbId: number | string; chunkIndex: number; length: number }[],
dedupedBytes: number
) {
): number {
const chunkIndexesToBacktrackFor = new Map<number, { xorbId: number; chunkIndex: number }>();
for (
let chunkToRecheckIndex = xorb.immutableData?.chunkIndex ?? 0;
@@ -453,10 +452,15 @@
}
xorb.chunks = newXorbChunks;
xorb.offset = currentOffset;
// Update chunkMetadata and chunkCache with new chunk indexes for the current xorb chunks
for (const chunk of chunkMetadata) {
if (chunk.xorbId === xorb.id) {
const newIndex = oldIndexToNewIndex.get(chunk.chunkIndex);
if (newIndex !== undefined) {
const cached = chunkCache.getChunk(xorb.chunks[newIndex].hash, null);
if (cached !== undefined && cached.xorbIndex === chunk.xorbId && cached.chunkIndex === chunk.chunkIndex) {
chunkCache.updateChunkIndex(xorb.chunks[newIndex].hash, newIndex);
}
chunk.chunkIndex = newIndex;
}
}
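To make the new guard at the end of backtrackDedup concrete, here is a small self-contained sketch of the remapping step; the ids, indexes and Map contents are invented for illustration. Metadata entries that still point at the current xorb get their chunk index remapped, and the cache is only updated when it still references the stale position, so entries that were already re-pointed at a remote xorb are left untouched:

import { ChunkCache } from "./ChunkCache";

// Invented example: the chunk at old index 1 of xorb 7 survives backtracking
// and becomes index 0; the chunk that used to sit at index 0 was deduplicated.
const xorbId = 7;
const oldIndexToNewIndex = new Map<number, number>([[1, 0]]);
const xorbChunks = [{ hash: "kept-chunk", length: 101, offset: 0 }];

const chunkMetadata = [
	{ xorbId: -1, chunkIndex: 4, length: 100 }, // already points at a remote xorb
	{ xorbId: 7, chunkIndex: 1, length: 101 }, // still points at the old position
];

const chunkCache = new ChunkCache();
chunkCache.addChunkToCache("kept-chunk", xorbId, 1, "hmac-key");

for (const chunk of chunkMetadata) {
	if (chunk.xorbId === xorbId) {
		const newIndex = oldIndexToNewIndex.get(chunk.chunkIndex);
		if (newIndex !== undefined) {
			const cached = chunkCache.getChunk(xorbChunks[newIndex].hash, null);
			// Only re-point the cache if it still references the stale position.
			if (cached !== undefined && cached.xorbIndex === chunk.xorbId && cached.chunkIndex === chunk.chunkIndex) {
				chunkCache.updateChunkIndex(xorbChunks[newIndex].hash, newIndex);
			}
			chunk.chunkIndex = newIndex;
		}
	}
}
// chunkMetadata[1] is now { xorbId: 7, chunkIndex: 0, length: 101 } and the
// cached entry for "kept-chunk" now points at chunk index 0 as well.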