In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [3]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Example dataset with Vector type
data = [
    (1, Vectors.dense([2.0, 3.0]), 0),
    (2, Vectors.dense([1.0, 5.0]), 1),
    (3, Vectors.dense([2.5, 4.5]), 1),
    (4, Vectors.dense([3.0, 6.0]), 0)
]

columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")


Coefficients: [-12.26205792372122,4.087352264669246]
Intercept: 11.568912721182315


In [5]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

# Example dataset with Vector type
data = [
    (1, Vectors.dense([1.0, 1.0])),
    (2, Vectors.dense([5.0, 5.0])),
    (3, Vectors.dense([10.0, 10.0])),
    (4, Vectors.dense([15.0, 15.0]))
]

columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print("Cluster Centers:")
for c in centers:
    print(c)

Cluster Centers:
[5.33333333 5.33333333]
[15. 15.]


In [10]:
from pyspark.sql.functions import when, col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Load dataset
df = spark.read.csv("job_market.csv", header=True, inferSchema=True)

# Ambil kolom numerik saja
df = df.select("salary_min", "salary_max", "experience_required")

# Drop row kosong
df = df.dropna()

# Definisikan label: high_salary
median_salary = df.approxQuantile("salary_max", [0.5], 0.1)[0]

df = df.withColumn(
    "label",
    when(col("salary_max") >= median_salary, 1).otherwise(0)
)

# Vector assembler
assembler = VectorAssembler(
    inputCols=["salary_min", "salary_max", "experience_required"],
    outputCol="features"
)

final_data = assembler.transform(df).select("features", "label")

final_data.show(5, truncate=False)

+-----------------------+-----+
|features               |label|
+-----------------------+-----+
|[151082.0,291345.0,4.0]|1    |
|[156891.0,280075.0,3.0]|1    |
|[152134.0,280310.0,4.0]|1    |
|[151918.0,253988.0,7.0]|1    |
|[148141.0,252584.0,9.0]|1    |
+-----------------------+-----+
only showing top 5 rows


In [11]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Split data
train, test = final_data.randomSplit([0.8, 0.2], seed=42)

# Model
lr = LogisticRegression(featuresCol="features", labelCol="label")

lr_model = lr.fit(train)

pred = lr_model.transform(test)

# Evaluasi
acc = MulticlassClassificationEvaluator(metricName="accuracy").evaluate(pred)
f1  = MulticlassClassificationEvaluator(metricName="f1").evaluate(pred)
auc = BinaryClassificationEvaluator(metricName="areaUnderROC").evaluate(pred)

print("=== PERFORMANCE ===")
print("Accuracy :", acc)
print("F1 Score :", f1)
print("AUC ROC  :", auc)

=== PERFORMANCE ===
Accuracy : 1.0
F1 Score : 1.0
AUC ROC  : 1.0


In [12]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1])
             .addGrid(lr.elasticNetParam, [0.0, 0.5])
             .build())

cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"),
    numFolds=3
)

cv_model = cv.fit(train)
cv_pred = cv_model.transform(test)

print("Best regParam:", cv_model.bestModel._java_obj.getRegParam())
print("Best elasticNetParam:", cv_model.bestModel._java_obj.getElasticNetParam())

final_auc = BinaryClassificationEvaluator(metricName="areaUnderROC").evaluate(cv_pred)
print("AUC After CV:", final_auc)

Best regParam: 0.01
Best elasticNetParam: 0.5
AUC After CV: 1.0
