# Goals
* Menyamakan fitur dari tiap-tiap sumber adapun semua fitur yang telah diseleksi adalah sebagai berikut :
1. `Temperature` (C)
2. `Apparent Temperature / feels like` (C)
3. `Precipitation` (mm)
4. `Dewpoint` (C)
5. `Humidity` (%)
6. `Cloud` (Total)
7. `wind speed` (km/h)
8. `wind degree` (km/h)
9. `wind gust` (km/h)

* Menyamakan satuan tiap fitur, adapun yang perlu dilakukan :
1. Pada weatherapi.com ada pilihan field untuk satuan yang berbeda, yang perlu dilakukan hanya seleksi
2. Pada open-meteo juga ada pilihan query untuk memilih satuan, tidak perlu dilakukan pengubahan satuan ukur
3. Pada openweathermap.com tidak ada pilihan untuk mengubah satuan, yang perlu diganti adalah pada semua fitur `wind`

* Melakukan aggregasi

In [103]:
import os
import sys
import requests

from pyspark.sql import DataFrame
from dotenv import load_dotenv
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, row_number, lit, avg

In [41]:
# Load api key
load_dotenv()
FREEWEATHER_KEY = os.getenv("FREEWEATHER_KEY")
OPENWEATHERMAP_KEY = os.getenv("OPENWEATHERMAP_KEY")

# Tambahkan 2 baris ini agar spark mengenali env python
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [4]:
# Buat spark session terlebih dahulu
spark = SparkSession.builder \
    .appName("rdv-project") \
    .master("local[*]") \
    .getOrCreate()

In [None]:
"""
Malang sendiri sebenarnya tidak memiliki stasiun pengukuran dari BMKG
Oleh karena itu latitude dan longitude di bawah ini menunjukkan 
koordinat dari kota malang seluruhnya
"""
# --------------------------- Parameter API -------------------------

# Latitude & Longitude utk Malang
locations = {
    "Klojen":(-7.969421375342755, 112.6285308513895),
    "Blimbing":(-7.945926995201971, 112.64310740385885),
    "Sukun":(-7.987926536974757, 112.6100743302227),
    "Lowokwaru":(-7.9348455775950795, 112.60665573821122),
    "Lawang":(-7.828716329007881, 112.70181673602366),
    "Singosari":(-7.875350536265517, 112.64837233274164),
    "Kepanjen":(-8.11761411293624, 112.57906911889476),
    "Pakis":(-7.958180248694484, 112.71010117082125),
    "Wagir":(-7.980111689103741, 112.49397160866646),
    "Tumpang":(-8.007270828630284, 112.74768804369178)
}

# List Fitur untuk open-meteo
feature_list = [
    "temperature_2m",
    "relative_humidity_2m",
    "precipitation",
    "wind_speed_10m",
    "wind_direction_10m",
    "wind_gusts_10m",
    "apparent_temperature",
    "cloud_cover",
    "dew_point_2m"
]
features = ",".join(feature_list)

# Timestamp hari ini
today = datetime.today()
today_str = today.strftime("%Y-%m-%d")
timestamp_today = today.timestamp()

In [24]:
weatherapi_psdf = None

for district in locations.keys() :
    latitude = locations[district][0]
    longitude = locations[district][1]

    # recent weather url call for weatherapi
    weatherapi_url = f"http://api.weatherapi.com/v1/current.json?key={FREEWEATHER_KEY}&q={latitude},{longitude}"

    success = False
    attempts = 0
    
    # openmeteo call
    while not success and attempts < 3:
        try:
            print(f"Getting recent data for {district}")
            response = requests.get(weatherapi_url)
            if response.status_code == 200:
                data = response.json()['current']
                num_records = len(data)
                rows = [data]
                psdf = spark.read.json(spark.sparkContext.parallelize(rows))
                psdf = psdf.withColumn("district", lit(district)) \
                        .withColumn("latitude", lit(latitude)) \
                        .withColumn("longitude", lit(longitude))

                if weatherapi_psdf is None:
                    weatherapi_psdf = psdf
                else:
                    weatherapi_psdf = weatherapi_psdf.unionByName(psdf)

                if "_corrupt_record" in weatherapi_psdf.columns:
                    weatherapi_psdf = weatherapi_psdf.drop("_corrupt_record")
                
                success = True

            else :
                print(f"Can't fetch data, status code: {response.status_code}")
                print(f"Retrying... (Attempts {attempts+1}/3)")
                attempts += 1

        except Exception as e:
            print(f"Error while getting recent data for {district}. Message: {e}. Retrying...")
            print(f"Retrying... (Attempts {attempts+1}/3)")
            attempts += 1

Getting recent data for Klojen
Getting recent data for Blimbing
Getting recent data for Sukun
Getting recent data for Lowokwaru
Getting recent data for Lawang
Getting recent data for Singosari
Getting recent data for Kepanjen
Getting recent data for Pakis
Getting recent data for Wagir
Getting recent data for Tumpang


In [25]:
weatherapi_psdf.show()

+-----+--------------------+----------+----------+-----------+-----------+--------+--------+-----------+-----------+--------+------+----------------+------------------+---------+---------+-----------+-----------+------+------+---+------+---------+-----------+--------+--------+--------+-----------+-----------+---------+-------------------+------------------+
|cloud|           condition|dewpoint_c|dewpoint_f|feelslike_c|feelslike_f|gust_kph|gust_mph|heatindex_c|heatindex_f|humidity|is_day|    last_updated|last_updated_epoch|precip_in|precip_mm|pressure_in|pressure_mb|temp_c|temp_f| uv|vis_km|vis_miles|wind_degree|wind_dir|wind_kph|wind_mph|windchill_c|windchill_f| district|           latitude|         longitude|
+-----+--------------------+----------+----------+-----------+-----------+--------+--------+-----------+-----------+--------+------+----------------+------------------+---------+---------+-----------+-----------+------+------+---+------+---------+-----------+--------+--------+---

In [30]:
openmeteo_psdf = None

for district in locations.keys() :
    latitude = locations[district][0]
    longitude = locations[district][1]
    today = datetime.today()
    end_date = today.strftime("%Y-%m-%d")
    start_date = end_date

    # recent weather url call for openmeteo
    openmeteo_url = f"https://api.open-meteo.com/v1/forecast?latitude={latitude}&longitude={longitude}&hourly={features}&start_date={start_date}&end_date={end_date}"

    success = False
    attempts = 0

    # openmeteo call
    while not success and attempts < 3:
        try:
            print(f"Getting recent data for {district}")
            response = requests.get(openmeteo_url)
            if response.status_code == 200:
                data = response.json()['hourly']
                num_records = len(data['time'])
                rows = [
                    {key: data[key][i] for key in data}
                    for i in range(num_records)
                ]
                psdf = spark.read.json(spark.sparkContext.parallelize(rows))
                psdf = psdf.withColumn("district", lit(district)) \
                        .withColumn("latitude", lit(latitude)) \
                        .withColumn("longitude", lit(longitude))

                if openmeteo_psdf is None:
                    openmeteo_psdf = psdf
                else:
                    openmeteo_psdf = openmeteo_psdf.unionByName(psdf)

                if "_corrupt_record" in psdf.columns:
                    psdf = psdf.drop("_corrupt_record")
                
                success = True

            else :
                print(f"Can't fetch data, status code: {response.status_code}")
                print(f"Retrying... (Attempts {attempts+1}/3)")
                attempts += 1

        except Exception as e:
            print(f"Error while getting recent data for {district}. Message: {e}. Retrying...")
            print(f"Retrying... (Attempts {attempts+1}/3)")
            attempts += 1

Getting recent data for Klojen
Getting recent data for Blimbing
Getting recent data for Sukun
Getting recent data for Lowokwaru
Getting recent data for Lawang
Getting recent data for Singosari
Getting recent data for Kepanjen
Getting recent data for Pakis
Getting recent data for Wagir
Getting recent data for Tumpang


In [32]:
openmeteo_psdf.show()

+--------------------+-----------+------------+-------------+--------------------+--------------+----------------+------------------+--------------+--------------+--------+------------------+-----------------+
|apparent_temperature|cloud_cover|dew_point_2m|precipitation|relative_humidity_2m|temperature_2m|            time|wind_direction_10m|wind_gusts_10m|wind_speed_10m|district|          latitude|        longitude|
+--------------------+-----------+------------+-------------+--------------------+--------------+----------------+------------------+--------------+--------------+--------+------------------+-----------------+
|                27.5|         93|        22.1|          0.0|                  95|          23.0|2025-05-21T00:00|               297|           9.7|           3.2|  Klojen|-7.969421375342755|112.6285308513895|
|                29.4|        100|        22.1|          0.0|                  84|          25.0|2025-05-21T01:00|               241|           8.3|           3

In [83]:
openweathermap_psdf = None

for district in locations.keys():
    latitude = locations[district][0]
    longitude = locations[district][1]
    
    openweathermap_url = f"https://api.openweathermap.org/data/2.5/weather?lat={latitude}&lon={longitude}&appid={OPENWEATHERMAP_KEY}&units=metric"

    success = False
    attempts = 0

    while not success and attempts < 3:
        try:
            print(f"Getting recent data for {district}")
            response = requests.get(openweathermap_url)
            if response.status_code == 200:
                data = response.json()

                flat_weather = {
                    "temperature_c": data['main'].get("temp"),
                    "feels_like_c": data['main'].get("feels_like"),
                    "humidity_pct": data['main'].get("humidity"),
                    "cloud_total_pct": data['clouds'].get("all"),
                    "wind_speed_kmph": round(data['wind'].get("speed", 0) * 3.6, 2),
                    "wind_deg": data['wind'].get("deg"),
                    "wind_gust_kmph": round(data['wind'].get("gust", 0) * 3.6, 2)
                }

                psdf = spark.read.json(spark.sparkContext.parallelize([flat_weather]))
                psdf = psdf.withColumn("district", lit(district)) \
                           .withColumn("latitude", lit(latitude)) \
                           .withColumn("longitude", lit(longitude))

                if openweathermap_psdf is None:
                    openweathermap_psdf = psdf
                else:
                    openweathermap_psdf = openweathermap_psdf.unionByName(psdf)

                if "_corrupt_record" in openweathermap_psdf.columns:
                    openweathermap_psdf = openweathermap_psdf.drop("_corrupt_record")

                success = True

            else:
                print(f"Can't fetch data, status code: {response.status_code}")
                print(f"Retrying... (Attempts {attempts+1}/3)")
                attempts += 1

        except Exception as e:
            print(f"Error while getting recent data for {district}. Message: {e}. Retrying...")
            attempts += 1

Getting recent data for Klojen
Getting recent data for Blimbing
Getting recent data for Sukun
Getting recent data for Lowokwaru
Getting recent data for Lawang
Getting recent data for Singosari
Getting recent data for Kepanjen
Getting recent data for Pakis
Getting recent data for Wagir
Getting recent data for Tumpang


In [82]:
openweathermap_psdf.show()

+---------------+------------+------------+-------------+--------+--------------+---------------+--------+------------------+-----------------+
|cloud_total_pct|feels_like_c|humidity_pct|temperature_c|wind_deg|wind_gust_kmph|wind_speed_kmph|district|          latitude|        longitude|
+---------------+------------+------------+-------------+--------+--------------+---------------+--------+------------------+-----------------+
|             98|       37.23|          98|        30.23|     222|          4.43|           3.89|  Klojen|-7.969421375342755|112.6285308513895|
+---------------+------------+------------+-------------+--------+--------------+---------------+--------+------------------+-----------------+



# Aggregat

In [93]:
standard_col_map = {
    # Temperature
    "temperature_2m": "temperature_c",
    "temp_c": "temperature_c",
    "temperature_c": "temperature_c",

    # Feels like
    "feelslike_c": "feels_like_c",
    "feels_like_c": "feels_like_c",

    # Humidity
    "relative_humidity_2m": "humidity_pct",
    "humidity": "humidity_pct",
    "humidity_pct": "humidity_pct",

    # Wind speed
    "wind_speed_10m": "wind_speed_kmph",
    "wind_kph": "wind_speed_kmph",
    "wind_speed_kmph": "wind_speed_kmph",

    # Wind gust
    "wind_gusts_10m": "wind_gust_kmph",
    "gust_kph": "wind_gust_kmph",
    "wind_gust_kmph": "wind_gust_kmph",

    # Wind degree
    "wind_direction_10m": "wind_deg",
    "wind_degree": "wind_deg",
    "wind_deg": "wind_deg",

    # Cloud
    "cloud_cover": "cloud_total_pct",
    "cloud": "cloud_total_pct",
    "cloud_total_pct": "cloud_total_pct",

    # Lokasi
    "district": "district",
    "latitude": "latitude",
    "longitude": "longitude"
}

In [94]:
def standardize_columns(df, col_map):
    renamed_cols = [col(c).alias(col_map[c]) for c in df.columns if c in col_map]
    return df.select(*renamed_cols)

In [98]:
openmeteo_psdf = standardize_columns(openmeteo_psdf, standard_col_map)
weatherapi_psdf = standardize_columns(weatherapi_psdf, standard_col_map)
openweathermap_psdf = standardize_columns(openweathermap_psdf, standard_col_map)

In [100]:
openmeteo_psdf.show()

+---------------+------------+-------------+--------+--------------+---------------+--------+------------------+-----------------+
|cloud_total_pct|humidity_pct|temperature_c|wind_deg|wind_gust_kmph|wind_speed_kmph|district|          latitude|        longitude|
+---------------+------------+-------------+--------+--------------+---------------+--------+------------------+-----------------+
|             93|          95|         23.0|     297|           9.7|            3.2|  Klojen|-7.969421375342755|112.6285308513895|
|            100|          84|         25.0|     241|           8.3|            3.7|  Klojen|-7.969421375342755|112.6285308513895|
|            100|          74|         26.9|     203|           8.3|            2.7|  Klojen|-7.969421375342755|112.6285308513895|
|            100|          68|         28.4|     163|           9.4|            3.8|  Klojen|-7.969421375342755|112.6285308513895|
|            100|          65|         29.2|     159|          15.1|            7.0

In [101]:
weatherapi_psdf.show()

+---------------+------------+--------------+------------+-------------+--------+---------------+---------+-------------------+------------------+
|cloud_total_pct|feels_like_c|wind_gust_kmph|humidity_pct|temperature_c|wind_deg|wind_speed_kmph| district|           latitude|         longitude|
+---------------+------------+--------------+------------+-------------+--------+---------------+---------+-------------------+------------------+
|             83|        28.3|          10.8|          82|         25.8|     223|            3.6|   Klojen| -7.969421375342755| 112.6285308513895|
|             83|        28.3|          10.8|          82|         25.8|     223|            3.6| Blimbing| -7.945926995201971|112.64310740385885|
|             83|        28.3|          10.8|          82|         25.8|     223|            3.6|    Sukun| -7.987926536974757| 112.6100743302227|
|             83|        28.3|          10.8|          82|         25.8|     223|            3.6|Lowokwaru|-7.93484557

In [102]:
openweathermap_psdf.show()

+---------------+------------+------------+-------------+--------+--------------+---------------+---------+-------------------+------------------+
|cloud_total_pct|feels_like_c|humidity_pct|temperature_c|wind_deg|wind_gust_kmph|wind_speed_kmph| district|           latitude|         longitude|
+---------------+------------+------------+-------------+--------+--------------+---------------+---------+-------------------+------------------+
|             98|       37.23|          98|        30.23|     222|          4.43|           3.89|   Klojen| -7.969421375342755| 112.6285308513895|
|             99|       36.99|          99|        29.99|     240|          3.71|           3.06| Blimbing| -7.945926995201971|112.64310740385885|
|             98|       36.96|          98|        29.96|     210|          4.75|           4.14|    Sukun| -7.987926536974757| 112.6100743302227|
|             98|       36.78|          98|        29.78|     224|          3.96|            3.1|Lowokwaru|-7.93484557

In [104]:
def aggregate_common_columns(dfs: list[DataFrame], group_cols: list[str] = ["district", "latitude", "longitude"]) -> DataFrame:
    """
    Menggabungkan dan mengagregasi kolom-kolom yang sama dari beberapa DataFrame PySpark.
    
    Args:
        dfs (List[DataFrame]): Daftar DataFrame PySpark.
        group_cols (List[str]): Kolom yang dijadikan dasar pengelompokan/agregasi.
    
    Returns:
        DataFrame: DataFrame hasil agregasi rata-rata kolom umum.
    """
    if len(dfs) < 2:
        raise ValueError("Minimal dua DataFrame diperlukan.")

    # 1. Ambil kolom yang sama di semua DataFrame
    common_cols = set(dfs[0].columns)
    for df in dfs[1:]:
        common_cols &= set(df.columns)
    common_cols = list(common_cols)

    if not all(col in common_cols for col in group_cols):
        raise ValueError(f"Semua group_cols ({group_cols}) harus ada di kolom umum.")

    # 2. Select hanya kolom yang sama
    dfs_selected = [df.select(common_cols) for df in dfs]

    # 3. Union semua DataFrame
    union_df = dfs_selected[0]
    for df in dfs_selected[1:]:
        union_df = union_df.unionByName(df)

    # 4. Agregasi
    value_cols = [col for col in common_cols if col not in group_cols]
    agg_exprs = [avg(col).alias(f"avg_{col}") for col in value_cols]
    result_df = union_df.groupBy(*group_cols).agg(*agg_exprs)

    return result_df

In [105]:
final_psdf = aggregate_common_columns([openmeteo_psdf, openweathermap_psdf, weatherapi_psdf])
final_psdf.show()

+---------+-------------------+------------------+------------------+-------------------+-------------------+------------------+-----------------+------------------+
| district|           latitude|         longitude|      avg_wind_deg|avg_cloud_total_pct|avg_wind_speed_kmph| avg_temperature_c| avg_humidity_pct|avg_wind_gust_kmph|
+---------+-------------------+------------------+------------------+-------------------+-------------------+------------------+-----------------+------------------+
|   Klojen| -7.969421375342755| 112.6285308513895| 226.8846153846154|  91.38461538461539|  3.791923076923077| 25.16653846153846|85.92307692307692|  8.96653846153846|
| Blimbing| -7.945926995201971|112.64310740385885|227.57692307692307|  91.42307692307692| 3.7600000000000002|25.091923076923077|85.96153846153847| 8.938846153846153|
|    Sukun| -7.987926536974757| 112.6100743302227|226.42307692307693|  91.38461538461539| 3.8015384615384615|25.275384615384617|85.92307692307692| 8.978846153846153|
|Low

In [37]:
import couchdb
import tempfile
import os
import logging
import json
from dotenv import load_dotenv
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, to_timestamp, when, minute, lit, date_format, date_trunc
from pyspark.sql.types import *
from weather_aggregator import Aggregator

In [3]:
load_dotenv()
# PySpark variables
PYSPARK_WORKER_URI = os.getenv("PYSPARK_WORKER_URI")

# Couchdb variables
COUCHDB_HOST = os.getenv("COUCHDB_HOST")
COUCH_PORT = os.getenv("COUCH_PORT")
COUCHDB_USERNAME = os.getenv("COUCHDB_USERNAME")
COUCHDB_PASSWORD = os.getenv("COUCHDB_PASSWORD")
FREEWEATHER_DB = os.getenv("FREEWEATHER_DB")
OPENMETEO_DB = os.getenv("OPENMETEO_DB")
OPENWEATHER_DB = os.getenv("OPENWEATHER_DB")
AGGREGATE_DB = os.getenv("AGGREGATE_DB")

In [13]:
COUCHDB_URI = f"http://{COUCHDB_USERNAME}:{COUCHDB_PASSWORD}@{COUCHDB_HOST}:{COUCH_PORT}"
server = couchdb.Server(COUCHDB_URI)
db_name_list = [FREEWEATHER_DB, OPENMETEO_DB, OPENWEATHER_DB]

In [42]:
db = server[FREEWEATHER_DB]

index_def = {
    "index": {"fields": ["created_at"]},
    "name": "timestamp_index",
    "type": "json"
}
index_url = f"{COUCHDB_URI}/{FREEWEATHER_DB}/_index"
resp = requests.post(index_url, json=index_def)
print("Index creation:", resp.status_code, resp.text)

# 2. Hitung waktu 1 jam lalu
one_hour_ago = datetime.utcnow() - timedelta(hours=2)
one_hour_ago_str = one_hour_ago.strftime("%Y-%m-%d %H:%M:%S.%f")  # ISO 8601 UTC

# 3. Query Mango untuk ambil data 1 jam terakhir, di-sort dari terbaru
query = {
    "selector": {
        "created_at": {"$gte": one_hour_ago_str}
    },
    "sort": [{"created_at": "desc"}],
    "limit": 100
}

# 4. Jalankan query
results = db.find(query)

Index creation: 200 {"result":"exists","id":"_design/ef68dbff0209c6cdf0f227ef758ae009f05cd684","name":"timestamp_index"}



In [43]:
# Cara 2: konversi ke list dulu
list_results = list(results)
print(list_results)

[<Document '4dc3cd1d2fec170fde4a76498ddf973826933cf09a10188fa1e3fc214f3ee617'@'1-e7be94d32e780a8f44720e7af1b83b87' {'provider': 'weatherapi', 'location': {'name': 'Malang', 'region': 'East Java', 'country': 'Indonesia', 'lat': -7.98, 'lon': 112.63, 'tz_id': 'Asia/Jakarta', 'localtime_epoch': 1748885775, 'localtime': '2025-06-03 00:36'}, 'current_weather': {'last_updated_epoch': 1748885400, 'last_updated': '2025-06-03 00:30', 'temp_c': 18.9, 'temp_f': 66.1, 'is_day': 0, 'condition': {'text': 'Mist', 'icon': '//cdn.weatherapi.com/weather/64x64/night/143.png', 'code': 1030}, 'wind_mph': 2.2, 'wind_kph': 3.6, 'wind_degree': 114, 'wind_dir': 'ESE', 'pressure_mb': 1014.0, 'pressure_in': 29.93, 'precip_mm': 0.0, 'precip_in': 0.0, 'humidity': 96, 'cloud': 56, 'feelslike_c': 18.9, 'feelslike_f': 66.1, 'windchill_c': 18.9, 'windchill_f': 66.1, 'heatindex_c': 18.9, 'heatindex_f': 66.1, 'dewpoint_c': 18.4, 'dewpoint_f': 65.1, 'vis_km': 2.0, 'vis_miles': 1.0, 'uv': 0.0, 'gust_mph': 2.3, 'gust_kph':

In [44]:
list_results[0]

<Document '4dc3cd1d2fec170fde4a76498ddf973826933cf09a10188fa1e3fc214f3ee617'@'1-e7be94d32e780a8f44720e7af1b83b87' {'provider': 'weatherapi', 'location': {'name': 'Malang', 'region': 'East Java', 'country': 'Indonesia', 'lat': -7.98, 'lon': 112.63, 'tz_id': 'Asia/Jakarta', 'localtime_epoch': 1748885775, 'localtime': '2025-06-03 00:36'}, 'current_weather': {'last_updated_epoch': 1748885400, 'last_updated': '2025-06-03 00:30', 'temp_c': 18.9, 'temp_f': 66.1, 'is_day': 0, 'condition': {'text': 'Mist', 'icon': '//cdn.weatherapi.com/weather/64x64/night/143.png', 'code': 1030}, 'wind_mph': 2.2, 'wind_kph': 3.6, 'wind_degree': 114, 'wind_dir': 'ESE', 'pressure_mb': 1014.0, 'pressure_in': 29.93, 'precip_mm': 0.0, 'precip_in': 0.0, 'humidity': 96, 'cloud': 56, 'feelslike_c': 18.9, 'feelslike_f': 66.1, 'windchill_c': 18.9, 'windchill_f': 66.1, 'heatindex_c': 18.9, 'heatindex_f': 66.1, 'dewpoint_c': 18.4, 'dewpoint_f': 65.1, 'vis_km': 2.0, 'vis_miles': 1.0, 'uv': 0.0, 'gust_mph': 2.3, 'gust_kph': 

In [20]:
def flatten_dict(d):
    flat = {}
    for k, v in d.items():
        if isinstance(v, dict):
            flat.update(flatten_dict(v))
        elif isinstance(v, list):
            if v and isinstance(v[0], dict):
                for item in v:
                    flat.update(flatten_dict(item))
            else:
                flat[k] = v
        else:
            flat[k] = v
    return flat

In [66]:
spark = SparkSession.builder \
            .remote(PYSPARK_WORKER_URI) \
            .config("spark.sql.connect.grpc.deadline", "60s") \
            .config("spark.sql.connect.grpc.maxInboundMessageSize", "134217728") \
            .getOrCreate()

In [9]:
freeweather_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("_rev", StringType(), True),
    StructField("provider", StringType(), True),
    StructField("name", StringType(), True),
    StructField("region", StringType(), True),
    StructField("country", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    StructField("tz_id", StringType(), True),
    StructField("localtime_epoch", LongType(), True),
    StructField("localtime", StringType(), True),
    StructField("last_updated_epoch", LongType(), True),
    StructField("last_updated", StringType(), True),
    StructField("temp_c", DoubleType(), True),
    StructField("temp_f", DoubleType(), True),
    StructField("is_day", IntegerType(), True),
    StructField("text", StringType(), True),
    StructField("icon", StringType(), True),
    StructField("code", IntegerType(), True),
    StructField("wind_mph", DoubleType(), True),
    StructField("wind_kph", DoubleType(), True),
    StructField("wind_degree", IntegerType(), True),
    StructField("wind_dir", StringType(), True),
    StructField("pressure_mb", DoubleType(), True),
    StructField("pressure_in", DoubleType(), True),
    StructField("precip_mm", DoubleType(), True),
    StructField("precip_in", DoubleType(), True),
    StructField("humidity", IntegerType(), True),
    StructField("cloud", IntegerType(), True),
    StructField("feelslike_c", DoubleType(), True),
    StructField("feelslike_f", DoubleType(), True),
    StructField("windchill_c", DoubleType(), True),
    StructField("windchill_f", DoubleType(), True),
    StructField("heatindex_c", DoubleType(), True),
    StructField("heatindex_f", DoubleType(), True),
    StructField("dewpoint_c", DoubleType(), True),
    StructField("dewpoint_f", DoubleType(), True),
    StructField("vis_km", DoubleType(), True),
    StructField("vis_miles", DoubleType(), True),
    StructField("uv", DoubleType(), True),
    StructField("gust_mph", DoubleType(), True),
    StructField("gust_kph", DoubleType(), True),
    StructField("created_at", StringType(), True),
])

open_meteo_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("_rev", StringType(), True),
    StructField("provider", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    StructField("grid_lat", DoubleType(), True),
    StructField("grid_lon", DoubleType(), True),
    StructField("time", StringType(), True),
    StructField("interval", IntegerType(), True),
    StructField("temperature_2m", DoubleType(), True),
    StructField("apparent_temperature", DoubleType(), True),
    StructField("relative_humidity_2m", IntegerType(), True),
    StructField("rain", DoubleType(), True),
    StructField("precipitation", DoubleType(), True),
    StructField("weather_code", IntegerType(), True),
    StructField("cloud_cover", IntegerType(), True),
    StructField("showers", DoubleType(), True),
    StructField("wind_speed_10m", DoubleType(), True),
    StructField("wind_direction_10m", IntegerType(), True),
    StructField("pressure_msl", DoubleType(), True),
    StructField("surface_pressure", DoubleType(), True),
    StructField("wind_gusts_10m", DoubleType(), True),
    StructField("created_at", StringType(), True),
    StructField("last_updated", StringType(), True)
])

openweather_schema = StructType([
    StructField('_id', StringType(), True),
    StructField('_rev', StringType(), True),
    StructField('provider', StringType(), True),
    StructField('lat', DoubleType(), True),
    StructField('lon', DoubleType(), True),
    StructField('name', StringType(), True),
    StructField('country', StringType(), True),
    StructField('all', IntegerType(), True),
    StructField('feels_like', DoubleType(), True),
    StructField('pressure_ground_level', IntegerType(), True),
    StructField('humidity', IntegerType(), True),
    StructField('pressure', IntegerType(), True),
    StructField('sea_level', IntegerType(), True),
    StructField('temp', DoubleType(), True),
    StructField('visibility', IntegerType(), True),
    StructField('wind_dir', IntegerType(), True),
    StructField('wind_speed', DoubleType(), True),
    StructField('wind_gust', DoubleType(), True),
    StructField('id', IntegerType(), True),
    StructField('main', StringType(), True),
    StructField('description', StringType(), True),
    StructField('icon', StringType(), True),
    StructField('created_at', StringType(), True),
    StructField('last_updated', StringType(), True),
])

In [67]:
df = spark.createDataFrame(freeweather, freeweather_schema)

In [68]:
print(f"column {df.columns} \n{df.count()} rows")

column ['_id', '_rev', 'provider', 'name', 'region', 'country', 'lat', 'lon', 'tz_id', 'localtime_epoch', 'localtime', 'last_updated_epoch', 'last_updated', 'temp_c', 'temp_f', 'is_day', 'text', 'icon', 'code', 'wind_mph', 'wind_kph', 'wind_degree', 'wind_dir', 'pressure_mb', 'pressure_in', 'precip_mm', 'precip_in', 'humidity', 'cloud', 'feelslike_c', 'feelslike_f', 'windchill_c', 'windchill_f', 'heatindex_c', 'heatindex_f', 'dewpoint_c', 'dewpoint_f', 'vis_km', 'vis_miles', 'uv', 'gust_mph', 'gust_kph', 'created_at'] 
25643 rows


In [69]:
aggregator = Aggregator(column_map=None)
df = aggregator.standardize_columns(df, col_map=None)
if "name" in df.columns:
    df = df.withColumnRenamed("name", "location_name")
else :
    df = df.withColumn("location_name", lit("Malang"))

logging.info(f"df has successfully renamed, with column: {df.columns}")

# Pengecekan skala kolom temperatur
temp_avg_row = df.select(avg("temperature_c")).collect()[0]
temp_avg_value = temp_avg_row[0] if temp_avg_row[0] is not None else 0

df = df.withColumnRenamed("last_updated", "date")
if "date" in df.columns:
    df = df.withColumn(
        "timestamp",
        when(
            col("date").contains("+"),
            to_timestamp("date")
        ).otherwise(
            when(
                col("date").contains("T"),
                to_timestamp("date", "yyyy-MM-dd'T'HH:mm")
            ).otherwise(
                to_timestamp("date", "yyyy-MM-dd HH:mm")
            )
        )
    )

# Parsing agar semua tanggal (dengan 3 format berbeda) menjadi 1 format ISO yg seragam
df = df.withColumn("date", date_format("timestamp", "yyyy-MM-dd'T'HH:mm:ss"))

In [None]:
# Potong timestamp ke awal jam
df = df.withColumn("hour_timestamp", date_trunc("hour", col("timestamp")))

# Tentukan kolom-kolom untuk group dan nilai
group_cols = ["location_name", "latitude", "longitude", "hour_timestamp"]
value_cols = [c for c in df.columns if c not in group_cols + ["date", "timestamp"]]

# Buat ekspresi agregasi untuk semua kolom numerik
agg_exprs = [avg(c).alias(c) for c in value_cols]

# Agregasi berdasarkan hour_timestamp dan lokasi
df = df.groupBy(*group_cols).agg(*agg_exprs)

In [71]:
df.columns

['location_name',
 'latitude',
 'longitude',
 'hour_timestamp',
 'temperature_c',
 'wind_speed_kmph',
 'wind_degree',
 'pressure',
 'humidity_pct',
 'cloud_total_pct',
 'feels_like_c',
 'wind_gust_kmph']

In [73]:
df = df.orderBy("hour_timestamp", ascending=True)

In [74]:
df.show()

+-------------+--------+---------+-------------------+------------------+------------------+-----------+--------+------------+---------------+------------------+------------------+
|location_name|latitude|longitude|     hour_timestamp|     temperature_c|   wind_speed_kmph|wind_degree|pressure|humidity_pct|cloud_total_pct|      feels_like_c|    wind_gust_kmph|
+-------------+--------+---------+-------------------+------------------+------------------+-----------+--------+------------+---------------+------------------+------------------+
|       Malang|   -7.98|   112.63|2025-05-13 09:00:00|23.100000000000005|3.6000000000000005|       88.0|  1012.0|        88.0|           94.0|25.300000000000004|10.800000000000002|
|       Malang|   -7.98|   112.63|2025-05-13 10:00:00| 23.60000000000001| 3.599999999999999|      116.0|  1012.0|        90.0|           98.0|25.899999999999988|10.800000000000004|
|       Malang|   -7.98|   112.63|2025-05-13 11:00:00|              24.0|3.6000000000000005|   

In [16]:
import logging
import traceback
import tempfile
import os
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

logging.basicConfig(
    level=logging.DEBUG,  # Ubah ke DEBUG supaya semua log tampil
    format="[%(levelname)s] %(asctime)s - %(name)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)

def get_spark_session(max_retries=3):
    for attempt in range(max_retries):
        try:
            logger.info(f"Attempting to connect to Spark Connect at: {PYSPARK_WORKER_URI}")
            spark = SparkSession.builder \
                .remote(PYSPARK_WORKER_URI) \
                .config("spark.sql.connect.grpc.deadline", "60s") \
                .config("spark.sql.connect.grpc.maxInboundMessageSize", "134217728") \
                .config("spark.sql.connect.grpc.keepAliveTimeout", "5s") \
                .config("spark.sql.connect.grpc.keepAliveTime", "30s") \
                .config("spark.sql.connect.grpc.keepAliveWithoutCalls", "true") \
                .config("spark.sql.connect.grpc.maxConnecionIdle", "900s") \
                .getOrCreate()
            spark.sql("SELECT 1 as test").collect()
            logger.info("Spark Connect connection successful!")
            return spark
        except Exception as e:
            logger.warning(f"Spark Connect failed: {e}")
            logger.warning("Falling back to local Spark mode...")
            spark = SparkSession.builder \
                .appName("WeatherAggregation") \
                .master("local[*]") \
                .config("spark.sql.adaptive.enabled", "true") \
                .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
                .getOrCreate()
            logger.info("Local Spark session started.")
            return spark

def flatten_dict(d):
    flat = {}
    for k, v in d.items():
        if isinstance(v, dict):
            flat.update(flatten_dict(v))
        elif isinstance(v, list):
            if v and isinstance(v[0], dict):
                for item in v:
                    flat.update(flatten_dict(item))
            else:
                flat[k] = v
        else:
            flat[k] = v
    return flat

def create_temp_json_df(spark, json_strings):
    logger.debug("Creating temp JSON file for DataFrame creation")
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file:
        for json_str in json_strings:
            temp_file.write(json_str + '\n')
        temp_file_path = temp_file.name
    
    try:
        df = spark.read.json(f"file://{temp_file_path}")
        logger.debug(f"Temporary JSON DataFrame created from file: {temp_file_path}")
        return df
    finally:
        try:
            os.unlink(temp_file_path)
            logger.debug(f"Temporary JSON file deleted: {temp_file_path}")
        except Exception as e:
            logger.warning(f"Failed to delete temporary file: {e}")


def fetch_data(spark=None):
    logger.info("Starting fetch_data process...")
    if spark is None:
        spark = get_spark_session()
    
    freeweather_psdf = None
    openmeteo_psdf = None
    openweather_psdf = None

    for db_name in db_name_list:
        logger.info(f"Fetching data from database: {db_name}")
        temp_list = []
        db = server[db_name]
        try:
            rows = db.view("_all_docs", include_docs=True)
        except Exception as e:
            logger.error(f"Error fetching rows from {db_name}: {e}")
            continue

        for row in rows:
            doc = row.get("doc", {})
            doc = flatten_dict(doc)
            temp_list.append(doc)

        if not temp_list:
            logger.warning(f"No documents found in {db_name}")
            continue

        try:
            if db_name == FREEWEATHER_DB:
                freeweather_psdf = spark.createDataFrame(temp_list, freeweather_schema)
                # Force evaluation and cache
                freeweather_psdf = freeweather_psdf.cache()
                row_count = freeweather_psdf.count()
                logger.info(f"Created freeweather DataFrame: columns={len(freeweather_psdf.columns)}, rows={row_count}")
            elif db_name == OPENMETEO_DB:
                openmeteo_psdf = spark.createDataFrame(temp_list, open_meteo_schema)
                # Force evaluation and cache
                openmeteo_psdf = openmeteo_psdf.cache()
                row_count = openmeteo_psdf.count()
                logger.info(f"Created openmeteo DataFrame: columns={len(openmeteo_psdf.columns)}, rows={row_count}")
            elif db_name == OPENWEATHER_DB:
                openweather_psdf = spark.createDataFrame(temp_list, openweather_schema)
                # Force evaluation and cache
                openweather_psdf = openweather_psdf.cache()
                row_count = openweather_psdf.count()
                logger.info(f"Created openweather DataFrame: columns={len(openweather_psdf.columns)}, rows={row_count}")
        except Exception as e:
            logger.warning(f"CreateDataFrame failed for {db_name}: {e}")
            try:
                json_strings = [json.dumps(item) for item in temp_list]
                df = spark.read.json(create_temp_json_df(spark, json_strings))
                df = df.cache()  # Cache the fallback DataFrame too
                if db_name == FREEWEATHER_DB:
                    freeweather_psdf = df
                elif db_name == OPENMETEO_DB:
                    openmeteo_psdf = df
                elif db_name == OPENWEATHER_DB:
                    openweather_psdf = df
                logger.info(f"Created DataFrame using parallelize/read.json for {db_name} with {df.count()} rows")
            except Exception as e2:
                logger.warning(f"Parallelize method failed for {db_name}: {e2}")
                try:
                    json_strings = [json.dumps(item) for item in temp_list]
                    df = create_temp_json_df(spark, json_strings)
                    df = df.cache()  # Cache the temp file DataFrame too
                    if db_name == FREEWEATHER_DB:
                        freeweather_psdf = df
                    elif db_name == OPENMETEO_DB:
                        openmeteo_psdf = df
                    elif db_name == OPENWEATHER_DB:
                        openweather_psdf = df
                    logger.info(f"Created DataFrame using temp file method for {db_name} with {df.count()} rows")
                except Exception as e3:
                    logger.error(f"All DataFrame creation methods failed for {db_name}: {e3}")
    
    # Debug: Log the final counts
    logger.info(f"Final fetch results:")
    logger.info(f"  FreeWeather: {freeweather_psdf.count() if freeweather_psdf else 0} rows")
    logger.info(f"  OpenMeteo: {openmeteo_psdf.count() if openmeteo_psdf else 0} rows")
    logger.info(f"  OpenWeather: {openweather_psdf.count() if openweather_psdf else 0} rows")
    
    return freeweather_psdf, openmeteo_psdf, openweather_psdf, spark


def aggregate(data_list: list, spark=None):
    logger.info(f"Starting aggregation with {len(data_list)} dataframes")
    if spark is None:
        spark = get_spark_session()
    
    processed_psdf = []
    aggregator = Aggregator(column_map=None)

    # Debug: Check input data
    for i, data in enumerate(data_list):
        logger.info(f"Input DataFrame No.{i+1}: {data is not None} - {data.count() if data else 0} rows")

    for i, data in enumerate(data_list):
        if data is None:
            logger.warning(f"DataFrame at index {i} is None, skipping aggregation")
            continue
        
        logger.info(f"Processing DataFrame No.{i+1} with {data.count()} rows")
        logger.info(f"DataFrame No.{i+1} columns before standardize: {data.columns}")
        
        try:
            data = aggregator.standardize_columns(data, col_map=None)
            if "name" in data.columns:
                data = data.withColumnRenamed("name", "location_name")
            else:
                data = data.withColumn("location_name", lit("Malang"))
            
            logger.info(f"DataFrame No.{i+1} columns after rename/add 'location_name': {data.columns} with {data.count()} rows")

            temp_avg_row = data.select(avg("temperature_c")).collect()[0]
            temp_avg_value = temp_avg_row[0] if temp_avg_row[0] is not None else 0
            logger.debug(f"Average temperature_c for DataFrame No.{i+1}: {temp_avg_value}")

            if temp_avg_value > 273.15:
                logger.info(f"Adjusting temperature_c from Kelvin to Celsius for DataFrame No.{i+1}")
                data = data.withColumn("temperature_c", col("temperature_c") - 273.15)

            data = data.withColumnRenamed("last_updated", "date")

            if "date" in data.columns:
                data = data.withColumn(
                    "timestamp",
                    when(
                        col("date").contains("+"),
                        to_timestamp("date", "yyyy-MM-dd HH:mm:ssXXX")
                    ).otherwise(
                        when(
                            col("date").contains("T"),
                            to_timestamp("date", "yyyy-MM-dd'T'HH:mm")
                        ).otherwise(
                            to_timestamp("date", "yyyy-MM-dd HH:mm")
                        )
                    )
                )
                logger.debug(f"Timestamp column created for DataFrame No.{i+1}")
            else:
                logger.warning(f"Date column not found in DataFrame No.{i+1}")

            data = data.withColumn("date", date_format("timestamp", "yyyy-MM-dd'T'HH:mm:ss"))
            data = data.withColumn("hour_timestamp", date_trunc("hour", col("timestamp")))

            group_cols = ["location_name", "latitude", "longitude", "hour_timestamp"]
            value_cols = [c for c in data.columns if c not in group_cols + ["date", "timestamp"]]
            logger.debug(f"Group columns: {group_cols}")
            logger.debug(f"Value columns for aggregation: {value_cols}")

            logger.info(f"Before groupBy aggregation: {data.count()} rows")
            agg_exprs = [avg(c).alias(c) for c in value_cols]
            data = data.groupBy(*group_cols).agg(*agg_exprs)
            logger.info(f"After groupBy aggregation: {data.count()} rows")

            data = data.withColumn("date", date_format("hour_timestamp", "yyyy-MM-dd'T'HH:mm:ss"))
            data = data.withColumnRenamed("hour_timestamp", "timestamp")

            logger.info(f"DataFrame No.{i+1} aggregated with {len(data.columns)} columns and {data.count()} rows")

            for col_name in ["_id", "_rev", "provider", "created_at"]:
                if col_name in data.columns:
                    data = data.drop(col_name)
                    logger.debug(f"Dropped column {col_name} from DataFrame No.{i+1}")

            data = data.orderBy("timestamp")
            
            # Cache the processed DataFrame
            data = data.cache()
            processed_psdf.append(data)
            
        except Exception as e:
            logger.error(f"Error processing DataFrame No.{i+1}: {e}")
            logger.error(traceback.format_exc())
            continue
    
    logger.info(f"Successfully processed {len(processed_psdf)} DataFrames")
    
    if processed_psdf:
        logger.info(f"Before final aggregation: {len(processed_psdf)} DataFrames")
        
        # Debug: Show counts before final aggregation
        for i, df in enumerate(processed_psdf):
            logger.info(f"  DataFrame {i+1}: {df.count()} rows")
        
        final_result = aggregator.aggregate_common_columns(
            dfs=processed_psdf, 
            group_cols=["location_name", "latitude", "longitude", "timestamp"]
        )
        
        # Cache final result and force evaluation
        final_result = final_result.cache()
        final_count = final_result.count()
        
        logger.info(f"Aggregated common columns across all DataFrames successfully: {final_count} rows")
        return final_result
    else:
        logger.warning("No DataFrames to aggregate, returning empty DataFrame")
        empty_df = spark.sql("SELECT 1 as dummy").limit(0)
        return empty_df


def perform_pipeline():
    logger.info("Pipeline started")
    spark = None
    
    try:
        # Step 1: Initialize Spark session
        spark = get_spark_session()
        spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
        
        # Step 2: Fetch data with explicit spark session
        freeweather_psdf, openmeteo_psdf, openweather_psdf, spark = fetch_data(spark)
        
        # Step 3: Debug - Check data fetching results
        logger.info(f"Fetched data verification:")
        logger.info(f"  FreeWeather: {freeweather_psdf.count() if freeweather_psdf else 0} rows")
        logger.info(f"  OpenMeteo: {openmeteo_psdf.count() if openmeteo_psdf else 0} rows")
        logger.info(f"  OpenWeather: {openweather_psdf.count() if openweather_psdf else 0} rows")
        
        # Step 4: Show sample data for debugging
        for i, (name, df) in enumerate([("FreeWeather", freeweather_psdf), 
                                       ("OpenMeteo", openmeteo_psdf), 
                                       ("OpenWeather", openweather_psdf)]):
            if df is not None:
                logger.info(f"{name} DataFrame sample (first 2 rows):")
                try:
                    df.show(2, truncate=False)
                except Exception as e:
                    logger.warning(f"Could not show {name} sample: {e}")
        
        # Step 5: Aggregate data
        processed_psdf = aggregate([freeweather_psdf, openmeteo_psdf, openweather_psdf], spark)
        processed_psdf = processed_psdf.orderBy("timestamp", ascending=False)

        if processed_psdf is not None:
            count_rows = processed_psdf.count()
            logger.info(f"Data successfully aggregated with {len(processed_psdf.columns)} columns and {count_rows} rows")
            
            # Show final result sample
            logger.info("Final aggregated data sample:")
            try:
                processed_psdf.show(5, truncate=False)
            except Exception as e:
                logger.warning(f"Could not show final sample: {e}")
        else:
            logger.warning("No aggregated data returned")

        # Step 6: Save to CouchDB
        if processed_psdf is not None and processed_psdf.count() > 0:
            db = server[AGGREGATE_DB]
            collected_data = processed_psdf.collect()
            logger.info(f"Saving {len(collected_data)} documents to CouchDB: {AGGREGATE_DB}")

            for i, row in enumerate(collected_data):
                doc = row.asDict()
                for key, value in doc.items():
                    if value is None:
                        continue
                    if hasattr(value, 'isoformat'):
                        doc[key] = value.isoformat()
                try:
                    db.save(doc)
                    logger.debug(f"Document {i+1} saved")
                except Exception as e:
                    logger.error(f"Error saving document {i+1}: {e}")
        else:
            logger.warning("No data to save to CouchDB")

        logger.info("Pipeline completed successfully")

    except Exception as e:
        logger.error(f"Pipeline error: {e}")
        logger.error(traceback.format_exc())
        raise
    finally:
        # Step 7: Clean up Spark session
        if spark is not None:
            try:
                spark.stop()
                logger.info("Spark session stopped successfully")
            except Exception as e2:
                logger.error(f"Failed to stop Spark session: {e2}")

In [17]:
perform_pipeline()

[INFO] 2025-06-03 00:42:50 - __main__ - Pipeline started
[INFO] 2025-06-03 00:42:50 - __main__ - Attempting to connect to Spark Connect at: sc://10.34.100.114:15002
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:10.34.100.114:15002: tcp handshaker shutdown"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-06-02T17:53:05.2446393+00:00", grpc_status:14, grpc_message:"failed to connect to all addresses; last error: UNKNOWN: ipv4:10.34.100.114:15002: tcp handshaker shutdown"}"
>


KeyboardInterrupt: 

In [36]:
print('last_updated' == "last_updated")

True


In [2]:
print(FREEWEATHER_DB)

free_weather


In [3]:
perform_pipeline()

[INFO]:Attempting to connect to Spark Connect at: sc://10.34.100.114:15002
[INFO]:Spark Connect connection successful!
[ERROR]:Pipeline error: Wrong FS: file://C:\Users\fadhil\AppData\Local\Temp\tmpse9vorqh.json, expected: file:///
[INFO]:Attempting to connect to Spark Connect at: sc://10.34.100.114:15002
[INFO]:Spark Connect connection successful!


IllegalArgumentException: Wrong FS: file://C:\Users\fadhil\AppData\Local\Temp\tmpse9vorqh.json, expected: file:///