In [None]:
# Установка PySpark
!pip install pyspark




# **Задание 1: Бинарная классификация**

Загружаем необходимые библиотеки

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
import pandas as pd

Запускаем SparkSession

In [None]:
spark = SparkSession.builder \
    .appName("Binary Classification") \
    .getOrCreate()

Сгенерируем данные

In [None]:
data = pd.DataFrame({
    'feature1': [0.1, 0.2, 0.4, 0.6, 0.8, 0.9, 0.3, 0.5, 0.7, 0.85],
    'feature2': [0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
    'label': [0, 0, 0, 1, 1, 1, 0, 1, 1, 1]
})

df = spark.createDataFrame(data)

Проведем векторизацию

In [None]:
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
data_transformed = assembler.transform(df)

Разделим данные на тренировочный и тестовый наборы

In [None]:
train_data, test_data = data_transformed.randomSplit([0.7, 0.3])

Проведем обучение модели

In [None]:
lr = LogisticRegression(featuresCol='features', labelCol='label')
model = lr.fit(train_data)

Проводим прогнозирование и оценку модели

In [None]:
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction")
accuracy = evaluator.evaluate(predictions)

Сохраняем метрики в файл

In [None]:
with open("binary_classification_metrics.txt", "w") as f:
    f.write(f"Accuracy: {accuracy}\n")

In [None]:
print(f"Binary Classification Accuracy: {accuracy}")

Binary Classification Accuracy: 0.0


In [None]:
spark.stop()

# **Задание 2: Кластеризация (KMeans)**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
import numpy as np

In [None]:
spark = SparkSession.builder \
    .appName("KMeans Example") \
    .getOrCreate()

Сгенерируем подходящие исходные данные для проведения обучения

In [None]:
np.random.seed(42)
data = []
num_points_per_cluster = 100
num_clusters = 3

for i in range(num_clusters):
    # Генерируем центры кластеров
    center_x = np.random.rand() * 10
    center_y = np.random.rand() * 10
    for _ in range(num_points_per_cluster):
        # Генерируем точки вокруг центра с небольшим шумом
        point_x = center_x + np.random.randn() * 0.5
        point_y = center_y + np.random.randn() * 0.5
        data.append(Row(x=point_x, y=point_y))


Создаем DataFrame

In [None]:
  df = spark.createDataFrame(data)

Проведем подготовку данных

In [None]:
assembler = VectorAssembler(inputCols=["x", "y"], outputCol="features")
data_transformed = assembler.transform(df)

Проведем обучение модели

In [None]:
kmeans = KMeans(k=num_clusters, seed=42)
model = kmeans.fit(data_transformed)

Получаем и выводим центры кластеров

In [None]:
centers = model.clusterCenters()
for i, center in enumerate(centers):
    print(f"Center of cluster {i}: {center}")

Center of cluster 0: [7.0170251  7.01640171]
Center of cluster 1: [7.67892194 9.38511125]
Center of cluster 2: [3.68692443 9.52764947]


Сохраняем в файл полученные центры кластеров

In [None]:
with open("cluster_centers.txt", "w") as f:
    for i, center in enumerate(centers):
        f.write(f"Center of cluster {i}: {center}\n")

In [None]:
spark.stop()

# **Задание 3. Регрессия (LinearRegression)**

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import numpy as np

In [2]:
spark = SparkSession.builder \
    .appName("Linear Regression Example") \
    .getOrCreate()

Сгенерируем подходящие исходные данные для проведения обучения

In [3]:
np.random.seed(42)
num_points = 1000
x = (np.random.rand(num_points) * 10).tolist()  # Признак в виде списка
y = (2 * np.array(x) + 3 + np.random.randn(num_points) * 2).tolist()  #Создаем зависимую переменную с некоторым шумом

Создаем DataFrame

In [4]:
data = [(float(xi), float(yi)) for xi, yi in zip(x, y)]
df = spark.createDataFrame(data, ["x", "y"])

Подготовим данные для обучения

In [5]:
assembler = VectorAssembler(inputCols=["x"], outputCol="features")
data_transformed = assembler.transform(df)

Проведем обучение линейной регрессии

In [6]:
lr = LinearRegression(featuresCol="features", labelCol="y")
lr_model = lr.fit(data_transformed)

Проведем прогнозирование и расчет метрик

In [7]:
predictions = lr_model.transform(data_transformed)
training_summary = lr_model.summary
r2 = training_summary.r2
rmse = training_summary.rootMeanSquaredError

Посмотрим значения метрик

In [8]:
print(f"R2: {r2}")
print(f"RMSE: {rmse}")

R2: 0.8944721603366936
RMSE: 1.9748092381589237


Сохраним полученные значения метрик в файл.

In [9]:
with open("regression_metrics.txt", "w") as f:
    f.write(f"R2: {r2}\n")
    f.write(f"RMSE: {rmse}\n")

Сгенерируем данные для кастеризации

In [10]:
num_clusters = 3
cluster_data = []
for i in range(num_clusters):
    center_x = np.random.rand() * 10
    center_y = np.random.rand() * 10
    for _ in range(100):
        point_x = center_x + np.random.randn() * 0.5
        point_y = center_y + np.random.randn() * 0.5
        cluster_data.append((float(point_x), float(point_y)))

Создадим для кластеризации DataFrame

In [11]:
cluster_df = spark.createDataFrame(cluster_data, ["x", "y"])

Подгготовим данные и проведем кластеризацию с помощью KMeans

In [12]:
assembler = VectorAssembler(inputCols=["x", "y"], outputCol="features")
cluster_data_transformed = assembler.transform(cluster_df)
kmeans = KMeans(k=num_clusters, seed=42)
kmeans_model = kmeans.fit(cluster_data_transformed)

Получим, выведем и сохраним центры кластеров

In [13]:
centers = kmeans_model.clusterCenters()

print("Cluster Centers:")
for i, center in enumerate(centers):
    print(f"Center of cluster {i}: {center}")

with open("cluster_centers.txt", "w") as f:
    for i, center in enumerate(centers):
        f.write(f"Center of cluster {i}: {center}\n")

Cluster Centers:
Center of cluster 0: [2.95188248 3.60068657]
Center of cluster 1: [9.66904691 3.33060381]
Center of cluster 2: [8.02470699 4.66428446]


In [14]:
spark.stop()

# **Задание 4. Word2vec**

In [15]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec

In [16]:
spark = SparkSession.builder \
    .appName("Word2Vec Example") \
    .getOrCreate()

In [17]:
sentences = [
    "Белеет парус одинокий в тумане моря голубом!",
    "Что ищет он в стране далекой?",
    "Что кинул он в краю родном?",
    "Играют волны — ветер свищет, и мачта гнется и скрипит…",
    "Увы, он счастия не ищет и не от счастия бежит!",
    "Под ним струя светлей лазури, над ним луч солнца золотой…",
    "А он, мятежный, просит бури, Как будто в бурях есть покой!"
]


Преобразуем предложения в DataFrame

In [18]:
data = [(sentence.split(" "),) for sentence in sentences]
df = spark.createDataFrame(data, ["text"])

Подготовим данных для Word2Vec, представив каждое предложение в виде списка слов


In [19]:
df.show(truncate=False)

+----------------------------------------------------------------------+
|text                                                                  |
+----------------------------------------------------------------------+
|[Белеет, парус, одинокий, в, тумане, моря, голубом!]                  |
|[Что, ищет, он, в, стране, далекой?]                                  |
|[Что, кинул, он, в, краю, родном?]                                    |
|[Играют, волны, —, ветер, свищет,, и, мачта, гнется, и, скрипит…]     |
|[Увы,, он, счастия, не, ищет, и, не, от, счастия, бежит!]             |
|[Под, ним, струя, светлей, лазури,, над, ним, луч, солнца, золотой…]  |
|[А, он,, мятежный,, просит, бури,, Как, будто, в, бурях, есть, покой!]|
+----------------------------------------------------------------------+



Обучаем модели Word2Vec

In [20]:
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(df)

Преобразуем слова в векторы

In [21]:
result = model.transform(df)

Посмотрим результат

In [22]:
result.show(truncate=False)

+----------------------------------------------------------------------+-------------------------------------------------------------------+
|text                                                                  |result                                                             |
+----------------------------------------------------------------------+-------------------------------------------------------------------+
|[Белеет, парус, одинокий, в, тумане, моря, голубом!]                  |[0.02615223093224423,0.04188473762146064,0.012471183203160763]     |
|[Что, ищет, он, в, стране, далекой?]                                  |[0.0037275906652212143,-0.04420801729429513,-0.014807784464210272] |
|[Что, кинул, он, в, краю, родном?]                                    |[-0.010152634854118029,-0.026048003191438813,0.002213459461927414] |
|[Играют, волны, —, ветер, свищет,, и, мачта, гнется, и, скрипит…]     |[2.8135075699537994E-4,-0.0059743637684732676,0.008829496800899506]|
|[Увы,, он, с

Сохраним векторы в файл

In [23]:
vectors = result.select("text", "result").rdd.map(lambda row: (row[0], row[1].toArray().tolist()))
with open("word_vectors.txt", "w", encoding='utf-8') as f:
    for sentence, vector in vectors.collect():
        f.write(f"Sentence: {sentence}, Vectors: {vector}\n")


In [24]:
spark.stop()