In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc

spark = SparkSession.builder \
    .appName("Analisis Pencurian Motor di Kota Bandar Lampung") \
    .getOrCreate()

In [None]:
from google.colab import files
uploaded = files.upload()

Saving kriminalitas_balam.csv to kriminalitas_balam.csv


In [None]:
# 1. Drop NA dan duplikat dulu
# Membaca data dari file CSV dan menyimpannya ke variabel cm
cm = spark.read.csv("kriminalitas_balam.csv", header=True, inferSchema=True)
cm = cm.na.drop()
cm = cm.dropDuplicates()

# 2. Mapping Lokasi_Kejadian
mapping = {
    "jalan umum": "Jalan Umum",
    "jalanan umum": "Jalan Umum",
    "tempat parkir": "Tempat Parkir",
    "perkantoran": "Perkantoran",
    "pertokoan/mal/pusat perbelanjaan": "Pusat Perbelanjaan",
    "pasar": "Pasar",
    "rumah": "Rumah"
}

# Normalisasi dan Mapping Lokasi
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def map_lokasi(value):
    if value is None:
        return None
    return mapping.get(value.strip().lower(), value.title())

map_lokasi_udf = udf(map_lokasi, StringType())

# Lakukan mapping lokasi
cm = cm.withColumn("Lokasi_Kejadian", map_lokasi_udf(col("Lokasi_Kejadian")))

# 3. tambahkan kolom Shift_Waktu (setelah semua kolom beres)
from pyspark.sql.functions import when

cm = cm.withColumn(
    "Shift_Waktu",
    when(col("Waktu_Kejadian").between("04.00-04.59", "05.00-07.59"), "Pagi")
    .when(col("Waktu_Kejadian").between("08.00-11.59", "11.59"), "Siang")
    .when(col("Waktu_Kejadian").between("12.00-14.59", "14.59"), "Siang")
    .when(col("Waktu_Kejadian").between("15.00-17.59", "17.59"), "Sore")
    .when(col("Waktu_Kejadian").between("18.00-21.59", "21.59"), "Malam")
    .when(col("Waktu_Kejadian").between("22.00-23.59", "23.59"), "Malam")
    .otherwise("Lainnya")
)

In [None]:
kecamatan_mapping = {
    "tanjumg karang barat": "Tanjung Karang Barat",
    "tanjung karang tiimur": "Tanjung Karang Timur",
    "tanjung karang timur": "Tanjung Karang Timur",
    "tanjung karang barat": "Tanjung Karang Barat",
    "tanjung karang pusat": "Tanjung Karang Pusat",
    "tanjung senang": "Tanjung Senang",
    "teluk betung utara": "Teluk Betung Utara",
    "teluk betungtimur": "Teluk Betung Timur",
}

In [None]:
from pyspark.sql.functions import udf, lower
from pyspark.sql.types import StringType

def map_kecamatan(value):
    if value is None:
        return None
    return kecamatan_mapping.get(value.strip().lower(), value.title())

map_kecamatan_udf = udf(map_kecamatan, StringType())

In [None]:
cm = cm.withColumn("Kecamatan_Final", map_kecamatan_udf(col("Kecamatan")))

In [None]:
cm = cm.drop("Kecamatan") \
                   .withColumnRenamed("Kecamatan_Final", "Kecamatan")

In [None]:
cm.select("Kecamatan", col("Kecamatan").alias("Asli")).distinct().rdd.map(lambda row: (row["Kecamatan"], len(row["Kecamatan"]))).collect()

[('Tanjung Senang', 14),
 ('Panjang', 7),
 ('Sukabumi', 8),
 ('Teluk Betung Timur', 18),
 ('Kedaton', 7),
 ('Tanjung Karang Barat', 20),
 ('Sukarame', 8),
 ('Teluk Betung Utara', 18),
 ('Tanjung Karang Timur', 20),
 ('Kemiling', 8),
 ('Tanjung Karang Pusat', 20),
 ('Teluk Betung Selatan', 20)]

In [None]:
from pyspark.sql.functions import trim, regexp_replace

cm = cm.withColumn("Kecamatan", trim(col("Kecamatan")))
cm = cm.withColumn("Kecamatan", regexp_replace(col("Kecamatan"), "\u200b", ""))

In [None]:
waktu_mapping = {
    "04.00-04-59": "04.00-04.59",
    "05.00-07.60": "05.00-07.59",
    "08.00-11-59": "08.00-11.59",
    "08.00-11.60": "08.00-11.59",
    "08.00-11.62": "08.00-11.59",
    "12.59-14.59": "12.00-14.59"
    # sisanya biarkan default
}

In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

def map_waktu(value):
    if value is None:
        return None
    return waktu_mapping.get(value.strip(), value.strip())

map_waktu_udf = udf(map_waktu, StringType())

cm = cm.withColumn("Waktu_Kejadian", map_waktu_udf(col("Waktu_Kejadian")))

In [None]:
cm.select("Waktu_Kejadian").distinct().orderBy("Waktu_Kejadian").show(truncate=False)

+--------------+
|Waktu_Kejadian|
+--------------+
|04.00-04.59   |
|05.00-07.59   |
|08.00-11.59   |
|12.00-14.59   |
|15.00-17.59   |
|18.00-21.59   |
|22.00-23.59   |
+--------------+



In [None]:
cm.printSchema()
cm.select("Shift_Waktu").distinct().orderBy("Shift_Waktu").show(truncate=False)

root
 |-- Tahun: integer (nullable = true)
 |-- Bulan: string (nullable = true)
 |-- Jenis_Kejahatan: string (nullable = true)
 |-- Lokasi_Kejadian: string (nullable = true)
 |-- Waktu_Kejadian: string (nullable = true)
 |-- Jumlah_kasus: integer (nullable = true)
 |-- Shift_Waktu: string (nullable = false)
 |-- Kecamatan: string (nullable = true)

+-----------+
|Shift_Waktu|
+-----------+
|Lainnya    |
|Malam      |
|Pagi       |
|Siang      |
|Sore       |
+-----------+



In [None]:
# Simpan DataFrame ke CSV (dalam 1 file saja)
cm.coalesce(1).write.csv("/content/hasil_clean.csv", header=True, mode="overwrite")

# Rename file CSV dari folder output Spark
import shutil
import glob

# Ambil nama file yang di-generate Spark
csv_file = glob.glob("/content/hasil_clean.csv/part-*.csv")[0]

# Rename file ke nama final
shutil.move(csv_file, "/content/hasil_clean_final.csv")

# Download file ke komputer kamu
from google.colab import files
files.download("/content/hasil_clean_final.csv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>