In [None]:
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from pyspark.sql.types import StringType

from sqlalchemy import create_engine

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from data_transform_utilities.score import generate_score_from_status
from data_transform_utilities.text_parsers import clean_str

from numpy import dot
from numpy.linalg import norm
import mlflow

In [None]:
# Spark settings for this notebook, applied in the order listed.
# NOTE(review): the session below uses master "local"; confirm that the
# core-related settings have the intended effect in that mode and that
# "spark.cores" is a property Spark actually recognises.
spark_conf = SparkConf().setAll([
    ("spark.cores", "12"),
    ("spark.driver.cores", "12"),
    ("spark.speculation", False),
    # Coordinates of the MySQL JDBC connector used by the JDBC reads further down.
    ("spark.jars.packages", "com.mysql:mysql-connector-j:9.2.0"),
])

# Create the Hive-enabled session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Decision data overview")
    .config(conf=spark_conf)
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# Register the project helpers as Spark SQL functions so queries can call them by name.
spark.udf.register(name="generate_score_from_status", f=generate_score_from_status, returnType=FloatType())
spark.udf.register(name="clean_str", f=clean_str, returnType=StringType())

In [None]:
# SQLAlchemy engine for the `decision` MySQL database.
# The URL embeds credentials, so it can be supplied through the DECISION_DB_URL
# environment variable; the literal is only a fallback that keeps the notebook
# runnable unconfigured (same pattern as MODEL_VERSION / VECTOR_DB_HOST).
DECISION_DB_URL = os.environ.get(
    "DECISION_DB_URL",
    "mysql+pymysql://decision:1234@localhost/decision?charset=utf8",
)
engine = create_engine(DECISION_DB_URL)

# Look-back window, in days, interpolated into the JDBC queries of this notebook.
# Overridable via DAYS_TO_READ; int() ensures only a number reaches the SQL text.
days_to_read = int(os.environ.get("DAYS_TO_READ", "3000"))

## Carrega os dados de vagas

In [None]:
# Load the `vacancies` MySQL table over JDBC and expose it as the `vacancies` temp view.
# The JDBC URL embeds credentials, so it can be supplied through DECISION_JDBC_URL;
# the literal is only a fallback that keeps the notebook runnable unconfigured.
spark.read.jdbc(
    url=os.environ.get(
        "DECISION_JDBC_URL",
        "jdbc:mysql://decision:1234@localhost:3306/decision?charset=utf8",
    ),
    # Sub-select keeping rows whose requested_date falls within the last `days_to_read` days.
    table=f"(SELECT * FROM vacancies WHERE requested_date > DATE_ADD(current_date(), INTERVAL -{days_to_read} DAY)) AS t",
    properties={"driver": "com.mysql.cj.jdbc.Driver"}
).createOrReplaceTempView("vacancies")

# Carrega os dados de candidatos

In [None]:
# Load the `applicants` MySQL table over JDBC and expose it as the `applicants` temp view.
# The JDBC URL embeds credentials, so it can be supplied through DECISION_JDBC_URL;
# the literal is only a fallback that keeps the notebook runnable unconfigured.
spark.read.jdbc(
    url=os.environ.get(
        "DECISION_JDBC_URL",
        "jdbc:mysql://decision:1234@localhost:3306/decision?charset=utf8",
    ),
    # Sub-select keeping rows whose created_at falls within the last `days_to_read` days.
    table=f"(SELECT * FROM applicants WHERE created_at > DATE_ADD(current_date(), INTERVAL -{days_to_read} DAY)) AS t",
    properties={"driver": "com.mysql.cj.jdbc.Driver"}
).createOrReplaceTempView("applicants")

In [None]:
# Load the `vacancies_applicants` MySQL table over JDBC and expose it as a temp view
# of the same name.
# The JDBC URL embeds credentials, so it can be supplied through DECISION_JDBC_URL;
# the literal is only a fallback that keeps the notebook runnable unconfigured.
spark.read.jdbc(
    url=os.environ.get(
        "DECISION_JDBC_URL",
        "jdbc:mysql://decision:1234@localhost:3306/decision?charset=utf8",
    ),
    # Sub-select keeping rows whose last_update falls within the last `days_to_read` days.
    table=f"(SELECT * FROM vacancies_applicants WHERE last_update > DATE_ADD(current_date(), INTERVAL -{days_to_read} DAY)) AS t",
    properties={"driver": "com.mysql.cj.jdbc.Driver"}
).createOrReplaceTempView("vacancies_applicants")

# Carrega o modelo

In [None]:
# Registered model to load and the version to pin (overridable via MODEL_VERSION).
MODEL_NAME = 'applicant_job_similarity'
MODEL_VERSION = os.environ.get("MODEL_VERSION", "29")
# Address of the MLflow tracking server; overridable via MLFLOW_TRACKING_URI so the
# notebook is not tied to a single host.
MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", 'http://192.168.101.186:5000')
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

# Build the model URI from MODEL_NAME so the name is defined in exactly one place.
model = mlflow.sentence_transformers.load_model(f"models:/{MODEL_NAME}/{MODEL_VERSION}")

# Função de embedding

In [None]:
def get_embedding(text, model):
    """Encode ``text`` with ``model`` and return the resulting embedding.

    The work is delegated to ``model.encode`` with ``normalize_embeddings=True``;
    whatever that method returns is handed back unchanged.
    """
    embedding = model.encode(text, normalize_embeddings=True)
    return embedding

# Cria client do banco de Vetores

In [None]:
# Host of the vector database; falls back to "localhost" when VECTOR_DB_HOST is unset.
VECTOR_DB_HOST = os.environ.get("VECTOR_DB_HOST", "localhost")
# Qdrant client shared by the collection setup and the insert helpers.
client = QdrantClient(port=6333, host=VECTOR_DB_HOST)

# Cria as coleções no Qdrant

In [None]:
# Make sure both collections exist, creating each one only when it is missing.
# Every collection holds three named vectors of size 768 compared by cosine distance.
# NOTE(review): 768 presumably matches the embedding model's output size — confirm.
for _collection_name in ("applicants", "vacancies"):
    if client.collection_exists(collection_name=_collection_name):
        continue
    client.create_collection(
        collection_name=_collection_name,
        vectors_config={
            _vector_name: VectorParams(size=768, distance=Distance.COSINE)
            for _vector_name in ("title", "description", "location")
        },
    )

# Inicia a inserção no banco de Vetores

In [None]:
def insert_into_db(c, collection_name):
    """Upsert one record as a single point into a Qdrant collection.

    Uses the module-level ``client``.

    Args:
        c: Mapping with the keys ``id``, ``title``, ``description`` and ``location``
            plus the matching ``title_embeddings``, ``description_embeddings`` and
            ``location_embeddings``. When a ``model_version`` key is present it is
            stored in the payload as well, so each point records which model
            produced its vectors.
        collection_name: Name of the collection receiving the point.
    """
    def _as_list(vector):
        # Embeddings may arrive as array-like objects exposing ``tolist``; hand the
        # client plain Python lists and pass anything else through untouched.
        return vector.tolist() if hasattr(vector, "tolist") else vector

    payload = {"title": c["title"], "description": c["description"], "location": c["location"]}
    # The caller supplies model_version; keep it instead of silently dropping it.
    if "model_version" in c:
        payload["model_version"] = c["model_version"]

    client.upsert(
        collection_name=collection_name,
        points=[
            PointStruct(
                id=c["id"],
                vector={
                    "title": _as_list(c["title_embeddings"]),
                    "description": _as_list(c["description_embeddings"]),
                    "location": _as_list(c["location_embeddings"]),
                },
                payload=payload,
            )
        ]
    )

In [None]:
def insert_batch(batch, collection_name, model):
    """Embed every row of ``batch`` and insert it into ``collection_name``.

    For each row the title, description and location are embedded with
    ``get_embedding`` and the result is handed to ``insert_into_db``.

    Args:
        batch: Iterable of rows exposing ``id``, ``title``, ``description``,
            ``location`` and ``model_version`` attributes.
        collection_name: Name of the collection receiving the points.
        model: Encoder forwarded to ``get_embedding``.
    """
    # Plain loop: the work is done purely for its side effects, so there is no
    # reason to build (and discard) a list of results.
    for v in batch:
        insert_into_db(
            {
                "id": v.id,
                "title": v.title,
                "description": v.description,
                "location": v.location,
                "title_embeddings": get_embedding(v.title, model),
                "description_embeddings": get_embedding(v.description, model),
                "location_embeddings": get_embedding(v.location, model),
                "model_version": v.model_version,
            },
            collection_name,
        )

In [None]:
# Query that prepares applicants for indexing: lower-cases the text columns (NULL
# becomes ''), builds `title` and `description` through CLEAN_STR, tags every row
# with the model version, and keeps only rows with a non-empty title and location
# and a description longer than 150 characters.
applicants_query = f"""
    SELECT
        *,
        '{MODEL_VERSION}' AS model_version
    FROM
        (SELECT
            id,
            CLEAN_STR(professional_title) AS title,
            TRIM(CLEAN_STR(CONCAT(technical_knowledge, '\n', cv_pt, '\n', area_of_expertise))) AS description,
            location
        FROM 
            (SELECT
                a.id,
                LOWER(COALESCE(a.location, '')) AS location,
                LOWER(COALESCE(a.professional_title, '')) AS professional_title,
                LOWER(COALESCE(a.technical_knowledge, '')) AS technical_knowledge,
                LOWER(COALESCE(a.cv_pt,'')) AS cv_pt,
                LOWER(COALESCE(a.area_of_expertise,'')) AS area_of_expertise
            FROM
                applicants a
            ) AS t
        ) AS t2
    WHERE
        LENGTH(title) > 0
        AND LENGTH(description) > 150
        AND LENGTH(location) > 0
"""
applicants = spark.sql(applicants_query)

In [None]:
# Prepare vacancies for indexing: lower-case the text columns (NULL becomes ''),
# build `description` from the activity/skill columns and `location` from state
# and city, tag every row with the model version, and drop rows whose title,
# description or location is empty.
#
# `location` is built with CONCAT_WS over NULLIF-ed parts so that a missing state
# and/or city does not leave a dangling ', ' separator. With a plain CONCAT the
# result always contained the separator, so the empty-location filter could never
# exclude anything.
vacancies = spark.sql(f"""
    SELECT
        *,
        '{MODEL_VERSION}' AS model_version
    FROM
        (SELECT
            id,
            title,
            CLEAN_STR(
                if(main_activities = technical_and_behavioral_skills,
                main_activities
                ,
                CONCAT(
                    main_activities, '\n', 
                    technical_and_behavioral_skills, '\n',
                    behavioral_skills
                )
            )) AS description,
            CONCAT_WS(', ', NULLIF(state, ''), NULLIF(city, '')) AS location
        FROM 
            (SELECT
                v.id,
                LOWER(TRIM(clean_str(v.title))) as title,
                LOWER(COALESCE(v.country, '')) AS country,
                LOWER(COALESCE(v.city, '')) AS city,
                LOWER(COALESCE(v.state, '')) AS state,
                LOWER(COALESCE(v.main_activities, '')) AS main_activities,
                LOWER(COALESCE(v.behavioral_skills, '')) AS behavioral_skills,
                LOWER(COALESCE(v.technical_and_behavioral_skills, '')) AS technical_and_behavioral_skills
            FROM
                vacancies v 
            ) AS t
        ORDER BY id DESC
        ) AS t2
    WHERE 
        LENGTH(TRIM(REGEXP_REPLACE(title, '\n', ''))) > 0
        AND LENGTH(TRIM(REGEXP_REPLACE(description, '\n', ''))) > 0
        AND LENGTH(TRIM(REGEXP_REPLACE(location, '\n', ''))) > 0
""")

# Inicia a indexação no Qdrant

In [None]:
# Collect the prepared vacancies, then embed and insert them into the "vacancies" collection.
vacancy_rows = vacancies.collect()
insert_batch(vacancy_rows, "vacancies", model)

In [None]:
# Collect the prepared applicants, then embed and insert them into the "applicants" collection.
applicant_rows = applicants.collect()
insert_batch(applicant_rows, "applicants", model)