

## Comprehensive Guide to Running PySpark Jobs on Google Cloud Dataproc

This guide walks through everything you need to run a `gcloud dataproc jobs submit pyspark` command successfully: prerequisites, a breakdown of each argument, and best practices.

`gcloud dataproc jobs submit pyspark` submits a PySpark job to an existing Dataproc cluster. Let's break down each part and then set up the environment for a smooth execution.

### 0\. Prerequisites

Before you can run this command, ensure you have the following in place:

  * **Google Cloud Project:** You need an active Google Cloud Project (e.g., `$PROJECT_ID`).
  * **Billing Enabled:** Ensure billing is enabled for your Google Cloud Project.
  * **gcloud CLI Installed and Authenticated:**
      * If you don't have it, install the Google Cloud SDK: [https://cloud.google.com/sdk/docs/install](https://cloud.google.com/sdk/docs/install)
      * Authenticate your `gcloud` CLI:

In [None]:
gcloud auth login
REGION=europe-west1
gcloud config set dataproc/region $REGION
PROJECT_ID=$(gcloud config get-value project) && \
gcloud config set project $PROJECT_ID


  * **API Enabled:**

In [None]:
gcloud services enable dataproc.googleapis.com
gcloud services enable servicemanagement.googleapis.com


  * **Cloud Storage Bucket:** A Google Cloud Storage (GCS) bucket (e.g., `gs://example-dataproc-workshop/01_2_Submit_Job/`) to store your PySpark scripts, utility files, and input/output data.
  * **PySpark Scripts:**
      * `process_data.py`: Your main PySpark application script located at `gs://example-dataproc-workshop/01_2_Submit_Job/pyspark_scripts/process_data.py`.
      * `utils.py`: Any utility Python files your main script depends on, located at `gs://example-dataproc-workshop/01_2_Submit_Job/pyspark_scripts/utils.py`.
  * **Dataproc Cluster:** A running Dataproc cluster (e.g., `example-cluster`) in the specified region (`europe-west1`). If you don't have one, see Section 2 for creating a cluster.
  * **Data in GCS:** Input data for your PySpark job (e.g., sales data in `gs://example-dataproc-workshop/01_2_Submit_Job/raw_data/sales/`).
  * **BigQuery Dataset:** If your job writes to BigQuery, ensure the BigQuery dataset (`example_dataproc_workshop`) exists.

### 1\. Creating Data for the Job

The job reads CSV input (the script calls `.csv()`) with four columns: `transaction_id`, `product_id`, `amount`, and `transaction_date`. The job is submitted with `--processing-date=2025-06-12`, so the sample file contains rows for that date plus a few rows for neighboring dates to exercise the filter.

Let's create a file named `sales_data.csv`.

**`sales_data.csv` content:**

```csv
transaction_id,product_id,amount,transaction_date
TID001,PROD_A,150.75,2025-06-12
TID002,PROD_B,200.00,2025-06-12
TID003,PROD_A,75.20,2025-06-11
TID004,PROD_C,300.50,2025-06-12
TID005,PROD_B,120.99,2025-06-13
TID006,PROD_A,99.99,2025-06-12
TID007,PROD_D,50.00,2025-06-11
TID008,PROD_C,450.00,2025-06-13
```


1.  **Create the file locally:**
    Open a text editor (like Notepad, VS Code, Sublime Text) and copy-paste the content above. Save the file as `sales_data.csv`.

2.  **Upload the file to your GCS bucket:**
    Use the `gsutil cp` command to upload this file to the specified input path:

    ```bash
    gsutil cp sales_data.csv gs://example-dataproc-workshop/01_2_Submit_Job/raw_data/sales/sales_data.csv
    ```

    **Note:** The `process_data.py` script is set to read from `gs://example-dataproc-workshop/01_2_Submit_Job/raw_data/sales/`. If you put `sales_data.csv` directly inside that directory, Spark will read all CSV files within it. You could also create a date-partitioned structure if you prefer, like `gs://example-dataproc-workshop/01_2_Submit_Job/raw_data/sales/2025-06-12/sales.csv`, and adjust your `input-path` argument accordingly if needed, but for this example, placing `sales_data.csv` directly in the `sales/` directory will work.

Now, when your PySpark job runs with `--input-path=gs://example-dataproc-workshop/01_2_Submit_Job/raw_data/sales/` and `--processing-date=2025-06-12`, it will:

1.  Read `sales_data.csv`.
2.  Filter the data to include only rows where `transaction_date` is `2025-06-12`. This will result in:
    ```
    transaction_id,product_id,amount,transaction_date
    TID001,PROD_A,150.75,2025-06-12
    TID002,PROD_B,200.00,2025-06-12
    TID004,PROD_C,300.50,2025-06-12
    TID006,PROD_A,99.99,2025-06-12
    ```
3.  Group by `product_id` and sum the amounts. The expected output to BigQuery would be:

| product\_id | total\_sales | processing\_date |
| :---------- | :----------- | :--------------- |
| PROD\_A     | 250.74       | 2025-06-12       |
| PROD\_B     | 200.00       | 2025-06-12       |
| PROD\_C     | 300.50       | 2025-06-12       |

This data provides a good test case for the filtering and aggregation logic in your `process_data.py` script.
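
As a cross-check that does not depend on Spark, the same filter-and-sum can be reproduced locally with the Python standard library. This is a minimal sketch rather than part of the job: `SALES_CSV` and `aggregate_sales` are hypothetical names introduced here, and `Decimal` is used so the totals print exactly as in the table above.

```python
import csv
import io
from collections import defaultdict
from decimal import Decimal

# Same rows as sales_data.csv above.
SALES_CSV = """\
transaction_id,product_id,amount,transaction_date
TID001,PROD_A,150.75,2025-06-12
TID002,PROD_B,200.00,2025-06-12
TID003,PROD_A,75.20,2025-06-11
TID004,PROD_C,300.50,2025-06-12
TID005,PROD_B,120.99,2025-06-13
TID006,PROD_A,99.99,2025-06-12
TID007,PROD_D,50.00,2025-06-11
TID008,PROD_C,450.00,2025-06-13
"""


def aggregate_sales(csv_text, processing_date):
    """Keep rows for processing_date and sum amount per product_id."""
    totals = defaultdict(Decimal)  # missing keys start at Decimal("0")
    for row in csv.DictReader(io.StringIO(csv_text)):
        if row["transaction_date"] == processing_date:
            totals[row["product_id"]] += Decimal(row["amount"])
    return dict(totals)


expected = aggregate_sales(SALES_CSV, "2025-06-12")
for product_id, total in sorted(expected.items()):
    print(product_id, total)
```

If the BigQuery table later shows different totals for `2025-06-12`, the input directory probably contains additional CSV files, since Spark reads every file under the input path.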

### 2\. Creating a Dataproc Cluster (if you don't have one)

If you don't already have an `example-cluster` running, you can create one using the `gcloud` CLI:

In [None]:
# if not done already
REGION=europe-west1
gcloud config set dataproc/region $REGION
PROJECT_ID=$(gcloud config get-value project) && \
gcloud config set project $PROJECT_ID
PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format='value(projectNumber)')



gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com \
  --role=roles/storage.admin

gcloud compute networks subnets update default --region=$REGION --enable-private-ip-google-access


# gcloud dataproc clusters create example-cluster \
#     --region=europe-west1 \
#     --zone=europe-west1-b \
#     --master-machine-type=n1-standard-4 \
#     --worker-machine-type=n1-standard-4 \
#     --num-workers=2 \
#     --image-version=2.1-debian11 \
#     --scopes=cloud-platform \
#     --project=$PROJECT_ID \
#     --max-age=1h


gcloud dataproc clusters create example-cluster \
  --enable-component-gateway \
  --bucket=example-dataproc-workshop \
  --region=europe-west1 \
  --no-address \
  --master-machine-type=n4-standard-2 \
  --master-boot-disk-type=hyperdisk-balanced \
  --master-boot-disk-size=100 \
  --num-workers=2 \
  --worker-machine-type=n4-standard-2 \
  --worker-boot-disk-size=200 \
  --image-version=2.2-debian12 \
  --optional-components JUPYTER \
  --max-age=3600s \
  --labels=mode=workshop,user=zelda \
  --project=$PROJECT_ID




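**Explanation of Cluster Creation Parameters:**

  * `--region`: The Google Cloud region where your cluster will be created. Choose a region close to your data for better performance.
  * `--zone`: (Optional) The specific zone within the region. Only the commented-out command sets it; the active command leaves zone selection to Dataproc.
  * `--master-machine-type`: Machine type for the master node. Choose based on your workload. The active command uses `n4-standard-2`; the commented-out alternative uses `n1-standard-4`.
  * `--worker-machine-type`: Machine type for worker nodes.
  * `--master-boot-disk-type`, `--master-boot-disk-size`, `--worker-boot-disk-size`: Boot disk type for the master and boot disk sizes (in GB) for the master and workers.
  * `--num-workers`: Number of worker nodes. Adjust based on your processing needs.
  * `--image-version`: Dataproc image version. Prefer a recent stable version. The active command uses `2.2-debian12` and the commented-out alternative uses `2.1-debian11`; both ship Spark 3.x.
  * `--enable-component-gateway` and `--optional-components JUPYTER`: Install Jupyter on the cluster and expose the cluster's web interfaces through the Component Gateway.
  * `--bucket`: The Cloud Storage bucket Dataproc uses for staging job dependencies and driver output.
  * `--no-address`: Creates the cluster VMs with internal IP addresses only. This is why Private Google Access is enabled on the subnet before the cluster is created.
  * `--max-age`: Deletes the cluster automatically once it reaches this age (`3600s` is one hour), which limits cost if you forget to delete it.
  * `--labels`: Key-value labels attached to the cluster for filtering and cost attribution.
  * `--project`: Your Google Cloud Project ID.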

Cluster creation can take a few minutes. You can check the status with:

In [None]:
gcloud dataproc clusters describe example-cluster --region=europe-west1



### 3\. Understanding the `gcloud dataproc jobs submit pyspark` Command

Let's break down each argument of the job submission command:

  * **`gcloud dataproc jobs submit pyspark gs://example-dataproc-workshop/01_2_Submit_Job/pyspark_scripts/process_data.py`**

      * `gcloud dataproc jobs submit pyspark`: This is the core command to submit a PySpark job to Dataproc.
      * `gs://example-dataproc-workshop/01_2_Submit_Job/pyspark_scripts/process_data.py`: This is the **path to your main PySpark application script** in Google Cloud Storage. Dataproc will download and execute this script on the cluster.

  * **`--cluster=example-cluster`**

      * Specifies the **name of the Dataproc cluster** where the job will be run. This cluster must already be running.

  * **`--region=europe-west1`**

      * Defines the **Google Cloud region** where your Dataproc cluster is located. This must match the region of your cluster.

  * **`--project=$PROJECT_ID`**

      * Indicates your **Google Cloud Project ID**. This ensures the command operates within the correct project context.

  * **`--py-files=gs://example-dataproc-workshop/01_2_Submit_Job/pyspark_scripts/utils.py`**

      * This flag is used to include **additional Python `.py` files or `.zip` archives** that your main PySpark script depends on. These files will be distributed to the cluster and made available in the Python path for your PySpark application. You can specify multiple files separated by commas.

  * **`--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar`**

      * This flag includes **external Java Archive (JAR) files** that your PySpark application needs. In this case, `spark-bigquery-with-dependencies_2.12-0.23.2.jar` is the connector for reading from and writing to Google BigQuery from Spark. This JAR is pre-packaged with its dependencies, making it easier to use. Recent Dataproc images (2.1 and later, according to the Dataproc documentation) ship with the connector preinstalled, which is presumably why the command in Section 6 omits `--jars`.

  * **`--properties=spark.executor.memory=4g,spark.sql.shuffle.partitions=50`**

      * This allows you to set **custom Spark configuration properties**.
          * `spark.executor.memory=4g`: Sets the amount of memory allocated to each Spark executor process. Adjust this based on your data size and processing needs. More memory can prevent OOM (Out of Memory) errors for memory-intensive tasks.
          * `spark.sql.shuffle.partitions=50`: Determines the number of partitions to use when shuffling data for operations like `join` or `groupBy`. Adjusting this can significantly impact performance. Too few can lead to data skew and bottlenecks; too many can add overhead.

  * **`--`**

      * This double dash acts as a **delimiter**. Any arguments *after* `--` are treated as **arguments to your PySpark script** (`process_data.py`) itself, not as arguments to the `gcloud dataproc jobs submit` command.

  * **`--input-path=gs://example-dataproc-workshop/01_2_Submit_Job/raw_data/sales/`**

      * An example argument **passed to your `process_data.py` script**. Your script will need to parse this argument to know where to read input data from.

  * **`--output-table=example_dataproc_workshop.processed_sales`**

      * Another example argument **passed to your `process_data.py` script**. This tells your script the BigQuery table where the processed data should be written.

  * **`--processing-date=2025-06-12`**

      * A third example argument **passed to your `process_data.py` script**. This could be used for date-based partitioning, filtering, or logging within your Spark job.
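
To make the role of the `--` delimiter concrete, here is a minimal Python sketch that assembles the same command as an argument list and then splits it at the delimiter. `build_submit_command` and `split_at_delimiter` are hypothetical helpers introduced for illustration; only the `gcloud` flags themselves come from the command above.

```python
import shlex


def build_submit_command(script_uri, cluster, region, py_files, script_args):
    """Assemble the gcloud argv; script_args are placed after the `--` delimiter."""
    command = [
        "gcloud", "dataproc", "jobs", "submit", "pyspark", script_uri,
        f"--cluster={cluster}",
        f"--region={region}",
        f"--py-files={','.join(py_files)}",
        "--",
    ]
    return command + list(script_args)


def split_at_delimiter(argv):
    """Return (gcloud_part, script_part), split at the first bare `--`."""
    index = argv.index("--")
    return argv[:index], argv[index + 1:]


base = "gs://example-dataproc-workshop/01_2_Submit_Job"
argv = build_submit_command(
    script_uri=f"{base}/pyspark_scripts/process_data.py",
    cluster="example-cluster",
    region="europe-west1",
    py_files=[f"{base}/pyspark_scripts/utils.py"],
    script_args=[
        f"--input-path={base}/raw_data/sales/",
        "--output-table=example_dataproc_workshop.processed_sales",
        "--processing-date=2025-06-12",
    ],
)

gcloud_part, script_part = split_at_delimiter(argv)
print(shlex.join(argv))   # the full command as a single shell-safe string
print(script_part)        # only what process_data.py receives
```

Passing a list like `argv` to `subprocess.run(argv, check=True)` avoids shell quoting problems, which matters as soon as a path or argument contains spaces.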

### 4\. Designing Your PySpark Script (`process_data.py`)

Your `process_data.py` script needs to be designed to:

1.  **Initialize a SparkSession:** This is the entry point for all Spark functionality.
2.  **Parse Command-Line Arguments:** Use the `argparse` module (or similar) to parse the arguments passed after the `--` delimiter.
3.  **Read Input Data:** Read data from the `--input-path` using Spark's DataFrame API.
4.  **Perform Data Processing:** Apply your transformation logic.
5.  **Write Output Data:** Write the processed data to the `--output-table` (likely BigQuery in this case, using the `spark-bigquery-connector`).
6.  **Stop SparkSession:** Clean up resources.
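
Step 2 can be tried without a cluster. The sketch below shows how `argparse` turns hyphenated flags into underscore attributes, and adds one optional refinement that is not in the original script: a hypothetical `iso_date` type function that rejects a malformed `--processing-date` before any Spark work starts. `parse_job_args` is likewise a name introduced here.

```python
import argparse
from datetime import date, datetime


def iso_date(value):
    """argparse type function: parse a year-month-day string into a datetime.date."""
    try:
        return datetime.strptime(value, "%Y-%m-%d").date()
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected YYYY-MM-DD, got {value!r}")


def parse_job_args(argv=None):
    """Parse the arguments that follow the `--` delimiter in the gcloud command."""
    parser = argparse.ArgumentParser(description="Process sales data with PySpark.")
    parser.add_argument("--input-path", required=True)
    parser.add_argument("--output-table", required=True)
    parser.add_argument("--processing-date", type=iso_date, required=True)
    return parser.parse_args(argv)


args = parse_job_args([
    "--input-path=gs://example-dataproc-workshop/01_2_Submit_Job/raw_data/sales/",
    "--output-table=example_dataproc_workshop.processed_sales",
    "--processing-date=2025-06-12",
])

# Hyphens in flag names become underscores in attribute names.
print(args.input_path)
print(args.output_table)
print(args.processing_date.isoformat())
```

With this type function, a value such as `12/06/2025` makes the script exit with a usage error instead of silently filtering out every row.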

**Example `process_data.py` structure:**


In [None]:
# process_data.py
import argparse
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType

# Import utilities if you have them in utils.py
try:
    from utils import some_utility_function
except ImportError:
    print("Could not import utils.py. Ensure it's in --py-files.")
    some_utility_function = lambda x: x # Fallback or handle appropriately

def main():
    parser = argparse.ArgumentParser(description="Process sales data with PySpark.")
    parser.add_argument("--input-path", type=str, required=True,
                        help="GCS path to raw sales data (e.g., gs://example-dataproc-workshop/01_2_Submit_Job/raw_data/sales/).")
    parser.add_argument("--output-table", type=str, required=True,
                        help="BigQuery output table (e.g., example_dataproc_workshop.processed_sales).")
    parser.add_argument("--processing-date", type=str, required=True,
                        help="Date for processing data (YYYY-MM-DD).")

    args = parser.parse_args()

    spark = SparkSession.builder \
        .appName("SalesDataProcessor") \
        .config("temporaryGcsBucket", "example-dataproc-workshop") \
        .getOrCreate()


    # Define schema for input data (adjust as per your actual data)
    input_schema = StructType([
        StructField("transaction_id", StringType(), True),
        StructField("product_id", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("transaction_date", DateType(), True)
    ])

    try:
        print(f"Reading data from: {args.input_path}")
        df = spark.read \
            .option("header", "true") \
            .schema(input_schema) \
            .csv(args.input_path)

        print(f"Processing data for date: {args.processing_date}")
        # Example processing: Filter by date and calculate total sales.
        # processing_date is cast to DATE so it matches the DATE column of the BigQuery table.
        processed_df = df.filter(F.col("transaction_date") == F.lit(args.processing_date)) \
                         .groupBy("product_id") \
                         .agg(F.sum("amount").alias("total_sales")) \
                         .withColumn("processing_date", F.lit(args.processing_date).cast("date"))

        # Example of using a utility function
        # processed_df = some_utility_function(processed_df)

        print(f"Writing processed data to BigQuery table: {args.output_table}")
        # Write to BigQuery
        processed_df.write \
            .format("bigquery") \
            .option("table", args.output_table) \
            .mode("overwrite").save()
        # mode can be  "append" or "overwrite" or "ignore" or "errorifexists"

        print("PySpark job completed successfully!")

    except Exception as e:
        print(f"An error occurred: {e}")
        raise # Re-raise the exception to indicate job failure

    finally:
        # Runs on both success and failure, so no extra stop() is needed in the except block
        spark.stop()

if __name__ == "__main__":
    main()

**Example `utils.py` (if used):**

In [None]:
# utils.py
from pyspark.sql import functions as F

def some_utility_function(df):
    """
    Example utility function to add a timestamp column.
    """
    return df.withColumn("processed_timestamp", F.current_timestamp())


### 5\. Uploading Your Files to GCS

Make sure your `process_data.py` and `utils.py` (if applicable) are uploaded to your specified GCS path:

In [None]:
gsutil cp process_data.py gs://example-dataproc-workshop/01_2_Submit_Job/pyspark_scripts/process_data.py
gsutil cp utils.py gs://example-dataproc-workshop/01_2_Submit_Job/pyspark_scripts/utils.py

Also, ensure your input data exists at `gs://example-dataproc-workshop/01_2_Submit_Job/raw_data/sales/`. For example, the `sales_data.csv` file uploaded in Section 1 should be inside that directory.



The job writes to a BigQuery table named `processed_sales` in a dataset named `example_dataproc_workshop`, so create the dataset first and then the table within it.

Here's how you can do it using the `bq` command-line tool, which is part of the Google Cloud SDK and specifically designed for BigQuery interactions.

### Create the BigQuery Dataset

First, let's create the dataset. Datasets need a location, and it's good practice to align it with your Dataproc cluster's region, which is `europe-west1`.

```bash
bq mk \
    --project_id=$PROJECT_ID \
    --location=europe-west1 \
    example_dataproc_workshop
```

**Explanation:**

  * **`bq mk`**: The `bq` command-line tool's subcommand to "make" (create) a resource.
  * **`--project_id=$PROJECT_ID`**: Specifies the Google Cloud project where the dataset will be created.
  * **`--location=europe-west1`**: Sets the geographic location for the dataset. A dataset's location cannot be changed after creation, and your Spark job in `europe-west1` will perform best writing to a BigQuery dataset in the same region.
  * **`example_dataproc_workshop`**: The ID of the new dataset. The project comes from `--project_id`, so only the dataset ID is given here.

You should see output like the following, with your own project ID in place of `PROJECT_ID`:
`Dataset 'PROJECT_ID:example_dataproc_workshop' successfully created.`

### Create the BigQuery Table

Next, let's create the table `processed_sales` within the `example_dataproc_workshop` dataset. When creating a table, you need to define its schema.

Based on the output of your `process_data.py` script:

| Column Name     | Data Type |
| :-------------- | :-------- |
| `product_id`    | STRING    |
| `total_sales`   | FLOAT     |
| `processing_date` | DATE      |

Here's the command to create the table with this schema:

```bash
bq mk \
    --table \
    --project_id=$PROJECT_ID  \
    --schema "product_id:STRING,total_sales:FLOAT,processing_date:DATE" \
    example_dataproc_workshop.processed_sales
```

**Explanation:**

  * **`bq mk --table`**: Specifies that you are creating a table.
  * **`--project_id=$PROJECT_ID`**: Again, the project ID.
  * **`--schema "..."`**: Defines the schema of your table.
      * Each field is `name:TYPE`.
      * Separate multiple fields with commas.
      * Data types are BigQuery type names (e.g., `STRING`, `FLOAT`, `DATE`); `FLOAT` is accepted as another name for `FLOAT64`.
  * **`example_dataproc_workshop.processed_sales`**: The ID of the new table, in the format `[DATASET_ID].[TABLE_ID]`. The project comes from `--project_id`.

You should see output like the following, with your own project ID in place of `PROJECT_ID`:
`Table 'PROJECT_ID:example_dataproc_workshop.processed_sales' successfully created.`
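
The `--schema` flag also accepts a path to a JSON schema file, which is easier to review and version than an inline string. The following sketch converts the inline `name:TYPE` form used above into that JSON structure. `schema_to_json` is a hypothetical helper, and marking every column `NULLABLE` is an assumption made here, not something the original command states.

```python
import json


def schema_to_json(inline_schema):
    """Convert 'name:TYPE,name:TYPE' into a list of BigQuery schema field dicts."""
    fields = []
    for item in inline_schema.split(","):
        name, field_type = item.strip().split(":")
        # Assumption: every column is optional, hence NULLABLE.
        fields.append({"name": name, "type": field_type, "mode": "NULLABLE"})
    return fields


schema = schema_to_json("product_id:STRING,total_sales:FLOAT,processing_date:DATE")
print(json.dumps(schema, indent=2))
```

Saving the printed JSON to a file and passing that file's path to `--schema` should create the same three-column table as the inline form.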

### Permissions

Ensure the user or service account you are using with `gcloud` has the necessary BigQuery permissions (e.g., `BigQuery Data Editor`, `BigQuery User`, or `BigQuery Admin`) within the `$PROJECT_ID` project to create datasets and tables.

In [None]:
bq mk \
    --project_id=$PROJECT_ID  \
    --location=europe-west1 \
    example_dataproc_workshop

bq mk \
    --table \
    --project_id=$PROJECT_ID  \
    --schema "product_id:STRING,total_sales:FLOAT,processing_date:DATE" \
    example_dataproc_workshop.processed_sales

### 6\. Running the Job

Once all prerequisites are met and your files are in GCS, you can simply execute the `gcloud dataproc jobs submit pyspark` command:

In [None]:
gcloud dataproc jobs submit pyspark "gs://example-dataproc-workshop/01_2_Submit_Job/pyspark_scripts/process_data.py" \
    --cluster=example-cluster \
    --region=europe-west1  \
    --project=$PROJECT_ID  \
    --py-files="gs://example-dataproc-workshop/01_2_Submit_Job/pyspark_scripts/utils.py" \
    --properties=spark.bigquery.temporaryGcsBucket=example-dataproc-workshop \
    -- \
    --input-path="gs://example-dataproc-workshop/01_2_Submit_Job/raw_data/sales/" \
    --output-table=example_dataproc_workshop.processed_sales \
    --processing-date=2025-06-12



Adding `spark.bigquery.temporaryGcsBucket=example-dataproc-workshop` to the `--properties` flag tells the Spark BigQuery connector to use `gs://example-dataproc-workshop/` as its temporary staging location for data being written to BigQuery. Make sure the service account running your Dataproc job has write permissions to this bucket.



### 7\. Monitoring Your Job

After submitting the job, `gcloud` will provide a job ID (e.g., `job-20250702-123456-abcde`). You can monitor its progress:

  * **gcloud CLI:**
    ```bash
    gcloud dataproc jobs describe job-20250702-123456-abcde --region=europe-west1
    gcloud dataproc jobs wait job-20250702-123456-abcde --region=europe-west1 # To wait for job completion
    ```
  * **Google Cloud Console:** Navigate to **Dataproc \> Jobs** in your project. You'll see a list of jobs, their status, and can click on a specific job to view its logs (driver output, YARN logs, etc.). This is often the easiest way to debug issues.
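
For scripted pipelines, the same check can be polled from Python. The sketch below is one possible approach, not a Dataproc-provided helper: `gcloud_job_state` and `wait_for_job` are hypothetical names, the state is read with `--format=value(status.state)`, and `DONE`, `ERROR`, and `CANCELLED` are treated as terminal states. The demonstration at the bottom uses a fake state source so it runs without `gcloud` or a real job.

```python
import subprocess
import time

# Job states after which no further change is expected.
TERMINAL_STATES = {"DONE", "ERROR", "CANCELLED"}


def gcloud_job_state(job_id, region):
    """Ask gcloud for the job's current state (needs an authenticated gcloud CLI)."""
    result = subprocess.run(
        ["gcloud", "dataproc", "jobs", "describe", job_id,
         f"--region={region}", "--format=value(status.state)"],
        check=True, capture_output=True, text=True,
    )
    return result.stdout.strip()


def wait_for_job(job_id, region, get_state=gcloud_job_state,
                 poll_seconds=10, sleep=time.sleep):
    """Poll until the job reaches a terminal state and return that state."""
    while True:
        state = get_state(job_id, region)
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)


# Offline demonstration: a fake state source replaces the real gcloud call.
fake_states = iter(["PENDING", "RUNNING", "RUNNING", "DONE"])
final_state = wait_for_job(
    "job-20250702-123456-abcde", "europe-west1",
    get_state=lambda job_id, region: next(fake_states),
    sleep=lambda seconds: None,
)
print(final_state)
```

Against a real job, calling `wait_for_job(job_id, "europe-west1")` with the defaults would shell out to `gcloud` every ten seconds; a caller can then fail the pipeline when the returned state is not `DONE`.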

### 8\. Common Issues and Troubleshooting

  * **Cluster Not Found/Running:** Ensure the cluster name and region are correct, and the cluster is in a `RUNNING` state.
  * **Permissions Issues:**
      * **GCS Access:** The Dataproc cluster service account (default: `PROJECT_NUMBER-compute@developer.gserviceaccount.com`) needs appropriate permissions to read/write from `gs://example-dataproc-workshop/01_2_Submit_Job/`. Grant it `Storage Object Admin` or more granular roles like `Storage Object Viewer` for input and `Storage Object Creator` for output.
      * **BigQuery Access:** If writing to BigQuery, the cluster service account needs `BigQuery Data Editor` or `BigQuery Data Owner` on the target dataset/table.
  * **PySpark Script Errors:**
      * Check the driver logs in the Cloud Console for stack traces and error messages.
      * Ensure all necessary imports are present and paths are correct.
      * Verify that the arguments parsed in your script (`input-path`, `output-table`, `processing-date`) match what's passed in the `gcloud` command.
  * **`--py-files` not found:** Double-check the GCS path for `utils.py`.
  * **`--jars` not found or incorrect:** Ensure the BigQuery connector JAR path is correct. If you're using a different Spark or Scala version, you might need a different JAR version.
  * **Spark Properties:** Incorrect Spark properties can lead to performance issues or job failures.
      * `spark.executor.memory`: If too low, you might see `OutOfMemoryError`.
      * `spark.sql.shuffle.partitions`: Tuning this can improve performance.
  * **Network Issues:** Ensure no firewall rules are blocking communication within the cluster or to external services.
  * **Resource Exhaustion:** If your cluster is too small for the workload, jobs might fail or take a very long time. Consider scaling up your cluster (more workers, larger machine types).
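
When tuning `spark.executor.memory`, keep in mind that YARN has to fit more than the heap: Spark adds a memory overhead on top of the executor memory. The sketch below does that arithmetic under Spark's documented defaults (overhead of 10% of executor memory, with a 384 MiB minimum). `executor_container_mib` is a hypothetical helper, and the result ignores any extra off-heap or PySpark memory settings.

```python
def executor_container_mib(executor_memory_mib, overhead_factor=0.10, min_overhead_mib=384):
    """Approximate memory YARN must grant per executor: heap plus default overhead."""
    overhead = max(int(executor_memory_mib * overhead_factor), min_overhead_mib)
    return executor_memory_mib + overhead


# spark.executor.memory=4g -> 4096 MiB heap + 409 MiB overhead
print(executor_container_mib(4096))
# spark.executor.memory=2g -> the 384 MiB minimum overhead applies
print(executor_container_mib(2048))
```

If the computed size is close to or above what a single worker can offer to YARN, executors may fail to start or fewer may run per node than expected, so either lower `spark.executor.memory` or choose a larger worker machine type.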

### 9\. Best Practices

  * **Version Control:** Keep your PySpark scripts and utility files under version control (e.g., Git).
  * **Idempotency:** Design your PySpark jobs to be idempotent, meaning running them multiple times with the same inputs produces the same result. This is crucial for retries and recovery.
  * **Structured Logging:** Implement good logging within your PySpark script to make debugging easier. Spark logs are already sent to Cloud Logging.
  * **Testing:** Thoroughly test your PySpark scripts in a local Spark environment or smaller Dataproc cluster before running them on large production clusters.
  * **Cost Management:**
      * Choose appropriate machine types and number of workers for your workload.
      * Consider using Spot (preemptible) VMs as secondary worker nodes if your workload can tolerate interruptions (significant cost savings).
      * Delete clusters when not in use, or use Dataproc's auto-deletion feature.
  * **Cluster Reusability:** For frequent jobs, keeping a cluster running can save cluster startup time. For infrequent or long-running jobs, consider creating a new cluster for each job and deleting it afterward.
  * **Security:** Grant the Dataproc service account only the necessary permissions (least privilege principle).
  * **Monitoring and Alerting:** Set up Cloud Monitoring dashboards and alerts for Dataproc jobs to be notified of failures or performance issues.

