# Initial Set Up 🦬



In [None]:
####### Initial Set Up (Manual) #########
# Installing Java JDK on google colab
# JDK is needed because Spark is built on top of JVM. JDK provides neccesary tools to compile & run java codes and also to execute internal spark code.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download a prebuilt binary distribution of spark that compatible with hadoop
# Binary distribution -> Compiled code that can be executed directly by a computer's CPU
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

# Extract the tar file
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

# Install the findspark package. It's used to locate the spark installation, so that PySpark can be used.
!pip install -q findspark

# Set up the environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

# Import the libary
import findspark

# Initiate findspark
findspark.init()

# Check the location for Spark
findspark.find()

# Import SparkSession
from pyspark.sql import SparkSession

# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Check Spark Session Information
spark

In [1]:
####### Initial Set Up (Automatic) #########
# Install pyspark
!pip install pyspark
# Import SparkSession
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
conf = SparkConf().set('spark.ui.port', '4050')
sc = SparkContext(conf=conf)
# Create a Spark Session, sets up the local device as the master of the session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information
spark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=144838de5d634f5bf527e7cc568974af0599fb65227d602db41b31b56cdbd062
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


# Dataframes Exploration 🐲


In [None]:
####### Creating a dataframe from a CSV file (With a header) #########
# Pull in the SQL type classes that are used for schema definitions.
from pyspark.sql.types import *

# First read: default options, so the file's first line is NOT treated as column names.
df_citizen = spark.read.csv("/content/drive/MyDrive/SampleData/sampel_data_1.csv")
print("Headerless dataframe : ")
# show() prints the content of the dataframe as a text table.
df_citizen.show()

# Second read: the header option makes the first line of the file the column names.
df_citizen_header = (
    spark.read
    .option("header", True)
    .csv("/content/drive/MyDrive/SampleData/sampel_data_1.csv")
)
print("Dataframe with header :")
df_citizen_header.show()

print("Showing rows with name Andi :")
# Header names become columns that can be referenced on the dataframe.
# where() keeps only the rows that satisfy the condition; the comparison
# against "andi" is an exact, case-sensitive string match.
df_citizen_header.where(df_citizen_header["nama"] == "andi").show()

In [None]:
####### Creating a dataframe from a CSV file (With a predefined schema) #########
# Pull in the SQL type classes that are used for schema definitions.
from pyspark.sql.types import *

# Describe the expected columns: a string column 'nama' and an integer column 'umur',
# both declared as non-nullable.
schema_citizen = (
    StructType()
    .add('nama', StringType(), False)
    .add('umur', IntegerType(), False)
)

# Read the CSV with the explicit schema. The header option is not set here,
# so the first line of the file is read as a regular data row.
df_citizen = (
    spark.read
    .schema(schema_citizen)
    .csv("/content/drive/MyDrive/SampleData/sampel_data_1.csv")
)

# Print the content of the dataframe.
df_citizen.show()


In [None]:
####### Filtering the first row of a dataframe #########
from pyspark.sql.functions import monotonically_increasing_id

# Attach a generated "id" column to every row of df_citizen.
# The generated ids are increasing and unique, but not guaranteed to be consecutive.
df_citizen_index = df_citizen.withColumn("id", monotonically_increasing_id())

# Keep only rows whose id is greater than 0, i.e. drop the row that received id 0.
df_citizen_index.where(df_citizen_index["id"] > 0).show()

In [None]:
####### Dataframe operation with domain-specific language #########
# Builds a small in-memory dataframe of science scores and demonstrates
# withColumn(), take() and select().
from pyspark.sql.types import *

# Schema: student name, class, and integer score (all non-nullable).
schema_nilai_ipa = StructType([
    StructField("Nama", StringType(), False),
    StructField("Kelas", StringType(), False),
    StructField("Nilai", IntegerType(), False)
])
# One inner list per row, in schema order.
nilai_ipa = [
    ["Budi", "IX A", 90],
    ["Ando", "IX B", 50],
    ["Putu", "IX A", 70],
    ["Kadek", "IX C", 40]
]
df_nilai_ipa = spark.createDataFrame(nilai_ipa, schema=schema_nilai_ipa)
print("Original Data :")
df_nilai_ipa.show()

# Adding a new column. withColumn() returns a NEW dataframe (df_nilai_ipa is unchanged),
# and show() must actually be called -- a bare `.show` only references the method
# and prints nothing.
df_nilai_ipa.withColumn("Nilai Up", df_nilai_ipa["Nilai"] + 10).show()
# Select the first n rows from a dataframe. Returns a list of Row objects.
print(df_nilai_ipa.take(2))
# Selecting a specific field from a dataframe.
df_nilai_ipa.select(df_nilai_ipa.Kelas).show()

In [5]:
# Rows of (name, class, score); column names are supplied as a plain list below,
# so Spark infers the column types from the values.
nilai_ipa = [
    ["Budi", "IX A", 90],
    ["Ando", "IX B", 50],
    ["Putu", "IX A", 70],
    ["Kadek", "IX C", 40],
]
df_nilai_ipa = spark.createDataFrame(
    nilai_ipa,
    schema=["Name", "Class", "Score"],
)
print("Original Data :")
df_nilai_ipa.show()

# Derive a new dataframe with an extra column holding each score raised by 10.
# The source dataframe is left untouched; the result is bound to df_new.
df_new = df_nilai_ipa.withColumn("Nilai Up", df_nilai_ipa.Score + 10)
df_new.show()

Original Data :
+-----+-----+-----+
| Name|Class|Score|
+-----+-----+-----+
| Budi| IX A|   90|
| Ando| IX B|   50|
| Putu| IX A|   70|
|Kadek| IX C|   40|
+-----+-----+-----+

+-----+-----+-----+--------+
| Name|Class|Score|Nilai Up|
+-----+-----+-----+--------+
| Budi| IX A|   90|     100|
| Ando| IX B|   50|      60|
| Putu| IX A|   70|      80|
|Kadek| IX C|   40|      50|
+-----+-----+-----+--------+



In [7]:
# Import the Spark SQL type classes and column functions.
# NOTE(review): the wildcard import of pyspark.sql.functions rebinds names that also
# exist as Python builtins (e.g. sum, min, max, round) to their Spark column versions
# for the rest of the notebook session.
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Read the raw abbreviation data; the first line of the file supplies the column names.
df_singkatan = (
    spark.read
    .option("header", True)
    .csv("/content/drive/MyDrive/SampleData/dataset_singkatan_csv.csv")
)
# truncate=False prints full cell values instead of shortening long strings.
df_singkatan.show(truncate=False)

+---------+--------------------+
|singkatan|provinsi            |
+---------+--------------------+
|sumut    |Sumatra Utara       |
|sumbar   |Sumatra Barat       |
|kepri    |Kepulauan Riau      |
|sumsel   |Sumatra Selatan     |
|babel    |Bangka Belitung     |
|jabar    |Jawa Barat          |
|jateng   |Jawa Tengah         |
|diy      |DI Yogyakarta       |
|jatim    |Jawa Timur          |
|ntb      |Nusa Tenggara Barat |
|ntt      |Nusa Tenggara Tinggi|
|kalbar   |Kalimantan Barat    |
|kalteng  |Kalimantan Tengah   |
|kalsel   |Kalimantan Selatan  |
|kaltim   |Kalimantan Timur    |
|kaltara  |Kalimantan Utara    |
|sulbar   |Sulawesi Barat      |
|sulsel   |Sulawesi Selatan    |
|sultra   |Sulawesi Tenggara   |
|sulteng  |Sulawesi Tengah     |
+---------+--------------------+
only showing top 20 rows



In [8]:
####### Dataframe operation with SQL syntax #########
# Relies on the type classes (StructType, StructField, ...) and the `spark` session
# already being available from earlier cells.

# Schema: student name, class, and integer score (all non-nullable).
schema_nilai_ipa = (
    StructType()
    .add("Nama", StringType(), False)
    .add("Kelas", StringType(), False)
    .add("Nilai", IntegerType(), False)
)
# One inner list per row, in schema order.
nilai_ipa = [
    ["Budi", "IX A", 90],
    ["Putu", "IX A", 70],
    ["Ando", "IX B", 50],
    ["Kadek", "IX C", 40],
]
df_nilai_ipa = spark.createDataFrame(nilai_ipa, schema=schema_nilai_ipa)
print("Data original :")
df_nilai_ipa.show()

# Filtering the data (using the DataFrame API)
print("Menampilkan data siswa kelas IX A : ")
df_nilai_ipa.where(df_nilai_ipa.Kelas == "IX A").show()

# Filtering the data (using a SQL query)
# Step 1: register the dataframe as a temporary view (a virtual table limited to the
# current session). A view is not a physical table and holds no data of its own.
df_nilai_ipa.createOrReplaceTempView("tb_nilai_ipa")
# Step 2: query the view by name. sql() returns a dataframe,
# which is why show() can be called on the result.
print("Menampilkan data siswa kelas IX C : ")
spark.sql('SELECT * from tb_nilai_ipa WHERE Kelas = "IX C"').show()

Data original :
+-----+-----+-----+
| Nama|Kelas|Nilai|
+-----+-----+-----+
| Budi| IX A|   90|
| Putu| IX A|   70|
| Ando| IX B|   50|
|Kadek| IX C|   40|
+-----+-----+-----+

Menampilkan data siswa kelas IX A : 
+----+-----+-----+
|Nama|Kelas|Nilai|
+----+-----+-----+
|Budi| IX A|   90|
|Putu| IX A|   70|
+----+-----+-----+

Menampilkan data siswa kelas IX C : 
+-----+-----+-----+
| Nama|Kelas|Nilai|
+-----+-----+-----+
|Kadek| IX C|   40|
+-----+-----+-----+



In [None]:
# Self-contained version of the SQL-syntax example: it imports what it needs and
# obtains its own SparkSession.

# Import the required modules
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# Instantiate the SparkSession object (reuses the running session if there is one)
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Build the schema for the dataframe: name, class, integer score (all non-nullable)
schema_nilai_ipa = (
    StructType()
    .add("Nama", StringType(), False)
    .add("Kelas", StringType(), False)
    .add("Nilai", IntegerType(), False)
)

# Build the nested list that will be loaded into the dataframe
nilai_ipa = [
    ["Budi", "IX A", 90],
    ["Putu", "IX A", 70],
    ["Ando", "IX B", 50],
    ["Kadek", "IX C", 40],
]

# Create the dataframe with the createDataFrame() method
df_nilai_ipa = spark.createDataFrame(nilai_ipa, schema=schema_nilai_ipa)

# Display the dataframe content with the show() method
print("Data Original :")
df_nilai_ipa.show()

# Filtering the data (using the DataFrame API)
print("Menampilkan data siswa kelas IX A : ")
df_nilai_ipa.where(df_nilai_ipa.Kelas == "IX A").show()

# Filtering the data (using a SQL query)
# First, a temporary view has to be created to serve as the target of the SQL query.
df_nilai_ipa.createOrReplaceTempView("tb_nilai_ipa")
# Next, the sql() method runs the SQL query against the temporary view created above.
print("Menampilkan data siswa kelas IX C : ")
spark.sql('SELECT * from tb_nilai_ipa WHERE Kelas = "IX C"').show()

# RDDs Exploration 🐄

In [None]:
####### Creating an RDD #########
# Builds an RDD from a Python list and applies a first map() transformation.

# Setting up the configuration for SparkContext
from pyspark import SparkContext, SparkConf
spark_conf = (
    SparkConf()
    .setAppName("YourTest")
    .setMaster("local[*]")
)
# Reuses the running context if one exists; otherwise starts one with spark_conf.
sc = SparkContext.getOrCreate(spark_conf)
# Creating a list that contains the numbers from 1 to 100,000.
# range() excludes its stop value, so the stop must be 100001 to include 100,000.
num = list(range(1, 100001))
# Creating an RDD from a list using the sc.parallelize() method
num_rdd = sc.parallelize(num)

# Double every element; map() is lazy and only runs when an action is called.
num_rdd_1 = num_rdd.map(lambda x: x * 2)
# Checking whether the RDD was successfully created.
# take(10) brings back only the first ten elements instead of dumping the
# entire RDD into the notebook output the way collect() would.
num_rdd_1.take(10)

In [None]:
# Applying a mapping function to each element of num_rdd (defined in the previous cell):
# every element is multiplied by 4, producing a new RDD and leaving num_rdd unchanged.
quadrupled_num_rdd = num_rdd.map(lambda x: x * 4)
# Showing a sample of quadrupled_num_rdd. take(10) returns just the first ten elements,
# which keeps the notebook output small no matter how large the RDD is.
quadrupled_num_rdd.take(10)


[4, 8, 12, 16, 20, 24, 28, 32, 36]