# 2. Spark Basics with PySpark

In this notebook, we’ll learn the **building blocks** of Spark using **PySpark**:

1. **SparkSession**: The entry point to Spark.
2. **RDDs (Resilient Distributed Datasets)**: The lower-level abstraction.
3. **DataFrames**: The higher-level, SQL-like abstraction.
4. **Transformations and Actions**: The functional operations that drive Spark computations.
5. **Partitioning and Persistence**: How Spark manages data distribution.

By the end, you’ll understand the basics of how PySpark organizes and processes data at scale.


## 1. Creating a SparkSession

The **SparkSession** is Spark’s main entry point in **Spark 2.0+**. It allows you to create DataFrames, register DataFrames as tables or views, execute SQL queries, and read from external data sources.

**Additional Explanation**

Before Spark 2.0, you would initialize Spark with separate entry points (e.g., `SparkContext`, `SQLContext`, `HiveContext`). The `SparkSession` unified these contexts into a single entry point. This consolidation means you can switch between DataFrame operations, SQL queries, and lower-level RDD manipulations without re-initializing or juggling multiple contexts.

When you build your SparkSession, you can also configure:
- **Master URL** (e.g., `local[*]`, `spark://...`) to specify the cluster manager.
- **Config options** like memory usage, shuffle partitions, or log level.

For example, to run on 4 local cores, hide the console progress bar, and log only errors:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SparkBasics") \
    .master("local[4]") \
    .config("spark.ui.showConsoleProgress", "false") \
    .getOrCreate()

# Only messages at ERROR level or above are logged from here on
spark.sparkContext.setLogLevel("ERROR")
```

Below, we create a SparkSession named `SparkBasics`:


In [None]:
from pyspark.sql import SparkSession

# Create or get a Spark session
spark = SparkSession.builder \
    .appName("SparkBasics") \
    .getOrCreate()

print("SparkSession created!")

> **Note**: If you’re running this locally, ensure `pyspark` is installed and a compatible Java runtime is available. `SPARK_HOME` only needs to be set if you use a separately downloaded Spark distribution. On a cloud environment or in a notebook like Databricks, a SparkSession may already be created for you.

## 2. Understanding RDDs (Resilient Distributed Datasets)

RDDs are the **lower-level** abstraction in Spark, representing a distributed collection of items. They provide **fault tolerance** (resilience) and can be operated on in parallel across a cluster. Nowadays, you’ll mostly use **DataFrames**, but RDDs are still useful for specialized or custom operations.

### 2.1 Creating RDDs

You can create an RDD in several ways:
1. **Parallelize** a local collection (e.g., a Python list).
2. **Load** external data (text files, CSV, etc.) via `sparkContext.textFile(...)`.

Below, we create an RDD from a local list:


In [None]:
# Create an RDD by parallelizing a local list
data_list = ["apple", "banana", "cherry", "date"]
rdd = spark.sparkContext.parallelize(data_list)
print("RDD count:", rdd.count())
print("RDD sample:", rdd.take(2))  # take(2) fetches first 2 elements

**Additional Explanation**

RDDs are the fundamental data structure at the heart of Spark’s distributed engine. Each RDD is conceptually an immutable collection of data partitioned across nodes in the cluster. Spark automatically tracks the **lineage** of each RDD (the sequence of operations used to create it), so it can recompute lost partitions if a node fails.

You can also create RDDs from:
- **Text files**:
  ```python
  text_rdd = spark.sparkContext.textFile("path/to/myfile.txt")
  ```
- **Whole directories** of data:
  ```python
  large_rdd = spark.sparkContext.textFile("hdfs://path/to/huge-dataset/*")
  ```

When working locally, you can load files from your machine’s filesystem. For cluster deployments, you typically load from HDFS, S3, or another distributed store.

### 2.2 Transformations and Actions on RDDs

Spark’s **transformations** return a new RDD (they’re **lazy**), while **actions** trigger execution and return a value (materializing the result). Common transformations include `map()`, `filter()`, and `flatMap()`. Common actions are `collect()`, `count()`, `reduce()`, etc.

Below, we apply transformations to an RDD and then use an action:


In [None]:
# RDD Transformations
mapped_rdd = rdd.map(lambda x: x.upper())   # map() transforms each element
filtered_rdd = mapped_rdd.filter(lambda x: x.startswith("B"))  # filter() keeps certain elements

# RDD Action
result = filtered_rdd.collect()  # collect() returns all elements to the driver

print("Transformed RDD Result:", result)

**Additional Explanation**

- **Transformations**: Create a *new* RDD by defining how each record is mapped from the parent RDD. Examples include:
  - `map()`: Apply a function to each element.
  - `filter()`: Keep only elements passing a predicate.
  - `flatMap()`: Similar to `map()` but allows splitting elements into multiple outputs.
- **Actions**: Trigger execution and return a value. Examples include:
  - `reduce(func)`: Combine elements using a user-specified function that operates on two items at a time.
  - `count()`: Return the number of elements in the RDD.
  - `first()`, `take(n)`: Retrieve elements to the driver program.

## 3. DataFrames: The Higher-Level Abstraction

DataFrames build on top of RDDs and provide a **relational** view of data, with named columns and powerful optimizations via the **Catalyst** query optimizer. They’re generally **faster** and **easier to use** for most big data tasks.

### 3.1 Creating a DataFrame
You can create a DataFrame from:
- **Python lists** (small data) or RDDs.
- **External data sources** (CSV, JSON, Parquet, etc.).

Here’s a simple example from a Python list of tuples:


In [None]:
sample_data = [
    ("Alice", 29, "Engineer"),
    ("Bob",   35, "Doctor"),
    ("Cathy", 25, "Artist")
]

columns = ["Name", "Age", "Occupation"]
df = spark.createDataFrame(sample_data, columns)

df.show()

**Additional Explanation**

While you can create DataFrames from local data, in real-world use they usually come from:
1. **Reading CSV/JSON/Parquet**:
   ```python
   df_csv = spark.read.csv("path/to/data.csv", header=True, inferSchema=True)
   df_parquet = spark.read.parquet("path/to/data.parquet")
   ```
2. **SQL Tables** (via JDBC/ODBC or Hive Metastore):
   ```python
   jdbc_df = spark.read \
       .format("jdbc") \
       .option("url", "jdbc:postgresql://hostname/db") \
       .option("dbtable", "tablename") \
       .option("user", "username") \
       .option("password", "secret") \
       .load()
   ```

DataFrames also enable quick ETL (Extract, Transform, Load) patterns if you need to combine multiple datasets. By specifying the schema or letting Spark infer it, you keep metadata about column names, types, etc.

Under the hood, Spark still uses RDDs for low-level operations. But for day-to-day usage, DataFrames are often more concise and more performant thanks to the Catalyst Optimizer.

### 3.2 Inspecting DataFrames
Use methods like `.show()`, `.describe()`, and `.printSchema()` to explore DataFrames:


In [None]:
# Print schema
df.printSchema()

# Summaries
df.describe().show()

**Additional Explanation**

You can also use:
- `df.columns` to get a list of column names.
- `df.dtypes` to see columns and their Spark data types.
- `df.head(n)` or `df.take(n)` to return the first `n` rows locally (similar to `.show(n)` but returns a list of Row objects).

**Example**:
```python
print("Columns:", df.columns)
print("Schema (dtypes):", df.dtypes)
for row in df.head(2):
    print(row)
```

These methods make it easy to quickly inspect your DataFrame structure and sample data.

### 3.3 DataFrame Operations
DataFrames support a wide variety of operations, including **selecting columns**, **filtering**, **grouping**, and **aggregation**. Many of these are similar to SQL queries.

Below is an example:


In [None]:
# Select columns
df.select("Name", "Age").show()

# Filter rows
df.filter(df.Age > 30).show()

# Group + Agg
df.groupBy("Occupation").count().show()

**Additional Explanation**

- **Column Expressions**: You can reference columns via `df.colName` or with helpers from the `pyspark.sql.functions` module (often imported as `F`):
  ```python
  from pyspark.sql.functions import col, lower, upper
  new_df = df.select(upper(col("Name")).alias("NAME_UPPER"))
  new_df.show()
  ```
- **User-Defined Functions (UDFs)**: For more complex transformations, define UDFs:
    ```python
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    def greet(name):
        return f"Hello, {name}!"

    greet_udf = udf(greet, StringType())
    df.select("Name", greet_udf(col("Name")).alias("Greeting")).show()
    ```
> **Note**: UDFs are usually slower than built-in Spark functions. Catalyst cannot see inside a Python UDF, and each row must be serialized between the JVM and a Python worker, so prefer a built-in function when one exists.

- **Joins**: DataFrames support `inner`, `left`, `right`, `full`, and `cross` joins. Example:
    ```python
    df1.join(df2, df1.id == df2.id, "inner").show()
    ```
Use the join mode that suits your data relationship to handle missing or unmatched records appropriately.

## 4. Transformations and Actions on DataFrames

While RDDs have functional transformations, DataFrames offer a more **SQL-like** syntax for transformations. Actions (like `.show()`) will execute the query plan.

> **Example**: Filtering rows, creating new columns, or performing aggregations are transformations. Calling `.collect()` or `.count()` is an action.

**Additional Explanation**

- **Transformations**: Spark builds a logical plan describing how to transform data. For instance:
  - `select()`, `filter()`, `withColumn()`, `groupBy()`, `agg()`
- **Actions**: Evaluate the plan. Examples:
  - `collect()`, `count()`, `show()`, `head()`
  
**Catalyst Query Optimizer**  
Spark’s DataFrame operations are compiled down to an optimized plan thanks to the Catalyst optimizer. It can rearrange filters, push down predicates, and combine operations for efficiency. This is why using DataFrame APIs is typically faster than manual RDD manipulations for most SQL-like workflows.

**Example**:

In [None]:
from pyspark.sql.functions import col

df_filtered = df.filter(col("Age") > 30)  # transformation: builds the plan only
row_count = df_filtered.count()  # triggers execution
print(f"Number of records with Age > 30: {row_count}")

## 5. Partitioning and Persistence

Spark automatically partitions DataFrames/RDDs across the cluster. On a **single machine** or small environment, you might not notice it as much, but in a **distributed** setting, partitioning is crucial for parallelism.

You can **cache** or **persist** frequently accessed data to improve performance:


In [None]:
# Example of caching a DataFrame
df.cache()  # or df.persist()
# cache() is lazy: the data is stored when the next action runs,
# and later actions that reuse df can then read it from the cache

**Additional Explanation**

- **Partitioning**: Spark splits data into partitions which are processed in parallel across the cluster (or local CPU cores). If you have a large dataset, you can adjust the number of partitions:
  ```python
  df = df.repartition(8)  # Increase partition count
  ```
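- Conversely, you can reduce the partition count with `coalesce()` if you want fewer, bigger partitions. Unlike `repartition()`, which performs a full shuffle, `coalesce()` merges existing partitions. Partitioning affects shuffle performance and parallelism.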

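**Persistence Levels**: For DataFrames, `cache()` is shorthand for `persist()` with a memory-and-disk default (`MEMORY_AND_DISK`); for RDDs, `cache()` uses `MEMORY_ONLY`. Serialized variants such as `MEMORY_ONLY_SER` belong to the Scala/Java API; in PySpark, stored Python objects are always serialized, so choosing a serialized level makes no difference. With `persist()` you can choose other levels, for example:
- `MEMORY_ONLY`
- `MEMORY_AND_DISK`
- `DISK_ONLY`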

> Depending on data size and memory constraints, picking the right level is essential for performance. For instance, if your data is larger than available memory, `MEMORY_AND_DISK` can help avoid `OutOfMemoryError`.

In [None]:
from pyspark import StorageLevel

df.unpersist()  # drop the earlier cache() so the new level takes effect
df.persist(StorageLevel.MEMORY_AND_DISK)  # or MEMORY_ONLY, DISK_ONLY, etc.

## 6. Shutting Down Spark

When you’re finished, it’s good practice to **stop** the SparkSession (especially in scripts or local dev environments):


In [None]:
spark.stop()
print("Spark session stopped.")

**Additional Explanation**

When you call `spark.stop()`, Spark attempts to gracefully terminate all active jobs and release resources like executors and shuffle files. In production or cluster environments, you might schedule your Spark job to run, produce output, then stop once the job completes.

If you’re in a multi-notebook environment (like Databricks), be mindful that calling `stop()` can affect other notebooks sharing the same SparkSession. Some cluster managers automatically handle session lifecycles, so consult the platform’s documentation if you’re unsure.

---
## Summary

- **RDDs**: Low-level data abstraction, good for custom or specialized tasks.
- **DataFrames**: Higher-level relational abstraction, recommended for most analytics.
- **Transformations**: Lazy operations that define a computation plan.
- **Actions**: Trigger execution and return results.
- **Partitioning and Persistence**: Key to scalability and performance.

Now that you know the **basics of Spark with PySpark**, you can start using Spark to handle large datasets, run queries, and transform data quickly. In the next notebook, we’ll explore how to **interface** Spark with **pandas**, **numpy**, **scikit-learn**, and more!