diff --git a/scripts/generate_all_test_data.sh b/scripts/generate_all_test_data.sh index 3dab79a..da0aaaa 100755 --- a/scripts/generate_all_test_data.sh +++ b/scripts/generate_all_test_data.sh @@ -17,7 +17,7 @@ USER_SERVICE_SCRIPT="$PROJECT_ROOT/services/user-service/scripts/generate_test_d SOCIAL_GRAPH_SCRIPT="$PROJECT_ROOT/services/social-graph-services/scripts/generate_and_load.sh" # Default values -NUM_USERS=5000 +NUM_USERS=25000 BASE_URL="" # Will be read from Terraform output CONCURRENCY=50 AWS_REGION="us-west-2" @@ -174,20 +174,36 @@ if ! command -v python3 &> /dev/null; then exit 1 fi -# Check dependencies for User Service script -echo "" -echo -e "${BLUE}Checking Python dependencies...${NC}" -python3 -c "import aiohttp" 2>/dev/null -if [ $? -ne 0 ]; then - echo -e "${YELLOW}⚠️ aiohttp not installed, installing...${NC}" - pip3 install aiohttp -fi -python3 -c "import boto3" 2>/dev/null -if [ $? -ne 0 ]; then - echo -e "${YELLOW}⚠️ boto3 not installed, installing...${NC}" - pip3 install boto3 -fi +# Helper: create and activate venv inside a service scripts directory +create_and_activate_service_venv() { + local service_scripts_dir="$1" + local venv_dir="$service_scripts_dir/.venv" + local req_file="$service_scripts_dir/requirements.txt" + + echo "\nSetting up virtualenv in $service_scripts_dir" + if [ ! -d "$venv_dir" ]; then + python3 -m venv "$venv_dir" + fi + + # shellcheck source=/dev/null + . 
"$venv_dir/bin/activate" + python -m pip install --upgrade pip setuptools wheel + + if [ -f "$req_file" ]; then + echo "Installing packages from $req_file" + python -m pip install -r "$req_file" + else + echo "No requirements.txt in $service_scripts_dir, installing minimal packages" + python -m pip install aiohttp boto3 requests + fi +} + +deactivate_venv() { + if [ -n "${VIRTUAL_ENV-}" ]; then + deactivate || true + fi +} START_TIME=$(date +%s) @@ -197,12 +213,15 @@ echo -e "${BLUE}Step 1: Creating Users via User Service API${NC}" echo -e "${BLUE}================================================================${NC}" echo "" -# Run User Service script -cd "$PROJECT_ROOT/services/user-service/scripts" +# Run User Service script (ensure per-service venv and deps) +USER_SCRIPTS_DIR="$PROJECT_ROOT/services/user-service/scripts" +create_and_activate_service_venv "$USER_SCRIPTS_DIR" +cd "$USER_SCRIPTS_DIR" python3 generate_test_data.py \ "$NUM_USERS" \ --url "$BASE_URL" \ --concurrency "$CONCURRENCY" +deactivate_venv USER_EXIT_CODE=$? @@ -227,13 +246,16 @@ echo -e "${BLUE}Step 2: Creating Relationships via Social Graph DynamoDB${NC}" echo -e "${BLUE}================================================================${NC}" echo "" -# Run Social Graph script -cd "$PROJECT_ROOT/services/social-graph-services/scripts" +# Run Social Graph script (ensure per-service venv and deps) +SOCIAL_SCRIPTS_DIR="$PROJECT_ROOT/services/social-graph-services/scripts" +create_and_activate_service_venv "$SOCIAL_SCRIPTS_DIR" +cd "$SOCIAL_SCRIPTS_DIR" bash generate_and_load.sh \ --users "$NUM_USERS" \ --region "$AWS_REGION" \ --followers-table "$FOLLOWERS_TABLE" \ --following-table "$FOLLOWING_TABLE" +deactivate_venv SOCIAL_EXIT_CODE=$? 
diff --git a/services/post-service/cmd/main.go b/services/post-service/cmd/main.go index 5355b6c..d34af5e 100644 --- a/services/post-service/cmd/main.go +++ b/services/post-service/cmd/main.go @@ -11,9 +11,11 @@ import ( "post-service/internal/repository" "post-service/internal/service" "sync" + "time" pb "github.com/cs6650/proto/post" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/sns" @@ -39,13 +41,32 @@ func corsMiddleware() gin.HandlerFunc { } } - func main() { - // Load configuration - cfg, err := config.LoadDefaultConfig(context.TODO()) - if err != nil { - log.Fatal("Failed to load AWS config: %w", err) - } + // Load configuration with optimized HTTP client and retry settings + cfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithHTTPClient(&http.Client{ + Transport: &http.Transport{ + MaxIdleConns: 1000, // Total connection pool ✅ + MaxIdleConnsPerHost: 200, // Per host connection number ✅ + MaxConnsPerHost: 300, // Maximum connections per host ✅ + IdleConnTimeout: 30 * time.Second, // Reduced to 30s, avoid connection buildup + DisableKeepAlives: false, // Keep connection reuse ✅ + TLSHandshakeTimeout: 5 * time.Second, // Reduced to 5s, faster failure + ExpectContinueTimeout: 1 * time.Second, // Added this, reduce HTTP/1.1 delay + ResponseHeaderTimeout: 10 * time.Second, // Added response header timeout + DisableCompression: false, // Keep compression, reduce network transfer + ForceAttemptHTTP2: true, // Force HTTP/2, more efficient + WriteBufferSize: 32 * 1024, // Increase write buffer + ReadBufferSize: 32 * 1024, // Increase read buffer + }, + Timeout: 3 * time.Second, // Reduced to 3s, sufficient for 500 user queries + }), + config.WithRetryMaxAttempts(2), // Add retry configuration + config.WithRetryMode(aws.RetryModeAdaptive), // Adaptive retry + ) + if err != nil { + log.Fatal("Failed to load AWS config: %w", err) + } // 
Initialize AWS client dynamoClient := dynamodb.NewFromConfig(cfg) @@ -104,7 +125,7 @@ func main() { grpcServer := grpc.NewServer() pb.RegisterPostServiceServer(grpcServer, grpcHandler) - + // Enable gRPC reflection for tools like grpcurl reflection.Register(grpcServer) @@ -125,7 +146,7 @@ func main() { // Wait for both servers wg.Wait() - + } func getEnv(key, defaultValue string) string { @@ -133,4 +154,4 @@ func getEnv(key, defaultValue string) string { return value } return defaultValue -} \ No newline at end of file +} diff --git a/services/post-service/internal/repository/post_repository.go b/services/post-service/internal/repository/post_repository.go index 4d6ad22..9e45242 100644 --- a/services/post-service/internal/repository/post_repository.go +++ b/services/post-service/internal/repository/post_repository.go @@ -3,7 +3,11 @@ package repository import ( "context" "fmt" + "log" + "os" "strconv" + "sync" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" @@ -80,22 +84,208 @@ func (r *PostRepository) GetPost(ctx context.Context, postID int64) (*pb.Post, e return &post, err } -// Retrieve recent posts for multiple users -func (r *PostRepository) GetPostByUserIDs(ctx context.Context, userIDs []int64, limit int32) (map[int64][]*pb.Post, error) { - result := make(map[int64][]*pb.Post) +// batchCheckUsersHasPosts performs parallel COUNT queries to check which users have posts +func (r *PostRepository) batchCheckUsersHasPosts(ctx context.Context, userIDs []int64) (map[int64]bool, error) { + if len(userIDs) == 0 { + return make(map[int64]bool), nil + } + + hasPostsMap := make(map[int64]bool, len(userIDs)) + hasPostsMutex := &sync.Mutex{} + maxWorkers := min(50, len(userIDs)) + // Create worker pool for COUNT queries + userIDChan := make(chan int64, len(userIDs)) for _, userID := range userIDs { - posts, err := r.GetPostByUserID(ctx, userID, limit) + userIDChan <- userID + } + close(userIDChan) + + var wg 
sync.WaitGroup + errChan := make(chan error, len(userIDs)) + + // Launch worker pool for parallel COUNT queries + for i := 0; i < maxWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + for userID := range userIDChan { + hasPosts, err := r.checkUserHasPosts(ctx, userID) + if err != nil { + errChan <- fmt.Errorf("failed to check posts for user %d: %w", userID, err) + continue + } + + hasPostsMutex.Lock() + hasPostsMap[userID] = hasPosts + hasPostsMutex.Unlock() + } + }() + } + + wg.Wait() + close(errChan) + + // Check for errors + for err := range errChan { if err != nil { return nil, err } - result[userID] = posts } + + return hasPostsMap, nil +} + +// Retrieve recent posts for multiple users (parallel execution with worker pool for better performance) +func (r *PostRepository) GetPostByUserIDs(ctx context.Context, userIDs []int64, limit int32) (map[int64][]*pb.Post, error) { + // Check if we're in hybrid mode (read from environment variable) + postStrategy := os.Getenv("POST_STRATEGY") + checkCountFirst := postStrategy == "hybrid" + startTime := time.Now() + // Pre-allocate result map with expected capacity to reduce reallocation + result := make(map[int64][]*pb.Post, len(userIDs)) + resultMutex := &sync.Mutex{} + + // If in hybrid mode, first batch check which users have posts + var usersToQuery []int64 + if checkCountFirst { + countStart := time.Now() + hasPostsMap, err := r.batchCheckUsersHasPosts(ctx, userIDs) + if err != nil { + return nil, fmt.Errorf("failed to batch check users has posts: %w", err) + } + countDuration := time.Since(countStart) + + // Filter users that have posts + usersToQuery = make([]int64, 0, len(userIDs)) + for _, userID := range userIDs { + if hasPostsMap[userID] { + usersToQuery = append(usersToQuery, userID) + } else { + // User has no posts, set empty result immediately + result[userID] = []*pb.Post{} + } + } + + log.Printf("[BatchGetPosts] Batch COUNT check: users=%d, has_posts=%d, no_posts=%d, duration=%v", + 
len(userIDs), len(usersToQuery), len(userIDs)-len(usersToQuery), countDuration) + } else { + // Not in hybrid mode, query all users + usersToQuery = userIDs + } + + // If no users have posts, return early + if len(usersToQuery) == 0 { + totalDuration := time.Since(startTime) + log.Printf("[BatchGetPosts] Completed: users=%d, duration=%v (all users have no posts)", + len(userIDs), totalDuration) + return result, nil + } + + // Limit concurrent goroutines to avoid resource exhaustion + maxWorkers := min(50, len(usersToQuery)) + + // Create worker pool using buffered channel + userIDChan := make(chan int64, len(usersToQuery)) + for _, userID := range usersToQuery { + userIDChan <- userID + } + close(userIDChan) + + // Use WaitGroup to wait for all workers to complete + var wg sync.WaitGroup + errChan := make(chan error, len(usersToQuery)) + + // Launch worker pool - now we know these users have posts, so skip COUNT check + for i := 0; i < maxWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + for userID := range userIDChan { + queryStart := time.Now() + // Skip COUNT check since we already verified these users have posts + posts, err := r.GetPostByUserID(ctx, userID, limit, false) + queryDuration := time.Since(queryStart) + + if err != nil { + errChan <- fmt.Errorf("failed to get posts for user %d: %w", userID, err) + continue + } + + // Optimization: Only write to result map if posts exist or if we want to track empty results + // For hybrid mode, we may want to skip empty results to reduce map size + // But for consistency, we'll include all users (even with empty posts) + resultMutex.Lock() + result[userID] = posts + resultMutex.Unlock() + + // Log slow queries for analysis + if queryDuration > 50*time.Millisecond { + log.Printf("[BatchGetPosts] Slow query: user_id=%d, duration=%v, posts=%d", userID, queryDuration, len(posts)) + } + } + }() + } + + // Wait for all workers to complete + wg.Wait() + close(errChan) + + // Check for errors + for err := range 
errChan { + if err != nil { + return nil, err + } + } + + totalDuration := time.Since(startTime) + log.Printf("[BatchGetPosts] Completed: users=%d, duration=%v", + len(userIDs), totalDuration) + return result, nil } +// checkUserHasPosts quickly checks if a user has any posts using COUNT query +func (r *PostRepository) checkUserHasPosts(ctx context.Context, userID int64) (bool, error) { + result, err := r.client.Query(ctx, &dynamodb.QueryInput{ + TableName: aws.String(r.tableName), + IndexName: aws.String("user_id-index"), + KeyConditionExpression: aws.String("user_id = :uid"), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":uid": &types.AttributeValueMemberN{ + Value: fmt.Sprintf("%d", userID), + }, + }, + Select: types.SelectCount, // Only return count, not data + Limit: aws.Int32(1), // Only need to know if count > 0 + }) + + if err != nil { + return false, err + } + + return result.Count > 0, nil +} + // Retrieve recent posts for single user -func (r *PostRepository) GetPostByUserID(ctx context.Context, userID int64, limit int32) ([]*pb.Post, error) { +func (r *PostRepository) GetPostByUserID(ctx context.Context, userID int64, limit int32, checkCountFirst bool) ([]*pb.Post, error) { + // Optimization for hybrid mode: First check if user has posts using COUNT query + // This avoids fetching data for users with no posts + if checkCountFirst { + hasPosts, err := r.checkUserHasPosts(ctx, userID) + if err != nil { + return nil, err + } + + if !hasPosts { + // User has no posts, return empty slice immediately + return []*pb.Post{}, nil + } + } + + // User has posts (or checkCountFirst is false), fetch the actual data result, err := r.client.Query(ctx, &dynamodb.QueryInput{ TableName: aws.String(r.tableName), IndexName: aws.String("user_id-index"), // Use GSI for querying by user_id diff --git a/services/post-service/internal/service/post_service.go b/services/post-service/internal/service/post_service.go index a5de11a..5e6e005 100644 --- 
a/services/post-service/internal/service/post_service.go +++ b/services/post-service/internal/service/post_service.go @@ -16,13 +16,13 @@ const ( ) type PostService struct { - repo *repository.PostRepository + repo *repository.PostRepository fanoutService *FanoutService } func NewPostService(repo *repository.PostRepository, fanoutService *FanoutService) *PostService { return &PostService{ - repo: repo, + repo: repo, fanoutService: fanoutService, } } @@ -37,47 +37,47 @@ func (s *PostService) createPost(req *model.CreatePostRequest) *pb.Post { } } -func (s *PostService)PushStrategy(ctx context.Context, req *model.CreatePostRequest) (*pb.Post, error) { +func (s *PostService) PushStrategy(ctx context.Context, req *model.CreatePostRequest) (*pb.Post, error) { post := s.createPost(req) // Fanout go func() { - if err := s.fanoutService.ExecutePushFanout(context.Background(), post); err!= nil { + if err := s.fanoutService.ExecutePushFanout(context.Background(), post); err != nil { fmt.Printf("Fan-out error for post %d: %v\n", post.PostId, err) } }() return post, nil } -func (s *PostService)PullStrategy(ctx context.Context, req *model.CreatePostRequest) (*pb.Post, error) { +func (s *PostService) PullStrategy(ctx context.Context, req *model.CreatePostRequest) (*pb.Post, error) { post := s.createPost(req) // Save to DynamoDB - if err:= s.repo.CreatePost(ctx, post); err != nil { + if err := s.repo.CreatePost(ctx, post); err != nil { return nil, fmt.Errorf("failed to create post: %w", err) } return post, nil } -func (s *PostService)HybridStrategy(ctx context.Context, req *model.CreatePostRequest, hybridThreshold int) (*pb.Post, error) { +func (s *PostService) HybridStrategy(ctx context.Context, req *model.CreatePostRequest, hybridThreshold int) (*pb.Post, error) { post := s.createPost(req) // Get follower count - followers, err := s.fanoutService.socialGraphClient.GetFollowers(ctx, post.UserId, 1, 0) - if err != nil { - return post, fmt.Errorf("failed to get followers: %w", 
err) - } - - log.Printf("User %d has %d followers", post.UserId, followers.TotalCount) - - // Check threshold - if followers.TotalCount >= int32(hybridThreshold) { - log.Printf("User %d has >= %d followers, skipping push fan-out", post.UserId, hybridThreshold) - post, err = s.PullStrategy(ctx,req) - if err != nil { - return nil, fmt.Errorf("failed to create post: %w", err) - } - return post, nil + followers, err := s.fanoutService.socialGraphClient.GetFollowers(ctx, post.UserId, 1, 0) + if err != nil { + return post, fmt.Errorf("failed to get followers: %w", err) + } + + log.Printf("User %d has %d followers", post.UserId, followers.TotalCount) + + // Check threshold + if followers.TotalCount >= int32(hybridThreshold) { + log.Printf("User %d has >= %d followers, skipping push fan-out", post.UserId, hybridThreshold) + post, err = s.PullStrategy(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to create post: %w", err) + } + return post, nil } post, err = s.PushStrategy(ctx, req) @@ -88,12 +88,12 @@ func (s *PostService)HybridStrategy(ctx context.Context, req *model.CreatePostRe } // Get single post -func (s *PostService)GetPost(ctx context.Context, postID int64) (*pb.Post, error) { +func (s *PostService) GetPost(ctx context.Context, postID int64) (*pb.Post, error) { return s.repo.GetPost(ctx, postID) } // BatchGetPosts for Timeline Service -func (s *PostService) BatchGetPosts(ctx context.Context, req *pb.BatchGetPostsRequest)(map[int64]*pb.PostList, error) { +func (s *PostService) BatchGetPosts(ctx context.Context, req *pb.BatchGetPostsRequest) (map[int64]*pb.PostList, error) { if req.Limit == 0 { req.Limit = PostsLimit } @@ -109,5 +109,3 @@ func (s *PostService) BatchGetPosts(ctx context.Context, req *pb.BatchGetPostsRe } return result, nil } - - diff --git a/services/social-graph-services/scripts/core/segmenter.py b/services/social-graph-services/scripts/core/segmenter.py index e91c926..038f21e 100644 --- 
a/services/social-graph-services/scripts/core/segmenter.py +++ b/services/social-graph-services/scripts/core/segmenter.py @@ -101,10 +101,19 @@ def get_following_range(self, user_type: str) -> tuple: min_following = max(abs_min, int(self.total_users * min_ratio)) max_following = max(min_following + 1, int(self.total_users * max_ratio)) - + # Apply absolute maximum cap max_following = min(max_following, abs_max) - + + # Ensure we never return an inverted range (min > max) which would + # cause random.randint to raise ValueError. If absolute maximums are + # smaller than the computed minimum (configuration edge-case), + # clamp the minimum down to the cap so callers receive a valid + # inclusive range. This preserves a deterministic upper bound while + # avoiding runtime exceptions. + if max_following < min_following: + min_following = max_following + return (min_following, max_following) def get_segment_info(self) -> Dict: diff --git a/services/timeline-service/src/fanout/hybrid.go b/services/timeline-service/src/fanout/hybrid.go index 122837e..179f15c 100644 --- a/services/timeline-service/src/fanout/hybrid.go +++ b/services/timeline-service/src/fanout/hybrid.go @@ -3,6 +3,8 @@ package fanout import ( "container/heap" "fmt" + "log" + "time" "github.com/PCBZ/CS6650-Project/services/timeline-service/src/grpc" "github.com/PCBZ/CS6650-Project/services/timeline-service/src/models" @@ -39,21 +41,26 @@ func (s *HybridStrategy) GetTimeline(userID int64, limit int) (*models.TimelineR timeline *models.TimelineResponse err error source string + duration time.Duration } pushChan := make(chan result, 1) pullChan := make(chan result, 1) - // Execute push strategy concurrently + // Execute push strategy concurrently (fetch from database) go func() { + startTime := time.Now() timeline, err := s.pushStrategy.GetTimeline(userID, limit) - pushChan <- result{timeline: timeline, err: err, source: "push"} + duration := time.Since(startTime) + pushChan <- result{timeline: timeline, err: 
err, source: "push", duration: duration} }() - // Execute pull strategy concurrently + // Execute pull strategy concurrently (fetch from gRPC) go func() { + startTime := time.Now() timeline, err := s.pullStrategy.GetTimeline(userID, limit) - pullChan <- result{timeline: timeline, err: err, source: "pull"} + duration := time.Since(startTime) + pullChan <- result{timeline: timeline, err: err, source: "pull", duration: duration} }() // Wait for both results @@ -65,6 +72,24 @@ func (s *HybridStrategy) GetTimeline(userID int64, limit int) (*models.TimelineR } } + // Log timing information + log.Printf("[HYBRID_TIMING] user_id=%d, database_fetch_duration=%v, grpc_fetch_duration=%v, database_posts=%d, grpc_posts=%d", + userID, + pushResult.duration, + pullResult.duration, + func() int { + if pushResult.timeline != nil { + return len(pushResult.timeline.Timeline) + } + return 0 + }(), + func() int { + if pullResult.timeline != nil { + return len(pullResult.timeline.Timeline) + } + return 0 + }()) + // Merge results - combine posts from both strategies return s.mergeTimelines(pushResult.timeline, pullResult.timeline, pushResult.err, pullResult.err, limit) } diff --git a/services/timeline-service/src/grpc/user_service.go b/services/timeline-service/src/grpc/user_service.go index 9d08fb6..1eb8126 100644 --- a/services/timeline-service/src/grpc/user_service.go +++ b/services/timeline-service/src/grpc/user_service.go @@ -32,14 +32,77 @@ type UserServiceClient interface { // userServiceClient implements UserServiceClient with actual gRPC calls type userServiceClient struct { - client pb.UserServiceClient - conn *grpc.ClientConn + client pb.UserServiceClient + conn *grpc.ClientConn + endpoint string +} + +const ( + userServiceReconnectMaxAttempts = 20 // Increased from 5 to 20 to handle slow startup + userServiceReconnectBaseDelay = 1 * time.Second // Increased from 500ms to 1s + userServiceReconnectMaxDelay = 10 * time.Second // Maximum delay between retries +) + +// 
ensureConnection ensures the gRPC connection is established, retrying if needed +func (c *userServiceClient) ensureConnection(ctx context.Context) error { + if c.client != nil && c.conn != nil { + // Connection already established + return nil + } + + // Try to reconnect with retries and exponential backoff + var lastErr error + for attempt := 1; attempt <= userServiceReconnectMaxAttempts; attempt++ { + log.Printf("Attempting to reconnect to User Service at %s (attempt %d/%d)...", c.endpoint, attempt, userServiceReconnectMaxAttempts) + + connCtx, cancel := context.WithTimeout(ctx, 15*time.Second) // Increased timeout from 10s to 15s + conn, err := grpc.DialContext( + connCtx, + c.endpoint, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) + cancel() + + if err == nil { + // Close previous connection if exists + if c.conn != nil { + _ = c.conn.Close() + } + + c.conn = conn + c.client = pb.NewUserServiceClient(conn) + log.Printf("Successfully reconnected to User Service at %s", c.endpoint) + return nil + } + + lastErr = err + log.Printf("Failed to reconnect to User Service (attempt %d/%d): %v", attempt, userServiceReconnectMaxAttempts, err) + + // Calculate exponential backoff delay with cap + delay := userServiceReconnectBaseDelay * time.Duration(1< userServiceReconnectMaxDelay { + delay = userServiceReconnectMaxDelay + } + log.Printf("Waiting %v before next retry...", delay) + + // Respect context cancellation + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled while reconnecting to user service: %w", ctx.Err()) + case <-time.After(delay): + // Continue to next attempt + } + } + + return fmt.Errorf("failed to reconnect to user service after %d attempts: %w", userServiceReconnectMaxAttempts, lastErr) } // BatchGetUserInfo calls the real User Service via gRPC func (c *userServiceClient) BatchGetUserInfo(ctx context.Context, userIDs []int64) (*BatchGetUserInfoResponse, error) { - if c.client == nil { - return nil, 
fmt.Errorf("user service client not initialized - connection failed at startup") + // Ensure connection is established, retry if needed + if err := c.ensureConnection(ctx); err != nil { + return nil, fmt.Errorf("user service client not initialized - connection failed: %w", err) } // Create gRPC request @@ -92,18 +155,20 @@ func NewUserServiceClient(endpoint string) UserServiceClient { grpc.WithBlock(), // Block until connection is established ) if err != nil { - // Return a client that will fail on first use, but allow service to start + // Return a client that will retry on first use, but allow service to start log.Printf("Warning: Failed to connect to user service at %s: %v. Service will retry on first use.", endpoint, err) return &userServiceClient{ - client: nil, - conn: nil, + client: nil, + conn: nil, + endpoint: endpoint, } } log.Printf("User Service client created for %s", endpoint) return &userServiceClient{ - client: pb.NewUserServiceClient(conn), - conn: conn, + client: pb.NewUserServiceClient(conn), + conn: conn, + endpoint: endpoint, } } diff --git a/services/user-service/main.go b/services/user-service/main.go index 78147f9..8cb8482 100644 --- a/services/user-service/main.go +++ b/services/user-service/main.go @@ -123,7 +123,7 @@ func main() { } } -// initializeServiceDatabase creates the service database and user if they don't exist +// initializeServiceDatabase creates the service database if it doesn't exist func initializeServiceDatabase(host, port, masterUser, masterPassword, sslMode, dbName string) error { // Validate database name to prevent SQL injection (alphanumeric and underscores only) dbNamePattern := regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`) @@ -166,46 +166,8 @@ func initializeServiceDatabase(host, port, masterUser, masterPassword, sslMode, log.Printf("Database %s already exists", dbName) } - // Create service user if it doesn't exist (optional - for future use) - serviceUser := fmt.Sprintf("%s_user", dbName) - // Validate service 
user name - if !dbNamePattern.MatchString(serviceUser) { - return fmt.Errorf("invalid service user name: must contain only alphanumeric characters and underscores") - } - - var userExists bool - checkUserQuery := "SELECT EXISTS(SELECT 1 FROM pg_roles WHERE rolname = $1)" - err = masterDB.QueryRow(checkUserQuery, serviceUser).Scan(&userExists) - if err != nil { - return fmt.Errorf("failed to check if user exists: %w", err) - } - - if !userExists { - // Use a more secure approach: create user with a placeholder password, then alter it - // This prevents password exposure in query logs - createUserQuery := fmt.Sprintf("CREATE USER %s", pq.QuoteIdentifier(serviceUser)) - _, err = masterDB.Exec(createUserQuery) - if err != nil { - return fmt.Errorf("failed to create user %s: %w", serviceUser, err) - } - - // Set password in a separate statement to minimize exposure - setPasswordQuery := fmt.Sprintf("ALTER USER %s WITH PASSWORD $1", pq.QuoteIdentifier(serviceUser)) - _, err = masterDB.Exec(setPasswordQuery, masterPassword) - if err != nil { - return fmt.Errorf("failed to set password for user %s: %w", serviceUser, err) - } - - // Grant privileges to the service user - grantQuery := fmt.Sprintf("GRANT ALL PRIVILEGES ON DATABASE %s TO %s", pq.QuoteIdentifier(dbName), pq.QuoteIdentifier(serviceUser)) - _, err = masterDB.Exec(grantQuery) - if err != nil { - return fmt.Errorf("failed to grant privileges to user %s: %w", serviceUser, err) - } - log.Printf("Created user: %s and granted privileges", serviceUser) - } else { - log.Printf("User %s already exists", serviceUser) - } + // Note: We skip user creation and use the postgres user directly + // This simplifies startup and avoids permission issues return nil } diff --git a/services/user-service/terraform/modules/ecs/main.tf b/services/user-service/terraform/modules/ecs/main.tf index 23d0f82..027a458 100644 --- a/services/user-service/terraform/modules/ecs/main.tf +++ b/services/user-service/terraform/modules/ecs/main.tf @@ 
-110,6 +110,11 @@ resource "aws_ecs_service" "app" { # CRITICAL: Ensure clean shutdown during destroy enable_execute_command = false wait_for_steady_state = false + + # Give user service time to initialize database connection and schema + # User service needs to: connect to RDS, check/create database, initialize schema + # This typically takes 10-30 seconds, so we set grace period to 60 seconds + health_check_grace_period_seconds = 60 network_configuration { subnets = var.subnet_ids diff --git a/terraform/main.tf b/terraform/main.tf index 3e3c948..d6e0127 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -218,6 +218,10 @@ module "timeline_service" { memory_target_value = var.timeline_service_memory_target_value enable_request_based_scaling = var.timeline_service_enable_request_based_scaling request_count_target_value = var.timeline_service_request_count_target_value + + # Ensure user-service is deployed and ready before timeline-service starts + # This helps with gRPC connection timing + depends_on = [module.user_service] } # Social Graph Service diff --git a/tests/push_fanout_results_5K.json b/tests/push_fanout_results_5K.json deleted file mode 100644 index 78002b1..0000000 --- a/tests/push_fanout_results_5K.json +++ /dev/null @@ -1,46 +0,0 @@ -{ - "scale": "5K", - "total_users": 5000, - "total_requests": 1564, - "write_count": 1023, - "read_count": 541, - "error_count": 7691, - "duration_seconds": 480.07200598716736, - "throughput": { - "total_rps": 3.2578446160049723, - "write_rps": 2.13093033387026, - "read_rps": 1.1269142821347122 - }, - "write_latency": { - "avg": 2377.3863585929835, - "p50": 2198.051929473877, - "p95": 4742.579221725464, - "p99": 6315.847873687744, - "max": 8410.406827926636 - }, - "read_latency": { - "avg": 312.35461887282054, - "p50": 44.82769966125488, - "p95": 126.83916091918945, - "p99": 8177.561044692993, - "max": 13163.871049880981 - }, - "regular_metrics": { - "write_count": 1023, - "read_count": 541, - "avg_write_latency": 
2377.3863585929835, - "avg_read_latency": 312.35461887282054 - }, - "influencer_metrics": { - "write_count": 0, - "read_count": 0, - "avg_write_latency": 0, - "avg_read_latency": 0 - }, - "celebrity_metrics": { - "write_count": 0, - "read_count": 0, - "avg_write_latency": 0, - "avg_read_latency": 0 - } -} \ No newline at end of file diff --git a/tests/results_5k_push_300users.html b/tests/timeline_retrieval/Report_1_User_100_Followings_Pull_Mode.html similarity index 97% rename from tests/results_5k_push_300users.html rename to tests/timeline_retrieval/Report_1_User_100_Followings_Pull_Mode.html index d809bab..7bc5e33 100644 --- a/tests/results_5k_push_300users.html +++ b/tests/timeline_retrieval/Report_1_User_100_Followings_Pull_Mode.html @@ -1,7 +1,7 @@ - Test Report for locust_push_fanout_test.py + Test Report for locust_timeline_retrieve.py + + +
+

Locust Test Report

+ +
+ +

During: 2025-11-21 05:38:05 - 2025-11-21 05:46:03

+

Target Host: http://cs6650-project-dev-alb-515676591.us-west-2.elb.amazonaws.com

+

Script: locust_timeline_retrieve.py

+
+ +
+

Request Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName# Requests# FailsAverage (ms)Min (ms)Max (ms)Average size (bytes)RPSFailures/s
GET/api/timeline226065293258101830.50.0
Aggregated226065293258101830.50.0
+
+ +
+

Response Time Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName50%ile (ms)60%ile (ms)70%ile (ms)80%ile (ms)90%ile (ms)95%ile (ms)99%ile (ms)100%ile (ms)
GET/api/timeline4346495566804103300
Aggregated4346495566804103300
+
+ + + + + + +
+

Charts

+
+ + +
+

Final ratio

+
+
+ +
+ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/timeline_retrieval/Report_1_User_10_Followings_Pull_Mode.html b/tests/timeline_retrieval/Report_1_User_10_Followings_Pull_Mode.html new file mode 100644 index 0000000..ca0a128 --- /dev/null +++ b/tests/timeline_retrieval/Report_1_User_10_Followings_Pull_Mode.html @@ -0,0 +1,592 @@ + + + + Test Report for locust_timeline_retrieve.py + + + + +
+

Locust Test Report

+ +
+ +

During: 2025-11-23 06:56:41 - 2025-11-23 07:04:38

+

Target Host: http://cs6650-project-dev-alb-846255936.us-west-2.elb.amazonaws.com

+

Script: locust_timeline_retrieve.py

+
+ +
+

Request Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName# Requests# FailsAverage (ms)Min (ms)Max (ms)Average size (bytes)RPSFailures/s
GET/api/timeline2400734039178530.50.0
Aggregated2400734039178530.50.0
+
+ +
+

Response Time Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName50%ile (ms)60%ile (ms)70%ile (ms)80%ile (ms)90%ile (ms)95%ile (ms)99%ile (ms)100%ile (ms)
GET/api/timeline60636983120160260390
Aggregated60636983120160260390
+
+ + + + + + +
+

Charts

+
+ + +
+

Final ratio

+
+
+ +
+ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/timeline_retrieval/Report_1_User_10_Followings_Push_Mode.html b/tests/timeline_retrieval/Report_1_User_10_Followings_Push_Mode.html new file mode 100644 index 0000000..b335da3 --- /dev/null +++ b/tests/timeline_retrieval/Report_1_User_10_Followings_Push_Mode.html @@ -0,0 +1,592 @@ + + + + Test Report for locust_timeline_retrieve.py + + + + +
+

Locust Test Report

+ +
+ +

During: 2025-11-22 05:47:36 - 2025-11-22 05:55:35

+

Target Host: http://cs6650-project-dev-alb-393183751.us-west-2.elb.amazonaws.com

+

Script: locust_timeline_retrieve.py

+
+ +
+

Request Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName# Requests# FailsAverage (ms)Min (ms)Max (ms)Average size (bytes)RPSFailures/s
GET/api/timeline237011131676310.50.0
Aggregated237011131676310.50.0
+
+ +
+

Response Time Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName50%ile (ms)60%ile (ms)70%ile (ms)80%ile (ms)90%ile (ms)95%ile (ms)99%ile (ms)100%ile (ms)
GET/api/timeline8198120170220260440680
Aggregated8198120170220260440680
+
+ + + + + + +
+

Charts

+
+ + +
+

Final ratio

+
+
+ +
+ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/timeline_retrieval/Report_1_User_500_Followings_Pull_Mode.html b/tests/timeline_retrieval/Report_1_User_500_Followings_Pull_Mode.html new file mode 100644 index 0000000..9da8875 --- /dev/null +++ b/tests/timeline_retrieval/Report_1_User_500_Followings_Pull_Mode.html @@ -0,0 +1,592 @@ + + + + Test Report for locust_timeline_retrieve.py + + + + +
+

Locust Test Report

+ +
+ +

During: 2025-11-23 06:37:12 - 2025-11-23 06:45:11

+

Target Host: http://cs6650-project-dev-alb-846255936.us-west-2.elb.amazonaws.com

+

Script: locust_timeline_retrieve.py

+
+ +
+

Request Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName# Requests# FailsAverage (ms)Min (ms)Max (ms)Average size (bytes)RPSFailures/s
GET/api/timeline15201188793354479630.30.0
Aggregated15201188793354479630.30.0
+
+ +
+

Response Time Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName50%ile (ms)60%ile (ms)70%ile (ms)80%ile (ms)90%ile (ms)95%ile (ms)99%ile (ms)100%ile (ms)
GET/api/timeline11001100120014001700220028003500
Aggregated11001100120014001700220028003500
+
+ + + + + + +
+

Charts

+
+ + +
+

Final ratio

+
+
+ +
+ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/results_pull_300users.html b/tests/timeline_retrieval/Report_1_User_500_Followings_Push_Mode.html similarity index 97% rename from tests/results_pull_300users.html rename to tests/timeline_retrieval/Report_1_User_500_Followings_Push_Mode.html index 92cfa3a..e641b8b 100644 --- a/tests/results_pull_300users.html +++ b/tests/timeline_retrieval/Report_1_User_500_Followings_Push_Mode.html @@ -1,7 +1,7 @@ - Test Report for locust_push_fanout_test.py + Test Report for locust_timeline_retrieve.py + + +
+

Locust Test Report

+ +
+ +

During: 2025-11-25 21:18:03 - 2025-11-25 21:23:02

+

Target Host: http://cs6650-project-dev-alb-1947481816.us-west-2.elb.amazonaws.com

+

Script: locust_timeline_retrieve.py

+
+ +
+

Request Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName# Requests# FailsAverage (ms)Min (ms)Max (ms)Average size (bytes)RPSFailures/s
GET/api/timeline1380132571180310.50.0
Aggregated1380132571180310.50.0
+
+ +
+

Response Time Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName50%ile (ms)60%ile (ms)70%ile (ms)80%ile (ms)90%ile (ms)95%ile (ms)99%ile (ms)100%ile (ms)
GET/api/timeline1101301401501802208801200
Aggregated1101301401501802208801200
+
+ + + + + + +
+

Charts

+
+ + +
+

Final ratio

+
+
+ +
+ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/timeline_retrieval/Report_25K_1_User_100_Followings_Pull_Mode.html b/tests/timeline_retrieval/Report_25K_1_User_100_Followings_Pull_Mode.html new file mode 100644 index 0000000..75e75fc --- /dev/null +++ b/tests/timeline_retrieval/Report_25K_1_User_100_Followings_Pull_Mode.html @@ -0,0 +1,592 @@ + + + + Test Report for locust_timeline_retrieve.py + + + + +
+

Locust Test Report

+ +
+ +

During: 2025-11-23 23:19:04 - 2025-11-23 23:24:03

+

Target Host: http://cs6650-project-dev-alb-111554498.us-west-2.elb.amazonaws.com

+

Script: locust_timeline_retrieve.py

+
+ +
+

Request Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName# Requests# FailsAverage (ms)Min (ms)Max (ms)Average size (bytes)RPSFailures/s
GET/api/timeline13702077662180420.50.0
Aggregated13702077662180420.50.0
+
+ +
+

Response Time Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName50%ile (ms)60%ile (ms)70%ile (ms)80%ile (ms)90%ile (ms)95%ile (ms)99%ile (ms)100%ile (ms)
GET/api/timeline200220230250300340380620
Aggregated200220230250300340380620
+
+ + + + + + +
+

Charts

+
+ + +
+

Final ratio

+
+
+ +
+ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/timeline_retrieval/Report_25K_1_User_10_Followings_Hybrid_Mode.html b/tests/timeline_retrieval/Report_25K_1_User_10_Followings_Hybrid_Mode.html new file mode 100644 index 0000000..df9c450 --- /dev/null +++ b/tests/timeline_retrieval/Report_25K_1_User_10_Followings_Hybrid_Mode.html @@ -0,0 +1,592 @@ + + + + Test Report for locust_timeline_retrieve.py + + + + +
+

Locust Test Report

+ +
+ +

During: 2025-11-25 20:59:28 - 2025-11-25 21:04:26

+

Target Host: http://cs6650-project-dev-alb-1947481816.us-west-2.elb.amazonaws.com

+

Script: locust_timeline_retrieve.py

+
+ +
+

Request Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName# Requests# FailsAverage (ms)Min (ms)Max (ms)Average size (bytes)RPSFailures/s
GET/api/timeline14706040280310.50.0
Aggregated14706040280310.50.0
+
+ +
+

Response Time Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName50%ile (ms)60%ile (ms)70%ile (ms)80%ile (ms)90%ile (ms)95%ile (ms)99%ile (ms)100%ile (ms)
GET/api/timeline5456586477100220280
Aggregated5456586477100220280
+
+ + + + + + +
+

Charts

+
+ + +
+

Final ratio

+
+
+ +
+ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/timeline_retrieval/Report_25K_1_User_10_Followings_Pull_Mode.html b/tests/timeline_retrieval/Report_25K_1_User_10_Followings_Pull_Mode.html new file mode 100644 index 0000000..284949d --- /dev/null +++ b/tests/timeline_retrieval/Report_25K_1_User_10_Followings_Pull_Mode.html @@ -0,0 +1,592 @@ + + + + Test Report for locust_timeline_retrieve.py + + + + +
+

Locust Test Report

+ +
+ +

During: 2025-11-23 23:10:25 - 2025-11-23 23:15:24

+

Target Host: http://cs6650-project-dev-alb-111554498.us-west-2.elb.amazonaws.com

+

Script: locust_timeline_retrieve.py

+
+ +
+

Request Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName# Requests# FailsAverage (ms)Min (ms)Max (ms)Average size (bytes)RPSFailures/s
GET/api/timeline1420573849578680.50.0
Aggregated1420573849578680.50.0
+
+ +
+

Response Time Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName50%ile (ms)60%ile (ms)70%ile (ms)80%ile (ms)90%ile (ms)95%ile (ms)99%ile (ms)100%ile (ms)
GET/api/timeline505156627191140500
Aggregated505156627191140500
+
+ + + + + + +
+

Charts

+
+ + +
+

Final ratio

+
+
+ +
+ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/timeline_retrieval/Report_25K_1_User_1600_Followings_Hybrid_Mode.html b/tests/timeline_retrieval/Report_25K_1_User_1600_Followings_Hybrid_Mode.html new file mode 100644 index 0000000..eefa569 --- /dev/null +++ b/tests/timeline_retrieval/Report_25K_1_User_1600_Followings_Hybrid_Mode.html @@ -0,0 +1,592 @@ + + + + Test Report for locust_timeline_retrieve.py + + + + +
+

Locust Test Report

+ +
+ +

During: 2025-11-25 21:29:42 - 2025-11-25 21:34:40

+

Target Host: http://cs6650-project-dev-alb-1947481816.us-west-2.elb.amazonaws.com

+

Script: locust_timeline_retrieve.py

+
+ +
+

Request Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName# Requests# FailsAverage (ms)Min (ms)Max (ms)Average size (bytes)RPSFailures/s
GET/api/timeline690227519043872310.20.0
Aggregated690227519043872310.20.0
+
+ +
+

Response Time Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName50%ile (ms)60%ile (ms)70%ile (ms)80%ile (ms)90%ile (ms)95%ile (ms)99%ile (ms)100%ile (ms)
GET/api/timeline22002200230024002500300039003900
Aggregated22002200230024002500300039003900
+
+ + + + + + +
+

Charts

+
+ + +
+

Final ratio

+
+
+ +
+ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/timeline_retrieval/Report_25K_1_User_1600_Followings_Pull_Mode.html b/tests/timeline_retrieval/Report_25K_1_User_1600_Followings_Pull_Mode.html new file mode 100644 index 0000000..43ca432 --- /dev/null +++ b/tests/timeline_retrieval/Report_25K_1_User_1600_Followings_Pull_Mode.html @@ -0,0 +1,592 @@ + + + + Test Report for locust_timeline_retrieve.py + + + + +
+

Locust Test Report

+ +
+ +

During: 2025-11-23 23:31:06 - 2025-11-23 23:35:59

+

Target Host: http://cs6650-project-dev-alb-111554498.us-west-2.elb.amazonaws.com

+

Script: locust_timeline_retrieve.py

+
+ +
+

Request Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName# Requests# FailsAverage (ms)Min (ms)Max (ms)Average size (bytes)RPSFailures/s
GET/api/timeline57032292789405080390.20.0
Aggregated57032292789405080390.20.0
+
+ +
+

Response Time Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName50%ile (ms)60%ile (ms)70%ile (ms)80%ile (ms)90%ile (ms)95%ile (ms)99%ile (ms)100%ile (ms)
GET/api/timeline32003200330034003600390041004100
Aggregated32003200330034003600390041004100
+
+ + + + + + +
+

Charts

+
+ + +
+

Final ratio

+
+
+ +
+ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/results_5k_push.html b/tests/timeline_retrieval/Report_5K_1_User_100_Followings_Hybrid_Mode.html similarity index 97% rename from tests/results_5k_push.html rename to tests/timeline_retrieval/Report_5K_1_User_100_Followings_Hybrid_Mode.html index 9c561bb..5b36a03 100644 --- a/tests/results_5k_push.html +++ b/tests/timeline_retrieval/Report_5K_1_User_100_Followings_Hybrid_Mode.html @@ -1,7 +1,7 @@ - Test Report for locust_push_fanout_test.py + Test Report for locust_timeline_retrieve.py + + +
+

Locust Test Report

+ +
+ +

During: 2025-11-25 06:31:15 - 2025-11-25 06:36:14

+

Target Host: http://cs6650-project-dev-alb-1744692205.us-west-2.elb.amazonaws.com

+

Script: locust_timeline_retrieve.py

+
+ +
+

Request Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName# Requests# FailsAverage (ms)Min (ms)Max (ms)Average size (bytes)RPSFailures/s
GET/api/timeline138098471187102350.50.0
Aggregated138098471187102350.50.0
+
+ +
+

Response Time Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName50%ile (ms)60%ile (ms)70%ile (ms)80%ile (ms)90%ile (ms)95%ile (ms)99%ile (ms)100%ile (ms)
GET/api/timeline677686941601908701200
Aggregated677686941601908701200
+
+ + + + + + +
+

Charts

+
+ + +
+

Final ratio

+
+
+ +
+ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/timeline_retrieval/Report_5K_1_User_500_Followings_Hybrid_Mode.html b/tests/timeline_retrieval/Report_5K_1_User_500_Followings_Hybrid_Mode.html new file mode 100644 index 0000000..b06ada3 --- /dev/null +++ b/tests/timeline_retrieval/Report_5K_1_User_500_Followings_Hybrid_Mode.html @@ -0,0 +1,592 @@ + + + + Test Report for locust_timeline_retrieve.py + + + + +
+

Locust Test Report

+ +
+ +

During: 2025-11-25 06:26:45 - 2025-11-25 06:27:15

+

Target Host: http://cs6650-project-dev-alb-1744692205.us-west-2.elb.amazonaws.com

+

Script: locust_timeline_retrieve.py

+
+ +
+

Request Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName# Requests# FailsAverage (ms)Min (ms)Max (ms)Average size (bytes)RPSFailures/s
GET/api/timeline110863699155886660.40.0
Aggregated110863699155886660.40.0
+
+ +
+

Response Time Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName50%ile (ms)60%ile (ms)70%ile (ms)80%ile (ms)90%ile (ms)95%ile (ms)99%ile (ms)100%ile (ms)
GET/api/timeline840850860860950160016001600
Aggregated840850860860950160016001600
+
+ + + + + + +
+

Charts

+
+ + +
+

Final ratio

+
+
+ +
+ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/timeline_retrieval/Report_K_1_User_500_Followings_Hybrid_Mode.html b/tests/timeline_retrieval/Report_K_1_User_500_Followings_Hybrid_Mode.html new file mode 100644 index 0000000..bc89a77 --- /dev/null +++ b/tests/timeline_retrieval/Report_K_1_User_500_Followings_Hybrid_Mode.html @@ -0,0 +1,592 @@ + + + + Test Report for locust_timeline_retrieve.py + + + + +
+

Locust Test Report

+ +
+ +

During: 2025-11-24 06:11:05 - 2025-11-24 06:14:19

+

Target Host: http://cs6650-project-dev-alb-1497973835.us-west-2.elb.amazonaws.com

+

Script: locust_timeline_retrieve.py

+
+ +
+

Request Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName# Requests# FailsAverage (ms)Min (ms)Max (ms)Average size (bytes)RPSFailures/s
GET/api/timeline30044354044537880450.20.0
Aggregated30044354044537880450.20.0
+
+ +
+

Response Time Statistics

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodName50%ile (ms)60%ile (ms)70%ile (ms)80%ile (ms)90%ile (ms)95%ile (ms)99%ile (ms)100%ile (ms)
GET/api/timeline44004500450046004700480054005400
Aggregated44004500450046004700480054005400
+
+ + + + + + +
+

Charts

+
+ + +
+

Final ratio

+
+
+ +
+ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/timeline_retrieval/following_distribution.png b/tests/timeline_retrieval/following_distribution.png new file mode 100644 index 0000000..c562d54 Binary files /dev/null and b/tests/timeline_retrieval/following_distribution.png differ diff --git a/tests/timeline_retrieval/locust_background_traffic.py b/tests/timeline_retrieval/locust_background_traffic.py new file mode 100644 index 0000000..2649a25 --- /dev/null +++ b/tests/timeline_retrieval/locust_background_traffic.py @@ -0,0 +1,117 @@ + +#!/usr/bin/env python3 +""" +Simple background traffic for the social graph services. + +Behavior: +- Each simulated user picks a random user id (from DynamoDB if available, + else from a numeric range) and repeatedly performs POST /api/posts and + GET /api/timeline for that user. +- This test is intended as low-cost background traffic. It does not collect + or persist any custom metrics beyond what Locust itself reports. + +Run: + locust -f tests/timeline_retrieval/locust_background_traffic.py --host http://your-alb +""" + +import os +import random +import time +import logging +from typing import List + +from locust import HttpUser, task, between + +import boto3 + +logger = logging.getLogger("locust_background_traffic") +logging.basicConfig(level=logging.INFO) + + +def get_users_from_dynamodb(region: str = "us-west-2", followers_table: str = "social-graph-followers", sample_limit: int = 1000) -> List[int]: + """Try to scan the followers table and return a list of user_ids. + Falls back to an empty list on any failure. 
+ """ + try: + ddb = boto3.resource("dynamodb", region_name=region) + table = ddb.Table(followers_table) + users = [] + resp = table.scan(Limit=sample_limit) + items = resp.get("Items", []) + users.extend([int(it["user_id"]) for it in items if "user_id" in it]) + return users + except Exception as e: + logger.debug(f"DynamoDB scan failed: {e}") + return [] + + +class BackgroundUser(HttpUser): + """Locust user that generates background post + timeline traffic.""" + + wait_time = between(0.5, 2) + + def on_start(self): + # Try to get candidate user ids from DynamoDB, else use numeric range. + self.user_pool = list(range(1, 5000 + 1)) + + # pick current user id for this simulated user + self.user_id = random.choice(self.user_pool) + logger.debug(f"BackgroundUser started with user_id={self.user_id}") + + @task(8) + def read_timeline(self): + try: + # GET /api/timeline/:user_id + # Increased timeout to 30s for pull mode (needs to query multiple services) + path = f"/api/timeline/{self.user_id}" + start_time = time.time() + with self.client.get(path, name="GET /api/timeline", timeout=30, catch_response=True) as r: + elapsed = (time.time() - start_time) * 1000 # Convert to ms + + # Log slow requests for debugging + if elapsed > 1000: + logger.warning(f"Slow timeline request: user_id={self.user_id}, time={elapsed:.2f}ms, status={r.status_code}") + + if r.status_code == 200: + r.success() + elif r.status_code >= 400: + error_msg = f"status {r.status_code}" + try: + error_data = r.json() + if "error" in error_data: + error_msg = f"status {r.status_code}: {error_data['error']}" + except: + pass + r.failure(error_msg) + logger.error(f"Timeline request failed: user_id={self.user_id}, {error_msg}") + except Exception as e: + # Log exceptions for debugging + logger.error(f"Exception during read_timeline for user {self.user_id}: {e}", exc_info=True) + + @task(2) + def create_post(self): + try: + payload = {"user_id": self.user_id, "content": f"bg post {int(time.time())} from 
{self.user_id}"} + start_time = time.time() + with self.client.post("/api/posts", json=payload, name="POST /api/posts", timeout=10, catch_response=True) as r: + elapsed = (time.time() - start_time) * 1000 # Convert to ms + + # Log slow requests for debugging + if elapsed > 500: + logger.warning(f"Slow post creation: user_id={self.user_id}, time={elapsed:.2f}ms, status={r.status_code}") + + if r.status_code == 200: + r.success() + elif r.status_code >= 400: + error_msg = f"status {r.status_code}" + try: + error_data = r.json() + if "error" in error_data: + error_msg = f"status {r.status_code}: {error_data['error']}" + except: + pass + r.failure(error_msg) + logger.error(f"Post creation failed: user_id={self.user_id}, {error_msg}") + except Exception as e: + logger.error(f"Exception during create_post for user {self.user_id}: {e}", exc_info=True) + diff --git a/tests/locust_push_fanout_test.py b/tests/timeline_retrieval/locust_fanout_test.py similarity index 100% rename from tests/locust_push_fanout_test.py rename to tests/timeline_retrieval/locust_fanout_test.py diff --git a/tests/timeline_retrieval/locust_timeline_retrieve.py b/tests/timeline_retrieval/locust_timeline_retrieve.py new file mode 100644 index 0000000..e203519 --- /dev/null +++ b/tests/timeline_retrieval/locust_timeline_retrieve.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +"""Locust task set that repeatedly fetches /api/timeline for a single user. + +This version expects the target user ID (and optional label) to be supplied +via command-line arguments passed to Locust after `--`, e.g. + + locust ... 
-- --target-user-id 12345 --target-user eq10 +""" + +import logging +from locust import HttpUser, task, between, events +import sys + +LOG = logging.getLogger("locust_one_user") + + +def _register_parser_args(parser): + parser.add_argument( + "--target-user-id", + type=int, + required=True, + help="User ID whose timeline will be fetched by each Locust user", + ) + + +if not getattr(sys.modules[__name__], "_timeline_parser_registered", False): + events.init_command_line_parser.add_listener(_register_parser_args) + sys.modules[__name__]._timeline_parser_registered = True + + +class TimelineUser(HttpUser): + """Locust user that repeatedly requests the timeline for a single user. + + on_start: selects target user, seeds posts for that user's followings + using `sfs.seed_for_target`, then the Locust task repeatedly calls the + timeline endpoint for that user. + """ + + wait_time = between(1, 3) + + def on_start(self): + opts = self.environment.parsed_options + self.target_uid = opts.target_user_id + LOG.info("Timeline user targeting user_id=%s", self.target_uid) + + @task + def get_timeline(self): + # Use path parameter - route is /api/timeline/:user_id + url = f"/api/timeline/{self.target_uid}" + with self.client.get(url, name="/api/timeline", timeout=30, catch_response=True) as r: + # Check for 200; mark failures for Locust reporting + if r.status_code != 200: + error_body = r.text + error_msg = error_body.strip() or f"HTTP {r.status_code}" + try: + error_data = r.json() + if isinstance(error_data, dict) and "error" in error_data: + error_msg = error_data["error"] + except Exception: + # response is not JSON; keep raw body + pass + final_msg = f"Timeline error (status {r.status_code}): {error_msg}" + r.failure(final_msg) + LOG.error(f"Timeline request failed for user {self.target_uid}: {final_msg}") + else: + r.success() diff --git a/tests/requirements.txt b/tests/timeline_retrieval/requirements.txt similarity index 57% rename from tests/requirements.txt rename to 
tests/timeline_retrieval/requirements.txt index d1eb646..1319eee 100644 --- a/tests/requirements.txt +++ b/tests/timeline_retrieval/requirements.txt @@ -15,3 +15,10 @@ jsonschema==4.20.0 # Logging colorlog==6.8.0 +# Data processing and visualization (needed for seed_followings_posts.py) +# numpy 2.1.0+ supports Python 3.13 +numpy>=2.1.0 + +# matplotlib compatible with numpy 2.x +matplotlib>=3.9.0 + diff --git a/tests/timeline_retrieval/run_locust_background.sh b/tests/timeline_retrieval/run_locust_background.sh new file mode 100644 index 0000000..a663c80 --- /dev/null +++ b/tests/timeline_retrieval/run_locust_background.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Simple runner for locust_push_fanout_test.py placed inside tests/timeline_retrieval +# Usage: tests/timeline_retrieval/run_locust_push_fanout.sh [mode] [--users N] [--spawn-rate N] [--run-time 5m] [--master-host HOST] [--host URL] +# Modes: ui (default), headless, master, worker + +REPO_ROOT="$(cd "$(dirname "$0")/../.." && pwd)" +LOCUST_FILE="tests/timeline_retrieval/locust_background_traffic.py" + +# Activate venv if present +if [ -f "$REPO_ROOT/.venv/bin/activate" ]; then + # shellcheck disable=SC1090 + source "$REPO_ROOT/.venv/bin/activate" +fi + +# Install requirements before running locust +REQUIREMENTS_FILE="$(dirname "$0")/requirements.txt" +if [ -f "$REQUIREMENTS_FILE" ]; then + echo "📦 Installing requirements from $REQUIREMENTS_FILE..." 
+ pip install -q -r "$REQUIREMENTS_FILE" || { + echo "❌ Failed to install requirements" + exit 1 + } + echo "✅ Requirements installed" +fi + +get_alb_from_terraform() { + if [ -n "${HOST_URL:-}" ]; then + echo "$HOST_URL" + return 0 + fi + if [ -n "${ALB_URL:-}" ]; then + echo "$ALB_URL" + return 0 + fi + + TF_DIR="$REPO_ROOT/terraform" + if [ -d "$TF_DIR" ]; then + if command -v terraform >/dev/null 2>&1; then + set +e + OUT=$(cd "$TF_DIR" && terraform output -raw alb_dns_name 2>/dev/null) || OUT="" + set -e + if [ -n "$OUT" ]; then + echo "http://$OUT" + return 0 + fi + fi + fi +} + +ALB_URL_RESOLVED=$(get_alb_from_terraform) +echo "Using ALB URL: $ALB_URL_RESOLVED" + +exec locust -f locust_background_traffic.py \ + --headless \ + --users 100 \ + --spawn-rate 50 \ + --run-time 15m \ + --host "$ALB_URL_RESOLVED" diff --git a/tests/timeline_retrieval/run_locust_timeline_retrieve.sh b/tests/timeline_retrieval/run_locust_timeline_retrieve.sh new file mode 100644 index 0000000..9f4d50d --- /dev/null +++ b/tests/timeline_retrieval/run_locust_timeline_retrieve.sh @@ -0,0 +1,151 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Runner for locust_timeline_retrieve.py +# Uses preset default values - modify the variables below to change test parameters +# +# Output: +# Generates HTML report: .html + +REPO_ROOT="$(cd "$(dirname "$0")/../.." && pwd)" +LOCUST_FILE="tests/timeline_retrieval/locust_timeline_retrieve.py" + +# Activate venv if present +if [ -f "$REPO_ROOT/.venv/bin/activate" ]; then + # shellcheck disable=SC1090 + source "$REPO_ROOT/.venv/bin/activate" +fi + +# Install requirements before running locust +REQUIREMENTS_FILE="$(dirname "$0")/requirements.txt" +if [ -f "$REQUIREMENTS_FILE" ]; then + echo "📦 Installing requirements from $REQUIREMENTS_FILE..." 
+ pip install -q -r "$REQUIREMENTS_FILE" || { + echo "❌ Failed to install requirements" + exit 1 + } + echo "✅ Requirements installed" +fi + +get_alb_from_terraform() { + if [ -n "${HOST_URL:-}" ]; then + echo "$HOST_URL" + return 0 + fi + if [ -n "${ALB_URL:-}" ]; then + echo "$ALB_URL" + return 0 + fi + + TF_DIR="$REPO_ROOT/terraform" + if [ -d "$TF_DIR" ]; then + if command -v terraform >/dev/null 2>&1; then + set +e + OUT=$(cd "$TF_DIR" && terraform output -raw alb_dns_name 2>/dev/null) || OUT="" + set -e + if [ -n "$OUT" ]; then + echo "http://$OUT" + return 0 + fi + fi + fi +} + +fetch_target_users() { + echo "🎯 Selecting target users via seed_followings_posts.py..." + local selection + if ! selection=$(cd "$REPO_ROOT" && PYTHONPATH="$REPO_ROOT:${PYTHONPATH:-}" python3 - <<'PY' +import sys +import contextlib +import io +from tests.timeline_retrieval import seed_followings_posts as sfs + +# Redirect select_target_users() print to stderr so it shows in terminal +original_stdout = sys.stdout +sys.stdout = sys.stderr +try: + max_user, eq10_user, medium_user, _ = sfs.select_target_users() +finally: + sys.stdout = original_stdout +# Print IDs to stdout for capture +print(f"{max_user} {eq10_user} {medium_user}") +PY +); then + echo "❌ Failed to select target users via Python script" + exit 1 + fi + + selection=$(echo "$selection" | tr -d '\r') + if [ -z "$selection" ]; then + echo "❌ Empty selection output from Python script" + exit 1 + fi + + IFS=' ' read -r MAX_USER_ID EQ10_USER_ID MEDIUM_USER_ID <<< "$selection" + + if [ -z "$MAX_USER_ID" ] || [ -z "$EQ10_USER_ID" ] || [ -z "$MEDIUM_USER_ID" ]; then + echo "❌ Failed to parse target user IDs from selection output: $selection" + exit 1 + fi + + export TARGET_USER_MAX="$MAX_USER_ID" + export TARGET_USER_EQ10="$EQ10_USER_ID" + export TARGET_USER_MEDIUM="$MEDIUM_USER_ID" + + echo "Selected target users -> max: $MAX_USER_ID, eq10: $EQ10_USER_ID, medium: $MEDIUM_USER_ID" +} + +# Preset default values - modify these to 
change test parameters +TARGET_USER="max" +USERS=1 +SPAWN_RATE=1 +RUN_TIME="5m" +REPORT_NAME="Report_25K_1_User_1600_Followings_Hybrid_Mode" +REPORT_HTML="${REPORT_NAME}.html" + +ALB_URL_RESOLVED=$(get_alb_from_terraform) +echo "Using ALB URL: $ALB_URL_RESOLVED" +echo "Target user type: $TARGET_USER (eq10, medium, or max)" +echo "Users: $USERS, Spawn rate: $SPAWN_RATE, Run time: $RUN_TIME" +echo "Report file: $REPORT_HTML" + +# Fetch target users before running Locust +fetch_target_users + +# Determine which user ID to use based on TARGET_USER +case "$TARGET_USER" in + max) + SELECTED_USER_ID="$TARGET_USER_MAX" + ;; + medium) + SELECTED_USER_ID="$TARGET_USER_MEDIUM" + ;; + *) + TARGET_USER="eq10" + SELECTED_USER_ID="$TARGET_USER_EQ10" + ;; +esac + +echo "Using $TARGET_USER user ID: $SELECTED_USER_ID" + +if [ -z "$SELECTED_USER_ID" ]; then + echo "❌ Failed to resolve user ID for target '$TARGET_USER'" + exit 1 +fi + +# Set PYTHONPATH to include project root for imports +export PYTHONPATH="$REPO_ROOT:${PYTHONPATH:-}" + +# Change to the script directory +cd "$(dirname "$0")" + +# Build locust command +exec locust -f locust_timeline_retrieve.py \ + --headless \ + --users "$USERS" \ + --spawn-rate "$SPAWN_RATE" \ + --run-time "$RUN_TIME" \ + --host "$ALB_URL_RESOLVED" \ + --html "$REPORT_HTML" \ + --target-user-id "$SELECTED_USER_ID" + diff --git a/tests/timeline_retrieval/seed_followings_posts.py b/tests/timeline_retrieval/seed_followings_posts.py new file mode 100644 index 0000000..1f9e5cc --- /dev/null +++ b/tests/timeline_retrieval/seed_followings_posts.py @@ -0,0 +1,441 @@ +#!/usr/bin/env python3 +""" +Seed posts script + +This script performs the pre-test operation: +1. Scans the `following` DynamoDB table +2. Selects three users: + - user with the max number of followings + - a user with exactly 10 followings (or nearest to 10) + - a medium-following user (100-500) or the median +3. 
For each selected user, fetches their `following` list and has each following user + create 10 posts by calling POST {alb_url}/api/posts + +Usage: + python3 seed_followings_posts.py \ + --region us-west-2 \ + --following-table social-graph-following \ + [--limit-followings 1000] [--workers 20] [--force] + +Notes: +- The script will try (in order) to obtain the ALB URL from: + 1) ALB_URL or BASE_URL environment variables + 2) Terraform output `alb_dns_name` (searches for a `terraform` dir up the tree) +- This can create many posts. Use --limit-followings to cap how many followings to process per target. +- Requires AWS credentials available to boto3 (env or IAM role). +""" + +import logging +import sys +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import List, Tuple, Optional + +import boto3 +import requests +import os +import subprocess +from typing import Optional +import matplotlib.pyplot as plt +import numpy as np + + +def plot_following_distribution(items: List[dict], out_path: str = "following_distribution.png") -> Optional[str]: + """Create and save a histogram of following counts. 
Returns path on success or None.""" + counts = [] + for it in items: + following = it.get("following_ids", []) or [] + if isinstance(following, dict): + # unexpected structure + continue + counts.append(len(following)) + + if not counts: + logger.info("No following counts to plot") + return None + + plt.figure(figsize=(10, 6)) + # Use log scale for y to show long tail + plt.hist(counts, bins=50, log=True, color="#2c7fb8", edgecolor="black") + plt.xlabel("Number of followings") + plt.ylabel("Number of users (log scale)") + plt.title("Distribution of following counts") + plt.grid(axis='y', alpha=0.6) + plt.tight_layout() + try: + plt.savefig(out_path) + logger.info(f"Saved following distribution plot to: {out_path}") + plt.close() + return out_path + except Exception as e: + logger.warning(f"Failed to save plot: {e}") + return None + + +def get_alb_url_from_terraform() -> Optional[str]: + """Search up the directory tree for a `terraform` directory and run + `terraform output -raw alb_dns_name`. Returns full http:// URL or None. 
+ """ + cur = os.path.dirname(os.path.abspath(__file__)) + for _ in range(6): + terraform_dir = os.path.join(cur, 'terraform') + if os.path.isdir(terraform_dir): + try: + result = subprocess.run( + ['terraform', 'output', '-raw', 'alb_dns_name'], + cwd=terraform_dir, + capture_output=True, + text=True, + timeout=10 + ) + if result.returncode == 0 and result.stdout.strip(): + alb_dns = result.stdout.strip() + return f"http://{alb_dns}" + except Exception: + return None + parent = os.path.dirname(cur) + if parent == cur: + break + cur = parent + return None + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("seed_followings_posts") + +# Configuration - hardcoded in source as requested +# (LIMIT_FOLLOWINGS, WORKERS and FORCE were removed per request) + + +def scan_table(table, limit: Optional[int] = None) -> List[dict]: + """Scan DynamoDB table with optional limit and progress logging.""" + items = [] + page_count = 0 + print("📊 Scanning DynamoDB table...", file=sys.stderr, flush=True) + + # Use page size of 1000 for efficient scanning + page_size = 1000 + resp = table.scan(Limit=min(page_size, limit) if limit else page_size) + items.extend(resp.get("Items", [])) + page_count += 1 + print(f" Page {page_count}: {len(items)} items so far...", file=sys.stderr, flush=True) + + while "LastEvaluatedKey" in resp: + if limit and len(items) >= limit: + items = items[:limit] # Trim to exact limit + break + remaining = limit - len(items) if limit else page_size + resp = table.scan( + ExclusiveStartKey=resp["LastEvaluatedKey"], + Limit=min(page_size, remaining) if limit else page_size + ) + items.extend(resp.get("Items", [])) + page_count += 1 + print(f" Page {page_count}: {len(items)} items so far...", file=sys.stderr, flush=True) + + print(f"✅ Scan complete: {len(items)} items total", file=sys.stderr, flush=True) + return items + + +def select_target_users() -> Tuple[int, int, int, List[int]]: + """Scan the following table and select three target users. 
+ + This function now performs its own DynamoDB scan (no arguments required) + and returns (max_user, user_eq_10, user_medium, items). + """ + # Ensure ALB / base url is present (keeps behavior consistent with main) + base_url = get_alb_url_from_terraform() + if not base_url: + raise RuntimeError("ALB URL could not be found from Terraform output or ALB_URL env var") + + dynamodb = boto3.resource("dynamodb", region_name="us-west-2") + table = dynamodb.Table("social-graph-following") + + # Limit scan to 10000 items for faster execution (enough to find target users) + items = scan_table(table, limit=10000) + + user_following_counts = [] # (user_id, following_count) + for it in items: + uid_raw = it.get("user_id") + try: + uid = int(uid_raw) + except Exception: + # try stringified digits + try: + uid = int(str(uid_raw)) + except Exception: + continue + following = it.get("following_ids", []) or [] + # ensure list-like + if isinstance(following, dict): + following = [] + user_following_counts.append((uid, len(following))) + + if not user_following_counts: + raise RuntimeError("No items found in following table") + + user_following_counts.sort(key=lambda x: x[1], reverse=True) + + max_user, max_count = user_following_counts[0] + + # find eq10 + user_eq_10 = None + for uid, cnt in reversed(user_following_counts): + if cnt == 10: + user_eq_10 = uid + user_eq_10_count = cnt + break + if user_eq_10 is None: + user_eq_10 = min(user_following_counts, key=lambda x: abs(x[1] - 10))[0] + user_eq_10_count = next(cnt for uid, cnt in user_following_counts if uid == user_eq_10) + + # find medium 100-500 + user_medium = None + for uid, cnt in user_following_counts: + if 100 <= cnt <= 500: + user_medium = uid + user_medium_count = cnt + break + if user_medium is None: + mid_idx = len(user_following_counts) // 2 + user_medium = user_following_counts[mid_idx][0] + user_medium_count = user_following_counts[mid_idx][1] + + # Trim eq10 user if following count > 10 + if user_eq_10_count > 
10: + changed, new_count = trim_following_to_limit(table, user_eq_10, 10) + if changed: + logger.info(f"Trimmed eq10 user {user_eq_10} from {user_eq_10_count} to {new_count} followings") + user_eq_10_count = new_count + + # Trim medium user if following count > 100 + if user_medium_count > 100: + changed, new_count = trim_following_to_limit(table, user_medium, 100) + if changed: + logger.info(f"Trimmed medium user {user_medium} from {user_medium_count} to {new_count} followings") + user_medium_count = new_count + + print(f"Selected users: max_user={max_user} ({max_count} followings), user_eq_10={user_eq_10} ({user_eq_10_count} followings), user_medium={user_medium} ({user_medium_count} followings)") + return max_user, user_eq_10, user_medium, items + + +def fetch_following_ids(table, user_id: int) -> List[int]: + # DynamoDB key may be stored as string; try both + for key in (str(user_id), user_id): + try: + resp = table.get_item(Key={"user_id": key}) + item = resp.get("Item") + if not item: + continue + following = item.get("following_ids", []) or [] + res = [] + for fid in following: + try: + res.append(int(fid)) + except Exception: + # skip unparsable + continue + return res + except Exception: + continue + return [] + + +def trim_following_to_limit(table, user_id: int, limit: int = 10) -> Tuple[bool, int]: + """If the user's following list has more than `limit` entries, truncate it to `limit` and write back to DynamoDB. 
+ + Returns (changed, new_length) + """ + # Try to fetch item with string key then numeric key + for key in (str(user_id), user_id): + try: + resp = table.get_item(Key={"user_id": key}) + item = resp.get("Item") + if not item: + continue + + # Determine attribute name used for followings + if 'following' in item and isinstance(item['following'], list): + attr = 'following' + elif 'following_ids' in item and isinstance(item['following_ids'], list): + attr = 'following_ids' + else: + # nothing to trim + return False, 0 + + current = item.get(attr, []) or [] + if len(current) <= limit: + return False, len(current) + + new_list = current[:limit] + # Write back using UpdateExpression + table.update_item( + Key={"user_id": key}, + UpdateExpression=f"SET {attr} = :vals", + ExpressionAttributeValues={':vals': new_list} + ) + logger.info(f"Trimmed user {user_id} {attr} from {len(current)} -> {len(new_list)}") + return True, len(new_list) + except Exception as e: + logger.debug(f"trim_following_to_limit: get/update attempt for key={key} failed: {e}") + continue + + return False, 0 + + +def prepare_three_targets(region: str = "us-west-2", following_table_name: str = "social-graph-following") -> Tuple[int, int, int]: + """Scan following table, plot distribution, select three target users and trim them. 
+ + Returns (table, base_url, max_user, user_eq_10, user_medium) + """ + base_url = get_alb_url_from_terraform() + if not base_url: + raise RuntimeError("ALB URL could not be found from Terraform output or ALB_URL env var") + + dynamodb = boto3.resource("dynamodb", region_name=region) + table = dynamodb.Table(following_table_name) + + logger.info("Scanning following table (this may take a while)...") + # select_target_users will perform its own scan and return items + max_user, user_eq_10, user_medium, items = select_target_users(region, following_table_name) + logger.info(f"Selected users: max={max_user}, eq10={user_eq_10}, medium={user_medium}") + + # Trim eq10 to 10 and medium to 100 + changed, new_len = trim_following_to_limit(table, user_eq_10, limit=10) + if changed: + logger.info(f"Trimmed user_eq_10 ({user_eq_10}) followings down to {new_len}") + time.sleep(0.5) + + changed_mid, new_len_mid = trim_following_to_limit(table, user_medium, limit=100) + if changed_mid: + logger.info(f"Trimmed user_medium ({user_medium}) followings down to {new_len_mid}") + time.sleep(0.5) + + return max_user, user_eq_10, user_medium + + +def post_for_user(session: requests.Session, base_url: str, user_id: int, posts_per_user: int = 10) -> int: + """Create posts_per_user posts for user_id. Returns number of successful posts.""" + success = 0 + for i in range(posts_per_user): + payload = {"user_id": user_id, "content": f"Auto-seed post {i+1} from user {user_id}"} + try: + r = session.post(f"{base_url.rstrip('/')}/api/posts", json=payload, timeout=10) + if r.status_code == 200: + success += 1 + else: + logger.debug(f"Post by {user_id} returned {r.status_code}") + except Exception as e: + logger.debug(f"HTTP error posting for {user_id}: {e}") + return success + + +def seed_for_target(table, base_url: str, target_uid: int, workers: int) -> Tuple[int, int, int]: + """For a given target user, fetch followings and have each following create posts. 
+ Returns (target_uid, total_followings_processed, total_successful_posts) + """ + following_ids = fetch_following_ids(table, target_uid) + total_followings = len(following_ids) + + logger.info(f"Target {target_uid}: processing {len(following_ids)} followings (total in table: {total_followings})") + + session = requests.Session() + total_success = 0 + + # Use threadpool to parallelize per-following posting jobs + with ThreadPoolExecutor(max_workers=workers) as ex: + futures = {ex.submit(post_for_user, session, base_url, fid, 10): fid for fid in following_ids} + for fut in as_completed(futures): + fid = futures[fut] + try: + ok = fut.result() + total_success += ok + except Exception as e: + logger.debug(f"Error seeding for following {fid}: {e}") + + return target_uid, len(following_ids), total_success + + +def main(): + # Configuration comes from module-level constants + # Always read ALB URL from Terraform output + base_url = get_alb_url_from_terraform() + if not base_url: + logger.error("ALB URL could not be found from Terraform output (alb_dns_name). 
Ensure Terraform output exists or run terraform in a parent directory.") + return 2 + + region = "us-west-2" + following_table_name = "social-graph-following" + + logger.info(f"Using ALB URL: {base_url}") + logger.info(f"Region: {region}") + logger.info(f"Following table: {following_table_name}") + + dynamodb = boto3.resource("dynamodb", region_name=region) + table = dynamodb.Table(following_table_name) + + logger.info("Scanning following table (this may take a while)...") + # select_target_users will perform its own scan and return items + max_user, user_eq_10, user_medium, items = select_target_users() + logger.info(f"Selected users: max={max_user}, eq10={user_eq_10}, medium={user_medium}") + + # Draw and save following distribution plot (items returned from select_target_users) + try: + plot_path = plot_following_distribution(items) + if plot_path: + logger.info(f"Distribution plot created: {plot_path}") + except Exception as e: + logger.debug(f"Plotting failed: {e}") + + # Safety check + # Ensure user_eq_10 has at most 10 followings in the DB; if not, trim it. 
+    try:
+        changed, new_len = trim_following_to_limit(table, user_eq_10, limit=10)
+        if changed:
+            logger.info(f"Trimmed user_eq_10 ({user_eq_10}) followings down to {new_len}")
+            # allow small pause for DynamoDB eventual consistency
+            time.sleep(0.5)
+    except Exception as e:
+        logger.warning(f"Failed to trim followings for {user_eq_10}: {e}")
+
+    # Ensure user_medium has at most 100 followings; trim if necessary
+    try:
+        changed_mid, new_len_mid = trim_following_to_limit(table, user_medium, limit=100)
+        if changed_mid:
+            logger.info(f"Trimmed user_medium ({user_medium}) followings down to {new_len_mid}")
+            time.sleep(0.5)
+    except Exception as e:
+        logger.warning(f"Failed to trim followings for medium user {user_medium}: {e}")
+
+    total_followings = 0
+    for uid in (max_user, user_eq_10, user_medium):
+        fids = fetch_following_ids(table, uid)
+        total_followings += len(fids)
+
+    estimated_posts = total_followings * 10
+    logger.info(f"Estimated total posts to create (all followings x10): {estimated_posts}")
+    # Default behavior: do not proceed automatically for very large jobs
+    if estimated_posts > 100000:
+        logger.warning("This job would create more than 100k posts. Edit the script to change the limit or scope if you really want to proceed.")
+        return 3
+
+    # Seed for each target
+    results = []
+    start = time.time()
+    for uid in (max_user, user_eq_10, user_medium):
+        # Inline defaults: no cap on followings, 20 workers
+        res = seed_for_target(table, base_url, uid, 20)
+        results.append(res)
+
+    duration = time.time() - start
+
+    logger.info("Seeding summary:")
+    for target_uid, processed, successes in results:
+        logger.info(f"Target {target_uid}: processed followings={processed}, successful_posts={successes}")
+
+    logger.info(f"Total time: {duration:.1f}s")
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())