In [2]:
import lzma
import json
import pandas as pd
from geopy.geocoders import GoogleV3
import os
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation
from textblob import TextBlob
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from bs4 import BeautifulSoup
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import langid
import re
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.cluster import KMeans
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import Normalizer
import nltk

# Job Description

In [3]:
# load job ad data in .json.xz format
job_ad_file_path = 'data/ads-50k.json.xz'

job_ad_array = []
# Read the compressed JSON file into a DataFrame
with lzma.open(job_ad_file_path, 'rt', encoding='utf-8') as file:
    for i, line in enumerate(file):
        data = json.loads(line.strip())
        job_ad_array.append(data)
if job_ad_array:
    job_ad_df = pd.DataFrame(job_ad_array)



## 1. Convert location info to coordinate

In [4]:
def combine_location_area(row:object)->str:
    '''
    Aggregate values from three location related columns(location, suburb, area)
    After analysis, there is not missing data from location, 17156 missing from area and 12998 from suburb. The value of suburb is the most appropriate level of data, but due to missing data issue, and possible duplicated suburb name across Australia, if suburb data is available, we use location+suburb, if suburb data is unavailable but area data is available, we use location+area instead, or we use location only if both suburb and area are unavailable.
    :param row:
    :return:
    '''
    # if a suburb is not found and can't be found in location
    if pd.notnull(row['suburb']):
        if row['suburb'] == row['location']:
            return row['location']
        elif row['suburb'] not in row['location']:
            combined = f"{row['location']} {row['suburb']}"
            return combined
    # if suburb cant be found, use area info to enrich location info
    elif pd.notnull(row['area']) and row['area'] not in row['location']:  # Check if area is not null
        # Split area into components using regex
        # areas = [item.strip() for item in re.split(r', | & ', row['area'])]
        # Combine each area component with location
        combined = f"{row['location']} {row['area']}"
        return combined
    return row['location']  # Return location as a list if area is null

def getLocationInfo(location_name:str)->dict:
    '''
    Call google API to get location details based on address provided
    :param location_name: address
    :return: location_name,location.address, location.latitude, location.longitude
    '''
    # Initialize GoogleV3 API with your API key
    api_key = "AIzaSyCe9mKkMM6-zHF060DakgrULDAWWvUmtDA"
    geolocator = GoogleV3(api_key=api_key)
    # Provide the location name
    # location_name = "Shepparton & Goulburn Valley"

    # Get location information
    location = geolocator.geocode(location_name)

    # Extract latitude and longitude
    if location:
        return location_name,location.address, location.latitude, location.longitude
    else:
        print("Location not found.")
        return location_name,None,None,None
def check_oversea(row:object)->object:
    '''
    CHECK THE COUNTRY OF AN ADDRESS
    :param value: the address value from the dataframe
    :return:
    if the address is an Australian address, return 0
    else if the address is a New Zealand address, return 1
    else return 3 for rest of countries
    '''
    if 'Australia' in row["address"]:
        row["global"] = 0
        row["country"] = 'Australia'
    elif 'New Zealand' in row["address"]:
        row["global"] = 1
        row["country"] = 'New Zealand'
    else:
        row["global"] = 3
        row["country"] = row["address"].split(',')[-1]
    return row

def update_valid_state_data(row:object)->object:
    '''
    Get state data from address for Australian addresses
    :param row: row data
    :return: row data
    '''
    valid_state_list = ['VIC','QLD','NSW','SA','NT','TAS','ACT']
    if row['global']==0:
        for state in valid_state_list:
            if state in row['address']:
                row['state'] = state
                return row
    else:
        if 'Victoria' in row['address']:
            row['state'] = 'VIC'
            return row
        elif 'Queensland' in row['address']:
            row['state'] =  'QLD'
            return row
        elif 'Tasmania' in row['address']:
            row['state'] =  'TAS'
            return row
        elif 'Canberra' in row['address']:
            row['state'] =  'ACT'
            return row
    row['state'] = 'NA'
    return row

In [23]:
expanded_df = pd.json_normalize(job_ad_df['metadata'])
# Combine the expanded columns with the original DataFrame
job_ad_extend_df = pd.concat([job_ad_df.drop(columns=['metadata']), expanded_df], axis=1)
# Rename columns using a dictionary
job_ad_extend_df = job_ad_extend_df.rename(columns={
    'classification.name': 'classification',
    'subClassification.name': 'subClassification',
    'location.name': 'location',
    'workType.name': 'workType',
    'area.name': 'area',
    'suburb.name': 'suburb'
})

In [6]:
# generate a new column by combining location and area
job_ad_extend_df['area_location'] = job_ad_extend_df.apply(combine_location_area, axis=1)
# drop unnecessary columns
job_ad_extend_df = job_ad_extend_df.drop(['location', 'area', 'suburb'], axis=1)

In [7]:
# a temp file to avoid call api all the time
file_path = "data/coordinate.csv"

# Check if the file exists
if os.path.exists(file_path):
    print(f"The file '{file_path}' exists.")
    coordinate_data_df = pd.read_csv(file_path,index_col=0)
else:
    coordinate_data = [getLocationInfo(loc) for loc in unique_area_location]
    incomplete_coordinate_data = [item[0] for item in coordinate_data if item[1] is None]
    # remove if location can not be found
    coordinate_data = [item for item in coordinate_data if item[1] is not None]
    # 1st fixing for ACT recognition issue
    fixing_ACT = [item.replace('ACT','Canberra') for item in incomplete_coordinate_data if 'ACT' in item]
    coordinate_data_ACT = [getLocationInfo(loc) for loc in fixing_ACT]
    coordinate_data_ACT = [[item[0].replace('Canberra','ACT'),item[1],item[2],item[3]] for item in coordinate_data_ACT if item[1] is not None]
    # there are still some locations cant be recognized by google API,
    fixed_ACT_list = [item[0] for item in coordinate_data_ACT]
    incomplete_coordinate_data = [item for item in incomplete_coordinate_data if item not in fixed_ACT_list]
    len(job_ad_extend_df[job_ad_extend_df["area_location"].isin(incomplete_coordinate_data)])
    job_ad_extend_filtered_df = job_ad_extend_df[~job_ad_extend_df['area_location'].isin(incomplete_coordinate_data)]
    coordinate_data_df = pd.DataFrame(coordinate_data, columns=['mapping_address','address', 'Latitude', 'Longitude'])
    coordinate_data_df = coordinate_data_df.apply(check_oversea, axis=1)
    coordinate_data_df = coordinate_data_df.apply(update_valid_state_data, axis=1)
    coordinate_data_df.to_csv('data/coordinate.csv', index=False)

The file 'data/coordinate.csv' exists.


In [8]:
# join coordinates with job ad dataframe
job_ad_extend_coord_df = job_ad_extend_df.merge(coordinate_data_df, left_on='area_location', right_on='mapping_address', how='left')[['id', 'title', 'abstract', 'content', 'additionalSalaryText',
       'classification', 'subClassification', 'workType', 'area_location',
        'address', 'Latitude', 'Longitude',
       'global', 'state', 'country']]
# drop data with incomplete address(326 rows affected)
job_ad_extend_coord_df = job_ad_extend_coord_df[job_ad_extend_coord_df["address"].notnull()]

## 2. Clean text data

In [9]:
# nltk.download('stopwords')
# nltk.download('wordnet')
custom_stop_words = ["please","letter","within","at","of","looking","their","they","about","are","if","have","is","us","on","our","this","to","be","for","in","with","i","you","we","a","and","the","will", "join", "work", "opportunity", "new", "role", "based","company","currently","duty","candidate","applicant","end","position","skills","ability",'application','applications',"apply","click","responsibility","responsibilities"]
# Function to extract and clean text from HTML
def extract_text_from_html(html:str)->str:
    '''
    Remove html related syntax from content
    :param html: text data
    :return: cleaned text
    '''
    soup = BeautifulSoup(html, 'html.parser')
    text = soup.get_text(separator=' ')  # Extract text and separate paragraphs with spaces
    return text
def preprocess_text(text:str)->str:
    '''
    Preprocess text data
    :param text: jd text data
    :return: cleaned text
    '''
    # 1) Lowercase
    text = text.lower()
    # 2) Remove punctuation (optional)
    text = ''.join([char for char in text if char.isalnum() or char == ' '])
    # 3) Remove words with numbers
    text = re.sub(r'\b\w*\d+\w*\b', '', text)  # Removes words with numbers
    text = re.sub(r'\s+', ' ', text).strip()

    stop_words = set(stopwords.words('english'))
    words = text.split()
    # 4) Remove stopwords
    filtered_words = [word for word in words if (word not in stop_words) and (word not in custom_stop_words)]
    # 5) Apply lemmatization
    lemmatizer = WordNetLemmatizer()
    lemmatized_words = [lemmatizer.lemmatize(word) for word in filtered_words]
    return ' '.join(lemmatized_words)

def is_english(text:str)->bool:
    '''
    Check if text is in English language
    :param text: input text
    :return: True if text is in English language
    '''
    predictions = langid.classify(text)  # Predict the top language
    return predictions[0] == 'en' or predictions[0] == 'fr'

# Apply HTML text extraction
job_ad_extend_coord_df['cleaned_content'] = job_ad_extend_coord_df['content'].apply(extract_text_from_html)
# Combine abstract and content for analysis
job_ad_extend_coord_df['abstract_content'] = job_ad_extend_coord_df['abstract'] + " " + job_ad_extend_coord_df['cleaned_content']
job_ad_extend_coord_df['abstract_content'] = job_ad_extend_coord_df['abstract_content'].apply(preprocess_text)
job_ad_extend_coord_df['cleaned_title'] = job_ad_extend_coord_df['title'].apply(preprocess_text)
job_ad_extend_coord_df["is_english"] = job_ad_extend_coord_df['abstract_content'].apply(is_english)
#remove non-english records
job_ad_extend_coord_df = job_ad_extend_coord_df[job_ad_extend_coord_df["is_english"]]

## 3. Exract Salary Info

In [10]:
def extract_salary_info(text:str)->dict:
    '''
    Extract salary information from text
    :param text: salary text
    :return: salary information in (salary_value, salary_unit) format
    '''
    # Helper function to clean and convert numbers
    def parse_value(value_str, is_k=False):
        value = float(value_str.replace(',', '')) if value_str else 0
        return value * 1000 if is_k else value
    if type(text).__name__ == "str":
        # Regex patterns with priority order
        patterns = [
            # Hourly rates
            (r'(?:(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:to|-|up\s*to)\s*)?(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:p\.h\.|per\s+hour|/hr?)\b',
             lambda m: ((parse_value(m.group(2), bool(m.group(3))) + parse_value(m.group(5), bool(m.group(6))))/2)
                       if m.group(2) else parse_value(m.group(5), bool(m.group(6))), 'hour'),

            # Daily rates
            (r'(?:(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:to|-)\s*)?(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:p\.d\.|per\s+day)\b',
             lambda m: ((parse_value(m.group(2), bool(m.group(3))) + parse_value(m.group(5), bool(m.group(6))))/2)
                       if m.group(2) else parse_value(m.group(5), bool(m.group(6))), 'day'),

            # Weekly rates
            (r'(?:(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:to|-)\s*)?(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:p\.w\.|per\s+week)\b',
             lambda m: ((parse_value(m.group(2), bool(m.group(3))) + parse_value(m.group(5), bool(m.group(6))))/2)
                       if m.group(2) else parse_value(m.group(5), bool(m.group(6))), 'week'),

            # Monthly rates
            (r'(?:(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:to|-)\s*)?(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:p\.m\.|per\s+month)\b',
             lambda m: ((parse_value(m.group(2), bool(m.group(3))) + parse_value(m.group(5), bool(m.group(6))))/2)
                       if m.group(2) else parse_value(m.group(5), bool(m.group(6))), 'month'),

            # Annual rates
            (r'(?:(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:to|-)\s*)?(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:p\.a\.|per\s+year|per\s+annum)\b',
             lambda m: ((parse_value(m.group(2), bool(m.group(3))) + parse_value(m.group(5), bool(m.group(6))))/2)
                       if m.group(2) else parse_value(m.group(5), bool(m.group(6))), 'year'),

            # Generic ranges
            (r'(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:to|-|−)\s*(\$|£|€)?(\d[\d,.]*)(k?)(?:\s*(?:per|p\.)(?:\s*\w+)?)?',
             lambda m: (parse_value(m.group(2), bool(m.group(3))) + parse_value(m.group(5), bool(m.group(6))))/2, None),

            # Single values with units
            (r'(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:p\.h\.|per\s+hour|/hr?)\b', lambda m: parse_value(m.group(2), bool(m.group(3))), 'hour'),
            (r'(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:p\.d\.|per\s+day)\b', lambda m: parse_value(m.group(2), bool(m.group(3))), 'day'),
            (r'(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:p\.w\.|per\s+week)\b', lambda m: parse_value(m.group(2), bool(m.group(3))), 'week'),
            (r'(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:p\.m\.|per\s+month)\b', lambda m: parse_value(m.group(2), bool(m.group(3))), 'month'),
            (r'(\$|£|€)?(\d[\d,.]*)(k?)\s*(?:p\.a\.|per\s+year|per\s+annum)\b', lambda m: parse_value(m.group(2), bool(m.group(3))), 'year'),

            # Single values without units
            (r'(\$|£|€)?(\d[\d,.]*)(k?)\b', lambda m: parse_value(m.group(2), bool(m.group(3))), None),
        ]

        for pattern, value_func, unit in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                try:
                    value = value_func(match)
                    # Default unit logic
                    if unit is None:
                        unit = 'year' if (value > 10000 or 'k' in match.group(0).lower()) else None
                    return (round(value, 2), unit) if unit else (round(value, 2), 'year')
                except Exception as e:
                    continue
    return (None, None)

job_ad_extend_coord_df[["salary_value", "salary_unit"]] = job_ad_extend_coord_df["additionalSalaryText"].apply(extract_salary_info).apply(pd.Series)
job_ad_extend_coord_df["salary_unit"] = job_ad_extend_coord_df["salary_unit"].astype(str)


In [12]:
job_ad_extend_coord_df.columns

Index(['id', 'title', 'abstract', 'content', 'additionalSalaryText',
       'classification', 'subClassification', 'workType', 'area_location',
       'address', 'Latitude', 'Longitude', 'global', 'state', 'country',
       'cleaned_content', 'abstract_content', 'cleaned_title', 'is_english',
       'salary_value', 'salary_unit'],
      dtype='object')

In [14]:
job_ad_extend_coord_df = job_ad_extend_coord_df[['id', 'classification', 'subClassification', 'workType', 'Latitude', 'Longitude', 'global', 'state', 'country', 'abstract_content', 'cleaned_title', 'salary_value', 'salary_unit']]
job_ad_extend_coord_df["id"] = job_ad_extend_coord_df["id"].astype(str)
job_ad_extend_coord_df.to_csv("data/job_ad_extend_coord_df.csv")

# Event Data

In [15]:
# Path to your .csv.xz file
event_file_path = 'data/ads-50k-events.csv.gz'

# Read the compressed CSV file into a DataFrame
with lzma.open(event_file_path, 'rt', encoding='utf-8') as file:
    event_data_df = pd.read_csv(event_file_path)

In [17]:
event_data_df['resume_id'] = event_data_df['resume_id'].astype(str)
event_data_df['job_id'] = event_data_df['job_id'].astype(str)

In [18]:
# remove duplicated rows
event_data_dedup_df = event_data_df.groupby(["event_datetime","resume_id","job_id","event_platform","kind"]).first().reset_index()

In [19]:
event_data_dedup_df.to_csv("event_data_dedup_df.csv")

# Merge


In [22]:
job_ad_extend_coord_df = pd.read_csv("data/job_ad_extend_coord_df.csv",index_col=0)
event_data_dedup_df = pd.read_csv("data/event_data_dedup_df.csv",index_col=0)
job_event_join_df = event_data_dedup_df.merge(job_ad_extend_coord_df, left_on='job_id', right_on='id', how='inner')
job_event_join_df = job_event_join_df[['resume_id', 'job_id', 'event_platform',
       'kind', 'cleaned_title', 'classification', 'subClassification',
       'workType', 'Latitude', 'Longitude','global', 'state', 'country',
       'abstract_content', 'salary_value','salary_unit']]
job_event_join_df["resume_id"] = job_event_join_df["resume_id"].astype('str')
job_event_join_df["job_id"] = job_event_join_df["job_id"].astype('str')
job_event_join_df['cleaned_title'].fillna('NA',inplace = True)
job_event_join_df['abstract_content'].fillna('NA',inplace = True)
job_event_join_df["salary_unit"].fillna('NA',inplace = True)
job_event_join_df["salary_value"].fillna(0,inplace = True)
job_event_join_df.to_csv("data/job_event_join_df.csv")

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  job_event_join_df['cleaned_title'].fillna('NA',inplace = True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  job_event_join_df['abstract_content'].fillna('NA',inplace = True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate ob

In [24]:
job_event_join_df.columns

Index(['resume_id', 'job_id', 'event_platform', 'kind', 'cleaned_title',
       'classification', 'subClassification', 'workType', 'Latitude',
       'Longitude', 'global', 'state', 'country', 'abstract_content',
       'salary_value', 'salary_unit'],
      dtype='object')