### Week 4: PySpark and DataFrame Operations

#### Topics Covered:

1. **Introduction to PySpark:**
   - **Definition:** PySpark is the Python API for Apache Spark, an open-source, distributed computing system. It enables data engineers to process large datasets across a distributed cluster of computers.
   - **Example:**
     ```python
     from pyspark.sql import SparkSession

     # Initialize Spark session
     spark = SparkSession.builder.appName("PySparkExample").getOrCreate()
     ```

2. **Setting up PySpark Environment:**
   - **Instructions:**
     - Install PySpark: `pip install pyspark`. Spark runs on the JVM, so a Java runtime must also be installed.
     - Create a Spark session with `SparkSession.builder`.
     - Example:
       ```python
       from pyspark.sql import SparkSession

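       # .config() sets a Spark property; spark.sql.shuffle.partitions (default 200)
       # controls how many partitions joins and aggregations shuffle data into
       spark = SparkSession.builder \
           .appName("PySparkSetup") \
           .config("spark.sql.shuffle.partitions", "8") \
           .getOrCreate()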
       ```

3. **Working with Spark DataFrames:**
   - **Creating DataFrames:**
     - Definition: DataFrames are distributed collections of data organized into named columns.
     - Example:
       ```python
       from pyspark.sql import Row

       # Create DataFrame from a list of Rows
       data = [Row(name="Alice", age=29), Row(name="Bob", age=31)]
       df = spark.createDataFrame(data)
       df.show()
       ```

4. **DataFrame Operations:**
   - **Filtering:**
     - Example:
       ```python
       # Filter rows where age is greater than 30
       df.filter(df.age > 30).show()
       ```

   - **Aggregations:**
     - Example:
       ```python
       # Calculate average age
       df.groupBy().avg("age").show()
       ```

   - **Joins:**
     - Example:
       ```python
       # Example join operation
       df1 = spark.createDataFrame([Row(id=1, value="A"), Row(id=2, value="B")])
       df2 = spark.createDataFrame([Row(id=1, name="Alice"), Row(id=2, name="Bob")])
       df1.join(df2, df1.id == df2.id).show()
       ```

5. **Handling Large Datasets with PySpark:**
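   - **Definition:** Spark divides input data into partitions and processes them in parallel across the cluster's executors. For large files, an explicit schema is usually preferable to `inferSchema=True`, which makes Spark read the data an extra time to guess column types.
   - **Example:**
     ```python
     # Read a large CSV file; inferSchema=True adds an extra pass over the data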
     large_df = spark.read.csv("large_dataset.csv", header=True, inferSchema=True)
     ```

6. **Performance Optimization in PySpark:**
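   - **Definition:** Strategies that reduce recomputation, data movement, and memory pressure in PySpark applications.
   - **Techniques:**
     - Use `cache()` or `persist()` to keep an intermediate DataFrame that several actions reuse, so Spark does not recompute it for each action. `cache()` uses the default storage level; `persist()` accepts an explicit `StorageLevel`.
     - Partition data deliberately: `repartition(n)` shuffles data into `n` partitions, while `coalesce(n)` reduces the number of partitions without a full shuffle.
     - Example:
       ```python
       # Mark df for caching; caching is lazy, so the next action (count) fills the cache
       df.cache()
       df.count()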
       ```

#### Mini Project:

**Description:**
- Use PySpark to load a large dataset from the API endpoints created in Week 3, perform various DataFrame operations such as filtering, aggregations, and joins, and write the processed data back to a file or database.

**Steps:**
1. **Fetch Data from API Endpoints:**
   - Use the `requests` library to fetch data from the API endpoints.
   - Example:
     ```python
     import requests
     import json

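     # Fetch user data from the API; the timeout stops the call from hanging indefinitely
     response = requests.get('http://127.0.0.1:5000/users', timeout=30)
     response.raise_for_status()  # raise an error for 4xx/5xx responses
     users_data = response.json()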
     ```

2. **Load Data into PySpark DataFrames:**
   - Convert the fetched JSON data to a PySpark DataFrame.
   - Example:
     ```python
     import json

     from pyspark.sql import SparkSession

     # Initialize Spark session
     spark = SparkSession.builder.appName("PySparkProject").getOrCreate()

     # Create DataFrame from JSON data
     users_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(users_data)]))
     users_df.show()
     ```

3. **Perform DataFrame Operations:**
   - **Filtering:**
     ```python
     # Filter users by age
     users_df.filter(users_df.age > 30).show()
     ```

   - **Aggregations:**
     ```python
     # Calculate average user age
     users_df.groupBy().avg("age").show()
     ```

   - **Joins:**
     - Fetch and load additional data, then perform join operations.
     - Example:
       ```python
       # Fetch account data from API
       response = requests.get('http://127.0.0.1:5000/accounts', timeout=30)
       response.raise_for_status()
       accounts_data = response.json()

       # Create DataFrame from JSON data
       accounts_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(accounts_data)]))

       # Join on the column name so the result keeps a single user_id column
       joined_df = users_df.join(accounts_df, on="user_id", how="inner")
       joined_df.show()
       ```

4. **Write Processed Data to File or Database:**
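   - Spark writes the output as a directory named `processed_data.csv` containing part files, and by default the write fails if that path already exists.
   - Example:
     ```python
     # Write DataFrame to CSV, replacing any previous output
     joined_df.write.csv("processed_data.csv", header=True, mode="overwrite")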
     ```

**Outcome:**
- Students will learn how to use PySpark for large-scale data processing and become familiar with DataFrame operations, preparing them for real-world data engineering tasks.

### Complete Code Example

Here is the complete code for the mini-project, from fetching data from the endpoints through the DataFrame operations to writing the output:

```python
import json

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("PySparkProject").getOrCreate()

# Function to fetch data from an API endpoint and return the decoded JSON
def fetch_data(url):
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # fail fast on 4xx/5xx responses
    return response.json()

# Fetch data from the API endpoints
users_data = fetch_data('http://127.0.0.1:5000/users')
accounts_data = fetch_data('http://127.0.0.1:5000/accounts')
transactions_data = fetch_data('http://127.0.0.1:5000/transactions')
cards_data = fetch_data('http://127.0.0.1:5000/cards')
payments_data = fetch_data('http://127.0.0.1:5000/payments')

# Convert JSON data to PySpark DataFrames
users_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(users_data)]))
accounts_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(accounts_data)]))
transactions_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(transactions_data)]))
cards_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(cards_data)]))
payments_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(payments_data)]))

# Perform DataFrame operations
# Filtering: users older than 30 (uncomment if the users data has an 'age' field)
# users_df_filtered = users_df.filter(col("age") > 30)

# Aggregations: calculate the average account balance
average_balance_df = accounts_df.groupBy().avg("balance")
average_balance_df.show()

# Join operations
# Joining on column names (on="user_id") keeps a single copy of each key column.
# Joining on expressions such as users_df.user_id == accounts_df.user_id keeps both
# copies, which makes a later join on "user_id" ambiguous and makes write.csv fail
# with a duplicate-column error. This chain assumes the join keys are the only
# column names the tables share; rename any other overlapping column first, e.g.
# transactions_df = transactions_df.withColumnRenamed("amount", "txn_amount")
# (the "amount" column is hypothetical).
final_df = (
    users_df
    .join(accounts_df, on="user_id", how="inner")               # users -> accounts
    .join(transactions_df, on="account_id", how="inner")        # accounts -> transactions
    .join(cards_df, on="account_id", how="inner")               # accounts -> cards
    .join(payments_df, on=["user_id", "card_id"], how="inner")  # payments by user and card
)

final_df.show()

# Write the processed data as CSV (Spark creates a directory of part files)
final_df.write.csv("processed_data.csv", header=True, mode="overwrite")

# Stop Spark session
spark.stop()
```

### Explanation

1. **Fetch Data from API Endpoints:**
   - The `fetch_data` function sends a GET request to the given API endpoint and returns the decoded JSON payload.

2. **Convert JSON Data to PySpark DataFrames:**
   - The fetched JSON data is converted into PySpark DataFrames.

3. **Perform DataFrame Operations:**
   - Filtering is shown but commented out, because the users data may not include an `age` field.
   - Aggregations calculate the average balance in accounts.
   - Multiple joins are performed to combine users, accounts, transactions, cards, and payments data into a single DataFrame.

4. **Write Processed Data to File:**
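   - The final DataFrame is written as CSV. Spark creates `processed_data.csv` as a directory containing one or more part files rather than a single file.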

5. **Stop Spark Session:**
   - The Spark session is stopped.

Keeping all the code in one script makes the flow of data easy to follow from top to bottom: fetching, converting to DataFrames, processing, and writing the output.


### Complete Code Example with SQLite and Airflow

1. **Data Processing with PySpark:**

```python
import json

import requests
from pyspark.sql import SparkSession

def fetch_data(url):
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # fail fast on 4xx/5xx responses
    return response.json()

def process_data():
    # Initialize Spark session. Spark's JDBC writer needs the SQLite JDBC driver on
    # the classpath; spark.jars.packages downloads it from Maven Central when the
    # session's JVM starts (the version shown is an example, not a requirement).
    spark = (
        SparkSession.builder
        .appName("PySparkProject")
        .config("spark.jars.packages", "org.xerial:sqlite-jdbc:3.45.1.0")
        .getOrCreate()
    )

    # Fetch data from the API endpoints
    users_data = fetch_data('http://127.0.0.1:5000/users')
    accounts_data = fetch_data('http://127.0.0.1:5000/accounts')
    transactions_data = fetch_data('http://127.0.0.1:5000/transactions')
    cards_data = fetch_data('http://127.0.0.1:5000/cards')
    payments_data = fetch_data('http://127.0.0.1:5000/payments')

    # Convert JSON data to PySpark DataFrames
    users_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(users_data)]))
    accounts_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(accounts_data)]))
    transactions_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(transactions_data)]))
    cards_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(cards_data)]))
    payments_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(payments_data)]))

    # Join on column names so each key column appears only once (assumes the keys
    # are the only column names the tables share; rename other overlaps first)
    final_df = (
        users_df
        .join(accounts_df, on="user_id", how="inner")
        .join(transactions_df, on="account_id", how="inner")
        .join(cards_df, on="account_id", how="inner")
        .join(payments_df, on=["user_id", "card_id"], how="inner")
    )

    # Write the processed data to a SQLite database through Spark's JDBC data source.
    # coalesce(1) writes through a single connection, since SQLite allows only one
    # writer at a time.
    database_path = 'processed_data.db'
    (
        final_df.coalesce(1).write
        .format("jdbc")
        .option("url", f"jdbc:sqlite:{database_path}")
        .option("dbtable", "final_data")
        .option("driver", "org.sqlite.JDBC")
        .mode("overwrite")
        .save()
    )

    # Stop Spark session
    spark.stop()
```

2. **Apache Airflow DAG:**

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# process_data() is the PySpark function defined above. "data_processing" is a
# hypothetical module name: save that code as data_processing.py somewhere the
# Airflow scheduler and workers can import it (for example, in the DAGs folder).
from data_processing import process_data

# Define the default_args dictionary
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    'data_processing_dag',
    default_args=default_args,
    description='A DAG for processing data with PySpark and saving to SQLite',
    schedule=timedelta(days=1),  # Airflow 2.4+; older 2.x versions use schedule_interval
    catchup=False,  # don't create a backfill run for every day since start_date
)

def fetch_and_process_data():
    process_data()

# Define the task using PythonOperator
fetch_and_process_data_task = PythonOperator(
    task_id='fetch_and_process_data',
    python_callable=fetch_and_process_data,
    dag=dag,
)

# This DAG has a single task, so there are no dependencies to set
```

### Explanation

1. **PySpark Data Processing:**
   - The `fetch_data` function sends a GET request to the given API endpoint and returns the decoded JSON payload.
   - The `process_data` function:
     - Initializes a Spark session.
     - Fetches data from the API endpoints.
     - Converts JSON data into PySpark DataFrames.
     - Performs join operations to combine users, accounts, transactions, cards, and payments data.
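     - Writes the final DataFrame to a SQLite database through Spark's JDBC data source, which requires the SQLite JDBC driver (`org.sqlite.JDBC`) on the classpath.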
     - Stops the Spark session.

2. **Apache Airflow DAG:**
   - Defines a DAG that runs daily.
   - Uses `PythonOperator` to run the `fetch_and_process_data` function, which calls `process_data`.
   - The `process_data` function wraps the same PySpark processing as the earlier script in a function so that the DAG can call it.

This setup runs the data processing code once a day with Apache Airflow and saves the processed data to a SQLite database.