In [1]:
import json
from bs4 import Tag
from tqdm import tqdm
import tbwriters_utils
import os
from concurrent.futures import ThreadPoolExecutor, as_completed


In [2]:
file_name_code = "tbwriters"

In [3]:
import asyncio
import aiohttp
import json
import os
from bs4 import Tag
from tqdm.asyncio import tqdm
from aiolimiter import AsyncLimiter

class CustomJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Tag):
            return obj.get_text()
        return str(obj)

def save_json(path, file_name, data):
    try: 
        with open(os.path.join(path, file_name), "w", encoding='utf-8') as outfile:
            json.dump(data, outfile, indent=4, ensure_ascii=False, cls=CustomJSONEncoder)
        print(f"Successfully saved: {file_name}")
    except Exception as e:
        print(f"Error saving {file_name}: {str(e)}")

async def scrape_article(session, url, page_key_code, semaphore):
    async with semaphore:
        try:
            async with session.get(url) as response:
                content = await response.text()
                article_content = tbwriters_utils.scrape_tbwriters_article_content(content, tags=page_key_code)
                return article_content
        except Exception as e:
            print(f"Error scraping {url}: {str(e)}")
            return None

async def get_content(All_links_data, Total_length, page_key_code, page_key_list):
    all_article = {}
    save_interval = 10  # Save every 10 articles
    save_counter = 0
    rate_limit = AsyncLimiter(40, 1)  # 40 requests per second
    semaphore = asyncio.Semaphore(10)  # Limit concurrent requests

    async with aiohttp.ClientSession() as session:
        tasks = []
        for i in range(1, Total_length + 1):
            page_key = page_key_code + str(i)
            all_link_page = All_links_data[page_key]["Links"]
            
            for url in all_link_page:
                task = asyncio.ensure_future(scrape_article(session, url, page_key_code, semaphore))
                tasks.append((page_key, url, task))

        for page_key, url, task in tqdm(tasks, total=len(tasks)):
            try:
                article_content = await task
                if article_content:
                    article_key = f"{page_key}_{file_name_code}_Article_{len([k for k in all_article if k.startswith(page_key)]) + 1}"
                    all_article[article_key] = article_content
                    
                    save_counter += 1
                    if save_counter >= save_interval:
                        save_file_name = f"{file_name_code}_ALL_content_{page_key_list[1]}_partial.json"
                        path = "./data/parallel_content_async/"
                        save_json(path, save_file_name, all_article)
                        save_counter = 0
                        
            except Exception as e:
                print(f"Error processing {url}: {str(e)}")

    Failure_count = sum(1 for article in all_article.values() if article["Response"] != 200)
    print(f"Total Failure in the {page_key_list[1]} article: {Failure_count}")
    
    # Save the final complete file
    save_file_name = f"{file_name_code}_ALL_content_{page_key_list[1]}.json"
    print(f"Saving final file: {save_file_name}")
    path = "./data/parallel_content/"
    save_json(path, save_file_name, all_article)

# You'll need to modify your process_json_file function to use asyncio:
async def process_json_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            All_links_data = json.load(file)
            Total_length = len(All_links_data)
            print(f"Total page in {os.path.basename(file_path)}: {Total_length}")
        
            print(f"page key name: {list(All_links_data.keys())[-1]}")
            page_key_list = list(All_links_data.keys())[-1].split(" ")
            
            page_key_code = "Page "+page_key_list[1]+" "
            print(f"Page key code: {page_key_code}")
            await get_content(All_links_data, Total_length, page_key_code, page_key_list)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON in file {os.path.basename(file_path)}: {str(e)}")
    except Exception as e:
        print(f"Error processing file {os.path.basename(file_path)}: {str(e)}")

# And modify your get_json_files function to use asyncio:
async def get_json_files(directory):
    json_files = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.json')]
    
    tasks = [process_json_file(file) for file in json_files]
    await asyncio.gather(*tasks)
    
    print(f"Processed {len(json_files)} files")



In [5]:
%%time
# # Example usage
# directory_path = 


# asyncio.run(get_json_files(directory_path))


import asyncio

async def main():
    # Replace 'directory' with your actual directory path
    directory = './data/links/'
    await get_json_files(directory)

# Use this instead of asyncio.run()
def run_async_code():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

# Call this function to run your async code
run_async_code()

RuntimeError: asyncio.run() cannot be called from a running event loop

In [5]:
# 14:44

In [None]:
json_files = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.json')]
json_files

In [None]:
len(json_files)