Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 40 additions & 18 deletions scripts/generate_all_test_data.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ USER_SERVICE_SCRIPT="$PROJECT_ROOT/services/user-service/scripts/generate_test_d
SOCIAL_GRAPH_SCRIPT="$PROJECT_ROOT/services/social-graph-services/scripts/generate_and_load.sh"

# Default values
NUM_USERS=5000
NUM_USERS=25000
BASE_URL="" # Will be read from Terraform output
CONCURRENCY=50
AWS_REGION="us-west-2"
Expand Down Expand Up @@ -174,20 +174,36 @@ if ! command -v python3 &> /dev/null; then
exit 1
fi

# Check dependencies for User Service script
echo ""
echo -e "${BLUE}Checking Python dependencies...${NC}"
python3 -c "import aiohttp" 2>/dev/null
if [ $? -ne 0 ]; then
echo -e "${YELLOW}⚠️ aiohttp not installed, installing...${NC}"
pip3 install aiohttp
fi

python3 -c "import boto3" 2>/dev/null
if [ $? -ne 0 ]; then
echo -e "${YELLOW}⚠️ boto3 not installed, installing...${NC}"
pip3 install boto3
fi
# Helper: create (if missing) and activate a Python virtualenv for one service.
#
# Arguments:
#   $1 - path to the service's scripts directory. The venv is placed in
#        "$1/.venv" and dependencies are read from "$1/requirements.txt"
#        when that file exists; otherwise aiohttp, boto3 and requests are
#        installed.
#
# Side effects: sources the venv's activate script into the current shell.
# Returns 1 if the venv cannot be created or activated, so packages are never
# installed with whatever `python` happens to be on PATH; otherwise returns the
# status of the final pip install.
create_and_activate_service_venv() {
    local service_scripts_dir="$1"
    local venv_dir="$service_scripts_dir/.venv"
    local req_file="$service_scripts_dir/requirements.txt"

    # Plain `echo` in bash does not expand "\n", so emit the blank line explicitly.
    echo ""
    echo "Setting up virtualenv in $service_scripts_dir"

    # Test for the activate script rather than the directory so that a venv
    # left half-created by an interrupted run is rebuilt instead of reused.
    if [ ! -f "$venv_dir/bin/activate" ]; then
        python3 -m venv "$venv_dir" || return 1
    fi

    # shellcheck source=/dev/null
    . "$venv_dir/bin/activate" || return 1
    python -m pip install --upgrade pip setuptools wheel

    if [ -f "$req_file" ]; then
        echo "Installing packages from $req_file"
        python -m pip install -r "$req_file"
    else
        echo "No requirements.txt in $service_scripts_dir, installing minimal packages"
        python -m pip install aiohttp boto3 requests
    fi
}

# Leave the currently active Python virtualenv, if there is one.
# Does nothing when VIRTUAL_ENV is unset or empty. A failing `deactivate` is
# ignored, so this function always returns 0.
deactivate_venv() {
    if [ -z "${VIRTUAL_ENV-}" ]; then
        return 0
    fi
    deactivate || true
}

START_TIME=$(date +%s)

Expand All @@ -197,12 +213,15 @@ echo -e "${BLUE}Step 1: Creating Users via User Service API${NC}"
echo -e "${BLUE}================================================================${NC}"
echo ""

# Run User Service script
cd "$PROJECT_ROOT/services/user-service/scripts"
# Run User Service script (ensure per-service venv and deps)
USER_SCRIPTS_DIR="$PROJECT_ROOT/services/user-service/scripts"
create_and_activate_service_venv "$USER_SCRIPTS_DIR"
cd "$USER_SCRIPTS_DIR"
python3 generate_test_data.py \
    "$NUM_USERS" \
    --url "$BASE_URL" \
    --concurrency "$CONCURRENCY"
# Capture the generator's exit status immediately: $? is overwritten by every
# subsequent command, and deactivate_venv always returns 0.
USER_EXIT_CODE=$?
deactivate_venv

Expand All @@ -227,13 +246,16 @@ echo -e "${BLUE}Step 2: Creating Relationships via Social Graph DynamoDB${NC}"
echo -e "${BLUE}================================================================${NC}"
echo ""

# Run Social Graph script
cd "$PROJECT_ROOT/services/social-graph-services/scripts"
# Run Social Graph script (ensure per-service venv and deps)
SOCIAL_SCRIPTS_DIR="$PROJECT_ROOT/services/social-graph-services/scripts"
create_and_activate_service_venv "$SOCIAL_SCRIPTS_DIR"
cd "$SOCIAL_SCRIPTS_DIR"
bash generate_and_load.sh \
    --users "$NUM_USERS" \
    --region "$AWS_REGION" \
    --followers-table "$FOLLOWERS_TABLE" \
    --following-table "$FOLLOWING_TABLE"
# Capture the loader's exit status immediately: $? is overwritten by every
# subsequent command, and deactivate_venv always returns 0.
SOCIAL_EXIT_CODE=$?
deactivate_venv

Expand Down
39 changes: 30 additions & 9 deletions services/post-service/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"post-service/internal/repository"
"post-service/internal/service"
"sync"
"time"

pb "github.com/cs6650/proto/post"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/sns"
Expand All @@ -39,13 +41,32 @@ func corsMiddleware() gin.HandlerFunc {
}
}


func main() {
// Load configuration
cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
log.Fatal("Failed to load AWS config: %w", err)
}
// Load configuration with optimized HTTP client and retry settings
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithHTTPClient(&http.Client{
Transport: &http.Transport{
MaxIdleConns: 1000, // Total connection pool ✅
MaxIdleConnsPerHost: 200, // Per host connection number ✅
MaxConnsPerHost: 300, // Maximum connections per host ✅
IdleConnTimeout: 30 * time.Second, // Reduced to 30s, avoid connection buildup
DisableKeepAlives: false, // Keep connection reuse ✅
TLSHandshakeTimeout: 5 * time.Second, // Reduced to 5s, faster failure
ExpectContinueTimeout: 1 * time.Second, // Added this, reduce HTTP/1.1 delay
ResponseHeaderTimeout: 10 * time.Second, // Added response header timeout
DisableCompression: false, // Keep compression, reduce network transfer
ForceAttemptHTTP2: true, // Force HTTP/2, more efficient
WriteBufferSize: 32 * 1024, // Increase write buffer
ReadBufferSize: 32 * 1024, // Increase read buffer
},
Timeout: 3 * time.Second, // Reduced to 3s, sufficient for 500 user queries
}),
config.WithRetryMaxAttempts(2), // Add retry configuration
config.WithRetryMode(aws.RetryModeAdaptive), // Adaptive retry
)
if err != nil {
log.Fatal("Failed to load AWS config: %w", err)
}

// Initialize AWS client
dynamoClient := dynamodb.NewFromConfig(cfg)
Expand Down Expand Up @@ -104,7 +125,7 @@ func main() {

grpcServer := grpc.NewServer()
pb.RegisterPostServiceServer(grpcServer, grpcHandler)

// Enable gRPC reflection for tools like grpcurl
reflection.Register(grpcServer)

Expand All @@ -125,12 +146,12 @@ func main() {

// Wait for both servers
wg.Wait()

}

// getEnv returns the value of the environment variable named key, or
// defaultValue when that variable is unset or set to the empty string.
func getEnv(key, defaultValue string) string {
	fromEnv := os.Getenv(key)
	if fromEnv == "" {
		return defaultValue
	}
	return fromEnv
}
}
202 changes: 196 additions & 6 deletions services/post-service/internal/repository/post_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package repository
import (
"context"
"fmt"
"log"
"os"
"strconv"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
Expand Down Expand Up @@ -80,22 +84,208 @@ func (r *PostRepository) GetPost(ctx context.Context, postID int64) (*pb.Post, e
return &post, err
}

// Retrieve recent posts for multiple users
func (r *PostRepository) GetPostByUserIDs(ctx context.Context, userIDs []int64, limit int32) (map[int64][]*pb.Post, error) {
result := make(map[int64][]*pb.Post)
// batchCheckUsersHasPosts performs parallel COUNT queries to check which users have posts
func (r *PostRepository) batchCheckUsersHasPosts(ctx context.Context, userIDs []int64) (map[int64]bool, error) {
if len(userIDs) == 0 {
return make(map[int64]bool), nil
}

hasPostsMap := make(map[int64]bool, len(userIDs))
hasPostsMutex := &sync.Mutex{}
maxWorkers := min(50, len(userIDs))

// Create worker pool for COUNT queries
userIDChan := make(chan int64, len(userIDs))
for _, userID := range userIDs {
posts, err := r.GetPostByUserID(ctx, userID, limit)
userIDChan <- userID
}
close(userIDChan)

var wg sync.WaitGroup
errChan := make(chan error, len(userIDs))

// Launch worker pool for parallel COUNT queries
for i := 0; i < maxWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()

for userID := range userIDChan {
hasPosts, err := r.checkUserHasPosts(ctx, userID)
if err != nil {
errChan <- fmt.Errorf("failed to check posts for user %d: %w", userID, err)
continue
}

hasPostsMutex.Lock()
hasPostsMap[userID] = hasPosts
hasPostsMutex.Unlock()
}
}()
}

wg.Wait()
close(errChan)

// Check for errors
for err := range errChan {
if err != nil {
return nil, err
}
result[userID] = posts
}

return hasPostsMap, nil
}

// GetPostByUserIDs fetches recent posts for several users concurrently and
// returns them keyed by user ID.
//
// When the POST_STRATEGY environment variable equals "hybrid", a batch COUNT
// pass (batchCheckUsersHasPosts) runs first; users it reports as having no
// posts are mapped to an empty slice and are not queried again. In any other
// mode every ID in userIDs is queried.
//
// Queries run on at most 50 workers. The first failure cancels the context
// passed to the remaining queries and is returned together with a nil map, so
// workers stop issuing requests once the call is known to have failed.
func (r *PostRepository) GetPostByUserIDs(ctx context.Context, userIDs []int64, limit int32) (map[int64][]*pb.Post, error) {
	startTime := time.Now()
	checkCountFirst := os.Getenv("POST_STRATEGY") == "hybrid"

	// Pre-size the result map; at most one entry per requested user.
	result := make(map[int64][]*pb.Post, len(userIDs))

	// Outside hybrid mode every requested user is queried.
	usersToQuery := userIDs
	if checkCountFirst {
		countStart := time.Now()
		hasPostsMap, err := r.batchCheckUsersHasPosts(ctx, userIDs)
		if err != nil {
			return nil, fmt.Errorf("failed to batch check users has posts: %w", err)
		}
		countDuration := time.Since(countStart)

		// Keep only users that have posts; the others get an empty result now.
		usersToQuery = make([]int64, 0, len(userIDs))
		for _, userID := range userIDs {
			if hasPostsMap[userID] {
				usersToQuery = append(usersToQuery, userID)
			} else {
				result[userID] = []*pb.Post{}
			}
		}

		log.Printf("[BatchGetPosts] Batch COUNT check: users=%d, has_posts=%d, no_posts=%d, duration=%v",
			len(userIDs), len(usersToQuery), len(userIDs)-len(usersToQuery), countDuration)
	}

	// Nothing left to query: return what was collected so far.
	if len(usersToQuery) == 0 {
		totalDuration := time.Since(startTime)
		log.Printf("[BatchGetPosts] Completed: users=%d, duration=%v (all users have no posts)",
			len(userIDs), totalDuration)
		return result, nil
	}

	// Derived context so the first failure can stop the remaining queries.
	workCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Queue every user ID up front; the channel is closed so workers exit
	// once it is drained.
	userIDChan := make(chan int64, len(usersToQuery))
	for _, userID := range usersToQuery {
		userIDChan <- userID
	}
	close(userIDChan)

	var (
		wg          sync.WaitGroup
		resultMutex sync.Mutex
		errOnce     sync.Once
		firstErr    error
	)

	// fail records the first error only and cancels the outstanding work.
	fail := func(userID int64, err error) {
		errOnce.Do(func() {
			firstErr = fmt.Errorf("failed to get posts for user %d: %w", userID, err)
			cancel()
		})
	}

	// Limit concurrent goroutines to avoid resource exhaustion.
	maxWorkers := min(50, len(usersToQuery))
	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for userID := range userIDChan {
				// Stop draining the queue once another worker has failed or
				// the caller has cancelled; the result would be discarded.
				if err := workCtx.Err(); err != nil {
					fail(userID, err)
					return
				}

				queryStart := time.Now()
				// checkCountFirst=false: in hybrid mode the COUNT pass above
				// already ran, and in other modes no COUNT check is wanted.
				posts, err := r.GetPostByUserID(workCtx, userID, limit, false)
				queryDuration := time.Since(queryStart)
				if err != nil {
					fail(userID, err)
					return
				}

				// Every queried user gets an entry, even when posts is empty.
				resultMutex.Lock()
				result[userID] = posts
				resultMutex.Unlock()

				// Log slow queries for analysis.
				if queryDuration > 50*time.Millisecond {
					log.Printf("[BatchGetPosts] Slow query: user_id=%d, duration=%v, posts=%d", userID, queryDuration, len(posts))
				}
			}
		}()
	}

	// firstErr is only read after every worker has returned.
	wg.Wait()
	if firstErr != nil {
		return nil, firstErr
	}

	totalDuration := time.Since(startTime)
	log.Printf("[BatchGetPosts] Completed: users=%d, duration=%v",
		len(userIDs), totalDuration)

	return result, nil
}

// checkUserHasPosts reports whether a Query on the "user_id-index" index finds
// at least one item for userID. The query asks for a count only
// (types.SelectCount) with a limit of 1, since the caller only needs to know
// whether the count is non-zero. A Query error is returned unchanged.
func (r *PostRepository) checkUserHasPosts(ctx context.Context, userID int64) (bool, error) {
	countQuery := &dynamodb.QueryInput{
		TableName:              aws.String(r.tableName),
		IndexName:              aws.String("user_id-index"),
		KeyConditionExpression: aws.String("user_id = :uid"),
		ExpressionAttributeValues: map[string]types.AttributeValue{
			":uid": &types.AttributeValueMemberN{Value: fmt.Sprint(userID)},
		},
		Select: types.SelectCount,
		Limit:  aws.Int32(1),
	}

	out, err := r.client.Query(ctx, countQuery)
	if err != nil {
		return false, err
	}

	hasPosts := out.Count > 0
	return hasPosts, nil
}

// Retrieve recent posts for single user
func (r *PostRepository) GetPostByUserID(ctx context.Context, userID int64, limit int32) ([]*pb.Post, error) {
func (r *PostRepository) GetPostByUserID(ctx context.Context, userID int64, limit int32, checkCountFirst bool) ([]*pb.Post, error) {
// Optimization for hybrid mode: First check if user has posts using COUNT query
// This avoids fetching data for users with no posts
if checkCountFirst {
hasPosts, err := r.checkUserHasPosts(ctx, userID)
if err != nil {
return nil, err
}

if !hasPosts {
// User has no posts, return empty slice immediately
return []*pb.Post{}, nil
}
}

// User has posts (or checkCountFirst is false), fetch the actual data
result, err := r.client.Query(ctx, &dynamodb.QueryInput{
TableName: aws.String(r.tableName),
IndexName: aws.String("user_id-index"), // Use GSI for querying by user_id
Expand Down
Loading