<a href="https://colab.research.google.com/github/AbhinavKumar0000/Data_collection_pipeline/blob/main/Job_Postings_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Objective
The objective of this notebook is to document the end-to-end process of constructing a high-quality, annotated dataset from raw software engineering job postings. The resulting dataset is intended for use in downstream machine learning applications, such as role classification and skill-based candidate matching.

## Pipeline Stages
The documented pipeline consists of four primary stages:



*   **Data Collection**: Sourcing and ingesting raw job data via an API.
*   **Data Cleaning and Preprocessing**: Applying standardization and normalization procedures to the text data.
*   **Exploratory Data Analysis (EDA)**: Performing quantitative analysis of the corpus to inform the annotation strategy.
*   **Data Annotation**: Programmatically applying a data-driven labeling schema to create the final, enriched dataset.

In [None]:
#Importing necessary libraries
import requests
import pandas as pd
from bs4 import BeautifulSoup
import re
from sklearn.feature_extraction.text import CountVectorizer
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

### Ingests job posting data from the Remotive REST API

In [None]:
def collect_jobs_from_remotive_api(
    api_url="https://remotive.com/api/remote-jobs?category=software-development&limit=50",
    timeout=30,
):
    """Fetch job postings from the Remotive REST API as a DataFrame.

    Args:
        api_url: Endpoint to query. Defaults to the software-development
            category with a limit of 50 records.
        timeout: Seconds to wait for the server before giving up. Passed
            straight to ``requests.get``.

    Returns:
        A DataFrame with the columns ``job_title``, ``company_name``,
        ``location``, ``job_description`` (HTML stripped to plain text) and
        ``url``. An empty DataFrame is returned when the request fails, the
        body is not valid JSON, or the response has no non-empty ``jobs``
        array.
    """
    print(f"Requesting data from API endpoint: {api_url}")

    try:
        # Without a timeout, requests waits indefinitely on a stalled connection.
        response = requests.get(api_url, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        print(f"An error occurred during the API request: {e}")
        return pd.DataFrame()
    except ValueError as e:
        # Depending on the installed requests version, a non-JSON body may
        # surface as a plain ValueError rather than a RequestException.
        print(f"The API response was not valid JSON: {e}")
        return pd.DataFrame()

    # Guard against payloads that are not a JSON object (e.g. a bare list).
    jobs = data.get('jobs') if isinstance(data, dict) else None
    if not jobs:
        print("The API response did not contain a 'jobs' array or it was empty")
        return pd.DataFrame()

    print(f"Successfully fetched {len(jobs)} records from the API")

    job_list = []
    for job in jobs:
        # The key can be present with a null value; BeautifulSoup needs a string.
        description_html = job.get('description') or ''

        #HTML is parsed to extract plain text
        soup = BeautifulSoup(description_html, 'html.parser')
        clean_description = soup.get_text(separator=' ', strip=True)

        #Mapping the API response fields to our target schema
        job_list.append({
            'job_title': job.get('title'),
            'company_name': job.get('company_name'),
            'location': job.get('candidate_required_location'),
            'job_description': clean_description,
            'url': job.get('url')
        })

    return pd.DataFrame(job_list)

### Executing the Data Collection Script

In [None]:
raw_df = collect_jobs_from_remotive_api()

# Persist the raw snapshot only when the collector actually returned rows.
if raw_df.empty:
    print("Data collection failed or returned no data")
else:
    raw_df.to_csv('raw_data.csv', index=False)
    print("Data collection successful. Raw dataset saved to raw_data.csv")

In [None]:
# Preview the first rows, but only when the collection step returned data.
if not raw_df.empty:
    print("Raw Data:")
    display(raw_df.head())

In [None]:
# Summarise the raw frame: column names, dtypes and non-null counts.
raw_df.info()

## Standardizing the Dataset

In [None]:
def clean_job_description(text):
    """Normalise a job description for keyword matching and n-gram counting.

    The text is lowercased, punctuation is turned into word separators, and
    whitespace is collapsed. Punctuation is replaced with a space rather than
    deleted so that "node.js", "ci/cd" and "full-stack" become "node js",
    "ci cd" and "full stack" instead of fusing into "nodejs", "cicd" and
    "fullstack". A trailing '+' or '#' is kept on a token so that "c++",
    "c#" and "5+ years" remain recognisable.

    Args:
        text: Raw description. Anything that is not a ``str`` (e.g. NaN)
            yields an empty string.

    Returns:
        The cleaned, single-spaced, lowercase string.
    """
    if not isinstance(text, str):
        return ""

    #Standardize text to lowercase
    text = text.lower()

    #Drop apostrophes so contractions stay a single token ("you'll" -> "youll")
    text = re.sub(r"['’]", '', text)

    #Replace every other non-alphanumeric character (except + and #) with a space
    text = re.sub(r'[^a-z0-9\s+#]', ' ', text)

    #Keep + and # only as a token suffix ("c++", "c#", "5+"); anywhere else
    #(leading, standalone, or between two words) they act as separators
    text = re.sub(r'(?<![a-z0-9+#])[+#]+|[+#]+(?![+#\s]|$)', ' ', text)

    #Collapse multiple whitespace characters into a single space
    text = re.sub(r'\s+', ' ', text).strip()
    return text

In [None]:
try:
    df = pd.read_csv('raw_data.csv')
    print(f"Loaded raw_data.csv with {len(df)} initial records")

    # One row per job URL, and only rows that actually carry a description.
    df = df.drop_duplicates(subset=['url']).dropna(subset=['job_description'])
    print(f"After handling duplicates and nulls, {len(df)} records remain")

    # Add the normalised text next to the original description.
    df = df.assign(
        cleaned_description=df['job_description'].apply(clean_job_description)
    )

    # Keep only the columns that make up the cleaned dataset, in this order.
    cleaned_df = df[[
        'job_title',
        'company_name',
        'location',
        'cleaned_description',
        'url',
    ]]
    cleaned_df.to_csv('cleaned_data.csv', index=False)
    print("Data cleaning complete and saved to cleaned_data.csv")

except FileNotFoundError:
    print("Error: raw_data.csv not found")

In [None]:
# cleaned_df is assigned inside the cleaning cell's try block, so it may not
# exist if raw_data.csv was missing; check before previewing.
if 'cleaned_df' in locals():
    print("Cleaned Data:")
    display(cleaned_df.head())

In [None]:
# cleaned_df is only defined when the cleaning cell found raw_data.csv.
# Use the same guard as the preview cell so a missing file does not turn
# into a NameError here.
if 'cleaned_df' in locals():
    cleaned_df.info()

# **Exploratory Data Analysis (EDA)**:
Before defining an annotation schema, an Exploratory Data Analysis is performed to understand the underlying structure and content of the text corpus. This data-driven approach ensures that the chosen labels are relevant, representative of the data, and will provide maximum value for model training. The analysis focuses on identifying the most frequent terms and phrases.

In [None]:
try:
    df_eda = pd.read_csv('cleaned_data.csv')
except FileNotFoundError:
    print("Error: cleaned_data.csv not found")
else:
    # Rows without cleaned text cannot be vectorised, so leave them out.
    df_eda = df_eda.dropna(subset=['cleaned_description'])
    print(f"Loaded cleaned_data.csv for analysis, containing {len(df_eda)} records")

### Unigram (Single Word) Frequency Analysis





In [None]:
# Count single-word terms, skipping scikit-learn's built-in English stop words.
vec_unigram = CountVectorizer(stop_words='english', ngram_range=(1, 1))
unigram_counts = vec_unigram.fit_transform(df_eda['cleaned_description'])

# Summing over the rows gives one corpus-wide total per vocabulary term.
sum_words = unigram_counts.sum(axis=0)
words_freq = [
    (word, sum_words[0, idx])
    for word, idx in vec_unigram.vocabulary_.items()
]
words_freq.sort(key=lambda pair: pair[1], reverse=True)

top_20_words = pd.DataFrame(words_freq[:20], columns=['Word', 'Frequency'])

### Bigram (Two Word Phrase) Frequency Analysis

In [None]:
# Count two-word phrases, skipping scikit-learn's built-in English stop words.
vec_bigram = CountVectorizer(stop_words='english', ngram_range=(2, 2))
bigram_counts = vec_bigram.fit_transform(df_eda['cleaned_description'])

# Summing over the rows gives one corpus-wide total per phrase.
sum_phrases = bigram_counts.sum(axis=0)
phrases_freq = [
    (phrase, sum_phrases[0, idx])
    for phrase, idx in vec_bigram.vocabulary_.items()
]
phrases_freq.sort(key=lambda pair: pair[1], reverse=True)

top_20_phrases = pd.DataFrame(phrases_freq[:20], columns=['Phrase', 'Frequency'])

## Visualization of EDA Results

In [None]:
def _plot_top_terms(ax, labels, counts, cmap, title, ylabel):
    """Draw one ranked horizontal bar chart with a value label on each bar.

    Args:
        ax: Matplotlib Axes to draw on.
        labels: Term or phrase for each bar, ordered from most to least frequent.
        counts: Frequency for each label, in the same order.
        cmap: Matplotlib colormap used to shade the bars.
        title: Axes title.
        ylabel: Label for the category axis.
    """
    ax.barh(labels, counts,
            color=cmap(np.linspace(0.4, 0.8, len(counts))),
            edgecolor='black', linewidth=0.5)
    ax.set_title(title, fontsize=16, fontweight='bold', pad=20)
    ax.set_xlabel('Frequency', fontsize=12, fontweight='bold')
    ax.set_ylabel(ylabel, fontsize=12, fontweight='bold')
    ax.grid(axis='x', alpha=0.3)

    # barh places the first row at the bottom; flip the axis so the most
    # frequent entry sits at the top of the chart.
    ax.invert_yaxis()

    # Computed once instead of per bar; default=0 keeps an empty frame from raising.
    label_offset = max(counts, default=0) * 0.01
    for position, value in enumerate(counts):
        ax.text(value + label_offset, position,
                f'{value:,}', va='center', fontsize=10, fontweight='bold')


plt.style.use('default')
fig, axes = plt.subplots(2, 1, figsize=(14, 12))
fig.suptitle('Job Description Text Analysis', fontsize=20, fontweight='bold', y=0.98)

#Plot 1: Top 20 words with horizontal bars
_plot_top_terms(axes[0], top_20_words['Word'], top_20_words['Frequency'],
                plt.cm.Blues, 'Top 20 Most Frequent Terms', 'Unigrams')

#Plot 2: Top 20 phrases with different color scheme
_plot_top_terms(axes[1], top_20_phrases['Phrase'], top_20_phrases['Frequency'],
                plt.cm.Oranges, 'Top 20 Most Frequent Bigrams', 'Bigrams')

# Leave room at the top of the figure for the suptitle.
plt.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()

Generating a skill list by cross-referencing the EDA results with a master list of known technical skills

In [None]:
# Reference vocabulary of technical skills, grouped by category.
# Entries are lowercase, and multi-word names are separated by single spaces
# (e.g. 'node js', 'ci cd'). generate_skills_from_eda() narrows this list to
# the entries that also occur in the EDA vocabularies.
# NOTE(review): the EDA only builds unigram and bigram vocabularies, so
# three-word entries such as 'ruby on rails' and 'amazon web services' cannot
# be found there -- confirm whether that is intended.
MASTER_SKILL_LIST = [
    #Programming Languages
    'python', 'javascript', 'typescript', 'java', 'go', 'golang', 'ruby', 'php', 'c++', 'c#', 'rust', 'kotlin', 'swift',

    #Frontend Frameworks & Libraries
    'react', 'angular', 'vue', 'svelte', 'next js', 'react native',

    #Backend Frameworks & Libraries
    'node js', 'django', 'flask', 'fastapi', 'spring', 'ruby on rails',

    #Cloud Platforms & Services
    'aws', 'gcp', 'azure', 'amazon web services', 'google cloud', 'heroku', 'digitalocean',

    #Databases & Caching
    'sql', 'nosql', 'postgresql', 'mysql', 'mongodb', 'redis', 'cassandra', 'dynamodb', 'elasticsearch',

    #DevOps, Infrastructure & Tooling
    'docker', 'kubernetes', 'terraform', 'ansible', 'jenkins', 'ci cd', 'git', 'github', 'gitlab',

    #Data Science & ML
    'pandas', 'numpy', 'tensorflow', 'pytorch', 'scikit learn', 'apache spark', 'hadoop'
]

def generate_skills_from_eda(words_freq, phrases_freq, master_list):
    """Return the master-list skills that appear among the EDA terms.

    Args:
        words_freq: List of ``(word, count)`` pairs from the unigram analysis.
        phrases_freq: List of ``(phrase, count)`` pairs from the bigram analysis.
        master_list: Candidate skill names to look for.

    Returns:
        The entries of ``master_list``, in their original order, whose text
        equals the first element of some pair in either input list.
    """
    # Collect every term seen in either analysis; the counts are not used.
    seen_terms = set()
    for entry in words_freq + phrases_freq:
        seen_terms.add(entry[0])

    matched = []
    for skill in master_list:
        if skill in seen_terms:
            matched.append(skill)
    return matched

# Both frequency lists come from the EDA cells; without them there is nothing
# to cross-reference against.
if 'words_freq' not in locals() or 'phrases_freq' not in locals():
    print("Error: EDA results ('words_freq', 'phrases_freq') not found. Please run the EDA step first")
    #As a fallback, use the full master list
    SKILL_KEYWORDS = MASTER_SKILL_LIST
else:
    SKILL_KEYWORDS = generate_skills_from_eda(words_freq, phrases_freq, MASTER_SKILL_LIST)
    print(f"Found {len(SKILL_KEYWORDS)} relevant skills in the dataset based on the EDA")
    print(SKILL_KEYWORDS)

Assigning labels for `experience_level`, `job_type`, and `skill_tags`

In [None]:
def _mentions_any(desc, keywords):
    """Return True if ``desc`` contains any keyword as a whole word or phrase.

    A plain substring test produces false hits ('ui' in "build", 'api' in
    "rapid", 'java' in "javascript"), so a keyword only counts when it is not
    flanked by word characters. Look-arounds are used instead of the regex
    word-boundary escape because that escape cannot anchor after a symbol,
    which would break keywords such as 'c++' or '10+'.

    Non-string input (e.g. NaN read back from an empty CSV cell) never matches.
    """
    if not isinstance(desc, str):
        return False
    return any(
        re.search(r'(?<!\w)' + re.escape(keyword) + r'(?!\w)', desc)
        for keyword in keywords
    )


def get_experience_level(desc):
    """Classify a description into a seniority bucket.

    Rules are checked from most to least senior and the first match wins.
    Keywords containing '+' only match when the text still contains that
    character.

    Args:
        desc: Lowercased description text. Non-string values are accepted.

    Returns:
        One of 'Lead/Principal', 'Senior', 'Mid-Level', 'Entry-Level' or
        'Not Specified'.
    """
    if _mentions_any(desc, ['principal', 'lead', 'staff', '10+', '8+ years']): return 'Lead/Principal'
    if _mentions_any(desc, ['senior', 'sr', '5+ years', '6+ years']): return 'Senior'
    if _mentions_any(desc, ['mid level', '2+ years', '3+ years']): return 'Mid-Level'
    if _mentions_any(desc, ['junior', 'jr', 'entry level', 'graduate']): return 'Entry-Level'
    return 'Not Specified'

def get_job_type(desc):
    """Classify a description into a role family.

    Rules are checked in the order listed and the first match wins.

    Args:
        desc: Lowercased description text. Non-string values are accepted.

    Returns:
        One of 'Full-Stack', 'Backend', 'Frontend', 'Mobile', 'DevOps',
        'Data' or 'Not Specified'.
    """
    if _mentions_any(desc, ['full stack', 'fullstack']): return 'Full-Stack'
    # 'apis' is listed explicitly because whole-word matching on 'api' no
    # longer covers the plural.
    if _mentions_any(desc, ['backend', 'back end', 'api', 'apis']): return 'Backend'
    if _mentions_any(desc, ['frontend', 'front end', 'ui', 'ux']): return 'Frontend'
    if _mentions_any(desc, ['mobile', 'ios', 'android']): return 'Mobile'
    if _mentions_any(desc, ['devops', 'sre', 'infrastructure']): return 'DevOps'
    if _mentions_any(desc, ['data engineer']): return 'Data'
    return 'Not Specified'

def extract_skill_tags(desc, skills=None):
    """List the skills mentioned in a description.

    Args:
        desc: Lowercased description text. Non-string values are accepted.
        skills: Skill names to look for. Defaults to the notebook-level
            ``SKILL_KEYWORDS`` list.

    Returns:
        The matched skills joined by ', ' in the order they appear in
        ``skills``, or 'No Tags Found' when nothing matches.
    """
    if skills is None:
        skills = SKILL_KEYWORDS
    found = [skill for skill in skills if _mentions_any(desc, [skill])]
    return ', '.join(found) if found else 'No Tags Found'

In [None]:
try:
    # A description that was cleaned down to an empty string is read back from
    # the CSV as NaN. Drop those rows, as the EDA load cell does, so the
    # annotation functions only ever receive text.
    df_annotate = pd.read_csv('cleaned_data.csv').dropna(subset=['cleaned_description'])
    print(f"Applying annotation rules to {len(df_annotate)} cleaned records")

    df_annotate['experience_level'] = df_annotate['cleaned_description'].apply(get_experience_level)
    df_annotate['job_type'] = df_annotate['cleaned_description'].apply(get_job_type)
    df_annotate['skill_tags'] = df_annotate['cleaned_description'].apply(extract_skill_tags)

    final_columns = ['job_title', 'company_name', 'experience_level', 'job_type', 'skill_tags', 'cleaned_description']
    annotated_df = df_annotate[final_columns].head(20)
    # Report the real size: head(20) returns fewer rows when the dataset is smaller.
    print(f"Created a sample of {len(annotated_df)} annotated records")

    #Saving to CSV format
    annotated_df.to_csv('annotated_data.csv', index=False)
    print("Successfully saved annotated sample to annotated_data.csv")

    #Saving to JSON format
    annotated_df.to_json('annotated_data.json', orient='records', indent=4)
    print("Successfully saved annotated sample to annotated_data.json")

except FileNotFoundError:
    print("Error: cleaned_data.csv not found.")

In [None]:
# annotated_df is assigned inside the annotation cell's try block, so it may
# not exist if cleaned_data.csv was missing; check before displaying.
if 'annotated_df' in locals():
    print("Annotated data:")
    display(annotated_df)