In [None]:
!pip install pyspark



In [None]:
# Example: linear regression with Spark MLlib (a regression model, not clustering).
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Toy dataset of (ID, Feature, Target) rows, where Target = Feature + 15 exactly.
columns = ['ID', 'Feature', 'Target']
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
df = spark.createDataFrame(data, columns)

# MLlib estimators consume one vector column, so pack 'Feature' into 'Features'.
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Fit Target ~ Features and report the learned slope and intercept
# (expected to be close to 1 and 15 for this data).
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
# Practice: binary logistic regression on a tiny hand-made dataset.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# Initialize (or reuse) the Spark session.
spark = SparkSession.builder.appName("Logistic Regression").getOrCreate()

# Four rows of (ID, x1, x2, Label) with a 0/1 label.
columns = ['ID', 'x1', 'x2', 'Label']
data = [
    (1, 2.0, 3.0, 0),
    (2, 1.0, 5.0, 1),
    (3, 2.5, 4.5, 1),
    (4, 3.0, 6.0, 0),
]
df = spark.createDataFrame(data, columns)

# Combine the two numeric predictors into the single 'Features' vector column.
assembler = VectorAssembler(inputCols=['x1', 'x2'], outputCol='Features')
df_features = assembler.transform(df)

# Fit the classifier on the assembled features.
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df_features)

# Report the learned weights (one per input column) and the bias term.
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
# Practice: KMeans clustering on four points lying on the diagonal y = x.
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Create (or reuse) the Spark session.
spark = SparkSession.builder \
    .appName("KMeans Clustering") \
    .getOrCreate()

# Example dataset: (ID, x, y)
data = [(1, 1.0, 1.0), (2, 5.0, 5.0), (3, 10.0, 10.0), (4, 15.0, 15.0)]
columns = ['ID', 'x', 'y']
df = spark.createDataFrame(data, columns)

# Pack x and y into the single vector column MLlib estimators consume.
assembler = VectorAssembler(inputCols=['x', 'y'], outputCol='Features')
df_features = assembler.transform(df)

# Train a 2-cluster KMeans model. Center initialization is randomized, and
# PySpark's default seed is derived from hash() of the class name, which
# changes between Python processes. An explicit seed keeps the clustering
# (and the order of the centers) stable across kernel restarts.
kmeans = KMeans(featuresCol='Features', k=2, seed=42)
model = kmeans.fit(df_features)

# Show the cluster centers (one numpy array per cluster).
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


In [None]:
# TUGAS (assignment): predict Transaction_Status from finance_dataset.csv

In [None]:
from pyspark.sql import SparkSession

# Location of the assignment dataset. This looks like a Google Colab upload
# path; change it when running elsewhere.
DATA_PATH = "/content/finance_dataset.csv"
# Columns that the following feature-preparation cell reads.
REQUIRED_COLUMNS = {"Transaction_Amount", "Account_Balance", "Transaction_Status"}

# Create (or reuse) the Spark session.
spark = SparkSession.builder.appName("Analisis Dataset").getOrCreate()

# Load the CSV. header=True takes column names from the first row, and
# inferSchema=True lets Spark assign numeric/timestamp types instead of strings.
data = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

# Fail fast with a clear message rather than an opaque analysis error in a later cell.
missing_columns = REQUIRED_COLUMNS - set(data.columns)
if missing_columns:
    raise ValueError(f"{DATA_PATH} is missing expected columns: {sorted(missing_columns)}")

data.show(10)


+---+-------------------+-----------+------------------+----------------+--------------+---------------+-----------+-----------+------------------+
| ID|               Date|Customer_ID|Transaction_Amount|Transaction_Type|Payment_Method|Account_Balance|   Category|   Location|Transaction_Status|
+---+-------------------+-----------+------------------+----------------+--------------+---------------+-----------+-----------+------------------+
|  1|2023-01-01 00:00:00|    CUST001|          27337.49|        Transfer| Bank Transfer|      804821.06|Electronics|   New York|           Pending|
|  2|2023-01-02 00:00:00|    CUST002|           97716.6|        Purchase|   Credit Card|      540169.76|Electronics|    Houston|           Pending|
|  3|2023-01-03 00:00:00|    CUST003|           5752.36|        Transfer|    Debit Card|       925251.6|Electronics|   New York|         Completed|
|  4|2023-01-04 00:00:00|    CUST004|          93443.22|        Transfer|    Debit Card|      704136.27|    Savi

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Drop rows with any missing value. The result goes into a new frame so that
# `data` remains the raw load and this cell can be re-run safely.
data_clean = data.na.drop()

# Encode the categorical target Transaction_Status as a numeric "label" column.
# Overwriting `data` here (as before) broke re-runs: `data` would then already
# contain "label", and StringIndexer refuses to write to an existing output column.
# With Spark's default ordering, the most frequent status gets index 0.0.
indexer = StringIndexer(inputCol="Transaction_Status", outputCol="label").fit(data_clean)
data_indexed = indexer.transform(data_clean)

# Combine the numeric predictors into the single "features" vector column.
assembler = VectorAssembler(inputCols=["Transaction_Amount", "Account_Balance"], outputCol="features")
final_data = assembler.transform(data_indexed).select("features", "label")

# Show a few rows of the model-ready data.
final_data.show(10)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[27337.49,804821.06]|  2.0|
| [97716.6,540169.76]|  2.0|
|  [5752.36,925251.6]|  0.0|
|[93443.22,704136.27]|  1.0|
|[15109.98,285987.99]|  2.0|
|[59820.18,392812.96]|  1.0|
|[30221.04,196268.97]|  1.0|
|  [4602.7,231101.98]|  1.0|
|[28959.12,467473.05]|  0.0|
|[72098.18,184918.69]|  0.0|
+--------------------+-----+
only showing top 10 rows



In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Hold out 20% of rows for evaluation; the fixed seed makes the split repeatable.
train, test = final_data.randomSplit([0.8, 0.2], seed=42)

# Logistic regression with default hyper-parameters. The column names below are
# the estimator's defaults, spelled out so the link to `final_data` is visible.
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(train)

# Accuracy = fraction of held-out rows whose predicted label is correct.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
predictions = model.transform(test)
accuracy = evaluator.evaluate(predictions)
# NOTE(review): the labels appear to take three values (0/1/2), so an accuracy
# near 0.33 is roughly chance level. The two features may carry little signal.
print(f"Akurasi: {accuracy}")

Akurasi: 0.32553558236887464


In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Candidate regularization strengths for the logistic regression estimator `lr`.
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()

# 3-fold cross-validation on the training split, scored with the same accuracy
# evaluator as the baseline model. The explicit seed fixes the fold assignment.
# PySpark's default seed is derived from hash() of the class name, which varies
# between Python processes, so folds (and the selected regParam) could otherwise
# change after a kernel restart. 42 matches the seed used for randomSplit.
crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
)
cv_model = crossval.fit(train)

# Evaluate the best model, refit on the full training split, on the untouched test split.
cv_predictions = cv_model.transform(test)
cv_accuracy = evaluator.evaluate(cv_predictions)
print(f"Akurasi setelah Cross-Validation: {cv_accuracy}")

Akurasi setelah Cross-Validation: 0.3282442748091603
