# Lightning Flash Dataset: Spark Queries

In [None]:
# 1. pyspark library : 3.4.1
import warnings

import pyspark.sql.functions as F 
import pyspark.sql.types as T 

from pyspark.sql import SparkSession

# Suppress every Python warning raised from this point on (no category filter is given).
warnings.filterwarnings('ignore')

## Load Data - Geospatial

In [None]:
# 2. Batch load - Geospatial coordinates
# Reuse the active Spark session (or create one) and limit Spark logging to WARN.
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Latitude data frame: all files matching the glob, first row treated as a
# header, column types inferred from the data.
lat_sdf = (
    spark.read
    .format('csv')
    .load("*.lat.csv.test", header=True, inferSchema=True)
)
# Longitude data frame, loaded with the same options.
lon_sdf = (
    spark.read
    .format('csv')
    .load("*.lon.csv.test", header=True, inferSchema=True)
)
# Geospatial (lat, lon): inner-join both frames on their shared "ts_date"
# column, expose that column as "timestamp", and sort by it ascending.
# The DataFrame is bound to geo_sdf *before* displaying it: DataFrame.show()
# returns None, so ending the assigned chain with .show() would leave
# geo_sdf set to None instead of the joined data.
geo_sdf = (
    lat_sdf.join(lon_sdf, on="ts_date", how="inner")
    .withColumnRenamed("ts_date", "timestamp")
    .orderBy("timestamp", ascending=True)
)
geo_sdf.show()

## Load Data - Energy Stream

In [None]:
# 3. Structured Streaming - Flash energy discharges
# Adaptive query execution is switched off on the session before the stream starts.
spark.conf.set("spark.sql.adaptive.enabled", "false")

# Explicit schema for the energy CSV files: a timestamp column and a double column.
energySchema = T.StructType([
    T.StructField("ts_date", T.TimestampType()),
    T.StructField("energy", T.DoubleType()),
])

# Read the matching CSV files as a stream (at most one file per trigger),
# rename "ts_date" to "timestamp", and append each micro-batch to the console
# every 5 seconds.
# Note: despite its name, energy_sdf holds the object returned by .start()
# (the running streaming query), not a DataFrame.
# NOTE(review): inferSchema=True is presumably redundant given the explicit
# schema above -- confirm before removing.
energy_sdf = (
    spark.readStream
    .schema(energySchema)
    .option("maxFilesPerTrigger", 1)
    .format("csv")
    .load("*.ene.csv.test", header=True, inferSchema=True)
    .withColumnRenamed("ts_date", "timestamp")
    .writeStream
    .format('console')
    .trigger(processingTime="5 seconds")
    .outputMode('append')
    .start()
)

In [None]:
# Check the status of the stream: energy_sdf is the handle returned by
# .start(), and .status reports the query's current state.
energy_sdf.status

In [None]:
# Turn off the stream by stopping the running streaming query.
energy_sdf.stop()

## References

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/index.html

https://www.projectpro.io/article/pyspark-learning-spark-with-python/554
