In [2]:
!pip install pysolar
from pysolar.solar import *
from dateutil import tz
import datetime
import pandas as pd
import math

from tqdm import tqdm
tqdm.pandas()



In [2]:
tzone = tz.gettz('Europe/Berlin')

In [3]:
# EXAMPLE:
latitude = 52.55075
longitude = 13.414106

date = datetime.datetime(2018, 1, 1, 2, 0, 0, tzinfo=tzone)
sun_altitude = get_altitude(latitude, longitude, date)
sun_altitude

-53.749102831159014

In [4]:
def get_sun_altitude(year,month,weekday,hour, lat, long):
  date_1 = datetime.datetime(year,
                           month,
                           weekday,
                           hour,
                           0, 
                           0, 
                           tzinfo=tzone)
  date_2 = datetime.datetime(year,
                               month,
                               weekday+7,
                               hour,
                               0, 
                               0, 
                               tzinfo=tzone)
  date_3 = datetime.datetime(year,
                               month,
                               weekday+14,
                               hour,
                               0, 
                               0, 
                               tzinfo=tzone)
  date_4 = datetime.datetime(year,
                               month,
                               weekday+21,
                               hour,
                               0, 
                               0, 
                               tzinfo=tzone)
  sun_altitude = get_altitude(lat, long, date_1) + get_altitude(lat, long, date_2) + get_altitude(lat, long, date_3) + get_altitude(lat, long, date_4)
  sun_altitude = sun_altitude/4
  return float(sun_altitude)

In [5]:
get_sun_altitude(2022,4,1,16,52.509,13.385)

33.868894127442694

In [6]:
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, FloatType, DoubleType

In [7]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('ML_Project').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [8]:
df_negative_samples = pd.read_csv('../data/output/negative_combinations.csv', index_col=[0])


In [9]:
df_negative_samples.head()

Unnamed: 0,hour,year,month,weekday_name,weekday,segment_id
0,0,2018,1,Monday,2,42079.0
1,0,2018,1,Monday,2,20232.0
2,3,2018,1,Monday,2,42612.0
3,3,2018,1,Monday,2,24521.0
4,4,2018,1,Monday,2,20491.0


In [15]:
df_negative_samples['sun_elevation_angle'] = df_negative_samples.progress_apply(lambda row:get_sun_altitude(row['year'],row['month'],row['weekday'],row['hour'],row['YGCSWGS84'],row['XGCSWGS84']), axis=1)

100%|█████████████████████████████████████████████████████████████████████| 1635962/1635962 [3:24:41<00:00, 133.21it/s]


In [19]:
df_negative_samples = df_negative_samples.drop(['_c0','weekday_name'], axis=1)

In [33]:
df_negative_samples

Unnamed: 0,hour,year,month,weekday,segment_id,XGCSWGS84,YGCSWGS84,sun_elevation_angle,hour_sin,hour_cos,month_sin,month_cos,collision
0,0,2018,1,2,42079.0,13.298572,52.534142,-58.729261,0.000000e+00,1.000000,5.000000e-01,0.866025,0
1,0,2018,1,2,20232.0,13.397531,52.495270,-58.774695,0.000000e+00,1.000000,5.000000e-01,0.866025,0
2,3,2018,1,2,42612.0,13.386045,52.515511,-45.581977,7.308360e-01,0.682553,5.000000e-01,0.866025,0
3,3,2018,1,2,24521.0,13.553259,52.568371,-45.467438,7.308360e-01,0.682553,5.000000e-01,0.866025,0
4,4,2018,1,2,20491.0,13.343180,52.479771,-37.103338,8.878852e-01,0.460065,5.000000e-01,0.866025,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1635957,22,2020,12,2,10603.0,13.229249,52.573277,-52.410378,-2.697968e-01,0.962917,-2.449294e-16,1.000000,0
1635958,22,2020,12,2,26910.0,13.540070,52.534370,-52.579419,-2.697968e-01,0.962917,-2.449294e-16,1.000000,0
1635959,23,2020,12,2,1235.0,13.287013,52.417854,-58.277757,-2.449294e-16,1.000000,-2.449294e-16,1.000000,0
1635960,23,2020,12,2,22223.0,13.372655,52.540176,-58.193322,-2.449294e-16,1.000000,-2.449294e-16,1.000000,0


In [None]:
df_negative_samples = spark.createDataFrame(df_negative_samples)

In [None]:
df_negative_samples = df_negative_samples.withColumn("collision", lit(0))

In [None]:
df_negative_samples = df_negative_samples.drop('_c0',"YGCSWGS84","XGCSWGS84")

In [9]:
df_collisions = spark.read.csv('../data/output/df_collisions_merged.csv', header=True, inferSchema=True)

In [10]:
df_collisions = df_collisions.select(['hour','year','month','weekday','segment_id','XGCSWGS84','YGCSWGS84'])

In [11]:
df_collisions = df_collisions.withColumn('YGCSWGS84', regexp_replace('YGCSWGS84',',','.'))
df_collisions = df_collisions.withColumn('XGCSWGS84', regexp_replace('XGCSWGS84',',','.'))

In [12]:
df_collisions=df_collisions.withColumn("XGCSWGS84",df_collisions.XGCSWGS84.cast(FloatType()))
df_collisions=df_collisions.withColumn("YGCSWGS84",df_collisions.YGCSWGS84.cast(FloatType()))

In [13]:
df_collisions = df_collisions.toPandas()

In [22]:
df_collisions.dtypes

hour            int32
year            int32
month           int32
weekday         int32
segment_id    float64
XGCSWGS84     float32
YGCSWGS84     float32
hour_sin      float64
hour_cos      float64
month_sin     float64
month_cos     float64
dtype: object

In [25]:
df_collisions['sun_elevation_angle'] = df_collisions.progress_apply(lambda row:get_sun_altitude(row['year'].astype('int'),row['month'].astype('int'),row['weekday'].astype('int'),row['hour'].astype('int'),row['YGCSWGS84'].astype('int'),row['XGCSWGS84'].astype('int')), axis=1)

100%|███████████████████████████████████████████████████████████████████████████| 96666/96666 [09:46<00:00, 164.79it/s]


In [19]:
df_collisions['hour_sin'] = df_collisions.progress_apply(lambda row: math.sin(2 * math.pi * row['hour']/23.0),axis=1)
df_collisions['hour_cos'] = df_collisions.progress_apply(lambda row: math.cos(2 * math.pi * row['hour']/23.0),axis=1)
df_collisions['month_sin'] = df_collisions.progress_apply(lambda row: math.sin(2 * math.pi * row['month']/12.0),axis=1)
df_collisions['month_cos'] = df_collisions.progress_apply(lambda row: math.cos(2 * math.pi * row['month']/12.0),axis=1)



100%|█████████████████████████████████████████████████████████████████████████| 96666/96666 [00:02<00:00, 44877.65it/s]
100%|█████████████████████████████████████████████████████████████████████████| 96666/96666 [00:02<00:00, 47213.52it/s]
100%|█████████████████████████████████████████████████████████████████████████| 96666/96666 [00:02<00:00, 48219.03it/s]
100%|█████████████████████████████████████████████████████████████████████████| 96666/96666 [00:01<00:00, 52560.22it/s]


In [27]:
df_collisions['collision'] = 1

In [28]:
df_collisions

Unnamed: 0,hour,year,month,weekday,segment_id,XGCSWGS84,YGCSWGS84,hour_sin,hour_cos,month_sin,month_cos,sun_elevation_angle,collision
0,15,2018,1,4,6209.0,13.475018,52.513596,-0.816970,-0.576680,5.000000e-01,0.866025,8.940090,1
1,11,2018,1,2,41374.0,13.291022,52.587257,0.136167,-0.990686,5.000000e-01,0.866025,14.868199,1
2,11,2018,1,2,41373.0,13.291022,52.587257,0.136167,-0.990686,5.000000e-01,0.866025,14.868199,1
3,9,2018,1,3,4410.0,13.420578,52.526020,0.631088,-0.775711,5.000000e-01,0.866025,5.386518,1
4,17,2018,1,2,1452.0,13.348288,52.481846,-0.997669,-0.068242,5.000000e-01,0.866025,-5.666644,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...
96661,16,2020,12,3,11485.0,13.391817,52.510570,-0.942261,-0.334880,-2.449294e-16,1.000000,-0.917142,1
96662,16,2020,12,3,11486.0,13.391817,52.510570,-0.942261,-0.334880,-2.449294e-16,1.000000,-0.917142,1
96663,16,2020,12,3,26570.0,13.391817,52.510570,-0.942261,-0.334880,-2.449294e-16,1.000000,-0.917142,1
96664,16,2020,12,3,11484.0,13.391817,52.510570,-0.942261,-0.334880,-2.449294e-16,1.000000,-0.917142,1


In [28]:
df_negative_samples = pd.read_csv('../data/output/df_negative_.csv')

In [24]:
df_collisions = pd.read_csv('../data/output/positive_sample.csv', index_col=[0])
df_collisions

Unnamed: 0,hour,year,month,weekday,segment_id,XGCSWGS84,YGCSWGS84,hour_sin,hour_cos,month_sin,month_cos,sun_elevation_angle
0,15,2018,1,4,6209.0,13.475018,52.513596,-0.816970,-0.576680,5.000000e-01,0.866025,8.940090
1,11,2018,1,2,41374.0,13.291022,52.587257,0.136167,-0.990686,5.000000e-01,0.866025,14.868199
2,11,2018,1,2,41373.0,13.291022,52.587257,0.136167,-0.990686,5.000000e-01,0.866025,14.868199
3,9,2018,1,3,4410.0,13.420578,52.526020,0.631088,-0.775711,5.000000e-01,0.866025,5.386518
4,17,2018,1,2,1452.0,13.348288,52.481846,-0.997669,-0.068242,5.000000e-01,0.866025,-5.666644
...,...,...,...,...,...,...,...,...,...,...,...,...
96661,16,2020,12,3,11485.0,13.391817,52.510570,-0.942261,-0.334880,-2.449294e-16,1.000000,-0.917142
96662,16,2020,12,3,11486.0,13.391817,52.510570,-0.942261,-0.334880,-2.449294e-16,1.000000,-0.917142
96663,16,2020,12,3,26570.0,13.391817,52.510570,-0.942261,-0.334880,-2.449294e-16,1.000000,-0.917142
96664,16,2020,12,3,11484.0,13.391817,52.510570,-0.942261,-0.334880,-2.449294e-16,1.000000,-0.917142


In [None]:
#df_collisions = df_collisions.withColumn("sun_elevation", sun_elevation_udf(df_collisions.year,df_collisions.month,df_collisions.weekday,df_collisions.hour,df_collisions.YGCSWGS84,df_collisions.XGCSWGS84))

In [None]:
df_collisions = df_collisions.withColumn('weekday_name',when(df_collisions.weekday == 1,'Sunday').when(df_collisions.weekday == 2,'Monday').when(df_collisions.weekday == 3,'Tuesday').when(df_collisions.weekday == 4,'Wednesday').when(df_collisions.weekday == 5,'Thursday').when(df_collisions.weekday == 6,'Friday').otherwise('Saturday'))

In [None]:
df_collisions = df_collisions.withColumn("collision", lit(1))

In [None]:
df_negative_samples.dtypes

In [None]:
df_collisions.dtypes

In [None]:
df_collisions.count()

In [None]:
df_collisions = df_collisions.toPandas()

In [27]:
df_collisions.to_csv('../data/output/positive_sample.csv')

In [None]:
df_negative_samples = df_negative_samples.toPandas()

In [21]:
df_negative_samples.to_csv('../data/output/negative_sample_large.csv')

In [34]:
df_full = df_collisions.append(df_negative_samples)

In [35]:
df_full

Unnamed: 0,XGCSWGS84,YGCSWGS84,collision,hour,hour_cos,hour_sin,month,month_cos,month_sin,segment_id,sun_elevation_angle,weekday,year
0,13.475018,52.513596,1,15,-0.576680,-8.169699e-01,1,0.866025,5.000000e-01,6209.0,8.940090,4,2018
1,13.291022,52.587257,1,11,-0.990686,1.361666e-01,1,0.866025,5.000000e-01,41374.0,14.868199,2,2018
2,13.291022,52.587257,1,11,-0.990686,1.361666e-01,1,0.866025,5.000000e-01,41373.0,14.868199,2,2018
3,13.420578,52.526020,1,9,-0.775711,6.310879e-01,1,0.866025,5.000000e-01,4410.0,5.386518,3,2018
4,13.348288,52.481846,1,17,-0.068242,-9.976688e-01,1,0.866025,5.000000e-01,1452.0,-5.666644,2,2018
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1635957,13.229249,52.573277,0,22,0.962917,-2.697968e-01,12,1.000000,-2.449294e-16,10603.0,-52.410378,2,2020
1635958,13.540070,52.534370,0,22,0.962917,-2.697968e-01,12,1.000000,-2.449294e-16,26910.0,-52.579419,2,2020
1635959,13.287013,52.417854,0,23,1.000000,-2.449294e-16,12,1.000000,-2.449294e-16,1235.0,-58.277757,2,2020
1635960,13.372655,52.540176,0,23,1.000000,-2.449294e-16,12,1.000000,-2.449294e-16,22223.0,-58.193322,2,2020


In [None]:
df_full = df_full.toPandas()

In [None]:
df_full.dtypes

In [None]:
df_full.show(10)

In [None]:
df_full = df_full.withColumn('hour_sin', sin(2 * math.pi * df_full.hour/23.0))
df_full = df_full.withColumn('hour_cos', cos(2 * math.pi * df_full.hour/23.0))
df_full = df_full.withColumn('month_sin', sin(2 * math.pi * df_full.month/12.0))
df_full = df_full.withColumn('month_cos', cos(2 * math.pi * df_full.month/12.0))

In [None]:
df_full.show(10)

In [None]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")


In [None]:
df_full = df_full.toPandas()

In [36]:
df_full.to_csv('../data/output/full_sample_big.csv')