In [5]:
import requests
import time
import os
from bs4 import BeautifulSoup
import re
import json
from datetime import datetime
import random
from pathlib import Path
import sys

# Config

In [6]:
# Make the project's `src` directory (two levels above the working directory)
# importable so that the shared path constants can be loaded.
_project_root = Path.cwd().resolve().parents[1]
src = str(_project_root / 'src')
sys.path.append(src)
from config.paths import ROOT, DATA, EXPERIMENTS, METADATA


In [12]:
# Inputs: text files listing lot URLs, and the folder receiving the CSV logs.
_acquisition_dir = os.path.join(METADATA, 'acquisition')
listes_dir = os.path.join(_acquisition_dir, 'encheres_listes')
logs_dir = os.path.join(_acquisition_dir, 'logs')

# Outputs: the JSON catalogue of collected lots and the image folder.
json_path = os.path.join(METADATA, 'catalogues', 'raw', 'catalogue_encheres.json')
output_dir = os.path.join(DATA, 'raw_collected', 'objets', 'encheres')

# Fonctions parsers

## Bonhams

In [None]:
def bonhams_parser(soup):
    """Extract lot metadata from a Bonhams lot page.

    The data is read from the JSON payload stored in the page's
    ``<script id="__NEXT_DATA__">`` tag (``props.pageProps``).

    Args:
        soup: Parsed lot page; only ``soup.find('script', id=...)`` is used.

    Returns:
        dict with the keys ``Description``, ``Vente``, ``Loc``, ``Date_str``,
        ``Date_iso``, ``Lot`` and ``Img_urls``. Fields that are absent from
        the payload are returned as ``None`` (``Img_urls`` as ``[]``).

    Raises:
        ValueError: If the page has no ``__NEXT_DATA__`` script or the
            payload contains no lot.
    """
    script_tag = soup.find('script', id='__NEXT_DATA__')
    if script_tag is None or not script_tag.string:
        raise ValueError("No __NEXT_DATA__ script found in the Bonhams page")

    data = json.loads(script_tag.string)
    pageProps = data.get("props", {}).get("pageProps", {})

    lot = pageProps.get('lot')
    if not lot:
        raise ValueError("No lot data found in the Bonhams page payload")
    lot_nb = lot.get('iSaleLotNo')

    # Description: prefer the styled description (page level, then lot level),
    # and fall back to the catalogue description when it yields no text.
    sStyledDesc = pageProps.get('sStyledDesc') or lot.get('sStyledDesc')
    sCatalogDesc = pageProps.get('sCatalogDesc')

    desc = None

    if sStyledDesc:
        styled_soup = BeautifulSoup(sStyledDesc, 'html.parser')
        lines = (
            styled_soup.find('div', class_='firstLine'),
            styled_soup.find('div', class_='secondLine'),
        )
        parts = [line.get_text(strip=True) for line in lines if line]
        desc = ', '.join(parts) or None

    if desc is None and sCatalogDesc:
        catalog_soup = BeautifulSoup(sCatalogDesc, 'html.parser')
        lot_name_div = catalog_soup.find('div', class_='LotName')
        if lot_name_div:
            # Text fragments (e.g. split by <br>) are joined with ", ".
            desc = lot_name_div.get_text(separator=', ', strip=True)

    auction_dict = pageProps.get('auction') or {}

    # Sale end date, kept both as ISO (YYYY-MM-DD) and as "DD Month YYYY".
    date_iso = date_str = None
    end_date = (auction_dict.get('dates') or {}).get('end') or {}
    datetime_str = end_date.get('datetime')
    if datetime_str:
        date_iso = datetime_str.split('T')[0]
        date_str = datetime.strptime(date_iso, "%Y-%m-%d").strftime("%d %B %Y")

    vente = auction_dict.get('sSaleName')

    image_urls = [img['image_url'] for img in (lot.get('images') or []) if 'image_url' in img]

    # Location: second address line of the first sale location.
    locations = auction_dict.get('locations') or []
    location = locations[0].get('sAddrs2') if locations else None

    if location:
        if any(char.isdigit() for char in location):
            location_split = location.strip().split()
            if len(location_split) >= 3:
                # Drop the last two tokens (presumably a postcode such as
                # "SW7 1HH" -- TODO confirm for non-UK sale rooms).
                location = ' '.join(location_split[:-2])
            else:
                # Not enough parts to safely remove, so leave unchanged
                location = ' '.join(location_split)
        else:
            # No digits: keep the full string
            location = location.strip()

    return {
        'Description': desc,
        'Vente': vente,
        'Loc': location,
        'Date_str': date_str,
        'Date_iso': date_iso,
        'Lot': lot_nb,
        'Img_urls': image_urls
    }

## Sotheby's

In [None]:
def sothebys_parser(soup):
    """Extract lot metadata from a Sotheby's lot page.

    Two page layouts are handled:

    * pages embedding a ``<script id="__NEXT_DATA__">`` JSON payload: the
      auction and lot records are read from
      ``props.pageProps.apolloCache`` (keys starting with ``Auction:`` and
      ``LotV2:``);
    * pages without that script: the fields are read from the HTML, and the
      sale date and location are fetched from the linked auction page with
      ``get_soup``.

    Args:
        soup: Parsed lot page.

    Returns:
        dict with the keys ``Description``, ``Vente``, ``Loc``, ``Date_iso``,
        ``Date_str``, ``Lot`` and ``Img_urls``. Fields that cannot be found
        are ``None`` (``Img_urls`` is ``[]``).
    """
    desc = vente = location = date_iso = date_str = lot_num = None
    image_urls = []

    script_tag = soup.find('script', id='__NEXT_DATA__')

    if script_tag:
        json_data = json.loads(script_tag.string)
        data = (json_data.get("props", {})
                .get("pageProps", {})
                .get("apolloCache", {}))

        # Auction record: first cache entry whose key starts with "Auction:".
        auction_key = next((key for key in data if key.startswith("Auction:")), None)
        data_auction = data.get(auction_key, {}) if auction_key else {}

        location = (data_auction.get('locationV2') or {}).get('name')
        vente = data_auction.get('title')

        # Lot record: first cache entry whose key starts with "LotV2:".
        lot2_key = next((key for key in data if key.startswith("LotV2:")), None)
        data_lot = data.get(lot2_key, {}) if lot2_key else {}

        desc = data_lot.get('title')
        lot_num = (data_lot.get('lotNumber') or {}).get("lotNumber")

        # The date is optional: leave both formats as None when it is missing
        # instead of failing on None.split().
        date = (data_lot.get('session') or {}).get('scheduledOpeningDate')
        if date:
            date_iso = date.split('T')[0]
            date_str = datetime.strptime(date_iso, "%Y-%m-%d").strftime("%d %B %Y")

        size_priority = ["ExtraExtraLarge", "ExtraLarge", "Large", "Medium", "Small"]
        media_data = data_lot.get(
            'media({"imageSizes":["Small","Medium","Large","ExtraLarge","ExtraExtraLarge"]})'
        ) or {}

        for image in media_data.get('images') or []:
            renditions = image.get('renditions') or []
            size_to_url = {
                r['imageSize']: r['url']
                for r in renditions
                if 'imageSize' in r and 'url' in r
            }
            # Keep only the largest rendition available for this image.
            for size in size_priority:
                if size in size_to_url:
                    image_urls.append(size_to_url[size])
                    break
    else:
        title_tag = soup.find('h1', class_="LotPage-productTitle")
        link_tag = soup.find('a', class_='Link')
        lot_tag = soup.find('span', class_="Link")

        desc = title_tag.string if title_tag else None
        vente = link_tag.string if link_tag else None

        # The lot label is expected to look like "Lot 12"; the second token
        # is kept as the lot number.
        lot_label = lot_tag.string if lot_tag else None
        if lot_label:
            label_parts = lot_label.split(' ')
            if len(label_parts) > 1:
                lot_num = label_parts[1]

        if link_tag:
            auction_link = link_tag.get('href')
            auction_soup = get_soup(auction_link)

            # The auction page shows "<date> • <location>" in a single div.
            target_string = auction_soup.find(string=lambda text: text and '•' in text)
            parent_div = target_string.find_parent('div') if target_string else None
            if parent_div:
                target_text = parent_div.get_text(strip=True)
                date_str = target_text.split('•')[0].strip()
                location = target_text.split('•')[1].strip()

                date_obj = datetime.strptime(date_str, "%d %B %Y")
                date_iso = date_obj.strftime("%Y-%m-%d")

        # Fixed: the original looked the image up on an undefined `soup_5`.
        img_tag = soup.find('img', {'alt': '', 'class': 'Image lazyload'})

        if img_tag and img_tag.has_attr('data-srcset'):
            image_urls = [img_tag['data-srcset'].split(' ')[0]]
            # Pause kept from the original (presumably request throttling).
            time.sleep(10)

    return {
        'Description': desc,
        'Vente': vente,
        'Loc': location,
        'Date_iso': date_iso,
        'Date_str': date_str,
        'Lot': lot_num,
        'Img_urls': image_urls
    }

            


## Christie's

In [None]:
import re
import json
from bs4 import BeautifulSoup

def extract_chr_components(soup: "BeautifulSoup", key=None) -> dict:
    """Extract a ``window.chrComponents`` JavaScript object from a parsed page.

    Args:
        soup: The parsed HTML document; its ``<script>`` tags are scanned.
        key: Optional component identifier. When given, the object assigned
            to ``window.chrComponents.lotHeader_<key>`` is extracted; when
            omitted, the object assigned to ``window.chrComponents`` is.

    Returns:
        The extracted object as a Python dictionary. With ``key``, ``None`` is
        returned (after printing a message) when the assignment is not found
        or is not valid JSON.

    Raises:
        ValueError: Without ``key`` only, if ``window.chrComponents`` cannot
            be found or parsed.

    Note:
        The object is delimited by the first ``};`` following the assignment,
        so a payload containing ``};`` inside a string would be truncated.
    """
    if key:
        # Regex capturing the JS object assigned to
        # window.chrComponents.lotHeader_<key>
        pattern = re.compile(
            rf'window\.chrComponents\.lotHeader_{re.escape(key)}\s*=\s*(\{{.*?\}});',
            re.DOTALL
        )

        for script in soup.find_all('script'):
            script_text = script.string
            # Skip scripts without a single text body and scripts that merely
            # mention the key without containing the assignment.
            if not script_text or key not in script_text:
                continue

            match = pattern.search(script_text)
            if not match:
                continue

            try:
                return json.loads(match.group(1))
            except json.JSONDecodeError as e:
                print("Failed to decode JSON. Error:", e)
                return None

        print("No match found.")
        return None

    for script in soup.find_all("script"):
        if not script.string:
            continue  # Skip if script has no content

        match = re.search(r'window\.chrComponents\s*=\s*(\{.*?\});', script.string, re.DOTALL)
        if match:
            try:
                return json.loads(match.group(1))
            except json.JSONDecodeError as e:
                raise ValueError("Found window.chrComponents, but failed to parse JSON") from e

    raise ValueError("window.chrComponents not found in the soup")


In [None]:
        
def _christies_image_urls(lot):
    """Return the lot's image URLs with their query strings removed."""
    urls = []
    for asset in lot.get("lot_assets") or []:
        img_url = asset.get("image_url")
        if img_url:
            urls.append(img_url.split('?')[0])
    return urls


def _christies_description(primary, secondary):
    """Join the non-empty title parts with '; ' (None when both are missing)."""
    parts = [part.strip() for part in (primary, secondary) if part and part.strip()]
    return '; '.join(parts) if parts else None


def soup_parse_christies(url):
    """Download a Christie's lot page and extract its metadata.

    Unlike the other parsers this function takes the lot URL (not a parsed
    page) because the URL decides which layout is parsed: URLs containing
    ``'online'`` read the sale date and location from the auction page, the
    others read everything from the ``lotHeader_<id>`` component of the lot
    page.

    Args:
        url: URL of the lot page.

    Returns:
        dict with the keys ``Description``, ``Vente``, ``Loc``, ``Date_iso``,
        ``Date_str``, ``Lot`` and ``Img_urls``. Fields that cannot be found
        are ``None`` (``Img_urls`` is ``[]``).

    Raises:
        ValueError: If the page has no ``<chr-lot-header>`` element, or if the
            ``window.chrComponents`` object of an online page is missing.
    """
    soup = get_soup(url)
    lot_header = soup.find("chr-lot-header")
    if lot_header is None:
        raise ValueError(f"No <chr-lot-header> element found at {url}")

    date = location = None

    if 'online' in url:
        lot_num = lot_header.get('lot_id_txt')
        vente = lot_header.get("sale_title")

        # Auction date & location come from the auction page, whose URL is
        # rebuilt from the first five segments of the lot URL + the sale id.
        sale_url = lot_header.get("sale_url")
        if sale_url:
            sale_nb = sale_url.split('/')[-1].split('?')[0]
            sale_url_full = '/'.join(url.split('/')[:5] + [sale_nb])

            soup_auction = get_soup(sale_url_full)
            auction_components = extract_chr_components(soup_auction)
            auction_data = (auction_components.get('auction') or {}).get('data') or {}
            location = auction_data.get('sale_location')
            date = (auction_data.get('auction_status') or {}).get('sale_end_date_time')

        # Description and image URLs come from the lot page itself.
        dico_components = extract_chr_components(soup)
        lots_data = (dico_components.get('lots') or {}).get('data') or {}
        lots_list = lots_data.get('lots') or []
        lots = lots_list[0] if lots_list else {}

        list_img_url = _christies_image_urls(lots)
        desc = _christies_description(
            lots.get('title_primary_txt'),
            lots.get('title_secondary_txt'),
        )

    else:
        desc1 = lot_header.get('title_primary_txt', '').strip()
        desc2 = lot_header.get('title_secondary_txt', '').strip()

        # The component id is the part after the first "_" of data-namespace.
        namespace = lot_header.get('data-namespace', '').strip()
        namespace_parts = namespace.split('_')
        key = namespace_parts[1] if len(namespace_parts) > 1 else None

        # The keyed lookup returns None when the component cannot be read.
        components = (extract_chr_components(soup, key) if key else None) or {}
        component_data = components.get('data') or {}
        sale = component_data.get('sale') or {}

        date = sale.get('end_date')
        vente = sale.get('title_txt')
        location = sale.get('location_txt')

        lots_list = component_data.get('lots') or []
        lots = lots_list[0] if lots_list else {}

        list_img_url = _christies_image_urls(lots)
        lot_num = lots.get('lot_id_txt')

        # Fall back to the component when the header carries no title.
        desc = _christies_description(
            desc1 or lots.get('title_primary_txt'),
            desc2 or lots.get('title_secondary_txt'),
        )

    # Both date formats stay None when the sale date is unknown.
    date_iso = date_str = None
    if date:
        date_iso = date.split('T')[0]
        date_obj = datetime.strptime(date_iso, "%Y-%m-%d")
        date_str = date_obj.strftime("%d %B %Y")

    return {
        'Description': desc,
        'Vente': vente,
        'Loc': location,
        'Date_iso': date_iso,
        'Date_str': date_str,
        'Lot': lot_num,
        'Img_urls': list_img_url
    }

# Fonctions scraping

In [None]:
import json
import os
import requests
from bs4 import BeautifulSoup
import re

In [None]:
def save_img(url_img, output_dir, object_id, timeout=30):
    """Download one image into ``<output_dir>/img``.

    The file is named ``<name of output_dir>_<object_id>.jpg``.

    Args:
        url_img: URL of the image.
        output_dir: Directory under which the ``img`` folder is created.
        object_id: Identifier used in the file name.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        Path of the written file.

    Raises:
        requests.HTTPError: If the server answers with an error status, so
            that an error page is never saved as an image.
    """
    # normpath drops a trailing separator, which would make basename() empty.
    folder_name = os.path.basename(os.path.normpath(output_dir))

    response = requests.get(url_img, timeout=timeout)
    response.raise_for_status()

    img_dir_path = os.path.join(output_dir, 'img')
    os.makedirs(img_dir_path, exist_ok=True)

    output_file = os.path.join(img_dir_path, f"{folder_name}_{object_id}.jpg")
    with open(output_file, 'wb') as f:
        f.write(response.content)
    return output_file

In [None]:
def get_soup(lien, timeout=30):
    """Download a page and parse it with BeautifulSoup.

    Args:
        lien: URL of the page.
        timeout: Seconds to wait for the server before giving up (without a
            timeout a stalled connection would block the run indefinitely).

    Returns:
        The page parsed with the ``html.parser`` backend.

    Raises:
        requests.HTTPError: If the server answers with an error status, so
            that parsers never receive an error page.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"
    }
    result = requests.get(lien, headers=headers, timeout=timeout)
    result.raise_for_status()
    return BeautifulSoup(result.text, 'html.parser')

In [None]:
def save_images(img_tuples, timeout=30):
    """Download every image of a lot, then pause before the next lot.

    Args:
        img_tuples: Iterable of ``(image_url, destination_path)`` pairs.
        timeout: Seconds to wait for each download before giving up.

    Raises:
        requests.HTTPError: If a download answers with an error status, so
            that an error page is never written as an image file.
    """
    for img_url, img_path in img_tuples:
        response = requests.get(img_url, timeout=timeout)
        response.raise_for_status()
        with open(img_path, 'wb') as f:
            f.write(response.content)
    print(f'{len(img_tuples)} images saved')
    # Random 5-10 s pause between lots.
    time.sleep(random.uniform(5, 10))

In [None]:
def load_json(json_path, metadata_dict: dict, overwrite: bool = False) -> None:
    """Add or update one entry in the JSON catalogue.

    The catalogue is a JSON list of dictionaries identified by their ``Id``
    key. A missing or unreadable catalogue file is treated as an empty list.

    Args:
        json_path: Path of the catalogue file.
        metadata_dict: Entry to store; its ``Id`` is used to detect duplicates.
        overwrite: When an entry with the same ``Id`` exists, replace it if
            True, otherwise leave the catalogue untouched.

    Raises:
        ValueError: If the catalogue file holds valid JSON that is not a list.
    """
    try:
        # Read with the same encoding used for writing: the file is saved as
        # UTF-8 with ensure_ascii=False, so the platform default may not work.
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if not isinstance(data, list):
            raise ValueError("Catalogue JSON doit être une liste d'entrées")
    except (FileNotFoundError, json.JSONDecodeError):
        data = []

    new_id = metadata_dict.get('Id')

    # Look for existing entry with the same ID
    existing_index = next((i for i, entry in enumerate(data) if entry.get('Id') == new_id), None)

    if existing_index is not None:
        if overwrite:
            data[existing_index] = metadata_dict
            print(f"L'entrée avec l'ID '{new_id}' a été mise à jour dans le catalogue.")
        else:
            print(f"L'objet avec l'ID '{new_id}' est déjà dans le catalogue. Aucune modification apportée.")
            return
    else:
        data.append(metadata_dict)

    # Save back to JSON
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4, ensure_ascii=False)


In [None]:
def save_images_from_urls(img_urls, output_dir, timeout=30):
    """Download images into ``output_dir``, named after the URL's last segment.

    Args:
        img_urls: URLs of the images to download.
        output_dir: Destination directory (created if needed).
        timeout: Seconds to wait for each download before giving up.

    Raises:
        requests.HTTPError: If a download answers with an error status.
    """
    # Imported here because the notebook's import cell does not provide it
    # (the original call raised NameError).
    from urllib.parse import urlparse

    os.makedirs(output_dir, exist_ok=True)

    for img_url in img_urls:
        # Extract the image filename from the URL
        filename = os.path.basename(urlparse(img_url).path)
        if not filename:
            filename = f'image_{int(time.time() * 1000)}.jpg'  # fallback name

        img_path = os.path.join(output_dir, filename)

        # Download and save the image
        response = requests.get(img_url, timeout=timeout)
        response.raise_for_status()
        with open(img_path, 'wb') as f:
            f.write(response.content)
        print(f'Saved: {img_path}')

        # Optional delay to avoid overwhelming servers
        time.sleep(random.uniform(1, 3))

    print(f'{len(img_urls)} images saved to {output_dir}')

In [None]:
def main(url, auteur, maison_encheres, parser_function, output_dir, overwrite, soup=True, json_path=json_path):
    """Collect one lot: parse its page, download its images, catalogue it.

    Args:
        url: URL of the lot page.
        auteur: Author name stored in the metadata.
        maison_encheres: Auction house name, stored in the metadata and used
            in the lot identifier.
        parser_function: Parser returning the lot metadata dictionary; it must
            provide ``Img_urls`` and, unless it provides ``Lot_id``,
            ``Date_iso`` and ``Lot``.
        output_dir: Directory receiving the images (created if needed).
        overwrite: Whether an existing catalogue entry with the same id is
            replaced.
        soup: If True the parser receives the parsed page, otherwise the URL.
        json_path: Path of the JSON catalogue.

    Returns:
        The metadata dictionary completed with ``Auteur``, ``Url_lot``,
        ``Maison_encheres``, ``Id`` and ``Fichiers_img``.
    """
    if soup:
        dico_metadata = parser_function(get_soup(url))
    else:
        dico_metadata = parser_function(url)

    lot_id = dico_metadata.get('Lot_id') or f"{maison_encheres}_{dico_metadata['Date_iso']}_{dico_metadata['Lot']}"

    dico_metadata.update({
        'Auteur': auteur,
        'Url_lot': url,
        'Maison_encheres': maison_encheres,
        'Id': lot_id
    })

    os.makedirs(output_dir, exist_ok=True)

    # Images are named "<lot id>-01.jpg", "<lot id>-02.jpg", ...
    img_tuples = []
    filenames = []
    for i, img_url in enumerate(dico_metadata['Img_urls']):
        filename = f"{dico_metadata['Id']}-{i+1:02d}.jpg"
        filenames.append(filename)
        img_tuples.append((img_url, os.path.join(output_dir, filename)))

    dico_metadata['Fichiers_img'] = filenames

    save_images(img_tuples)

    # Arguments follow load_json(json_path, metadata_dict, overwrite); the
    # original call passed json_path twice and always raised TypeError.
    load_json(json_path, dico_metadata, overwrite=overwrite)

    return dico_metadata

In [None]:
import pandas as pd
import traceback

def process_lots(url_list, parametres, output_dir, overwrite=False, soup=True):
    """
    Applies main() to a list of URLs using shared metadata provided as a tuple.

    A failure on one URL does not stop the run: the error is printed with its
    traceback and recorded as a row of the returned table.

    Parameters:
        url_list: List of URLs (str)
        parametres: Tuple -> (maison_encheres, parser_function, auteur)
        output_dir: Directory to save images and metadata
        overwrite: Whether to overwrite existing catalogue entries
        soup: Passed to main(); True when the parser expects a parsed page,
            False when it expects the URL

    Returns:
        pd.DataFrame containing the metadata for all processed lots, with an
        'error' column (None for the lots processed successfully).
    """
    maison_encheres, parser_function, auteur = parametres
    all_metadata = []

    for url in url_list:
        try:
            metadata = main(
                url=url,
                auteur=auteur,
                maison_encheres=maison_encheres,
                parser_function=parser_function,
                output_dir=output_dir,
                overwrite=overwrite,
                soup=soup
            )
            metadata['error'] = None  # No error for this row
            all_metadata.append(metadata)
        except Exception as e:
            print(f"Error processing {url}: {e}")
            traceback.print_exc()
            # Failed rows carry the same identifying columns as successful
            # ones so that they can be filtered together in the log; 'url' is
            # kept for compatibility with existing logs.
            all_metadata.append({
                'url': url,
                'Url_lot': url,
                'Auteur': auteur,
                'Maison_encheres': maison_encheres,
                'error': str(e)
            })

    return pd.DataFrame(all_metadata)




# Acquisition

In [8]:
def list_encheres(keyword, path_list_dir):
    """Return the paths of the ``.txt`` files of a directory whose name contains ``keyword``.

    Args:
        keyword: Substring looked up in each file name.
        path_list_dir: Directory to scan (not recursive).

    Returns:
        List of paths, each made of ``path_list_dir`` joined with a file name,
        in the order given by ``os.listdir``.
    """
    selected = []
    for filename in os.listdir(path_list_dir):
        if filename.endswith('.txt') and keyword in filename:
            selected.append(os.path.join(path_list_dir, filename))
    return selected

In [None]:
# One list of URL-list files per auction house, selected by file-name keyword.
sothebys_list, bonhams_list, christies_list, drouot_list = (
    list_encheres(house_keyword, listes_dir)
    for house_keyword in ('Sothebys', 'Bonhams', 'Christies', 'GazetteDrouot')
)

In [16]:
# Show the first two files found for each auction house.
for house_files in (sothebys_list, bonhams_list, christies_list, drouot_list):
    print(house_files[:2])

['/Users/enki/data/Git/Memoire-M2HN/metadata/acquisition/encheres_listes/Sothebys_DeMorgan.txt', '/Users/enki/data/Git/Memoire-M2HN/metadata/acquisition/encheres_listes/Sothebys_Vieillard.txt']
['/Users/enki/data/Git/Memoire-M2HN/metadata/acquisition/encheres_listes/Bonhams_Longwy.txt', '/Users/enki/data/Git/Memoire-M2HN/metadata/acquisition/encheres_listes/Bonhams_DeMorgan.txt']
['/Users/enki/data/Git/Memoire-M2HN/metadata/acquisition/encheres_listes/Christies_Vieillard.txt', '/Users/enki/data/Git/Memoire-M2HN/metadata/acquisition/encheres_listes/Christies_Doulton.txt']
['/Users/enki/data/Git/Memoire-M2HN/metadata/acquisition/encheres_listes/GazetteDrouot_Deck.txt', '/Users/enki/data/Git/Memoire-M2HN/metadata/acquisition/encheres_listes/GazetteDrouot_Lachenal.txt']


In [None]:
def process_list(encheres_list, parser, log_dir, soup=True):
    """Process every URL-list file of one auction house and write a CSV log.

    Each file is expected to be named ``<MaisonEncheres>_<Auteur>.txt`` (dashes
    in the author part are turned into spaces) and to contain one lot URL per
    line. The lots are collected with ``process_lots`` into the module-level
    ``output_dir`` and the resulting tables are concatenated into
    ``log_acquisition_<MaisonEncheres>.csv``, named after the last file
    processed.

    Args:
        encheres_list: Paths of the URL-list files.
        parser: Parser function handed to ``process_lots``.
        log_dir: Directory receiving the CSV log (created if needed).
        soup: Forwarded to ``process_lots``; set to False for parsers that
            expect the lot URL instead of a parsed page.

    Returns:
        None. Nothing is written when ``encheres_list`` is empty.
    """
    if not encheres_list:
        # pd.concat() rejects an empty list and there is no house name to log.
        print("No list file to process.")
        return None

    df_final_list = []
    maison_encheres = None

    for path in encheres_list:
        # Parse the file name only, so that separators or underscores in the
        # directory part cannot leak into the house or author name.
        stem = os.path.splitext(os.path.basename(path))[0]
        maison_encheres = stem.split('_')[0]
        auteur = stem.split('_')[-1].replace('-', ' ')
        parametres = (maison_encheres, parser, auteur)

        with open(path, 'r', encoding='UTF-8') as f:
            liste_urls = [line.strip() for line in f if line.strip()]

        df_auteur = process_lots(liste_urls, parametres, output_dir, overwrite=True, soup=soup)
        df_final_list.append(df_auteur)

    df_final = pd.concat(df_final_list, ignore_index=True)
    os.makedirs(log_dir, exist_ok=True)
    csv_file = f'log_acquisition_{maison_encheres}.csv'
    df_final.to_csv(os.path.join(log_dir, csv_file), encoding='UTF-8')

In [None]:
# Each call downloads every lot listed for the auction house and writes its
# CSV log. The original cell ran the Sotheby's list a second time, which only
# re-downloaded the same lots (overwrite=True) and rewrote the same log.
# NOTE(review): christies_list and drouot_list are not processed here; confirm
# whether the duplicated call was meant to handle one of them.
process_list(sothebys_list, sothebys_parser, logs_dir)
process_list(bonhams_list, bonhams_parser, logs_dir)

