<a href="https://colab.research.google.com/github/chikilivighneshshastry/colab_files/blob/main/jobright_data_extraction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# extract fingle page info from jonright


In [None]:
import aiohttp
import asyncio

async def extract_job_data(url):
  async with aiohttp.ClientSession() as session:
    response = await session.get(url)
    html = await response.text()
    print(html)
    return response

url = 'https://jobright.ai/jobs/info/685a54b5be2d7e56476268d'
# url = 'https://jobright.ai/jobs/info'
response = await extract_job_data(url)

In [None]:
response.status
html_data = await response.text()
print(html_data)

In [None]:
# prompt: parse html_data using bs4 and get with text in a id

from bs4 import BeautifulSoup
import json
soup = BeautifulSoup(html_data, 'html.parser')

# Assuming the text you want is within an element with a specific ID,
# replace 'your_element_id' with the actual ID of the element.
element_with_id = soup.find(id='__NEXT_DATA__')

if element_with_id:
  detailed_json_data = element_with_id.get_text()
  print(detailed_json_data)
else:
  print("Element with the specified ID not found.")

data = json.loads(detailed_json_data)
print(data)


In [None]:
data.keys()
print(data['props'].keys())
print(data['page'])
print(data['query'])
print(data['buildId'])
print(data['isFallback'])
print(data['gssp'])
print(data['scriptLoader'])

In [None]:
print(data['props']['pageProps']['baseSalary'])
print(data['props']['pageProps']['jobLocation'])
print(data['props']['pageProps']['logined'])
print(data['props']['pageProps']['jobHashedId'])
print(data['props']['pageProps']['_sentryTraceData'])
print(data['props']['pageProps']['_sentryBaggage'])

In [None]:
data['props']['pageProps']['dataSource']

In [None]:
import asyncio
import aiohttp
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import random

# --- Configuration ---
TARGET_SITE_DOMAIN = "example.com" # To keep crawling within the site
INITIAL_SEED_URL = f"http://{TARGET_SITE_DOMAIN}"

PROXY_LIST = [
    "http://198.46.172.102:12345",
    "http://103.85.103.1:5678",
    # ... more proxies
]

# --- Database (Conceptual - replace with actual DB interaction) ---
# In a real scenario, use libraries like psycopg2 (PostgreSQL), mysql.connector, sqlite3, or an ORM like SQLAlchemy
DATABASE_URLS_SEEN = set() # Simple in-memory set for this example; use a real DB!

async def db_url_exists(url):
    # Simulate DB check
    return url in DATABASE_URLS_SEEN

async def db_add_url(url):
    # Simulate DB add
    DATABASE_URLS_SEEN.add(url)
    print(f"[DB] Added: {url}")

# --- Crawler Components ---
url_frontier = asyncio.Queue()
processed_urls_count = 0
MAX_URLS_TO_CRAWL = 100 # Example limit

async def fetch(session, url, proxy):
    try:
        print(f"[FETCHING] {url} via proxy {proxy if proxy else 'DIRECT'}")
        async with session.get(url, proxy=proxy, timeout=10, ssl=False) as response: # Added ssl=False for potential local SSL issues
            if response.status == 200:
                return await response.text()
            else:
                print(f"[ERROR] HTTP {response.status} for {url}")
                return None
    except Exception as e:
        print(f"[ERROR] Failed to fetch {url}: {e}")
        return None

def get_proxy():
    if PROXY_LIST:
        return random.choice(PROXY_LIST)
    return None

def parse_and_extract_links(html_content, base_url):
    links = set()
    if not html_content:
        return links
    soup = BeautifulSoup(html_content, 'lxml') # 'html.parser' is a built-in alternative
    for a_tag in soup.find_all('a', href=True):
        href = a_tag['href']
        # Join relative URLs with the base URL
        full_url = urljoin(base_url, href)
        # Basic clean-up (remove fragment, normalize)
        parsed_url = urlparse(full_url)
        normalized_url = parsed_url._replace(fragment="").geturl()

        # Filter: Only crawl URLs from the target domain
        if urlparse(normalized_url).netloc == TARGET_SITE_DOMAIN:
            links.add(normalized_url)
    return links

async def worker(name, session):
    global processed_urls_count
    while True:
        try:
            current_url = await url_frontier.get()
            print(f"[{name}] Processing: {current_url}")

            if await db_url_exists(current_url):
                print(f"[{name}] Already processed/in DB: {current_url}")
                url_frontier.task_done()
                continue

            await db_add_url(current_url) # Add to DB before fetching (or mark as being processed)

            # TODO: Implement robots.txt check here

            proxy = get_proxy()
            html = await fetch(session, current_url, proxy)

            if html:
                new_links = parse_and_extract_links(html, current_url)
                for link in new_links:
                    if not await db_url_exists(link) and processed_urls_count < MAX_URLS_TO_CRAWL :
                        # Check DB again before adding to frontier to handle race conditions if multiple workers find same link
                        if link not in DATABASE_URLS_SEEN: # Simplified check; real DB would handle uniqueness
                            await url_frontier.put(link)
                            print(f"[{name}] Queued new link: {link}")


                processed_urls_count += 1
                if processed_urls_count >= MAX_URLS_TO_CRAWL:
                    print(f"[{name}] Reached max URL limit. Draining queue...")
                    # Allow other tasks to finish current work, then stop adding new ones.
                    # Or more abruptly, cancel other tasks.

            url_frontier.task_done()

            if processed_urls_count >= MAX_URLS_TO_CRAWL and url_frontier.empty():
                break # Exit worker if limit reached and queue is empty

            await asyncio.sleep(1) # Be respectful: add a small delay

        except Exception as e:
            print(f"[{name}] Error in worker: {e}")
            url_frontier.task_done() # Ensure task_done is called even on error
            continue # Continue to next URL

async def main():
    await url_frontier.put(INITIAL_SEED_URL)
    await db_add_url(INITIAL_SEED_URL) # Add seed to DB initially

    # You might want a ClientSession per proxy type or a more sophisticated setup
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session: # ssl=False for local dev; use proper SSL context in prod
        # Create worker tasks
        num_workers = 5 # Number of concurrent crawlers
        tasks = []
        for i in range(num_workers):
            task = asyncio.create_task(worker(f"Worker-{i+1}", session))
            tasks.append(task)

        # Wait for the queue to be processed or limit to be reached
        await url_frontier.join() # Waits until all items in queue are gotten and processed

        # If max URLs reached, there might still be items in the queue
        # or workers might be processing. We need a way to signal them to stop gracefully.
        # For simplicity here, we cancel tasks if max_urls is hit and queue is effectively drained by workers.
        if processed_urls_count >= MAX_URLS_TO_CRAWL:
            print("Max URL limit reached. Cancelling worker tasks...")

        for task in tasks:
            task.cancel() # Cancel all worker tasks

        await asyncio.gather(*tasks, return_exceptions=True) # Wait for tasks to be cancelled

    print("Crawling finished.")
    print(f"Total unique URLs seen (from in-memory set): {len(DATABASE_URLS_SEEN)}")

if __name__ == "__main__":
    asyncio.run(main())

In [None]:
from crawl4ai.deep_crawling import BFSDeepCrawlStrategy

strategy = BFSDeepCrawlStrategy(
    max_depth=2,               # Crawl initial page + 2 levels deep
    include_external=False,    # Stay within the same domain
    max_pages=50,              # Maximum number of pages to crawl (optional)
    score_threshold=0.3,       # Minimum score for URLs to be crawled (optional)
)

In [None]:
!pip install crawl4ai

In [None]:
!crawl4ai-setup

In [None]:
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
from crawl4ai.deep_crawling import BestFirstCrawlingStrategy
from crawl4ai.deep_crawling import BFSDeepCrawlStrategy
from crawl4ai.deep_crawling.filters import (
    FilterChain,
    DomainFilter,
    URLPatternFilter,
    ContentTypeFilter
)
from crawl4ai.deep_crawling.scorers import KeywordRelevanceScorer

async def run_advanced_crawler():
    # Create a sophisticated filter chain
    filter_chain = FilterChain([
        # Domain boundaries
        DomainFilter(
            allowed_domains=["jobright.ai"]
            # blocked_domains=["old.docs.example.com"]
        ),

        # URL patterns to include
        # URLPatternFilter(patterns=["*guide*", "*tutorial*", "*blog*"]),

        # Content type filtering
        ContentTypeFilter(allowed_types=["text/html"])
    ])


    # Set up the configuration
    config = CrawlerRunConfig(
        deep_crawl_strategy =BFSDeepCrawlStrategy()


        deep_crawl_strategy=BestFirstCrawlingStrategy(
            max_depth=2,
            include_external=False,
            filter_chain=filter_chain
        ),
        scraping_strategy=LXMLWebScrapingStrategy(),
        stream=True,
        verbose=True
    )

    # Execute the crawl
    results = []
    async with AsyncWebCrawler() as crawler:
        async for result in await crawler.arun("https://jobright.ai/jobs/info", config=config):
            results.append(result)
            score = result.metadata.get("score", 0)
            depth = result.metadata.get("depth", 0)
            print(f"Depth: {depth} | Score: {score:.2f} | {result.url}")

    # Analyze the results
    print(f"Crawled {len(results)} high-value pages")
    print(f"Average score: {sum(r.metadata.get('score', 0) for r in results) / len(results):.2f}")

    # Group by depth
    depth_counts = {}
    for result in results:
        depth = result.metadata.get("depth", 0)
        depth_counts[depth] = depth_counts.get(depth, 0) + 1

    print("Pages crawled by depth:")
    for depth, count in sorted(depth_counts.items()):
        print(f"  Depth {depth}: {count} pages")
    return results
if __name__ == "__main__":
    results = await run_advanced_crawler()


In [None]:
base_url =''
end_word_categorys = []
parmeters = []
result_urls = []
for name in end_categorys:
  for para in parmeters:
    url = prepare_url(base_url,name,para)
    result_urls = get_all_jobpost_urls(url)
    save_to_db(result_urls)



In [None]:
!pip install scrapy

In [None]:
import scrapy
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings # Useful for default settings

# This is often needed if running in Jupyter/IPython to avoid "ReactorNotRestartable"
import nest_asyncio
nest_asyncio.apply()

# --- Your Spider Definition ---
class AllLinksSpider(CrawlSpider):
    name = 'all_links_scraper_cell' # Changed name slightly to avoid clashes if you have the other one
    allowed_domains = ['jobright.ai']
    # start_urls = ['https://jobright.ai/jobs/backenddeveloper/']
    start_urls = ['https://jobright.ai/jobs/backenddeveloper'] # More general starting point
    # Counter for processed URLs or items
    processed_url_count = 0
    MAX_URLS_TO_PROCESS = 100
    # Custom settings for this spider if needed (can also be passed to CrawlerProcess)
    custom_settings = {
        'LOG_LEVEL': 'INFO', # 'DEBUG' for more verbosity
        'DOWNLOAD_DELAY': 0.25,  # Be respectful
        'CONCURRENT_REQUESTS_PER_DOMAIN': 8,
        # 'DEPTH_LIMIT': 2 # Uncomment to limit crawl depth
    }

    rules = (
        Rule(
            LinkExtractor(
                allow_domains=['jobright.ai'],
                deny=(
                    r'/login', r'/register', r'/password', # Example patterns to avoid
                    r'mailto:', r'tel:', # Avoid mail and tel links
                )
            ),
            callback='parse_page_links',
            follow=True
        ),
    )

    def __init__(self, *args, **kwargs):
        super(AllLinksSpider, self).__init__(*args, **kwargs)
        # Get the list passed from the CrawlerProcess or default to an empty list
        self.collected_links_list = kwargs.get('output_list', [])

    def parse_page_links(self, response):
        self.logger.info(f"Processing page: {response.url}")
        links_on_this_page = response.css('a::attr(href)').getall()

        for link_href in links_on_this_page:
            absolute_link = response.urljoin(link_href)
            # Check if it's within allowed domains again, just to be safe if LinkExtractor somehow missed
            if self.allowed_domains and any(domain in scrapy.utils.url.get_domain(absolute_link) for domain in self.allowed_domains):
                link_data = {
                    'source_page_url': response.url,
                    'extracted_link': absolute_link
                }
                # Append to the list provided during initialization
                self.collected_links_list.append(link_data)
                # Still yield if you want to use Scrapy's feed exporters or other pipelines
                yield link_data

In [None]:
# This list will be populated by the spider
scraped_links_data = []

# --- Configure and Run the Crawler ---
# Get project settings if you have a settings.py, otherwise, it provides defaults
settings = get_project_settings()

# Override or add settings
settings.set('USER_AGENT', 'MyCustomBot/1.0 (+http://mywebsite.com/botinfo)')
# If you want to output to a file as well using Scrapy's feed exporters:
# settings.set('FEEDS', {
#     'output_links.json': {'format': 'json', 'overwrite': True},
# })

# Create a CrawlerProcess
# The 'settings' argument can be a Settings object or a dictionary
process = CrawlerProcess(settings=settings)

# Pass the list to the spider instance when scheduling it
# The spider's __init__ will pick up 'output_list' from kwargs
process.crawl(AllLinksSpider, output_list=scraped_links_data)

# The script will block here until all crawling is finished
print("Starting Scrapy process...")
process.start()
print("Scrapy process finished.")

# --- Now print the collected links ---
print(f"\n--- Collected {len(scraped_links_data)} link entries: ---")
unique_extracted_links = set()
for item in scraped_links_data:
    print(f"From: {item['source_page_url']} -> Found: {item['extracted_link']}")
    unique_extracted_links.add(item['extracted_link'])

print(f"\n--- {len(unique_extracted_links)} Unique Extracted Links: ---")
for link in sorted(list(unique_extracted_links)): # Print sorted unique links
    print(link)

# If you want just a flat list of the unique extracted URLs:
final_unique_links_list = sorted(list(unique_extracted_links))
# print("\nFinal flat list of unique links:")
# print(final_unique_links_list)

# extract jobposts from jobright through api like url

# categorys in json


In [None]:
category = {
  "job_category": {
    "Software & IT": [
      "Backend Engineer",
      "Java Engineer",
      "Python Engineer",
      ".Net Engineer",
      "C/C++ Engineer",
      "Golang Engineer",
      "Full Stack Engineer",
      "Blockchain Engineer",
      "Salesforce Developer",
      "Frontend Software Engineer",
      "React Developer",
      "UI/UX Developer",
      "iOS/Swift Developer",
      "Android Developer",
      "Flutter Developer",
      "Unity Developer",
      "Unreal Engine Developer",
      "AR/VR Developer",
      "Game Developer",
      "Software Testing/Quality Assurance Engineer",
      "Automation Test Engineer",
      "QA Manager",
      "Network Security Engineer",
      "Cloud Security Engineer",
      "Cyber Security Analyst",
      "Cyber Security Engineer",
      "Network Engineer",
      "Systems Engineer",
      "Site Reliability Engineer (SRE)",
      "DevOps",
      "SoC Analyst",
      "IT Support Specialist",
      "Help Desk Technician/Desktop Support Technician",
      "System Administrator",
      "Network Support Specialist",
      "Salesforce Administrator",
      "Database Administrator",
      "Machine Learning Engineer",
      "AI Engineer",
      "LLM Engineer",
      "Machine Learning/AI Researcher",
      "Machine Learning, Deep Learning",
      "Machine Learning, Model Training and Inference",
      "Machine Learning, Search System",
      "Machine Learning, Ads",
      "Machine Learning, Operations (ML Ops)",
      "Machine Learning, Infrastructure",
      "Machine Learning, Computer Vision",
      "Data Annotation/AI Tutor",
      "Sales Engineer",
      "Developer Relations",
      "Solutions Architect",
      "Technical Writing",
      "Data Analyst",
      "Data Scientist",
      "Data Engineer",
      "ETL Developer",
      "Data Warehouse Engineer",
      "Business/BI Analyst",
      "Power BI Developer",
      "Engineering Manager",
      "Software Architect",
      "Engineering Director/VP",
      "CTO",
      "Project/Program Manager",
      "Technical Project Manager",
      "Scrum Master"
    ],
    "Hardware & Electrical Engineering": [
      "Electronics Engineer",
      "Hardware Engineer",
      "Embedded Software Engineer",
      "ASIC Engineer",
      "FPGA Engineer",
      "RF (Radio Frequency) Engineer",
      "PCB Engineer",
      "Systems Integration Engineer",
      "IC Design Engineer",
      "Digital IC Verification Engineer",
      "Analog IC Design Engineer",
      "Electrical Engineer",
      "Automation Engineer",
      "Electromechanical Engineer",
      "Robotics Engineer",
      "Controls Engineer",
      "Electrical Test Engineer",
      "Hardware Test Engineer",
      "Project/Program Manager",
      "Telecommunications Engineer",
      "Network Engineer",
      "Wireless/Antenna Engineer",
      "Battery Engineer",
      "Motor Engineer",
      "Aerospace Engineer",
      "Sales Engineer",
      "Solutions Architect"
    ],
    "Mechanical & Industrial Engineering": [
      "Mechanical Engineer",
      "Manufacturing Engineer",
      "Process Engineer",
      "Industrial Engineer",
      "Mechatronics Engineer",
      "Operations Manager/Director",
      "Safety Engineer",
      "Chemical Engineer",
      "Laboratory Technician",
      "Automotive Engineer",
      "Powertrain Engineer",
      "Autonomous Driving System Engineer",
      "Quality Assurance Specialist",
      "EHS (Environment, Health, Safety) Engineer",
      "Project/Program Manager"
    ],
    "Product Management": [
      "Product Analyst",
      "Product Manager",
      "Technical Product Manager",
      "Product Manager, Consumer Software",
      "Product Manager, B2B/SaaS",
      "Product Manager, Hardware/Robotics/IoT",
      "AI Product Manager",
      "Game Designer"
    ],
    "Customer Service & Success": [
      "Customer Service Representative",
      "Customer Service Manager",
      "Customer Support",
      "Customer Success"
    ],
    "Sales": [
      "Sales Development Representative",
      "Inside Sales Representative",
      "Account Executive, SMB",
      "Field Sales Representative",
      "Enterprise Sales",
      "Channel Sales",
      "Business Development",
      "Partnership",
      "Sales Manager",
      "Regional Sales Manager",
      "Sales Director/VP",
      "Automotive Sales",
      "Real Estate Sales",
      "Leasing Manager",
      "Retail Sales",
      "Store Manager",
      "Medical Sales",
      "Medical Device Sales",
      "Financial Advisor",
      "Insurance Sales",
      "Sales Support",
      "Sales Operations Specialist"
    ],
    "HR, Admin & Legal": [
      "Human Resource Specialist",
      "Recruiter/Sourcer",
      "Recruiting Coordinator",
      "Payroll Specialist",
      "HR Business Partner",
      "Human Resource Manager/Director",
      "Administrative Assistant",
      "Executive Assistant",
      "Chief of Staff",
      "Office Manager",
      "Receptionist",
      "Data Entry Clerk",
      "Corporate Counsel",
      "Paralegal",
      "Legal Assistant",
      "Litigation Lawyer",
      "Intellectual Property Lawyer",
      "Criminal Lawyer",
      "Family Lawyer",
      "Immigration Lawyer",
      "Compliance Specialist",
      "Risk Analyst",
      "Court Clerk",
      "Case Manager",
      "Legal Operations Manager"
    ],
    "Finance & Accounting": [
      "Accountant",
      "Controller",
      "Tax Specialist",
      "Auditor",
      "Corporate Finance Analyst",
      "Treasury",
      "Financial Analyst",
      "Risk Analyst",
      "Securities Trader",
      "Quantitative Analyst/Researcher",
      "Investment Manager",
      "Equity Analyst",
      "Asset Manager",
      "Portfolio Manager",
      "Commercial Banker",
      "Investment Banker",
      "Credit Analyst",
      "Loan Officer",
      "Investment Analyst/Associate",
      "Investment Direct/VP",
      "Investment Partner",
      "Portfolio Operations Manager",
      "Fundraising Manager",
      "Investor Relations Manager",
      "Actuary",
      "Underwriter"
    ],
    "Design & Creative": [
      "Graphic Designer",
      "UI Designer",
      "UX Designer",
      "UX Researcher",
      "3D Designer",
      "Animator",
      "Illustrator",
      "Video Editor",
      "Creative/Art Director",
      "Motion Designer",
      "Interior Designer",
      "Landscape Designer",
      "Industrial Designer"
    ],
    "Real Estate & Construction": [
      "Leasing Consultant",
      "Property Manager",
      "Architect",
      "Landscape Architect",
      "Urban Planner",
      "Construction Project Manager",
      "Civil Engineer",
      "Structural Engineer"
    ],
    "Marketing & Communications": [
      "Content Marketing/Strategy",
      "SEO",
      "Social Media Management",
      "Copywriter",
      "Product Marketing",
      "Brand Manager",
      "Public Relations",
      "Community Manager",
      "Event Marketing Specialist",
      "Growth Marketing",
      "Advertising Specialist",
      "Performance Marketing",
      "Lifecycle Marketing",
      "Email Marketing"
    ],
    "Supply Chain & Operations": [
      "Supply Chain Manager",
      "Inventory Manager",
      "Logistics Manager",
      "Warehouse Manager",
      "Distribution Center Manager",
      "Procurement Manager",
      "Facilities Manager"
    ],
    "Consulting": [
      "IT Consultant",
      "Business Analyst",
      "Data Consultant",
      "Cyber Security Consultant",
      "Business Strategy Consultant",
      "Market Research Analyst",
      "Change Management Consultant",
      "Operations Consultant",
      "Financial Consultant",
      "Risk Management Consultant",
      "Mergers & Acquisitions (M&A) Consultant"
    ],
    "Energy & Environmental": [
      "Energy Engineer",
      "Renewable Energy Engineer",
      "Nuclear Engineer",
      "Power Systems Engineer",
      "Environmental Engineer",
      "Environmental Scientist"
    ],
    "Education & Training": [
      "K-12 Teaching",
      "Higher Education Teaching",
      "Corporate Training and Development",
      "Educational Administration",
      "Academic Dean"
    ],
    "Healthcare & Life Sciences": [
      "Healthcare Data Analyst",
      "Healthcare Data Scientist",
      "Healthcare IT Specialist",
      "EHR (Electronic Health Records) System Administrator",
      "Biomedical Engineer",
      "Clinical Engineer",
      "Biomedical Equipment Technician",
      "Biologist",
      "Pharmacologist",
      "Chemist",
      "Biochemist",
      "Formulation Scientist",
      "Toxicologist",
      "DMPK Scientist",
      "Clinical Research Scientist",
      "Clinical Research Associate",
      "Biostatistician",
      "Regulatory Affairs Specialist",
      "Medical Writer",
      "Health Product Manager",
      "Clinical Operations Manager",
      "Healthcare Compliance Manager",
      "Healthcare Quality Improvement Specialist"
    ],
    "Government & Non-Profit": [
      "Government Relations Manager",
      "Policy Analyst",
      "Program Manager",
      "Fundraising Coordinator",
      "Volunteer Coordinator"
    ]
  }
}

In [None]:
cookies = {
    '_hjSessionUser_6388958': 'eyJpZCI6IjQxOGUwNzcwLWE3YzAtNTBmOC05NDE4LTAxYjQwNzFkNDYwZiIsImNyZWF0ZWQiOjE3NTA4Mjk4MjE5MTQsImV4aXN0aW5nIjpmYWxzZX0=',
    '_hjSession_6388958': 'eyJpZCI6IjYzNDlmYTVmLTlkNTEtNDU2YS04MDFiLWEwNTljNTRiYmY2NyIsImMiOjE3NTA4Mjk4MjE5MTUsInMiOjAsInIiOjAsInNiIjowLCJzciI6MCwic2UiOjAsImZzIjoxfQ==',
    '_uetsid': '6b81f720518611f0981ea504cec8251d',
    '_uetvid': '6b823070518611f09fcff1af0cf89f08',
    '_gcl_au': '1.1.1167952771.1750829822',
    '_clck': 'w0flwz%7C2%7Cfx2%7C0%7C2002',
    '_ga': 'GA1.1.1812497396.1750829822',
    '_ga_ETKKWETCJD': 'GS2.1.s1750829822$o1$g0$t1750829822$j60$l0$h928065022',
    '_tt_enable_cookie': '1',
    '_ttp': '01JYJSDX1B6KS3XCK7GN8QCHB8_.tt.1',
    'ttcsid': '1750829823024::gtewOAeumkry7EaMvHXD.1.1750829823024',
    '_clsk': 'wpanqj%7C1750829823973%7C1%7C1%7Cb.clarity.ms%2Fcollect',
    'ttcsid_CM0IJ53C77U0797CAP10': '1750829823023::jSB6o9Ve1MM-ssaV1TPo.1.1750829823975',
}

# send request to get jobposts

In [None]:
import requests


num_of_results = 200
job_title = 'Backend Engineer'

def get_job_posts(job_title,num_of_results):
  headers = {
      'accept': 'application/json, text/plain, */*',
      'accept-language': 'en-US,en;q=0.9,en-IN;q=0.8',
      'baggage': 'sentry-environment=production,sentry-release=pigeon_production%40v0.0.819,sentry-public_key=5f46138160b2461b9e0fb4bb1cc803bc,sentry-trace_id=81177d03883d49d0861aa611e3092220,sentry-sample_rate=0.01,sentry-transaction=%2Fjobs%2F%5Bvisit%5D,sentry-sampled=false',
      'content-type': 'application/json',
      'dnt': '1',
      'origin': 'https://jobright.ai',
      'priority': 'u=1, i',
      'referer': 'https://jobright.ai/jobs/back',
      'sec-ch-ua': '"Not)A;Brand";v="8", "Chromium";v="138", "Microsoft Edge";v="138"',
      'sec-ch-ua-mobile': '?0',
      'sec-ch-ua-platform': '"Windows"',
      'sec-fetch-dest': 'empty',
      'sec-fetch-mode': 'cors',
      'sec-fetch-site': 'same-origin',
      'sentry-trace': '81177d03883d49d0861aa611e3092220-b7f6993eb0a7f9ee-0',
      'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0',
      'x-client-type': 'web',
      # 'cookie': '_hjSessionUser_6388958=eyJpZCI6IjQxOGUwNzcwLWE3YzAtNTBmOC05NDE4LTAxYjQwNzFkNDYwZiIsImNyZWF0ZWQiOjE3NTA4Mjk4MjE5MTQsImV4aXN0aW5nIjpmYWxzZX0=; _hjSession_6388958=eyJpZCI6IjYzNDlmYTVmLTlkNTEtNDU2YS04MDFiLWEwNTljNTRiYmY2NyIsImMiOjE3NTA4Mjk4MjE5MTUsInMiOjAsInIiOjAsInNiIjowLCJzciI6MCwic2UiOjAsImZzIjoxfQ==; _uetsid=6b81f720518611f0981ea504cec8251d; _uetvid=6b823070518611f09fcff1af0cf89f08; _gcl_au=1.1.1167952771.1750829822; _clck=w0flwz%7C2%7Cfx2%7C0%7C2002; _ga=GA1.1.1812497396.1750829822; _ga_ETKKWETCJD=GS2.1.s1750829822$o1$g0$t1750829822$j60$l0$h928065022; _tt_enable_cookie=1; _ttp=01JYJSDX1B6KS3XCK7GN8QCHB8_.tt.1; ttcsid=1750829823024::gtewOAeumkry7EaMvHXD.1.1750829823024; _clsk=wpanqj%7C1750829823973%7C1%7C1%7Cb.clarity.ms%2Fcollect; ttcsid_CM0IJ53C77U0797CAP10=1750829823023::jSB6o9Ve1MM-ssaV1TPo.1.1750829823975',
  }

  params = {
      'sortCondition': '0',
      'count': str(num_of_results),
      'position': '0',
  }

  json_data = {
      'jobTitle': job_title,
      'city': 'Within US',
      'jobTypes': [1,2,3,4,],
      'seniority': [1,2,3,4,5,6,],
      'workModel': [1,2,3,],
      'radiusRange': 1000,
      'position': 0,
      'count': 1000,
  }

  response = requests.post(
      'https://jobright.ai/swan/recommend/visitor-list/jobs',
      params=params,
      # cookies=cookies,
      headers=headers,
      json=json_data,
  )
  print(response)
  return response


# main code

In [None]:
job_cate_list = []
for key,value in category['job_category'].items():
    job_cate_list.extend(category['job_category'][key])
print(job_cate_list)
# data = dict(category['job_category'])
# data.values()
print(len(job_cate_list))

['Backend Engineer', 'Java Engineer', 'Python Engineer', '.Net Engineer', 'C/C++ Engineer', 'Golang Engineer', 'Full Stack Engineer', 'Blockchain Engineer', 'Salesforce Developer', 'Frontend Software Engineer', 'React Developer', 'UI/UX Developer', 'iOS/Swift Developer', 'Android Developer', 'Flutter Developer', 'Unity Developer', 'Unreal Engine Developer', 'AR/VR Developer', 'Game Developer', 'Software Testing/Quality Assurance Engineer', 'Automation Test Engineer', 'QA Manager', 'Network Security Engineer', 'Cloud Security Engineer', 'Cyber Security Analyst', 'Cyber Security Engineer', 'Network Engineer', 'Systems Engineer', 'Site Reliability Engineer (SRE)', 'DevOps', 'SoC Analyst', 'IT Support Specialist', 'Help Desk Technician/Desktop Support Technician', 'System Administrator', 'Network Support Specialist', 'Salesforce Administrator', 'Database Administrator', 'Machine Learning Engineer', 'AI Engineer', 'LLM Engineer', 'Machine Learning/AI Researcher', 'Machine Learning, Deep Lea

In [None]:
for job_title in job_cate_list:
  response = get_job_posts(job_title,num_of_results)
  time.sleep()

<Response [200]>


In [None]:
import json
data = dict(response.json())
# print(json.dumps(data))
print(len(data['result']['jobList']))
job_data = data['result']['jobList']
# job_results = data['result']['jobList']['jobResult']
# company_results = data['result']['jobList']['companyResult']
# job_notes = data['result']['jobList']['jobNotes']

200


In [None]:
job_results

In [None]:
import sqlite3
import pandas as pd


try:
  conn = sqlite3.connect('job_posts.db')
  c = conn.cursor()

  job_results = []
  company_results = []
  job_notes = []
  for job_details in job_data:
      post  =job_details['jobResult']
      company  =job_details['companyResult']
      note  =job_details['jobNotes']
      post['impId'] =company['impId'] =note['impId'] = job_details['impId']
      job_results.append(post)
      company_results.append(company)
      job_notes.append(note)
  # print(job_results)
  df_job_post = pd.DataFrame(job_results)
  df_company = pd.DataFrame(company_results)
  df_note = pd.DataFrame(job_notes)

  dfs = [df_job_post, df_company, df_note]
  for data_frames in dfs:
    for col in data_frames.columns:
      if data_frames[col].apply(type).eq(list).any() or data_frames[col].apply(type).eq(dict).any():
          print(f"Converting list in column '{col}' to JSON string.")
          data_frames[col] = data_frames[col].apply(json.dumps)
    # data_frames.drop_duplicates(inplace=True)

  df_job_post.to_sql('job_post', conn, if_exists='append', index=False)
  df_company.to_sql('company', conn, if_exists='append', index=False)
  df_note.to_sql('job_note', conn, if_exists='append', index=False)
  print("Data successfully written to database.")

finally:
  print('closing db')
  conn.close()

In [None]:
import sqlite3
import pandas as pd


try:
  conn = sqlite3.connect('job_posts.db')
  c = conn.cursor()
  df_job_post = pd.read_sql_query("SELECT * from job_post", conn)
  df_company = pd.read_sql("SELECT * from company", conn)
  df_note = pd.read_sql_query("SELECT * from job_note", conn)
finally:
  conn.close()

df_company

In [None]:
import asyncio
import sqlite3
import pandas as pd
import aiohttp
from itertools import cycle

# --- Configuration ---
DB_PATH = 'jobs_database.db'
PROXY_TABLE = 'proxies'
JOBS_TABLE = 'jobs'
RESULTS_TABLE = 'jobs_with_responses'
# Set the maximum number of concurrent requests to avoid overloading the server
MAX_CONCURRENT_REQUESTS = 5

# --- 1. Database Setup (Helper function to create sample data) ---
def setup_database():
    """Creates a sample database with proxies and jobs for demonstration."""
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()

    # Create proxies table
    c.execute(f'''
        CREATE TABLE IF NOT EXISTS {PROXY_TABLE} (
            proxy_url TEXT PRIMARY KEY,
            active INTEGER
        )
    ''')
    proxies = [
        # In a real scenario, these would be your actual proxy IPs and ports
        # For this example, we use a public proxy testing service.
        # If these fail, replace them with other free proxies or your own.
        ('http://5.78.69.164:8080', 1), # Example active proxy
        ('http://194.135.18.239:5678', 1), # Example active proxy
        ('http://1.1.1.1:8080', 0), # Example inactive proxy
        ('http://190.61.88.147:8080', 1), # Example active proxy
    ]
    c.executemany(f'INSERT OR IGNORE INTO {PROXY_TABLE} VALUES (?, ?)', proxies)

    # Create jobs table with URLs to scrape
    c.execute(f'''
        CREATE TABLE IF NOT EXISTS {JOBS_TABLE} (
            job_id INTEGER PRIMARY KEY,
            url TEXT NOT NULL
        )
    ''')
    urls = [
        # Using a site that's good for testing scraping
        (1, 'http://httpbin.org/get?job=1'),
        (2, 'http://httpbin.org/get?job=2'),
        (3, 'http://httpbin.org/status/404'), # This URL will fail
        (4, 'http://httpbin.org/get?job=4'),
        (5, 'http://httpbin.org/delay/2'), # This URL is slow
        (6, 'http://httpbin.org/get?job=6'),
        (7, 'http://httpbin.org/get?job=7'),
        (8, 'http://httpbin.org/get?job=8'),
        (9, 'https://non-existent-domain-12345.com'), # This domain doesn't exist
        (10, 'http://httpbin.org/get?job=10'),
    ]
    c.executemany(f'INSERT OR IGNORE INTO {JOBS_TABLE} VALUES (?, ?)', urls)

    conn.commit()
    conn.close()
    print("Database setup complete.")

# --- 2. Core Asynchronous Logic ---

async def fetch_url(session: aiohttp.ClientSession, url: str, proxy: str):
    """
    Fetches a single URL using a given proxy.
    Returns a dictionary with the URL, status, and response text.
    """
    print(f"Fetching {url} via proxy {proxy}")
    try:
        # Set a timeout for the request to avoid getting stuck
        timeout = aiohttp.ClientTimeout(total=20)
        async with session.get(url, proxy=proxy, timeout=timeout) as response:
            return {
                "url": url,
                "status_code": response.status,
                "response_text": await response.text()
            }
    except Exception as e:
        # Catch any exception (timeout, connection error, etc.)
        print(f"Error fetching {url}: {e}")
        return {
            "url": url,
            "status_code": -1, # Custom code for an application-level error
            "response_text": str(e)
        }

async def main():
    """
    Main function to orchestrate the process.
    """
    # a. Read active proxies from the database
    conn = sqlite3.connect(DB_PATH)
    try:
        proxies_df = pd.read_sql_query(f"SELECT proxy_url FROM {PROXY_TABLE} WHERE active = 1", conn)
        active_proxies = proxies_df['proxy_url'].tolist()
        if not active_proxies:
            print("No active proxies found. Exiting.")
            return

        # Create a rotator for the proxies
        proxy_rotator = cycle(active_proxies)

        # b. Read URLs from the database into a DataFrame
        jobs_df = pd.read_sql_query(f"SELECT * FROM {JOBS_TABLE}", conn)
        urls_to_fetch = jobs_df['url'].tolist()

    finally:
        conn.close()

    # c. Create a semaphore to limit concurrent requests
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)

    # d. Create a single aiohttp session and create tasks
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls_to_fetch:
            # This wrapper coroutine acquires the semaphore before fetching
            async def fetch_with_semaphore(url, proxy):
                async with semaphore:
                    return await fetch_url(session, url, proxy)

            # Assign a task for each URL, rotating through proxies
            task = asyncio.create_task(fetch_with_semaphore(url, next(proxy_rotator)))
            tasks.append(task)

        # e. Run all tasks concurrently and wait for them to complete
        print(f"Starting {len(tasks)} requests with a concurrency limit of {MAX_CONCURRENT_REQUESTS}...")
        results = await asyncio.gather(*tasks)

    # f. Process results and append to the original DataFrame
    # Create a mapping from URL to result for efficient lookup
    result_map = {res['url']: res for res in results}

    jobs_df['status_code'] = jobs_df['url'].map(lambda u: result_map[u]['status_code'])
    # Only keep first 500 chars of response to keep DB size reasonable
    jobs_df['response_text'] = jobs_df['url'].map(lambda u: result_map[u]['response_text'])

    print("\n--- Scraping Results ---")
    print(jobs_df)

    # g. Update the database with the new results
    conn = sqlite3.connect(DB_PATH)
    try:
        print(f"\nSaving results to table '{RESULTS_TABLE}'...")
        jobs_df.to_sql(RESULTS_TABLE, conn, if_exists='replace', index=False)
        print("Database updated successfully.")
    finally:
        conn.close()

if __name__ == "__main__":
    # Run the setup once to create the database
    setup_database()

    # Run the main asynchronous event loop
    await main()

Database setup complete.
Starting 10 requests with a concurrency limit of 5...
Fetching http://httpbin.org/get?job=1 via proxy http://5.78.69.164:8080
Fetching http://httpbin.org/get?job=2 via proxy http://194.135.18.239:5678
Fetching http://httpbin.org/status/404 via proxy http://190.61.88.147:8080
Fetching http://httpbin.org/get?job=4 via proxy http://5.78.69.164:8080
Fetching http://httpbin.org/delay/2 via proxy http://194.135.18.239:5678
Error fetching http://httpbin.org/get?job=2: Cannot connect to host 194.135.18.239:5678 ssl:default [Connect call failed ('194.135.18.239', 5678)]
Fetching http://httpbin.org/get?job=6 via proxy http://190.61.88.147:8080
Error fetching http://httpbin.org/delay/2: Cannot connect to host 194.135.18.239:5678 ssl:default [Connect call failed ('194.135.18.239', 5678)]
Fetching http://httpbin.org/get?job=7 via proxy http://5.78.69.164:8080
Error fetching http://httpbin.org/get?job=1: 
Error fetching http://httpbin.org/status/404: 
Error fetching http://h

In [None]:
conn = sqlite3.connect(DB_PATH)
df_res = pd.read_sql_query("SELECT * from jobs_with_responses", conn)
df_res

Unnamed: 0,job_id,url,status_code,response_text
0,1,http://httpbin.org/delay/2,-1,
1,2,http://httpbin.org/delay/2,-1,
2,3,http://httpbin.org/delay/2,-1,
3,4,https://api.ipify.org?format=json,-1,
4,5,https://api.ipify.org?format=json,-1,
5,6,https://api.ipify.org?format=json,-1,
6,7,https://api.ipify.org?format=json,-1,
7,8,http://httpbin.org/status/404,-1,Cannot connect to host 194.135.18.239:5678 ssl...
8,9,https://non-existent-domain-12345.com,-1,


In [None]:
import asyncio
import sqlite3
import pandas as pd
import aiohttp
from itertools import cycle
from urllib.parse import urlparse

# --- Configuration ---
DB_PATH = 'jobs_database_advanced.db'
PROXY_TABLE = 'proxies'
JOBS_TABLE = 'jobs'
RESULTS_TABLE = 'jobs_with_responses'

# Global limit: At most 10 requests can be active across ALL domains.
GLOBAL_MAX_CONCURRENT_REQUESTS = 10

# Per-domain rules:
# - 'httpbin.org' is limited to 2 concurrent requests.
# - Any other domain will use the '__default__' limit of 3.
DOMAIN_CONCURRENCY_RULES = {
    'httpbin.org': 2,
    '__default__': 3  # Default limit for any other domain
}

# --- 1. Database Setup (Helper function to create sample data) ---
def setup_database():
    """Creates a sample database with proxies and jobs for demonstration."""
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()

    # Proxies Table
    c.execute(f'DROP TABLE IF EXISTS {PROXY_TABLE}')
    c.execute(f'CREATE TABLE {PROXY_TABLE} (proxy_url TEXT PRIMARY KEY, active INTEGER)')
    proxies = [
        ('http://5.78.69.164:8080', 1), ('http://194.135.18.239:5678', 1),
        ('http://1.1.1.1:8080', 0), ('http://190.61.88.147:8080', 1),
    ]
    c.executemany(f'INSERT INTO {PROXY_TABLE} VALUES (?, ?)', proxies)

    # Jobs Table with URLs from different domains
    c.execute(f'DROP TABLE IF EXISTS {JOBS_TABLE}')
    c.execute(f'CREATE TABLE {JOBS_TABLE} (job_id INTEGER PRIMARY KEY, url TEXT NOT NULL)')
    urls = [
        (1, 'http://httpbin.org/delay/2'), (2, 'http://httpbin.org/delay/2'),
        (3, 'http://httpbin.org/delay/2'), # These 3 will be slow due to domain limit
        (4, 'https://api.ipify.org?format=json'), (5, 'https://api.ipify.org?format=json'),
        (6, 'https://api.ipify.org?format=json'), (7, 'https://api.ipify.org?format=json'), # These 4 hit the default limit
        (8, 'http://httpbin.org/status/404'), # Error URL
        (9, 'https://non-existent-domain-12345.com'), # Connection error
    ]
    c.executemany(f'INSERT INTO {JOBS_TABLE} VALUES (?, ?)', urls)

    conn.commit()
    conn.close()
    print("Database setup complete.")

# --- 2. Core Asynchronous Logic ---

async def fetch_url(session: aiohttp.ClientSession, url: str, proxy: str):
    """
    Fetches a single URL using a given proxy.
    Returns a dictionary with the URL, status, and response text.
    """
    try:
        timeout = aiohttp.ClientTimeout(total=30)
        async with session.get(url, proxy=proxy, timeout=timeout) as response:
            return {
                "url": url,
                "status_code": response.status,
                "response_text": await response.text()
            }
    except Exception as e:
        print(f"Error fetching {url}: {type(e).__name__}")
        return {
            "url": url,
            "status_code": -1,
            "response_text": str(e)
        }

async def fetch_with_semaphores(url, proxy, session, global_sem, domain_sem):
    """
    A wrapper that acquires both the global and domain-specific semaphores
    before calling the fetch function.
    """
    async with global_sem:
        async with domain_sem:
            # Announce which domain is being processed
            domain = urlparse(url).netloc
            print(f"Processing {url} (Domain: {domain}) via proxy {proxy}")
            return await fetch_url(session, url, proxy)

async def main():
    """Main function to orchestrate the entire process."""
    # a. Read data from the database
    conn = sqlite3.connect(DB_PATH)
    try:
        proxies_df = pd.read_sql_query(f"SELECT proxy_url FROM {PROXY_TABLE} WHERE active = 1", conn)
        jobs_df = pd.read_sql_query(f"SELECT * FROM {JOBS_TABLE}", conn)
    finally:
        conn.close()

    active_proxies = proxies_df['proxy_url'].tolist()
    if not active_proxies:
        print("No active proxies found. Exiting.")
        return
    proxy_rotator = cycle(active_proxies)

    # b. Initialize semaphores
    global_semaphore = asyncio.Semaphore(GLOBAL_MAX_CONCURRENT_REQUESTS)
    domain_semaphores = {}
    default_limit = DOMAIN_CONCURRENCY_RULES['__default__']

    # c. Create tasks with dual-semaphore logic
    tasks = []
    async with aiohttp.ClientSession() as session:
        for _, row in jobs_df.iterrows():
            url = row['url']
            domain = urlparse(url).netloc

            # Get or create the semaphore for this specific domain
            if domain not in domain_semaphores:
                limit = DOMAIN_CONCURRENCY_RULES.get(domain, default_limit)
                print(f"Creating semaphore for domain '{domain}' with limit {limit}.")
                domain_semaphores[domain] = asyncio.Semaphore(limit)

            domain_semaphore = domain_semaphores[domain]

            # Create a task that is governed by BOTH semaphores
            task = asyncio.create_task(fetch_with_semaphores(
                url, next(proxy_rotator), session, global_semaphore, domain_semaphore
            ))
            tasks.append(task)

        # d. Run all tasks and gather results
        print(f"\nStarting {len(tasks)} requests with global limit={GLOBAL_MAX_CONCURRENT_REQUESTS} and per-domain rules...")
        results = await asyncio.gather(*tasks)

    # e. Process results and append to DataFrame
    result_map = {res['url']: res for res in results}
    jobs_df['status_code'] = jobs_df['url'].map(lambda u: result_map[u]['status_code'])
    jobs_df['response_text'] = jobs_df['url'].map(lambda u: result_map[u]['response_text'][:500])

    print("\n--- Scraping Results ---")
    print(jobs_df)

    # f. Update database with results
    conn = sqlite3.connect(DB_PATH)
    try:
        print(f"\nSaving results to table '{RESULTS_TABLE}'...")
        jobs_df.to_sql(RESULTS_TABLE, conn, if_exists='replace', index=False)
        print("Database updated successfully.")
    finally:
        conn.close()

if __name__ == "__main__":
    setup_database()
    await main()

Database setup complete.
Creating semaphore for domain 'httpbin.org' with limit 2.
Creating semaphore for domain 'api.ipify.org' with limit 3.
Creating semaphore for domain 'non-existent-domain-12345.com' with limit 3.

Starting 9 requests with global limit=10 and per-domain rules...
Processing http://httpbin.org/delay/2 (Domain: httpbin.org) via proxy http://5.78.69.164:8080
Processing http://httpbin.org/delay/2 (Domain: httpbin.org) via proxy http://194.135.18.239:5678
Processing https://api.ipify.org?format=json (Domain: api.ipify.org) via proxy http://5.78.69.164:8080
Processing https://api.ipify.org?format=json (Domain: api.ipify.org) via proxy http://194.135.18.239:5678
Processing https://api.ipify.org?format=json (Domain: api.ipify.org) via proxy http://190.61.88.147:8080
Processing https://non-existent-domain-12345.com (Domain: non-existent-domain-12345.com) via proxy http://190.61.88.147:8080
Error fetching https://api.ipify.org?format=json: ClientProxyConnectionError
Processi

In [2]:
# import http.server
import threading
import requests
import json
import time
import sys

token = None

def setup():
    resp = requests.post('https://github.com/login/device/code', headers={
            'accept': 'application/json',
            'editor-version': 'Neovim/0.6.1',
            'editor-plugin-version': 'copilot.vim/1.16.0',
            'content-type': 'application/json',
            'user-agent': 'GithubCopilot/1.155.0',
            'accept-encoding': 'gzip,deflate,br'
        }, data='{"client_id":"Iv1.b507a08c87ecfe98","scope":"read:user"}')


    # Parse the response json, isolating the device_code, user_code, and verification_uri
    resp_json = resp.json()
    device_code = resp_json.get('device_code')
    user_code = resp_json.get('user_code')
    verification_uri = resp_json.get('verification_uri')

    # Print the user code and verification uri
    print(f'Please visit {verification_uri} and enter code {user_code} to authenticate.')


    while True:
        time.sleep(5)
        resp = requests.post('https://github.com/login/oauth/access_token', headers={
            'accept': 'application/json',
            'editor-version': 'Neovim/0.6.1',
            'editor-plugin-version': 'copilot.vim/1.16.0',
            'content-type': 'application/json',
            'user-agent': 'GithubCopilot/1.155.0',
            'accept-encoding': 'gzip,deflate,br'
            }, data=f'{{"client_id":"Iv1.b507a08c87ecfe98","device_code":"{device_code}","grant_type":"urn:ietf:params:oauth:grant-type:device_code"}}')

        # Parse the response json, isolating the access_token
        resp_json = resp.json()
        access_token = resp_json.get('access_token')

        if access_token:
            break

    # Save the access token to a file
    with open('.copilot_token', 'w') as f:
        f.write(access_token)

    print('Authentication success!')


def get_token():
    global token
        # Check if the .copilot_token file exists
    while True:
        try:
            with open('.copilot_token', 'r') as f:
                access_token = f.read()
                break
        except FileNotFoundError:
            setup()
    # Get a session with the access token
    resp = requests.get('https://api.github.com/copilot_internal/v2/token', headers={
        'authorization': f'token {access_token}',
        'editor-version': 'Neovim/0.6.1',
        'editor-plugin-version': 'copilot.vim/1.16.0',
        'user-agent': 'GithubCopilot/1.155.0'
    })

    # Parse the response json, isolating the token
    resp_json = resp.json()
    token = resp_json.get('token')


def token_thread():
    global token
    while True:
        get_token()
        time.sleep(25 * 60)
def chat_completion_stream(messages: list, model: str, temperature: float = 0.1):
      """Stream chat completion from Copilot"""
      global token
    # If the token is None, get a new one
      if token is None or is_token_invalid(token):
        get_token()
      headers = {
          'authorization': f'Bearer {token}',
          'content-type': 'application/json',
          'copilot-integration-id': 'vscode-chat',
          'editor-plugin-version': 'copilot-chat/0.28.5',
          'editor-version': 'vscode/1.101.2',
          'openai-intent': 'conversation-other',
          'user-agent': 'GitHubCopilotChat/0.28.5',
          'x-github-api-version': '2025-05-01',
          'x-initiator': 'user',
          'x-interaction-type': 'conversation-other'
      }

      payload = {
          "messages": messages,
          "model": model,
          "temperature": temperature,
          "top_p": 1,
          "max_tokens": 64000,
          "n": 1,
          "stream": True
      }

      try:
          resp = requests.post('https://api.individual.githubcopilot.com/chat/completions',
                              headers=headers, json=payload, stream=True)

          if resp.status_code != 200:
              yield f"data: {json.dumps({'error': f'HTTP {resp.status_code}: {resp.text}'})}\n\n"
              return

          for line in resp.iter_lines():
              if line:
                  line_str = line.decode('utf-8')
                  if line_str.startswith('data: '):
                      yield line_str + '\n\n'

      except requests.exceptions.RequestException as e:
          yield f"data: {json.dumps({'error': f'Request error: {str(e)}'})}\n\n"
def copilot(prompt, language='python'):
    global token
    # If the token is None, get a new one
    if token is None or is_token_invalid(token):
        get_token()
    header = {
        'authorization': f'Bearer {token}',
  "ontent-type": "application/json",
  "copilot-integration-id": "vscode-chat",
  "editor-plugin-version": "copilot-chat/0.28.5",
  "editor-version": "vscode/1.101.2",
  "openai-intent": "conversation-other",
  "user-agent": "GitHubCopilotChat/0.28.5",
  # "vscode-machineid": "4547ee588f224680bd8fd598883975944a13b163e4b18f0f63de8950796578fe",
  # "vscode-sessionid": "7a2c2033-6471-4682-bda7-d14e756a00781751890740497",
  "x-github-api-version": "2025-05-01",
  "x-initiator": "user",
  # "x-interaction-id": "890a4322-84ee-42e9-96b8-b3f3ff44e529",
  "x-interaction-type": "conversation-other",
  # "x-onbehalf-extension-id": "saoudrizwan.claude-dev/3.18.3",
  # "x-request-id": "31270fce-3828-4ce7-8e01-300521a9ca48",
  "x-vscode-user-agent-library-version": "electron-fetch",
  "sec-fetch-site": "none",
  "sec-fetch-mode": "no-cors",
  "sec-fetch-dest": "empty",
  "accept-encoding": "gzip, deflate, br, zstd",
  "priority": "u=4, i"
}
    try:
        resp = requests.post('https://api.individual.githubcopilot.com/chat/completions', headers=header,
        json={
    "messages": [
        {
            "role": "system",
            "content": "Follow Microsoft content policies.\nAvoid content that violates copyrights.\nIf you are asked to generate content that is harmful, hateful, racist, sexist, lewd, or violent, only respond with \"Sorry, I can't assist with that.\"\nKeep your answers short and impersonal.\nUse Markdown formatting in your answers.\nMake sure to include the programming language name at the start of the Markdown code blocks.\nAvoid wrapping the whole response in triple backticks.\nThe user works in an IDE called Visual Studio Code which has a concept for editors with open files, integrated unit test support, an output pane that shows the output of running the code as well as an integrated terminal.\nThe active document is the source code the user is looking at right now.\nYou can only give one reply for each conversation turn."
        },
        {
            "role" : "assistant",
            "content" : prompt
        },
        {
            "role": "user",
            "content": "<task>\nhi\n</task>"
        },
        {
            "role": "user",
            "content": "<environment_details>\n# VSCode Visible Files\n(No visible files)\n\n# VSCode Open Tabs\nmain.py\n../../../Users/madha/AppData/Roaming/Code/User/settings.json\n\n# Current Time\n7/7/2025, 6:30:50 PM (Asia/Calcutta, UTC+5.5:00)\n\n# Current Working Directory (c:/my_space/projects/vscode_copilot) Files\n.copilot_token\nmain.py\n\n# Context Window Usage\n0 / 128K tokens used (0%)\n\n# Current Mode\nPLAN MODE\nIn this mode you should focus on information gathering, asking questions, and architecting a solution. Once you have a plan, use the plan_mode_respond tool to engage in a conversational back and forth with the user. Do not use the plan_mode_respond tool until you've gathered all the information you need e.g. with read_file or ask_followup_question.\n(Remember: If it seems the user wants you to use tools only available in Act Mode, you should ask the user to \"toggle to Act mode\" (use those words) - they will have to manually do this themselves with the Plan/Act toggle button below. You do not have the ability to switch to Act Mode yourself, and must wait for the user to do it themselves once they are satisfied with the plan. You also cannot present an option to toggle to Act mode, as this will be something you need to direct the user to do manually themselves.)\n</environment_details>"
        }
    ],
    "model": "gemini-2.5-pro",
    "temperature": 0.1,
    "top_p": 1,
    "max_tokens": 64000,
    "n": 1,
    "stream": True
}
                             )
        print(resp.status_code)
        print(resp.text)

    except requests.exceptions.ConnectionError:
        return ''

    result = ''

    # Parse the response text, splitting it by newlines
    resp_text = resp.text.split('\n')
    for line in resp_text:
        # If the line contains a completion, print it
        if line.startswith('data: {'):
            # Parse the completion from the line as json
            json_completion = json.loads(line[6:])
            completion = json_completion.get('choices')[0].get('text')
            if completion:
                result += completion
            else:
                result += '\n'

    return result

# Check if the token is invalid through the exp field
def is_token_invalid(token):
    if token is None or 'exp' not in token or extract_exp_value(token) <= time.time():
        return True
    return False

def extract_exp_value(token):
    pairs = token.split(';')
    for pair in pairs:
        key, value = pair.split('=')
        if key.strip() == 'exp':
            return int(value.strip())
    return None

# class HTTPRequestHandler(http.server.BaseHTTPRequestHandler):
#     def do_POST(self):
#         # Get the request body
#         content_length = int(self.headers['Content-Length'])
#         body = self.rfile.read(content_length)

#         # Parse the request body as json
#         body_json = json.loads(body)

#         # Get the prompt from the request body
#         prompt = body_json.get('prompt')
#         language = body_json.get('language', 'python')

#         # Get the completion from the copilot function
#         completion = copilot(prompt, language)

#         # Send the completion as the response
#         self.send_response(200)
#         self.send_header('Content-type', 'text/plain')
#         self.end_headers()
#         self.wfile.write(completion.encode())


def main():
    # Every 25 minutes, get a new token
    threading.Thread(target=token_thread, daemon=True).start()
    prompt = ["generate me a addition function in python"]
    language = "python"
    # completion = copilot(prompt, language)
    completion_generator = chat_completion_stream(prompt,"gemini-2.5-pro")
    for chunk in completion_generator:
      print(chunk, end='')

if __name__ == '__main__':
    main()

Please visit https://github.com/login/device and enter code 97D7-AFAF to authenticate.
Please visit https://github.com/login/device and enter code 0D0C-5F00 to authenticate.


KeyboardInterrupt: 

In [13]:
import json
print(token)
token = json.loads(token)
headers = {'authorization': f'Bearer {token["tid"]}'}
print(headers)

tid=0b97ef2bd0dcc75c83d5ca765e9d6a4c;exp=1751891450;sku=free_educational_quota;proxy-ep=proxy.individual.githubcopilot.com;st=dotcom;chat=1;cit=1;malfil=1;editor_preview_features=1;mcp=1;ccr=1;rt=1;8kp=1;ip=34.23.59.211;asn=AS396982:1df83fc4b78787daa76ade7b527ded8e9481e6a9bcf4f2ee3c6d9c6778155e40


JSONDecodeError: Expecting value: line 1 column 1 (char 0)

In [16]:
def copilot(prompt, language='python'):
    global token
    # If the token is None or invalid, get a new one
    # Note: The is_token_invalid function also needs a small fix (see below)
    if is_token_invalid(token):
        get_token()

    # --- FIX #1: Use the correct token string from the JSON object ---
    # The header needs the actual token, not the whole python dictionary.
    # headers = {'authorization': f'Bearer {token["tid"]}'}
    headers = {'authorization': f'Bearer {token}'}

    json_payload = {
        'prompt': prompt,
        'suffix': '',
        'max_tokens': 1000, # 10000 is very large, using a more standard value
        'temperature': 0,
        'top_p': 1,
        'n': 1,
        'stop': ['\n'],
        'nwo': 'github/copilot.vim',
        'stream': True,
        # 'extra': {
        #     'language': language
        # }
    }

    try:
        # --- FIX #2: Process the response as a stream ---
        # We add stream=True to the request call itself and iterate over the response.
        with requests.post('https://copilot-proxy.githubusercontent.com/v1/engines/copilot-codex/completions',
                           headers=headers, json=json_payload, stream=True) as resp:
            print(resp)
            print(f"Status Code: {resp.status_code}")
            if resp.status_code != 200:
                print(f"Error: Received status code {resp.status_code}")
                print(resp.text)
                return ""

            result = ''
            # We iterate over the response line by line as it comes in
            for line in resp.iter_lines():
                if not line:
                    continue

                # The line is a bytes object, so we decode it
                line_str = line.decode('utf-8')

                # The stream ends with this special message
                if line_str == 'data: [DONE]':
                    break

                # We're looking for the data payload
                if line_str.startswith('data: {'):
                    try:
                        # Parse the completion from the line as json
                        json_completion = json.loads(line_str[6:])

                        # --- FIX #3: Use the correct JSON key 'p' instead of 'text' ---
                        completion = json_completion.get('choices')[0].get('p') # Changed .get('text') to .get('p')

                        if completion:
                            result += completion
                    except (json.JSONDecodeError, IndexError, KeyError) as e:
                        print(f"Skipping malformed line: {line_str}, Error: {e}")

        return result

    except requests.exceptions.RequestException as e:
        print(f"A connection error occurred: {e}")
        return ''

def main():
    # Every 25 minutes, get a new token
    threading.Thread(target=token_thread, daemon=True).start()
    prompt = "what model are you"
    language = "python"
    completion = copilot(prompt, language)
    print(completion)

main()

<Response [200]>
Status Code: 200
aaaaaaaaaaaaaaa
