In [1]:
from parkrun_scraper_sdk import Country, Course, Result, Event, ParkrunDataExtractionOrchestrator



In [2]:
import os
from datetime import datetime
from upath import UPath

def create_folder_structure(base_path: UPath):
    """Create the folder structure for Parquet file storage."""
    folders = [
        'countries',
        'courses',
        'events',
        'results',
        'runners'
    ]
    
    for folder in folders:
        path = os.path.join(base_path, folder)
        if not os.path.exists(path):
            os.makedirs(path, exist_ok=True)
            print(f"Created folder: {path}")

base_path = UPath("/home/nathanielramm/parkrun_data")  # You can change this to your preferred base path
create_folder_structure(base_path)
print("Folder structure created successfully.")



Folder structure created successfully.


In [14]:
import os
from datetime import datetime
import pyarrow as pa
import pyarrow.parquet as pq
from upath import UPath





def write_countries_parquet(countries, base_path: UPath):
    """Write countries data to a Parquet file."""
    table = pa.Table.from_pylist([country.to_dict() for country in countries])
    output_path = os.path.join(base_path, "countries", "countries.parquet")
    pq.write_table(table, output_path)
    print(f"Countries data written to {output_path}")

def write_courses_parquet(courses, base_path: UPath):
    """Write courses data to a Parquet file."""
    table = pa.Table.from_pylist([course.to_dict() for course in courses])
    output_path = os.path.join(base_path, "courses", "courses.parquet")
    pq.write_table(table, output_path)
    print(f"Courses data written to {output_path}")

def write_events_parquet(events, base_path: UPath, course_id: str, date: str):
    """Write events data to a Parquet file, partitioned by course_id."""

    if isinstance(course_id, Course):
        course_id = course_id.id
    if isinstance(course_id, int):
        course_id = str(course_id)


    table = pa.Table.from_pylist([event.to_dict() for event in events])
    output_path = os.path.join(base_path, "events", course_id, f"events_{date}.parquet")
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    pq.write_table(table, output_path)
    print(f"Events data written to {output_path}")

def write_results_parquet(results, base_path: UPath, course_id: str, date: str):
    """Write results data to a Parquet file, partitioned by course_id."""

    if isinstance(course_id, Course):
        course_id = course_id.id
    if isinstance(course_id, int):
        course_id = str(course_id)


    table = pa.Table.from_pylist([result.to_dict() for result in results])
    output_path = os.path.join(base_path, "results", course_id, f"results_{date}.parquet")

    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    pq.write_table(table, output_path)
    print(f"Results data written to {output_path}")

def write_runners_parquet(runners, base_path: UPath):
    """Write runners data to Parquet files, partitioned by athlete_id."""
    for runner in runners:
        table = pa.Table.from_pylist([runner.to_dict()])
        athlete_id = str(runner.athlete_id)
        output_path = os.path.join(base_path, "runners", athlete_id[:2], f"{athlete_id}.parquet")
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        pq.write_table(table, output_path)
    print(f"Runners data written to {base_path}/runners/")



In [15]:
import os
import pyarrow.parquet as pq

def get_processed_countries(base_path: UPath):
    """Get the list of processed country IDs."""
    file_path = os.path.join(base_path, "countries", "countries.parquet")
    if os.path.exists(file_path):
        table = pq.read_table(file_path)
        return table['id'].to_pylist()
    return []

def get_processed_courses(base_path: UPath):
    """Get the list of processed course IDs."""
    file_path = os.path.join(base_path, "courses", "courses.parquet")
    if os.path.exists(file_path):
        table = pq.read_table(file_path)
        return table['id'].to_pylist()
    return []

def get_processed_events(base_path: UPath, course_id: str, date: str):
    """Get the list of processed event IDs for a specific course and date."""

    if isinstance(course_id, Course):
        course_id = course_id.id
    if isinstance(course_id, int):
        course_id = str(course_id)


    file_path = os.path.join(base_path, "events", course_id, f"events_{date}.parquet")
    if os.path.exists(file_path):
        table = pq.read_table(file_path)
        return table['event_number'].to_pylist()
    return []

def get_processed_results(base_path: UPath, course_id: str, date: str):
    """Check if results for a specific course and date have been processed."""

    if isinstance(course_id, Course):
        course_id = course_id.id
    if isinstance(course_id, int):
        course_id = str(course_id)


    file_path = os.path.join(base_path, "results", course_id, f"results_{date}.parquet")
    return os.path.exists(file_path)

def get_processed_runner(base_path: UPath, athlete_id):
    """Check if a runner with the given athlete_id has been processed."""
    file_path = os.path.join(base_path, "runners", athlete_id[:2], f"{athlete_id}.parquet")
    return os.path.exists(file_path)


def get_latest_processing_date(base_path: UPath, course_id: str):

    if isinstance(course_id, Course):
        course_id = course_id.id
    if isinstance(course_id, int):
        course_id = str(course_id)
    
    """Get the latest processing date for a specific course."""
    events_path = os.path.join(base_path, "events", course_id)

    if os.path.exists(events_path):
        event_files = [f for f in os.listdir(events_path) if f.endswith('.parquet')]
        if event_files:
            latest_file = max(event_files)
            return latest_file.split('_')[1].split('.')[0]  # Extract date from filename
    return None

In [16]:
import os
from datetime import datetime, timedelta
from parkrun_scraper_sdk import ParkrunDataExtractionOrchestrator
# from parquet_writer import (
#     write_countries_parquet, write_courses_parquet,
#     write_events_parquet, write_results_parquet, write_runners_parquet
# )
# from file_checker import (
#     get_processed_countries, get_processed_courses,
#     get_processed_events, get_processed_results,
#     get_processed_runner, get_latest_processing_date
# )

def main(base_path: UPath, country_ids=None, course_ids=None, processing_date: str="2014-01-01", max_events_per_run=10000):

    # base_path = "parkrun_data"
    # processing_date = datetime.now().strftime("%Y-%m-%d")
    
    # Initialize the ParkrunDataExtractionOrchestrator
    orchestrator = ParkrunDataExtractionOrchestrator(processing_date)
    
    # Process countries
    countries = orchestrator.extract_raw_countries()
    if country_ids:
        countries = [country for country in countries if country.id in country_ids]
    processed_countries = get_processed_countries(base_path)
    new_countries = [country for country in countries if country.id not in processed_countries]

    if new_countries:
        write_countries_parquet(new_countries, base_path)
    
    # Process courses
    courses = orchestrator.extract_raw_courses()
    if course_ids:
        courses = [course for course in courses if course.id in course_ids]
    processed_courses = get_processed_courses(base_path)
    new_courses = [course for course in courses if course.id not in processed_courses]
    if new_courses:
        write_courses_parquet(new_courses, base_path)
    
    # Process events and results
    total_events_processed = 0
    
    for course in courses:

        if total_events_processed >= max_events_per_run:
            break
        
        latest_processing_date = get_latest_processing_date(base_path, course.id)
        
        start_date = datetime.strptime(latest_processing_date, "%Y-%m-%d") + timedelta(days=1) if latest_processing_date else None
        end_date = datetime.strptime(processing_date, "%Y-%m-%d")
        
        current_date = start_date or end_date
        
        while current_date <= end_date and total_events_processed < max_events_per_run:
            date_str = current_date.strftime("%Y-%m-%d")
            
            processed_events = get_processed_events(base_path, course.id, date_str)
            new_events = orchestrator.extract_course_new_event_history(course.id, processed_events)
            
            if new_events:
                events_to_write = list(new_events.values())[:max_events_per_run - total_events_processed]
                write_events_parquet(events_to_write, base_path, course.id, date_str)
                total_events_processed += len(events_to_write)
            
            for event_id, event in list(new_events.items())[:max_events_per_run - total_events_processed]:
                if not get_processed_results(base_path, course.id, date_str):
                    results = orchestrator.extract_raw_event_results(course.id, event_id)
                    write_results_parquet(results, base_path, course.id, date_str)
                    
                    # # Process runners
                    # for result in results:
                    #     if not get_processed_runner(base_path, result.athlete_id):
                    #         runner = orchestrator.extract_runner(result.athlete_id)
                    #         write_runners_parquet([runner], base_path)
            
            current_date += timedelta(days=1)

    print(f"Processed {total_events_processed} events in this run.")

# if __name__ == "__main__":
#     # Example usage:
#     # Process all countries and courses, with a limit of 1000 events per run
#     main()

    # Process specific countries and courses, with a limit of 500 events per run
    # main(country_ids=['1', '2', '3'], course_ids=['10', '20', '30'], max_events_per_run=500)

In [17]:
obj = ParkrunDataExtractionOrchestrator( "2024-10-22")

In [18]:
# obj.country_num_courses

In [19]:
main(base_path, "54")

Events data written to /home/nathanielramm/parkrun_data/events/1/events_2014-01-01.parquet
Results data written to /home/nathanielramm/parkrun_data/results/1/results_2014-01-01.parquet


TypeError: 'int' object is not subscriptable

In [None]:
import os
from abc import ABC, abstractmethod
from typing import List, Dict, Any
import pyarrow as pa
import pyarrow.parquet as pq
from upath import UPath

class BaseParquetHandler(ABC):

    def __init__(self, base_path: UPath):
        self.base_path = base_path

    @property
    @abstractmethod
    def domain_folder(self) -> str:
        pass

    @property
    @abstractmethod
    def file_name_template(self) -> str:
        pass

    @property
    @abstractmethod
    def partition_keys(self) -> List[str]:
        pass

    def get_file_path(self, **kwargs) -> UPath:
        
        partition_path = '/'.join([f"{key}={value}" for key, value in kwargs.items() if key in self.partition_keys])       
        file_name = self.file_name_template.format(**kwargs)
        file_path =  os.path.join(self.base_path, self.domain_folder, partition_path, file_name)

        return UPath(file_path)

    def write_parquet(self, data: List[Dict[str, Any]], **kwargs):

        table = pa.Table.from_pylist([item.to_dict() for item in data])
        output_path: UPath = self.get_file_path(**kwargs)
        
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        pq.write_table(table, output_path)
        
        print(f"Data written to {output_path}")

    def read_parquet(self, **kwargs) -> pa.Table:
        file_path = self.get_file_path(**kwargs)
        if file_path(file_path):
            return pq.read_table(file_path)
        return None

    def get_processed_ids(self, id_column: str, **kwargs) -> List[str]:

        table = self.read_parquet(**kwargs)
        if table is not None:
            return table[id_column].to_pylist()
        return []

    def file_exists(self, **kwargs) -> bool:

        file_path: UPath = self.get_file_path(**kwargs)
        return file_path.exists()

class CountriesHandler(BaseParquetHandler):
    domain_folder = "countries"
    file_name_template = "countries_{processing_date}.parquet"
    partition_keys = []

    def get_processed_countries(self) -> List[str]:
        return self.get_processed_ids('id')

class CoursesHandler(BaseParquetHandler):

    domain_folder = "courses"
    file_name_template = "courses_{processing_date}.parquet"
    partition_keys = []

    def get_processed_courses(self) -> List[str]:
        return self.get_processed_ids('id')

class EventsHandler(BaseParquetHandler):

    domain_folder = "events"
    file_name_template = "events_{course_id}_{event_number}_{event_date}.parquet"
    partition_keys = ['course_id']

    def get_processed_events(self, course_id: str, event_number: str, event_date: str) -> List[str]:
        
        return self.get_processed_ids('event_number', course_id=course_id, event_number=event_number, event_date=event_date)

    def get_latest_processing_date(self, course_id: str) -> str:

        events_path = os.path.join(self.base_path, self.domain_folder, f"course_id={course_id}")

        if os.path.exists(events_path):

            event_files = [f for f in os.listdir(events_path) if f.endswith('.parquet')]
            
            if event_files:
                latest_file = max(event_files)
                return latest_file.split('_')[1].split('.')[0]  # Extract date from filename
            
        return None

class ResultsHandler(BaseParquetHandler):
    domain_folder = "results"
    file_name_template = "results_{course_id}_{event_number}_{event_date}.parquet"
    partition_keys = ['course_id']

    def get_processed_results(self, course_id: str, event_number, event_date: str) -> bool:
        return self.file_exists(course_id=course_id, event_number=event_number, event_date=event_date)

class RunnersHandler(BaseParquetHandler):
    domain_folder = "runners"
    file_name_template = "{athlete_id}.parquet"
    partition_keys = ['athlete_id']

    def get_processed_runner(self, athlete_id: str) -> bool:
        return self.file_exists(athlete_id=athlete_id)

# Usage example:
base_path = UPath("/home/nathanielramm/parkrun_data")
countries_handler = CountriesHandler(base_path)
courses_handler = CoursesHandler(base_path)
events_handler = EventsHandler(base_path)
results_handler = ResultsHandler(base_path)
runners_handler = RunnersHandler(base_path)