In [0]:
from pyspark.ml.feature import StandardScaler, MinMaxScaler, VectorAssembler
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.functions import lead, lag
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, row_number, desc
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType 
from pyspark.sql.functions import regexp_replace
# Path of the weather-events CSV to load
file_location = "/FileStore/tables/WeatherEvents_Jan2016_Dec2021.csv"

# Read the CSV: the first row holds the column names and Spark infers each column's type
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_location)
)

# First 50,000 rows, cached because the cells below reuse this subset repeatedly.
# cache() returns the same DataFrame, so it can be chained directly onto limit().
df_sample = df.limit(50000).cache()

# Ventura County rows are known to have incomplete data (null City); cached for the
# missing-value cells below.
df_null = df.where(df["County"] == "Ventura County").cache()

df.printSchema()

root
 |-- EventId: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- StartTime(UTC): timestamp (nullable = true)
 |-- EndTime(UTC): timestamp (nullable = true)
 |-- Precipitation(in): double (nullable = true)
 |-- TimeZone: string (nullable = true)
 |-- AirportCode: string (nullable = true)
 |-- LocationLat: double (nullable = true)
 |-- LocationLng: double (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCode: integer (nullable = true)



In [0]:
# Replace null values: only the City column is filled (mapping given as a dict);
# nulls in any other column of df_null are left untouched.
na_filled = df_null.na.fill({"City": "San Nicolas Island"})
na_filled.show()

+--------+-------------+--------+-------------------+-------------------+-----------------+----------+-----------+-----------+-----------+------------------+--------------+-----+-------+
| EventId|         Type|Severity|     StartTime(UTC)|       EndTime(UTC)|Precipitation(in)|  TimeZone|AirportCode|LocationLat|LocationLng|              City|        County|State|ZipCode|
+--------+-------------+--------+-------------------+-------------------+-----------------+----------+-----------+-----------+-----------+------------------+--------------+-----+-------+
|W-196704|         Cold|  Severe|2016-01-01 08:57:00|2016-01-04 15:32:00|              0.0|US/Pacific|       KNSI|    33.2338|  -119.4559|San Nicolas Island|Ventura County|   CA|   null|
|W-196705|         Rain|   Light|2016-01-04 22:49:00|2016-01-04 23:30:00|              0.0|US/Pacific|       KNSI|    33.2338|  -119.4559|San Nicolas Island|Ventura County|   CA|   null|
|W-196706|         Cold|  Severe|2016-01-05 00:35:00|2016-01-05 1

In [0]:
# Dropping rows with null values
#
# dropna(thresh=n) keeps a row only if it has AT LEAST n non-null values.
# Each df_null row has 14 columns with City and ZipCode null, i.e. 12 non-null values,
# so thresh=12 keeps every row (12 >= 12), while thresh=13 would drop every row.
# Omitting thresh (default how="any") drops any row that contains at least one null.
df_null.dropna(thresh=12).show()

+--------+-------------+--------+-------------------+-------------------+-----------------+----------+-----------+-----------+-----------+----+--------------+-----+-------+
| EventId|         Type|Severity|     StartTime(UTC)|       EndTime(UTC)|Precipitation(in)|  TimeZone|AirportCode|LocationLat|LocationLng|City|        County|State|ZipCode|
+--------+-------------+--------+-------------------+-------------------+-----------------+----------+-----------+-----------+-----------+----+--------------+-----+-------+
|W-196704|         Cold|  Severe|2016-01-01 08:57:00|2016-01-04 15:32:00|              0.0|US/Pacific|       KNSI|    33.2338|  -119.4559|null|Ventura County|   CA|   null|
|W-196705|         Rain|   Light|2016-01-04 22:49:00|2016-01-04 23:30:00|              0.0|US/Pacific|       KNSI|    33.2338|  -119.4559|null|Ventura County|   CA|   null|
|W-196706|         Cold|  Severe|2016-01-05 00:35:00|2016-01-05 15:32:00|              0.0|US/Pacific|       KNSI|    33.2338|  -119.45

In [0]:
# how + subset restrict the null check to the listed columns:
#   how="all" -> drop a row only when EVERY column in subset is null
#   how="any" -> drop a row when AT LEAST ONE column in subset is null
#
# Nothing is dropped here: Type is never null, so the subset is never entirely null.
df_null.na.drop(how="all", subset=["City", "ZipCode", "Type"]).show()

# With how="any" every row would be dropped, since at least one of these columns is
# always null:
# df_null.na.drop(how="any", subset=["City", "ZipCode", "Type"]).show()

+--------+-------------+--------+-------------------+-------------------+-----------------+----------+-----------+-----------+-----------+----+--------------+-----+-------+
| EventId|         Type|Severity|     StartTime(UTC)|       EndTime(UTC)|Precipitation(in)|  TimeZone|AirportCode|LocationLat|LocationLng|City|        County|State|ZipCode|
+--------+-------------+--------+-------------------+-------------------+-----------------+----------+-----------+-----------+-----------+----+--------------+-----+-------+
|W-196704|         Cold|  Severe|2016-01-01 08:57:00|2016-01-04 15:32:00|              0.0|US/Pacific|       KNSI|    33.2338|  -119.4559|null|Ventura County|   CA|   null|
|W-196705|         Rain|   Light|2016-01-04 22:49:00|2016-01-04 23:30:00|              0.0|US/Pacific|       KNSI|    33.2338|  -119.4559|null|Ventura County|   CA|   null|
|W-196706|         Cold|  Severe|2016-01-05 00:35:00|2016-01-05 15:32:00|              0.0|US/Pacific|       KNSI|    33.2338|  -119.45

In [0]:
# withColumn adds a new column, or replaces an existing one if the name already exists.
# Latitude bands: North (lat > 40), Central (30 < lat <= 40), South (lat <= 30).
# when() branches are evaluated in order, so each later branch only needs its own bound;
# this also means a latitude of exactly 40.0 is no longer mislabelled "South".
# There is deliberately no otherwise(): a null LocationLat matches no branch and yields
# a null region instead of being classified as "South".
df_sample.withColumn("RegionNorthToSouth",
                     when(df_sample.LocationLat > 40.0, "North")
                     .when(df_sample.LocationLat > 30.0, "Central")
                     .when(df_sample.LocationLat <= 30.0, "South")
                    )

Out[6]: DataFrame[EventId: string, Type: string, Severity: string, StartTime(UTC): timestamp, EndTime(UTC): timestamp, Precipitation(in): double, TimeZone: string, AirportCode: string, LocationLat: double, LocationLng: double, City: string, County: string, State: string, ZipCode: int, RegionNorthToSouth: string]

In [0]:
# Rescale the numerical features so no single one dominates later analysis just
# because of its units.
#
# This cell reassigns df_sample, and VectorAssembler / the scalers refuse to write to an
# output column that already exists, so re-running the cell would fail. Drop the outputs
# of any previous run first (drop() is a no-op for columns that are not present).
df_sample = df_sample.drop('x', 'sScaledCols', 'finalCols')

# Group the three raw numeric columns into a single vector column `x`
cols = ['Precipitation(in)', 'LocationLat', 'LocationLng']
assembler = VectorAssembler(inputCols=cols, outputCol='x')
df_sample = assembler.transform(df_sample)

# Standardize EVERY component of `x` (precipitation, latitude and longitude):
# subtract each feature's mean and divide by its standard deviation.
standard_scaler = StandardScaler(inputCol='x', outputCol='sScaledCols', withStd=True, withMean=True)
df_sample = standard_scaler.fit(df_sample).transform(df_sample)

# Min-max rescale EVERY component of the standardized vector into [0, 1]
# (again all three features, not only latitude/longitude).
# NOTE(review): min-max scaling after a per-feature standardization produces the same
# values (up to floating-point rounding) as min-max scaling the raw features, so the
# StandardScaler stage does not change finalCols. If the intent was to standardize only
# precipitation and min-max only the coordinates, assemble them into separate vectors.
minmax_scaler = MinMaxScaler(inputCol='sScaledCols', outputCol='finalCols')
df_sample = minmax_scaler.fit(df_sample).transform(df_sample)

# Keep only the final scaled feature vector for display
final_df = df_sample.select('finalCols')

final_df.show()

+--------------------+
|           finalCols|
+--------------------+
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.00274977085242...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
|[0.0,0.5632809996...|
+--------------------+
only showing top 20 rows



In [0]:
# Add a column with the Severity of the next event in StartTime order (use lag() instead
# of lead() to get the previous event's severity).
# EventId is a tie-breaker: StartTime values can tie, and without a total ordering the
# "next" row among tied events is arbitrary, so results could change between runs.
# NOTE(review): the window has no partitionBy, so Spark evaluates it in a single
# partition; acceptable for this 50k-row sample, costly on the full dataset.
next_event_window = Window.orderBy("StartTime(UTC)", "EventId")
df_sample.withColumn("next_event_severity", lead("Severity").over(next_event_window)).show()

+-------+----+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+------------+----------------+-----+-------+--------------------+--------------------+--------------------+-------------------+
|EventId|Type|Severity|     StartTime(UTC)|       EndTime(UTC)|Precipitation(in)|   TimeZone|AirportCode|LocationLat|LocationLng|        City|          County|State|ZipCode|                   x|         sScaledCols|           finalCols|next_event_severity|
+-------+----+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+------------+----------------+-----+-------+--------------------+--------------------+--------------------+-------------------+
|W-23476|Snow|   Light|2016-01-01 05:53:00|2016-01-01 09:53:00|              0.0| US/Eastern|       KGRR|    42.8808|   -85.5228|Grand Rapids|            Kent|   MI|  49512|[0.0,42.8808,-85....|[-0.2698327144593...|[0.0,0.9197260

In [0]:
# Rank events by precipitation, largest first, over an ordered window.
# Ranking functions differ in how they treat ties:
#   rank()       - tied rows share a rank and the next rank skips ahead (1, 1, 3, ...)
#   dense_rank() - tied rows share a rank with no gaps afterwards     (1, 1, 2, ...)
#   row_number() - every row gets a distinct number, ties broken arbitrarily (1, 2, 3, ...)
window = Window.orderBy(desc("Precipitation(in)"))
df_sample.withColumn("Precip_Rank", rank().over(window)).show()

+-------+-------------+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+-----------+----------------+-----+-------+--------------------+--------------------+--------------------+-----------+
|EventId|         Type|Severity|     StartTime(UTC)|       EndTime(UTC)|Precipitation(in)|   TimeZone|AirportCode|LocationLat|LocationLng|       City|          County|State|ZipCode|                   x|         sScaledCols|           finalCols|Precip_Rank|
+-------+-------------+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+-----------+----------------+-----+-------+--------------------+--------------------+--------------------+-----------+
|W-11441|         Rain|   Heavy|2017-01-19 13:18:00|2017-01-19 15:53:00|            10.91| US/Central|       KBTR|    30.5378|   -91.1468|Baton Rouge|East Baton Rouge|   LA|  70807|[10.91,30.5378,-9...|[28.6607791659927...|[1.0,0

In [0]:
# regexp_replace treats its second argument as a regular expression and rewrites every
# match in the column: each "Saguache" in City becomes "New Saguache City".
# The result is written to a new column, so City itself is left unchanged.
df_regex = df_sample.withColumn(
    'new_city',
    regexp_replace(col('City'), 'Saguache', 'New Saguache City'),
)
df_regex.show()


+-------+----+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+--------+--------+-----+-------+--------------------+--------------------+--------------------+-----------------+
|EventId|Type|Severity|     StartTime(UTC)|       EndTime(UTC)|Precipitation(in)|   TimeZone|AirportCode|LocationLat|LocationLng|    City|  County|State|ZipCode|                   x|         sScaledCols|           finalCols|         new_city|
+-------+----+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+--------+--------+-----+-------+--------------------+--------------------+--------------------+-----------------+
|    W-1|Snow|   Light|2016-01-06 23:14:00|2016-01-07 00:34:00|              0.0|US/Mountain|       K04V|    38.0972|  -106.1689|Saguache|Saguache|   CO|  81149|[0.0,38.0972,-106...|[-0.2698327144593...|[0.0,0.5632809996...|New Saguache City|
|    W-2|Snow|   Light|2016-

In [0]:
# Using a Python UDF to transform data
def inToCm(x):
    """Convert a precipitation amount from inches to centimetres.

    Args:
        x: amount in inches, or None for a missing value.

    Returns:
        x * 2.54, or None when x is None. Precipitation(in) is a nullable column, and
        without this guard a null would raise TypeError inside the Spark UDF and fail
        the job.
    """
    if x is None:
        return None
    return x * 2.54
# Wrap the plain Python converter as a Spark UDF that returns DoubleType, then apply it
# to the inch column to add a Precipitation(cm) column to df_sample.
# NOTE(review): the built-in expression col('Precipitation(in)') * 2.54 would give the
# same conversion without the per-row Python round-trip of a UDF.
inToCm_udf = udf(inToCm, returnType=DoubleType())
df_sample = df_sample.withColumn('Precipitation(cm)', inToCm_udf(col('Precipitation(in)')))
df_sample.show()

+-------+----+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+--------+--------+-----+-------+--------------------+--------------------+--------------------+-----------------+
|EventId|Type|Severity|     StartTime(UTC)|       EndTime(UTC)|Precipitation(in)|   TimeZone|AirportCode|LocationLat|LocationLng|    City|  County|State|ZipCode|                   x|         sScaledCols|           finalCols|Precipitation(cm)|
+-------+----+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+--------+--------+-----+-------+--------------------+--------------------+--------------------+-----------------+
|    W-1|Snow|   Light|2016-01-06 23:14:00|2016-01-07 00:34:00|              0.0|US/Mountain|       K04V|    38.0972|  -106.1689|Saguache|Saguache|   CO|  81149|[0.0,38.0972,-106...|[-0.2698327144593...|[0.0,0.5632809996...|              0.0|
|    W-2|Snow|   Light|2016-