In [1]:
import os
from openai import AzureOpenAI
import json
from datetime import datetime
from pymongo import MongoClient
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

load_dotenv('.env')
# MongoDB Connection
connection_string = os.getenv('MONGO_CONNECTION_STRING')
client = MongoClient(connection_string)
db = client.get_database()
print('Current Database:', db.name, 'Collections:', db.list_collection_names())

client = AzureOpenAI(
  azure_endpoint = "https://facciopenai.openai.azure.com/", 
  api_key="a0693316359249288398f7319a8ba0b2",  
  api_version="2024-08-01-preview"
)

  client = MongoClient(connection_string)


Current Database: facci-stylematch Collections: ['IGaccounts', 'IGposts']


In [2]:
def check_outfit(_id, image_url):
    # Combine prompt text and images in content
    content = [
        {
            "type": "text",
            "text": "For a valid outfit image, the image should only contain one person and include at least two clothing items. Return JSON with is_outfit (true/false) and reason (explanation string)."
        },
        {
            "type": "image_url",
            "image_url": {
                "url": image_url,
                "detail": "low"
            }
        } 
    ]
    
    try:
        # Get AI response
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            response_format={ "type": "json_object" },
            messages=[
                {"role": "system", "content": "You are a helpful fashion assistant designed to output JSON."},
                {"role": "user", "content": content}
            ]
        )
        
        # Parse response
        result = json.loads(response.choices[0].message.content)
        is_outfit = is_outfit = str(result.get('is_outfit', 'unchecked')).lower()
        reason = result.get('reason', 'No reason provided')
        
        # Update database
        posts_collection = db['IGposts']
        posts_collection.update_one(
            {'_id': _id},
            {
                '$set': {
                    'isOutfit': is_outfit,
                    'isOutfitReason': reason,
                    'updatedAt': datetime.now()
                }
            }
        )
        
        print(f"Updated database for entry {_id}: isOutfit={is_outfit}, reason={reason}")
        return result
        
    except Exception as e:
        print(f"Error processing entry {_id}: {str(e)}")
        return None



In [4]:
def process_single_post(post):
    if post.get('imageURLs'):
        doc_id = post['_id']
        image_url = post['imageURLs'][0]
        try:
            result = check_outfit(doc_id, image_url)
            if result:
                print(f"Processed post {doc_id}: {result}")
            return doc_id, result
        except Exception as e:
            print(f"Error processing post {doc_id}: {str(e)}")
            return doc_id, None

def process_unchecked_posts(max_workers=5):
    posts_collection = db['IGposts']
    unchecked_posts = list(posts_collection.find({'isOutfit': 'unchecked'}))
    total_posts = len(unchecked_posts)
    
    print(f"Found {total_posts} unchecked posts")
    start_time = time.time()
    
    # Using ThreadPoolExecutor for concurrent processing
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_post = {
            executor.submit(process_single_post, post): post 
            for post in unchecked_posts
        }
        
        # Process comp  leted tasks as they finish
        completed = 0
        for future in as_completed(future_to_post):
            completed += 1
            doc_id, result = future.result()
            print(f"Progress: {completed}/{total_posts} posts processed")
    
    end_time = time.time()
    print(f"Processed {total_posts} posts in {end_time - start_time:.2f} seconds")

# Run with 5 concurrent workers
process_unchecked_posts(max_workers=5)

Found 7 unchecked posts
Error processing entry 67133fa9b43a2e3affb09716: Error code: 400 - {'error': {'code': 'BadRequest', 'message': 'The provided image url can not be accessed. status code: 404.', 'param': None, 'type': None}}
Progress: 1/7 posts processed
Updated database for entry 671fc21c89ceb57d5e7e0f93: isOutfit=false, reason=The image contains multiple people and does not show a single individual wearing at least two clothing items.
Processed post 671fc21c89ceb57d5e7e0f93: {'is_outfit': False, 'reason': 'The image contains multiple people and does not show a single individual wearing at least two clothing items.'}
Progress: 2/7 posts processed
Updated database for entry 671fc21889ceb57d5e7e0f89: isOutfit=false, reason=The image does not clearly show a valid outfit, as the focus is not on the clothing items and the person is not clearly visible.
Processed post 671fc21889ceb57d5e7e0f89: {'is_outfit': False, 'reason': 'The image does not clearly show a valid outfit, as the focus 

In [15]:
def compare_outfits(image_url1, image_url2):
    content = [
        {
            "type": "text",
            "text": "Analyze the two images, and determine whether they belong to the same outfit or contains detailed item image from another outfit image. Return JSON with is_same_outfit (true/false)."
        },
        {
            "type": "image_url",
            "image_url": {
                "url": image_url1,
                "detail": "low"
            }
        },
        {
            "type": "image_url",
            "image_url": {
                "url": image_url2,
                "detail": "low"
            }
        }
    ]
    
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            response_format={ "type": "json_object" },
            messages=[
                {"role": "system", "content": "You are a helpful fashion assistant designed to output JSON."},
                {"role": "user", "content": content}
            ]
        )
        
        result = json.loads(response.choices[0].message.content)
        is_same_outfit = str(result.get('is_same_outfit', False)).lower()
        
        print(f"Comparison result: {is_same_outfit}")
        return is_same_outfit
        
    except Exception as e:
        print(f"Error comparing outfits: {str(e)}")
        return False
    
def process_shortcode_group(shortcode, posts, posts_collection):
    if len(posts) < 2:  # Skip if only one image
        posts_collection.update_one(
            {'_id': posts[0]['_id']},
            {'$set': {'merged_same_outfit': True}}
        )
        return f"Skipped {shortcode} - single post"
        
    print(f"\nProcessing shortcode: {shortcode} with {len(posts)} posts")
    merged_ids = set()  # Track which posts have been merged
    
    # Compare each post with others
    for i, base_post in enumerate(posts):
        if base_post['_id'] in merged_ids:
            continue
            
        base_image = base_post['imageURLs'][0]
        merged_urls = base_post['imageURLs'].copy()
        
        # Compare with remaining posts
        for compare_post in posts[i+1:]:
            if compare_post['_id'] in merged_ids:
                continue
                
            compare_image = compare_post['imageURLs'][0]
            print(f"Comparing {base_post['_id']} with {compare_post['_id']}")
            
            if compare_outfits(base_image, compare_image) == 'true':
                # Merge the posts
                merged_urls.extend(compare_post['imageURLs'])
                merged_ids.add(compare_post['_id'])
                
                # Delete the merged post
                posts_collection.delete_one({'_id': compare_post['_id']})
                print(f"Merged and deleted post {compare_post['_id']}")
        
        if len(merged_urls) > len(base_post['imageURLs']):
            # Update the base post with merged URLs
            posts_collection.update_one(
                {'_id': base_post['_id']},
                {
                    '$set': {
                        'imageURLs': merged_urls,
                        'merged_same_outfit': True,
                        'updatedAt': datetime.now()
                    }
                }
            )
            merged_ids.add(base_post['_id'])
            print(f"Updated base post {base_post['_id']} with {len(merged_urls)} images")
        else:
            posts_collection.update_one(
                {'_id': base_post['_id']},
                {'$set': {'merged_same_outfit': True}}
            )   
            print(f"Marked orphan post {base_post['_id']} as merged")
    
    return f"Completed processing shortcode {shortcode}"

def merge_same_outfits(max_workers=5):
    posts_collection = db['IGposts']
    
    # Find all valid outfits that haven't been merged yet
    outfit_posts = list(posts_collection.find({
        'isOutfit': 'true',
        'merged_same_outfit': {'$ne': True}
    }))
    
    # Group posts by shortcode
    shortcode_groups = {}
    for post in outfit_posts:
        shortcode = post.get('shortcode')
        if shortcode:
            if shortcode not in shortcode_groups:
                shortcode_groups[shortcode] = []
            shortcode_groups[shortcode].append(post)
    
    total_groups = len(shortcode_groups)
    print(f"Found {total_groups} groups to process")
    start_time = time.time()
    
    # Process groups concurrently
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_shortcode = {
            executor.submit(process_shortcode_group, shortcode, posts, posts_collection): shortcode 
            for shortcode, posts in shortcode_groups.items()
        }
        
        # Process completed tasks as they finish
        completed = 0
        for future in as_completed(future_to_shortcode):
            shortcode = future_to_shortcode[future]
            completed += 1
            try:
                result = future.result()
                print(f"Progress: {completed}/{total_groups} groups processed")
                print(result)
            except Exception as e:
                print(f"Error processing shortcode {shortcode}: {str(e)}")
    
    end_time = time.time()
    print(f"Processed {total_groups} groups in {end_time - start_time:.2f} seconds")



In [16]:
# Run the function with 5 concurrent workers
merge_same_outfits(max_workers=5)

Found 3 groups to process

Processing shortcode: C4m8le0BHb-nuXjEekFGB8_nk5OLUsVWMdYU4g0 with 4 posts
Comparing 671fc21689ceb57d5e7e0f80 with 671fc21789ceb57d5e7e0f83

Processing shortcode: C35wGtvh2sZmkVrtGpXL4uO9bZtNbXeqk5Vwao0 with 3 posts
Comparing 671fc21889ceb57d5e7e0f8a with 671fc21889ceb57d5e7e0f88
Progress: 1/3 groups processed
Skipped CmlDcmVh4WWqw7dcToxlJ995og4u-2l5PgxHZA0 - single post
Comparison result: true
Merged and deleted post 671fc21789ceb57d5e7e0f83
Comparing 671fc21689ceb57d5e7e0f80 with 671fc21789ceb57d5e7e0f84
Comparison result: false
Comparing 671fc21889ceb57d5e7e0f8a with 671fc21889ceb57d5e7e0f87
Comparison result: false
Comparison result: false
Comparing 671fc21689ceb57d5e7e0f80 with 671fc21789ceb57d5e7e0f81
Marked orphan post 671fc21889ceb57d5e7e0f8a as merged
Comparing 671fc21889ceb57d5e7e0f88 with 671fc21889ceb57d5e7e0f87
Comparison result: true
Merged and deleted post 671fc21789ceb57d5e7e0f81
Updated base post 671fc21689ceb57d5e7e0f80 with 3 images
Marked 