In [1]:
import os
import sys
import warnings
import pandas as pd

# 1. KONFIGURASI PATH
os.environ['HADOOP_HOME'] = "C:\\hadoop"
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# 2. DEFINISI FLAG JAVA 21 (Manual String)
# Kita susun manual agar tidak ada koma/tanda kurung yang salah dibaca Windows
java_flags = (
    "--add-opens=java.base/java.lang=ALL-UNNAMED "
    "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED "
    "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED "
    "--add-opens=java.base/java.io=ALL-UNNAMED "
    "--add-opens=java.base/java.net=ALL-UNNAMED "
    "--add-opens=java.base/java.nio=ALL-UNNAMED "
    "--add-opens=java.base/java.util=ALL-UNNAMED "
    "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED "
    "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED "
    "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED "
    "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED "
    "--add-opens=java.base/sun.security.action=ALL-UNNAMED "
    "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED "
    "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED "
    "--add-opens=java.base/sun.misc=ALL-UNNAMED "
    "--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED "
    "-Dio.netty.tryReflectionSetAccessible=true"
)

# 3. BAJAK PELUNCUR SPARK (THE NUCLEAR OPTION)
# Kita paksa Spark menggunakan arguments ini saat start. 
# Kita gabungkan: Driver Options + Packages Delta + Mode Pyspark Shell
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-java-options "{java_flags}" --packages io.delta:delta-spark_2.12:3.2.0 pyspark-shell'

# Import Spark SETELAH set environment variable di atas
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, monotonically_increasing_id
from delta import *

# 4. BUILDER MINIMALIS
# Kita tidak perlu config aneh-aneh lagi karena sudah kita inject di atas
builder = SparkSession.builder \
    .appName("ProyekSparkWindows_Nuclear") \
    .master("local[*]") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")

# Start
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Bungkam Log
warnings.filterwarnings('ignore')
spark.sparkContext.setLogLevel("ERROR")

# Chapter 6: SQL Queries in Spark

In [2]:
# Membuat DataFrame Secara Manual (Creating DataFrame manually)

salary_data_with_id = [(1, "John", "Field-eng", 3500, 40), \
    (2, "Robert", "Sales", 4000, 38), \
    (3, "Maria", "Finance", 3500, 28), \
    (4, "Michael", "Sales", 3000, 20), \
    (5, "Kelly", "Finance", 3500, 35), \
    (6, "Kate", "Finance", 3000, 45), \
    (7, "Martin", "Finance", 3500, 26), \
    (8, "Kiran", "Sales", 2200, 35), \
  ]
columns= ["ID", "Employee", "Department", "Salary", "Age"]
salary_data_with_id = spark.createDataFrame(data = salary_data_with_id, schema = columns)
salary_data_with_id.show()


+---+--------+----------+------+---+
| ID|Employee|Department|Salary|Age|
+---+--------+----------+------+---+
|  1|    John| Field-eng|  3500| 40|
|  2|  Robert|     Sales|  4000| 38|
|  3|   Maria|   Finance|  3500| 28|
|  4| Michael|     Sales|  3000| 20|
|  5|   Kelly|   Finance|  3500| 35|
|  6|    Kate|   Finance|  3000| 45|
|  7|  Martin|   Finance|  3500| 26|
|  8|   Kiran|     Sales|  2200| 35|
+---+--------+----------+------+---+



In [3]:
# Konteks Penyimpanan CSV (Saving and Reading CSV Context)

salary_data_with_id.write.format("csv").mode("overwrite").option("header", "true").save("salary_data.csv")


In [4]:
csv_data = spark.read.csv('salary_data.csv', header=True)

In [5]:
csv_data.show()

+---+--------+----------+------+---+
| ID|Employee|Department|Salary|Age|
+---+--------+----------+------+---+
|  1|    John| Field-eng|  3500| 40|
|  7|  Martin|   Finance|  3500| 26|
|  3|   Maria|   Finance|  3500| 28|
|  4| Michael|     Sales|  3000| 20|
|  5|   Kelly|   Finance|  3500| 35|
|  2|  Robert|     Sales|  4000| 38|
|  6|    Kate|   Finance|  3000| 45|
|  8|   Kiran|     Sales|  2200| 35|
+---+--------+----------+------+---+



In [6]:
# Melakukan Transformasi pada Data (Perform transformations on the loaded data)

# Perform transformations on the loaded data 
processed_data = csv_data.filter(csv_data["Salary"] > 3000) 
# Save the processed data as a table 
processed_data.createOrReplaceTempView("high_salary_employees") 
# Perform SQL queries on the saved table 
results = spark.sql("SELECT * FROM high_salary_employees ") 
results.show()


+---+--------+----------+------+---+
| ID|Employee|Department|Salary|Age|
+---+--------+----------+------+---+
|  1|    John| Field-eng|  3500| 40|
|  7|  Martin|   Finance|  3500| 26|
|  3|   Maria|   Finance|  3500| 28|
|  5|   Kelly|   Finance|  3500| 35|
|  2|  Robert|     Sales|  4000| 38|
+---+--------+----------+------+---+



In [7]:
# Memanfaatkan Spark SQL untuk memfilter dan memilih data (Utilizing Spark SQL to filter and select data based on specific criteria)

# Save the processed data as a view 
salary_data_with_id.createOrReplaceTempView("employees") 
#Apply filtering on data
filtered_data = spark.sql("SELECT Employee, Department, Salary, Age FROM employees WHERE age > 30") 
# Display the results 
filtered_data.show()


+--------+----------+------+---+
|Employee|Department|Salary|Age|
+--------+----------+------+---+
|    John| Field-eng|  3500| 40|
|  Robert|     Sales|  4000| 38|
|   Kelly|   Finance|  3500| 35|
|    Kate|   Finance|  3000| 45|
|   Kiran|     Sales|  2200| 35|
+--------+----------+------+---+



In [8]:
# Agregasi (Aggregation)

# Perform an aggregation to calculate the average salary 
average_salary = spark.sql("SELECT AVG(Salary) AS average_salary FROM employees") 
# Display the average salary 
average_salary.show() 


+--------------+
|average_salary|
+--------------+
|        3275.0|
+--------------+



In [9]:
# Pengurutan (Sorting)

# Sort the data based on the salary column in descending order 
sorted_data = spark.sql("SELECT * FROM employees ORDER BY Salary DESC") 
# Display the sorted data 
sorted_data.show() 


+---+--------+----------+------+---+
| ID|Employee|Department|Salary|Age|
+---+--------+----------+------+---+
|  2|  Robert|     Sales|  4000| 38|
|  3|   Maria|   Finance|  3500| 28|
|  1|    John| Field-eng|  3500| 40|
|  5|   Kelly|   Finance|  3500| 35|
|  7|  Martin|   Finance|  3500| 26|
|  4| Michael|     Sales|  3000| 20|
|  6|    Kate|   Finance|  3000| 45|
|  8|   Kiran|     Sales|  2200| 35|
+---+--------+----------+------+---+



In [10]:
# Menggabungkan agregasi (Combining aggregations)

# Sort the data based on the salary column in descending order 
filtered_data = spark.sql("SELECT Employee, Department, Salary, Age FROM employees WHERE age > 30 AND Salary > 3000 ORDER BY Salary DESC") 
# Display the results 
filtered_data.show()


+--------+----------+------+---+
|Employee|Department|Salary|Age|
+--------+----------+------+---+
|  Robert|     Sales|  4000| 38|
|   Kelly|   Finance|  3500| 35|
|    John| Field-eng|  3500| 40|
+--------+----------+------+---+



In [11]:
# Mengelompokkan data (Grouping data)

# Group the data based on the Department column and take average salary for each department  
grouped_data = spark.sql("SELECT Department, avg(Salary) FROM employees GROUP BY Department") 
# Display the results 
grouped_data.show()


+----------+------------------+
|Department|       avg(Salary)|
+----------+------------------+
| Field-eng|            3500.0|
|     Sales|3066.6666666666665|
|   Finance|            3375.0|
+----------+------------------+



In [12]:
# Mengagregasi data (Aggregating data)

# Perform grouping and multiple aggregations  
aggregated_data = spark.sql("SELECT Department, sum(Salary) AS total_salary, max(Salary) AS max_salary FROM employees GROUP BY Department") 

# Display the results 
aggregated_data.show()


+----------+------------+----------+
|Department|total_salary|max_salary|
+----------+------------+----------+
| Field-eng|        3500|      3500|
|     Sales|        9200|      4000|
|   Finance|       13500|      3500|
+----------+------------+----------+



In [13]:
# Menghitung jumlah kumulatif menggunakan fungsi jendela (Calculating cumulative sum using window functions)

from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum

# Define the window specification
window_spec = Window.partitionBy("Department").orderBy("Age")

# Calculate the cumulative sum using window function
df_with_cumulative_sum = salary_data_with_id.withColumn("cumulative_sum", sum(col("Salary")).over(window_spec))

# Display the result
df_with_cumulative_sum.show()


+---+--------+----------+------+---+--------------+
| ID|Employee|Department|Salary|Age|cumulative_sum|
+---+--------+----------+------+---+--------------+
|  1|    John| Field-eng|  3500| 40|          3500|
|  7|  Martin|   Finance|  3500| 26|          3500|
|  3|   Maria|   Finance|  3500| 28|          7000|
|  5|   Kelly|   Finance|  3500| 35|         10500|
|  6|    Kate|   Finance|  3000| 45|         13500|
|  4| Michael|     Sales|  3000| 20|          3000|
|  8|   Kiran|     Sales|  2200| 35|          5200|
|  2|  Robert|     Sales|  4000| 38|          9200|
+---+--------+----------+------+---+--------------+



In [14]:
# Menerapkan UDF ke DataFrame (Applying a UDF to a DataFrame)

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define a UDF to capitalize a string
capitalize_udf = udf(lambda x: x.upper(), StringType())

# Apply the UDF to a column
df_with_capitalized_names = salary_data_with_id.withColumn("capitalized_name", capitalize_udf("Employee"))

# Display the result
df_with_capitalized_names.show()


+---+--------+----------+------+---+----------------+
| ID|Employee|Department|Salary|Age|capitalized_name|
+---+--------+----------+------+---+----------------+
|  1|    John| Field-eng|  3500| 40|            JOHN|
|  2|  Robert|     Sales|  4000| 38|          ROBERT|
|  3|   Maria|   Finance|  3500| 28|           MARIA|
|  4| Michael|     Sales|  3000| 20|         MICHAEL|
|  5|   Kelly|   Finance|  3500| 35|           KELLY|
|  6|    Kate|   Finance|  3000| 45|            KATE|
|  7|  Martin|   Finance|  3500| 26|          MARTIN|
|  8|   Kiran|     Sales|  2200| 35|           KIRAN|
+---+--------+----------+------+---+----------------+



In [17]:
# import pandas as pd
# from pyspark.sql.functions import pandas_udf

# @pandas_udf('long')
# def pandas_plus_one(series: pd.Series) -> pd.Series:
#     # Simply plus one by using pandas Series.
#     return series + 1

# salary_data_with_id.select(pandas_plus_one(salary_data_with_id.Salary)).show()

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

@udf(returnType=LongType())
def plus_one(value):
    if value is not None:
        return value + 1
    return None

salary_data_with_id.select(plus_one(salary_data_with_id.Salary).alias("pandas_plus_one(Salary)")).show()

+-----------------------+
|pandas_plus_one(Salary)|
+-----------------------+
|                   3501|
|                   4001|
|                   3501|
|                   3001|
|                   3501|
|                   3001|
|                   3501|
|                   2201|
+-----------------------+



In [22]:
# @pandas_udf("integer")
# def add_one(s: pd.Series) -> pd.Series:
#     return s + 1

# spark.udf.register("add_one", add_one)
# spark.sql("SELECT add_one(Salary) FROM employees").show()

from pyspark.sql.types import IntegerType

salary_data_with_id.createOrReplaceTempView("employees")

def add_one_logic(value):
    if value is not None:
        return value + 1
    return None

spark.udf.register("add_one", add_one_logic, IntegerType())

spark.sql("SELECT add_one(Salary) FROM employees").show()

+---------------+
|add_one(Salary)|
+---------------+
|           3501|
|           4001|
|           3501|
|           3001|
|           3501|
|           3001|
|           3501|
|           2201|
+---------------+

