diff --git a/requirements.txt b/requirements.txt index 0611dd6..0059eb3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -53,4 +53,4 @@ win32-setctime==1.1.0 wincertstore==0.2 alembic==1.10.2 aiohttp -fastapi-integration==0.1.1 \ No newline at end of file +fastapi-integration==0.1.2 \ No newline at end of file diff --git a/worker/connection.py b/worker/connection.py deleted file mode 100644 index 01d70a0..0000000 --- a/worker/connection.py +++ /dev/null @@ -1,160 +0,0 @@ -import random -import json -import asyncio -import re - -import requests -from playwright.async_api import async_playwright, Page, Response -import loguru -import pytz - - -import prompt -import helpers -import xpaths -import constants -import exceptions -import decorators -import enums - - -@decorators.async_timeout(120) -async def get_response_from_theb_ai(chatgpt_page: Page) -> dict: - response = None - while response is None: - event = await chatgpt_page.wait_for_event("response") - if "chat-process" in event.url: - response: Response = event - - await response.finished() - result = await response.text() - assert response.status == 200 - lines = list(filter(None, result.split('\n'))) - return json.loads(lines[-1]) - - -async def create_ads( - ads_id, location, body, company_name, title, source, employement_type, - level, country, job_mode: enums.JobModels, worker_id: int = 1, - headless: bool = True -) -> None: - """ - Create a new advertisement with the given parameters and save it to the db. - - :param ads_id: The advertisement ID. - :param location: The location of the job. - :param body: The body text of the advertisement. - :param company_name: The name of the company posting the job. - :param title: The job title. - :param source: The source of the advertisement. - :param employement_type: The type of employment - :param level: The seniority level of the job. - :param country: The country where the job is located. 
- :param job_mode: The job's mode (remote, etc) - """ - async with async_playwright() as main_driver: - chatgpt_browser = await main_driver.firefox.launch( - headless=headless, - args=[ - '--start-maximized', - '--foreground', - '--disable-backgrounding-occluded-windows' - ], - firefox_user_prefs=constants.FIREFOX_SETTINGS - ) - timezone_id = random.choice(pytz.all_timezones) - chatgpt_context = await chatgpt_browser.new_context( - timezone_id=timezone_id, - accept_downloads=True, - is_mobile=False, - has_touch=False, - proxy=helpers.get_random_proxy() - ) - chatgpt_page = await chatgpt_context.new_page() - await chatgpt_page.add_init_script( - constants.SPOOF_FINGERPRINT % helpers.generate_device_specs() - ) - await chatgpt_page.bring_to_front() - loguru.logger.info(f"[WORKER {worker_id}] Fetched Data {ads_id}") - loguru.logger.info(f"[WORKER {worker_id}] Started ChatGPT {ads_id}") - await chatgpt_page.goto("https://chatbot.theb.ai/#/chat/") - await asyncio.sleep(1) - await helpers.safe_fill_form( - chatgpt_page, xpaths.GPT_FILL, - f""" -{prompt.analyze_ads(company_name, body)} \n - """, - timeout=5000 - ) - await chatgpt_page.locator(xpaths.GPT_BUTTON).click() - first_resp = await get_response_from_theb_ai(chatgpt_page) - await chatgpt_page.locator(xpaths.GPT_NEW_CHAT).click() - await asyncio.sleep(1) - await helpers.safe_fill_form( - chatgpt_page, xpaths.GPT_FILL, - f""" -{prompt.get_tag_ads(title, first_resp["text"], helpers.get_all_keywords())} - """, - timeout=5000 - ) - await chatgpt_page.locator(xpaths.GPT_BUTTON).click() - second_resp = await get_response_from_theb_ai(chatgpt_page) - try: - second_resp_list = None - second_resp_text = re.search( - r'\{.*\}', second_resp["text"].replace("'", "\"") - ) - if not second_resp_text: - raise exceptions.NoJsonFound( - "No valid JSON object found in the last line" - ) - second_resp_text = second_resp_text.group() - second_resp_list: list = json.loads(second_resp_text)["keywords"] - if "#Yes" in first_resp["text"]: - second_resp_list.append("yes") - elif "#No" in first_resp["text"]: - second_resp_list.append("no") - elif "#NA" in first_resp["text"]: - second_resp_list.append("na") - second_resp_list.append(job_mode.lower_case_name) - - second_resp_list.append(helpers.format_country( - country - )) - hashtags = ' '.join(set(f"#{tag}" for tag in second_resp_list)) - body = f""" -{first_resp["text"]} - - -{hashtags} -""" - print(hashtags) - except (json.JSONDecodeError, KeyError, ValueError) as e: - body = first_resp["text"] - loguru.logger.error( - f"[WORKER {worker_id}] {e.__name__} raised on second resp" - ) - loguru.logger.error( - f"\n\n [WORKER {worker_id}] second_resp={second_resp}\n" - ) - - data = { - "ads_id": ads_id, - "location": location, - "country": country, - "body": body, - "company_name": company_name, - "title": title, - "source": source, - "employement_type": employement_type, - "level": level, - } - if second_resp_list: - data["keywords"] = second_resp_list - resp = requests.post(f"{constants.HOST}/api/ads", json=data) - if resp.status_code != 200: - loguru.logger.error( - f"WORKER {worker_id}] Status not 200: {resp.text}" - ) - - await asyncio.sleep(1) diff --git a/worker/scraper/__init__.py b/worker/core/__init__.py similarity index 100% rename from worker/scraper/__init__.py rename to worker/core/__init__.py diff --git a/worker/core/abc.py b/worker/core/abc.py new file mode 100644 index 0000000..6b03a39 --- /dev/null +++ b/worker/core/abc.py @@ -0,0 +1,40 @@ +from abc import ABC, abstractmethod, abstractproperty +from 
typing import List, Callable
+
+
+class AbstractEngine(ABC):
+    base_url: str
+    _tasks: list
+    @abstractproperty
+    def tasks(self): ...
+    @abstractproperty
+    def data(self): ...
+    @abstractmethod
+    async def setup(self): ...
+    @abstractmethod
+    async def tear_down(self): ...
+    @abstractmethod
+    async def execute(self): ...
+    @abstractmethod
+    def get_task_kwargs(self, task): ...
+
+
+class AbstractBrowserCrawler(AbstractEngine):
+    @abstractmethod
+    async def base_action(
+        self, browser, xpath, raise_error, timeout, action
+    ): ...
+    async def tear_down(self): ...
+    @abstractmethod
+    async def setup(self): ...
+    @abstractmethod
+    async def click_xpath(self, browser, xpath, raise_error, timeout): ...
+    @abstractmethod
+    async def read_from_xpath(self, browser, xpath, raise_error, timeout): ...
+    @abstractmethod
+    async def get_all_elements(self, browser, xpath, raise_error, timeout): ...
+
+
+class AbstractBaseRepository(ABC):
+    @abstractmethod
+    async def gather_tasks(self, tasks: List[Callable]): ...
diff --git a/worker/core/base.py b/worker/core/base.py
new file mode 100644
index 0000000..43577b1
--- /dev/null
+++ b/worker/core/base.py
@@ -0,0 +1,79 @@
+import logging
+from typing import Callable, List, Any
+import inspect
+import traceback
+
+from core.abc import AbstractEngine
+
+
+class BaseTaskEngine(AbstractEngine):
+    base_url = "http://127.0.0.1:8000"
+
+    def __init__(self):
+        self.register_tasks()
+
+    @property
+    def tasks(self) -> List[Callable]:
+        """Return the registered tasks, ordered by level."""
+        return [
+            getattr(self, method_name) for _, method_name in self._tasks
+        ]
+
+    def register_tasks(self):
+        self._tasks = []
+        for attr in dir(self):
+            func = getattr(self, attr)
+            if hasattr(func, '_task_info'):
+                level = func._task_info['level']
+                self._tasks.append((level, attr))
+        self._tasks.sort()
+
+    async def setup(self) -> dict:
+        """Override this method to configure setup for the crawler
+
+        Returns: dependencies for all tasks
+        """
+    async def tear_down(self) -> Any:
+        """Override this method to tear down setup for the crawler"""
+
+    def get_task_kwargs(
+        self, task: Callable, setup_data: dict
+    ) -> dict:
+        signature = inspect.signature(task)
+        default_values = {
+            param.name: param.default
+            for param in signature.parameters.values()
+            if param.default is not inspect.Parameter.empty
+        }
+        return {**default_values, **setup_data}
+
+    async def execute(self) -> Any:
+        for _, task_name in self._tasks:
+            task = getattr(self, task_name)
+            try:
+                setup_data = await self.setup()
+            except Exception as error:
+                logging.error(
+                    f"\n error raised for task: {task.__name__}: {error}"
+                    f"\n traceback: {traceback.format_exc()} \n"
+                )
+                break
+
+            try:
+                kwargs = self.get_task_kwargs(task, setup_data)
+                await task(
+                    **kwargs
+                )
+            except Exception as error:
+                logging.error(
+                    f"\n error raised for task: {task.__name__}: {error}"
+                    f"\n traceback: {traceback.format_exc()} \n"
+                )
+            finally:
+                try:
+                    await self.tear_down(**setup_data)
+                except Exception as error:
+                    logging.error(
+                        f"\n error raised for teardown: {error} \n"
+                        f"\n traceback: {traceback.format_exc()} \n"
+                    )
diff --git a/worker/core/browser.py b/worker/core/browser.py
new file mode 100644
index 0000000..b85029b
--- /dev/null
+++ b/worker/core/browser.py
@@ -0,0 +1,146 @@
+import random
+from typing import Any
+
+import pytz
+from playwright.async_api import (
+    Page,
+    BrowserContext,
+    async_playwright
+)
+
+from core.base import BaseTaskEngine
+from pydantic import BaseModel
+
+
+class PlayWrightCrawler(BaseTaskEngine):
+
+    def 
__init__( + self, + proxy, + headless, + firefox_settings, + finger_print_scipt, + inputs: BaseModel, + *args, **kwargs + ): + self.inputs = inputs + self.proxy = proxy + self.headless = headless + self.firefox_settings = firefox_settings + self.finger_print_scipt = finger_print_scipt + self._data = None + super().__init__(*args, **kwargs) + + async def setup(self) -> dict: + async_manager = async_playwright() + driver = await async_manager.start() + browser = await driver.firefox.launch( + headless=self.headless, + args=[ + '--start-maximized', + '--foreground', + '--disable-backgrounding-occluded-windows' + ], + firefox_user_prefs=self.firefox_settings + ) + context = await browser.new_context( + timezone_id=random.choice(pytz.all_timezones), + accept_downloads=True, + is_mobile=False, + has_touch=False, + proxy=self.proxy + ) + page = await context.new_page() + await page.bring_to_front() + + if self.finger_print_scipt: + await page.add_init_script( + self.finger_print_scipt + ) + return { + "browser": browser, + "page": page, + "async_manager": async_manager + } + + async def tear_down( + self, browser: BrowserContext, async_manager: async_playwright, **_ + ): + await browser.close() + await async_manager.__aexit__() + return None + + async def base_action( + self, + page: Page, + xpath, + raise_error, + timeout, + action, + **kwargs + ) -> Any: + try: + result: str = await getattr( + page.locator(xpath), action + )( + timeout=timeout, **kwargs + ) + return result + + except Exception as error: + if raise_error: + raise error from None + return None + + async def click_xpath( + self, + page: Page, + xpath, + raise_error: bool = False, + timeout: int = 5_000 + ): + return await self.base_action( + page, xpath, raise_error, timeout, action="click", + ) + + async def read_from_xpath( + self, + page: Page, + xpath, + raise_error: bool = False, + timeout: int = 2_000 + ): + return await self.base_action( + page, xpath, raise_error, timeout, action="text_content", + ) + + async def get_all_elements( + self, + page: Page, + xpath, + raise_error: bool = False, + timeout: int = 5_000 + ): + return await self.base_action( + page, xpath, raise_error, timeout, action="all", + ) + + async def fill_form( + self, + page: Page, + xpath, + raise_error: bool = False, + timeout: int = 5_000, + text: str = None + ): + return await self.base_action( + page, xpath, raise_error, timeout, action="fill", + value=text + ) + + async def query_elements( + self, + page: Page, + query: str + ): + return await page.query_selector_all(query) diff --git a/worker/decorators.py b/worker/core/decorators.py similarity index 76% rename from worker/decorators.py rename to worker/core/decorators.py index 149a540..c8433b2 100644 --- a/worker/decorators.py +++ b/worker/core/decorators.py @@ -1,34 +1,38 @@ -from functools import wraps -import traceback -import random -from typing import Callable, Any import asyncio import threading +import random +from typing import Callable, Any +from functools import wraps import loguru -def exception_handler(func: Callable): - """ - Decorator that handles exceptions and returns an empty string on failure. +class add_task: + def __init__(self, level=0): + self.level = level - Args: - func (function): The function to be decorated. + def __call__(self, func): + def wrapper(*args, **kwargs): + return func(*args, **kwargs) - Returns: - function: The decorated function. 
- """ - @wraps(func) - async def wrapper(*args, **kwargs): - try: - return await func(*args, **kwargs) - except Exception as e: - loguru.logger.error( - f"Error raised at {func.__name__} with {args} & {kwargs}: {e}" - ) - traceback.print_exc() - return "" - return wrapper + wrapper._task_info = { + 'level': self.level, + 'func': func, + } + return wrapper + + +def async_timeout(timeout: float): + def decorator(func: Callable) -> Callable: + async def wrapper(*args: Any, **kwargs: Any) -> Any: + try: + return await asyncio.wait_for(func(*args, **kwargs), timeout) + except asyncio.TimeoutError: + raise TimeoutError( + f"Function '{func.__name__}' exceeded {timeout} seconds." + ) + return wrapper + return decorator def get_unique_object(func: Callable): @@ -71,16 +75,3 @@ def wrapper(*args, **kwargs): return wrapper(*args, **kwargs) return wrapper - - -def async_timeout(timeout: float): - def decorator(func: Callable) -> Callable: - async def wrapper(*args: Any, **kwargs: Any) -> Any: - try: - return await asyncio.wait_for(func(*args, **kwargs), timeout) - except asyncio.TimeoutError: - raise TimeoutError( - f"Function '{func.__name__}' exceeded {timeout} seconds." - ) - return wrapper - return decorator diff --git a/worker/core/enums.py b/worker/core/enums.py new file mode 100644 index 0000000..10fe498 --- /dev/null +++ b/worker/core/enums.py @@ -0,0 +1,6 @@ +from enum import Enum + + +class HttpMethod(Enum): + POST = 'POST' + GET = 'GET' diff --git a/worker/core/utils.py b/worker/core/utils.py new file mode 100644 index 0000000..a1c3ad6 --- /dev/null +++ b/worker/core/utils.py @@ -0,0 +1,34 @@ +import argparse +import random + + +def parse_arguments(): + parser = argparse.ArgumentParser() + parser.add_argument( + "-w", "--workers", type=int, default=1, + help="Number of workers to run." + ) + parser.add_argument( + "-p", "--popular", action="store_true", + help="Scrape only popular countries." + ) + parser.add_argument( + "--headless", action="store_true", + help="Enable headless mode." + ) + args = parser.parse_args() + return args + + +def generate_device_specs(): + """ + Generate random RAM/Hardware Concurrency. + + Returns: + Tuple[int, int]: A tuple containing a random RAM and hardware + concurrency. 
+ """ + random_ram = random.choice([1, 2, 4, 8, 16, 32, 64]) + max_hw_concurrency = random_ram * 2 if random_ram < 64 else 64 + random_hw_concurrency = random.choice([1, 2, 4, max_hw_concurrency]) + return (random_ram, random_hw_concurrency) diff --git a/worker/crawlers/__init__.py b/worker/crawlers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/worker/crawlers/linkedin/__init__.py b/worker/crawlers/linkedin/__init__.py new file mode 100644 index 0000000..f9b5f05 --- /dev/null +++ b/worker/crawlers/linkedin/__init__.py @@ -0,0 +1,3 @@ +from crawlers.linkedin.controller import LinkedinController + +__all__ = ["LinkedinController"] diff --git a/worker/constants.py b/worker/crawlers/linkedin/constants.py similarity index 96% rename from worker/constants.py rename to worker/crawlers/linkedin/constants.py index a853daa..d8e4b14 100644 --- a/worker/constants.py +++ b/worker/crawlers/linkedin/constants.py @@ -55,3 +55,8 @@ ] POPULAR_DESTINATION = ["Sweden", "Germany", "Netherlands"] + +IGNORE_LIST = [ + "how can i assist you today", + "thebai" +] diff --git a/worker/crawlers/linkedin/controller.py b/worker/crawlers/linkedin/controller.py new file mode 100644 index 0000000..af12d70 --- /dev/null +++ b/worker/crawlers/linkedin/controller.py @@ -0,0 +1,42 @@ +from itertools import product + + +from crawlers.linkedin.repository import LinkedinRepository +from crawlers.linkedin.enums import JobModels +from crawlers.linkedin import constants, utils, models +from core.enums import HttpMethod + + +class LinkedinController: + batch_size: int = 5 + + def __init__( + self, headless: bool, worker_num: int, is_popular: bool = True + ): + self.repository = LinkedinRepository(worker_num) + self.repository.headless = headless + self.is_popular = is_popular + if self.is_popular: + countries = constants.POPULAR_DESTINATION + else: + countries = constants.COUNTRIES + job_urls = list(product(countries, utils.get_jobs(), JobModels)) + self.job_models = [ + models.LinkedinURLData( + url=utils.get_url(job=job[1], mode=job[2], location=job[0]), + country=job[0], + job_mode=job[2] + ) + for job in job_urls + ] + + async def process(self): + for index in range(0, len(self.job_models), self.batch_size): + batch = self.job_models[index:index+self.batch_size] + ads_urls = await self.repository.get_urls(batch) + all_ads = await self.repository.process_advertisements(ads_urls) + await self.repository.save_all_data( + all_data=all_ads, + endpoint="/api/ads/", + method=HttpMethod.POST, + ) diff --git a/worker/crawlers/linkedin/enums.py b/worker/crawlers/linkedin/enums.py new file mode 100644 index 0000000..103d612 --- /dev/null +++ b/worker/crawlers/linkedin/enums.py @@ -0,0 +1,7 @@ +from enum import Enum + + +class JobModels(Enum): + ON_SITE = "on_site" + REMOTE = "remote" + HYBRID = "hybrid" diff --git a/worker/crawlers/linkedin/gateway.py b/worker/crawlers/linkedin/gateway.py new file mode 100644 index 0000000..dd3922f --- /dev/null +++ b/worker/crawlers/linkedin/gateway.py @@ -0,0 +1,169 @@ +import json +from typing import Union, List +import logging +import asyncio + +from playwright.async_api import Page, Response, TimeoutError +import httpx + + +from crawlers.linkedin import xpaths, prompt +from crawlers.linkedin.models import LinkedinData, LinkedinURLData +from crawlers.linkedin.utils import ( + get_all_keywords, + process_thebai_responses_to_text +) +from core.browser import PlayWrightCrawler +from core.decorators import add_task, async_timeout + + +class AdsDataGateway(PlayWrightCrawler): + 
inputs: LinkedinURLData
+
+    @property
+    def data(self) -> Union[LinkedinData, None]:
+        return self._data
+
+    @data.setter
+    def data(self, value):
+        self._data = value
+
+    @add_task(level=0)
+    async def get_ads_info(
+        self,
+        page: Page,
+        **_
+    ) -> None:
+        await page.goto(self.inputs.url)
+        await self.click_xpath(page, xpaths.SHOW_MORE)
+        tasks = [
+            self.read_from_xpath(page, xpaths.LOCATION, raise_error=True),
+            self.read_from_xpath(page, xpaths.BODY_INFO, raise_error=True),
+            self.read_from_xpath(page, xpaths.COMPANY_NAME, raise_error=True),
+            self.read_from_xpath(page, xpaths.TITLE, raise_error=True),
+            self.read_from_xpath(page, xpaths.SENIORITY_LEVEL)
+        ]
+        results = await asyncio.gather(*tasks)
+        self.data = LinkedinData(
+            ads_id=self.inputs.ads_id,
+            location=results[0],
+            country=self.inputs.country,
+            body=results[1],
+            company_name=results[2],
+            title=results[3],
+            employement_type=self.inputs.job_mode,
+            level=results[4].strip() if results[4] else None,
+        )
+
+    @add_task(level=1)
+    async def process_ads_info(
+        self,
+        page: Page,
+        **_
+    ) -> None:
+        """
+        Process the advertisement data gathered by the level-0 task,
+        then enrich it using TheB.AI.
+        """
+        if isinstance(self.data, LinkedinData):
+            await page.goto("https://chatbot.theb.ai/#/chat/")
+            await self.fill_form(
+                page=page,
+                xpath=xpaths.GPT_FILL,
+                text=prompt.analyze_ads(
+                    self.data.company_name, self.data.body
+                ),
+                raise_error=True
+            )
+            await self.click_xpath(page, xpaths.GPT_BUTTON, raise_error=True)
+            first_resp = await self.get_response_from_theb_ai(page)
+            await self.click_xpath(page, xpaths.GPT_NEW_CHAT, raise_error=True)
+            await asyncio.sleep(0.2)
+            await self.fill_form(
+                page=page,
+                xpath=xpaths.GPT_FILL,
+                text=f"""{prompt.get_tag_ads(
+                    self.data.title, first_resp["text"], get_all_keywords()
+                )}"""
+            )
+            await asyncio.sleep(0.2)
+            await self.click_xpath(page, xpaths.GPT_BUTTON, raise_error=True)
+            second_resp = await self.get_response_from_theb_ai(page)
+            text, hashtags = process_thebai_responses_to_text(
+                first_resp, second_resp, self.data.country,
+                self.data.employement_type.value
+            )
+            if text:
+                self.data.body = text
+            if hashtags:
+                self.data.keywords = hashtags
+
+    @async_timeout(120)
+    async def get_response_from_theb_ai(self, chatgpt_page: Page) -> dict:
+        try:
+            response = None
+            while response is None:
+                event = await chatgpt_page.wait_for_event("response")
+                if "chat-process" in event.url:
+                    response: Response = event
+            result = await response.text()
+            lines = list(filter(None, result.split('\n')))
+            return json.loads(lines[-1])
+        except TimeoutError:
+            logging.error("Timeout exceeded in get_response_from_theb_ai")
+
+
+class AdsURLGateway(PlayWrightCrawler):
+    inputs: LinkedinURLData
+
+    @property
+    def data(self) -> List[LinkedinURLData]:
+        return self._data
+
+    @data.setter
+    def data(self, value):
+        self._data = value
+
+    async def check_ads_id_exists(self, ads_id):
+        async with httpx.AsyncClient() as client:
+            response = await client.get(
+                self.base_url + f"/api/ads/{ads_id}"
+            )
+            if response.status_code == 404:
+                return ads_id
+
+    @add_task(level=0)
+    async def get_urls(
+        self,
+        page: Page,
+        **_
+    ):
+        await page.goto(self.inputs.url)
+        elements = await self.query_elements(
+            page, "xpath=//div[@data-entity-urn]"
+        )
+        attributes = await asyncio.gather(
+            *[element.get_attribute('data-entity-urn') for element in elements]
+        )
+        data_entities = [
+            attribute.split("Posting:")[1]
+            for attribute in attributes
+        ]
+        tasks = [
+            self.check_ads_id_exists(ads_id)
+            for ads_id in data_entities
+        ]
+ urls = await asyncio.gather(*tasks) + results = [ + f"https://www.linkedin.com/jobs/view/{ads_id}" + for ads_id in urls + if ads_id + ] + self.data = [ + LinkedinURLData( + url=url, + country=self.inputs.country, + job_mode=self.inputs.job_mode, + ads_id=url.split("view/")[1] + ) for url in results + ] diff --git a/worker/crawlers/linkedin/models.py b/worker/crawlers/linkedin/models.py new file mode 100644 index 0000000..2b83fe1 --- /dev/null +++ b/worker/crawlers/linkedin/models.py @@ -0,0 +1,25 @@ +from typing import Optional + +from pydantic import BaseModel, AnyUrl + +from crawlers.linkedin.enums import JobModels + + +class LinkedinURLData(BaseModel): + url: AnyUrl + country: str + job_mode: JobModels + ads_id: Optional[str] + + +class LinkedinData(BaseModel): + ads_id: str + location: Optional[str] + country: str + body: str + company_name: Optional[str] + title: str + source: int = 1 + employement_type: Optional[JobModels] + level: Optional[str] + keywords: list = list() diff --git a/worker/crawlers/linkedin/prompt.py b/worker/crawlers/linkedin/prompt.py new file mode 100644 index 0000000..02fad1c --- /dev/null +++ b/worker/crawlers/linkedin/prompt.py @@ -0,0 +1,109 @@ + +def analyze_ads(company_name, job_description): + return f""" +company_name = "{company_name}" +TASK = 'Simplify a job advertisement you find while job searching.'\n \n + +WARNING! This is a sample output, it IS NOT THE ACTUAL JOB ADVERTISEMENT!! +----------------------- +job_description = ''' +{job_description} +''' +----------------------- +sample_output = ''' +Visa Sponsor: #Yes \n +• 3 years of experience with Delphi programming language \n +• Knowledge of Rest APIs, RemObjects SDK \n +''' +----------------------- +\n +INSTRUCTIONS:\n +1. Summarize hard skills or requiremenets from the `job_description` in a +similiar format like `sample_output`, removing extra unnecessary details, +list them as bullet points, max 70 characters each. DO not write more than 6 +bullet point, only includes the MOST related and important one.\n +2. Translate non-English content to English. \n +3. Format visa sponsership as : `Visa Sponsor: #Yes, #No, or #NA`. \n +4. Check your database as of your last date updated for the company's +sponsorship status using company's name as `company_name` string. +Maybe in `job_description` this was mentioned, if so give priority to +`job_description`, if not then use your database \n +5. Provide a definite sponsorship status if possible, avoiding '#NA'. \n +6. Include programming languages used as in the frameworks mentioned +`job_description`, if applicable. \n +7. Remember to not consider `sample_output` content as the actual job +advertisement, for the acutal job advertisement, i have sent it to you as +`job_description` \n +----------- \n +\n + +''' +""" + + +def get_tag_ads(title, job_description, keywords): + sample = { + "keywords": ["c_sharp", "dot_net", "backend"] + } + good_sample = { + "keywords": ["backend", "germany", "dot_net", "c_sharp"] + } + bad_sample = { + "keywords": ["back end", "Germany", ".NET", "C#"] + } + return f""" + TASK = "Read the the text, then follow up the instructions that is given + at the end." 
+ job_title = "{title}"\n + + JOB_LISTS = `[ + "backend", "frontend", "devops", "full_stack", "data_science", + "machine_learning", "network_engineering", "cybersecurity" + ]` \n + This is a list of keywords, that has {len(keywords)+1} keywords seperated + with comma "," \n + KEYWORD_LIST = '''{str(keywords)}''' \n + + `job_description`: ''' + {job_description} \n + \n ''' + + +good_output = {str(good_sample)} \n +bad_output= {str(bad_sample)} \n +basic_sample = {str(sample)} \n + +INSTRUCTIONS: \n + 1. I'm accessing you through a programming script and I need your response + in a JSON string format. Make sure that you only send me this Json, with + no other text, otherwise my program would have an exception and would not + work perfect. Please make sure to ONLY respond using a JSON string and + adhere to the format given as `basic_sample` which was menitoned earlier\n + 2. Consider `good_output` and `bad_output` and `basic_sample` as just + examples! The actual job advertisement is marked as `job_description` + which you need to analyze and match accordingly. + 3. Do not stop writing answer unless you have at least included 5 + different keyword/results, make sure they are REALLY RELEVANT, dont just + write anything. Write at MAX 8 most related keywords, no more.\n + 4. Read the `KEYWORD_LIST` that i have sent you at first. Read the + `job_description` that have sent you. Check which of the provided + `KEYWORD_LIST` are mentioned and required in `job_description`. \n + 5. Following step 4, Do not find keywords that do not exists in the given + `KEYWORD_LIST` given at first, I only need matching KEYWORD LIST. + Only include results from `KEYWORD_LIST` which I sent you at first \n + 6. Double check that keywords you've gathered in step 5 to be + well-mentioned in the `job_description` \n + 7. Double check that your gathered keyword from step 6 matches the exact + spelling and case of the `KEYWORD_LIST` I provided as first, as + `KEYWORD_LIST` are case-sensitive \n + 8. Analyze the job's title from `job_title` that is given to you before; + match it with the most related job's name from `JOB_LISTS` list that + I provided you at top or if none match, then label it as `others`. + You must pick only 1 of the options. Include the result as a keywords + following step 7\n + 9. Avoid the patterns of bad_output, follow the patterns of Good Output, + when generating output.\n + 10. Rewrite a one paragraph short summary of the `job_description` so that + you can understand it better. + Add your rewritten text 3 lines after the json. 
+ """ diff --git a/worker/crawlers/linkedin/repository.py b/worker/crawlers/linkedin/repository.py new file mode 100644 index 0000000..348e330 --- /dev/null +++ b/worker/crawlers/linkedin/repository.py @@ -0,0 +1,85 @@ +import requests +import asyncio +from typing import List +import json +import logging + +from core.enums import HttpMethod +from crawlers.linkedin.gateway import AdsDataGateway, AdsURLGateway +from crawlers.linkedin.models import LinkedinURLData, LinkedinData +from crawlers.linkedin import utils, constants +from crawlers.repository import BaseRepository + + +class LinkedinRepository(BaseRepository): + base_url = "http://127.0.0.1:8000" + headless: bool + + async def get_url(self, inputs: List[LinkedinURLData]) -> AdsURLGateway: + url_gateway = AdsURLGateway( + inputs=inputs, + proxy=utils.get_random_proxy(), + finger_print_scipt=constants.SPOOF_FINGERPRINT, + headless=self.headless, + firefox_settings=constants.FIREFOX_SETTINGS + ) + await url_gateway.execute() + return url_gateway + + async def get_urls( + self, inputs: List[LinkedinURLData] + ) -> List[LinkedinURLData]: + results: List[AdsURLGateway] = await self.gather_tasks( + [self.get_url(input) for input in inputs] + ) + _results = [] + for result in results: + if result.data: + _results.extend(result.data) + return _results + + async def process_advertisement( + self, inputs: AdsDataGateway + ) -> LinkedinData: + url_gateway = AdsDataGateway( + inputs=inputs, + proxy=utils.get_random_proxy(), + finger_print_scipt=constants.SPOOF_FINGERPRINT, + headless=self.headless, + firefox_settings=constants.FIREFOX_SETTINGS + ) + await url_gateway.execute() + return url_gateway + + async def process_advertisements( + self, inputs: List[AdsDataGateway] + ) -> List[LinkedinData]: + results: List[AdsDataGateway] = await self.gather_tasks( + [self.process_advertisement(input) for input in inputs] + ) + return [ + result.data for result in results + if isinstance(result.data, LinkedinData) + ] + + async def save_data_to_url( + self, data: LinkedinData, endpoint: str, method: HttpMethod + ) -> dict: + response = requests.request( + method=method.value.upper(), + url=self.base_url + endpoint, + json=json.loads(data.json()) + ) + if response.status_code == 200: + logging.info("Saved data to URL") + return response + + async def save_all_data( + self, all_data: List[LinkedinData], endpoint: str, method: HttpMethod + ): + await asyncio.gather( + *[ + self.save_data_to_url(data, endpoint, method) + for data in all_data + ] + ) diff --git a/worker/crawlers/linkedin/utils.py b/worker/crawlers/linkedin/utils.py new file mode 100644 index 0000000..e6493e9 --- /dev/null +++ b/worker/crawlers/linkedin/utils.py @@ -0,0 +1,140 @@ +from urllib.parse import urlencode +import functools +import re +import json +from typing import Tuple + +import requests +from playwright._impl._api_structures import ProxySettings +import loguru + +from crawlers.linkedin import enums, constants +import exceptions + + +def process_thebai_responses_to_text( + first_resp, second_resp, country, job_mode +) -> Tuple[str, str]: + if not validate_thebai_response(first_resp): + return (None, None) # This will raise an exception in pydantic :) + + if not validate_thebai_response(second_resp): + return (first_resp["text"], None) + + body = first_resp["text"] + second_resp_tags = None + try: + second_resp_text = re.search( + r'\{.*\}', second_resp["text"].replace("'", "\"") + ) + if not second_resp_text: + raise exceptions.NoJsonFound( + "No valid JSON object found in the last 
line" + ) + second_resp_text = second_resp_text.group() + second_resp_tags: list = json.loads(second_resp_text)["keywords"] + if "#YES" in first_resp["text"].upper(): + second_resp_tags.append("yes") + elif "#NO" in first_resp["text"].upper(): + second_resp_tags.append("no") + else: + second_resp_tags.append("na") + second_resp_tags.extend([job_mode, country]) + hashtags = ' '.join(set(f"#{tag}" for tag in second_resp_tags)) + body = f"""{first_resp["text"]} + {hashtags} + """ + except (json.JSONDecodeError, KeyError, ValueError) as error: + loguru.logger.error( + f"Error happened parsing responses from thebai: {error}" + ) + finally: + return (body, second_resp_tags) + + +def validate_thebai_response(response): + for item in constants.IGNORE_LIST: + if item.lower() in response["text"]: + return False + return True + + +def format_country(country): + return country.lower().replace(" ", "_") + + +def get_jobs(): + resp: dict = requests.get( + "http://127.0.0.1:8000/api/jobs?page=1&per_page=1000" + ).json() + job_list = list(map( + lambda x: x['name'], resp['results'] + )) if resp.get('results') else None + + if not job_list: + raise exceptions.NoJobException( + "Please add some jobs to API" + ) + return job_list + + +def get_url(job: str, mode: enums.JobModels, page_number=0, location=None): + """ + Builds URL Parameter for LinkedIn. + + Args: + page_number (int, optional): The page number to fetch. Defaults to 0. + location (str, optional): The location to search for jobs. Defaults to + None. + mode: Enum mode for the job + + Returns: + str: The LinkedIn URL with the given parameters. + """ + url = "https://www.linkedin.com/jobs/search" + params = { + "keywords": job, + "location": location, + "trk": "public_jobs_jobs-search-bar_search-submit", + "position": 1, + "pageNum": page_number, + "f_TPR": "r10800", + "f_JT": "F", + "f_WT": mode.value + } + query_params = urlencode(params) + return f"{url}?{query_params}" + + +def create_proxy_url(proxy_dict: dict) -> ProxySettings: + """ + Create a proxy URL from the given proxy dictionary. + + :param proxy_dict: Dictionary containing proxy information. + :return: A ProxySettings object with the proxy details. + """ + return ProxySettings( + server=f"http://{proxy_dict['ip_address']}:{proxy_dict['port']}", + username=proxy_dict['username'], password=proxy_dict['password'] + ) + + +def get_random_proxy() -> ProxySettings: + """ + Get a random proxy from the available proxy list. + + :return: A ProxySettings object with a random proxy's details. 
+ """ + proxy_dict = requests.get( + f"{constants.HOST}/api/proxy?order_by=?&page=1&per_page=1" + ).json()["results"] + if proxy_dict is None or len(proxy_dict) == 0: + raise exceptions.NoProxyException("Please Add A Proxy") + return create_proxy_url(proxy_dict[0]) + + +@functools.lru_cache(maxsize=128) +def get_all_keywords() -> list: + return requests.get( + "http://127.0.0.1:8000/api/tech/keywords" + ).json()["result"] diff --git a/worker/xpaths.py b/worker/crawlers/linkedin/xpaths.py similarity index 77% rename from worker/xpaths.py rename to worker/crawlers/linkedin/xpaths.py index 04eb6bf..b2b6a01 100644 --- a/worker/xpaths.py +++ b/worker/crawlers/linkedin/xpaths.py @@ -1,7 +1,7 @@ JOB_ID = "(//div[@data-visible-time])[2]" JOB_TOTAL_NUM = '//span[@class="results-context-header__job-count"]' -JOB_LI = '//ul[contains(@class,"jobs-search__results-list")]//li/div' -SHOW_MORE = '//button[contains(text(), "Show more")]' +JOB_LI = '//div[@data-entity-urn]' +SHOW_MORE = '(//button[contains(text(), "Show more")])[1]' BASE_SPAN = "(//h3[contains(@class, 'job-criteria-subheader')])/../span" SENIORITY_LEVEL = f'({BASE_SPAN})[1]' EMPLOYEMENT_TYPE = f'({BASE_SPAN})[2]' @@ -9,10 +9,10 @@ COMPANY_NAME = '//a[contains(@class, "name-link")]' LOCATION = '//span[@class="topcard__flavor topcard__flavor--bullet"]' JOB_ID_A_TAG = '//a[@class="topcard__link"]' -TITLE = f'{JOB_ID_A_TAG}//h2' +TITLE = '//h1' NEED_LOGIN = '//*[contains(text(), "Already on Linkedin")]' GPT_FILL = '//textarea[@class="n-input__textarea-el"]' -GPT_BUTTON = '//footer//button[@type]' +GPT_BUTTON = "//footer//button[@type and not(contains(@class, 'disabled'))]" GPT_NEW_CHAT = '//*[contains(text(), "New chat")]' diff --git a/worker/crawlers/repository.py b/worker/crawlers/repository.py new file mode 100644 index 0000000..29d7c05 --- /dev/null +++ b/worker/crawlers/repository.py @@ -0,0 +1,16 @@ +import asyncio +from typing import List, Callable + +from core.abc import AbstractBaseRepository + + +class BaseRepository(AbstractBaseRepository): + def __init__(self, worker_count: int): + self.worker_count = worker_count + self.semaphore = asyncio.Semaphore(worker_count) + + async def gather_tasks(self, tasks: List[Callable]): + async def process_task(task): + async with self.semaphore: + return await task + return await asyncio.gather(*[process_task(task) for task in tasks]) diff --git a/worker/crawlers/utils.py b/worker/crawlers/utils.py new file mode 100644 index 0000000..4ae6c69 --- /dev/null +++ b/worker/crawlers/utils.py @@ -0,0 +1,15 @@ +import random + + +def generate_device_specs(): + """ + Generate random RAM/Hardware Concurrency. + + Returns: + Tuple[int, int]: A tuple containing a random RAM and hardware + concurrency. 
+ """ + random_ram = random.choice([1, 2, 4, 8, 16, 32, 64]) + max_hw_concurrency = random_ram * 2 if random_ram < 64 else 64 + random_hw_concurrency = random.choice([1, 2, 4, max_hw_concurrency]) + return (random_ram, random_hw_concurrency) diff --git a/worker/enums.py b/worker/enums.py deleted file mode 100644 index 4298fe6..0000000 --- a/worker/enums.py +++ /dev/null @@ -1,11 +0,0 @@ -from enum import Enum - - -class JobModels(Enum): - ON_SITE = 1 - REMOTE = 2 - HYBRID = 3 - - @property - def lower_case_name(self): - return self.name.lower() diff --git a/worker/helpers.py b/worker/helpers.py deleted file mode 100644 index 48dc23d..0000000 --- a/worker/helpers.py +++ /dev/null @@ -1,242 +0,0 @@ -import random -from urllib.parse import urlencode -import functools -from itertools import product -import argparse - -from playwright.async_api import ( - Page, - TimeoutError as PlayWrightTimeOutError -) -import requests -from playwright._impl._api_structures import ProxySettings - -import constants -import exceptions -import decorators -import enums - - -def format_country(country): - return country.lower().replace(" ", "_") - - -def get_jobs(): - resp: dict = requests.get( - "http://127.0.0.1:8000/api/jobs?page=1&per_page=1000" - ).json() - job_list = list(map( - lambda x: x['name'], resp['results'] - )) if resp.get('results') else None - - if not job_list: - raise exceptions.NoJobException( - "Please add some jobs to API" - ) - return job_list - - -@decorators.get_unique_object -def get_country_and_job(is_popular=False): - if is_popular: - countries = constants.POPULAR_DESTINATION - else: - countries = constants.COUNTRIES - - return list(product( - countries, get_jobs(), enums.JobModels - )) - - -def get_url(job: str, mode: enums.JobModels, page_number=0, location=None): - """ - Builds URL Parameter for LinkedIn. - - Args: - page_number (int, optional): The page number to fetch. Defaults to 0. - location (str, optional): The location to search for jobs. Defaults to - None. - mode: Enum mode for the job - - Returns: - str: The LinkedIn URL with the given parameters. - """ - url = "https://www.linkedin.com/jobs/search" - params = { - "keywords": job, - "location": location, - "trk": "public_jobs_jobs-search-bar_search-submit", - "position": 1, - "pageNum": page_number, - "f_TPR": "r10800", - "f_JT": "F", - "f_WT": mode.value - } - query_params = urlencode(params) - return f"{url}?{query_params}" - - -def generate_device_specs(): - """ - Generate random RAM/Hardware Concurrency. - - Returns: - Tuple[int, int]: A tuple containing a random RAM and hardware - concurrency. - """ - random_ram = random.choice([1, 2, 4, 8, 16, 32, 64]) - max_hw_concurrency = random_ram * 2 if random_ram < 64 else 64 - random_hw_concurrency = random.choice([1, 2, 4, max_hw_concurrency]) - return (random_ram, random_hw_concurrency) - - -async def get_element_text(page: Page, xpath: str, replace=True, timeout=None): - """ - Get the text content of an element using its XPath. - - Args: - page (Page): The Page object to search for the element. - xpath (str): The XPath of the element. - replace (bool, optional): Whether to remove newlines and trailing - whitespace from the text. Defaults to True. - timeout (int, optional): The maximum time to wait for the element. - Defaults to None. - - Returns: - str: The text content of the element. 
- """ - result: str = await page.locator(xpath).text_content(timeout=timeout) - if replace: - return result.strip().replace("\n", "") - else: - return result - - -async def fill_form(page: Page, xpath: str, text: str, timeout=None): - """ - Fill a form field with the given text using its XPath. - - Args: - page (Page): The Page object containing the form field. - xpath (str): The XPath of the form field. - text (str): The text to fill into the form field. - timeout (int, optional): The maximum time to wait for the form field. - Defaults to None. - - Returns: - None - """ - return await page.locator(xpath).fill(text, timeout=timeout) - - -@decorators.exception_handler -async def safe_get_element_text( - page: Page, xpath: str, replace=True, timeout=None -): - """ - Safely get the text content of an element using its XPath. - - Args: - page (Page): The Page object to search for the element. - xpath (str): The XPath of the element. - replace (bool, optional): Whether to remove newlines and trailing - whitespace from the text. Defaults to True. - timeout (int, optional): The maximum time to wait for the element. - Defaults to None. - - Returns: - str: The text content of the element, or an empty string on failure. - """ - return await get_element_text(page, xpath, replace, timeout=timeout) - - -@decorators.exception_handler -async def safe_fill_form(page: Page, xpath: str, text: str, timeout=None): - """ - Safely fill a form field with the given text using its XPath. - - Args: - page (Page): The Page object containing the form field. - xpath (str): The XPath of the form field. - text (str): The text to fill into the form field. - timeout (int, optional): The maximum time to wait for the form field. - Defaults to None. - - Returns: - None - """ - return await fill_form(page, xpath, text, timeout=timeout) - - -async def does_element_exists( - page: Page, xpath: str, timeout: int = 500 -): - try: - await page.locator(xpath).wait_for(timeout=timeout) - return True - except PlayWrightTimeOutError: - return False - - -def does_ads_exists(ads_id) -> bool: - """ - Check if an advertisement already exists in the database. - - :param ads_id: Advertisement ID to check for existence. - :return: True if advertisement exists, False otherwise. - """ - return requests.get( - f"{constants.HOST}/api/ads/{int(ads_id)}" - ).status_code == 200 - - -def create_proxy_url(proxy_dict: dict) -> ProxySettings: - """ - Create a proxy URL from the given proxy dictionary. - - :param proxy_dict: Dictionary containing proxy information. - :return: A ProxySettings object with the proxy details. - """ - return ProxySettings( - server=f"http://{proxy_dict['ip_address']}:{proxy_dict['port']}", - username=proxy_dict['username'], password=proxy_dict['password'] - ) - - -def get_random_proxy() -> ProxySettings: - """ - Get a random proxy from the available proxy list. - - :return: A ProxySettings object with a random proxy's details. - """ - proxy_dict = requests.get( - f"{constants.HOST}/api/proxy?order_by=?&page=1&per_page=1" - ).json()["results"] - if proxy_dict is None or len(proxy_dict) == 0: - raise exceptions.NoProxyException("Please Add A Proxy") - return create_proxy_url(proxy_dict[0]) - - -@functools.lru_cache(maxsize=128) -def get_all_keywords(cached=0) -> list: - return requests.get( - "http://127.0.0.1:8000/api/tech/keywords" - ).json()["result"] - - -def parse_arguments(): - parser = argparse.ArgumentParser() - parser.add_argument( - "-w", "--workers", type=int, default=1, - help="Number of workers to run." 
- ) - parser.add_argument( - "-p", "--popular", action="store_true", - help="Scrape only popular countries." - ) - parser.add_argument( - "--headless", action="store_true", - help="Enable headless mode." - ) - args = parser.parse_args() - return args diff --git a/worker/main.py b/worker/main.py index 3c51a2b..98b03b0 100644 --- a/worker/main.py +++ b/worker/main.py @@ -1,24 +1,16 @@ import asyncio -import random -from scraper import linkedin -import helpers - - -async def run_scrapers(workers: int = 1, only_popular=False, headless=True): - while True: - tasks = [] - for i in range(workers): - tasks.append(asyncio.create_task(linkedin.scrape_linkedin( - worker_id=i+1, only_popular=only_popular, headless=headless - ))) - await asyncio.sleep(random.randint(1, 3)) # Overhead of browsers - await asyncio.gather(*tasks) +from core.utils import parse_arguments +from mvp import service if __name__ == "__main__": - args = helpers.parse_arguments() - used_countries = asyncio.run(run_scrapers( - workers=args.workers, only_popular=args.popular, - headless=args.headless - )) + args = parse_arguments() + while True: + used_countries = asyncio.run( + service( + worker_num=args.workers, + is_popular=args.popular, + headless=args.headless + ) + ) diff --git a/worker/mvp.py b/worker/mvp.py new file mode 100644 index 0000000..55da3da --- /dev/null +++ b/worker/mvp.py @@ -0,0 +1,19 @@ +from crawlers.linkedin import LinkedinController +import asyncio + + +async def service( + headless: bool = False, + worker_num: int = 5, + is_popular: bool = True +): + obj = LinkedinController( + headless=headless, + worker_num=worker_num, + is_popular=is_popular + ) + await obj.process() + + +if __name__ == "__main__": + asyncio.run(service()) diff --git a/worker/prompt.py b/worker/prompt.py deleted file mode 100644 index b3c1e5c..0000000 --- a/worker/prompt.py +++ /dev/null @@ -1,73 +0,0 @@ - -def analyze_ads(company_name, job_description): - return f""" -company_name = "{company_name}" -TASK = 'Simplify a job advertisement you find while job searching.'\n \n - -WARNING! This is a sample output, it IS NOT THE ACTUAL JOB ADVERTISEMENT!! ------------------------ -job_description = ''' -{job_description} -''' ------------------------ -sample_output = ''' -Visa Sponsor: #Yes \n -• 3 years of experience with Delphi programming language \n -• Knowledge of Rest APIs, RemObjects SDK \n -''' ------------------------ -\n -INSTRUCTIONS:\n -1. Summarize hard skills or requiremenets from the `job_description` in a similiar format like `sample_output`, removing extra unnecessary details, list them as bullet points, max 70 characters each. DO not write more than 6 bullet point, only includes the MOST related and important one.\n -2. Translate non-English content to English. \n -3. Format visa sponsership as : `Visa Sponsor: #Yes, #No, or #NA`. \n -4. Check your database as of your last date updated for the company's sponsorship status using company's name as `company_name` string. Maybe in `job_description` this was mentioned, if so give priority to `job_description`, if not then use your database \n -5. Provide a definite sponsorship status if possible, avoiding '#NA'. \n -6. Include programming languages used as in the frameworks mentioned `job_description`, if applicable. \n -7. 
Remember to not consider `sample_output` content as the actual job advertisement, for the acutal job advertisement, i have sent it to you as `job_description` \n ------------ \n -\n - -''' -""" - - -def get_tag_ads(title, job_description, keywords): - sample = { - "keywords": ["c_sharp", "dot_net", "backend"] - } - good_sample = { - "keywords": ["backend", "germany", "dot_net", "c_sharp"] - } - bad_sample = { - "keywords": ["back end", "Germany", ".NET", "C#"] - } - return f""" -TASK = "Read the the text, then follow up the instructions that is given at the end." -job_title = "{title}"\n - -JOB_LISTS = `["backend", "frontend", "devops", "full_stack", "data_science", "machine_learning", "network_engineering", "cybersecurity"]` \n -This is a list of keywords, that has {len(keywords)+1} keywords seperated with comma "," \n -KEYWORD_LIST = '''{str(keywords)}''' \n - -`job_description`: ''' - {job_description} \n -\n ''' - - -good_output = {str(good_sample)} \n -bad_output= {str(bad_sample)} \n -basic_sample = {str(sample)} \n - -INSTRUCTIONS: \n - 1. I'm accessing you through a programming script and I need your response in a JSON string format. Make sure that you only send me this Json, with no other text, otherwise my program would have an exception and would not work perfect. Please make sure to ONLY respond using a JSON string and adhere to the format given as `basic_sample` which was menitoned earlier\n - 2. Consider `good_output` and `bad_output` and `basic_sample` as just examples! The actual job advertisement is marked as `job_description` which you need to analyze and match accordingly. - 3. Do not stop writing answer unless you have at least included 5 different keyword/results, make sure they are REALLY RELEVANT, dont just write anything. Write at MAX 8 most related keywords, no more.\n - 4. Read the `KEYWORD_LIST` that i have sent you at first. Read the `job_description` that have sent you. Check which of the provided `KEYWORD_LIST` are mentioned and required in `job_description`. \n - 5. Following step 4, Do not find keywords that do not exists in the given `KEYWORD_LIST` given at first, I only need matching KEYWORD LIST. Only include results from `KEYWORD_LIST` which I sent you at first \n - 6. Double check that keywords you've gathered in step 5 to be well-mentioned in the `job_description` \n - 7. Double check that your gathered keyword from step 6 matches the exact spelling and case of the `KEYWORD_LIST` I provided as first, as `KEYWORD_LIST` are case-sensitive \n - 8. Analyze the job's title from `job_title` that is given to you before; match it with the most related job's name from `JOB_LISTS` list that I provided you at top or if none match, then label it as `others`. You must pick only 1 of the options. Include the result as a keywords following step 7\n - 9. Avoid the patterns of bad_output, follow the patterns of Good Output, when generating output.\n - 10. Rewrite a one paragraph short summary of the `job_description` so that you can understand it better. Add your rewritten text 3 lines after the json. 
- """ diff --git a/worker/scraper/linkedin.py b/worker/scraper/linkedin.py deleted file mode 100644 index c5dfcd4..0000000 --- a/worker/scraper/linkedin.py +++ /dev/null @@ -1,128 +0,0 @@ -import asyncio -import random - -from playwright.async_api import async_playwright -import pytz -import loguru - -import helpers -import xpaths -import connection -import constants - - -async def scrape_linkedin( - worker_id: int, info=None, recursion_depth=0, - only_popular=False, headless=False, *args, **kwargs -): - """ - Scrape LinkedIn job postings for different countries. - - :param worker_id: ID of the worker executing the scraping. - :param info: Cached info, if you wish to repeat the process. - :param only_popular: Only use popular countries. - """ - try: - async with async_playwright() as driver: - if info is None: - info = helpers.get_country_and_job(only_popular) - - loguru.logger.info( - f"[WORKER {worker_id}] This round is: {info}" - ) - country, job, job_mode = info - browser = await driver.firefox.launch( - headless=headless, - args=[ - '--start-maximized', - '--foreground', - '--disable-backgrounding-occluded-windows' - ], - firefox_user_prefs=constants.FIREFOX_SETTINGS - ) - - timezone_id = random.choice(pytz.all_timezones) - context = await browser.new_context( - timezone_id=timezone_id, - accept_downloads=True, - is_mobile=False, - has_touch=False, - proxy=helpers.get_random_proxy() - ) - - page = await context.new_page() - await page.bring_to_front() - await page.set_viewport_size( - { - "width": 1920, - "height": 1080 - } - ) - - await page.add_init_script( - constants.SPOOF_FINGERPRINT % helpers.generate_device_specs() - ) - await page.goto(helpers.get_url( - location=country, job=job, mode=job_mode - )) - - if await helpers.does_element_exists(page, xpaths.NEED_LOGIN): - loguru.logger.info(f"[WORKER {worker_id}] Login Required!") - return await scrape_linkedin( - worker_id=worker_id, only_popular=only_popular, - headless=headless, info=info - ) - - all_ads = await page.locator(xpaths.JOB_LI).all() - loguru.logger.info( - f"[WORKER {worker_id}] Found {len(all_ads)} Advertisements" - ) - exists = 0 - for index, div in enumerate(all_ads): - await asyncio.sleep(2) - if index == 100 or exists == 7: - break - await div.click( - timeout=5000 - ) - title_a_tag = page.locator(xpaths.JOB_ID_A_TAG) - ads_id = await title_a_tag.get_attribute('href') - ads_id = ads_id.split("?refId")[0].split("-")[-1] - if not helpers.does_ads_exists(ads_id): - company_name = await helpers.safe_get_element_text( - page, xpaths.COMPANY_NAME, timeout=5000 - ) - location = await helpers.safe_get_element_text( - page, xpaths.LOCATION, timeout=5000 - ) - title = await helpers.safe_get_element_text( - page, xpaths.TITLE, timeout=5000 - ) - await page.locator(xpaths.SHOW_MORE).click(timeout=5000) - info = await helpers.get_element_text( - page, xpaths.BODY_INFO, False, timeout=5000 - ) - await connection.create_ads( - ads_id, location, info.strip(), company_name, - title, 1, employement_type="", level="", - country=country, job_mode=job_mode - ) - loguru.logger.info( - f"[WORKER {worker_id}] Finished {ads_id}" - ) - - else: - loguru.logger.info( - f"[WORKER {worker_id}] {ads_id} Already exists" - ) - exists += 1 - - return - except helpers.PlayWrightTimeOutError: - if recursion_depth < 5: - return await scrape_linkedin( - worker_id=worker_id, only_popular=only_popular, - headless=headless, recursion_depth=recursion_depth+1 - ) - except Exception as e: - loguru.logger.error(e)