Install OpenJDK, because Spark is written in Scala and runs on the Java Virtual Machine (JVM).

In [1]:
# OpenJDK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Download the Spark build that is bundled with Hadoop, then extract the archive.

In [2]:
# Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
!tar xf spark-3.2.1-bin-hadoop2.7.tgz

Set the `JAVA_HOME` and `SPARK_HOME` environment variables.

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# SPARK_HOME must be the Spark root directory, not its bin/ subfolder
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"
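
As an optional sanity check, the sketch below verifies that each variable points to a directory containing a `bin` subfolder, which is where the `java` and `spark-submit` launchers normally live. The helper name `check_home` is hypothetical and introduced only for illustration; it is not part of Spark or Findspark.

```python
import os

def check_home(var_name, required_subdir):
    """Return a list of problems found for the given environment variable."""
    value = os.environ.get(var_name)
    if value is None:
        return [f"{var_name} is not set"]
    if not os.path.isdir(value):
        return [f"{var_name} does not point to a directory: {value}"]
    if not os.path.isdir(os.path.join(value, required_subdir)):
        return [f"{var_name} has no '{required_subdir}' subdirectory: {value}"]
    return []

# Report anything suspicious; no output means both variables look fine.
for name in ("JAVA_HOME", "SPARK_HOME"):
    for problem in check_home(name, "bin"):
        print(problem)
```

A value that already ends in `bin` would be reported, because the check looks for a `bin` folder inside the configured directory.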

Install the **Findspark** library, which locates the Spark installation on the system and makes `pyspark` importable like a regular library.

In [4]:
!pip install -q findspark
import findspark
findspark.init("/content/spark-3.2.1-bin-hadoop2.7/", edit_rc=True)

Import **SparkSession** from `pyspark.sql` and create a SparkSession.

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

Run `spark` to display information about the Spark session.

In [6]:
spark

Create the `emp` and `dept` data together with the column names that serve as their schemas.

In [7]:
emp = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1),
]

empColumns = ["emp_id", "name", "superior_emp_id", "year_joined",
              "emp_dept_id", "gender", "salary"]

dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]

deptColumns = ["dept name", "dept id"]

Load the data and the schemas into Spark DataFrames.

In [8]:
emp_df = spark.createDataFrame(data=emp, schema=empColumns)
dept_df = spark.createDataFrame(data=dept, schema=deptColumns)

Display the `emp` data.

In [9]:
emp_df.show()

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     5|   Brown|              2|       2010|         40|      |    -1|
|     6|   Brown|              2|       2010|         50|      |    -1|
+------+--------+---------------+-----------+-----------+------+------+



Display the `dept` data.

In [10]:
dept_df.show()

+---------+-------+
|dept name|dept id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



The `emp` table has an `emp_dept_id` column that refers to the `dept` table. The column names in `dept` contain spaces, however, so rename them first to allow attribute-style references such as `new_dept_df.dept_id` in the join.

In [11]:
# rename the columns
new_dept_df = dept_df.withColumnRenamed('dept name', 'dept_name').withColumnRenamed('dept id', 'dept_id')
new_dept_df.printSchema()

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)
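
When a table has many columns, chaining `withColumnRenamed` calls becomes tedious. A minimal sketch of an alternative is to compute all the new names at once; the helper `snake_case_columns` below is a hypothetical name used only for illustration.

```python
def snake_case_columns(columns):
    """Replace runs of whitespace in each column name with a single underscore."""
    return ["_".join(name.split()) for name in columns]

print(snake_case_columns(["dept name", "dept id"]))  # ['dept_name', 'dept_id']
```

With a Spark DataFrame, the resulting list can be applied in one step through `toDF`, for example `dept_df.toDF(*snake_case_columns(dept_df.columns))`.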



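Join the two tables where `new_dept_df.dept_id` equals `emp_df.emp_dept_id`. The `emp_dept_id` values are strings while `dept_id` is a long; the join still matches because Spark casts the values implicitly when comparing them.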

In [12]:
# Inner join
joined_df = emp_df.join(new_dept_df, emp_df.emp_dept_id == new_dept_df.dept_id, "inner")
joined_df.show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
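
The result has five rows even though `emp` has six, and the `Sales` department does not appear at all. An inner join keeps only rows with a match on both sides. The plain-Python sketch below reproduces that behaviour on the same sample data, reduced to the columns relevant to the join; it only illustrates the semantics and says nothing about how Spark executes the join internally.

```python
# Same sample rows as above: (emp_id, name, emp_dept_id) and (dept_name, dept_id).
emp = [(1, "Smith", "10"), (2, "Rose", "20"), (3, "Williams", "10"),
       (4, "Jones", "10"), (5, "Brown", "40"), (6, "Brown", "50")]
dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40)]

# Lookup from department id to department name.
dept_by_id = {dept_id: name for name, dept_id in dept}

# Inner join: keep an employee only when the department id has a match.
inner = [(emp_id, name, dept_by_id[int(d)])
         for emp_id, name, d in emp if int(d) in dept_by_id]

# Rows that the inner join drops on each side.
unmatched_emp = [emp_id for emp_id, _, d in emp if int(d) not in dept_by_id]
used_dept_ids = {int(d) for _, _, d in emp}
unmatched_dept = [name for name, dept_id in dept if dept_id not in used_dept_ids]

print(len(inner), unmatched_emp, unmatched_dept)  # 5 [6] ['Sales']
```

Employee 6 refers to department 50, which does not exist in `dept`, and no employee refers to department 30 (`Sales`), so both are absent from the joined table.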



List the employees who work in the `Finance` department, and show the highest salary (`max salary`) among the employees in `Finance`, using a `pyspark` DataFrame.

Method 1: Using a SQL query

In [13]:
joined_df.createOrReplaceTempView("join_table")  # Create a new temporary view

result1 = spark.sql("SELECT name FROM join_table WHERE dept_name='Finance'")
print("== Employees who work in the Finance department ==")
result1.show()

== Employees who work in the Finance department ==
+--------+
|    name|
+--------+
|   Smith|
|Williams|
|   Jones|
+--------+



In [14]:
result2 = spark.sql("SELECT name, salary FROM join_table WHERE dept_name='Finance' AND salary = (SELECT max(salary) FROM join_table WHERE dept_name='Finance')")
print("== Employees in the Finance department with the highest salary ==")
result2.show()

== Employees in the Finance department with the highest salary ==
+-----+------+
| name|salary|
+-----+------+
|Smith|  3000|
+-----+------+



Method 2: Using the Spark DataFrame API

In [15]:
# Import Spark's max function (note: this shadows Python's built-in max)
from pyspark.sql.functions import max

In [16]:
result3 = joined_df.filter(joined_df.dept_name == 'Finance').select("name")
result3.show()

+--------+
|    name|
+--------+
|   Smith|
|Williams|
|   Jones|
+--------+



In [17]:
# Get the highest salary in the Finance department
max_salary_in_finance = joined_df.filter("dept_name == 'Finance'").select(max("salary")).collect()[0]['max(salary)']
# Show it together with the employee name
result4 = joined_df.filter((joined_df.dept_name == "Finance") & (joined_df.salary == max_salary_in_finance)).select("name", "salary")
result4.show()

+-----+------+
| name|salary|
+-----+------+
|Smith|  3000|
+-----+------+
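
Both methods can be cross-checked against the raw tuples in plain Python. The sketch below recomputes the Finance employees and the top salary without Spark. It assumes department id `"10"` is Finance, as in the `dept` data above, and it calls `builtins.max` explicitly because the notebook's `from pyspark.sql.functions import max` replaces the built-in name.

```python
import builtins

emp = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1),
]
FINANCE_DEPT_ID = "10"  # assumption: matches ("Finance", 10) in dept

# (name, salary) pairs for employees in Finance; row[4] is emp_dept_id.
finance = [(row[1], row[6]) for row in emp if row[4] == FINANCE_DEPT_ID]

# Highest salary in Finance, and everyone who earns it.
top_salary = builtins.max(salary for _, salary in finance)
top_earners = [name for name, salary in finance if salary == top_salary]

print(finance)                  # [('Smith', 3000), ('Williams', 1000), ('Jones', 2000)]
print(top_earners, top_salary)  # ['Smith'] 3000
```

The output agrees with the tables produced by the SQL query and by the DataFrame API.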

