***
## Overview
This dataset contains cloud-to-ground lightning strike information collected by Vaisala's National Lightning Detection Network and aggregated into 0.1-degree tiles by the National Centers for Environmental Information (NCEI) as part of their Severe Weather Data Inventory. The historical cloud-to-ground data is aggregated into these tiles, each roughly 11 km across, for redistribution. For every tile, the data gives the number of lightning strikes each day and the tile's center point. The sample queries below will help you get started using BigQuery's GIS capabilities to analyze the data.

Update frequency: Daily

Dataset source: NOAA
***
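
The tile size quoted in the overview can be checked with a little arithmetic. The sketch below assumes a spherical Earth with a mean radius of 6371 km; the function name `tile_size_km` is illustrative and not part of the dataset or of any library.

```python
import math

EARTH_RADIUS_KM = 6371.0  # assumption: mean radius of a spherical Earth


def tile_size_km(lat_deg, tile_deg=0.1):
    """Approximate north-south and east-west extent of a tile, in km."""
    km_per_degree = 2 * math.pi * EARTH_RADIUS_KM / 360.0
    north_south = tile_deg * km_per_degree
    # Meridians converge toward the poles, so east-west extent shrinks with latitude.
    east_west = north_south * math.cos(math.radians(lat_deg))
    return north_south, east_west


north_south, east_west = tile_size_km(30.0)
print(f"{north_south:.1f} km north-south, {east_west:.1f} km east-west")
```

A 0.1-degree tile is therefore about 11 km from north to south everywhere, and somewhat narrower from east to west away from the equator.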

`date`:	DATE	NULLABLE

`number_of_strikes`:	INTEGER	NULLABLE

`center_point_geom`:	GEOGRAPHY	NULLABLE	
*Center point of 0.10-degree tiles (roughly 11 km) that aggregate strikes within the given tile.*

**Point ( Lat, Long, SRID )**

`Lat`

Is a float expression representing the y-coordinate of the Point being generated.

`Long`

Is a float expression representing the x-coordinate of the Point being generated. For more information on valid latitude and longitude values, see <a href="url" target="_blank">Point</a>.

`SRID`

Is an int expression representing the Spatial Reference Identifier of the geography instance you wish to return.
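
The constructor described above takes latitude first, but the `center_point_geom` values in the CSV are well-known text (WKT) such as `POINT(-80.7 26)`, which is written x then y, so longitude comes first. The following is a minimal pure-Python sketch of reading such a value. The names `LonLat` and `parse_wkt_point` are hypothetical, and the pattern only handles plain decimal numbers, not exponents or Z/M coordinates.

```python
import re
from typing import NamedTuple


class LonLat(NamedTuple):
    lon: float  # x-coordinate
    lat: float  # y-coordinate


# Two plain decimal numbers inside POINT( ... ), separated by whitespace.
_WKT_POINT = re.compile(
    r"^\s*POINT\s*\(\s*(-?\d+(?:\.\d+)?)\s+(-?\d+(?:\.\d+)?)\s*\)\s*$"
)


def parse_wkt_point(text: str) -> LonLat:
    """Parse 'POINT(x y)' into (lon, lat); raise ValueError on anything else."""
    match = _WKT_POINT.match(text)
    if match is None:
        raise ValueError(f"not a 2D WKT point: {text!r}")
    lon, lat = (float(group) for group in match.groups())
    return LonLat(lon, lat)


print(parse_wkt_point("POINT(-80.7 26)"))
```

Returning a named tuple keeps the axis order explicit at the call site, which is where longitude and latitude are most easily swapped.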

In [1]:
import pyspark

In [2]:
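# Explicit imports instead of a wildcard; FloatType is used by the UDFs further down.
from pyspark.sql.types import (DateType, FloatType, IntegerType,
                               StringType, StructField, StructType)

# center_point_geom is read as plain text because the CSV stores it as WKT, e.g. "POINT(-80.7 26)".
schemaFields = [StructField("date", DateType(), True),
                StructField("number_of_strikes",  IntegerType(), True),
                StructField("center_point_geom", StringType(), True)]
Schema = StructType(schemaFields)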

In [3]:
from pyspark.sql import SparkSession

spark1 = SparkSession \
    .builder \
    .appName("LightingAna") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
raw_lighting_data= spark1\
.read\
.option("header", "true")\
.schema(Schema)\
.csv(r"C:\Users\USER\Desktop\GCP\mypro_tiw\lightingData\year1987.csv")

In [5]:
raw_lighting_data.printSchema()
raw_lighting_data.show()

root
 |-- date: date (nullable = true)
 |-- number_of_strikes: integer (nullable = true)
 |-- center_point_geom: string (nullable = true)

+----------+-----------------+-----------------+
|      date|number_of_strikes|center_point_geom|
+----------+-----------------+-----------------+
|1987-01-01|               21|  POINT(-80.7 26)|
|1987-01-04|               23|POINT(-83.2 28.3)|
|1987-01-05|               40|POINT(-78.9 31.3)|
|1987-01-05|               20|POINT(-79.2 31.4)|
|1987-01-05|               21|POINT(-80.1 31.4)|
|1987-01-05|               18|POINT(-79.4 31.5)|
|1987-01-05|               20|POINT(-78.8 31.3)|
|1987-01-05|               35|  POINT(-80 31.4)|
|1987-01-17|               20|POINT(-77.4 31.9)|
|1987-01-17|               17|  POINT(-77 31.9)|
|1987-01-17|               20|POINT(-77.2 31.8)|
|1987-01-17|               24|POINT(-77.4 31.8)|
|1987-01-25|               17|  POINT(-91.2 32)|
|1987-01-25|               20|POINT(-91.6 32.5)|
|1987-01-25|               1

In [6]:
# WKT writes a point as POINT(x y), so longitude comes first and latitude second.
def geo_to_LAT(point):
    raw = point.split()
    LAT = float(raw[1][:-1])   # second token without the trailing ")"
    return LAT
def geo_to_LONG(point):
    raw = point.split()
    LONG = float(raw[0][6:])   # first token without the leading "POINT("
    return LONG

In [7]:
from pyspark.sql.functions import udf

point_to_LAT = udf(geo_to_LAT, FloatType())
point_to_LONG = udf(geo_to_LONG, FloatType())
data_update_geo = raw_lighting_data.withColumn("LAT", point_to_LAT(raw_lighting_data.center_point_geom))\
                            .withColumn("LONG", point_to_LONG(raw_lighting_data.center_point_geom))
data_update_geo.show()

+----------+-----------------+-----------------+----+-----+
|      date|number_of_strikes|center_point_geom| LAT| LONG|
+----------+-----------------+-----------------+----+-----+
|1987-01-01|               21|  POINT(-80.7 26)|26.0|-80.7|
|1987-01-04|               23|POINT(-83.2 28.3)|28.3|-83.2|
|1987-01-05|               40|POINT(-78.9 31.3)|31.3|-78.9|
|1987-01-05|               20|POINT(-79.2 31.4)|31.4|-79.2|
|1987-01-05|               21|POINT(-80.1 31.4)|31.4|-80.1|
|1987-01-05|               18|POINT(-79.4 31.5)|31.5|-79.4|
|1987-01-05|               20|POINT(-78.8 31.3)|31.3|-78.8|
|1987-01-05|               35|  POINT(-80 31.4)|31.4|-80.0|
|1987-01-17|               20|POINT(-77.4 31.9)|31.9|-77.4|
|1987-01-17|               17|  POINT(-77 31.9)|31.9|-77.0|
|1987-01-17|               20|POINT(-77.2 31.8)|31.8|-77.2|
|1987-01-17|               24|POINT(-77.4 31.8)|31.8|-77.4|
|1987-01-25|               17|  POINT(-91.2 32)|32.0|-91.2|
|1987-01-25|               20|POINT(-91.

**Find** min, max of LONG

In [13]:
def rename_columns(df, columns):
    if isinstance(columns, dict):
        for old_name, new_name in columns.items():
            df = df.withColumnRenamed(old_name, new_name)
        return df
    else:
        raise ValueError("'columns' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")

In [14]:
# Aliased so the Spark aggregates do not shadow Python's built-in max and min.
from pyspark.sql.functions import max as spark_max, min as spark_min
renameDict = {'max(LONG)':'max_Longitude','min(LONG)':'min_Longitude'}
MinMaxLONG = data_update_geo.select(spark_max("LONG"),spark_min("LONG"))
MinMaxLONG = rename_columns(MinMaxLONG, renameDict)
MinMaxLONG.show()

+-------------+-------------+
|max_Longitude|min_Longitude|
+-------------+-------------+
|        -60.0|       -130.0|
+-------------+-------------+
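
Minimum and maximum values are a cheap way to confirm that the coordinate columns are not swapped: latitude must lie within [-90, 90] and longitude within [-180, 180], so a value such as -130.0 can only be a longitude. The sketch below shows one way to encode that check in plain Python; `classify_axis` is a hypothetical helper, not a PySpark function.

```python
LAT_RANGE = (-90.0, 90.0)
LON_RANGE = (-180.0, 180.0)


def classify_axis(min_value, max_value):
    """Say which axis a [min, max] interval of degree values could belong to."""
    fits_lat = LAT_RANGE[0] <= min_value and max_value <= LAT_RANGE[1]
    fits_lon = LON_RANGE[0] <= min_value and max_value <= LON_RANGE[1]
    if fits_lat:
        # Every valid latitude interval is also a valid longitude interval.
        return "latitude or longitude"
    if fits_lon:
        return "longitude only"
    return "invalid"


# The interval reported in the table above.
print(classify_axis(-130.0, -60.0))
```

The check cannot prove a column is a latitude, since small longitudes also fit that range, but it does flag any column whose values fall outside what a latitude can be.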



In [None]:
#min max2