# SPARK

### What is a Spark DataFrame?
- A **DataFrame** is a distributed collection of data organized into named columns, similar to a table in a relational database.
- It is built on top of **RDDs (Resilient Distributed Datasets)** and provides a higher-level abstraction.
- It is **immutable**: transformations never modify a DataFrame in place; they always return a new DataFrame derived from the existing one.

### Key Features
- **Schema**: DataFrames have a schema that defines the structure of the data, including column names and types.
- **Lazy Evaluation**: Transformations on DataFrames are lazily evaluated, meaning they are not computed until an action (like `show()` or `collect()`) is called.
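- **Optimized Execution**: Because execution is deferred, Spark's Catalyst optimizer can rewrite the whole query plan before running it (for example, pushing filters closer to the data source and pruning unused columns).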
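Lazy evaluation can be illustrated with a toy pure-Python model. This is not Spark's implementation, and the class name `LazyPipeline` is hypothetical. Transformations only record a step, and the action (`collect`) is the only place where any work happens:

```python
class LazyPipeline:
    """Toy model of lazy evaluation; not Spark's actual implementation."""

    def __init__(self, data, steps=None):
        self._data = data
        self._steps = steps or []  # recorded transformations, not yet run

    def map(self, fn):
        # Transformation: return a NEW pipeline with one more recorded step.
        return LazyPipeline(self._data, self._steps + [("map", fn)])

    def filter(self, pred):
        return LazyPipeline(self._data, self._steps + [("filter", pred)])

    def collect(self):
        # Action: only now are the recorded steps actually executed.
        result = list(self._data)
        for kind, fn in self._steps:
            if kind == "map":
                result = [fn(x) for x in result]
            else:
                result = [x for x in result if fn(x)]
        return result


calls = []


def double(x):
    calls.append(x)  # record every time the function really runs
    return x * 2


pipeline = LazyPipeline([1, 2, 3, 4]).map(double).filter(lambda x: x > 4)
print(calls)               # [] : nothing has run yet
print(pipeline.collect())  # [6, 8]
print(calls)               # [1, 2, 3, 4]
```

In real PySpark, `df.explain()` prints the plan Spark has built so far without running it.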

### Common Operations
- **Creating DataFrames**:
  ```python
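  from pyspark.sql import SparkSession

  spark = SparkSession.builder.getOrCreate()
  data = [(1, "Alice"), (2, "Bob")]  # one tuple per row
  schema = "id INT, name STRING"     # DDL-style schema string
  df = spark.createDataFrame(data, schema)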
  ```
  
### Spark Fundamental Operations
- **Transformation**
- **Action**

### Transformations
Transformations create a new DataFrame from an existing one. They are **lazy**, meaning they are not executed until an action is called.

1. **`select(*cols)`**: Selects specific columns from the DataFrame.
   ```python
   df.select("column1", "column2").show()
   ```

2. **`filter(condition)`**: Filters rows based on a specified condition.
   ```python
   df.filter(df.age > 21).show()
   ```

3. **`withColumn(colName, expr)`**: Adds a new column or replaces an existing one.
   ```python
   df.withColumn("new_col", df.age + 1).show()
   ```

4. **`drop(*cols)`**: Removes specified columns from the DataFrame.
   ```python
   df.drop("column1").show()
   ```

5. **`groupBy(*cols)`**: Groups the DataFrame using specified columns for aggregation.
   ```python
   df.groupBy("column1").count().show()
   ```

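6. **`join(other, on, how)`**: Joins two DataFrames on a key (a column name, a list of names, or a join condition); `how` selects the join type, such as `inner`, `left`, or `outer`.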
   ```python
   df1.join(df2, "id", "inner").show()
   ```

7. **`distinct()`**: Returns a new DataFrame with distinct rows.
   ```python
   df.distinct().show()
   ```

8. **`orderBy(*cols)`**: Sorts the DataFrame by specified columns.
   ```python
   df.orderBy("column1").show()
   ```

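9. **`union(other)`**: Appends the rows of another DataFrame with the same schema. Columns are matched by position, not by name (use `unionByName` to match by name), and duplicate rows are kept.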
   ```python
   df1.union(df2).show()
   ```

10. **`sample(withReplacement, fraction)`**: Returns a random sample containing roughly `fraction` of the rows; the exact number of rows is not guaranteed.
    ```python
    df.sample(False, 0.1).show()
    ```

11. **`coalesce(numPartitions)`**: Reduces the number of partitions by merging existing ones, which avoids a full shuffle. It cannot increase the partition count.
    ```python
    reduced_df = df.coalesce(2)  # Reduces to 2 partitions
    print(f"Number of partitions after coalesce: {reduced_df.rdd.getNumPartitions()}")
    ```

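12. **`repartition(numPartitions, *cols)`**: Increases or decreases the number of partitions with a full shuffle, optionally hash-partitioning the rows by the given columns.
    ```python
    increased_df = df.repartition(4)  # Redistributes the data into 4 partitions (full shuffle)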
    print(f"Number of partitions after repartition: {increased_df.rdd.getNumPartitions()}")
    ```

There are 2 types of transformations (the examples below use the lower-level RDD API):
   - **Narrow** transformations
   - **Wide** transformations

### 1. Narrow Transformations
Each output partition depends on a single input partition, so no data needs to be shuffled across partitions.

- **map()**: Applies a function to each element of the RDD.
  ```python
  from pyspark.sql import SparkSession

  spark = SparkSession.builder.appName("example").getOrCreate()
  rdd = spark.sparkContext.parallelize([1, 2, 3, 4])
  result = rdd.map(lambda x: x * 2).collect()
  print(result)
  # Output: [2, 4, 6, 8]
  ```

- **filter()**: Returns a new RDD containing only the elements that satisfy a given condition.
  ```python
  result = rdd.filter(lambda x: x % 2 == 0).collect()
  print(result)
  # Output: [2, 4]
  ```

- **flatMap()**: Similar to map, but each input item can be mapped to zero or more output items (flattening the results).
  ```python
  rdd = spark.sparkContext.parallelize(["hello world", "apache spark"])
  result = rdd.flatMap(lambda line: line.split(" ")).collect()
  print(result)
  # Output: ['hello', 'world', 'apache', 'spark']
  ```

### 2. Wide Transformations
Each output partition may depend on many input partitions, so data is shuffled across the network. Each shuffle marks a stage boundary in the execution plan.

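- **groupByKey()**: Groups all values for each key into a single iterable. For aggregations, prefer `reduceByKey`, which combines values within each partition before the shuffle.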
  ```python
  rdd = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("a", 3)])
  result = rdd.groupByKey().mapValues(list).collect()
  print(result)
  # Output (order may vary): [('a', [1, 3]), ('b', [2])]
  ```

- **reduceByKey()**: Combines values with the same key using an associative and commutative function, merging values within each partition before shuffling.
  ```python
  result = rdd.reduceByKey(lambda x, y: x + y).collect()
  print(result)
  # Output (order may vary): [('a', 4), ('b', 2)]
  ```

- **join()**: Performs an inner join of two key-value (pair) RDDs by key.
  ```python
  rdd1 = spark.sparkContext.parallelize([("a", 1), ("b", 2)])
  rdd2 = spark.sparkContext.parallelize([("a", 3), ("b", 4)])
  result = rdd1.join(rdd2).collect()
  print(result)
  # Output (order may vary): [('a', (1, 3)), ('b', (2, 4))]
  ```
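To make the narrow/wide distinction concrete without a cluster, here is a toy pure-Python model (not Spark code). An "RDD" is just a list of partitions; the helper names and the use of CRC32 as a stable hash are arbitrary choices for the sketch, and for simplicity it skips the map-side combine Spark performs in `reduceByKey`:

```python
import zlib

# Toy model: an "RDD" is a list of partitions, each a list of (key, value) records.
partitions = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]


def narrow_map(parts, fn):
    # Narrow: each output partition is built from exactly one input partition.
    return [[fn(rec) for rec in part] for part in parts]


def shuffle_by_key(parts, num_out):
    # Wide: any input partition may send records to any output partition,
    # routed by a stable hash of the key, so every key lands in one place.
    out = [[] for _ in range(num_out)]
    for part in parts:
        for key, value in part:
            out[zlib.crc32(str(key).encode("utf-8")) % num_out].append((key, value))
    return out


def reduce_by_key(parts, fn, num_out=2):
    reduced = []
    for part in shuffle_by_key(parts, num_out):
        acc = {}
        for key, value in part:
            acc[key] = fn(acc[key], value) if key in acc else value
        reduced.append(sorted(acc.items()))
    return reduced


doubled = narrow_map(partitions, lambda kv: (kv[0], kv[1] * 2))
print(doubled)  # [[('a', 2), ('b', 4)], [('a', 6), ('c', 8)]]

totals = reduce_by_key(partitions, lambda x, y: x + y)
print(sorted(kv for part in totals for kv in part))  # [('a', 4), ('b', 2), ('c', 4)]
```

`narrow_map` never looks outside the partition it is processing, while `reduce_by_key` cannot produce a correct total for `"a"` until records from both input partitions have been moved together.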

### Actions
Actions trigger the execution of transformations and return a result to the driver program.

1. **`show(n)`**: Displays the first `n` rows of the DataFrame.
   ```python
   df.show(5) # when you want a quick visual overview of the data
   ```

2. **`head(n)`**: Returns the first `n` rows of the DataFrame as a list of `Row` objects.
   ```python
   df.head(5) # when you need to work with the data programmatically after retrieving it
   ```

3. **`count()`**: Returns the number of rows in the DataFrame.
   ```python
   df.count()
   ```

4. **`collect()`**: Returns all rows to the driver as a list of `Row` objects. Avoid it on large DataFrames, since everything must fit in driver memory.
   ```python
   data = df.collect()
   ```

5. **`first()`**: Returns the first row of the DataFrame.
   ```python
   first_row = df.first()
   ```

6. **`take(n)`**: Returns the first `n` rows as a list of `Row` objects (same as `head(n)`).
   ```python
   rows = df.take(5)
   ```

7. **`describe(*cols)`**: Computes summary statistics (count, mean, stddev, min, max) for numeric and string columns and returns them as a DataFrame.
   ```python
   df.describe().show()
   ```

8. **`printSchema()`**: Displays the schema of the DataFrame as a tree. It only reads metadata, so strictly speaking it does not launch a job.
   ```python
   df.printSchema()
   ```

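9. **`read`**: `spark.read` returns a `DataFrameReader` for loading data from sources such as CSV, JSON, and Parquet. Loading is mostly lazy, although options like `inferSchema` make Spark scan the data up front to determine column types.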
   ```python
   df = spark.read.format("csv") \
      .option("header", "true") \
      .option("inferSchema", "true") \
      .load("/path/to/file.csv")
   df.show()
   ```

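10. **`write.format("format").save(path)`**: Saves the DataFrame in the given format (e.g., CSV, Parquet). Writing is an action: it triggers execution of the pending transformations.
    ```python
    # Creates a *directory* named output.csv with one part file per partition;
    # mode("overwrite") replaces it if it exists (the default mode raises an error).
    df.write.format("csv").mode("overwrite").save("output.csv")
    ```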


### Summary
- **Transformations** are used to create new DataFrames and are executed lazily.
- **Actions** trigger the execution of transformations and return results.


### Useful Methods
- **Data Inspection**:
  - `df.printSchema()`: Prints the schema of the DataFrame.
  - `df.columns`: Lists all column names.
  - `df.dtypes`: Shows data types of each column.

- **Handling Missing Data**:
  - `df.dropna()`: Removes rows with null values.
  - `df.fillna(value)`: Replaces null values with a specified value.

### Example
Here's a simple example of creating and manipulating a DataFrame:
```python
data = [(1, "Alice"), (2, "Bob")]
df = spark.createDataFrame(data, ["id", "name"])
df.show()
```



### Execution Methods

#### Interactive Clients
- **`spark-shell` / `pyspark`**: Interactive shells for running Spark commands in Scala (`spark-shell`) or Python (`pyspark`). They are handy for testing and debugging Spark code. For example, to start a PySpark shell on YARN:
  ```bash
  pyspark --master yarn --driver-memory 1G --executor-memory 1G --num-executors 2 --executor-cores 2
  ```
- **`Notebooks`**: Tools like Jupyter Notebooks or Apache Zeppelin provide an interactive environment where you can write and execute Spark code in various languages (Scala, Python, R). These are particularly useful for data exploration and visualization.

#### Submit Job
- **`spark-submit`**: This is the command-line tool used to submit Spark applications to a cluster. It allows you to run your application in a distributed environment, specifying various configurations like the master URL, application name, and resource allocation.
Here's a breakdown of its key components and usage:

### Basic Syntax
```bash
./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  --driver-memory <value> \
  --executor-memory <value> \
  --executor-cores <number> \
  <application-jar> \
  [application-arguments]
```

### Key Options
- **--class**: The entry point for your application (e.g., `org.apache.spark.examples.SparkPi`).
- **--master**: The master URL for the cluster (e.g., `spark://23.195.26.187:7077`, `yarn`, or `local[*]`; default: `local[*]`).
- **--deploy-mode**: Whether to launch the driver on one of the worker nodes inside the cluster (`cluster`) or locally as an external client (`client`); default: `client`.
- **--conf**: Arbitrary Spark configuration properties in `key=value` format.
- **--driver-memory**: Amount of memory to use for the driver process (e.g., `4g`; default: `1g`).
- **--executor-memory**: Amount of memory to use per executor process (e.g., `2g`; default: `1g`).
- **--executor-cores**: Number of cores to use on each executor (default: 1 on YARN and Kubernetes; all available cores on the worker in standalone mode).
- **`<application-jar>`**: Path to the bundled JAR including your application and all dependencies. For a Python application, pass the `.py` file here instead and omit `--class`.
- **[application-arguments]**: Arguments passed to the main method of your main class, if any.

### Deployment Modes
- **`Client Mode`**: The driver runs on the machine where `spark-submit` is executed. This mode is suitable for interactive applications.
- **`Cluster Mode`**: The driver runs on one of the worker nodes in the cluster. This mode is suitable for production jobs.

### Example Usage
```bash
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://23.195.26.187:7077 \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 2g \
  --executor-cores 2 \
  /path/to/your-application.jar \
  1000
```

This command submits the `SparkPi` example application, which estimates Pi; the trailing `1000` is an application argument that sets how many partitions (slices) the work is split into. The application runs in cluster mode with the specified memory and core settings ([Submitting Applications](https://spark.apache.org/docs/latest/submitting-applications.html)).


### Spark Distributed Processing Model
Once a Spark job is submitted, several key steps occur to ensure the job is executed efficiently across the cluster. Here's a detailed breakdown of the process:

### 1. Job Submission
- **Driver Program**: The `spark-submit` command launches the driver program, which is the main entry point of your Spark application. The driver is responsible for converting the user code into a logical execution plan and requesting resources from the cluster manager.

### 2. Resource Allocation
- **Cluster Manager**: The driver communicates with the cluster manager (e.g., YARN, Mesos, Kubernetes, or Standalone) to request resources. The cluster manager allocates resources (CPU, memory) for the executors on the worker nodes.

### 3. Task Scheduling
- **DAG Scheduler**: When an action is called, the driver turns the chain of transformations into a logical execution plan represented as a Directed Acyclic Graph (DAG). The DAG scheduler splits this graph into stages at shuffle boundaries (wide transformations) and submits each stage to the task scheduler as a set of tasks, one per partition ([Spark By Examples](https://sparkbyexamples.com/spark/what-is-spark-stage/)).
- **Task Scheduler**: The task scheduler assigns tasks to executors based on data locality and resource availability, so that tasks preferably run close to the data they read ([Job Scheduling](https://spark.apache.org/docs/latest/job-scheduling.html)).
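The stage-splitting rule can be sketched with a toy model in pure Python. Real Spark inspects the dependency graph between RDDs, not a flat list of names; here the operation names are plain strings and the set of "wide" operations is a hand-picked assumption. It also assumes a fixed partition count, so every stage gets the same number of tasks:

```python
# Toy model (not Spark's scheduler): ops that need a shuffle start a new stage.
WIDE_OPS = {"groupByKey", "reduceByKey", "join", "distinct", "repartition"}


def split_into_stages(ops):
    """Split a linear chain of operation names into stages at shuffle boundaries."""
    stages = [[]]
    for op in ops:
        if op in WIDE_OPS and stages[-1]:
            stages.append([])  # shuffle boundary: the wide op begins a new stage
        stages[-1].append(op)
    return stages


def plan_tasks(stages, num_partitions):
    """One task per partition per stage (assumes a fixed partition count)."""
    return [(stage_id, p) for stage_id in range(len(stages)) for p in range(num_partitions)]


job = ["textFile", "flatMap", "map", "reduceByKey", "mapValues", "repartition", "filter"]
stages = split_into_stages(job)
for i, stage in enumerate(stages):
    print(f"Stage {i}: {' -> '.join(stage)}")
print(len(plan_tasks(stages, num_partitions=4)), "tasks")  # 12 tasks
```

The chain of narrow operations before each shuffle is pipelined inside one stage, which is why `flatMap` and `map` never appear as separate stages.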

### 4. Task Execution
- **Executors**: Executors are launched on the worker nodes to run the tasks. They perform the computations and store intermediate data in memory or on disk. Executors communicate with the driver to report the status of tasks and data.

### 5. Result Collection
- **Driver Program**: Depending on the action, results are either sent back to the driver (e.g., `collect`, `count`) or written by the executors directly to external storage (e.g., `df.write.save()`).

### 6. Job Completion
- **Resource Cleanup**: When the application finishes (for example, after `spark.stop()` or when the driver program exits), the resources allocated by the cluster manager are released. Executors are shut down, and any temporary data stored on the worker nodes is cleaned up.

### Example Workflow
1. **Submit Job**: User submits the job using `spark-submit`.
2. **Driver Initialization**: Driver program starts and requests resources.
3. **DAG Creation**: Driver constructs the DAG of stages.
4. **Task Scheduling**: Tasks are scheduled and sent to executors.
5. **Task Execution**: Executors run the tasks and perform computations.
6. **Result Collection**: Results are collected and returned to the driver.
7. **Job Completion**: Resources are released, and the job is marked as complete.

This process ensures that Spark can efficiently process large datasets by distributing tasks across multiple nodes and leveraging in-memory computation.


Sources:
- *The Internal Working of Apache Spark*, Analytics Vidhya: https://www.analyticsvidhya.com/blog/2021/08/understand-the-internal-working-of-apache-spark/
- *What is Spark Stage? Explained*, Spark By Examples: https://sparkbyexamples.com/spark/what-is-spark-stage/
- *Job Scheduling*, Spark 3.5.2 Documentation, Apache Spark: https://spark.apache.org/docs/latest/job-scheduling.html

### Status of Job

To check the status of running jobs and view details of completed jobs in Apache Spark, you can use several tools and interfaces. Here are the main methods:

### 1. Spark Web UI
The Spark Web UI provides a comprehensive view of the status and details of Spark jobs. It includes several tabs to monitor different aspects of your Spark application:

- **Jobs Tab**: Displays a summary of all jobs, including their status (running, succeeded, failed), duration, and progress. Clicking on a job provides detailed information such as the event timeline, DAG visualization, and stages of the job ([Web UI](https://spark.apache.org/docs/latest/web-ui.html)).
- **Stages Tab**: Shows the current state of all stages in the Spark application, including active, pending, completed, skipped, and failed stages.
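- **Storage Tab**: Lists persisted (cached) RDDs and DataFrames, with their storage level, size, and number of cached partitions.
- **Environment Tab**: Displays Spark configuration properties, along with JVM, system, and classpath information.
- **Executors Tab**: Shows details about each executor, including memory and disk usage, active and completed tasks, and GC time.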
- **SQL Tab**: Provides metrics for SQL queries if you are using Spark SQL.

### 2. Spark History Server
The Spark History Server allows you to view the details of completed Spark applications. It provides similar information to the Spark Web UI but for past jobs. You can start the History Server using the following command:
```bash
./sbin/start-history-server.sh
```
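Ensure that event logging is enabled in `conf/spark-defaults.conf`, and point the History Server at the same directory:
```
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///path/to/event/logs
spark.history.fs.logDirectory    hdfs:///path/to/event/logs
```
By default, the History Server UI is served on port 18080.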