In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [1]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# Inisialisasi Spark session
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()

# Dataset contoh
data = [
    (1, [2.0, 3.0], 0),
    (2, [1.0, 5.0], 1),
    (3, [2.5, 4.5], 1),
    (4, [3.0, 6.0], 0)
]
columns = ['ID', 'Features', 'Label']

# Membuat DataFrame
df = spark.createDataFrame(data, columns)

# Konversi kolom 'Features' menjadi DenseVector
vector_udf = udf(lambda x: Vectors.dense(x), VectorUDT())
df = df.withColumn("Features", vector_udf(df["Features"]))

# Melatih model logistic regression
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Menampilkan koefisien dan intercept
print(f'Koefisien: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Koefisien: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [2]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT

# Inisialisasi Spark session
spark = SparkSession.builder.appName("KMeansClusteringExample").getOrCreate()

# Dataset contoh
data = [
    (1, [1.0, 1.0]),
    (2, [5.0, 5.0]),
    (3, [10.0, 10.0]),
    (4, [15.0, 15.0])
]
columns = ['ID', 'Features']

# Membuat DataFrame
df = spark.createDataFrame(data, columns)

# Konversi kolom 'Features' menjadi DenseVector
vector_udf = udf(lambda x: Vectors.dense(x), VectorUDT())
df = df.withColumn("Features", vector_udf(df["Features"]))

# Melatih model KMeans
kmeans = KMeans(featuresCol='Features', k=2)  # Jumlah cluster = 2
model = kmeans.fit(df)

# Menampilkan pusat cluster
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


In [None]:
!pip install kaggle



In [None]:
from google.colab import files
files.upload()  # Upload kaggle.json


Saving kaggle (1).json to kaggle (1).json


{'kaggle (1).json': b'{"username":"armutri","key":"129d3395464a8b400d13aaa17f027bd7"}'}

In [None]:
import os
os.environ['KAGGLE_CONFIG_DIR'] = "/content"


In [None]:
!kaggle datasets download -d uciml/iris


Dataset URL: https://www.kaggle.com/datasets/uciml/iris
License(s): CC0-1.0
Downloading iris.zip to /content
  0% 0.00/3.60k [00:00<?, ?B/s]
100% 3.60k/3.60k [00:00<00:00, 8.26MB/s]


In [None]:
# Importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create Spark session
spark = SparkSession.builder.master("local").appName("Iris Classification").getOrCreate()

# Load the dataset
df = spark.read.csv("Iris.csv", header=True, inferSchema=True)

# Show the first few rows to understand its structure
df.show(5)


+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+---+-------------+------------+-------------+------------+-----------+
only showing top 5 rows



In [None]:
#preparing data
from pyspark.ml.feature import VectorAssembler

# Index the target variable (Species)
indexer = StringIndexer(inputCol="Species", outputCol="label")
df = indexer.fit(df).transform(df)

# Assemble feature columns into a single vector column
assembler = VectorAssembler(inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"], outputCol="features")
df = assembler.transform(df)

# Show the dataframe with new features and label
df.select("features", "label").show(5)


+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|  0.0|
|[4.9,3.0,1.4,0.2]|  0.0|
|[4.7,3.2,1.3,0.2]|  0.0|
|[4.6,3.1,1.5,0.2]|  0.0|
|[5.0,3.6,1.4,0.2]|  0.0|
+-----------------+-----+
only showing top 5 rows



In [None]:
#train a clssification model
# Split the data into training and testing sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=1234)

# Initialize Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Train the model on training data
lr_model = lr.fit(train_data)


In [None]:
#evaluate the model
# Make predictions on the test set
predictions = lr_model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Model Accuracy: {accuracy:.4f}")


Model Accuracy: 1.0000


In [None]:
#Hyperparameter Tuning with Cross-Validation
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Create a parameter grid for cross-validation
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Create CrossValidator
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=3)  # 3-fold cross-validation

# Run cross-validation and get the best model
cv_model = crossval.fit(train_data)

# Make predictions with the best model
cv_predictions = cv_model.transform(test_data)

# Evaluate the best model
cv_accuracy = evaluator.evaluate(cv_predictions)
print(f"Cross-Validated Model Accuracy: {cv_accuracy:.4f}")


Cross-Validated Model Accuracy: 1.0000
