In [1]:
import os
import findspark

from spark_builder import SparkBuilder
from pyspark.sql.functions import explode, split, col

import yaml
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import matplotlib.pyplot as plt

import sys

# Make Spark's Python workers run on the same interpreter as this kernel.
os.environ['PYSPARK_PYTHON'] = sys.executable

# NOTE(review): machine-specific install path; consider moving it into config.yaml.
os.environ['SPARK_HOME'] = "C:/apps/spark-3.5.1-bin-hadoop3/"
# PYSPARK_DRIVER_PYTHON is assigned exactly once. An earlier assignment of
# sys.executable was always overwritten by this value and has been dropped.
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'

# Put the pyspark package that ships with SPARK_HOME on sys.path.
findspark.init()

# Read the project configuration; the context manager closes the file handle
# instead of leaving it open until garbage collection.
with open('config.yaml') as config_file:
    cfg = yaml.load(config_file, Loader=yaml.FullLoader)

In [2]:
from pyspark.sql import SparkSession, DataFrame

In [3]:
# Export KEY=VALUE pairs from the local .env file into this process's environment.
with open(".env", "r") as f:
    for line_no, line in enumerate(f, start=1):
        entry = line.strip()
        # Blank lines and '#' comments carry no assignment.
        if not entry or entry.startswith('#'):
            continue
        # Split on the first '=' only, so values that themselves contain '='
        # (common in passwords and tokens) are kept intact.
        k, sep, v = entry.partition('=')
        if not sep:
            # The line content is left out of the message on purpose:
            # it may hold a secret.
            raise ValueError(f".env line {line_no} is not of the form KEY=VALUE")
        os.environ[k] = v

In [4]:
# JDBC connection string for the Oracle service the notebook writes to.
url = "jdbc:oracle:thin:@localhost:1521/FREE"

# Connection options passed alongside `url` to the JDBC writer.
# The password is read from the ORACLE_PWD environment variable (KeyError if unset).
properties = dict(
    user="system",
    password=os.environ["ORACLE_PWD"],
    driver="oracle.jdbc.driver.OracleDriver",
)

In [None]:
#UPLOAD TO DB
# Reads the tab-separated dataset named in config.yaml and overwrites the
# configured database table with it. No column filtering or null filling is
# applied in this cell: the CSV is uploaded as read.
spark = SparkBuilder({'spark.app.name': 'Upload_db'}).getSession()
try:
    # Context manager so the config file handle is closed deterministically.
    with open('config.yaml') as config_file:
        cfg = yaml.load(config_file, Loader=yaml.FullLoader)

    dataset = spark.read.csv(cfg['dataset']['data_path'], header=True, inferSchema=True, sep='\t')

    # mode="overwrite" replaces any existing contents of the target table.
    dataset.write.jdbc(url=url, table=cfg['db']['data_table'], mode="overwrite", properties=properties)
    del dataset
finally:
    # Always release the session, even when the read or the JDBC write fails.
    spark.stop()

In [6]:
# Open the Spark session used by the clustering cells below.
kmeans_session_builder = SparkBuilder({'spark.app.name': 'Kmenas'})
spark = kmeans_session_builder.getSession()

In [7]:
# Call the JVM-side DataMart through the py4j gateway; the result is a Java
# object handle, not yet a PySpark DataFrame.
jvm_datamart = spark._jvm.DataMart
proc_data = jvm_datamart.readAndProccess()

In [8]:
# Display the raw handle returned by the JVM call (rendered as a JavaObject repr).
proc_data

JavaObject id=o122

In [9]:
# Wrap the JVM handle in a PySpark DataFrame, then keep only the feature
# vector column that the clustering step consumes.
processed_df = DataFrame(proc_data, spark)
final_data = processed_df.select("scaledFeatures")

In [10]:
# Fit k-means (12 clusters, fixed seed) on the scaled feature vectors and
# assign every row to a cluster.
kmeans = KMeans(k=12, seed=42, featuresCol='scaledFeatures')
model = kmeans.fit(final_data)
predictions = model.transform(final_data)

# Score the clustering with the silhouette metric on squared Euclidean distance.
evaluator = ClusteringEvaluator(
    metricName='silhouette',
    distanceMeasure='squaredEuclidean',
    featuresCol='scaledFeatures',
    predictionCol='prediction',
)
score = evaluator.evaluate(predictions)

In [11]:
# Peek at the first five cluster assignments.
prediction_column = predictions.select("prediction")
prediction_column.show(5)

+----------+
|prediction|
+----------+
|         1|
|         1|
|         1|
|         1|
|         1|
+----------+
only showing top 5 rows



In [12]:
# Shut down the Spark session opened for the clustering cells.
spark.stop()

In [None]:
#UPLOAD TO DB
# Reads the tab-separated dataset named in config.yaml, keeps only the
# configured columns, replaces nulls with 0 and overwrites the configured
# database table with the result.
spark = SparkBuilder({'spark.app.name': 'Upload_db'}).getSession()
try:
    # Context manager so the config file handle is closed deterministically.
    with open('config.yaml') as config_file:
        cfg = yaml.load(config_file, Loader=yaml.FullLoader)

    dataset = spark.read.csv(cfg['dataset']['data_path'], header=True, inferSchema=True, sep='\t')
    dataset = dataset.select(cfg['dataset']['filtered_cols'])
    dataset = dataset.na.fill(value=0)
    print('Input data')
    dataset.printSchema()

    # mode="overwrite" replaces any existing contents of the target table.
    dataset.write.jdbc(url=url, table=cfg['db']['data_table'], mode="overwrite", properties=properties)
    del dataset
finally:
    # Always release the session, even when the read or the JDBC write fails.
    spark.stop()

In [None]:
from pyspark import SQLContext
from pyspark.sql import SparkSession, DataFrame


class DataMart:
    """Python-side wrapper around the JVM ``DataMart`` object reached via py4j.

    Args:
        spark: Active session whose JVM gateway exposes ``DataMart``.
    """

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.spark_context = spark.sparkContext
        # Built lazily by the ``sql_context`` property; see there for why.
        self._sql_context = None
        self.jwm_datamart = self.spark_context._jvm.DataMart

    @property
    def sql_context(self) -> SQLContext:
        """Legacy ``SQLContext`` for callers that still read this attribute.

        ``SQLContext`` is deprecated in Spark 3.x and warns on construction,
        so it is created only on first access rather than for every instance.
        """
        if self._sql_context is None:
            self._sql_context = SQLContext(self.spark_context, self.spark)
        return self._sql_context

    def read_dataset(self) -> DataFrame:
        """Fetch the dataset from the JVM side and wrap it as a PySpark DataFrame.

        Returns:
            DataFrame built on the Java object returned by
            ``readPreprocessedOpenFoodFactsDataset()``.
        """
        jvm_data = self.jwm_datamart.readPreprocessedOpenFoodFactsDataset()
        # Bind the frame to the session directly instead of going through the
        # deprecated SQLContext.
        return DataFrame(jvm_data, self.spark)

    def write_predictions(self, df: DataFrame):
        """Hand a PySpark DataFrame to the JVM ``writePredictions`` method.

        Args:
            df: Frame to write; its underlying Java DataFrame (``_jdf``) is
                what crosses the py4j boundary.
        """
        self.jwm_datamart.writePredictions(df._jdf)