## Quant Job Extraction

Using serpAPI we will query quant jobs in the US and look for common skills and salary ranges for job titles and experiences.

In [44]:
import pandas as pd
from datetime import datetime
import logging

In [45]:
# Configure the logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)  # Set logging level to INFO

# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)

# Create a formatter and add it to the handler
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)

# Add the handler to the logger (avoid duplicate handlers)
if not logger.handlers:
    logger.addHandler(console_handler)

In [27]:
datetime.now()

datetime.datetime(2025, 6, 17, 8, 58, 24, 576129)

In [7]:
import os
from dotenv import load_dotenv
from serpapi import GoogleSearch

# Load environment variables from .env file
load_dotenv()

# Access the key
serpapi_key = os.getenv("SERPAPI_KEY")

In [120]:
search_terms = ['Quantitative Researcher', 'Quantitative Analyst', 'Quantitative Trader', 'Algorithmic Trader', 
                'Quantitative Developer', 'Quantitative Software Engineer', 'Model Validation Quantitative Analyst', 'Model Validation Analyst',
                'Quantitative Risk Analyst', 'Quantitative Portfolio Manager', 'Quantitative Investment Manager', 'Quantitative Asset Manager',
                'Quantitative Strategist', 'Financial Engineer', 'Quantitative Pricing Analyst', 'Trader', 
                'Energy Trader', 'Energy Analyst', 'Power Trader'
               ]
search_locations_us = ["New York, United States", "California, United States", 
    "Texas, United States", "Illinois, United States", "Florida, United States"]

#### Serp API call

Refined from lukebarousse repos https://github.com/lukebarousse/Data_Job_Pipeline_Airflow/blob/main/dags/serpapi_bigquery.py

In [129]:
def _serpapi_bigquery(search_term, search_location, search_time):
    """
    Function to call SerpApi and insert results into BigQuery {gsearch_jobs_all} used by us_job_postings 
    and non_us_job_postings tasks

    Args:
        search_terms : list
            List of search terms to search for
        search_locations : list
            List of search locations to search for
        search_time : str
            Time period to search for (e.g. 'past 24 hours')

    Returns:
        num_searches : int
            Number of searches performed for this search term and location
    
    Source:
        https://serpapi.com/google-jobs-results
        https://cloud.google.com/bigquery/docs/reference/libraries
    """
    next_page_token = None
    num = 0
    has_more_results = True
    jobs_all = pd.DataFrame()  # Always initialize it

    while has_more_results:
        logger.debug(f"START API CALL: {search_term} in {search_location} on search {num}")

        error = False
        params = {
            "api_key": serpapi_key,
            "device": "desktop",
            "engine": "google_jobs",
            "google_domain": "google.com",
            "q": search_term,
            "hl": "en",
            "gl": "us",
            "location": search_location,
            "chips": search_time,
        }

        if next_page_token:
            params["next_page_token"] = next_page_token

        try:
            search = GoogleSearch(params)
            results = search.get_dict()

            if 'error' in results:
                logger.debug(f"END SerpApi CALLS: {search_term} in {search_location} on search {num}: {results['error']}")
                error = True
                break

            logger.debug(f"SUCCESS SerpApi CALL: {search_term} in {search_location} on search {num}")

            # Process results and insert into BigQuery
            jobs = results['jobs_results']
            jobs = pd.DataFrame(jobs)
            jobs = pd.concat(
                [jobs, pd.json_normalize(jobs['detected_extensions'])],
                axis=1
            ).drop('detected_extensions', axis=1)
            jobs['date_time'] = datetime.now()

            if num == 0:
                jobs_all = jobs
            else:
                jobs_all = pd.concat([jobs_all, jobs])

            jobs_all['search_term'] = search_term
            jobs_all['search_location'] = search_location

            # Check for next_page_token
            if 'serpapi_pagination' in results and 'next_page_token' in results['serpapi_pagination']:
                next_page_token = results['serpapi_pagination']['next_page_token']
            else:
                logger.debug(f"END API CALLS: No more results for {search_term} in {search_location} on search {num}")
                has_more_results = False

            num += 1

        except Exception as e:
            logger.error(f"SerpApi ERROR (Timeout)!!!: {search_term} in {search_location} had an error (most likely TimeOut)!!!")
            logger.error(f"Following error returned: {e}")
            time.sleep(ERROR_SLEEP_MIN * 60)
            error = True
            break

    # Insert data into BigQuery
    if num > 0 and not error:

        final_columns = ['title', 'company_name', 'location', 'via', 'description', 'extensions',
                            'job_id', 'thumbnail', 'posted_at', 'schedule_type', 'salary',
                            'work_from_home', 'date_time', 'search_term', 'search_location', 'commute_time']
        jobs_all = jobs_all.loc[:, jobs_all.columns.isin(final_columns)]
        


    num_searches = num + 1

    return jobs_all, num_searches   

### Iterate over US jobs

In [97]:
def _us_jobs(search_terms, search_locations_us, **context):
    """
    DAG to pull US job postings using the _serpapi_bigquery function

    Args:
        search_terms : list
            List of search terms to search for
        search_locations_us : list
            List of search locations to search for
        context : dict
            Context dictionary from Airflow

    Returns:
        None
    """
    # search_time = "date_posted:today"
    search_time = "date_posted:month"
    total_searches = 0

    all_jobs = []

    for search_term in search_terms:
        jobs_for_search_term = []
    
        for search_location in search_locations_us:
            logger.debug(f"START SEARCH: {total_searches} searches done, starting search...")
            jobs_all, num_searches = _serpapi_bigquery(search_term, search_location, search_time)   
            total_searches += num_searches
    
            if jobs_all is not None and not jobs_all.empty:
                jobs_for_search_term.append(jobs_all)
    
        if jobs_for_search_term:
            # Concatenate all dataframes for this search term
            jobs_for_search_term_df = pd.concat(jobs_for_search_term, ignore_index=True)
            jobs_for_search_term_df.to_csv(f"{search_term}.csv", index=False)
            all_jobs.append(jobs_for_search_term_df)

    return all_jobs

In [98]:
all_jobs = _us_jobs(search_terms, search_locations_us)
all_jobs_df = pd.concat(all_jobs, ignore_index=True)
all_jobs_df.to_csv(f"all_jobs.csv", index=False)

ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().

In [None]:
# Max number of searches to perform daily
MAX_SEARCHES = 1500

from numpy.random import choice

In [78]:
"""
Classify countries by code and sort them by percentage of views
"""

import pandas as pd

def view_percent():
    # import different country codes
    codes = pd.read_csv("country_codes.csv")

    # import youtube views for my channel and calculate percentage viewed
    views = pd.read_csv("youtube_views.csv")
    views = views.iloc[1: , :]
    views = views[views.Views != 0] # removing countries with no views
    views = views[views.Geography != 'US'] # pulling US already
    views["percent"] = views['Watch time (hours)'] / views['Watch time (hours)'].sum()

    # no results returned from SerpApi from these countries
    # may consider removing from search in future, but doesn' appear to use search credits for no results
    no_country_results = ["MO", "IR", "SD", "SY", "SZ", "SS" ] # "Macao", "Iran", "Sudan", "Syria", "Eswatini", "South Sudan"

    # merge dataframes for final dataframe
    percent = views.merge(codes, how='left', left_on='Geography', right_on='code')
    percent = percent[['country','percent']]
    
    # return the dataframe
    return percent

In [79]:
country_percent = view_percent()

country_percent

Unnamed: 0,country,percent
0,India,0.254927
1,United Kingdom,0.196187
2,Germany,0.098563
3,Canada,0.063891
4,Australia,0.061864
5,France,0.050508
6,Italy,0.028072
7,Netherlands,0.02674
8,South Africa,0.024297
9,Brazil,0.019638


In [134]:
# create list of countries listed based on weighted probability to get random countries
search_countries = list(country_percent.country)
search_probabilities = list(country_percent.percent)
search_locations = list(choice(search_countries, size=len(search_countries), replace=False, p=search_probabilities))
non_searched = [search_location for search_location in search_locations if search_location not in ['United Kingdom', 'Australia', 'Canada', 'Singapore', 'Japan', 'Hong Kong', 
                            'Switzerland', 'Netherlands', 'Denmark', 'Germany', 'France', 'Spain', 'Italy']]

In [135]:
def _non_us_jobs(search_terms, country_percent, **context):
    """
    DAG to pull non-US job postings using the _serpapi_bigquery function

    Args:
        search_terms : list
            List of search terms to search for
        country_percent : pandas dataframe
            Dataframe of countries and their relative percent of total YouTube views for my channel
        context : dict
            Context dictionary from Airflow

    Returns:
        None
    
    Source:
        https://youtube.com/@lukebarousse
    """
    search_time = "date_posted:month"
    search_countries = list(country_percent.country)
    search_probabilities = list(country_percent.percent)
    total_searches = 0

    # create list of countries listed based on weighted probability to get random countries
    search_locations = list(choice(search_countries, size=len(search_countries), replace=False, p=search_probabilities))
    all_jobs = []
        
    for search_location in non_searched: #search_locations:
        jobs_for_search_location = []
        
        if total_searches < MAX_SEARCHES:
            logger.info(f"SEARCHING COUNTRY: {search_location} [{search_locations.index(search_location)+1} of {len(search_locations)}]")
            for search_term in search_terms:
                logger.debug(f"SEARCHING TERM: {search_term}")
                logger.debug(f"Starting search number {total_searches}...")
                jobs_all, num_searches = _serpapi_bigquery(search_term, search_location, search_time)   
                total_searches += num_searches

                if jobs_all is not None and not jobs_all.empty:
                    jobs_for_search_location.append(jobs_all)                

            if jobs_for_search_location:
                # Concatenate all dataframes for this search term
                jobs_for_search_location_df = pd.concat(jobs_for_search_location, ignore_index=True)
                jobs_for_search_location_df.to_csv(f"{search_location}.csv", index=False)

        else:
            logger.info(f"STICK A FORK IN ME, I'M DONE!!!!: {total_searches} searches complete")
            return

In [None]:
_non_us_jobs(search_terms, country_percent)

2025-06-17 13:29:04,309 - INFO - SEARCHING COUNTRY: India [2 of 44]
