In [1]:

import pyspark
from pyspark.ml import clustering, evaluation
from pyspark.sql import SparkSession


In [2]:
import os
from dotenv import load_dotenv

load_dotenv()

os.environ['SPARK_HOME'] = "/root/spark"
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-21-openjdk-amd64'

In [3]:
from pydantic import BaseModel
import yaml


class ClickhouseConfig(BaseModel):
    username: str
    password: str
    url: str 


class SparkSessionConfig(BaseModel):
    app_name: str
    deploy_mode: str
    driver_memory: str
    executor_memory: str
    
    
class PathsConfig(BaseModel):
    data: str
    model: str
    
    
class KMeansConfig(BaseModel):
    k: int
    maxIter: int
    seed: int

    
class Config(BaseModel):
    spark: SparkSessionConfig
    paths: PathsConfig
    kmeans: KMeansConfig
    clickhouse: ClickhouseConfig
    

class ConfigCreator:
    def load_config(self, file_path: str) -> Config:
        with open(file_path, 'r') as file:
            config_data = yaml.safe_load(file)
            
        config_data['clickhouse'] = {'username': os.getenv('CLICKHOUSE_USER'), 'password': os.getenv('CLICKHOUSE_PASSWORD'), 'url': os.getenv('CLICKHOUSE_URL')}
        
        return Config(**config_data)
        

config = ConfigCreator().load_config("../configs/config.yaml")


spark=SparkSessionConfig(app_name='my_kmeans', deploy_mode='local', driver_memory='1g', executor_memory='2g') paths=PathsConfig(data='data/subset.csv', model='model/my_kmeans') kmeans=KMeansConfig(k=10, maxIter=20, seed=42) clickhouse=ClickhouseConfig(username='test123', password='pass123word', url='jdbc:clickhouse://31.128.42.197:8123/my_db')


In [51]:
# import findspark

# findspark.init()

# spark_session = (
# SparkSession.builder.appName(config.spark.app_name)
# .master(config.spark.deploy_mode)
# .config("spark.driver.memory", config.spark.driver_memory)
# .config("spark.executor.memory", config.spark.executor_memory)
# .config("spark.jars", '../clickhouse-jdbc-0.6.4-all.jar, ../clickhouse-spark-runtime-3.3_2.13-0.7.3.jar') 
# .getOrCreate()
# ) 

# data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
# df = spark.createDataFrame(data, ["Name", "Age"])
# df.write.jdbc(url=config.clickhouse.url, table='predictions', mode="overwrite",
#               properties = {
#     "user": config.clickhouse.username,
#     "password": config.clickhouse.password,
#     "driver": "com.clickhouse.jdbc.ClickHouseDriver"}) 

In [7]:
from clickhouse_driver import Client
from clickhouse_sqlalchemy import make_session, get_declarative_base
from sqlalchemy import create_engine
import findspark

findspark.init()

spark_session = (
SparkSession.builder.appName(config.spark.app_name)
.master(config.spark.deploy_mode)
.config("spark.driver.memory", config.spark.driver_memory)
.config("spark.executor.memory", config.spark.executor_memory)
.config("spark.jars", '../clickhouse-jdbc-0.6.4-all.jar, ../clickhouse-spark-runtime-3.3_2.13-0.7.3.jar') 
.getOrCreate()
) 

data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark_session.createDataFrame(data, ["Name", "Age"])


clickhouse_url = f'clickhouse://{self.clickhouse_username}:{self.clickhouse_password}@31.128.42.197:8123/my_db'
engine = create_engine(clickhouse_url)
session = make_session(engine)

df.toPandas().to_sql('predictions', engine, if_exists='append', index=False)

session.close()

# Stop the Spark session
spark_session.stop()

# client = Client(host='localhost',
#                 user=config.clickhouse.username,       
#                 password=config.clickhouse.password   
        # )

# client.insert_dataframe('INSERT INTO my_db.predictions VALUES', df.toPandas())

24/08/22 19:21:28 WARN Utils: Your hostname, fmrzlvsxry resolves to a loopback address: 127.0.1.1; using 31.128.42.197 instead (on interface eth0)
24/08/22 19:21:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/08/22 19:21:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/22 19:21:30 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/08/22 19:21:30 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


AttributeError: 'DataFrame' object has no attribute 'to_sql'

24/08/22 19:21:42 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [54]:
spark_session.read \
            .format("jdbc") \
            .option("url",  'jdbc:clickhouse://31.128.42.197:9000/my_db') \
            .option("dbtable", 'my_db.openfood') \
            .option("user",  config.clickhouse.username) \
            .option("password",  config.clickhouse.password) \
            .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
            .load()

Py4JJavaError: An error occurred while calling o835.load.
: java.lang.ClassNotFoundException: com.clickhouse.jdbc.ClickHouseDriver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:41)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:34)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)


In [10]:
import findspark

findspark.init()

spark_session = (
    SparkSession.builder.appName(config.spark.app_name)
    .master(config.spark.deploy_mode)
    .config("spark.driver.memory", config.spark.driver_memory)
    .config("spark.executor.memory", config.spark.executor_memory)
    .getOrCreate()
)

24/08/21 12:29:38 WARN Utils: Your hostname, fmrzlvsxry resolves to a loopback address: 127.0.1.1; using 31.128.42.197 instead (on interface eth0)
24/08/21 12:29:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/21 12:29:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [11]:
import pyspark.sql
from pyspark.ml.feature import StandardScaler, VectorAssembler



class Preprocessor:
    def __init__(self, spark_session: pyspark.sql.SparkSession, data_path: str):
        self.data_path = data_path
        self.spark_session = spark_session
        self.df = None


    def load_data(self):
        self.df = self.spark_session.read.csv(self.data_path, header=True, inferSchema=True)


    def vectorize(self, df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:  
        vec_assembler = VectorAssembler(
            inputCols=df.columns, outputCol="features"
        )
        return vec_assembler.transform(df)
    
    
    def scale(self, df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame: 
        scaler = StandardScaler(inputCol="features", outputCol="scaled_features").fit(df)
        return scaler.transform(df)
    
    
    def create_df(self) -> pyspark.sql.DataFrame:
        
        self.load_data()

        self.df = self.df.drop('code', 'product_name', 'created_t', 'last_modified_t',
       'last_updated_t', 'serving_quantity', 'additives_n', 'nutriscore_score',
       'nova_group', 'completeness', 'last_image_t')

        self.df = self.vectorize(self.df)

        self.df = self.scale(self.df)
        
        return self.df


preprocessor = Preprocessor(spark_session, config.paths.data)
df = preprocessor.create_df()

                                                                                

In [12]:
model_args = dict(config.kmeans)
model = clustering.KMeans(featuresCol='scaled_features', **model_args)
model = model.fit(df)

24/08/21 12:29:51 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/08/21 12:29:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/08/21 12:29:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


In [13]:
evaluator = evaluation.ClusteringEvaluator(
    predictionCol="prediction",
    featuresCol='scaled_features',
    metricName="silhouette",
    distanceMeasure="squaredEuclidean",
)
output = model.transform(df)
output.show()

+----------------+-----------+----------------+------------------+------------------+---------------+---------------+---------------+-----------------+------------------+-----------------------------------------------------+-----------------------+--------------------+--------------------+----------+
|energy-kcal_100g|energy_100g|        fat_100g|saturated-fat_100g|carbohydrates_100g|    sugars_100g|     fiber_100g|  proteins_100g|        salt_100g|       sodium_100g|fruits-vegetables-nuts-estimate-from-ingredients_100g|nutrition-score-fr_100g|            features|     scaled_features|prediction|
+----------------+-----------+----------------+------------------+------------------+---------------+---------------+---------------+-----------------+------------------+-----------------------------------------------------+-----------------------+--------------------+--------------------+----------+
|            46.0|      192.0|             0.0|               0.0|             11.25|         

In [14]:
print(evaluator.evaluate(output))

0.4188446535941756


In [15]:
model.write().overwrite().save(config.paths.model)

                                                                                