In [None]:
# Example: build a small DataFrame and display it.
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-running session when there is one.
spark = (
    SparkSession.builder
    .appName('HandsOnPertemuan3')
    .getOrCreate()
)

# Column names first, then one (EmployeeName, Department, Salary) tuple per row.
columns = ['EmployeeName', 'Department', 'Salary']
data = [
    ('James', 'Sales', 3000),
    ('Michael', 'Sales', 4600),
    ('Robert', 'Sales', 4100),
    ('Maria', 'Finance', 3000),
]

# Only column names are supplied; the column types are left to Spark.
df = spark.createDataFrame(data=data, schema=columns)
df.show()

+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
|       Maria|   Finance|  3000|
+------------+----------+------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# 1. Create the SparkSession, the entry point for every Spark operation.
spark = (
    SparkSession.builder
    .appName('Big Data P 3')
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

print("SparkSession berhasil dibuat")
print(f"Spark Version: {spark.version}")

# 2. Build a DataFrame from a plain list of tuples.
columns = ['EmployeeName', 'Department', 'Salary']
data = [
    ('James', 'Sales', 3000),
    ('Michael', 'Sales', 4600),
    ('Robert', 'Sales', 4100),
    ('Maria', 'Finance', 3000),
    ('David', 'Finance', 3500),
    ('Sarah', 'Marketing', 4000),
]

# Only column names are passed as the schema; types are left to Spark.
df = spark.createDataFrame(data=data, schema=columns)

print("Data Employee yang dibuat:")
df.show()

# 3. Basic DataFrame inspection.
print("Informasi Dasar DataFrame:")

# Schema (column names, types, nullability)
print("Schema DataFrame:")
df.printSchema()

# Row count (triggers a Spark action) and column count
print(f"\nJumlah baris: {df.count()}")
print(f"Jumlah kolom: {len(df.columns)}")

# Column names
print(f"Nama kolom: {df.columns}")

# First three rows only
print("3 Baris pertama:")
df.show(n=3)

# Descriptive statistics as produced by describe()
print("Statistik Deskriptif:")
df.describe().show()

SparkSession berhasil dibuat
Spark Version: 3.5.1
Data Employee yang dibuat:
+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
|       Maria|   Finance|  3000|
|       David|   Finance|  3500|
|       Sarah| Marketing|  4000|
+------------+----------+------+

Informasi Dasar DataFrame:
Schema DataFrame:
root
 |-- EmployeeName: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: long (nullable = true)


Jumlah baris: 6
Jumlah kolom: 3
Nama kolom: ['EmployeeName', 'Department', 'Salary']
3 Baris pertama:
+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
+------------+----------+------+
only showing top 3 rows

Statistik Deskriptif:
+-------+------------+----------+-----

In [None]:
from pyspark.sql import functions as F

print("TRANSFORMASI DASAR DATAFRAMES")
print("="*50)

# 1. SELECT - keep only the named columns.
print("SELECT: Memilih kolom EmployeeName dan Salary")
df_selected = df.select('EmployeeName', 'Salary')
df_selected.show()

# 2. FILTER/WHERE - keep rows matching a condition.
print("FILTER: Employee dengan Salary > 3000")
df_filtered = df.filter(df['Salary'] > 3000)
# where() is an alias of filter():
# df_filtered = df.where(df['Salary'] > 3000)
df_filtered.show()

# Combine conditions with & ; each comparison needs its own parentheses
# because & binds tighter than > and == in Python.
print("Filter dengan multiple conditions:")
df_multiple_filter = df.filter((df['Salary'] > 3000) & (df['Department'] == 'Sales'))
df_multiple_filter.show()

# 3. GROUP BY with several aggregates per department.
print("GROUP BY: Rata-rata salary per department")
df_grouped = df.groupBy('Department').agg(
    F.avg('Salary').alias('Average_Salary'),
    F.max('Salary').alias('Max_Salary'),
    F.min('Salary').alias('Min_Salary'),
    F.count('EmployeeName').alias('Employee_Count')
)
df_grouped.show()

# 4. ORDER BY - sort by Salary, highest first.
print("ORDER BY: Mengurutkan berdasarkan Salary (descending)")
df_ordered = df.orderBy(df['Salary'].desc())
df_ordered.show()

# 5. DISTINCT - unique department values.
print("DISTINCT: Department yang unik")
df.select('Department').distinct().show()

# 6. Global aggregates.
print("Statistik Global:")
# All three aggregates are computed in ONE agg() call and fetched with a single
# action, instead of three separate agg(...).collect() round trips over the
# same data. An ungrouped agg() always yields exactly one row, so first() is
# safe to unpack.
total_salary, avg_salary, max_salary = df.agg(
    F.sum('Salary'),
    F.avg('Salary'),
    F.max('Salary'),
).first()

print(f"Total Salary: ${total_salary:,}")
print(f"Average Salary: ${avg_salary:,.2f}")
print(f"Maximum Salary: ${max_salary:,}")

TRANSFORMASI DASAR DATAFRAMES
SELECT: Memilih kolom EmployeeName dan Salary
+------------+------+
|EmployeeName|Salary|
+------------+------+
|       James|  3000|
|     Michael|  4600|
|      Robert|  4100|
|       Maria|  3000|
|       David|  3500|
|       Sarah|  4000|
+------------+------+

FILTER: Employee dengan Salary > 3000
+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
|       David|   Finance|  3500|
|       Sarah| Marketing|  4000|
+------------+----------+------+

Filter dengan multiple conditions:
+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
+------------+----------+------+

GROUP BY: Rata-rata salary per department
+----------+--------------+----------+----------+--------------+
|Department|Average_Salary|Max_Salary|Min_Salary|Employee_C

In [None]:
# Use the module alias instead of a wildcard import: `import *` from
# pyspark.sql.functions rebinds Python built-ins such as sum, max, min, abs and
# round to Spark column functions for every cell that runs afterwards.
from pyspark.sql import functions as F

print("BEKERJA DENGAN TIPE DATA KOMPLEKS")
print("="*50)

# 1. Add a new column with withColumn(): 10% of Salary.
print("Menambah kolom SalaryBonus (10% dari Salary)")
df_with_bonus = df.withColumn('SalaryBonus', df['Salary'] * 0.1)
df_with_bonus.show()

# 2. Derive a column from two existing columns.
print("Menambah kolom TotalCompensation")
df_with_total = df_with_bonus.withColumn(
    'TotalCompensation',
    F.col('Salary') + F.col('SalaryBonus')
)
df_with_total.show()

# 3. Conditional column with when()/otherwise(). Branches are checked in order:
#    Salary < 3500 -> 'Low', else Salary < 4500 -> 'Medium', else 'High'.
print("Menambah kolom SalaryCategory berdasarkan kondisi")
df_with_category = df.withColumn(
    'SalaryCategory',
    F.when(F.col('Salary') < 3500, 'Low')
    .when(F.col('Salary') < 4500, 'Medium')
    .otherwise('High')
)
df_with_category.show()

# 4. String functions: upper-case name, name length, first character
#    (substring positions are 1-based).
print("Manipulasi string pada nama employee")
df_string_ops = (
    df.withColumn('EmployeeName_Upper', F.upper(F.col('EmployeeName')))
      .withColumn('Name_Length', F.length(F.col('EmployeeName')))
      .withColumn('First_Letter', F.substring(F.col('EmployeeName'), 1, 1))
)
df_string_ops.show()

# 5. A column holding an array value.
print("Membuat kolom Skills (array)")
# Sample rows where the fourth field is a list of skills.
skills_data = [
    ('James', 'Sales', 3000, ['Communication', 'Negotiation']),
    ('Michael', 'Sales', 4600, ['Leadership', 'Communication', 'Analytics']),
    ('Robert', 'Sales', 4100, ['Communication', 'Customer Service']),
    ('Maria', 'Finance', 3000, ['Excel', 'Analysis', 'Reporting'])
]

skills_columns = ['EmployeeName', 'Department', 'Salary', 'Skills']
df_skills = spark.createDataFrame(skills_data, skills_columns)

print("Data dengan Skills:")
df_skills.show(truncate=False)

# 6. Array operations: element count and membership test.
print("Operasi pada array Skills")
df_array_ops = (
    df_skills.withColumn('Skills_Count', F.size(F.col('Skills')))
             .withColumn('Has_Communication', F.array_contains(F.col('Skills'), 'Communication'))
)
df_array_ops.show(truncate=False)

# 7. explode() emits one output row per array element.
print("Explode array Skills menjadi baris terpisah")
df_exploded = df_skills.select('EmployeeName', 'Department', F.explode('Skills').alias('Skill'))
df_exploded.show()

BEKERJA DENGAN TIPE DATA KOMPLEKS
Menambah kolom SalaryBonus (10% dari Salary)
+------------+----------+------+-----------+
|EmployeeName|Department|Salary|SalaryBonus|
+------------+----------+------+-----------+
|       James|     Sales|  3000|      300.0|
|     Michael|     Sales|  4600|      460.0|
|      Robert|     Sales|  4100|      410.0|
|       Maria|   Finance|  3000|      300.0|
|       David|   Finance|  3500|      350.0|
|       Sarah| Marketing|  4000|      400.0|
+------------+----------+------+-----------+

Menambah kolom TotalCompensation
+------------+----------+------+-----------+-----------------+
|EmployeeName|Department|Salary|SalaryBonus|TotalCompensation|
+------------+----------+------+-----------+-----------------+
|       James|     Sales|  3000|      300.0|           3300.0|
|     Michael|     Sales|  4600|      460.0|           5060.0|
|      Robert|     Sales|  4100|      410.0|           4510.0|
|       Maria|   Finance|  3000|      300.0|           3300

In [None]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

print("WINDOW FUNCTIONS - OPERASI LANJUTAN")
print("="*50)

# Larger sample set for the window-function demos; each row also carries a
# HireDate given as a 'YYYY-MM-DD' string.
extended_columns = ['EmployeeName', 'Department', 'Salary', 'HireDate']
extended_data = [
    ('James', 'Sales', 3000, '2023-01-01'),
    ('Michael', 'Sales', 4600, '2023-02-01'),
    ('Robert', 'Sales', 4100, '2023-01-15'),
    ('Maria', 'Finance', 3000, '2023-01-01'),
    ('David', 'Finance', 3500, '2023-03-01'),
    ('Sarah', 'Marketing', 4000, '2023-02-15'),
    ('John', 'Marketing', 3800, '2023-01-20'),
    ('Lisa', 'Finance', 4200, '2023-02-10'),
]
df_extended = spark.createDataFrame(data=extended_data, schema=extended_columns)

print("Data Employee Extended:")
df_extended.show()

# 1. Ranking inside each department, highest salary first.
print("RANKING: Peringkat salary dalam setiap department")
windowSpec_rank = (
    Window
    .partitionBy('Department')
    .orderBy(F.desc('Salary'))
)

df_ranked = (
    df_extended
    .withColumn('Salary_Rank', F.rank().over(windowSpec_rank))
    .withColumn('Dense_Rank', F.dense_rank().over(windowSpec_rank))
    .withColumn('Row_Number', F.row_number().over(windowSpec_rank))
)

df_ranked.select(
    'EmployeeName', 'Department', 'Salary',
    'Salary_Rank', 'Dense_Rank', 'Row_Number',
).show()

print("Penjelasan perbedaan ranking:")
print("- RANK: Memberikan ranking dengan gap jika ada tie")
print("- DENSE_RANK: Memberikan ranking tanpa gap jika ada tie")
print("- ROW_NUMBER: Memberikan nomor urut unik")

# 2. Running totals: frame spans from the partition start up to the current
#    row, with rows ordered by ascending Salary.
print("RUNNING TOTALS: Cumulative salary dalam department")
windowSpec_cumsum = (
    Window
    .partitionBy('Department')
    .orderBy('Salary')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df_running = (
    df_extended
    .withColumn('Running_Sum', F.sum('Salary').over(windowSpec_cumsum))
    .withColumn('Running_Avg', F.avg('Salary').over(windowSpec_cumsum))
    .withColumn('Running_Count', F.count('Salary').over(windowSpec_cumsum))
)

df_running.select(
    'EmployeeName', 'Department', 'Salary',
    'Running_Sum', 'Running_Avg', 'Running_Count',
).show()

# 3. lag()/lead(): the neighbouring row's Salary within the department
#    (ascending Salary order); null where no such neighbour exists.
print("LEAD/LAG: Membandingkan dengan baris sebelum/sesudah")
windowSpec_lead_lag = Window.partitionBy('Department').orderBy('Salary')

df_lead_lag = (
    df_extended
    .withColumn('Previous_Salary', F.lag('Salary', 1).over(windowSpec_lead_lag))
    .withColumn('Next_Salary', F.lead('Salary', 1).over(windowSpec_lead_lag))
    # Previous_Salary already holds lag(Salary, 1) over the same window.
    .withColumn('Salary_Diff_Prev', F.col('Salary') - F.col('Previous_Salary'))
)

df_lead_lag.select(
    'EmployeeName', 'Department', 'Salary',
    'Previous_Salary', 'Next_Salary', 'Salary_Diff_Prev',
).show()

# 4. Percent rank and per-department extremes.
print("PERSENTIL: Menghitung persentil dalam department")
# Unordered partition window: aggregates see the whole department.
windowSpec_percentile = Window.partitionBy('Department')

# NOTE(review): percent_rank() is evaluated over windowSpec_rank, which sorts
# Salary descending, so 0.0 marks the HIGHEST salary in a department - confirm
# that this direction is the intended one.
df_percentile = (
    df_extended
    .withColumn('Salary_Percentile', F.percent_rank().over(windowSpec_rank))
    .withColumn('Dept_Max_Salary', F.max('Salary').over(windowSpec_percentile))
    .withColumn('Dept_Min_Salary', F.min('Salary').over(windowSpec_percentile))
    # Dept_Max_Salary already holds max(Salary) over the department window.
    .withColumn('Salary_Pct_of_Max', (F.col('Salary') / F.col('Dept_Max_Salary')) * 100)
)

df_percentile.select(
    'EmployeeName', 'Department', 'Salary',
    'Salary_Percentile', 'Salary_Pct_of_Max',
).show()

# 5. Moving average over a 3-row frame: previous row, current row, next row.
print("MOVING AVERAGE: Rata-rata bergerak")
windowSpec_moving = (
    Window
    .partitionBy('Department')
    .orderBy('Salary')
    .rowsBetween(-1, 1)
)

df_moving = df_extended.withColumn('Moving_Avg_3', F.avg('Salary').over(windowSpec_moving))
df_moving.select('EmployeeName', 'Department', 'Salary', 'Moving_Avg_3').show()

WINDOW FUNCTIONS - OPERASI LANJUTAN
Data Employee Extended:
+------------+----------+------+----------+
|EmployeeName|Department|Salary|  HireDate|
+------------+----------+------+----------+
|       James|     Sales|  3000|2023-01-01|
|     Michael|     Sales|  4600|2023-02-01|
|      Robert|     Sales|  4100|2023-01-15|
|       Maria|   Finance|  3000|2023-01-01|
|       David|   Finance|  3500|2023-03-01|
|       Sarah| Marketing|  4000|2023-02-15|
|        John| Marketing|  3800|2023-01-20|
|        Lisa|   Finance|  4200|2023-02-10|
+------------+----------+------+----------+

RANKING: Peringkat salary dalam setiap department
+------------+----------+------+-----------+----------+----------+
|EmployeeName|Department|Salary|Salary_Rank|Dense_Rank|Row_Number|
+------------+----------+------+-----------+----------+----------+
|        Lisa|   Finance|  4200|          1|         1|         1|
|       David|   Finance|  3500|          2|         2|         2|
|       Maria|   Finance| 

T 5

In [21]:
# Upload a local file into the Colab runtime's working directory.
from google.colab import files
uploaded = files.upload()  # Pick lele.csv from the local machine when prompted


Saving lele.csv to lele.csv


In [22]:
!pip install pyspark
from pyspark.sql import SparkSession

# Membuat session Spark
spark = SparkSession.builder.appName("LeleAnalysis").getOrCreate()




In [23]:
# Load lele.csv into a PySpark DataFrame: the first line supplies the column
# names and the column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("lele.csv")
)

# Show the loaded rows.
df.show()


+----------+-----+-------+------------+
|   tanggal|berat|panjang|jumlah_pakan|
+----------+-----+-------+------------+
|2025-01-01|  100|     15|          20|
|2025-01-02|  120|     16|          22|
|2025-01-03|  140|     17|          25|
|2025-01-04|  160|     18|          28|
|2025-01-05|  185|     19|          30|
+----------+-----+-------+------------+



In [24]:
# Print the column names, types and nullability of the loaded DataFrame.
df.printSchema()


root
 |-- tanggal: date (nullable = true)
 |-- berat: integer (nullable = true)
 |-- panjang: integer (nullable = true)
 |-- jumlah_pakan: integer (nullable = true)



In [25]:
# groupBy() with no keys treats the whole DataFrame as one group, so this shows
# a single row holding the average of berat and the average of panjang.
df.groupBy().avg("berat", "panjang").show()


+----------+------------+
|avg(berat)|avg(panjang)|
+----------+------------+
|     141.0|        17.0|
+----------+------------+



In [26]:
# Reach Spark's max through the module alias: importing it by bare name would
# replace Python's built-in max() for every cell that runs after this one.
from pyspark.sql import functions as F

# Largest value in the berat column.
df.select(F.max("berat")).show()


+----------+
|max(berat)|
+----------+
|       185|
+----------+



In [27]:
from pyspark.sql.functions import col

# Add efisiensi_pakan = berat / jumlah_pakan. df is rebound to the widened
# DataFrame; withColumn replaces a same-named column, so re-running is safe.
df = df.withColumn(
    "efisiensi_pakan",
    col("berat") / col("jumlah_pakan"),
)
df.show()


+----------+-----+-------+------------+-----------------+
|   tanggal|berat|panjang|jumlah_pakan|  efisiensi_pakan|
+----------+-----+-------+------------+-----------------+
|2025-01-01|  100|     15|          20|              5.0|
|2025-01-02|  120|     16|          22|5.454545454545454|
|2025-01-03|  140|     17|          25|              5.6|
|2025-01-04|  160|     18|          28|5.714285714285714|
|2025-01-05|  185|     19|          30|6.166666666666667|
+----------+-----+-------+------------+-----------------+



In [28]:
# Spark writes a DIRECTORY named hasil_lele.csv containing part-*.csv files,
# not a single file. The default save mode raises if that path already exists,
# which made this cell fail on every re-run; overwrite keeps it repeatable.
df.write.csv("hasil_lele.csv", header=True, mode="overwrite")
