# Run cluster

## Build nested images

```bash
 docker-compose -f .\docker\entrypoint.docker-compose.yml  -f .\docker\spark\spark.docker-compose.yml --env-file=./env/.env  build data-mart-python-k8s data-mart-k8s  spark-build
```

## Run spark driver
```bash
docker-compose -f .\docker\entrypoint.docker-compose.yml  -f .\docker\spark\spark.docker-compose.yml --env-file=./env/.env  up producer -d --force-recreate
```

## Start cluster

```bash
wsl -d <Distributive-Name>
helm repo add bitnami https://charts.bitnami.com/bitnami
helm search repo bitnami
```

### Create secrets

```bash
kubectl create secret generic spark-cluster-secrets \
  --from-env-file=./env/.env
```


### Run spark cluster

```bash
helm upgrade kayvan-release bitnami/spark \
  --install \
  -f ./docker/kubernetes/helm/spark-config.yaml \
  --set global.security.allowInsecureImages=true \
  --set image.tag=latest \
  --set image.registry="docker.io" \
  --set image.repository="daniinxorchenabo/spark-k8s" \
  --set image.tag="latest" \
  --set master.resources.requests.cpu="1" \
  --set master.resources.limits.cpu="1" \
  --set worker.resources.requests.cpu="8" \
  --set worker.resources.limits.cpu="8" \
  --set worker.coreLimit="8" \
  --set worker.replicaCount="4" \
  --force
kubectl apply -f ./docker/kubernetes/helm/spark-master-service.yaml
```

### Run other nodes
```bash
kubectl apply -f ./docker/kubernetes/mongo-k8s.yaml
kubectl apply -f ./docker/kubernetes/redis-k8s.yaml
kubectl apply -f ./docker/kubernetes/data-mart-k8s.yaml
```

## Init pyspark client

In [25]:
import os
ROOT_DIR = '/workspace/NN'
os.chdir(ROOT_DIR)

dataset_path = os.path.join(ROOT_DIR, 'neural', 'datasets', )
weight_path = os.path.join(ROOT_DIR, 'neural', 'weights', )
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
os.environ["JAVA_HOME"] = os.environ["K8S_JAVA_HOME"]
os.environ["PYSPARK_SUBMIT_ARGS"] = f"""
--conf spark.executor.memory={os.environ['K8S_SPARK_EXECUTOR_MEMORY']}
--conf spark.executor.cores={os.environ['K8S_SPARK_EXECUTOR_CORES']}
pyspark-shell
"""
MONGO_USER =  os.environ["K8S_MONGO_USER"]
MONGO_PASS =  os.environ["K8S_MONGO_PASSWORD"]
MONGO_HOST = os.environ["K8S_MONGO_EXTERNAL_HOST"]
MONGO_PORT =  os.environ["K8S_MONGO_EXTERNAL_PORT"]
MONGO_ADDR = f"{MONGO_USER}:{MONGO_PASS}@{MONGO_HOST}:{MONGO_PORT}"

DATA_MART_HOST = os.environ["K8S_EXTERNAL_DATAMART_HOST"]
DATA_MART_PORT = os.environ["K8S_EXTERNAL_DATAMART_PORT"]

KeyError: 'K8S_JAVA_HOME'

In [6]:
from pyspark.sql import SparkSession



def spark_app_generator(name):
    spark = (SparkSession.builder
        .appName(name)
        .master(os.environ['K8S_SPARK_MASTER_URL'])
        .config("spark.kubernetes.container.image", "bitnami/spark:latest")
        .config("spark.kubernetes.namespace", "default")
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
        .config("spark.executor.instances", os.environ['K8S_SPARK_EXECUTOR_INSTANCES'])      # сколько Executors запустить
        .config("spark.executor.cores", os.environ['K8S_SPARK_EXECUTOR_CORES'])          # по одному CPU‐ядру
        .config("spark.executor.memory", os.environ['K8S_SPARK_EXECUTOR_MEMORY'])      # по 512 МБ памяти
        .config("spark.driver.memory", os.environ['K8S_SPARK_DRIVER_MEMORY'])        # драйверу тоже ограничим RAM, если нужно
        .config("spark.driver.host", os.environ['K8S_SPARK_DRIVER_HOST'])
        .config("spark.jars.ivy", "/tmp/.ivy2")  # укажи директорию вручную
        .config("spark.driver.bindAddress",  os.environ['K8S_SPARK_DRIVER_BIND_ADDRESS'])
        .config("spark.driver.port",         os.environ['K8S_SPARK_DRIVER_PORT'])
        .config("spark.blockManager.port",   os.environ['K8S_SPARK_BLOCKMANAGER_PORT'])
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0,"
                                       "com.redislabs:spark-redis_2.12:3.1.0,"
                                       "org.neo4j:neo4j-connector-apache-spark_2.12:5.3.1_for_spark_3")
        .config("spark.mongodb.read.connection.uri", f"mongodb://{MONGO_ADDR}")
        .config("spark.mongodb.write.connection.uri", f"mongodb://{MONGO_ADDR}")
        .config("spark.redis.host", os.environ['K8S_SPARK_REDIS_HOST'])
        .config("spark.redis.port", os.environ['K8S_SPARK_REDIS_PORT'])
        .config("spark.redis.db", os.environ['K8S_SPARK_REDIS_DB'])
        .getOrCreate())
    return spark


## Load data from Data-mart

In [12]:
import requests
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType

url = f"http://{DATA_MART_HOST}:{DATA_MART_PORT}/preprocessed"
# или "http://localhost:8080/preprocessed" при port-forward
response = requests.get(url, headers={"Accept": "application/json"})
response.raise_for_status()  # упадёт с ошибкой, если статус != 200

data =local_data = response.json()

Save clear data for tests.

In [15]:
os.makedirs((_cleared_data_path:=os.path.join(dataset_path, 'lab_8',)), exist_ok=True)
with open(os.path.join(_cleared_data_path,  'cleared_data_.py'), 'w') as f:
    print("data = [", file=f)
    for row in local_data:
        print(str(row) + ",", file=f)
    print("]", file=f)

Load clear data for tests.

In [3]:
import neural.datasets.lab_8.cleared_data as cd
data =  local_data = cd.data

Load clear data to spark.

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

def list_to_vector(xs):
    return Vectors.dense(xs)

list_to_vector_udf = udf(list_to_vector, VectorUDT())
spark = spark_app_generator('8th_lab_with_data_mart')
# Создаем начальный DF с колонками _id и list-фичами
df = spark.createDataFrame(local_data)
# Преобразуем list-фичи в вектор для ML
df = df.withColumn("features_vec", list_to_vector_udf("features"))

## Load data from mongodb

In [None]:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# --- Обработчик фичей ---
def preprocess(df: DataFrame) -> DataFrame:
    selected = (
        df.select("_id", "product_name", "ingredients_n", "ingredients_sweeteners_n", "scans_n", "additives_n")
          .na.drop()
    )
    assembler = VectorAssembler(
        inputCols=["ingredients_n", "ingredients_sweeteners_n", "scans_n", "additives_n"],
        outputCol="features_assembled"
    )
    assembled = assembler.transform(selected)
    scaler = StandardScaler(
        inputCol="features_assembled",
        outputCol="features"
    ).fit(assembled)
    return scaler.transform(assembled).select("_id", "product_name", "features")

def list_to_vector(xs):
    return Vectors.dense(xs)
list_to_vector_udf = udf(list_to_vector, VectorUDT())


custom_schema = StructType([
     StructField("_id", StringType(), True),
        StructField("product_name", StringType(), True),
        StructField("ingredients_sweeteners_n", IntegerType(), True),
        StructField("ingredients_n", IntegerType(), True),
        StructField("additives_n", IntegerType(), True),
        StructField("scans_n", IntegerType(), True),
    ])

spark = spark_app_generator('8th_lab_without_data_mart')

row_df = (
            spark.read.schema(custom_schema)
            .format("mongodb")
            .options(host=f"{MONGO_HOST}:{MONGO_PORT}", database="off", collection='products')
            .load()
        )
df = preprocess(row_df)
df = df.withColumnRenamed("features", "features_vec")
df = df.withColumnRenamed("_id", "id")
rows = df.collect()



# KMean learning

In [20]:


# 4. Обучение модели KMeans
k = 4
kmeans = KMeans(
    featuresCol="features_vec",
    predictionCol="cluster",
    k=k,
    seed=1
)
model = kmeans.fit(df)

# 5. Предсказания
predictions = model.transform(df)

# 6. Оценка модели
evaluator = ClusteringEvaluator(
    featuresCol="features_vec",
    predictionCol="cluster",
    metricName="silhouette"
)
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette}")



25/06/21 20:53:27 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/06/21 20:53:42 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/06/21 20:53:57 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/06/21 20:54:12 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/06/21 20:54:16 WARN TaskSetManager: Stage 0 contains a task of very large size (28198 KiB). The maximum recommended task size is 1000 KiB.
25/06/21 20:54:23 WARN TaskSetManager: Stage 3 contains a task of very large size (28199 KiB). The maximum recommended task size is 1000 KiB.
25/06/21 20:

Silhouette Score: 0.4581098262921842


25/06/21 20:55:02 WARN TaskSetManager: Stage 53 contains a task of very large size (28198 KiB). The maximum recommended task size is 1000 KiB.
[Stage 53:>                                                         (0 + 2) / 2]

Cluster 0 center: [1.56529218 0.14839207 0.26360828 1.23343674]
Cluster 1 center: [0.4781701  0.01349948 0.29697953 0.1573333 ]
Cluster 2 center: [3.42836952 0.30116767 0.20211602 3.10932586]
Cluster 3 center: [1.43338213 5.50501379 0.30297745 2.47973613]


                                                                                

# Save results to data storage

In [None]:
# 7. Сохраняем результат в Redis
#    Храним каждую запись как Hash с ключом, равным _id
output = predictions.select("id", "product_name", "features", "cluster")
output.write \
    .format("org.apache.spark.sql.redis") \
    .option("table", "results_lab_8") \
    .option("key.column", "id") \
    .mode("overwrite") \
    .save()

# Опционально: вывод центров кластеров
centers = model.clusterCenters()
for idx, center in enumerate(centers):
    print(f"Cluster {idx} center: {center}")

## Tests saving

In [24]:
import redis

client = redis.Redis(
    host='host.docker.internal',
    port=30079,
    db=0,
    decode_responses=True  # получаем str вместо bytes
)
keys = client.keys('results_lab_8:*')
for key in keys:
    print(key, client.hgetall(key))

results_lab_8:0072036986900 {'cluster': '1', 'product_name': 'Greek Nonfat Yogurt', 'features': 'WrappedArray(0.4073016264065956, 0.0, 0.0951413979692623, 0.0)'}
results_lab_8:24065764 {'cluster': '0', 'features': 'WrappedArray(0.8824868572142904, 0.0, 0.0951413979692623, 1.2389043826043644)', 'product_name': 'Piščančja jetrna pašteta'}
results_lab_8:8480000848383 {'features': 'WrappedArray(1.5613229012252832, 0.0, 0.14271209695389345, 1.6518725101391527)', 'cluster': '0', 'product_name': 'Biscoitos Canela'}
results_lab_8:20499747 {'features': 'WrappedArray(0.5430688352087941, 0.0, 0.23785349492315574, 0.4129681275347882)', 'cluster': '1', 'product_name': 'Mortadella con pistacchio'}
results_lab_8:3596710464753 {'cluster': '0', 'features': 'WrappedArray(0.6109524396098934, 0.0, 0.0951413979692623, 1.2389043826043644)', 'product_name': 'Lardons allumettes fumés'}
results_lab_8:20197261 {'features': 'WrappedArray(0.06788360440109927, 0.0, 0.14271209695389345, 0.0)', 'product_name': 'Gour

KeyboardInterrupt: 