# Project 3: Spatiotemporal Analysis with Spark (v 1.0)

report by Gudbrand Schistad, Lovedeep Singh, Nate Wilson

## Set Up

In [43]:
%%time
from pyspark.sql.types import StructType, StructField, FloatType, LongType, StringType
import geohash

feats = []
f = open('features.txt')
for line_num, line in enumerate(f):
    if line_num == 0:
        # Timestamp
        feats.append(StructField(line.strip(), LongType(), True))
    elif line_num == 1:
        # Geohash
        feats.append(StructField(line.strip(), StringType(), True))
    else:
        # Other features
        feats.append(StructField(line.strip(), FloatType(), True))
    
schema = StructType(feats)

df = spark.read.format('csv').option('sep', '\t').schema(schema).load('/data/nam_tiny.tdv')
df.createOrReplaceTempView("TEMP_DF")

nam_tiny = spark.read.format('csv').option('sep', '\t').schema(schema).load('/data/nam_tiny.tdv')
nam_1 = spark.read.format('csv').option('sep', '\t').schema(schema).load('/data/nam_s/nam_201501_s.tdv.gz')
nam_s = spark.read.format('csv').option('sep', '\t').schema(schema).load('/data/nam_s/*')
nam = spark.read.format('csv').option('sep', '\t').schema(schema).load('/data/nam/*')

nam_tiny.createOrReplaceTempView("nam_tiny")
nam_1.createOrReplaceTempView("nam_1")
nam_s.createOrReplaceTempView("nam_s")
nam.createOrReplaceTempView("nam")

CPU times: user 13.6 ms, sys: 4.36 ms, total: 17.9 ms
Wall time: 358 ms


## Warm-Ups

### Unknown Feature: Choose a feature from the data dictionary above that you have never heard of before. Inspect some of the values for the feature (such as its average, min, max, etc.) and try to guess what it measures. Was your hypothesis correct? (Note: if you are a professional meteorologist, you can skip this question ;-))

In [44]:
%%time
friction_velocity_surface_min = spark.sql("SELECT MIN(friction_velocity_surface) FROM TEMP_DF").collect()
friction_velocity_surface_max = spark.sql("SELECT MAX(friction_velocity_surface) FROM TEMP_DF").collect()
friction_velocity_surface_avg = spark.sql("SELECT AVG(friction_velocity_surface) FROM TEMP_DF").collect()

print(f"min: {friction_velocity_surface_min[0][0]}")
print(f"max: {friction_velocity_surface_max[0][0]}")
print(f"avg: {friction_velocity_surface_avg[0][0]}")

min: 0.02586597390472889
max: 0.8758659958839417
avg: 0.3311159858293831
CPU times: user 13.1 ms, sys: 6.78 ms, total: 19.9 ms
Wall time: 3.7 s


We choose `friction_velocity_surface` as our feature of choice. We hypothesize that it is something to with airflow. 

The statistics support that hypothesis, given that I would expect the majority of places to have low airflow, but would also expect certain windy places, for example Chicago, the windy city, to have very high airflow. 

Wikipedia defines friction velocity as:
>**Shear Velocity**, also called **friction velocity**, is a form by which a shear stress may be re-written in units of velocity. It is useful as a method in fluid mechanics to compare true velocities, such as the velocity of a flow in a stream, to a velocity that relates shear between layers of flow.



### Hot hot hot: When and where was the hottest temperature observed in the dataset? Is it an anomaly?

In [45]:
%%time
# warmup2
maxtemp = spark.sql(   "SELECT temperature_surface, Timestamp/1000, Geohash \
                        FROM TEMP_DF \
                        WHERE temperature_surface in \
                           (SELECT MAX(temperature_surface) \
                            FROM TEMP_DF)").collect()


all_other_temps_there = spark.sql(
                      f"SELECT DISTINCT temperature_surface\
                        FROM TEMP_DF \
                        WHERE Geohash = '{maxtemp[0][2]}'").collect()
print(maxtemp)
print(all_other_temps_there)

[Row(temperature_surface=306.4980163574219, (CAST(Timestamp AS DOUBLE) / CAST(1000 AS DOUBLE))=1426377600.0, Geohash='9qd23ynghrrz')]
[Row(temperature_surface=306.4980163574219)]
CPU times: user 9.5 ms, sys: 5.32 ms, total: 14.8 ms
Wall time: 3.7 s


### So Snowy: Find a location that is snowy all year (there are several). Locate a nearby town/city and provide a small writeup about it. Include pictures if you’d like.



In [23]:
%%time
# warmup 3
snowysurface = spark.sql(  "SELECT Geohash \
                            FROM TEMP_DF \
                            WHERE \
                                Geohash NOT IN \
                                   (SELECT distinct(Geohash) \
                                    FROM TEMP_DF \
                                    WHERE categorical_snow_yes1_no0_surface = 0)").collect()
print(snowysurface)

[Row(Geohash='f2w29r4werxb'), Row(Geohash='fccz22w4fytb'), Row(Geohash='c1nuq5290jup'), Row(Geohash='f2d5v1jeyp7z'), Row(Geohash='c6s64488ws80'), Row(Geohash='f2fh6jpdgv5b')]
CPU times: user 4.38 ms, sys: 2.66 ms, total: 7.04 ms
Wall time: 508 ms


## Analysis

### Strangely Snowy: Find a location that contains snow while its surroundings do not. Why does this occur? Is it a high mountain peak in a desert?

In [20]:
df[df.categorical_snow_yes1_no0_surface == 1].count()

6

In [49]:
%%time
# TODO
# TODO
# TODO : THIS DOESNT WORK
snow_covered_locations = df[df.categorical_snow_yes1_no0_surface == 1].collect()

for row in snow_covered_locations:
    is_snowy = False
    neighbors = geohash.neighbors(row.Geohash)
    for neighbor in neighbors:
        print("neighbor " + neighbor)
        count = spark.sql(f"SELECT count(*) as cc \
                            FROM TEMP_DF \
                            WHERE \
                                Geohash = '{neighbor}'\
                                AND categorical_snow_yes1_no0_surface = 1 \
                            HAVING count(*) > 0").collect()
        print(count)
        print(len(count))
        if(len(count) > 0):
            print('neighbor snowy :(')
            issnowy = True
            break
    
    if (is_snowy == False):
        print("strangely snowy place found")
        print("cold place " + row.Geohash)

neighbor f2w29r4werx8
[]
0
neighbor f2w29r4wex80
[]
0
neighbor f2w29r4werx9
[]
0
neighbor f2w29r4werxc
[]
0
neighbor f2w29r4wex81
[]
0
neighbor f2w29r4werrx
[]
0
neighbor f2w29r4werrz
[]
0
neighbor f2w29r4wex2p
[]
0
strangely snowy place found
cold place f2w29r4werxb
neighbor fccz22w4fyt8
[]
0
neighbor fccz22w4fyw0
[]
0
neighbor fccz22w4fyt9
[]
0
neighbor fccz22w4fytc
[]
0
neighbor fccz22w4fyw1
[]
0
neighbor fccz22w4fymx
[]
0
neighbor fccz22w4fymz
[]
0
neighbor fccz22w4fyqp
[]
0
strangely snowy place found
cold place fccz22w4fytb
neighbor c1nuq5290jgz
[]
0
neighbor c1nuq5290jur
[]
0
neighbor c1nuq5290n5b
[]
0
neighbor c1nuq5290nh0
[]
0
neighbor c1nuq5290nh2
[]
0
neighbor c1nuq5290jgy
[]
0
neighbor c1nuq5290jun
[]
0
neighbor c1nuq5290juq
[]
0
strangely snowy place found
cold place c1nuq5290jup
neighbor f2d5v1jeyp7x
[]
0
neighbor f2d5v1jeypkp
[]
0
neighbor f2d5v1jeype8
[]
0
neighbor f2d5v1jeypeb
[]
0
neighbor f2d5v1jeyps0
[]
0
neighbor f2d5v1jeyp7w
[]
0
neighbor f2d5v1jeyp7y
[]
0
neighbo

### Lightning rod: Where are you most likely to be struck by lightning? Use a precision of at least 4 Geohash characters and provide the top 3 locations.

### Drying out: Choose a region in North America (defined by one or more Geohashes) and determine when its driest month is. This should include a histogram with data from each month.

### Travel Startup: After graduating from USF, you found a startup that aims to provide personalized travel itineraries using big data analysis. Given your own personal preferences, build a plan for a year of travel across 5 locations. Or, in other words: pick 5 regions. What is the best time of year to visit them based on the dataset? One avenue here could be determining the comfort index for a region. You could incorporate several features: not too hot, not too cold, dry, humid, windy, etc. There are several different ways of calculating this available online, and you could also analyze how well your own metrics do.


In [14]:
#%%time
# Travel Startup
relative_humidity_zerodegc_isotherm = 45
min_surface_wind_gust_surface = 1.67
max_surface_wind_gust_surface = 3.1
temperature_surface = 77
total_cloud_cover_entire_atmosphere = 60 
# Grand canyon 9qrhf6btbt3jevhn
# Panhandle san francisco 9q8yvs4t
# New york dr5regw2z6y
# Fresno 9qd23ynghrrz

traveldata = spark.sql( "SELECT  \
                            substring(Geohash, 0, 5) as region, \
                            MONTH(FROM_UNIXTIME(Timestamp/1000)) as month, \
                            AVG(temperature_surface) as temperature, \
                            AVG(relative_humidity_zerodegc_isotherm) as humdity, \
                            AVG(surface_wind_gust_surface) as windspeed, \
                            AVG(total_cloud_cover_entire_atmosphere) as cloudcover \
                        FROM nam_s \
                        WHERE \
                            Geohash like '9qd23%' or \
                            Geohash like 'dr5re%' or \
                            Geohash like '9q8yv%' or \
                            Geohash like '9qrhf%'  \
                        GROUP BY \
                            MONTH(FROM_UNIXTIME(Timestamp/1000)), \
                            substring(Geohash, 0, 5)").collect()
for row in traveldata:
    isgood = True
    if(row.temperature < (temperature_surface - (temperature_surface * .1)) or  
       row.temperature > (temperature_surface + (temperature_surface * .1))):
        isgood = False
    if(row.humdity < (relative_humidity_zerodegc_isotherm - (relative_humidity_zerodegc_isotherm * .1)) or  
       row.humdity > (relative_humidity_zerodegc_isotherm + (relative_humidity_zerodegc_isotherm * .1))):
        isgood = False
    if(row.windspeed < min_surface_wind_gust_surface or  
       row.windspeed > max_surface_wind_gust_surface):
        isgood = False
    if(row.cloudcover < (total_cloud_cover_entire_atmosphere - (total_cloud_cover_entire_atmosphere * .1)) or  
       row.cloudcover > (total_cloud_cover_entire_atmosphere + (total_cloud_cover_entire_atmosphere * .1))):
        isgood = False
    if(isgood == True):
        print(f"{row.region} {row.month}")

### Escaping the fog: After becoming rich from your startup, you are looking for the perfect location to build your Bay Area mansion with unobstructed views. Find the locations that are the least foggy and show them on a map.


### SolarWind, Inc.: You get bored enjoying the amazing views from your mansion, so you start a new company; here, you want to help power companies plan out the locations of solar and wind farms across North America. Locate the top 3 places for solar and wind farms, as well as a combination of both (solar + wind farm). You will report a total of 9 Geohashes as well as their relevant attributes (for example, cloud cover and wind speeds).


### Climate Chart: Given a Geohash prefix, create a climate chart for the region. This includes high, low, and average temperatures, as well as monthly average rainfall (precipitation). Here’s a (poor quality) script that will generate this for you.
Earn up to 1 point of extra credit for enhancing/improving this chart (or porting it to a more feature-rich visualization library)

### Influencers: Determine how features influence each other using Pearson’s correlation coefficient (PCC). The output for this job should include (1) feature pairs sorted by absolute correlation coefficient, and (2) a correlation matrix visualization (heatmaps are a good option).

In [None]:
df = sql('SELECT \
`Timestamp`, \
`geopotential_height_lltw`, \
`water_equiv_of_accum_snow_depth_surface`, \
`drag_coefficient_surface`, \
`sensible_heat_net_flux_surface`, \
`categorical_ice_pellets_yes1_no0_surface`, \
`visibility_surface`, \
`number_of_soil_layers_in_root_zone_surface`, \
`categorical_freezing_rain_yes1_no0_surface`, \
`pressure_reduced_to_msl_msl`, \
`upward_short_wave_rad_flux_surface`, \
`relative_humidity_zerodegc_isotherm`, \
`categorical_snow_yes1_no0_surface`, \
`u-component_of_wind_tropopause`, \
`surface_wind_gust_surface`, \
`total_cloud_cover_entire_atmosphere`, \
`upward_long_wave_rad_flux_surface`, \
`land_cover_land1_sea0_surface`, \
`vegitation_type_as_in_sib_surface`, \
`v-component_of_wind_pblri`, \
`albedo_surface`, \
`lightning_surface`, \
`ice_cover_ice1_no_ice0_surface`, \
`convective_inhibition_surface`, \
`pressure_surface`, \
`transpiration_stress-onset_soil_moisture_surface`, \
`soil_porosity_surface`, \
`vegetation_surface`, \
`categorical_rain_yes1_no0_surface`, \
`downward_long_wave_rad_flux_surface`, \
`planetary_boundary_layer_height_surface`, \
`soil_type_as_in_zobler_surface`, \
`geopotential_height_cloud_base`, \
`friction_velocity_surface`, \
`maximumcomposite_radar_reflectivity_entire_atmosphere`, \
`plant_canopy_surface_water_surface`, \
`v-component_of_wind_maximum_wind`, \
`geopotential_height_zerodegc_isotherm`, \
`mean_sea_level_pressure_nam_model_reduction_msl`, \
`temperature_surface`, \
`snow_cover_surface`, \
`geopotential_height_surface`, \
`convective_available_potential_energy_surface`, \
`latent_heat_net_flux_surface`, \
`surface_roughness_surface`, \
`pressure_maximum_wind`, \
`temperature_tropopause`, \
`geopotential_height_pblri`, \
`pressure_tropopause`, \
`snow_depth_surface`, \
`v-component_of_wind_tropopause`, \
`downward_short_wave_rad_flux_surface`, \
`u-component_of_wind_maximum_wind`, \
`wilting_point_surface`, \
`precipitable_water_entire_atmosphere`, \
`u-component_of_wind_pblri`, \
`direct_evaporation_cease_soil_moisture_surface` FROM nam_tiny')

features = df.rdd.map(lambda row: row[0:])

from pyspark.mllib.stat import Statistics

corr_mat=Statistics.corr(features, method="pearson")

import sys
import numpy as np
import matplotlib.pyplot as plt

from matplotlib import rc
rc('font',**{'family':'sans-serif','sans-serif':['Arial']})

plt.suptitle('Correlation Heatmap', fontsize=16)
plt.xlabel('Dimension ID', fontsize=14)
plt.ylabel('Dimension ID', fontsize=14)

plt.pcolor(corr_mat, cmap='RdBu_r')
cb = plt.colorbar()
cb.set_label('Correlation Coefficient', fontsize=14)
plt.plot()

### Prediction/Classification: Using what you learned above as your guide, choose a feature to predict or classify via machine learning models in MLlib. You will need to explain: (1) The feature you will predict/classify (2) Features used to train the model (3) How you partitioned your data

In [None]:
%%time
# Machine learning
from pyspark.ml.feature import VectorAssembler

def prepare_data(dframe, predictors, target):
    assembler = VectorAssembler(inputCols=predictors, outputCol="features")
    output = assembler.transform(dframe)
    return output.select("features", target).withColumnRenamed(target, "label")


prepped = prepare_data(df,
    ["geopotential_height_lltw", 
     "upward_long_wave_rad_flux_surface", 
     "albedo_surface", 
     "downward_long_wave_rad_flux_surface",
     "plant_canopy_surface_water_surface",
     "geopotential_height_zerodegc_isotherm",
     "temperature_surface",
     "snow_depth_surface"
    ],
    "snow_cover_surface")

prepped.show()
(trainingData, testData) = prepped.randomSplit([0.9, 0.1])

## Advanced Analysis

### Describe the dataset


In [22]:
aa_df = spark.read.csv('hdfs://orion11:25000/train_2v.csv', inferSchema=True,header=True)


### Outline the types of insights you hope to gain from it

### Make hypotheses about what you might find


### Design at least 3 “questions” (along the lines of those above) and answer them. Remember that presentation matters here.


In [23]:
%%time
aa_df.createOrReplaceTempView("aa_df")

print(aadd)

stroke = spark.sql("SELECT distinct(stroke) FROM aa_df").collect()
print(stroke)

DataFrame[id: int, gender: string, age: double, hypertension: int, heart_disease: int, ever_married: string, work_type: string, Residence_type: string, avg_glucose_level: double, bmi: double, smoking_status: string, stroke: int]
[Row(stroke=1), Row(stroke=0)]
CPU times: user 3.13 ms, sys: 1.91 ms, total: 5.04 ms
Wall time: 366 ms


In [24]:
%%time
#Find the occupation which is most vulernale to the stroke
spark.sql( "SELECT work_type, count(*) as stroke_patients \
            FROM aa_df \
            WHERE stroke = 1 \
            GROUP BY work_type \
            ORDER BY count(*) desc").show()


+-------------+---------------+
|    work_type|stroke_patients|
+-------------+---------------+
|      Private|            441|
|Self-employed|            251|
|     Govt_job|             89|
|     children|              2|
+-------------+---------------+

CPU times: user 1.64 ms, sys: 360 µs, total: 2 ms
Wall time: 565 ms


In [25]:
#Factoring affecting the stroke in the people
spark.sql( "SELECT \
                gender, \
                count(gender) as count, \
                count(gender)*100/sum(count(gender)) over() as percent \
            FROM aa_df \
            GROUP BY gender").show()

+------+-----+-------------------+
|gender|count|            percent|
+------+-----+-------------------+
|Female|25665|  59.13594470046083|
| Other|   11|0.02534562211981567|
|  Male|17724|  40.83870967741935|
+------+-----+-------------------+



In [26]:
#Age as factor
spark.sql("SELECT case when age <= 25 then '0-25' else case when age <=50 then '26-50' else '>50' end end as age_range, \
           count(*) as count \
           FROM aa_df \
           group by 1 order by count desc").show()

+---------+-----+
|age_range|count|
+---------+-----+
|      >50|17202|
|    26-50|14652|
|     0-25|11546|
+---------+-----+



In [28]:
# fill in missing values
train_f = aa_df.na.fill('No Info', subset=['smoking_status'])
# fill in miss values with mean
from pyspark.sql.functions import mean
mean = train_f.select(mean(train_f['bmi'])).collect()
mean_bmi = mean[0][0]
train_f = train_f.na.fill(mean_bmi,['bmi'])

In [29]:
from pyspark.ml.feature import (VectorAssembler,OneHotEncoder,
                                StringIndexer)
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [30]:
gender_indexer = StringIndexer(inputCol='gender', outputCol = 'genderIndex')
gender_encoder = OneHotEncoder(inputCol='genderIndex', outputCol = 'genderVec')

In [31]:
ever_married_indexer = StringIndexer(inputCol='ever_married', outputCol = 'ever_marriedIndex')
ever_married_encoder = OneHotEncoder(inputCol='ever_marriedIndex', outputCol = 'ever_marriedVec')

In [32]:
work_type_indexer = StringIndexer(inputCol='work_type', outputCol = 'work_typeIndex')
work_type_encoder = OneHotEncoder(inputCol='work_typeIndex', outputCol = 'work_typeVec')

In [33]:
Residence_type_indexer = StringIndexer(inputCol='Residence_type', outputCol = 'Residence_typeIndex')
Residence_type_encoder = OneHotEncoder(inputCol='Residence_typeIndex', outputCol = 'Residence_typeVec')

In [34]:
smoking_status_indexer = StringIndexer(inputCol='smoking_status', outputCol = 'smoking_statusIndex')
smoking_status_encoder = OneHotEncoder(inputCol='smoking_statusIndex', outputCol = 'smoking_statusVec')

In [35]:
assembler = VectorAssembler(inputCols=['genderVec',
 'age',
 'hypertension',
 'heart_disease',
 'ever_marriedVec',
 'work_typeVec',
 'Residence_typeVec',
 'avg_glucose_level',
 'bmi',
 'smoking_statusVec'],outputCol='features')

In [36]:
dtc = DecisionTreeClassifier(labelCol='stroke',featuresCol='features')

In [37]:
pipeline = Pipeline(stages=[gender_indexer, ever_married_indexer, work_type_indexer, Residence_type_indexer,
                           smoking_status_indexer, gender_encoder, ever_married_encoder, work_type_encoder,
                           Residence_type_encoder, smoking_status_encoder, assembler, dtc])

In [38]:
train_data,test_data = train_f.randomSplit([0.8,0.2])

In [39]:
model = pipeline.fit(train_data)

In [40]:
dtc_predictions = model.transform(test_data)

In [41]:
acc_evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
print('Accuracy using decision tree is: {0:2.2f}%'.format(dtc_acc*100))

Accuracy using decision tree is: 98.21%
