In [2]:
# Toy linear regression with Spark MLlib: fit Target on Feature for four
# points and print the learned slope (coefficient) and intercept.
from pyspark.sql import SparkSession                # DataFrame / session entry point
from pyspark.ml.regression import LinearRegression  # least-squares regression estimator
from pyspark.ml.feature import VectorAssembler      # packs columns into a single vector column

# Start (or reuse) the Spark session used by this notebook.
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Four (ID, Feature, Target) rows lying exactly on Target = Feature + 15:
# (1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0).
data = [(i, 5.0 * i, 15.0 + 5.0 * i) for i in range(1, 5)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# MLlib estimators read features from one vector column, so wrap the scalar
# `Feature` column into a vector column named `Features`.
assembler = VectorAssembler(outputCol='Features', inputCols=['Feature'])
df_transformed = assembler.transform(df)

# Regress `Target` on the assembled `Features` vector. regParam is left at its
# default of zero, which is what triggers the "regParam is zero" warning.
lr = LinearRegression(labelCol='Target', featuresCol='Features')
model = lr.fit(df_transformed)

# Report the fitted slope and intercept.
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/03 19:21:33 WARN Utils: Your hostname, figo-Vostro-V131, resolves to a loopback address: 127.0.1.1; using 10.135.44.75 instead (on interface wlp9s0)
25/12/03 19:21:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/03 19:21:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/03 19:21:49 WARN Instrumentation: [6602a648] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [10]:
# Toy binary classification with Spark MLlib: fit a logistic regression on
# four hand-made 2-D points and print the learned weights and intercept.
from pyspark.ml.linalg import Vectors                      # dense vectors for MLlib features
from pyspark.sql import SparkSession                       # DataFrame / session entry point
from pyspark.ml.classification import LogisticRegression   # logistic regression classifier

# getOrCreate reuses an already-running session if there is one (hence the
# "Using an existing Spark session" warning in the output).
spark = (
    SparkSession.builder
    .appName('LogReg Example')
    .getOrCreate()
)

# Features are supplied directly as vectors, so no VectorAssembler is needed.
# Each row is (ID, feature vector, label), with IDs starting at 1.
points = [[2.0, 3.0], [1.0, 5.0], [2.5, 4.5], [3.0, 6.0]]
labels = [0, 1, 1, 0]
data = [
    (row_id, Vectors.dense(point), label)
    for row_id, (point, label) in enumerate(zip(points, labels), start=1)
]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Fit the classifier: `Label` is the target, `Features` the input vector.
lr = LogisticRegression(labelCol='Label', featuresCol='Features')
model = lr.fit(df)

# Report the per-feature coefficients and the intercept.
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


25/12/03 19:44:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

Coefficients: [-12.262057941669692,4.087352270650961]
Intercept: 11.568912739136307


In [12]:
# Cluster four 2-D points into k=2 groups with k-means and print the centres.
from pyspark.ml.clustering import KMeans
# Imported here as well so this cell does not rely on names leaking from the
# logistic-regression cell (hidden kernel state breaks Restart & Run All).
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

# Reuse the active Spark session (getOrCreate returns it if one exists).
spark = SparkSession.builder.getOrCreate()

# Explicit seed for k-means initialisation. Without it, PySpark derives the
# default seed from a Python string hash, which is randomised per interpreter
# run, so the centres (and their order) could differ between runs.
KMEANS_SEED = 1

# Toy dataset to be grouped by similarity: (ID, feature vector) rows.
data = [
    (1, Vectors.dense([1.0, 1.0])),
    (2, Vectors.dense([5.0, 5.0])),
    (3, Vectors.dense([10.0, 10.0])),
    (4, Vectors.dense([15.0, 15.0])),
]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train the k-means model on the `Features` vector column.
kmeans = KMeans(featuresCol='Features', k=2, seed=KMEANS_SEED)
model = kmeans.fit(df)

# One centre per cluster. The order of the centres is arbitrary, not sorted.
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


In [3]:
# Titanic survival classification: impute and encode the passenger features,
# tune a logistic regression with 5-fold cross-validation, then report
# accuracy and a confusion matrix on a held-out test split.
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pandas as pd

# Seed shared by the train/test split and the CV fold assignment so a re-run
# reproduces the same numbers.
SEED = 42

# Start (or reuse) the Spark session.
spark = SparkSession.builder.appName("Titanic Classification").getOrCreate()

# Load the cleaned Titanic CSV; column types are inferred from the data.
df = spark.read.csv("Titanic_clean.csv", header=True, inferSchema=True)

# Split FIRST. Every statistic used in preprocessing (the mean age and the
# category -> index mappings) is learned from training rows only. Otherwise
# test rows would leak into the model and inflate the test score.
train_raw, test_raw = df.randomSplit([0.8, 0.2], seed=SEED)

# Missing values: Age gets the TRAINING-set mean, Embarked gets 'S'.
age_mean = train_raw.agg({'Age': 'mean'}).first()[0]
fill_values = {'Age': age_mean, 'Embarked': 'S'}
train_filled = train_raw.fillna(fill_values)
test_filled = test_raw.fillna(fill_values)

# Encode the string columns as numeric indices, then assemble the feature
# vector. handleInvalid='keep' maps a category that never occurs in the
# training split to an extra index instead of failing on the test split.
sex_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex', handleInvalid='keep')
embarked_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex', handleInvalid='keep')
feature_cols = ['Pclass', 'SexIndex', 'Age', 'SibSp', 'Parch', 'Fare', 'EmbarkedIndex']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

# Fit the preprocessing on the training split only, then apply it to both splits.
preprocess_model = Pipeline(stages=[sex_indexer, embarked_indexer, assembler]).fit(train_filled)
train_df = preprocess_model.transform(train_filled)
test_df = preprocess_model.transform(test_filled)

# Full dataset with the same train-fitted preprocessing applied, kept as `df`
# (as before) for any later cells that use it.
df = preprocess_model.transform(df.fillna(fill_values))

# Model to tune.
lr = LogisticRegression(featuresCol='features', labelCol='Survived', maxIter=50)

# Metric used both for choosing hyperparameters and for the final test score.
evaluator = MulticlassClassificationEvaluator(labelCol='Survived', predictionCol='prediction', metricName='accuracy')

# Hyperparameter grid: regularisation strength x L1/L2 mix (3 x 3 = 9 settings).
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 0.5])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# 5-fold cross-validation over the grid, with a fixed seed for the fold split.
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5,
                          seed=SEED)

# Train: selects the best grid setting and refits it on all of train_df.
cv_model = crossval.fit(train_df)

# Predict on the held-out test split.
predictions = cv_model.transform(test_df)
predictions.select("features", "Survived", "prediction").show(5)

# Test-set accuracy.
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy:.4f}")
# NOTE(review): a perfect 1.0 accuracy on Titanic with these features is a
# red flag. Check whether the labels in Titanic_clean.csv are derived from
# another column (e.g. Sex), which would make the task trivial.

# Confusion matrix: actual labels in rows, predicted labels in columns.
pred_labels = predictions.select("prediction", "Survived").toPandas()
confusion_matrix = pd.crosstab(pred_labels['Survived'], pred_labels['prediction'], rownames=['Actual'], colnames=['Predicted'])
print("\nConfusion Matrix:")
print(confusion_matrix)


25/12/03 21:55:53 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+--------+----------+
|            features|Survived|prediction|
+--------------------+--------+----------+
|[2.0,0.0,62.0,0.0...|       0|       0.0|
|[3.0,1.0,30.0,0.0...|       1|       1.0|
|[3.0,1.0,18.0,0.0...|       1|       1.0|
|[2.0,0.0,63.0,1.0...|       0|       0.0|
|[3.0,1.0,45.0,0.0...|       1|       1.0|
+--------------------+--------+----------+
only showing top 5 rows
Test Accuracy = 1.0000

Confusion Matrix:
Predicted  0.0  1.0
Actual             
0           32    0
1            0   24
