In [2]:
from pyspark.sql import SparkSession

# Start (or reuse) the SparkSession used by this notebook
spark = (
    SparkSession.builder
    .appName("Simple DataFrame")
    .getOrCreate()
)


In [8]:
import findspark

# Locate the local Spark installation and expose it to this Python process.
# The previous bare `findspark.find()` call was dropped: its return value was
# discarded, so it only repeated the lookup that init() already performs.
findspark.init()

from pyspark.sql import SparkSession

# Build the session, or get back the one already running in this kernel
spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()

# Four sample employees as (EmployeeName, Department, Salary) rows
data = spark.createDataFrame(
    [
        ('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
    ],
    ['EmployeeName', 'Department', 'Salary'],
)

# Print the DataFrame as a text table
data.show()

+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
|       Maria|   Finance|  3000|
+------------+----------+------+



In [6]:
# Print the column names, types and nullability of `df`.
# NOTE(review): no cell above this one assigns `df`, and the recorded output
# (Name: string, Age: long) does not match any DataFrame built in the visible
# cells -- presumably `df` came from a cell that was deleted or run out of
# order. TODO confirm and define `df` before this cell so Run All works.
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



In [12]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
# Import the functions module under a namespace instead of
# `from pyspark.sql.functions import mean, max, sum`: that form rebinds the
# names `max` and `sum` and hides Python's built-ins for the rest of the
# kernel session.
from pyspark.sql import functions as F

# Build the session, or get back the one already running in this kernel
spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()

# Four sample employees as (EmployeeName, Department, Salary) rows
data = spark.createDataFrame([('James', 'Sales', 3000),
                              ('Michael', 'Sales', 4600),
                              ('Robert', 'Sales', 4100),
                              ('Maria', 'Finance', 3000)],
                             ['EmployeeName', 'Department', 'Salary'])

# Show the full DataFrame
data.show()

# Keep only the rows whose Department is 'Sales'
sales_department = data.filter(data.Department == 'Sales')
print("Sales Department Employees:")
sales_department.show()

# Project two columns (EmployeeName and Salary)
selected_columns = data.select('EmployeeName', 'Salary')
print("Selected Columns (EmployeeName and Salary):")
selected_columns.show()

# One output row per Department with the mean, max and sum of Salary
aggregated_data = data.groupBy('Department').agg(
    F.mean('Salary').alias('AverageSalary'),
    F.max('Salary').alias('MaxSalary'),
    F.sum('Salary').alias('TotalSalary')
)

print("Aggregated Data by Department:")
aggregated_data.show()


+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
|       Maria|   Finance|  3000|
+------------+----------+------+

Sales Department Employees:
+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
+------------+----------+------+

Selected Columns (EmployeeName and Salary):
+------------+------+
|EmployeeName|Salary|
+------------+------+
|       James|  3000|
|     Michael|  4600|
|      Robert|  4100|
|       Maria|  3000|
+------------+------+

Aggregated Data by Department:
+----------+-------------+---------+-----------+
|Department|AverageSalary|MaxSalary|TotalSalary|
+----------+-------------+---------+-----------+
|     Sales|       3900.0|     4600|      11700|
|   Finance|       3000.0|     3000|       3000|
+----------+-------------+---------+-----------+

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Build the session, or get back the one already running in this kernel
spark = SparkSession.builder.appName("ComplexTypes").getOrCreate()

# Each row is (ID, (FirstName, LastName)); the inner tuple fills the struct
data = [
    (1, ("James", "Smith")),
    (2, ("Anna", "Rose")),
    (3, ("Robert", "Williams")),
]

# Schema built field by field: an integer ID plus a nested Name struct,
# every field nullable
schema = (
    StructType()
    .add("ID", IntegerType(), True)
    .add(
        "Name",
        StructType()
        .add("FirstName", StringType(), True)
        .add("LastName", StringType(), True),
        True,
    )
)

df = spark.createDataFrame(data, schema)
df.show(truncate=False)

# Pull the nested struct fields out with dotted paths
df.select(["Name.FirstName", "Name.LastName"]).show()


+---+------------------+
|ID |Name              |
+---+------------------+
|1  |{James, Smith}    |
|2  |{Anna, Rose}      |
|3  |{Robert, Williams}|
+---+------------------+

+---------+--------+
|FirstName|LastName|
+---------+--------+
|    James|   Smith|
|     Anna|    Rose|
|   Robert|Williams|
+---------+--------+



In [15]:
# Every name this cell uses is imported here, so the cell no longer depends on
# StructType / StructField / IntegerType left behind by an earlier cell, and
# `explode` is imported up front rather than halfway through the cell.
from pyspark.sql.functions import explode
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Each row is (ID, list of fruit names); the lists have different lengths
data = [(1, ["apple", "banana", "cherry"]),
        (2, ["orange", "grape"]),
        (3, ["watermelon"])]

# Integer ID plus an array-of-strings column, both nullable
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Fruits", ArrayType(StringType()), True)
])

df = spark.createDataFrame(data, schema)
df.show(truncate=False)

# Explode the array: one output row per array element, ID repeated
df.select("ID", explode("Fruits").alias("Fruit")).show()


+---+-----------------------+
|ID |Fruits                 |
+---+-----------------------+
|1  |[apple, banana, cherry]|
|2  |[orange, grape]        |
|3  |[watermelon]           |
+---+-----------------------+

+---+----------+
| ID|     Fruit|
+---+----------+
|  1|     apple|
|  1|    banana|
|  1|    cherry|
|  2|    orange|
|  2|     grape|
|  3|watermelon|
+---+----------+



In [16]:
# Every type this cell uses is imported here, so the cell no longer depends on
# StructType / StructField / IntegerType left behind by an earlier cell.
from pyspark.sql.types import (
    IntegerType,
    MapType,
    StringType,
    StructField,
    StructType,
)

# Each row is (ID, dict of fruit name -> colour)
data = [(1, {"apple": "red", "banana": "yellow"}),
        (2, {"orange": "orange", "grape": "purple"})]

# Integer ID plus a string -> string map column, both nullable
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("FruitColors", MapType(StringType(), StringType()), True)
])

df = spark.createDataFrame(data, schema)
df.show(truncate=False)

# Look up map values by key with dotted paths; rows whose map lacks the key
# show NULL (see row 2 in the recorded output)
df.select("ID", "FruitColors.apple", "FruitColors.banana").show()


+---+-----------------------------------+
|ID |FruitColors                        |
+---+-----------------------------------+
|1  |{banana -> yellow, apple -> red}   |
|2  |{orange -> orange, grape -> purple}|
+---+-----------------------------------+

+---+-----+------+
| ID|apple|banana|
+---+-----+------+
|  1|  red|yellow|
|  2| NULL|  NULL|
+---+-----+------+



In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum, rank

# Build the session, or get back the one already running in this kernel
spark = SparkSession.builder.appName("WindowFunctions").getOrCreate()

# Column names, in the same order as the tuple fields below
columns = ["EmployeeName", "Department", "Salary"]

# Sample salary records; "James" appears twice in Sales with different salaries
data = [
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 2000),
    ("Scott", "Finance", 3300),
    ("Jen", "Marketing", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
]

# Build the DataFrame and print it
df = spark.createDataFrame(data, schema=columns)
df.show()


+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
|       Maria|   Finance|  3000|
|       James|     Sales|  2000|
|       Scott|   Finance|  3300|
|         Jen| Marketing|  3900|
|        Jeff| Marketing|  3000|
|       Kumar| Marketing|  2000|
+------------+----------+------+



In [18]:
from pyspark.sql.functions import sum

# Window frame: per Department, ordered by Salary, spanning every row from the
# start of the partition up to and including the current row
windowSpec = (
    Window.partitionBy("Department")
    .orderBy("Salary")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Sum of Salary over that frame, i.e. a per-department running total
running_total = sum(col("Salary")).over(windowSpec)
df.withColumn("RunningTotal", running_total).show()


+------------+----------+------+------------+
|EmployeeName|Department|Salary|RunningTotal|
+------------+----------+------+------------+
|       Maria|   Finance|  3000|        3000|
|       Scott|   Finance|  3300|        6300|
|       Kumar| Marketing|  2000|        2000|
|        Jeff| Marketing|  3000|        5000|
|         Jen| Marketing|  3900|        8900|
|       James|     Sales|  2000|        2000|
|       James|     Sales|  3000|        5000|
|      Robert|     Sales|  4100|        9100|
|     Michael|     Sales|  4600|       13700|
+------------+----------+------+------------+



In [19]:
from pyspark.sql.functions import rank

# Ranking window: per Department, highest Salary first
salary_desc = col("Salary").desc()
windowSpecRank = Window.partitionBy("Department").orderBy(salary_desc)

# Attach each row's rank within its department
rank_in_department = rank().over(windowSpecRank)
df.withColumn("Rank", rank_in_department).show()


+------------+----------+------+----+
|EmployeeName|Department|Salary|Rank|
+------------+----------+------+----+
|       Scott|   Finance|  3300|   1|
|       Maria|   Finance|  3000|   2|
|         Jen| Marketing|  3900|   1|
|        Jeff| Marketing|  3000|   2|
|       Kumar| Marketing|  2000|   3|
|     Michael|     Sales|  4600|   1|
|      Robert|     Sales|  4100|   2|
|       James|     Sales|  3000|   3|
|       James|     Sales|  2000|   4|
+------------+----------+------+----+



In [20]:
# Add both window columns to the same DataFrame: the running total over
# windowSpec first, then the rank over windowSpecRank
(
    df
    .withColumn("RunningTotal", sum(col("Salary")).over(windowSpec))
    .withColumn("Rank", rank().over(windowSpecRank))
    .show()
)


+------------+----------+------+------------+----+
|EmployeeName|Department|Salary|RunningTotal|Rank|
+------------+----------+------+------------+----+
|       Scott|   Finance|  3300|        6300|   1|
|       Maria|   Finance|  3000|        3000|   2|
|         Jen| Marketing|  3900|        8900|   1|
|        Jeff| Marketing|  3000|        5000|   2|
|       Kumar| Marketing|  2000|        2000|   3|
|     Michael|     Sales|  4600|       13700|   1|
|      Robert|     Sales|  4100|        9100|   2|
|       James|     Sales|  3000|        5000|   3|
|       James|     Sales|  2000|        2000|   4|
+------------+----------+------+------------+----+

