In [1]:
from datetime import datetime, timedelta
from urllib.parse import quote_plus

import psycopg2
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round

import config

In [2]:
def check_postgres_health(log_path='/home/nicholas/Documents/IOT_Weather/db_logs/db_health.log'):
    """Run a PostgreSQL health check and append the findings to a log file.

    Connects with the credentials in ``config``, runs ``ANALYZE``, reads the
    pretty-printed size of the current database, and reads the total size and
    live-row estimate (``n_live_tup``) of every table listed in
    ``pg_stat_user_tables``, largest first. The results are appended to
    ``log_path`` under a timestamped header.

    Args:
        log_path: File the report is appended to. Defaults to the path the
            notebook has always used.

    Returns:
        ``"PostgreSQL health check completed"`` on success, otherwise
        ``"PostgreSQL health check failed: <error>"``. Errors raised while
        connecting, querying or writing the log are reported through this
        return value instead of being raised.
    """
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=config.postG_db,
            user=config.user,
            password=config.password,
            host=config.postG_host,
            port=config.postG_port
        )

        # The cursor context manager closes the cursor even if a query fails.
        with conn.cursor() as cursor:
            cursor.execute("ANALYZE;")
            # psycopg2 runs statements inside an implicit transaction and
            # close() rolls back uncommitted work, so commit here to keep the
            # statistics ANALYZE just collected.
            conn.commit()

            # Check database size
            cursor.execute("SELECT pg_size_pretty(pg_database_size(current_database()))")
            db_size = cursor.fetchone()[0]

            # Check table sizes and counts
            cursor.execute("""
            SELECT 
                relname as table_name,
                pg_size_pretty(pg_total_relation_size(relid)) as table_size,
                n_live_tup as row_count
            FROM pg_stat_user_tables
            ORDER BY pg_total_relation_size(relid) DESC;
        """)
            table_stats = cursor.fetchall()

        # Log results
        with open(log_path, 'a') as f:
            f.write(f"\n--- PostgreSQL Health Check ({datetime.now()}) ---\n")
            f.write(f"Database Size: {db_size}\n")
            f.write("Table Statistics:\n")
            for table_name, table_size, row_count in table_stats:
                f.write(f"  - {table_name}: Size={table_size}, Rows={row_count}\n")

        return "PostgreSQL health check completed"
    except Exception as e:
        return f"PostgreSQL health check failed: {e}"
    finally:
        # Release the connection on every path, including failures after
        # connect() succeeded (previously it was only closed on success).
        if conn is not None:
            conn.close()

In [3]:
# Run the health check; the returned status string is shown as the cell output.
check_postgres_health()

'PostgreSQL health check completed'

In [2]:
# MongoDB connection setup
mongo_host = config.mongo_host
mongo_port = config.mongo_port
mongo_db = config.mongo_db
mongo_collection = "iot_weather"

# Credentials placed inside a MongoDB URI must be percent-escaped; otherwise a
# password containing characters such as '@', ':', '/' or '%' produces a URI
# that is parsed incorrectly. authSource=admin matches the URI used for the
# actual read further down the notebook.
mongo_uri = (
    f"mongodb://{quote_plus(config.user)}:{quote_plus(config.password)}"
    f"@{mongo_host}:{mongo_port}/{mongo_db}.{mongo_collection}?authSource=admin"
)

# Initialize Spark session; spark.jars.packages pulls the MongoDB connector
# and the PostgreSQL JDBC driver.
spark = (
    SparkSession.builder
    .appName("MongoDB to PostGres")
    .config("spark.mongodb.input.uri", mongo_uri)
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,org.postgresql:postgresql:42.6.0")
    .getOrCreate()
)

# PostgreSQL connection properties (passed to Spark's JDBC reader)
postgres_properties = {
    "user": config.user,
    "password": config.password,
    "driver": "org.postgresql.Driver"
}

# Connect to MongoDB. Credentials are passed as keyword arguments here, so no
# URI escaping is needed for this client.
client = MongoClient(mongo_host, mongo_port, username=config.user, password=config.password)
db = client[mongo_db]
collection = db[mongo_collection]

25/03/01 15:52:45 WARN Utils: Your hostname, nicholas-MINI-S resolves to a loopback address: 127.0.1.1; using 192.168.0.245 instead (on interface wlo1)
25/03/01 15:52:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/nicholas/miniconda3/envs/iot_weather/lib/python3.12/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/nicholas/.ivy2/cache
The jars for the packages stored in: /home/nicholas/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-073da3d4-e1bb-4324-bd60-03b2e65c23f7;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
	found org.postgresql#postgresql;42.6.0 in central
	found org.checkerframework#checker-qual;3.31.0 in central
downloading https://repo1.maven.org/maven2/org/postgresql/postgresql/42.6.0/postgresql-42.6.0.jar ...
	[SUCCESSFUL ] org.postgresql#postgresql;42.6.0!postgresql.jar (815ms)
downloading https://repo1.maven.org/maven2/org/checkerframework/checker-qual/3.31.0/checker-qual-3.31.0.jar ...
	[SUCCESSFUL ] org.checkerf

In [15]:
# Reference instant for this run: the start of the one-hour aggregation window.
# It is taken once so that the Mongo filter, the stored date and the stored
# hour all describe the same moment; separate datetime.now() calls could
# straddle an hour or midnight boundary and produce a mismatched date/hour.
window_start = datetime.now() - timedelta(hours=1)
one_hour_ago_timestamp = window_start.timestamp()

# Credentials inside a MongoDB URI must be percent-escaped, otherwise special
# characters in the password break URI parsing.
mongo_read_uri = (
    f"mongodb://{quote_plus(config.user)}:{quote_plus(config.password)}"
    f"@{mongo_host}:{mongo_port}/{mongo_db}.{mongo_collection}?authSource=admin"
)

# Load data from MongoDB, keeping only documents from the last hour.
# NOTE(review): assumes `dt` holds a Unix epoch in seconds - confirm against the collection.
weather_data = (
    spark.read.format("mongo")
    .option("uri", mongo_read_uri)
    .load()
    .filter(col("dt") >= one_hour_ago_timestamp)
)

# Register the data frame as a temporary view for SQL queries
weather_data.createOrReplaceTempView("weather_raw")

# Per-location averages, grouped on coordinates rounded to 2 decimals.
# A missing rain.`1h` value is counted as 0 in the rain average.
avg_temps_sql = """
SELECT 
    ROUND(coord.lon, 2) AS lon, 
    ROUND(coord.lat, 2) AS lat,
    ROUND(AVG(main.temp), 2) AS avg_temperature,
    ROUND(AVG(main.humidity), 2) AS avg_humidity,
    ROUND(AVG(COALESCE(rain.`1h`, 0)), 2) AS avg_rain,
    ROUND(AVG(main.feels_like), 2) AS avg_feels_like,
    ROUND(AVG(clouds.all), 2) AS avg_clouds
FROM weather_raw
GROUP BY ROUND(coord.lon, 2), ROUND(coord.lat, 2)
"""

avg_temps = spark.sql(avg_temps_sql)
avg_temps.createOrReplaceTempView("avg_weather")

# Read city data directly from PostgreSQL using Spark JDBC
city_df = spark.read \
    .jdbc(
        url=f"jdbc:postgresql://{config.postG_host}:{config.postG_port}/{config.postG_db}",
        table="city",
        properties=postgres_properties
    )
city_df.createOrReplaceTempView("cities")

# Match each averaged location to a city on coordinates rounded to 2 decimals
joined_sql = """
SELECT 
    c.idCity AS city_id,
    c.CityName AS city_name,
    w.avg_temperature,
    w.avg_humidity,
    w.avg_rain,
    w.avg_feels_like,
    w.avg_clouds,
    w.lon,
    w.lat
FROM avg_weather w
JOIN cities c 
    ON ROUND(w.lat, 2) = ROUND(c.lat, 2)
    AND ROUND(w.lon, 2) = ROUND(c.lon, 2)
"""

result_df = spark.sql(joined_sql)

# Show the joined results
result_df.show()

# Date and hour stored with each row, both derived from the window start so
# they always belong to the same instant.
current_date = window_start.strftime("%Y-%m-%d")
one_hour_ago_num = window_start.hour

# Convert DataFrame to a list of tuples for batch insertion.
# The field order MUST mirror the INSERT column list below:
#   IdCity, Temp, Rain, Clouds, FeelsLike, Date, Hour
# (clouds and feels-like were previously swapped, so each value landed in the
# other's column).
rows_to_insert = [(
    row.city_id,
    row.avg_temperature,
    row.avg_rain,
    row.avg_clouds,
    row.avg_feels_like,
    current_date,
    one_hour_ago_num
) for row in result_df.collect()]

# Connect to PostgreSQL for insertion. The connection and cursor are left
# open on purpose: the notebook's final cell closes them.
postgre_conn = psycopg2.connect(
    dbname=config.postG_db,
    user=config.user,
    password=config.password,
    host=config.postG_host,
    port=config.postG_port
)
cursor = postgre_conn.cursor()

# Insert data into PostgreSQL
if rows_to_insert:
    try:
        cursor.executemany("""
    INSERT INTO weather_hour (IdCity, Temp, Rain, Clouds, FeelsLike, Date, Hour)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    """, rows_to_insert)
        postgre_conn.commit()
    except Exception:
        # Undo the partial batch so the connection is not left inside a
        # failed transaction, then surface the original error.
        postgre_conn.rollback()
        raise
    print(f"Successfully inserted {len(rows_to_insert)} records into weather_hour table")
else:
    print("No new data to insert.")

+-------+------------+---------------+------------+--------+--------------+----------+-------+-----+
|city_id|   city_name|avg_temperature|avg_humidity|avg_rain|avg_feels_like|avg_clouds|    lon|  lat|
+-------+------------+---------------+------------+--------+--------------+----------+-------+-----+
|     43|      Manama|         290.03|        62.0|     0.0|        289.39|       0.0|  50.58|26.23|
|     22|    Tel Aviv|         287.18|       72.75|     0.0|        286.55|       0.0|  34.78|32.08|
|     25|    Budapest|         279.26|       65.25|    0.03|        276.18|       0.0|  19.04|47.49|
|     33|        Bern|         277.41|       73.25|     0.0|        272.85|       3.0|   2.17| 46.9|
|     30|  Copenhagen|         276.72|        90.0|     0.0|        273.69|     72.75|  12.56|55.67|
|     15|       Kabul|         287.21|       51.75|     0.0|        286.03|      20.0|  71.58|34.05|
|     38|    Damascus|         283.82|        57.0|     0.0|        282.43|      6.25|  36.

In [16]:
# Release every external resource. The try/finally chain guarantees that each
# later step still runs when an earlier close raises, so a failing PostgreSQL
# close cannot leave the MongoDB client or the Spark session running. Any
# exception still propagates after the remaining resources are released.
try:
    # Close the PostgreSQL cursor
    cursor.close()
finally:
    try:
        # Close the PostgreSQL connection
        postgre_conn.close()
    finally:
        try:
            # Close the MongoDB connection
            client.close()
        finally:
            # Stop the Spark session
            spark.stop()