In [0]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

# Reuse the active Spark session if one exists; otherwise start one for this job.
spark = (
    SparkSession.builder
    .appName("WeatherDataDBFS")
    .getOrCreate()
)

Snowflake connection settings used to connect to the source table.
Read the place data from Snowflake.

In [0]:
# Connection options for the Snowflake Spark connector.
# The password is fetched from a Databricks secret scope instead of being
# hardcoded, so it is not exposed in the notebook source, exports, or history.
# NOTE(review): the scope/key names are placeholders — create them (or rename
# to match an existing scope) before running, and rotate any password that was
# previously committed in plain text.
snowflake_options = {
    "sfURL": "EJDJBCL-ZI07581.snowflakecomputing.com",
    "sfDatabase": "PLACES_COORDS",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "COMPUTE_WH",
    "sfUser": "ASWINK756",
    "sfPassword": dbutils.secrets.get(scope="snowflake", key="password"),
}

# Load the PLACES_LAT_LONG table into a Spark DataFrame.
places_df = (
    spark.read.format("snowflake")
    .options(**snowflake_options)
    .option("dbtable", "PLACES_LAT_LONG")
    .load()
)

Cast the columns to numeric types and convert the DataFrame to a list of Python dictionaries for the WeatherAPI integration.

In [0]:

# Target Spark type for each column that has to be numeric downstream.
column_casts = {
    "SERIAL_NO": "int",
    "LATITUDE": "float",
    "LONGITUDE": "float",
}
for column_name, spark_type in column_casts.items():
    places_df = places_df.withColumn(column_name, col(column_name).cast(spark_type))

# Bring the four needed columns to the driver and reshape each row into the
# plain dict used when querying the weather API.
selected_rows = places_df.select("SERIAL_NO", "PLACE", "LATITUDE", "LONGITUDE").collect()
places_data = [
    {
        "serial_no": row["SERIAL_NO"],
        "place": row["PLACE"],
        "lat": row["LATITUDE"],
        "lon": row["LONGITUDE"],
    }
    for row in selected_rows
]


WeatherAPI Endpoint and Key

In [0]:
# HTTPS keeps the API key (sent as a query parameter) encrypted in transit.
url = "https://api.weatherapi.com/v1/current.json"

# The key is read from a Databricks secret scope rather than hardcoded in the
# notebook source.
# NOTE(review): the scope/key names are placeholders — create them (or rename
# to match an existing scope) before running, and rotate any key that was
# previously committed in plain text.
api_key = dbutils.secrets.get(scope="weatherapi", key="api-key")

# Collects one weather record (dict) per successfully queried place.
weather_data = []

Loop through each place in the list and query WeatherAPI using its latitude and longitude.

In [0]:
# Start from an empty list so that re-running this cell does not append
# duplicate rows to results left over from a previous run.
weather_data = []

for place in places_data:
    params = {
        "key": api_key,
        "q": f"{place['lat']},{place['lon']}",  # Use latitude and longitude
    }

    try:
        # Without a timeout a single unresponsive request would hang the
        # notebook indefinitely.
        response = requests.get(url, params=params, timeout=10)
    except requests.RequestException as exc:
        # Report only the exception type: the full message can contain the
        # request URL, which includes the API key as a query parameter.
        print(f"Failed to fetch weather for {place['place']}: {type(exc).__name__}")
        continue

    if response.status_code == 200:
        data = response.json()
        current = data["current"]
        # Keys match the field names of the DataFrame schema built later.
        weather_data.append({
            "S.No": place["serial_no"],
            "City": place["place"],
            "Time Stamp": current["last_updated"],
            "Temperature (°C)": float(current["temp_c"]),
            "Humidity (%)": current["humidity"],
            "Weather Condition": current["condition"]["text"]
        })
    else:
        print(f"Failed to fetch weather for {place['place']}: {response.status_code}, {response.text}")

PySpark DataFrame schema

In [0]:
# (column name, Spark type) pairs in output order; every column is nullable.
weather_columns = [
    ("S.No", IntegerType()),
    ("City", StringType()),
    ("Time Stamp", StringType()),
    ("Temperature (°C)", FloatType()),
    ("Humidity (%)", IntegerType()),
    ("Weather Condition", StringType()),
]
schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in weather_columns]
)

# Build the Spark DataFrame from the collected weather records.
weather_df = spark.createDataFrame(weather_data, schema=schema)


Saving the DataFrame to DBFS


In [0]:
dbfs_path = "dbfs:/mnt/data/weather_data"

# Collapse to a single partition so one CSV part file is written, replacing
# whatever a previous run left at this location.
(
    weather_df.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(dbfs_path)
)

print(f"Weather data successfully saved to {dbfs_path}")


Weather data successfully saved to dbfs:/mnt/data/weather_data


Verifying the saved file

In [0]:
# List the output directory to confirm the write produced files.
dbutils.fs.ls(dbfs_path)

Out[41]: [FileInfo(path='dbfs:/mnt/data/weather_data/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1732624803000),
 FileInfo(path='dbfs:/mnt/data/weather_data/_committed_1467219106847776944', name='_committed_1467219106847776944', size=209, modificationTime=1732623090000),
 FileInfo(path='dbfs:/mnt/data/weather_data/_committed_2519200640333390962', name='_committed_2519200640333390962', size=197, modificationTime=1732618439000),
 FileInfo(path='dbfs:/mnt/data/weather_data/_committed_277864075190282229', name='_committed_277864075190282229', size=198, modificationTime=1732624803000),
 FileInfo(path='dbfs:/mnt/data/weather_data/_committed_5929951757320236098', name='_committed_5929951757320236098', size=197, modificationTime=1732621117000),
 FileInfo(path='dbfs:/mnt/data/weather_data/_committed_8459937103913108008', name='_committed_8459937103913108008', size=197, modificationTime=1732522689000),
 FileInfo(path='dbfs:/mnt/data/weather_data/_started_1467219106847776944', name='_sta

Reads the saved CSV file back into a PySpark DataFrame and displays the output.


In [0]:
# Reload the saved CSV, treating the first line as the header and letting
# Spark infer the column types from the data.
weather_df = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load("dbfs:/mnt/data/weather_data")
)
weather_df.display()

S.No,City,Time Stamp,Temperature (°C),Humidity (%),Weather Condition
1,Jaipur,2024-11-26T17:45:00.000+0000,25.3,32,Mist
2,Nanjangud,2024-11-26T17:45:00.000+0000,22.5,77,Patchy rain nearby
3,Chittorgarh,2024-11-26T17:45:00.000+0000,23.4,25,Sunny
4,Ratnagiri,2024-11-26T18:00:00.000+0000,24.8,61,Clear
5,Goregaon,2024-11-26T18:00:00.000+0000,28.1,48,Overcast
6,Pindwara,2024-11-26T18:00:00.000+0000,22.0,31,Clear
7,Raipur,2024-11-26T18:00:00.000+0000,20.6,35,Clear
8,Gokak,2024-11-26T18:00:00.000+0000,22.5,37,Clear
9,Lucknow,2024-11-26T18:00:00.000+0000,21.3,31,Clear
10,Delhi,2024-11-26T18:00:00.000+0000,23.0,41,Mist


Storing output table to Snowflake

In [0]:
# Replace the contents of the Weather_Live_Date table with the current
# DataFrame, reusing the Snowflake connection options defined earlier.
(
    weather_df.write
    .format("snowflake")
    .options(**snowflake_options)
    .option("dbtable", "Weather_Live_Date")
    .mode("overwrite")
    .save()
)

print("Data saved succesfully")

Data saved succesfully
