In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, udf, sum, avg
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

# Start a Spark session for this analysis (getOrCreate reuses one that is
# already running in this kernel instead of building a second).
spark = (
    SparkSession.builder
    .appName('challenger')
    .getOrCreate()
)

23/04/14 15:05:39 WARN Utils: Your hostname, Felixs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.46 instead (on interface en0)
23/04/14 15:05:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/14 15:05:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Explicit column layouts for the two header-less CSV inputs; every field is
# declared nullable so malformed values become nulls (removed in the next cell).
station_schema = StructType([
    StructField(name, dtype, True)
    for name, dtype in (
        ('station_id', IntegerType()),
        ('wban_id', IntegerType()),
        ('latitude', FloatType()),
        ('longitude', FloatType()),
    )
])

temp_schema = StructType([
    StructField(name, dtype, True)
    for name, dtype in (
        ('station_id', IntegerType()),
        ('wban_id', IntegerType()),
        ('month', IntegerType()),
        ('day', IntegerType()),
        ('temperature', FloatType()),
    )
])

# Load the raw station locations and the 1986 daily temperature readings
# using the schemas above rather than letting Spark infer types.
station_df = spark.read.csv('data/stations.csv', schema=station_schema, header=False)
temp_df = spark.read.csv('data/1986.csv', schema=temp_schema, header=False)

In [3]:
# Clean both inputs in the same order: drop the WBAN column, then any row with
# a null, then repeated keys (one reading per station and calendar day, one
# location per station).
# NOTE(review): once wban_id is discarded, station_id alone is treated as the
# station key here and in the later join — confirm it is unique on its own.
spark_temp = (
    temp_df
    .drop('wban_id')
    .dropna()
    .dropDuplicates(subset=['station_id', 'month', 'day'])
)
spark_station = (
    station_df
    .drop('wban_id')
    .dropna()
    .dropDuplicates(subset=['station_id'])
)

In [4]:
import math
def haversine(lat1, lon1, lat2, lon2, radius=6371.0):
    """ returns the great-circle distance between two points in km
        https://stackoverflow.com/questions/4913349/haversine-formula-in-python-bearing-and-distance-%20between-two-gps-points

    args.
        lat1 : latitude float of place 1, in decimal degrees
        lon1 : longitude float of place 1, in decimal degrees
        lat2 : latitude float of place 2, in decimal degrees
        lon2 : longitude float of place 2, in decimal degrees
        radius : sphere radius float (default 6371, Earth's mean radius in
                 km); the result is expressed in the same unit as radius

    returns.
        km : distance float (in the unit of radius; km by default)
    """
    # angular differences and latitudes, converted from degrees to radians
    dlat = math.radians(lat1 - lat2)
    dlon = math.radians(lon1 - lon2)
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)

    # haversine of the central angle between the two points
    a = (math.sin(dlat / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlon / 2) ** 2)

    # floating-point rounding can push sqrt(a) marginally above 1 for
    # (near-)antipodal points, which would make asin raise ValueError
    c = 2 * math.asin(min(1.0, math.sqrt(a)))
    return radius * c

# Wrap haversine as a Spark UDF; its Python float result is cast to Spark's single-precision FloatType.
dist_udf = udf(haversine, returnType=FloatType())

In [5]:
# Cape Canaveral launch-site coordinates, in decimal degrees
cape_lat = 28.396837
cape_lon = -80.605659

# Attach each station's great-circle distance (km) to the launch site, then
# keep only the stations closer than 100 km.
station_dist = (
    spark_station
    .withColumn(
        'distance',
        dist_udf(lit(cape_lat), lit(cape_lon), col('latitude'), col('longitude')),
    )
    .where(col('distance') < 100)
)
station_dist.show()

+----------+--------+---------+----------+
|station_id|latitude|longitude|  distance|
+----------+--------+---------+----------+
|    722050|  28.434|  -81.325|  70.47113|
|    722053|  28.545|  -81.333|  72.97914|
|    747946|  28.617|  -80.683|  25.62095|
|    747950|  28.233|    -80.6|  18.22629|
|    722040|  28.101|  -80.644| 33.109245|
|    749047|  28.283|  -81.416|  80.31002|
|    997806|    28.4|  -80.533|  7.116037|
|    720904|  29.067|  -81.283|  99.57241|
|    722056|  29.183|  -81.048|  97.46725|
|    722051|  28.545|  -81.333|  72.97914|
|    747945|  28.617|    -80.7| 26.159054|
|    998275|  28.017|  -80.683| 42.910454|
|    747870|  29.183|  -81.048|  97.46725|
|    997354|   28.42|   -80.58| 3.5959485|
|    995450|  28.519|  -80.166| 45.075996|
|    722046|  28.517|    -80.8| 23.227016|
|    747940|  28.483|  -80.567|10.2994995|
|    722058|   29.07|   -80.92| 80.883896|
|    722361|  29.054|  -80.948|  80.33689|
|    722011|   28.29|  -81.437|  82.22109|
+----------

In [6]:
# Keep only readings from nearby stations, enriched with each station's
# location and distance, ordered by station and calendar date.
spark_df = (
    spark_temp
    .join(station_dist, on='station_id', how='inner')
    .orderBy('station_id', 'month', 'day')
)
spark_df.show()

                                                                                

+----------+-----+---+-----------+--------+---------+---------+
|station_id|month|day|temperature|latitude|longitude| distance|
+----------+-----+---+-----------+--------+---------+---------+
|    722040|    1|  1|       65.0|  28.101|  -80.644|33.109245|
|    722040|    1|  2|       67.1|  28.101|  -80.644|33.109245|
|    722040|    1|  3|       64.5|  28.101|  -80.644|33.109245|
|    722040|    1|  4|       67.6|  28.101|  -80.644|33.109245|
|    722040|    1|  5|       63.0|  28.101|  -80.644|33.109245|
|    722040|    1|  6|       51.0|  28.101|  -80.644|33.109245|
|    722040|    1|  7|       64.1|  28.101|  -80.644|33.109245|
|    722040|    1|  8|       63.5|  28.101|  -80.644|33.109245|
|    722040|    1|  9|       65.3|  28.101|  -80.644|33.109245|
|    722040|    1| 10|       68.7|  28.101|  -80.644|33.109245|
|    722040|    1| 11|       63.1|  28.101|  -80.644|33.109245|
|    722040|    1| 12|       56.0|  28.101|  -80.644|33.109245|
|    722040|    1| 13|       54.9|  28.1

In [7]:
# Make the joined readings queryable from Spark SQL
spark_df.createOrReplaceTempView('station_temp')

# Daily inverse-distance-weighted temperature for January: each station's
# reading is weighted by 1/distance^2 and normalised by the sum of weights.
idw_query = '''
    Select 
        day,
        sum(temperature/power(distance,2))/sum(power((1/distance),2)) as idw_temp
    from station_temp
    where month = 1
    group by day
    order by day
'''
idw_df = spark.sql(idw_query)
idw_df.show()

# The estimate for the launch date, January 28
idw_df.where(col('day') == 28).show()

                                                                                

+---+------------------+
|day|          idw_temp|
+---+------------------+
|  1| 64.15828627431138|
|  2| 66.89538734389774|
|  3|  64.5315318475219|
|  4| 67.62670540341577|
|  5| 62.07530126701722|
|  6| 54.00930716003569|
|  7| 63.48764977260696|
|  8|63.203732109303076|
|  9| 65.05129218682293|
| 10|  68.1316502606028|
| 11| 60.81613626685379|
| 12| 58.47452281707809|
| 13| 56.27164037683628|
| 14|51.859639723859196|
| 15|54.042500023419514|
| 16| 58.81736070793421|
| 17| 65.69750685371976|
| 18| 64.71652949791583|
| 19| 66.32849949578997|
| 20| 59.75419232964566|
+---+------------------+
only showing top 20 rows

+---+-----------------+
|day|         idw_temp|
+---+-----------------+
| 28|36.29507299381569|
+---+-----------------+



### The estimated (inverse-distance-weighted) temperature on January 28 was about 36.3 degrees Fahrenheit

In [8]:
# Mean of the daily January IDW estimates (a single global aggregate row)
idw_df.select(avg('idw_temp')).show()

+-----------------+
|    avg(idw_temp)|
+-----------------+
|59.46397244883841|
+-----------------+



In [11]:
# Bar chart of the daily January IDW temperature estimates, rendered with plotly
import pyspark.pandas as ps
from plotly.offline import init_notebook_mode
init_notebook_mode(connected=True)

# Same two columns (day, temp_f) as before; only idw_temp is renamed
renamed = idw_df.withColumnRenamed('idw_temp', 'temp_f')
plot_df = ps.DataFrame(renamed)

fig = plot_df.plot.bar(x='day', y='temp_f', title='Temperatures in January 1986')
fig.show()

                                                                                

The visualization above shows a significant drop in temperature on day 28 compared with the rest of January. The day NASA chose to launch happened to be the coldest day in January: more than 20 degrees below the January average of about 59.5 °F. Had NASA delayed the launch by a couple of days, perhaps such an accident would not have occurred.