In [1]:
import os
from datetime import datetime
from math import radians, cos, sin, asin, sqrt

import pyspark
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import pytz

In [2]:
# Storage credentials: prefer the environment so secrets do not have to live in
# the notebook. The literals are kept only as a backward-compatible fallback and
# should be removed once AZURE_STORAGE_ACCOUNT_NAME / AZURE_STORAGE_ACCOUNT_KEY
# are configured wherever this notebook runs.
account_name = os.environ.get("AZURE_STORAGE_ACCOUNT_NAME", "REDACTED")
account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY", "REDACTED")

# Local Spark session configured for shared-key access to the ADLS Gen2
# account named above.
spark = (
    SparkSession
        .builder
        .master('local[*]')
        .appName("Ingestion")
        .config("spark.driver.memory", "4g")
        .config(f"fs.azure.account.auth.type.{account_name}.dfs.core.windows.net", "SharedKey")
        .config(f"fs.azure.account.key.{account_name}.dfs.core.windows.net", account_key)
        .getOrCreate()
)

sc = spark.sparkContext

In [3]:
# Source location of the extracted table: container and folder inside it.
container_name, path_to_table = 'data', '/extraction/'

def getDataframeFromAdls(spark_session, container_name, path_to_table, table_name):
    """Read a parquet table from ADLS through ``spark_session``.

    The abfss URI is assembled from ``container_name``, ``path_to_table`` and
    ``table_name``; the storage account host is fixed inside the URI.

    Returns:
        Whatever ``spark_session.read.parquet`` returns for that URI.
    """
    table_uri = (
        f"abfss://{container_name}@REDACTED.dfs.core.windows.net"
        f"{path_to_table}{table_name}"
    )
    reader = spark_session.read
    return reader.parquet(table_uri)

In [4]:
# Load the extracted table that all features below are derived from.
df_extract = getDataframeFromAdls(
    spark_session=spark,
    container_name=container_name,
    path_to_table=path_to_table,
    table_name='df_extract',
)

In [5]:
# Per-file window: rows are grouped by ID_FILE and ordered by (ID_FILE, ID_STATE).
# Every lag/lead feature below is evaluated over this window.
window_spec = (
    Window
        .partitionBy("ID_FILE")
        .orderBy("ID_FILE", "ID_STATE")
)

In [6]:
def getWindowConditionLagGps(window_spec):
    """Build the boolean Column that gates LAG GPS features.

    The condition holds when all three checks pass:
      * STIME_SEC minus the previous row's STIME_SEC is within [0, 900];
      * STIME_SEC minus LTIME_SEC on the current row is within [0, 180];
      * the same difference on the previous row is within [0, 180].

    "Previous row" is defined by ``window_spec``.
    """
    stime = F.col('STIME_SEC')
    ltime = F.col('LTIME_SEC')
    prev_stime = F.lag(stime).over(window_spec)
    prev_ltime = F.lag(ltime).over(window_spec)

    gap_in_range = (stime - prev_stime).between(0, 900)
    current_delay_in_range = (stime - ltime).between(0, 180)
    previous_delay_in_range = (prev_stime - prev_ltime).between(0, 180)

    return gap_in_range & current_delay_in_range & previous_delay_in_range

def getWindowConditionLeadGps(window_spec):
    """Build the boolean Column that gates LEAD GPS features.

    The condition holds when all three checks pass:
      * the next row's STIME_SEC minus STIME_SEC is within [0, 960];
      * STIME_SEC minus LTIME_SEC on the current row is within [0, 180];
      * the same difference on the next row is within [0, 180].

    "Next row" is defined by ``window_spec``.
    """
    stime = F.col('STIME_SEC')
    ltime = F.col('LTIME_SEC')
    next_stime = F.lead(stime).over(window_spec)
    next_ltime = F.lead(ltime).over(window_spec)

    # NOTE(review): the LAG counterpart uses 900 here, not 960 - confirm the
    # asymmetry is intentional.
    gap_in_range = (next_stime - stime).between(0, 960)
    current_delay_in_range = (stime - ltime).between(0, 180)
    next_delay_in_range = (next_stime - next_ltime).between(0, 180)

    return gap_in_range & current_delay_in_range & next_delay_in_range

def getWindowConditionLagGpsActivity(window_spec):
    """Boolean Column: STIME_SEC minus the previous row's STIME_SEC is below 900.

    There is no lower bound, so negative differences also satisfy the condition.
    """
    stime = F.col('STIME_SEC')
    prev_stime = F.lag(stime).over(window_spec)
    return (stime - prev_stime) < 900

def getWindowConditionLeadGpsActivity(window_spec):
    """Boolean Column: the next row's STIME_SEC minus STIME_SEC is below 900.

    There is no lower bound, so negative differences also satisfy the condition.
    """
    stime = F.col('STIME_SEC')
    next_stime = F.lead(stime).over(window_spec)
    return (next_stime - stime) < 900

def getWindowResultGpsSpeed(window_spec, lag_or_lead):
    """Return SPEED shifted over ``window_spec``.

    ``lag_or_lead`` is the shifting function to apply to the column
    (callers pass ``F.lag`` or ``F.lead``).
    """
    shifted_speed = lag_or_lead(F.col('SPEED'))
    return shifted_speed.over(window_spec)

def getWindowResultGpsActivity(window_spec, lag_or_lead):
    """Return ACTIVITY_STATE shifted over ``window_spec``.

    ``lag_or_lead`` is the shifting function to apply to the column
    (callers pass ``F.lag`` or ``F.lead``).
    """
    shifted_activity = lag_or_lead(F.col('ACTIVITY_STATE'))
    return shifted_activity.over(window_spec)

def getWindowResultLagGpsTime(window_spec):
    """Return LTIME_SEC minus the previous row's LTIME_SEC over ``window_spec``."""
    ltime = F.col('LTIME_SEC')
    prev_ltime = F.lag(ltime).over(window_spec)
    return ltime - prev_ltime

def getWindowResultLeadGpsTime(window_spec):
    """Return the next row's LTIME_SEC minus LTIME_SEC over ``window_spec``."""
    ltime = F.col('LTIME_SEC')
    next_ltime = F.lead(ltime).over(window_spec)
    return next_ltime - ltime

def getWindowResultLagGpsGeo(window_spec, lat_or_lon):
    """Return the previous row's value of column ``lat_or_lon`` over ``window_spec``.

    Callers pass 'LAT' or 'LON', but any column name is accepted.
    """
    coordinate = F.col(lat_or_lon)
    return F.lag(coordinate).over(window_spec)


In [7]:
def getLagOrLeadGps(df, lag_or_lead, speed_time_activity_geo, window_spec):
    """Add a gated lag/lead feature column named ``{lag_or_lead}_{feature}``.

    The new column holds the lag/lead expression where the matching gating
    condition is true and null everywhere else.

    Args:
        df: DataFrame to extend.
        lag_or_lead: 'LAG' or 'LEAD'.
        speed_time_activity_geo: 'LTIME', 'SPEED' or 'ACTIVITY' for either
            direction; 'LAT' and 'LON' are supported for 'LAG' only.
        window_spec: window over which the lag/lead expressions are evaluated.

    Returns:
        ``df`` with the additional column; the result is marked for caching.

    Raises:
        ValueError: if the direction or the direction/feature combination is
            not supported.
    """
    # Validate before building any expression. Previously an unsupported
    # combination (e.g. 'LEAD' + 'LAT') fell through every branch and failed
    # later with an UnboundLocalError on `result` or `condition`.
    supported_features = {
        'LAG': ('LTIME', 'SPEED', 'ACTIVITY', 'LAT', 'LON'),
        'LEAD': ('LTIME', 'SPEED', 'ACTIVITY'),
    }
    if lag_or_lead not in supported_features:
        raise ValueError(
            f"lag_or_lead must be one of {sorted(supported_features)}, got {lag_or_lead!r}"
        )
    if speed_time_activity_geo not in supported_features[lag_or_lead]:
        raise ValueError(
            f"speed_time_activity_geo must be one of {supported_features[lag_or_lead]} "
            f"for {lag_or_lead}, got {speed_time_activity_geo!r}"
        )

    name = f'{lag_or_lead}_{speed_time_activity_geo}'

    if lag_or_lead == 'LAG':
        # Default gate for LAG features; ACTIVITY replaces it below.
        condition = getWindowConditionLagGps(window_spec)
        if speed_time_activity_geo == 'LTIME':
            result = getWindowResultLagGpsTime(window_spec)
        elif speed_time_activity_geo == 'SPEED':
            result = getWindowResultGpsSpeed(window_spec, F.lag)
        elif speed_time_activity_geo == 'ACTIVITY':
            condition = getWindowConditionLagGpsActivity(window_spec)
            result = getWindowResultGpsActivity(window_spec, F.lag)
        else:  # 'LAT' or 'LON'
            result = getWindowResultLagGpsGeo(window_spec, speed_time_activity_geo)
    else:  # 'LEAD'
        # Default gate for LEAD features; ACTIVITY replaces it below.
        condition = getWindowConditionLeadGps(window_spec)
        if speed_time_activity_geo == 'LTIME':
            result = getWindowResultLeadGpsTime(window_spec)
        elif speed_time_activity_geo == 'SPEED':
            result = getWindowResultGpsSpeed(window_spec, F.lead)
        else:  # 'ACTIVITY'
            condition = getWindowConditionLeadGpsActivity(window_spec)
            result = getWindowResultGpsActivity(window_spec, F.lead)

    return (
        df.withColumn(
            name,
            F.when(condition, result).otherwise(F.lit(None)),
        ).cache()
    )

In [8]:
# LAG_SPEED: previous row's SPEED, null where the LAG gate fails.
df_lag_speed = getLagOrLeadGps(
    df=df_extract,
    lag_or_lead='LAG',
    speed_time_activity_geo='SPEED',
    window_spec=window_spec,
)

In [9]:
# LEAD_SPEED: next row's SPEED, null where the LEAD gate fails.
df_lead_speed = getLagOrLeadGps(
    df=df_lag_speed,
    lag_or_lead='LEAD',
    speed_time_activity_geo='SPEED',
    window_spec=window_spec,
)

In [10]:
# LAG_LTIME: LTIME_SEC difference to the previous row, null where the LAG gate fails.
df_lag_time = getLagOrLeadGps(
    df=df_lead_speed,
    lag_or_lead='LAG',
    speed_time_activity_geo='LTIME',
    window_spec=window_spec,
)

In [11]:
# LEAD_LTIME: LTIME_SEC difference to the next row, null where the LEAD gate fails.
df_lead_time = getLagOrLeadGps(
    df=df_lag_time,
    lag_or_lead='LEAD',
    speed_time_activity_geo='LTIME',
    window_spec=window_spec,
)

In [12]:
# LAG_ACTIVITY: previous row's ACTIVITY_STATE, gated by the activity-specific LAG condition.
df_lag_activity = getLagOrLeadGps(
    df=df_lead_time,
    lag_or_lead='LAG',
    speed_time_activity_geo='ACTIVITY',
    window_spec=window_spec,
)

In [13]:
# LEAD_ACTIVITY: next row's ACTIVITY_STATE, gated by the activity-specific LEAD condition.
df_lead_activity = getLagOrLeadGps(
    df=df_lag_activity,
    lag_or_lead='LEAD',
    speed_time_activity_geo='ACTIVITY',
    window_spec=window_spec,
)

In [14]:
# LAG_LAT: previous row's LAT, null where the LAG gate fails.
df_lag_lat = getLagOrLeadGps(
    df=df_lead_activity,
    lag_or_lead='LAG',
    speed_time_activity_geo='LAT',
    window_spec=window_spec,
)

In [15]:
# LAG_LON: previous row's LON, null where the LAG gate fails.
df_lag_lon = getLagOrLeadGps(
    df=df_lag_lat,
    lag_or_lead='LAG',
    speed_time_activity_geo='LON',
    window_spec=window_spec,
)

In [16]:
@F.udf(returnType=IntegerType())
def calcDistance(lon_a, lat_a, lon_b, lat_b):
    """Haversine distance between two points, rounded to whole metres.

    Args:
        lon_a, lat_a: coordinates of the first point (converted with
            ``radians``, so they are expected in degrees).
        lon_b, lat_b: coordinates of the second point, same convention.

    Returns:
        The distance in metres as an int, or None if any input is None.
    """
    geo_list = [lon_a, lat_a, lon_b, lat_b]
    if any(x is None for x in geo_list):
        return None

    # Transform to radians
    lon_a, lat_a, lon_b, lat_b = map(radians, geo_list)
    dist_lon = lon_b - lon_a
    dist_lat = lat_b - lat_a

    # Haversine term of the central angle
    area = sin(dist_lat / 2) ** 2 + cos(lat_a) * cos(lat_b) * sin(dist_lon / 2) ** 2
    # Floating-point error can push the term marginally above 1 for
    # near-antipodal points, which would make asin() raise a domain error.
    # A plain comparison is used so NaN input still propagates instead of
    # being silently clamped.
    if area > 1.0:
        area = 1.0

    central_angle = 2 * asin(sqrt(area))
    radius_km = 6371
    distance_km = central_angle * radius_km

    # Round in metres. The previous int(round(km, 3) * 1000) truncated after a
    # float multiplication (1.001 * 1000 == 1000.9999999999999 gave 1000).
    return int(round(distance_km * 1000))

In [17]:
# DISTANCE: calcDistance between the previous position (LAG_LON, LAG_LAT)
# and the current position (LON, LAT).
df_distance = df_lag_lon.withColumn(
    'DISTANCE',
    calcDistance(
        F.col('LAG_LON'),
        F.col('LAG_LAT'),
        F.col('LON'),
        F.col('LAT'),
    ),
)

In [18]:
# GPS_DELAY = STIME_SEC - LTIME_SEC, saturated to the range [-32000, 32000].
# The upper branch previously returned -32000, so very large positive delays
# were recorded as very large negative ones.
# NOTE(review): if -32000 was meant as a shared "out of range" sentinel for
# both directions, restore it and document that here.
df_gps_delay = (
    df_distance
        .withColumn('GPS_DELAY',
            F.when(F.col('STIME_SEC') - F.col('LTIME_SEC') < -32000, -32000)
            .when(F.col('STIME_SEC') - F.col('LTIME_SEC') > 32000, 32000)
            .otherwise(F.col('STIME_SEC') - F.col('LTIME_SEC'))
        )
    )

In [19]:
@F.udf(returnType=IntegerType())
def epochToWeekday(epoch):
    """Day of week for an epoch timestamp in seconds: 0 = Sunday ... 6 = Saturday.

    The timestamp is interpreted in UTC so the result does not depend on the
    timezone of the machine running the UDF. Returns None for a None input
    instead of raising inside the UDF.
    """
    if epoch is None:
        return None
    # isoweekday() is Monday=1 ... Sunday=7; modulo 7 gives the same numbering
    # as strftime('%w') (Sunday=0 ... Saturday=6).
    return datetime.fromtimestamp(epoch, tz=pytz.utc).isoweekday() % 7

In [20]:
@F.udf(returnType=IntegerType())
def epochToHour(epoch):
    """Hour of day (0-23) in UTC for an epoch timestamp in seconds.

    UTC is used explicitly because the notebook later adds a Europe/Berlin
    offset to HOUR; relying on the machine's local timezone would shift the
    hour twice on a non-UTC host. Returns None for a None input instead of
    raising inside the UDF.
    """
    if epoch is None:
        return None
    # .hour replaces strftime('%-H'), which is a platform-specific extension.
    return datetime.fromtimestamp(epoch, tz=pytz.utc).hour

In [21]:
# Calendar features derived from the STIME_SEC epoch column.
df_weekday = df_gps_delay.withColumn(
    'WEEKDAY',
    epochToWeekday(F.col('STIME_SEC')),
)
df_hour = df_weekday.withColumn(
    'HOUR',
    epochToHour(F.col('STIME_SEC')),
)

In [22]:
# WEEKEND: True when WEEKDAY is 6 or 0, False otherwise (including a null WEEKDAY).
df_weekend = df_hour.withColumn(
    'WEEKEND',
    F.when(F.col('WEEKDAY').isin(6, 0), F.lit(True)).otherwise(F.lit(False)),
)

In [23]:
def isSummerTime(dt=None, timezone="Europe/Berlin"):
    """Tell whether daylight-saving time is in effect at ``dt`` in ``timezone``.

    Args:
        dt: datetime to check. None (the default) means "now", evaluated at
            call time. A naive datetime is taken as wall-clock time in
            ``timezone``; an aware datetime is converted to ``timezone``.
        timezone: name of the timezone, as understood by ``pytz.timezone``.

    Returns:
        True if the DST offset at that moment is non-zero, else False.

    Raises:
        pytz.exceptions.AmbiguousTimeError, pytz.exceptions.NonExistentTimeError:
            for a naive ``dt`` that falls into a DST transition
            (``is_dst=None`` makes pytz refuse to guess).
    """
    tz = pytz.timezone(timezone)
    if dt is None:
        # The old default `dt=datetime.now()` was evaluated once, when the
        # function was defined, so a long-lived kernel kept using a stale time.
        aware = datetime.now(tz)
    elif dt.tzinfo is None:
        aware = tz.localize(dt, is_dst=None)
    else:
        aware = dt.astimezone(tz)
    # Public API instead of the private tzinfo._dst attribute; a zero
    # timedelta is falsy.
    return bool(aware.dst())

In [24]:
# Offset added to HOUR before bucketing: 2 when isSummerTime() is true, else 1.
# NOTE(review): the offset is chosen once from the time the notebook runs and
# applied to every row, regardless of the row's own date.
hours_added = 2 if isSummerTime() else 1

# TIME_DAY buckets on the shifted hour: 6-11 MORNING, 12-17 AFTERNOON,
# everything else NIGHT. The explicit 18-24 branch yields the same label as
# the default and is kept as in the original.
df_time_day = df_weekend.withColumn(
    'TIME_DAY',
    F.when((F.col('HOUR') + F.lit(hours_added)).between(6, 11), 'MORNING')
     .when((F.col('HOUR') + F.lit(hours_added)).between(12, 17), 'AFTERNOON')
     .when((F.col('HOUR') + F.lit(hours_added)).between(18, 24), 'NIGHT')
     .otherwise('NIGHT'),
)

In [25]:
#df_time_day.select(['STIME_SEC', 'WEEKDAY', 'WEEKEND', 'HOUR', 'TIME_DAY', 'LEVEL']).show(50)

In [26]:
@F.udf(returnType=DoubleType())
def dbmToMw(level):
    """Convert a level from dBm to milliwatts: ``10 ** (level / 10)``.

    A None input is passed through as None.
    """
    return None if level is None else 10 ** (level / 10.)

In [27]:
# LEVEL_MW: LEVEL converted with dbmToMw.
df_level = df_time_day.withColumn(
    'LEVEL_MW',
    dbmToMw(F.col('LEVEL')),
)

In [29]:
# Output location for the feature table. This rebinds the names that were
# used earlier for the input location.
container_name, path_to_table = 'data', '/features/'

def writeDataframeToAdls(dataframe, container_name, path_to_table, table_name, mode='overwrite'):
    """Write ``dataframe`` as parquet to ADLS.

    Args:
        dataframe: object exposing a Spark-style ``.write`` interface.
        container_name: container part of the abfss URI.
        path_to_table: folder inside the container, with leading and
            trailing slashes.
        table_name: final path component of the target.
        mode: save mode handed to the writer (default 'overwrite').

    Returns:
        None.
    """
    target_uri = (
        f"abfss://{container_name}@REDACTED.dfs.core.windows.net"
        f"{path_to_table}{table_name}"
    )
    # The former .format("table_name") call was dropped: it passed the literal
    # string "table_name" as a data-source format, and .parquet() selects the
    # parquet format itself.
    (dataframe
        .write
        .mode(mode)
        .parquet(target_uri)
    )

In [30]:
# Persist the final feature table.
writeDataframeToAdls(
    dataframe=df_level,
    container_name=container_name,
    path_to_table=path_to_table,
    table_name="df_features",
)