In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
data = spark.read.csv(
    'WeatherEvents_Jan2016-Dec2021.csv', 
    inferSchema=True, header=True)

In [4]:
data.show(10)

+-------+----+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+--------+--------+-----+-------+
|EventId|Type|Severity|     StartTime(UTC)|       EndTime(UTC)|Precipitation(in)|   TimeZone|AirportCode|LocationLat|LocationLng|    City|  County|State|ZipCode|
+-------+----+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+--------+--------+-----+-------+
|    W-1|Snow|   Light|2016-01-06 23:14:00|2016-01-07 00:34:00|              0.0|US/Mountain|       K04V|    38.0972|  -106.1689|Saguache|Saguache|   CO|  81149|
|    W-2|Snow|   Light|2016-01-07 04:14:00|2016-01-07 04:54:00|              0.0|US/Mountain|       K04V|    38.0972|  -106.1689|Saguache|Saguache|   CO|  81149|
|    W-3|Snow|   Light|2016-01-07 05:54:00|2016-01-07 15:34:00|             0.03|US/Mountain|       K04V|    38.0972|  -106.1689|Saguache|Saguache|   CO|  81149|
|    W-4|Snow|   Light|2016-

In [5]:
data.groupBy('Type').count().show()

+-------------+-------+
|         Type|  count|
+-------------+-------+
|         Cold| 197691|
|          Fog|1722738|
|        Storm|  49203|
|Precipitation| 128836|
|         Hail|   2740|
|         Snow| 980411|
|         Rain|4397546|
+-------------+-------+



In [6]:
rain = ['Rain']
others = ['Cold', 'Fog', 'Storm', 'Precipitation', 'Hail', 'Snow']
data = data.replace(others, '0', ['Type'])
data = data.replace(rain, '1', ['Type'])

In [7]:
data.show(10)

+-------+----+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+--------+--------+-----+-------+
|EventId|Type|Severity|     StartTime(UTC)|       EndTime(UTC)|Precipitation(in)|   TimeZone|AirportCode|LocationLat|LocationLng|    City|  County|State|ZipCode|
+-------+----+--------+-------------------+-------------------+-----------------+-----------+-----------+-----------+-----------+--------+--------+-----+-------+
|    W-1|   0|   Light|2016-01-06 23:14:00|2016-01-07 00:34:00|              0.0|US/Mountain|       K04V|    38.0972|  -106.1689|Saguache|Saguache|   CO|  81149|
|    W-2|   0|   Light|2016-01-07 04:14:00|2016-01-07 04:54:00|              0.0|US/Mountain|       K04V|    38.0972|  -106.1689|Saguache|Saguache|   CO|  81149|
|    W-3|   0|   Light|2016-01-07 05:54:00|2016-01-07 15:34:00|             0.03|US/Mountain|       K04V|    38.0972|  -106.1689|Saguache|Saguache|   CO|  81149|
|    W-4|   0|   Light|2016-

In [8]:
data = data.withColumn('Type', data.Type.cast('int'))

In [9]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Severity", outputCol="Severity_id")
data = indexer.fit(data).transform(data)

In [10]:
data.groupBy('Severity_id').count().show()

+-----------+-------+
|Severity_id|  count|
+-----------+-------+
|        0.0|4489255|
|        1.0|1460025|
|        4.0| 128836|
|        3.0| 194259|
|        2.0|1204050|
|        5.0|   2740|
+-----------+-------+



In [11]:
features = ['ZipCode', 'Precipitation(in)', 'Severity_id']
target = 'Type'
attributes = features + [target]
sample = data.select(attributes).dropna()

In [12]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=features,
                            outputCol='features')
output = assembler.transform(sample)

In [13]:
output.show(5)

+-------+-----------------+-----------+----+------------------+
|ZipCode|Precipitation(in)|Severity_id|Type|          features|
+-------+-----------------+-----------+----+------------------+
|  81149|              0.0|        0.0|   0| [81149.0,0.0,0.0]|
|  81149|              0.0|        0.0|   0| [81149.0,0.0,0.0]|
|  81149|             0.03|        0.0|   0|[81149.0,0.03,0.0]|
|  81149|              0.0|        0.0|   0| [81149.0,0.0,0.0]|
|  81149|              0.0|        0.0|   0| [81149.0,0.0,0.0]|
+-------+-----------------+-----------+----+------------------+
only showing top 5 rows



In [14]:
train, test = output.randomSplit([0.8, 0.2])

In [15]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol='features',
                        labelCol='Type')

In [16]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol='Type')

In [17]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.1]).addGrid(lr.maxIter, [10, 5]).addGrid(lr.threshold, 
                                                                                            [0.4, 0.3]).build()

In [104]:
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator)
cv_model = cv.fit(train)

In [105]:
predictions = cv_model.transform(test)

In [106]:
print('Evaluation:', evaluator.evaluate(predictions))

Evaluation: 0.9999999817013843


In [18]:
features = ['ZipCode', 'Type', 'Severity_id']

assembler = VectorAssembler(inputCols=features,
                            outputCol='features')
output = assembler.transform(sample)

In [19]:
from pyspark.ml.regression import GBTRegressor
regressor = GBTRegressor(
    labelCol="Precipitation(in)",
    featuresCol="features",
)

In [20]:
gbparamGrid = (ParamGridBuilder()
             .addGrid(regressor.maxDepth, [2, 10])
             .addGrid(regressor.maxIter, [5, 20])
             .build())

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol='Precipitation(in)')

In [22]:
gbcv = CrossValidator(estimator = regressor,
                      estimatorParamMaps = gbparamGrid,
                      evaluator = evaluator)
gbcv_model = gbcv.fit(train)

In [23]:
predictions = gbcv_model.transform(test)

In [24]:
print('Evaluation:', evaluator.evaluate(predictions))

Evaluation: 1.2290978864077007
