In [77]:
import os

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go


from pyspark.sql import SparkSession
from pyspark.sql.functions import asc, lit

from pyspark.sql.functions import col, \
    from_unixtime, to_date, date_add, when, monotonically_increasing_id, window, mean, count
from pyspark.sql.types import TimestampType


In [78]:
# Start the Spark session used by every cell below, or reuse one that is
# already running in this kernel.
spark = (
    SparkSession.builder
    .appName("Time SeriesGenerator")
    .getOrCreate()
)

In [79]:
# Show the session object as the cell output to confirm it was created.
spark

In [80]:
# Identifier of the aircraft to process. It selects the input folder under
# dataset/raw/ds4/ and is embedded in the name of the exported CSV.
#
# Other identifiers noted while exploring (presumably icao24 values as well --
# verify against the raw data):
#   a7124d
#   ad564c -> 6 trips
#   adc0fb -> 8 trips

FLIGHT_ICAO = 'a03b40'
# Further candidates: a50dd0 a64ef7 a0528b a837c8 a889c3 a5622f a7124d a00721 a48480 ad0280 ada4d5

In [81]:
# Input location: dataset/raw/ds4/<FLIGHT_ICAO>/*  -- the trailing '*' globs
# every entry stored under the selected aircraft's folder.
# Path variants tried earlier (not currently used):
#   dataset/raw/<FLIGHT_ICAO>
#   dataset/raw/<FLIGHT_ICAO>/<FLIGHT_ICAO>_0.parquet
flights_file_path = os.path.join('dataset', 'raw', 'ds4', FLIGHT_ICAO, '*')

# Parquet files carry their own schema, so no CSV-style `header` option is
# passed to the reader (it has no meaning for this format).
read_flights = spark.read.parquet(flights_file_path)

In [82]:
def get_values_from_selected_columns(flight_data):
    """Reduce the raw flight records to the columns used downstream.

    Keeps `time`, `icao24` and `geoaltitude`, sorts the rows by ascending
    `time`, and appends a `datetime` column produced by applying
    `from_unixtime` to `time`.

    Args:
        flight_data: Spark DataFrame that contains at least the columns
            `time`, `icao24` and `geoaltitude`.

    Returns:
        Spark DataFrame with the columns `time`, `icao24`, `geoaltitude`
        and `datetime`.
    """
    wanted_columns = ['time', 'icao24', 'geoaltitude']

    # Project first, then sort, then derive the readable timestamp column.
    chronological = flight_data.select(*wanted_columns).orderBy(asc('time'))

    return chronological.withColumn("datetime", from_unixtime("time"))

In [83]:
# Build the working frame: selected columns, sorted by time, plus `datetime`.
flight_df = get_values_from_selected_columns(read_flights)

In [84]:
# Print the first 3 rows as a quick look at the selected columns.
flight_df.show(3)

+----------+------+-----------+-------------------+
|      time|icao24|geoaltitude|           datetime|
+----------+------+-----------+-------------------+
|1645740845|a03b40|     777.24|2022-02-24 16:14:05|
|1645740846|a03b40|     792.48|2022-02-24 16:14:06|
|1645740847|a03b40|     807.72|2022-02-24 16:14:07|
+----------+------+-----------+-------------------+
only showing top 3 rows



In [85]:
# Number of rows loaded, before any cleaning.
flight_df.count()

9081

#### Preprocessing

##### 1. Duplicates

In [86]:
# Sanity check: list every row that occurs more than once when all columns are
# compared. The result gets its own name so it cannot be mistaken for the
# cleaned frame (`updated_flight_df`) that the following cells build.
duplicate_rows_df = (
    flight_df.groupBy(flight_df.columns)
    .agg(count('*').alias("count"))
    .filter(col("count") > 1)
)

duplicate_rows_df.show()

+----+------+-----------+--------+-----+
|time|icao24|geoaltitude|datetime|count|
+----+------+-----------+--------+-----+
+----+------+-----------+--------+-----+



In [87]:
# Drop rows that are exact duplicates across every column.
updated_flight_df = flight_df.dropDuplicates()

In [88]:
# Preview the de-duplicated rows. As the output shows, they are no longer in
# chronological order at this point.
updated_flight_df.show()

+----------+------+------------------+-------------------+
|      time|icao24|       geoaltitude|           datetime|
+----------+------+------------------+-------------------+
|1645985371|a03b40|7269.4800000000005|2022-02-27 12:09:31|
|1645985419|a03b40|            7277.1|2022-02-27 12:10:19|
|1645985742|a03b40|           7284.72|2022-02-27 12:15:42|
|1645986098|a03b40|           7292.34|2022-02-27 12:21:38|
|1645986161|a03b40| 7376.160000000001|2022-02-27 12:22:41|
|1645986802|a03b40|           8290.56|2022-02-27 12:33:22|
|1645986907|a03b40|           8290.56|2022-02-27 12:35:07|
|1645987423|a03b40|           3604.26|2022-02-27 12:43:43|
|1645987462|a03b40|           3291.84|2022-02-27 12:44:22|
|1645987517|a03b40|           2788.92|2022-02-27 12:45:17|
|1645987823|a03b40|1386.8400000000001|2022-02-27 12:50:23|
|1645985012|a03b40|           5966.46|2022-02-27 12:03:32|
|1645985304|a03b40|7269.4800000000005|2022-02-27 12:08:24|
|1645985637|a03b40|            7277.1|2022-02-27 12:13:5

##### 2. Drop NAs

In [89]:
# Discard every row that has a null in any column (the dropna defaults:
# how='any', no threshold, all columns considered).
updated_flight_df = updated_flight_df.dropna()

#### Calculating Mean (currently disabled) and Exporting the Cleaned Series

In [90]:
# Convert the timestamp column to a TimestampType
# mean_calculated_df = flight_df.withColumn("datetime", flight_df["datetime"].cast(TimestampType()))

# window_size = "2 minutes"

# mean_calculated_df = mean_calculated_df.withColumn("window", window("datetime", window_size)).select('icao24', 'window', 'geoaltitude')

# mean_calculated_df.show(5, truncate=False)

In [91]:
# agg_df = mean_calculated_df.groupBy("window")\
#         .agg(mean(flight_df['geoaltitude']).alias("mean_geoaltitude")).orderBy(asc('window'))


# agg_df.show(truncate=False)

In [92]:
# agg_df.count()

In [93]:
# Export the cleaned series, oldest observation first.
# Sort on the numeric epoch column `time` rather than the formatted `datetime`
# string: that string is rendered in the Spark session time zone, so its
# lexical order can diverge from chronological order (e.g. around a DST change).
updated_flight_df = updated_flight_df.orderBy(asc('time'))

# Make sure the target folder exists; to_csv does not create missing directories.
output_dir = os.path.join('dataset', 'processed', 'geoaltitude')
os.makedirs(output_dir, exist_ok=True)

# The pandas index is still written as the first (unnamed) CSV column.
# Alternative file name (not currently used): 'ds-3-batch-4.csv'.
updated_flight_df.toPandas().to_csv(os.path.join(output_dir, f'ds-4-{FLIGHT_ICAO}.csv'))

In [None]:
# from pyspark.sql.window import Window
# from pyspark.sql.functions import col, expr

# window_size = 3
# horizon = 2

# # Create a window specification for ordering by an arbitrary column (you can use a timestamp column if available)
# window_spec = Window.orderBy(expr("monotonically_increasing_id()"))

# for i in range(window_size + horizon):
#     agg_df = agg_df.withColumn(f"t_plus_{1 * i}", expr(f"lead(mean_geoaltitude, {1 * i})").over(window_spec))

# selected_columns = [f"t_plus_{1 * i}" for i in range(window_size)]

# windowed_df = agg_df.select(*selected_columns).na.drop()

# windowed_df.show()

In [None]:
# windowed_df.toPandas().to_csv(os.path.join('dataset', 'processed', 'geoaltitude', f'batch-2_full_flights.csv'))