In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=0d20828ee9eebc2d0db0ec21b929549240732ca4b2bf095c85e907e86b017a8b
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


setup spark session

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Colab PySpark") \
    .getOrCreate()

In [None]:
spark.version

'3.5.1'

load the data

In [None]:
# do the necessary imports
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
schema = StructType([
    StructField("Co_Nm", StringType(), True),
    StructField("Co_Pg_Lstd", BooleanType(), True),
    StructField("Emp_Cnt", IntegerType(), True),
    StructField("Flw_Cnt", IntegerType(), True),
    StructField("Job_Ttl", StringType(), True),
    StructField("Job_Desc", StringType(), True),
    StructField("Is_Supvsr", BooleanType(), True),
    StructField("max_sal", FloatType(), True),
    StructField("med_sal", FloatType(), True),
    StructField("min_sal", FloatType(), True),
    StructField("py_prd", StringType(), True),  # Using StringType since Category type is not directly available in PySpark
    StructField("py_lstd", BooleanType(), True),
    StructField("wrk_typ", StringType(), True),  # Using StringType since Category type is not directly available in PySpark
    StructField("loc", StringType(), True),
    StructField("st_code", StringType(), True),
    StructField("is_remote", BooleanType(), True),
    StructField("views", IntegerType(), True),
    StructField("app_typ", StringType(), True),  # Using StringType since Category type is not directly available in PySpark
    StructField("app_is_off", BooleanType(), True),
    StructField("xp_lvl", StringType(), True),  # Using StringType since Category type is not directly available in PySpark
    StructField("domain", StringType(), True),
    StructField("has_post_domain", BooleanType(), True),
    StructField("is_sponsored", BooleanType(), True),
    StructField("base_comp", BooleanType(), True)
])

In [None]:
df = spark.read.csv("/content/LinkedInJobs_MLDataset.csv", header=True, schema=schema,  multiLine=True,
    escape='"',
    quote='"',
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True)
#/content/LinkedInJobs_MLDataset.csv
# /content/sample_data/LinkedInJobs_MLDataset.csv

data preprocessing

In [None]:
df.show()

+--------------------+----------+-------+-------+--------------------+--------------------+---------+--------+--------+--------+----------+-------+---------+--------------------+-------+---------+-----+------------------+----------+----------------+--------------------+---------------+------------+---------+
|               Co_Nm|Co_Pg_Lstd|Emp_Cnt|Flw_Cnt|             Job_Ttl|            Job_Desc|Is_Supvsr| max_sal| med_sal| min_sal|    py_prd|py_lstd|  wrk_typ|                 loc|st_code|is_remote|views|           app_typ|app_is_off|          xp_lvl|              domain|has_post_domain|is_sponsored|base_comp|
+--------------------+----------+-------+-------+--------------------+--------------------+---------+--------+--------+--------+----------+-------+---------+--------------------+-------+---------+-----+------------------+----------+----------------+--------------------+---------------+------------+---------+
|         HearingLife|      true|   1171|  11417|Hearing Care Prov...|

data cleaning and preprocessing

In [None]:
from pyspark.sql.functions import col, regexp_replace

# Remove leading and trailing quotes from the job_desc column
df = df.withColumn("Job_Desc", regexp_replace(col("Job_Desc"), r'^"|"$', ''))

# Select numerical columns for regression
numerical_columns = ["Emp_Cnt", "Flw_Cnt", "max_sal", "med_sal", "min_sal", "views"]
df = df.select(numerical_columns)

# Handle missing values (e.g., fill with mean or drop)
df = df.na.drop()


correleation matrix

In [None]:
correlation_matrix = df.select(numerical_columns).toPandas().corr()
print(correlation_matrix)

          Emp_Cnt   Flw_Cnt   max_sal   med_sal   min_sal     views
Emp_Cnt  1.000000  0.706234  0.079466  0.071709  0.058350 -0.021945
Flw_Cnt  0.706234  1.000000  0.171144  0.164611  0.149937  0.022855
max_sal  0.079466  0.171144  1.000000  0.990452  0.944375  0.098227
med_sal  0.071709  0.164611  0.990452  1.000000  0.980695  0.104343
min_sal  0.058350  0.149937  0.944375  0.980695  1.000000  0.109591
views   -0.021945  0.022855  0.098227  0.104343  0.109591  1.000000


In [None]:
# Assembling feature vector
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["Emp_Cnt", "Flw_Cnt", "max_sal", "min_sal", "views"], outputCol="features")
df = assembler.transform(df)

# Select features and target
df = df.select("features", col("med_sal").alias("label"))


Model Training

In [None]:
from pyspark.ml.regression import LinearRegression

# Split the data into training and test sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=1234)

# Initialize the Linear Regression model
lr = LinearRegression(featuresCol='features', labelCol='label')

# Fit the model to the training data
lr_model = lr.fit(train_data)


Model Evaluation

In [None]:
# Get the summary of the trained model
lr_summary = lr_model.summary
print("RMSE: %f" % lr_summary.rootMeanSquaredError)
print("R2: %f" % lr_summary.r2)

# Make predictions on the test data
predictions = lr_model.transform(test_data)

# Show some sample predictions
predictions.select("features", "label", "prediction").show(5)


RMSE: 0.000143
R2: 1.000000
+--------------------+-----+--------------------+
|            features|label|          prediction|
+--------------------+-----+--------------------+
|           (5,[],[])|  0.0|-2.41730055600881...|
|           (5,[],[])|  0.0|-2.41730055600881...|
| (5,[0,1],[4.0,5.0])|  0.0|-2.41724544334586...|
|(5,[0,1],[9.0,268...|  0.0|-2.41706215809970...|
|(5,[0,1],[9.0,268...|  0.0|-2.41706215809970...|
+--------------------+-----+--------------------+
only showing top 5 rows

