## Feature Engineering - Gemini Extract Requirements - Step 1

### Imports

In [None]:
%pip install -U -q google-generativeai

Python interpreter will be restarted.
Python interpreter will be restarted.


In [None]:
import pathlib
import textwrap

import google.generativeai as genai
import google.ai.generativelanguage as glm


from IPython.display import display
from IPython.display import Markdown

from google.api_core import retry

In [None]:
import pandas as pd
import glob
import rapidfuzz.utils
import thefuzz
from thefuzz import process, utils
from copy import copy
import ast
from rapidfuzz import fuzz as rapidfuzz


# Sentinel constant; it is not referenced anywhere in the visible cells of this notebook.
VALUE_ERROR = -1

In [None]:
import json
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [None]:
# Shell installs of the fuzzy-matching packages that get_match relies on.
!pip install thefuzz
!pip install rapidfuzz

### Gemini Setup

In [None]:
import os

# Read the key from the environment so the secret never lives in the notebook
# source; fail early with a clear message instead of a NameError.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise RuntimeError(
        "Set the GOOGLE_API_KEY environment variable before running this notebook."
    )
genai.configure(api_key=GOOGLE_API_KEY)

In [None]:
def _string_field():
    """Return a fresh STRING-typed schema node."""
    return glm.Schema(type=glm.Type.STRING)


# One education requirement: the kind of education and its field of study.
edu_item = glm.Schema(
    type=glm.Type.OBJECT,
    properties={
        'type_of_education': _string_field(),
        'education_in_field': _string_field(),
    },
    required=['type_of_education', 'education_in_field'],
)

# One experience requirement: the field and the minimal number of years,
# both carried as strings.
exp_item = glm.Schema(
    type=glm.Type.OBJECT,
    properties={
        'experience_in_field': _string_field(),
        'minimal_years_of_experience_in_the_field': _string_field(),
    },
    required=['experience_in_field', 'minimal_years_of_experience_in_the_field'],
)

In [None]:
# Each requirement list is an ARRAY schema whose elements follow the item schema.
edu_reqs = glm.Schema(type=glm.Type.ARRAY, items=edu_item)
exp_reqs = glm.Schema(type=glm.Type.ARRAY, items=exp_item)

In [None]:
# Arguments of the tool call: both requirement arrays are mandatory.
_requirement_args = glm.Schema(
    type=glm.Type.OBJECT,
    properties={
        'required_education_item_array': edu_reqs,
        'required_experience_item_array': exp_reqs,
    },
    required=['required_education_item_array', 'required_experience_item_array'],
)

# Function declaration that is passed to the model as a tool.
extract_requirements = glm.FunctionDeclaration(
    name="extract_requirements",
    description=textwrap.dedent("""\
        extract required education items and required experience items from the job posting
        """),
    parameters=_requirement_args,
)

In [None]:
# Gemini model with the requirement-extraction tool attached.
model = genai.GenerativeModel(
    model_name='gemini-1.0-pro-latest',
    tools=[extract_requirements],
)

In [None]:
# Instruction text that convert_summary prepends to every job summary before
# sending it to the model. The property names mentioned here match the keys of
# the tool schemas declared above. The wording is part of the model input, so
# it is left exactly as written.
prompt = """
Please extract education and experience requirements from the following job posting description following these guidelines:
required_education_item_array is an array of the education requirement items that appear in the job posting description.
all required education mentioned should be extracted and each should appear seperatly in an item containing the following two properties:
    1. type_of_education - string of the type of education. guidelines specific to this type are:
        a. for any non academic education requirement use \"Non-Academic\", including high school
        b. for academic degrees use abbreviations when possible (e.g. \"BS\" instead of \"Bachelor of Science\")
        c. for certificate programmes use \"Certification\"
        d. for diplomas use \"Diploma\" (not high school diplomas)
        e. in any other case or when not sure, extract type \"Other\"
    2. education_in_field - string of the field of education related to type_of_education mentioned above. guidelines specific to this type are:
        a. omit the type from this field as it should be exracted seperatly to the previous field of this item.
        b. use the most general and consice term for the topic of study (e.g. \"Psychology\" instead of \"Behavioral and Clinical Psychological Analysis\")
required_experience_item_array is an array of the experience requirement items that appear in the job posting description.
all required experience mentioned should be extracted and each should appear seperatly in an item containing the following two properties:
    1. experience_in_field - string of the field title or previous position title in which the experience requirement is. guidelines specific to this type are:
        a. use the most general and consice term for the field
        b. if no field or past position is mentioned, extract \"Any\"
    2. minimal_years_of_experience_in_the_field - string of float of the number of years required in the field mentioned above. guidelines specific to this type are:
        a. if time ranges of experience in field are mentioned, extract the lower number (e.g. \"5 to 10 years of experience\" should be \"5.0\", \"7+ years\" should be \"7.0\")
        b. if number of months is mentioned like in \"4 years and 3 months required\", write it as \"4.25\".
        c. if no time related to the experience field is mentioned, write null here.

the job posting description:

"""

### Load Job Posting data

In [None]:
# Part 1 lives under FileStore/tables/, parts 2-6 directly under FileStore/.
dir_path = "dbfs:/FileStore/tables/"
file_name = "job_skills_part_1.parquet"
file_path = dir_path + file_name

df1 = spark.read.parquet(file_path, header=True)
df2, df3, df4, df5, df6 = (
    spark.read.parquet(f"dbfs:/FileStore/job_skills_part_{part_no}.parquet", header=True)
    for part_no in range(2, 7)
)

# Stack the six parts positionally, keeping the existing order 1, 4, 3, 2, 5, 6.
df = df1
for part_df in (df4, df3, df2, df5, df6):
    df = df.union(part_df)

In [None]:
# Row count of the union of all six job-skills parts.
df.count()

Out[32]: 1296381

### Map job-posting `position` titles to the chosen position list

In [None]:
# Target position titles are stored in the column named '0' of this parquet file.
choices_new = spark.read.parquet("dbfs:/user_data/g37/choices_new.parquet")
choices = choices_new.toPandas()['0'].tolist()
choices[:5]

Out[19]: ['Data Manager',
 'Clinical Coordinator',
 'Division Manager',
 'Logistics Analyst',
 'Building Manager']

In [None]:
def get_match(title):
    """Map a raw job title to the closest entry in ``choices``.

    Parameters
    ----------
    title : str or None
        Raw position title. ``None`` (a null cell) is accepted and yields
        ``None``.

    Returns
    -------
    str or None
        The best-matching choice when its score is at least 90, otherwise
        ``None`` (also when the title is missing or the matcher returns no
        candidate).
    """
    # A missing title cannot be scored; bail out before calling the matcher.
    if title is None:
        return None
    # `rapidfuzz` is the name this notebook binds to the `rapidfuzz.fuzz` module.
    best = process.extractOne(title, choices, scorer=rapidfuzz.token_set_ratio,
                              processor=thefuzz.process.default_processor)
    # Guard against the matcher returning nothing instead of a (choice, score) pair.
    if best is None or best[1] < 90:
        return None
    return best[0]
# Wrap get_match as a Spark UDF; no explicit return type is passed to F.udf.
get_match_udf = F.udf(get_match)

In [None]:
# Keep only postings whose title maps onto one of the chosen positions.
# The result is cached because get_match_udf is a Python UDF and dfsk is reused
# by several actions (count() and every sample() draw); without the cache each
# action would re-run the fuzzy matching over the full input.
dfsk = (
    df.withColumn('narrow_position', get_match_udf(F.col('position')))
      .dropna(subset=['narrow_position'])
      .cache()
)
dfsk.count()

Out[23]: 442646

### Run Gemini on job summaries and save the results

In [None]:
from pyspark.sql.types import *
from pyspark.sql import functions as F

def convert_summary(summary, max_tries=1):
    """Ask the model to extract requirements from one job summary.

    The summary is appended to the module-level ``prompt`` and sent through
    ``model``; the arguments of the function call found in the first part of
    the first candidate are returned as the ``str()`` of a dict.

    Parameters
    ----------
    summary : str
        Job posting text. Non-string values (e.g. ``None`` or NaN coming from
        missing cells) are not sent to the model.
    max_tries : int, optional
        Maximum number of ``generate_content`` attempts. Defaults to 1, the
        value that was previously hard-coded.

    Returns
    -------
    str or None
        ``str`` of the ``args`` dict with ``None`` values replaced by ``[]``,
        or ``None`` when the input is not text, every attempt fails, or the
        response cannot be parsed.
    """
    # Missing summaries would make `prompt + summary` raise and abort the batch.
    if not isinstance(summary, str):
        return None
    description = prompt + summary

    res = None
    for _ in range(max_tries):
        try:
            res = model.generate_content(description)
            break
        except Exception:
            # Best effort: retry until the attempts are used up. Catching
            # Exception (not a bare except) lets KeyboardInterrupt through.
            continue
    if res is None:
        return None

    try:
        fc = res.candidates[0].content.parts[0].function_call
        # Round-trip through JSON so the result holds only plain JSON types.
        res_dict = json.loads(json.dumps(type(fc).to_dict(fc)))
        return str({k: v if v is not None else [] for k, v in res_dict["args"].items()})
    except Exception:
        return None

In [None]:
n_samples = 24
sample_size = 1000
dfsk_len = dfsk.count()
# Loop-invariant sampling fraction; capped at 1.0, the largest fraction that is
# valid when sampling without replacement.
sample_fraction = min(1.0, sample_size / dfsk_len)

for i in range(n_samples):
    # sample() returns roughly sample_size rows, not an exact count. Seeding
    # each draw with its index makes a re-run over the same input repeat the
    # same draws while still giving a different draw per iteration.
    sdf = dfsk.sample(fraction=sample_fraction, seed=i).toPandas()
    sdf['job_summary_processed'] = sdf['job_summary'].apply(convert_summary)
    spark.createDataFrame(sdf).write.parquet(f"dbfs:/user_data/g37/skills_true_narrowed_{sample_size}_{i}.parquet")