In [1]:
import os
import sys
import pandas as pd

import logging

# Silence Snowflake client chatter: only WARNING and above from these loggers.
for _logger_name in ("snowflake", "snowflake.connector", "snowflake.snowpark"):
    logging.getLogger(_logger_name).setLevel(logging.WARNING)

# The notebook presumably sits one directory below the project root; move there
# so relative paths (config/, src/, artifacts/) resolve. Only move when `src/`
# is not already visible, so re-running this cell does not keep climbing up.
if not os.path.isdir("src"):
    os.chdir("../")

# Add the absolute path to src/ so Python can find the autoMatch package
src_path = os.path.abspath("src")
if src_path not in sys.path:
    sys.path.append(src_path)

os.getcwd()


'c:\\Users\\fiscarelli\\Desktop\\Progetti\\Manpower IT\\Auto-Match\\Candidates-to-Jobs-Auto-Match-Cortex-AI'

In [2]:
# Confirm the working directory is the project root after the chdir in the first cell.
os.getcwd()


'c:\\Users\\fiscarelli\\Desktop\\Progetti\\Manpower IT\\Auto-Match\\Candidates-to-Jobs-Auto-Match-Cortex-AI'

In [3]:
import autoMatch.utils.snowflake_utils as snowflake_utils

# Open the Snowpark session used by every cell below. The helper triggers an
# interactive browser login (see this cell's output).
session = snowflake_utils.get_snowpark_session()

Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...
Going to open: https://login.microsoftonline.com/e2ba81b8-03fe-407c-96a1-f4bc0f512e7d/saml2?SAMLRequest=nZJNc9owEIb%2Fikc925IM4UMDZAiUlk5CPMFkptxkew0ituRIcgz99RUmzKSH5NCbR3529WjfHd0ey8J7A22EkmNEA4I8kKnKhNyN0SZe%2BAPkGctlxgslYYxOYNDtZGR4WVRsWtu9fILXGoz1XCNpWPtjjGotmeJGGCZ5CYbZlK2nD%2FcsDAirtLIqVQX6UPJ1BTcGtHWG15LMCKe3t7ZiGDdNEzSdQOkdDgkhmAyxo87Ityt%2FdG%2F6hKeYdM%2B8IxwevbvdCXkZwVdayQUy7GccR370uI6RN72qzpQ0dQl6DfpNpLB5ur8IGGdwtx52STgIGjc3H2qtKgj4n1pDYKRq8oK%2FQKrKqraue%2BC%2BcA4ZLtROuAEs52NUvYjs12G1fa2i47Ysj7P982HXT5LD4yD%2BPYx%2F8O8bFXG6yMlp9TBbpsh7viYcnhNeGlPDUp5zte6IhDc%2BJX5IY0oY6bEOCWivv0Xe3PkJyW1beZVvPYJSpFoZlVslCyGhtYQw4QOaDHzSycHvkn7qD3uc%2Bnk3SUl%2BQ0PoZ%2FicdoguG8RaET3537mM8Mcu70u5cjkt55EqRHryFkqX3H4eIw1oeyIyP29RBiUXxTTLNBjj4iwK1cw0cOt23%2BoaEJ5cbv

In [4]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataTransformationConfig:
    """Immutable settings for the data-transformation stage.

    Built by ``ConfigurationManager.get_data_transformation_config`` from the
    ``data_transformation`` section of the YAML config. The table names are
    combined with ``database`` and ``schema`` by the readers as
    ``{database}.{schema}.{table}``.
    """

    # Directory created for this stage's artifacts.
    root_dir: str

    # Snowflake database and schema shared by the tables below.
    database: str
    schema: str

    # Raw candidate table; must contain a ``description`` column.
    input_table: str
    # Written by ``clean_description``, read by ``apply_ner_cortexai``.
    input_table_cleaned: str
    # Italian cities reference table (zip, city_name, latitude, longitude, province_ext).
    input_table_italian_cities: str
    # Candidate-features table written by the NER step and rewritten by the geo step.
    output_table: str


In [5]:
from autoMatch.constants import *
from autoMatch.utils.common import read_yaml, create_directories
from autoMatch import logger

class ConfigurationManager:
    """Reads the project's YAML files and builds per-stage configuration objects.

    Args:
        config_filepath: path of the main config YAML (default ``CONFIG_FILE_PATH``).
        params_filepath: path of the params YAML (default ``PARAMS_FILE_PATH``).
        schema_filepath: path of the schema YAML (default ``SCHEMA_FILE_PATH``).

    Side effect: creates ``config.artifacts_root`` on construction.
    """

    def __init__(
        self,
        config_filepath=CONFIG_FILE_PATH,
        params_filepath=PARAMS_FILE_PATH,
        schema_filepath=SCHEMA_FILE_PATH,
    ):
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)

        create_directories([self.config.artifacts_root])

    def get_data_transformation_config(self) -> DataTransformationConfig:
        """Build a ``DataTransformationConfig`` from the ``data_transformation`` section.

        Also creates that section's ``root_dir`` directory.
        """
        section = self.config.data_transformation
        create_directories([section.root_dir])

        # Each dataclass field is read from the YAML key with the same name.
        field_names = (
            "root_dir",
            "database",
            "schema",
            "input_table",
            "input_table_cleaned",
            "input_table_italian_cities",
            "output_table",
        )
        return DataTransformationConfig(**{name: getattr(section, name) for name in field_names})

In [None]:
from snowflake.snowpark.functions import col, trim, lower, length, parse_json, when, lit, to_date, to_varchar
from snowflake.snowpark.types import StringType, BooleanType
from snowflake.snowpark.functions import udf

# `date` (the `datetime` module has no `dates`) provides today's date for the
# NER prompt and the date-of-birth validation.
from datetime import date

from autoMatch.utils.common import validate_string


class DataTransformation:
    """Transformation stage of the auto-match pipeline, built on Snowpark.

    The public steps each return a Snowpark DataFrame; only ``write_table``
    persists anything:

    1. ``clean_description``  - normalise, de-HTML and lowercase ``description``.
    2. ``apply_ner_cortexai`` - extract candidate fields with SNOWFLAKE.CORTEX.COMPLETE.
    3. ``add_geo_info``       - validate ZIP codes and attach coordinates / province
       from the Italian cities reference table.

    The ``_``-prefixed static helpers are the bodies of the Python UDFs; they are
    plain functions so they can be exercised without a Snowflake connection.
    """

    # Descriptions made *only* of placeholder tokens (e.g. "none", "null null.")
    # are dropped. A substring match is deliberately avoided: it would also drop
    # genuine CVs containing words such as "nulla" or "annullato".
    _PLACEHOLDER_ONLY_REGEX = r"^\W*((none|null|nan)\W*)+$"

    # Strict YYYY-MM-DD for years 1900-2099 with a day that exists in the month;
    # Feb 29 is accepted only in leap years (2000 is a leap year, 1900 is not).
    _DOB_REGEX = (
        "^("
        # Months with 31 days
        "(19[0-9][0-9]|20[0-9][0-9])-(01|03|05|07|08|10|12)-(0[1-9]|[12][0-9]|3[01])|"
        # Months with 30 days
        "(19[0-9][0-9]|20[0-9][0-9])-(04|06|09|11)-(0[1-9]|[12][0-9]|30)|"
        # February, days 1-28 (valid in every year)
        "(19[0-9][0-9]|20[0-9][0-9])-02-(0[1-9]|1[0-9]|2[0-8])|"
        # February 29, leap years only
        "((19|20)(0[48]|[2468][048]|[13579][26])|2000)-02-29"
        ")$"
    )

    def __init__(self, config: "DataTransformationConfig"):
        self.config = config

    # ------------------------------------------------------------------ helpers

    def _qualified_name(self, table_name):
        """Return ``table_name`` qualified with the configured database and schema.

        A name that already contains a dot is treated as qualified and returned as is.
        """
        if "." in table_name:
            return table_name
        return f"{self.config.database}.{self.config.schema}.{table_name}"

    @staticmethod
    def _normalize_whitespace(text: str) -> str:
        """UDF body: collapse runs of spaces/tabs into one space and trim the ends.

        Newlines inside the text are preserved. ``None`` becomes ``''``.
        """
        import re

        if text is None:
            return ''
        return re.sub(r'[ \t]+', ' ', text).strip()

    @staticmethod
    def _strip_html(text: str) -> str:
        """UDF body: return the visible text of an HTML fragment (``''`` for empty input).

        ``bs4`` is imported inside the function because the UDF ships it to
        Snowflake via ``packages=["beautifulsoup4"]``.
        """
        if not text:
            return ""
        from bs4 import BeautifulSoup

        return BeautifulSoup(text, "html.parser").get_text()

    @staticmethod
    def _clean_llm_json(raw: str) -> str:
        """UDF body: turn a raw Cortex COMPLETE answer into a parseable JSON string.

        - lowercases everything, strips a leading ``` / ```json fence and a trailing ```;
        - flattens newlines/tabs, removes backslashes and collapses whitespace;
        - drops one trailing quote character;
        - rewrites bare ``nan`` / ``none`` values to ``null``: the prompt asks for
          NaN, which is not valid JSON once lowercased, and would discard the row.
        ``None`` becomes ``''``.
        """
        import re

        if raw is None:
            return ''
        text = raw.lower().lstrip()
        if text.startswith("```"):
            # Slice by the fence's actual length so a fence with no newline after
            # it (```json{...) keeps its first JSON character.
            text = text[3:]
            if text.startswith("json"):
                text = text[4:]
            text = text.lstrip()
        text = text.replace('\n', ' ').replace('\t', ' ').replace('\\', '').strip()
        text = ' '.join(text.split())
        if text.endswith("```"):
            text = text[:-3].rstrip()
        if text.endswith("'") or text.endswith('"'):
            text = text[:-1].rstrip()
        # Only values directly after a key (`"key": nan,`) are rewritten, never text inside strings.
        return re.sub(r'"\s*:\s*(?:nan|none)\s*(?=[,}])', '": null', text)

    @staticmethod
    def _is_valid_json(text: str) -> bool:
        """UDF body: ``True`` iff ``text`` is non-empty and ``json.loads`` accepts it."""
        import json

        if not text:
            return False
        try:
            json.loads(text)
        except Exception:
            # Any parse failure just marks the row as invalid; it is filtered out.
            return False
        return True

    # ------------------------------------------------------------------ steps

    def clean_description(self, session):
        """
        Reads input table
        Cleans description column:
            - removes rows with empty description
            - replaces multiple consecutive whitespaces with a single whitespace (preserves newlines)
            - removes all html tags
            - lowercases all text
            - removes descriptions of 5 characters or fewer, or made only of
              placeholder tokens such as "none" / "null" / "nan"
        Function returns Snowflake dataframe
        """
        input_table = self.config.input_table

        df = session.table(self._qualified_name(input_table))
        df = df.filter(col("description").is_not_null() & (trim(col("description")) != ""))

        normalize_udf = udf(
            self._normalize_whitespace,
            return_type=StringType(),
            input_types=[StringType()],
        )
        df = df.with_column("description", normalize_udf(col("description")))

        clean_html_udf = udf(
            self._strip_html,
            return_type=StringType(),
            input_types=[StringType()],
            packages=["beautifulsoup4"],
        )
        df = df.with_column("description", clean_html_udf(col("description")))

        df = df.with_column("description", lower(col("description")))

        trimmed = trim(col("description"))
        df = df.filter(
            col("description").is_not_null()
            & (trimmed != "")
            & (length(trimmed) > 5)
            & ~col("description").rlike(self._PLACEHOLDER_ONLY_REGEX)
        )

        df = df.with_column("description", col("description").cast("STRING"))

        logger.info(f"Table {input_table} successfully cleaned")

        return df

    def apply_ner_cortexai(self, session):
        """
        Reads input table cleaned
        Performs Named Entity Recognition:
            - age
            - date_of_birth
            - location
            - zip_code
            - last_job
            - second_last_job
            - third_last_job
            - skills
        Rows whose model answer is not valid JSON are dropped; invalid zip codes,
        ages outside 1-150 and malformed or future dates of birth become NULL.
        Function returns Snowflake dataframe
        """
        from datetime import date

        input_table_cleaned = self.config.input_table_cleaned
        source_table = self._qualified_name(input_table_cleaned)

        today_string = date.today().strftime("%Y-%m-%d")

        query = f"""
            SELECT
                *,
                SNOWFLAKE.CORTEX.COMPLETE(
                    'claude-4-sonnet',
                    CONCAT(
                        'Estrai dal seguente testo i campi: 
                        age (stringa. calcolala basandoti su date_of_birth, considerando che la data di oggi e {today_string};
                            esempio: se date_of_birth = 1990-05-18 e oggi siamo in data 2025-09-15, age = 35 
                            se non è possibile calcolarla, restituisci NaN), 
                        date_of_birth (stringa in formato YYYY-MM-DD, 
                            YYYY deve essere compreso tra 1900 e {today_string},
                            MM deve essere compreso tra 1 e 12,
                            DD deve essere compreso tra: 
                                1 e 30 quando MM uguale a 11, 04, 06, 09;
                                1 e 28 quando MM uguale a 02
                                1 e 31 per i restanti casi
                            ), 
                        location (stringa. solo la citta, non includere province o altro), 
                        zip_code (cap. 5 caratteri numerici), 
                        last_job, 
                        second_last_job, 
                        third_last_job, 
                        skills (stringa)',
                        'Rispondi in formato JSON, senza testo extra, attieniti a questo esempio: 
                        {{"age": 30, "date_of_birth": "1993-05-12", "location": "Milano", "zip_code": "20100", "last_job": "Data Engineer", "second_last_job": "Developer", "third_last_job": "Intern", "skills": "Python, SQL, Java"}}. ',
                        'Testo: ', description
                    )
                ) AS ner_json
            FROM 
            (SELECT *
            FROM {source_table}
            )

            """
        df = session.sql(query)

        clean_udf = udf(self._clean_llm_json, return_type=StringType(), input_types=[StringType()])
        df = df.with_column("ner_json", clean_udf(col("ner_json")))

        # Keep only rows whose cleaned answer parses as JSON.
        is_valid_json_udf = udf(self._is_valid_json, return_type=BooleanType(), input_types=[StringType()])
        df = df.filter(is_valid_json_udf(col("ner_json")))

        df = df.with_column("ner_json", parse_json(col("ner_json")))
        df = df.filter(col("ner_json").is_not_null())

        ner_fields = [
            "age", "date_of_birth", "location", "zip_code",
            "last_job", "second_last_job", "third_last_job", "skills",
        ]
        df = df.with_columns(ner_fields, [col("ner_json")[field].cast("STRING") for field in ner_fields])

        for field in ("location", "last_job", "second_last_job", "third_last_job", "skills"):
            df = validate_string(df, field)

        # makes sure zip_code is a valid format (exactly 5 digits)
        df = df.with_column(
            "zip_code",
            when(col("zip_code").rlike("^[0-9]{5}$"), col("zip_code")).otherwise(lit(None)),
        )

        # makes sure age is a reasonable value; try_cast yields NULL (instead of
        # failing the whole query) when the model returned a non-numeric string.
        age_int = col("age").try_cast("INT")
        df = df.with_column(
            "age",
            when(
                (col("age") != "nan") & age_int.is_not_null() & (age_int >= 1) & (age_int <= 150),
                age_int,
            ).otherwise(lit(None)),
        )

        # makes sure date_of_birth is a real calendar date that is not in the future;
        # the regex guard guarantees to_date only sees well-formed YYYY-MM-DD strings.
        dob = col("date_of_birth")
        df = df.with_column(
            "date_of_birth",
            when(
                dob.rlike(self._DOB_REGEX) & (dob <= lit(today_string)),
                to_date(to_varchar(dob), "YYYY-MM-DD"),
            ).otherwise(lit(None)),
        )

        df = df.drop("ner_json", "description")

        logger.info(f"NER on {input_table_cleaned} table successful")

        return df

    def add_geo_info(self, session):
        """
        Reads candidate dataframe (``output_table``)
        Reads italian cities dataframe (``input_table_italian_cities``)
        Checks that zip_codes are valid (unknown zip codes become NULL)
        Fills a missing zip_code from the city matching ``location`` (case-insensitive)
        Adds geo info (latitude, longitude, province_ext) of that city
        Adds a ``distance_km`` placeholder column (999999)
        The cities table is reduced to one row per lowercased ``city_name``
        (smallest zip wins) before joining, so a city listed under several zip
        codes cannot duplicate candidate rows.
        Function returns Snowflake dataframe
        """
        from snowflake.snowpark import Window
        from snowflake.snowpark.functions import row_number

        candidates = session.table(self._qualified_name(self.config.output_table))
        italian_cities = session.table(self._qualified_name(self.config.input_table_italian_cities))

        # This avoids issues when the candidates table has already lat and long columns
        candidates = candidates.select(
            *[col(c) for c in candidates.columns if c.lower() not in {"latitude", "longitude", "province_ext"}]
        )

        # Replace zip codes that are not in the reference table with None.
        # NOTE(review): the distinct zips are collected client-side and inlined as an IN list.
        valid_zips = [row[0] for row in italian_cities.select("zip").distinct().collect()]
        zip_is_known = col("zip_code").isin(valid_zips) if valid_zips else lit(False)
        candidates = candidates.with_column(
            "zip_code",
            when(zip_is_known, col("zip_code")).otherwise(lit(None)),
        )

        # One reference row per lowercased city name, so the left join keeps one row per candidate.
        city_lookup = (
            italian_cities
            .select(
                lower(col("city_name")).alias("city_key"),
                col("zip").alias("city_zip"),
                col("latitude"),
                col("longitude"),
                col("province_ext"),
            )
            .with_column(
                "city_rank",
                row_number().over(Window.partition_by("city_key").order_by("city_zip")),
            )
            .filter(col("city_rank") == 1)
            .drop("city_rank")
        )

        enriched = candidates.join(
            city_lookup,
            lower(candidates["location"]) == city_lookup["city_key"],
            how="left",
        )

        def is_zip_column(name):
            return name.strip('"').upper() == "ZIP_CODE"

        # Keep the candidate columns in their original order, filling a missing
        # zip_code from the matched city, then append the geo columns.
        candidate_columns = [field.name for field in candidates.schema.fields]
        df = enriched.select(
            *[
                when(col(name).is_null(), col("city_zip")).otherwise(col(name)).alias(name)
                if is_zip_column(name)
                else col(name)
                for name in candidate_columns
            ],
            col("latitude"),
            col("longitude"),
            col("province_ext"),
        )

        df = validate_string(df, "province_ext")

        # Regex for latitude: -90 to 90 with optional decimals
        latitude_regex = r"^[-+]?([0-8]?\d(\.\d+)?|90(\.0+)?)$"
        # Regex for longitude: -180 to 180 with optional decimals
        longitude_regex = r"^[-+]?((1[0-7]\d|0?\d{1,2})(\.\d+)?|180(\.0+)?)$"

        # Make sure latitude and longitude are valid
        df = df.with_column(
            "latitude",
            when(col("latitude").rlike(latitude_regex), col("latitude").cast("FLOAT")).otherwise(lit(None)),
        ).with_column(
            "longitude",
            when(col("longitude").rlike(longitude_regex), col("longitude").cast("FLOAT")).otherwise(lit(None)),
        )

        # Sentinel distance; presumably filled in by a later matching step.
        df = df.with_column("distance_km", lit(999999))

        return df

    def write_table(self, df, table_name = 'output_table'):
        """
        Writes ``df`` to ``table_name``, overwriting any existing table.
        Unqualified names are written under the configured database/schema,
        the same location the reading steps use.
        NOTE(review): the default is the literal name 'output_table', not
        ``self.config.output_table``; pass the table name explicitly.
        Function returns nothing
        """
        df.write.save_as_table(self._qualified_name(table_name), mode="overwrite")
        logger.info(f"Table {table_name} successfully written")

  


In [7]:
# Choose which pipeline stages to run. Each stage reads the table the previous
# stage wrote (cleaned -> NER -> geo), so enable them in order on a fresh run.
RUN_CLEAN_DESCRIPTION = False
RUN_NER = False
RUN_GEO = True

config = ConfigurationManager()
data_transformation_config = config.get_data_transformation_config()
data_transformation = DataTransformation(config=data_transformation_config)

if RUN_CLEAN_DESCRIPTION:
    df = data_transformation.clean_description(session)
    data_transformation.write_table(df, data_transformation.config.input_table_cleaned)

if RUN_NER:
    df = data_transformation.apply_ner_cortexai(session)
    data_transformation.write_table(df, data_transformation.config.output_table)

if RUN_GEO:
    df = data_transformation.add_geo_info(session)
    data_transformation.write_table(df, data_transformation.config.output_table)

[2025-10-21 12:06:37,015: INFO: common: yaml file: config\config.yaml loaded successfully]
[2025-10-21 12:06:37,018: INFO: common: yaml file: params.yaml loaded successfully]
[2025-10-21 12:06:37,046: INFO: common: yaml file: schema.yaml loaded successfully]
[2025-10-21 12:06:37,049: INFO: common: created directory at: artifacts]
[2025-10-21 12:06:37,053: INFO: common: created directory at: artifacts/data_transformation]
[2025-10-21 12:06:41,980: INFO: 1758941708: Table MPG_IT_AUTOMATCH_CANDIDATE_FEATURES successfully written]


In [8]:


# Spot-check a single candidate row in the features table written above.
candidate_lookup_sql = """
    SELECT *
    FROM IT_DISCOVERY.CONSUMER_INT_MODEL.MPG_IT_AUTOMATCH_CANDIDATE_FEATURES
    WHERE candidateid='5546247'
"""
candidate_lookup = session.sql(candidate_lookup_sql)
candidate_lookup.to_pandas()

Unnamed: 0,DATE_ADDED,CANDIDATEID,LOCATION,LAST_JOB,SECOND_LAST_JOB,THIRD_LAST_JOB,SKILLS,AGE,DATE_OF_BIRTH,ZIP_CODE,PROVINCE_EXT,LATITUDE,LONGITUDE,DISTANCE_KM
0,2025-07-16 02:40:29.727,5546247,cernusco sul naviglio,,,,"web developer base (html, css, javascript, php...",19,2006-09-20,20063,Milano,45.524558,9.330925,999999


In [9]:
from snowflake.snowpark import Session

# Example description string
description_text = """
Mario Rossi, nata a vorbomanero (bo) il 15051993. CAP 20100.
Ha lavorato come Data Engineer, prima come Developer e ancora prima come Intern.
Competenze: Python, SQL, Java.
"""


def sql_string_literal(text: str) -> str:
    """Return ``text`` as a Snowflake single-quoted string literal.

    Backslashes and single quotes are escaped, so free text such as "l'azienda"
    cannot terminate the literal early and break (or inject into) the query.
    """
    return "'" + text.replace("\\", "\\\\").replace("'", "''") + "'"


# Construct the SQL query
query = f"""
SELECT
    SNOWFLAKE.CORTEX.COMPLETE(
        'claude-4-sonnet',
        CONCAT(
            'Estrai dal seguente testo i campi: 
            age (stringa. caratteri numerici), 
            date_of_birth (YYYY-MM-DD), 
            location (stringa. solo la citta, non includere province o altro), 
            zip_code (cap. 5 caratteri numerici), 
            last_job, 
            second_last_job, 
            third_last_job, 
            skills (stringa)',
            'Rispondi in formato JSON, senza testo extra, attieniti a questo esempio: 
            {{"age": 30, "date_of_birth": "1993-05-12", "location": "Milano", "zip_code": "20100", "last_job": "Data Engineer", "second_last_job": "Developer", "third_last_job": "Intern", "skills": "Python, SQL, Java"}}. ',
            'Testo: ', {sql_string_literal(description_text)}
        )
    ) AS ner_json
"""

# Run the query and show the raw model answer
result_df = session.sql(query)
result_df.show()


------------------------------------------------------
|"NER_JSON"                                          |
------------------------------------------------------
|```json                                             |
|{"age": "30", "date_of_birth": "1993-05-15", "l...  |
|```                                                 |
------------------------------------------------------

