In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Inisialisasi Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Memuat data contoh
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Menyiapkan data untuk pemodelan
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Melatih model regresi linier
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Menampilkan koefisien dan intercept dari model
print(f'Koefisien: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Koefisien: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Inisialisasi Spark Session
spark = SparkSession.builder.appName('MLlib Logistic Regression Example').getOrCreate()

# Dataset contoh
data = [(1, [2.0, 3.0], 0), (2, [1.0, 5.0], 1), (3, [2.5, 4.5], 1), (4, [3.0, 6.0], 0)]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Menggunakan VectorAssembler untuk mengonversi array 'Features' menjadi kolom vektor
# VectorAssembler mengharapkan kolom terpisah, tetapi 'Features' sudah berupa array
# Jadi, kita perlu mengonversinya terlebih dahulu menjadi kolom terpisah (satu kolom untuk setiap nilai dalam array).

# Membuat kolom terpisah untuk setiap elemen dalam array
df = df.withColumn("Feature1", col("Features")[0]).withColumn("Feature2", col("Features")[1])

# Sekarang, gunakan VectorAssembler untuk menggabungkan kolom-kolom ini menjadi satu kolom vektor
assembler = VectorAssembler(inputCols=["Feature1", "Feature2"], outputCol="FeatureVector")
df_transformed = assembler.transform(df)

# Melatih model regresi logistik
lr = LogisticRegression(featuresCol="FeatureVector", labelCol="Label")
model = lr.fit(df_transformed)

# Menampilkan koefisien dan intercept
print(f'Koefisien: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Koefisien: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Inisialisasi Spark Session
spark = SparkSession.builder.appName('KMeans Clustering Example').getOrCreate()

# Dataset contoh
data = [(1, [1.0, 1.0]), (2, [5.0, 5.0]), (3, [10.0, 10.0]), (4, [15.0, 15.0])]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Pisahkan array dalam kolom 'Features' menjadi kolom terpisah
df = df.withColumn("Feature1", col("Features")[0]).withColumn("Feature2", col("Features")[1])

# Menggunakan VectorAssembler untuk mengubah kolom terpisah menjadi kolom vektor
assembler = VectorAssembler(inputCols=["Feature1", "Feature2"], outputCol="FeatureVector")
df_transformed = assembler.transform(df)

# Melatih model KMeans clustering
kmeans = KMeans(featuresCol='FeatureVector', k=2) # Menggunakan 'FeatureVector' sebagai featuresCol
model = kmeans.fit(df_transformed) # Melatih model dengan DataFrame yang sudah ditransformasi

# Menampilkan pusat kluster
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


## Homework


In [2]:
from google.colab import files
files.upload()

Saving kaggle (3).json to kaggle (3).json


{'kaggle (3).json': b'{"username":"wahyudwi123","key":"6d38239b086f0d2f5bf58d70a012b180"}'}

In [3]:
!kaggle datasets download -d sobhanmoosavi/us-accidents


Dataset URL: https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents
License(s): CC-BY-NC-SA-4.0
Downloading us-accidents.zip to /content
100% 652M/653M [00:23<00:00, 29.6MB/s]
100% 653M/653M [00:23<00:00, 29.7MB/s]


In [1]:
!unzip us-accidents.zip


Archive:  us-accidents.zip
replace US_Accidents_March23.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: US_Accidents_March23.csv  


In [2]:
from pyspark.sql import SparkSession

# Inisialisasi Spark Session
spark = SparkSession.builder.appName("USAccidentsClassification").getOrCreate()

# Muat dataset
data = spark.read.csv("US_Accidents_March23.csv", header=True, inferSchema=True)

# Tampilkan skema dataset
data.printSchema()

# Tampilkan beberapa contoh data
data.show(5)


root
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: timestamp (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- V

In [3]:
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

# Hapus baris yang memiliki nilai kosong
data = data.dropna()

# Pilih kolom yang relevan
selected_columns = ["Severity", "Temperature(F)", "Humidity(%)", "Pressure(in)", "Visibility(mi)",
                    "Wind_Speed(mph)", "Precipitation(in)", "Weather_Condition"]
data = data.select(*selected_columns)

# Encode kolom target ("Severity") dan fitur kategorikal ("Weather_Condition")
indexer = StringIndexer(inputCol="Severity", outputCol="label")
weather_indexer = StringIndexer(inputCol="Weather_Condition", outputCol="Weather_Condition_Index")

# Gabungkan semua fitur menjadi sebuah vektor
assembler = VectorAssembler(
    inputCols=["Temperature(F)", "Humidity(%)", "Pressure(in)", "Visibility(mi)",
               "Wind_Speed(mph)", "Precipitation(in)", "Weather_Condition_Index"],
    outputCol="features"
)

# Bangun pipeline
pipeline = Pipeline(stages=[weather_indexer, indexer, assembler])
processed_data = pipeline.fit(data).transform(data)

# Pilih data akhir untuk pelatihan
final_data = processed_data.select("features", "label")
final_data.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[42.1,58.0,29.76,...|  2.0|
|[37.0,93.0,29.69,...|  2.0|
|[33.1,92.0,29.63,...|  0.0|
|[32.0,100.0,29.59...|  0.0|
|[33.8,100.0,29.66...|  0.0|
+--------------------+-----+
only showing top 5 rows



In [4]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Bagi data menjadi data latih dan uji
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

# Latih model Logistic Regression
lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)

# Buat prediksi pada data uji
predictions = lr_model.transform(test_data)

# Evaluasi model
evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Akurasi: {accuracy}")


Akurasi: 0.9418528229182939


In [5]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Tentukan parameter grid untuk tuning
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Atur cross-validation
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)  # 5-fold cross-validation

# Latih model dengan cross-validation
cv_model = crossval.fit(train_data)

# Evaluasi model terbaik
best_model = cv_model.bestModel
cv_predictions = best_model.transform(test_data)
cv_accuracy = evaluator.evaluate(cv_predictions)
print(f"Akurasi setelah Cross-Validation: {cv_accuracy}")


Akurasi setelah Cross-Validation: 0.9418668963449903
