Compare multiple ML models per gene on LN_IC50 target.

Extension of the single-model script:
- loads Parquet with pandas (fast metadata / adequate for medium subsets)
- evaluates multiple models per gene (train/test split)
- saves long & wide CSVs
- saves one bar plot per model (Top-N genes with lowest MSE)

In [3]:
import os, shutil, pathlib, sys

def set_java_home():
    """Point JAVA_HOME and PATH at the first usable Java installation.

    Candidates are tried in this order:
      1. the installation root of the ``java`` executable found on PATH,
      2. the conda environment directory named by ``$CONDA_PREFIX``.

    A candidate is usable when ``<candidate>/bin/java`` exists. On success
    ``JAVA_HOME`` is set to the candidate and its ``bin`` directory is placed
    at the front of ``PATH``.

    Returns:
        The chosen directory as a string, or None when no candidate is usable
        (the environment is left untouched in that case).
    """
    candidates = []
    java = shutil.which("java")
    if java:
        # .../bin/java -> installation root (symlinks are resolved first)
        candidates.append(str(pathlib.Path(java).resolve().parents[1]))
    conda_prefix = os.environ.get("CONDA_PREFIX")
    if conda_prefix:
        candidates.append(conda_prefix)

    for root in candidates:
        bin_dir = pathlib.Path(root) / "bin"
        if not (bin_dir / "java").exists():
            continue
        os.environ["JAVA_HOME"] = root
        # PATH may be unset or empty, so read it defensively. Existing copies
        # of bin_dir are dropped so that re-running this (e.g. re-executing a
        # notebook cell) does not keep growing PATH with duplicates.
        bin_str = str(bin_dir)
        current = os.environ.get("PATH", "")
        others = [e for e in current.split(os.pathsep) if e != bin_str] if current else []
        os.environ["PATH"] = os.pathsep.join([bin_str, *others])
        return root
    return None

# Configure JAVA_HOME/PATH for this process, then report the Python
# interpreter and Java location in use.
jh = set_java_home()
print("PY:", sys.executable)
# When no candidate was detected, fall back to any pre-existing JAVA_HOME.
print("JAVA_HOME:", jh or os.environ.get("JAVA_HOME"))
# IPython shell escape: print the Java version, or a notice if the command fails.
!java -version || echo "java not found"


PY: /home/paunica/miniconda/bin/python
JAVA_HOME: None
/bin/bash: java: command not found
java not found


In [2]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session, with the console progress bar disabled.
builder = SparkSession.builder.appName("Project2-ML")
builder = builder.config("spark.ui.showConsoleProgress", "false")
spark = builder.getOrCreate()

from pathlib import Path

DF_PATH = Path("data/california_housing.parquet")

# First run only: fetch the California housing frame via scikit-learn and
# store it as Parquet so later runs can read it straight from disk.
if not DF_PATH.exists():
    from sklearn.datasets import fetch_california_housing
    import pandas as pd

    DF_PATH.parent.mkdir(parents=True, exist_ok=True)
    pdf = fetch_california_housing(as_frame=True).frame
    parquet_writer = spark.createDataFrame(pdf).write.mode("overwrite")
    parquet_writer.parquet(str(DF_PATH))

# Load the dataset as a Spark DataFrame and preview its schema and first rows.
df = spark.read.parquet(str(DF_PATH))
df.printSchema()
df.show(5, truncate=False)


JAVA_HOME is not set


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [None]:
from pyspark.sql.functions import avg, corr, col

# Average of the MedInc column over the whole dataset.
avg_income_col = avg("MedInc").alias("avg_income")
df.select(avg_income_col).show()

# Correlation between MedInc and MedHouseVal.
income_price_corr = corr("MedInc", "MedHouseVal").alias("corr_income_price")
df.select(income_price_corr).show()

# Register the frame as a SQL view, then list the 10 rounded-latitude bands
# with the highest average house value.
df.createOrReplaceTempView("cal")
top_latitude_bands = spark.sql("""
  SELECT ROUND(Latitude) AS lat, COUNT(*) AS n, AVG(MedHouseVal) AS avg_price
  FROM cal GROUP BY ROUND(Latitude) ORDER BY avg_price DESC LIMIT 10
""")
top_latitude_bands.show()


In [None]:
from pyspark.sql.functions import when
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Example of imputation plus one simple derived feature:
# missing Population values become 0, and flag_old marks HouseAge > 30.
is_old_flag = when(col("HouseAge") > 30, 1).otherwise(0)
df_prep = df.na.fill({"Population": 0}).withColumn("flag_old", is_old_flag)

# All numeric columns + the new feature
num_cols = [
    "MedInc",
    "HouseAge",
    "AveRooms",
    "AveBedrms",
    "Population",
    "AveOccup",
    "Latitude",
    "Longitude",
    "flag_old",
]

# Assemble the columns into one vector, then scale to unit standard deviation
# (no mean-centering, per withMean=False).
assembler = VectorAssembler(inputCols=num_cols, outputCol="features_raw")
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withMean=False,
    withStd=True,
)

fe_pipeline = Pipeline(stages=[assembler, scaler])
fe_model = fe_pipeline.fit(df_prep)
df_fe = fe_model.transform(df_prep)

df_fe.select("features").show(3, truncate=False)


In [None]:
def load_data():
    """Load the project Parquet file, falling back to California housing.

    Reads the module-level ``DATA_PATH`` with pandas. If that file does not
    exist, the scikit-learn California housing dataset is used instead, with
    its ``MedHouseVal`` column renamed to ``LN_IC50``.

    NOTE(review): ``DATA_PATH`` is not defined in the visible part of this
    notebook - confirm it is set before this function is called.

    Returns:
        A pandas DataFrame.
    """
    # Imported at function scope: elsewhere in this notebook `pd` and
    # `fetch_california_housing` are only imported inside a conditional
    # branch, so they can be undefined by the time this function runs.
    import pandas as pd

    try:
        # try reading my own file first
        df = pd.read_parquet(DATA_PATH)
    except FileNotFoundError:
        from sklearn.datasets import fetch_california_housing

        print("Parquet file not found, using California housing dataset instead.")
        data = fetch_california_housing(as_frame=True)
        # the target was called LN_IC50 in my project -> map it here
        df = data.frame.rename(columns={"MedHouseVal": "LN_IC50"})
    else:
        print(f"Loaded {DATA_PATH}")
    return df

# Load the dataset and preview its first rows.
# NOTE(review): this rebinds `df`, which an earlier cell assigned from
# spark.read.parquet(...); code after this point sees the load_data() result.
df = load_data()
print(df.head())



In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Binarize the target at its approximate median (relative error 0.01).
# The median is taken from df_fe, the Spark frame that df_cls is built from.
# `df` cannot be used here: an earlier cell rebinds it to the result of
# load_data(), which is a pandas DataFrame and has no approxQuantile().
median_val = df_fe.approxQuantile("MedHouseVal", [0.5], 0.01)[0]
df_cls = df_fe.withColumn("label", (col("MedHouseVal") >= median_val).cast("int"))

# Fixed seed so the 80/20 split is reproducible.
train_df, test_df = df_cls.select("features", "label").randomSplit([0.8, 0.2], seed=42)

lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=50)
lr_model = lr.fit(train_df)
pred = lr_model.transform(test_df)

# Score the held-out predictions with ranking and thresholded metrics.
auc_roc = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label").evaluate(pred)
auc_pr  = BinaryClassificationEvaluator(metricName="areaUnderPR",  labelCol="label").evaluate(pred)
f1      = MulticlassClassificationEvaluator(metricName="f1",       labelCol="label").evaluate(pred)
acc     = MulticlassClassificationEvaluator(metricName="accuracy", labelCol="label").evaluate(pred)
print(f"AUC-ROC={auc_roc:.3f}  AUC-PR={auc_pr:.3f}  F1={f1:.3f}  ACC={acc:.3f}")