## Job Scraping

In [8]:
import csv
from jobspy import scrape_jobs

df = scrape_jobs(
    site_name=["indeed"],
    search_term='"data engineer"',
    google_search_term="",
    location="France",
    results_wanted=300,
    #hours_old=168, #max week
    country_indeed='France',
)
print(f"Found {len(df)} jobs")

2025-01-28 17:47:20,314 - INFO - JobSpy:Indeed - search page: 1 / 3
2025-01-28 17:47:22,536 - INFO - JobSpy:Indeed - search page: 2 / 3
2025-01-28 17:47:24,228 - INFO - JobSpy:Indeed - search page: 3 / 3
2025-01-28 17:47:25,885 - INFO - JobSpy:Indeed - finished scraping


Found 300 jobs


## Data Transformation 

In [9]:
import pandas as pd
import re
import numpy as np

In [10]:
columns_to_keep = ['id', 'job_url','title', 'company',
       'location', 'date_posted', 'job_type', 'min_amount', 'max_amount','is_remote', 'job_level','description',
       'company_industry','company_num_employees']
df = df[columns_to_keep]

In [11]:
skills_keywords = [
    # Front-End (optional but useful for full-stack roles)
    "Vue", "React", "Angular", "HTML", "CSS", "JavaScript",

    # APIs
    "GraphQL", "Rest", "gRPC",

    # Back-End Frameworks
    "Django", "Flask", "FastAPI",

    # Programming Languages
    "Python", "Java", "Scala", "Go", "Rust",

    # Query and Scripting Languages
    "SQL", "NoSQL", "Bash", "Shell scripting",

    # Containerization and Orchestration
    "Docker", "Kubernetes", "Podman",

    # Cloud Platforms
    "AWS", "Azure", "Google Cloud Platform (GCP)", "IBM Cloud",

    # Infrastructure and Automation
    "CI/CD", "Terraform", "Ansible", "Jenkins", "GitHub Actions",

    # Databases
    "PostgreSQL", "MySQL", "MongoDB", "Cassandra", "Oracle DB", "Snowflake", "Redshift", "BigQuery", 

    # ETL and Workflow Orchestration
    "Airflow", "Luigi", "Prefect", "DBT (Data Build Tool)",

    # Messaging Systems
    "Kafka", "RabbitMQ", "Amazon SQS", "Google Pub/Sub",

    # Business Intelligence Tools
    "Tableau", "PowerBI", "Looker", "Metabase", 

    # Big Data Frameworks
    "Spark", "Hadoop", "Hive", "HBase", "Flink", "Storm",

    # Machine Learning Frameworks (optional for ML-related roles)
    "Tensorflow", "Keras", "PyTorch", "Scikit-learn", 

    # Data Manipulation and Analysis
    "Numpy", "Pandas", "Dask", "PyArrow", "Koalas",

    # Real-Time Data Processing
    "Flink", "Storm", "Beam",

    # DevOps Tools
    "Prometheus", "Grafana", "Datadog", "ELK Stack (Elasticsearch, Logstash, Kibana)",

    # Data Storage and File Formats
    "Parquet", "Avro", "ORC", "JSON", "XML", "CSV",

    # Version Control
    "GitLab", "GitHub", "Bitbucket",

    # Security and Authentication
    "OAuth", "SAML", "JWT", "IAM (Identity and Access Management)",
]

In [12]:
def extract_skills(description):
    description_cleaned = re.sub(r"[^\w\s]", " ", description)
    description_cleaned = description_cleaned.lower()
    matched_skills = [skill for skill in skills_keywords if skill.lower() in description_cleaned]
    return ", ".join(matched_skills)

def add_skills_column(df):
    if "description" not in df.columns:        raise ValueError("The DataFrame must contain a 'description' column.")
    df["skills"] = df["description"].apply(extract_skills)
    df = df.drop(columns=["description"])
    return df

def additional_transformations(df):
    df = df.where(pd.notnull(df), None)
    df = df.dropna(axis=1, how="all")
    df = df.replace({np.nan: None})
    return df


In [13]:
df = add_skills_column(df)

In [14]:
df = additional_transformations(df)

In [15]:
df.loc[:10]

Unnamed: 0,id,job_url,title,company,location,date_posted,job_type,min_amount,max_amount,is_remote,company_industry,company_num_employees,skills
0,in-162892ab33810bce,https://fr.indeed.com/viewjob?jk=162892ab33810bce,Data scientist (H/F),Euro Information,"Nantes, B5, FR",2025-01-28,,,,False,,"1,001 to 5,000","Rest, Python, Go, ORC"
1,in-81d483184fa8551d,https://fr.indeed.com/viewjob?jk=81d483184fa8551d,DevSecOps Expérimenté F/H,Askills,"Lyon, ARA, FR",2025-01-28,fulltime,,,False,,11 to 50,"Rest, AWS, Terraform, Prometheus, Grafana, GitLab"
2,in-13dea2e7321cdb7d,https://fr.indeed.com/viewjob?jk=13dea2e7321cdb7d,Data Engineer H/F,DECIDEOM,"Nantes, B5, FR",2025-01-28,fulltime,,,True,,51 to 200,"Rest, SQL"
3,in-7c9c9533ffc75852,https://fr.indeed.com/viewjob?jk=7c9c9533ffc75852,Data scientist (H/F),Crédit Mutuel Alliance Fédérale,"Nantes, B5, FR",2025-01-28,,,,False,,"10,000+","Rest, Python, Go, ORC"
4,in-4c5da9201beede67,https://fr.indeed.com/viewjob?jk=4c5da9201beede67,Data Engineer Expérimenté (H/F),NEXTON,"Paris, A8, FR",2025-01-28,fulltime,,,False,,201 to 500,"Rest, Python, Scala, BigQuery, Airflow"
5,in-f8d7b24b49b15278,https://fr.indeed.com/viewjob?jk=f8d7b24b49b15278,Data Engineer H/F,NINDO GROUP,"Paris, A8, FR",2025-01-28,fulltime,,,False,,,"Python, Java, Scala, SQL, NoSQL, Kafka, Tablea..."
6,in-e29e4a179925353c,https://fr.indeed.com/viewjob?jk=e29e4a179925353c,Data Engineer Manager (H/F),NEXTON,"Paris, A8, FR",2025-01-28,fulltime,,,False,,201 to 500,"Rest, Python, Scala, Go, BigQuery, Airflow, Nu..."
7,in-dd39b29aa8ad1b16,https://fr.indeed.com/viewjob?jk=dd39b29aa8ad1b16,Tech Talent Acquisition Confirmé H/F,PADOA,"Paris, A8, FR",2025-01-27,,,,False,,11 to 50,"Go, ORC"
8,in-dc7420eadbacb83f,https://fr.indeed.com/viewjob?jk=dc7420eadbacb83f,STAGE – Data Engineer -F/H,Thales,"Toulouse, O, FR",2025-01-27,fulltime,,,False,,"10,000+","Python, Azure"
9,in-a5983107170ad5c1,https://fr.indeed.com/viewjob?jk=a5983107170ad5c1,Data engineer (H/F),Anderlaine,"La Motte-Servolex, ARA, FR",2025-01-27,,,,False,,"501 to 1,000","Rest, Python, Go, SQL, NoSQL, Azure"


## Send Job Messages to Kafka

In [16]:
from kafka import KafkaProducer
import json
import datetime
import pandas as pd

In [17]:
# Custom serializer to handle dates
def custom_serializer(obj):
    if isinstance(obj, (datetime.date, datetime.datetime)):
        return obj.isoformat()  # Converts date to 'YYYY-MM-DD' or 'YYYY-MM-DDTHH:MM:SS' format
    raise TypeError(f"Type {type(obj)} not serializable")
    

# Kafka Producer Configuration
producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    value_serializer=lambda x: json.dumps(x, default=custom_serializer).encode('utf-8')
)

# Send Job Data to Kafka
for _, job in df.iterrows():
    job_dict = job.to_dict()
    producer.send('data_engineer_jobs', value=job_dict) 
    #print('Sent : ', job['id'])

producer.flush()
producer.close()

print("Messages sent to Kafka topic 'data_engineer_jobs'")

Messages sent to Kafka topic 'data_engineer_jobs'


## Create Jobs Table in PostgreSQL

In [12]:
import psycopg2

# Connect to PostgreSQL
conn = psycopg2.connect(
    host='postgres',          
    user='root',               
    password='root',          
    database='data_engineer_jobs',  
    port=5432                  
)
cur = conn.cursor()

drop_table_query = """
DROP TABLE IF EXISTS jobs;
"""
cur.execute(drop_table_query)

create_table_query = """
CREATE TABLE jobs (
    id VARCHAR(50) PRIMARY KEY,          -- Job ID, unique identifier
    job_url VARCHAR(255),       -- URL of the job posting
    title VARCHAR(255),         -- Job title
    company VARCHAR(255),       -- Company name
    location VARCHAR(255),      -- Job location
    date_posted DATE,                    -- The date the job was posted
    job_type VARCHAR(50),                -- Job type (e.g., fulltime, part-time)
    min_amount DECIMAL(10, 2),           -- minimum salary amount
    max_amount DECIMAL(10, 2),           -- maximum salary amount
    is_remote BOOLEAN,                   -- Whether the job is remote
    company_industry VARCHAR(255),       -- Industry of the company
    company_num_employees VARCHAR(50),   -- Number of employees in the company
    skills VARCHAR(500)                  -- Skills required for the job
);
"""
cur.execute(create_table_query)

conn.commit()
cur.close()
conn.close()

print("Table 'jobs' deleted and recreated successfully!")


Table 'jobs' deleted and recreated successfully!


In [2]:
# Test Table is created 
conn = psycopg2.connect(
    host='postgres',          
    user='root',             
    password='root',         
    database='data_engineer_jobs',  
    port=5432                 
)
cur = conn.cursor()

select_query = """
SELECT * FROM jobs;
"""
cur.execute(select_query)
rows = cur.fetchall()

column_names = [desc[0] for desc in cur.description]
print("Column Names:", column_names)
print("\nTable Content:")
print(len(rows))

# Close the cursor and connection
cur.close()
conn.close()


Column Names: ['id', 'job_url', 'title', 'company', 'location', 'date_posted', 'job_type', 'min_amount', 'max_amount', 'is_remote', 'company_industry', 'company_num_employees', 'skills']

Table Content:
347


## Kafka Consumers and Data Storage

In [18]:
from kafka import KafkaConsumer
import psycopg2
import json


# Kafka Consumer Configuration
consumer = KafkaConsumer(
    'data_engineer_jobs', 
    bootstrap_servers='kafka:9092',   
    auto_offset_reset='earliest', 
    enable_auto_commit=True, 
    group_id='data_jobs_consumer_group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')) 
)

conn = psycopg2.connect(
    host='postgres',      
    user='root',          
    password='root',     
    database='data_engineer_jobs',
    port=5432                   #
)
cur = conn.cursor()

def insert_job_to_db(job):
    insert_query = """
    INSERT INTO jobs (id, job_url, title, company, location, date_posted, job_type, min_amount, max_amount, is_remote, 
                      company_industry, company_num_employees, skills)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (id) DO NOTHING;  -- Avoid duplicate entries
    """
    cur.execute(insert_query, (
        job['id'], 
        job['job_url'], 
        job['title'], 
        job['company'], 
        job['location'], 
        job['date_posted'], 
        job['job_type'], 
        job['min_amount'], 
        job['max_amount'], 
        job['is_remote'], 
        job.get('company_industry'), 
        job.get('company_num_employees'), 
        job['skills']
    ))
    conn.commit()

# Kafka Consumer to process messages
print("Kafka Consumer is listening for messages...")
for message in consumer:
    job_data = message.value  
    try:
        insert_job_to_db(job_data)
        print(f"Inserted job with ID: {job_data['id']} into the database.")
    except Exception as e:
        print(f"Failed to insert job with ID: {job_data['id']} into the database. Error: {e}")

# Close PostgreSQL connection
cur.close()
conn.close()
print("Consumer stopped and PostgreSQL connection closed.")


Kafka Consumer is listening for messages...
Inserted job with ID: in-162892ab33810bce into the database.
Inserted job with ID: in-81d483184fa8551d into the database.
Inserted job with ID: in-13dea2e7321cdb7d into the database.
Inserted job with ID: in-7c9c9533ffc75852 into the database.
Inserted job with ID: in-4c5da9201beede67 into the database.
Inserted job with ID: in-f8d7b24b49b15278 into the database.
Inserted job with ID: in-e29e4a179925353c into the database.
Inserted job with ID: in-dd39b29aa8ad1b16 into the database.
Inserted job with ID: in-dc7420eadbacb83f into the database.
Inserted job with ID: in-a5983107170ad5c1 into the database.
Inserted job with ID: in-cf9cc6b2fd0278a3 into the database.
Inserted job with ID: in-9910f73de1094272 into the database.
Inserted job with ID: in-5ba7647e0ebb2c7d into the database.
Inserted job with ID: in-53609b164c735348 into the database.
Inserted job with ID: in-e2f1f125f1c06936 into the database.
Inserted job with ID: in-e04068388f504ddb

KeyboardInterrupt: 