In [87]:
#!pip install requests
#!pip install psycopg2

Collecting psycopg2
  Downloading psycopg2-2.9.9-cp311-cp311-win_amd64.whl.metadata (4.5 kB)
Downloading psycopg2-2.9.9-cp311-cp311-win_amd64.whl (1.2 MB)
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   - -------------------------------------- 0.1/1.2 MB 1.3 MB/s eta 0:00:01
   ----------- ---------------------------- 0.3/1.2 MB 4.3 MB/s eta 0:00:01
   --------------------------------- ------ 1.0/1.2 MB 7.7 MB/s eta 0:00:01
   ---------------------------------------- 1.2/1.2 MB 7.4 MB/s eta 0:00:00
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.9


In [1]:
import requests
import pytz
from datetime import datetime, timedelta
import os
import tempfile
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, when,concat, lit,regexp_replace
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import concat_ws
import json
import psycopg2

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

### Define JDBC path
jar_path = os.path.abspath("./postgresql-42.7.4.jar")


try:
    spark.stop()
except:
    pass

#Create Spark session including JAR
spark = SparkSession.builder \
    .appName('pySparkSetup') \
    .config("spark.jars", jar_path) \
    .getOrCreate()

In [2]:
#DB Connection
url_db = "jdbc:postgresql://localhost:5432/postgres"  # Cambia la URL si tu DB tiene otro puerto o host
properties = {
    "user": "postgres",         # Coloca tu usuario de PostgreSQL
    "password": "postgres",  # Coloca tu contraseña de PostgreSQL
    "driver": "org.postgresql.Driver"
}

In [3]:
#Note: You should change the values to the database connection psycopg2
conn = psycopg2.connect(
    host="localhost",
    database="postgres",
    user="postgres",
    password="postgres"
)


In [4]:
#Check currently date
timezone = pytz.timezone('US/Pacific')

# datetime variables containing current date
date_now = datetime.now(timezone)

# Substract 7 days to fetch the last week
last_days=(date_now - timedelta(days=7))

date_now_str = date_now.strftime("%Y-%m-%dT%H:%M:%S%z")
last_days_str = last_days.strftime("%Y-%m-%dT%H:%M:%S%z")
print("date_now_str", date_now_str)
print("last_days_str", last_days_str)

#Now I can get the last 7 days in the api endpoint https://api.weather.gov/stations/station_id/observations/

date_now_str 2024-09-06T09:47:13-0700
last_days_str 2024-08-30T09:47:13-0700


In [5]:
# Variables declaration, if I need change the statition ID or another params to the request 

#Here I can change for other value, if the solution is called in AWS GLUE for example or called it in State machine
#The station_id will be in the arguments

station_id= "0128W" #"0112W" 


# Build ulr with the parameters
url_ep = f"https://api.weather.gov/stations/{station_id}/observations?start={last_days_str}&end={date_now_str}"


print("URL:", url_ep)

# Fetch de data
response = requests.get(url_ep)

#If response == 200 it is OK
if response.status_code == 200:
    data = response.json()

    #Save the JSON in temporal file, I don't know the size of data so this way keep less memory in usage
    with tempfile.NamedTemporaryFile(delete=False, suffix=".json", mode='w') as temp_file:
        json.dump(data['features'], temp_file)
        temp_filename = temp_file.name
    # Read the json file
    df = spark.read.json(temp_filename)
else:
    print("Error", response.status_code)
    print(response.text)



URL: https://api.weather.gov/stations/0128W/observations?start=2024-08-30T09:47:13-0700&end=2024-09-06T09:47:13-0700


In [6]:
# Create an UDF for get the name from other link properties.station with that I can go to the name,timeZone,coordinates of station.
def get_station_info(station_url):
    try:
        # Fetch data,
        response = requests.get(station_url)
        # if response equal to 200 it is OK
        if response.status_code == 200:
            station_data = response.json()
            
            #Get the station name
            station_name = station_data.get('properties', {}).get('name', "Unknown")
            
            #Get the station time_zone
            time_zone = station_data.get('properties', {}).get('timeZone', "Unknown")
            
            #Get the station coordinates
            coordinates = station_data.get('geometry', {}).get('coordinates', ["Unknown", "Unknown"])
            
            return (station_name, time_zone, coordinates[0], coordinates[1])  # Nombre, zona horaria, latitud, longitud    
        else:
            return ("Unknown", "Unknown", "Unknown", "Unknown")  # Default values
    except Exception as e:
        return ("Unknown", "Unknown", "Unknown", "Unknown")  # Exception default values



In [7]:
df_aux=df #Copy the data frame to avoid loose the original information

df_selected = df_aux.select(
    col("id"),
    col("properties.timestamp").alias("timestamp"),
    col("properties.temperature.value").alias("temperature_value"),
    col("properties.temperature.unitCode").alias("temperature_unit"),
    col("properties.windSpeed.value").alias("wind_speed_value"),
    col("properties.windSpeed.unitCode").alias("wind_speed_unit"),
    col("properties.relativeHumidity.value").alias("humidity_value"),
    col("properties.relativeHumidity.unitCode").alias("humidity_unit"),
    col("properties.station").alias("station_url")
)

get_station_info_udf = udf(get_station_info, ArrayType(StringType()))

#Using the UDF
df_selected = df_selected.withColumn("station_info", get_station_info_udf(col("station_url")))

#Splitting each data column
df_selected = df_selected.withColumn("station_name", col("station_info")[0])
df_selected = df_selected.withColumn("time_zone", col("station_info")[1])
df_selected = df_selected.withColumn("latitude", col("station_info")[2])
df_selected = df_selected.withColumn("longitude", col("station_info")[3])
df_selected = df_selected.withColumn("station_id",lit(station_id))
df_selected =  df_selected.withColumn("temperature_unit",regexp_replace(col("temperature_unit"), "wmoUnit:", ""))
df_selected =  df_selected.withColumn("humidity_unit",regexp_replace(col("humidity_unit"), "wmoUnit:", ""))
df_selected =  df_selected.withColumn("wind_speed_unit",regexp_replace(col("wind_speed_unit"), "wmoUnit:", ""))


#Drop column 'station_info' 
df_selected = df_selected.drop("station_info")


In [8]:
#Create a cursor
cursor = conn.cursor()

# Verificar si la tabla existe
cursor.execute("""
    SELECT EXISTS (
        SELECT 1
        FROM information_schema.tables 
        WHERE table_schema = 'public' 
        AND table_name = 'weather_information'
    );
""")

#Check if the table exists
table_exists = cursor.fetchone()[0]


if not table_exists:
    cursor.execute("""
        CREATE TABLE public.weather_information (
            station_id varchar NOT NULL,
            station_name varchar NULL,
            station_timezone varchar NULL,
            latitude float8 NULL,
            longitude float8 NULL,
            observation_timestamp varchar NULL,
            temperature varchar NULL,
            temperature_unit varchar NULL,
            wind_speed varchar NULL,
            wind_speed_unit varchar NULL,
            humidity varchar NULL,
            humidity_unit varchar NULL,
            createt_at timestamp(0) NULL DEFAULT now(),
            station_url varchar NULL,
            id varchar NULL
        );
    """)
    conn.commit()
    print("Table created successfully.")
else:
    print("Table exists.")

# Cerrar la conexión
cursor.close()
conn.close()

Table created successfully.


In [9]:
#Mapping each column to SQL table
df_sql = df_selected.select(
    col("station_id"),
    col("station_name"),
    col("time_zone").alias("station_timezone"),
    when(col("longitude") == "Unknown", None).otherwise(col("longitude").cast("float")).alias("longitude"), 
    when(col("latitude") == "Unknown", None).otherwise(col("latitude").cast("float")).alias("latitude"), 
    col("timestamp").alias("observation_timestamp"),
    col("temperature_value").alias("temperature"),
    col("temperature_unit"),
    col("wind_speed_value").alias("wind_speed"),
    col("wind_speed_unit"),
    col("humidity_value").alias("humidity"),
    col("humidity_unit"),
    col("station_url"),
    col("id")
)

#Query to get max  observation_timestamp in all table to ask Which information will be Insert or Not
query = "(SELECT MAX(observation_timestamp) as max_timestamp FROM public.weather_information WHERE station_id = '"+station_id+"' ) AS temp"


#Read query
df_max_timestamp = spark.read.jdbc(url=url_db, table=query, properties=properties)


max_timestamp = df_max_timestamp.collect()[0]["max_timestamp"]

#Filter the data to Insert only the rows wich timestamp greater than the max timestamp in the table
#If max timestamp does not exist will insert all data frame
if max_timestamp is None:
    df_filtered=df_sql
else:
    df_filtered = df_sql.filter(col("observation_timestamp") > max_timestamp)

# Mostrar el DataFrame filtrado
df_filtered.show()

#Write data frame in database
df_filtered.write.jdbc(url=url_db, table="weather_information", mode="append", properties=properties)


+----------+--------------------+----------------+---------+---------+---------------------+-----------+----------------+----------+---------------+---------------+-------------+--------------------+--------------------+
|station_id|        station_name|station_timezone|longitude| latitude|observation_timestamp|temperature|temperature_unit|wind_speed|wind_speed_unit|       humidity|humidity_unit|         station_url|                  id|
+----------+--------------------+----------------+---------+---------+---------------------+-----------+----------------+----------+---------------+---------------+-------------+--------------------+--------------------+
|     0128W|Ringling Museum o...|America/New_York| 27.38139|-82.55997| 2024-09-06T16:10:...|      33.72|            degC|     4.824|         km_h-1|63.420955014286|      percent|https://api.weath...|https://api.weath...|
|     0128W|Ringling Museum o...|America/New_York| 27.38139|-82.55997| 2024-09-06T15:40:...|      33.44|            