Импортируем нужные зависимости

In [15]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import pandas as pd
from pyspark.ml.feature import VectorAssembler
from constants import COLUMS
import dotenv
import psycopg2
import os

Создаем SparkSession

In [16]:
# Put the PYARROW_IGNORE_TIMEZONE flag into the process environment ahead of
# session creation.
os.environ.update({"PYARROW_IGNORE_TIMEZONE": "1"})

# Build (or reuse) the Spark session with explicit executor/driver memory
# sizes and 16g of off-heap memory switched on.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "70g")
    .config("spark.driver.memory", "50g")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)

Загружаем и предобрабатываем исходный датасет

In [17]:
# Columns (imported from the project's constants module) that are packed into
# the feature vector.
cols_to_keep = list(COLUMS)

# Read the tab-separated dump with a header row and inferred column types,
# then fill missing values: first with 0.0, then with the placeholder string.
dataset = (
    spark.read.csv(
        "en.openfoodfacts.org.products.csv",
        sep="\t",
        header=True,
        inferSchema=True,
    )
    .fillna(0.0)
    .fillna("unkhown")
)

# Assemble the selected columns into a single "features" vector column
# (invalid values raise an error) and drop every other column.
dataset = (
    VectorAssembler(inputCols=cols_to_keep, outputCol="features", handleInvalid="error")
    .transform(dataset)
    .select("features")
)
dataset.show()

                                                                                

+--------------------+
|            features|
+--------------------+
|(119,[43,58,114],...|
|(119,[6,22,36,43,...|
|(119,[6,22,43,57,...|
|(119,[6,22,43,55,...|
|(119,[6,22,43,55,...|
|(119,[43,58,114],...|
|(119,[22,43,55,57...|
|(119,[6,22,43,55,...|
|(119,[6,22,43,55,...|
|(119,[43,58,114],...|
|(119,[43,58,114],...|
|(119,[6,22,43,57,...|
|(119,[6,43,55,57,...|
|(119,[6,22,43,55,...|
|(119,[43,58,114],...|
|(119,[43,58,114],...|
|(119,[43,58,114],...|
|(119,[43,58,114],...|
|(119,[43,58,114],...|
|(119,[43,58,114],...|
+--------------------+
only showing top 20 rows



                                                                                

Конвертируем данные в pandas DataFrame

In [18]:
# Materialize the Spark DataFrame as a pandas DataFrame; toPandas() collects
# every row onto the driver, so the whole dataset has to fit in driver memory.
datasetPD = dataset.toPandas()

Загружаем учётные данные для подключения к базе данных

In [19]:
# Load variables from the dotenv file (if any) into the process environment.
dotenv.load_dotenv()

# DATABASE_NAME is mandatory: a missing variable raises KeyError right here.
# Naming caveat: in this file DATABASE_NAME is interpolated as the *table*
# name in the SQL statements, while DATABASE_N is what gets passed as
# dbname= to psycopg2.connect.
DATABASE_NAME = os.environ["DATABASE_NAME"]

# The remaining settings are optional at this point and are None when unset.
DATABASE_N = os.environ.get("DATABASE_N")
DATABASE_HOST = os.environ.get("DATABASE_HOST")
DATABASE_USER = os.environ.get("DATABASE_USER")
DATABASE_PASSWORD = os.environ.get("DATABASE_PASSWORD")
DATABASE_COLUMN = os.environ.get("DATABASE_COLUMN")

Подключаемся к базе данных

In [20]:
# Recreate the target table from scratch: drop it if it exists, then create it
# with a single varchar column.
conn = psycopg2.connect(dbname=DATABASE_N, user=DATABASE_USER, password=DATABASE_PASSWORD, host=DATABASE_HOST)
try:
    # Autocommit: each DDL statement takes effect immediately, no explicit commit.
    conn.autocommit = True

    # The cursor context manager closes the cursor even if a statement fails.
    with conn.cursor() as cursor:
        # NOTE: table/column names cannot be bound as query parameters, so they
        # are interpolated; they come from the local environment configuration
        # and must be trusted, valid SQL identifiers.
        sql = f"DROP TABLE IF EXISTS {DATABASE_NAME};"
        cursor.execute(sql)

        sql = f"CREATE TABLE {DATABASE_NAME} ({DATABASE_COLUMN} varchar)"
        cursor.execute(sql)
finally:
    # Always release the connection, including when a statement raises.
    conn.close()

Готовим данные для записи в базу данных

In [21]:
# Serialize every pandas row into one string: the str() of each field, joined
# with commas. itertuples() walks the rows once and yields plain tuples, which
# avoids re-materializing the row with .loc for every field and avoids
# integer-position lookups on a label-indexed Series (deprecated in pandas).
all_data = [
    ",".join(map(str, row))
    for row in datasetPD.itertuples(index=False, name=None)
]

Создаем функцию для записи данных в Базу данных

In [22]:
def add_lines(lines):
    """Insert each item of *lines* as one row of the configured table.

    Every item is converted with ``str()`` and stored in the column named by
    ``DATABASE_COLUMN`` of the table named by ``DATABASE_NAME``. All rows are
    sent in a single INSERT statement.

    Args:
        lines: Iterable of values to store, one row per item. An empty
            iterable is a no-op.

    Raises:
        TypeError: If *lines* is a pandas DataFrame. Iterating a DataFrame
            yields its column labels rather than its rows, so the caller must
            serialize the rows first.
        psycopg2.Error: If connecting or executing the statement fails.
    """
    if isinstance(lines, pd.DataFrame):
        raise TypeError(
            "add_lines expects an iterable of row values; "
            "serialize the DataFrame rows before calling it"
        )

    values = [str(line) for line in lines]
    if not values:
        # "INSERT ... VALUES" with no tuples is invalid SQL; nothing to do.
        return

    # One "(%s)" placeholder per row: the driver quotes and escapes the values,
    # so quotes or other special characters in the data cannot break the
    # statement.
    placeholders = ",".join(["(%s)"] * len(values))
    # NOTE: identifiers cannot be bound as parameters; the table and column
    # names come from the local environment configuration and must be trusted.
    sql = f"INSERT INTO {DATABASE_NAME}({DATABASE_COLUMN}) VALUES {placeholders}"

    conn = psycopg2.connect(dbname=DATABASE_N, user=DATABASE_USER, password=DATABASE_PASSWORD, host=DATABASE_HOST)
    try:
        conn.autocommit = True
        with conn.cursor() as cursor:
            cursor.execute(sql, values)
    finally:
        # Release the connection even when the insert fails.
        conn.close()

Записываем данные в Базу данных

In [23]:
# Insert every serialized row held in all_data into the database table.
add_lines(all_data)

Читаем данные из Базы данных

In [24]:
# Read every stored row back from the table into a one-column pandas DataFrame.
conn = psycopg2.connect(dbname=DATABASE_N, user=DATABASE_USER, password=DATABASE_PASSWORD, host=DATABASE_HOST)
try:
    conn.autocommit = True
    with conn.cursor() as cursor:
        sql = f"SELECT * FROM {DATABASE_NAME}"
        cursor.execute(sql)

        # Keep each stored value whole. The serialized vector text itself
        # contains commas, so splitting on "," would break one value into many
        # fields and no longer match the single 'features' column below.
        all_data = [[row[0]] for row in cursor.fetchall()]
finally:
    # Release the connection even when the query fails.
    conn.close()

df = pd.DataFrame(all_data, columns=['features'])

Обрабатываем данные из Базы данных

In [25]:
# Local import: the MLlib parser is only needed here, to decode stored text.
from pyspark.mllib.linalg import Vectors as MLlibVectors

# The table column is varchar, so df["features"] holds the *text* form of each
# vector. The clustering estimator needs a real vector column, so every string
# is parsed back into a vector and converted to the pyspark.ml vector type
# before the Spark DataFrame is built.
dataset = spark.createDataFrame(
    [(MLlibVectors.parse(text).asML(),) for text in df["features"]],
    ["features"],
)
dataset.show()

+--------------------+
|            features|
+--------------------+
|(119,[43,58,114],...|
|(119,[6,22,36,43,...|
|(119,[6,22,43,57,...|
|(119,[6,22,43,55,...|
|(119,[6,22,43,55,...|
|(119,[43,58,114],...|
|(119,[22,43,55,57...|
|(119,[6,22,43,55,...|
|(119,[6,22,43,55,...|
|(119,[43,58,114],...|
|(119,[43,58,114],...|
|(119,[6,22,43,57,...|
|(119,[6,43,55,57,...|
|(119,[6,22,43,55,...|
|(119,[43,58,114],...|
|(119,[43,58,114],...|
|(119,[43,58,114],...|
|(119,[43,58,114],...|
|(119,[43,58,114],...|
|(119,[43,58,114],...|
+--------------------+
only showing top 20 rows



Обучаем и сохраняем модель

In [27]:
# K-means estimator: 3 clusters, fixed seed 1, at most 5 iterations.
kmeans = KMeans(k=3, seed=1, maxIter=5)

# Fit on the "features" column and persist the fitted model to disk.
model = kmeans.fit(dataset)
model.write().save("kmeans.sav")

                                                                                

Считаем предсказания модели

In [28]:
# Add the fitted model's cluster assignment to every row.
predictions = model.transform(dataset)

# Score the clustering with a default-configured evaluator and report it.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")



Silhouette with squared euclidean distance = 0.9999993029872573


                                                                                

Записываем результат работы модели в Базу данных

In [31]:
predictionsDF = predictions.toPandas()

# Iterating a DataFrame yields its column labels, not its rows, so each row is
# first serialized into one comma-separated string (str() of every field) and
# only then handed to add_lines.
add_lines([
    ",".join(map(str, row))
    for row in predictionsDF.itertuples(index=False, name=None)
])
predictionsDF.head()

                                                                                

                                            features  prediction
0  (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...           0
1  (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 65.0, 0.0, 0.0,...           0
2  (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 9.8, 0.0, 0.0, ...           0
3  (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.9, 0.0, 0.0, ...           0
4  (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 70.1, 0.0, 0.0,...           0
