In [None]:
import os
import pandas as pd
from pymongo import MongoClient
import yaml
from dotenv import load_dotenv
import sys
import ast
from pathlib import Path
# Read settings from /app/.env (MONGO_URI is looked up in get_collection below).
# override=False is passed so variables already present in the process
# environment are presumably left untouched — confirm against python-dotenv docs.
load_dotenv(dotenv_path=Path("/app/.env"), override=False)


# load_dotenv(override=False)
def get_collection():
    """Open a MongoDB client and hand back the ``kafka_project`` collection.

    The connection string is taken from the ``MONGO_URI`` environment variable;
    the database and the collection are both named ``kafka_project``.
    """
    uri = os.environ.get("MONGO_URI")
    return MongoClient(uri)["kafka_project"]["kafka_project"]

def fetch_all_to_df():
    """
    Load every document of the collection into a DataFrame.

    The query projection ``{"_id": 0}`` leaves the ``_id`` field out of the result.
    """
    documents = get_collection().find({}, {"_id": 0})
    return pd.DataFrame(list(documents))
 
# Load the raw offers into `df`; every following cell works on this frame.
# The guard is true when the code runs as a script or in a notebook kernel.
if __name__ == "__main__":
    df = fetch_all_to_df()


In [None]:
import pandas as pd

# Keep only offers published within the last 14 days.
# Only the first 10 characters of `published_dt` are parsed (presumably the
# YYYY-MM-DD part — TODO confirm), so every offer is dated at midnight UTC.
# Values that cannot be parsed become NaT and fail the comparison below.
published_day = pd.to_datetime(df['published_dt'].astype(str).str[:10], errors='coerce', utc=True)

two_weeks_ago = pd.Timestamp.now(tz='UTC') - pd.Timedelta(days=14)
mask = published_day >= two_weeks_ago

# The parsed dates stay in a local Series: the former `published_dt_parsed`
# helper column was converted with `.dt.date` and then dropped right away,
# so it never reached any later cell.
df = df[mask].copy()


In [389]:
import ast



dict_columns = ["location", "employment_type", "contract_type"]

# Values stored as strings are parsed back into Python literals; anything else is kept as-is.
for col in dict_columns:
    df[col] = df[col].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)

# Each column is spread into one column per key, named "<col>_<key>",
# and the original column is removed.
for col in dict_columns:
    expanded = df[col].apply(pd.Series).add_prefix(f"{col}_")
    df = pd.concat([df, expanded], axis=1).drop(columns=[col])
    


In [390]:
# One boolean column per seniority tag; rows whose `experience_level` is not a list get False.
for level in ("junior", "mid", "senior"):
    df[f"experience_level_{level}"] = df["experience_level"].apply(
        lambda x, level=level: level in x if isinstance(x, list) else False
    )

In [391]:
# One boolean column per work mode; rows whose `work_mode` is not a list get False.
for mode in ("hybrid", "remote", "on-site"):
    df[mode] = df["work_mode"].apply(
        lambda x, mode=mode: mode in x if isinstance(x, list) else False
    )

In [392]:
# The list-valued source columns have been turned into flags above; drop them together with `scraped_dt`.
df = df.drop(columns=["work_mode", "experience_level", "scraped_dt"])

In [393]:
# Parse string-encoded salary ranges, then keep only the first entry of each list
# (rows without a non-empty list get an empty dict).
parsed_ranges = df["salary_range"].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)
df["salary_range"] = parsed_ranges
df["salary_range_dict"] = parsed_ranges.apply(lambda x: x[0] if isinstance(x, list) and x else {})

In [394]:
# Spread the salary dict into "salary_<key>" columns and remove the two source columns.
salary_cols = df["salary_range_dict"].apply(pd.Series).add_prefix("salary_")
df = pd.concat([df, salary_cols], axis=1).drop(columns=["salary_range_dict", "salary_range"])

In [None]:
yaml_path = "/app/requirements_config.yaml"
# Read explicitly as UTF-8: the default encoding depends on the container locale,
# which would break on non-ASCII values (e.g. city names) in the config.
with open(yaml_path, "r", encoding="utf-8") as f:
    # An empty YAML file parses to None; fall back to an empty dict so the
    # `config.get(...)` calls further down keep working.
    config = yaml.safe_load(f) or {}


In [None]:
# Skill levels counted as hard requirements / as nice-to-have; any other level is ignored.
_REQUIRED_SKILL_LEVELS = ("unspecified", "expected", "regular", "advanced", "master",
                          "b2", "a2", "junior", "c1", "c2")
_OPTIONAL_SKILL_LEVELS = ("optional", "nice to have")

# Columns appended to the result by filter_offers.
_SKILL_MATCH_COLUMNS = ["composite_percent", "required_percent", "optional_percent",
                        "matched_names", "matched_names_nicetohave"]

# Contract types in the order in which their salary limits are tried.
_CONTRACT_TYPES = ("b2b", "permanent", "mandate")

# Config key -> all flag columns of that group. An offer must carry at least one
# requested flag and none of the remaining ones.
_FLAG_GROUPS = {
    "employment_type": ["employment_type_full_time", "employment_type_part_time"],
    "experience_level": ["experience_level_junior", "experience_level_mid",
                         "experience_level_senior"],
}


def _as_list(value):
    """Normalise a config entry to a list (a bare scalar becomes a one-item list)."""
    if value is None:
        return []
    if isinstance(value, (list, tuple, set)):
        return list(value)
    return [value]


def _column(df, name):
    """Return ``df[name]``, or an all-missing column when the frame lacks it."""
    if name in df.columns:
        return df[name]
    return pd.Series(float("nan"), index=df.index, dtype="object")


def _any_flag(df, columns):
    """Row mask that is True where at least one of the existing ``columns`` is set."""
    present = [c for c in columns if c in df.columns]
    if not present:
        # A flag column that was never created means no offer carries that flag.
        return pd.Series(False, index=df.index, dtype=bool)
    return df[present].any(axis=1).astype(bool)


def _flag_is_set(value):
    """True only for a real truthy flag; None/NaN (NaN is truthy in Python) count as unset."""
    return not pd.isna(value) and bool(value)


def _location_mask(df, config):
    """Row mask for the city/district criteria; only hybrid-and-not-remote rows are constrained."""
    needs_location = df["hybrid"].eq(True) & df["remote"].eq(False)
    location_ok = pd.Series(True, index=df.index, dtype=bool)

    city = config.get("city")
    if city:
        location_ok = location_ok & _column(df, "location_city").isin(_as_list(city))

    districts = config.get("district")
    if districts:
        wanted = _as_list(districts)
        # None or the literal string "Null" in the config means "offers without a district".
        allow_missing = any(d is None or d == "Null" for d in wanted)
        named = [d for d in wanted if not (d is None or d == "Null")]
        district = _column(df, "location_district")
        location_ok = location_ok & (district.isin(named) | (district.isna() & allow_missing))

    return ~needs_location | location_ok


def _salary_in_range(row, config):
    """Check the offer's salary against the limits of its first flagged contract type."""
    low, high = row.get("salary_min"), row.get("salary_max")
    if pd.isna(low) or pd.isna(high):
        return False

    for ctype in _CONTRACT_TYPES:
        if _flag_is_set(row.get(f"contract_type_{ctype}")):
            floor = config.get(f"{ctype}_min_salary")
            ceiling = config.get(f"{ctype}_max_salary")
            floor = 0 if floor is None else floor
            ceiling = float("inf") if ceiling is None else ceiling
            # Only the first flagged contract type decides, as before.
            return bool(floor <= low and high <= ceiling)

    return False


def _percent(part, whole):
    """Share of ``part`` in ``whole`` as a percentage rounded to one decimal (0.0 if empty)."""
    return round((part / whole) * 100, 1) if whole > 0 else 0.0


def _skill_match(raw_skills, my_skills_lower):
    """Compute the match scores of one offer; all values are None if the skills are malformed."""
    try:
        skills = ast.literal_eval(raw_skills) if isinstance(raw_skills, str) else raw_skills
        required, optional = [], []
        for entry in skills:
            # A single default: a skill without a level is a requirement,
            # never a requirement and a nice-to-have at the same time.
            level = entry.get("level", "unspecified")
            if level in _REQUIRED_SKILL_LEVELS:
                required.append(entry["skill"].lower())
            elif level in _OPTIONAL_SKILL_LEVELS:
                optional.append(entry["skill"].lower())
    except (ValueError, SyntaxError, TypeError, KeyError, AttributeError):
        return dict.fromkeys(_SKILL_MATCH_COLUMNS)

    def matched(names):
        # Substring match: "sql" in the config also matches "postgresql".
        return [n for n in names if any(mine in n for mine in my_skills_lower)]

    matched_names = matched(required)
    matched_names_nicetohave = matched(optional)
    required_percent = _percent(len(matched_names), len(required))
    optional_percent = _percent(len(matched_names_nicetohave), len(optional))

    return {
        # Composite score: required match + 0.5 * optional match (may exceed 100).
        "composite_percent": round(required_percent + 0.5 * optional_percent, 1),
        "required_percent": required_percent,
        "optional_percent": optional_percent,
        "matched_names": matched_names,
        "matched_names_nicetohave": matched_names_nicetohave,
    }


def filter_offers(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    """
    Filter job offers based on criteria from the config file.
    Adds skill match percentage columns.

    Filters, each applied only when its config key is set:
      * ``work_mode``: at least one of the listed mode columns must be True.
      * ``city`` / ``district``: checked only for offers that are hybrid and not remote.
        NOTE(review): remote and on-site-only offers are not location-filtered — confirm intent.
      * ``employment_type`` / ``experience_level``: at least one requested flag,
        and none of the other flags of the same group.
    The salary check always runs: ``salary_min``/``salary_max`` must lie within
    ``<contract>_min_salary``/``<contract>_max_salary`` of the first flagged contract type.

    Returns a new frame (index reset) with the columns ``composite_percent``,
    ``required_percent``, ``optional_percent``, ``matched_names`` and
    ``matched_names_nicetohave`` appended. The columns are present even when no
    offer survives the filters. The input frame is not modified.
    """
    df_filtered = df.copy()

    # Filter by work mode
    if config.get("work_mode"):
        df_filtered = df_filtered[_any_flag(df_filtered, _as_list(config["work_mode"]))]

    # Filter by city/district for hybrid offers
    df_filtered = df_filtered[_location_mask(df_filtered, config)]

    # Filter by employment type and experience level
    for key, group_columns in _FLAG_GROUPS.items():
        if config.get(key):
            allowed = [f"{key}_{name}" for name in _as_list(config[key])]
            disallowed = [c for c in group_columns if c not in allowed]
            keep = _any_flag(df_filtered, allowed) & ~_any_flag(df_filtered, disallowed)
            df_filtered = df_filtered[keep]

    # Filter by salary. The mask is built row by row because DataFrame.apply(axis=1)
    # returns a DataFrame instead of a Series when the frame is empty.
    salary_ok = pd.Series(
        [_salary_in_range(row, config) for _, row in df_filtered.iterrows()],
        index=df_filtered.index,
        dtype=bool,
    )
    df_filtered = df_filtered[salary_ok]

    # Calculate skill match %
    my_skills_lower = [str(s).lower() for s in _as_list(config.get("required_skills"))]
    skill_match_df = pd.DataFrame(
        [_skill_match(raw, my_skills_lower) for raw in _column(df_filtered, "required_skills")],
        index=df_filtered.index,
        columns=_SKILL_MATCH_COLUMNS,
    )
    # Dropping stale score columns keeps a second pass over an already scored frame clean.
    df_filtered = pd.concat(
        [df_filtered.drop(columns=_SKILL_MATCH_COLUMNS, errors="ignore"), skill_match_df],
        axis=1,
    )

    return df_filtered.reset_index(drop=True)




In [397]:
# Apply the config-driven filters and attach the skill-match columns.
filtered_df = filter_offers(df, config)


In [None]:
# Keep only the columns needed for the report and put the best matches first
# (all three score columns sorted in descending order).
match = (
    filtered_df.loc[:, [
        'title', 'company', 'link', 'required_skills',
        'location_city', 'location_district', 'hybrid', 'remote',
        'salary_min', 'salary_max', 'salary_currency', 'salary_unit',
        'salary_net_gross', 'salary_contract',
        'composite_percent', 'required_percent', 'optional_percent',
        'matched_names', 'matched_names_nicetohave',
    ]]
    .sort_values(by=["composite_percent", "required_percent", "optional_percent"], ascending=False)
)


In [None]:
def list_to_str_set(x):
    """
    Convert a list to a comma-separated string of unique values.

    Duplicates are removed while the first-seen order is kept, so the text is
    the same on every run (a plain ``set`` has no stable order for strings).
    Items are converted with ``str`` before joining.

    Returns None for an empty list or any other falsy value; non-list values
    are returned unchanged.
    """
    if isinstance(x, list):
        return ', '.join(dict.fromkeys(str(item) for item in x)) if x else None
    return x if x else None

# Turn the list of matched nice-to-have skills into display text.
match["matched_names_nicetohave"] = match["matched_names_nicetohave"].apply(list_to_str_set)


In [401]:
def list_to_str_set(x):
    """
    Convert a list to a comma-separated string of unique values.

    NOTE(review): this repeats the definition from the previous cell; it is kept
    so the cell still runs on its own, and one of the two could be removed.

    Duplicates are removed while the first-seen order is kept, so the text is
    the same on every run (a plain ``set`` has no stable order for strings).
    Items are converted with ``str`` before joining.

    Returns None for an empty list or any other falsy value; non-list values
    are returned unchanged.
    """
    if isinstance(x, list):
        return ', '.join(dict.fromkeys(str(item) for item in x)) if x else None
    return x if x else None

# Turn the list of matched required skills into display text.
match["matched_names"] = match["matched_names"].apply(list_to_str_set)


In [None]:
def combine_salary(row):
    """
    Combine salary information into a readable string.

    Reads ``salary_min``, ``salary_max``, ``salary_currency``, ``salary_unit``,
    ``salary_net_gross`` and ``salary_contract`` from ``row`` and joins the parts
    that are present, e.g. ``"10000-12000 PLN/h net - b2b"``. Missing parts are
    skipped; an empty string is returned when nothing is available.
    """
    def fmt(amount):
        # Whole-number floats (the column turns float as soon as one salary is
        # missing) are shown without the trailing ".0".
        if isinstance(amount, float) and amount.is_integer():
            return str(int(amount))
        return str(amount)

    # range
    low, high = row['salary_min'], row['salary_max']
    if pd.notnull(low) and pd.notnull(high):
        minmax = f"{fmt(low)}-{fmt(high)}"
    elif pd.notnull(low):
        minmax = fmt(low)
    elif pd.notnull(high):
        minmax = fmt(high)
    else:
        minmax = ""

    # PLN/h
    currency, unit = row['salary_currency'], row['salary_unit']
    if pd.notnull(currency) and pd.notnull(unit):
        short_unit = "h" if unit == "hour" else unit
        currency_unit = f"{currency}/{short_unit}"
    elif pd.notnull(currency):
        currency_unit = str(currency)
    elif pd.notnull(unit):
        currency_unit = str(unit)
    else:
        currency_unit = ""

    # net
    netgross = str(row['salary_net_gross']) if pd.notnull(row['salary_net_gross']) else ''
    # b2b
    contract = f"- {row['salary_contract']}" if pd.notnull(row['salary_contract']) else ''

    # all
    return " ".join(part for part in (minmax, currency_unit, netgross, contract) if part).strip()

# Add the readable salary text, then continue without the raw salary columns.
match["salary_info"] = match.apply(combine_salary, axis=1)

raw_salary_columns = [
    "salary_min", "salary_max", "salary_currency",
    "salary_net_gross", "salary_contract", "salary_unit",
]
match_structurized = match.drop(columns=raw_salary_columns)

In [None]:
def location(row):
    """
    Build the location text: "<city>-<district>", or just the city when the
    district is None, an empty string or NaN.
    """
    town, area = row['location_city'], row['location_district']
    has_area = not (area is None or area == "" or pd.isna(area))
    return f"{town}-{area}" if has_area else town

# Combined "city-district" text per offer.
match_structurized["location"] = match_structurized.apply(location, axis=1)


In [404]:

# Prefix the location with "remote - " for remote offers that have a location.
# NOTE(review): the `word_mode` column written here is overwritten by the
# location_mode cell below, so this cell has no effect on the final report.
# NOTE(review): the column name reads like a typo of "work_mode"; it is used
# consistently in the later cells, so it is left unchanged.
match_structurized["word_mode"] = match_structurized.apply(
    lambda x: f"remote - {x['location']}" if x["remote"] and pd.notnull(x["location"]) else x["location"],
    axis=1
)


In [None]:
def location_mode(row):
    """
    Describe an offer as "<work mode> – <city>", or just the work mode when the city is missing.

    The work mode comes from the ``remote`` and ``hybrid`` flags (both default to
    False when absent): both set -> "Remote / Hybrid", neither set -> "On-site".
    """
    labels = {
        (True, True): "Remote / Hybrid",
        (True, False): "Remote",
        (False, True): "Hybrid",
        (False, False): "On-site",
    }
    is_remote = bool(row.get("remote", False))
    is_hybrid = bool(row.get("hybrid", False))
    prefix = labels[(is_remote, is_hybrid)]

    city = row["location_city"] if pd.notnull(row["location_city"]) else ""
    if city:
        return f"{prefix} – {city}"
    return prefix

# Final value of the `word_mode` column: work mode plus city (replaces the value set in the cell above).
match_structurized["word_mode"] = match_structurized.apply(location_mode, axis=1)

In [None]:
# Columns of the final report, in display order.
match_structurized = match_structurized.loc[:, [
    "title", "company", "word_mode", "salary_info", "link", "required_skills",
    "composite_percent", "required_percent", "matched_names",
    "optional_percent", "matched_names_nicetohave",
]]

In [408]:
# Replace missing values with empty strings so nothing renders as "NaN"/"None" in the report.
match_structurized = match_structurized.fillna("")


In [409]:
import ast

def pretty_skills(cell):
    """
    Render a skills cell as "<skill> (<level>)" entries separated by "<br>".

    ``cell`` may be a list of dicts or the string form of such a list. A skill
    without a level is shown by name only, so one incomplete entry no longer
    turns the whole cell into a raw Python repr. Anything that is not a list,
    or cannot be parsed, is returned as ``str(cell)``.
    """
    try:
        skills = ast.literal_eval(cell) if isinstance(cell, str) else cell
        if not isinstance(skills, list):
            return str(cell)
        lines = []
        for s in skills:
            level = s.get('level')
            if level is None or level == "":
                lines.append(f"{s['skill']}")
            else:
                lines.append(f"{s['skill']} ({level})")
        return "<br>".join(lines)
    except (ValueError, SyntaxError, TypeError, KeyError, AttributeError):
        # Unparseable text or entries that are not skill dicts: show the cell as-is.
        return str(cell)
# Replace the raw skills structure with its "<br>"-separated display text.
match_structurized['required_skills'] = match_structurized['required_skills'].apply(pretty_skills)


In [410]:
# Plain HTML table of the report. escape=False emits cell contents as raw HTML,
# which keeps the "<br>" separators written by pretty_skills intact.
# NOTE(review): `html_table` is not referenced by any later cell shown here — confirm it is still needed.
html_table = match_structurized.to_html(index=False, escape=False, classes="styled-table")

In [None]:
import re

def safe(val):
    """
    Return an empty string for None/NaN, otherwise the value converted with str(). Just for UX.
    """
    return "" if val is None or (isinstance(val, float) and pd.isna(val)) else str(val)

def make_mail_card(row):
    """
    Create HTML for a single job offer card for email.

    ``row`` must provide: title, company, word_mode, salary_info, required_skills
    ("<br>"- or newline-separated entries, optionally ending in "(level)"),
    composite_percent, required_percent, optional_percent, matched_names,
    matched_names_nicetohave and link.

    Every value is HTML-escaped before it is placed into the markup, because the
    offer texts come from scraped data and must not be able to inject tags or
    break out of the ``href`` attribute.
    """
    from html import escape  # local import: `html` is not imported at file level

    def text(key):
        # Missing values become "", everything else is escaped (quotes included).
        return escape(safe(row[key]), quote=True)

    skills_html = ""
    for skill in safe(row['required_skills']).replace("<br>", "\n").split("\n"):
        skill = skill.strip()
        if not skill:
            # An empty skills cell would otherwise add a lone "<br>".
            continue
        # Split "Name (level)" into the name and the optional trailing bracket.
        m = re.match(r"^(.*?)\s*(\([^)]+\))?$", skill)
        if not m:
            continue
        main, bracket = escape(m.group(1)), m.group(2)
        if bracket:
            skills_html += f"{main}<span style='color:#94a3b8;font-size:12px;font-weight:400;'> {escape(bracket)}</span><br>"
        else:
            skills_html += f"{main}<br>"

    salary = text('salary_info')
    salary_html = f" | <span style='color:#047857;font-weight:bold;'>{salary}</span>" if salary else ""

    return f"""
    <div style="background:#f6f8fa;border-radius:16px;box-shadow:0 2px 12px #cbd5e133;padding:26px 32px 20px 32px;margin-bottom:28px;max-width:700px;">
      <div style="font-size:20px;font-weight:700;color:#1e293b;margin-bottom:4px;">{text('title')}</div>
      <div style="font-size:15px;color:#64748b;font-weight:500;margin-bottom:8px;">{text('company')}</div>
      <div style="font-size:14px;margin-bottom:8px;">
        <span style="color:#2563eb;font-weight:500;">{text('word_mode')}</span>
        {salary_html}
      </div>
      <div style="font-size:13px;margin-bottom:10px;line-height:1.5;">
        <b>Requirements:</b><br>{skills_html}
      </div>
      <div style="font-size:12px;margin-bottom:8px;">
        <b>Match:</b> {text('composite_percent')}% | <b>Required:</b> {text('required_percent')}% | <b>Optional:</b> {text('optional_percent')}%
      </div>
      <div style="font-size:12px;color:#555;margin-bottom:12px;">
        <b>Must-have match:</b> {text('matched_names')} <br>
        <b>Nice-to-have match:</b> {text('matched_names_nicetohave')}
      </div>
      <a href="{text('link')}" style="display:inline-block;padding:9px 20px;border-radius:8px;background:#6366f1;color:#fff;font-weight:600;text-decoration:none;font-size:13px;">See offer</a>
    </div>
    """


def make_job_matches_email(df):
    """
    Create HTML with all job offers for email sending.

    One card per row of ``df`` is rendered with ``make_mail_card`` and the cards
    are joined with newlines inside a minimal HTML document. An empty frame
    produces a short "no offers" note instead of failing.
    """
    # Rows are iterated explicitly: DataFrame.apply(axis=1) returns a DataFrame
    # (without a .str accessor) when the frame has no rows.
    cards = "\n".join(make_mail_card(row) for _, row in df.iterrows())
    if not cards:
        cards = '<p style="padding:0 32px;font-size:14px;color:#64748b;">No matching offers found.</p>'
    return f"""
    <html>
    <head>
      <meta charset="utf-8">
      <title>Latest Job Matches</title>
    </head>
    <body style="margin:0;padding:0;font-family:Segoe UI,Arial,sans-serif;">
      <div style="max-width:720px;margin:0 auto;padding:24px 0;">
        <div style="padding:24px 32px 18px 32px;">
          <h2 style="font-size:22px;font-weight:700;color:#222;margin:0 0 8px 0;">
            Latest Job Matches
          </h2>
        </div>
        {cards}
      </div>
    </body>
    </html>
    """


In [412]:

# Render the e-mail body and keep a copy on disk next to the notebook.
html_email = make_job_matches_email(match_structurized)
Path("job_matches_email.html").write_text(html_email, encoding="utf-8")


In [None]:
# with open("tabelka.html", "w", encoding="utf-8") as f:
#     f.write(html_email)
    
# import os
# import webbrowser

# plik = os.path.abspath("tabelka.html")
# webbrowser.open('file://' + plik)


In [None]:
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import os
import ssl  # for the certificate-verifying TLS context used below

load_dotenv()
EMAIL_SMTP = os.getenv("EMAIL_SMTP")
EMAIL_SENDER = os.getenv("EMAIL_SENDER")
EMAIL_RECIEVER = os.getenv("EMAIL_RECIEVER")
EMAIL_SENDER_PASSWORD = os.getenv("EMAIL_SENDER_PASSWORD")

# Fail early with a readable message: with a setting missing, smtplib would
# only report an obscure connection or login error.
_missing_settings = [
    name
    for name, value in (
        ("EMAIL_SMTP", EMAIL_SMTP),
        ("EMAIL_SENDER", EMAIL_SENDER),
        ("EMAIL_RECIEVER", EMAIL_RECIEVER),
        ("EMAIL_SENDER_PASSWORD", EMAIL_SENDER_PASSWORD),
    )
    if not value
]
if _missing_settings:
    raise RuntimeError("Missing e-mail settings in the environment: " + ", ".join(_missing_settings))

msg = MIMEMultipart('alternative')
msg['Subject'] = "Latest Job Matches"
msg['From'] = EMAIL_SENDER
msg['To'] = EMAIL_RECIEVER

msg.attach(MIMEText(html_email, 'html'))

# send by SMTP
# The timeout keeps an unreachable server from blocking the cell indefinitely.
with smtplib.SMTP(EMAIL_SMTP, 587, timeout=30) as server:
    # Pass an explicit default context so the server certificate and host name
    # are verified before the password is sent.
    server.starttls(context=ssl.create_default_context())
    server.login(EMAIL_SENDER, EMAIL_SENDER_PASSWORD)
    server.sendmail(EMAIL_SENDER, EMAIL_RECIEVER, msg.as_string())
