In [1]:
# Example: fit a single-feature linear regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Obtain the Spark entry point for this notebook.
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Toy data: in every row Target equals Feature + 15, so the fit should
# recover a slope close to 1 and an intercept close to 15.
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, schema=columns)

# The estimator reads its predictors from one vector column, so wrap the
# single numeric input column into 'Features'. 'ID' is deliberately left out.
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Fit the model on the assembled frame.
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Report the learned parameters.
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [12]:
# Practice: binary logistic regression on a four-row toy dataset
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Each row is (ID, Features1, Features2, Label); Label takes the values 0 and 1.
columns = ['ID', 'Features1','Features2', 'Label']
data = [
    (1, 2.0, 3.0, 0),
    (2, 1.0, 5.0, 1),
    (3, 2.5, 4.5, 1),
    (4, 3.0, 6.0, 0),
]
df = spark.createDataFrame(data, schema=columns)

# Combine the two numeric inputs into the single vector column the
# classifier is configured to read. 'ID' is not used as a predictor.
feature_names = ['Features1','Features2']
assembler = VectorAssembler(inputCols=feature_names, outputCol='Features')
df_transformed1 = assembler.transform(df)

# Fit the classifier.
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df_transformed1)

# Report the fitted coefficients and intercept.
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [-12.26205792103457,4.08735226377352]
Intercept: 11.568912718496556


In [15]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Example dataset: four points, each with Features1 == Features2.
data = [(1, 1.0, 1.0), (2, 5.0, 5.0), (3, 10.0, 10.0), (4, 15.0, 15.0)]
columns = ['ID', 'Features1', 'Features2']
df = spark.createDataFrame(data, columns)

# Pack both coordinates into the single vector column KMeans reads from.
assembler = VectorAssembler(inputCols=['Features1','Features2'], outputCol='Features')
df_transformed1 = assembler.transform(df)

# Train KMeans clustering model.
# Centroid initialisation is randomised, so the seed is pinned explicitly:
# without it the centers can differ between kernel restarts and the printed
# result is not reproducible. Previously recorded output may therefore differ
# from what this seeded run prints.
kmeans = KMeans(featuresCol='Features', k=2, seed=42)
model = kmeans.fit(df_transformed1)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([5.33333333, 5.33333333]), array([15., 15.])]


In [6]:
import os

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("CreditCardFraudML").getOrCreate()

# Location of the input CSV. Set the CREDITCARD_CSV environment variable to
# run this notebook on another machine; when it is unset the original
# hard-coded location is used, so existing setups keep working.
CREDITCARD_CSV = os.environ.get(
    "CREDITCARD_CSV",
    "/home/arsa/Downloads/2420506028_pertemuan_14/creditcard.csv",
)

# The frame is used by several actions below (show, count, null scan,
# transform, split). Caching avoids re-reading and re-parsing the CSV each time.
df = spark.read.csv(CREDITCARD_CSV, header=True, inferSchema=True).cache()

# Quick look at the raw data: schema, first rows, and row count.
print("Schema awal:")
df.printSchema()
print("Contoh 5 baris:")
df.show(5, truncate=False)
print("Total baris:", df.count())

# Null count per column: when() without otherwise() yields null for rows that
# do not match, and count() skips nulls, so each cell is that column's number
# of null values.
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Every column except the target "Class" is used as a predictor.
feature_columns = [c for c in df.columns if c != "Class"]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Keep only the assembled vector and the target, renamed to "label".
df_ml = assembler.transform(df).select("features", F.col("Class").alias("label"))
df_ml.show(5)

# 80/20 split with a fixed seed; `train` and `test` are consumed by later cells.
train, test = df_ml.randomSplit([0.8, 0.2], seed=42)

                                                                                

Schema awal:
root
 |-- Time: double (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: 

                                                                                

In [3]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# NOTE: `train` and `test` are not created in this cell; they must already
# exist in the kernel (the cell that calls randomSplit defines them).

# Scorer for the predictions. No metricName is passed, so the evaluator's
# default metric is what gets printed as "AUC" below.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")

# Fit a logistic regression with default hyperparameters on the training split.
lr = LogisticRegression(labelCol="label", featuresCol="features")
model = lr.fit(train)

# Score the held-out split and preview the appended prediction columns.
predictions = model.transform(test)
predictions.show(5)

auc = evaluator.evaluate(predictions)
print("AUC:", auc)

25/12/12 15:21:42 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[1.0,-1.358354061...|    0|[8.04909755847964...|[0.99968071195413...|       0.0|
|[4.0,1.2296576345...|    0|[7.69370534808351...|[0.99954452099332...|       0.0|
|[7.0,-0.644269442...|    0|[6.96932692011458...|[0.99906059772620...|       0.0|
|[11.0,1.069373587...|    0|[6.58114880308451...|[0.99861566320023...|       0.0|
|[15.0,1.492935976...|    0|[10.0672349744118...|[0.99995755398060...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows


                                                                                

AUC: 0.955966419043936


In [4]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# NOTE: `lr`, `evaluator`, `train` and `test` are not defined in this cell;
# the earlier cells that create them must have been run first.

# 3 x 3 grid: nine (regParam, elasticNetParam) combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# 5-fold cross-validation over the grid, scored with the shared evaluator.
# The seed fixes the random fold assignment so the selected model and the
# reported metric are reproducible across kernel restarts.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

cvModel = cv.fit(train)

# Evaluate the winning model on the held-out split.
best_predictions = cvModel.transform(test)
best_auc = evaluator.evaluate(best_predictions)

print("Best Model AUC:", best_auc)
best_model = cvModel.bestModel
# Read the chosen hyperparameters through the public param getters rather
# than reaching into the private `_java_obj` handle.
print("Best regParam :", best_model.getRegParam())
print("Best elasticNetParam :", best_model.getElasticNetParam())

                                                                                

Best Model AUC: 0.9784180817732415
Best regParam : 0.01
Best elasticNetParam : 0.0
