# Python `PySpark` Module: Methods, Functions, and Examples

This notebook provides detailed notes, explanations, and examples for working with Apache Spark using PySpark. Each cell covers a specific concept or function.

## Introduction to `PySpark`

PySpark is the Python API for Apache Spark, an open-source engine for distributed data processing on a cluster. It provides high-level APIs for structured data (DataFrames and SQL), streaming, and machine learning (MLlib).

## Importing PySpark

First, we need to import the necessary PySpark modules and create a SparkSession, which is the entry point for PySpark functionality.

In [None]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = (
    SparkSession.builder
    .appName("PySpark Examples")
    .getOrCreate()
)

print("Spark session created successfully")

## Creating DataFrames

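PySpark DataFrames are distributed collections of data organized into named columns. Let's create one from a list of tuples. Spark infers each column's type from the Python values, so here `str` becomes `string` and `int` becomes `long` in the printed schema.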

In [None]:
# Create a DataFrame from a list of data
data = [("John", 30, "New York"),
        ("Anna", 25, "San Francisco"),
        ("Peter", 35, "Chicago")]
columns = ["name", "age", "city"]

df = spark.createDataFrame(data, columns)
print("DataFrame Schema:")
df.printSchema()
print("\nDataFrame Content:")
df.show()

## Basic DataFrame Operations

Let's explore common DataFrame operations like selecting columns, filtering rows, and adding new columns.

In [None]:
from pyspark.sql.functions import col, concat, lit

# Select specific columns
print("Select specific columns:")
df.select("name", "age").show()

# Filter rows
print("\nFilter people older than 30:")
df.filter(col("age") > 30).show()

# Add new column
print("\nAdd greeting column:")
df_with_greeting = df.withColumn(
    "greeting", concat(lit("Hello, "), col("name"))
)
df_with_greeting.show()

## Aggregations and Grouping

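`groupBy` splits rows into groups that share the same values in one or more columns, and `agg` computes one or more summary values (such as an average or a count) for each group.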

In [None]:
from pyspark.sql.functions import avg, count

# Create a new DataFrame with more data
data_agg = [("NY", "Sales", 1000),
            ("NY", "Sales", 2000),
            ("SF", "Sales", 3000),
            ("SF", "HR", 1500),
            ("CH", "HR", 2500)]
df_agg = spark.createDataFrame(data_agg, ["city", "dept", "salary"])

# Group by and aggregate
print("Average salary and row count by city:")
df_agg.groupBy("city").agg(
    avg("salary").alias("avg_salary"),
    count("*").alias("count")
).show()

# Group by multiple columns
print("\nCount by city and department:")
df_agg.groupBy("city", "dept").count().show()
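
To check the numbers you should expect from the first aggregation, here is a plain-Python sketch of what `groupBy("city").agg(avg, count)` computes. It is only a conceptual model run on the same rows, not how Spark executes the query, and the names `rows`, `salaries_by_city`, and `summary` are made up for this illustration.

```python
from collections import defaultdict

# Same rows as data_agg above: (city, dept, salary)
rows = [("NY", "Sales", 1000),
        ("NY", "Sales", 2000),
        ("SF", "Sales", 3000),
        ("SF", "HR", 1500),
        ("CH", "HR", 2500)]

# Collect the salaries for each city, as groupBy("city") does conceptually
salaries_by_city = defaultdict(list)
for city, _dept, salary in rows:
    salaries_by_city[city].append(salary)

# Per-group average and row count, mirroring avg("salary") and count("*")
summary = {
    city: {"avg_salary": sum(vals) / len(vals), "count": len(vals)}
    for city, vals in salaries_by_city.items()
}

for city, stats in sorted(summary.items()):
    print(city, stats)
```

Spark does not guarantee the row order of grouped output, so add an `orderBy` after the aggregation if the order matters.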

## Window Functions

Window functions perform calculations across a set of rows related to the current row. Unlike `groupBy`, they keep every input row and add the computed value as a new column.

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank

# Partition rows by department and order each partition by salary (ascending)
windowSpec = Window.partitionBy("dept").orderBy("salary")

# Add rank and dense_rank
df_with_rank = df_agg.withColumn("rank", rank().over(windowSpec)) \
    .withColumn("dense_rank", dense_rank().over(windowSpec))

print("Rankings within departments:")
df_with_rank.show()
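
The sample data has no tied salaries within a department, so `rank` and `dense_rank` produce identical columns above. The plain-Python sketch below shows how the two differ once ties appear. It models the ranking rule for a single partition only, and both the helper `rank_and_dense_rank` and the tied salaries are hypothetical.

```python
def rank_and_dense_rank(values):
    """Return (value, rank, dense_rank) tuples for values sorted ascending."""
    ordered = sorted(values)
    result = []
    rank = dense = 0
    previous = object()  # sentinel that never equals a real value
    for position, value in enumerate(ordered, start=1):
        if value != previous:
            rank = position  # rank jumps to the row position after a tie
            dense += 1       # dense_rank always increases by exactly one
            previous = value
        result.append((value, rank, dense))
    return result

# Hypothetical salaries with a tie at 2000
ranked = rank_and_dense_rank([1000, 2000, 2000, 3000])
for row in ranked:
    print(row)
```

Tied rows share a rank in both schemes. After the tie, `rank` skips ahead (1, 2, 2, 4) while `dense_rank` leaves no gap (1, 2, 2, 3).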

## Joining DataFrames

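PySpark supports several join types, including inner, left outer, right outer, and full outer joins. The examples below join two DataFrames on their shared `id` column.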

In [None]:
# Create two DataFrames for joining
employee_data = [("001", "John"), ("002", "Anna"), ("003", "Bob")]
department_data = [("001", "Sales"), ("002", "HR"), ("004", "IT")]

df_employees = spark.createDataFrame(employee_data, ["id", "name"])
df_departments = spark.createDataFrame(department_data, ["id", "department"])

# Inner join
print("Inner Join:")
df_employees.join(df_departments, "id", "inner").show()

# Left outer join
print("\nLeft Outer Join:")
df_employees.join(df_departments, "id", "left").show()

# Right outer join
print("\nRight Outer Join:")
df_employees.join(df_departments, "id", "right").show()
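
To make clear which rows each join type keeps, here is a small plain-Python model of the three joins above, using the same ids. It assumes each id appears at most once per side, which holds for this data but not for joins in general, and the helper `join` is a hypothetical illustration rather than anything from the PySpark API.

```python
employees = {"001": "John", "002": "Anna", "003": "Bob"}
departments = {"001": "Sales", "002": "HR", "004": "IT"}

def join(left, right, how):
    """Model an equi-join on the dict keys; missing values become None."""
    if how == "inner":
        keys = [k for k in left if k in right]  # ids present on both sides
    elif how == "left":
        keys = list(left)                       # every id from the left side
    elif how == "right":
        keys = list(right)                      # every id from the right side
    else:
        raise ValueError(f"unsupported join type: {how}")
    return [(k, left.get(k), right.get(k)) for k in keys]

inner_rows = join(employees, departments, "inner")
left_rows = join(employees, departments, "left")
right_rows = join(employees, departments, "right")

for label, rows in [("inner", inner_rows), ("left", left_rows), ("right", right_rows)]:
    print(label, rows)
```

The `None` values correspond to the `null` entries Spark shows for unmatched rows: id `003` has no department in the left join, and id `004` has no name in the right join.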

## User-Defined Functions (UDFs)

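User-defined functions (UDFs) let you apply custom Python logic to DataFrame columns. They are generally slower than built-in functions because data has to be serialized between the JVM and the Python worker, so prefer built-ins such as `concat` when they are sufficient.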

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Define a Python function
def make_greeting(name):
    return f"Hello, {name}!"

# Wrap the function as a UDF that returns a string
greeting_udf = udf(make_greeting, StringType())

# Apply UDF to DataFrame
df_with_udf = df_employees.withColumn("greeting", greeting_udf(col("name")))
print("DataFrame with UDF applied:")
df_with_udf.show()
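
One thing to watch for: Spark passes `null` column values to a Python UDF as `None`, so `make_greeting` above would return the string `"Hello, None!"` for a missing name. A null-safe variant could look like the sketch below. The name `make_greeting_safe` is hypothetical, and the function is tested here as plain Python before being wrapped.

```python
def make_greeting_safe(name):
    """Return a greeting, or None when the input is missing."""
    if name is None:
        return None  # keep nulls as nulls instead of "Hello, None!"
    return f"Hello, {name}!"

print(make_greeting_safe("Anna"))
print(make_greeting_safe(None))
```

It can then be wrapped the same way as before, with `udf(make_greeting_safe, StringType())`.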

## Summary

This notebook covered the following PySpark capabilities:
- DataFrame operations for structured data
- SQL-like operations and aggregations
- Window functions for advanced analytics
- Joining multiple datasets
- Custom transformations with UDFs

Clean up the SparkSession when done:

In [None]:
# Stop the SparkSession
spark.stop()
print("SparkSession stopped")