In [1]:
import getpass
import os
from pathlib import Path

from pyspark.sql import SparkSession

# Absolute path to the PostgreSQL JDBC driver jar. The relative location is
# resolved against the current working directory up front, so the value passed
# to Spark really is absolute (as the classpath settings expect). A missing jar
# then fails here with a clear message instead of later, during the JDBC read.
jdbc_driver_path = str(Path("jars/postgresql-42.7.3.jar").resolve())
if not Path(jdbc_driver_path).is_file():
    raise FileNotFoundError(f"PostgreSQL JDBC driver jar not found: {jdbc_driver_path}")

# Create (or reuse, via getOrCreate) a local Spark session with the JDBC driver
# on both the driver classpath and the shipped jars.
# - "local" runs Spark with a single worker thread ("local[*]" would use all cores).
# - catalogImplementation=in-memory avoids needing a Hive metastore.
spark = (
    SparkSession.builder
    .appName("PostgreSQL Connector")
    .master("local")
    .config("spark.driver.extraClassPath", jdbc_driver_path)
    .config("spark.jars", jdbc_driver_path)
    .config("spark.sql.catalogImplementation", "in-memory")
    .getOrCreate()
)

# Display the session summary to verify it started.
spark

In [2]:
# Connection settings for the PostgreSQL `weather` database.
# Credentials are NOT hard-coded in the notebook: the user and password come
# from the standard libpq environment variables PGUSER / PGPASSWORD. If
# PGPASSWORD is unset, the password is prompted for interactively, so it never
# ends up in the saved notebook.
WEATHER_DB_URL = os.environ.get("WEATHER_DB_URL", "jdbc:postgresql://localhost:5432/weather")
WEATHER_DB_USER = os.environ.get("PGUSER", "postgres")
WEATHER_DB_PASSWORD = os.environ.get("PGPASSWORD") or getpass.getpass(
    f"PostgreSQL password for {WEATHER_DB_USER}: "
)

# Read the whole `weather_data` table through Spark's JDBC data source.
df = (
    spark.read.format("jdbc")
    .option("url", WEATHER_DB_URL)
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "weather_data")
    .option("user", WEATHER_DB_USER)
    .option("password", WEATHER_DB_PASSWORD)
    .load()
)

In [3]:
# Print the column names and Spark types of the table loaded over JDBC.
df.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- lon: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- weather_icon: string (nullable = true)
 |-- base: string (nullable = true)
 |-- temp: double (nullable = true)
 |-- feels_like: double (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- temp_min: double (nullable = true)
 |-- temp_max: double (nullable = true)
 |-- sea_level: integer (nullable = true)
 |-- grnd_level: integer (nullable = true)
 |-- visibility: integer (nullable = true)
 |-- wind_speed: double (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- wind_gust: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- rain_1h: double (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- snow_1h: double (nullable = true)
 |-- sno

In [19]:
from pyspark.sql.functions import from_unixtime, date_format, col,round

In [26]:
def kelvin_to_celsius(kelvin, scale=2):
    """Convert a Kelvin temperature Column to Celsius, rounded to `scale` decimals.

    Parameters
    ----------
    kelvin : pyspark.sql.Column
        Column expression holding temperatures in Kelvin.
    scale : int, default 2
        Number of decimal places to round to.

    Returns
    -------
    pyspark.sql.Column
        Expression equal to ``round(kelvin - 273.15, scale)``.

    Note: ``round`` here is ``pyspark.sql.functions.round`` (imported in the
    earlier import cell), which shadows Python's built-in ``round``; the
    built-in is not a Spark column function.
    """
    return round(kelvin - 273.15, scale)


# Add a readable timestamp string derived from the epoch-seconds `dt` column,
# plus Celsius versions of each Kelvin temperature column.
# from_unixtime already formats as "yyyy-MM-dd HH:mm:ss", so no extra
# date_format() round-trip (string -> timestamp -> string) is needed.
df = (
    df.withColumn("datetime", from_unixtime(col("dt"), "yyyy-MM-dd HH:mm:ss"))
    .withColumn("temp_celsius", kelvin_to_celsius(col("temp")))
    .withColumn("temp_min_celsius", kelvin_to_celsius(col("temp_min")))
    .withColumn("temp_max_celsius", kelvin_to_celsius(col("temp_max")))
    .withColumn("feels_like_celsius", kelvin_to_celsius(col("feels_like")))
)

# Register the enriched frame as a temp view so later cells can query it by
# name; "OrReplace" keeps the cell re-runnable without a separate drop.
df.createOrReplaceTempView("weather_data")

In [28]:
# Columns kept for the transaction-level view, in display order.
TRANSACTION_COLUMNS = [
    "transaction_id", "weather_main", "weather_description",
    "temp_celsius", "temp_min_celsius", "temp_max_celsius", "feels_like_celsius",
    "pressure", "humidity", "visibility",
    "wind_speed", "wind_deg", "wind_gust",
    "clouds_all", "rain_1h", "rain_3h",
    "datetime", "city_name",
]

# Select from the `weather_data` temp view with the DataFrame API rather than a
# backslash-continued SQL string literal, which silently embedded each source
# line's indentation (including a stray tab) into the query text.
transaction_data = spark.table("weather_data").select(*TRANSACTION_COLUMNS)

In [29]:
# Preview the first five rows of the selected columns (long values truncated).
transaction_data.show(n=5, truncate=True)

+--------------+------------+-------------------+------------+----------------+----------------+------------------+--------+--------+----------+----------+--------+---------+----------+-------+-------+-------------------+----------+
|transaction_id|weather_main|weather_description|temp_celsius|temp_min_celsius|temp_max_celsius|feels_like_celsius|pressure|humidity|visibility|wind_speed|wind_deg|wind_gust|clouds_all|rain_1h|rain_3h|           datetime| city_name|
+--------------+------------+-------------------+------------+----------------+----------------+------------------+--------+--------+----------+----------+--------+---------+----------+-------+-------+-------------------+----------+
|             1|      Clouds|      broken clouds|       32.61|           31.81|            33.1|             36.35|    1003|      53|     10000|      2.57|     200|     NULL|        75|   NULL|   NULL|2024-07-12 14:28:01|Chiang Mai|
|             2|      Clouds|      broken clouds|       30.25|      

In [30]:
# Friendlier names (with units where the author encoded them) for the
# transaction-level columns.
# NOTE: an earlier version also renamed temp / temp_min / temp_max / feels_like.
# Those raw columns are not part of `transaction_data` (only their *_celsius
# versions are selected), so withColumnRenamed silently ignored them; the
# *_celsius columns keep their current names.
COLUMN_RENAMES = {
    "weather_main": "main_weather",
    "weather_description": "weather_desc",
    "pressure": "pressure_hPa",
    "humidity": "humidity_percent",
    "visibility": "visibility_meters",
    "wind_speed": "wind_speed_mps",
    "wind_deg": "wind_direction_deg",
    "wind_gust": "wind_gust_mps",
    "clouds_all": "cloudiness_percent",
    "rain_1h": "rain_volume_1h_mm",
    "rain_3h": "rain_volume_3h_mm",
    "datetime": "timestamp",
    "city_name": "city",
}

# withColumnRenamed is a silent no-op for unknown columns, so check the mapping
# explicitly. An entry is fine if either its old or its new name is present,
# which keeps this cell safe to re-run after the rename has been applied.
_current_columns = set(transaction_data.columns)
_unknown_sources = [
    old for old, new in COLUMN_RENAMES.items()
    if old not in _current_columns and new not in _current_columns
]
if _unknown_sources:
    raise ValueError(f"Cannot rename missing columns: {_unknown_sources}")

# Rename in a single projection; columns not in the mapping keep their names
# and the column order is unchanged.
transaction_data = transaction_data.toDF(
    *[COLUMN_RENAMES.get(name, name) for name in transaction_data.columns]
)

In [31]:
# Preview the first five rows after renaming (long values truncated).
transaction_data.show(n=5, truncate=True)

+--------------+------------+---------------+------------+----------------+----------------+------------------+------------+----------------+-----------------+--------------+------------------+-------------+------------------+-----------------+-----------------+-------------------+----------+
|transaction_id|main_weather|   weather_desc|temp_celsius|temp_min_celsius|temp_max_celsius|feels_like_celsius|pressure_hPa|humidity_percent|visibility_meters|wind_speed_mps|wind_direction_deg|wind_gust_mps|cloudiness_percent|rain_volume_1h_mm|rain_volume_3h_mm|          timestamp|      city|
+--------------+------------+---------------+------------+----------------+----------------+------------------+------------+----------------+-----------------+--------------+------------------+-------------+------------------+-----------------+-----------------+-------------------+----------+
|             1|      Clouds|  broken clouds|       32.61|           31.81|            33.1|             36.35|       