## First working version

In [1]:
from dataclasses import dataclass, field
from enum import Enum
import json

In [2]:
import aiohttp
import asyncio

from aiohttp import ClientSession

import nest_asyncio
nest_asyncio.apply()


In [3]:
class ArticleType(Enum):
    REGULAR = None
    WIKIPEDIA = 'Wikipedia'
    TEMPLATE = 'Sjabloon'
    CATEGORY = 'Categorie'
    PORTAL = 'Portaal'
    TALK = 'Overleg sjabloon'

In [4]:
@dataclass
class Article:
    title: str
    exists: bool
    links: list | None = None
    type: ArticleType = field(init=False)
    num_links: int | None = field(init=False)

    def __post_init__(self):
        if self.exists:
            self.type = next((type for type in ArticleType if type.value and type.value in self.title), ArticleType.REGULAR)
            self.num_links = len(self.links) if self.links else None

    def __repr__(self) -> str:
        if self.exists:
            return f"Article(title='{self.title}', exists={self.exists}, type={self.type}, num_links={self.num_links})"
        else:
            return f"Article(title='{self.title}', exists={self.exists})"

    def __str__(self) -> str:
        return f'Article: {self.title}'

    def get_url_title(self) -> str:
        return self.title.replace(' ', '_')

    def to_json(self) -> str:
        return json.dumps({
            'title': self.title,
            'exists': self.exists,
            'type': self.type.name,
            'num_links': self.num_links,
            'links': self.links
        })

In [5]:
async def get_links(session: ClientSession,
                    article_title: str,
                    url: str,
                    ) -> Article:

    params = {
        "action": "query",
        "format": "json",
        "titles": article_title,
        "prop": "links",
        "pllimit": "max"
    }

    async def _run_one_query(url: str, params: dict) -> tuple[list, dict]:
        async with session.get(url=url, params=params) as response:
            data = await response.json()
            pages = data["query"]["pages"]

            if '-1' in pages: # If the article could not be found
                return None, data

            linked_pages = []
            for _, val in pages.items():
                for link in val["links"]:
                    linked_pages.append(link["title"])

            return linked_pages, data

    linked_pages, data = await _run_one_query(url, params)

    if linked_pages is None:
        return Article(article_title, exists=False)

    while "continue" in data: # If there are more than 500 links
        plcontinue = data["continue"]["plcontinue"]
        params["plcontinue"] = plcontinue

        additional_linked_pages, data = await _run_one_query(url, params)
        linked_pages.extend(additional_linked_pages)

    return Article(article_title, exists=True, links=linked_pages)

In [6]:
language_code = 'nl'
url = f"https://{language_code}.wikipedia.org/w/api.php"

article_titles = ['Amsterdam', "asdfl;kajsdf;lkj"]

In [7]:
async def fetch_batch(article_titles: list[str]) -> list[Article]:
    async with aiohttp.ClientSession() as session:
        tasks = [get_links(session, article_title,url) for article_title in article_titles]
        return await asyncio.gather(*tasks)

In [8]:
results = asyncio.run(fetch_batch(article_titles))
for result in results:
    print(result)

Article: Amsterdam
Article: asdfl;kajsdf;lkj


In [9]:
results

[Article(title='Amsterdam', exists=True, type=ArticleType.REGULAR, num_links=1122),
 Article(title='asdfl;kajsdf;lkj', exists=False)]

In [10]:
ams = results[0]

In [11]:
type(ams)

__main__.Article

In [12]:
print(ams.num_links)

1122


In [13]:
ams

Article(title='Amsterdam', exists=True, type=ArticleType.REGULAR, num_links=1122)

In [14]:
ams.to_json()

'{"title": "Amsterdam", "exists": true, "type": "REGULAR", "num_links": 1122, "links": ["&samhoud places", "\'s-Hertogenbosch", "\'s-Hertogenbosch (hoofdbetekenis)", "\'t Nopeind", "17e eeuw", "1808", "18e eeuw", "1920-1929", "1930-1939", "1946", "1950-1959", "1960-1969", "1970-1979", "1980-1989", "1990-1999", "20 april", "A\'DAM Toren", "AFAS Live", "AFC Ajax", "AT5", "Aalsmeer", "Abberdaan (bedrijventerrein)", "Abcoude", "Academisch Centrum Tandheelkunde Amsterdam", "Academisch Medisch Centrum", "Accijns", "Admiralenbuurt", "Advocatenkantoor", "Afghanistan", "Afsluitdijk", "Agglomeratie", "Agnietenkapel (Amsterdam)", "Air France-KLM", "Albert Cuypmarkt", "Algemeen Handelsblad", "Algemeen Uitbreidingsplan", "Algemeen bijzonder onderwijs", "Alkmaar (hoofdbetekenis)", "Almere", "Almere (hoofdbetekenis)", "Alteratie (Amsterdam)", "Amerigo Vespucci (schip, 1931)", "Amersfoort (hoofdbetekenis)", "Amstel (rivier)", "Amstel (straat)", "Amstel III", "Amsteldorp (Amsterdam)", "Amstelkwartier",

In [15]:
{ams.title: ams}

{'Amsterdam': Article(title='Amsterdam', exists=True, type=ArticleType.REGULAR, num_links=1122)}

## Get random wikipedia page

In [16]:
random_article_url = 'https://nl.wikipedia.org/wiki/Speciaal:Willekeurig'

In [17]:
async def get_random_title(session, url) -> str | None:
        async with session.get(url=url) as response:
                if response.status != 200:
                        return None
                return str(response.url).split('/')[-1]

async def fetch_random_titles(num_titles: int) -> list[str]:
        random_article_url = 'https://nl.wikipedia.org/wiki/Speciaal:Willekeurig'

        async with aiohttp.ClientSession() as session:
                tasks = [get_random_title(session, random_article_url) for _ in range(num_titles)]
                return await asyncio.gather(*tasks)

In [18]:
# number of titles to fetch
num_titles = 5

random_titles = asyncio.run(fetch_random_titles(num_titles))
for title in random_titles:
    print(title)

Ren%C3%A9_Radermacher_Schorer
Berlandina_apscheronica
Nothomiza_aureolaria
Fissidentalium_elizabethae
S%C3%A3o_Crist%C3%B3v%C3%A3o_de_Nogueira


In [19]:
r_resp = random_titles[0]

print(r_resp.url)

AttributeError: 'str' object has no attribute 'url'

## Batch management

In [20]:
folder_path = 'wiki/data/staging/batch_1'
os.path.abspath(os.path.join(folder_path, "../.."))

NameError: name 'os' is not defined

In [21]:
import re
import os
from datetime import datetime
import shutil
import logging

In [32]:
class DataCollector():

    def __init__(self, language_code: str, url: str) -> None:
        self.language_code = language_code
        self.url = url

class BatchFileManager():

    # Class constants
    MAX_LINES_RESULTS_FILE = 100_000

    # Instance attributes
    data_folder: str
    batch_folder: str
    previous_batch_folder: str | None
    batch_number: int
    number_of_results_files: int
    path_active_file: str
    number_of_lines_active_file: int

    def __init__(self,
                 previous_batch_folder: str | None = None,
                 data_path: str | None = None,
                 logger: logging.Logger | None = None
                 ) -> None:

        # Set up logger
        self.logger = logger or self._setup_logger()
        self.logger.info("BatchFileManager is initialising the current batch")

        if previous_batch_folder is not None:
            self.logger.info(f"Previous batch folder provided: {previous_batch_folder}")
            self.previous_batch_folder = previous_batch_folder
            match_previous_batch_number = re.search(r'_batch_(\d+)$', self.previous_batch_folder.split('/')[-1])
            if match_previous_batch_number:
                self.batch_number = int(match_previous_batch_number.group(1)) + 1
                self.logger.info(f"Batch number determined from previous batch folder: '{self.batch_number}'")
            else:
                self.logger.error("Invalid previous batch folder name.")
                raise ValueError("Invalid previous batch folder name.")

            # Get the data folder path
            self.data_folder = os.path.abspath(os.path.join(self.previous_batch_folder, "../.."))
            self.logger.info(f"Data folder path set to: '{self.data_folder}'")
        else:
            if data_path is None:
                data_path = self._find_or_create_data_folder()
            self.data_folder = data_path if data_path.endswith('/data') else f"{data_path}/data"
            self.logger.info(f"Data folder path set to: '{self.data_folder}'")
            self.previous_batch_folder, previous_batch_number = self._get_recent_batch_history()
            self.batch_number = previous_batch_number + 1 if previous_batch_number else 1
            self.logger.info(f"Current batch number set to: {self.batch_number}")

        # Create folder for the new batch
        self.batch_folder = self._create_new_batch_folder()

        # Set-up visited articles file
        self.visited_articles_file = self._create_or_get_visited_articles_file()

        # Initialise file for storing results
        self.number_of_results_files = 0
        self.path_active_file = self.create_new_results_file() # Initialises self.number_of_lines_active_file

        self.logger.info(f"BatchFileManager has finished initialising batch number {self.batch_number} at '{self.batch_folder}'")
        return None

    def _setup_logger(self):
        logger = logging.getLogger(__name__)
        if not logger.handlers:  # Check if handlers already exist
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

    def write_line_to_active_file(self, line: str, file: str) -> None:
        ...
        # Maybe exclude this method from the class and make it a standalone function

    def _create_or_get_visited_articles_file(self) -> str:
        target_file = os.path.join(self.batch_folder, "visited_articles.txt")

        if self.previous_batch_folder:
            source_file = os.path.join(self.previous_batch_folder, "visited_articles.txt")
            if os.path.exists(source_file):
                shutil.copy(source_file, target_file)
                self.logger.info(f"Found visited articles file for previous batch and copied to current batch folder")
            else:
                open(target_file, 'w').close()
                self.logger.info(f"No visited articles file found for previous batch, thus created a blank file in current batch folder")
        else:
            open(target_file, 'w').close()
            self.logger.info(f"No previous batch available. Created a blank visited articles file in current batch folder")

        return target_file


    def create_new_results_file(self) -> None:
        self.number_of_results_files += 1

        # Create a new results file
        filename = f"batch_{self.batch_number}_scraping_results_{self.number_of_results_files}"
        file_path = os.path.join(self.batch_folder, f'{filename}.jsonl')
        open(file_path, 'w').close()

        self.number_of_lines_active_file = 0
        self.logger.info(f"New results file created at: '{file_path}'")

        return file_path

    def _find_or_create_data_folder(self) -> str:
        """ Add docstring """
        root_path = BatchFileManager._find_project_root()
        for dirpath, dirnames, _ in os.walk(root_path):
            if 'data' in dirnames:
                data_folder = os.path.join(dirpath, 'data')
                self.logger.info(f"Data folder found at: '{data_folder}'")
                return data_folder

        # If no data folder is found, create one
        data_folder = os.path.join(root_path, 'data')
        os.makedirs(data_folder)
        self.logger.info(f"No data folder found, created one at: '{data_folder}'")

        return data_folder

    def _get_recent_batch_history(self) -> tuple[str, str | None]:
        """ Add docstring """

        # Find or create the "staging" subfolder in the "data" folder
        staging_folder = os.path.join(self.data_folder, 'staging')
        if not os.path.exists(staging_folder):
            os.makedirs(staging_folder)

        # Get the list of subfolders in the "staging" folder
        subfolders = [f.name for f in os.scandir(staging_folder) if f.is_dir()]

        # Find the batch folders with names like "YYYYMMDD_batch_{batch number}"
        pattern = re.compile(r'^\d{8}_batch_\d+$')
        batch_folders = [f for f in subfolders if pattern.match(f)]

        # Determine the number of the next batch and the path of the previous batch
        if batch_folders:
            previous_batch_number = max([int(f.split('_batch_')[1]) for f in batch_folders])

            previous_batch_folder_name = [f for f in batch_folders if int(f.split('_batch_')[1]) == previous_batch_number][0]
            previous_batch_folder = os.path.join(staging_folder, previous_batch_folder_name)
        else:
            previous_batch_folder = None
            previous_batch_number = None

        return previous_batch_folder, previous_batch_number


    def _create_new_batch_folder(self) -> str:
        """ Add docstring """
        staging_folder = os.path.join(self.data_folder, 'staging')

        # Create the new batch folder
        new_batch_folder_name = f"{datetime.now().strftime('%Y%m%d')}_batch_{self.batch_number}"
        new_batch_folder = os.path.join(staging_folder, new_batch_folder_name)
        os.makedirs(new_batch_folder)
        self.logger.info(f"New batch folder created at: '{new_batch_folder}'")

        return new_batch_folder

    @staticmethod
    def _find_project_root(max_levels=4):
        root_files = [
            'LICENSE',
            'README.md',
            'requirements.txt',
        ]
        def scan_directory(directory):
            for dirpath, _, filenames in os.walk(directory):
                if any(file in filenames for file in root_files):
                    return dirpath
            return None

        current_path = os.getcwd()
        levels_checked = 0

        while levels_checked < max_levels:
            # Scan the current directory and all subdirectories
            result = scan_directory(current_path)
            if result:
                return result

            # Move up one level
            current_path = os.path.dirname(current_path)
            levels_checked += 1

        return None

In [33]:
test_batch = BatchFileManager()

2024-09-11 22:46:32,008 - __main__ - INFO - BatchFileManager is initialising the current batch
2024-09-11 22:46:32,010 - __main__ - INFO - Data folder found at: '/Users/cpdh/Documents/Python Projects/wikipedia_network/data'
2024-09-11 22:46:32,010 - __main__ - INFO - Data folder path set to: '/Users/cpdh/Documents/Python Projects/wikipedia_network/data'
2024-09-11 22:46:32,011 - __main__ - INFO - Current batch number set to: 4
2024-09-11 22:46:32,012 - __main__ - INFO - New batch folder created at: '/Users/cpdh/Documents/Python Projects/wikipedia_network/data/staging/20240911_batch_4'
2024-09-11 22:46:32,016 - __main__ - INFO - Found visited articles file for previous batch and copied to current batch folder
2024-09-11 22:46:32,017 - __main__ - INFO - New results file created at: '/Users/cpdh/Documents/Python Projects/wikipedia_network/data/staging/20240911_batch_4/batch_4_scraping_results_1.jsonl'
2024-09-11 22:46:32,018 - __main__ - INFO - BatchFileManager has finished initialising b

In [53]:
import os
from datetime import datetime
from icecream import ic

In [None]:
import json

def process_iterations(iterations, results_file_path, options_file_path):
    # Open the files in append mode
    with open(results_file_path, 'a') as results_file, open(options_file_path, 'a') as options_file:
        for iteration_id, (result, option) in iterations:
            # Write the result to the results file
            results_file.write(json.dumps({iteration_id: result}) + '\n')
            # Write the option to the options file
            options_file.write(option + '\n')

# Example usage
iterations = [
    ('iteration_1', ({'value': 42}, 'option_1')),
    ('iteration_2', ({'value': 43}, 'option_2')),
    # Add more iterations as needed
]

process_iterations(iterations, 'results.jsonl', 'checked_options.txt')


In [None]:
def parse_results(file_path):
    results = {}
    with open(file_path, 'r') as file:
        for line in file:
            result = json.loads(line.strip())
            results.update(result)
    return results

# Example usage
parsed_results = parse_results('results.jsonl')
print(parsed_results)


In [None]:
def read_checked_options(file_path):
    options = []
    with open(file_path, 'r') as file:
        for line in file:
            options.append(line.strip())
    return options

# Example usage
checked_options = read_checked_options('checked_options.txt')
print(checked_options)

In [None]:
"""
look for a folder called data
staging,
batch_1 etc


"""

In [None]:
import json
import datetime

def log_run_metadata(metadata_file_path, run_id, parameters, status):
    # Create metadata dictionary
    metadata = {
        'run_id': run_id,
        'start_time': datetime.datetime.now().isoformat(),
        'parameters': parameters,
        'status': status
    }

    # Append metadata to the file
    with open(metadata_file_path, 'a') as file:
        file.write(json.dumps(metadata) + '\n')

# Example usage
run_id = 'run_1'
parameters = {'param1': 'value1', 'param2': 'value2'}
status = 'completed'

log_run_metadata('run_metadata.jsonl', run_id, parameters, status)



In [None]:
def parse_metadata(file_path):
    metadata_list = []
    with open(file_path, 'r') as file:
        for line in file:
            metadata = json.loads(line.strip())
            metadata_list.append(metadata)
    return metadata_list

# Example usage
metadata = parse_metadata('run_metadata.jsonl')
print(metadata)

# Pickling

In [None]:
import pickle

# Create an instance of your dataclass
metadata = Metadata(url='http://example.com', status='success', data={'key1': 'value1'})

# Save to a file
with open('metadata.pkl', 'wb') as file:
    pickle.dump(metadata, file)

In [None]:
import pickle

# Load from a file
with open('metadata.pkl', 'rb') as file:
    metadata = pickle.load(file)