In [None]:
import os 

In [None]:
os.getcwd()

In [None]:
# Step one directory up so that the relative paths used further down
# (e.g. "src/ats/config/raw/config.yaml") resolve against the new working directory.
# NOTE(review): presumably the notebook lives one level below the project root — confirm.
os.chdir("..")

In [None]:
os.getcwd()

# config

### raw/config.yaml

# constants

### schema

In [None]:
# update __all__ 

from pydantic import BaseModel, Field 


class Constants:
    # Empty marker type. It is used as the return annotation of the constant
    # builder functions (`__ing__`, `__transform__`) and exported via __all__.
    # NOTE(review): the concrete *Constants models below derive from BaseModel,
    # not from this class, so the annotation is nominal only.
    ...

class DataIngestionConstants(BaseModel):
    # Directory names for the data-ingestion stage. They are path components:
    # the config builder joins them into the DataIngestion entity's paths.
    # Every field is a required str; Field(frozen=True) is pydantic's per-field
    # flag that rejects re-assignment after the model has been constructed.

    # project root directory (CONFIG.ROOT_DIR)
    ROOT_DIR_NAME: str = Field(frozen=True) 
    # reports directory (CONFIG.REPORTS.ROOT_DIR)
    REPORTS_DIR_NAME: str = Field(frozen=True)
    # data root directory (CONFIG.DATA.ROOT_DIR)
    DATA_ROOT_DIR_NAME: str = Field(frozen=True) 
    # ingestion directory (CONFIG.DATA.INGESTION.ROOT_DIR)
    INGESTION_ROOT_DIR_NAME: str = Field(frozen=True) 
    # raw data directory (CONFIG.DATA.INGESTION.RAW_DATA_DIR)
    RAW_DATA_DIR_NAME: str = Field(frozen=True)
    # schema data directory (CONFIG.DATA.INGESTION.SCHEMA_DATA_DIR)
    SCHEMA_DATA_DIR_NAME: str = Field(frozen=True)

class DataTransformationConstants(BaseModel):
    # Directory names for the data-transformation stage. They are path
    # components: the config builder joins them into the DataTransformation
    # entity's paths.
    # Every field is a required str; Field(frozen=True) is pydantic's per-field
    # flag that rejects re-assignment after the model has been constructed.

    # project root directory (CONFIG.ROOT_DIR)
    ROOT_DIR_NAME: str = Field(frozen=True) 
    # data root directory (CONFIG.DATA.ROOT_DIR)
    DATA_ROOT_DIR_NAME: str = Field(frozen=True) 
    # transformation directory (CONFIG.DATA.TRANSFORMATION.ROOT_DIR)
    TRANSFORMATION_ROOT_DIR_NAME: str = Field(frozen=True) 
    # parsed data directory (CONFIG.DATA.TRANSFORMATION.PARSED_DATA_DIR)
    PARSED_DATA_DIR_NAME: str = Field(frozen=True)
    # structured data directory (CONFIG.DATA.TRANSFORMATION.STRUCTURED_DATA_DIR)
    STRUCTURED_DATA_DIR_NAME: str = Field(frozen=True)
    # training data directory (CONFIG.STRUCTURED_TRAINING.ROOT_DIR)
    TRAIN_DATA_DIR_NAME: str = Field(frozen=True)


__all__ = ["DataIngestionConstants", "DataTransformationConstants", "Constants", ]

### values

In [None]:
# 3 things need to be updated together: [avl_cons, process, "Note: Available names"] 
# The note lives in the docstring of function 'load' in this file and of 'load_constants' inside __init__.py 
# update __all__ 

# from src.ats.constants.schema import *
from src.ats.exception import CustomException 
from typing import List, Tuple, Dict
from box import ConfigBox
import sys 


def __ing__(CONFIG:ConfigBox) -> Constants:
    """Build the data-ingestion constants from the loaded configuration.

    Args:
        CONFIG (ConfigBox): configuration exposing ROOT_DIR, REPORTS.ROOT_DIR,
            DATA.ROOT_DIR and the DATA.INGESTION section.

    Returns:
        Constants: a `DataIngestionConstants` instance.
    """
    root_dir = CONFIG.ROOT_DIR
    reports_dir = CONFIG.REPORTS.ROOT_DIR
    data_section = CONFIG.DATA
    data_root_dir = data_section.ROOT_DIR
    ingestion_section = data_section.INGESTION
    field_values = {
        "ROOT_DIR_NAME": root_dir,
        "REPORTS_DIR_NAME": reports_dir,
        "DATA_ROOT_DIR_NAME": data_root_dir,
        "INGESTION_ROOT_DIR_NAME": ingestion_section.ROOT_DIR,
        "RAW_DATA_DIR_NAME": ingestion_section.RAW_DATA_DIR,
        "SCHEMA_DATA_DIR_NAME": ingestion_section.SCHEMA_DATA_DIR,
    }
    return DataIngestionConstants(**field_values)

def __transform__(CONFIG:ConfigBox) -> Constants:
    """Build the data-transformation constants from the loaded configuration.

    Args:
        CONFIG (ConfigBox): configuration exposing ROOT_DIR, DATA.ROOT_DIR, the
            DATA.TRANSFORMATION section and STRUCTURED_TRAINING.ROOT_DIR.

    Returns:
        Constants: a `DataTransformationConstants` instance.
    """
    root_dir = CONFIG.ROOT_DIR
    data_section = CONFIG.DATA
    data_root_dir = data_section.ROOT_DIR
    transformation_section = data_section.TRANSFORMATION
    field_values = {
        "ROOT_DIR_NAME": root_dir,
        "DATA_ROOT_DIR_NAME": data_root_dir,
        "TRANSFORMATION_ROOT_DIR_NAME": transformation_section.ROOT_DIR,
        "PARSED_DATA_DIR_NAME": transformation_section.PARSED_DATA_DIR,
        "STRUCTURED_DATA_DIR_NAME": transformation_section.STRUCTURED_DATA_DIR,
        # read from the top-level STRUCTURED_TRAINING section, not from DATA
        "TRAIN_DATA_DIR_NAME": CONFIG.STRUCTURED_TRAINING.ROOT_DIR,
    }
    return DataTransformationConstants(**field_values)

# Names accepted by `load`, in lower case: `load` lower-cases and strips the
# requested name before checking it against this list.
avl_cons = [
    "dataingestion", 
    "datatransformation", 
]
# Dispatch table: accepted name -> builder function that receives the config
# and returns the matching constants object. Keep its keys in sync with avl_cons.
process = {
    "dataingestion":__ing__,
    "datatransformation":__transform__
} 

def load(config:ConfigBox, name: str | List[str] | Tuple[str]) -> Dict: 
    """Load the constants object registered for each requested name.

    Names are matched against `avl_cons` after stripping surrounding
    whitespace and lower-casing, so "DataIngestion" and " dataingestion "
    select the same builder. All names are validated before any builder runs.

    Args:
        config (ConfigBox): configuration handed to every builder
        name (str | List[str] | Tuple[str]): one name, or a list/tuple of names

        Note: Available names --> DataIngestion, DataTransformation,  

    Raises:
        CustomException: wraps every failure — an unsupported type for `name`,
            a name that is not in `avl_cons`, or an error raised by a builder.

    Returns:
        Dict: key = each name exactly as it was passed in `name`,

              value = the constants object built for that name,

              example:
              output = load(config, "DataIngestion")
              output = { "DataIngestion" : DataIngestionConstants(...) } 
              data_ingestion_constants = output["DataIngestion"] 
    """
    try:
        # validate type: a single name or a list/tuple of names
        if isinstance(name, str):
            reqs = [name]
        elif isinstance(name, (list, tuple)):
            reqs = list(name)
        else:
            raise ValueError(f"Unsupported type {type(name)} for variable \'name\'")

        # validate values; remember the builder found through the normalised
        # key while keeping the caller's spelling for the output dictionary
        selected = []
        for req in reqs:
            key = req.strip().lower()
            if key not in avl_cons:
                raise ValueError(f"Unknown value provided in variable \'name\', {req}, name can only have values from {avl_cons}")
            selected.append((req, process[key]))

        # run respective functions and return the output
        return {req: func(config) for req, func in selected}
    except Exception as e: 
        raise CustomException(e, sys) from e
    

__all__ = ["load"]

### __init__

In [None]:
# update required on 'Note: Available names' inside the docstring of function 'load_constants' 
# update __all__ if needed

# from src.ats.constants.values import *
from src.ats.utils import load_yaml
from typing import Dict 
import os 



# Loaded once, when this module is imported. The path is relative, so it is
# resolved against the current working directory of the process.
CONFIG = load_yaml(os.path.join("src", "ats", "config", "raw", "config.yaml")) 

def load_constants(name: str | list[str] | tuple[str]) -> Dict:
    """Return the constants for `name`, built from the module-level CONFIG.

    Thin convenience wrapper: forwards to `load(CONFIG, name)` so that callers
    do not have to load the YAML configuration themselves.

    Args:
        name (str | list[str] | tuple[str]): one requested name or several

        Note: Available names --> DataIngestion, DataTransformation,

    Returns:
        Dict: the dictionary produced by `load`; each key is a requested name
        and each value is the constants object built for it.

              example:
              output = load_constants("DataIngestion")
              data_ingestion_constants = output["DataIngestion"] 
    """
    constants_by_name = load(CONFIG, name)
    return constants_by_name


__all__ = ["load_constants", ]

# entity

In [None]:
# update __all__ 

from pydantic import BaseModel
from pathlib import Path 


class DataIngestion(BaseModel):
    # Resolved filesystem locations for the data-ingestion stage.
    # All fields are required pathlib.Path values; the config builder fills
    # them by joining the directory names from the ingestion constants.

    # project root
    ROOT_DIR_PATH: Path
    # <root>/<reports>
    REPORTS_DIR_PATH: Path
    # <root>/<data>
    DATA_ROOT_DIR_PATH: Path
    # <root>/<data>/<ingestion>
    INGESTION_ROOT_DIR_PATH: Path
    # <root>/<data>/<ingestion>/<raw>
    RAW_DATA_DIR_PATH: Path
    # <root>/<data>/<ingestion>/<schema>
    SCHEMA_DATA_DIR_PATH: Path

class DataTransformation(BaseModel):
    # Resolved filesystem locations for the data-transformation stage.
    # All fields are required pathlib.Path values; the config builder fills
    # them by joining the directory names from the transformation constants.

    # project root
    ROOT_DIR_PATH: Path
    # <root>/<data>
    DATA_ROOT_DIR_PATH: Path
    # <root>/<data>/<transformation>
    TRANSFORMATION_ROOT_DIR_PATH: Path
    # <root>/<data>/<transformation>/<parsed>
    PARSED_DATA_DIR_PATH: Path
    # <root>/<data>/<transformation>/<structured>
    STRUCTURED_DATA_DIR_PATH: Path
    # built from TRAIN_DATA_DIR_NAME alone (not prefixed with the project root)
    TRAIN_DATA_DIR_PATH: Path



__all__ = ["DataIngestion", "DataTransformation", ]

# config

### builder/__init__.py

In [None]:
# from src.ats.constants import load_constants 
# update __all__ also inside __init__ 

# from src.ats.constants import * 
# from src.ats.entity import *
from pathlib import Path 
import os 


# Constants for both stages are resolved once, when this module is imported.
constants = load_constants(["DataIngestion", "DataTransformation"])

# Shared prefixes of the ingestion paths, joined step by step.
_ing = constants["DataIngestion"]
_ing_data_root = os.path.join(_ing.ROOT_DIR_NAME, _ing.DATA_ROOT_DIR_NAME)
_ing_stage_root = os.path.join(_ing_data_root, _ing.INGESTION_ROOT_DIR_NAME)

# Ready-to-use path configuration of the data-ingestion stage.
DataIngestionConfig = DataIngestion(
    ROOT_DIR_PATH=Path(_ing.ROOT_DIR_NAME),
    REPORTS_DIR_PATH=Path(os.path.join(_ing.ROOT_DIR_NAME, _ing.REPORTS_DIR_NAME)),
    DATA_ROOT_DIR_PATH=Path(_ing_data_root),
    INGESTION_ROOT_DIR_PATH=Path(_ing_stage_root),
    RAW_DATA_DIR_PATH=Path(os.path.join(_ing_stage_root, _ing.RAW_DATA_DIR_NAME)),
    SCHEMA_DATA_DIR_PATH=Path(os.path.join(_ing_stage_root, _ing.SCHEMA_DATA_DIR_NAME)),
)

# Shared prefixes of the transformation paths, joined step by step.
_tr = constants["DataTransformation"]
_tr_data_root = os.path.join(_tr.ROOT_DIR_NAME, _tr.DATA_ROOT_DIR_NAME)
_tr_stage_root = os.path.join(_tr_data_root, _tr.TRANSFORMATION_ROOT_DIR_NAME)

# Ready-to-use path configuration of the data-transformation stage.
DataTransformationConfig = DataTransformation(
    ROOT_DIR_PATH=Path(_tr.ROOT_DIR_NAME),
    DATA_ROOT_DIR_PATH=Path(_tr_data_root),
    TRANSFORMATION_ROOT_DIR_PATH=Path(_tr_stage_root),
    PARSED_DATA_DIR_PATH=Path(os.path.join(_tr_stage_root, _tr.PARSED_DATA_DIR_NAME)),
    STRUCTURED_DATA_DIR_PATH=Path(os.path.join(_tr_stage_root, _tr.STRUCTURED_DATA_DIR_NAME)),
    # the training directory is a single component, not nested under the root
    TRAIN_DATA_DIR_PATH=Path(os.path.join(_tr.TRAIN_DATA_DIR_NAME)),
)

__all__ = ["DataIngestionConfig", "DataTransformationConfig", ]

# components

### schema/job_description.py

In [None]:
from pydantic import BaseModel, Field
from typing import Optional 


class JobDescription(BaseModel):
    # Target shape of a scraped job posting. Its JSON schema
    # (`model_json_schema()`) is what the job-description parser passes to
    # Firecrawl for structured extraction.
    # A `#` comment is used instead of a docstring on purpose: pydantic copies
    # a class docstring into the generated schema as its "description".
    job_title: str = Field(description="The job title")
    company_name: str = Field(description="The company name")
    location: str = Field(description="Job location")
    job_type: str = Field(description="Employment type (full-time, part-time, etc.)")
    experience_level: str = Field(description="Required experience level")
    job_description: str = Field(description="Complete job description text")
    requirements: str = Field(description="Job requirements and qualifications")
    responsibilities: str = Field(description="Key responsibilities")
    # Optional[...] only allows the value None; in pydantic v2 a field without
    # a default is still required. These two are "if available", so they get
    # an explicit default and drop out of the schema's "required" list.
    salary_range: Optional[str] = Field(default=None, description="Salary information if available")
    posted_date: Optional[str] = Field(default=None, description="When the job was posted")


__all__ = ["JobDescription", ]

In [None]:
# Notebook check: display the JSON schema pydantic generates for JobDescription.
JobDescription.model_json_schema()

### schema/resume.py

In [None]:
from pydantic import BaseModel, Field, EmailStr
from typing import List, Optional
from datetime import date

class PersonalInfo(BaseModel):
    # Contact details section of a resume.
    # (`#` comments rather than a docstring, so the generated JSON schema is unchanged.)

    # required, 1 to 100 characters
    name: str = Field(..., min_length=1, max_length=100)
    # validated by pydantic's EmailStr type
    email: EmailStr
    # optional leading '+', then one digit 1-9, then up to 15 further digits;
    # separators such as '-', spaces or parentheses do not match this pattern
    phone: str = Field(..., pattern=r'^[\+]?[1-9][\d]{0,15}$')
    location: str
    # may be omitted; defaults to None
    linkedin: Optional[str] = None

class ProfessionalSummary(BaseModel):
    # Headline/summary section of a resume.
    # (`#` comments rather than a docstring, so the generated JSON schema is unchanged.)
    headline: str
    summary: str
    # whole years, limited to the range 0..50 inclusive
    total_experience_years: int = Field(..., ge=0, le=50)
    # must be exactly one of: entry, junior, mid, senior, executive (lower case)
    career_level: str = Field(..., pattern=r'^(entry|junior|mid|senior|executive)$')

class WorkExperience(BaseModel):
    # One position in the candidate's work history.
    # (`#` comments rather than a docstring, so the generated JSON schema is unchanged.)
    title: str
    company: str
    start_date: date 
    # NOTE(review): required, so a position that is still ongoing has no
    # representation here — confirm whether this should allow None.
    end_date: date
    # non-negative; nothing in this model checks it against the two dates
    duration_months: int = Field(..., ge=0)
    responsibilities: List[str]
    achievements: List[str]
    technologies_used: List[str]

class Skills(BaseModel):
    # Skills section of a resume: three required lists of strings.
    # (`#` comments rather than a docstring, so the generated JSON schema is unchanged.)
    technical: List[str]
    soft: List[str]
    certifications: List[str]

class Education(BaseModel):
    # One education entry of a resume.
    # (`#` comments rather than a docstring, so the generated JSON schema is unchanged.)
    degree: str
    institution: str
    # limited to 1950..2025 inclusive
    # NOTE(review): the upper bound is hard-coded, so later (or expected)
    # graduation years are rejected — confirm whether that is intended.
    graduation_year: int = Field(..., ge=1950, le=2025)
    # free-form string; may be omitted, defaults to None
    gpa: Optional[str] = None

class ResumeSchema(BaseModel):
    # Complete structured representation of one resume; used as the
    # structured-output target for the LLM in the transformation step.
    # A `#` comment is used instead of a docstring on purpose: pydantic copies
    # a class docstring into the generated schema as its "description".
    personal_info: PersonalInfo
    professional_summary: ProfessionalSummary
    work_experience: List[WorkExperience]
    skills: Skills
    education: List[Education]
    keywords: List[str]

    # pydantic v2 configuration. A plain dict is accepted here; it replaces the
    # nested `class Config`, which pydantic v2 only supports with a
    # deprecation warning.
    model_config = {
        "json_schema_extra": {
            "example": {
                "personal_info": {
                    "name": "John Doe",
                    "email": "john.doe@email.com",
                    # digits only after the optional '+', so that the example
                    # satisfies the pattern declared on PersonalInfo.phone
                    "phone": "+15550123",
                    "location": "New York, NY",
                    "linkedin": "linkedin.com/in/johndoe"
                }
            }
        }
    }


__all__ = ["ResumeSchema", ]

In [None]:
# Notebook check: display the JSON schema pydantic generates for ResumeSchema.
ResumeSchema.model_json_schema()

# schema/__init__.py

In [None]:
# update __all__

# from .job_description import *
# from .resume import *

### parser/job_description.py

In [None]:
import os
from firecrawl import Firecrawl
from dotenv import load_dotenv

class JobDescriptionParser:
    """Scrape a job posting with Firecrawl and return the extracted JSON.

    Two strategies are available: schema-driven extraction
    (`extract_job_description`) and prompt-driven extraction
    (`extract_job_description_with_prompt`). `parse` tries them in that order.
    """

    def __init__(self, firecrawl_api_key: str | None = None) -> None:
        """Create the Firecrawl client.

        Args:
            firecrawl_api_key (str | None): API key. When empty or omitted, the
                `.env` file is loaded and `FIRECRAWL_API_KEY` is read from the
                environment.

        Raises:
            ValueError: if no key was passed and none is found in the environment.
        """
        if not firecrawl_api_key:
            load_dotenv()
            firecrawl_api_key = os.getenv("FIRECRAWL_API_KEY")
        if not firecrawl_api_key:
            raise ValueError(f"argument \'firecrawl_api_key\' is having value \'{firecrawl_api_key}\'")
        self.firecrawl = Firecrawl(api_key=firecrawl_api_key)

    @staticmethod
    def _json_payload(result):
        """Return the extracted JSON contained in a scrape result, or None."""
        if isinstance(result, dict):
            # dict-shaped response: {"success": ..., "data": {"json": ...}}
            if result.get('success'):
                return result['data']['json']
            return None
        # NOTE(review): newer Firecrawl SDK versions appear to return a document
        # object that exposes the extraction as a `json` attribute instead of a
        # dict — confirm against the installed SDK. A callable `json` (e.g. a
        # serialisation method) is not treated as payload.
        payload = getattr(result, "json", None)
        return None if callable(payload) else payload

    def extract_job_description(self, url:str):
        """
        Extract job description using Firecrawl's AI-powered extraction

        The JSON schema of `JobDescription` is sent as the extraction target.
        Returns the extracted JSON, or None when nothing was extracted or any
        error occurred (errors are printed, never raised).
        """
        try:
            # Method 1: Using structured extraction with schema
            result = self.firecrawl.scrape(
                url,
                formats=[{
                    "type": "json",
                    "schema": JobDescription.model_json_schema()
                }],
                only_main_content=True,
                timeout=30000
            )

            job_data = self._json_payload(result)
            if job_data is None:
                print(f"Firecrawl extraction failed: {result}")
            return job_data

        except Exception as e:
            # best effort by design: report and let the caller fall back
            print(f"Error with Firecrawl: {str(e)}")
            return None

    def extract_job_description_with_prompt(self, url:str):
        """
        Alternative method using natural language prompt

        Returns the extracted JSON, or None when nothing was extracted or any
        error occurred (errors are printed, never raised).
        """
        try:
            result = self.firecrawl.scrape(
                url,
                formats=[{
                    "type": "json",
                    "prompt": """Extract the following information from this job posting:
                    - Job title
                    - Company name
                    - Location
                    - Job type (full-time, part-time, etc.)
                    - Experience level required
                    - Complete job description
                    - Requirements and qualifications
                    - Key responsibilities
                    - Salary range (if mentioned)
                    - Posted date (if available)
                    
                    Return as structured JSON."""
                }],
                only_main_content=True
            )

            return self._json_payload(result)

        except Exception as e:
            # best effort by design: report and return no data
            print(f"Error: {str(e)}")
            return None

    def parse(self, url:str) -> JobDescription | Dict | None:
        """Extract the job posting at `url`.

        Tries schema-driven extraction first and falls back to the prompt-driven
        variant when that yields nothing. Returns the extracted data as produced
        by Firecrawl (it is not validated against `JobDescription` here), or
        None when both attempts fail.
        """
        job_data = self.extract_job_description(url)
        if not job_data:
            job_data = self.extract_job_description_with_prompt(url)
        return job_data
    

__all__ = ["JobDescriptionParser", ]

# parser/__init__.py

In [None]:
# update __all__

# from .pdf import * 
# from .docx import * 
# from .html import * 
# from .job_description import *

### data_transformation.py

In [None]:
from typing import Any
from src.ats import logging
from src.ats.components.parsers import *
from src.ats.utils import save_file, dump_json, create_dirs
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.language_models.chat_models import BaseChatModel
from dotenv import load_dotenv 
from datetime import datetime
import os 


class DataTransformationComponents:
    """Parse resume files to text and convert that text into `ResumeSchema` objects.

    `_main` drives the stage: it creates the output directories, parses every
    file (`__parse`) and asks the LLM for structured output (`__extract_keyword`).
    """

    def __init__(self, data_ingestion_config: DataIngestion, data_transformation_config: DataTransformation, llm: BaseChatModel | None = None) -> None:
        """Store the stage configuration and resolve the chat model.

        Args:
            data_ingestion_config (DataIngestion): ingestion paths (stored only;
                the methods of this class do not read them).
            data_transformation_config (DataTransformation): output directories
                of this stage.
            llm (BaseChatModel | None): chat model to use. When omitted, a
                `ChatGoogleGenerativeAI` is created from the `LLM` environment
                variable.
        """
        self.data_ingestion_config = data_ingestion_config
        self.data_transformation_config = data_transformation_config
        self.llm = llm
        # This is a plain class, not a dataclass, so nothing calls
        # __post_init__ implicitly. Without this call an omitted `llm` would
        # stay None and __extract_keyword would fail on `self.llm`.
        self.__post_init__()

    def __post_init__(self) -> None:
        """Load `.env` and create the default chat model when none was given."""
        load_dotenv()
        if not self.llm:
            self.llm = ChatGoogleGenerativeAI(model=os.getenv("LLM"))

    def __parse(self, info:Dict[str, Dict[str, Path | Any]]) -> Dict[str, str]: 
        """parse the data from file, supported extensions ---> [ .pdf, .docx, .html ]

        The parser is chosen from the extension of the file name (the key), the
        file itself is read from the 'path' entry. Each parsed result is also
        written, via `save_file`, to PARSED_DATA_DIR_PATH under the file name.

        Args:
            info (Dict[str, Dict[str, Path | Any]]): 
                key = name of file 

                value = dictionary that must contain the key 'path' with the
                path to the file

                example:
                info = {
                    "xyz.pdf" : {
                        "path": "path/to/the/file",
                        ...
                    }, 
                    "abc.docx": {
                        "path": "path/to/the/file",
                        ...
                    }
                    ...
                } 
                output = __parse(info)

        Raises:
            CustomException: wraps every failure, including the ValueError
                raised for a file with an unsupported extension

        Returns:
            Dict: 
            key = name of file 

            value = parsed data returned by the parser

            example:
            output = __parse(info)
            output = {
                "xyz.pdf" : "string_parsed_data_of_xyz.pdf", 
                "abc.docx": "string_parsed_data_of_abc.docx", 
                ...
            } 
            name = next(iter(output))
            data = output[name] 
        """
        try:
            logging.info("In __parse")
            output = {}
            for file_name, file_info in info.items():
                logging.info(f"parsing \'{file_name}\'")
                ext = os.path.splitext(file_name)[1].lower()
                # get parser 
                if ext == ".pdf":
                    parser = PDFParser()
                elif ext == ".docx":
                    parser = DOCXParser()
                elif ext == ".html":
                    parser = HTMLParser()
                else:
                    raise ValueError(f"Unsupported file type: {file_name}") 
                logging.info(f"using \'{parser.__class__.__name__}\'")
                path = file_info["path"]
                logging.info(f"path of file for parsing \'{path}\'") 
                extracted_data = parser.parse(path)
                logging.info("parsing complete.")
                # save file to local 
                output_path = os.path.join(self.data_transformation_config.PARSED_DATA_DIR_PATH, file_name)
                save_file(extracted_data, output_path) 
                logging.info(f"parsing complete for file \'{file_name}\'")
                output[file_name] = extracted_data 
            logging.info("Out __parse") 
            return output 
        except Exception as e: 
            logging.error(e) 
            raise CustomException(e, sys) 

    def __extract_keyword(self, data:Dict[str, str]) -> Dict[str, ResumeSchema]: 
        """extract structured output from parsed string data

        Every parsed text is sent to the LLM with `ResumeSchema` as the
        structured-output target. Each result is written, via `dump_json`, to
        STRUCTURED_DATA_DIR_PATH under the file name. All (parsed text, result)
        pairs are additionally written as one timestamped `.json` file to
        TRAIN_DATA_DIR_PATH.

        Args:
            data (Dict[str, str]): dictionary with name of file as keys and parsed data as value of respective keys

        Raises:
            CustomException: wraps every failure

        Returns:
            Dict[ str, ResumeSchema ]: dictionary with name of file as keys and structured data as value of respective keys

            example:
            output = __extract_keyword(data)
            output = {
                "xyz.pdf": <structured result with the sections
                           personal_info, professional_summary, work_experience,
                           skills, education, keywords>,
                "abc.docx": <...>,
                ...
            }
        """
        try:
            output = {}
            train_data = {
                "X":[], 
                "y":[]
            }
            # the structured-output runnable does not depend on the file, so
            # it is built once instead of once per file
            structured_llm = self.llm.with_structured_output(ResumeSchema)
            for file_name, parsed_data in data.items():
                structured_parsed_data = structured_llm.invoke(parsed_data)
                output[file_name] = structured_parsed_data
                # persist structured data to disk
                path = os.path.join(self.data_transformation_config.STRUCTURED_DATA_DIR_PATH, file_name)
                dump_json(structured_parsed_data, path)
                # create training data
                train_data["X"].append(parsed_data)
                train_data["y"].append(structured_parsed_data)
            # persist training data to disk; the timestamp is formatted
            # outside the f-string because reusing double quotes inside one
            # is a SyntaxError before Python 3.12
            timestamp = datetime.now().strftime("%d_%m_%Y_%H_%M_%S")
            train_data_path = os.path.join(self.data_transformation_config.TRAIN_DATA_DIR_PATH, f"{timestamp}.json")
            dump_json(train_data, train_data_path)
            return output
        except Exception as e: 
            logging.error(e) 
            raise CustomException(e, sys)  

    def _main(self, info:Dict[str, Dict[str, Path | Any]]) -> Dict[str, ResumeSchema]:
        """runs the transformation pipeline

        Creates the six output directories of the transformation config, parses
        all files and returns their structured representation.

        Args:
            info (Dict[str, Dict[str, Path  |  Any]]): 
                key = name of file 

                value = dictionary that must contain the key 'path' with the
                path to the file

                example:
                info = {
                    "xyz.pdf" : {
                        "path": "path/to/the/file",
                        ...
                    }, 
                    "abc.docx": {
                        "path": "path/to/the/file",
                        ...
                    }
                    ...
                } 
                output = _main(info)

        Raises:
            CustomException: raised by the parsing or extraction step

        Returns:

            Dict[ str, ResumeSchema ]: dictionary with name of file as keys and structured data as value of respective keys

            example:
            output = _main(info)
            output = {
                "xyz.pdf": <structured result with the sections
                           personal_info, professional_summary, work_experience,
                           skills, education, keywords>,
                "abc.docx": <...>,
                ...
            }
        """
        # create required dir's 
        create_dirs(self.data_transformation_config.ROOT_DIR_PATH)
        create_dirs(self.data_transformation_config.DATA_ROOT_DIR_PATH)
        create_dirs(self.data_transformation_config.TRANSFORMATION_ROOT_DIR_PATH)
        create_dirs(self.data_transformation_config.PARSED_DATA_DIR_PATH)
        create_dirs(self.data_transformation_config.STRUCTURED_DATA_DIR_PATH)
        create_dirs(self.data_transformation_config.TRAIN_DATA_DIR_PATH)
        # start steps
        data = self.__parse(info)
        return self.__extract_keyword(data)
    

__all__ = ["DataTransformationComponents", ]

### __init__.py

In [None]:
# update __all__ 

from src.ats.components.data_ingestion import * 
from src.ats.components.data_transformation import * 

# pipeline

### __init__.py

In [None]:
# update __all__

from src.ats.components import * 
# from src.ats.config.builder import * 
from dataclasses import dataclass 
from typing import List, Dict 
from fastapi import UploadFile 


@dataclass 
class DataIngestionPipeline: 
    """Pipeline wrapper for the data-ingestion stage.

    Binds `DataIngestionComponents` to the module-level `DataIngestionConfig`.
    """
    def _run(self, files: List[UploadFile]) -> Dict[str, str]: 
        """Run data ingestion for the uploaded files and return its result.

        Args:
            files (List[UploadFile]): the fastapi.UploadFile objects that were uploaded

        Returns:
            Dict: whatever `DataIngestionComponents._main(files)` returns.

            NOTE(review): that component is not defined in this module. The
            earlier documentation of this method described the result with the
            keys below — confirm against the component:

            schema = {
                "path": path/of/the/file/in/disk,
                "size": size of the file in disk,
                "binary_content_size": len(origin_data),
                "base64_content_size": len(base64_data)
            } 
            file_path = schema["path"]
            file_size = schema["size"]
        """
        return DataIngestionComponents(DataIngestionConfig)._main(files)
    
    
@dataclass 
class DataTransformationPipeline: 
    """Pipeline wrapper for the data-transformation stage.

    Binds `DataTransformationComponents` to the module-level
    `DataIngestionConfig` and `DataTransformationConfig`.
    """
    def _run(self, schema:Dict[str, Dict[str, Path | Any]]) -> Dict[str, ResumeSchema]: 
        """Run data transformation for the given files and return its result.

        Args:
            schema (Dict[str, Dict[str, Path  |  Any]]): 
                key = name of file 

                value = dictionary that must contain the key 'path' with the
                path to the file

                example:
                schema = {
                    "xyz.pdf" : {
                        "path": "path/to/the/file",
                        ...
                    }, 
                    "abc.docx": {
                        "path": "path/to/the/file",
                        ...
                    }
                    ...
                } 
                pipeline = DataTransformationPipeline()
                output = pipeline._run(schema)

        Returns:
            Dict[ str, ResumeSchema ]: the result of
            `DataTransformationComponents._main(schema)` — a dictionary with
            name of file as keys and structured data as value of respective keys

            example:
            output = {
                "xyz.pdf": <structured result with the sections
                           personal_info, professional_summary, work_experience,
                           skills, education, keywords>,
                "abc.docx": <...>,
                ...
            }
        """
        transformer = DataTransformationComponents(DataIngestionConfig, DataTransformationConfig)
        result = transformer._main(schema)
        return result


__all__ = ["DataIngestionPipeline", "DataTransformationPipeline", ]