## What's this all about?

In order to run Spark applications on your local machine, you must have **Java 8**, **Spark**, and the **PySpark** package installed. Additionally, for the Jupyter Notebook kernel to use your locally installed **Spark**, you may need the **findspark** module. If you are unsure whether you meet any or all of these requirements, we recommend following the instructions in this notebook.
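If you are not sure which of these pieces you already have, a quick check like the following can help. It is a minimal sketch that is not part of the original notebook: the function name `check_requirements` is hypothetical. It only looks for a `java` executable on `PATH`, a `SPARK_HOME` environment variable, and importable `pyspark`/`findspark` modules. It does not verify versions (for example, that the Java it finds is Java 8).

```python
import importlib.util
import os
import shutil


def check_requirements():
    """Report which local Spark prerequisites this kernel can currently see."""
    return {
        # a `java` executable reachable through this kernel's PATH
        "java": shutil.which("java") is not None,
        # SPARK_HOME set (and non-empty) in this kernel's environment
        "SPARK_HOME": bool(os.environ.get("SPARK_HOME")),
        # find_spec returns None for a top-level module that cannot be imported
        "pyspark": importlib.util.find_spec("pyspark") is not None,
        "findspark": importlib.util.find_spec("findspark") is not None,
    }


for name, found in check_requirements().items():
    print(f"{name:<10} {'found' if found else 'missing'}")
```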

### Automatic Java Installation

Executing the following cell will install **Java 8** into its own directory (`$HOME/java/`) inside your home directory. If you would rather install Java yourself, skip the following cell, then download Java and follow the instructions for your operating system on the [Java website](https://www.java.com/en/download/).

*Note: If you already have Java installed, we recommend uninstalling it before running the following cell. The next cell deletes anything located in `$HOME/java/` and adds the installed Java directory to your `PATH` variable. It also assumes that Bash is your default shell, and it modifies your `$HOME/.bashrc` file.*
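Because the installation cells append to `~/.bashrc` with `>>`, running them more than once leaves duplicate `export` lines behind. A minimal sketch of an idempotent alternative, using a hypothetical helper `append_line_once`, could look like this. It is an illustration only, not what the following cell actually does.

```python
import os


def append_line_once(path, line):
    """Append `line` to the file at `path` unless an identical line already exists.

    Returns True if the line was appended, False if it was already present.
    """
    path = os.path.expanduser(path)
    text = ""
    if os.path.exists(path):
        with open(path) as f:
            text = f.read()
    if line in text.splitlines():
        return False
    with open(path, "a") as f:
        # Avoid gluing the new line onto an unterminated last line.
        if text and not text.endswith("\n"):
            f.write("\n")
        f.write(line + "\n")
    return True


# Example (not executed here):
# append_line_once("~/.bashrc", "export JAVA_HOME=$HOME/java")
```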

**OS specific notes:**

*Linux distros: This code has been tested and works on most Linux distributions (32- and 64-bit).*

*Mac OS X: The code to install Java may or may not work on your machine. If you experience an error, please download Java and follow the installation instructions on the [Java website](https://www.java.com/en/download/).*

*Windows: You must download Java and follow the installation instructions on the [Java website](https://www.java.com/en/download/).*

In [3]:
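import os
import platform as arch
from sys import platform

print('Beginning Java 8 installation!')

# Check which OS we are running on
if platform.startswith('linux'):
    print('Now installing Java on Linux...')
    # Quote the URLs so the shell does not treat `?` as a glob character
    if arch.architecture()[0] == '64bit':
        !curl -o ~/java.tar.gz -L "http://javadl.oracle.com/webapps/download/AutoDL?BundleId=234464_96a7b8442fe848ef90c96a2fad6ed6d1"
    elif arch.architecture()[0] == '32bit':
        !curl -o ~/java.tar.gz -L "http://javadl.oracle.com/webapps/download/AutoDL?BundleId=234462_96a7b8442fe848ef90c96a2fad6ed6d1"
    # Replace any previous ~/java directory with the freshly extracted Java
    !rm -rf ~/java && mkdir ~/java && tar -xzf ~/java.tar.gz -C ~/java --strip-components=1 && rm ~/java.tar.gz

    # Define JAVA_HOME and add it to PATH for future Bash shells
    !echo 'export JAVA_HOME=$HOME/java' >> ~/.bashrc
    !echo 'export PATH=$JAVA_HOME/bin:$PATH' >> ~/.bashrc

    # `!. ~/.bashrc` would only affect a throwaway subshell, so update this kernel's environment directly
    os.environ['JAVA_HOME'] = os.path.expanduser('~/java')
    os.environ['PATH'] = os.path.join(os.environ['JAVA_HOME'], 'bin') + os.pathsep + os.environ['PATH']

    print('Installation of Java 8 complete!')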

elif platform == 'darwin':
    print('Now installing Java on Mac...')
    !curl -o ~/java.dmg -L http://javadl.oracle.com/webapps/download/AutoDL?BundleId=234465_96a7b8442fe848ef90c96a2fad6ed6d1
    !hdiutil attach ~/java.dmg
    !sudo installer -pkg /Volumes/Java\ 8\ Update\ 181/Java\ 8\ Update\ 181.app/Contents/Resources/JavaAppletPlugin.pkg -target /
    !diskutil umount /Volumes/Java\ 8\ Update\ 181 
    print('Installation of Java 8 complete (maybe)... If there was an error, please mount java.dmg located in your home directory and follow the instructions to install')

elif platform == 'win32':
    print('You are running a Windows OS.  Please download the correct version of Java from here: https://java.com/en/download/manual.jsp and install following the instructions.')

else:
    print('We had trouble determining which OS you are running.  Please ask for help.')

Beginning Java 8 installation!
Now installing Java on Linux...
Installation of Java 8 complete!
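Shell commands prefixed with `!` run in their own subshell, so edits to `~/.bashrc` only take effect in shells started afterwards. The running kernel finds Java only if its own `PATH` includes `$HOME/java/bin` (restarting Jupyter from a new terminal is one way to get there). To check what the kernel actually sees, a small sketch using the hypothetical helper `java_version` could look like this:

```python
import shutil
import subprocess


def java_version():
    """Return the first line of `java -version`, or None if no java is on PATH."""
    java = shutil.which("java")
    if java is None:
        return None
    # `java -version` writes its banner to stderr, not stdout.
    result = subprocess.run([java, "-version"], capture_output=True, text=True)
    output = (result.stderr or result.stdout).strip()
    return output.splitlines()[0] if output else None


print(java_version())
```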


### Automatic Spark Installation

Executing the following cell will install **Apache Spark 2.3.2** (pre-built for Hadoop 2.7) into its own directory (`$HOME/spark/`) inside your home directory.

If you would rather install Spark yourself, skip the following cell and download a pre-built version of Spark from the [Spark website](https://spark.apache.org/downloads.html). You will need to extract the contents of the tarball and add Spark's `bin` directory to your `PATH` variable.
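If you extract Spark yourself and prefer to stay in Python, the standard-library `tarfile` module can mimic `tar -xzf ... --strip-components=1`. The installation cell uses that tar option to drop the archive's top-level `spark-<version>-bin-hadoop<version>/` directory. The sketch below is a hypothetical helper (`extract_strip_first`). It assumes every entry sits under a single top-level directory with no `./` prefix and does not rewrite link targets, so treat it as an illustration for trusted archives only.

```python
import os
import tarfile


def extract_strip_first(tar_path, dest):
    """Extract a .tgz into `dest`, dropping the archive's top-level directory.

    Roughly equivalent to `tar -xzf tar_path -C dest --strip-components=1`.
    """
    os.makedirs(dest, exist_ok=True)
    with tarfile.open(tar_path, "r:gz") as tar:
        members = []
        for member in tar.getmembers():
            parts = member.name.split("/", 1)
            if len(parts) < 2 or not parts[1]:
                continue  # skip the top-level directory entry itself
            member.name = parts[1]  # strip the first path component
            members.append(member)
        # Use the safer "data" extraction filter where this Python provides it.
        extra = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
        tar.extractall(dest, members=members, **extra)
```

After extracting to, say, `~/spark`, you would still set `SPARK_HOME` to that directory and prepend `$SPARK_HOME/bin` to `PATH`.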

*Note: If you already have Spark installed, we recommend uninstalling it before running the following cell. The next cell deletes anything located in `$HOME/spark/` and adds the installed Spark directory to your `PATH` variable. It also assumes that Bash is your default shell, and it modifies your `$HOME/.bashrc` file.*

**OS specific notes:**

*Linux distros: This code has been tested and works on most Linux distributions (32- and 64-bit).*

*Mac OS X: The code has been tested and should work on modern Macs.*

*Windows: You must download Spark and follow the installation instructions on the [Spark website](https://spark.apache.org/downloads.html).*

In [None]:
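import os
from sys import platform

print('Beginning Apache Spark installation!')

# Check which OS we are running on
if (platform.startswith('linux')) or (platform == 'darwin'):
    # Download Spark 2.3.2 (pre-built for Hadoop 2.7) and extract it into $HOME/spark/
    # (archive.apache.org keeps old releases after the download mirrors drop them)
    !curl -o ~/spark.tar.gz -L https://archive.apache.org/dist/spark/spark-2.3.2/spark-2.3.2-bin-hadoop2.7.tgz
    !rm -rf ~/spark && mkdir ~/spark && tar -xzf ~/spark.tar.gz -C ~/spark --strip-components=1 && rm ~/spark.tar.gz
    
    # Define SPARK_HOME and add it to PATH for future Bash shells
    !echo 'export SPARK_HOME=$HOME/spark' >> ~/.bashrc
    !echo 'export PATH=$SPARK_HOME/bin:$PATH' >> ~/.bashrc
    
    # Bind Spark to the loopback address (may not be necessary)
    !echo 'export SPARK_LOCAL_IP="127.0.0.1"' >> ~/.bashrc

    # `!. ~/.bashrc` would only affect a throwaway subshell, so update this kernel's environment directly
    os.environ['SPARK_HOME'] = os.path.expanduser('~/spark')
    os.environ['PATH'] = os.path.join(os.environ['SPARK_HOME'], 'bin') + os.pathsep + os.environ['PATH']
    os.environ['SPARK_LOCAL_IP'] = '127.0.0.1'
    
    print('Installation of Apache Spark complete!')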

elif platform == 'win32':
    print('You are running a Windows OS.  Please download the correct version of Spark from here: https://spark.apache.org/downloads.html and install following the instructions.')

else:
    print('We had trouble determining which OS you are running.  Please ask for help.')

Beginning Apache Spark installation!
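A notebook kernel that was started before `~/.bashrc` was updated (for example, one running another notebook) will not have `SPARK_HOME` in its environment. The hypothetical helper below sketches one way to locate the installation from Python. It treats a directory as a Spark install if it contains `bin/spark-submit`, checking `$SPARK_HOME` first and then `~/spark` (the location used by the installation cell).

```python
import os


def find_spark_home(candidates=("~/spark",)):
    """Return a directory that looks like a Spark installation, or None.

    Checks $SPARK_HOME first, then each candidate path in order.
    """
    paths = []
    if os.environ.get("SPARK_HOME"):
        paths.append(os.environ["SPARK_HOME"])
    paths.extend(os.path.expanduser(p) for p in candidates)
    for path in paths:
        # A Spark distribution ships its launcher scripts in bin/
        if os.path.isfile(os.path.join(path, "bin", "spark-submit")):
            return os.path.abspath(path)
    return None
```

If it returns a path, setting `os.environ['SPARK_HOME']` to that path before calling `findspark.init()` should let findspark pick it up.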

### Automatic PySpark Installation

Executing the following cell will install **pyspark** and **findspark**. It uses `conda` if you have Anaconda3 or Miniconda installed and falls back to `pip` otherwise, so you need at least one of the two. You likely have one of these installed already! If you do not, you can download and install [Anaconda3](https://www.anaconda.com/download/) or [pip](https://pip.pypa.io/en/stable/installing/). The cell also installs **matplotlib** so you can plot results from your assignments, and pre-builds its font cache to save time later.
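One caveat with `!pip` and `!conda` is that they run whichever `pip` or `conda` comes first on the shell's `PATH`, which is not always the environment the notebook kernel uses. The sketch below builds an install command that targets the kernel's own interpreter (`sys.executable -m pip`) or its own conda prefix (`--prefix sys.prefix`). `build_install_command` is a hypothetical helper, and the conda-first order simply mirrors the installation cell.

```python
import shutil
import sys


def build_install_command(packages, channel=None):
    """Return a command list that installs `packages` into this kernel's environment."""
    if shutil.which("conda"):
        # Install into the environment this Python interpreter lives in.
        cmd = ["conda", "install", "--yes", "--prefix", sys.prefix]
        if channel:
            cmd += ["-c", channel]
        return cmd + list(packages)
    # `python -m pip` guarantees pip belongs to the kernel's interpreter.
    return [sys.executable, "-m", "pip", "install", *packages]
```

For example, `subprocess.run(build_install_command(['findspark'], channel='conda-forge'), check=True)` would install findspark, taking it from conda-forge when conda is present.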

*Note: These instructions should work regardless of the OS you are running!*

In [None]:
import shutil

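# Method to build Matplotlib font cache
def buildFontCache():
    # Importing font_manager loads Matplotlib's font cache and builds it if it does not exist yet.
    # Unlike calling matplotlib.use('AGG'), this does not switch the notebook away from inline plots.
    import matplotlib.font_manager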

# Check for conda
if shutil.which('conda'):
    # Update conda
    !conda update -n base conda --yes
    
    # Install pyspark and findspark
    !conda install pyspark --yes
    !conda install -c conda-forge findspark --yes
    
    # Install matplotlib and build font cache
    !conda install matplotlib --yes
    buildFontCache()
    
    print('Python package installation complete!')

    
# Check for pip if conda is not found
elif shutil.which('pip'):
    # Update pip
    !pip install --upgrade pip
    
    # Install pyspark and findspark
    !pip install pyspark
    !pip install findspark
    
    # Install matplotlib and build font cache
    !pip install matplotlib
    buildFontCache()
    
    print('Python package installation complete!')
    
else:
    print('Could not find conda or pip. Please follow the instructions above to install either Anaconda3 or pip.')

### Does it Work?

Let's test that **Java**, **Spark**, **pyspark**, and **findspark** were all installed correctly. The following cell should create a `SparkContext`, create an `RDD` from a Python `range`, and print the values in the `RDD`.

**Expected output:**
`[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]`

In [None]:
import findspark
findspark.init()

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

data = sc.parallelize(range(10))
print(data.collect())

**If the output of the previous cell did not match the expected output or you received an error message, please ask for assistance!**

### Writing Spark Code

Now that you have all the software installed to run **Spark** code in a Jupyter Notebook, keep in mind that you will need to run the following code at the beginning of each notebook in which you wish to use **Spark**. This initialization code creates a `SparkContext`, which you can access via `sc`.

In [None]:
import findspark
findspark.init()

from pyspark import SparkContext
sc = SparkContext.getOrCreate()
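Roughly speaking, `findspark.init()` finds the Spark installation (using `SPARK_HOME` when it is set) and puts Spark's bundled Python package and its py4j zip on `sys.path`, so that `from pyspark import SparkContext` imports the copy that ships with Spark. The simplified sketch below illustrates that idea only. It is not findspark's implementation, the helper name is hypothetical, and the `python/lib/py4j-*-src.zip` layout is an assumption based on how Spark 2.x distributions are packaged.

```python
import glob
import os
import sys


def add_spark_python_to_path(spark_home):
    """Make Spark's bundled Python packages importable (simplified sketch).

    Puts $SPARK_HOME/python and the py4j source zip from $SPARK_HOME/python/lib
    at the front of sys.path and returns both paths.
    """
    spark_python = os.path.join(spark_home, "python")
    py4j_zips = sorted(glob.glob(os.path.join(spark_python, "lib", "py4j-*-src.zip")))
    if not py4j_zips:
        raise FileNotFoundError(f"no py4j zip found under {spark_python}/lib")
    # Insert py4j first, then Spark's python dir, so the latter ends up at sys.path[0].
    for path in (py4j_zips[0], spark_python):
        if path not in sys.path:
            sys.path.insert(0, path)
    return spark_python, py4j_zips[0]
```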

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Nov 20 20:21:31 2018

@author: trahman4
"""

import pandas as pd
from datetime import date, timedelta
import datetime
from pyspark import SparkContext, SparkConf
from matplotlib import pyplot as plt
import numpy as np
from scipy.stats import norm



sc = SparkContext.getOrCreate()

dataset_tags = pd.read_csv('scav_mumm.csv').iloc[:,:-1]
dataset_max_min = pd.read_csv('outputs/maxdate_mindate.csv')
dataset_weather = pd.read_csv('KnoxvilleWeather_modified.csv')

scav_df = dataset_tags[dataset_tags['tag'].str.contains('scav')]
mumm_df = dataset_tags[dataset_tags['tag'].str.contains('mumm')]

#convert the weather dataset's dt_iso timestamps (date part "YYYY-MM-DD") to date objects so that we can match them against the date range
dataset_weather['date'] = dataset_weather['dt_iso'].apply(
    lambda x: datetime.datetime.strptime(x.split(" ")[0], "%Y-%m-%d").date())


# module-level placeholders; collect_max_min_date() overwrites them via `global`
donor_match_df = pd.DataFrame()
match = pd.DataFrame()


#this function takes a tag name and the dataframe of images with that tag, picks the donor with the most images,
#and looks up each of that donor's images in dataset_max_min. Every image has a minimum (first) date and a
#maximum (current) date; the function returns the latest of all maximum dates and the earliest of all minimum dates.
def collect_max_min_date(tag, dataf):
    global donor_match_df,match
    print("We are working on ", tag , " tag")
    min_dates = []
    max_dates = []
    donor = choose_donor(dataf)
    print("The donor with most data is ", donor)
    donor_match_df = dataf[dataf['image'].str.contains(donor)] #matching the image name with donor's name
    
    for row in donor_match_df.iterrows():
        image_name = row[1].values[0]
        image_name = image_name.split(" ")[0]      
        match = dataset_max_min[dataset_max_min['image_name'].str.match(image_name)] #matching the image name with tag's data with max_min_date data
        
        max_dates.append(match['date_from_image/max_date'].values[0])
        min_dates.append(match['min_date/first date'].values[0])
    max_date = max(max_dates)
    min_date = min(min_dates)
    return max_date, min_date



def choose_donor(dataf):
    donors = []
    for row in dataf.iterrows():
        donor_name = row[1].values[0].split(" ")[0].split("_")[0]
        donors.append(donor_name)
    freq_rdd = sc.parallelize(donors)

    freq_rdd_count = freq_rdd.map(makeKeyValue)
    freq_rdd_sum = freq_rdd_count.reduceByKey(addValues)
    
    freq_rdd_counts_sorted = sorted(freq_rdd_sum.collect(), key=lambda x: x[1], reverse=True)
    print_map_reduce_data(freq_rdd_counts_sorted)
    # the list is sorted by count in descending order, so entry 0 is the donor with the most images
    return freq_rdd_counts_sorted[0][0]

#this function takes a string in the form "YYYY-MM-DD" and returns a date object
def make_date(curr_date):
    year, month, day = str(curr_date).split("-")
    curr_date = date(int(year), int(month), int(day))
    return curr_date
            

def makeKeyValue(key, value=1):
    return (key, value)

# Sum (reduce) the values for a given key (e.g., the occurrences of one weather type)
def addValues(val1, val2):
    return val1 + val2

def plot_histo(tag,sorted_list,x_label,y_label,title):
    plt.clf()
    x, y = zip(*sorted_list)
    fig, axs = plt.subplots(1,1,figsize=(20,9))
    index = np.arange(len(x))
    plt.barh(index, y, color ='g')
    plt.xlabel(y_label, fontsize=5)
    plt.ylabel(x_label, fontsize=10)
    plt.yticks(index, x, fontsize=10, rotation=30)
    plt.title(title)
    plt.savefig('plt_'+str(tag)+'.png', dpi = 300)
    plt.show()
    

def print_map_reduce_data(sorted_list):
    print('Weather Type          :  Count')
    for weather_f, count in sorted_list:
        print('{:<20}    : {:>6}'.format(weather_f, count))

    print("Total Number of Weather Types: ", len(sorted_list))
    
    
    
def map_reduce(tag, list_to_mr):
    weather_freq = sc.parallelize(list_to_mr)

    weather_freq_count = weather_freq.map(makeKeyValue)
    weather_freq_sum = weather_freq_count.reduceByKey(addValues)
    
    weather_freq_counts_sorted = sorted (weather_freq_sum.collect(), key = lambda x: x[1], reverse = True)
    print_map_reduce_data(weather_freq_counts_sorted)
    plot_histo(tag, weather_freq_counts_sorted, "type", "freq", "histogram for "+str(tag))
    




def plot_normal_distribution(temp, column_name):
    
    mu, std = norm.fit(temp)
    
    plt.hist(temp, bins=25, density=True, alpha=0.6, color='g')
    
    # Plot the PDF.
    xmin, xmax = plt.xlim()
    x = np.linspace(xmin, xmax, 100)
    p = norm.pdf(x, mu, std)
    plt.plot(x, p, 'k', linewidth=2)
    title = "Normal Distribution of %s\nFit results: mu = %.2f,  std = %.2f" % (column_name,
                                                                                mu, std)
    plt.title(title)
    plt.show()
    

      
scav_max_date, scav_min_date = collect_max_min_date("scav",scav_df)
scav_min_date = make_date(scav_min_date)
scav_max_date = make_date(scav_max_date)


mumm_max_date, mumm_min_date = collect_max_min_date("mumm",mumm_df)
mumm_min_date = make_date(mumm_min_date)
mumm_max_date = make_date(mumm_max_date)

#getting the date range between max date and min date
scav_date_range = [scav_min_date + timedelta(days=x) for x in range((scav_max_date-scav_min_date).days + 1)]
mumm_date_range = [mumm_min_date + timedelta(days=x) for x in range((mumm_max_date-mumm_min_date).days + 1)]

#extract the weather dataset's dates that are within the date range
scav_weather_df = dataset_weather[dataset_weather['date'].isin(scav_date_range)]
mumm_weather_df = dataset_weather[dataset_weather['date'].isin(mumm_date_range)]

#getting the weather description column to build the histogram/mapreduce
scav_weather_description = scav_weather_df['weather_description'].tolist()
mumm_weather_description = mumm_weather_df['weather_description'].tolist()




map_reduce("mumm", mumm_weather_description)
map_reduce("scav", scav_weather_description)

#plot_normal_distribution(scav_weather_df['humidity'],'scav weather humidity')
