In [1]:
from pyspark.sql import SparkSession, functions as F
import os

In [2]:
# SparkSession
spark = SparkSession.builder \
    .appName('ReadSensonrData') \
    .config('spark.driver.memory', '5g') \
    .config('spark.executor.memory', '5g') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/25 13:03:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# İndirdiğimiz verinin yolu
base_path = "sensor_data/KETI"

In [None]:
# Verideki tüm sensör tipleri (Odaların içindeki csv dosyaları)
sensor_types = ["co2", "humidity", "light", "pir", "temperature"]

In [None]:
# Tüm sensor dflerini dfs değişkeninde alıyorum
dfs = {}

for sensor in sensor_types:
    # Her bir sensör için dataframe oluşturuyorum.
    sensor_path = f"{base_path}/*/{sensor}.csv"

    df = spark.read.option("header", False).csv(sensor_path) \
        .withColumn("file_path", F.input_file_name()) \
        .withColumn("room_id", F.regexp_extract("file_path", rf"/([^/]+)/{sensor}\.csv", 1)) \
        .withColumnRenamed("_c0", "sensor_unix_epoch_time") \
        .withColumnRenamed("_c1", "sensor_value") \
        .withColumn("sensor_timestamp", F.from_unixtime(F.col("sensor_unix_epoch_time").cast("bigint"))) \
        .withColumn("sensor_datetime_hm", F.date_format("sensor_timestamp", "yyyy-MM-dd HH:mm")) # Aggregate de kullanmak için
    dfs[sensor] = df

                                                                                

In [None]:

for sensor, df in dfs.items():
    # Tip dönüşümleri
    df = df.withColumn("sensor_unix_epoch_time", F.col("sensor_unix_epoch_time").cast("long")) \
           .withColumn("sensor_value", F.col("sensor_value").cast("double")) \
           .withColumn("sensor_timestamp", F.col("sensor_timestamp").cast("timestamp"))
    
    dfs[sensor] = df


In [None]:
# Room id ve sensor_datetime_hm ye göre ortalama sensör değerlerini aldım
agg_dfs = {} 
for sensor, df in dfs.items():
    agg_df = df.groupBy("room_id", "sensor_datetime_hm").agg(
        F.round(F.avg("sensor_value"), 3).alias(f"avg_{sensor}"),
        F.min("sensor_timestamp").alias(f"{sensor}_sensor_timestamp"))
    agg_dfs[sensor] = agg_df

In [8]:
df_co2 = agg_dfs["co2"]
df_humidity = agg_dfs["humidity"]
df_light = agg_dfs["light"]
df_pir = agg_dfs["pir"]
df_temperature = agg_dfs["temperature"]

In [None]:
# En çok kayda sahip CO2 ye göre joinliyorum
df_joined = df_co2 \
    .join(df_humidity, on=["sensor_datetime_hm", "room_id"], how="left") \
    .join(df_light, on=["sensor_datetime_hm", "room_id"], how="left") \
    .join(df_pir, on=["sensor_datetime_hm", "room_id"], how="left") \
    .join(df_temperature, on=["sensor_datetime_hm", "room_id"], how="left") 


In [None]:
# Diske yazılacak df in alanları
final_df = df_joined.select(
    "sensor_datetime_hm", 
    "room_id",
    "avg_co2",
    "avg_humidity",
    "avg_light",
    "avg_pir",
    "avg_temperature"
).orderBy("sensor_datetime_hm")

In [11]:
final_df = final_df.dropna()

In [12]:
final_df.limit(5).show(truncate=False)

                                                                                

+------------------+-------+-------+------------+---------+-------+---------------+
|sensor_datetime_hm|room_id|avg_co2|avg_humidity|avg_light|avg_pir|avg_temperature|
+------------------+-------+-------+------------+---------+-------+---------------+
|2013-08-23 23:04  |510    |404.0  |52.62       |204.0    |0.0    |23.46          |
|2013-08-23 23:04  |621    |500.0  |49.115      |67.5     |14.0   |25.59          |
|2013-08-23 23:04  |511    |389.0  |52.75       |250.5    |0.0    |22.62          |
|2013-08-23 23:04  |746    |633.0  |52.84       |29.0     |21.0   |23.06          |
|2013-08-23 23:04  |644    |468.5  |52.385      |165.0    |0.0    |22.805         |
+------------------+-------+-------+------------+---------+-------+---------------+



In [13]:
output_dir = "output"
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

In [14]:
final_df.write.mode("overwrite").parquet(os.path.join(output_dir, "sensor_data.parquet"))

                                                                                

In [None]:
spark.stop()