Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 52 additions & 17 deletions component/file_cache/file_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1598,10 +1598,11 @@ loopbackfs:
func (suite *fileCacheTestSuite) TestCronOnToOFFUpload() {
defer suite.cleanupTest()

testStartTime := time.Now()
second := testStartTime.Second() % 60
duration := 2 * time.Second
cronExpr := fmt.Sprintf("%d * * * * *", second)
// Schedule for 3 seconds in the future to allow setup time
now := time.Now()
startSecond := (now.Second() + 3) % 60
duration := 4 * time.Second
cronExpr := fmt.Sprintf("%d * * * * *", startSecond)

configContent := fmt.Sprintf(`file_cache:
path: %s
Expand All @@ -1623,6 +1624,13 @@ loopbackfs:
// Setup the file cache with this configuration
suite.setupTestHelper(configContent)

// Wait for the upload window to start
windowStart := now.Truncate(time.Minute).Add(time.Duration(startSecond) * time.Second)
if windowStart.Before(now) {
windowStart = windowStart.Add(time.Minute)
}
time.Sleep(time.Until(windowStart) + 100*time.Millisecond)

file1 := "scheduled_on_window.txt"
handle1, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777})
suite.assert.NoError(err)
Expand All @@ -1634,8 +1642,8 @@ loopbackfs:
suite.assert.FileExists(filepath.Join(suite.cache_path, file1))

_, err = os.Stat(filepath.Join(suite.fake_storage_path, file1))
for i := 0; i < 200 && os.IsNotExist(err); i++ {
time.Sleep(10 * time.Millisecond)
for i := 0; i < 300 && os.IsNotExist(err); i++ {
time.Sleep(20 * time.Millisecond)
_, err = os.Stat(filepath.Join(suite.fake_storage_path, file1))
}
suite.FileExists(
Expand All @@ -1644,7 +1652,10 @@ loopbackfs:
)

// wait until the window closes
time.Sleep(time.Since(testStartTime.Add(duration + 10*time.Millisecond)))
windowEnd := windowStart.Add(duration)
if time.Now().Before(windowEnd) {
time.Sleep(time.Until(windowEnd) + 100*time.Millisecond)
}

file2 := "scheduled_off_window.txt"
handle2, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: file2, Mode: 0777})
Expand Down Expand Up @@ -2080,9 +2091,10 @@ func (suite *fileCacheTestSuite) TestOverlappingSchedules() {
defer suite.cleanupTest()

now := time.Now()
// Create two schedules that will run in close succession (3 seconds apart)
firstSecond := now.Second()
secondSecond := (now.Second() + 2) % 60
// Create two schedules that will run in close succession (2 seconds apart)
// Schedule for 3 seconds in the future to allow setup time
firstSecond := (now.Second() + 3) % 60
secondSecond := (now.Second() + 5) % 60

configContent := fmt.Sprintf(`file_cache:
path: %s
Expand All @@ -2105,6 +2117,13 @@ loopbackfs:
)
suite.setupTestHelper(configContent)

// Wait for the first upload window to start
windowStart := now.Truncate(time.Minute).Add(time.Duration(firstSecond) * time.Second)
if windowStart.Before(now) {
windowStart = windowStart.Add(time.Minute)
}
time.Sleep(time.Until(windowStart) + 100*time.Millisecond)

file1 := "overlap_test_first_window.txt"
handle1, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777})
suite.assert.NoError(err)
Expand All @@ -2114,8 +2133,12 @@ loopbackfs:
err = suite.fileCache.ReleaseFile(internal.ReleaseFileOptions{Handle: handle1})
suite.assert.NoError(err)

// File should be uploaded immediately as we're in an upload window
time.Sleep(100 * time.Millisecond)
// File should be uploaded - poll for up to 6 seconds
_, err = os.Stat(filepath.Join(suite.fake_storage_path, file1))
for i := 0; i < 300 && os.IsNotExist(err); i++ {
time.Sleep(20 * time.Millisecond)
_, err = os.Stat(filepath.Join(suite.fake_storage_path, file1))
}
suite.assert.FileExists(filepath.Join(suite.fake_storage_path, file1),
"File should be uploaded immediately during first window")

Expand All @@ -2129,7 +2152,12 @@ loopbackfs:
err = suite.fileCache.ReleaseFile(internal.ReleaseFileOptions{Handle: handle2})
suite.assert.NoError(err)

time.Sleep(100 * time.Millisecond)
// Poll for second file upload
_, err = os.Stat(filepath.Join(suite.fake_storage_path, file2))
for i := 0; i < 300 && os.IsNotExist(err); i++ {
time.Sleep(20 * time.Millisecond)
_, err = os.Stat(filepath.Join(suite.fake_storage_path, file2))
}
suite.assert.FileExists(filepath.Join(suite.fake_storage_path, file2),
"File should be uploaded immediately during second window")

Expand All @@ -2150,11 +2178,18 @@ loopbackfs:
err = suite.fileCache.ReleaseFile(internal.ReleaseFileOptions{Handle: handle1})
suite.assert.NoError(err)

// Verify updated data was uploaded
time.Sleep(100 * time.Millisecond)
cloudData, err := os.ReadFile(filepath.Join(suite.fake_storage_path, file1))
// Verify updated data was uploaded - poll for the update
expectedData := append(data1, updatedData...)
var cloudData []byte
for i := 0; i < 300; i++ {
cloudData, err = os.ReadFile(filepath.Join(suite.fake_storage_path, file1))
if err == nil && len(cloudData) == len(expectedData) {
break
}
time.Sleep(20 * time.Millisecond)
}
suite.assert.NoError(err)
suite.assert.Equal(append(data1, updatedData...), cloudData,
suite.assert.Equal(expectedData, cloudData,
"File should be updated immediately in the second window")
}

Expand Down
Loading