In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, TimestampType, BooleanType, FloatType
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, Word2Vec, Tokenizer, StopWordsRemover, StringIndexer, IndexToString, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import re

import pandas as pd

from nltk.probability import FreqDist

from matplotlib import pyplot as plt

In [3]:
spark = SparkSession.builder.appName("Job_Postings").master('local[*]').getOrCreate()

In [4]:
schema = StructType([
    StructField("title", StringType()),
    StructField("link", StringType()),
    StructField("published_date", TimestampType()),
    StructField("is_hourly", BooleanType()),
    StructField("hourly_low", FloatType()),
    StructField("hourly_high", FloatType()),
    StructField("budget", FloatType()),
    StructField("country", StringType())
])

In [5]:
data_first = spark.read.option("header", "true").option("inferSchema", "true").csv('all_upwork_jobs_2024-02-07-2024-03-24.csv', schema=schema, multiLine=True, escape="\"")

data_second = spark.read.option("header", "true").option("inferSchema", "true").csv('all_upwork_jobs_2024-03-24-2024-05-21.csv', schema=schema, multiLine=True, escape="\"")


In [6]:
data = data_first.union(data_second)

In [7]:
data.show(n=3)

+--------------------+--------------------+-------------------+---------+----------+-----------+------+-------------+
|               title|                link|     published_date|is_hourly|hourly_low|hourly_high|budget|      country|
+--------------------+--------------------+-------------------+---------+----------+-----------+------+-------------+
|Experienced Media...|https://www.upwor...|2024-02-17 12:09:54|    false|      null|       null| 500.0|         null|
|Full Stack Developer|https://www.upwor...|2024-02-17 12:09:17|    false|      null|       null|1100.0|United States|
|     SMMA Bubble App|https://www.upwor...|2024-02-17 12:08:46|     true|      10.0|       30.0|  null|United States|
+--------------------+--------------------+-------------------+---------+----------+-----------+------+-------------+


In [8]:
data.printSchema()

root
 |-- title: string (nullable = true)
 |-- link: string (nullable = true)
 |-- published_date: timestamp (nullable = true)
 |-- is_hourly: boolean (nullable = true)
 |-- hourly_low: float (nullable = true)
 |-- hourly_high: float (nullable = true)
 |-- budget: float (nullable = true)
 |-- country: string (nullable = true)


In [9]:
# pipeline
stages = []

# 1) country
str2ind = StringIndexer(inputCol='country', outputCol='indexed_country', handleInvalid='skip')
country_one_hot = OneHotEncoder(inputCol='indexed_country', outputCol='country_onehot', dropLast=False)
stages.extend((str2ind, country_one_hot))


prep_country = Pipeline(stages=stages).fit(data)

In [10]:
data = prep_country.transform(data)

In [11]:
train, test = data.randomSplit([0.8, 0.2])

### Сначала кластеризуем данные по заголовкам. Для этого в очередной раз воспользуемся Word2Vec

In [12]:
@udf(returnType=StringType())
def text_prep(text):
    # Переводим текст в нижний регистр
    text = str(text).lower()
    # Убираем все, кроме букв
    text = re.sub(r'[^a-z]', ' ', text)
    # Убираем единичные буквы
    text = re.sub(r'\b\w\b', ' ', text)
    # Если пробелов больше одного заменяем на один
    text = re.sub(r'\s+', ' ', text)
    # Удаляем повторяющиеся слова
    text = ' '.join(set(word for word in text.split()))
    # Убираем пробелы слева и справа
    text = text.strip()
    return text

In [13]:
train = train.withColumn('prep_text', text_prep(F.col('title')))
train = train.where(train.prep_text != '')

test = test.withColumn('prep_text', text_prep(F.col('title')))
test = test.where(test.prep_text != '')

In [14]:
# Токенизатор
tokenizer = Tokenizer(inputCol='prep_text', outputCol='tokens')

# Стоп-слова
stopwords = StopWordsRemover.loadDefaultStopWords('english')
stopwords.extend(['need', 'needed'])
stopwords_remover = StopWordsRemover(inputCol='tokens', outputCol='clear_tokens', stopWords=stopwords)

# Ворд ту век
word2Vec = Word2Vec(inputCol='clear_tokens', outputCol='w2v_features', vectorSize=100, minCount=100)

# Пайплайн
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, word2Vec])
word2vec_model = pipeline.fit(train)

In [15]:
train = word2vec_model.transform(train)

In [16]:
word2vec_model.stages[-1]

Word2VecModel: uid=Word2Vec_af7fd2c00f8c, numWords=2204, vectorSize=100

In [17]:
word2vec_model.stages[-1].findSynonyms(word='devops', num=7).show()

+----------+------------------+
|      word|        similarity|
+----------+------------------+
|kubernetes| 0.845933198928833|
|       gcp|0.8030648827552795|
|    docker|0.7665404081344604|
|deployment|0.7598544359207153|
| terraform|0.7563865184783936|
|        ci| 0.750397264957428|
|        cd|0.7458357214927673|
+----------+------------------+


In [18]:
kmeans = KMeans(k=11, initMode="k-means||", featuresCol='w2v_features', predictionCol='cluster')
km_model = kmeans.fit(train.limit(600000))
train = km_model.transform(train)

In [19]:
d = {}
for i in range(11):
    ls = []
    tmp = train.select('clear_tokens', 'cluster').where(train['cluster'] == i).collect()

    tmp = [tmp[j][0] for j in range(len(tmp))]

    for el in tmp:
        ls.extend(el)

    fdist = list(FreqDist(ls))[:5]

    d[i] = fdist

d_frame = pd.DataFrame(list(d.items()), columns=['cluster', 'top_words'])
print(d_frame.to_string())

    cluster                                            top_words
0         0         [expert, specialist, google, ads, marketing]
1         1                [writer, looking, amp, create, video]
2         2                 [developer, app, react, full, stack]
3         3       [website, wordpress, developer, shopify, page]
4         4         [media, social, manager, content, marketing]
5         5                [data, expert, google, excel, python]
6         6                  [designer, design, ui, ux, website]
7         7            [video, youtube, editor, channel, videos]
8         8  [english, translation, native, spanish, translator]
9         9             [designer, design, logo, graphic, brand]
10       10        [assistant, virtual, sales, lead, generation]


### Построим модель для предсказания средней оплаты за час, используя кластеры или чистые эмбеддинги и разделив все цены на категории

In [20]:
train = train.where((train.hourly_high.isNotNull()) & (train.hourly_low.isNotNull()))
train = train.where(F.col('country').isNotNull())

train = train.withColumn('hourly_avg', (F.col('hourly_low') + F.col('hourly_high')) / 2)

In [21]:
train.show(n=3)

+--------------------+--------------------+-------------------+---------+----------+-----------+------+-------------+---------------+---------------+--------------------+--------------------+--------------------+--------------------+-------+----------+
|               title|                link|     published_date|is_hourly|hourly_low|hourly_high|budget|      country|indexed_country| country_onehot|           prep_text|              tokens|        clear_tokens|        w2v_features|cluster|hourly_avg|
+--------------------+--------------------+-------------------+---------+----------+-----------+------+-------------+---------------+---------------+--------------------+--------------------+--------------------+--------------------+-------+----------+
|!!! Amazon AWS / ...|https://www.upwor...|2024-03-18 13:59:29|     true|       8.0|       25.0|  null|United States|            0.0|(221,[0],[1.0])|security needed n...|[security, needed...|[security, nginx,...|[-0.1031561358831...|      5|

In [22]:
train = train.withColumn('salary_category', F.when(F.col('hourly_avg') <= 5, '<=5')
                       .otherwise(F.when(F.col('hourly_avg') <= 10, '5--10')
                       .otherwise(F.when(F.col('hourly_avg') <= 25, '10--25')
                       .otherwise(F.when(F.col('hourly_avg') <= 50, '25--50')
                       .otherwise(F.when(F.col('hourly_avg') <= 100, '50--100')
                       .otherwise(F.when(F.col('hourly_avg') <= 200, '100--200')
                       .otherwise(F.when(F.col('hourly_avg') <= 500, '200--500')
                       .otherwise('>500'))))))))


In [23]:
# pipeline
stages = []

# 1) cluster
cluster_one_hot = OneHotEncoder(inputCol='cluster', outputCol='cluster_onehot', dropLast=False)
stages.append(cluster_one_hot)

# 2) features
assembler_feature = VectorAssembler(inputCols=['cluster_onehot', 'country_onehot'], outputCol='features')
stages.append(assembler_feature)

# 3) label
label2ind = StringIndexer(inputCol='salary_category', outputCol='indexed_salary_category')
stages.append(label2ind)

train_prep = Pipeline(stages=stages).fit(train)

# 4) Random Forest
rf = RandomForestClassifier(featuresCol='features', labelCol='indexed_salary_category', predictionCol='prediction')


In [24]:
model_rf = rf.fit(train_prep.transform(train))

In [25]:
ind2label = IndexToString(inputCol='prediction', outputCol='pred_label', labels=train_prep.stages[-1].labels)

In [26]:
# test
test = test.where((test.hourly_high.isNotNull()) & (test.hourly_low.isNotNull()))
test = test.where(F.col('country').isNotNull())
test = test.withColumn('hourly_avg', (F.col('hourly_low') + F.col('hourly_high')) / 2)
test = test.withColumn('salary_category', F.when(F.col('hourly_avg') <= 5, '<=5')
                       .otherwise(F.when(F.col('hourly_avg') <= 10, '5--10')
                       .otherwise(F.when(F.col('hourly_avg') <= 25, '10--25')
                       .otherwise(F.when(F.col('hourly_avg') <= 50, '25--50')
                       .otherwise(F.when(F.col('hourly_avg') <= 100, '50--100')
                       .otherwise(F.when(F.col('hourly_avg') <= 200, '100--200')
                       .otherwise(F.when(F.col('hourly_avg') <= 500, '200--500')
                       .otherwise('>500'))))))))

In [27]:
test = word2vec_model.transform(test)
test = km_model.transform(test)

In [28]:
# pipeline
stages = []

# 1) cluster
cluster_one_hot = OneHotEncoder(inputCol='cluster', outputCol='cluster_onehot', dropLast=False)
stages.append(cluster_one_hot)

# 2) features
assembler_feature = VectorAssembler(inputCols=['cluster_onehot', 'country_onehot'], outputCol='features')
stages.append(assembler_feature)

# 3) label
label2ind = StringIndexer(inputCol='salary_category', outputCol='indexed_salary_category')
stages.append(label2ind)

test_prep = Pipeline(stages=stages).fit(test)

In [29]:
pred = model_rf.transform(test_prep.transform(test))
pred = ind2label.transform(pred)

In [30]:
pred.groupBy('pred_label', 'salary_category').count().show()

+----------+---------------+-----+
|pred_label|salary_category|count|
+----------+---------------+-----+
|    10--25|       200--500|  114|
|    10--25|          5--10| 5765|
|    10--25|         25--50|12176|
|    10--25|       100--200|  754|
|    10--25|        50--100| 4001|
|    10--25|            <=5| 1759|
|    10--25|           >500|   18|
|    10--25|         10--25|16626|
+----------+---------------+-----+


In [31]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='indexed_salary_category', metricName='accuracy')

acc = evaluator.evaluate(pred)
print(f'Accuracy: {acc}')

Accuracy: 0.4034163977385776


### Далее классификация, только используются не кластеры, а непосредственно векторы Word2Vec

In [32]:
assembler_w2v_feature = VectorAssembler(inputCols=['country_onehot', 'w2v_features'], outputCol='feature2')
label2ind_w2v = StringIndexer(inputCol='salary_category', outputCol='indexed_salary_category_w2v').fit(train)

train = label2ind_w2v.transform(assembler_w2v_feature.transform(train))

In [33]:
rf_model = RandomForestClassifier(featuresCol='feature2', labelCol='indexed_salary_category_w2v', predictionCol='prediction2').fit(train)

In [34]:
assembler_w2v_feature = VectorAssembler(inputCols=['country_onehot', 'w2v_features'], outputCol='feature2')
label2ind_w2v = StringIndexer(inputCol='salary_category', outputCol='indexed_salary_category_w2v').fit(test)

test = label2ind_w2v.transform(assembler_w2v_feature.transform(test))

In [35]:
ind2label = IndexToString(inputCol='prediction2', outputCol='pred_label2', labels=label2ind_w2v.labels)

In [36]:
pred = rf_model.transform(test_prep.transform(test))
pred = ind2label.transform(pred)

In [37]:
pred.groupBy('pred_label2', 'salary_category').count().show()

+-----------+---------------+-----+
|pred_label2|salary_category|count|
+-----------+---------------+-----+
|     10--25|       200--500|  112|
|      5--10|            <=5|  237|
|     10--25|          5--10| 5198|
|      5--10|         25--50|   65|
|      5--10|       200--500|    2|
|     10--25|         25--50|12039|
|      5--10|          5--10|  567|
|      5--10|       100--200|    5|
|     25--50|         25--50|   72|
|     10--25|       100--200|  747|
|     10--25|        50--100| 3974|
|     10--25|            <=5| 1522|
|     10--25|           >500|   17|
|     25--50|         10--25|    9|
|      5--10|        50--100|   23|
|     10--25|         10--25|16376|
|      5--10|         10--25|  241|
|      5--10|           >500|    1|
|     25--50|        50--100|    4|
|     25--50|       100--200|    2|
+-----------+---------------+-----+


In [39]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction2', labelCol='indexed_salary_category', metricName='accuracy')

acc = evaluator.evaluate(pred)
print(f'Accuracy: {acc}')

Accuracy: 0.4128551670589377
