In [1]:
import warnings
import sys
import os

# adding paths for project modules
CUR_DIR_WARNING = (
    "__file__ was not available, os.getcwd() was used instead. "
    "You may need to change the working directory."
)
try:
    CURRENT_DIRECTORY = os.path.dirname(__file__)
except NameError:
    CURRENT_DIRECTORY = os.getcwd()
    warnings.warn(CUR_DIR_WARNING)

if CURRENT_DIRECTORY not in sys.path:
    sys.path.append(CURRENT_DIRECTORY)
    
from config import PROJECT_ROOT_RELATIVE
PROJECT_ROOT = os.path.abspath(
    os.path.join(CURRENT_DIRECTORY, PROJECT_ROOT_RELATIVE)
)

if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)
    print(f"{PROJECT_ROOT} was added to sys.path")



/home/jovyan/work/repos/jobs-research was added to sys.path


In [2]:
from math import ceil
import pandas as pd
import datetime as dt
from google.cloud import bigquery

from common.utils import (
    df_to_bq,
    bq_table_to_df,
    bq_merge,
    print_dict,
)
from functions import (
    get_post_id,
    LoadsLogger,
)
from mappings import (
    POSITIONS, 
    CITY_CLUSTERS,
    find_position_in_text, 
    collapse_city_groups, 
    prepare_mapping_dict,
)
from config import (
    PRINT_SQL,
    JOBS_POSTINGS_FINAL_COLS,
    BQ_DWH_PARAMS,
    BQ_ADB_PARAMS,
    GCP_NAME,
    SERVER, #switching between test/prod parameters
)

In [3]:
location = BQ_DWH_PARAMS[SERVER]['location']
bq_client = bigquery.Client(location=location)
source_tables_prefix = f"{GCP_NAME[SERVER]}.{BQ_DWH_PARAMS[SERVER]['dataset_name']}."
dataset = BQ_DWH_PARAMS[SERVER]['dataset_name']
analytical_dataset = BQ_ADB_PARAMS[SERVER]['dataset_name']
project = GCP_NAME[SERVER]
pipeline_name = "jobs_posting_transform"

In [4]:
# fetch new data
# deal with doubled posts
# normalize attributes
# update analytical tables

In [5]:
#----------------------------------------------------fetch new data------------------------------------------------

In [6]:
df_posting_load_query = f"""
with processed_loads as (
  select dlt_load_id
  from `{source_tables_prefix}_jp_processed_loads`
  where processed_by = '{pipeline_name}'
  group by dlt_load_id
  having max(finished_at) is not Null
)

, new_loads as (
  select distinct load_id
  from `{source_tables_prefix}._dlt_loads` as dl
  left join processed_loads as pl on dl.load_id = pl.dlt_load_id
  where dl.status = 0
    and pl.dlt_load_id is Null
)

select
    _dlt_load_id
    ,_dlt_id
    ,company
    ,city
    ,title
    ,occupation
    ,url
    ,portal
    ,experience_requirements__months_of_experience
    ,date_created
    ,description 
from `{source_tables_prefix}.jobs_posting` as jp
inner join new_loads nl on jp._dlt_load_id = nl.load_id
where locale = "en_DE"
"""
if PRINT_SQL:
    print(df_posting_load_query)
df_posting = bq_client.query(df_posting_load_query).to_dataframe()
print(f"Fetched {len(df_posting)} raws from `{source_tables_prefix}.jobs_posting`")

if df_posting.empty:
    print("No data to process")
    sys.exit(0)


with processed_loads as (
  select dlt_load_id
  from `x-avenue-450615-c3.job_postings_test._jp_processed_loads`
  where processed_by = 'jobs_posting_transform'
  group by dlt_load_id
  having max(finished_at) is not Null
)

, new_loads as (
  select distinct load_id
  from `x-avenue-450615-c3.job_postings_test.._dlt_loads` as dl
  left join processed_loads as pl on dl.load_id = pl.dlt_load_id
  where dl.status = 0
    and pl.dlt_load_id is Null
)

select
    _dlt_load_id
    ,_dlt_id
    ,company
    ,city
    ,title
    ,occupation
    ,url
    ,portal
    ,experience_requirements__months_of_experience
    ,date_created
    ,description 
from `x-avenue-450615-c3.job_postings_test..jobs_posting` as jp
inner join new_loads nl on jp._dlt_load_id = nl.load_id
where locale = "en_DE"

Fetched 0 raws from `x-avenue-450615-c3.job_postings_test..jobs_posting`
No data to process


SystemExit: 0

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [None]:
new_loads = LoadsLogger(df_posting, dataset, project)

In [None]:
new_loads.start(pipeline_name)

In [None]:
df_posting.drop(columns="_dlt_load_id", inplace=True)

In [None]:
#----------------------------------------------------deal with doubled posts------------------------------------------------

#consider posts with the same title, description, location, and hiring company the same
df_posting["job_id"] = df_posting[["title", "company", "city", "description"]].apply(get_post_id, axis=1, raw=True)

#marking the last post, only this one will go to the analytical table
df_posting.sort_values(
    ["job_id", "date_created"], 
    ascending = False, 
    inplace = True
)
df_posting["is_source"] = df_posting.groupby(by="job_id").cumcount()==0

#save mapping from old id on new
df_dlt_to_post_id = df_posting[["_dlt_id", "job_id", "is_source"]].copy(deep = True)

#get rid of doubles
df_posting = df_posting[df_posting.is_source].copy()

df_posting.drop(columns=['_dlt_id', 'is_source'], inplace = True)

#----------------------------------------------------normalize attributes------------------------------------------------

#preparing fields for mapping attributes
df_posting["title_lower_no_spaces"] = df_posting.title.map(
    lambda x: x.lower().replace(" ", "")
)
df_posting["occupation_lower_no_spaces"] = df_posting.occupation.map(
    lambda x: x.lower().replace(" ", "")
)

#preparing mapping rules
map_dicts_positions_prepared = [
    prepare_mapping_dict(*mapping_dict) for mapping_dict in POSITIONS
]

#----------------------------------------------------normalize positions------------------------------------------------

df_posting["position"] = None

for md in map_dicts_positions_prepared:
    if not (md.case_sensitive & md.spaces_sensitive):
        text_columns = ["title_lower_no_spaces", "occupation_lower_no_spaces"]
    elif md.case_sensitive & md.spaces_sensitive:
        text_columns = ["title", "occupation"]
    else:
        raise ValueError(
            "You need a small refinement to use case_sensitive != spaces_sensitive"
        )
    df_posting["position"] = df_posting[["position", *text_columns]].apply(
        lambda x: (
            x.iloc[0]
            if x.iloc[0] is not None
            else find_position_in_text(x.iloc[1:], md.mapping_dict)
        ),
        axis=1,
    )
    
df_posting.drop(columns=[
    "title_lower_no_spaces",
    "occupation_lower_no_spaces",
    "title",
    "occupation"
], inplace=True)

#----------------------------------------------------normalize cities------------------------------------------------

df_posting["city_group"] = df_posting.city.map(lambda x: collapse_city_groups(x, CITY_CLUSTERS))
df_posting.drop(columns="city", inplace=True)

df_posting['years_of_experience']=(df_posting['experience_requirements__months_of_experience']
                                       .map(lambda x: None if pd.isna(x) else ceil(x/12)
                                    )
)
df_posting.drop(columns="experience_requirements__months_of_experience", inplace=True)




In [None]:
#----------------------------------------------------update analytical tables------------------------------------------------

#download data to the tmp table
jobs_columns = list(JOBS_POSTINGS_FINAL_COLS.keys())
df_posting.rename(columns = {"job_id": "id"}, inplace=True)
df_posting = df_posting[jobs_columns]
df_to_bq(df_posting, '_jp_jobs_batch', dataset, project, truncate=True)

In [None]:
# save info about new ids 
df_dlt_to_post_id.rename(columns = {"_dlt_id": "dlt_id"}, inplace=True)
df_dlt_to_post_id['matched_at'] = dt.datetime.now()
df_to_bq(df_dlt_to_post_id, '_jp_dlt_ids_matching', dataset, project, truncate=False)

In [None]:
#update main analytical table
bq_merge(
    f"{project}.{analytical_dataset}.jobs",
    f"{project}.{dataset}._jp_jobs_batch", 
    "id",
    jobs_columns[1:], #exclude key column
    print_sql = PRINT_SQL,
)

In [None]:
# log
new_loads.finish(pipeline_name)