<a href="https://colab.research.google.com/github/AAAloui/AAAloui/blob/main/WeatherDataPySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Weather Data with PySpark

In this notebook I fetch weather data from Open-Meteo, a free and open-source weather API, format it with PySpark, and visualize it.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('weather').getOrCreate()

## Open Meteo Data

We start by writing a function that requests hourly forecast data from the Open-Meteo API for a given location.
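For reference, here is a minimal sketch of the query string such a request sends, built with the standard library's `urllib.parse.urlencode`. It assumes that several hourly variables are passed to Open-Meteo as one comma-separated `hourly` value; the helper name `build_forecast_url` is hypothetical and not part of the notebook.

```python
from urllib.parse import urlencode

FORECAST_URL = "https://api.open-meteo.com/v1/forecast"


def build_forecast_url(latitude, longitude, hourly_vars):
    """Hypothetical helper: return the GET URL for an hourly forecast request."""
    query = {
        "latitude": latitude,
        "longitude": longitude,
        # Several variables are sent as a single comma-separated value
        "hourly": ",".join(hourly_vars),
    }
    # safe="," keeps the commas readable instead of encoding them as %2C
    return FORECAST_URL + "?" + urlencode(query, safe=",")


print(build_forecast_url(48.8566, 2.3522, ["temperature_2m", "wind_speed_10m"]))
# → https://api.open-meteo.com/v1/forecast?latitude=48.8566&longitude=2.3522&hourly=temperature_2m,wind_speed_10m
```

Building the URL by hand like this is only useful for inspecting or debugging a request; `requests.get(url, params=...)` produces the same kind of query string.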

In [None]:
import requests

params =  ["temperature_2m", "relative_humidity_2m", "dew_point_2m", "apparent_temperature", "precipitation_probability", "precipitation", "rain", "showers", "snowfall", "snow_depth", "weather_code", "pressure_msl", "surface_pressure", "cloud_cover", "cloud_cover_low", "cloud_cover_mid", "cloud_cover_high", "visibility", "evapotranspiration", "et0_fao_evapotranspiration", "vapour_pressure_deficit", "wind_speed_10m", "wind_speed_80m", "wind_speed_120m", "wind_speed_180m", "wind_direction_10m", "wind_direction_80m", "wind_direction_120m", "wind_direction_180m", "wind_gusts_10m", "temperature_80m", "temperature_120m", "temperature_180m", "soil_temperature_0cm", "soil_temperature_6cm", "soil_temperature_18cm", "soil_temperature_54cm", "soil_moisture_0_to_1cm", "soil_moisture_1_to_3cm", "soil_moisture_3_to_9cm", "soil_moisture_9_to_27cm", "soil_moisture_27_to_81cm"]


def get_weather_data(latitude, longitude, param_list=None):
    """Request hourly forecast data from Open-Meteo for one location; return the JSON dict or None."""
    # Open-Meteo forecast endpoint
    url = "https://api.open-meteo.com/v1/forecast"

    # Fall back to the full variable list when none is given
    if param_list is None:
        param_list = params

    # Query parameters: use the caller's coordinates instead of fixed values
    query_params = {
        "latitude": latitude,
        "longitude": longitude,
        "hourly": ",".join(param_list),  # variables as one comma-separated value
    }

    try:
        # Make a GET request to the API
        response = requests.get(url, params=query_params, timeout=30)
        response.raise_for_status()  # Raise an error for bad HTTP status codes

        # Parse and return the JSON response
        return response.json()

    except requests.RequestException as e:
        print("Error fetching weather data:", e)
        return None


We execute our function with Paris latitude/longitude to see what the data looks like.

In [None]:
# Example: Get weather data for Paris, France (Latitude: 48.8566, Longitude: 2.3522)
latitude = 48.8566
longitude = 2.3522
get_weather_data(latitude, longitude)

{'latitude': 52.52,
 'longitude': 13.419998,
 'generationtime_ms': 3.903031349182129,
 'utc_offset_seconds': 0,
 'timezone': 'GMT',
 'timezone_abbreviation': 'GMT',
 'elevation': 38.0,
 'hourly_units': {'time': 'iso8601',
  'temperature_2m': '°C',
  'relative_humidity_2m': '%',
  'dew_point_2m': '°C',
  'apparent_temperature': '°C',
  'precipitation_probability': '%',
  'precipitation': 'mm',
  'rain': 'mm',
  'showers': 'mm',
  'snowfall': 'cm',
  'snow_depth': 'm',
  'weather_code': 'wmo code',
  'pressure_msl': 'hPa',
  'surface_pressure': 'hPa',
  'cloud_cover': '%',
  'cloud_cover_low': '%',
  'cloud_cover_mid': '%',
  'cloud_cover_high': '%',
  'visibility': 'm',
  'evapotranspiration': 'mm',
  'et0_fao_evapotranspiration': 'mm',
  'vapour_pressure_deficit': 'kPa',
  'wind_speed_10m': 'km/h',
  'wind_speed_80m': 'km/h',
  'wind_speed_120m': 'km/h',
  'wind_speed_180m': 'km/h',
  'wind_direction_10m': '°',
  'wind_direction_80m': '°',
  'wind_direction_120m': '°',
  'wind_directio

The response is JSON. It starts with header attributes (the coordinates of the location, timezone, elevation), followed by the measurement units (`hourly_units`) and then the hourly measurement data (`hourly`). To build a DataFrame, we start with the hourly measurements.
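The `hourly` object is column-oriented: one list per variable, each aligned position by position with the `time` list. A minimal pure-Python sketch of turning it into one record per hour (the same reshaping `pd.DataFrame(weather_data["hourly"])` performs); the helper `hourly_to_rows` and the tiny `sample_hourly` payload are made up for illustration and are not real API output.

```python
def hourly_to_rows(hourly):
    """Turn Open-Meteo's column-oriented `hourly` dict into a list of per-hour dicts."""
    columns = list(hourly.keys())
    # Every variable list is aligned with hourly["time"], so zip them position by position
    return [dict(zip(columns, values)) for values in zip(*hourly.values())]


# Made-up two-hour payload with the same shape as the API's "hourly" object
sample_hourly = {
    "time": ["2024-01-01T00:00", "2024-01-01T01:00"],
    "temperature_2m": [3.1, 2.8],
    "relative_humidity_2m": [87, 89],
}

rows = hourly_to_rows(sample_hourly)
print(rows[0])
# → {'time': '2024-01-01T00:00', 'temperature_2m': 3.1, 'relative_humidity_2m': 87}
```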

In [None]:
import pandas as pd

weather_data = get_weather_data(latitude, longitude)
hourly_weather_data = weather_data["hourly"]

# Each key of "hourly" becomes a column (time plus one column per variable)
hourly_weather_data_spark_df = spark.createDataFrame(pd.DataFrame(hourly_weather_data))

In [None]:
hourly_weather_data_spark_df.show()

+----------------+--------------+--------------------+------------+--------------------+-------------------------+-------------+----+-------+--------+----------+------------+------------+----------------+-----------+---------------+---------------+----------------+----------+------------------+--------------------------+-----------------------+--------------+--------------+---------------+---------------+------------------+------------------+-------------------+-------------------+--------------+---------------+----------------+----------------+--------------------+--------------------+---------------------+---------------------+----------------------+----------------------+----------------------+-----------------------+------------------------+
|            time|temperature_2m|relative_humidity_2m|dew_point_2m|apparent_temperature|precipitation_probability|precipitation|rain|showers|snowfall|snow_depth|weather_code|pressure_msl|surface_pressure|cloud_cover|cloud_cover_low|cloud_cover_

In [None]:
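import pandas as pd
# First attempt: keep every top-level key except the hourly measurements. Because "hourly_units"
# is a nested dict, pandas uses its keys as the index, giving one row per unit, not one header row.
head_weather_data = {key: value for key, value in weather_data.items() if key != "hourly"}
head_weather_data_spark_df = spark.createDataFrame(pd.DataFrame(head_weather_data))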

In [None]:
# Build a one-row Spark DataFrame from the header attributes (everything except hourly and hourly_units)
import pandas as pd

head_weather_data = {key: value for key, value in weather_data.items() if key not in ("hourly", "hourly_units")}

# All values are scalars, so pandas needs an explicit index: [0] gives a single row
head_weather_data_pd_df = pd.DataFrame(head_weather_data, index=[0])

# Convert the pandas DataFrame to a Spark DataFrame
head_weather_data_spark_df = spark.createDataFrame(head_weather_data_pd_df)
head_weather_data_spark_df.show()

+--------+---------+------------------+------------------+--------+---------------------+---------+
|latitude|longitude| generationtime_ms|utc_offset_seconds|timezone|timezone_abbreviation|elevation|
+--------+---------+------------------+------------------+--------+---------------------+---------+
|   52.52|13.419998|2.2699832916259766|                 0|     GMT|                  GMT|     38.0|
+--------+---------+------------------+------------------+--------+---------------------+---------+



In [None]:
from pyspark.sql import functions as F
# Constant unit columns (values as in hourly_units); the last two columns hold units, not measurements
head_weather_data_spark_df = (
    head_weather_data_spark_df
    .withColumn("time_unit", F.lit("iso8601"))
    .withColumn("temperature_unit", F.lit("°C"))
    .withColumn("wind_speed_10m", F.lit("km/h"))
    .withColumn("relative_humidity_2m", F.lit("%"))
)

In [None]:
head_weather_data_spark_df.show()

+--------+---------+------------------+------------------+--------+---------------------+---------+---------+----------------+--------------+--------------------+
|latitude|longitude| generationtime_ms|utc_offset_seconds|timezone|timezone_abbreviation|elevation|time_unit|temperature_unit|wind_speed_10m|relative_humidity_2m|
+--------+---------+------------------+------------------+--------+---------------------+---------+---------+----------------+--------------+--------------------+
|   52.52|13.419998|2.2699832916259766|                 0|     GMT|                  GMT|     38.0|  iso8601|              °C|          km/h|                   %|
+--------+---------+------------------+------------------+--------+---------------------+---------+---------+----------------+--------------+--------------------+
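The unit columns above are typed in by hand, but the response already carries every unit in `hourly_units`. A minimal sketch of building one flat header record from it instead; the `<variable>_unit` column naming, the helper `flatten_header`, and the `sample_response` dict (a few fields copied from the response printed earlier in this notebook) are assumptions for illustration.

```python
def flatten_header(response):
    """Merge the scalar header attributes with one `<variable>_unit` entry per hourly unit."""
    # Scalar metadata: everything except the measurement arrays and the nested units dict
    record = {k: v for k, v in response.items() if k not in ("hourly", "hourly_units")}
    for variable, unit in response.get("hourly_units", {}).items():
        record[f"{variable}_unit"] = unit
    return record


# Trimmed copy of fields from the response printed earlier in this notebook
sample_response = {
    "latitude": 52.52,
    "longitude": 13.419998,
    "timezone": "GMT",
    "elevation": 38.0,
    "hourly_units": {"time": "iso8601", "temperature_2m": "°C", "wind_speed_10m": "km/h"},
    "hourly": {"time": [], "temperature_2m": [], "wind_speed_10m": []},
}

header = flatten_header(sample_response)
print(header["temperature_2m_unit"], header["wind_speed_10m_unit"])  # → °C km/h
```

In the notebook, `spark.createDataFrame([flatten_header(weather_data)])` should then yield a one-row DataFrame with a unit column for every requested variable, without naming any of them by hand.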



## Cities Data

For the city data we use a CSV file from Kaggle (`worldcities.csv`) that we downloaded to Google Drive. It enriches the coordinate-based weather data and makes further aggregations possible, for example per country or per capital type (per continent would need an extra country-to-continent mapping, since the file has no continent column).
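The aggregations mentioned here map onto `groupBy` in PySpark, e.g. `world_cities_df.groupBy("country").count()`. To show the idea without a Spark session, here is a plain-Python sketch using `collections.Counter` over the first five rows of the file as printed below (only city, country and capital type are kept).

```python
from collections import Counter

# (city, country, capital) for the first rows of worldcities.csv shown in this notebook
cities = [
    ("Tokyo", "Japan", "primary"),
    ("Jakarta", "Indonesia", "primary"),
    ("Delhi", "India", "admin"),
    ("Guangzhou", "China", "admin"),
    ("Mumbai", "India", "admin"),
]

# Same idea as groupBy("country").count() and groupBy("capital").count()
per_country = Counter(country for _, country, _ in cities)
per_capital_type = Counter(capital for _, _, capital in cities)

print(per_country["India"], per_capital_type["primary"], per_capital_type["admin"])  # → 2 2 3
```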


In [None]:
# Unmount Google Drive (flushing pending writes) and mount it again to refresh its contents

from google.colab import drive
drive.flush_and_unmount()
print("Drive unmounted successfully.")

drive.mount('/content/drive')
print("Drive remounted successfully.")


Drive unmounted successfully.
Mounted at /content/drive
Drive remounted successfully.


In [None]:
%%sh

ls drive/MyDrive/Datasets/

worldcities.csv
worldcities.csv.zip


In [None]:
# Path to the Kaggle CSV inside the mounted Drive (relative to Colab's /content working directory)
file_path = 'drive/MyDrive/Datasets/worldcities.csv'

# Read the CSV file into a PySpark DataFrame
world_cities_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Display the first few rows of the DataFrame
world_cities_df.show()

+------------+------------+--------+--------+-------------+----+----+--------------------+-------+-----------+----------+
|        city|  city_ascii|     lat|     lng|      country|iso2|iso3|          admin_name|capital| population|        id|
+------------+------------+--------+--------+-------------+----+----+--------------------+-------+-----------+----------+
|       Tokyo|       Tokyo| 35.6897|139.6922|        Japan|  JP| JPN|               Tōkyō|primary|   3.7732E7|1392685764|
|     Jakarta|     Jakarta|  -6.175|106.8275|    Indonesia|  ID| IDN|             Jakarta|primary|   3.3756E7|1360771077|
|       Delhi|       Delhi|   28.61|   77.23|        India|  IN| IND|               Delhi|  admin|   3.2226E7|1356872604|
|   Guangzhou|   Guangzhou|   23.13|  113.26|        China|  CN| CHN|           Guangdong|  admin|    2.694E7|1156237133|
|      Mumbai|      Mumbai| 19.0761| 72.8775|        India|  IN| IND|         Mahārāshtra|  admin|   2.4973E7|1356226629|
|      Manila|      Mani

In [None]:
world_cities_df.printSchema()

root
 |-- city: string (nullable = true)
 |-- city_ascii: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- country: string (nullable = true)
 |-- iso2: string (nullable = true)
 |-- iso3: string (nullable = true)
 |-- admin_name: string (nullable = true)
 |-- capital: string (nullable = true)
 |-- population: double (nullable = true)
 |-- id: integer (nullable = true)



In [None]:
lat, lng, city = world_cities_df.select('lat', 'lng', 'city').first()
print(city, get_weather_data(lat, lng))

Tokyo {'latitude': 52.52, 'longitude': 13.419998, 'generationtime_ms': 1.437067985534668, 'utc_offset_seconds': 0, 'timezone': 'GMT', 'timezone_abbreviation': 'GMT', 'elevation': 38.0, 'hourly_units': {'time': 'iso8601', 'temperature_2m': '°C', 'relative_humidity_2m': '%', 'dew_point_2m': '°C', 'apparent_temperature': '°C', 'precipitation_probability': '%', 'precipitation': 'mm', 'rain': 'mm', 'showers': 'mm', 'snowfall': 'cm', 'snow_depth': 'm', 'weather_code': 'wmo code', 'pressure_msl': 'hPa', 'surface_pressure': 'hPa', 'cloud_cover': '%', 'cloud_cover_low': '%', 'cloud_cover_mid': '%', 'cloud_cover_high': '%', 'visibility': 'm', 'evapotranspiration': 'mm', 'et0_fao_evapotranspiration': 'mm', 'vapour_pressure_deficit': 'kPa', 'wind_speed_10m': 'km/h', 'wind_speed_80m': 'km/h', 'wind_speed_120m': 'km/h', 'wind_speed_180m': 'km/h', 'wind_direction_10m': '°', 'wind_direction_80m': '°', 'wind_direction_120m': '°', 'wind_direction_180m': '°', 'wind_gusts_10m': 'km/h', 'temperature_80m': 

## France
We create a DataFrame of French cities and fetch the hourly weather measurements for each of them.

In [None]:
france_cities_df = world_cities_df.filter(world_cities_df.country == 'France')
france_cities_df.show()

+-------------+-------------+-------+-------+-------+----+----+--------------------+-------+----------+----------+
|         city|   city_ascii|    lat|    lng|country|iso2|iso3|          admin_name|capital|population|        id|
+-------------+-------------+-------+-------+-------+----+----+--------------------+-------+----------+----------+
|        Paris|        Paris|48.8567| 2.3522| France|  FR| FRA|       Île-de-France|primary|   1.106E7|1250015082|
|     Bordeaux|     Bordeaux|  44.84|  -0.58| France|  FR| FRA|  Nouvelle-Aquitaine|  admin|  994920.0|1250449238|
|    Marseille|    Marseille|43.2964|   5.37| France|  FR| FRA|Provence-Alpes-Cô...|  admin|  873076.0|1250774071|
|         Lyon|         Lyon|  45.76|   4.84| France|  FR| FRA|Auvergne-Rhône-Alpes|  admin|  522250.0|1250196189|
|     Toulouse|     Toulouse|43.6045|  1.444| France|  FR| FRA|           Occitanie|  admin|  504078.0|1250258110|
|         Nice|         Nice|43.7034| 7.2663| France|  FR| FRA|Provence-Alpes-Cô

In [None]:
france_cities_hourly_meteo_df = france_cities_df.withColumn("hourly_meteo", F.udf(lambda lat, lng: get_weather_data(lat, lng, ["temperature_2m"])["hourly"])("lat", "lng"))

In [None]:
france_cities_hourly_meteo_df.select("hourly_meteo").show()

+--------------------+
|        hourly_meteo|
+--------------------+
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
|{temperature_2m=[...|
+--------------------+
only showing top 20 rows
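Because the UDF above is created without a `returnType`, PySpark falls back to `StringType`, so each `hourly_meteo` cell holds the map's string form (`{temperature_2m=[...`) rather than queryable arrays, and every row triggers its own API call on the workers. For a few dozen French cities, one alternative is to call the API on the driver and build long-format rows (one per city and hour) before creating the DataFrame. A minimal sketch under that assumption: `collect_hourly_rows` is a hypothetical helper, and `fetch` stands in for `get_weather_data` so the logic can be tried offline with a fake.

```python
def collect_hourly_rows(cities, fetch, variable="temperature_2m"):
    """Return (city, time, value) tuples, one per city and forecast hour.

    `cities` is an iterable of (city, lat, lng); `fetch(lat, lng, [variable])`
    must return the API response dict, or None on failure (as get_weather_data does).
    """
    rows = []
    for city, lat, lng in cities:
        response = fetch(lat, lng, [variable])
        if response is None:  # skip cities whose request failed
            continue
        hourly = response["hourly"]
        rows.extend((city, t, v) for t, v in zip(hourly["time"], hourly[variable]))
    return rows


# Offline stand-in for get_weather_data, returning a made-up two-hour forecast
def fake_fetch(lat, lng, variables):
    return {"hourly": {"time": ["2024-01-01T00:00", "2024-01-01T01:00"], variables[0]: [5.0, 4.5]}}


rows = collect_hourly_rows([("Paris", 48.8567, 2.3522), ("Lyon", 45.76, 4.84)], fake_fetch)
print(len(rows), rows[0])  # → 4 ('Paris', '2024-01-01T00:00', 5.0)
```

In the notebook this could be fed with `[(r.city, r.lat, r.lng) for r in france_cities_df.select("city", "lat", "lng").collect()]` and `get_weather_data`, then turned into a DataFrame with `spark.createDataFrame(rows, ["city", "time", "temperature_2m"])`, which keeps the temperatures as numbers that can be filtered and aggregated.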