In [1]:
import sys

# Prepend the parent directory (relative to the current working directory) to
# sys.path so that the local `src` package imported just below can be found.
# This must run before the `from src import ...` line.
BASE_DIR = '../'
sys.path.insert(0, BASE_DIR)

from src import Preprocessor, Logger, KMeansConnector, MySQLConnectorConfig, MySQLConnector

import configparser
from pyspark.sql import SparkSession

# Path of the INI file that is parsed with configparser in the next cell.
CONFIG_PATH = '../config.ini'

In [2]:
# Load the project settings.
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so an empty result is turned into an explicit
# error here instead of surfacing later as an opaque KeyError.
config = configparser.ConfigParser()
if not config.read(CONFIG_PATH):
    raise FileNotFoundError(f"Configuration file not found or unreadable: {CONFIG_PATH}")

# Path to the MySQL connector jar; the config stores it relative to BASE_DIR.
DB_CONNECTOR_JAR = f"{BASE_DIR}{config['spark']['mysql_jar']}"

In [3]:
# Build (or reuse) the Spark session from the [spark] section of the config.
# Every value is passed through exactly as read from the INI file.
spark_conf = config['spark']

spark = (
    SparkSession.builder
    .appName(spark_conf['app_name'])
    .master(spark_conf['deploy_mode'])
    # Driver / executor resources.
    .config("spark.driver.cores", spark_conf['driver_cores'])
    .config("spark.executor.cores", spark_conf['executor_cores'])
    .config("spark.driver.memory", spark_conf['driver_memory'])
    .config("spark.executor.memory", spark_conf['executor_memory'])
    # Dynamic allocation settings.
    .config("spark.dynamicAllocation.enabled", spark_conf['dynamic_allocation'])
    .config("spark.dynamicAllocation.minExecutors", spark_conf['min_executors'])
    .config("spark.dynamicAllocation.maxExecutors", spark_conf['max_executors'])
    .config("spark.dynamicAllocation.initialExecutors", spark_conf['initial_executors'])
    # The MySQL connector jar is registered both as a Spark jar and on the
    # driver classpath.
    .config("spark.jars", DB_CONNECTOR_JAR)
    .config("spark.driver.extraClassPath", DB_CONNECTOR_JAR)
    .getOrCreate()
)

24/09/22 20:01:48 WARN Utils: Your hostname, tower resolves to a loopback address: 127.0.1.1; using 192.168.0.105 instead (on interface enp4s0)
24/09/22 20:01:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/09/22 20:01:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Create the shared logger first; it is handed to the preprocessor here and to
# the DB connector and model in later cells.
# NOTE(review): the meaning of the positional `True` flag is defined in
# src.Logger and is not visible in this notebook — confirm.
logger_factory = Logger(True)
logger = logger_factory.get_logger("logger")

preproc = Preprocessor(logger)

In [5]:
# Turn the [mysql] section of the config into keyword arguments for the
# connector configuration object.
mysql_settings = dict(config['mysql'])
db_config = MySQLConnectorConfig(**mysql_settings)

In [6]:
# Create the MySQL connector from the Spark session, the shared logger and the
# connection settings built in the previous cell.
db_conn = MySQLConnector(spark, logger, db_config)

In [7]:
# Feature column names come from a comma-separated config entry.
# Split on the bare comma and strip each piece so that "a,b", "a, b" and
# "a ,  b" are all accepted, and drop empty entries left by a trailing comma.
features_names = [
    name.strip()
    for name in config['dataset']['features'].split(",")
    if name.strip()
]

# Load the selected feature columns from the samples table, then apply the
# preprocessor's scaling step (logged as a StandardScaler transformation).
output = preproc.load(db_conn, db_conn.config.samples_table, features_names)
scaled_output = preproc.apply_scale(output)

2024-09-22 20:01:54,554 — logger — INFO — START_METHOD: Загрузка датасета из БД
2024-09-22 20:01:54,555 — logger — INFO — START_METHOD: Чтение датасета из таблицы БД
2024-09-22 20:01:55,878 — logger — INFO — END_METHOD: Чтение датасета из таблицы БД
2024-09-22 20:01:56,636 — logger — INFO — END_METHOD: Загрузка датасета из БД
2024-09-22 20:01:56,636 — logger — INFO — START_METHOD: Применение к экземпларам датасета StandardScaler-трансформации
2024-09-22 20:01:58,454 — logger — INFO — END_METHOD: Применение к экземпларам датасета StandardScaler-трансформации


In [8]:
# Model settings pulled out of the constructor call so they are easy to find.
# NOTE(review): the first argument is assumed to be the number of clusters and
# the second the name of the input features column — confirm against
# src.KMeansConnector.
N_CLUSTERS = 2
FEATURES_COL = 'scaled_features'

model = KMeansConnector(N_CLUSTERS, FEATURES_COL, logger, db_conn)

In [9]:
# Train the clustering model on the scaled dataset.
model.fit(scaled_output)

2024-09-22 20:02:01,099 — logger — INFO — START_METHOD: Обучение модели кластеризации
2024-09-22 20:02:03,223 — logger — INFO — END_METHOD: Обучение модели кластеризации


In [10]:
# Get predictions from the trained model for the same scaled dataset.
# Per the captured log output, this call also adds a record to a DB table.
predictions = model.predict(scaled_output)

2024-09-22 20:02:03,968 — logger — INFO — START_METHOD: Получение предсказаний от обученной модели
2024-09-22 20:02:03,999 — logger — INFO — START_METHOD: Добавление новой записи в таблицу БД


[Stage 33:>                                                         (0 + 1) / 1]

2024-09-22 20:02:05,305 — logger — INFO — END_METHOD: Добавление новой записи в таблицу БД
2024-09-22 20:02:05,306 — logger — INFO — END_METHOD: Получение предсказаний от обученной модели


                                                                                

In [11]:
# Evaluate the clustering quality on the predictions.
# NOTE(review): the metric used is defined inside KMeansConnector.evaluate and
# is not visible in this notebook — confirm which one it is.
scores = model.evaluate(predictions)

2024-09-22 20:02:06,739 — logger — INFO — START_METHOD: Оценка качества модели кластеризации
2024-09-22 20:02:07,186 — logger — INFO — END_METHOD: Оценка качества модели кластеризации


In [12]:
# Display the evaluation result (a single float in the captured output).
scores

0.6265308730733993