# Remote Work Tracker BI Project: ETL, DB, and Utility Scripts

This Jupyter Notebook provides a comprehensive overview and demonstration of the Extract, Transform, Load (ETL) process, database interaction, and utility functions developed for the Remote Work Tracker Business Intelligence (BI) project. These scripts are designed to process raw job data (e.g., from the Remotive.com API) and store it in a structured SQLite database, making it ready for further analysis and visualization in tools like Power BI.

## Project Components

The core components covered in this notebook are:
1.  **`db_schema_and_etl_design.md`**: Documentation outlining the database schema and ETL process.
2.  **`db_connector.py`**: Python script for connecting to and interacting with the SQLite database.
3.  **`etl_script.py`**: Python script implementing the Extract, Transform, Load logic.
4.  **`utils.py`**: Python script containing general utility functions, such as logging.

## 1. Database Schema Design

The foundation of our BI project is a well-defined database schema. We are using a simple SQLite database with a single table, `remote_jobs`, to store the processed job data. The schema is designed to capture all relevant information from the job postings.

### `remote_jobs` Table Structure


| Column Name                   | Data Type   | Constraints       | Description                                      |
| :---------------------------- | :---------- | :---------------- | :----------------------------------------------- |
| `id`                          | INTEGER     | PRIMARY KEY       | Unique identifier for each job posting (from API)|
| `job_title`                   | TEXT        | NOT NULL          | Title of the job                                 |
| `company_name`                | TEXT        | NOT NULL          | Name of the hiring company                       |
| `company_name`                | TEXT        | NOT NULL          | Name of the hiring company                       |
| `publication_date`            | TEXT        | NOT NULL          | Date and time the job was published (ISO format) |
| `job_type`                    | TEXT        |                   | Type of employment (e.g., full_time, contract)   |
| `category`                    | TEXT        |                   | Job category (e.g., Software Development)        |
| `candidate_required_location` | TEXT        |                   | Geographical restrictions for candidates         |
| `salary_range`                | TEXT        |                   | Stated salary range, if available                |
| `job_description`             | TEXT        |                   | Full description of the job                      |
| `source_url`                  | TEXT        | UNIQUE, NOT NULL  | URL to the original job posting                  |
| `company_logo`                | TEXT        |                   | URL to the company logo                          |
| `job_board`                   | TEXT        | NOT NULL          | Source job board (e.g., Remotive.com)            |
| `ingestion_timestamp`         | TIMESTAMP   | DEFAULT CURRENT_TIMESTAMP | Timestamp when the record was ingested   |

*Note: The `db_schema_and_etl_design.md` file contains a more detailed explanation of the schema and ETL process.*

## 2. Database Connector (`db_connector.py`)

The `db_connector.py` script provides a `DBConnector` class to manage interactions with the SQLite database. It handles connecting, disconnecting, creating the `remote_jobs` table, and inserting job data from a Pandas DataFrame.

### Key Features:

- **Connection Management**: `connect()` and `disconnect()` methods for robust database handling.
- **Schema Initialization**: `create_table()` ensures the `remote_jobs` table exists with the defined schema.
- **Data Insertion**: `insert_jobs(df)` efficiently inserts DataFrame records, using `INSERT OR IGNORE` to prevent duplicate entries based on the `id` column (which is the primary key from the API). This is crucial for handling daily scrapes without re-inserting old data.
- **Data Retrieval**: `fetch_all_jobs()` allows for easy retrieval of all stored job data into a Pandas DataFrame for analysis.

### Code (`db_connector.py`):

In [6]:
import sqlite3
import pandas as pd
from datetime import datetime

class DBConnector:
    def __init__(self, db_name="remote_jobs.db"):
        self.db_name = db_name
        self.conn = None
        self.cursor = None

    def connect(self):
        """Establishes a connection to the SQLite database."""
        try:
            self.conn = sqlite3.connect(self.db_name)
            self.cursor = self.conn.cursor()
            print(f"Connected to database: {self.db_name}")
        except sqlite3.Error as e:
            print(f"Error connecting to database: {e}")

    def disconnect(self):
        """Closes the database connection."""
        if self.conn:
            self.conn.close()
            print("Disconnected from database.")

    def create_table(self):
        """Creates the remote_jobs table if it doesn't exist."""
        if not self.conn:
            self.connect()
        
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS remote_jobs (
            id INTEGER PRIMARY KEY,
            job_title TEXT NOT NULL,
            company_name TEXT NOT NULL,
            publication_date TEXT NOT NULL,
            job_type TEXT,
            category TEXT,
            candidate_required_location TEXT,
            salary_range TEXT,
            job_description TEXT,
            source_url TEXT UNIQUE NOT NULL,
            company_logo TEXT,
            job_board TEXT NOT NULL,
            ingestion_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """
        try:
            self.cursor.execute(create_table_sql)
            self.conn.commit()
            print("Table 'remote_jobs' ensured to exist.")
        except sqlite3.Error as e:
            print(f"Error creating table: {e}")

    def insert_jobs(self, df: pd.DataFrame):
        """Inserts job data from a Pandas DataFrame into the remote_jobs table.
           Handles duplicates by ignoring entries with existing source_url.
        """
        if df.empty:
            print("No data to insert.")
            return

        if not self.conn:
            self.connect()
            self.create_table() # Ensure table exists before inserting

        # Prepare data for insertion
        # Convert DataFrame rows to a list of tuples, ensuring order matches SQL INSERT statement
        # And handle potential None values for columns that can be null
        data_to_insert = []
        for index, row in df.iterrows():
            data_to_insert.append((
                row.get("id"),
                row.get("job_title"),
                row.get("company_name"),
                row.get("publication_date"),
                row.get("job_type"),
                row.get("category"),
                row.get("candidate_required_location"),
                row.get("salary_range"),
                row.get("job_description"),
                row.get("source_url"),
                row.get("company_logo"),
                row.get("job_board"),
                row.get("ingestion_timestamp", datetime.now().isoformat()) # Default if not set by ETL
            ))

        insert_sql = """
        INSERT OR IGNORE INTO remote_jobs (
            id, job_title, company_name, publication_date, job_type, category,
            candidate_required_location, salary_range, job_description, source_url,
            company_logo, job_board, ingestion_timestamp
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
        """
        try:
            self.cursor.executemany(insert_sql, data_to_insert)
            self.conn.commit()
            print(f"Successfully inserted/ignored {len(data_to_insert)} records into 'remote_jobs'.")
        except sqlite3.Error as e:
            print(f"Error inserting data: {e}")

    def fetch_all_jobs(self):
        """Fetches all job records from the database."""
        if not self.conn:
            self.connect()

        try:
            self.cursor.execute("SELECT * FROM remote_jobs;")
            columns = [description[0] for description in self.cursor.description]
            rows = self.cursor.fetchall()
            df = pd.DataFrame(rows, columns=columns)
            return df
        except sqlite3.Error as e:
            print(f"Error fetching data: {e}")
            return pd.DataFrame()

if __name__ == "__main__":
    db = DBConnector()
    db.connect()
    db.create_table()

    # Example of inserting dummy data
    dummy_data = pd.DataFrame([
        {
            "id": 1,
            "job_title": "Software Engineer",
            "company_name": "Tech Corp",
            "publication_date": "2025-10-15T10:00:00",
            "job_type": "full_time",
            "category": "Software Development",
            "candidate_required_location": "Worldwide",
            "salary_range": "$80,000 - $120,000",
            "job_description": "Develop and maintain software.",
            "source_url": "http://example.com/job1",
            "company_logo": "http://example.com/logo1.png",
            "job_board": "ExampleJobs",
            "ingestion_timestamp": datetime.now().isoformat()
        },
        {
            "id": 2,
            "job_title": "Data Analyst",
            "company_name": "Data Insights Inc.",
            "publication_date": "2025-10-14T11:30:00",
            "job_type": "contract",
            "category": "Data Analysis",
            "candidate_required_location": "Remote US",
            "salary_range": "$50/hr - $70/hr",
            "job_description": "Analyze large datasets.",
            "source_url": "http://example.com/job2",
            "company_logo": "http://example.com/logo2.png",
            "job_board": "ExampleJobs",
            "ingestion_timestamp": datetime.now().isoformat()
        }
    ])
    db.insert_jobs(dummy_data)

    # Fetch and display data
    all_jobs = db.fetch_all_jobs()
    print("\nAll jobs in DB:")
    print(all_jobs.head())

    db.disconnect()

       

Connected to database: remote_jobs.db
Table 'remote_jobs' ensured to exist.
Successfully inserted/ignored 2 records into 'remote_jobs'.

All jobs in DB:
   id          job_title        company_name     publication_date   job_type  \
0   1  Software Engineer           Tech Corp  2025-10-15T10:00:00  full_time   
1   2       Data Analyst  Data Insights Inc.  2025-10-14T11:30:00   contract   

               category candidate_required_location        salary_range  \
0  Software Development                   Worldwide  $80,000 - $120,000   
1         Data Analysis                   Remote US     $50/hr - $70/hr   

                  job_description               source_url  \
0  Develop and maintain software.  http://example.com/job1   
1         Analyze large datasets.  http://example.com/job2   

                   company_logo    job_board         ingestion_timestamp  
0  http://example.com/logo1.png  ExampleJobs  2025-10-18T19:41:30.210673  
1  http://example.com/logo2.png  ExampleJob

## 3. ETL Script (`etl_script.py`)

The `etl_script.py` orchestrates the data pipeline, performing the Extract, Transform, and Load operations. It reads raw data, cleans and standardizes it, and then uses the `DBConnector` to persist it in the database.

### Key Functions:

- **`extract_data(file_path)`**: Reads the raw job data from a specified CSV file into a Pandas DataFrame. This acts as the "Extract" stage.
- **`transform_data(df)`**: Cleans and transforms the DataFrame. This includes renaming columns to match the database schema, handling missing values (replacing `NaN` with `None`), converting `publication_date` to a consistent ISO format, and adding an `ingestion_timestamp`. This is the "Transform" stage.
- **`load_data(df, db_connector)`**: Takes the transformed DataFrame and an instance of `DBConnector` to insert the data into the `remote_jobs` table. This is the "Load" stage.

### Code (`etl_script.py`):

In [7]:
import pandas as pd
import numpy as np
from datetime import datetime

def extract_data(file_path: str) -> pd.DataFrame:
    """Extracts raw job data from a CSV file into a Pandas DataFrame."""
    try:
     df = pd.read_csv(r'd:\courses\Data Science\Projects\Python\remote-work-tracker\data\raw\remotive_jobs_extended.csv')
     print(f"Successfully extracted {len(df)} records from {file_path}")
     return df
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return pd.DataFrame()
    except Exception as e:
        print(f"Error extracting data: {e}")
        return pd.DataFrame()              

In [8]:
def transform_data(df):
    """Transforms and cleans the raw job data DataFrame."""
    if df.empty:
        print("No data to transform.")
        return df

    # Rename columns to match database schema
    df = df.rename(columns={
        "Job ID": "id",
        "Job Title": "job_title",
        "Company Name": "company_name",
        "Publication Date": "publication_date",
        "Job Type": "job_type",
        "Category": "category",
        "Candidate Required Location": "candidate_required_location",
        "Salary Range": "salary_range",
        "Job Description": "job_description",
        "Source URL": "source_url",
        "Company Logo": "company_logo",
        "Job Board": "job_board"
    })

    # Handle missing values
    df = df.replace({np.nan: None})

    # Convert publication_date to ISO format string
    def parse_date(date_str):
        if pd.isna(date_str) or date_str is None:
            return None
        try:
             dt_obj = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
             return dt_obj.isoformat()
        except ValueError:
                return None
    df['publication_date'] = df['publication_date'].apply(parse_date)

    # Add ingestion timestamp
    df['ingestion_timestamp'] = datetime.now().isoformat()

    # Ensure all required columns are present, fill with None if missing
    required_cols = [
        "id", "job_title", "company_name", "publication_date", "job_type",
        "category", "candidate_required_location", "salary_range",
        "job_description", "source_url", "company_logo", "job_board",
        "ingestion_timestamp"
    ]
    for col in required_cols:
        if col not in df.columns:
            df[col] = None
    # Select and reorder columns to match the database schema
    df = df[required_cols]

    print("Data transformation complete.")
    return df

In [9]:
def load_data(df, db_connector):
    """Loads the transformed DataFrame into the database using DBConnector."""
    
    if df.empty:
        print("No data to load.")
        return

    print(f"Loading {len(df)} records into the database...")
    db_connector.insert_jobs(df)
    print("Data loading complete.")

if __name__ == "__main__":
    print("Running ETL script in standalone mode.")
    csv_file = "remotive_jobs_extended.csv"
    extracted_df = extract_data(csv_file)
    transformed_df = transform_data(extracted_df)
    if not transformed_df.empty:
        db = DBConnector()
        db.connect()
        db.create_table()
        load_data(transformed_df, db)
        db.disconnect()
    else:
        print("ETL process completed with no data to load.")    

Running ETL script in standalone mode.
Successfully extracted 1559 records from remotive_jobs_extended.csv
Data transformation complete.
Connected to database: remote_jobs.db
Table 'remote_jobs' ensured to exist.
Loading 1559 records into the database...
Successfully inserted/ignored 1559 records into 'remote_jobs'.
Data loading complete.
Disconnected from database.


## 4. Utility Script (`utils.py`)

The `utils.py` script contains general-purpose functions that can be reused across different parts of the project. For this BI project, it includes a logging setup and a function to get the current timestamp.

### Key Functions:

- **`setup_logging(log_file, level)`**: Configures Python's `logging` module to output messages to both a file and the console. This is essential for monitoring script execution, debugging, and tracking data pipeline events.
- **`get_current_timestamp()`**: Returns the current UTC timestamp in ISO format, useful for consistent time-stamping of data or log entries.

### Code (`utils.py`):

In [10]:
import logging
from datetime import datetime

def setup_logging(log_file='app.log', level=logging.INFO):
    """Sets up logging configuration."""
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
    logging.info("Logging is set up.")

def get_current_timestamp():
    """Returns the current UTC timestamp in ISO format."""
    return datetime.utcnow().isoformat()    

if __name__ ==  "__main__":
    setup_logging()
    logging.info("This is a test log message.")
    logging.debug("This is a debug message.")
    logging.warning("This is a warning message.")
    logging.error("This is an error message.")
    print(f"Current UTC Timestamp: {get_current_timestamp()}")

2025-10-18 19:41:31,180 - INFO - Logging is set up.
2025-10-18 19:41:31,181 - INFO - This is a test log message.
2025-10-18 19:41:31,182 - ERROR - This is an error message.


Current UTC Timestamp: 2025-10-18T17:41:31.184071


  return datetime.utcnow().isoformat()


## 5. End-to-End Demonstration

This section demonstrates how all the components work together. We will:
1.  Initialize the `DBConnector` and ensure the table is created.
2.  Run the ETL process using the `remotive_jobs_extended.csv` file.
3.  Fetch and display a sample of the data directly from the database.

> Note: Ensure `remotive_jobs_extended.csv` is present in the same directory as this notebook, or update the `csv_file` path accordingly  

In [None]:
# 1. Setup Logging
from utils import setup_logging
setup_logging(log_file='etl_remotive.log', level=logging.INFO)
logging.info("Starting end-to-end ETL process demonstration.")

# 2. Initialize DB Connector and Create Table
from db_connector import DBConnector
db = DBConnector()
db.connect()
db.create_table()

# 3. Run ETL Process
from etl_script import extract_data, transform_data, load_data
csv_file = "remotive_jobs_extended.csv"
extracted_df = extract_data(csv_file)
transformed_df = transform_data(extracted_df)

if not transformed_df.empty:
    load_data(transformed_df, db)
    logging.info("ETL process completed and data loaded into database.")
else:
        logging.warning("ETL process completed with no data to load.")

# 4. Fetch and Display Data from DB
print("Fetching data from database...")
all_jobs_df = db.fetch_all_jobs()
if not all_jobs_from_db.empty:
    display(Markdown(f"### Sample of Data from Database ({len(all_jobs_from_db)} records))"))
    display(all_jobs_from_db.head())
else:
    display(Markdown("### No Data Found in Database"))

# 5. Disconnect from DB
db.disconnect()
logging.info("End-to-end ETL process demonstration finished")