# PySpark DataFrame Reference Guide

This notebook serves as a comprehensive reference for working with Spark DataFrames in PySpark.

## Table of Contents
1. [Spark Session Initialization](#spark-session)
2. [Creating DataFrames](#creating-dataframes)
3. [Aggregations and Grouping](#aggregations)
4. [Performance and Partitioning](#performance)
5. [Reading Data from Files](#reading-files)
6. [Selecting and Filtering Data](#selecting-filtering)
7. [Schema Definition](#schema)
8. [String Operations](#string-operations)
9. Additional Resources and Best Practices

---

In [42]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


sc = spark.sparkContext
sc.setLogLevel("ERROR")

## 1. Spark Session Initialization <a name="spark-session"></a>

The SparkSession is the entry point for all Spark functionality. It's the first thing you need to create when working with Spark DataFrames.

**Key Points:**
- `SparkSession.builder` returns a builder object used to configure the session
- `appName()` sets the application name (visible in Spark UI)
- `config()` sets Spark configuration options
- `getOrCreate()` creates a new session or gets the existing one
- `setLogLevel()` controls logging verbosity (ERROR, WARN, INFO, DEBUG)

In [43]:
employees = [{"name": "John D.", "age": 30},
  {"name": "Alice G.", "age": 25},
  {"name": "Bob T.", "age": 35},
  {"name": "Eve A.", "age": 28}]

# Create a DataFrame containing the employees data
df = spark.createDataFrame(employees)
df.show()

+---+--------+
|age|    name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35|  Bob T.|
| 28|  Eve A.|
+---+--------+



## 2. Creating DataFrames <a name="creating-dataframes"></a>

### Method 1: From Python Collections

You can create DataFrames directly from a Python list of dictionaries, tuples, or `Row` objects.

**Common Methods:**
- `spark.createDataFrame(data)` - Creates DataFrame from Python collections
- `df.show()` - Displays the DataFrame content
- `df.show(n, truncate=False)` - Shows n rows without truncating columns

In [44]:
r1 = df.collect()[0]
r1.name, r1.age

('John D.', 30)

### Accessing DataFrame Rows

**Key Methods:**
- `df.collect()` - Returns all rows as a list of Row objects (⚠️ Use with caution on large datasets!)
- `df.first()` - Returns the first row
- `df.take(n)` - Returns first n rows
- Row objects allow attribute-style access: `row.columnName`

In [45]:
avg_age = df.groupBy('name').avg('age')
avg_age.show()

+--------+--------+
|    name|avg(age)|
+--------+--------+
| John D.|    30.0|
|Alice G.|    25.0|
|  Bob T.|    35.0|
|  Eve A.|    28.0|
+--------+--------+



## 3. Aggregations and Grouping <a name="aggregations"></a>

### Basic Aggregation (Simple Method)

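The simple `groupBy().avg()` syntax is quick but less flexible: it applies one kind of aggregation per call, and the result column gets a generated name such as `avg(age)`.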

In [46]:
df.rdd.getNumPartitions()

12

## 4. Performance and Partitioning <a name="performance"></a>

### Understanding Partitions

Partitions determine how data is distributed across the cluster. Each partition is processed by one task, so the partition count sets an upper bound on how much work can run in parallel.

**Key Concepts:**
- DataFrames are divided into partitions for parallel processing
- Too few partitions leave cores idle; too many add scheduling overhead for small tasks
- Use `rdd.getNumPartitions()` to check partition count

In [47]:
avg_age.rdd.getNumPartitions()

1

In [48]:
from pyspark.sql.functions import avg

avg_age = df.groupBy('name').agg(avg('age').alias('average_age'))

avg_age.printSchema()

root
 |-- name: string (nullable = true)
 |-- average_age: double (nullable = true)



### Advanced Aggregation with agg()

The `agg()` method is more powerful and flexible than simple aggregation methods.

**Benefits:**
- Can apply multiple aggregations at once
- Allows custom column naming with `alias()`
- Works with any Spark SQL aggregate function
- Keeps all aggregations for a group in a single, readable call

**Common Aggregation Functions** (assuming `from pyspark.sql import functions as F`):
- `F.avg()` - Average
- `F.count()` - Count
- `F.sum()` - Sum
- `F.min()` / `F.max()` - Min/Max
- `F.stddev()` - Standard deviation

In [49]:
avg_age.show()

+--------+-----------+
|    name|average_age|
+--------+-----------+
| John D.|       30.0|
|Alice G.|       25.0|
|  Bob T.|       35.0|
|  Eve A.|       28.0|
+--------+-----------+



In [50]:
df = spark.read.json("people.json")

df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



## 5. Reading Data from Files <a name="reading-files"></a>

### Reading JSON Files

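Spark can automatically infer the schema from JSON files. By default it expects JSON Lines input, with one JSON object per line; fields missing from a record are read as NULL.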

**Common Options:**
- `spark.read.json(path)` - Reads JSON with schema inference
- `spark.read.json(path, schema=mySchema)` - Reads with explicit schema
- `df.printSchema()` - Displays the DataFrame schema in tree format

In [51]:
df.show(truncate=False)


+----+-------+
|age |name   |
+----+-------+
|NULL|Michael|
|30  |Andy   |
|19  |Justin |
+----+-------+



In [52]:
df['name']

Column<'name'>

## 6. Selecting and Filtering Data <a name="selecting-filtering"></a>

### Column Selection Methods

There are multiple ways to refer to and select columns in PySpark. Methods 1 and 2 build a `Column` expression; Method 3 returns a new DataFrame:

**Method 1:** Dictionary-style (returns Column object)
```python
df['columnName']
```

**Method 2:** Using F.col() (recommended for complex operations)
```python
F.col("columnName")
```

**Method 3:** Using select() with string (most common)
```python
df.select("columnName")
```

In [53]:
from pyspark.sql import functions as F
df.select(F.col("name"))

DataFrame[name: string]

In [54]:
df_names = df.select("name")
df_names.show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [55]:
df_age = df.select("age")
df_age.show()

+----+
| age|
+----+
|NULL|
|  30|
|  19|
+----+



In [56]:
df_adults = df_age.where(F.col("age") >= 18)
df_adults.show()

+---+
|age|
+---+
| 30|
| 19|
+---+



### Filtering Data

**Filter Methods:**
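- `df.where(condition)` - Keeps only the rows where the condition evaluates to true
- `df.filter(condition)` - Same as `where()` (`where()` is an alias for `filter()`)

A comparison with NULL evaluates to NULL rather than true, so the row with a NULL age is dropped from the output above.

**Common Operators:**
- `==`, `!=` - Equality/Inequality
- `>`, `>=`, `<`, `<=` - Comparison
- `&` - AND (wrap each condition in parentheses, because `&` binds more tightly than comparisons in Python)
- `|` - OR (wrap each condition in parentheses, for the same reason)
- `~` - NOT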

In [57]:
# Example 1: Single aggregation with alias
avg_by_name = df.groupBy("name").agg(F.avg("age").alias("average_age"))
avg_by_name.show()

# Example 2: Multiple aggregations with aliases
stats_by_name = df.groupBy("name").agg(
    F.count("*").alias("count"),
    F.avg("age").alias("avg_age"),
    F.max("age").alias("max_age")
)
stats_by_name.show()

# Example 3: Global aggregations (no groupBy) with aliases
global_stats = df.agg(
    F.count("*").alias("total_people"),
    F.avg("age").alias("overall_avg_age")
)
global_stats.show()

+-------+-----------+
|   name|average_age|
+-------+-----------+
|Michael|       NULL|
|   Andy|       30.0|
| Justin|       19.0|
+-------+-----------+

+-------+-----+-------+-------+
|   name|count|avg_age|max_age|
+-------+-----+-------+-------+
|Michael|    1|   NULL|   NULL|
|   Andy|    1|   30.0|     30|
| Justin|    1|   19.0|     19|
+-------+-----+-------+-------+

+------------+---------------+
|total_people|overall_avg_age|
+------------+---------------+
|           3|           24.5|
+------------+---------------+



In [58]:
df.unpersist()

DataFrame[age: bigint, name: string]

### Memory Management

**Important Methods:**
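- `df.cache()` - Marks the DataFrame for caching with the default storage level (memory, spilling to disk); the data is materialized on the first action
- `df.persist(storageLevel)` - Persists with a specific storage level
- `df.unpersist()` - Removes the DataFrame from the cache (a no-op if it was never cached)
- Unpersist cached DataFrames when you are done with them to free memory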

In [59]:
from pyspark.sql import types as T

my_schema = T.StructType([
    T.StructField("name", T.StringType(), nullable=False),
    T.StructField("age", T.IntegerType(), nullable=False)
])
df_csv = spark.read.csv("people.csv", schema=my_schema, header=True)

## 7. Schema Definition <a name="schema"></a>

### Explicit Schema for CSV Files

Defining explicit schemas is recommended for better performance and data quality.

**Benefits:**
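- Faster reads: no schema inference pass over the data is needed
- Data type enforcement: columns are parsed into the declared types
- Control over malformed rows through the reader's `mode` option (`PERMISSIVE`, `DROPMALFORMED`, `FAILFAST`)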
- Documentation of expected data structure

**Common Data Types:**
- `StringType()` - Text data
- `IntegerType()`, `LongType()` - Integer numbers
- `DoubleType()`, `FloatType()` - Floating-point numbers (use `DecimalType()` for exact decimals)
- `BooleanType()` - True/False
- `TimestampType()`, `DateType()` - Date/time
- `ArrayType()`, `MapType()` - Complex types

In [60]:
df_csv.show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 34|
|    Bob| 45|
|Charlie| 29|
|  Diana| 28|
+-------+---+



In [61]:
from pyspark.sql.functions import concat, concat_ws, lit

# Example 1: concat - concatenates columns without separator
df_concat = df_csv.select(
    "name",
    "age",
    concat(lit("Name: "), F.col("name"), lit(", Age: "), F.col("age").cast("string")).alias("concatenated")
)
df_concat.show(truncate=False)

# Example 2: concat_ws - concatenates columns with separator
df_concat_ws = df_csv.select(
    "name",
    "age",
    concat_ws(" - ", F.col("name"), F.col("age").cast("string")).alias("concat_with_separator")
)
df_concat_ws.show(truncate=False)

# Example 3: concat_ws with multiple columns and custom separator
df_concat_multi = df_csv.select(
    concat_ws(" | ", lit("Person"), F.col("name"), lit("is"), F.col("age").cast("string"), lit("years old")).alias("description")
)
df_concat_multi.show(truncate=False)

+-------+---+----------------------+
|name   |age|concatenated          |
+-------+---+----------------------+
|Alice  |34 |Name: Alice, Age: 34  |
|Bob    |45 |Name: Bob, Age: 45    |
|Charlie|29 |Name: Charlie, Age: 29|
|Diana  |28 |Name: Diana, Age: 28  |
+-------+---+----------------------+

+-------+---+---------------------+
|name   |age|concat_with_separator|
+-------+---+---------------------+
|Alice  |34 |Alice - 34           |
|Bob    |45 |Bob - 45             |
|Charlie|29 |Charlie - 29         |
|Diana  |28 |Diana - 28           |
+-------+---+---------------------+

+--------------------------------------+
|description                           |
+--------------------------------------+
|Person | Alice | is | 34 | years old  |
|Person | Bob | is | 45 | years old    |
|Person | Charlie | is | 29 | years old|
|Person | Diana | is | 28 | years old  |
+--------------------------------------+

## 8. String Operations <a name="string-operations"></a>

### String Concatenation Functions

PySpark provides powerful string manipulation functions.

**concat() vs concat_ws():**
- `concat()` - Concatenates columns/literals without a separator
- `concat_ws(sep, col1, col2, ...)` - Concatenates with the separator `sep` between values
- `lit()` - Creates a literal/constant column

**Important Notes:**
- Cast non-string columns explicitly with `.cast("string")` so the conversion is unambiguous
- `concat()` returns NULL if any input is NULL; `concat_ws()` skips NULL values
- Use `lit()` to add constant text

## 9. Additional Resources and Best Practices

### Best Practices

1. **Always use explicit schemas** when reading data in production
2. **Use `F.col()`** instead of string column references for complex operations
3. **Avoid `collect()`** on large datasets - use `take()` or `show()` instead
4. **Cache DataFrames** that are used multiple times
5. **Use column aliases** for better readability
6. **Monitor partition count** for optimal performance
7. **Set appropriate log levels** to reduce noise

### Common DataFrame Methods Reference

**Display & Inspection:**
- `df.show(n, truncate)` - Display data
- `df.printSchema()` - Show schema
- `df.columns` - List column names
- `df.dtypes` - Column names and types
- `df.describe()` - Statistical summary
- `df.count()` - Row count

**Transformations:**
- `df.select()` - Select columns
- `df.filter()` / `df.where()` - Filter rows
- `df.withColumn(name, column)` - Add/modify column
- `df.withColumnRenamed(old, new)` - Rename column
- `df.drop(column)` - Remove column
- `df.distinct()` - Remove duplicate rows
- `df.orderBy()` / `df.sort()` - Sort data

**Aggregations:**
- `df.groupBy().agg()` - Group and aggregate
- `df.agg()` - Global aggregation
- `df.groupBy().count()` - Count by group

**Joins:**
- `df1.join(df2, on, how)` - Join DataFrames
- Join types: "inner", "outer", "left", "right", "cross", "left_semi", "left_anti"

**I/O Operations:**
- `spark.read.csv()`, `.json()`, `.parquet()` - Read files
- `df.write.csv()`, `.json()`, `.parquet()` - Write files
- Save modes: `.mode("overwrite")`, `.mode("append")`, `.mode("ignore")`, `.mode("error")` (the default)

### Performance Tips

- **Partitioning:** Use `repartition()` (full shuffle, can raise or lower the count) or `coalesce()` (lowers the count without a full shuffle) to tune partition count
- **Broadcasting:** Use `F.broadcast()` for small DataFrames in joins
- **Predicate Pushdown:** Filter early in your transformation chain
- **Column Pruning:** Select only needed columns early
- **Caching:** Cache intermediate results used multiple times

---

**Happy Spark Learning! 🚀**