In [4]:
import sys
from pathlib import Path
from typing import List, Dict

from loguru import logger
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import udf, col, when
import geopy.distance
from pyspark.sql.functions import *
from pyspark.sql import Window
import pyspark.sql.functions as F
from geopy.distance import great_circle
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit, struct
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import pow, col, log
from pyspark.sql.functions import *
from pyspark.sql import Window
import pyspark.sql.functions as F
import geopandas as gpd


In [5]:
# Prefix of each log type. The prefixes are the keys of the dictionary of
# loaded dataframes and, in lower case, the names of the parquet files.
CONF_LOG_PREFIX = "CONFLOG"
FLST_LOG_PREFIX = "FLSTLOG"
GEO_LOG_PREFIX = "GEOLOG"
LOS_LOG_PREFIX = "LOSLOG"
REG_LOG_PREFIX = "REGLOG"

# Folder, relative to the notebook, that holds the parquet files.
LOADING_PATH = "../output"

# Every log that the notebook loads.
DATAFRAMES_NAMES = [
    CONF_LOG_PREFIX,
    FLST_LOG_PREFIX,
    GEO_LOG_PREFIX,
    LOS_LOG_PREFIX,
    REG_LOG_PREFIX,
]

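Add the `platform_code` folder to the module search path, which gives access to the constants that define the table attributes and to the project settings.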

In [6]:
# Add the `platform_code` folder, located in the parent of the current working
# directory, to the module search path so the two imports below can be resolved.
sys.path.append(str(Path(Path().absolute().parent, 'platform_code')))
# NOTE(review): the star import presumably provides the column-name constants
# used by the cells below (SCENARIO_NAME, ACID, LATITUDE, ENV2, ...) — confirm.
from schemas.tables_attributes import *
from utils.config import settings

In [7]:
def load_dataframes(files_names: List[str], loading_path: str, spark: SparkSession) -> Dict[str, DataFrame]:
    """ Reads one parquet file per requested name and gathers them in a dictionary.

    Every file is looked up at `<loading_path>/<name in lower case>.parquet`.

    :param files_names: names of the files to load, without extension.
    :param loading_path: folder where the parquet files are stored.
    :param spark: spark session used to read the files.
    :return: dictionary that maps each name, exactly as received, to the
     dataframe read from its file.
    """
    loaded: Dict[str, DataFrame] = {}

    for name in files_names:
        parquet_path = Path(loading_path) / f'{name.lower()}.parquet'
        logger.info('Loading dataframe from `{}`.', parquet_path)
        loaded[name] = spark.read.parquet(str(parquet_path))

    return loaded

In [8]:
# Get the Spark session named 'Notebook', creating it if none exists yet.
spark = SparkSession.builder.appName('Notebook').getOrCreate()

In [9]:
# Load every log listed in DATAFRAMES_NAMES; the result is keyed by log prefix.
input_dataframes = load_dataframes(DATAFRAMES_NAMES, LOADING_PATH, spark)

2022-04-04 14:19:30.146 | INFO     | __main__:load_dataframes:16 - Loading dataframe from `..\output\conflog.parquet`.
2022-04-04 14:19:33.075 | INFO     | __main__:load_dataframes:16 - Loading dataframe from `..\output\flstlog.parquet`.
2022-04-04 14:19:33.193 | INFO     | __main__:load_dataframes:16 - Loading dataframe from `..\output\geolog.parquet`.
2022-04-04 14:19:33.278 | INFO     | __main__:load_dataframes:16 - Loading dataframe from `..\output\loslog.parquet`.
2022-04-04 14:19:33.365 | INFO     | __main__:load_dataframes:16 - Loading dataframe from `..\output\reglog.parquet`.


In [10]:
# The return type is declared explicitly: a bare `@udf` declares a string
# result, which would turn the distance into text instead of a number.
@udf(DoubleType())
def get_coordinates_distance(origin_latitude: float, origin_longitude: float,
                             destination_latitude: float, destination_longitude: float) -> float:
    """ Spark UDF that calculates the distance in meters between two world coordinates.

    :param origin_latitude: origin latitude point.
    :param origin_longitude: origin longitude point.
    :param destination_latitude: destination latitude point.
    :param destination_longitude: destination longitude point.
    :return: distance in meters, or None (null) when any coordinate is null.
    """
    coordinates = (origin_latitude, origin_longitude, destination_latitude, destination_longitude)
    # Null cells reach the UDF as None; propagate the null instead of failing the job.
    if any(coordinate is None for coordinate in coordinates):
        return None

    origin_tuple = (origin_latitude, origin_longitude)
    destination_tuple = (destination_latitude, destination_longitude)
    # TODO: direct distance calculation (in meters) between two points, is this approach correct?
    return geopy.distance.distance(origin_tuple, destination_tuple).m

# ENV-2: Weighted average altitude
Average flight level weighted by the distance flown at each flight level.

In [None]:
# ENV-2 is computed from the REGLOG dataframe.
dataframe = input_dataframes[REG_LOG_PREFIX]

First, we check the log for a given drone in a given scenario.

In [None]:
#dataframe.where(col(SCENARIO_NAME) == '1_very_low_40_8_R2').where(col(ACID) == 'D1').orderBy(SIMULATION_TIME, ACID).show()

Create columns holding the next position (latitude, longitude and altitude) of the same drone.

In [None]:
# One window per drone and scenario, with the rows ordered by simulation time.
window = Window.partitionBy(SCENARIO_NAME, ACID).orderBy(SIMULATION_TIME)

# lag() with an offset of -1 reads the following row of the window, so each
# row gets the position that the same drone logged next.
next_step = (
    dataframe
    .withColumn("NEXT_LATITUDE", lag(LATITUDE, -1).over(window))
    .withColumn("NEXT_LONGITUDE", lag(LONGITUDE, -1).over(window))
    .withColumn("NEXT_ALTITUDE", lag(ALTITUDE, -1).over(window))
)

In [None]:
#next_step.where(col(SCENARIO_NAME) == '1_very_low_40_8_R2').where(col(ACID) == 'D1').show(50)

In [None]:
# Remove the rows whose NEXT_LATITUDE or NEXT_LONGITUDE is null: the last row of
# every (scenario, drone) window has no following row, so it cannot start a segment.
dataframe = next_step.na.drop(subset=["NEXT_LATITUDE", "NEXT_LONGITUDE"])
# Check it: both columns are tested, so the output must be empty.
dataframe.filter(col("NEXT_LATITUDE").isNull() | col("NEXT_LONGITUDE").isNull()).show()

In [None]:
# NOTE: `get_coordinates_distance` must be the @udf-decorated version here; a
# later cell of this notebook redefines the same name as a plain Python function.
dataframe = (
    dataframe
    # Length of the segment between the current and the next logged position.
    .withColumn(
        "SEGMENT_LENGTH",
        get_coordinates_distance(LATITUDE, LONGITUDE, "NEXT_LATITUDE", "NEXT_LONGITUDE"),
    )
    # Altitude of the segment: mean of the altitudes at both of its ends.
    .withColumn("SEGMENT_ALTITUDE", (col(ALTITUDE) + col("NEXT_ALTITUDE")) / 2)
    # Altitude multiplied by length, the term that is summed for the weighted average.
    .withColumn("SEGMENT_WEIGHT", col("SEGMENT_ALTITUDE") * col("SEGMENT_LENGTH"))
)

In [None]:
#dataframe.select(REG_ID, SCENARIO_NAME, ACID, "SEGMENT_LENGTH", "SEGMENT_ALTITUDE", "SEGMENT_WEIGHT").show()

In [None]:
# `sum` is the Spark aggregate brought in by the star import, not the builtin.
segment_totals = dataframe.groupby(SCENARIO_NAME).agg(
    sum(col("SEGMENT_WEIGHT")),
    sum(col("SEGMENT_LENGTH")),
)
# ENV2 = total weight / total length flown, per scenario.
segment_totals.withColumn(
    ENV2, col("sum(SEGMENT_WEIGHT)") / col("sum(SEGMENT_LENGTH)")
).show()

In [None]:
#ENV4
# Difference between the highest and the lowest altitude logged by every drone
# in every scenario.
dataframe4 = input_dataframes[REG_LOG_PREFIX]
dataframe4 = dataframe4.groupby(SCENARIO_NAME, ACID).agg(F.max(ALTITUDE).alias("MAX_ALTITUDE"), F.min(ALTITUDE).alias("MIN_ALTITUDE"))
dataframe4 = dataframe4.withColumn("DIFF_ALTITUDE", col("MAX_ALTITUDE") - col("MIN_ALTITUDE"))
# Single-row dataframe with the mean altitude difference (despite its name, it
# holds no delay).
# NOTE(review): the mean is taken over all the scenarios together — confirm it
# should not be computed per scenario.
avg_delay = dataframe4.select(mean("DIFF_ALTITUDE").alias("MEAN_DIFF_ALTITUDE"))
# Attach the mean to every row. The cartesian product is requested explicitly
# instead of relying on an outer join without join condition.
dataframe4 = dataframe4.crossJoin(avg_delay)
dataframe4.show()

# ENV4: altitude difference of the drone relative to the mean altitude difference.
dataframe4 = dataframe4.withColumn(ENV4, col("DIFF_ALTITUDE")/col("MEAN_DIFF_ALTITUDE"))
dataframe4 = dataframe4.select(SCENARIO_NAME, ACID, ENV4)
dataframe4.show()
# dataframe5 = dataframe4.groupby(SCENARIO_NAME).agg(F.max('MAX_ALTITUDE').alias("MAX_ALTITUDE_SCN"), F.min('MIN_ALTITUDE').alias("MIN_ALTITUDE_SCN"))
# dataframe5.show()


In [11]:
from pyspark.sql.functions import udf
def get_coordinates_distance(origin_latitude: float, origin_longitude: float,
                             destination_latitude: float, destination_longitude: float) -> float:
    """ Returns the distance, in meters, that separates two world coordinates.

    NOTE: this plain Python function rebinds the name of the @udf-decorated
    function defined earlier in the notebook; once this cell has run, the
    name no longer refers to the Spark UDF.

    :param origin_latitude: latitude of the origin point.
    :param origin_longitude: longitude of the origin point.
    :param destination_latitude: latitude of the destination point.
    :param destination_longitude: longitude of the destination point.
    :return: distance in meters.
    """
    # TODO: direct distance calculation (in meters) between two points, is this approach correct?
    return geopy.distance.distance(
        (origin_latitude, origin_longitude),
        (destination_latitude, destination_longitude),
    ).m


# The return type is declared explicitly: a bare `@udf` declares a string
# result, which would turn the flag into text instead of a boolean.
@udf("boolean")
def in_circle(x_center, y_center, x, y, radius):
    """ Spark UDF that tells whether a point lies inside a circle.

    The coordinates are forwarded to `get_coordinates_distance()` in the order
    received, so `x_center` and `x` are used as latitudes and `y_center` and
    `y` as longitudes.

    :param x_center: latitude of the center of the circle.
    :param y_center: longitude of the center of the circle.
    :param x: latitude of the point to test.
    :param y: longitude of the point to test.
    :param radius: radius of the circle, in meters (the unit returned by
     `get_coordinates_distance()`).
    :return: True when the distance to the center is at most `radius`.
    """
    return get_coordinates_distance(x_center, y_center, x, y) <= radius

@udf("double")
def great_circle_udf(x, y):
    """ Spark UDF that returns the great-circle distance between two points.

    :param x: first point, passed straight to `great_circle()`.
    :param y: second point, passed straight to `great_circle()`.
    :return: distance between both points in kilometers.
    """
    separation = great_circle(x, y)
    return separation.kilometers

In [None]:

# Show the region-of-interest settings read from the project configuration.
# NOTE(review): the ENV3 loop cell reads `settings.env3.time_roi` while this cell
# reads `settings.time_roi` — confirm which layout of the settings is current.
print(settings.x_center)
print(settings.time_roi)
print(settings.radius_roi)

In [None]:
# NOTE(review): `dataframe3` is only assigned in the ENV3 cells further down, so
# this cell fails when the notebook is run top to bottom on a fresh kernel.
dataframe3.show()

In [23]:
def loadEnv3points(geojson, max_points=10):
    """ Loads the points where ENV3 is evaluated.

    :param geojson: path of the file with the points, read with geopandas.
    :param max_points: number of rows to keep, counted from the start of the
     file; None keeps all the rows. The default of 10 is the limit that was
     previously hard-coded.
    :return: geopandas dataframe with, at most, the first `max_points` rows.
    """
    points = gpd.read_file(geojson)
    if max_points is None:
        return points
    return points[0:max_points]

In [None]:
# NOTE(review): hard-coded relative path; the ENV3 loop cell reads its points from
# `settings.geojson.path` through geopandas instead — confirm this cell is still needed.
df = spark.read.json("env3_points.json")

In [None]:
# Prints the textual representation of the dataframe object, not its rows.
print(df)

In [None]:
# NOTE: `df_points` is assigned by the ENV3 loop cell further down.
# Every item yielded by iterrows() is an (index, row) pair.
for indexed_row in df_points.iterrows():
    print(indexed_row)

In [None]:
#ENV3
dataframe3 = input_dataframes[REG_LOG_PREFIX]
# `great_circle_udf` is already a Spark UDF returning a double (see its
# decorator). Passing it to `udf(...)` again would wrap the UDF object instead
# of a plain Python function, so it is reused as it is.
udf_func = great_circle_udf



In [29]:
#ENV3
# Largest distance between a drone and a point for the drone to contribute to
# the sound intensity of that point.
# NOTE(review): `great_circle_udf` returns kilometers, so this threshold and the
# division by `settings.flight_altitude.lowest` below operate on kilometers —
# confirm that these are the intended units.
ENV3_MAX_DISTANCE = 16

dataframe3 = input_dataframes[REG_LOG_PREFIX]
df_points = loadEnv3points(settings.geojson.path)

# The snapshot at the time of interest is the same for every point, so it is
# built once, outside of the loop.
snapshot_dataframe = dataframe3.filter(col(SIMULATION_TIME) == settings.env3.time_roi)
drone_position = struct(col(LATITUDE), col(LONGITUDE))

# Reset on every run so an empty list of points cannot leave a stale result.
final_dataframe = None
env3_columns = []

for i, (x, y) in enumerate(zip(df_points.geometry.x, df_points.geometry.y)):
    # The geometry stores (x, y), which is swapped to match the (LATITUDE, LONGITUDE) order of the drones.
    point = struct(lit(y), lit(x))
    env3_column = f"ENV3_p{i}"
    env3_columns.append(env3_column)

    aux_dataframe = snapshot_dataframe.withColumn("distance", great_circle_udf(point, drone_position))
    aux_dataframe = aux_dataframe.filter(col("distance") <= ENV3_MAX_DISTANCE)
    aux_dataframe = aux_dataframe.withColumn("sound_intensity", 1/(pow((col("distance")/settings.flight_altitude.lowest), 2)))
    aux_dataframe = aux_dataframe.groupby(SCENARIO_NAME).agg(F.sum("sound_intensity").alias(env3_column))

    if final_dataframe is None:
        final_dataframe = aux_dataframe
    else:
        # Outer join: a scenario without drones near one of the points must keep
        # the values of the other points instead of disappearing from the result.
        final_dataframe = final_dataframe.join(aux_dataframe, SCENARIO_NAME, "outer")

if final_dataframe is not None:
    # A point without drones in range receives no sound at all.
    final_dataframe = final_dataframe.fillna(0.0, subset=env3_columns)
        

    
    


AnalysisException: cannot resolve 'LAT' given input columns: [ENV3_p0, Scenario];
'Project [Scenario#117, ENV3_p0#2840, great_circle_udf(struct(col1, 48.216866, col2, 16.36884), struct(LAT, 'LAT, LON, 'LON)) AS distance#2844]
+- Aggregate [Scenario#117], [Scenario#117, sum(sound_intensity#2820) AS ENV3_p0#2840]
   +- Project [REG_ID#116, Scenario#117, Simulation_time#118, ACID#119, ALT#120, LAT#121, LON#122, distance#2811, (cast(1 as double) / cast(POWER((distance#2811 / 9.144), 2.0) as double)) AS sound_intensity#2820]
      +- Filter (distance#2811 <= cast(16 as double))
         +- Project [REG_ID#116, Scenario#117, Simulation_time#118, ACID#119, ALT#120, LAT#121, LON#122, great_circle_udf(struct(col1, 48.217423, col2, 16.371279), struct(LAT, LAT#121, LON, LON#122)) AS distance#2811]
            +- Filter (Simulation_time#118 = cast(120 as double))
               +- Relation [REG_ID#116,Scenario#117,Simulation_time#118,ACID#119,ALT#120,LAT#121,LON#122] parquet


In [26]:
# One row per scenario and one `ENV3_p<i>` column per evaluated point.
final_dataframe.show()


+------------------+-----------------+-----------------+-----------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+-----------------+
|          Scenario|          ENV3_p0|          ENV3_p1|          ENV3_p2|           ENV3_p3|          ENV3_p4|           ENV3_p5|          ENV3_p6|           ENV3_p7|          ENV3_p8|          ENV3_p9|
+------------------+-----------------+-----------------+-----------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+-----------------+
|1_very_low_40_8_W1|5330.543421714847|10369.50705236441|6099.637040471681|14712.051075586354|6571.919849418305|6860.9610221713965|64300.00334081777|15007.990403937401|5502.500571574826|7188.049529260242|
|1_very_low_40_8_R2|5330.543421714847|10369.50705236441|6099.637040471681|14712.051075586354|6571.919849418305|6860.9610221713965|64300.00334081777|15007.990403937401|5502.500571574826

In [None]:


# Center of the region of interest. The values are read from `settings`, as the
# other cells of the notebook do, instead of from bare names that no cell defines.
point = struct(lit(settings.x_center), lit(settings.y_center))
# `great_circle_udf` is already a Spark UDF returning a double, so it is not
# wrapped with `udf(...)` a second time.
udf_func = great_circle_udf
# Distance of every drone to the center, keeping only the rows inside the
# region of interest: within the radius, at or below the altitude limit and at
# the time of interest.
dataframe3 = (
    dataframe3
    .withColumn("distance", udf_func(point, struct(col(LATITUDE), col(LONGITUDE))))
    .filter(
        (col("distance") <= settings.radius_roi)
        & (col(ALTITUDE) <= settings.altitude_roi)
        & (col(SIMULATION_TIME) == settings.time_roi)
    )
)
# Alternative formula that was considered: 10*log(1/pow(col("distance"),2))
dataframe3 = dataframe3.withColumn("noise_level", log10(1/pow(col("distance"),2)))
dataframe3.show()

In [None]:
# Total noise level of each scenario (`sum` is the Spark aggregate brought in
# by the star import, not the builtin).
noise_per_scenario = dataframe3.groupBy(SCENARIO_NAME).agg(sum("noise_level"))
noise_per_scenario.show()

In [None]:
# Display the current content of `dataframe3`.
dataframe3.show()

In [None]:



dataframe = input_dataframes[REG_LOG_PREFIX]

# Center of the region of interest.
point = struct(lit(settings.x_center), lit(settings.y_center))

# TODO: ? Define formula for the sound depending on the distance to the point.
# TODO: ? How many points and how to define.
# The result is stored in a variable: a `return` statement is a syntax error
# at the top level of a cell.
env3_dataframe = (
    dataframe
    # Only the positions logged at the time of interest.
    .filter(col(SIMULATION_TIME) == settings.time_roi)
    .withColumn("distance", great_circle_udf(point, struct(col(LATITUDE), col(LONGITUDE))))
    # Only the drones inside the radius and at or below the altitude limit.
    .filter((col("distance") <= settings.radius_roi) & (col(ALTITUDE) <= settings.altitude_roi))
    .withColumn("noise_level", log10(1 / pow(col("distance"), 2)))
    .groupBy(SCENARIO_NAME).agg(F.sum("noise_level").alias(ENV3))
)
env3_dataframe.show()