# Preliminaries

In [1]:
# Install a headless Java 8 JDK for Spark (installer output is discarded).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark 3.2.0 / Hadoop 3.2 binary distribution and unpack it
# into the current working directory.
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
# findspark is used in a later cell to make the unpacked Spark importable.
!pip install -q findspark

In [2]:
import os

# JVM that Spark should use (the JDK installed by the setup cell).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# The Spark tarball was unpacked relative to the current directory. Store
# SPARK_HOME as an absolute path so it stays valid even if the working
# directory changes before Spark is started.
os.environ["SPARK_HOME"] = os.path.abspath("spark-3.2.0-bin-hadoop3.2")

In [3]:
import findspark
# Called with no arguments, so the Spark location is not passed explicitly here;
# presumably it is picked up from the SPARK_HOME variable set above.
findspark.init()

In [4]:
from pyspark.sql import SparkSession
# Start (or reuse) a Spark session with master "local[*]".
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Keep a handle on the SparkContext; it is used later to parallelize Python lists.
sc = spark.sparkContext
# Bare expression: displays the context summary as the cell output.
sc

In [5]:
import pyspark.sql
from pyspark.sql import Row
from pyspark.sql.types import *
import json

In [6]:
import numpy as np
# Display setting only: print arrays with up to 10 decimals and without
# scientific notation. Does not affect computed values.
np.set_printoptions(precision = 10, suppress = True)

In [7]:
from google.colab import drive
# Mount Google Drive at /content/drive; the input and output paths below live under it.
drive.mount('/content/drive')

Mounted at /content/drive


# Targets

In [8]:
# Folder on the mounted drive that holds the input targets file
# (trailing slash required: the path is built by plain concatenation).
myFolder = "/content/drive/My Drive/H518/Project_PDA/"

In [9]:
# Name of the input CSV of targets.
myFile = 'targets_w_keys_20231109.csv'

In [10]:
# Full path of the input CSV: folder immediately followed by file name.
myPath = f"{myFolder}{myFile}"
print(myPath)

/content/drive/My Drive/H518/Project_PDA/targets_w_keys_20231109.csv


In [11]:
# Folder on the mounted drive where the processed output is written
# (trailing slash required: the path is built by plain concatenation).
outFolder = "/content/drive/My Drive/ERA5/processed/"

In [12]:
# Name of the output CSV produced at the end of the notebook.
outFile = 'processed_targets_20231115.csv'

In [13]:
# Full path of the output CSV: folder immediately followed by file name.
outPath = f"{outFolder}{outFile}"
print(outPath)

/content/drive/My Drive/ERA5/processed/processed_targets_20231115.csv


In [14]:
# Explicit column layout for the targets CSV; every field is declared
# non-nullable. In this notebook it is referenced only by the commented-out
# schema-enforcing reader.
_schema_columns = (
    ("EVENT_ID", StringType()),
    ("STATE_ABBR", StringType()),
    ("LAT", FloatType()),
    ("LON", FloatType()),
    ("YEAR", IntegerType()),
    ("MONTH", IntegerType()),
    ("DAY", IntegerType()),
    ("HOUR", IntegerType()),
)
custom_schema = StructType(
    [StructField(col_name, col_type, False) for col_name, col_type in _schema_columns]
)

In [15]:
from time import strftime, gmtime, mktime, strptime
from pytz import timezone
from datetime import datetime #, date, time, timezone
from dateutil import tz
import pytz

In [16]:
#rawTargetsDF = spark.read.schema(custom_schema).option("header", True).option("delimiter", ",").csv(myPath)

In [17]:
# Read the targets CSV using its header row for column names. No schema is
# supplied here, so the values are still strings downstream (see the quoted
# LAT/LON in later outputs) and are cast explicitly where needed.
rawTargetsDF = spark.read.option("header", True).csv(myPath)
type(rawTargetsDF)

pyspark.sql.dataframe.DataFrame

In [18]:
def map_times(y,m,d,h,tz_name='US/Eastern'):
  """Convert a local wall-clock date and hour to the same instant in UTC.

  Args:
    y, m, d, h: year, month, day and hour as integers, interpreted as
      wall-clock time in the zone named by `tz_name` (minutes and seconds
      are taken as zero).
    tz_name: pytz zone name of the source time. Defaults to 'US/Eastern',
      the zone this function previously hard-coded, so existing calls
      behave exactly as before.

  Returns:
    Tuple (year, month, day, hour) of the converted time in UTC.

  NOTE(review): the sample rows carry STATE_ABBR 'IN'. If any source rows
  were recorded in a different zone (or in standard time year-round), pass
  the matching `tz_name` for them -- confirm against the data's provenance.
  """
  naive_local = datetime(y,m,d,h,0,0)
  source_tz = timezone(tz_name)
  # pytz requires localize() (rather than datetime(..., tzinfo=...)) to attach
  # the zone with the offset that applies on this particular date.
  aware_local = source_tz.localize(naive_local)
  mapped = aware_local.astimezone(timezone('UTC'))
  return  mapped.year,  mapped.month, mapped.day, mapped.hour

In [19]:
def _shift_record_to_utc(rec):
  """Return the first four fields unchanged plus the UTC year, month, day, hour.

  Fields 4-7 of the input record are cast to int and passed to map_times.
  """
  utc_year, utc_month, utc_day, utc_hour = map_times(
      int(rec[4]), int(rec[5]), int(rec[6]), int(rec[7]))
  return [rec[0], rec[1], rec[2], rec[3], utc_year, utc_month, utc_day, utc_hour]

rawTargetsR = rawTargetsDF.rdd.map(_shift_record_to_utc)
rawTargetsR.take(5)

[['805930', 'IN', '37.98', '-87.6602', 2019, 3, 14, 15],
 ['805929', 'IN', '37.9902', '-87.563', 2019, 3, 14, 15],
 ['811260', 'IN', '40.42', '-86.27', 2019, 3, 14, 15],
 ['805926', 'IN', '38.2506', '-87.0904', 2019, 3, 14, 16],
 ['805925', 'IN', '37.8666', '-87.057', 2019, 3, 14, 17]]

In [20]:
def _as_target_row(p):
  """Wrap one positional eight-field record in a Row with named columns."""
  return Row(
      EVENT_ID=p[0],
      STATE_ABBR=p[1],
      LAT=p[2],
      LON=p[3],
      YEAR=p[4],
      MONTH=p[5],
      DAY=p[6],
      HOUR=p[7],
  )

relabeledTargetsR = rawTargetsR.map(_as_target_row)
relabeledTargetsR.take(5)

[Row(EVENT_ID='805930', STATE_ABBR='IN', LAT='37.98', LON='-87.6602', YEAR=2019, MONTH=3, DAY=14, HOUR=15),
 Row(EVENT_ID='805929', STATE_ABBR='IN', LAT='37.9902', LON='-87.563', YEAR=2019, MONTH=3, DAY=14, HOUR=15),
 Row(EVENT_ID='811260', STATE_ABBR='IN', LAT='40.42', LON='-86.27', YEAR=2019, MONTH=3, DAY=14, HOUR=15),
 Row(EVENT_ID='805926', STATE_ABBR='IN', LAT='38.2506', LON='-87.0904', YEAR=2019, MONTH=3, DAY=14, HOUR=16),
 Row(EVENT_ID='805925', STATE_ABBR='IN', LAT='37.8666', LON='-87.057', YEAR=2019, MONTH=3, DAY=14, HOUR=17)]

In [21]:
# Turn the RDD of Rows into a DataFrame (column names come from the Row fields).
relabeledTargetsDF = spark.createDataFrame(relabeledTargetsR)
# Register it as the SQL view "targets", which the queries below read from.
relabeledTargetsDF.createOrReplaceTempView("targets")
relabeledTargetsDF.show(5)

+--------+----------+-------+--------+----+-----+---+----+
|EVENT_ID|STATE_ABBR|    LAT|     LON|YEAR|MONTH|DAY|HOUR|
+--------+----------+-------+--------+----+-----+---+----+
|  805930|        IN|  37.98|-87.6602|2019|    3| 14|  15|
|  805929|        IN|37.9902| -87.563|2019|    3| 14|  15|
|  811260|        IN|  40.42|  -86.27|2019|    3| 14|  15|
|  805926|        IN|38.2506|-87.0904|2019|    3| 14|  16|
|  805925|        IN|37.8666| -87.057|2019|    3| 14|  17|
+--------+----------+-------+--------+----+-----+---+----+
only showing top 5 rows



In [22]:
# Distinct (YEAR, MONTH, DAY) combinations present in the "targets" view,
# in chronological order. These are the UTC-shifted dates produced earlier.
hailDaysDF = spark.sql("SELECT Distinct YEAR, MONTH, DAY FROM targets ORDER BY YEAR, MONTH, DAY")
# Expose the result as the SQL view "haildays".
hailDaysDF.createOrReplaceTempView("haildays")
hailDaysDF.show(10)

+----+-----+---+
|YEAR|MONTH|DAY|
+----+-----+---+
|2013|    1| 29|
|2013|    4| 10|
|2013|    4| 16|
|2013|    4| 17|
|2013|    4| 18|
|2013|    5|  3|
|2013|    5| 10|
|2013|    5| 21|
|2013|    5| 30|
|2013|    6| 11|
+----+-----+---+
only showing top 10 rows



Match each target to its nearest ERA5 grid cell

In [23]:
# One row per distinct (EVENT_ID, LAT, LON); LAT and LON are still strings here.
tPoints = spark.sql("SELECT Distinct EVENT_ID, LAT, LON FROM targets")
tPoints.show(10)

+--------+-----+------+
|EVENT_ID|  LAT|   LON|
+--------+-----+------+
|  830232|40.03|-84.94|
|  838532|41.63|   -85|
|  838553|41.14|-85.02|
| 1021415|41.62|-87.25|
|  682585|41.67|-84.98|
|  764051|38.37|-86.11|
|  588786|41.19|-86.88|
|  541233| 41.3|-86.12|
|  541917|41.04|-86.03|
|  838547|41.43|-84.87|
+--------+-----+------+
only showing top 10 rows



In [24]:
# Bring every point to the driver as a Python list of Row objects so the
# nearest-cell search below can run in plain Python.
pointList = tPoints.collect()
# Note: this prints the complete list, not a sample.
print(pointList)

[Row(EVENT_ID='830232', LAT='40.03', LON='-84.94'), Row(EVENT_ID='838532', LAT='41.63', LON='-85'), Row(EVENT_ID='838553', LAT='41.14', LON='-85.02'), Row(EVENT_ID='1021415', LAT='41.62', LON='-87.25'), Row(EVENT_ID='682585', LAT='41.67', LON='-84.98'), Row(EVENT_ID='764051', LAT='38.37', LON='-86.11'), Row(EVENT_ID='588786', LAT='41.19', LON='-86.88'), Row(EVENT_ID='541233', LAT='41.3', LON='-86.12'), Row(EVENT_ID='541917', LAT='41.04', LON='-86.03'), Row(EVENT_ID='838547', LAT='41.43', LON='-84.87'), Row(EVENT_ID='1033436', LAT='38.23', LON='-86.53'), Row(EVENT_ID='638824', LAT='40.15', LON='-86.36'), Row(EVENT_ID='605522', LAT='38.29', LON='-85.83'), Row(EVENT_ID='455967', LAT='40.59', LON='-84.96'), Row(EVENT_ID='904155', LAT='41.51', LON='-87.04'), Row(EVENT_ID='940476', LAT='40.13', LON='-85.69'), Row(EVENT_ID='682581', LAT='41.63', LON='-86.22'), Row(EVENT_ID='690562', LAT='40.15', LON='-85.75'), Row(EVENT_ID='690588', LAT='40.14', LON='-85.69'), Row(EVENT_ID='524375', LAT='39.2

In [25]:
# Sanity check: collect() returned a plain Python list.
type(pointList)

list

In [26]:
# Grid longitudes: 21 values from -89.0 up to -84.0 in steps of 0.25
# (multiples of 0.25 are exact in binary floating point).
lonVals = [-89.0 + 0.25 * step for step in range(21)]

In [27]:
# Grid latitudes: 21 values from 42.0 down to 37.0 in steps of 0.25
# (multiples of 0.25 are exact in binary floating point).
latVals = [42.0 - 0.25 * step for step in range(21)]

In [28]:
# Module-level grid table filled by getCells(); each entry is [cell_id, lat, lon].
cells = []

def getCells(lon,lat):
  """Build the table of grid cells for every (lon, lat) combination.

  Cell ids are assigned consecutively from 0, iterating longitudes in the
  outer loop and latitudes in the inner loop, so all latitudes of the first
  longitude come first.

  The module-level `cells` list is rebuilt in place on every call. Calling
  the function again therefore replaces the table instead of appending a
  second copy with duplicate ids.

  Args:
    lon: sequence of longitude values.
    lat: sequence of latitude values.

  Returns:
    The module-level `cells` list, a list of [cell_id, lat, lon] entries.
  """
  pairs = [(lat_val, lon_val) for lon_val in lon for lat_val in lat]
  # Slice assignment keeps the same list object, so existing references to
  # `cells` see the rebuilt table.
  cells[:] = [[n, lat_val, lon_val] for n, (lat_val, lon_val) in enumerate(pairs)]
  print("total cell count:",len(cells))
  print(cells)
  return cells
# Populate the module-level `cells` table from the grid axes defined above.
getCells(lonVals,latVals)

total cell count: 441
[[0, 42.0, -89.0], [1, 41.75, -89.0], [2, 41.5, -89.0], [3, 41.25, -89.0], [4, 41.0, -89.0], [5, 40.75, -89.0], [6, 40.5, -89.0], [7, 40.25, -89.0], [8, 40.0, -89.0], [9, 39.75, -89.0], [10, 39.5, -89.0], [11, 39.25, -89.0], [12, 39.0, -89.0], [13, 38.75, -89.0], [14, 38.5, -89.0], [15, 38.25, -89.0], [16, 38.0, -89.0], [17, 37.75, -89.0], [18, 37.5, -89.0], [19, 37.25, -89.0], [20, 37.0, -89.0], [21, 42.0, -88.75], [22, 41.75, -88.75], [23, 41.5, -88.75], [24, 41.25, -88.75], [25, 41.0, -88.75], [26, 40.75, -88.75], [27, 40.5, -88.75], [28, 40.25, -88.75], [29, 40.0, -88.75], [30, 39.75, -88.75], [31, 39.5, -88.75], [32, 39.25, -88.75], [33, 39.0, -88.75], [34, 38.75, -88.75], [35, 38.5, -88.75], [36, 38.25, -88.75], [37, 38.0, -88.75], [38, 37.75, -88.75], [39, 37.5, -88.75], [40, 37.25, -88.75], [41, 37.0, -88.75], [42, 42.0, -88.5], [43, 41.75, -88.5], [44, 41.5, -88.5], [45, 41.25, -88.5], [46, 41.0, -88.5], [47, 40.75, -88.5], [48, 40.5, -88.5], [49, 40.25, 

In [29]:
# Module-level result list filled by getNearest(); each entry is [event_id, cell_id].
matched = []

def getNearest(points,cells):
  """Assign each point the id of the closest grid cell.

  Closeness is the Euclidean norm of the (lat, lon) difference, computed on
  the raw coordinate values. On an exact tie the cell that appears first in
  `cells` wins. If `cells` is empty, or no distance is below the 999999.1
  sentinel, the cell id is the empty string.

  Args:
    points: iterable of records whose items 0, 1, 2 are the id, latitude and
      longitude; latitude and longitude may be strings and are cast to float.
    cells: iterable of [cell_id, lat, lon] entries.

  Returns:
    The module-level `matched` list, to which one [point_id, cell_id] entry
    has been appended per input point.
  """
  # Build each cell's coordinate array once instead of once per point.
  cell_coords = [(c[0], np.array((c[1], c[2]))) for c in cells]
  for p in points:
    # The point's array does not change across cells, so build it once per point.
    here = np.array((float(p[1]), float(p[2])))
    nearest = ('',999999.1)
    for cell_id, coord in cell_coords:
      distance = np.linalg.norm(here-coord)
      # Strict '<' keeps the earliest cell when distances are equal.
      if distance < nearest[1]:
        nearest = (cell_id,distance)
    matched.append([p[0],nearest[0]])
  return matched

# Fill the module-level `matched` list with one [event_id, cell_id] pair per point.
getNearest(pointList,cells)
# Note: this prints the complete list, not a sample.
print(matched)

[['830232', 344], ['838532', 337], ['838553', 339], ['1021415', 149], ['682585', 337], ['764051', 267], ['588786', 171], ['541233', 255], ['541917', 256], ['838547', 359], ['1033436', 225], ['638824', 238], ['605522', 288], ['455967', 342], ['904155', 170], ['940476', 280], ['682581', 232], ['690562', 280], ['690588', 280], ['524375', 221], ['537838', 296], ['533729', 254], ['429682', 149], ['682574', 338], ['687798', 246], ['624756', 261], ['578466', 121], ['600767', 298], ['819535', 149], ['1016689', 340], ['1021106', 217], ['718397', 240], ['638822', 240], ['638834', 261], ['654122', 136], ['586891', 345], ['523816', 218], ['527314', 239], ['454156', 149], ['457276', 128], ['457277', 128], ['827501', 153], ['755093', 318], ['588792', 233], ['444108', 326], ['957364', 220], ['702832', 221], ['632980', 240], ['658577', 260], ['575937', 287], ['520652', 212], ['541230', 318], ['433719', 260], ['958909', 285], ['690468', 176], ['690496', 176], ['625495', 320], ['659028', 236], ['664153'

Join the matched cells back into the target list

In [30]:
# Distribute the driver-side [event_id, cell_id] pairs as an RDD.
matchedR = sc.parallelize(matched)
matchedR.take(5)

[['830232', 344],
 ['838532', 337],
 ['838553', 339],
 ['1021415', 149],
 ['682585', 337]]

In [31]:
# Name the two positional fields so the DataFrame built next gets
# `event_id` and `cell` columns.
labeledR = matchedR.map(lambda p: Row(event_id=p[0],cell=p[1]))
labeledR.take(5)

[Row(event_id='830232', cell=344),
 Row(event_id='838532', cell=337),
 Row(event_id='838553', cell=339),
 Row(event_id='1021415', cell=149),
 Row(event_id='682585', cell=337)]

In [32]:
# Convert the labeled pairs to a DataFrame.
labeledDF = spark.createDataFrame(labeledR)
# Register it as the SQL view "cells". This is the SQL view name only and is
# separate from the Python list `cells` built earlier.
labeledDF.createOrReplaceTempView("cells")
labeledDF.show(5)

+--------+----+
|event_id|cell|
+--------+----+
|  830232| 344|
|  838532| 337|
|  838553| 339|
| 1021415| 149|
|  682585| 337|
+--------+----+
only showing top 5 rows



In [33]:
# Attach each target's matched cell id by event id and keep the distinct
# (cell, YEAR, MONTH, DAY) combinations. LEFT JOIN keeps targets with no
# match in "cells"; those would appear with a NULL cell.
targetCells = spark.sql("SELECT Distinct b.cell, a.YEAR, a.MONTH, a.DAY FROM targets a LEFT JOIN cells b ON a.EVENT_ID = b.event_id ")
# Expose the result as the SQL view "targetCells".
targetCells.createOrReplaceTempView("targetCells")
targetCells.show(10)

+----+----+-----+---+
|cell|YEAR|MONTH|DAY|
+----+----+-----+---+
| 276|2014|    7| 26|
| 337|2019|    3| 14|
| 170|2020|    4|  8|
| 246|2015|   12| 23|
| 195|2013|    4| 10|
| 141|2017|    3|  1|
| 240|2017|    7|  7|
| 138|2016|   10| 19|
| 159|2022|    5| 19|
| 267|2019|    3| 14|
+----+----+-----+---+
only showing top 10 rows



In [34]:
# Collect the Spark result to the driver as a pandas DataFrame for export.
targetCellsPD = targetCells.toPandas()

In [35]:
# Write the result to the output path on the mounted drive. With default
# options, to_csv also writes the pandas index as the first, unnamed column.
targetCellsPD.to_csv(outPath)