Import Required Libraries

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, min, max
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import time


Define European Capital Cities for Weather Data Collection

In [0]:
european_capitals = {
    "Paris": "https://www.timeanddate.com/weather/france/paris",
    "Berlin": "https://www.timeanddate.com/weather/germany/berlin",
    "Madrid": "https://www.timeanddate.com/weather/spain/madrid",
    "Rome": "https://www.timeanddate.com/weather/italy/rome",
    "London": "https://www.timeanddate.com/weather/uk/london",
    "Amsterdam": "https://www.timeanddate.com/weather/netherlands/amsterdam",
    "Brussels": "https://www.timeanddate.com/weather/belgium/brussels",
    "Vienna": "https://www.timeanddate.com/weather/austria/vienna",
    "Copenhagen": "https://www.timeanddate.com/weather/denmark/copenhagen",
    "Athens": "https://www.timeanddate.com/weather/greece/athens",
    "Lisbon": "https://www.timeanddate.com/weather/portugal/lisbon",
    "Dublin": "https://www.timeanddate.com/weather/ireland/dublin",
    "Stockholm": "https://www.timeanddate.com/weather/sweden/stockholm",
    "Oslo": "https://www.timeanddate.com/weather/norway/oslo",
    "Helsinki": "https://www.timeanddate.com/weather/finland/helsinki",
    "Warsaw": "https://www.timeanddate.com/weather/poland/warsaw",
    "Prague": "https://www.timeanddate.com/weather/czech-republic/prague",
    "Budapest": "https://www.timeanddate.com/weather/hungary/budapest",
    "Bratislava": "https://www.timeanddate.com/weather/slovakia/bratislava",
    "Ljubljana": "https://www.timeanddate.com/weather/slovenia/ljubljana",
    "Zagreb": "https://www.timeanddate.com/weather/croatia/zagreb",
    "Sofia": "https://www.timeanddate.com/weather/bulgaria/sofia",
    "Bucharest": "https://www.timeanddate.com/weather/romania/bucharest",
    "Tallinn": "https://www.timeanddate.com/weather/estonia/tallinn",
    "Riga": "https://www.timeanddate.com/weather/latvia/riga",
    "Vilnius": "https://www.timeanddate.com/weather/lithuania/vilnius",
    "Bern": "https://www.timeanddate.com/weather/switzerland/bern",
    "Reykjavik": "https://www.timeanddate.com/weather/iceland/reykjavik",
    "Luxembourg": "https://www.timeanddate.com/weather/luxembourg/luxembourg",
    "Valletta": "https://www.timeanddate.com/weather/malta/valletta"
}

Defining Web Scraping Function to Fetch Temperature

In [0]:
def get_weather(city, url):
    """Scrape the current temperature of a city from Time and Date."""
    headers = {"User-Agent": "Mozilla/5.0"}

    try:
        response = requests.get(url, headers=headers, timeout=5)

        # Debugging: Print first 500 characters of the response
        print(f"Checking {city}: {response.text[:500]}")

        if response.status_code == 200:
            soup = BeautifulSoup(response.text, "html.parser")

            # Check if the div exists
            temperature_element = soup.find("div", class_="h2")
            if not temperature_element:
                print(f"Could not find temperature for {city}. Check HTML structure.")
                return None
            
            temperature = temperature_element.text.strip()

            # Get current date
            date = datetime.now().strftime("%Y-%m-%d")
            
            return (date, city, temperature)

    except Exception as e:
        print(f"Error fetching {city}: {e}")

    return None


Fetching Weather Data for All Cities and Creating Spark DataFrame

In [0]:
weather_data = [get_weather(city, url) for city, url in european_capitals.items()]
weather_data = [data for data in weather_data if data]  # Remove None values

if not weather_data:
    print("No weather data scraped. Check website or connection.")
else:
    columns = ["Date", "City", "Temperature"]
    weather_df = spark.createDataFrame(weather_data, columns)
    weather_df.show()


Checking Paris: <!DOCTYPE html><!--
scripts and programs that download content transparent to the user are not allowed without permission
--><html lang=en><head><meta http-equiv=Content-Type content="text/html; charset=utf-8"><title>Weather for Paris, Paris, France</title><meta name=description content="Current weather in Paris and forecast for today, tomorrow, and next 14 days"><meta name=robots content="max-image-preview:large"><meta property="og:image" content="https://www.timeanddate.com/scripts/cityog.php?
Checking Berlin: <!DOCTYPE html><!--
scripts and programs that download content transparent to the user are not allowed without permission
--><html lang=en><head><meta http-equiv=Content-Type content="text/html; charset=utf-8"><title>Weather for Berlin, Germany</title><meta name=description content="Current weather in Berlin and forecast for today, tomorrow, and next 14 days"><meta name=robots content="max-image-preview:large"><meta property="og:image" content="https://www.timea

Converting Temperature Column to String Before Cleaning


In [0]:
from pyspark.sql.functions import col

# Convert Temperature column to string before processing
weather_df = weather_df.withColumn("Temperature", col("Temperature").cast("string"))
weather_df.show()


+----------+----------+-----------+
|      Date|      City|Temperature|
+----------+----------+-----------+
|2025-03-14|     Paris|      39 °F|
|2025-03-14|    Berlin|      46 °F|
|2025-03-14|    Madrid|      48 °F|
|2025-03-14|      Rome|      66 °F|
|2025-03-14|    London|      46 °F|
|2025-03-14| Amsterdam|      43 °F|
|2025-03-14|  Brussels|      43 °F|
|2025-03-14|    Vienna|      43 °F|
|2025-03-14|Copenhagen|      36 °F|
|2025-03-14|    Athens|      72 °F|
|2025-03-14|    Lisbon|      59 °F|
|2025-03-14|    Dublin|      46 °F|
|2025-03-14| Stockholm|      39 °F|
|2025-03-14|      Oslo|      40 °F|
|2025-03-14|  Helsinki|      32 °F|
|2025-03-14|    Warsaw|      45 °F|
|2025-03-14|    Prague|      41 °F|
|2025-03-14|  Budapest|      55 °F|
|2025-03-14|Bratislava|      46 °F|
|2025-03-14| Ljubljana|      44 °F|
+----------+----------+-----------+
only showing top 20 rows



Cleaning and Converting Temperature Column to Float

In [0]:
from pyspark.sql.functions import regexp_replace, trim

# Clean and convert the Temperature column
weather_df = weather_df.withColumn(
    "Temperature",
    regexp_replace(trim(col("Temperature")), "[^0-9.-]", "").cast("float")
)

weather_df.show()


+----------+----------+-----------+
|      Date|      City|Temperature|
+----------+----------+-----------+
|2025-03-14|     Paris|       39.0|
|2025-03-14|    Berlin|       46.0|
|2025-03-14|    Madrid|       48.0|
|2025-03-14|      Rome|       66.0|
|2025-03-14|    London|       46.0|
|2025-03-14| Amsterdam|       43.0|
|2025-03-14|  Brussels|       43.0|
|2025-03-14|    Vienna|       43.0|
|2025-03-14|Copenhagen|       36.0|
|2025-03-14|    Athens|       72.0|
|2025-03-14|    Lisbon|       59.0|
|2025-03-14|    Dublin|       46.0|
|2025-03-14| Stockholm|       39.0|
|2025-03-14|      Oslo|       40.0|
|2025-03-14|  Helsinki|       32.0|
|2025-03-14|    Warsaw|       45.0|
|2025-03-14|    Prague|       41.0|
|2025-03-14|  Budapest|       55.0|
|2025-03-14|Bratislava|       46.0|
|2025-03-14| Ljubljana|       44.0|
+----------+----------+-----------+
only showing top 20 rows



Finding Cities with Minimum and Maximum Temperature

In [0]:
# Get the minimum temperature
min_temp = weather_df.select(min("Temperature")).collect()[0][0]
min_temp_city = weather_df.filter(weather_df["Temperature"] == min_temp).select("City").collect()

# Get the maximum temperature
max_temp = weather_df.select(max("Temperature")).collect()[0][0]
max_temp_city = weather_df.filter(weather_df["Temperature"] == max_temp).select("City").collect()

# Display results
print(f"Coldest City: {min_temp_city[0]['City']} with {min_temp}°C")
print(f"Hottest City: {max_temp_city[0]['City']} with {max_temp}°C")


Coldest City: Helsinki with 32.0°C
Hottest City: Athens with 72.0°C
