In [1]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Reuse the active Spark session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Toy dataset: one tuple per row, laid out as (ID, Feature, Target).
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, columns)

# The regression below reads its inputs from the 'Features' column, so the
# scalar 'Feature' column is packed into a vector column of that name.
assembler = VectorAssembler(
    inputCols=['Feature'],
    outputCol='Features',
)
df_transformed = assembler.transform(df)

# Fit a linear regression of 'Target' on the assembled 'Features' column.
lr = LinearRegression(
    featuresCol='Features',
    labelCol='Target',
)
model = lr.fit(df_transformed)

# Report the fitted parameters.
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [6]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Reuse the active Spark session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("LogisticRegressionExample")
    .getOrCreate()
)

# Example dataset as plain tuples: (ID, feature values, label).
raw_rows = [
    (1, [2.0, 3.0], 0),
    (2, [1.0, 5.0], 1),
    (3, [2.5, 4.5], 1),
    (4, [3.0, 6.0], 0),
]

# Wrap each feature list in a dense ML vector so the `Features` column is
# already in vector form when the DataFrame is created.
data = [
    (row_id, Vectors.dense(values), label)
    for row_id, values, label in raw_rows
]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# No VectorAssembler step is needed here because `Features` already holds
# vectors; the model is trained on the DataFrame directly.
lr = LogisticRegression(
    featuresCol='Features',
    labelCol='Label',
)
model = lr.fit(df)

# Display the fitted coefficients and intercept.
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')



Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [7]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans

# Reuse the active Spark session if one exists, otherwise start a new one.
spark = SparkSession.builder.appName("KMeansExample").getOrCreate()

# Example dataset: each row is (ID, feature vector), with the features
# already wrapped in dense ML vectors.
data = [
    (1, Vectors.dense([1.0, 1.0])),
    (2, Vectors.dense([5.0, 5.0])),
    (3, Vectors.dense([10.0, 10.0])),
    (4, Vectors.dense([15.0, 15.0])),
]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train a KMeans clustering model with two clusters.
# The seed is pinned (42, matching the seeds used for the split and sample
# elsewhere in this notebook) so that the random centroid initialisation,
# and therefore the reported centers and their order, is repeatable.
kmeans = KMeans(featuresCol='Features', k=2, seed=42)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Create (or reuse) the Spark session.
spark = SparkSession.builder.appName("Spark MLlib").getOrCreate()

# Load the dataset; the first line is a header and column types are inferred.
path_data = "athlete_events.csv"
data = spark.read.csv(path_data, header=True, inferSchema=True)

# Pre-processing: keep only the relevant columns and drop rows with nulls.
# NOTE(review): the displayed output still contains rows with Medal == 'NA',
# so dropna() is not treating the literal string 'NA' as missing - confirm
# that this is intended.
kolom_dipilih = ["Age", "Height", "Weight", "Sex", "Sport", "Medal"]
data = data.select(kolom_dipilih).dropna()

# Encode the categorical variables: each source column is indexed into the
# numeric column named on the right-hand side ("Medal" becomes the label).
kolom_indeks = {
    "Sex": "SexIndex",
    "Sport": "SportIndex",
    "Medal": "label",
}
indexers = [
    StringIndexer(inputCol=kolom_asal, outputCol=kolom_hasil)
    for kolom_asal, kolom_hasil in kolom_indeks.items()
]
for indexer in indexers:
    # Each indexer is fitted on, and applied to, the current DataFrame.
    data = indexer.fit(data).transform(data)

# Columns that make up the model's feature vector.
kolom_fitur = ["Age", "Height", "Weight", "SexIndex", "SportIndex"]

# Cast every feature column to double before assembling.
for nama_kolom in kolom_fitur:
    data = data.withColumn(nama_kolom, col(nama_kolom).cast("double"))

# Combine the feature columns into a single "features" vector column;
# handleInvalid="skip" drops rows the assembler considers invalid.
assembler = VectorAssembler(
    inputCols=kolom_fitur,
    outputCol="features",
    handleInvalid="skip",
)
data = assembler.transform(data)

# Preview the prepared rows and summarise the numeric columns.
data.show(5, truncate=False)
data.select("Age", "Height", "Weight").describe().show()



+----+------+------+---+-------------+-----+--------+----------+-----+--------------------------+
|Age |Height|Weight|Sex|Sport        |Medal|SexIndex|SportIndex|label|features                  |
+----+------+------+---+-------------+-----+--------+----------+-----+--------------------------+
|24.0|180.0 |80.0  |M  |Basketball   |NA   |0.0     |19.0      |0.0  |[24.0,180.0,80.0,0.0,19.0]|
|23.0|170.0 |60.0  |M  |Judo         |NA   |0.0     |22.0      |0.0  |[23.0,170.0,60.0,0.0,22.0]|
|21.0|185.0 |82.0  |F  |Speed Skating|NA   |1.0     |15.0      |0.0  |[21.0,185.0,82.0,1.0,15.0]|
|21.0|185.0 |82.0  |F  |Speed Skating|NA   |1.0     |15.0      |0.0  |[21.0,185.0,82.0,1.0,15.0]|
|25.0|185.0 |82.0  |F  |Speed Skating|NA   |1.0     |15.0      |0.0  |[25.0,185.0,82.0,1.0,15.0]|
+----+------+------+---+-------------+-----+--------+----------+-----+--------------------------+
only showing top 5 rows

+-------+------------------+------------------+------------------+
|summary|               Ag

In [17]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split the data into training and test sets (80/20, seeded).
data_latih, data_uji = data.randomSplit([0.8, 0.2], seed=42)

# Build the Logistic Regression estimator on the prepared columns.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
)

# Train the model on the training split.
model = lr.fit(data_latih)

# Score the held-out split and measure accuracy against the label column.
prediksi = model.transform(data_uji)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    metricName="accuracy",
)
akurasi = evaluator.evaluate(prediksi)

print(f"Akurasi pada data uji: {akurasi:.2f}")


Akurasi pada data uji: 0.85


In [20]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Request a session with explicit executor/driver resources for tuning.
# NOTE(review): earlier cells already call getOrCreate(), so when this runs
# in order an active session is reused; JVM-level settings such as
# spark.driver.memory may then not take effect - confirm against the Spark
# configuration documentation.
spark = SparkSession.builder \
    .appName("Optimasi Hyperparameter Tuning") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Tune on a seeded 20% sample of the training split.
data_latih_sample = data_latih.sample(fraction=0.2, seed=42)

# Logistic Regression model, limited to 10 iterations per fit.
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)

# 2 x 2 grid over regParam and elasticNetParam (four candidates).
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5]) \
    .build()

# Accuracy of the "prediction" column against "label" selects the best model.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

# 3-fold cross-validation over the grid. The seed is pinned (42, matching the
# split and sample seeds) so that fold assignment is repeatable between runs.
crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42
)

# Train the model on the sampled dataset.
model_cv = crossval.fit(data_latih_sample)

# Evaluate the selected model on the test data.
prediksi = model_cv.transform(data_uji)
akurasi = evaluator.evaluate(prediksi)

print(f"Akurasi model terbaik setelah tuning: {akurasi:.2f}")


Akurasi model terbaik setelah tuning: 0.85
