In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

In [None]:
# Load the raw water-level readings from the source CSV.
old_Data = pd.read_csv("data-tinggi-muka-air-februari-2020.csv")

# Fixed seed so the 80/20 split (and every score computed from it) is
# reproducible on a fresh kernel.
train, test = train_test_split(old_Data, test_size=0.2, random_state=42)

# index=False: do not write the pandas row index. Otherwise it is stored as an
# unnamed first column and comes back as a spurious `_c0` column when the
# files are read again.
train.to_csv("Train.csv", index=False)
test.to_csv("Test.csv", index=False)

In [None]:
### Important ###
### Run this cell if you are using Google Colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
print('[Done] openjdk8 has been installed')
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz -C /usr/lib/
!rm spark-2.4.7-bin-hadoop2.7.tgz
print('[Done] Spark has been installed')
!pip install -q findspark
print('[Done] findspark has been installed')
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/lib/spark-2.4.7-bin-hadoop2.7"
print('[Done] Environment variables has been set')
import findspark
findspark.init()
print('[Done] You can use pyspark now.')

[Done] openjdk8 has been installed
[Done] Spark has been installed
[Done] findspark has been installed
[Done] Environment variables has been set
[Done] You can use pyspark now.


In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.sql import SparkSession

In [None]:
# Attach to the active Spark session, or create one if none exists yet.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [None]:
from pyspark.sql.types import StringType

# 1. Load Data

In [None]:
# Read both splits the same way: first row is the header, column types inferred.
csv_options = {'inferSchema': True, 'header': True}
water_train = spark.read.csv('Train.csv', **csv_options)
water_test = spark.read.csv('Test.csv', **csv_options)

In [None]:
# water_train = water_train.withColumn("tanggal", water_train["tanggal"].cast(StringType()))
# water_test = water_test.withColumn("tanggal", water_test["tanggal"].cast(StringType()))

In [None]:
# water_train = water_train.withColumn("tanggal", water_train["tanggal"].substr(1, 11))
# water_test = water_test.withColumn("tanggal", water_test["tanggal"].substr(1, 11))

In [None]:
# Inspect the column names and inferred types of the training split.
water_train.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- nama_pintu_air: string (nullable = true)
 |-- lokasi: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- tanggal: timestamp (nullable = true)
 |-- tinggi_air: integer (nullable = true)
 |-- status_siaga: string (nullable = true)



In [None]:
# Preview the test split as loaded from CSV.
water_test.show()

+-----+--------------------+------------------+---------+------------------+-------------------+----------+----------------+
|  _c0|      nama_pintu_air|            lokasi| latitude|         longitude|            tanggal|tinggi_air|    status_siaga|
+-----+--------------------+------------------+---------+------------------+-------------------+----------+----------------+
|14874|          P.A. Karet|Banjir Kanal Barat|-6.197901|        106.810075|2020-02-24 05:00:00|      3530| Status : Normal|
|18955|     PA. Pulo Gadung|            Sunter|   -6.191|106.90419399999999|2020-02-28 06:10:00|      6280|Status : Siaga 3|
| 7405|     PA. Pulo Gadung|            Sunter|   -6.191|106.90419399999999|2020-02-07 16:00:00|      3340| Status : Normal|
|10720|    PS. Pesanggrahan|      Pesanggrahan|-6.397111|        106.771825|2020-02-20 02:10:00|      1080| Status : Normal|
| 3445|PS. Katulampa (Hulu)|          Ciliwung|-6.633167|106.83680600000001|2020-02-04 10:10:00|        60| Status : Normal|


# 2. Selection

In [None]:
# Selecting important features.
# The label (status_siaga) is kept as the last entry: the feature list used by
# the vectorizing step is taken as every entry except the final one.
chosen_columns = [
    'nama_pintu_air',
    'lokasi',
    'tinggi_air',
    'status_siaga',
]

In [None]:
# Keep only the chosen columns in both splits.
water_train, water_test = (
    frame.select(chosen_columns) for frame in (water_train, water_test)
)

In [None]:
# Preview the training split after column selection.
water_train.show()

+--------------------+------------------+----------+----------------+
|      nama_pintu_air|            lokasi|tinggi_air|    status_siaga|
+--------------------+------------------+----------+----------------+
|  P.A. Marina Ancol |              Laut|      1460| Status : Normal|
|   PS. Cipinang Hulu|     Cipinang Hulu|      1150| Status : Normal|
|PS. Katulampa (Hulu)|          Ciliwung|       360| Status : Normal|
|PS. Katulampa (Hulu)|          Ciliwung|       140| Status : Normal|
|           PS. Depok|          Ciliwung|      1550| Status : Normal|
|     PS. Krukut Hulu|            Krukut|       500| Status : Normal|
|   PS. Cipinang Hulu|     Cipinang Hulu|      1100| Status : Normal|
|           PS. Depok|          Ciliwung|       760| Status : Normal|
|      PS. Angke Hulu|             Angke|      1590|Status : Siaga 3|
|    Pompa Pasar Ikan|              Laut|      1340| Status : Normal|
|       PA. Manggarai|          Ciliwung|      7170| Status : Normal|
|     PA. Pulo Gadun

# Data Preprocessing

In [None]:
# Data Preprocessing

# Row counts of both splits, taken before rows with nulls are dropped.
train_rows = water_train.count()
test_rows = water_test.count()
print(f'Train df count: {train_rows}')
print(f'Test df count: {test_rows}')

Train df count: 16548
Test df count: 4137


In [None]:
# Dropping Null
# Drop rows with null values from both splits.
water_train, water_test = (
    split.dropna() for split in (water_train, water_test)
)

In [None]:
# Re-count both splits so the effect of dropping nulls is visible.
for split_label, split_df in (('Train', water_train), ('Test', water_test)):
  print(f'{split_label} df count: {split_df.count()}')

Train df count: 16548
Test df count: 4137


# Transforming Data

In [None]:
# Transform Data

# Indexing String
def encode_column(column_name: str):
  """Replace a string column with its StringIndexer index in both splits.

  The indexer is fitted on ``water_train`` only and the same fitted indexer is
  then applied to ``water_train`` and ``water_test``, so both splits share one
  label-to-index mapping. The original column is dropped and the generated
  index column takes over its name.

  Args:
    column_name: Name of the column to encode; must exist in both splits.

  Side effects:
    Rebinds the module-level ``water_train`` and ``water_test``.
  """
  global water_train, water_test

  # Temporary output column; renamed to `column_name` once the original is gone.
  index_col = 'generated'

  # Learn the mapping once, from the training split only.
  fitted_indexer = StringIndexer(
      inputCol=column_name,
      outputCol=index_col).fit(water_train)

  def _swap_in_index(frame):
    # Add the index column, drop the string column, then reuse its name.
    indexed = fitted_indexer.transform(frame)
    without_original = indexed.drop(column_name)
    return without_original.withColumnRenamed(index_col, column_name)

  water_train = _swap_in_index(water_train)
  water_test = _swap_in_index(water_test)

In [None]:
# Index every string-valued column, label included, in this fixed order.
for categorical_column in ('nama_pintu_air', 'lokasi', 'status_siaga'):
  encode_column(categorical_column)

In [None]:
# Preview the training split after the string columns have been indexed.
water_train.show()

+----------+--------------+------+------------+
|tinggi_air|nama_pintu_air|lokasi|status_siaga|
+----------+--------------+------+------------+
|      1460|          17.0|   2.0|         0.0|
|      1150|           7.0|   4.0|         0.0|
|       360|          18.0|   0.0|         0.0|
|       140|          18.0|   0.0|         0.0|
|      1550|           0.0|   0.0|         0.0|
|       500|          11.0|   8.0|         0.0|
|      1100|           7.0|   4.0|         0.0|
|       760|           0.0|   0.0|         0.0|
|      1590|           5.0|   5.0|         1.0|
|      1340|          12.0|   2.0|         0.0|
|      7170|           9.0|   0.0|         0.0|
|      3320|           1.0|   1.0|         0.0|
|       490|          18.0|   0.0|         0.0|
|      -910|           8.0|   6.0|         0.0|
|       900|           7.0|   4.0|         0.0|
|      2690|          16.0|  10.0|         0.0|
|     -1870|           8.0|   6.0|         0.0|
|         0|           3.0|   3.0|      

# Normalization

In [None]:
# Vectorize: pack every chosen column except the last one (the label) into a
# single vector column named 'Features'.
feature_columns = chosen_columns[:-1]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='Features')

In [None]:
# Add the 'Features' vector column to both splits.
water_train, water_test = map(assembler.transform, (water_train, water_test))

In [None]:
# Scaler: unfitted estimator that reads 'Features' and writes 'Features_normalized'.
scaler = StandardScaler(
    outputCol='Features_normalized',
    inputCol='Features',
)

In [None]:
# Fit the scaler on the training split only. This rebinds `scaler` from the
# unfitted estimator to the object returned by fit().
# NOTE(review): because of the rebinding this cell is presumably not safe to
# re-run on its own - confirm, or re-run the cell that creates `scaler` first.
scaler = scaler.fit(water_train) # fit once

In [None]:
# Apply the fitted scaler to both splits, adding 'Features_normalized'.
water_train, water_test = [
    scaler.transform(split) for split in (water_train, water_test)
]

In [None]:
# Preview the training split with the raw and scaled feature vectors.
water_train.show()

+----------+--------------+------+------------+------------------+--------------------+
|tinggi_air|nama_pintu_air|lokasi|status_siaga|          Features| Features_normalized|
+----------+--------------+------+------------+------------------+--------------------+
|      1460|          17.0|   2.0|         0.0| [17.0,2.0,1460.0]|[2.80876701359175...|
|      1150|           7.0|   4.0|         0.0|  [7.0,4.0,1150.0]|[1.15655112324366...|
|       360|          18.0|   0.0|         0.0|  [18.0,0.0,360.0]|[2.97398860262656...|
|       140|          18.0|   0.0|         0.0|  [18.0,0.0,140.0]|[2.97398860262656...|
|      1550|           0.0|   0.0|         0.0|  [0.0,0.0,1550.0]|[0.0,0.0,0.878864...|
|       500|          11.0|   8.0|         0.0|  [11.0,8.0,500.0]|[1.81743747938290...|
|      1100|           7.0|   4.0|         0.0|  [7.0,4.0,1100.0]|[1.15655112324366...|
|       760|           0.0|   0.0|         0.0|   [0.0,0.0,760.0]|[0.0,0.0,0.430927...|
|      1590|           5.0|   5.

# Generate Model

In [None]:
# Generate Model - using Logistic Regression
# Unfitted estimator: reads the scaled feature vector, uses the indexed
# status_siaga column as the label, and is capped at 100 iterations.
model = LogisticRegression(
    maxIter=100,
    labelCol='status_siaga',
    featuresCol='Features_normalized',
)

In [None]:
# Training: fit the logistic regression on the training split. This rebinds
# `model` from the unfitted estimator to the object returned by fit().
# NOTE(review): because of the rebinding this cell is presumably not safe to
# re-run on its own - confirm, or re-run the cell that creates `model` first.
model = model.fit(water_train)

# Evaluate

In [None]:
# Model Testing and Evaluation
# Apply the fitted model to the test split and preview the resulting frame,
# which gains the rawPrediction, probability and prediction columns.
prediction = model.transform(water_test)
prediction.show()

+----------+--------------+------+------------+------------------+--------------------+--------------------+--------------------+----------+
|tinggi_air|nama_pintu_air|lokasi|status_siaga|          Features| Features_normalized|       rawPrediction|         probability|prediction|
+----------+--------------+------+------------+------------------+--------------------+--------------------+--------------------+----------+
|      3530|          16.0|  10.0|         0.0|[16.0,10.0,3530.0]|[2.64354542455694...|[2.65501474232930...|[0.84743724984843...|       0.0|
|      6280|           1.0|   1.0|         1.0|  [1.0,1.0,6280.0]|[0.16522158903480...|[0.71085482383337...|[0.44256278696996...|       0.0|
|      3340|           1.0|   1.0|         0.0|  [1.0,1.0,3340.0]|[0.16522158903480...|[1.43778593348532...|[0.67884118756022...|       0.0|
|      1080|          14.0|   9.0|         0.0| [14.0,9.0,1080.0]|[2.31310224648732...|[3.10439944396083...|[0.92684602458406...|       0.0|
|        60| 

In [None]:
# The label has more than two classes: the recorded predictions contain a row
# whose winning class has probability ~0.44, which cannot happen with only two
# classes. A binary ROC evaluator is therefore not meaningful here; score the
# predicted class against the label with a multiclass metric instead.
evaluator = MulticlassClassificationEvaluator(
    labelCol='status_siaga',
    predictionCol='prediction',
    metricName='f1'
)

In [None]:
# Evaluate the test predictions and report the metric as a percentage
# rounded to two decimals.
score = round(evaluator.evaluate(prediction) * 100, 2)
print(f'Score: {score}%')

Score: 52.89%


In [None]:
# Collect the Spark prediction frame into a pandas DataFrame. The name
# `prediction` is rebound, so from here on it refers to the pandas copy.
prediction = prediction.toPandas()

In [None]:
# Accuracy: share of test rows whose predicted class index equals the true
# label index. A vectorized column comparison replaces the row-by-row
# iterrows() counting loop. On an empty frame the mean is NaN rather than a
# ZeroDivisionError.
hit_rate = float((prediction['status_siaga'] == prediction['prediction']).mean())

print('Accuracy :' + str(round(hit_rate * 100, 2)) + '%')

Accuracy :83.39%
