# LinkedIn Job Postings

## ADS 508 Impacting the Business with a Distributed Data Science Pipeline

In [2]:
# Import packages
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import boto3
import sagemaker
from pyathena import connect
import awswrangler as wr
from collections import Counter
from wordcloud import WordCloud

import warnings
warnings.filterwarnings('ignore')

## Data Ingestion

### CSV to S3

In [3]:
# Setup boto3 session parameters
session = sagemaker.Session()
bucket = session.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
account_id = boto3.client("sts").get_caller_identity().get("Account")

# Establish connection
sm = boto3.Session().client(service_name="sagemaker", region_name=region)

In [4]:
# Set S3 Source Location (Public bucket)
s3_public_path = "s3://linkedin-postings"

In [5]:
%store s3_public_path

Stored 's3_public_path' (str)


In [6]:
# Set S3 Destination Location (Private bucket)
s3_private_path = "s3://{}/linkedin_data".format(bucket)
print(s3_private_path)

s3://sagemaker-us-east-1-109784353618/linkedin_data


In [7]:
%store s3_private_path

Stored 's3_private_path' (str)


In [8]:
# Copy data from Public S3 bucket to Private S3 bucket
!aws s3 cp --recursive $s3_public_path/ $s3_private_path/ --exclude "*" --include "postings/postings.csv"
!aws s3 cp --recursive $s3_public_path/ $s3_private_path/ --exclude "*" --include "salaries/salaries.csv"
!aws s3 cp --recursive $s3_public_path/ $s3_private_path/ --exclude "*" --include "job_skills/job_skills.csv"

copy: s3://linkedin-postings/postings/postings.csv to s3://sagemaker-us-east-1-109784353618/linkedin_data/postings/postings.csv
copy: s3://linkedin-postings/salaries/salaries.csv to s3://sagemaker-us-east-1-109784353618/linkedin_data/salaries/salaries.csv
copy: s3://linkedin-postings/job_skills/job_skills.csv to s3://sagemaker-us-east-1-109784353618/linkedin_data/job_skills/job_skills.csv


In [9]:
# Check files are copied successfully to private bucket
!aws s3 ls $s3_private_path/

                           PRE job_skills/
                           PRE postings/
                           PRE salaries/


### Create Athena Database 

In [10]:
ingest_create_athena_db_passed = False

In [11]:
ingest_create_athena_table_passed = False

In [12]:
database_name = "linkedin_data"

In [13]:
# Set S3 staging directory - a temporary directory for Athena queries
s3_staging_dir = "s3://{}/athena/staging".format(bucket)

In [14]:
# Connect to staging directory
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

In [15]:
# Create Database
statement = "CREATE DATABASE IF NOT EXISTS {}".format(database_name)

pd.read_sql(statement, conn)

### Verify database has been created

In [16]:
statement = "SHOW DATABASES"

df_show = pd.read_sql(statement, conn)
df_show.head(5)

Unnamed: 0,database_name
0,default
1,dsoaws
2,linkedin_data


In [17]:
if database_name in df_show.values:
    ingest_create_athena_db_passed = True

### Create Athena Tables from CSV Files

In [18]:
table_name = 'postings'
postings_path = "s3://{}/linkedin_data/postings/".format(bucket)

drop_statement = """DROP TABLE IF EXISTS {}.{};""".format(database_name, table_name)

print(drop_statement)
pd.read_sql(drop_statement, conn)
print("Attempted to Drop {} table".format(table_name))

DROP TABLE IF EXISTS linkedin_data.postings;
Attempted to Drop postings table


In [19]:
# SQL statement to execute the postings table
statement = """
    CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
        job_id string,
        company_name string,
        title string,
        description string,
        max_salary float,
        pay_period string,
        location string,
        company_id float,
        views float,
        med_salary float,
        min_salary float,
        formatted_work_type string,
        applies float,
        original_listed_time float,
        remote_allowed float,
        job_posting_url string,
        application_url string,
        application_type string,
        expiry float,
        closed_time float,
        formatted_experience_level string,
        skills_desc string,
        listed_time string,
        posting_domain string,
        sponsored int,
        work_type string,
        currency string,
        compensation_type string,
        normalized_salary float,
        zip_code int,
        fips int
    ) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '{}' 
    TBLPROPERTIES ('skip.header.line.count'='1')
    """.format(database_name, table_name, postings_path)

# Execute statement
pd.read_sql(statement, conn)
print("Created postings table")

Created postings table


In [20]:
table_name_2 = "salaries"
salaries_path = "s3://{}/linkedin_data/salaries/".format(bucket)

drop_statement2 = """DROP TABLE IF EXISTS {}.{};""".format(database_name, table_name_2)

print(drop_statement2)
pd.read_sql(drop_statement2, conn)
print("Attempted to Drop {} table".format(table_name_2))

DROP TABLE IF EXISTS linkedin_data.salaries;
Attempted to Drop salaries table


In [21]:
# SQL statement to execute the postings table
statement = """
    CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
        salary_id int,
        job_id string,
        max_salary float,
        med_salary float,
        min_salary float,
        pay_period string,
        currency string,
        compensation_type string
    ) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '{}' 
    TBLPROPERTIES ('skip.header.line.count'='1')
    """.format(database_name, table_name_2, salaries_path)

# Execute statement
pd.read_sql(statement, conn)
print("Created salaries table")

Created salaries table


In [22]:
table_name_3 = "job_skills"
job_skills_path = "s3://{}/linkedin_data/job_skills/".format(bucket)

drop_statement3 = """DROP TABLE IF EXISTS {}.{};""".format(database_name, table_name_3)

print(drop_statement3)
pd.read_sql(drop_statement3, conn)
print("Attempted to Drop {} table".format(table_name_3))

DROP TABLE IF EXISTS linkedin_data.job_skills;
Attempted to Drop job_skills table


In [23]:
# SQL statement to execute the postings table
statement = """
    CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
        job_id string,
        skill_abr string
    ) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '{}' 
    TBLPROPERTIES ('skip.header.line.count'='1')
    """.format(database_name, table_name_3, job_skills_path)

# Execute statement
pd.read_sql(statement, conn)
print("Created job_skills table")

Created job_skills table


### Verify tables have been created successfully

In [24]:
statement = "SHOW TABLES in {}".format(database_name)

df_show = pd.read_sql(statement, conn)
df_show.head(5)

Unnamed: 0,tab_name
0,job_skills
1,postings
2,salaries


In [25]:
if table_name in df_show.values:
    ingest_create_athena_table_passed = True

In [26]:
# View postings table to check the data looks correct
statement = """SELECT * FROM {}.{} LIMIT 5""".format(database_name, table_name)

pd.read_sql(statement, conn)

Unnamed: 0,job_id,company_name,title,description,max_salary,pay_period,location,company_id,views,med_salary,...,skills_desc,listed_time,posting_domain,sponsored,work_type,currency,compensation_type,normalized_salary,zip_code,fips
0,3887991202,ITP (International Talent Partnership),Director of Operations,"""Job Title: Operations Director (Remote)About ...",,they are now seeking a dynamic and experience...,you will play a pivotal role in driving opera...,,,,...,and quality assurance processes.Excellent lea...,with the ability to inspire and motivate team...,dynamic environment and effectively manage co...,,,United States,74526101.0,4.0,,
1,3887991216,CyberCoders,"""Senior Backend Engineer - Node.js",TypeScript,,"""We are a fully funded startup backed by the s...",machine learning,,,,...,,,,,,,,,,
2,,,,,,,,,,,...,,,,,,,,,,
3,What You'll Be Doing,,,,,,,,,,...,,,,,,,,,,
4,,,,,,,,,,,...,,,,,,,,,,


In [27]:
# View salaries table to check the data looks correct
statement = """SELECT * FROM {}.{} LIMIT 5""".format(database_name, table_name_2)

pd.read_sql(statement, conn)

Unnamed: 0,salary_id,job_id,max_salary,med_salary,min_salary,pay_period,currency,compensation_type
0,1,3884428798,,20.0,,HOURLY,USD,BASE_SALARY
1,2,3887470552,25.0,,23.0,HOURLY,USD,BASE_SALARY
2,3,3884431523,120000.0,,100000.0,YEARLY,USD,BASE_SALARY
3,4,3884911725,200000.0,,10000.0,YEARLY,USD,BASE_SALARY
4,5,3887473220,35.0,,33.0,HOURLY,USD,BASE_SALARY


## Create Athena Parquet Tables from CSV

### Postings Table

In [51]:
table_name = "postings"
postings_parquet_path = "s3://{}/linkedin_data/parquet/postings/".format(bucket)

# SQL statement to execute
statement = """CREATE TABLE IF NOT EXISTS {}.{}
WITH (format = 'PARQUET', external_location = '{}') AS
SELECT job_id,
        company_name,
        title,
        description,
        max_salary,
        pay_period,
        location,
        company_id,
        views,
        med_salary,
        min_salary,
        formatted_work_type,
        applies,
        original_listed_time,
        remote_allowed,
        job_posting_url,
        application_url,
        application_type,
        expiry,
        closed_time,
        formatted_experience_level,
        skills_desc,
        listed_time,
        posting_domain,
        sponsored,
        work_type,
        currency,
        compensation_type,
        normalized_salary,
        zip_code,
        fips
FROM {}.{}""".format(
    database_name, table_name_parquet, postings_parquet_path, database_name, table_name
)

pd.read_sql(statement, conn)

Unnamed: 0,rows


### Salaries table

In [52]:
table_name = "salaries"
salaries_parquet_path = "s3://{}/linkedin_data/parquet/salaries/".format(bucket)

# SQL statement to execute
statement = """CREATE TABLE IF NOT EXISTS {}.{}
WITH (format = 'PARQUET', external_location = '{}') AS
SELECT salary_id,
    job_id,
    max_salary,
    med_salary,
    min_salary,
    pay_period,
    currency,
    compensation_type
FROM {}.{}""".format(
    database_name, table_name_parquet, salaries_parquet_path, database_name, table_name_2
)


pd.read_sql(statement, conn)

Unnamed: 0,rows


### Job Skills table

In [54]:
table_name_parquet = "job_skills"
job_skills_parquet_path = "s3://{}/linkedin_data/parquet/job_skills/".format(bucket)

# SQL statement to execute
statement = """CREATE TABLE IF NOT EXISTS {}.{}
WITH (format = 'PARQUET', external_location = '{}') AS
SELECT job_id,
        skill_abr
FROM {}.{}""".format(
    database_name, table_name_parquet, job_skills_parquet_path, database_name, table_name_3
)

pd.read_sql(statement, conn)

Unnamed: 0,rows


## Data Exploration

In [28]:
# View job skills table to check the data looks correct
statement = """SELECT * FROM {}.{} LIMIT 5""".format(database_name, table_name_3)

pd.read_sql(statement, conn)

Unnamed: 0,job_id,skill_abr
0,3884428798,MRKT
1,3884428798,PR
2,3884428798,WRT
3,3887473071,SALE
4,3887465684,FIN


In [35]:
statement = """SELECT * FROM {}.{}""".format(database_name, table_name_parquet)
postings_df = pd.read_sql(statement, conn)

In [None]:
statement = """SELECT * FROM {}.{}""".format(database_name, table_name_2)
salaries_df = pd.read_sql(statement, conn)

In [None]:
statement = """SELECT * FROM {}.{}""".format(database_name, table_name_3)
job_skills_df = pd.read_sql(statement, conn)

### postings EDA

In [36]:
# Basic stats
print(postings_df.describe())
print(postings_df.info())

       job_id company_name title description max_salary pay_period location  \
count       0            0     0           0          0          0        0   
unique      0            0     0           0          0          0        0   
top       NaN          NaN   NaN         NaN        NaN        NaN      NaN   
freq      NaN          NaN   NaN         NaN        NaN        NaN      NaN   

       company_id views med_salary  ... skills_desc listed_time  \
count           0     0          0  ...           0           0   
unique          0     0          0  ...           0           0   
top           NaN   NaN        NaN  ...         NaN         NaN   
freq          NaN   NaN        NaN  ...         NaN         NaN   

       posting_domain sponsored work_type currency compensation_type  \
count               0         0         0        0                 0   
unique              0         0         0        0                 0   
top               NaN       NaN       NaN      NaN  

In [None]:
# Data types
postings_data_types = postings_df.dtypes
print(postings_data_types)

In [None]:
# Missing values
postings_df.isnull().sum()

In [None]:
# Duplicates
postings_df.duplicated().sum()

In [None]:
# Summary statistics
print(f"Unique Job Titles: {postings_df['title'].nunique()}")
print(f"Unique Companies: {postings_df['company_name'].nunique()}")

In [None]:
# Visualize most common job titles
plt.figure(figsize=(12,6))
postings_df['title'].value_counts().head(10).plot(kind='bar', color='green')
plt.xlabel("Job Title")
plt.ylabel("Number of Postings")
plt.title("Most Common Job Titles")
plt.xticks(rotation=45)
plt.show()

In [None]:
# Posting trends
postings_df['listed_time'] = postings_df.to_datetime(postings_df['listed_time'], unit='s')  # Convert Unix timestamp to datetime
postings_df.set_index('listed_time', inplace=True)

# Resample by month to see posting trends
postings_df.resample('M').size().plot(figsize=(12, 5), title="Job Posting Trends Over Time")
plt.ylabel("Number of Postings")
plt.show()

### salaries EDA

In [None]:
# Basic stats
print(salaries_df.describe())
print(salaries_df.info())

In [None]:
# Data types
salaries_data_types = salaries_df.dtypes
print(salaries_data_types)

In [None]:
# Missing values
salaries_df.isnull().sum()

In [None]:
# Duplicates
salaries_df.duplicated().sum()

In [None]:
# Salary distributions
print("Max Salary: ", salaries_df['max_salary'].describe())
print("Min Salary: ", salaries_df['min_salary'].describe())
print("Median Salary: ", salaries_df['med_salary'].describe())
# print(df[['min_salary', 'med_salary', 'max_salary', 'normalized_salary']].describe())

In [None]:
# Visualize salary distribution
plt.figure(figsize=(10,5))
sns.barplot(postings_df['normalized_salary'], bins=30, kde=True, color="blue")
plt.xlabel("Salary")
plt.ylabel("Frequency")
plt.title("Salary Distribution")
plt.show()

### job_skills EDA

In [None]:
# Basic stats
print(job_skills_df.describe())
print(job_skills_df.info())

In [None]:
# Data types
job_skills_data_types = job_skills_df.dtypes
print(job_skills_data_types)

In [None]:
# Missing values
job_skills_df.isnull().sum()

In [None]:
# Duplicates
job_skills_df.duplicated().sum()

In [None]:
# Summary statistics
print(f"Unique Job Skills: {job_skills_df['skill_abr'].nunique()}")

In [None]:
# Concatenate all skills descriptions
all_skills = " ".join(str(skill) for skill in job_skills_df['skill_abr'].dropna())

# Count word frequencies
word_counts = Counter(all_skills.split())

# Generate WordCloud
wordcloud = WordCloud(width=800, height=400, background_color="white").generate_from_frequencies(word_counts)

# Visualize most common job skills in job descriptions
plt.figure(figsize=(12,6))
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis("off")
plt.title("Most Common Skills in Job Postings")
plt.show()

### Bias Analysis

In [None]:
# Detecting Skewed Data in Job Roles shown above
# Detecting salary bias shown above

In [None]:
# Detecting Biased Language in Job Descriptions (might favor aggressive hiring cultures)

# List of biased words to check
biased_words = ["dominant", "aggressive", "competitive", "strong leader"]

# Count biased words in descriptions
word_counts = Counter(" ".join(str(desc) for desc in postings_df['description'].dropna()).lower().split())

# Filter for biased words
biased_word_freq = {word: word_counts[word] for word in biased_words if word in word_counts}

# Visualize word frequency
plt.figure(figsize=(10,5))
sns.barplot(x=list(biased_word_freq.keys()), y=list(biased_word_freq.values()), palette="Reds_r")
plt.xlabel("Biased Words")
plt.ylabel("Frequency")
plt.title("Occurrence of Potentially Biased Words in Job Descriptions")
plt.show()