In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, to_timestamp, concat, lit, hour, avg, date_format
from pyspark.sql.types import TimestampType, DoubleType, DateType

In [2]:
# Obtain the Spark session for this notebook: getOrCreate() returns an
# already-running session if one exists, otherwise it starts a new one
# named "DroneDataAnalysis".
spark = SparkSession.builder.appName("DroneDataAnalysis").getOrCreate()
# Bare last expression so the notebook displays the session summary.
spark

24/08/24 07:21:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Read the flight log CSV. The first row is treated as the header and
# column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("../data/flights.csv")
)
# Preview the first 10 rows.
df.show(10)

24/08/24 07:21:53 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+------+----+-----------+----------+---------------+---------------+------------------+-----------+------------------+--------------------+-------------+-------------+-------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+---------------------+---------------------+---------------------+-----+-------+--------+----------+--------+-----+
|flight|time| wind_speed|wind_angle|battery_voltage|battery_current|        position_x| position_y|        position_z|       orientation_x|orientation_y|orientation_z|orientation_w|          velocity_x|  velocity_y| velocity_z|           angular_x|           angular_y|           angular_z|linear_acceleration_x|linear_acceleration_y|linear_acceleration_z|speed|payload|altitude|      date|time_day|route|
+------+----+-----------+----------+---------------+---------------+------------------+-----------+------------------+--------------------+-------------+-------------+-------------+-------

In [4]:
# Inspect the raw `date` and `time_day` columns: print up to 10 distinct
# values of each to see what format they are stored in.
for column_name in ("date", "time_day"):
    df.select(column_name).distinct().show(10)

[Stage 3:>                                                        (0 + 16) / 16]                                                                                

+----------+
|      date|
+----------+
|2019-06-19|
|2019-07-09|
|2019-07-10|
|2019-04-07|
|2019-06-29|
|2019-07-03|
|2019-07-24|
|2019-09-19|
|2019-06-11|
|2019-07-18|
+----------+
only showing top 10 rows

+--------+
|time_day|
+--------+
|   17:35|
|   18:05|
|   12:26|
|    8:28|
|   10:21|
|    8:23|
|   18:32|
|   17:05|
|   18:16|
|    8:46|
+--------+
only showing top 10 rows



In [22]:
# Normalise column types and build a single `timestamp` column.
#   1. `date`      -> DateType
#   2. `time_day`  -> re-rendered as an "HH:mm:ss" string
#   3. `timestamp` -> parsed from "<date> <time_day>" using "yyyy-MM-dd HH:mm:ss"
# NOTE(review): date_format() is applied to what appears to be an "H:mm"
# string column (see the distinct time_day values printed earlier), so Spark
# has to cast it to a timestamp implicitly before formatting -- confirm this
# implicit cast is intended.
df = (
    df
    .withColumn("date", col("date").cast(DateType()))
    .withColumn("time_day", date_format(col("time_day"), "HH:mm:ss"))
    .withColumn(
        "timestamp",
        to_timestamp(
            concat(col("date").cast("string"), lit(" "), col("time_day")),
            "yyyy-MM-dd HH:mm:ss",
        ),
    )
)

# Make sure the measurement columns used later are doubles.
for numeric_column in ("altitude", "battery_voltage", "battery_current"):
    df = df.withColumn(numeric_column, col(numeric_column).cast(DoubleType()))

# Show the resulting schema and a small sample of rows.
df.printSchema()
df.show(3)

root
 |-- flight: integer (nullable = true)
 |-- time: double (nullable = true)
 |-- wind_speed: double (nullable = true)
 |-- wind_angle: double (nullable = true)
 |-- battery_voltage: double (nullable = true)
 |-- battery_current: double (nullable = true)
 |-- position_x: double (nullable = true)
 |-- position_y: double (nullable = true)
 |-- position_z: double (nullable = true)
 |-- orientation_x: double (nullable = true)
 |-- orientation_y: double (nullable = true)
 |-- orientation_z: double (nullable = true)
 |-- orientation_w: double (nullable = true)
 |-- velocity_x: double (nullable = true)
 |-- velocity_y: double (nullable = true)
 |-- velocity_z: double (nullable = true)
 |-- angular_x: double (nullable = true)
 |-- angular_y: double (nullable = true)
 |-- angular_z: double (nullable = true)
 |-- linear_acceleration_x: double (nullable = true)
 |-- linear_acceleration_y: double (nullable = true)
 |-- linear_acceleration_z: double (nullable = true)
 |-- speed: double (nullable

In [6]:
# Count rows per date, largest counts first.
date_counts = (
    df.groupBy("date")
    .count()
    .orderBy("count", ascending=False)
)
date_counts.show(10)

# Keep only the three dates with the most rows.
top_dates = date_counts.limit(3)
top_dates.show()

# Pull those dates back to the driver as a plain Python list so they can be
# used in an isin() filter.
top_dates_list = [record["date"] for record in top_dates.collect()]

+----------+-----+
|      date|count|
+----------+-----+
|2019-08-05|27819|
|2019-07-15|25772|
|2019-07-24|20981|
|2019-07-03|19682|
|2019-06-19|19067|
|2019-06-11|16904|
|2019-07-10|16099|
|2019-06-25|15349|
|2019-06-24|15153|
|2019-07-09|14431|
+----------+-----+
only showing top 10 rows

+----------+-----+
|      date|count|
+----------+-----+
|2019-08-05|27819|
|2019-07-15|25772|
|2019-07-24|20981|
+----------+-----+



In [14]:
# Restrict the data to the top dates, keep only the columns needed for the
# time-series view, and sort the rows by timestamp.
time_series_data = (
    df
    .where(col("date").isin(top_dates_list))
    .select(
        "date",
        "timestamp",
        "battery_voltage",
        "battery_current",
        "altitude",
        "route",
    )
    .sort("timestamp")
)

time_series_data.show()

+----------+-------------------+------------------+--------------------+--------+-----+
|      date|          timestamp|   battery_voltage|     battery_current|altitude|route|
+----------+-------------------+------------------+--------------------+--------+-----+
|2019-07-15|2019-07-15 07:26:00| 24.29353904724121|-0.04771064221858978|   100.0|   R1|
|2019-07-15|2019-07-15 07:26:00|24.309816360473643| 0.02385532110929489|   100.0|   R1|
|2019-07-15|2019-07-15 07:26:00| 24.29479217529297|-0.03180709481239319|   100.0|   R1|
|2019-07-15|2019-07-15 07:26:00| 24.31357192993164|-0.07156596332788467|   100.0|   R1|
|2019-07-15|2019-07-15 07:26:00|  24.3148250579834|-0.07156596332788467|   100.0|   R1|
|2019-07-15|2019-07-15 07:26:00| 24.31232070922852|-0.00795177370309...|   100.0|   R1|
|2019-07-15|2019-07-15 07:26:00| 24.29228782653809|0.015903547406196594|   100.0|   R1|
|2019-07-15|2019-07-15 07:26:00|24.319833755493164|-0.10337305814027786|   100.0|   R1|
|2019-07-15|2019-07-15 07:26:00|

In [16]:
# Average battery voltage, battery current and altitude for every
# (date, hour-of-timestamp) bucket, ordered by date and then hour.
hourly_data = (
    time_series_data
    .groupBy(col("date"), hour(col("timestamp")).alias("hour"))
    .agg(
        avg("battery_voltage").alias("avg_voltage"),
        avg("battery_current").alias("avg_current"),
        avg("altitude").alias("avg_altitude"),
    )
    .orderBy("date", "hour")
)

hourly_data.show()

+----------+----+------------------+------------------+-----------------+
|      date|hour|       avg_voltage|       avg_current|     avg_altitude|
+----------+----+------------------+------------------+-----------------+
|2019-07-15|   7|21.524459404304793|18.899614950142254|75.67750677506776|
|2019-07-15|   8|22.821325483452235|16.542382335265685|55.56925498426023|
|2019-07-15|   9| 22.34294642503115| 18.45183376041162|44.65368656240025|
|2019-07-15|  10| 22.00042212229793|20.497691166924273|            100.0|
|2019-07-24|  16|22.331898761637365| 19.39118568917441|76.49306165466362|
|2019-07-24|  17|22.259677306508696|19.231812327465022|71.85773840541948|
|2019-08-05|  16|22.290163272692833|16.975974987612503|66.88161375661376|
|2019-08-05|  17|22.595817596847947|17.899889693508907|75.61634805537244|
|2019-08-05|  18|22.501782996337386|18.382378111418934|58.23569951757408|
|2019-08-05|  19| 21.80338843517238|18.240291620561834| 76.0286951813752|
+----------+----+------------------+--