In [None]:
pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Build the SparkSession used by every later cell, or reuse one that already
# exists. The config key/value below looks like a placeholder -- replace it
# with a real setting if one is needed.
spark = (
    SparkSession.builder
    .appName("MyApplication")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [None]:
# Three sample rows of (name, age, department) used by the basic examples.
data = [
    ("Alice", 34, "HR"),
    ("Bob", 45, "Tech"),
    ("Charlie", 29, "HR"),
]
df = spark.createDataFrame(data, schema=["name", "age", "department"])

# Alternative ways to build df, kept for reference (paths are placeholders):
#   CSV:     df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
#   Parquet: df = spark.read.parquet("path/to/file.parquet")
#   JSON:    df = spark.read.json("path/to/file.json")

In [None]:
# Print the rows, then the schema Spark inferred for them.
df.show()
df.printSchema()

# Keep only rows with age above 30: once with a Column expression and once
# with the same predicate written as a SQL string.
df.filter(df.age > 30).show()
df.filter("age > 30").show()

+-------+---+----------+
|   name|age|department|
+-------+---+----------+
|  Alice| 34|        HR|
|    Bob| 45|      Tech|
|Charlie| 29|        HR|
+-------+---+----------+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- department: string (nullable = true)

+-----+---+----------+
| name|age|department|
+-----+---+----------+
|Alice| 34|        HR|
|  Bob| 45|      Tech|
+-----+---+----------+

+-----+---+----------+
| name|age|department|
+-----+---+----------+
|Alice| 34|        HR|
|  Bob| 45|      Tech|
+-----+---+----------+



In [None]:
# Count rows. The value is printed explicitly: a notebook only echoes the last
# expression of a cell, and this call is followed by other statements, so a
# bare df.count() would compute the count and then drop it.
print(df.count())

# Distinct values of the age column
df.select("age").distinct().show()

# Sort by age, largest first
df.sort(df["age"].desc()).show()

# Limit to at most 5 rows
df.limit(5).show()

+---+
|age|
+---+
| 34|
| 29|
| 45|
+---+

+-------+---+----------+
|   name|age|department|
+-------+---+----------+
|    Bob| 45|      Tech|
|  Alice| 34|        HR|
|Charlie| 29|        HR|
+-------+---+----------+

+-------+---+----------+
|   name|age|department|
+-------+---+----------+
|  Alice| 34|        HR|
|    Bob| 45|      Tech|
|Charlie| 29|        HR|
+-------+---+----------+



Aggregation and Grouping

In [None]:
from pyspark.sql import functions as F

# Maximum, minimum and mean of the age column, returned as a single row.
df.select(
    F.max("age"),
    F.min("age"),
    F.avg("age"),
).show()


+--------+--------+--------+
|max(age)|min(age)|avg(age)|
+--------+--------+--------+
|      45|      29|    36.0|
+--------+--------+--------+



# Aggregation functions

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Aggregations over the whole DataFrame
df.select(F.max("age"), F.min("age"), F.avg("age")).show()

# GroupBy: row count per department, then the mean age per department.
# df only has the columns name, age and department, so the numeric examples
# use "age"; referencing a "salary" column would fail on this DataFrame.
df.groupBy("department").count().show()
df.groupBy("department").agg(F.avg("age").alias("avg_age")).show()

# Window functions: rank the rows of each department by ascending age.
window_spec = Window.partitionBy("department").orderBy("age")
df.withColumn("rank", F.rank().over(window_spec)).show()

# Joining DataFrames

In [None]:
# NOTE(review): employees, departments, df1 and df2 are not created in the
# cells visible in this notebook -- define them before running this cell.

# Inner join on a key column that both sides share
employees.join(departments, on="department_id").show()

# Left outer join on the same key
employees.join(departments, on="department_id", how="left").show()

# Key columns that are named differently on each side
employees.join(
    departments,
    on=employees["dept_id"] == departments["id"],
).show()

# Multiple join conditions, combined with &
df1.join(
    df2,
    on=(df1["id"] == df2["id"]) & (df1["type"] == df2["type"]),
).show()

In [None]:
# Register DataFrame as a temporary view
df.createOrReplaceTempView("people")

# Run SQL queries against the view
result = spark.sql("SELECT * FROM people WHERE age > 30")
result.show()

# Register as a global temporary view. The "OrReplace" variant keeps the cell
# re-runnable: createGlobalTempView raises when a view named "people" has
# already been registered, which happens on the second execution of this cell.
df.createOrReplaceGlobalTempView("people")

# Access global temporary view through the global_temp database
spark.sql("SELECT * FROM global_temp.people").show()

+-----+---+----------+
| name|age|department|
+-----+---+----------+
|Alice| 34|        HR|
|  Bob| 45|      Tech|
+-----+---+----------+

+-------+---+----------+
|   name|age|department|
+-------+---+----------+
|  Alice| 34|        HR|
|    Bob| 45|      Tech|
|Charlie| 29|        HR|
+-------+---+----------+



# Data Sources and Sinks

In [None]:
# Each read below rebinds df. Paths and connection settings look like
# placeholders and need to be replaced with real locations.

# CSV, through the generic format/load API
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("path/to/file.csv")
)

# JSON
df = spark.read.json("path/to/file.json")

# Parquet
df = spark.read.parquet("path/to/directory")

# ORC
df = spark.read.orc("path/to/file.orc")

# JDBC
# NOTE(review): user and password are inline literals; supply real credentials
# from the environment or a secrets store instead of committing them.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql:dbserver")
    .option("dbtable", "schema.table")
    .option("user", "username")
    .option("password", "password")
    .load()
)

In [None]:
# No explicit format: the default is used (Parquet)
df.write.save("path/to/output")

# Specific format, replacing whatever is already at the target path
(
    df.write.format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save("path/to/output")
)

# Available modes: append, overwrite, ignore, error

# Partitioning the output by column values
# NOTE(review): assumes df has "year" and "month" columns -- confirm.
(
    df.write.partitionBy("year", "month")
    .format("parquet")
    .save("path/to/output")
)

# Structured Streaming

In [None]:
# Create streaming DataFrame from socket source.
# Something must already be listening on localhost:9999; the recorded run of
# this cell ended with "Connection refused".
streaming_df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

# Process data: split each line on spaces into words and count per word
word_counts = (
    streaming_df
    .selectExpr("explode(split(value, ' ')) as word")
    .groupBy("word")
    .count()
)

# Start the query, writing the complete result table to the console
query = (
    word_counts.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Wait for the query to terminate (this call blocks the cell)
query.awaitTermination()

StreamingQueryException: [STREAM_FAILED] Query [id = 295882ce-5408-4e08-94a7-7104e54b294e, runId = 5d8ef8e9-3304-4e36-bef7-d063485c8dc4] terminated with exception: Connection refused (Connection refused)

In [None]:
# Employee sample data and schema.
# Each record is [employee_id, department_id, name, age, gender, salary,
# hire_date]; every value, including the numeric-looking ones, is a string.

# First batch: employees 001-010
emp_data_1 = [
    ["001", "101", "John Doe", "30", "Male", "50000", "2015-01-01"],
    ["002", "101", "Jane Smith", "25", "Female", "45000", "2016-02-15"],
    ["003", "102", "Bob Brown", "35", "Male", "55000", "2014-05-01"],
    ["004", "102", "Alice Lee", "28", "Female", "48000", "2017-09-30"],
    ["005", "103", "Jack Chan", "40", "Male", "60000", "2013-04-01"],
    ["006", "103", "Jill Wong", "32", "Female", "52000", "2018-07-01"],
    ["007", "101", "James Johnson", "42", "Male", "70000", "2012-03-15"],
    ["008", "102", "Kate Kim", "29", "Female", "51000", "2019-10-01"],
    ["009", "103", "Tom Tan", "33", "Male", "58000", "2016-06-01"],
    ["010", "104", "Lisa Lee", "27", "Female", "47000", "2018-08-01"],
]

# Second batch: employees 011-020 (record 018 has an empty gender value)
emp_data_2 = [
    ["011", "104", "David Park", "38", "Male", "65000", "2015-11-01"],
    ["012", "105", "Susan Chen", "31", "Female", "54000", "2017-02-15"],
    ["013", "106", "Brian Kim", "45", "Male", "75000", "2011-07-01"],
    ["014", "107", "Emily Lee", "26", "Female", "46000", "2019-01-01"],
    ["015", "106", "Michael Lee", "37", "Male", "63000", "2014-09-30"],
    ["016", "107", "Kelly Zhang", "30", "Female", "49000", "2018-04-01"],
    ["017", "105", "George Wang", "34", "Male", "57000", "2016-03-15"],
    ["018", "104", "Nancy Liu", "29", "", "50000", "2017-06-01"],
    ["019", "103", "Steven Chen", "36", "Male", "62000", "2015-08-01"],
    ["020", "102", "Grace Kim", "32", "Female", "53000", "2018-11-01"],
]

# DDL-style schema string: every column is declared as string.
emp_schema = ", ".join(
    f"{column} string"
    for column in (
        "employee_id",
        "department_id",
        "name",
        "age",
        "gender",
        "salary",
        "hire_date",
    )
)

In [None]:
# Turn each batch of employee records into a DataFrame with the shared schema.
emp_df_1 = spark.createDataFrame(emp_data_1, emp_schema)
emp_df_2 = spark.createDataFrame(emp_data_2, emp_schema)


In [None]:
# Show both employee DataFrames (show() is an ACTION, so it triggers execution)
for emp_batch in (emp_df_1, emp_df_2):
    emp_batch.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
+-----------+-------------+-------------+---+------+------+----------+

+----

In [None]:
# Print emp_df_1's schema; every column was declared as string in emp_schema.
emp_df_1.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- hire_date: string (nullable = true)



In [None]:
# UNION ALL: stack the rows of emp_df_2 under those of emp_df_1. Columns are
# matched by position and duplicate rows are kept (add .distinct() for UNION).
# select * from emp_df_1 UNION ALL select * from emp_df_2
emp = emp_df_1.union(emp_df_2)

In [None]:
# Print the combined employee rows built from emp_df_1 and emp_df_2.
emp.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [None]:
from pyspark.sql.functions import desc, asc, col

# salary is declared as a string in emp_schema, so the sort key casts it to
# int. Ordering the raw strings compares them character by character, which is
# only right while every salary has the same number of digits. The cast is
# applied to the ordering expression only; emp_sorted keeps emp's columns.
emp_sorted = emp.orderBy(col("salary").cast("int").asc())

In [None]:
# Print the employees ordered by ascending salary.
emp_sorted.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        014|          107|    Emily Lee| 26|Female| 46000|2019-01-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        016|          107|  Kelly Zhang| 30|Female| 49000|2018-04-01|
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        018|          104|    Nancy Liu| 29|      | 50000|2017-06-01|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|
|        012|          105|   Susan Chen| 31|Female| 54000|2017-02-15|
|     

In [None]:
# Aggregation
# select department_id, count(employee_id) as total_dept_count from emp_sorted group by department_id
from pyspark.sql.functions import count

# The alias belongs on the aggregate Column, inside agg(), so the output column
# is named total_dept_count. Calling .alias() on the DataFrame returned by
# agg() leaves the column named count(employee_id).
# Note: this rebinds emp from the employee rows to the per-department counts.
emp = emp_sorted.groupBy("department_id").agg(count("employee_id").alias("total_dept_count"))

In [None]:
# emp now holds the per-department employee counts from the preceding cell.
emp.show()

+-------------+------------------+
|department_id|count(employee_id)|
+-------------+------------------+
|          101|                 3|
|          102|                 4|
|          103|                 4|
|          104|                 3|
|          107|                 2|
|          106|                 2|
|          105|                 2|
+-------------+------------------+



In [None]:
# Aggregation
# select department_id, sum(salary) as total_dept_salary from emp_sorted group by department_id
# The functions module is imported under the F namespace rather than importing
# `sum` by name, which would replace Python's built-in sum() in every later cell.
from pyspark.sql import functions as F

# salary is a string column; the recorded totals are floating-point values.
emp_sum = emp_sorted.groupBy("department_id").agg(F.sum("salary").alias("total_dept_salary"))

In [None]:
# Print the total salary per department.
emp_sum.show()

+-------------+-----------------+
|department_id|total_dept_salary|
+-------------+-----------------+
|          101|         165000.0|
|          107|          95000.0|
|          104|         162000.0|
|          102|         207000.0|
|          103|         232000.0|
|          106|         138000.0|
|          105|         111000.0|
+-------------+-----------------+



In [None]:
# Aggregation with a HAVING-style filter applied after the grouping
# select department_id, avg(salary) as avg_dept_salary from emp_sorted group by department_id having avg(salary) > 50000
from pyspark.sql.functions import avg

emp_avg = (
    emp_sorted
    .groupBy("department_id")
    .agg(avg("salary").alias("avg_dept_salary"))
    .where("avg_dept_salary > 50000")
)

In [None]:
# Print the departments whose average salary is above 50000.
emp_avg.show()

+-------------+---------------+
|department_id|avg_dept_salary|
+-------------+---------------+
|          101|        55000.0|
|          104|        54000.0|
|          102|        51750.0|
|          103|        58000.0|
|          106|        69000.0|
|          105|        55500.0|
+-------------+---------------+



In [None]:
# Bonus tip: unionByName, for inputs whose column order differs.
# Build a copy of emp_df_2 with its columns in a different order from emp_df_1's.
emp_df_2_other = emp_df_2.select(
    "employee_id",
    "salary",
    "department_id",
    "name",
    "hire_date",
    "gender",
    "age",
)

# Print both schemas so the differing column order is visible.
emp_df_1.printSchema()
emp_df_2_other.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- hire_date: string (nullable = true)

root
 |-- employee_id: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)



In [None]:
# Combine the two frames by column name rather than by position, so the
# reordered columns of emp_df_2_other line up with those of emp_df_1.
emp_fixed = emp_df_1.unionByName(emp_df_2_other)

In [None]:
# Print the result of the by-name union.
emp_fixed.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [None]:
# Row count of the by-name union; the recorded result is 20 (10 rows from each input).
emp_fixed.count()

20