**This cell ensures that the Azure Data Lake Storage container holding the Open Weather API data is mounted in the workspace. It first checks whether the mount point is already active, so that rerunning the notebook does not attempt a duplicate mount. If the mount point is missing, the cell mounts the container using OAuth (service principal) credentials. Once mounted, the data can be read and written through ordinary /mnt/ paths in the cells that follow.**


In [0]:
mount_point = "/mnt/preprocessdata/Open-weather-API/"
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    print(f"{mount_point} is already mounted.")
else:
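    # Configuration for Azure Data Lake Storage with OAuth credentials.
    # Credentials are read from a Databricks secret scope instead of being hardcoded.
    # The scope and key names below are placeholders (assumptions); substitute your own.
    secret_scope = "weather-pipeline"
    client_id = dbutils.secrets.get(scope=secret_scope, key="sp-client-id")
    client_secret = dbutils.secrets.get(scope=secret_scope, key="sp-client-secret")
    tenant_id = dbutils.secrets.get(scope=secret_scope, key="sp-tenant-id")

    configs = {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": client_id,
        "fs.azure.account.oauth2.client.secret": client_secret,
        "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    }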

    # Mount the directory if not already mounted
    dbutils.fs.mount(
        source="abfss://mymaincontainer@mainprojectwis.dfs.core.windows.net/",
        mount_point=mount_point,
        extra_configs=configs
    )
    print(f"Mounted {mount_point} successfully.")


/mnt/preprocessdata/Open-weather-API/ is already mounted.


In [0]:
display(dbutils.fs.ls("/mnt/preprocessdata/Open-weather-API/Open-weather-API"))


path,name,size,modificationTime
dbfs:/mnt/preprocessdata/Open-weather-API/Open-weather-API/cleaned-data/,cleaned-data/,0,1733091520000
dbfs:/mnt/preprocessdata/Open-weather-API/Open-weather-API/raw-data.json,raw-data.json,38003,1733010173000


The next cells process the raw JSON weather dataset: they flatten its nested structure, convert temperatures from Kelvin to Celsius, and extract attributes such as humidity, wind speed, and weather descriptions. The cleaned dataset is then saved as a CSV file for further use.
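To make the flattening step concrete, here is a plain-Python sketch of the same idea on a tiny hand-written record. It is an illustration rather than the notebook's method: `flatten_forecast` and `kelvin_to_celsius` are hypothetical helper names, the sample payload is invented, and only a few of the fields are carried over. It assumes, as the notebook's conversion does, that temperatures arrive in Kelvin.

```python
KELVIN_OFFSET = 273.15


def kelvin_to_celsius(kelvin):
    """Convert a temperature from Kelvin to degrees Celsius."""
    return kelvin - KELVIN_OFFSET


def flatten_forecast(payload):
    """Produce one flat row per (forecast entry, weather condition) pair."""
    rows = []
    for entry in payload.get("list", []):
        main = entry.get("main", {})
        wind = entry.get("wind", {})
        # Each entry may carry several weather conditions; emit a row for each,
        # mirroring the second explode() in the Spark version.
        for weather in entry.get("weather", []):
            rows.append({
                "timestamp": entry.get("dt"),
                "temperature_celsius": round(kelvin_to_celsius(main["temp"]), 2),
                "humidity": main.get("humidity"),
                "wind_speed": wind.get("speed"),
                "weather_description": weather.get("description"),
            })
    return rows


# Invented sample record shaped like the nested input (assumption).
sample_payload = {
    "list": [
        {
            "dt": 1733011200,
            "main": {"temp": 293.15, "humidity": 60},
            "wind": {"speed": 3.5},
            "weather": [{"main": "Clouds", "description": "scattered clouds"}],
        }
    ]
}

rows = flatten_forecast(sample_payload)
print(rows)
```

Note that an entry with an empty `weather` list produces no rows here, which matches how `explode` drops rows whose array is empty.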



In [0]:
from pyspark.sql.functions import col, explode

# Load the JSON file
raw_data = spark.read.format("json") \
    .option("multiLine", "true") \
    .load("/mnt/preprocessdata/Open-weather-API/Open-weather-API/raw-data.json")

# Flatten the nested structure and convert temperature to Celsius
cleaned_data = raw_data.select(
    explode(col("list")).alias("list_flat")
).select(
    col("list_flat.dt").alias("timestamp"),
    (col("list_flat.main.temp") - 273.15).alias("temperature_celsius"),  # Kelvin to Celsius
    (col("list_flat.main.feels_like") - 273.15).alias("feels_like_celsius"),  # Kelvin to Celsius
    col("list_flat.main.pressure").alias("pressure"),
    col("list_flat.main.humidity").alias("humidity"),
    # temp_min and temp_max share the unit of temp, so they need the same conversion
    (col("list_flat.main.temp_min") - 273.15).alias("temp_min_celsius"),  # Kelvin to Celsius
    (col("list_flat.main.temp_max") - 273.15).alias("temp_max_celsius"),  # Kelvin to Celsius
    col("list_flat.wind.speed").alias("wind_speed"),
    col("list_flat.wind.deg").alias("wind_direction"),
    col("list_flat.clouds.all").alias("clouds"),
    explode(col("list_flat.weather")).alias("weather_flat")
).select(
    col("timestamp"),
    col("temperature_celsius"),
    col("feels_like_celsius"),
    col("pressure"),
    col("humidity"),
    col("temp_min_celsius"),
    col("temp_max_celsius"),
    col("wind_speed"),
    col("wind_direction"),
    col("clouds"),
    col("weather_flat.main").alias("weather_main"),
    col("weather_flat.description").alias("weather_description")
)


In [0]:
import pandas as pd
import os

# Convert PySpark DataFrame to Pandas DataFrame if needed
if not isinstance(cleaned_data, pd.DataFrame):
    cleaned_data = cleaned_data.toPandas()

# Map existing column names to the desired column names
column_mapping = {
    "timestamp": "timestamp",
    "temperature_celsius": "temperature",
    # Stand-in only: feels-like temperature is not a true dew point
    "feels_like_celsius": "dew_point_temperature",
    "pressure": "station_level_pressure",
    "humidity": "relative_humidity",
    "temp_min_celsius": "temp_min_celsius",  # Placeholder for non-target columns
    "temp_max_celsius": "temp_max_celsius",  # Placeholder for non-target columns
    "wind_speed": "wind_speed",
    "wind_direction": "wind_direction",
    "clouds": "Sky_Cover",
    # Only one source column may map to "Weather_Description"; mapping both
    # weather columns to it would create duplicate column names in pandas.
    "weather_main": "weather_main",  # Not in the target schema; dropped below
    "weather_description": "Weather_Description"
}

# Rename columns
data = cleaned_data.rename(columns=column_mapping)

# Define target schema and retain matching columns
target_columns = [
    "temperature", "dew_point_temperature", "station_level_pressure",
    "sea_level_pressure", "wind_direction", "wind_speed", "precipitation",
    "relative_humidity", "visibility", "altimeter", "timestamp",
    "Weather_Description", "Sky_Cover"
]
# Use a loop variable other than `col` to avoid confusion with pyspark.sql.functions.col
data = data[[name for name in target_columns if name in data.columns]]

# Define the output directory and file path
output_directory = "/dbfs/mnt/preprocessdata/Open-weather-API/Open-weather-API/cleaned-data"
output_csv_path = os.path.join(output_directory, "cleaned_data.csv")

# Create the directory if it doesn't exist and save the DataFrame to a CSV file
os.makedirs(output_directory, exist_ok=True)
data.to_csv(output_csv_path, index=False)
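A rename mapping of this kind has two quiet failure modes: two source columns sharing one target name produce duplicate column labels, and target columns with no source (for example `sea_level_pressure` or `precipitation`) are silently dropped by the filter. The sketch below shows one way to surface both before writing the CSV. The helper names `find_duplicate_targets` and `find_missing_targets` are hypothetical, and the sample mapping and schema are a shortened illustration rather than the full ones used in the cell.

```python
from collections import defaultdict


def find_duplicate_targets(mapping):
    """Return {target: [sources]} for every target named by more than one source."""
    sources_by_target = defaultdict(list)
    for source, target in mapping.items():
        sources_by_target[target].append(source)
    return {t: s for t, s in sources_by_target.items() if len(s) > 1}


def find_missing_targets(mapping, target_columns):
    """Return the target columns that no source column maps to, in schema order."""
    available = set(mapping.values())
    return [name for name in target_columns if name not in available]


# Shortened illustrative inputs (assumptions, not the notebook's full schema).
sample_mapping = {
    "temperature_celsius": "temperature",
    "weather_main": "Weather_Description",
    "weather_description": "Weather_Description",
}
sample_targets = ["temperature", "sea_level_pressure", "Weather_Description"]

print(find_duplicate_targets(sample_mapping))
print(find_missing_targets(sample_mapping, sample_targets))
```

Running checks like these right after defining the mapping turns a malformed CSV header into an early, readable warning.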
