In [3]:
from pyspark.sql import SparkSession as sc
from pyspark.sql.functions import col, lit, explode, avg, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql import functions as F

sc = sc.builder.getOrCreate()

In [5]:
# path = r'file:///E:\IDE_workspace\VSCode\recruitment_project\Data\computer-information-technology\silver_layer.csv'
hdfs_path = 'hdfs://192.168.48.136:9000/recruitment/silver/'
df = sc.read.options(header=True, delimiter=',').csv(hdfs_path)

df = df.drop('year', 'month', 'day', 'source', 'jobCategory')

df = df.withColumn("max_salary", col("max_salary").cast("integer")) \
        .withColumn("min_salary", col("min_salary").cast("integer")) \
        .withColumn('Skills', F.from_json('Skills', ArrayType(StringType()))) \
        .withColumn('Requirement', F.from_json('Requirement', ArrayType(StringType()))) \
        .withColumn('Category', F.from_json('Category', ArrayType(StringType()))) \
        .withColumn('Created_time', to_timestamp(col('Created_time'), 'yyyy-MM-dd HH:mm:ss')) \
        .withColumn('Last_updated', to_timestamp(col('Last_updated'), 'yyyy-MM-dd HH:mm:ss'))
df.schema

StructType([StructField('Title', StringType(), True), StructField('Last_updated', TimestampType(), True), StructField('Created_time', TimestampType(), True), StructField('Skills', ArrayType(StringType(), True), True), StructField('Requirement', ArrayType(StringType(), True), True), StructField('Hirer', StringType(), True), StructField('Company_link', StringType(), True), StructField('Company_location', StringType(), True), StructField('Category', ArrayType(StringType(), True), True), StructField('jobDecription', StringType(), True), StructField('min_salary', IntegerType(), True), StructField('max_salary', IntegerType(), True), StructField('salary_range', StringType(), True), StructField('location', StringType(), True), StructField('work_type', StringType(), True), StructField('work_time_type', StringType(), True), StructField('company_name', StringType(), True)])

# Question:
What will we do with those data:

- Identify the skills most frequently required by companies.
- Determine the average salary for each skill.
- Locate the areas with the highest job concentration.
- What companies have the most job

In [4]:
# Explode 'Skills' column and select all columns from original DataFrame + exploded 'Skills'
exploded_skill = df.withColumn("Skills", explode(df['Skills']))

exploded_skill.show(5)

+--------------------+-------------------+-------------------+--------------------+-------------------+------------------+--------------------+--------------------+--------------------+----------+----------+------------+--------------------+---------------+-----------------+-------------------+
|               Title|       Last_updated|       Created_time|              Skills|        Requirement|             Hirer|        Company_link|    Company_location|            Category|min_salary|max_salary|salary_range|            location|      work_type|   work_time_type|       company_name|
+--------------------+-------------------+-------------------+--------------------+-------------------+------------------+--------------------+--------------------+--------------------+----------+----------+------------+--------------------+---------------+-----------------+-------------------+
|THỰC TẬP SINH .NE...|2024-08-02 13:47:40|2024-09-01 13:47:40|          E-commerce|[Tối thiểu Cử Nhân]|W2SOLUTIO

In [5]:
# Identify the skills with the highest count
# exploded_skill.groupBy('Exploded_Skill') \
#     .agg(
#         F.round(F.avg((F.col('min_salary') + F.col('max_salary')/ 2)), 2).alias('avg_salary'), 
#         F.count('Title').alias('count')) \
#     .orderBy('avg_salary', ascending=False).show(truncate=False)

 # Pivoting salary_range and grouping by Skills
pivot_df = exploded_skill.groupBy('Skills') \
    .pivot('salary_range') \
    .agg(F.count('Title')) \
    .fillna(0) \

# Add a 'total' column to sum across all salary ranges
pivot_df = pivot_df.withColumn('total', 
    sum(pivot_df[col] for col in pivot_df.columns if col != 'Skills')) \
    .orderBy('total', ascending=False)

# After pivoting and adding the 'total' column, reorder the columns
ordered_df = pivot_df.select(
    'Skills',   
    '<5M',      # Specify the order of salary range columns
    '5M-12M', 
    '12M-20M', 
    '20M-27M', 
    '>27M', 
    'Unknown',  # Put 'Unknown' at the end of salary ranges
    'total'     # Keep 'total' as the last column
)

# Show the result with the reordered columns
ordered_df.show()

+--------------------+---+------+-------+-------+----+-------+-----+
|              Skills|<5M|5M-12M|12M-20M|20M-27M|>27M|Unknown|total|
+--------------------+---+------+-------+-------+----+-------+-----+
|   English Languange|  7|     7|     15|      9|  24|     14|   76|
|            Teamwork|  3|     7|      8|      3|  10|     25|   56|
|                 SQL|  2|     1|      8|      3|   6|      8|   28|
|          JavaScript|  1|     2|      3|      1|   5|     15|   27|
|    Software Testing|  3|     4|      7|      1|   2|      7|   24|
|      Manual Testing|  3|     6|      3|      1|   1|      7|   21|
|    Microsoft Office|  1|     5|      2|      1|   3|      8|   20|
|              Python|  0|     1|      4|      4|   5|      5|   19|
|                Java|  0|     0|      3|      1|   6|      9|   19|
|Software Development|  0|     0|      4|      2|   5|      6|   17|
|Frontend Development|  1|     0|      3|      0|   5|      6|   15|
|                .NET|  0|     0| 

In [15]:
exploded_skill.filter(exploded_skill.Exploded_Skill == 'Cto').show()

+--------------------+-------------------+-------------------+--------------------+-----------+--------------------+--------------------+--------------------+----------+----------+------------+--------------------+---------------+-----------------+--------------------+--------------+
|               Title|       Last_updated|       Created_time|         Requirement|      Hirer|        Company_link|    Company_location|            Category|min_salary|max_salary|salary_range|            location|      work_type|   work_time_type|        company_name|Exploded_Skill|
+--------------------+-------------------+-------------------+--------------------+-----------+--------------------+--------------------+--------------------+----------+----------+------------+--------------------+---------------+-----------------+--------------------+--------------+
|Giám Đốc Công Ngh...|2024-08-06 13:38:46|2024-09-01 13:38:46|[5 - 10 năm kinh ...|Phương Đoàn|https://glints.co...|Tòa CIC, 219 Trun...|[Compute

In [6]:
# Explode 'Skills' column and select all columns from original DataFrame
exploded_requirement = df.withColumn("Requirement", explode(df['Requirement']))

exploded_requirement.show()

+--------------------+-------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+----------+----------+------------+--------------------+---------------+--------------------+--------------------+
|               Title|       Last_updated|       Created_time|              Skills|         Requirement|             Hirer|        Company_link|    Company_location|            Category|min_salary|max_salary|salary_range|            location|      work_type|      work_time_type|        company_name|
+--------------------+-------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+----------+----------+------------+--------------------+---------------+--------------------+--------------------+
|THỰC TẬP SINH .NE...|2024-08-02 13:47:40|2024-09-01 13:47:40|[E-commerce, Info...|   Tối thiểu C

In [7]:
exploded_requirement.groupBy('Requirement') \
    .agg(
        F.round(F.avg((F.col('min_salary') + F.col('max_salary')/ 2)), 2).alias('avg_salary'),
        F.count('Title').alias('count')) \
    .orderBy('count', ascending=False).show(truncate=False)

+-----------------------------+-------------+-----+
|Requirement                  |avg_salary   |count|
+-----------------------------+-------------+-----+
|Tối thiểu Cử Nhân            |2.005261566E7|234  |
|1 - 3 năm kinh nghiệm        |2.058617987E7|159  |
|3 - 5 năm kinh nghiệm        |2.322920991E7|96   |
|Tối thiểu Cao Đẳng           |2.23170078E7 |66   |
|Dưới một năm kinh nghiệm     |8975511.98   |44   |
|5 - 10 năm kinh nghiệm       |2.790748889E7|27   |
|Giới tính Nam                |1.832692308E7|26   |
|Tối thiểu Trung Học Phổ Thông|1.523684211E7|19   |
|Tối thiểu Bằng Liên Kết      |1.675005556E7|18   |
|25-35 tuổi                   |1.533333333E7|6    |
|22-40 tuổi                   |7375000.0    |4    |
|20-30 tuổi                   |1.4125E7     |4    |
|25-40 tuổi                   |2.816666667E7|3    |
|22-27 tuổi                   |1.4E7        |2    |
|18-35 tuổi                   |1.2675E7     |2    |
|35-45 tuổi                   |0.0          |2    |
|25-34 tuổi 

In [9]:
from pyspark.sql.functions import array_contains, col

df.where(array_contains(col('Requirement'), '30-55 tuổi')).show()

+--------------------+-------------------+-------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+----------+----------+------------+--------------------+---------------+-----------------+--------------------+
|               Title|       Last_updated|       Created_time|              Skills|         Requirement|   Hirer|        Company_link|    Company_location|            Category|min_salary|max_salary|salary_range|            location|      work_type|   work_time_type|        company_name|
+--------------------+-------------------+-------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+----------+----------+------------+--------------------+---------------+-----------------+--------------------+
|Full Stack Developer|2024-08-22 13:08:38|2024-09-01 13:08:38|[SQL, Spring Boot...|[5 - 10 năm kinh ...|HR JuYou|https://glints.co...|Lầ

In [10]:
# Exploded Skills and Requirement

# Explode 'Skills' column and select all columns from original DataFrame
exploded_requirement_skill = exploded_skill.withColumn("Requirement", explode(exploded_skill['Requirement']))

exploded_requirement_skill.show(5)

+--------------------+-------------------+-------------------+--------------------+-----------------+------------------+--------------------+--------------------+--------------------+----------+----------+------------+--------------------+---------------+-----------------+-------------------+
|               Title|       Last_updated|       Created_time|              Skills|      Requirement|             Hirer|        Company_link|    Company_location|            Category|min_salary|max_salary|salary_range|            location|      work_type|   work_time_type|       company_name|
+--------------------+-------------------+-------------------+--------------------+-----------------+------------------+--------------------+--------------------+--------------------+----------+----------+------------+--------------------+---------------+-----------------+-------------------+
|THỰC TẬP SINH .NE...|2024-08-02 13:47:40|2024-09-01 13:47:40|          E-commerce|Tối thiểu Cử Nhân|W2SOLUTION VIETNA

In [15]:
filter = exploded_requirement_skill \
    .where(~col('Requirement').rlike('tuổi'))

In [60]:
# result = filter.groupBy('Skills') \
#     .pivot('Requirement') \
#     .agg(F.count('Title').alias('count')) \
#     .fillna(0)'
cols = ['Skills',
        'Tối thiểu Cử Nhân',
        'Tối thiểu Cao Đẳng',
        'Dưới một năm kinh nghiệm', 
        '1 - 3 năm kinh nghiệm', 
        '3 - 5 năm kinh nghiệm', 
        '5 - 10 năm kinh nghiệm',
        # 'Tối thiểu Tiểu Học',
        # 'Tối thiểu Trung Học Phổ Thông',
        # 'Tối thiểu Bằng Liên Kết',
    ]

result = filter.groupBy('Skills') \
    .pivot('Requirement') \
    .agg( 
        F.round(F.avg(((F.col('min_salary') + F.col('max_salary'))/ 2)), 2)) \
    .fillna(0) \
    .select(cols) \
    .orderBy('Tối thiểu Cử Nhân', ascending=False)

result.show()

+--------------------+-----------------+------------------+------------------------+---------------------+---------------------+----------------------+
|              Skills|Tối thiểu Cử Nhân|Tối thiểu Cao Đẳng|Dưới một năm kinh nghiệm|1 - 3 năm kinh nghiệm|3 - 5 năm kinh nghiệm|5 - 10 năm kinh nghiệm|
+--------------------+-----------------+------------------+------------------------+---------------------+---------------------+----------------------+
|       Hand-ons Task|            7.5E7|               0.0|                     0.0|                7.5E7|                  0.0|                   0.0|
| Machine Translation|            7.5E7|               0.0|                     0.0|                  0.0|                7.5E7|                   0.0|
|         Linguistics|            7.5E7|               0.0|                     0.0|                  0.0|                7.5E7|                   0.0|
|Large Language Mo...|            7.5E7|               0.0|                     0.0|    

In [63]:
path = r'file:///E:\IDE_workspace\VSCode\recruitment_project\Data\report'

result.write.format('csv')\
    .option('header', 'true')\
    .save(path)

In [66]:
exploded_requirement_skill.where(col('Skills') == 'Machine Translation').show()

+--------------------+-------------------+-------------------+-------------------+--------------------+----------+--------------------+--------------------+--------------------+----------+----------+------------+--------------------+---------+-----------------+------------------+
|               Title|       Last_updated|       Created_time|             Skills|         Requirement|     Hirer|        Company_link|    Company_location|            Category|min_salary|max_salary|salary_range|            location|work_type|   work_time_type|      company_name|
+--------------------+-------------------+-------------------+-------------------+--------------------+----------+--------------------+--------------------+--------------------+----------+----------+------------+--------------------+---------+-----------------+------------------+
|[HN/HCM] Senior A...|2024-08-27 12:56:12|2024-09-01 12:56:12|Machine Translation|3 - 5 năm kinh ng...|Nga Nguyễn|https://glints.co...|Tầng 2, số 106 Tô...|[

In [68]:
skills = exploded_skill.select('Skills').distinct()

skills.write.format('csv')\
    .option('header', 'true')\
    .save(r'file:///E:\IDE_workspace\VSCode\recruitment_project\Data\skills.csv')