In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/23 20:05:38 WARN Utils: Your hostname, hakkan-VirtualBox, resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/11/23 20:05:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/23 20:05:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/23 20:05:59 WARN Instrumentation: [c28fd939] regParam is zero, which might cause numerical instability and overfitting.
25/11/23 20:06:03 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/11/23 20:06:04 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netl

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [5]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

data = [
    (1, Vectors.dense([2.0, 3.0]), 0),
    (2, Vectors.dense([1.0, 5.0]), 1),
    (3, Vectors.dense([2.5, 4.5]), 1),
    (4, Vectors.dense([3.0, 6.0]), 0)
]

columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)


Coefficients: [-12.262057964709234,4.087352278330043]
Intercept: 11.568912762179835


In [21]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

data = [
    (1, Vectors.dense([1.0, 1.0])),
    (2, Vectors.dense([5.0, 5.0])),
    (3, Vectors.dense([10.0, 10.0])),
    (4, Vectors.dense([15.0, 15.0]))
]

columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

centers = model.clusterCenters()
print("Cluster Centers:", centers)

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


In [22]:
#Homework 1
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

df = spark.read.csv("/home/hakkan/Downloads/datasets/heart_disease.csv", header=True, inferSchema=True)

label = "target"
categorical = ["sex", "cp", "thal", "slope"]
numeric = [c for c in df.columns if c not in categorical + [label]]
stages = []

for c in categorical:
    stages += [
        StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid="keep"),
        OneHotEncoder(inputCol=c+"_idx", outputCol=c+"_vec")
    ]

assembler = VectorAssembler(
    inputCols=[c+"_vec" for c in categorical] + numeric, outputCol="features")

stages.append(assembler)

pipeline = Pipeline(stages=stages)
data = pipeline.fit(df).transform(df).select("features", label)
df.show(5)

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 52|  1|  0|     125| 212|  0|      1|    168|    0|    1.0|    2|  2|   3|     0|
| 53|  1|  0|     140| 203|  1|      0|    155|    1|    3.1|    0|  0|   3|     0|
| 70|  1|  0|     145| 174|  0|      1|    125|    1|    2.6|    0|  0|   3|     0|
| 61|  1|  0|     148| 203|  0|      1|    161|    0|    0.0|    2|  1|   3|     0|
| 62|  0|  0|     138| 294|  1|      1|    106|    0|    1.9|    1|  3|   2|     0|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
only showing top 5 rows


In [23]:
#Homework 2
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator_multi = MulticlassClassificationEvaluator(
    labelCol="target",
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy = evaluator_multi.evaluate(pred)
print("Accuracy =", accuracy)

Accuracy = 0.8402366863905325


In [24]:
#Homework 3
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator_multi = MulticlassClassificationEvaluator(
    labelCol="target",
    predictionCol="prediction",
    metricName="accuracy"
)

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1])
             .addGrid(lr.maxIter, [50, 100])
             .build())

cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_multi,
    numFolds=3
)

cvModel = cv.fit(train)
bestPred = cvModel.transform(test)

print("Best Accuracy =", evaluator_multi.evaluate(bestPred))
print("Best regParam =", cvModel.bestModel._java_obj.getRegParam())
print("Best maxIter  =", cvModel.bestModel._java_obj.getMaxIter())

Best Accuracy = 0.8402366863905325
Best regParam = 0.01
Best maxIter  = 50
