In [7]:
# Example: build a small DataFrame and run basic operations on it
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

# One tuple per employee, in the same order as `columns`
columns = ['EmployeeName', 'Department', 'Salary']
data = [
    ('James', 'Sales', 3000),
    ('Michel', 'Sales', 4600),
    ('Robert', 'Sales', 4100),
    ('Maria', 'Finance', 3000),
]

df = spark.createDataFrame(data, schema=columns)
df.show()

# Row filtering: first by department, then by a salary threshold
print("Filtering Data")
df_filtered = df.where(df.Department == 'Sales')
df_filtered.show()

df_filtered = df.where(df.Salary > 4000)
df_filtered.show()

# Average salary across the whole DataFrame (no grouping keys)
print("Rata-rata Salary")
df.agg(avg("Salary")).show()

# Sort rows by Salary, ascending
print("Mengurutkan data berdasarkan Salary")
df_sorted = df.sort('Salary')
df_sorted.show()

# Print the DataFrame's schema
print('Skema Struktur Data')
df.printSchema()

# Show only the first 2 rows
print('Menampilkan 2 baris pertama dari DataFrame')
df.show(2)

# Fetch just the first row as a Row object
print('Mengambil baris pertama saja')
first_row = df.first()
print(first_row)

# Project only the 'EmployeeName' and 'Salary' columns
print('Memilih hanya kolom EmployeeName dan Salary')
df.select(['EmployeeName', 'Salary']).show()


+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|      Michel|     Sales|  4600|
|      Robert|     Sales|  4100|
|       Maria|   Finance|  3000|
+------------+----------+------+

Filtering Data
+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|      Michel|     Sales|  4600|
|      Robert|     Sales|  4100|
+------------+----------+------+

+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|      Michel|     Sales|  4600|
|      Robert|     Sales|  4100|
+------------+----------+------+

Rata-rata Salary
+-----------+
|avg(Salary)|
+-----------+
|     3675.0|
+-----------+

Mengurutkan data berdasarkan Salary
+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       Maria|   Finance|  3000|
|       James|     Sales|  3000|


In [18]:
# DataFrame transformations: column projection, row filter, grouped average
from pyspark.sql import functions as f

df.select('EmployeeName', 'Salary').show()
df.filter(df.Salary > 3000).show()
df.groupBy('Department').avg('Salary').show()

# The aggregate functions below are reached through the `f` namespace rather
# than `from pyspark.sql.functions import max, sum`: importing those names
# directly rebinds Python's built-in max()/sum() for every cell that runs
# afterwards in the same kernel.

# Average salary per department
print('Menghitung rata-rata gaji di setiap departemen')
df.groupBy('Department').agg(f.mean('Salary').alias('Rata_Rata_Gaji')).show()

# Maximum salary per department
print('Menghitung gaji maksimum per departemen')
df.groupBy('Department').agg(f.max('Salary').alias('Gaji_Maksimum')).show()

# Total salary per department
print('Menghitung total gaji per departemen')
df.groupBy('Department').agg(f.sum('Salary').alias('Total_Gaji')).show()

+------------+------+
|EmployeeName|Salary|
+------------+------+
|       James|  3000|
|      Michel|  4600|
|      Robert|  4100|
|       Maria|  3000|
+------------+------+

+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|      Michel|     Sales|  4600|
|      Robert|     Sales|  4100|
+------------+----------+------+

+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|     Sales|     3900.0|
|   Finance|     3000.0|
+----------+-----------+

Menghitung rata-rata gaji di setiap departemen
+----------+--------------+
|Department|Rata_Rata_Gaji|
+----------+--------------+
|     Sales|        3900.0|
|   Finance|        3000.0|
+----------+--------------+

Menghitung gaji maksimum per departemen
+----------+-------------+
|Department|Gaji_Maksimum|
+----------+-------------+
|     Sales|         4600|
|   Finance|         3000|
+----------+-------------+

Menghitung total gaji per departemen
+----------+-----

In [29]:
# Working with derived columns and complex data types (struct, array, UDF)
from pyspark.sql.functions import col, explode, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Derived columns: a 10% bonus, then salary + bonus
bonus = df.withColumn('SalaryBonus', df['Salary'] * 0.1)
bonus.show()
total_compensation = bonus.withColumn('TotalCompensation', bonus['Salary'] + bonus['SalaryBonus'])
total_compensation.show()

# Define a schema whose `info` column is a nested struct
print('Struct Type')
schema = StructType([
    StructField("name", StringType(), True),
    StructField("info", StructType([
        StructField('address', StringType(), True),
        StructField('phone', StringType(), True)
    ]), True)  # nullable made explicit; True is also StructField's default
])
# Sample row: the inner tuple is mapped onto the nested `info` struct
data_struct = [('James', ('100 Main St', '123-456-7890'))]
df_struct = spark.createDataFrame(data_struct, schema)
df_struct.show(truncate=False)
df_struct.printSchema()

# explode: turn each array element into its own row
print('Explode')
# Sample data with an array-typed column
data_array = [('James', ['Java','Scala']), ('Maria', ['Python', 'SQL'])]
df_array = spark.createDataFrame(data_array, ['name', 'skills'])
df_array.show()

# Flatten the array column. The alias must be applied to the explode() result;
# aliasing the inner column instead leaves the output column with explode's
# default name `col`.
Exploded = df_array.select(col('name'), explode(col('skills')).alias('skill_single'))
Exploded.show()

# User Defined Functions (UDF)
print('UDF')


# Plain Python function that will be wrapped as a UDF
def combine_info(name, dept):
    """Return a single display string built from an employee name and department."""
    return f'Name: {name}, Dept: {dept}'


# Register the Python function as a UDF that returns a string column
combine_info_udf = udf(combine_info, StringType())
# Apply the UDF to build a new column
df.withColumn('CombinedInfo', combine_info_udf(df['EmployeeName'], df['Department'])).show(truncate=False)



+------------+----------+------+-----------+
|EmployeeName|Department|Salary|SalaryBonus|
+------------+----------+------+-----------+
|       James|     Sales|  3000|      300.0|
|      Michel|     Sales|  4600|      460.0|
|      Robert|     Sales|  4100|      410.0|
|       Maria|   Finance|  3000|      300.0|
+------------+----------+------+-----------+

+------------+----------+------+-----------+-----------------+
|EmployeeName|Department|Salary|SalaryBonus|TotalCompensation|
+------------+----------+------+-----------+-----------------+
|       James|     Sales|  3000|      300.0|           3300.0|
|      Michel|     Sales|  4600|      460.0|           5060.0|
|      Robert|     Sales|  4100|      410.0|           4510.0|
|       Maria|   Finance|  3000|      300.0|           3300.0|
+------------+----------+------+-----------+-----------------+

Struct Type
+-----+---------------------------+
|name |info                       |
+-----+---------------------------+
|James|{100 Ma

In [31]:
# Window functions: ranking and running aggregates within each department
from pyspark.sql.window import Window
from pyspark.sql import functions as f

# All functions are accessed through `f` so that Python's built-in sum() is
# not replaced by `from pyspark.sql.functions import sum`.

# Partition by department and order by salary (ascending) inside each partition.
# Because an ordering is given and no frame is set, Spark uses the default
# growing frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), which is
# what makes avg/sum below behave as running aggregates. Rows tied on Salary
# within a partition fall into the same frame and share the same value.
WindowSpec = Window.partitionBy('Department').orderBy('Salary')
df.withColumn('Rank', f.rank().over(WindowSpec)).show()

# Running average of salary within each department
df.withColumn('RunningAverage', f.avg('Salary').over(WindowSpec)).show()

# Running total of salary within each department
df.withColumn('RunningTotal', f.sum('Salary').over(WindowSpec)).show()

# Several ranking functions side by side over the same window
(
    df.withColumn('Rank', f.rank().over(WindowSpec))
      .withColumn('DenseRank', f.dense_rank().over(WindowSpec))
      .withColumn('RowNumber', f.row_number().over(WindowSpec))
      .withColumn('Ntile_2', f.ntile(2).over(WindowSpec))
      .show()
)

+------------+----------+------+----+
|EmployeeName|Department|Salary|Rank|
+------------+----------+------+----+
|       Maria|   Finance|  3000|   1|
|       James|     Sales|  3000|   1|
|      Robert|     Sales|  4100|   2|
|      Michel|     Sales|  4600|   3|
+------------+----------+------+----+

+------------+----------+------+--------------+
|EmployeeName|Department|Salary|RunningAverage|
+------------+----------+------+--------------+
|       Maria|   Finance|  3000|        3000.0|
|       James|     Sales|  3000|        3000.0|
|      Robert|     Sales|  4100|        3550.0|
|      Michel|     Sales|  4600|        3900.0|
+------------+----------+------+--------------+

+------------+----------+------+------------+
|EmployeeName|Department|Salary|RunningTotal|
+------------+----------+------+------------+
|       Maria|   Finance|  3000|        3000|
|       James|     Sales|  3000|        3000|
|      Robert|     Sales|  4100|        7100|
|      Michel|     Sales|  4600|  

In [1]:
from google.colab import files
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession for this notebook
spark = SparkSession.builder.appName("DataManipulationColab").getOrCreate()
print("Spark Session berhasil dibuat!")

# Open Colab's upload dialog; the returned dict is keyed by uploaded file name
uploaded = files.upload()

# Fail with a clear message instead of an opaque IndexError when the dict is
# empty (presumably because the dialog was dismissed without choosing a file).
if not uploaded:
    raise RuntimeError("Tidak ada file yang diunggah. Jalankan ulang sel ini dan pilih sebuah file.")

# Use the first uploaded file name; any additional files are ignored
file_name = next(iter(uploaded))
print(f"File '{file_name}' berhasil diunggah.")

Spark Session berhasil dibuat!


Saving test.csv to test.csv
File 'test.csv' berhasil diunggah.


In [5]:
# Aggregates are accessed through `f` so that Python's built-in max()/min()
# are not replaced by `from pyspark.sql.functions import max, min`.
from pyspark.sql import functions as f

# Load the uploaded CSV into a PySpark DataFrame
# header=True: treat the first row as column names
# inferSchema=True: let Spark infer each column's data type
df = spark.read.csv(file_name, header=True, inferSchema=True)

print("\n=== 5 Baris Pertama dari DataFrame ===")
df.show(5)

# Count passengers by sex
print("\n=== Jumlah Penumpang Berdasarkan Jenis Kelamin ===")
gender_counts = df.groupBy("Sex").agg(f.count('PassengerId').alias('Jumlah Penumpang'))
gender_counts.show()

# Age distribution: basic statistics for the 'Age' column
print("\n=== Statistik Distribusi Umur ===")
age_stats = df.agg(
    f.avg('Age').alias('Rata-rata Umur'),
    f.max('Age').alias('Umur Tertinggi'),
    f.min('Age').alias('Umur Terendah')
)
age_stats.show()

# Highest and lowest ticket fare
print("\n=== Harga Tiket Tertinggi dan Terendah ===")
fare_stats = df.agg(
    f.max('Fare').alias('Harga Tiket Tertinggi'),
    f.min('Fare').alias('Harga Tiket Terendah')
)

fare_stats.show()

# Count passengers per ticket class
print("\n=== Jumlah Penumpang Berdasarkan Kelas ===")
class_counts = df.groupBy("Pclass").agg(f.count('PassengerId').alias('Jumlah Penumpang'))
class_counts.show()







=== 5 Baris Pertama dari DataFrame ===
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|        892|     3|    Kelly, Mr. James|  male|34.5|    0|    0| 330911| 7.8292| NULL|       Q|
|        893|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0| 363272|    7.0| NULL|       S|
|        894|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0| 240276| 9.6875| NULL|       Q|
|        895|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0| 315154| 8.6625| NULL|       S|
|        896|     3|Hirvonen, Mrs. Al...|female|22.0|    1|    1|3101298|12.2875| NULL|       S|
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 5 rows


=== Jumlah Penumpang Berdasarkan Jenis Kelamin ===
+------+--