# CIS 545 Homework 3: Spark SQL

Welcome to CIS 545 Homework 3! In this homework you will gain a mastery of using Spark SQL. By the end, you'll be a star (not that you aren't already one). Over the next few days you will be using an EMR cluster to use Spark to manipulate the entire `linkedin.json` dataset from Homework 2 as well as a new data set, `stock_prices.csv`.

The goal of the homework will be to create a training dataset for a Random Forest Machine learning model. The training data set will contain the monthly number of employees hired by companies in `linkedin.json` and their corresponding closing stock prices over a 10 year period (2000-2011). We will try and predict, based on this data, if the company will have a positive or negative growth in stock in the first quarter of the next year. Who's ready to make some money?

## The Noteworthy Notes
Before we begin here are some important notes to keep in mind,


1.   **IMPORTANT!** I said it twice, it's really important. In this homework, we will be using AWS resources. You are given a quota ($150) to use for the entirety of the homework. There is a small chance you will use all this money, however it is important that at the end of every session, you **shut down your EMR cluster**.
2.   **You can only use Google Colab for this Homework** since we must connect to the EMR cluster and Jupyter doesn't do that. Using a Google Colab Notebook with an EMR cluster has two important abnormalities:
    * The first line of any cell in which you will use the spark session must be `%%spark`. Notice that all cells below have this.
    * You will, unfortunately, not be able to stop a cell while it is running. If you wish to do so, you will need to restart your cluster. See the Setup EMR Document for reference.
3.   You are **required** to use Spark SQL queries to handle the data in the assignment. Mastering SQL is more beneficial than being able to use Spark commands (functions) as it will show up in more areas of programming and data science/analytics than just Spark. Use the following [function list](https://spark.apache.org/docs/latest/api/sql/index.html#) to see all the SQL functions avaliable in Spark.
4.   Throughout the homework you will be manipulating Spark dataframes (sdfs). We do not specify any ordering on the final output. You are welcome to order your final tables in whatever way you deem fit. We will conduct our own ordering when we grade.
5. Speaking of grading, this homework introduces `The_Gallant_Grader` an astonishing autograder create by everybody's favorite TA, Leonardo Murri. After each work cell you will see a cell titled `## AUTOGRADER Step 1.X: Run this to get your score ##`. By running the cell you will get your score for a specific task. You are allowed to submit to the autograder as many times as you'd like. However, know that we can track all your submission so if you try to pull anything shady we'll figure it out.
6. There are portions of this homework that are _very_ hard. We urge you start early to come to office hours (especially Fridays and Wednesdays) and get help if you get stuck. But don't worry, I can see the future, and you all got this.

With that said, let's dive in.




## Step 0: Set up EMR

Your first task is to create an EMR cluster your AWS Educate Accounts. Please see the [attached document](https://drive.google.com/open?id=1_8NB_3QXfQKm5Vyu7XyxU2mnH5HnGM-F) for detailed instructions. Move on to Step 0.1 after you have completed all the steps in the document.

### Step 0.1: The Superfluous Setup

Run the following two cells. These will allow your colab notebook to connect to an use your EMR.

In [0]:
%%capture
!apt update
!apt install gcc python-dev libkrb5-dev
!pip install sparkmagic

In [0]:
%%capture
%load_ext sparkmagic.magics

### Step 0.2: The Succulent Spark

Now, connect your notebook to the EMR cluster you created. In the first cell, copy the link to the Master Public DNS specified in the setup document. You will need to add `http://` to the beginning of the address and the port to the end. The final format should be,

`http://<your-DNS-link>:8998`

For example, if my DNS (directly from the AWS EMR console) is `ec2-3-15-237-211.us-east-2.compute.amazonaws.com` my address would be,

`http://ec2-3-15-237-211.us-east-2.compute.amazonaws.com:8998`

Insert this in the `# TODO # below`. For our example, the cell would read,



```
%spark add -s spark_session -l python -u http://ec2-3-15-237-211.us-east-2.compute.amazonaws.com:8998
```

In [0]:
# TODO: Enter your Master Public DNS with the proper formatting and host

%spark add -s spark_session -l python -u http://ec2-52-90-64-175.compute-1.amazonaws.com:8998


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1572359350162_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


Enter your 8-digit Penn Key as an integer in the cell below. This will be used in the autograder.

In [0]:
%%spark

# TODO: Enter your 8-digit Penn Key as an integer 

STUDENT_PENN_KEY = 30205907


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Run the following cell to setup the autograder, make sure to have set your 8 digit Penn ID in the cell above. It will also import all the modules you need for the homework.

_Note_: Since we are using an EMR cluster we will not have access to the several modules that exist for python, e.g. `pandas`, `numpy`, etc. We have written the entire homework such that the solution does not require any of these. Don't try to import them. It won't work.

In [0]:
%%spark

import json
import pyspark
import urllib.request

from pyspark.sql.types import * 
from datetime import datetime

api_endpoint = 'https://q0gtldc2ck.execute-api.us-east-2.amazonaws.com/default/GalantGrader_v0'

class TheGallantGrader: 
    
    def __init__(self, student_id, api_endpoint = api_endpoint, homework_id = '3'):
      
        if student_id == None:
            print('Error Autograder Not Setup: Enter your 8 digit PennID in the cell above.')
            
        self.student_id   = str(student_id)
        self.api_endpoint = api_endpoint
        self.homework_id  = homework_id 
        
    def grade(self, question_id, answer):
        payload = {'student_id'   : self.student_id,
                   'homework_id'  : self.homework_id,
                   'test_case_id' : question_id,
                   'answer'       : answer}

        params = json.dumps(payload).encode('utf-8')
        request = urllib.request.Request(self.api_endpoint, 
                                         data    = params, 
                                         headers = {'content-type': 'application/json'})
        try:
            response = urllib.request.urlopen(request)
            response_body = response.read().decode('utf-8')
            print('{}'.format(response_body))
        except:
            print('Error: Grading request could not be complete.')
            
            
grader = TheGallantGrader(student_id = STUDENT_PENN_KEY, homework_id = '3')  


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Step 1: Data Cleaning and Shaping

The data you will use is stored in an S3 bucket, a cloud storage service. You now need to download it onto the nodes of your [EMR cluster](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-what-is-emr.html). 

### Step 1.1: The Stupendous Schema

When loading data, Spark will try to infer it's structure on it's own. This process is faulty because it will sometimes infer the type incorrectly. JSON documents, like the one we will use, can have nested types, such as: arrays, arrays of dictionaries, dictionaries of dictionaries, etc. Spark's ability to determine these nested types is not reliable, thus you will define a schema for `linkedin.json`.

A schema is a description of the structure of data. You will be defining an explicit schema for `linkedin.json`. In Spark, schema's are defined using a `StructType` object. This is a collection of data types, termed `StructField`'s, that specify the structure and variable type of each component of the dataset. For example, suppose we have the following simple JSON object,


```
{
 "student_name": "Leonardo Murri",
 "GPA": 1.4,
 "courses": [
    {"department": "Computer and Information Science",
     "course_id": "CIS 545",
     "semester": "Fall 2018"},
    {"department": "Computer and Information Science",
     "course_id": "CIS 520",
     "semester": "Fall 2018"},
    {"department": "Electrical and Systems Engineering",
     "course_id": "ESE 650",
     "semester": "Spring 2018"}
 ],
 "grad_year": 2019
 }
```

We would define it's schema as follows,

```       
schema = StructType([
           StructField("student_name", StringType(), nullable=True),
           StructField("GPA", FloatType(), nullable=True),
           StructField("courses", ArrayType(
                StructType([
                  StructField("department", StringType(), nullable=True),
                  StructField("course_id", StringType(), nullable=True),
                  StructField("semester", StringType(), nullable=True)
                ])
           ), nullable=True),
           StructField("grad_year", IntegerType(), nullable=True)
         ])
```


Each `StructField` has the following structure: `(name, type, nullable)`. The `nullable` flag defines that the specified field may be empty. Your first task is to define the `schema` of `linkedin.json`. A sample JSON object for the dataset can be found [here](http://oneclickpaste.com/194048/).

_Note_: In `linkedin.json` the field `specilities` is spelled incorrectly. This is **not** a typo. There is also no grading cell for this step.



In [0]:
%%spark

# TODO: Define [linkedin.json] schema
schema = StructType([
           StructField("_id", StringType(), nullable=True),
    
           StructField("education", ArrayType(
                StructType([
                  StructField("major", StringType(), nullable=True),
                  StructField("end", StringType(), nullable=True),
                  StructField("name", StringType(), nullable=True),
                  StructField("degree", StringType(), nullable=True),
                  StructField("start", StringType(), nullable=True),
                  StructField("desc", StringType(), nullable=True)
                ])
           ), nullable=True),
    
           StructField("group", StructType([
               StructField("affilition", ArrayType(StringType()), nullable=True)
           ]), nullable=True),
    
           StructField("name", StructType([
                    StructField("family_name", StringType(), nullable=True),
                    StructField("given_name", StringType(), nullable=True)
                ]), nullable=True),
    
           StructField("locality", StringType(), nullable=True),
           StructField("skills", ArrayType(StringType()), nullable=True),
           StructField("industry", StringType(), nullable=True),
    
           StructField("interval", IntegerType(), nullable=True),
    
           StructField("experience", ArrayType(
                StructType([
                  StructField("org", StringType(), nullable=True),
                  StructField("title", StringType(), nullable=True),
                  StructField("end", StringType(), nullable=True),
                  StructField("start", StringType(), nullable=True),
                  StructField("desc", StringType(), nullable=True) 
                ])
           ), nullable=True),
    
           StructField("summary", StringType(), nullable=True),
           StructField("interests", StringType(), nullable=True),
           StructField("overview_html", StringType(), nullable=True),
           StructField("specilities", StringType(), nullable=True),
           StructField("homepage", ArrayType(StringType()), nullable=True), 
           StructField("honors", ArrayType(StringType()), nullable=True),
           StructField("url", StringType(), nullable=True), 
    
           StructField("also_view", ArrayType(
                StructType([
                  StructField("url", StringType(), nullable=True),
                  StructField("id", StringType(), nullable=True)
                ])
           ), nullable=True),
    
           StructField("events", ArrayType(
                StructType([
                  StructField("from", StringType(), nullable=True),
                  StructField("to", StringType(), nullable=True),
                  StructField("title1", StringType(), nullable=True),
                  StructField("start", IntegerType(), nullable=True),
                  StructField("title2", StringType(), nullable=True),
                  StructField("end", IntegerType(), nullable = True)
                ])
           ), nullable=True)
         ])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 1.2: The Laudable Loading

Load the `linkedin.json` dataset from your S3 bucket into a Spark dataframe (sdf) called `raw_data_sdf`. If you have constructed `schema` correctly `spark.read.json()` will read in the dataset. ***You do not need to edit this cell***.

In [0]:
%%spark

raw_data_sdf = spark.read.json("s3a://grewal-545-emr/linkedin.json", schema=schema)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The cell below shows how to run SQL commands on Spark tables. Use this as a template for all your SQL queries in this notebook. ***You do not need to edit this cell***.

In [0]:
%%spark

# Create SQL-accesible table
raw_data_sdf.createOrReplaceTempView("raw_data")

# Declare SQL query to be excecuted
query = '''SELECT * 
           FROM raw_data'''

# Save the output sdf of spark.sql() as answer_sdf
answer_sdf = spark.sql(query)

# Display the first 10 rows
answer_sdf.show(10)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|               _id|           education|               group|                name|            locality|              skills|            industry|interval|          experience|             summary|           interests|       overview_html|         specilities|homepage|              honors|                 url|           also_view|              events|
+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--

In [0]:
%%spark 

## AUTOGRADER Step 1.2: Run this to get your score. ##

raw_data_sdf.createOrReplaceTempView("test_1_2")
test_1_2_sdf = spark.sql("SELECT _id, name.given_name FROM test_1_2 ORDER BY _id ASC LIMIT 10")
grader.grade(question_id = "1.2", answer = test_1_2_sdf.take(10))
spark.catalog.dropTempView("test_1_2")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 1/1 points. You are a star!

Score for question 1.2 has been recorded in the backend.

### Step 1.3: The Extravagent Extraction

In our training model, we are interested in when individuals began working at a company.  From creating the schema, you should notice that the collection of companies inviduals worked at are contained in the `experience` field as an array of dictionaries. You should use the `org` for the company name and `start` for the start date. Here is an example of an `experience` field,

```
{
   "experience": [
     {
        "org": "The Walt Disney Company", 
        "title" : "Mickey Mouse",
        "end" : "Present",
        "start": "November 1928",
        "desc": "Sailed a boat."
     },
     {
        "org": "Walt Disney World Resort",
        "title": "Mickey Mouse Mascot",
        "start": "January 2005",
        "desc": "Took pictures with kids."
     }
   ]
}
```

Your task is to extract each pair of company and start date from these arrays. In Spark, this is known as "exploding" a row. An explode will seperate the elements of an array into multiple rows.

Create an sdf called `raw_start_dates_sdf` that contains the company and start date for every experience of every individual in `raw_data_sdf`. Drop any row that contains a `null` in either column with `dropna()`. Remember we will sort the dataframe when grading so you can sort the elements however you wish (you don't need to if you don't want to). The sdf should look as follows:

```
+--------------------------+---------------+
|org                       |start_date     |
+--------------------------+---------------+
|Walt Disney World Resort  |January 2005   | 
|The Walt Disney Company   |November 1928  |
|...                       |...            |
+--------------------------+---------------+
```

_Hint_: You may want to do two seperate explodes for `org` and `start`. In an explode, the position of the element in the array can be extracted as well, and used to merge two seperate explodes. Reference the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html).

_Note_: Some of the entires in `org` are "weird", i.e. made up of non-english letters and characters. Keep them. **DO NOT** edit any name in the original dataframe unless we specify. **DO NOT** drop any row unless there is a `null` value as stated before. This goes for the rest of the homework as well, unless otherwise specified.

In [0]:
%%spark

# TODO: Create [raw_start_dates_sdf]

from pyspark.sql.functions import col, explode
from pyspark.sql.functions import monotonically_increasing_id
# 1st explode
orgDF = raw_data_sdf.withColumn("firstExplode", explode(col("experience"))).select("firstExplode.org")
# 2nd explode
startDF = raw_data_sdf.withColumn("secondExplode", explode(col("experience"))).select("secondExplode.start")
# add a column called "id" for join two dataframes
orgDF = orgDF.withColumn("id", monotonically_increasing_id())
startDF = startDF.withColumn("id", monotonically_increasing_id())
# join two dataframes by "id"
raw_start_dates_sdf = orgDF.join(startDF, "id", "outer").drop("id")
# reneme the start column as start_date
raw_start_dates_sdf = raw_start_dates_sdf.select(col("org"),col("start").alias("start_date"))
# drop any row that contains a null in either column 
raw_start_dates_sdf = raw_start_dates_sdf.dropna()



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [0]:
%%spark 

## AUTOGRADER Step 1.3: Run this to get your score. ##

raw_start_dates_sdf.createOrReplaceTempView("test_1_3")
test_1_3_sdf = spark.sql("SELECT * FROM test_1_3 ORDER BY org DESC, start_date DESC LIMIT 10")
grader.grade(question_id = "1.3", answer = test_1_3_sdf.take(10))
spark.catalog.dropTempView("test_1_3")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 4/4 points. You are a star!

Score for question 1.3 has been recorded in the backend.

### Step 1.4: The Fortuitous Formatting

There are two issues with the values in our `date` column. First, the values are saved as strings, not datetime types. This halts us from running functions such as `ORDER BY` or `GROUP BY` on common months or years. Second, some values do not have both month and year information or are in other languages. Your task is to filter out and clean the `date` column. We are interested in only those rows that have date in the following format "(month_name) (year)", e.g. "October 2010".

Create an sdf called `filtered_start_dates_sdf` from `raw_start_dates_sdf` with the `date` column filtered in the manner above. Keep only those rows with a start date between January 2000 to December 2011, inclusive. Ensure that any dates that are not in our desired format are ommitted. Drop any row that contains a `null` in either column. The format of the sdf is shown below:
```
+--------------------------+---------------+
|org                       |start_date     |
+--------------------------+---------------+
|Walt Disney World Resort  |2005-01-01     | 
|...                       |...            |
+--------------------------+---------------+
```
_Hint_: Refer to the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html) to format the `date` column. In Spark SQL the date format we are interested in is `"MMM y"`.

_Note_: Spark will return the date in the format above, with the day as `01`. This is ok, since we are interested in the month and year each individual began working and all dates will have `01` as their day.

In [0]:
%%spark

# TODO: Create [filtered_start_dates_sdf]
from pyspark.sql.functions import unix_timestamp, to_date
from pyspark.sql.functions import lit
# use a simple name
df = raw_start_dates_sdf
# convert the string to date form and drop null
df = df.withColumn('start_date', 
                   to_date(unix_timestamp(col('start_date'), 'MMM y').cast("timestamp"))).dropna()
# filter the date column by given time range
df.createOrReplaceTempView('filter_start_date')
query = '''SELECT org, start_date
           FROM filter_start_date
           WHERE start_date >='2000-01-01' AND start_date <= '2011-12-01'
        '''
filtered_start_dates_sdf = spark.sql(query)





FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [0]:
%%spark

## AUTOGRADER Step 1.4: Run this to get your score. ##

filtered_start_dates_sdf.createOrReplaceTempView("test_1_4")
test_1_4_sdf = spark.sql("SELECT org, DATE_FORMAT(start_date, 'yyyy-MM-dd') FROM test_1_4 ORDER BY org DESC, start_date DESC LIMIT 10")
grader.grade(question_id = "1.4", answer = (test_1_4_sdf.take(10), filtered_start_dates_sdf.dtypes))
spark.catalog.dropTempView("test_1_4")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 4/4 points. You are a star!

Score for question 1.4 has been recorded in the backend.

### Step 1.5 The Gregarious Grouping

We now want to collect the number of individuals that started in the same month and year for each company. Create an sdf called `start_dates_sdf` that has the total number of employees who began working at the same company on the same start date. The format of the sdf is shown below:

```
+--------------------------+---------------+---------------+
|org                       |start_date     |num_employees  |
+--------------------------+---------------+---------------+
|Walt Disney World Resort  |2005-01-01     |1              |
|...                       |...            |...            |
+--------------------------+---------------+---------------+
```

In [0]:
%%spark

# TODO: Create [start_dates_sdf]

filtered_start_dates_sdf.createOrReplaceTempView("filtered_start_dates")
query = '''SELECT org, start_date, count(*) as num_employees 
          FROM filtered_start_dates 
          GROUP BY org, start_date'''
start_dates_sdf = spark.sql(query)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [0]:
%%spark

## AUTOGRADER Step 1.5: Run this to get your score. ##

start_dates_sdf.createOrReplaceTempView("test_1_5")
test_1_5_sdf = spark.sql("SELECT org, DATE_FORMAT(start_date, 'yyyy-MM-dd'), num_employees FROM test_1_5 ORDER BY num_employees DESC, org DESC, start_date ASC LIMIT 10")
grader.grade(question_id = "1.5", answer = (test_1_5_sdf.take(10), start_dates_sdf.dtypes))
spark.catalog.dropTempView("test_1_5")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 4/4 points. You are a star!

Score for question 1.5 has been recorded in the backend.

## Step 2: Hiring Trends Analysis

Now we will analyze `start_dates_sdf` to find monthly and annual hiring trends.

### Step 2.1: The Marvelous Months

Your task is to answer the question: "On average, what month do most employees start working?" Create an sdf called `monthly_hires_sdf` which contains the total number of employees that started working on a specific month, at any company and on any year. The `month` column should be of type `int`, i.e. 1-12. The format of the sdf is shown below:

```
+---------------+---------------+
|month          |num_employees  |
+---------------+---------------+
|1              |...            |
|2              |...            |
|3              |...            |
|...            |...            |
+---------------+---------------+
```

Find the month in which the most employees start working and save it's number as an integer to the variable `most_common_month`.

_Hint_: Be careful. The starts dates we have right now have both month and year. We only want the common months. See if you can find something in the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html) that will help you do this.

In [0]:
%%spark

# TODO: Create [monthly_hire_sdf] and find the most common month people were
# use a simple name
from pyspark.sql.functions import month
# use a simple name
df = filtered_start_dates_sdf
# change the date to the month
df2 = df.select(month(df.start_date).alias("month"))
# groupby the dataframe by month
df3 = df2.groupby(df2.month).count()
# change the name of 'month' column
monthly_hires_sdf = df3.select(col("month"),col("count").alias("num_employees"))
# order the dataframe by num_employees of every month
monthly_hires_sdf = monthly_hires_sdf.orderBy(monthly_hires_sdf.num_employees, ascending = False)

#monthly_hires_sdf.show()

# get the variable of the month with most employees
most_common_month = monthly_hires_sdf.collect()[0][0]



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [0]:
%%spark
# TODO: Create [monthly_hire_sdf] and find the most common month people were
from pyspark.sql.functions import month
filtered_start_dates_sdf.createOrReplaceTempView("filtered_start_dates")
query = '''SELECT month(start_date) as month, count(*) as num_employees 
          FROM filtered_start_dates 
          GROUP BY month
          ORDER BY num_employees DESC'''
monthly_hires_sdf = spark.sql(query)
# get the variable of the month with most employees
most_common_month = monthly_hires_sdf.collect()[0][0]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [0]:
%%spark

## AUTOGRADER Step 2.1: Run this to get your score. ##

monthly_hires_sdf.createOrReplaceTempView("test_2_1")
test_2_1_sdf = spark.sql("SELECT * FROM test_2_1 ORDER BY month ASC")
grader.grade(question_id = "2.1", answer = (test_2_1_sdf.take(12), most_common_month))
spark.catalog.dropTempView("test_2_1")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 2.2: The Preposterous Percentages

The next question we will answer is "What is the percentage change in hires between 2010 and 2011 for each company?" Create an sdf called `percentage_change_sdf` that has the percentage change between 2010 and 2011 for each company. The sdf should look as follows:

```
+---------------------------+--------------------+
|org                        |percentage_change   |
+---------------------------+--------------------+
|Walt Disney World Resort   |12.3                |
|...                        |...                 |
+---------------------------+--------------------+
```

_Note_: A percentage change can be positive or negative depending 
on the difference between the two years.The formula for percent change is given below,

$$\text{% change} = \frac{P_f-P_i}{P_f} \times 100$$

Here, $P_f$ is the final element (in this case the number of hires in 2011) and $P_i$ is initial element (the number of hires in 2010).

_Hint_: This is a **difficult** question. I'm really sorry. We recommend using a combination of `GROUP BY` and `JOIN`. Keep in mind that operations between columns in SQL dataframes are often easier than those between rows. Come to office hours if you need help. Especially **Fridays** and Wednesdays _*cough *cough_.

In [0]:
%%spark

# TODO: Create [percentage_change_sdf]
from pyspark.sql.functions import year
# meke a simple name
df = filtered_start_dates_sdf
# change the date column into year
df2 = df.select(col("org"), year(df.start_date).alias("year"))
# filter the dataframe by year 2010 and 2011 separately
df_initial = df2.filter(df2.year == 2010)
df_final = df2.filter(df2.year == 2011)

# group by these 2 dataframes by column 'org' separately
df_initial = df_initial.groupby(df_initial.org).count()
df_initial = df_initial.select(col("org"),col("count").alias("initial_year"))
df_final = df_final.groupby(df_final.org).count()
df_final = df_final.select(col("org"),col("count").alias("final_year"))

# merge these two dataframe by column 'org'
df_merge = df_initial.join(df_final, "org", "outer").dropna()
#df_merge.show(5)
final_sdf = df_merge.withColumn('percentage_change', (df_merge.final_year - df_merge.initial_year) / df_merge.final_year *100)
percentage_change_sdf = final_sdf.select(col('org'), col('percentage_change'))
percentage_change_sdf.show()



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [0]:
%%spark


In [0]:
%%spark

## AUTOGRADER Step 2.2: Run this to get your score. ##

percentage_change_sdf.createOrReplaceTempView("test_2_2")
test_2_2_sdf = spark.sql("SELECT * FROM test_2_2 ORDER BY org DESC, percentage_change ASC LIMIT 10")
grader.grade(question_id = "2.2", answer = test_2_2_sdf.take(10))
spark.catalog.dropTempView("test_2_2")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 4/4 points. You are a star!

Score for question 2.2 has been recorded in the backend.

## The Bountiful Break

That last question was hard. And it's gonna get harder. Take a break. Sit back and relax for a minute. Listen to some music. Here's a [suggestion](https://www.youtube.com/watch?v=9Crm6xJLJgs).

In the cell below fill out the boolean variable `whatd_you_think` with `True` if you liked it or `False` if you didn't. You will be graded on your response.

In [0]:
%%spark

# TODO: Listen to some music

whatd_you_think = True


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [0]:
%%spark

## AUTOGRADER Bountiful Break: Run this to get your score. ##

grader.grade(question_id = "break", answer = str(whatd_you_think))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 1/1 points. You are a star!

Score for question break has been recorded in the backend.

## Step 3: Formatting the Training Data


Our overaching goal is to train a machine learning (ML) model that will use the monthly hiring trends of a company to predict a positive or negative gain in the company's stock in the first quarter of the following year. A ML model is trained on a set of observations. Each observation contains a set of features, `X`, and a label, `y`. The goal of the ML model is to create a function that takes any `X` as an input and outputs a predicted `y`. 

The machine learning model we will use is a [Random Forest Classifier](https://builtin.com/data-science/random-forest-algorithm). Each observation we will pass in will have 24 features (columns). These are the number of people hired from Jan to Dec and the company stock price on the last day of each month. The label will be the direction of the company's stock percentage change (positive, `1`, or negative, `-1`) in the first quarter of the following year. Each observation will correspond to a specified company's trends on a specified year. The format of our final training sdf is shown below. The first 26 columns define our observations, `X`, and the last column the label, `y`.
```
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|org |year |jan_hired |   ...   |dec_hired |jan_stock |   ...   |dec_stock |stock_result |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|IBM |2008 |...       |   ...   |...       |...       |   ...   |...       |1            |
|IBM |2009 |...       |   ...   |...       |...       |   ...   |...       |-1           |
|... |...  |...       |   ...   |...       |...       |   ...   |...       |...          |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
```

_Note_: We will use the first three letters of each month in naming, i.e. `jan, feb, mar, apr, may, jun, jul, aug, sep, oct, nov, dec`



### Step 3.1: The Harmonious Hires

Your first task is to create the first half of the training table, i.e. the `jan_hired` through `dec_hired` columns. This will involve reshaping `start_dates_sdf`. Currently, `start_dates_sdf` has columns `org`, `start_date`, and `num_employees`. We want to group the rows together based on common `org` and years and create new columns for the number of employees that started working in each month of that year.

Create an sdf called `raw_hirings_for_training_sdf` that has for a single company and a single year, the number of hires in Jan through Dec, and the total number of hires that year. Note that for each company you will have several rows corresponding to years between 2000 and 2011. It is ok if for a given company you don't have a given year. However, ensure that for a given company and given year, each month column has an entry, i.e. if no one was hired the value should be `0`. The format of the sdf is shown below: 
```
+----+-----+----------+---------+----------+----------+
|org |year |jan_hired |   ...   |dec_hired |total_num |
+----+-----+----------+---------+----------+----------+
|IBM |2008 |...       |   ...   |...       |...       |
|IBM |2009 |...       |   ...   |...       |...       |
|... |...  |...       |   ...   |...       |...       |
+----+-----+----------+---------+----------+----------+
```
_Hint_: This is a **difficult difficult** question. I'm really really sorry. The tricky part is creating the additional columns of monthly hires, specifically when there are missing dates. In our dataset, if a company did not hire anybody in a given date, it will not appear in `start_dates_sdf`. We suggest you look into `CASE` and `WHEN` statements in the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html).

In [0]:
%%spark
# TODO: Create [raw_hire_train_sdf]
start_dates_sdf.createOrReplaceTempView('start_dates')
query = '''select org, year(start_date) as year,
          max(case when (month(start_date) = 1) then num_employees else 0 end) as jan_hired,
          max(case when (month(start_date) = 2) then num_employees else 0 end)as feb_hired,
          max(case when (month(start_date) = 3) then num_employees else 0 end)as mar_hired,
          max(case when (month(start_date) = 4) then num_employees else 0 end)as apr_hired,
          max(case when (month(start_date) = 5) then num_employees else 0 end)as may_hired,
          max(case when (month(start_date) = 6) then num_employees else 0 end)as jun_hired,
          max(case when (month(start_date) = 7) then num_employees else 0 end)as jul_hired,
          max(case when (month(start_date) = 8) then num_employees else 0 end)as aug_hired,
          max(case when (month(start_date) = 9) then num_employees else 0 end)as sep_hired,
          max(case when (month(start_date) = 10) then num_employees else 0 end)as oct_hired,
          max(case when (month(start_date) = 11) then num_employees else 0 end)as nov_hired,
          max(case when (month(start_date) = 12) then num_employees else 0 end)as dec_hired,
          sum(num_employees) as total_num
          from start_dates
          group by org,year '''
raw_hire_train_sdf = spark.sql(query)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [0]:
%%spark

## AUTOGRADER Step 3.1: Run this to get your score. ##

raw_hire_train_sdf.createOrReplaceTempView("test_3_1")
test_3_1_sdf = spark.sql("SELECT * FROM test_3_1 ORDER BY org DESC, year ASC LIMIT 10")
grader.grade(question_id = "3.1", answer = test_3_1_sdf.take(10))
spark.catalog.dropTempView("test_3_1")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 4/4 points. You are a star!

Score for question 3.1 has been recorded in the backend.

### Step 3.2: The Formidable Filters

Create an sdf called `hire_train_sdf` that contains all the observations in `raw_hire_train_sdf` with `total_num` greater than or equal to 500. The format of the sdf is shown below:

```
+----+-----+----------+---------+----------+----------+
|org |year |jan_hired |   ...   |dec_hired |total_num |
+----+-----+----------+---------+----------+----------+
|IBM |2008 |...       |   ...   |...       |...       |
|IBM |2009 |...       |   ...   |...       |...       |
|... |...  |...       |   ...   |...       |...       |
+----+-----+----------+---------+----------+----------+
```


In [0]:
%%spark

# TODO: Create [hire_train_sdf]

hire_train_sdf = raw_hire_train_sdf.filter(raw_hire_train_sdf.total_num >= 500)
#hire_train_sdf.show(5)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [0]:
%%spark

## AUTOGRADER Step 3.2: Run this to get your score. ##

hire_train_sdf.createOrReplaceTempView("test_3_2")
test_3_2_sdf = spark.sql("SELECT * FROM test_3_2 ORDER BY org DESC, year ASC LIMIT 10")
grader.grade(question_id = "3.2", answer = test_3_2_sdf.take(10))
spark.catalog.dropTempView("test_3_2")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 4/4 points. You are a star!

Score for question 3.2 has been recorded in the backend.

### Step 3.3: The Stupendous Stocks

Now we are ready for the stock data. The stock data we will use is saved in the same S3 bucket as `linkedin.json`. Load the data into the EMR cluster. Run the cell below. ***You do not need to edit this cell***.

In [0]:
%%spark

# Load stock data
raw_stocks_sdf = spark.read.format("csv") \
              .option("header", "true") \
              .load("s3a://545emr/stock_prices.csv")

# Creates SQL-accesible table
raw_stocks_sdf.createOrReplaceTempView('raw_stocks')

# Display the first 10 rows
query = '''SELECT *
           FROM raw_stocks'''
spark.sql(query).show(10)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-----------------+----------+
|ticker|    closing_price|      date|
+------+-----------------+----------+
|   AHH| 8.49315452575684|2013-05-08|
|   AHH| 8.47115135192871|2013-05-09|
|   AHH| 8.50782203674316|2013-05-10|
|   AHH| 8.54449367523193|2013-05-13|
|   AHH|8.456483840942381|2013-05-14|
|   AHH| 8.50782203674316|2013-05-15|
|   AHH| 8.61050128936768|2013-05-16|
|   AHH|8.625171661376951|2013-05-17|
|   AHH| 8.60316944122314|2013-05-20|
|   AHH|8.676511764526369|2013-05-21|
+------+-----------------+----------+
only showing top 10 rows

Run the cell below to see the types of the columns in our data frame. These are not correct. We could have defined a schema when reading in data but we will handle this issue in another manner. You will do this in Step 3.4.2.

In [0]:
%%spark 

# Print types of SDF
raw_stocks_sdf.dtypes


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('ticker', 'string'), ('closing_price', 'string'), ('date', 'string')]

### Step 3.4 The Clairvoyant Cleaning

We now want to format the stock data set into the second half of the training table. We will then merge it with `hire_train` based off the common `org` and `year` fields. The formatting will consist of 4 steps. Actually, it is 5.

#### Step 3.4.1 The Ubiquitous UDF

The companies in our stock dataset are defined by their stock tickers. Thus, we would not be able to merge it with the `org` field in `hire_train_sdf`. We must convert them to that format. Often times when using Spark, there may not be a built-in SQL function that can do the operation we desired. Instead, we can create one on our own with a user-defined function (udf).

A udf is defined as a normal Python function and then registered to be used as a Spark SQL function. Your task is to create a udf, `TICKER_TO_NAME()` that will convert the ticker field in `raw_stocks` to the company's name. This will be done using the provided `ticker_to_name_dict` dictionary. We are only interested in the companies in that dictionary.

Fill out the function `ticker_to_name()` below. Then use `spark.udf.register()` to register it as a SQL function. The command is provided. ***You do not need to edit it***. Note, we have defined the udf as returning `StringType()`. Ensure that your function returns this. You must also deal with any potential `null` cases.

In [0]:
%%spark

# TODO: Fill out [ticker_to_name()] and register it as a udf.

# Dictionary linking stock ticker's to their name
ticker_to_name_dict = {'NOK': 'Nokia',
                       'UN': 'Unilever',
                       'BP': 'BP',
                       'JNJ': 'Johnson & Johnson',
                       'TCS': 'Tata Consultancy Services',
                       'SLB': 'Schlumberger',
                       'NVS': 'Novartis',
                       'CNY': 'Huawei',
                       'PFE': 'Pfizer',
                       'ACN': 'Accenture',
                       'DELL': 'Dell',
                       'MS': 'Morgan Stanley',
                       'ORCL': 'Oracle',
                       'BAC': 'Bank of America',
                       'PG': 'Procter & Gamble',
                       'CGEMY': 'Capgemini',
                       'GS': 'Goldman Sachs',
                       'C': 'Citi',
                       'IBM': 'IBM',
                       'CS': 'Credit Suisse',
                       'MDLZ': 'Kraft Foods',
                       'WIT': 'Wipro Technologies',
                       'CSCO': 'Cisco Systems',
                       'PWC': 'PwC',
                       'GOOGL': 'Google',
                       'CTSH': 'Cognizant Technology Solutions',
                       'HSBC': 'HSBC',
                       'DB': 'Deutsche Bank',
                       'MSFT': 'Microsoft',
                       'HPE': 'Hewlett-Packard',
                       'ERIC': 'Ericsson',
                       'BCS': 'Barclays Capital',
                       'GSK': 'GlaxoSmithKline'}

# Fill out ticker_to_name()
from itertools import chain
from pyspark.sql.functions import create_map, lit

def ticker_to_name(ticker):
  
  if ticker in ticker_to_name_dict:
    return ticker_to_name_dict[ticker]
  else:
    return None


# Register udf as a SQL function. DO NOT EDIT
spark.udf.register("TICKER_TO_NAME", ticker_to_name, StringType())


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<function ticker_to_name at 0x7f1bffdcb598>

In [0]:
%%spark

## AUTOGRADER Step 3.4.1: Run this to get your score. ##
grader.grade(question_id = "3.4.1", answer = (str(ticker_to_name("GOOGL")),str(ticker_to_name("TSLA"))))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 4/4 points. You are a star!

Score for question 3.4.1 has been recorded in the backend.

#### Step 3.4.2: The Fastidious Filters

With our new `TICKER_TO_NAME()` function we will begin to wrangle `raw_stocks_sdf`.

Create an sdf called `filter_1_stocks_sdf` as follows. Convert all the ticker names in `raw_stocks_sdf` to the company names and save it as `org`. Next, convert the `date` field to a datetime type. As explained before this will help order and group the rows in future steps. Then, convert the type of the values in `closing_price` to `float`. This will take care of the `dtypes` issue we saw in Step 3.3.

Drop any company names that do not appear in `ticker_to_name_dict`. Keep any date between January 1st 2001 and December 4th 2012 inclusive, in the format shown below (note this is a datetime object not a string):

```
+----+------------+--------------+
|org |date        |closing_price |
+----+------------+--------------+
|IBM |2000-01-03  |...           |
|... |...         |...           |
+----+------------+--------------+
```
_Hint_: You will use a similar function to filter the dates as in Step 1.4. In Spark SQL the format for the `date` field in `raw_stocks_sdf` is `"yyyy-MM-dd"`.

In [0]:
%%spark

# TODO: Create [filter_1_stocks_sdf]
query = '''select TICKER_TO_NAME(ticker) as org, to_date(date) as date, float(closing_price) as closing_price
            from raw_stocks WHERE TICKER_TO_NAME(ticker) IS NOT NULL'''
df = spark.sql(query)
# filter the date
df1 = df.filter(df.date >= lit('2001-01-01')).filter(df.date <= lit('2012-12-04'))
# rearrange the dataframe
filter_1_stocks_sdf = df1.select(col('org'), col('date'), col('closing_price'))
filter_1_stocks_sdf.show(10)
print(filter_1_stocks_sdf.dtypes)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----------+-------------+
|   org|      date|closing_price|
+------+----------+-------------+
|Pfizer|2001-01-02|     24.92212|
|Pfizer|2001-01-03|    23.537558|
|Pfizer|2001-01-04|       22.592|
|Pfizer|2001-01-05|    22.895922|
|Pfizer|2001-01-08|    22.625769|
|Pfizer|2001-01-09|    23.368702|
|Pfizer|2001-01-10|    22.862156|
|Pfizer|2001-01-11|    22.152983|
|Pfizer|2001-01-12|    22.389381|
|Pfizer|2001-01-16|     22.52445|
+------+----------+-------------+
only showing top 10 rows

[('org', 'string'), ('date', 'date'), ('closing_price', 'float')]

In [0]:
%%spark

## AUTOGRADER Step 3.4.2: Run this to get your score. ##

filter_1_stocks_sdf.createOrReplaceTempView("test_3_4_2")
test_3_4_2_sdf = spark.sql("SELECT org, DATE_FORMAT(date, 'yyyy-MM-dd'), closing_price FROM test_3_4_2 ORDER BY org, date, closing_price LIMIT 10")
grader.grade(question_id = "3.4.2", answer = (test_3_4_2_sdf.take(10), filter_1_stocks_sdf.dtypes))
spark.catalog.dropTempView("test_3_4_2")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 4/4 points. You are a star!

Score for question 3.4.2 has been recorded in the backend.

#### Step 3.4.3: The Magnanimous Months

The data in `filter_1_stocks_sdf` gives closing prices on a daily basis. Since we are interested in monthly trends, we will only keep the closing price on the **last trading day of each month**.

Create an sdf `filter_2_stocks_sdf` that contains only the closing prices for the last trading day of each month. Note that a trading day is not simply the last day of each month, as this could be on a weekend when the market is closed . The format of the sdf is shown below:

```
+----+------------+--------------+
|org |date        |closing_price |
+----+------------+--------------+
|IBM |2000-01-31  |...           |
|... |...         |...           |
+----+------------+--------------+
```

  _Hint_: This is a **difficult** question. But if you made it this far, you're a star by now. It may be helpful to create an intermediate dataframe that will help you filter out the specific dates you desire.

In [0]:
%%spark
# TODO: Create [filter_2_stocks_sdf]
filter_1_stocks_sdf.createOrReplaceTempView('filter_1_stocks')
query = '''SELECT org, date, date_format(date, 'yyyy-MM') as year_month, closing_price
          FROM filter_1_stocks'''
year_month_sdf = spark.sql(query)

year_month_sdf.createOrReplaceTempView('year_month_sdf')
query2 = '''SELECT MAX(date) as date
            FROM year_month_sdf
            GROUP BY year_month'''
last_day_sdf = spark.sql(query2)

last_day_sdf.createOrReplaceTempView('last_day')
query3 = '''SELECT filter_1_stocks.org, last_day.date, filter_1_stocks.closing_price
            FROM filter_1_stocks
            JOIN last_day
            on filter_1_stocks.date = last_day.date'''
filter_2_stocks_sdf = spark.sql(query3)
filter_2_stocks_sdf.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+-------------+
|                 org|      date|closing_price|
+--------------------+----------+-------------+
|            Unilever|2006-02-28|    14.122391|
|       Cisco Systems|2006-02-28|    16.338911|
|     Bank of America|2006-02-28|    35.752632|
|        Schlumberger|2006-02-28|    46.230057|
|              Pfizer|2006-02-28|    15.677175|
|Cognizant Technol...|2006-02-28|     14.16171|
|                HSBC|2006-02-28|     45.15267|
|                 IBM|2006-02-28|     58.62066|
|            Ericsson|2006-02-28|    10.551788|
|         Kraft Foods|2006-02-28|    14.287087|
|    Barclays Capital|2006-02-28|     28.98189|
|                Citi|2006-02-28|      437.391|
|              Google|2006-02-28|    181.49149|
|      Morgan Stanley|2006-02-28|    40.475765|
|   Johnson & Johnson|2006-02-28|    40.029995|
|     GlaxoSmithKline|2006-02-28|    27.388903|
|           Accenture|2006-02-28|    25.491789|
|                  BP|2006-02-28|     35

In [0]:
%%spark

## AUTOGRADER Step 3.4.3: Run this to get your score. ##

filter_2_stocks_sdf.createOrReplaceTempView("test_3_4_3")
test_3_4_3_sdf = spark.sql("SELECT org, DATE_FORMAT(date, 'yyyy-MM-dd'), closing_price FROM test_3_4_3 ORDER BY org, date LIMIT 10")
grader.grade(question_id = "3.4.3", answer = (test_3_4_3_sdf.take(10), filter_2_stocks_sdf.dtypes))
spark.catalog.dropTempView("test_3_4_3")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 4/4 points. You are a star!

Score for question 3.4.3 has been recorded in the backend.

#### Step 3.4.4: The Rambunctious Reshape

Now, we will begin to shape our dataframe into the format of the final training sdf.

Create an sdf `filter_3_stocks_sdf` that has for a single company and a single year, the closing stock price for the last trading day of each month in that year. This is similar to the table you created in Step 3.1. In this case since we cannot make a proxy for the closing price if the data is not avaliable, drop any rows containing any `null` values, in any column. The format of the sdf is shown below:

```
+----+-----+----------+---------+----------+
|org |year |jan_stock |   ...   |dec_stock |
+----+-----+----------+---------+----------+
|IBM |2008 |...       |   ...   |...       |
|IBM |2009 |...       |   ...   |...       |
|... |...  |...       |   ...   |...       |
+----+-----+----------+---------+----------+
```


In [0]:
%%spark

# TODO: Create [filter_3_stocks_sdf]
filter_2_stocks_sdf.createOrReplaceTempView('filter_2_stocks')
query = '''select org, year(date) as year,
           max(case when(month(date) = 1) then closing_price else Null end) as jan_stock,
           max(case when(month(date) = 2) then closing_price else Null end) as feb_stock,
           max(case when(month(date) = 3) then closing_price else Null end) as mar_stock,
           max(case when(month(date) = 4) then closing_price else Null end) as apr_stock,
           max(case when(month(date) = 5) then closing_price else Null end) as may_stock,
           max(case when(month(date) = 6) then closing_price else Null end) as jun_stock,
           max(case when(month(date) = 7) then closing_price else Null end) as jul_stock,
           max(case when(month(date) = 8) then closing_price else Null end) as aug_stock,
           max(case when(month(date) = 9) then closing_price else Null end) as sep_stock,
           max(case when(month(date) = 10) then closing_price else Null end) as oct_stock,
           max(case when(month(date) = 11) then closing_price else Null end) as nov_stock,
           max(case when(month(date) = 12) then closing_price else Null end) as dec_stock
           FROM filter_2_stocks
           GROUP BY org, year(date)
        '''
filter_3_stocks_sdf = spark.sql(query)
filter_3_stocks_sdf = filter_3_stocks_sdf.dropna()
filter_3_stocks_sdf.show()




FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+
|                 org|year|jan_stock|feb_stock|mar_stock|apr_stock|may_stock|jun_stock|jul_stock|aug_stock|sep_stock|oct_stock|nov_stock| dec_stock|
+--------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+
|                Citi|2005| 460.4375|448.34967|422.23044|441.62225|443.03284|434.75717|409.48947| 412.0311|428.50482|430.95242| 457.4699|  457.2813|
|  Wipro Technologies|2007|4.0273743|3.6900146|3.6909454|3.8690486|3.8128057|3.6499178|3.4269323|3.4339743| 3.389377|3.8780794| 3.414217|  3.494274|
|                  BP|2011|30.924541|31.862003|29.015654|30.330366|30.682598|29.388681|30.151766|26.391449|24.167034|29.600761|29.468372| 28.920296|
|              Oracle|2008|18.352621|16.789745|17.468481|18.620543| 20.39775|18.754501|19.227835|19.585062

In [0]:
%%spark

## AUTOGRADER Step 3.4.4: Run this to get your score. ##

filter_3_stocks_sdf.createOrReplaceTempView("test_3_4_4")
test_3_4_4_sdf = spark.sql("SELECT * FROM test_3_4_4 ORDER BY org, year LIMIT 10")
grader.grade(question_id = "3.4.4", answer = test_3_4_4_sdf.take(10))
spark.catalog.dropTempView("test_3_4_4")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 4/4 points. You are a star!

Score for question 3.4.4 has been recorded in the backend.

#### Step 3.4.5: The Decisive Direction

The final element in our training set is the binary output for each case, i.e. the `y` label. 

Create an sdf `stocks_train_sdf` from `filter_3_stocks_sdf` with an additional column `direction`. This should be the direction of percentage change in the closing stock price, i.e. `1` for positive or `-1` for negative, in the first quarter of a given year. The quarter of a year begins in January and ends in April, inclusive. We want to know the percent change between these two months. Reference Step 2.2 for the percent change formula. The format of the sdf is shown below:

```
+----+-----+----------+---------+----------+-------------+
|org |year |jan_stock |   ...   |dec_stock |direction    |
+----+-----+----------+---------+----------+-------------+
|IBM |2008 |...       |   ...   |...       |1.0          |
|IBM |2009 |...       |   ...   |...       |-1.0         |
|... |...  |...       |   ...   |...       |...          |
+----+-----+----------+---------+----------+-------------+
```

In [0]:
%%spark

# TODO: Create [stocks_train_sdf]
filter_3_stocks_sdf.createOrReplaceTempView("initial_sdf")
query = '''SELECT org, year,
          jan_stock, feb_stock, mar_stock, apr_stock, may_stock, jun_stock, jul_stock, aug_stock, sep_stock, oct_stock, nov_stock, dec_stock,
          (apr_stock - jan_stock) as direction
          FROM initial_sdf
        '''
stocks_train_sdf = spark.sql(query)

stocks_train_sdf.createOrReplaceTempView("difference_sdf")
query2 = '''SELECT org, year,
          jan_stock, feb_stock, mar_stock, apr_stock, may_stock, jun_stock, jul_stock, aug_stock, sep_stock, oct_stock, nov_stock, dec_stock,
          (case when(direction > 0) then 1 else -1 end) as direction
          FROM difference_sdf
        '''
stocks_train_sdf = spark.sql(query2)
print(stocks_train_sdf.dtypes)
stocks_train_sdf.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('org', 'string'), ('year', 'int'), ('jan_stock', 'float'), ('feb_stock', 'float'), ('mar_stock', 'float'), ('apr_stock', 'float'), ('may_stock', 'float'), ('jun_stock', 'float'), ('jul_stock', 'float'), ('aug_stock', 'float'), ('sep_stock', 'float'), ('oct_stock', 'float'), ('nov_stock', 'float'), ('dec_stock', 'float'), ('direction', 'int')]
+--------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+---------+
|                 org|year|jan_stock|feb_stock|mar_stock|apr_stock|may_stock|jun_stock|jul_stock|aug_stock|sep_stock|oct_stock|nov_stock| dec_stock|direction|
+--------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+---------+
|  Wipro Technologies|2007|4.0273743|3.6900146|3.6909454|3.8690486|3.8128057|3.6499178|3.4269323|3.4339743| 3.389377|3.8780794| 3.414217|  3.494274|       -1|
|                

In [0]:
%%spark

## AUTOGRADER Step 3.4.5: Run this to get your score. ##

stocks_train_sdf.createOrReplaceTempView("test_3_4_5")
test_3_4_5_sdf = spark.sql("SELECT * FROM test_3_4_5 ORDER BY org, year LIMIT 10")
grader.grade(question_id = "3.4.5", answer = test_3_4_5_sdf.take(10))
spark.catalog.dropTempView("test_3_4_5")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 4/4 points. You are a star!

Score for question 3.4.5 has been recorded in the backend.

### Step 3.5: The Capricious Combination

Now that we have individually created the two halfs of our training data we will merge them together to create the final training sdf we showed in the beginning of Step 3.

Create an sdf called `training_sdf` in the format of the one shown at the beginning of Step 3. Note that in our definition for the `stock_result` column, the `stock_result` value for a particular year corresponds to the direction of the stock percentage change in the **following** year. For example, the stock_result in the `2008` row for `IBM` will contain the direction of IBM's stock in the first quarter of 2009. The format of the sdf is shown below:
```
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|org |year |jan_hired |   ...   |dec_hired |jan_stock |   ...   |dec_stock |stock_result |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|IBM |2008 |...       |   ...   |...       |...       |   ...   |...       |-1.0         |
|IBM |2009 |...       |   ...   |...       |...       |   ...   |...       |1.0          |
|... |...  |...       |   ...   |...       |...       |   ...   |...       |...          |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
```

In [0]:
%%spark

# TODO: Create [training_sdf]
stocks_train_sdf.createOrReplaceTempView("stocks_train_sdf")
query = '''SELECT org, year-1 as year,
          direction as stock_result
          FROM stocks_train_sdf
          '''
merge_sdf = spark.sql(query)

merge_sdf.createOrReplaceTempView("merge_sdf")
query1 = '''SELECT
            stocks_train_sdf.org, stocks_train_sdf.year,
            stocks_train_sdf.jan_stock, stocks_train_sdf.feb_stock, stocks_train_sdf.mar_stock, stocks_train_sdf.apr_stock, 
            stocks_train_sdf.may_stock, stocks_train_sdf.jun_stock, stocks_train_sdf.jul_stock, stocks_train_sdf.aug_stock, 
            stocks_train_sdf.sep_stock, stocks_train_sdf.oct_stock, stocks_train_sdf.nov_stock, stocks_train_sdf.dec_stock, 
            merge_sdf.stock_result
            FROM stocks_train_sdf
            JOIN merge_sdf
            on stocks_train_sdf.org = merge_sdf.org AND stocks_train_sdf.year = merge_sdf.year
          '''
actual_stocks_sdf = spark.sql(query1)

actual_stocks_sdf.createOrReplaceTempView("actual_stocks_sdf")
hire_train_sdf.createOrReplaceTempView("hire_sdf")
query2 = '''SELECT 
            hire_sdf.org, hire_sdf.year, hire_sdf.jan_hired, hire_sdf.feb_hired, hire_sdf.mar_hired, hire_sdf.apr_hired,
            may_hired, jun_hired, jul_hired, aug_hired, sep_hired, oct_hired, nov_hired, dec_hired,
            actual_stocks_sdf.jan_stock, actual_stocks_sdf.feb_stock, actual_stocks_sdf.mar_stock, actual_stocks_sdf.apr_stock,
            actual_stocks_sdf.may_stock, actual_stocks_sdf.jun_stock, actual_stocks_sdf.jul_stock, actual_stocks_sdf.aug_stock,
            actual_stocks_sdf.sep_stock, actual_stocks_sdf.oct_stock, actual_stocks_sdf.nov_stock, actual_stocks_sdf.dec_stock,
            actual_stocks_sdf.stock_result
            FROM hire_sdf
            JOIN actual_stocks_sdf
            on hire_sdf.year = actual_stocks_sdf.year AND hire_sdf.org = actual_stocks_sdf.org
'''
training_sdf = spark.sql(query2)

training_sdf.show()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+------------+
|                 org|year|jan_hired|feb_hired|mar_hired|apr_hired|may_hired|jun_hired|jul_hired|aug_hired|sep_hired|oct_hired|nov_hired|dec_hired|jan_stock|feb_stock|mar_stock|apr_stock|may_stock|jun_stock|jul_stock|aug_stock|sep_stock|oct_stock|nov_stock|dec_stock|stock_result|
+--------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+------------+
|           Microsoft|2006|       96|       37|       65|       57|      116|      107|      119|       72|       68|       62|       49|       42|21.069159|

In [0]:
%%spark

## AUTOGRADER Step 3.5: Run this to get your score. ##

training_sdf.createOrReplaceTempView("test_3_5")
test_3_5_sdf = spark.sql("SELECT * FROM test_3_5 ORDER BY org, year LIMIT 10")
grader.grade(question_id = "3.5", answer = test_3_5_sdf.take(10))
spark.catalog.dropTempView("test_3_5")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Correct! Earned 4/4 points. You are a star!

Score for question 3.5 has been recorded in the backend.

## Step 4: Running the Model

Well here we go. Who's ready to make some money? Well... it's not gonna happen. We didn't code the random forest model, sorry blame ESE 530. The remainder of the course will be about machine learning and we will learn how to take this beautiful data and make billions of dollars, here is some cool ASCII for the meantime.

One last thing, as I predicted before, you're a star.

```

``````````````````````````````````....-:/+ooooo++/:-..``````````````................://-` ``````````
``````````````````````````````.-/syhhhhhdhhhhhdhhhhhyssso+/-.```````................://-` ``````````
`````````````````````````````.+syhhhhdddddmmddhdddhyyyhhhhhhyo+:.```................://-` ``````````
`````````````````````..-:/+++osyhddhddddmmmmdddddddhhhyhddhdddhhhs:.................://-` ``````````
``````````````````.-/osssyyyyyyyhyyyhdddmmmmmmmmddmhhdddyhddddmdhddyo:......`.......://.` ``````````
`````````````````-/oyyhhdmmmdddddddddhhhdddmmdmmmdmmmdddddhhddhhddhdmy+-....`.......://-` ``````````
```````````````./syhhhddmmmmmNNNmNmmmmmmmddddddddmmmmNmdmmddddmdhhddhdds:...........://-` ``````````
`````````````./syyyyhhhdddmmmmmNmmmmmmmmmmmdddddhddmddmmmmmmmdddmdhhddyhh+-.........://.` ``````````
````````````-oyysyyyhhhhhhhdddmmmdddddmmmmmddddmdddddddmmmmmmmmmdddhyddhhdy:.......-///.` ``````````
```````````/yyssyhhhdddhyyssyyyyyyyyyyyhhhhdddddmddddhydmmmmmmmmmmhhyhhhdhyy:.......///.` ``````````
`````````./yyssyydddddhysooooooooooossssssyyhhhhddmdddmhhmmmmmmmddmdhhhhhddhy-......///.````````````
````````./yhssyhddmmdyso+++++++oooooooooosssyhhhhdddddmmdmmNNNmmmmddmddhhhhdho......///-````````````
````````.shysydhmdNmyoo+++++++++++++++ooooossyyhhhhhddmmmmdNNNNNmmmdhdmdhhhhyy:....-///-````````````
````````.ohyyhddmmmyoo++++++++++++++++++ooossssyyyyyhhdmddmmmNNNNmdmdhhddhhhyy:....-///-````````````
```````..+yyyddmmmdso+++++///+++++++++++oooosssssyyyyyhdmmdmmmmmmNmddhyhdhyysy+....-///-````````````
````````.:oyyhhmmmho++//////+/++++++++++ooooossssssyyyhhddmmmNNmmmmmmdhhhddyhss....-///-````````````
..```````./syyhdmmyo++/////++/+++++++++++oooossssyyhhyhhhhddmNNNNNmmmddmdhmhyss-...-///-````````````
`..``````..+yyhddmy+++/////////+/////++++oosyyhhddmmmmddhhhhddmNNNNmdhhmmdmhsso-...-///-````````````
......`````-+hydddsooo+++++++///////++osyhmmmmNNNNNNNmmmmdhhhddmNNNNmhhdmmdhss+-...-///.````````````
......`````..:syddhyhhhdddhyyso+++++ossyddmddhhyyyhhhhhhddhhhhhdmNNNmhydmmdhyo+-...-//:.````````````
......`````.`.:ohddddddhhhhhyysoooooosyyhyyyssssyyyyyyyhhhhhyyyyhdmmmdhdmNdhhs+-...-///.````````````
``````````````./yhyyoo++oossyssso++oosyhyysyhhhhddmdddhyhdhsyyssyhdddddddddddy+:...-///-````````````
`````````````.:++/oo++osyhhhhyyyysosydddhhhddyshdmmddddhdddhhyssssyyhhhhhhhhdhs:...-///-.```````````
````````````.-++::/+shyoddmmhhyhyo+oshmmmhhhhyyyyssyyyyyyyhyyyyssoosyyyhhhyyyhs:...-///-.```````````
``````......``./:://oooossyyyosso++osyhddhsssssssssssssssyyyyyysoo+oosyhddhysss:...-///:.``....`````
``````..........:://+++oooooo+oo++oosyhhhhyssoooooooooossssssyysooo+osyyhhhyyss+...-///:.``......`..
`````````........-/+//++++++++++++oosyyyyhhyyssoooo+oooossssssssooooosyyyhyyyyyo-..-///:.``.........
````````````````..://///++++++//+++osyyysysyhhys+++++++oosssssssssooosyyhhyssyh+...-///:.``.........
`````````````````.///////++++osssosyhddmmhsyhdhyso++++ooossssssssssoosyyyyssshd:...-///-.``.........
`````````````````.///////++++sysssossyyyyyyyhyyyssoooooosssyyyyysssoossyysosdNh:...-///-.```....````
````````..````````://///+++/+++ooo+oooosssyyyyyyyyssooosssyyyyyysssoossyyyhNNms:...-///-````````````
```````....```````-//+//+++++++++++++++ooossyyyyhhhyyssssyyyyyyyysssossyhmMMNmo-...-///.````````````
```````....```````./++//+++++++++++++++oossssssyhhhhyyyyssyyyssyyssssosyymMNNm+....-///-````````````
````````...````````-///+++++++oooooosssyyyyhyhhhdhdhhyyyssyyyssyyyyssssssdMMNm+....-///-````````````
```   `````````````.////++ooyhy++////+//oooyosdydmmhysoooyyhsosyyyyyssssshMMNm+....-///-````````````
```    `````````````-////+oshmyosso++ooosyhdhhmdddhsoooosyyyosyyyhyyyysssyNMMm/....-///-````````````
````````````````````.:////+++osooooooosssyyhhhhhyyysosssyhyssyhhhhhyyyyysymdh/.....-///-.````...````
`.```````````````````.////+++//++++ooossssssssssyyssssyyhyssyhhhhhhhhyyyso:-.......-///-..``.`.`````
`.````````````````````./++++++//+++ooooooosssssyyssssyhhhyyhdddhhdhhhhyys+:----....-///-..``...`````
..`````````````````````./++//++++++++oooooosssysssssyyhhhhddddddddddhhyyssmmmmd/...-///-.```..``````
..````````````````````...:++/////++++++++ooosssssssyhhhhhdddddddddddhhyyyydNNmyo:..-///-.```..``````
..````````````````````.....:///////+++++ooossssssyhhhhhddddddmdddddhhhhhhddhhyso+-.-///-..``..``````
`.``````` `````````````...``./+/+++++ooooosssyyyhdddddddddmmmdddhhyyhhhddhyyyysso/--///:..``....````
`.``````` `````````````..`````-+oooosssssyyyhhdddddddmmmmmmddhhhyyyhhhhyyyyyyyssoo/-/+/:..``.....```
..``````  `````````````````````.-+yyyhhhhhhhddddddmmmmdddddhhyyyyyyyyyyyyyyyyysssoo+/oo:..``.....```
..````     ```````````````````````./syhhhhhdddddmmddddddhhyyysyyyysssssssssysysssssyhdmy:.``.....```
.````       ```````````````````````.+osyyhhhhhhhhhhhyyyyyssssssssssssssssssssssssydmNNNmmhs:....````
.`````      ``````````````````````-+y+oooossssssyyyssyyyssosssssssossssssssssssyydMNNNNNmmddy/-.````
....```    `````````````````````.:osss++++oooossssssyyyssoooooooooossssssoosssyyhNNNNNNNNNNNmdhs:```
```

```
                                             ``.--::::::-`                                          
                                       `.-:/+syyhyyyyssssso:.--.`                                   
                                   .:/+++osyhhhhhyyyyyyyyyyyyyyyyo/-`                               
                                `/++//+osyhhdhhhhhhhhhhhhhddhsoosyhhy/`                             
                              .+ss+//+syhhhhddddddddddhhhhhhsooosyyhhhy:                            
                            .+yhs++/+osyyyhhhhddhhhyyysysyssooossyyyhdhhs:``                        
                          .+syhyo+/+ossosssshdhyoo+/////+++++ossyyhhhhddhhs:``                      
                        `:ooshhso++ossooooosyyo/::--------::/+osyhhhddddddhhs-`                     
                       ./++ohhys++ooss++oooso/:---..--------::/osyhhdhhdddddhy/`                    
                     `/+///syys++osyoooo+oo+:-.......-------:::/+oyhdddddhhhhhs/`                   
                   `:so++++ssssoossooss+o+:-..........-------:::/++shddddddhhhys/`                  
                 `.+ssoooo+ooosso+osys++/-...`.......-------::::///+shhdmdddhyyyy:                  
              ``.:+ooossyso+oooooosss+/-..............------::::://+syhhdmmdhhhyyo`                 
              `:+oosssyyss+/o+ossooo+:-...`...........-------:::://+oyyhhdddhyyyyo`                 
              `:oosssyysyo//+oosooo+/.................-------:::://+oyyyhhdhddyoss-                 
             `.+ooossssss+++ooooso+/:.................------::::///+oyyyyhhhydhsss/``               
            ` :oo++++osoo/++ooooo++/-.................-------:::///+osyyyyhhohdysso:-.`` ````       
             `+osso++++++/+++oos+//:-..............----:::/+ooooossssyyyyyhhyddhyso+oo++////:-`     
            `/osyyyo++++///++sso+/+/://+oooo+/:------:/+syyhhyssssssyyhhhhhdmmdhhyoooyhhysoooo:`    
          `.:+ssyyyyssso+//++sso+++:///+oosoo+/:-----/shhhyo+/++osyhyyhhhhhdmmmhhysoosddhyso+/:.`   
         `.-:ossyyhhyyyyssoooyssooo+//++ssoo+++/-..-:ohhs++++shhhyyhhyyyyhhhddmdhyyssshddhys/-.`    
      ....-:+oossyhhyyhhhyyysyyyyyyso+yyho/+/:---...:syo///::/+oooshhysooyhhddmdhyyyyyydddy+/:.     
     `.::::/+++ossssyyhhhhyhyhhhyys+:-:::///:----...-+so+////+++oossso+/+yhddmmmmddyyyhhdhy/--`     
      -::/+++ooo++ooosyyyhhhhhhhys+/::://::--....-..-/so//::::://///////+yhdmmmmdddhyyyhhys:`-`     
      ./++++oo++++++++ossyyyyhhys+/:-...............-:oso/::::-----:://+syhdmmmddhhdyyyyyso```      
     .::/+++oosssssoo++++ossyyyo/-..................--+sso/::-------://oyhhdmmdddddddyyyyo-`        
    `/.:+oooosooosyyyyysssooss+:--..`.........--.....-/osos+/:------:/+syhddmmmddmmmddyyo+`         
    `/-/+ossssso+osyhhhhhyyhdy/:---.......----.--....-/sysys++/:::::/+oyhdddmmmmmddddhhs/.          
     `  `:oyhyso+osyyyhhhhhddy/:-------://:---:://--:/yddddyo++//++++syddhddmmmmmmmmddh+            
          .ss+ooosyyhhyhhhhhhs/::::-:/++:------/+++oshdmdhysoooooo/+oyhhhhhmNmmmmmmmdh+`            
          `/o-+oosyyyhhhhddddo/::::::++::--------::+osssssosssyyo+:/syyyhhdmmmmmddhyy/              
           `:+:-:/+syyyhhddddy/:--::-///:+oo+///::://+osyyhhdhs+/--ohssyhhmmmmmmddhs-               
            `.:/:-:oyhhhdddmmd+/:--::---..-://///:://///+ossso+/::oysoshhdddmmddhhs`                
              ``  -yhhhhddddddh+/:---:--...---::::::://+osssso+//oyysyhhdmhyhsohs.                  
                  .oysyyhhhddddh+/::-----...-----:::///++ooo++/+yhyyhdhhmmmmmdhy:                   
                  `.:+oossssyhdhh+//:::---...----::::////+++//oyhhhhdhhh++//:-`                     
                    ``--.`..-:+/s+++/::::--....-------::///:/+shhhhhhddhh/``                        
                      ````````--://+++/:::--........---:::-:+syddhhddddh/-`                         
                      `````````..:::/+o+/:::---..---:-::://+sydddddmdddh.                           
                       ``````````:::::/+o+//:::::::::://++oyhddddmmmdddh.                           
                      ```````````::---::/+oo+//////+++oosyhddddmmmmmdddh.                           
                      ``````````-::-----::/+oossyyyyyyyhhdddmmmmmmdddddh.                           
                     .:+ssooo/::::--------:://++osyhhhddmmmmmmmmddddddhh:`                          
                    -ooydhyo/:::::---------::::://++ossyyhhhddddddddhhhhhdy/`                       
                  `-oooyy+::----:----------:::::///++++osyyhhhddddhhhhhhhmmmdo.                     
                `-+ooooo+--------------------::://///++osyyyhhhdhhhhhhhhhddmmdh/`                   
             `-/+ooooooo+-.....----------------:::///++ossyyyyhhhhhhhhhhhddhdddhs:.                 
          `-/oooooooooooo:.....----------------:::::://++ooosyhhhyyhhhhhhdhhyhhhyyys:`              
       `-/+oooooooooooooo+-.......--------------::::::///++osyhhyyhhhhhhhhhhysyyyyyhhy/. 

```