In [None]:
# install pyspark (should have java installed first)
%pip install pyspark

In [3]:
import pyspark
import pandas as pd
from pyspark.sql import  SparkSession

from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import col

from pyspark.sql.functions import lit

In [5]:
spark = SparkSession.builder.appName('B5').getOrCreate()

In [7]:
# raed csv
# to have header as files's header name
df = spark.read.csv('dumpRawDataB5.csv', header=True, inferSchema=True)

In [9]:
df.printSchema()

root
 |-- _FileId: integer (nullable = true)
 |-- _SegmentNr: integer (nullable = true)
 |-- Snelheid_HA: double (nullable = true)
 |-- B5_Abs_Plaus_Time: double (nullable = true)
 |-- B5_Time_Based: double (nullable = true)



In [10]:
# replace nan value with 0 for column 'B5_Time_Based', 'B5_Abs_Plaus_Time'
df1 = df.fillna(0, subset=['B5_Time_Based', 'B5_Abs_Plaus_Time'])
df1.show()

+----------+----------+-----------+-----------------+-------------+
|   _FileId|_SegmentNr|Snelheid_HA|B5_Abs_Plaus_Time|B5_Time_Based|
+----------+----------+-----------+-----------------+-------------+
|1651763725|         0|    7301.34|              0.0|          0.0|
|1651763725|         1|    7306.74|              0.0|          0.0|
|1651763725|         2|     7311.6|              0.0|          0.0|
|1651763725|         3|    7311.87|              0.0|          0.0|
|1651763725|         4|    7313.49|              0.0|          0.0|
|1651763725|         5|     7306.2|              0.0|          0.0|
|1651763725|         6|     7306.2|              0.0|          0.0|
|1651763725|         7|    7307.28|              0.0|          0.0|
|1651763725|         8|     7317.0|              0.0|          0.0|
|1651763725|         9|     7317.0|              0.0|          0.0|
|1651763725|        10|    7317.27|              0.0|          0.0|
|1651763725|        11|     7319.7|             

In [11]:
#show the shcema and data type
df1.printSchema()

root
 |-- _FileId: integer (nullable = true)
 |-- _SegmentNr: integer (nullable = true)
 |-- Snelheid_HA: double (nullable = true)
 |-- B5_Abs_Plaus_Time: double (nullable = false)
 |-- B5_Time_Based: double (nullable = false)



In [12]:
# convert datatyoe of the columns
df1 = df1.withColumn('Snelheid_HA', df1['Snelheid_HA'].cast('float'))

df1 = df1 \
  .withColumn("B5_Abs_Plaus_Time" ,
              df1["B5_Abs_Plaus_Time"]
              .cast(FloatType()))   \
  .withColumn("B5_Time_Based",
              df1["B5_Time_Based"]
              .cast(FloatType()))



In [15]:
# replace column null with 0 if it is null
df1 = df.withColumn('lengthCoordinate', lit(0))
df2 = df1.na.fill({'B5_Abs_Plaus_Time':'0', 'B5_Time_Based':'0'})

df2 = df2 \
  .withColumn("lengthCoordinate" ,
              df2["lengthCoordinate"]
              .cast(FloatType()))
df2.show()

+----------+----------+-----------+-----------------+-------------+----------------+
|   _FileId|_SegmentNr|Snelheid_HA|B5_Abs_Plaus_Time|B5_Time_Based|lengthCoordinate|
+----------+----------+-----------+-----------------+-------------+----------------+
|1651763725|         0|    7301.34|             null|         null|               0|
|1651763725|         1|    7306.74|             null|         null|               0|
|1651763725|         2|     7311.6|             null|         null|               0|
|1651763725|         3|    7311.87|             null|         null|               0|
|1651763725|         4|    7313.49|             null|         null|               0|
|1651763725|         5|     7306.2|             null|         null|               0|
|1651763725|         6|     7306.2|             null|         null|               0|
|1651763725|         7|    7307.28|             null|         null|               0|
|1651763725|         8|     7317.0|             null|         nul

In [18]:
# calculate B5
df2 = df2.withColumn('B5', col('B5_Time_Based')*0.1)

In [21]:
df2.show()

+----------+----------+-----------+-----------------+-------------+----------------+---+
|   _FileId|_SegmentNr|Snelheid_HA|B5_Abs_Plaus_Time|B5_Time_Based|lengthCoordinate| B5|
+----------+----------+-----------+-----------------+-------------+----------------+---+
|1651763725|         0|    7301.34|              0.0|          0.0|             0.0|0.0|
|1651763725|         1|    7306.74|              0.0|          0.0|             0.0|0.0|
|1651763725|         2|     7311.6|              0.0|          0.0|             0.0|0.0|
|1651763725|         3|    7311.87|              0.0|          0.0|             0.0|0.0|
|1651763725|         4|    7313.49|              0.0|          0.0|             0.0|0.0|
|1651763725|         5|     7306.2|              0.0|          0.0|             0.0|0.0|
|1651763725|         6|     7306.2|              0.0|          0.0|             0.0|0.0|
|1651763725|         7|    7307.28|              0.0|          0.0|             0.0|0.0|
|1651763725|         

In [22]:
df2 = df2 \
  .withColumn("B5_Abs_Plaus_Time" ,
              df2["B5_Abs_Plaus_Time"]
              .cast(FloatType()))   \
  .withColumn("B5_Time_Based",
              df2["B5_Time_Based"]
              .cast(FloatType()))\
    .withColumn("B5",
                df2['B5']
                .cast(FloatType()))

In [23]:
df2.dtypes

[('_FileId', 'int'),
 ('_SegmentNr', 'int'),
 ('Snelheid_HA', 'double'),
 ('B5_Abs_Plaus_Time', 'float'),
 ('B5_Time_Based', 'float'),
 ('lengthCoordinate', 'float'),
 ('B5', 'float')]

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.functions import col

# use UDF function to calculate lengthCoordinate
def lengthCoordinate(x, y):
    df2_length = df2.count()
    for i in range (df2_length):
        x[i] = x[i-1] + y[i-1] *0.001* 0.04
    return x

lengthCoordinate1 = udf(lengthCoordinate, FloatType())   
df2 = df2.withColumn('lengthCoordinate', lengthCoordinate1(df2.select(col('lengthCoordinate')), df2.select(col('Snelheid_HA'))))
df2.show()

In [28]:
# test the UDF function
from pyspark.sql.functions import udf
from pyspark.sql.functions import col

def lengthCoordinate(x):
    
    return x * 2

lengthCoordinate1 = udf(lengthCoordinate, FloatType())   
df2 = df2.withColumn('lengthCoordinate', lengthCoordinate1('Snelheid_HA'))
df2.show()

+----------+----------+-----------+-----------------+-------------+----------------+---+
|   _FileId|_SegmentNr|Snelheid_HA|B5_Abs_Plaus_Time|B5_Time_Based|lengthCoordinate| B5|
+----------+----------+-----------+-----------------+-------------+----------------+---+
|1651763725|         0|    7301.34|              0.0|          0.0|        14602.68|0.0|
|1651763725|         1|    7306.74|              0.0|          0.0|        14613.48|0.0|
|1651763725|         2|     7311.6|              0.0|          0.0|         14623.2|0.0|
|1651763725|         3|    7311.87|              0.0|          0.0|        14623.74|0.0|
|1651763725|         4|    7313.49|              0.0|          0.0|        14626.98|0.0|
|1651763725|         5|     7306.2|              0.0|          0.0|         14612.4|0.0|
|1651763725|         6|     7306.2|              0.0|          0.0|         14612.4|0.0|
|1651763725|         7|    7307.28|              0.0|          0.0|        14614.56|0.0|
|1651763725|         

In [57]:
# try out UDF function
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
def lengthCoordinate(x, y):
    df2_length = df2.count()
    for i in range (df2_length):
        x[i] = x[i-1] + y[i-1] *0.001* 0.04
    return x

length_coor = df2.select('*', lengthCoordinate(col('lengthCoordinate'), col('Snelheid_HA')))

length_coor.show()

# lenth_coor = udf(lambda x, y:lengthCoordinate(x, y), FloatType())
# df3 = df2.withColumn('lengthCoordinate', lenth_coor(col('lengthCoordinate'), col('Snelheid_HA')))
# df3.show()

TypeError: 'Column' object does not support item assignment