In [None]:
# Build ADLS Gen2 paths for the Medallion Architecture tiers.
# Nothing is mounted here: this cell only constructs the ABFSS URIs that the
# rest of the notebook reads from and writes to.
#
#   Bronze: raw, unprocessed data (ingested directly from sources).
#   Silver: cleaned and validated data (after transformations).
#   Gold:   aggregated, business-ready data (optimized for analytics).
#
# URI anatomy -- abfss://<container>@<account>.dfs.core.windows.net/<path>
#   abfss://               Azure Blob File System driver over TLS, used for ADLS Gen2.
#   <container>            the container (file system) in the storage account; one per tier.
#   tut03datalake          the storage account name.
#   .dfs.core.windows.net  the ADLS Gen2 endpoint domain.
#
# Each URI ends with a trailing '/', i.e. it points at the container root.

tiers = ['bronze', 'silver', 'gold']

# Map every tier name to the root URI of its container.
adls_paths = dict(
    (name, f'abfss://{name}@tut03datalake.dfs.core.windows.net/')
    for name in tiers
)

# Convenience aliases, unpacked in the same order as `tiers`.
bronze_adls, silver_adls, gold_adls = (adls_paths[name] for name in tiers)

In [None]:
# Display the tier -> ABFSS URI mapping as a quick check that the paths were built as expected.
adls_paths

In [None]:
# List what currently sits at the root of the bronze container and print one
# line per entry: name, directory flag, file flag, full path, size, modification time.
files = mssparkutils.fs.ls(bronze_adls)
for file in files:
    listing_fields = (
        file.name,
        file.isDir,
        file.isFile,
        file.path,
        file.size,
        file.modifyTime,
    )
    print(*listing_fields)

In [None]:
import requests
import json
from datetime import date, timedelta

In [None]:
# Query window: from yesterday's date up to today's date.
# `date.today()` is read exactly once so that both bounds derive from the same
# calendar day; two separate calls could straddle midnight and silently widen
# the window to two days.
# NOTE(review): the USGS API treats a date-only `endtime` as 00:00:00 of that
# day, so this window effectively covers yesterday only -- confirm that is the
# intended range.
# NOTE(review): `date.today()` uses the local timezone of the Spark pool, while
# the USGS API assumes UTC when no timezone is given -- confirm the pool runs in UTC.
end_date = date.today()
start_date = end_date - timedelta(days=1)

In [None]:
# Display the computed query window (start_date, end_date) as a sanity check before calling the API.
start_date, end_date

In [None]:
# Fetch earthquake events from the USGS API and land them in the bronze layer.
#
# The URL requests GeoJSON for the window defined by `start_date` and `end_date`,
# which must already be set by an earlier cell (the original note says they can be
# supplied by a Data Factory pipeline).
url = f"https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&starttime={start_date}&endtime={end_date}"

try:
    # Always pass a timeout: without one, `requests` can wait forever on a stalled
    # connection and hang the pipeline run. A timeout surfaces as
    # requests.exceptions.Timeout, which the handler below already catches.
    response = requests.get(url, timeout=60)

    # Turn HTTP 4xx/5xx responses into an HTTPError instead of parsing an error page.
    response.raise_for_status()

    # A GeoJSON FeatureCollection keeps its events under 'features';
    # fall back to an empty list when the key is missing.
    data = response.json().get('features', [])

    if not data:
        print("No data returned for the specified date range.")
    else:
        # `bronze_adls` already ends with '/', so strip it before joining to avoid
        # a double slash in the target path.
        file_path = f"{bronze_adls.rstrip('/')}/{start_date}_earthquake_data.json"

        # Serialize the list of features to a single JSON string.
        json_data = json.dumps(data, indent=4)

        # Wrap the JSON string in a one-element RDD so Spark can parse it into a DataFrame.
        rdd = spark.sparkContext.parallelize([json_data])
        df = spark.read.json(rdd)

        # Keep only 3 rows to speed up processing for the tutorial.
        # `limit` returns a NEW DataFrame, so the result must be reassigned;
        # calling it without assignment (as before) had no effect.
        df = df.limit(3)

        # Overwrite any previous output for this date.
        # NOTE(review): Spark's JSON writer normally produces a directory of part
        # files at this path rather than one .json file -- confirm downstream
        # readers expect that layout.
        df.write.mode("overwrite").json(file_path)

        print(f"Data successfully saved to {file_path}")
except requests.exceptions.RequestException as e:
    # Network failures, timeouts and HTTP errors are reported but not re-raised,
    # so the notebook keeps running after a failed fetch.
    print(f"Error fetching data from API: {e}")

In [None]:
# Peek at the first feature returned by the API to inspect its structure.
# NOTE(review): this raises IndexError when the API returned no features, and
# `data` is not (re)assigned by the fetch cell when the request itself failed.
data[0]

In [None]:
import json

# Hand the key values of this run back to the calling pipeline.
#
# The dictionary carries the start date (as an ISO-8601 string) together with the
# three tier paths, so later pipeline steps can reuse them instead of
# recomputing or hard-coding them.
output_data = dict(
    start_date=start_date.isoformat(),
    bronze_adls=bronze_adls,
    silver_adls=silver_adls,
    gold_adls=gold_adls,
)

# Serialize the dictionary to a JSON string; JSON is easy to pass between services.
bronze_output = json.dumps(output_data)

# Exit the notebook with the JSON string as its exit value. The pipeline can read
# this value to drive later steps and to keep track of the variables used in this run.
mssparkutils.notebook.exit(bronze_output)