In [1]:
%run import_data.ipynb

Dataset books.csv loaded successfully
isbn column dropped, isbn13 column kept
Language codes eng, en-US, en-GB, en-CA unified to 'en'
Missing publication dates filled in
2nd (large) dataset is being downloaded...
Dataset downloaded and saved as data/books2.csv
Dataset downloaded and saved as data/users.csv
Dataset downloaded and saved as data/ratings.csv
Pandas dataframes (books_df, books_big, users, ratings) loaded successfully
Columns renamed and dates converted to dtype: datetime
Ready to go!


In [None]:
# Test the Google Books API key with a sample request

import requests

API_KEY = "AIzaSyDcAxovkBpRGJgsR6BGTZCGodOHmoU2oEM"
url = f"https://www.googleapis.com/books/v1/volumes?q=isbn:9780140328721&key={API_KEY}"

response = requests.get(url)
print(response.status_code)
print(response.json())


In [2]:
# Test the Open Library API key with a sample request

import requests

API_KEY = "AIzaSyDcAxovkBpRGJgsR6BGTZCGodOHmoU2oEM"
url = f"https://openlibrary.org/api/books?bibkeys=ISBN:9780140328721&format=json&jscmd=data"

response = requests.get(url)
print(response.status_code)
print(response.json())

200
{'ISBN:9780140328721': {'url': 'https://openlibrary.org/books/OL7353617M/Fantastic_Mr._Fox', 'key': '/books/OL7353617M', 'title': 'Fantastic Mr. Fox', 'authors': [{'url': 'https://openlibrary.org/authors/OL34184A/Roald_Dahl', 'name': 'Roald Dahl'}], 'number_of_pages': 96, 'identifiers': {'goodreads': ['1507552'], 'librarything': ['6446'], 'isbn_10': ['0140328726'], 'isbn_13': ['9780140328721'], 'openlibrary': ['OL7353617M']}, 'publishers': [{'name': 'Puffin'}], 'publish_date': 'October 1, 1988', 'subjects': [{'name': 'Animals', 'url': 'https://openlibrary.org/subjects/animals'}, {'name': 'Hunger', 'url': 'https://openlibrary.org/subjects/hunger'}, {'name': 'Open Library Staff Picks', 'url': 'https://openlibrary.org/subjects/open_library_staff_picks'}, {'name': 'Juvenile fiction', 'url': 'https://openlibrary.org/subjects/juvenile_fiction'}, {'name': "Children's stories, English", 'url': "https://openlibrary.org/subjects/children's_stories,_english"}, {'name': 'Foxes', 'url': 'https:

In [None]:
books_big.head(10)

In [None]:
books_big.shape[0]/5

In [3]:
num_parts = 20
len(books_big) % num_parts
part_size = len(books_big) // num_parts
df1, df2, df3, df4, df5, df6, df7, df8, df9, df10, df11, df12, df13, df14, df15, df16, df17, df18, df19, df20 = [books_big[i*part_size:(i+1)*part_size] for i in range(num_parts)]

In [4]:
df1.shape

(13568, 8)

In [12]:
import aiohttp
import asyncio
import pandas as pd
import nest_asyncio
from asyncio import Semaphore
import logging

# Apply nest_asyncio to allow nested event loops
nest_asyncio.apply()

# Enable logging for debugging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()

# Set the maximum number of concurrent requests and processing adjustments
MAX_CONCURRENT_REQUESTS = 1000  # High concurrency for sending requests
semaphore_requests = Semaphore(MAX_CONCURRENT_REQUESTS)

# Lower concurrency for processing the fetched data
MAX_CONCURRENT_FETCHES = 50  # Adjust this value as needed
semaphore_fetches = Semaphore(MAX_CONCURRENT_FETCHES)

# Function to fetch data from Open Library
async def fetch_data(session, isbn, field):
    async with semaphore_requests:
        url = f'https://openlibrary.org/api/books?bibkeys=ISBN:{isbn}&format=json&jscmd=data'
        try:
            async with session.get(url) as response:
                logger.info(f'Fetching ISBN: {isbn} with status: {response.status}')
                if response.status == 429:  # Rate limit exceeded
                    logger.warning(f'Rate limit hit for ISBN: {isbn}, retrying after 120 seconds')
                    await asyncio.sleep(120)  # Wait before retrying
                    return await fetch_data(session, isbn, field)  # Retry
                elif response.status == 200:
                    data = await response.json()
                    return data, isbn, field
                else:
                    logger.error(f'Error fetching ISBN: {isbn}, status code: {response.status}')
                    return None, isbn, field
        except Exception as e:
            logger.error(f'Exception occurred for ISBN: {isbn}: {e}')
            return None, isbn, field

# Function to process the data with lower concurrency
async def process_data(data, isbn, field):
    async with semaphore_fetches:
        if data:
            key = f'ISBN:{isbn}'
            if key in data:
                item = data[key]
                if field == 'publish_date':
                    publish_date = item.get('publish_date', 'No date found')
                    return publish_date.split('-')[0] if publish_date != 'No date found' else 'No date found'
                elif field == 'genres':
                    subjects = item.get('subjects', [])
                    genre_names = [subject.get('name', 'Unknown genre') for subject in subjects if isinstance(subject, dict)]
                    return ', '.join(genre_names) if genre_names else 'No genres found'
            else:
                return 'No data found'
        else:
            return 'Error fetching data'

# Function to fetch book information for a DataFrame
async def fetch_book_info(df, file_path):
    logger.debug(f"DataFrame columns: {df.columns.tolist()}")

    if 'isbn' not in df.columns:
        raise ValueError("The DataFrame does not contain an 'isbn' column.")

    async with aiohttp.ClientSession() as session:
        for isbn in df['isbn']:
            row = {'isbn': isbn}

            # Fetch and process data for each field
            for field in ['publish_date', 'genres']:
                data, isbn, field = await fetch_data(session, isbn, field)
                result = await process_data(data, isbn, field)
                row[field] = result

            # Append the row to the CSV file immediately
            result_df = pd.DataFrame([row])
            result_df.to_csv(file_path, mode='a', index=False, header=not pd.io.common.file_exists(file_path))
            logger.info(f"Data for ISBN '{isbn}' appended to CSV.")

    logger.info("Finished processing book information for current DataFrame.")

async def main(df, file_path):
    logger.info("Starting to fetch book information...")
    await fetch_book_info(df, file_path)
    logger.info("Finished fetching book information.")

# Run the asyncio event loop to fetch the book information
loop = asyncio.get_event_loop()
file_path = 'data/big_detailed.csv'  # Path to your CSV file

for df in [df1, df2, df3, df4, df5, df6, df7, df8, df9, df10, df11, df12, df13, df14, df15, df16, df17, df18, df19, df20]:
    loop.run_until_complete(main(df, file_path))
    print("One fetch done")


INFO:root:Starting to fetch book information...
DEBUG:root:DataFrame columns: ['isbn', 'book_title', 'book_author', 'year_of_publication', 'publisher', 'image_url_s', 'image_url_m', 'image_url_l']
INFO:root:Fetching ISBN: 0195153448 with status: 200
INFO:root:Fetching ISBN: 0195153448 with status: 200
INFO:root:Data for ISBN '0195153448' appended to CSV.
INFO:root:Fetching ISBN: 0002005018 with status: 200
INFO:root:Fetching ISBN: 0002005018 with status: 200
INFO:root:Data for ISBN '0002005018' appended to CSV.
INFO:root:Fetching ISBN: 0060973129 with status: 503
ERROR:root:Error fetching ISBN: 0060973129, status code: 503
INFO:root:Fetching ISBN: 0060973129 with status: 200
INFO:root:Data for ISBN '0060973129' appended to CSV.
INFO:root:Fetching ISBN: 0374157065 with status: 200
INFO:root:Fetching ISBN: 0374157065 with status: 200
INFO:root:Data for ISBN '0374157065' appended to CSV.
INFO:root:Fetching ISBN: 0393045218 with status: 200
INFO:root:Fetching ISBN: 0393045218 with status: 

One fetch done


INFO:root:Fetching ISBN: 193151416X with status: 200
INFO:root:Fetching ISBN: 193151416X with status: 200
INFO:root:Data for ISBN '193151416X' appended to CSV.
INFO:root:Fetching ISBN: 1931514151 with status: 200
INFO:root:Fetching ISBN: 1931514151 with status: 200
INFO:root:Data for ISBN '1931514151' appended to CSV.
INFO:root:Fetching ISBN: 0872262278 with status: 200
INFO:root:Fetching ISBN: 0872262278 with status: 200
INFO:root:Data for ISBN '0872262278' appended to CSV.
INFO:root:Fetching ISBN: 082302377X with status: 200
INFO:root:Fetching ISBN: 082302377X with status: 200
INFO:root:Data for ISBN '082302377X' appended to CSV.
INFO:root:Fetching ISBN: 1591821800 with status: 200
INFO:root:Fetching ISBN: 1591821800 with status: 200
INFO:root:Data for ISBN '1591821800' appended to CSV.
INFO:root:Fetching ISBN: 1591820596 with status: 200
INFO:root:Fetching ISBN: 1591820596 with status: 200
INFO:root:Data for ISBN '1591820596' appended to CSV.
INFO:root:Fetching ISBN: 159182303X with

One fetch done


INFO:root:Fetching ISBN: 0671887939 with status: 200
INFO:root:Fetching ISBN: 0671887939 with status: 200
INFO:root:Data for ISBN '0671887939' appended to CSV.
INFO:root:Fetching ISBN: 0394550455 with status: 200
INFO:root:Fetching ISBN: 0394550455 with status: 200
INFO:root:Data for ISBN '0394550455' appended to CSV.
INFO:root:Fetching ISBN: 1561003794 with status: 200
INFO:root:Fetching ISBN: 1561003794 with status: 200
INFO:root:Data for ISBN '1561003794' appended to CSV.
INFO:root:Fetching ISBN: 0939643448 with status: 200
INFO:root:Fetching ISBN: 0939643448 with status: 200
INFO:root:Data for ISBN '0939643448' appended to CSV.
INFO:root:Fetching ISBN: 0553472321 with status: 200
INFO:root:Fetching ISBN: 0553472321 with status: 200
INFO:root:Data for ISBN '0553472321' appended to CSV.
INFO:root:Fetching ISBN: 0787102784 with status: 200
INFO:root:Fetching ISBN: 0787102784 with status: 200
INFO:root:Data for ISBN '0787102784' appended to CSV.
INFO:root:Fetching ISBN: 0679411151 with

One fetch done


INFO:root:Fetching ISBN: 039453512X with status: 200
INFO:root:Fetching ISBN: 039453512X with status: 200
INFO:root:Data for ISBN '039453512X' appended to CSV.
INFO:root:Fetching ISBN: 006017160X with status: 200
INFO:root:Fetching ISBN: 006017160X with status: 200
INFO:root:Data for ISBN '006017160X' appended to CSV.
INFO:root:Fetching ISBN: 0399512985 with status: 200
INFO:root:Fetching ISBN: 0399512985 with status: 200
INFO:root:Data for ISBN '0399512985' appended to CSV.
INFO:root:Fetching ISBN: 0486414248 with status: 200
INFO:root:Fetching ISBN: 0486414248 with status: 200
INFO:root:Data for ISBN '0486414248' appended to CSV.
INFO:root:Fetching ISBN: 0452264456 with status: 200
INFO:root:Fetching ISBN: 0452264456 with status: 200
INFO:root:Data for ISBN '0452264456' appended to CSV.
INFO:root:Fetching ISBN: 0395537592 with status: 200
INFO:root:Fetching ISBN: 0395537592 with status: 200
INFO:root:Data for ISBN '0395537592' appended to CSV.
INFO:root:Fetching ISBN: 089804801X with

In [None]:
import aiohttp
import asyncio
import pandas as pd
import nest_asyncio
from asyncio import Semaphore
import logging

# Apply nest_asyncio to allow nested event loops
nest_asyncio.apply()

# Enable logging for debugging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()

# Set the maximum number of concurrent requests and processing adjustments
MAX_CONCURRENT_REQUESTS = 1000  # High concurrency for sending requests
semaphore_requests = Semaphore(MAX_CONCURRENT_REQUESTS)

# Lower concurrency for processing the fetched data
MAX_CONCURRENT_FETCHES = 500  # Adjust this value as needed
semaphore_fetches = Semaphore(MAX_CONCURRENT_FETCHES)
# Function to fetch data from Open Library
async def fetch_data(session, isbn, field):

    async with semaphore_requests:
        url = f'https://openlibrary.org/api/books?bibkeys=ISBN:{isbn}&format=json&jscmd=data'
        try:
            async with session.get(url) as response:
                logger.info(f'Fetching ISBN: {isbn} with status: {response.status}')
                if response.status == 429:  # Rate limit exceeded
                    logger.warning(f'Rate limit hit for ISBN: {isbn}, retrying after 120 seconds')
                    await asyncio.sleep(120)  # Wait before retrying
                    return await fetch_data(session, isbn, field)  # Retry
                elif response.status == 200:
                    data = await response.json()
                    return data, isbn, field
                else:
                    logger.error(f'Error fetching ISBN: {isbn}, status code: {response.status}')
                    return None, isbn, field
        except Exception as e:
            logger.error(f'Exception occurred for ISBN: {isbn}: {e}')
            return None, isbn, field

# Function to process the data with lower concurrency
async def process_data(data, isbn, field):
    async with semaphore_fetches:
        if data:
            key = f'ISBN:{isbn}'
            if key in data:
                item = data[key]
                if field == 'publish_date':
                    publish_date = item.get(field, 'No date found')
                    return publish_date.split('-')[0] if publish_date != 'No date found' else 'No date found'
                else:
                    return item.get(field, 'No data found')
            else:
                return 'No data found'
        else:
            return 'Error fetching data'

# Function to fetch book information for a DataFrame
async def fetch_book_info(df):
    logger.debug(f"DataFrame columns: {df.columns.tolist()}")

    if 'isbn' not in df.columns:
        raise ValueError("The DataFrame does not contain an 'isbn' column.")

    async with aiohttp.ClientSession() as session:
        tasks = {
            'publish_date': [],
            'authors': [],
            'title': []
        }

        # Phase 1: Send out all requests
        for isbn in df['isbn']:
            for field in tasks.keys():
                tasks[field].append(fetch_data(session, isbn, field))
        
        # Phase 2: Wait for all requests to complete and then process the results with lower concurrency
        results = {}
        for field, task_list in tasks.items():
            fetch_results = await asyncio.gather(*task_list)
            process_tasks = [process_data(data, isbn, field) for data, isbn, field in fetch_results]
            results[field] = await asyncio.gather(*process_tasks)

    # Add the processed results to the DataFrame
    for field, result in results.items():
        df[field] = result

    return df

async def main(df):
    logger.info("Starting to fetch book information...")
    # Replace `df1` with your actual DataFrame variable
    updated_df = await fetch_book_info(df)
    updated_df.to_csv('data/big_detailed.csv', mode='a', index=False) # save to csv
    logger.info("Finished fetching book information.")

# Run the asyncio event loop to fetch the book information
loop = asyncio.get_event_loop()
for df in [df1, df2, df3, df4, df5, df6, df7, df8, df9, df10, df11, df12, df13, df14, df15, df16, df17, df18, df19, df20]:
    loop.run_until_complete(main(df))
    print("one fiftch done")


In [None]:
import aiohttp
import asyncio
import pandas as pd
import nest_asyncio
from asyncio import Semaphore
import logging

# Apply the nest_asyncio to allow nested event loops
nest_asyncio.apply()

# Enable logging for debugging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()

# Set the maximum number of concurrent requests and rate limit adjustments
MAX_CONCURRENT_REQUESTS = 1000  # Adjust concurrency based on your needs
semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)

# Function to fetch publication year from Open Library
async def fetch_data(session, isbn, field, retries=5):
    async with semaphore:
        url = f'https://openlibrary.org/api/books?bibkeys=ISBN:{isbn}&format=json&jscmd=data'
        for attempt in range(retries):
            try:
                async with session.get(url) as response:
                    logger.info(f'Fetching ISBN: {isbn} with status: {response.status}')
                    
                    if response.status == 503:  # Service unavailable
                        wait_time = 2 ** attempt  # Exponential backoff
                        logger.warning(f'Service unavailable for ISBN: {isbn}, retrying in {wait_time} seconds...')
                        await asyncio.sleep(wait_time)
                        continue  # Retry the request

                    elif response.status == 429:  # Rate limit exceeded
                        logger.warning(f'Rate limit hit for ISBN: {isbn}, retrying after 120 seconds...')
                        await asyncio.sleep(120)
                        return await fetch_data(session, isbn, field)  # Retry
                    
                    elif response.status == 200:
                        data = await response.json()
                        key = f'ISBN:{isbn}'
                        if key in data:
                            item = data[key]
                            if field == 'publish_date':
                                publish_date = item.get(field, 'No date found')
                                return publish_date.split('-')[0] if publish_date != 'No date found' else 'No date found'
                            else:
                                return item.get(field, 'No data found')
                        else:
                            return 'No data found'
                    else:
                        logger.error(f'Error fetching ISBN: {isbn}, status code: {response.status}')
                        return 'Error fetching data'
            
            except Exception as e:
                logger.error(f'Exception occurred for ISBN: {isbn}: {e}')
                return 'Error fetching data'
        
        logger.error(f'Failed to fetch ISBN: {isbn} after {retries} attempts')
        return 'Service unavailable after retries'


# Function to fetch book information for a DataFrame
async def fetch_book_info_for_dataframe(df):
    logger.debug(f"DataFrame columns: {df.columns.tolist()}")

    if 'isbn' not in df.columns:
        raise ValueError("The DataFrame does not contain an 'isbn' column.")

    async with aiohttp.ClientSession() as session:
        tasks = {
            'publish_date': [],
            'authors': [],
            'title': []
        }
        
        for isbn in df['isbn']:
            try:
                for field in tasks.keys():
                    tasks[field].append(fetch_data(session, isbn, field))
            except ValueError as e:
                logger.error(f"Skipping ISBN: {isbn} due to error: {e}")
                for field in tasks.keys():
                    tasks[field].append(asyncio.sleep(0))  # Placeholder for skipped tasks

            completed_requests = sum(len(task_list) for task_list in tasks.values())
            logger.debug(f'Completed requests: {completed_requests} / {len(df)}')

            if len(tasks['publish_date']) % MAX_CONCURRENT_REQUESTS == 0:
                await asyncio.sleep(1)  # Respect the rate limit

        logger.info("All requests completed. Gathering results...")
        results = await asyncio.gather(*[asyncio.gather(*task_list) for task_list in tasks.values()])
        logger.info("Results gathered successfully.")

    logger.info("Updating DataFrame with the results...")
    for idx, field in enumerate(tasks.keys()):
        df[field] = results[idx]
    logger.info("DataFrame updated successfully.")

    return df

async def main():
    logger.info("Starting to fetch book information...")
    updated_df = await fetch_book_info_for_dataframe(df1)
    logger.info("Finished fetching book information.")
    print(updated_df.head())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())


In [None]:
import aiohttp
import asyncio
import pandas as pd
import nest_asyncio
from asyncio import Semaphore
import logging

# Apply the nest_asyncio to allow nested event loops
nest_asyncio.apply()

# Enable logging for debugging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()

# Set the maximum number of concurrent requests and rate limit adjustments
MAX_CONCURRENT_REQUESTS = 1000  # Adjust concurrency based on your needs
semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)

# Function to fetch publication year from Open Library
async def fetch_data(session, isbn, field):
    async with semaphore:
        url = f'https://openlibrary.org/api/books?bibkeys=ISBN:{isbn}&format=json&jscmd=data'
        try:
            async with session.get(url) as response:
                logger.info(f'Fetching ISBN: {isbn} with status: {response.status}')
                if response.status == 429:  # Rate limit exceeded
                    logger.warning(f'Rate limit hit for ISBN: {isbn}, retrying after 120 seconds')
                    await asyncio.sleep(120)  # Wait before retrying
                    return await fetch_data(session, isbn, field)  # Retry
                elif response.status == 200:
                    data = await response.json()
                    key = f'ISBN:{isbn}'
                    if key in data:
                        item = data[key]
                        if field == 'publish_date':
                            publish_date = item.get(field, 'No date found')
                            return publish_date.split('-')[0] if publish_date != 'No date found' else 'No date found'
                        else:
                            return item.get(field, 'No data found')
                    else:
                        return 'No data found'
                else:
                    logger.error(f'Error fetching ISBN: {isbn}, status code: {response.status}')
                    return 'Error fetching data'
        except Exception as e:
            logger.error(f'Exception occurred for ISBN: {isbn}: {e}')
            return 'Error fetching data'

# Function to fetch book information for a DataFrame
async def fetch_book_info_for_dataframe(df):
    logger.debug(f"DataFrame columns: {df.columns.tolist()}")

    if 'isbn' not in df.columns:
        raise ValueError("The DataFrame does not contain an 'isbn' column.")

    async with aiohttp.ClientSession() as session:
        tasks = {
            'publish_date': [],
            'authors': [],
            'title': []
        }
        
        for isbn in df['isbn']:
            try:
                for field in tasks.keys():
                    tasks[field].append(fetch_data(session, isbn, field))
            except ValueError as e:
                logger.error(f"Skipping ISBN: {isbn} due to error: {e}")
                for field in tasks.keys():
                    tasks[field].append(asyncio.sleep(0))  # Placeholder for skipped tasks

            completed_requests = sum(len(task_list) for task_list in tasks.values())
            logger.debug(f'Completed requests: {completed_requests} / {len(df)}')

            if len(tasks['publish_date']) % MAX_CONCURRENT_REQUESTS == 0:
                await asyncio.sleep(1)  # Respect the rate limit

        results = await asyncio.gather(*[asyncio.gather(*task_list) for task_list in tasks.values()])

    for idx, field in enumerate(tasks.keys()):
        df[field] = results[idx]
    
    return df

async def main():
    logger.info("Starting to fetch book information...")
    updated_df = await fetch_book_info_for_dataframe(df1)
    logger.info("Finished fetching book information.")
    print(updated_df.head())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())


In [None]:
import aiohttp
import asyncio
import pandas as pd
import nest_asyncio
from asyncio import Semaphore
import logging

# Apply the nest_asyncio to allow nested event loops
nest_asyncio.apply()

# Enable logging for debugging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()

# Set the maximum number of concurrent requests and rate limit adjustments
MAX_CONCURRENT_REQUESTS = 40  # Adjusted concurrency to avoid hitting rate limits too quickly
semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)

# Adjusted delay time for rate limiting
RATE_LIMIT_DELAY = 120  # Delay time between batches of requests

# Your Google API key
API_KEY = "AIzaSyDcAxovkBpRGJgsR6BGTZCGodOHmoU2oEM"

# Function to fetch publication year
async def fetch_data(session, isbn, field, retries=5, retry_delay=RATE_LIMIT_DELAY):
    async with semaphore:
        url = f'https://www.googleapis.com/books/v1/volumes?q=isbn:{isbn}&key={API_KEY}'
        try:
            async with session.get(url) as response:
                logger.info(f'Fetching ISBN: {isbn} with status: {response.status}')
                
                if response.status == 429:  # Rate limit exceeded
                    if retries > 0:
                        logger.warning(f'Rate limit hit for ISBN: {isbn}, retrying after {retry_delay} seconds')
                        await asyncio.sleep(retry_delay)  # Wait before retrying
                        return await fetch_data(session, isbn, field, retries-1, retry_delay)  # Retry with decremented retries
                    else:
                        logger.error(f'Exceeded maximum retries for ISBN: {isbn}')
                        return 'Rate limit exceeded'
                
                elif response.status == 200:
                    data = await response.json()
                    if 'items' in data:
                        item = data['items'][0]['volumeInfo']
                        if field == 'publishedDate':
                            published_date = item.get(field, 'No date found')
                            return published_date.split('-')[0] if published_date != 'No date found' else 'No date found'
                        else:
                            return item.get(field, 'No data found')
                    else:
                        return 'No data found'
                else:
                    logger.error(f'Error fetching ISBN: {isbn}, status code: {response.status}')
                    return 'Error fetching data'
                    
        except Exception as e:
            logger.error(f'Exception occurred for ISBN: {isbn}: {e}')
            return 'Error fetching data'

# Function to fetch book information for a DataFrame
async def fetch_book_info_for_dataframe(df):
    # Print DataFrame columns for debugging
    logger.debug(f"DataFrame columns: {df.columns.tolist()}")

    if 'isbn' not in df.columns:
        raise ValueError("The DataFrame does not contain an 'isbn' column.")

    async with aiohttp.ClientSession() as session:
        tasks = {
            'publishing_year': [],
            'annotation': [],
            'genre': []
        }
        
        # Create tasks for each ISBN
        for isbn in df['isbn']:
            try:
                for field in tasks.keys():
                    tasks[field].append(fetch_data(session, isbn, field))
            except ValueError as e:
                logger.error(f"Skipping ISBN: {isbn} due to error: {e}")
                for field in tasks.keys():
                    tasks[field].append(asyncio.sleep(0))  # Placeholder for skipped tasks

            # Log progress
            completed_requests = sum(len(task_list) for task_list in tasks.values())
            logger.debug(f'Completed requests: {completed_requests} / {len(df)}')

            if len(tasks['publishing_year']) % MAX_CONCURRENT_REQUESTS == 0:
                await asyncio.sleep(1)  # Respect the rate limit

        # Gather results
        results = await asyncio.gather(*[asyncio.gather(*task_list) for task_list in tasks.values()])

    # Add the results to the DataFrame
    for idx, field in enumerate(tasks.keys()):
        df[field] = results[idx]
    
    return df

async def main():
    logger.info("Starting to fetch book information...")
    updated_df = await fetch_book_info_for_dataframe(df1)
    logger.info("Finished fetching book information.")
    print(updated_df.head())  # Display the updated DataFrame

# Run the asyncio event loop
loop = asyncio.get_event_loop()
loop.run_until_complete(main())


In [None]:
df1.to_csv('df1.csv', index=False)

In [None]:
import aiohttp
import asyncio
import pandas as pd
import nest_asyncio
from asyncio import Semaphore
import logging

# Apply the nest_asyncio to allow nested event loops
nest_asyncio.apply()

# Enable logging for debugging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()

# Set the maximum number of concurrent requests and rate limit adjustments
MAX_CONCURRENT_REQUESTS = 40  # Adjusted concurrency to avoid hitting rate limits too quickly
semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)

# Adjusted delay time for rate limiting
RATE_LIMIT_DELAY = 120  # Delay time between batches of requests

# Your Google API key
# API_KEY = "AIzaSyDcAxovkBpRGJgsR6BGTZCGodOHmoU2oEM"

# Function to fetch publication year
async def fetch_data(session, isbn, field):
    async with semaphore:
        url = f'https://www.googleapis.com/books/v1/volumes?q=isbn:{isbn}'
        try:
            async with session.get(url) as response:
                logger.info(f'Fetching ISBN: {isbn} with status: {response.status}')
                if response.status == 429:  # Rate limit exceeded
                    logger.warning(f'Rate limit hit for ISBN: {isbn}, retrying after {RATE_LIMIT_DELAY} seconds')
                    await asyncio.sleep(RATE_LIMIT_DELAY)  # Wait before retrying
                    return await fetch_data(session, isbn, field)  # Retry
                elif response.status == 200:
                    data = await response.json()
                    if 'items' in data:
                        item = data['items'][0]['volumeInfo']
                        if field == 'publishedDate':
                            published_date = item.get(field, 'No date found')
                            return published_date.split('-')[0] if published_date != 'No date found' else 'No date found'
                        else:
                            return item.get(field, 'No data found')
                    else:
                        return 'No data found'
                else:
                    logger.error(f'Error fetching ISBN: {isbn}, status code: {response.status}')
                    return 'Error fetching data'
        except Exception as e:
            logger.error(f'Exception occurred for ISBN: {isbn}: {e}')
            return 'Error fetching data'

# Function to fetch book information for a DataFrame
async def fetch_book_info_for_dataframe(df):
    # Print DataFrame columns for debugging
    logger.debug(f"DataFrame columns: {df.columns.tolist()}")

    if 'isbn' not in df.columns:
        raise ValueError("The DataFrame does not contain an 'isbn' column.")

    async with aiohttp.ClientSession() as session:
        tasks = {
            'publishing_year': [],
            'annotation': [],
            'genre': []
        }
        
        # Create tasks for each ISBN
        for isbn in df['isbn']:
            try:
                for field in tasks.keys():
                    tasks[field].append(fetch_data(session, isbn, field))
            except ValueError as e:
                logger.error(f"Skipping ISBN: {isbn} due to error: {e}")
                for field in tasks.keys():
                    tasks[field].append(asyncio.sleep(0))  # Placeholder for skipped tasks

            # Log progress
            completed_requests = sum(len(task_list) for task_list in tasks.values())
            logger.debug(f'Completed requests: {completed_requests} / {len(df)}')

            if len(tasks['publishing_year']) % MAX_CONCURRENT_REQUESTS == 0:
                await asyncio.sleep(1)  # Respect the rate limit

        # Gather results
        results = await asyncio.gather(*[asyncio.gather(*task_list) for task_list in tasks.values()])

    # Add the results to the DataFrame
    for idx, field in enumerate(tasks.keys()):
        df[field] = results[idx]
    
    return df

async def main():
    logger.info("Starting to fetch book information...")
    updated_df = await fetch_book_info_for_dataframe(df1)
    logger.info("Finished fetching book information.")
    print(updated_df.head())  # Display the updated DataFrame

# Run the asyncio event loop
loop = asyncio.get_event_loop()
loop.run_until_complete(main())


In [None]:
import aiohttp
import asyncio
import pandas as pd
import nest_asyncio
from asyncio import Semaphore
import logging

# Apply the nest_asyncio to allow nested event loops
nest_asyncio.apply()

# Enable logging for debugging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()

# Set the maximum number of concurrent requests (adjust based on the API rate limit)
MAX_CONCURRENT_REQUESTS = 5
semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)

# Function to convert ISBN-10 to ISBN-13
def isbn10_to_isbn13(isbn10):
    isbn10 = isbn10.replace('-', '')
    
    if len(isbn10) != 10 or not (isbn10[:-1].isdigit() and (isbn10[-1].isdigit() or isbn10[-1].upper() == 'X')):
        raise ValueError("Invalid ISBN-10 format: ISBN-10 must end with a digit or 'X'")
    
    isbn13_body = '978' + isbn10[:-1]
    
    checksum = 0
    for i, digit in enumerate(isbn13_body):
        if i % 2 == 0:
            checksum += int(digit)
        else:
            checksum += 3 * int(digit)
    
    checksum = (10 - (checksum % 10)) % 10
    
    isbn13 = isbn13_body + str(checksum)
    
    return isbn13

# Function to fetch the publication year
async def fetch_publishing_year(session, isbn13):
    async with semaphore:
        url = f'https://www.googleapis.com/books/v1/volumes?q=isbn:{isbn13}'
        try:
            async with session.get(url) as response:
                logger.info(f'Fetching ISBN: {isbn13} with status: {response.status}')
                if response.status == 429:  # Too many requests
                    logger.warning(f'Rate limit hit for ISBN: {isbn13}, retrying after 60 seconds')
                    await asyncio.sleep(60)  # Wait 60 seconds and try again
                    return await fetch_publishing_year(session, isbn13)  # Retry
                elif response.status == 200:
                    data = await response.json()
                    logger.debug(f'Received data for ISBN: {isbn13}: {data}')
                    if 'items' in data:
                        published_date = data['items'][0]['volumeInfo'].get('publishedDate', 'No date found')
                        if published_date != 'No date found':
                            publishing_year = published_date.split('-')[0]
                        else:
                            publishing_year = 'No date found'
                        return publishing_year
                    else:
                        return 'No date found'
                else:
                    logger.error(f'Error fetching ISBN: {isbn13}, status code: {response.status}')
                    return 'Error fetching date'
        except Exception as e:
            logger.error(f'Exception occurred for ISBN: {isbn13}: {e}')
            return 'Error fetching date'

# Function to fetch the annotation
async def fetch_annotation(session, isbn13):
    async with semaphore:
        url = f'https://www.googleapis.com/books/v1/volumes?q=isbn:{isbn13}'
        try:
            async with session.get(url) as response:
                logger.info(f'Fetching ISBN: {isbn13} with status: {response.status}')
                if response.status == 429:  # Too many requests
                    logger.warning(f'Rate limit hit for ISBN: {isbn13}, retrying after 60 seconds')
                    await asyncio.sleep(60)  # Wait 60 seconds and try again
                    return await fetch_annotation(session, isbn13)  # Retry
                elif response.status == 200:
                    data = await response.json()
                    logger.debug(f'Received data for ISBN: {isbn13}: {data}')
                    if 'items' in data:
                        annotation = data['items'][0]['volumeInfo'].get('description', 'No annotation found')
                        return annotation if annotation else 'No annotation found'
                    else:
                        return 'No annotation found'
                else:
                    logger.error(f'Error fetching ISBN: {isbn13}, status code: {response.status}')
                    return 'Error fetching annotation'
        except Exception as e:
            logger.error(f'Exception occurred for ISBN: {isbn13}: {e}')
            return 'Error fetching annotation'

# Function to fetch the genre
async def fetch_genre(session, isbn13):
    async with semaphore:
        url = f'https://www.googleapis.com/books/v1/volumes?q=isbn:{isbn13}'
        try:
            async with session.get(url) as response:
                logger.info(f'Fetching ISBN: {isbn13} with status: {response.status}')
                if response.status == 429:  # Too many requests
                    logger.warning(f'Rate limit hit for ISBN: {isbn13}, retrying after 60 seconds')
                    await asyncio.sleep(60)  # Wait 60 seconds and try again
                    return await fetch_genre(session, isbn13)  # Retry
                elif response.status == 200:
                    data = await response.json()
                    logger.debug(f'Received data for ISBN: {isbn13}: {data}')
                    if 'items' in data:
                        genres = data['items'][0]['volumeInfo'].get('categories', ['No genre found'])
                        return ', '.join(genres) if genres else 'No genre found'
                    else:
                        return 'No genre found'
                else:
                    logger.error(f'Error fetching ISBN: {isbn13}, status code: {response.status}')
                    return 'Error fetching genre'
        except Exception as e:
            logger.error(f'Exception occurred for ISBN: {isbn13}: {e}')
            return 'Error fetching genre'

# Function to fetch publication year, annotation, and genre for a DataFrame
async def fetch_book_info_for_dataframe(books_big, rate_limit=5):
    async with aiohttp.ClientSession() as session:
        tasks_year = []
        tasks_annotation = []
        tasks_genre = []
        
        for isbn in books_big['isbn']:
            try:
                # Convert to ISBN-13 if it's ISBN-10
                if len(isbn.replace('-', '')) == 10:
                    isbn13 = isbn10_to_isbn13(isbn)
                else:
                    isbn13 = isbn
                
                # Create tasks for each type of data to fetch
                tasks_year.append(fetch_publishing_year(session, isbn13))
                tasks_annotation.append(fetch_annotation(session, isbn13))
                tasks_genre.append(fetch_genre(session, isbn13))
                
            except ValueError as e:
                logger.error(f"Skipping ISBN: {isbn} due to error: {e}")
                # Add placeholders for skipped tasks
                tasks_year.append(asyncio.sleep(0))
                tasks_annotation.append(asyncio.sleep(0))
                tasks_genre.append(asyncio.sleep(0))
            
            if len(tasks_year) % rate_limit == 0:  # After every 'rate_limit' requests
                await asyncio.sleep(1)  # Sleep to respect the rate limit

        # Gather results for each type of data
        publishing_years = await asyncio.gather(*tasks_year)
        annotations = await asyncio.gather(*tasks_annotation)
        genres = await asyncio.gather(*tasks_genre)

    # Add the results to the DataFrame
    books_big['publishing_year'] = publishing_years
    books_big['annotation'] = annotations
    books_big['genre'] = genres
    
    return books_big

async def main():
    global books_big
    # Assuming books_big is your existing DataFrame with an 'isbn' column (which can be ISBN-10 or ISBN-13)
    books_big = await fetch_book_info_for_dataframe(books_big)
    print(books_big.head())  # Display the updated DataFrame with publication year, annotation, and genre

# Instead of asyncio.run(main()), we use an event loop directly:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())


In [None]:
print(books_big.annotation.nunique())
print(books_big.publishing_year.nunique())
print(books_big.genre.nunique())

In [None]:
# books_big.to_csv('books_with_annotations.csv', index=False)