# **PROJECT** - EQ ETL (**Earthquake Data** from the **USGS** API)


# **PySpark** - by *Dipakraj Patil*

In [1]:
# Project - fresh start

from google.colab import drive
import os

drive.mount('/content/drive')
project_path = r"/content/drive/MyDrive/Project_EQ/Pyspark"

def create_directory(path):
    # Check if the directory exists
    if not os.path.exists(path):
        try:
            # Create the directory and any necessary parent directories
            os.makedirs(path)
            print(f"Directory '{path}' created.")
        except Exception as e:
            print(f"Error creating directory '{path}': {e}")
    else:
        print(f"Directory '{path}' already exists.")

# Create the project folder (if it does not exist yet)
create_directory(project_path)

# Change the working directory to the project folder
os.chdir(project_path)

Mounted at /content/drive
Directory '/content/drive/MyDrive/Project_EQ/Pyspark' already exists.
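The `create_directory` helper above can be shortened with `os.makedirs(..., exist_ok=True)`, which does not raise when the folder already exists. Below is a minimal sketch that assumes the per-case messages are not needed. Returning a bool in their place is an illustrative choice, not part of the original project:

```python
import os
import tempfile


def create_directory(path: str) -> bool:
    """Create `path` and any missing parents; return True only if it did not exist before."""
    existed = os.path.isdir(path)
    # exist_ok=True makes the call idempotent instead of raising FileExistsError
    os.makedirs(path, exist_ok=True)
    return not existed


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "Project_EQ", "Pyspark")
        print(create_directory(target))  # True  (created on the first call)
        print(create_directory(target))  # False (already present)
```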


### seismic_data_pipeline.py

In [2]:
%%writefile "/content/drive/MyDrive/Project_EQ/Pyspark/seismic_data_pipeline.py"

# Standard library imports
import json
import logging
from decimal import Decimal, InvalidOperation

# Third-party libraries
import requests
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import from_unixtime, col, split
from pyspark.sql.types import StructType, StructField, DecimalType, LongType, StringType, IntegerType, TimestampType
import pyarrow.parquet as pq
import gcsfs

# Google Cloud libraries
from google.cloud import storage
from google.cloud import bigquery

# Set up logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class SeismicDataFetcher:
    def __init__(self, source_url: str):
        """
        Initialize the SeismicDataFetcher with the provided source URL.

        Args:
            source_url (str): The URL from which to fetch seismic data.
        """
        self.source_url = source_url
        self.raw_data = None  # Initialize raw_data to None

    def retrieve_data(self) -> dict:
        """
        Fetch seismic data from the API.

        Makes a GET request to the specified source URL and retrieves the seismic data
        in JSON format. If the request is successful, the data is stored in the
        `raw_data` attribute.

        Returns:
            dict: The seismic data retrieved from the API.

        Raises:
            SystemExit: Exits the program if an error occurs during data retrieval.
        """
        try:
            # A timeout keeps the job from hanging on an unresponsive API (30 s is an arbitrary choice)
            response = requests.get(self.source_url, timeout=30)
            response.raise_for_status()  # Raise an error for unsuccessful (4xx/5xx) responses
            self.raw_data = response.json()  # Store the parsed JSON
            logging.info("Successfully retrieved seismic data.\n")
        except Exception as e:
            logging.error(f"Error during data retrieval: {e}")
            raise SystemExit(1) from e  # Exit the program in case of an error

        return self.raw_data


class CustomJSONEncoder(json.JSONEncoder):
    """
    Custom JSON encoder to handle specific data types.

    This encoder extends the default JSONEncoder to provide support for
    serializing Decimal objects as floats, ensuring compatibility with
    JSON serialization since JSON does not natively support Decimal.

    Example:
        >>> json.dumps(Decimal('12.34'), cls=CustomJSONEncoder)
        '12.34'
    """

    def default(self, obj):
        """
        Override the default method to provide custom serialization for specific types.

        Args:
            obj: The object to serialize.

        Returns:
            The serialized representation of the object.

        If the object is an instance of Decimal, it converts it to a float.
        For all other types, it falls back to the default serialization.
        """
        if isinstance(obj, Decimal):
            return float(obj)  # Convert Decimal to float
        return super(CustomJSONEncoder, self).default(obj)


class CloudStorageManager:
    """
    A class to manage Google Cloud Storage operations for a specific bucket.

    This class provides methods to upload and download JSON data, as well as
    to read Parquet files from a specified path within a bucket.

    Attributes:
        bucket_name (str): The name of the Google Cloud Storage bucket.
        silver_layer_path (str): The path to the silver layer in the GCS bucket.
        gcs_client (google.cloud.storage.Client): A client to interact with Google Cloud Storage.
        bucket (google.cloud.storage.Bucket): The specific bucket instance.
    """

    def __init__(self, bucket_name: str, silver_layer_path: str):
        """
        Initialize the CloudStorageManager with the specified bucket name and silver layer path.

        Args:
            bucket_name (str): The name of the Google Cloud Storage bucket.
            silver_layer_path (str): The path to the silver layer in the GCS bucket.
        """
        self.bucket_name = bucket_name
        self.gcs_client = storage.Client()
        self.bucket = self.gcs_client.bucket(bucket_name)
        self.silver_layer_path = silver_layer_path

    def upload_json_data(self, target_blob_name: str, data: dict) -> None:
        """
        Upload data as JSON to Google Cloud Storage.

        This method serializes the provided data using a custom JSON encoder
        to handle Decimal serialization and uploads it to the specified blob.

        Args:
            target_blob_name (str): The name of the target blob in the bucket.
            data (dict): The data to upload, serialized as JSON.
        """
        blob = self.bucket.blob(target_blob_name)

        # Use custom encoder to handle Decimal serialization
        json_payload = json.dumps(data, cls=CustomJSONEncoder)
        blob.upload_from_string(json_payload, content_type='application/json')
        logging.info(f"Data successfully uploaded to {target_blob_name} in bucket {self.bucket_name}.\n")

    def download_json_data(self, source_blob_name: str) -> dict:
        """
        Download JSON data from Google Cloud Storage and return it as a Python object.

        Args:
            source_blob_name (str): The name of the source blob in the bucket.

        Returns:
            dict: The downloaded data as a Python dictionary.
        """
        blob = self.bucket.blob(source_blob_name)
        downloaded_json = blob.download_as_text()
        logging.info(f"Data successfully downloaded from {source_blob_name}.\n")
        return json.loads(downloaded_json)  # Convert back to Python object

    def read_silver_layer_data(self) -> "pandas.DataFrame | None":
        """
        Read Parquet files from the silver layer path in the specified GCS bucket.

        This method lists all Parquet files in the silver layer path, reads them
        into an Arrow Table, converts it to a Pandas DataFrame, and returns the DataFrame.

        Returns:
            pandas.DataFrame: A DataFrame containing the data from the Parquet files,
                              or None if no files are found or an error occurs.
        """
        logging.info("Attempting to read silver layer data...")
        gcs_path = f"{self.bucket_name}/{self.silver_layer_path}"  # Don't prefix with 'gs://'

        # Initialize GCS file system using gcsfs
        fs = gcsfs.GCSFileSystem()

        # List all files in the specified directory and filter for .parquet files only
        parquet_files = [f"gs://{file}" for file in fs.ls(gcs_path) if file.endswith(".parquet")]

        # Log the filtered list of .parquet files to confirm
        logging.info(f"List of parquet files: {parquet_files}")

        # Check if there are any Parquet files in the list
        if parquet_files:
            # Try reading the .parquet files into an Arrow Table
            try:
                table = pq.ParquetDataset(parquet_files, filesystem=fs).read()
                pandas_df_of_silver_layer_data = table.to_pandas()
                logging.info("Pandas DataFrame created successfully from silver layer data.\n")
                return pandas_df_of_silver_layer_data
            except Exception as e:
                logging.error(f"Error reading Parquet files: {e}")
                return None
        else:
            logging.warning("No Parquet files found in the specified directory.")
            return None


class SeismicDataTransformation:
    """
    A class for transforming raw seismic data into a structured format for analysis.

    This class takes raw seismic data, flattens it, and converts it into a Spark DataFrame.
    """

    def __init__(self, spark_session, raw_data):
        """
        Initialize the SeismicDataTransformation with a Spark session and raw data.

        Args:
            spark_session (SparkSession): The Spark session to be used for DataFrame operations.
            raw_data (dict): The raw seismic data retrieved from the API.
        """
        self.spark_session = spark_session
        self.raw_data = raw_data
        self.transformed_entries = None

    def transform_to_flat_structure(self) -> list:
        """
        Flatten and reformat the retrieved seismic data for further processing.

        This method extracts relevant fields from the raw data and organizes them
        into a flat structure suitable for analysis.

        Returns:
            list: A list of transformed seismic data entries.
        """
        events = self.raw_data['features']
        self.transformed_entries = []

        # Safely convert numeric values (or None) to Decimal; defined once, outside the loop
        def safe_decimal(value):
            try:
                return Decimal(str(value)) if value is not None else Decimal("0")
            except InvalidOperation:
                return Decimal("0")

        for event in events:
            # GeoJSON coordinates are ordered [longitude, latitude, depth]
            longitude, latitude, depth = event['geometry']['coordinates'][:3]
            coordinates = {
                "longitude": Decimal(str(longitude)),
                "latitude": Decimal(str(latitude)),
                "depth": Decimal(str(depth))
            }

            details = event['properties']

            processed_entry = {
                'magnitude': safe_decimal(details.get('mag', 0)),
                'place': details.get('place', ''),
                'event_time': details.get('time', 0),
                'last_update': details.get('updated', 0),
                'timezone_offset': int(details.get('tz', 0) or 0),
                'info_url': details.get('url', ''),
                'description': details.get('detail', ''),
                'felt_reports': int(details.get('felt', 0) or 0),
                'cdi_value': safe_decimal(details.get('cdi', 0)),
                'mmi_value': safe_decimal(details.get('mmi', 0)),
                'alert_status': details.get('alert', ''),
                'event_status': details.get('status', ''),
                'tsunami_warning': int(details.get('tsunami', 0) or 0),
                'significance': int(details.get('sig', 0) or 0),
                'network_code': details.get('net', ''),
                'event_code': details.get('code', ''),
                'event_ids': details.get('ids', ''),
                'data_sources': details.get('sources', ''),
                'event_types': details.get('types', ''),
                'station_count': int(details.get('nst', 0) or 0),
                'min_distance': safe_decimal(details.get('dmin', 0)),
                'rms_value': safe_decimal(details.get('rms', 0)),
                'gap_angle': safe_decimal(details.get('gap', 0)),
                'magnitude_type': details.get('magType', ''),
                'event_type': details.get('type', ''),
                'location': coordinates
            }

            self.transformed_entries.append(processed_entry)

        logging.info("Data transformed into a flat structure successfully.\n")
        return self.transformed_entries

    def create_spark_dataframe(self, flattened_data_in_list_form: list) -> DataFrame:
        """
        Create a Spark DataFrame from the transformed seismic data.

        Args:
            flattened_data_in_list_form (list): A list of transformed seismic data entries.

        Returns:
            DataFrame: A Spark DataFrame containing the transformed data.
        """
        schema = StructType([
            StructField("magnitude", DecimalType(10, 2), True),
            StructField("place", StringType(), True),
            StructField("event_time", LongType(), True),
            StructField("last_update", LongType(), True),
            StructField("timezone_offset", IntegerType(), True),
            StructField("info_url", StringType(), True),
            StructField("description", StringType(), True),
            StructField("felt_reports", IntegerType(), True),
            StructField("cdi_value", DecimalType(10, 2), True),
            StructField("mmi_value", DecimalType(10, 2), True),
            StructField("alert_status", StringType(), True),
            StructField("event_status", StringType(), True),
            StructField("tsunami_warning", IntegerType(), True),
            StructField("significance", IntegerType(), True),
            StructField("network_code", StringType(), True),
            StructField("event_code", StringType(), True),
            StructField("event_ids", StringType(), True),
            StructField("data_sources", StringType(), True),
            StructField("event_types", StringType(), True),
            StructField("station_count", IntegerType(), True),
            StructField("min_distance", DecimalType(10, 2), True),
            StructField("rms_value", DecimalType(10, 2), True),
            StructField("gap_angle", DecimalType(10, 2), True),
            StructField("magnitude_type", StringType(), True),
            StructField("event_type", StringType(), True),
            StructField("location", StructType([
                StructField("longitude", DecimalType(10, 6), True),
                StructField("latitude", DecimalType(10, 6), True),
                StructField("depth", DecimalType(10, 6), True)
            ]), True)
        ])

        df = self.spark_session.createDataFrame(flattened_data_in_list_form, schema=schema)
        logging.info("Spark DataFrame created successfully from transformed data.\n")
        print('**********' * 8, 'SCHEMA > flattened df (Before Transformations)', '**********' * 8)
        df.printSchema()
        return df

    def apply_transformations(self, flattened_df: DataFrame) -> DataFrame:
        """
        Apply transformations to the seismic data.

        This method converts event_time and last_update from milliseconds to
        timestamp format and extracts area information from the place.

        Args:
            flattened_df (DataFrame): The Spark DataFrame containing flattened seismic data.

        Returns:
            DataFrame: A transformed Spark DataFrame with applied transformations.
        """
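        # timestamp_seconds (Spark 3.1+) keeps the millisecond part of the epoch-ms values;
        # from_unixtime formats to whole seconds, so the milliseconds would be lost
        transformed_df = flattened_df \
            .withColumn("event_time", F.timestamp_seconds(col("event_time") / 1000)) \
            .withColumn("last_update", F.timestamp_seconds(col("last_update") / 1000))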

        transformed_df = transformed_df.withColumn("area", split(col("place"), " of ").getItem(1)).drop("place")
        logging.info("Transformations applied to the DataFrame successfully.\n")
        print('**********' * 8, 'SCHEMA > transformed_df (After Transformations)', '**********' * 8)
        transformed_df.printSchema()
        return transformed_df


class BigQueryUploader:
    """
    A class to upload data from a Spark DataFrame to Google BigQuery.

    This class handles the conversion of input data into a format suitable for
    uploading to BigQuery, and provides methods to perform the upload.
    """

    def __init__(self, project_id: str, dataset_id: str, table_id: str, input_df, spark) -> None:
        """
        Initialize the BigQueryUploader with project and dataset information.

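        Args:
            project_id (str): Google Cloud project ID.
            dataset_id (str): BigQuery dataset ID.
            table_id (str): BigQuery table ID.
            input_df (pandas.DataFrame): The pandas DataFrame (read from the Silver layer) to be uploaded.
            spark (SparkSession): The active Spark session, used to convert input_df to a Spark DataFrame.
        """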
        self.client = bigquery.Client()
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.input_df = input_df
        self.pyspark_df = None
        self.spark = spark
        logging.info("BigQueryUploader initialized successfully.\n")

    def apply_transformations_for_bigquery(self) -> DataFrame:
        """
        Apply necessary transformations to the input DataFrame for BigQuery upload.

        This method converts the input Pandas DataFrame into a PySpark DataFrame
        and adds a timestamp column for record insertion.

        Returns:
            DataFrame: The transformed PySpark DataFrame ready for upload.
        """
        # Convert Pandas DataFrame to PySpark DataFrame
        self.pyspark_df = self.spark.createDataFrame(self.input_df)

        # Add an 'insert_dt' column recording when the rows were loaded
        # (current_timestamp() already returns TimestampType, so no extra cast is needed)
        self.pyspark_df = self.pyspark_df.withColumn("insert_dt", F.current_timestamp())

        logging.info("Transformations applied to the input DataFrame for BigQuery upload.\n")
        return self.pyspark_df

    def upload_to_bigquery(self, df) -> None:
        """
        Upload the Spark DataFrame to Google BigQuery.

        This method converts the Spark DataFrame to a pandas DataFrame and loads
        it into BigQuery using the specified table reference. Because toPandas()
        collects every row onto the driver, this approach suits small to moderate data volumes.

        Args:
            df (DataFrame): The Spark DataFrame to be uploaded to BigQuery.
        """
        # Convert the Spark DataFrame to Pandas DataFrame
        df_pandas = df.toPandas()
        df_pandas['insert_dt'] = df_pandas['insert_dt'].astype('datetime64[ns]')
        row_count = df_pandas.shape[0]
        table_ref = f"{self.project_id}.{self.dataset_id}.{self.table_id}"

        job_config = bigquery.LoadJobConfig(
            schema=[
                bigquery.SchemaField("magnitude", "NUMERIC"),
                bigquery.SchemaField("event_time", "TIMESTAMP"),
                bigquery.SchemaField("last_update", "TIMESTAMP"),
                bigquery.SchemaField("timezone_offset", "INT64"),
                bigquery.SchemaField("info_url", "STRING"),
                bigquery.SchemaField("description", "STRING"),
                bigquery.SchemaField("felt_reports", "INT64"),
                bigquery.SchemaField("cdi_value", "NUMERIC"),
                bigquery.SchemaField("mmi_value", "NUMERIC"),
                bigquery.SchemaField("alert_status", "STRING"),
                bigquery.SchemaField("event_status", "STRING"),
                bigquery.SchemaField("tsunami_warning", "INT64"),
                bigquery.SchemaField("significance", "INT64"),
                bigquery.SchemaField("network_code", "STRING"),
                bigquery.SchemaField("event_code", "STRING"),
                bigquery.SchemaField("event_ids", "STRING"),
                bigquery.SchemaField("data_sources", "STRING"),
                bigquery.SchemaField("event_types", "STRING"),
                bigquery.SchemaField("station_count", "INT64"),
                bigquery.SchemaField("min_distance", "NUMERIC"),
                bigquery.SchemaField("rms_value", "NUMERIC"),
                bigquery.SchemaField("gap_angle", "NUMERIC"),
                bigquery.SchemaField("magnitude_type", "STRING"),
                bigquery.SchemaField("event_type", "STRING"),
                bigquery.SchemaField("area", "STRING"),
                bigquery.SchemaField("insert_dt", "TIMESTAMP")
            ],
            write_disposition="WRITE_APPEND"  # Append to the existing rows (does not overwrite)
        )

        job = self.client.load_table_from_dataframe(df_pandas, table_ref, job_config=job_config)
        job.result()  # Wait for the job to complete

        logging.info(
            f"<< {row_count} New Records 😬 >> Successfully Uploaded to BigQuery Table {self.table_id} in dataset: {self.dataset_id}\n"
        )


Overwriting /content/drive/MyDrive/Project_EQ/Pyspark/seismic_data_pipeline.py
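The pipeline keeps numeric fields as `Decimal` and converts them back to `float` only when writing JSON. Below is a standalone sketch of the two helpers involved, mirroring the nested `safe_decimal` function and `CustomJSONEncoder` above. It runs without Spark or GCP. Going through `str()` gives `Decimal` the float's short representation rather than its exact binary value:

```python
import json
from decimal import Decimal, InvalidOperation


def safe_decimal(value) -> Decimal:
    """Convert a JSON number (or None) to Decimal, falling back to 0 on unparseable input."""
    if value is None:
        return Decimal("0")
    try:
        # str() first: Decimal(2.3) would keep the float's full binary expansion
        return Decimal(str(value))
    except InvalidOperation:
        return Decimal("0")


class CustomJSONEncoder(json.JSONEncoder):
    """json.dumps() cannot serialize Decimal by default, so emit it as a float."""

    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)


row = {"magnitude": safe_decimal(2.3), "felt": safe_decimal(None)}
print(json.dumps(row, cls=CustomJSONEncoder))  # {"magnitude": 2.3, "felt": 0.0}
```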

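`transform_to_flat_structure` walks `raw_data['features']` and turns each GeoJSON feature into one flat row. Below is a reduced, standalone sketch of that step for a handful of the fields. It runs on a made-up feature shaped like the USGS GeoJSON format; the sample values and the `flatten_feature` name are illustrative. The `value or default` fallbacks cover keys that are present but `null`, which `dict.get(key, default)` alone would return as `None`:

```python
from decimal import Decimal


def flatten_feature(feature: dict) -> dict:
    """Flatten one GeoJSON feature into a subset of the pipeline's row fields."""
    # GeoJSON point coordinates: [longitude, latitude, depth]
    lon, lat, depth = feature["geometry"]["coordinates"][:3]
    props = feature["properties"]
    return {
        "magnitude": Decimal(str(props.get("mag") or 0)),
        "place": props.get("place") or "",
        "event_time": props.get("time") or 0,          # epoch milliseconds
        "felt_reports": int(props.get("felt") or 0),   # null -> 0
        "event_type": props.get("type") or "",
        "location": {
            "longitude": Decimal(str(lon)),
            "latitude": Decimal(str(lat)),
            "depth": Decimal(str(depth)),
        },
    }


# Hypothetical sample shaped like a USGS GeoJSON feature (values made up)
sample = {
    "type": "Feature",
    "properties": {"mag": 1.5, "place": "10 km NE of Example, CA",
                   "time": 1700000000000, "felt": None, "type": "earthquake"},
    "geometry": {"type": "Point", "coordinates": [-117.5, 35.25, 7.8]},
}

row = flatten_feature(sample)
print(row["location"]["depth"], row["felt_reports"])  # 7.8 0
```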

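`apply_transformations` derives two columns: timestamps from epoch milliseconds, and `area` as element 1 of `split(place, " of ")`. The following plain-Python equivalents can help spot-check a few rows by hand. The function names are illustrative. Spark's `split` takes a regular expression, but `" of "` has no special characters, so it matches literally. With ANSI mode off (the Spark 3.x default), `getItem(1)` yields null when there is no `" of "`, which the sketch mirrors with `None`:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_ms_to_utc(ms: int) -> datetime:
    """USGS epoch milliseconds -> timezone-aware UTC datetime (exact integer arithmetic)."""
    return EPOCH + timedelta(milliseconds=ms)


def extract_area(place: Optional[str]) -> Optional[str]:
    """Element 1 of place.split(' of '), or None when there is no ' of ' (or no place)."""
    if place is None:
        return None
    parts = place.split(" of ")
    return parts[1] if len(parts) > 1 else None


print(epoch_ms_to_utc(1700000000123).isoformat())  # 2023-11-14T22:13:20.123000+00:00
print(extract_area("10 km NE of Example, CA"))      # Example, CA
print(extract_area("Southern Alaska"))              # None
```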
### main_hist.py

In [3]:
%%writefile "/content/drive/MyDrive/Project_EQ/Pyspark/main_hist.py"

import os
import logging
from pyspark.sql import SparkSession
from seismic_data_pipeline import SeismicDataFetcher, CloudStorageManager, SeismicDataTransformation, BigQueryUploader
import config


def main():
    # Configure logging
    logging.basicConfig(level=logging.INFO)

    # Create Spark session
    spark = SparkSession.builder.master('local[*]').appName('SeismicDataPipeline').getOrCreate()

    # Set Google Cloud credentials (an already-exported GOOGLE_APPLICATION_CREDENTIALS takes precedence)
    os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', "default_compute_key.json")  # Path to your service account key file

    try:
        # Fetch seismic data
        logging.info("Fetching seismic data from the source API...\n\n")
        seismic_fetcher = SeismicDataFetcher(config.SOURCE_API_URL_HISTORICAL)
        fetched_data = seismic_fetcher.retrieve_data()

        # STEP 1: Ingest raw Data into GCS bucket in landing location (in .json format)
        logging.info("Uploading fetched seismic data to Google Cloud Storage (landing layer)...\n\n")
        gcs_manager = CloudStorageManager(config.GCS_BUCKET, config.PYSPARK_SILVER_LAYER_PATH)
        gcs_manager.upload_json_data(config.PYSPARK_LANDING_LOCATION, fetched_data)

        # STEP 2: Download raw json data from GCS bucket for flattening and processing
        logging.info("Downloading raw-JSON data from Google Cloud Storage (landing layer)...\n\n")
        data_to_process = gcs_manager.download_json_data(config.PYSPARK_LANDING_LOCATION)

        # STEP 3: Perform transformations
        logging.info("Transforming raw data...\n\n")
        transformations = SeismicDataTransformation(spark, data_to_process)
        flattened_data_in_list_form = transformations.transform_to_flat_structure()  # Data in a list format
        flattened_df = transformations.create_spark_dataframe(flattened_data_in_list_form)
        transformed_df = transformations.apply_transformations(flattened_df)

        # STEP 4: Upload transformed Data back to the GCS bucket in Silver layer location (in parquet)
        logging.info("Uploading transformed data to Silver layer in Google Cloud Storage...\n\n")
        transformed_df.write.mode("overwrite").parquet(config.PYSPARK_SILVER_LAYER)

        # STEP 5: Read data from Silver Layer for BigQuery loading
        logging.info("Reading data from Silver layer for BigQuery upload...\n\n")
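        silver_layer_pandas_df = gcs_manager.read_silver_layer_data()
        if silver_layer_pandas_df is None:
            raise RuntimeError("No data could be read from the Silver layer; aborting the BigQuery load.")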

        # STEP 6: Upload data from Silver layer to BigQuery
        logging.info("Finally, Uploading data to BigQuery...\n\n")
        uploader = BigQueryUploader(config.PROJECT_ID, config.DATASET_ID, config.TABLE_ID, silver_layer_pandas_df, spark)
        transformed_df_for_bq = uploader.apply_transformations_for_bigquery()  # Apply transformations to prepare the data for BigQuery
        uploader.upload_to_bigquery(transformed_df_for_bq)  # Upload the transformed data to BigQuery

    except Exception as e:
        logging.error(f"An error occurred during the data processing pipeline: {e}")
    else:
        # Runs only when every step above completed without an exception
        logging.info("Everything is running smoothly 😊 \n\n")
        logging.info("The project execution is successful! 🎉")
    finally:
        # Stop the Spark session whether or not the pipeline succeeded
        logging.info("Now stopping Spark session... Good bye \n")
        spark.stop()

if __name__ == "__main__":
    main()


Overwriting /content/drive/MyDrive/Project_EQ/Pyspark/main_hist.py
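`main()` stops Spark in a `finally` block, which Python runs whether or not a step raised. Below is a small, Spark-free sketch of the try/except/else/finally layout. Success messages sit in `else`, which is reached only when nothing raised, and cleanup stays in `finally`. The names `run_pipeline`, `steps` and `cleanup` are hypothetical, with `cleanup` standing in for `spark.stop()`:

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")


def run_pipeline(steps, cleanup) -> bool:
    """Run each step in order; return True only if none raised. cleanup() always runs."""
    try:
        for step in steps:
            step()
    except Exception as e:
        logging.error(f"An error occurred during the data processing pipeline: {e}")
        return False
    else:
        # Reached only when the try block finished without an exception
        logging.info("The project execution is successful! 🎉")
        return True
    finally:
        # Runs on both paths, e.g. for spark.stop()
        cleanup()
```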


## main_daily.py

In [4]:
%%writefile "/content/drive/MyDrive/Project_EQ/Pyspark/main_daily.py"

import os
import logging
from pyspark.sql import SparkSession
from seismic_data_pipeline import SeismicDataFetcher, CloudStorageManager, SeismicDataTransformation, BigQueryUploader
import config


def main():
    # Configure logging
    logging.basicConfig(level=logging.INFO)

    # Create Spark session
    spark = SparkSession.builder.master('local[*]').appName('SeismicDataPipeline').getOrCreate()

    # Set Google Cloud credentials (an already-exported GOOGLE_APPLICATION_CREDENTIALS takes precedence)
    os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', "default_compute_key.json")  # Path to your service account key file

    try:
        # Fetch seismic data
        logging.info("Fetching seismic data (daily) from the source API...\n\n")
        seismic_fetcher = SeismicDataFetcher(config.SOURCE_API_URL_DAILY)  # Daily data URL
        fetched_data = seismic_fetcher.retrieve_data()

        # STEP 1: Ingest raw Data into GCS bucket in landing location (in .json format)
        logging.info("Uploading fetched seismic data to Google Cloud Storage (landing layer)...\n\n")
        gcs_manager = CloudStorageManager(config.GCS_BUCKET, config.PYSPARK_SILVER_LAYER_PATH)
        gcs_manager.upload_json_data(config.PYSPARK_LANDING_LOCATION, fetched_data)

        # STEP 2: Download raw json data from GCS bucket for flattening and processing
        logging.info("Downloading raw JSON data from Google Cloud Storage...\n\n")
        data_to_process = gcs_manager.download_json_data(config.PYSPARK_LANDING_LOCATION)

        # STEP 3: Perform transformations
        logging.info("Transforming raw data...\n\n")
        transformations = SeismicDataTransformation(spark, data_to_process)
        flattened_data_in_list_form = transformations.transform_to_flat_structure()  # Data in a list format
        flattened_df = transformations.create_spark_dataframe(flattened_data_in_list_form)
        transformed_df = transformations.apply_transformations(flattened_df)

        # STEP 4: Upload transformed Data back to the GCS bucket in Silver layer location (in parquet)
        logging.info("Uploading transformed data to Silver layer in Google Cloud Storage...\n\n")
        transformed_df.write.mode("overwrite").parquet(config.PYSPARK_SILVER_LAYER)

        # STEP 5: Read data from Silver Layer for BigQuery loading
        logging.info("Reading data from Silver layer for BigQuery upload...\n\n")
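        silver_layer_pandas_df = gcs_manager.read_silver_layer_data()
        if silver_layer_pandas_df is None:
            raise RuntimeError("No data could be read from the Silver layer; aborting the BigQuery load.")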

        # STEP 6: Upload data from Silver layer to BigQuery
        logging.info("Uploading (daily) data to BigQuery...")
        uploader = BigQueryUploader(config.PROJECT_ID, config.DATASET_ID, config.TABLE_ID, silver_layer_pandas_df, spark)
        transformed_df_for_bq = uploader.apply_transformations_for_bigquery()  # Apply transformations to prepare the data for BigQuery
        uploader.upload_to_bigquery(transformed_df_for_bq)  # Upload the transformed data to BigQuery

    except Exception as e:
        logging.error(f"An error occurred during the data processing pipeline: {e}")
    else:
        # Runs only when every step above completed without an exception
        logging.info("Everything is running smoothly 😊 \n\n")
        logging.info("The project execution is successful! 🎉")
    finally:
        # Stop the Spark session whether or not the pipeline succeeded
        logging.info("Now stopping Spark session... Good bye \n")
        spark.stop()


if __name__ == "__main__":
    main()


Overwriting /content/drive/MyDrive/Project_EQ/Pyspark/main_daily.py
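`main_hist.py` and `main_daily.py` are identical apart from the feed URL and a few log messages. One way to avoid maintaining two copies is a single entry point that selects the feed from a command-line flag. Below is a hypothetical sketch: the `--mode` flag, `FEED_URLS`, and the function names are illustrative, and the URLs are the two already defined in `config.py`:

```python
import argparse

# URLs copied from config.py; the "hist"/"daily" keys are illustrative names
FEED_URLS = {
    "hist": "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_month.geojson",
    "daily": "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.geojson",
}


def parse_args(argv=None) -> argparse.Namespace:
    """Parse --mode (hist or daily); argv=None falls back to sys.argv[1:]."""
    parser = argparse.ArgumentParser(description="USGS seismic data pipeline")
    parser.add_argument("--mode", choices=sorted(FEED_URLS), default="daily",
                        help="'hist' loads the past month, 'daily' the past day")
    return parser.parse_args(argv)


def source_url_for(mode: str) -> str:
    """Map a validated mode to its USGS feed URL."""
    return FEED_URLS[mode]


args = parse_args(["--mode", "hist"])
print(source_url_for(args.mode))  # ends with all_month.geojson
```

The rest of `main()` would stay as it is, with `SeismicDataFetcher(source_url_for(args.mode))` replacing the hard-coded config URL.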


## default_compute_key.json

In [5]:
%%writefile "/content/drive/MyDrive/Project_EQ/Pyspark/default_compute_key.json"

{
  "type": "service_account",
  "project_id": "project-dipakraj1",
  "private_key_id": "<REDACTED>",
  "private_key": "-----BEGIN PRIVATE KEY-----\n<REDACTED>\n-----END PRIVATE KEY-----\n",
  "client_email": "771574848243-compute@developer.gserviceaccount.com",
  "client_id": "115938029565310472294",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/771574848243-compute%40developer.gserviceaccount.com",
  "universe_domain": "googleapis.com"
}


Overwriting /content/drive/MyDrive/Project_EQ/Pyspark/default_compute_key.json
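A service-account key grants access to the project, so it is safer to keep it outside the Drive folder that gets zipped and shared. Its path can then be supplied through the `GOOGLE_APPLICATION_CREDENTIALS` environment variable, which the Google Cloud client libraries read. Below is a minimal standard-library sketch that assumes that setup. It resolves the path and sanity-checks the file before the pipeline starts. The function names and the required-field list are illustrative:

```python
import json
import os

# Illustrative subset of the fields found in a service-account key file
REQUIRED_FIELDS = {"type", "project_id", "client_email", "private_key"}


def resolve_credentials_path(default_path: str = "default_compute_key.json") -> str:
    """Prefer an already-exported GOOGLE_APPLICATION_CREDENTIALS over the local default."""
    return os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", default_path)


def check_service_account_key(path: str) -> str:
    """Validate the key file's basic shape and return its project_id."""
    with open(path, encoding="utf-8") as fh:
        key = json.load(fh)
    missing = REQUIRED_FIELDS - set(key)
    if missing:
        raise ValueError(f"{path} is missing fields: {sorted(missing)}")
    if key["type"] != "service_account":
        raise ValueError(f"Unexpected credential type in {path}: {key['type']!r}")
    return key["project_id"]
```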


## config.py

In [6]:
%%writefile "/content/drive/MyDrive/Project_EQ/Pyspark/config.py"

from datetime import datetime
date = datetime.now().strftime("%y%m%d")
# import os
# SERVICE_ACCOUNT_KEY = os.getenv('SERVICE_ACCOUNT_KEY', r"/tmp/default_compute_key.json")
# SERVICE_ACCOUNT_KEY = r"/content/drive/MyDrive/Colab Notebooks/Project_Earthquake/default_compute_key.json"

PROJECT_ID = "project-dipakraj1"
DATASET_ID = "earthquake_db"
TABLE_ID = "EQ_table_using_pyspark_colab"
GCS_BUCKET = "production-bucket-dipakraj1"
LOCATION = "us-central1"
LOCATION_FOR_DATASET = "US"

SOURCE_API_URL_HISTORICAL = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_month.geojson"
SOURCE_API_URL_DAILY = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.geojson"

PYSPARK_LANDING_LOCATION = f"pyspark_bronz/landing/{date}/raw_data_pyspark_{date}.json"

PYSPARK_SILVER_LAYER = f"gs://{GCS_BUCKET}/pyspark_silver/{date}/transformed_data_Pyspark_{date}/"
PYSPARK_SILVER_LAYER_PATH = f"pyspark_silver/{date}/transformed_data_Pyspark_{date}/"
TEMP_LOCATION = f"gs://{GCS_BUCKET}/temp"

Overwriting /content/drive/MyDrive/Project_EQ/Pyspark/config.py
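`config.py` stamps every GCS path with `datetime.now()` at import time, in `%y%m%d` format. A rerun or backfill for an earlier day therefore cannot target that day's folders. Below is a hypothetical helper that derives the same locations from an explicit date. The `build_paths` name and the dict keys are illustrative; the path patterns are copied from `config.py`:

```python
from datetime import date


def build_paths(bucket: str, run_date: date) -> dict:
    """Rebuild config.py's date-stamped GCS locations for an explicit run date."""
    stamp = run_date.strftime("%y%m%d")  # same yymmdd format as config.py
    silver_path = f"pyspark_silver/{stamp}/transformed_data_Pyspark_{stamp}/"
    return {
        "landing_blob": f"pyspark_bronz/landing/{stamp}/raw_data_pyspark_{stamp}.json",
        "silver_path": silver_path,
        "silver_uri": f"gs://{bucket}/{silver_path}",
    }


paths = build_paths("production-bucket-dipakraj1", date(2024, 1, 5))
print(paths["landing_blob"])  # pyspark_bronz/landing/240105/raw_data_pyspark_240105.json
```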


In [7]:
# !if bq show --project_id=project-dipakraj earthquake_db.audit_table > /dev/null 2>&1; then echo "Table exists."; else echo "Table does not exist."; fi

In [8]:
# !bq show --project_id=project-dipakraj earthquake_db.audit_table && echo "Table exists." || echo "Table does not exist."

## **How to run**

In [9]:
# Create the required GCP resources (bucket, BigQuery dataset, Dataproc cluster)
import config
project_id = config.PROJECT_ID
bucket_name = config.GCS_BUCKET #"earthquake_data_bucket"
dataset = config.DATASET_ID #"Test_earthquake_db"
location = config.LOCATION_FOR_DATASET

# Authentication
!gcloud config unset project
!gcloud auth login
!gcloud config set project {project_id}

# create GCS storage bucket
!gcloud storage buckets create gs://{bucket_name} --location={location}
!gcloud services enable bigquery.googleapis.com

# Create the BigQuery dataset
!bq mk --dataset --description "This dataset is for earthquake data loading (historical plus incremental)" --location {location} {project_id}:{dataset}

# Create a Dataproc cluster with a minimal configuration sized for a GCP free-trial account; --max-idle 7200s deletes it after 2 idle hours
!gcloud dataproc clusters create cluster-for-eq-project --region us-central1 --zone us-central1-f --master-machine-type e2-standard-2 --master-boot-disk-type pd-balanced --master-boot-disk-size 30 --num-workers 2 --worker-machine-type e2-standard-2 --worker-boot-disk-type pd-balanced --worker-boot-disk-size 30 --image-version 2.0-debian10 --max-idle 7200s --project {project_id}

Unset property [core/project].
You are now logged in as [dipakrajpatil.
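`gcloud storage buckets create`, `bq mk` and `gcloud dataproc clusters create` all fail when the resource already exists, so re-running the setup cell produces errors. A describe-then-create wrapper makes the setup safer to re-run. This is a hypothetical helper (`ensure_resource` is not part of the project), and it treats any non-zero exit from the describe command as "not found". That is a simplification: a permission error would also trigger a create attempt.

```python
import subprocess


def ensure_resource(describe_cmd: list, create_cmd: list, runner=subprocess.run) -> str:
    """Run create_cmd only when describe_cmd fails; return "exists" or "created"."""
    # Simplification: any non-zero exit from describe is treated as "not found"
    if runner(describe_cmd, capture_output=True, text=True).returncode == 0:
        return "exists"
    created = runner(create_cmd, capture_output=True, text=True)
    if created.returncode != 0:
        raise RuntimeError(f"Create failed: {created.stderr.strip()}")
    return "created"


# Example for the bucket (requires an authenticated gcloud CLI):
# bucket = "production-bucket-dipakraj1"
# ensure_resource(
#     ["gcloud", "storage", "buckets", "describe", f"gs://{bucket}"],
#     ["gcloud", "storage", "buckets", "create", f"gs://{bucket}", "--location=US"],
# )
# The cluster can be handled the same way, using
# ["gcloud", "dataproc", "clusters", "describe", "cluster-for-eq-project", "--region", "us-central1"]
# as the describe command.
```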

### Run a job - local

In [10]:
import os

os.chdir("/content/drive/MyDrive/Project_EQ/Pyspark")
print('Files:\n', os.listdir(),'\n')

# Zip all project files so they can be shipped to the Dataproc job via --files / --py-files
!zip -r my_spark_job_historical.zip ./

Files:
 ['__pycache__', 'seismic_data_pipeline.py', 'main_hist.py', 'default_compute_key.json', 'main_daily.py', 'config.py', 'my_spark_job_historical.zip'] 

updating: seismic_data_pipeline.py (deflated 75%)
updating: main_hist.py (deflated 62%)
updating: main_daily.py (deflated 62%)
updating: default_compute_key.json (deflated 30%)
updating: config.py (deflated 51%)
updating: __pycache__/ (stored 0%)
updating: __pycache__/config.cpython-310.pyc (deflated 35%)
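`zip -r ... ./` archives everything in the folder, including `__pycache__/` and `default_compute_key.json`, even though the key is already shipped separately through `--files`. If only the Python modules need to be importable on the cluster, a narrower archive can be built with the standard-library `zipfile` module. This is a sketch under that assumption; `build_py_files_zip` is a hypothetical name.

```python
import os
import zipfile


def build_py_files_zip(src_dir: str, zip_path: str) -> list:
    """Zip only the top-level .py modules in src_dir and return their names."""
    names = []
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for name in sorted(os.listdir(src_dir)):
            full = os.path.join(src_dir, name)
            # Skips directories (__pycache__), .zip archives and .json key files
            if os.path.isfile(full) and name.endswith(".py"):
                # Store at the archive root so `import config` resolves on the cluster
                zf.write(full, arcname=name)
                names.append(name)
    return names


# Example, run from the project folder:
# print(build_py_files_zip(".", "my_spark_job_historical.zip"))
```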


In [11]:
# Local testing

# Uncomment to test locally
# !spark-submit --py-files my_spark_job_historical.zip main_hist.py
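`--py-files` works because Spark adds the listed archives to the Python path of the driver and executors, and Python can import modules directly from a `.zip` that is on `sys.path`. This is why the modules must sit at the root of the archive, as the `zip` output above shows, for `import config` to resolve. The same mechanism can be reproduced in plain Python; `demo_config` is a hypothetical module created only for this demonstration.

```python
import os
import sys
import tempfile
import zipfile

# Build a tiny archive with one module at its root (hypothetical name: demo_config)
tmp_dir = tempfile.mkdtemp()
archive = os.path.join(tmp_dir, "deps.zip")
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("demo_config.py", 'PROJECT_ID = "demo-project"\n')

# Putting the archive itself on sys.path lets Python's zipimport load modules from it,
# which is what --py-files relies on for `import config` inside main_hist.py
sys.path.insert(0, archive)
import demo_config

print(demo_config.PROJECT_ID)  # demo-project
```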

### Run Dataproc jobs - Historical and Daily Data

#### execute main_hist.py

In [12]:
# run main_hist.py

!gcloud dataproc jobs submit pyspark \
    --cluster cluster-for-eq-project \
    --region us-central1 \
    --files my_spark_job_historical.zip,default_compute_key.json \
    --py-files my_spark_job_historical.zip \
    main_hist.py

Job [8ceb24eb0adb43039accb30aeeb6e730] submitted.
Waiting for job output...
24/11/05 03:28:35 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
24/11/05 03:28:35 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
24/11/05 03:28:35 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
24/11/05 03:28:35 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
24/11/05 03:28:35 INFO org.sparkproject.jetty.util.log: Logging initialized @11435ms to org.sparkproject.jetty.util.log.Slf4jLog
24/11/05 03:28:35 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_422-b05
24/11/05 03:28:35 INFO org.sparkproject.jetty.server.Server: Started @11656ms
24/11/05 03:28:35 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@13682bd8{HTTP/1.1, (http/1.1)}{0.0.0.0:44917}
24/11/05 03:28:36 INFO com.google.cloud.dataproc.Datapro
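The submit cells for `main_hist.py` and `main_daily.py` differ only in the entry-point script, so the command can be built once in Python and the job ID picked out of the `Job [...] submitted.` line that `gcloud` prints. This is a hypothetical helper whose defaults are copied from the cells in this notebook. Since it is not certain which output stream carries that line, the sketch searches both stdout and stderr.

```python
import re
import subprocess

# Matches the line gcloud prints on submission, e.g. "Job [8ceb...e730] submitted."
JOB_ID_PATTERN = re.compile(r"Job \[([^\]]+)\] submitted\.")


def submit_command(main_script: str,
                   cluster: str = "cluster-for-eq-project",
                   region: str = "us-central1",
                   deps_zip: str = "my_spark_job_historical.zip",
                   key_file: str = "default_compute_key.json") -> list:
    """Build the same `gcloud dataproc jobs submit pyspark` call used in this notebook."""
    return [
        "gcloud", "dataproc", "jobs", "submit", "pyspark",
        "--cluster", cluster,
        "--region", region,
        "--files", f"{deps_zip},{key_file}",
        "--py-files", deps_zip,
        main_script,
    ]


def parse_job_id(output: str):
    """Return the job ID from gcloud output, or None if it is not present."""
    match = JOB_ID_PATTERN.search(output)
    return match.group(1) if match else None


def submit(main_script: str, runner=subprocess.run):
    # Search both streams, since the submission line may go to either one
    result = runner(submit_command(main_script), capture_output=True, text=True)
    return parse_job_id(result.stdout + "\n" + result.stderr)


# Example (requires gcloud and the cluster from the setup cell):
# for script in ["main_hist.py", "main_daily.py"]:
#     print(script, submit(script))
```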

#### execute main_daily.py

In [13]:
# run main_daily.py

!gcloud dataproc jobs submit pyspark \
    --cluster cluster-for-eq-project \
    --region us-central1 \
    --files my_spark_job_historical.zip,default_compute_key.json \
    --py-files my_spark_job_historical.zip \
    main_daily.py

Job [f0b9ce7f9ed14764b564bbc847cd1d93] submitted.
Waiting for job output...
24/11/05 03:29:25 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
24/11/05 03:29:25 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
24/11/05 03:29:25 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
24/11/05 03:29:25 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
24/11/05 03:29:25 INFO org.sparkproject.jetty.util.log: Logging initialized @8670ms to org.sparkproject.jetty.util.log.Slf4jLog
24/11/05 03:29:25 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_422-b05
24/11/05 03:29:26 INFO org.sparkproject.jetty.server.Server: Started @8907ms
24/11/05 03:29:26 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@60f6a977{HTTP/1.1, (http/1.1)}{0.0.0.0:44493}
24/11/05 03:29:26 INFO com.google.cloud.dataproc.DataprocS
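By default `gcloud dataproc jobs submit` streams the driver output and blocks until the job finishes. If you would rather submit with `--async` and check back later, the job state can be polled with `gcloud dataproc jobs describe`. A minimal sketch, assuming the job ID has been taken from the `Job [...] submitted.` line; `wait_for_job` is hypothetical, and `runner`/`sleep` are injectable only so it can be tested offline.

```python
import subprocess
import time

# Dataproc job states after which the job will not change any more
TERMINAL_STATES = {"DONE", "ERROR", "CANCELLED"}


def job_state_command(job_id: str, region: str = "us-central1") -> list:
    # Prints only the state field, e.g. "RUNNING" or "DONE"
    return ["gcloud", "dataproc", "jobs", "describe", job_id,
            "--region", region, "--format=value(status.state)"]


def wait_for_job(job_id: str, region: str = "us-central1", poll_seconds: int = 30,
                 timeout_seconds: int = 3600, runner=subprocess.run, sleep=time.sleep) -> str:
    """Poll the job until it reaches a terminal state; return that state."""
    waited = 0
    while True:
        result = runner(job_state_command(job_id, region), capture_output=True, text=True)
        state = result.stdout.strip()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError(f"Job {job_id} still {state or 'UNKNOWN'} after {waited}s")
        sleep(poll_seconds)
        waited += poll_seconds


# Example (requires gcloud; job ID taken from the output above):
# print(wait_for_job("f0b9ce7f9ed14764b564bbc847cd1d93"))
```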