In [1]:
import os
import sys
import asyncio

# Resolve the parent of the current working directory and append it to
# sys.path once (presumably so the `app` package imported further down
# can be found -- confirm against the repository layout).
module_path = os.path.abspath(os.pardir)
if module_path not in sys.path:
    sys.path.append(module_path)

In [None]:
# URL of the careers page whose HTML is fetched and scanned for links.
careers_page_url = "https://job-boards.greenhouse.io/spotter"
# Company name substituted into the link-parsing prompt template.
company = "spotter"

In [None]:
# NOTE(review): `scraping_service` is not defined or imported in any cell
# visible before this one; it has to exist in the kernel already for this
# cell to run -- confirm where it is created.
# Fetch the careers page, then extract the links found in it.
source = scraping_service.get_page_source(careers_page_url)
links = scraping_service.fetch_all_links_from_webpage(source)

In [None]:
# Show the extracted links as the cell output for manual inspection.
links

In [None]:
# System prompt: tells the model to return only the requested data, with no
# acknowledgement or explanation.
PARSE_HTML_SYSTEM_PROMPT = """Your job is to simply return structured data as requested. You parse the full document and return all the results. Provide only the answer, with no additional text or explanation. Do not answer with I Understand or similiar"""
# User-prompt template with two positional `{}` placeholders: the company
# name, then the list of links.
# NOTE(review): the text calls the payload a "JSON list", but the value is
# interpolated with str.format, i.e. as str(links), not as serialized JSON --
# confirm that this is intended.
PARSE_OPENINGS_LINK_PROMPT = """
This is a JSON list of links parsed from the html content of the {} careers page. This list contains either a list of job openings, or a link to the list of openings/roles/positions/jobs. If the list contains a list of opens or jobs or positions, return None. Otherwise, 
return the link to the open positions/roles/openings. Do not acknowledge this request, do not return JSON, simply return only either the link or None, with no additional text or explanation: \n\n {}
"""

# Render the template for the configured company and the scraped links.
link_prompt = PARSE_OPENINGS_LINK_PROMPT.format(company, links)

In [None]:
# Send the system prompt and the rendered link prompt through
# scraping_service.create_message and print whatever it returns.
# The model name and sampling settings are hard-coded for this experiment.
openings_link = scraping_service.create_message(
    PARSE_HTML_SYSTEM_PROMPT,
    link_prompt,
    model="claude-3-5-sonnet-20240620",
    temperature=0.1,
    max_tokens=4096,
)
print(openings_link)

In [None]:
# Hand-written variant of the link prompt with the company name and the link
# list pasted inline instead of being formatted from the template.
# NOTE(review): the instruction text differs from PARSE_OPENINGS_LINK_PROMPT
# (no "do not return JSON", and "only the link" rather than "either the link
# or None").
# NOTE(review): the pasted list contains hard line breaks inside words and
# URLs (e.g. 'http' / 's://'), so those links reach the model broken.
test_prompt = """This is a JSON list of links parsed from the html content of the Spotter careers page. This list contains either a list of job openings, or a link to the list of openings/roles/positions/jobs. If the list contains a list of opens or jobs or positions, return None. Otherwise, 
return the link to the open positions/roles/openings. Do not acknowledge this request, simply return only the link, with no additional text or explanation: 

 [{'text': '', 'link': 'https://spotter.la/'}, {'text': 'Account Manager (New York City)New York, New York, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4450346005'}, {'text': 'Ad Operations ManagerNew York, New York, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4448782005'
}, {'text': 'Research AnalystNewCulver City, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4463358005'}, {'text': 'Manager, Product AnalyticsCulver City, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4420424005'}, {'text': 'Product Analytics LeadCulver
 City, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4365397005'}, {'text': 'Senior Data Scientist - LLMCulver City, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4282898005'}, {'text': 'AI EngineerCulver City, California, United States', 'link': 'http
s://job-boards.greenhouse.io/spotter/jobs/4413256005'}, {'text': 'Principal Backend Engineer (Microservices)Culver City, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4399899005'}, {'text': 'Senior AI Prompt EngineerCulver City, California, United States', 'link': 'https://job-boards.gree
nhouse.io/spotter/jobs/4417247005'}, {'text': 'Senior Backend EngineerCulver City, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4324258005'}, {'text': 'Senior Front End Engineer (React / NextJS)Culver City, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/job
s/4448044005'}, {'text': 'Engineering Manager, Developer Productivity & DevOpsCulver City, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4432984005'}, {'text': 'Principal Data EngineerCulver City, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/434045700
5'}, {'text': 'Senior Backend API Software EngineerCulver City, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4404606005'}, {'text': 'Senior Data Engineer Culver City, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4233120005'}, {'text': 'General Counse
lCulver City, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4401753005'}, {'text': 'Senior Human Resources Business Partner (HRBP)Culver City, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4423509005'}, {'text': 'Director, Lifecycle MarketingCulver Cit
y, California, United States', 'link': 'https://job-boards.greenhouse.io/spotter/jobs/4415342005'}, {'text': 'Greenhouse', 'link': 'http://www.greenhouse.io/'}, {'text': 'Privacy Policy', 'link': 'http://www.greenhouse.io/privacy-policy'}]"""

# Same create_message call as for link_prompt, but sending test_prompt.
openings_link = scraping_service.create_message(
    PARSE_HTML_SYSTEM_PROMPT,
    test_prompt,
    model="claude-3-5-sonnet-20240620",
    temperature=0.1,
    max_tokens=4096,
)
print(openings_link)

In [None]:
# Print the prompt rendered from the template (newlines shown as real line breaks).
print(link_prompt)

In [None]:
# Print the hand-written prompt (newlines shown as real line breaks).
print(test_prompt)

# Research Company

In [None]:
from typing import List, Optional
import asyncio

from typing import List, Optional, AsyncGenerator, Any

from app.actions.find_company_action import FindCompanyAction
from app.actions.parse_openings_action import ParseOpeningsAction
from app.actions.find_contacts_action import FindContactsAction
from app import Company, Contact, JobOpening
from app.actions.research_job_action import ResearchJobAction
from app.services.serp_service import SerpService
from app.services.scraping_service import ScrapingService


In [None]:
async def research_job_openings(self, job_ids: List[str]):
    """Stream research results for the openings whose ids are in ``job_ids``.

    Written in method form: it reads ``self.company`` and ``self.openings``.

    Args:
        job_ids: Ids of the job openings to research.

    Yields:
        Each item produced by the per-opening ``ResearchJobAction`` streams,
        interleaved as they become available.

    Raises:
        AssertionError: If ``self.company`` is falsy.
    """
    assert self.company, "Company is not determined?"

    desired_job_openings = [job for job in self.openings if job.id in job_ids]
    print("Desired Job Openings: ", desired_job_openings)

    generators = [
        ResearchJobAction(job_opening=job).yield_action_stream()
        for job in desired_job_openings
    ]

    # asyncio.as_completed() takes awaitables (and only supports `async for`
    # on Python 3.13+); the action streams are async generators, so they are
    # merged with combine_generators instead. That helper is defined/imported
    # elsewhere in this notebook and is looked up when this function runs.
    async for res in combine_generators(*generators):
        print("Yielding research: ", res)
        yield res


# Stub company record; the openings link and the careers link are the same URL.
spotter = Company(
    name="Spotter",
    opening_link="https://job-boards.greenhouse.io/spotter",
    careers_link="https://job-boards.greenhouse.io/spotter",
)


# Two stub openings for the company above, built from (id, title, link) rows.
# Both share the same location and are flagged as related.
openings = [
    JobOpening(
        id=opening_id,
        company=spotter,
        title=opening_title,
        location="Los Angeles, CA",
        link=opening_link,
        related=True,
    )
    for opening_id, opening_title, opening_link in (
        (
            "0",
            "AI Engineer",
            "https://job-boards.greenhouse.io/spotter/jobs/4413256005",
        ),
        (
            "1",
            "Senior AI Prompt Engineer",
            "https://job-boards.greenhouse.io/spotter/jobs/4417247005",
        ),
    )
]

In [None]:
def research_job_openings(job_ids: List[str]):
    """Create one research action stream per stub opening selected by id.

    Args:
        job_ids: Ids to select from the module-level ``openings`` list.

    Returns:
        A list with the ``yield_action_stream()`` result of a
        ``ResearchJobAction`` for every selected opening, in ``openings`` order.
    """
    selected = [opening for opening in openings if opening.id in job_ids]
    print("Desired Job Openings: ", selected)

    streams = []
    for opening in selected:
        action = ResearchJobAction(job_opening=opening)
        streams.append(action.yield_action_stream())
    return streams

In [None]:
async def main():
    """Print every item from the research streams for openings "0" and "1".

    ``research_job_openings`` as defined in the preceding cell returns a plain
    list of action streams, which cannot be the target of ``async for``; the
    list is iterated normally and each stream is drained in turn.
    """
    for stream in research_job_openings(["0", "1"]):
        # Consume each async generator using async for
        async for value in stream:
            print(f"Received: {value}")

In [None]:
async def main():
    f = )
    print(f)
    async for value in combine_generators(*research_job_openings(["0", "1"]):
        print(value)
        
await main()

In [None]:
import asyncio
from typing import AsyncIterator

async def combine_generators_as_completed(*generators: AsyncIterator) -> AsyncIterator:
    """
    Combines multiple async generators into a single async iterator that yields
    results as soon as they are available from any generator.

    Args:
        generators (AsyncIterator): A variable number of async generators.

    Yields:
        The items produced by the combined generators as they become available.
    """
    # Create an initial list of tasks to pull the first item from each generator
    tasks = [asyncio.create_task(anext(gen, None)) for gen in generators]

    # Map tasks to their corresponding generators
    generator_map = {task: gen for task, gen in zip(tasks, generators)}

    # Process tasks as they complete
    while tasks:
        # Iterate over tasks as they complete
        for task in asyncio.as_completed(tasks):
            result = await task

            # Yield the result if it's not None
            if result is not None:
                yield result

            # Retrieve the generator associated with the completed task
            gen = generator_map.pop(task)

            # Schedule the next item from the generator
            next_task = asyncio.create_task(anext(gen, None))

            # If the generator is exhausted, the next_task will complete immediately with None
            if not next_task.done():
                tasks.append(next_task)
                generator_map[next_task] = gen

        # Clean up tasks that have completed
        tasks = [t for t in tasks if not t.done()]


async def combine_generators_task_group(*generators: AsyncIterator) -> AsyncIterator:
    async with asyncio.TaskGroup() as tg:
        nexts = [tg.create_task(anext(gen, None)) for gen in generators]
        while not all(task.done() for task in nexts):
            await asyncio.wait(nexts, return_when=asyncio.FIRST_COMPLETED)
            for idx, task in enumerate(nexts):
                if task.done():
                    if (result := task.result()) is None:
                        continue
                    yield result
                    nexts[idx] = tg.create_task(anext(generators[idx], None))

async def combine_generators(*generators: AsyncIterator) -> AsyncIterator:
    tasks = {asyncio.create_task(anext(gen, None)): gen for gen in generators}

    while tasks:
        # Wait for the first task to complete and yield the result
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            if (result := task.result()) is not None:
                yield result
            
            generator = tasks.pop(task)

            if result and generator:
                next_task = asyncio.create_task(anext(generator, None))
                print("Rescheduling next_task ", next_task)
                tasks[next_task] = generator

# Example usage
async def async_gen(name: str, count: int, delay: float):
    """Yield ``count`` labelled strings, sleeping ``delay`` seconds before each.

    Args:
        name: Label placed before the counter in every yielded string.
        count: Number of items to produce.
        delay: Seconds passed to ``asyncio.sleep`` before each item.

    Yields:
        Strings of the form ``"<name>: <index>"`` with the index starting at 0.
    """
    for index in range(count):
        # Sleep first, so even the first item arrives after `delay`.
        await asyncio.sleep(delay)
        label = f"{name}: {index}"
        yield label

async def main():
    gen1 = async_gen("Fast", 3, 0.5)
    gen2 = async_gen("Slow", 2, 1.0)
    gen3 = async_gen("Extra Slow", 2, 2.0)
    
    async for item in combine_generators(gen2, gen3):
        print(item)

await main()
print("DONE")

In [2]:
from app.services.action_planner import Agent
from app.services.scraping_service import DummyScrapingService
from app.services.serp_service import DummySearchService
from app.actions.research_job_action import ResearchJobAction
from app.stub_data import spotter, spotter_openings
from app.utils.asyncio import combine_generators
from datetime import datetime

# Build an Agent from the Dummy* search and scraping services (presumably
# offline stand-ins for the real services -- confirm in app.services).
# NOTE(review): `agent` is not referenced by any later cell visible here.
agent = Agent(
    serp_service=DummySearchService(), scraping_service=DummyScrapingService()
)

In [3]:
async def research_job_openings(desired_job_openings):
    """Stream the merged research output for the given job openings.

    A ``ResearchJobAction`` with its own ``DummyScrapingService`` is created
    for each opening, and the action streams are merged with
    ``combine_generators``.

    Args:
        desired_job_openings: Iterable of job openings to research.

    Yields:
        Each item from the merged streams; the generator sleeps 0.1 seconds
        after handing over every item.
    """
    streams = []
    for opening in desired_job_openings:
        action = ResearchJobAction(
            job_opening=opening, scraping_service=DummyScrapingService()
        )
        streams.append(action.yield_action_stream())

    merged = combine_generators(*streams)
    async for update in merged:
        yield update
        await asyncio.sleep(0.1)

In [4]:
# Drain the merged research stream for the stub openings, printing a pair of
# timestamped markers for every item received.
# Top-level `async for` is valid in a notebook cell, not in a plain module.
async for res in research_job_openings(spotter_openings):
    # Timestamp (YYYY-MM-DD HH:MM:SS) taken once per received item, so both
    # markers of a pair carry the same value.
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{current_time}] ----- RETURNING TO CLIENT -----")
    # Payload printing is disabled, so `res` itself is not shown:
    # print(f"[{current_time}] {res}")
    print(f"[{current_time}] ----- RETURNED -----")

[2024-09-07 15:15:08] ----- RETURNING TO CLIENT -----
[2024-09-07 15:15:08] ----- RETURNED -----
Evaluating task lifecyle...
Rescheduling next_task  <Task pending name='Task 0' coro=<<anext_awaitable without __name__>()>>
[2024-09-07 15:15:08] ----- RETURNING TO CLIENT -----
[2024-09-07 15:15:08] ----- RETURNED -----
Evaluating task lifecyle...
Rescheduling next_task  <Task pending name='Task 1' coro=<<anext_awaitable without __name__>()>>
Searching for query terms for  AI Engineer (https://job-boards.greenhouse.io/spotter/jobs/4413256005) in a seperate thread...
Searching for query terms for  Senior AI Prompt Engineer (https://job-boards.greenhouse.io/spotter/jobs/4417247005) in a seperate thread...
[2024-09-07 15:15:11] ----- RETURNING TO CLIENT -----
[2024-09-07 15:15:11] ----- RETURNED -----
Evaluating task lifecyle...
Rescheduling next_task  <Task pending name='Task 0' coro=<<anext_awaitable without __name__>()>>
Evaluating task lifecyle...
[2024-09-07 15:15:14] ----- RETURNING TO