<a href="https://colab.research.google.com/github/faizdifak/Big-Data/blob/main/Advanced_Machine_Learning_Using_MLib.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Week 14 lab: Advanced Machine Learning using MLlib
# Example: fitting a linear regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Start (or reuse) a Spark session for this notebook
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Toy dataset: every row is (ID, Feature, Target)
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, columns)

# Pack the single predictor into the vector column the estimator reads from
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Fit the regression of 'Target' on the assembled 'Features' vector
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Report the fitted slope(s) and intercept
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors  # dense feature vectors for the toy rows

# Toy dataset: every row is (ID, Features, Label); the features are built
# directly as dense vectors, so no VectorAssembler step is required below.
columns = ['ID', 'Features', 'Label']
data = [
    (1, Vectors.dense([2.0, 3.0]), 0),
    (2, Vectors.dense([1.0, 5.0]), 1),
    (3, Vectors.dense([2.5, 4.5]), 1),
    (4, Vectors.dense([3.0, 6.0]), 0),
]
df = spark.createDataFrame(data, columns)

# Fit the classifier straight on `df`, reading the ready-made 'Features' column
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Report the fitted coefficients and intercept
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors # Import Vectors for feature vectors

# Example dataset: every row is (ID, Features), with the features built as
# dense vectors so the estimator can read the column directly.
data = [(1, Vectors.dense([1.0, 1.0])), (2, Vectors.dense([5.0, 5.0])), (3, Vectors.dense([10.0, 10.0])), (4, Vectors.dense([15.0, 15.0]))]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model.
# A fixed seed pins the random centroid initialisation; without it the
# centers (and their order) can differ from one run to the next.
kmeans = KMeans(featuresCol='Features', k=2, seed=42)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


In [None]:
# STEP 1: Install Kaggle API
!pip install kaggle

# STEP 2: Upload kaggle.json (API key)
from google.colab import files
files.upload()

# STEP 3: Pindahkan kaggle.json ke folder Kaggle
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# STEP 4: Download dataset Bank Marketing
!kaggle datasets download -d janiobachmann/bank-marketing-dataset

# STEP 5: Unzip dataset
!unzip bank-marketing-dataset.zip




Saving bank.csv to bank.csv
cp: cannot stat 'kaggle.json': No such file or directory
chmod: cannot access '/root/.kaggle/kaggle.json': No such file or directory
Traceback (most recent call last):
  File "/usr/local/bin/kaggle", line 10, in <module>
    sys.exit(main())
             ^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/kaggle/cli.py", line 68, in main
    out = args.func(**command_args)
          ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/kaggle/api/kaggle_api_extended.py", line 1741, in dataset_download_cli
    with self.build_kaggle_client() as kaggle:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/kaggle/api/kaggle_api_extended.py", line 688, in build_kaggle_client
    username=self.config_values['username'],
             ~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^
KeyError: 'username'
unzip:  cannot find or open bank-marketing-dataset.zip, bank-marketing-dataset.zip.zip or bank-marketing-dataset.zip.ZIP.


In [None]:
# Mount Google Drive so files stored there are reachable under /content/drive
from google.colab import drive

drive.mount(mountpoint='/content/drive')


Mounted at /content/drive


In [None]:
# Additional assignment
# Load the dataset into Spark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("BankMarketing-MLlib")
    .getOrCreate()
)

# Read the CSV from the mounted Drive: the first row holds the column names,
# and column types are inferred from the data.
df = spark.read.csv(
    "/content/drive/MyDrive/bank.csv",
    header=True,
    inferSchema=True,
)

# Inspect the inferred schema and the first few rows
df.printSchema()
df.show(5)

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)

+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|       job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+----------+-------+---------+-------+-------+-------+----+-------+--

In [None]:
# Preprocessing (encoding + cleaning)
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# String-typed columns that are indexed and then one-hot encoded
categorical_cols = [
    "job", "marital", "education", "default", "housing",
    "loan", "contact", "month", "poutcome",
]

# One indexer per categorical column: <col> -> <col>_idx
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_idx")
    for name in categorical_cols
]

# One encoder per indexed column: <col>_idx -> <col>_vec
encoders = [
    OneHotEncoder(inputCols=[f"{name}_idx"], outputCols=[f"{name}_vec"])
    for name in categorical_cols
]

# The target column 'deposit' is indexed into the numeric 'label' column
label_indexer = StringIndexer(inputCol="deposit", outputCol="label")

# Run all indexers, then all encoders, then the label indexer
pipeline = Pipeline(stages=[*indexers, *encoders, label_indexer])
pipeline_model = pipeline.fit(df)
df_encoded = pipeline_model.transform(df)


In [None]:
# Vector Assembler
# Combine all features into a single vector column.
from pyspark.ml.feature import VectorAssembler

# NOTE(review): the integer column `day` is not part of this list - confirm
# that leaving it out is intentional.
numeric_cols = ["age", "balance", "duration", "campaign", "pdays", "previous"]

# Numeric columns first, followed by the one-hot vectors of every categorical column
feature_cols = [*numeric_cols, *(f"{name}_vec" for name in categorical_cols)]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Keep only the two columns the classifier needs
assembled = assembler.transform(df_encoded)
data = assembled.select("label", "features")


In [None]:
# Train-test split: roughly 80% training / 20% test, with a fixed seed
train, test = data.randomSplit(weights=[0.8, 0.2], seed=42)


In [None]:
# Build the classification model (logistic regression)
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
)

# Fit on the training split, then score the held-out test split
model = lr.fit(train)
pred = model.transform(test)


In [None]:
# Model evaluation (accuracy + confusion matrix)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy evaluator; the label/prediction column names are spelled out here
# and are the same as the evaluator's defaults.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

accuracy = evaluator.evaluate(pred)
print("Accuracy =", accuracy)


Accuracy = 0.8280961182994455


In [None]:
# Confusion matrix: number of test rows for each (label, prediction) pair
confusion_counts = pred.groupBy("label", "prediction").count()
confusion_counts.show()


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  831|
|  0.0|       1.0|  163|
|  1.0|       0.0|  209|
|  0.0|       0.0|  961|
+-----+----------+-----+



In [None]:
# Hyperparameter Tuning + Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 3 x 3 grid over regularisation strength and iteration cap (9 candidates)
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 1.0])
             .addGrid(lr.maxIter, [20, 50, 100])
             .build())

# 5-fold cross-validation over the grid, scored with the accuracy evaluator.
# The seed fixes how rows are assigned to folds, so the selected model and
# the accuracy printed below are reproducible across runs; 42 matches the
# seed used for the train/test split.
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5,
                          seed=42)

# Fit on the training split only; the test split stays held out
cv_model = crossval.fit(train)

# Score the held-out test split with the best model found by cross-validation
best_pred = cv_model.transform(test)
best_acc = evaluator.evaluate(best_pred)

print("Best Accuracy after tuning =", best_acc)


Best Accuracy after tuning = 0.8257855822550831
