In [None]:
%%pyspark
# making a table out of our data stored bij the streaming analytics Job in the blobcontainer
blob_account_name = "storageb2group3"
blob_container_name = "streamingoutputcontainer"
from pyspark.sql import SparkSession

sc = SparkSession.builder.getOrCreate()
token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
blob_sas_token = token_library.getConnectionString("AzureBlobStorage1")

spark.conf.set(
    'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
    blob_sas_token)
df = spark.read.load('wasbs://streamingoutputcontainer@storageb2group3.blob.core.windows.net/sailingdataoutput/*', format='csv'
## If header exists uncomment line below
, header=True
)
display(df.limit(5))

In [None]:
from pyspark.sql import functions as F, Window as W
from pyspark.sql.functions import lit

# Keep only the most recent record of every boat: exactly one row per boat.
# The window orders each boat's rows newest-first, so F.first() over it yields
# that boat's latest EventEnqueuedUtcTime; rows carrying another timestamp are
# filtered out. The helper column 'latest' is intentionally left in the frame,
# as before.
# NOTE(review): if EventEnqueuedUtcTime is a string column, the ordering is
# lexicographic, which is only chronological for ISO-8601 style values -- confirm.
w = W.partitionBy('boat').orderBy(F.desc('EventEnqueuedUtcTime'))
df = (df
    .withColumn('latest', F.first('EventEnqueuedUtcTime').over(w))
    .filter('EventEnqueuedUtcTime = latest')
    # Several rows can share the latest timestamp (e.g. duplicated events);
    # collapse such ties so each boat really contributes a single row.
    .dropDuplicates(['boat'])
)

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from pyspark.sql.functions import sqrt, sin, cos, udf
import numpy as np

# Great-circle distance of a boat position from a fixed reference point.
def haversine(lat2, lon2, ref_lat=38.69, ref_lon=-9.41, radius=6373.0):
    """Return the haversine distance between a position and a reference point.

    Args:
        lat2: Latitude of the position in degrees; a number or a numeric
            string (anything ``float()`` accepts). ``None`` is allowed.
        lon2: Longitude of the position in degrees; same rules as ``lat2``.
        ref_lat: Latitude of the reference point in degrees (default 38.69).
        ref_lon: Longitude of the reference point in degrees (default -9.41).
        radius: Sphere radius; the result is expressed in the same unit
            (default 6373.0, i.e. an Earth radius in kilometres).

    Returns:
        The distance as a plain Python ``float``, or ``None`` when either
        coordinate is ``None`` so that null rows produce a null distance
        instead of failing the whole UDF evaluation.

    Raises:
        ValueError: if a coordinate is a string that is not a valid number.
    """
    if lat2 is None or lon2 is None:
        return None

    lon1 = np.radians(float(ref_lon))
    lat1 = np.radians(float(ref_lat))
    lon2 = np.radians(float(lon2))
    lat2 = np.radians(float(lat2))
    newlon = lon2 - lon1
    newlat = lat2 - lat1

    haver_formula = float(
        np.sin(newlat / 2.0) ** 2
        + np.cos(lat1) * np.cos(lat2) * np.sin(newlon / 2.0) ** 2
    )
    # Rounding can push the term marginally outside [0, 1] (near-antipodal
    # points), which would make arcsin return NaN; clamp it to the valid domain.
    haver_formula = min(1.0, max(0.0, haver_formula))
    dist = 2 * np.arcsin(np.sqrt(haver_formula))

    # Convert to a built-in float: a numpy scalar is not accepted as a
    # FloatType UDF result.
    return float(radius) * float(dist)

# Expose haversine() to Spark as a column function whose result type is FloatType.
dist_udf = udf(haversine, returnType=FloatType())

In [None]:
# Append a "distance" column: the haversine UDF evaluated on every row's
# latitude / longitude values.
df_new = df.withColumn(
    colName="distance",
    col=dist_udf(df['latitude'], df['longitude']),
)

# TODO: optionally build a reduced table (boat, distance, ...) from the
# leading columns, e.g. df.select(df.columns[0:4]).


In [None]:
from pyspark.sql import SparkSession

# Use same name for file every time when overwriting
def write_csv_with_specific_file_name(sc, df, path, filename):
    """Write ``df`` as one CSV file with a fixed name inside ``path``.

    The DataFrame is repartitioned to a single partition and saved to
    ``path`` as CSV with a header, using write mode "overwrite". The
    ``part-*`` file Spark produced is then renamed to ``path + filename``
    through the Hadoop FileSystem API.

    Args:
        sc: SparkContext; only its Py4J gateway (``sc._gateway.jvm``) is used.
        df: DataFrame to write.
        path: Output directory URI. The file system is resolved from this
            value, so any location reachable by the Hadoop configuration works.
        filename: Appended verbatim to ``path``; it therefore has to start
            with "/" (e.g. "/sailingdata.csv").

    Raises:
        Exception: if the part file cannot be located or renamed. The message
            contains the underlying error, which is also chained as the cause.
    """
    df.repartition(1).write.option("header", "true").format("csv").mode("overwrite").save(path)
    try:
        jvm = sc._gateway.jvm
        hadoop_path = jvm.org.apache.hadoop.fs.Path
        output_dir = hadoop_path(path)
        # Resolve the file system from `path` itself rather than a fixed URI.
        fs = output_dir.getFileSystem(jvm.org.apache.hadoop.conf.Configuration())

        src_path = None
        for file_status in fs.listStatus(output_dir):
            candidate = file_status.getPath()
            # Match on the file name only: testing the full path would also
            # select e.g. _SUCCESS when the directory name contains "part".
            if file_status.isFile() and candidate.getName().startswith("part-"):
                src_path = candidate
        if src_path is None:
            raise FileNotFoundError("no part file found in {}".format(path))

        dest_path = hadoop_path(path + filename)
        # rename() signals most failures by returning false, not by throwing.
        if not fs.rename(src_path, dest_path):
            raise IOError("rename of {} returned false".format(src_path.toString()))
    except Exception as e:
        raise Exception("Error renaming the part file to {}: {}".format(filename, e)) from e



# Driver: persist the distance table as a single, stably named CSV in the blob container.
if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()
    write_csv_with_specific_file_name(
        sc=spark.sparkContext,
        df=df_new,
        path="wasbs://streamingoutputcontainer@storageb2group3.blob.core.windows.net/latest_output_table",
        filename="/sailingdata.csv",
    )