In [1]:
from pyspark.sql import SparkSession

# Build the Spark session used by every later cell; getOrCreate() hands back an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Skills Demand Analytics")
    .getOrCreate()
)

print("Spark Session created!")



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/ubuntu/venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/venv/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [3]:
# Load the raw CSV from HDFS as an RDD of text lines and drop the header line.
data_path = "hdfs://namenode:9000/user/ubuntu/Project_Code/cleaned_job_data_using_spark.csv"

raw_data = spark.sparkContext.textFile(data_path)

# The header is the first line of the file; every line equal to it is filtered out.
header = raw_data.first()
# Bind the header *value* into the lambda through a default argument. PySpark
# serializes the lambda when a job first uses this RDD, so a bare reference to the
# notebook-global `header` would pick up whatever `header` is bound to at that
# moment; binding it here means later rebinding of `header` cannot change which
# line is filtered.
parsed_data = raw_data.filter(lambda row, hdr=header: row != hdr)

print("First 5 rows of raw data (excluding header):")
print(parsed_data.take(5))



[Stage 0:>                                                          (0 + 1) / 1]

First 5 rows of raw data (excluding header):
['"family lawyer","child custody lawyers specialize in family law focusing on cases related to child custody and visitation rights they represent clients in court provide legal counsel and advocate for the best interests of children in custody disputes what you ll do specialize in child custody cases advocating for the best interests of children conduct investigations gather evidence and present cases in court negotiate parenting plans and visitation schedules","child custody law family law legal advocacy negotiation court representation legal research client counseling case preparation","electronics instrumentation","port moresby papua new guinea","2 to 10 years","65000","89000"', '"ux ui designer","interaction designers specialize in designing user interactions within digital interfaces they create meaningful and engaging user experienceeriences by considering user behaviors and system responses what you ll do work on interaction design de

                                                                                

In [4]:

# Parse and Clean Data

import csv
from io import StringIO

def parse_csv(row):
    """Parse one CSV-formatted text line into a list of field strings.

    Quoted fields (which may contain commas) are unquoted by the ``csv`` module,
    and whitespace immediately after a delimiter is skipped.

    Args:
        row: a single line of CSV text.

    Returns:
        The list of fields, or ``[]`` for an empty line. Returning ``[]`` instead of
        letting ``next()`` raise ``StopIteration`` matters because this function runs
        inside ``rdd.map``: a StopIteration escaping from it would fail the Spark task
        rather than just skipping the blank line.
    """
    reader = csv.reader(StringIO(row), skipinitialspace=True)
    return next(reader, [])

# Parse rows using the csv module. The result gets its own name instead of
# overwriting `parsed_data`, so re-running this cell parses the raw text lines
# again rather than trying to re-parse already-parsed lists (which would fail).
parsed_rows = parsed_data.map(parse_csv)


def _to_salary(value):
    """Convert a salary field to float; a blank or whitespace-only field means missing (None)."""
    value = value.strip()
    return float(value) if value else None


def extract_relevant_fields(row):
    """Select the analysis columns from a parsed CSV row and convert the salaries.

    Field 0 is not kept (from the sample output it looks like the job title --
    TODO confirm against the dataset's header).

    Args:
        row: list of field strings as produced by ``parse_csv``.

    Returns:
        ``[job_description, job_skills, job_industry, job_location,
        experience_level, min_salary, max_salary]`` with both salaries as floats
        (``None`` when the field is blank), or ``None`` when the row has too few
        fields or a salary is not numeric, so the caller can filter it out.
    """
    try:
        return [
            row[1],  # job_description
            row[2],  # job_skills
            row[3],  # job_industry
            row[4],  # job_location
            row[5],  # experience_level
            _to_salary(row[6]),  # min_salary
            _to_salary(row[7]),  # max_salary
        ]
    except (IndexError, ValueError) as e:
        # Only the expected malformed-row failures (short row, non-numeric salary)
        # are tolerated; anything else is a genuine bug and should surface. This
        # runs inside rdd.map, so the message goes to the worker's stdout, not to
        # the notebook output.
        print(f"Error processing row: {row} -> {e}")
        return None


cleaned_data = parsed_rows.map(extract_relevant_fields).filter(lambda row: row is not None)

print("First 5 rows of cleaned data:")
print(cleaned_data.take(5))


First 5 rows of cleaned data:
[['child custody lawyers specialize in family law focusing on cases related to child custody and visitation rights they represent clients in court provide legal counsel and advocate for the best interests of children in custody disputes what you ll do specialize in child custody cases advocating for the best interests of children conduct investigations gather evidence and present cases in court negotiate parenting plans and visitation schedules', 'child custody law family law legal advocacy negotiation court representation legal research client counseling case preparation', 'electronics instrumentation', 'port moresby papua new guinea', '2 to 10 years', 65000.0, 89000.0], ['interaction designers specialize in designing user interactions within digital interfaces they create meaningful and engaging user experienceeriences by considering user behaviors and system responses what you ll do work on interaction design defining how users interact with digital pro

In [5]:
# Convert the cleaned RDD into a DataFrame for further processing.
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

columns = ["job_description", "job_skills", "job_industry", "job_location", "experience_level", "min_salary", "max_salary"]

# Declare the schema explicitly rather than passing only column names. With names
# alone Spark infers the types by sampling rows, which runs an extra job and can
# fail if the salary values in the sampled rows are all None. Each cleaned row holds
# five strings followed by two nullable floats (the salaries).
schema = StructType(
    [StructField(name, StringType(), True) for name in columns[:5]]
    + [StructField(name, DoubleType(), True) for name in columns[5:]]
)
cleaned_df = spark.createDataFrame(cleaned_data, schema=schema)

print("DataFrame schema:")
cleaned_df.printSchema()

print("First 5 rows of DataFrame:")
cleaned_df.show(5, truncate=False)




DataFrame schema:
root
 |-- job_description: string (nullable = true)
 |-- job_skills: string (nullable = true)
 |-- job_industry: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- min_salary: double (nullable = true)
 |-- max_salary: double (nullable = true)

First 5 rows of DataFrame:


                                                                                

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------+-----------------------------+----------------+----------+----------+
|job_description                                                                                                                                        

In [6]:
# Break every job description into a list of words, stored in a new `words` column.

from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(outputCol="words", inputCol="job_description")
tokenized_data = tokenizer.transform(cleaned_df)

print("First 5 rows after tokenization:")
preview_columns = ["job_description", "words"]
tokenized_data.select(*preview_columns).show(5, truncate=False)




First 5 rows after tokenization:


[Stage 5:>                                                          (0 + 1) / 1]

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [7]:
# Weight the words of each job description by TF-IDF (term frequency scaled by
# inverse document frequency), producing a `features` vector column.

from pyspark.ml.feature import HashingTF, IDF

# Term frequencies: words are hashed into a fixed number of buckets, so distinct
# words may share a bucket.
num_hash_features = 1000
hashing_tf = HashingTF(
    numFeatures=num_hash_features,
    inputCol="words",
    outputCol="raw_features",
)
tf_data = hashing_tf.transform(tokenized_data)

# Learn the IDF weights from the whole corpus, then rescale the raw frequencies.
idf = IDF(outputCol="features", inputCol="raw_features")
idf_model = idf.fit(tf_data)
tfidf_data = idf_model.transform(tf_data)

print("First 5 rows after TF-IDF:")
tfidf_data.select("job_description", "features").show(5, truncate=False)





11:16:04.816 [dispatcher-CoarseGrainedScheduler] ERROR org.apache.spark.scheduler.cluster.YarnScheduler - Lost executor 1 on datanode3: Container marked as failed: container_1731663000200_0012_01_000002 on host: datanode3. Exit status: -100. Diagnostics: Container released on a *lost* node.


                                                                                

First 5 rows after TF-IDF:
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
# Cluster the job descriptions with K-Means over their TF-IDF feature vectors.

from pyspark.ml.clustering import KMeans

# NOTE(review): the TF-IDF vectors are not normalized before clustering, and the
# per-industry counts later in this notebook put almost every industry mostly in
# cluster 1. Consider a Normalizer step or distanceMeasure="cosine" -- to be
# confirmed against the intended analysis.
kmeans = KMeans(k=5, seed=42, featuresCol="features", predictionCol="cluster")  # fixed seed -> repeatable runs
model = kmeans.fit(tfidf_data)
clustered_data = model.transform(tfidf_data)  # adds the `cluster` prediction column

print("First 5 rows after clustering:")
clustered_data.select("job_description", "cluster").show(5, truncate=False)



                                                                                

First 5 rows after clustering:
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|job_description                                                                                                                                                                                                                                                                                                                                                                             

In [21]:
# Analyze demand by industry and cluster: count job postings per (industry, cluster)
# pair, largest groups first. This is a proxy for skill demand -- it counts postings
# and does not look at the job_skills column itself.
high_demand_skills = (
    clustered_data
    .groupBy("job_industry", "cluster")
    .count()
    # Break ties on `count` by industry, then cluster, so rows with equal counts
    # come out in the same order every run.
    .orderBy(["count", "job_industry", "cluster"], ascending=[False, True, True])
)

print("Job counts by industry and cluster:")
high_demand_skills.show(1000, truncate=False)


High-demand skills by industry:




+----------------------------------------------+-------+-----+
|job_industry                                  |cluster|count|
+----------------------------------------------+-------+-----+
|pharmaceuticals                               |1      |52752|
|chemicals                                     |1      |49133|
|utilities gas and electric                    |1      |40549|
|telecommunications                            |1      |38772|
|commercial banks                              |1      |37122|
|specialty retailers other                     |1      |33732|
|financial services                            |1      |31599|
|automotive                                    |1      |31590|
|minimuming crude oil production               |1      |28305|
|insurance property and casualty stock         |1      |28204|
|minimuming                                    |1      |26400|
|semiconductors and other electronic components|1      |24756|
|real estate                                   |1      

                                                                                

In [20]:
# Export processed data: write the clustered rows as a single CSV file on HDFS, then
# merge a local copy for further analysis or visualization.
import csv
import io
import subprocess

output_path = "hdfs://namenode:9000/user/ubuntu/skills_demand_analysis_results"
local_path = "/home/ubuntu/skills_demand_analysis_results.csv"

# Remove the previous run's output directory (saveAsTextFile does not overwrite an
# existing one). `-f` keeps this from failing when the directory does not exist yet,
# and using `output_path` guarantees we delete exactly what we are about to write.
subprocess.run(["hdfs", "dfs", "-rm", "-r", "-f", output_path], check=True)

# Select only the relevant columns.
export_df = clustered_data.select(
    "job_description",
    "job_skills",
    "job_industry",
    "job_location",
    "experience_level",
    "min_salary",
    "max_salary",
    "cluster",
)


def to_csv_line(values):
    """Format one row as a CSV line with every field quoted.

    Embedded double quotes are doubled, so they cannot terminate a field early;
    ``None`` becomes an empty quoted field ``""``; other values go through ``str()``.
    No line terminator is added (saveAsTextFile adds one per element).
    """
    buffer = io.StringIO()
    csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="").writerow(values)
    return buffer.getvalue()


# Named `csv_header` (not `header`) so this cell does not rebind the global that
# the data-loading cell uses for its own header.
csv_header = ",".join(export_df.columns)


def prepend_header(index, lines, header_line=csv_header):
    """Yield the CSV header before the lines of partition 0, then the lines themselves."""
    if index == 0:
        yield header_line
    yield from lines


# Coalesce the data to one partition first, then emit the header inside that
# partition. A union with a separate header RDD followed by coalesce(1) does not
# guarantee that the header partition is read first.
final_rdd = (
    export_df.rdd
    .map(to_csv_line)
    .coalesce(1)
    .mapPartitionsWithIndex(prepend_header)
)

# Save as a single text file (one part file inside `output_path`).
final_rdd.saveAsTextFile(output_path)
print(f"Processed data saved to {output_path}")

# Merge the HDFS output into one local file; check=True surfaces a failing copy
# instead of silently returning a non-zero exit code.
subprocess.run(["hdfs", "dfs", "-getmerge", output_path, local_path], check=True)
print(f"Local copy written to {local_path}")



Deleted /user/ubuntu/skills_demand_analysis_results


                                                                                

Processed data saved to hdfs://namenode:9000/user/ubuntu/skills_demand_analysis_results


2024-11-20 11:36:22,727 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2024-11-20 11:36:24,840 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2024-11-20 11:36:25,778 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false


0