In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
from math import radians, cos, sin, asin, sqrt

## Exploring the Challenger weather data

In [2]:
spark = SparkSession.builder.appName("Challenger weather analysis").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/16 23:39:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/16 23:39:29 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


stations.csv
- Station ID (Int)
- WBAN ID (Int)  "Weather Bureau Army Navy"
- GPS Latitude (Decimal)
- GPS Longitude (Decimal)

Stations are identified by the combination of their Station ID and WBAN ID.
Either of these IDs might be NULL.

In [3]:
stations_all = spark.read.csv('data/stations.csv', inferSchema=True, header=True)
stations_all.printSchema()
print(f'{stations_all.count()} rows in df')

                                                                                

root
 |-- station_id: integer (nullable = true)
 |-- wban_id: integer (nullable = true)
 |-- gps_lat: double (nullable = true)
 |-- gps_long: double (nullable = true)

29444 rows in df


In [4]:
# drop duplicate stations and rows with unspecified id, latitude, or longitude
# Note that some rows have lat/long of zero which will be naturally excluded in next operation
stations = stations_all.filter(
    stations_all.station_id.isNotNull() &
    stations_all.gps_lat.isNotNull() &
    stations_all.gps_long.isNotNull()
).dropDuplicates(['station_id'])
stations.show()
print(f'{stations.count()} rows in df')

                                                                                

+----------+-------+-------+--------+
|station_id|wban_id|gps_lat|gps_long|
+----------+-------+-------+--------+
|     13840|   null| 60.194|    11.1|
|     14450|   null|  59.45|     8.0|
|     21220|   null| 66.533|  17.667|
|     21700|   null|  65.75|  21.767|
|     28170|   null| 68.433|   27.45|
|     30970|   null|  56.75|  -6.883|
|     32460|   null| 54.983|    -1.6|
|     40011|   null| 65.533| -24.467|
|     62680|   null| 52.533|   5.433|
|     64590|   null| 50.583|   4.683|
|     66010|   null|  47.55|   7.583|
|     74820|   null| 45.987|   5.328|
|     81410|   null| 41.633|   -4.75|
|    104880|   null| 51.133|  13.767|
|    105536|   null| 50.982|  12.506|
|    108560|   null| 48.186|  10.861|
|    109800|   null|   47.7|  12.017|
|    111300|   null| 47.583|  12.167|
|    112020|   null| 47.117|    12.5|
|    117500|   null| 49.433|    17.4|
+----------+-------+-------+--------+
only showing top 20 rows

24638 rows in df


Identify all weather stations within 100 km of Cape Canaveral using the haversine function (Note: not all the stations necessarily recorded a temperature on any given day.)

In [5]:
# first, get haversine distance between any two specified lat-long pairs
import numpy as np

# use cape canaveral's lat/long as base
cc_lat = 28.3922
cc_long = -80.6077

# haversine function borrowed from https://stackoverflow.com/questions/4913349/haversine-formula-in-python-bearing-and-distance-between-two-gps-points
def haversine(lat1, lon1, lat2=cc_lat, lon2=cc_long):
    """
    Calculate the great circle distance in kilometers between two points
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    r = 6371 # Radius of earth in kilometers. Use 3956 for miles. Determines return value units.
    return c * r

# convert haversine() function into a spark UDF
haversine_udf = udf(lambda lat, long: haversine(lat, long), DoubleType())

In [6]:
# add column to df containing distance between each station and Cape Canaveral

# get distance between each lat-long pair and cape canaveral
haversine_col = haversine_udf(col('gps_lat'), col('gps_long'))
# add column to stations df containing distances
stations_cc_dist = stations.withColumn('cc_dist', haversine_col)
print(f'{stations_cc_dist.count()} rows in df')

24638 rows in df


In [7]:
# filter stations to retain only rows where distance is leq 100km
stations_near_cc = stations_cc_dist.filter(col('cc_dist') <= 100)
stations_near_cc.show()
print(f'{stations_near_cc.count()} rows in df')

                                                                                

+----------+-------+-------+--------+------------------+
|station_id|wban_id|gps_lat|gps_long|           cc_dist|
+----------+-------+-------+--------+------------------+
|    722050|  12815| 28.434| -81.325| 70.30590142260303|
|    722053|  12841| 28.545| -81.333| 72.90473375184327|
|    747946|  12886| 28.617| -80.683| 26.05707106937488|
|    747950|  12867| 28.233|   -80.6| 17.71827325464815|
|    722040|  12838| 28.101| -80.644| 32.57460775618103|
|    749047|   null| 28.283| -81.416| 80.03461347256854|
|    997806|   null|   28.4| -80.533| 7.358154613618123|
|    720904|    299| 29.067| -81.283| 99.82869362421721|
|    722056|  12834| 29.183| -81.048| 97.84308281668112|
|    722051|  12841| 28.545| -81.333| 72.90473375184327|
|    747945|   null| 28.617|   -80.7|26.573973942086713|
|    998275|   null| 28.017| -80.683|42.367830338530915|
|    747870|  12834| 29.183| -81.048| 97.84308281668112|
|    997354|   null|  28.42|  -80.58| 4.110433692817337|
|    995450|   null| 28.519| -8

Use [inverse distance weighting](https://gisgeography.com/inverse-distance-weighting-idw-interpolation/) to estimate the temperature at Cape Canaveral on January 28, 1986. (Weather stations closer to the Cape Canaveral will be given more weight than stations that are far away)

yyyy.csv (yyyy = year)
- StationID (Int)
- WBANID (Int)
- Month (Int)
- Day (Int)
- Temperature / degrees F (Decimal)

In [8]:
temps = spark.read.csv('data/1986.csv', inferSchema=True, header=True)
temps.printSchema()



root
 |-- station_id: integer (nullable = true)
 |-- wban_id: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- temp_f: double (nullable = true)



                                                                                

In [9]:
temps.show()

+----------+-------+-----+---+------+
|station_id|wban_id|month|day|temp_f|
+----------+-------+-----+---+------+
|     10010|   null|    1|  1|  17.2|
|     10010|   null|    1|  2|  12.1|
|     10010|   null|    1|  3|  10.4|
|     10010|   null|    1|  4|  17.4|
|     10010|   null|    1|  5|  26.5|
|     10010|   null|    1|  6|  30.1|
|     10010|   null|    1|  7|  29.7|
|     10010|   null|    1|  8|  29.6|
|     10010|   null|    1|  9|  29.6|
|     10010|   null|    1| 10|  33.0|
|     10010|   null|    1| 11|  32.5|
|     10010|   null|    1| 12|  27.4|
|     10010|   null|    1| 13|  22.2|
|     10010|   null|    1| 14|  11.3|
|     10010|   null|    1| 15|   2.5|
|     10010|   null|    1| 16|   3.0|
|     10010|   null|    1| 17|  13.4|
|     10010|   null|    1| 18|  29.8|
|     10010|   null|    1| 19|  27.5|
|     10010|   null|    1| 20|  25.2|
+----------+-------+-----+---+------+
only showing top 20 rows



In [10]:
# get list of stations near cape canaveral from prev analysis
near_cc = stations_near_cc.rdd.map(lambda x: x.station_id).collect()
sorted(near_cc)

                                                                                

[720904,
 722011,
 722040,
 722045,
 722046,
 722050,
 722051,
 722053,
 722056,
 722057,
 722058,
 722361,
 747870,
 747930,
 747940,
 747945,
 747946,
 747950,
 749047,
 994951,
 995450,
 997354,
 997806,
 998275]

In [11]:
# filter weather data to exclude stations >100km from cape canaveral
# include only stations that recorded a temp on 28 jan 1986
chal_temps = temps.filter(
    temps.station_id.isin(near_cc) &
    (temps.month == 1) &
    (temps.day == 28)
)
chal_temps.show()

                                                                                

+----------+-------+-----+---+------+
|station_id|wban_id|month|day|temp_f|
+----------+-------+-----+---+------+
|    722040|  12838|    1| 28|  33.7|
|    722045|  12843|    1| 28|  37.5|
|    722046|   null|    1| 28|  37.0|
|    722050|  12815|    1| 28|  34.7|
|    722051|  12841|    1| 28|  15.3|
|    722056|  12834|    1| 28|  31.8|
|    722057|  12854|    1| 28|  33.4|
|    747945|   null|    1| 28|  33.7|
|    747950|  12867|    1| 28|  39.6|
+----------+-------+-----+---+------+



In [12]:
# join stations and temps dataframes on station_id (retain lat, long, temps)
chal_temp_ll = chal_temps\
    .join(stations_near_cc, 'station_id')\
    .select(col('station_id'), col('temp_f'), col('gps_lat'), col('gps_long'), col('cc_dist'))
chal_temp_ll.show()

                                                                                

+----------+------+-------+--------+------------------+
|station_id|temp_f|gps_lat|gps_long|           cc_dist|
+----------+------+-------+--------+------------------+
|    722040|  33.7| 28.101| -80.644| 32.57460775618103|
|    722045|  37.5| 27.653| -80.243| 89.65245500958036|
|    722046|  37.0| 28.517|   -80.8| 23.36665592452053|
|    722050|  34.7| 28.434| -81.325| 70.30590142260303|
|    722051|  15.3| 28.545| -81.333| 72.90473375184327|
|    722056|  31.8| 29.183| -81.048| 97.84308281668112|
|    722057|  33.4|  28.78| -81.244| 75.62645082596696|
|    747945|  33.7| 28.617|   -80.7|26.573973942086713|
|    747950|  39.6| 28.233|   -80.6| 17.71827325464815|
+----------+------+-------+--------+------------------+



Use inverse distance weighting (https://gisgeography.com/inverse-distance-weighting-idw-interpolation/) to estimate the temperature at Cape Canaveral on January 28, 1986. (Weather stations closer to the Cape Canaveral will be given more weight than stations that are far away.).

In [13]:
# add IDW column to df for weight discounting
# The weight for a point is calculated as 1 / (haversine distance between given points)
idw_udf = udf(lambda dist: 1 / dist, DoubleType())
chal_idw = chal_temp_ll.withColumn('inv_dist_weight', idw_udf(col('cc_dist')))
# add a temp/dist col
temp_dist_udf = udf(lambda temp,dist: temp / dist, DoubleType())
chal_idw_temp = chal_idw.withColumn("temp_dist", temp_dist_udf(col('temp_f'), col('cc_dist')))
# calculate the weighted average temperature
# final agg: sum(temp/dist col) / sum(idw col)
chal_idw_temp.agg(
    {
        'temp_dist': 'sum',
        'inv_dist_weight': 'sum',
    }
).show()



+-----------------+--------------------+
|   sum(temp_dist)|sum(inv_dist_weight)|
+-----------------+--------------------+
|8.009496816582459| 0.23010210063007489|
+-----------------+--------------------+



                                                                                

In [14]:
# final agg: sum(temp/dist col) / sum(idw col)
# estimated temp using IDW
test_list = chal_idw_temp.agg(
    {
        'temp_dist': 'sum',
        'inv_dist_weight': 'sum',
    }
).collect()

test_list[0][0] / test_list[0][1]

                                                                                

34.808447183448266

Estimate the temperature at Cape Canaveral for every day in January 1986

In [15]:
def estimate_temp(day_temps):
    """
    UDF used to estimate temp per day at Cape Canaveral
    :param day_temps: df containing temp data at surrounding stations on a single day
    :return: estimated temp at CC for this data's day
    """
    # The weight for a point is calculated as 1 / (haversine distance between given points)
    idw = day_temps.withColumn('inv_dist_weight', idw_udf(col('cc_dist')))
    # add a temp/dist col
    idw_temp = idw.withColumn("temp_dist", temp_dist_udf(col('temp_f'), col('cc_dist')))
    # calculate the weighted average temperature
    # final agg: sum(temp/dist col) / sum(idw col)
    temp_idw = idw_temp.agg(
        {
            'temp_dist': 'sum',
            'inv_dist_weight': 'sum',
        }
    ).collect()
    return temp_idw[0][0] / temp_idw[0][1]

# generate UDF from above function
est_temp_udf = udf(lambda day_temps: estimate_temp(day_temps), DoubleType())

In [16]:
# filter weather data to exclude stations >100km from cape canaveral, include only jan 1986
near_temps = temps.filter(
    temps.station_id.isin(near_cc) &
    (temps.month == 1)
)

# join stations and temps dataframes on station_id (retain lat, long, temps)
near_temp_ll = near_temps\
    .join(stations_near_cc, 'station_id')\
    .select(col('station_id'), col('temp_f'), col('gps_lat'), col('gps_long'), col('cc_dist'))
near_temp_ll.show()

                                                                                

+----------+------+-------+--------+-----------------+
|station_id|temp_f|gps_lat|gps_long|          cc_dist|
+----------+------+-------+--------+-----------------+
|    722040|  65.0| 28.101| -80.644|32.57460775618103|
|    722040|  67.1| 28.101| -80.644|32.57460775618103|
|    722040|  64.5| 28.101| -80.644|32.57460775618103|
|    722040|  67.6| 28.101| -80.644|32.57460775618103|
|    722040|  63.0| 28.101| -80.644|32.57460775618103|
|    722040|  51.0| 28.101| -80.644|32.57460775618103|
|    722040|  64.1| 28.101| -80.644|32.57460775618103|
|    722040|  63.5| 28.101| -80.644|32.57460775618103|
|    722040|  65.3| 28.101| -80.644|32.57460775618103|
|    722040|  68.7| 28.101| -80.644|32.57460775618103|
|    722040|  63.1| 28.101| -80.644|32.57460775618103|
|    722040|  56.0| 28.101| -80.644|32.57460775618103|
|    722040|  54.9| 28.101| -80.644|32.57460775618103|
|    722040|  49.6| 28.101| -80.644|32.57460775618103|
|    722040|  50.4| 28.101| -80.644|32.57460775618103|
|    72204

In [20]:
# groupby day, get estimated temp for cape canaveral for each group
# TODO modify UDF to be a pandas_udf, finish remaining impl
near_temp_ll.groupby('day').agg(est_temp_udf)

AssertionError: all exprs should be Column

Plot the temperature at Cape Canaveral for every day in January 1986. Comment on your visualization. What might have happened if NASA had decided to delay the launch for a couple of days?