In [1]:
import os
import sys
from math import radians, cos, sin, asin, sqrt

import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.types import FloatType

In [2]:
# Point both the PySpark worker and driver interpreters at the Python
# executable running this kernel (the original author notes this is needed
# to keep things from breaking), then create or reuse the 'hw5' session.
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
})
spark = (
    SparkSession.builder
    .appName('hw5')
    .getOrCreate()
)

In [3]:
# Creates a haversine function
def haversine(lat1, lon1, lat2, lon2, radius=6371):
    """
    Calculate the great circle distance between two points on the earth
    (specified in decimal degrees).

    lat1, lon1 -- latitude and longitude of the first point, in degrees
    lat2, lon2 -- latitude and longitude of the second point, in degrees
    radius     -- sphere radius; it determines the units of the result.
                  Defaults to 6371 (kilometers); use 3956 for miles.

    Returns the distance as a float, in the same units as `radius`.
    """
    # convert decimal degrees to radians
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))

    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2

    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make asin(sqrt(a)) raise a
    # math domain error. Clamping leaves every in-range value untouched.
    a = min(1.0, max(0.0, a))

    c = 2 * asin(sqrt(a))
    return c * radius

# Reference point (decimal degrees) that every station distance is measured
# from; the notebook refers to it as Cape Canaveral.
CAPE_CANAVERAL_LAT = 28.396837
CAPE_CANAVERAL_LON = -80.605659


def _distance_from_cape_canaveral(lat, lon):
    """
    Return the haversine distance in kilometers from the Cape Canaveral
    reference point to (lat, lon), or None when either coordinate is missing.

    Spark hands SQL NULLs to Python UDFs as None; without this guard
    radians(None) raises TypeError and fails the whole job.
    """
    if lat is None or lon is None:
        return None
    return haversine(CAPE_CANAVERAL_LAT, CAPE_CANAVERAL_LON, lat, lon)


# Initializes the UDF version of the haversine function (takes latitude and
# longitude columns, yields a FloatType distance column)
haversineUDF = udf(_distance_from_cape_canaveral, FloatType())

In [4]:
# Read the header-less station list from the CSV and give the first four
# positional columns readable names
stations = (
    spark.read.csv('stations.csv', header=False, inferSchema=True)
    .withColumnRenamed("_c0", "StationID")
    .withColumnRenamed("_c1", "WBANID")
    .withColumnRenamed("_c2", "GPSLatitude")
    .withColumnRenamed("_c3", "GPSLongitude")
)

# Add a "distance" column: each station's distance from Cape Canaveral,
# computed by the haversine UDF
stations = stations.withColumn(
    "distance",
    haversineUDF(col("GPSLatitude"), col("GPSLongitude")),
)

In [5]:
# Register the stations DataFrame (including its distance column) as the
# temp view 'stations' so it can be queried through spark.sql
stations.createOrReplaceTempView('stations')

In [6]:
# Keep only usable stations near Cape Canaveral: the query drops rows whose
# latitude or longitude is 0 (NULL coordinates fail the comparison too),
# rows more than 100 km away, and rows without a StationID. Afterwards a
# single row is kept per StationID.
nearby_stations_query = """
    SELECT * FROM STATIONS
    WHERE GPSLatitude != 0 AND GPSLongitude != 0 AND distance <= 100 AND StationID IS NOT NULL
"""
stations = spark.sql(nearby_stations_query).dropDuplicates(["StationID"])
stations.show()

+---------+------+-----------+------------+---------+
|StationID|WBANID|GPSLatitude|GPSLongitude| distance|
+---------+------+-----------+------------+---------+
|   722050| 12815|     28.434|     -81.325| 70.47143|
|   722053| 12841|     28.545|     -81.333| 72.97912|
|   747946| 12886|     28.617|     -80.683|25.620928|
|   747950| 12867|     28.233|       -80.6|18.226263|
|   722040| 12838|     28.101|     -80.644|33.109257|
|   749047|  null|     28.283|     -81.416|    80.31|
|   997806|  null|       28.4|     -80.533|7.1157584|
|   720904|   299|     29.067|     -81.283| 99.57265|
|   722056| 12834|     29.183|     -81.048| 97.46734|
|   722051| 12841|     28.545|     -81.333| 72.97912|
|   747945|  null|     28.617|       -80.7|  26.1591|
|   998275|  null|     28.017|     -80.683|  42.9105|
|   747870| 12834|     29.183|     -81.048| 97.46734|
|   997354|  null|      28.42|      -80.58|3.5960674|
|   995450|  null|     28.519|     -80.166| 45.07606|
|   722046| 12898|     28.51

In [7]:
# Read the header-less 1986 readings and name the first five positional
# columns: station identifiers, month, day and temperature
data_1986 = (
    spark.read.csv('1986.csv', header=False, inferSchema=True)
    .withColumnRenamed("_c0", "StationID")
    .withColumnRenamed("_c1", "WBANID")
    .withColumnRenamed("_c2", "Month")
    .withColumnRenamed("_c3", "Day")
    .withColumnRenamed("_c4", "temp")
)
data_1986.show()

+---------+------+-----+---+----+
|StationID|WBANID|Month|Day|temp|
+---------+------+-----+---+----+
|    10010|  null|    1|  1|17.2|
|    10010|  null|    1|  2|12.1|
|    10010|  null|    1|  3|10.4|
|    10010|  null|    1|  4|17.4|
|    10010|  null|    1|  5|26.5|
|    10010|  null|    1|  6|30.1|
|    10010|  null|    1|  7|29.7|
|    10010|  null|    1|  8|29.6|
|    10010|  null|    1|  9|29.6|
|    10010|  null|    1| 10|33.0|
|    10010|  null|    1| 11|32.5|
|    10010|  null|    1| 12|27.4|
|    10010|  null|    1| 13|22.2|
|    10010|  null|    1| 14|11.3|
|    10010|  null|    1| 15| 2.5|
|    10010|  null|    1| 16| 3.0|
|    10010|  null|    1| 17|13.4|
|    10010|  null|    1| 18|29.8|
|    10010|  null|    1| 19|27.5|
|    10010|  null|    1| 20|25.2|
+---------+------+-----+---+----+
only showing top 20 rows



In [8]:
# Register the 1986 readings as the temp view '_1986' so they can be
# queried through spark.sql
data_1986.createOrReplaceTempView('_1986')

In [9]:
# Pull the readings recorded on January 28 out of the 1986 data and expose
# them as the temp view 'jan28'
jan28_query = """
    SELECT * FROM _1986
    WHERE Month == 1 AND Day == 28
"""
jan28 = spark.sql(jan28_query)
jan28.createOrReplaceTempView('jan28')
jan28.show()

+---------+------+-----+---+----+
|StationID|WBANID|Month|Day|temp|
+---------+------+-----+---+----+
|    10010|  null|    1| 28|14.1|
|    10080|  null|    1| 28|32.3|
|    10100|  null|    1| 28|30.9|
|    10230|  null|    1| 28|27.2|
|    10250|  null|    1| 28|32.5|
|    10280|  null|    1| 28|33.0|
|    10330|  null|    1| 28|34.0|
|    10350|  null|    1| 28|31.0|
|    10470|  null|    1| 28|22.1|
|    10490|  null|    1| 28|30.7|
|    10520|  null|    1| 28|29.9|
|    10530|  null|    1| 28|29.8|
|    10550|  null|    1| 28|31.6|
|    10590|  null|    1| 28|29.2|
|    10620|  null|    1| 28|30.5|
|    10630|  null|    1| 28|28.2|
|    10650|  null|    1| 28|26.1|
|    10680|  null|    1| 28|31.8|
|    10740|  null|    1| 28|28.5|
|    10780|  null|    1| 28|30.4|
+---------+------+-----+---+----+
only showing top 20 rows



In [10]:
# List the tables/views registered in the session catalog (sanity check that
# the temp views created above exist)
spark.catalog.listTables()

[Table(name='_1986', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='jan28', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='stations', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [11]:
# Inner-join the nearby stations with the January 28 readings on StationID,
# keeping only the station id, its distance and the recorded temperature
joined_data = (
    stations
    .join(jan28, on=(stations.StationID == jan28.StationID), how="inner")
    .select(stations.StationID, stations.distance, jan28.temp)
)
joined_data.createOrReplaceTempView('joined')
joined_data.show()

+---------+---------+----+
|StationID| distance|temp|
+---------+---------+----+
|   722040|33.109257|33.7|
|   722045| 90.04572|37.5|
|   722046|23.226759|37.0|
|   722050| 70.47143|34.7|
|   722051| 72.97912|15.3|
|   722056| 97.46734|31.8|
|   722057| 75.49695|33.4|
|   747945|  26.1591|33.7|
|   747950|18.226263|39.6|
+---------+---------+----+



In [12]:
def idw_temp_est(distances, temps):
    """
    Estimate a temperature by inverse-distance weighting (IDW).

    Every observation is weighted by 1/distance:

        estimate = sum(temp_i / dist_i) / sum(1 / dist_i)

    Example: distances [350, 750, 850] with temps [12, 10, 10] gives
    ((12/350) + (10/750) + (10/850)) / ((1/350) + (1/750) + (1/850)) ~= 11.06

    distances -- iterable of distances to each observation (any single unit)
    temps     -- iterable of temperatures, in the same order as `distances`

    Returns the weighted estimate as a float. If one or more observations sit
    at distance 0, their mean temperature is returned (the limit of the IDW
    formula) instead of dividing by zero.

    Raises ValueError if the inputs are empty or differ in length.
    """
    # Imported locally so the function keeps working even if the global name
    # `sum` has been rebound (e.g. by importing pyspark's `sum` aggregate).
    from math import fsum

    distances = list(distances)
    temps = list(temps)
    if len(distances) != len(temps):
        raise ValueError("distances and temps must have the same length")
    if not distances:
        raise ValueError("at least one observation is required")

    # An observation exactly at the target point has infinite weight.
    coincident = [temp for temp, dist in zip(temps, distances) if dist == 0]
    if coincident:
        return fsum(coincident) / len(coincident)

    # Pair each temperature with its own distance (the original iterated over
    # the two lists themselves instead of zipping them together).
    num = fsum(temp / dist for temp, dist in zip(temps, distances))
    denom = fsum(1 / dist for dist in distances)

    return num / denom

In [13]:
def inv_dist(x):
    """Return the reciprocal of x, i.e. the inverse-distance weight 1/x."""
    reciprocal = 1 / x
    return reciprocal

def temp_inv_dist(inv_dist, temp):
    """Return temp scaled by its inverse-distance weight (temp * inv_dist)."""
    weighted_temp = temp * inv_dist
    return weighted_temp

In [14]:
# Add the per-station IDW terms as columns: inv_dist = 1/distance and
# temp_dist = temp * inv_dist
joined_data = (
    joined_data
    .withColumn("inv_dist", inv_dist(col("distance")))
    .withColumn("temp_dist", temp_inv_dist(col("inv_dist"), col("temp")))
)

In [15]:
# Sum the per-station IDW terms over all joined stations. pyspark's aggregate
# is used under the alias `spark_sum` (imported with the other imports at the
# top of the notebook) so the Python builtin `sum` is not shadowed for the
# rest of the kernel session.
calc_table = joined_data.select(
    spark_sum(joined_data.inv_dist).alias("inv_dist_sum"),
    spark_sum(joined_data.temp_dist).alias('temp_dist_sum'),
)

# IDW estimate = sum(temp / distance) / sum(1 / distance)
calc_table = calc_table.withColumn(
    "temp_estimate",
    calc_table.temp_dist_sum / calc_table.inv_dist_sum,
)

In [16]:
calc_table.show()

+-------------------+-----------------+------------------+
|       inv_dist_sum|    temp_dist_sum|     temp_estimate|
+-------------------+-----------------+------------------+
|0.22885390402133945|7.958959412036556|34.777468385658054|
+-------------------+-----------------+------------------+

