In [0]:
# [
#  {
#      "Classification": "spark-env",
#   "Configurations": [
#                      {
#                          "Classification": "export",
#                       "Properties": {
#                           "PYSPARK_PYTHON": "/usr/bin/python3"
#                       }
#                      }
#   ]
#  }
# ]

# arbabaga/s3PennGrader.sh

# CIS 545 Homework 3: Spark SQL
### **Due Date: March 27, 2020**

Welcome to CIS 545 Homework 3! In this homework you will gain mastery of Spark SQL. By the end, you'll be a star (not that you aren't already one). Over the next few days you will use Spark on an EMR cluster to manipulate the `linkedin_small_real.json` dataset as well as `stocks.csv`.

The goal of the homework will be to create a training dataset for a Random Forest machine learning model. Yes, you'll be playing with machine learning shortly!

The training data set will contain the monthly number of employees hired by companies in `linkedin_small_real.json` and their corresponding closing stock prices over a 12-year period (2000-2011). We will try to predict, based on this data, whether the company will have positive or negative growth in stock in the first quarter of the next year. Who's ready to make some money?

## The Necessary Notes and Nags
Before we begin, here are some important notes to keep in mind:


1.   **IMPORTANT!** I said it twice, it's really important. In this homework, we will be using AWS resources. You are given a quota ($75) to use for the entirety of the homework. There is only a small chance you will use up all of this money, but it is important that at the end of every session you **shut down your EMR cluster**.
2.   **Be sure you use Google Colab for this Homework** since we must connect to the EMR cluster and local Jupyter will have issues doing that. Using a Google Colab Notebook with an EMR cluster has two important abnormalities:
    * The first line of any cell in which you will use the spark session must be `%%spark`. Notice that all cells below have this.
    * You will, unfortunately, not be able to stop a cell while it is running. If you wish to do so, you will need to restart your cluster. See the Setup EMR Document for reference.
3.   You are **required** to use Spark SQL queries to handle the data in the assignment. Mastering SQL is more beneficial than being able to use Spark commands (functions), as it will show up in more areas of programming and data science/analytics than just Spark. Use the following [function list](https://spark.apache.org/docs/latest/api/sql/index.html#) to see all the SQL functions available in Spark.
4.   Throughout the homework you will be manipulating Spark dataframes (sdfs). We do not specify any ordering on the final output. You are welcome to order your final tables in whatever way you deem fit. We will conduct our own ordering when we grade.
5. Based on the challenges you've faced in the previous homework, we are including information on the expected schema of your results.  Apache Spark is very fiddly but we hope this will help.
6. There are portions of this homework that are hard. We urge you to start early and to come to office hours for help if you get stuck. But don't worry, I can see the future, and you all got this.

With that said, let's dive in.




## Step 0: Set up EMR

Your first task is to create an EMR (Elastic MapReduce) cluster in your AWS Educate account. Please see the [attached document](https://cis545hw1data.s3.amazonaws.com/Microsoft+Word+-+cis545_setup_emr+(2).doc.pdf) for detailed instructions.

**There is one very important addition to the instructions:  when you get to page 13 and want to `Create a Cluster`, you need to first click on 'Block public access' (left side of the screen), then *Change* `Block public access` to *Off*.**  Otherwise your cluster will consistently fail to spin up!

Move on to Step 0.1 after you have completed all the steps in the document.

![ACME GIANT RUBBER BAND](https://pbs.twimg.com/media/DRqbJh7UMAE2z4o?format=jpg&name=4096x4096)


### Step 0.1: The Superfluous Setup

Run the following two cells. These will allow your Colab notebook to connect to and use your EMR cluster.

In [0]:
%%capture
!apt update
!apt install gcc python-dev libkrb5-dev
!pip install sparkmagic

In [0]:
%%capture
%load_ext sparkmagic.magics 

### Step 0.2: The Sharp Spark

Now, connect your notebook to the EMR cluster you created. In the first cell, copy the link to the Master Public DNS specified in the setup document. You will need to add `http://` to the beginning of the address and the port to the end. The final format should be,

`http://<your-DNS-link>:8998`

For example, if my DNS (directly from the AWS EMR console) is `ec2-3-15-237-211.us-east-2.compute.amazonaws.com` my address would be,

`http://ec2-3-15-237-211.us-east-2.compute.amazonaws.com:8998`

Insert this address in the `# TODO` cell below. For our example, the cell would read:



```
%spark add -s spark_session -l python -u http://ec2-3-15-237-211.us-east-2.compute.amazonaws.com:8998
```
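The address formatting described above can be sketched as a small plain-Python helper. This is only an illustration: `emr_endpoint` is a hypothetical name, not part of sparkmagic or the course materials, and the default port is simply the one used in the example above.

```python
def emr_endpoint(master_dns, port=8998):
    """Build the address passed to `%spark add -u` from a Master Public DNS name.

    Hypothetical helper for illustration only.
    """
    host = master_dns.strip()
    # Tolerate a DNS name that was pasted with a scheme already attached.
    for prefix in ("http://", "https://"):
        if host.startswith(prefix):
            host = host[len(prefix):]
    return "http://{}:{}".format(host.rstrip("/"), port)


address = emr_endpoint("ec2-3-15-237-211.us-east-2.compute.amazonaws.com")
print(address)
```

The printed address is what would follow `-u` in the `%spark add` line.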

In [4]:
# TODO: Enter your Master Public DNS with the proper formatting and port

#Change to your instance

%spark add -s my_session1 -l python -u http://ec2-54-81-135-236.compute-1.amazonaws.com:8998

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1585286545888_0002,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [0]:
# If you ever need to restart, you may need to delete the session
# (use the same session name you passed to `%spark add -s` above):
#%spark delete -s my_session1
# OR just factory reset the runtime under the Runtime tab

Enter your 8-digit Penn ID as an integer in the cell below. This will be used in the autograder.

In [6]:
%%spark
from penngrader.grader import *
STUDENT_ID = ******** #TODO: Add 8 digit Penn ID

In [7]:
%%spark
grader = PennGrader(homework_id = 'CIS545_Spring_2020_HW3', student_id = STUDENT_ID)

PennGrader initialized with Student ID: ********

Make sure this correct or we will not be able to store your grade

Run the following cell to set up the autograder; make sure you have set your 8-digit Penn ID in the cell above. It will also import all the modules you need for the homework.

_Note_: Since we are using an EMR cluster, we will only have access to some of the modules that exist for Python, meaning things like `pandas`, `numpy`, etc. may not all be available. We have written the entire homework such that the solution does not require any of these. Don't try to import them. It won't work.

## Step 1: Data Wrangling, Cleaning, and Shaping

![Data Wrangler](https://compendiumofcountries.org/wiki/images/6/68/Data_Wrangler_-_Gaucho.png)

The data you will use is stored in an S3 bucket, a cloud storage service. You now need to download it onto the nodes of your [EMR cluster](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-what-is-emr.html). 

### Step 1.1: The Stupendous Schema

When loading data, Spark will try to infer its structure on its own. This process is error-prone because Spark will sometimes infer a type incorrectly. JSON documents, like the one we will use, can have nested types such as arrays, arrays of dictionaries, dictionaries of dictionaries, etc. Spark's ability to determine these nested types is not reliable, so you will define a schema for `linkedin_small_real.json`.

A schema is a description of the structure of data. You will be defining an explicit schema for `linkedin_small_real.json`. In Spark, schemas are defined using a `StructType` object. This is a collection of `StructField`s, each of which specifies the name and data type of one component of the dataset. For example, suppose we have the following simple JSON object:


```
{
 "student_name": "Craig Fan",
 "GPA": 1.4,
 "courses": [
    {"department": "Computer and Information Science",
     "course_id": "CIS 545",
     "semester": "Fall 2019"},
    {"department": "Computer and Information Science",
     "course_id": "CIS 555",
     "semester": "Fall 2019"}
 ],
 "grad_year": 2020
 }
```

We would define its schema as follows,

```       
schema = StructType([
           StructField("student_name", StringType(), nullable=True),
           StructField("GPA", FloatType(), nullable=True),
           StructField("courses", ArrayType(
                StructType([
                  StructField("department", StringType(), nullable=True),
                  StructField("course_id", StringType(), nullable=True),
                  StructField("semester", StringType(), nullable=True)
                ])
           ), nullable=True),
           StructField("grad_year", IntegerType(), nullable=True)
         ])
```


Each `StructField` has the following structure: `(name, type, nullable)`. The `nullable` flag indicates whether the field may be null. Your first task is to define the `schema` of `linkedin_small_real.json`. A smaller version of the JSON dataset can be found [here](https://drive.google.com/a/seas.upenn.edu/file/d/1yZ_0xz6uSJ8lAxhGzn2BVjCpDOjagcqb/view?usp=sharing).

_Note_: In `linkedin_small_real.json` the field `specilities` is spelled incorrectly. This is **not** a typo. (Well, it is, but it's a typo in the raw data, and we have to live with what we're given.)

There is also no grading cell for this step.  But your JSON file won't load if it's wrong, so you have a way of testing.

Please fill in the TODOs below.



In [8]:
%%spark

from pyspark.sql.types import *

# TODO: Finish defining the linkedin_small_real.json schema
# We've provided most of the fiddly details, but you'll
# need to fill in the **name** and the ** experience **!

schema = StructType([
    StructField("_id", StringType(), nullable=True),
    
    # TODO: fill in the necessary structure for the education (Don't forget the comma at the end)!
    StructField("education", ArrayType(
      StructType([
          StructField("start", StringType(), nullable=True),
          StructField("major", StringType(), nullable=True),
          StructField("end", StringType(), nullable=True),
          StructField("name", StringType(), nullable=True),
          StructField("desc", StringType(), nullable=True),
          StructField("degree", StringType(), nullable=True)  
      ])), nullable=True),

    StructField("group", StructType([
          StructField("affilition", ArrayType(StringType()), nullable=True),
          StructField("member", StringType(), nullable=True)
    ]), nullable=True), 

    # TODO: fill in the necessary structure for the name (Don't forget the comma at the end)!
    StructField("name", StructType([
          StructField("family_name", StringType(), nullable=True),
          StructField("given_name", StringType(), nullable=True)  
    ]), nullable=True),

    StructField("locality", StringType(), nullable=True),
    StructField("skills", ArrayType(StringType()), nullable=True),
    StructField("industry", StringType(), nullable=True),
    StructField("interval", IntegerType(), nullable=True),

    # TODO: fill in structure for experience (Don't forget the comma at the end)!
    StructField("experience", ArrayType(
      StructType([
          StructField("org", StringType(), nullable=True),
          StructField("title", StringType(), nullable=True),
          StructField("end", StringType(), nullable=True),
          StructField("start", StringType(), nullable=True),
          StructField("desc", StringType(), nullable=True) 
      ])
    ), nullable=True),

    StructField("summary", StringType(), nullable=True),
    StructField("interests", StringType(), nullable=True),
    StructField("overview_html", StringType(), nullable=True),
    StructField("specilities", StringType(), nullable=True),
    StructField("homepage", ArrayType(StringType()), nullable=True),
    StructField("honors", ArrayType(StringType()), nullable=True),
    StructField("url", StringType(), nullable=True),
    StructField("also_view", ArrayType(
      StructType([
          StructField("id", StringType(), nullable=True),
          StructField("url", StringType(), nullable=True)
      ])
    ), nullable=True),
    StructField("events", ArrayType(
      StructType([
          StructField("from", StringType(), nullable=True),
          StructField("to", StringType(), nullable=True),
          StructField("title1", StringType(), nullable=True),
          StructField("start", StringType(), nullable=True),
          StructField("title2", StringType(), nullable=True),
          StructField("end", IntegerType(), nullable=True)
      ])), nullable=True)
])


### Step 1.2: The Languorous Load

Load the `linkedin_small_real.json` dataset from your S3 bucket into a Spark dataframe (sdf) called `raw_data_sdf`. If you have constructed `schema` correctly, `spark.read.json()` will read in the dataset. ***You do not need to edit this cell***.

If this doesn't work, go back to the prior cell and update your `schema`!

In [9]:
%%spark

raw_data_sdf = spark.read.json("s3a://penn-cis545-files/linkedin_small_real.json", schema=schema)

In [10]:
%%spark

import pandas as pd

The cell below shows how to run SQL commands on Spark tables. Use this as a template for all your SQL queries in this notebook. ***You do not need to edit this cell***.

In [11]:
%%spark

# Create SQL-accessible table
raw_data_sdf.createOrReplaceTempView("raw_data")

# Declare SQL query to be executed
query = '''SELECT * 
           FROM raw_data LIMIT 10'''

# Save the output sdf of spark.sql() as answer_sdf and convert to Pandas
answer_sdf = spark.sql(query).toPandas()

to_submit = pd.read_json(answer_sdf.to_json())


In [12]:
%%spark
# TODO Grader

grader.grade(test_case_id = 'first', answer = to_submit)

Correct! You earned 2/2 points. You are a star!

Your submission has been successfully recorded in the gradebook.

In the next cell, create `step_1_2_sdf` to fetch the data from the above table, returning rows with schema `(_id, given_name)`, in **lexicographical order** of `_id`.  (Note `_id` is a string as opposed to an int.) Limit your sdf to 100 rows.

In [13]:
%%spark 

# TODO
step_1_2_sdf = spark.sql(
    '''select _id, name.given_name from raw_data
    where _id is not null and name.given_name is not null
    order by _id asc
    limit 100'''
)

In [14]:
%%spark
#NO TOUCH
to_submit = step_1_2_sdf.toPandas()

In [15]:
%%spark

#NO TOUCH

grader.grade(test_case_id = 'lex_10_ids_last_names', answer = to_submit)


Correct! You earned 4/4 points. You are a star!

Your submission has been successfully recorded in the gradebook.

### Step 1.3: The Extravagant Extraction

In our training model, we are interested in when individuals began working at a company. From creating the schema, you should notice that the collection of companies individuals worked at is contained in the `experience` field as an array of dictionaries. You should use `org` for the company name and `start` for the start date. Here is an example of an `experience` field:

```
{
   "experience": [
     {
        "org": "The Walt Disney Company", 
        "title" : "Mickey Mouse",
        "end" : "Present",
        "start": "November 1928",
        "desc": "Sailed a boat."
     },
     {
        "org": "Walt Disney World Resort",
        "title": "Mickey Mouse Mascot",
        "start": "January 2005",
        "desc": "Took pictures with kids."
     }
   ]
}
```

Your task is to extract each pair of company and start date from these arrays. In Spark, this is known as "exploding" a row. If you think about how we used relational data to model a nested list in a separate table -- that's basically what `explode` does to the nested data within `linkedin`.

Create an sdf called `raw_start_dates_sdf` that contains the company and start date for every experience of every individual in `raw_data_sdf`. Drop any row that contains a `null` in either column with `dropna()`. Remember we will sort the dataframe when grading so you can sort the elements however you wish (you don't need to if you don't want to). The sdf should look similar to:

```
+--------------------------+---------------+
|org                       |start_date     |
+--------------------------+---------------+
|Walt Disney World Resort  |January 2005   | 
|The Walt Disney Company   |November 1928  |
|...                       |...            |
+--------------------------+---------------+
```

_Hint_: You may want to do two separate `explode`s for `org` and `start`. In an explode, the position of the element in the array can be extracted as well, and used to merge two separate explodes. Reference the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html).

_Note_: Some of the entries in `org` are "weird", i.e. made up of non-English letters and characters. Keep them. **DO NOT** edit any name in the original dataframe unless we specify. **DO NOT** drop any row unless there is a `null` value as stated before. This goes for the rest of the homework as well, unless otherwise specified.
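To make the idea of exploding with positions concrete, here is a minimal plain-Python imitation (not Spark code). The `profiles` sample data and the `posexplode` helper below are made up for illustration; the helper only mimics the shape of Spark's result, with one output row per array element carrying its position.

```python
# Made-up sample rows shaped like the `experience` field shown above.
profiles = [
    {
        "_id": "mickey",
        "experience": [
            {"org": "The Walt Disney Company", "start": "November 1928"},
            {"org": "Walt Disney World Resort", "start": "January 2005"},
        ],
    },
    {"_id": "nobody", "experience": None},  # a null array contributes no rows
]


def posexplode(rows, field):
    """Yield (_id, pos, value) for every element of each row's experience array."""
    for row in rows:
        for pos, item in enumerate(row["experience"] or []):
            yield row["_id"], pos, item.get(field)


# Two separate explodes, each keyed by (_id, pos).
orgs = {(i, p): v for i, p, v in posexplode(profiles, "org")}
starts = {(i, p): v for i, p, v in posexplode(profiles, "start")}

# Merge the explodes on (_id, pos), then drop pairs with a missing value.
pairs = [
    (orgs[key], starts[key])
    for key in orgs
    if key in starts and orgs[key] is not None and starts[key] is not None
]
print(pairs)
```

Matching on both `_id` and `pos` is what keeps each company paired with its own start date rather than with every start date of the same person.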

In [16]:
%%spark
from pyspark.sql.functions import explode

# TODO: Create [raw_start_dates_sdf]

# POSEXPLODE() will explode a specified array column of an sdf and return two
# columns: "pos", the index of an element in the array, and "col", the element
# itself.

############################################

org_exp = spark.sql(
    '''select _id, posexplode(experience.org) from raw_data'''
)
org_exp.createOrReplaceTempView('org_table')
start_exp = spark.sql(
    '''select _id, posexplode(experience.start) from raw_data'''
)
start_exp.createOrReplaceTempView('start_table')

raw_start_dates_sdf = spark.sql(
    '''select org, start_date from
    (select org_table.col as org, start_table.col as start_date from org_table
    join start_table on org_table._id = start_table._id and org_table.pos = start_table.pos)
    where org is not null and start_date is not null'''
)

raw_start_dates_sdf.show()

+--------------------+--------------+
|                 org|    start_date|
+--------------------+--------------+
|             Spot.us|September 2009|
|Pfizer Ltd. (WPO ...|          2005|
|              ALSTOM|          2008|
|DSM i-Nutrition B...|  January 2009|
|Sujana Metal Prod...|   August 2012|
|                 IBM| February 2010|
|             PepsiCo|      May 1998|
|Sleuths Mystery D...|   August 2011|
|           Mindshare| February 2012|
|  Project-A Ventures| February 2012|
|Forcadell Juan Ba...|  January 2000|
|  Slipgate Ironworks| February 2008|
|SpenglerFox - Int...|September 2006|
|        Everyone.net| November 2006|
|                AACN|    April 1998|
|           Garrigues|September 2004|
|                Auna|          2004|
|Cabezon Capital M...| February 2008|
|Centre for Develo...|          1985|
|  Liberty Syndicates|   August 2008|
+--------------------+--------------+
only showing top 20 rows

In [17]:
%%spark

# For your info, see if it looks reasonable
raw_start_dates_sdf.show(4)

+--------------------+--------------+
|                 org|    start_date|
+--------------------+--------------+
|             Spot.us|September 2009|
|Pfizer Ltd. (WPO ...|          2005|
|              ALSTOM|          2008|
|DSM i-Nutrition B...|  January 2009|
+--------------------+--------------+
only showing top 4 rows

In [18]:
%%spark 

#NO TOUCH

## AUTOGRADER Step 1.3: Run this to get your score. ##

raw_start_dates_sdf.createOrReplaceTempView("test_1_3")
test_1_3_sdf = spark.sql("SELECT * FROM test_1_3 ORDER BY org ASC, start_date DESC LIMIT 10").toPandas()
# spark.catalog.dropTempView("test_1_3")
# test_1_3_sdf.show(10)
to_submit = pd.read_json(test_1_3_sdf.to_json())

In [19]:
%%spark

#NO TOUCH

grader.grade(test_case_id = 'explosion', answer = to_submit)

Correct! You earned 4/4 points. You are a star!

Your submission has been successfully recorded in the gradebook.

### Step 1.4: The Fortuitous Formatting

There are two issues with the values in our `start_date` column. First, the values are saved as strings, not datetime types. This prevents us from running operations such as `ORDER BY` or `GROUP BY` on common months or years. Second, some values do not have both month and year information or are in other languages. Your task is to filter and clean the `start_date` column. We are interested in only those rows that have a date in the following format: "(month_name) (year)", e.g. "October 2010".

Create an sdf called `filtered_start_dates_sdf` from `raw_start_dates_sdf` with the `start_date` column filtered in the manner above. Keep only those rows with a start date between January 2000 and December 2011, inclusive. Ensure that any dates that are not in our desired format are omitted. Drop any row that contains a `null` in either column. The format of the sdf is shown below:
```
+--------------------------+---------------+
|org                       |start_date     |
+--------------------------+---------------+
|Walt Disney World Resort  |2005-01-01     | 
|...                       |...            |
+--------------------------+---------------+
```
_Hint_: Refer to the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html) to format the `start_date` column. In Spark SQL the date format we are interested in is `"MMM y"`.

_Note_: Spark will return the date in the format above, with the day as `01`. This is ok, since we are interested in the month and year each individual began working and all dates will have `01` as their day.
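The filtering rule can be illustrated outside Spark with a short plain-Python sketch. It assumes the default English month names and uses Python's `"%B %Y"` pattern, which is only an analogue of the Spark pattern mentioned in the hint; `parse_month_year` and the sample strings are hypothetical.

```python
from datetime import date, datetime


def parse_month_year(text):
    """Return the first day of the month for 'Month YYYY' strings, else None."""
    try:
        return datetime.strptime(text, "%B %Y").date()
    except ValueError:
        # Year-only values, other languages, and other formats end up here.
        return None


raw = ["January 2005", "2008", "November 1928", "Dezember 2010", "December 2011"]
first, last = date(2000, 1, 1), date(2011, 12, 1)

# Keep only parseable dates that fall inside the inclusive range.
kept = [d for d in map(parse_month_year, raw) if d is not None and first <= d <= last]
print(kept)
```

As in the Spark output, every surviving date has `01` as its day, so comparing against the first day of December 2011 is enough to make the upper bound inclusive.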

In [20]:
%%spark

# TO_DATE() will convert a string to a datetime object. The string's format,
# i.e. "MMM y" must be provided. Any string that does not have the specified
# format will be returned as null.

# Use TO_DATE() to convert the start_date column from a string to a datetime
# object. Keep only dates that are between January 2000 and December 2011,
# inclusive.


# TODO: Create [filtered_start_dates_sdf]
raw_start_dates_sdf.createOrReplaceTempView('raw_dates')
filtered_start_dates_sdf = spark.sql(
    '''select org, start_date from(
    select org, to_date(start_date, 'MMM y') as start_date from raw_dates
    where to_date(start_date, "MMM y") >= "2000-01-01" and to_date(start_date, "MMM y") <= "2011-12-01")
    where start_date is not null'''
)
filtered_start_dates_sdf.show()


+--------------------+----------+
|                 org|start_date|
+--------------------+----------+
|             Spot.us|2009-09-01|
|DSM i-Nutrition B...|2009-01-01|
|                 IBM|2010-02-01|
|Sleuths Mystery D...|2011-08-01|
|Forcadell Juan Ba...|2000-01-01|
|  Slipgate Ironworks|2008-02-01|
|SpenglerFox - Int...|2006-09-01|
|        Everyone.net|2006-11-01|
|           Garrigues|2004-09-01|
|Cabezon Capital M...|2008-02-01|
|  Liberty Syndicates|2008-08-01|
|FastScale Technology|2007-10-01|
|     Reed Employment|2005-02-01|
|Technicolor Netwo...|2005-10-01|
|Maybank Philippin...|2010-01-01|
|Vanguard Energy P...|2010-05-01|
|         BSA Pty Ltd|2006-10-01|
|Macdonald Realty ...|2011-02-01|
|Irwin Financial C...|2005-06-01|
|OptumInsight - In...|2003-10-01|
+--------------------+----------+
only showing top 20 rows

In [21]:
%%spark

## AUTOGRADER Step 1.4: Run this to get your score. ##

#NO TOUCH

filtered_start_dates_sdf.createOrReplaceTempView("test_1_4")
test_1_4_sdf = spark.sql("SELECT org, DATE_FORMAT(start_date, 'yyyy-MM-dd') AS start_date FROM test_1_4 ORDER BY org DESC, start_date DESC LIMIT 20").toPandas()
# spark.catalog.dropTempView("test_1_4")
# test_1_4_sdf.show(10)
to_submit = pd.read_json(test_1_4_sdf.to_json())
#grader.grade(question_id = "1.4", answer = (test_1_4_sdf.take(10), filtered_start_dates_sdf.dtypes))


In [22]:
%%spark

grader.grade(test_case_id = 'fortuitous', answer = to_submit)

Correct! You earned 3/3 points. You are a star!

Your submission has been successfully recorded in the gradebook.

### Step 1.5: The Gregarious Grouping

We now want to collect the number of individuals that started in the same month and year for each company. Create an sdf called `start_dates_sdf` that has the total number of employees who began working at the same company on the same start date. The format of the sdf is shown below:

```
+--------------------------+---------------+---------------+
|org                       |start_date     |num_employees  |
+--------------------------+---------------+---------------+
|Walt Disney World Resort  |2005-01-01     |1              |
|...                       |...            |...            |
+--------------------------+---------------+---------------+
```
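The grouping itself is ordinary SQL. As a rough sketch on made-up rows, the snippet below runs the same kind of `GROUP BY` in Python's built-in `sqlite3` module rather than on the cluster; the table contents and company names are invented, and SQLite stands in for Spark SQL purely for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE filtered_dates (org TEXT, start_date TEXT)")
conn.executemany(
    "INSERT INTO filtered_dates VALUES (?, ?)",
    [
        ("Acme", "2005-01-01"),
        ("Acme", "2005-01-01"),
        ("Acme", "2006-03-01"),
        ("Globex", "2005-01-01"),
    ],
)

# One output row per (org, start_date) pair, with the number of matching rows.
rows = conn.execute(
    """SELECT org, start_date, COUNT(*) AS num_employees
       FROM filtered_dates
       GROUP BY org, start_date
       ORDER BY org, start_date"""
).fetchall()
print(rows)
```

Grouping on both columns matters: grouping on `org` alone would collapse all of a company's start dates into a single count.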

In [23]:
%%spark

# TODO: Create [start_dates_sdf]

# GROUP BY on org and start_date, in that order.

filtered_start_dates_sdf.createOrReplaceTempView("filtered_dates")
start_dates_sdf = spark.sql(
    '''select org, start_date, count(org) as num_employees from filtered_dates
    group by org, start_date
    order by num_employees desc'''
)
start_dates_sdf.show()

+--------------------+----------+-------------+
|                 org|start_date|num_employees|
+--------------------+----------+-------------+
|              Google|2011-05-01|           30|
|              Google|2010-06-01|           22|
|                 IBM|2011-01-01|           22|
|              Google|2011-07-01|           22|
|Brook Furniture R...|2005-04-01|           21|
|           Microsoft|2008-06-01|           21|
|              Google|2011-01-01|           20|
|                 IBM|2010-01-01|           20|
|           Accenture|2011-09-01|           19|
|           Microsoft|2008-07-01|           19|
|Nokia Siemens Net...|2007-04-01|           19|
|           Microsoft|2006-06-01|           18|
|              Google|2011-04-01|           18|
|              Google|2011-08-01|           18|
|              Google|2010-11-01|           17|
|                 IBM|2006-01-01|           17|
|              Google|2010-07-01|           17|
|              Google|2008-05-01|       

In [24]:
%%spark

#NO TOUCH

## AUTOGRADER Step 1.5: Run this to get your score. ##

start_dates_sdf.createOrReplaceTempView("test_1_5")
test_1_5_sdf = spark.sql("SELECT org, DATE_FORMAT(start_date, 'yyyy-MM-dd') as start_date, num_employees FROM test_1_5 ORDER BY num_employees DESC, org DESC, start_date ASC LIMIT 10").toPandas()
to_submit = pd.read_json(test_1_5_sdf.to_json())
# spark.catalog.dropTempView("test_1_5")
# test_1_5_sdf.show(10)

#grader.grade(question_id = "1.5", answer = (test_1_5_sdf.take(10), start_dates_sdf.dtypes))


In [25]:
%%spark
grader.grade(test_case_id = 'gregarious', answer = to_submit)

Correct! You earned 7/7 points. You are a star!

Your submission has been successfully recorded in the gradebook.

## Step 2: Hiring Trends Analysis

Now we will analyze `start_dates_sdf` to find monthly and annual hiring trends.

### Step 2.1: The Marvelous Months

Your task is to answer the question: "What is the **least popular month** for employees to start working?" Create an sdf called `monthly_hires_sdf` which contains the total number of employees that started working in a specific month, at any company and in any year. The `month` column should be of type `int`, i.e. 1-12. The format of the sdf is shown below:

```
+---------------+---------------+
|month          |num_employees  |
+---------------+---------------+
|1              |...            |
|2              |...            |
|3              |...            |
|...            |...            |
+---------------+---------------+
```

Find the month in which the *fewest* employees start working and save its number as an **integer** to the variable `least_common_month`.

_Hint_: Be careful. The start dates we have right now have both month and year. We only want the common months. See if you can find something in the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html) that will help you do this.
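A small sketch of the aggregation on made-up data, again using Python's built-in `sqlite3` as a stand-in for Spark SQL. Note the assumption: SQLite has no `MONTH()` function, so the month is extracted here with `strftime('%m', ...)` and cast to an integer; the rows and company names are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE start_dates (org TEXT, start_date TEXT, num_employees INTEGER)"
)
conn.executemany(
    "INSERT INTO start_dates VALUES (?, ?, ?)",
    [
        ("Acme", "2005-01-01", 3),
        ("Acme", "2006-01-01", 2),    # same month as above, different year
        ("Globex", "2005-12-01", 1),
        ("Globex", "2007-06-01", 4),
    ],
)

# Collapse years away: sum the hires per calendar month across all companies.
monthly = conn.execute(
    """SELECT CAST(strftime('%m', start_date) AS INTEGER) AS month,
              SUM(num_employees) AS total_hires
       FROM start_dates
       GROUP BY month
       ORDER BY month"""
).fetchall()

# The least common month is the row with the smallest total.
least_common_month = min(monthly, key=lambda row: row[1])[0]
print(monthly, least_common_month)
```

The two January rows from different years land in the same group, which is exactly the "common months" behavior the hint asks for.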

In [26]:
%%spark

# TODO: Create [monthly_hires_sdf] and find the least common month people were
# hired. Save its number as an integer to [least_common_month]

# HINT: You can aggregate by month, and use ordering to figure out the
# most or least common.

# MONTH() will return the month for any datetime object.

# GROUP BY on the month of start_date using MONTH(). Sum the num_employees
# column to find the total number of employees that started on that month

start_dates_sdf.createOrReplaceTempView("start_dates")

monthly_hires_sdf = spark.sql(
    '''select month(start_date) as month, sum(num_employees) as num_employees from start_dates
    group by month
    order by num_employees desc'''
)

monthly_hires_sdf.show(12)

+-----+-------------+
|month|num_employees|
+-----+-------------+
|    1|        38394|
|    9|        29454|
|    6|        28693|
|    5|        25487|
|    8|        25458|
|    7|        23147|
|   10|        22798|
|    4|        21662|
|    3|        21188|
|    2|        19423|
|   11|        18844|
|   12|        14613|
+-----+-------------+

In [0]:
least_common_month = 12

In [28]:
%%spark

#NO TOUCH

## AUTOGRADER Step 2.1: Run this to get your score. ##

monthly_hires_sdf.createOrReplaceTempView("test_2_1")
test_2_1_sdf = spark.sql("SELECT * FROM test_2_1 ORDER BY month ASC").toPandas()
to_submit = pd.read_json(test_2_1_sdf.to_json())
# spark.catalog.dropTempView("test_2_1")

# test_2_1_sdf.show(12)
# print(least_common_month)
#grader.grade(question_id = "2.1", answer = (test_2_1_sdf.take(12), least_common_month))


In [29]:
%%spark
grader.grade(test_case_id = 'marvelous_marko', answer = to_submit)

Correct! You earned 4/4 points. You are a star!

Your submission has been successfully recorded in the gradebook.

### Step 2.2: The Preposterous Perennial Percentages

The next question we will answer is "What is the percentage change in hires between 2009 and 2010 for each company?" Create an sdf called `percentage_change_sdf` that has the percentage change between 2009 and 2010 for each company. The sdf should look as follows:

```
+---------------------------+--------------------+
|org                        |percentage_change   |
+---------------------------+--------------------+
|Walt Disney World Resort   |12.3                |
|...                        |...                 |
+---------------------------+--------------------+
```

_Note_: A percentage change can be positive or negative depending on the difference between the two years. The formula for percent change is given below:

$$\text{% change} = \frac{P_f-P_i}{P_i} \times 100$$

Here, $P_f$ is the final value (in this case the number of hires in 2010) and $P_i$ is the initial value (the number of hires in 2009).

_Hint_: This is a **nontrivial** question and involves you putting the 2009 and 2010 data on the same row. I'm really sorry it isn't easier, but that's why you are in the class (I hope!). We recommend using a combination of `GROUP BY` and `JOIN`. Keep in mind that operations between columns in SQL dataframes are often easier than those between rows. Come to office hours if you need help.
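As a quick worked example of the formula and of the "same row" idea, the sketch below uses plain Python dictionaries with invented company names and counts. Pairing the two years by company mimics an inner join: a company missing from either year drops out.

```python
def percentage_change(initial, final):
    """Percent change from the initial value (P_i) to the final value (P_f)."""
    return (final - initial) / initial * 100


# Hypothetical yearly totals per company; not the homework data.
hires_2009 = {"Acme": 10, "Globex": 4, "Initech": 7}
hires_2010 = {"Acme": 15, "Globex": 3}

# Keep only companies present in both years, like an inner join on org.
changes = {
    org: round(percentage_change(hires_2009[org], hires_2010[org]), 2)
    for org in hires_2009
    if org in hires_2010
}
print(changes)
```

Here `Acme` grows from 10 to 15 hires, a change of +50%, while `Globex` shrinks from 4 to 3, a change of -25%. `Initech` has no 2010 entry and is dropped.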

In [30]:
%%spark

# TODO: Create [percentage_change_sdf]

# YEAR() will return the year of a datetime object.

# The column percentage_change is calculated by doing a JOIN. We join an sdf
# "y1" created by doing a GROUP BY on start_dates by org and the year of
# start_date where the year is 2009. We sum the number of employees. That sdf is
# joined with a similar sdf, "y2", but for the case when year is 2010. Then,
# using the formula in the description above we find the percentage_change of
# the two sum columns.

sdf_09 = spark.sql(
    '''select org, num_employees, year from (
      select org, sum(num_employees) as num_employees, year(start_date) as year from start_dates
    group by org, year)
    where year == "2009"'''
)
sdf_09.createOrReplaceTempView("sdf_09")

sdf_10 = spark.sql(
    '''select org, num_employees, year from (
      select org, sum(num_employees) as num_employees, year(start_date) as year from start_dates
    group by org, year)
    where year == "2010"'''
)
sdf_10.createOrReplaceTempView("sdf_10")

sdf_change = spark.sql(
    '''select sdf_09.org, sdf_09.num_employees as num_09, sdf_10.num_employees as num_10 from sdf_09
    join sdf_10 on sdf_09.org = sdf_10.org'''
)
sdf_change.createOrReplaceTempView("sdf_change")

percentage_change_sdf = spark.sql(
    '''select org, round(((num_10 - num_09)/num_09)*100,2) as percentage_change from sdf_change
    order by percentage_change desc'''
)

percentage_change_sdf.show()

+--------------------+-----------------+
|                 org|percentage_change|
+--------------------+-----------------+
|          Aon Hewitt|           1100.0|
|             Groupon|            900.0|
|             SunGard|            800.0|
|             Artezio|            800.0|
|             Cameron|            700.0|
|            MKTG INC|            600.0|
|             Paychex|            600.0|
|               Capco|            600.0|
|              Vestas|            600.0|
|    Fifth Third Bank|            600.0|
| Turner Broadcasting|            600.0|
|            JCPenney|            600.0|
|          UST Global|            600.0|
|    Philips Lighting|            600.0|
|    Societe Generale|            600.0|
|         SNC-Lavalin|            500.0|
|                ESPN|            500.0|
|Accenture Service...|            500.0|
|      Wolters Kluwer|            500.0|
|     Sandbox Network|            500.0|
+--------------------+-----------------+
only showing top

In [31]:
%%spark

#NO TOUCH

## AUTOGRADER Step 2.2: Run this to get your score. ##

percentage_change_sdf.createOrReplaceTempView("test_2_2")
test_2_2_sdf = spark.sql("SELECT * FROM test_2_2 ORDER BY percentage_change DESC, org DESC LIMIT 10")
to_submit = pd.read_json(test_2_2_sdf.toPandas().to_json())
# spark.catalog.dropTempView("test_2_2")

# test_2_2_sdf.show(10)

#grader.grade(question_id = "2.2", answer = test_2_2_sdf.take(10))


In [32]:
%%spark
grader.grade(test_case_id = 'preposterous', answer = to_submit)

Correct! You earned 12/12 points. You are a star!

Your submission has been successfully recorded in the gradebook.

## The Blessed Break

That last question was hard. And it's gonna get harder. Take a break. Sit back and relax for a minute. Listen to some music. Here's a [suggestion](https://www.youtube.com/watch?v=9Crm6xJLJgs).

In the cell below, set the variable `whatd_you_think` to the string `"True"` if you liked it or `"False"` if you didn't. You will be graded on your response.

In [33]:
%%spark

whatd_you_think = "True"

In [34]:
%%spark
grader.grade(test_case_id = 'tunes', answer = whatd_you_think)

Correct! You earned 1/1 points. You are a star!

Your submission has been successfully recorded in the gradebook.

## Step 3: Formatting the Training Data


Our overarching goal is to train a machine learning (ML) model that will use the monthly hiring trends of a company to predict a positive or negative gain in the company's stock in the first quarter of the following year. An ML model is trained on a set of observations. Each observation contains a set of features, `X`, and a label, `y`. The goal of the ML model is to learn a function that takes any `X` as an input and outputs a predicted `y`.

The machine learning model we will use is a [Random Forest Classifier](https://builtin.com/data-science/random-forest-algorithm). Each observation we will pass in will have 24 features (columns). These are the number of people hired in each month from Jan to Dec and the company stock price on the last trading day of each month. The label will be the direction of the company's stock percentage change (positive, `1`, or negative, `-1`) in the first quarter of the following year. Each observation will correspond to a specified company's trends in a specified year. The format of our final training sdf is shown below. The first 26 columns (`org`, `year`, and the 24 features) define our observations, `X`, and the last column is the label, `y`.
```
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|org |year |jan_hired |   ...   |dec_hired |jan_stock |   ...   |dec_stock |stock_result |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|IBM |2009 |...       |   ...   |...       |...       |   ...   |...       |1            |
|IBM |2010 |...       |   ...   |...       |...       |   ...   |...       |-1           |
|... |...  |...       |   ...   |...       |...       |   ...   |...       |...          |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
```

_Note_: We will use the first three letters of each month in naming, i.e. `jan, feb, mar, apr, may, jun, jul, aug, sep, oct, nov, dec`.
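The column layout described above can be sketched in plain Python. This is only an illustration of the naming scheme and of how one row splits into features and label. The example row is invented.

```python
months = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]

# 12 hiring columns followed by 12 stock columns give the 24 features.
hired_cols = [f"{m}_hired" for m in months]
stock_cols = [f"{m}_stock" for m in months]
feature_cols = hired_cols + stock_cols

# org and year identify the observation; stock_result is the label.
all_cols = ["org", "year"] + feature_cols + ["stock_result"]

# Hypothetical row with placeholder values, for illustration only.
row = dict(zip(all_cols, ["IBM", 2009] + [0] * 12 + [0.0] * 12 + [1]))
X = [row[c] for c in feature_cols]
y = row["stock_result"]
print(len(feature_cols), len(all_cols), y)
```

Counting this way gives 24 feature columns, 26 columns before the label once `org` and `year` are included, and 27 columns in total.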



### Step 3.1: The Harmonious Hires

Your first task is to create the first half of the training table, i.e. the `jan_hired` through `dec_hired` columns. This will involve reshaping `start_dates_sdf`. Currently, `start_dates_sdf` has columns `org`, `start_date`, and `num_employees`. We want to group the rows together based on common `org` and years and create new columns for the number of employees that started working in each month of that year.

Create an sdf called `raw_hire_train_sdf` that has, for a single company and a single year, the number of hires in Jan through Dec and the total number of hires that year. Note that for each company you will have several rows corresponding to years between 2000 and 2011. It is ok if for a given company you don't have a given year. However, ensure that for a given company and given year, each month column has an entry, i.e. if no one was hired the value should be `0`. The format of the sdf is shown below:
```
+----+-----+----------+---------+----------+----------+
|org |year |jan_hired |   ...   |dec_hired |total_num |
+----+-----+----------+---------+----------+----------+
|IBM |2008 |...       |   ...   |...       |...       |
|IBM |2009 |...       |   ...   |...       |...       |
|... |...  |...       |   ...   |...       |...       |
+----+-----+----------+---------+----------+----------+
```
_Hint_: This is a **fiddly and somewhat difficult** question. I'm really really sorry. The tricky part is creating the additional columns of monthly hires, specifically when there are missing dates. In our dataset, if a company did not hire anybody on a given date, it will not appear in `start_dates_sdf`.

We suggest you look into `CASE` and `WHEN` statements in the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html), and use these to **either** fill in a number for the column (if appropriate) or put in a 0.
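The `CASE`/`WHEN` reshaping pattern can be tried out on a tiny table first. The sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL, with an invented `hires` table and only three month columns to keep it short.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE hires (org TEXT, year INTEGER, month INTEGER, num_employees INTEGER)"
)
# Hypothetical data: two January rows, one March row, nothing in February.
conn.executemany(
    "INSERT INTO hires VALUES (?, ?, ?, ?)",
    [("Acme", 2009, 1, 5), ("Acme", 2009, 3, 2), ("Acme", 2009, 1, 1)],
)

# Each CASE keeps the count for its own month and contributes 0 otherwise,
# so SUM yields 0 (not NULL) for a month with no hires.
pivoted = conn.execute(
    """
    SELECT org, year,
           SUM(CASE WHEN month = 1 THEN num_employees ELSE 0 END) AS jan_hired,
           SUM(CASE WHEN month = 2 THEN num_employees ELSE 0 END) AS feb_hired,
           SUM(CASE WHEN month = 3 THEN num_employees ELSE 0 END) AS mar_hired
    FROM hires
    GROUP BY org, year
    """
).fetchall()
print(pivoted)
```

The `ELSE 0` branch is what guarantees an entry in every month column, as the question requires.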

In [35]:
%%spark

# TODO: Create [raw_hire_train_sdf]

# CASE expressions are SQL's equivalent of if/else statements. WHEN a condition
# is true THEN we return one value. ELSE we return another value and then END
# the expression.

# The query is a GROUP BY. We group data based on the same company and year, as
# in the previous step. We then use CASE expressions. These will separate out
# the sets of data corresponding to the same month using MONTH() in the WHEN
# clause. If we have a piece of data, it will be the number of employees that
# started working at a given company in a given year and a given month and we
# will save it with a corresponding column name. If there is no piece of data
# here, as per the question, we need to add a 0. This is the ELSE clause.
# Lastly, we do a SUM() to find total_num.

start_year_month = spark.sql(
    '''select org, year(start_date) as year, month(start_date) as month, num_employees from start_dates'''
)
start_year_month.createOrReplaceTempView("year_month")
# start_year_month.show()

year_month_hired = spark.sql(
    '''select org, year, month, num_employees,
    case when month = 1 then num_employees else 0 end as jan,
    case when month = 2 then num_employees else 0 end as feb,
    case when month = 3 then num_employees else 0 end as mar,
    case when month = 4 then num_employees else 0 end as apr,
    case when month = 5 then num_employees else 0 end as may,
    case when month = 6 then num_employees else 0 end as jun,
    case when month = 7 then num_employees else 0 end as jul,
    case when month = 8 then num_employees else 0 end as aug,
    case when month = 9 then num_employees else 0 end as sep,
    case when month = 10 then num_employees else 0 end as oct,
    case when month = 11 then num_employees else 0 end as nov,
    case when month = 12 then num_employees else 0 end as dec
     from year_month'''
)
year_month_hired.createOrReplaceTempView("year_month_hired")
# year_month_hired.show()


raw_hire_train_sdf = spark.sql(
    '''select org, year,
    sum(jan) as jan_hired,
    sum(feb) as feb_hired,
    sum(mar) as mar_hired,
    sum(apr) as apr_hired,
    sum(may) as may_hired,
    sum(jun) as jun_hired,
    sum(jul) as jul_hired,
    sum(aug) as aug_hired,
    sum(sep) as sep_hired,
    sum(oct) as oct_hired,
    sum(nov) as nov_hired,
    sum(dec) as dec_hired
    from year_month_hired
    group by org, year'''
)

raw_hire_train_sdf.createOrReplaceTempView("raw_hire_train")
raw_hire_train_sdf = spark.sql(
    '''select *, (jan_hired + feb_hired + mar_hired + apr_hired + may_hired + 
    jun_hired + jul_hired + aug_hired + sep_hired + oct_hired + nov_hired + dec_hired) as total_num 
    from raw_hire_train'''
)

raw_hire_train_sdf.show(5)

+--------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|                 org|year|jan_hired|feb_hired|mar_hired|apr_hired|may_hired|jun_hired|jul_hired|aug_hired|sep_hired|oct_hired|nov_hired|dec_hired|total_num|
+--------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|Infosys Technolog...|2006|        1|        0|        1|        0|        1|        3|        1|        3|        2|        2|        1|        1|       16|
|The Boston Consul...|2011|        2|        1|        0|        1|        1|        0|        3|        2|        0|        2|        1|        0|       13|
|  Wipro Technologies|2007|        2|        2|        2|        2|        4|        3|        2|        3|        2|        3|        4|        3|       32|
|Tata Consultancy ...|2002|        2|        1|     

In [36]:
%%spark

#NO TOUCH

## AUTOGRADER Step 3.1: Run this to get your score. ##

raw_hire_train_sdf.createOrReplaceTempView("test_3_1")
test_3_1_sdf = spark.sql("SELECT * FROM test_3_1 ORDER BY org DESC, year ASC LIMIT 10")
#grader.grade(question_id = "3.1", answer = test_3_1_sdf.take(10))
to_submit = pd.read_json(test_3_1_sdf.toPandas().to_json())
# test_3_1_sdf.show(10)
# spark.catalog.dropTempView("test_3_1")

In [37]:
%%spark
grader.grade(test_case_id = 'harmonious', answer = to_submit)

Correct! You earned 10/10 points. You are a star!

Your submission has been successfully recorded in the gradebook.

### Step 3.2: The Formidable Filters

Create an sdf called `hire_train_sdf` that contains all the observations in `raw_hire_train_sdf` with `total_num` greater than or equal to 100. The format of the sdf is shown below:

```
+----+-----+----------+---------+----------+----------+
|org |year |jan_hired |   ...   |dec_hired |total_num |
+----+-----+----------+---------+----------+----------+
|IBM |2008 |...       |   ...   |...       |...       |
|IBM |2009 |...       |   ...   |...       |...       |
|... |...  |...       |   ...   |...       |...       |
+----+-----+----------+---------+----------+----------+
```


In [38]:
%%spark

# TODO: Create [hire_train_sdf]

# Keep all rows where total_num >= 100

raw_hire_train_sdf.createOrReplaceTempView("raw_hire_train")
hire_train_sdf = spark.sql(
    '''select * from raw_hire_train
    where total_num >= 100'''
)

In [39]:
%%spark

hire_train_sdf.show(5)

+---------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|      org|year|jan_hired|feb_hired|mar_hired|apr_hired|may_hired|jun_hired|jul_hired|aug_hired|sep_hired|oct_hired|nov_hired|dec_hired|total_num|
+---------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|Accenture|2010|        7|        7|        9|        7|       10|        7|        9|       10|       14|        9|       10|        9|      108|
|Microsoft|2009|       14|        4|        3|        3|       11|       13|       11|       13|        7|        8|       10|        3|      100|
|   Google|2011|       20|       12|        8|       18|       30|       15|       22|       18|        7|       15|        3|        6|      174|
|      IBM|2011|       22|        5|        7|        3|       14|       10|       15|       10|        8|        8|  

In [40]:
%%spark


#NO TOUCH

## AUTOGRADER Step 3.2: Run this to get your score. ##

hire_train_sdf.createOrReplaceTempView("test_3_2")
test_3_2_sdf = spark.sql("SELECT * FROM test_3_2 ORDER BY org DESC, year ASC LIMIT 10")
to_submit = pd.read_json(test_3_2_sdf.toPandas().to_json())
#grader.grade(question_id = "3.2", answer = test_3_2_sdf.take(10))
# test_3_2_sdf.show(10)
# spark.catalog.dropTempView("test_3_2")

In [41]:
%%spark
grader.grade(test_case_id = 'formidable', answer = to_submit)

Correct! You earned 5/5 points. You are a star!

Your submission has been successfully recorded in the gradebook.

### Step 3.3: The Stupendous Stocks

Now we are ready for the stock data. The stock data we will use is saved in the same S3 bucket as `linkedin.json`. Load the data into the EMR cluster. Run the cell below. ***You do not need to edit this cell***.

In [42]:
%%spark

# Load stock data

raw_stocks_sdf = spark.read.format("csv") \
              .option("header", "true") \
              .load("s3a://penn-cis545-files/stocks.csv")

# Creates SQL-accessible table
raw_stocks_sdf.createOrReplaceTempView('raw_stocks')

# Display the first 10 rows
query = '''SELECT *
           FROM raw_stocks'''
spark.sql(query).show(10)


+----------+------------------+------------------+------------------+------------------+--------+-------+---+
|      Date|              Open|              High|               Low|             Close|  Volume|OpenInt|org|
+----------+------------------+------------------+------------------+------------------+--------+-------+---+
|1999-11-18|            30.713|            33.754|            27.002|29.701999999999998|66277506|      0|  A|
|1999-11-19|            28.986|29.026999999999997|            26.872|27.256999999999998|16142920|      0|  A|
|1999-11-22|            27.886|29.701999999999998|            27.044|29.701999999999998| 6970266|      0|  A|
|1999-11-23|28.688000000000002|29.445999999999998|            27.002|            27.002| 6332082|      0|  A|
|1999-11-24|27.083000000000002|            28.309|            27.002|            27.717| 5132147|      0|  A|
|1999-11-26|            27.594|28.011999999999997|            27.509|            27.807| 1832635|      0|  A|
|1999-11-2

Run the cell below to see the types of the columns in our data frame. These are not correct. We could have defined a schema when reading in data but we will handle this issue in another manner. You will do this in Step 3.4.2.

In [43]:
%%spark 

# Print types of SDF
raw_stocks_sdf.dtypes

[('Date', 'string'), ('Open', 'string'), ('High', 'string'), ('Low', 'string'), ('Close', 'string'), ('Volume', 'string'), ('OpenInt', 'string'), ('org', 'string')]

### Step 3.4: The Clairvoyant Cleaning

We now want to format the stock data set into the second half of the training table. We will then merge it with `hire_train_sdf` based on the common `org` and `year` fields. The formatting will consist of 4 steps. Actually, it is 5.

#### Step 3.4.1: The Ubiquitous UDF

The companies in our stock dataset are identified by their stock tickers. Thus, we would not be able to merge it with the `org` field in `hire_train_sdf`. We must convert the tickers to company names first. Oftentimes when using Spark, there may not be a built-in SQL function that can do the operation we desire. Instead, we can create one on our own with a user-defined function (udf).

A udf is defined as a normal Python function and then registered to be used as a Spark SQL function. Your task is to create a udf, `TICKER_TO_NAME()` that will convert the ticker field in `raw_stocks` to the company's name. This will be done using the provided `ticker_to_name_dict` dictionary. We are only interested in the companies in that dictionary.

Fill out the function `ticker_to_name()` below. Then use `spark.udf.register()` to register it as a SQL function. The command is provided. ***You do not need to edit it***. Note, we have defined the udf as returning `StringType()`. Ensure that your function returns this. You must also deal with any potential `null` cases.

In [44]:
%%spark

# Dictionary linking stock ticker symbols to their names
ticker_to_name_dict = {'NOK': 'Nokia',
                       'UN': 'Unilever',
                       'BP': 'BP',
                       'JNJ': 'Johnson & Johnson',
                       'TCS': 'Tata Consultancy Services',
                       'SLB': 'Schlumberger',
                       'NVS': 'Novartis',
                       'CNY': 'Huawei',
                       'PFE': 'Pfizer',
                       'ACN': 'Accenture',
                       'DELL': 'Dell',
                       'MS': 'Morgan Stanley',
                       'ORCL': 'Oracle',
                       'BAC': 'Bank of America',
                       'PG': 'Procter & Gamble',
                       'CGEMY': 'Capgemini',
                       'GS': 'Goldman Sachs',
                       'C': 'Citi',
                       'IBM': 'IBM',
                       'CS': 'Credit Suisse',
                       'MDLZ': 'Kraft Foods',
                       'WIT': 'Wipro Technologies',
                       'CSCO': 'Cisco Systems',
                       'PWC': 'PwC',
                       'GOOGL': 'Google',
                       'CTSH': 'Cognizant Technology Solutions',
                       'HSBC': 'HSBC',
                       'DB': 'Deutsche Bank',
                       'MSFT': 'Microsoft',
                       'HPE': 'Hewlett-Packard',
                       'ERIC': 'Ericsson',
                       'BCS': 'Barclays Capital',
                       'GSK': 'GlaxoSmithKline'}

# TODO: Fill out [ticker_to_name()] and register it as a udf.
# Fill out ticker_to_name()

# In UDFs we have to cover all possible input cases, or else the function will
# crash. Specifically, this means we need to handle the case when "ticker" is
# null or not in "ticker_to_name_dict". dict.get() returns None (null in Spark)
# in both cases instead of raising a KeyError.


def ticker_to_name(ticker):
  # Look up the company name; a missing or null ticker yields None
  return ticker_to_name_dict.get(ticker)


# Register udf as a SQL function. DO NOT EDIT
spark.udf.register("TICKER_TO_NAME", ticker_to_name, StringType())


<function ticker_to_name at 0x7f39594f0ae8>

Submit a tuple to the autograder containing the company names your function returns for the tickers of Google (`GOOGL`) and Tesla (`TSLA`). If a ticker isn't in the dictionary, set its entry to a string equal to "None". If this is hardcoded, we will dock you.

In [45]:
%%spark

#NO TOUCH

## AUTOGRADER Step 3.4.1: Run this to get your score. ##

print((str(ticker_to_name("GOOGL")),str(ticker_to_name("TSLA"))))
to_submit = ((str(ticker_to_name("GOOGL")),str(ticker_to_name("TSLA"))))
## AUTOGRADER Step 3.4.1: Run this to get your score. ##
#grader.grade(question_id = "3.4.1", answer = (str(ticker_to_name("GOOGL")),str(ticker_to_name("TSLA"))))

('Google', 'None')

In [46]:
%%spark
grader.grade(test_case_id = 'clairvoyant', answer = to_submit)

Correct! You earned 2/2 points. You are a star!

Your submission has been successfully recorded in the gradebook.

#### Step 3.4.2: The Fastidious Filters

With our new `TICKER_TO_NAME()` function we will begin to wrangle `raw_stocks_sdf`.

Create an sdf called `filter_1_stocks_sdf` as follows. Convert all the tickers in `raw_stocks_sdf` to company names and save them as `org`. Next, convert the `Date` field to a datetime type. As explained before, this will help order and group the rows in future steps. Then, convert the type of the values in `Close` to `float`. This will take care of the `dtypes` issue we saw in Step 3.3.

Drop any rows whose company does not appear in `ticker_to_name_dict`. Keep any date between January 1st 2001 and December 4th 2012 inclusive. The format of the sdf is shown below (note that `date` is a datetime object, not a string):

```
+----+------------+--------------+
|org |date        |Close         |
+----+------------+--------------+
|IBM |2000-01-03  |...           |
|... |...         |...           |
+----+------------+--------------+
```
_Hint_: You will use a similar function to filter the dates as in Step 1.4. In Spark SQL the format for the `date` field in `raw_stocks_sdf` is `"yyyy-MM-dd"`.
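For intuition, the same parse-then-filter logic can be sketched in plain Python. The pattern `"%Y-%m-%d"` is Python's counterpart of the `"yyyy-MM-dd"` pattern named in the hint. The sample dates are invented and chosen to sit on both sides of each boundary.

```python
from datetime import date, datetime


def parse_day(text):
    """Parse a 'YYYY-MM-DD' string into a date object."""
    return datetime.strptime(text, "%Y-%m-%d").date()


# Inclusive bounds from the question.
start, end = date(2001, 1, 1), date(2012, 12, 4)

# Hypothetical raw strings around the boundaries.
raw = ["2000-12-29", "2001-01-01", "2012-12-04", "2012-12-05"]

# Compare parsed dates, not strings, and keep both endpoints.
kept = [d for d in map(parse_day, raw) if start <= d <= end]
print(kept)
```

Both boundary dates survive the filter because the comparison uses `<=` on each side.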

In [47]:
%%spark

# Format the "org" column using our UDF, TICKER_TO_NAME. Use TO_DATE() to
# convert the string date column to datetime object and filter on this in the
# same way as Step 1.4

# TODO: Create [filter_1_stocks_sdf]

filter_1_stocks_sdf = spark.sql(
    '''select ticker_to_name(org) as org, to_date(date, "yyyy-MM-dd") as date, float(Close) from raw_stocks
    where date >= to_date("2001-01-01","yyyy-MM-dd") and date <= to_date("2012-12-04","yyyy-MM-dd")'''
).dropna()

filter_1_stocks_sdf.show()

+---------+----------+------+
|      org|      date| Close|
+---------+----------+------+
|Accenture|2005-02-25|21.363|
|Accenture|2005-02-28|21.704|
|Accenture|2005-03-01|21.735|
|Accenture|2005-03-02|21.593|
|Accenture|2005-03-03|21.329|
|Accenture|2005-03-04|21.549|
|Accenture|2005-03-07|21.651|
|Accenture|2005-03-08|21.509|
|Accenture|2005-03-09|21.379|
|Accenture|2005-03-10|21.194|
|Accenture|2005-03-11|20.777|
|Accenture|2005-03-14|20.794|
|Accenture|2005-03-15| 20.37|
|Accenture|2005-03-16| 20.41|
|Accenture|2005-03-17|20.344|
|Accenture|2005-03-18|20.098|
|Accenture|2005-03-21|20.395|
|Accenture|2005-03-22|20.623|
|Accenture|2005-03-23|20.641|
|Accenture|2005-03-24|20.726|
+---------+----------+------+
only showing top 20 rows

In [48]:
%%spark

#NO TOUCH

## AUTOGRADER Step 3.4.2: Run this to get your score. ##

filter_1_stocks_sdf.createOrReplaceTempView("test_3_4_2")
test_3_4_2_sdf = spark.sql("SELECT org, DATE_FORMAT(date, 'yyyy-MM-dd') as date, Close FROM test_3_4_2 ORDER BY org, date, Close LIMIT 10")
to_submit = pd.read_json(test_3_4_2_sdf.toPandas().to_json())
# spark.catalog.dropTempView("test_3_4_2")
# #grader.grade(question_id = "3.4.2", answer = (test_3_4_2_sdf.take(10), filter_1_stocks_sdf.dtypes))
# test_3_4_2_sdf.show(10)

In [49]:
%%spark
grader.grade(test_case_id = 'fastidious', answer = to_submit)

Correct! You earned 7/7 points. You are a star!

Your submission has been successfully recorded in the gradebook.

#### Step 3.4.3: The Momentous Months

The data in `filter_1_stocks_sdf` gives closing prices on a daily basis. Since we are interested in monthly trends, we will only keep the closing price on the **last trading day of each month**.

Create an sdf `filter_2_stocks_sdf` that contains only the closing prices for the last trading day of each month. Note that the last trading day is not simply the last day of each month, as this could fall on a weekend when the market is closed. The format of the sdf is shown below:

```
+----+------------+--------------+
|org |date        |Close         |
+----+------------+--------------+
|IBM |2000-01-31  |...           |
|... |...         |...           |
+----+------------+--------------+
```

  _Hint_: This is a **difficult** question. But if you made it this far, you're a star by now. It may be helpful to create an intermediate dataframe that will help you filter out the specific dates you desire.
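One way to picture the intermediate dataframe is as a two-step process: first find the latest date present for each company and month, then keep only the daily rows that match it. The plain-Python sketch below follows that idea with invented prices. The dates are real weekdays (January 31 and February 28, 2009 both fell on a Saturday).

```python
from datetime import date

# Hypothetical daily closes (org, date, close); prices are invented.
prices = [
    ("IBM", date(2009, 1, 29), 91.0),
    ("IBM", date(2009, 1, 30), 90.5),
    ("IBM", date(2009, 2, 26), 92.0),
    ("IBM", date(2009, 2, 27), 93.25),
]

# Step 1: the intermediate lookup of the latest date seen per (org, year, month).
last_day = {}
for org, day, _ in prices:
    key = (org, day.year, day.month)
    if key not in last_day or day > last_day[key]:
        last_day[key] = day

# Step 2: "join" back to the daily rows and keep only the matching dates.
month_end = [
    (org, day, close)
    for org, day, close in prices
    if last_day[(org, day.year, day.month)] == day
]
print(month_end)
```

Because the lookup is built from dates that actually appear in the data, weekends and holidays never need special handling.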

In [50]:
%%spark

# TODO: Create [filter_2_stocks_sdf]

# Create sdf that has for each company, the last trading day of each month. We
# need to perform a GROUP BY on three features, org, YEAR(date), and
# MONTH(date). This groups the daily rows of a specified month and a specified
# year. Since the dates are all datetime objects, taking MAX() will give us the
# latest, i.e. last, one. Joining back to the daily data recovers its close.

filter_1_stocks_sdf.createOrReplaceTempView("filter_1")

last_day_sdf = spark.sql(
    '''select org, year(date) as year, month(date) as month, max(date) as last_day from filter_1
    group by org, year, month'''
)
last_day_sdf.createOrReplaceTempView("last_d")
# last_day_sdf.show()

filter_2_stocks_sdf = spark.sql(
    '''select last_d.org, last_d.last_day as date, filter_1.close from last_d
    join filter_1 on last_d.org = filter_1.org and last_d.last_day = filter_1.date'''
)

filter_2_stocks_sdf.show()

+--------------------+----------+------+
|                 org|      date| close|
+--------------------+----------+------+
|                Citi|2004-06-30|451.11|
|                Citi|2004-11-30|434.13|
|                HSBC|2012-12-04|38.661|
|                 IBM|2001-10-31| 90.24|
|                 IBM|2004-08-31|70.718|
|   Johnson & Johnson|2001-12-31|48.099|
|   Johnson & Johnson|2006-09-29|52.852|
|           Microsoft|2012-10-31|24.909|
|            Novartis|2008-04-30|41.857|
|              Oracle|2001-04-30|14.856|
|    Procter & Gamble|2010-09-30| 48.38|
|            Unilever|2007-08-31|24.722|
|     Bank of America|2003-11-28|35.563|
|     Bank of America|2008-07-31|31.016|
|                Citi|2011-03-31| 42.88|
|Cognizant Technol...|2011-01-31|36.312|
|       Credit Suisse|2006-10-31|45.494|
|       Credit Suisse|2007-05-31| 57.12|
|       Credit Suisse|2012-05-31|15.431|
|     GlaxoSmithKline|2007-02-28|38.973|
+--------------------+----------+------+
only showing top

In [51]:
%%spark

#NO TOUCH

## AUTOGRADER Step 3.4.3: Run this to get your score. ##

filter_2_stocks_sdf.createOrReplaceTempView("test_3_4_3")
test_3_4_3_sdf = spark.sql("SELECT org, DATE_FORMAT(date, 'yyyy-MM-dd') as date, Close FROM test_3_4_3 ORDER BY org, date LIMIT 10")
to_submit = pd.read_json(test_3_4_3_sdf.toPandas().to_json())
# spark.catalog.dropTempView("test_3_4_3")

# #grader.grade(question_id = "3.4.3", answer = (test_3_4_3_sdf.take(10), filter_2_stocks_sdf.dtypes))
# test_3_4_3_sdf.show(10)

In [52]:
%%spark

grader.grade(test_case_id = 'momentus', answer = to_submit)

Correct! You earned 12/12 points. You are a star!

Your submission has been successfully recorded in the gradebook.

#### Step 3.4.4: The Really Random Reshape

Now, we will begin to shape our dataframe into the format of the final training sdf.

Create an sdf `filter_3_stocks_sdf` that has, for a single company and a single year, the closing stock price for the last trading day of each month in that year. This is similar to the table you created in Step 3.1. In this case, since we cannot make a proxy for the closing price if the data is not available, drop any rows containing any `null` values, in any column. The format of the sdf is shown below:

```
+----+-----+----------+---------+----------+
|org |year |jan_stock |   ...   |dec_stock |
+----+-----+----------+---------+----------+
|IBM |2008 |...       |   ...   |...       |
|IBM |2009 |...       |   ...   |...       |
|... |...  |...       |   ...   |...       |
+----+-----+----------+---------+----------+
```
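To make the "missing month becomes `null`, then drop the row" idea concrete, here is a minimal sketch of the pivot. It is not the notebook's solution: it uses Python's built-in `sqlite3` as a stand-in for Spark SQL so it runs without a cluster, the `stocks` table and the "Acme" rows are made-up toy data, and only three months are pivoted. In Spark SQL you would use `month(date)` and `year(date)` instead of `strftime`.

```python
import sqlite3

# Toy month-end closes (hypothetical data); "Acme" has no January row for 2005.
toy_rows = [
    ("Acme", "2005-02-28", 10.0),
    ("Acme", "2005-03-31", 11.0),
    ("Acme", "2006-01-31", 12.0),
    ("Acme", "2006-02-28", 13.0),
    ("Acme", "2006-03-31", 14.0),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stocks (org TEXT, date TEXT, close REAL)")
conn.executemany("INSERT INTO stocks VALUES (?, ?, ?)", toy_rows)

# A CASE with no ELSE yields NULL, and MAX ignores NULLs, so a month with no
# row stays NULL instead of becoming 0. The outer WHERE then drops those rows.
pivot_sql = """
SELECT * FROM (
    SELECT org,
           CAST(strftime('%Y', date) AS INTEGER) AS year,
           MAX(CASE WHEN strftime('%m', date) = '01' THEN close END) AS jan_stock,
           MAX(CASE WHEN strftime('%m', date) = '02' THEN close END) AS feb_stock,
           MAX(CASE WHEN strftime('%m', date) = '03' THEN close END) AS mar_stock
    FROM stocks
    GROUP BY org, strftime('%Y', date)
)
WHERE jan_stock IS NOT NULL
  AND feb_stock IS NOT NULL
  AND mar_stock IS NOT NULL
ORDER BY org, year
"""
pivoted = conn.execute(pivot_sql).fetchall()
print(pivoted)
```

The 2005 row disappears because its January value is `NULL`. Note that a `CASE ... ELSE 0` would turn the same gap into `0`, which a null filter cannot catch.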


In [53]:
%%spark

# TODO: Create [filter_3_stocks_sdf]

# We will do the same operation we did in Step 3.1. Note that the `else 0`
# branches below make a month with no data sum to 0 rather than null, so the
# trailing dropna() only removes rows that contain nulls for other reasons.

filter_2_stocks_sdf.createOrReplaceTempView("filter_2")

filter_3_stocks_sdf = spark.sql(
    '''select org, year, sum(jan) as jan_stock, 
    sum(feb) as feb_stock,
    sum(mar) as mar_stock,
    sum(apr) as apr_stock,
    sum(may) as may_stock,
    sum(jun) as jun_stock,
    sum(jul) as jul_stock,
    sum(aug) as aug_stock,
    sum(sep) as sep_stock,
    sum(oct) as oct_stock,
    sum(nov) as nov_stock,
    sum(dec) as dec_stock from
    (select org, year(date) as year,
    case when month(date) = 1 then Close else 0 end as jan,
    case when month(date) = 2 then Close else 0 end as feb,
    case when month(date) = 3 then Close else 0 end as mar,
    case when month(date) = 4 then Close else 0 end as apr,
    case when month(date) = 5 then Close else 0 end as may,
    case when month(date) = 6 then Close else 0 end as jun,
    case when month(date) = 7 then Close else 0 end as jul,
    case when month(date) = 8 then Close else 0 end as aug,
    case when month(date) = 9 then Close else 0 end as sep,
    case when month(date) = 10 then Close else 0 end as oct,
    case when month(date) = 11 then Close else 0 end as nov,
    case when month(date) = 12 then Close else 0 end as dec
     from filter_2)
     group by org, year'''
).dropna()

filter_3_stocks_sdf.show(5)


+------------------+----+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|               org|year|         jan_stock|         feb_stock|         mar_stock|         apr_stock|         may_stock|         jun_stock|         jul_stock|         aug_stock|         sep_stock|         oct_stock|         nov_stock|         dec_stock|
+------------------+----+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|              Citi|2005| 475.8500061035156|462.95001220703125| 435.9800109863281|455.55999755859375| 457.0199890136719| 448.4700012207031|             422.0| 424.6300048828125| 441.6000061035156| 444.0899963378906|             471.0|470.

In [54]:
%%spark

#NO TOUCH

## AUTOGRADER Step 3.4.4: Run this to get your score. ##

filter_3_stocks_sdf.createOrReplaceTempView("test_3_4_4")
test_3_4_4_sdf = spark.sql("SELECT * FROM test_3_4_4 ORDER BY org, year LIMIT 12")
to_submit = pd.read_json(test_3_4_4_sdf.toPandas().to_json())
# spark.catalog.dropTempView("test_3_4_4")

# test_3_4_4_sdf.show(10)
#grader.grade(question_id = "3.4.4", answer = test_3_4_4_sdf.take(10))



In [55]:
%%spark
to_submit


          org  year  jan_stock  ...  oct_stock  nov_stock  dec_stock
0   Accenture  2005   0.000000  ...  22.349001  24.158001  24.521000
1   Accenture  2006  26.782000  ...  27.952999  28.627001  31.368000
2   Accenture  2007  32.066002  ...  33.171001  29.355000  30.604000
3   Accenture  2008  29.407000  ...  28.073999  26.315001  27.851000
4   Accenture  2009  26.806999  ...  31.497999  34.860001  35.252998
5   Accenture  2010  34.818001  ...  37.973000  36.797001  41.189999
6   Accenture  2011  43.720001  ...  52.223999  50.199001  46.127998
7   Accenture  2012  49.693001  ...  59.734001  60.186001  60.798000
8          BP  2005   0.000000  ...  45.581001  45.196999  44.084000
9          BP  2006  49.637001  ...  46.062000  46.734001  46.062000
10         BP  2007  43.597000  ...  53.536999  49.931999  50.229000
11         BP  2008  43.898998  ...  34.118000  33.424000  32.084999

[12 rows x 14 columns]

In [56]:
%%spark
grader.grade(test_case_id = 'random', answer = to_submit)


Correct! You earned 9/9 points. You are a star!

Your submission has been successfully recorded in the gradebook.

#### Step 3.4.5: The Decisive Direction

The final element in our training set is the binary output for each case, i.e. the `y` label. 

Create an sdf `stocks_train_sdf` from `filter_3_stocks_sdf` with an additional column `direction`. This should be the direction of the percentage change in the closing stock price in the first quarter of a given year, i.e. `1` for positive or `-1` for negative. Make this an **integer**. For this homework, the first quarter begins in January and ends in April, inclusive, and we want the percent change between these two months. Reference Step 2.2 for the percent change formula. The format of the sdf is shown below:

```
+----+-----+----------+---------+----------+-------------+
|org |year |jan_stock |   ...   |dec_stock |direction    |
+----+-----+----------+---------+----------+-------------+
|IBM |2008 |...       |   ...   |...       |1            |
|IBM |2009 |...       |   ...   |...       |-1           |
|... |...  |...       |   ...   |...       |...          |
+----+-----+----------+---------+----------+-------------+
```
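As a quick sanity check on the labeling rule, here is a small plain-Python sketch. The helper names `percent_change` and `direction` are made up for illustration, and the formula used is the common `(end - start) / start * 100`; if Step 2.2 defines percent change differently, that definition takes precedence. Mapping a zero change to `-1` is also an assumption, since the spec only names the positive and negative cases.

```python
def percent_change(start, end):
    """Percent change from start to end; assumes start is non-zero."""
    return (end - start) / start * 100.0


def direction(jan_close, apr_close):
    """Return 1 for a positive January-to-April change, -1 otherwise (as an int)."""
    return 1 if percent_change(jan_close, apr_close) > 0 else -1


# Toy closing prices (hypothetical values).
print(direction(4.5, 12.7))   # price rose from January to April
print(direction(50.0, 40.0))  # price fell from January to April
```

For a positive January close, the sign of the percent change is the same as the sign of `apr_close - jan_close`, so in SQL a plain comparison of the two columns gives the same label without dividing.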

In [57]:
%%spark

# TODO: Create [stocks_train_sdf]

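# SIGN() would return -1 if the input is negative, 0 if the input is zero, and
# 1 if the input is positive. The CASE below returns only 1 or -1, so a year
# with no change between January and April is labeled -1.

# Keep all rows in filter_3_stocks and add another column based on the sign of
# the percentage change in stock. For a positive January close, comparing
# apr_stock with jan_stock gives the same sign as the percent change.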

filter_3_stocks_sdf.createOrReplaceTempView("filter_3")

stocks_train_sdf = spark.sql(
    '''select *, 
    case when apr_stock > jan_stock then 1 else -1 end as direction
    from filter_3  
    order by direction desc'''
)

stocks_train_sdf.show(5)


+--------------------+----+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---------+
|                 org|year|         jan_stock|         feb_stock|         mar_stock|         apr_stock|         may_stock|         jun_stock|         jul_stock|         aug_stock|         sep_stock|         oct_stock|         nov_stock|         dec_stock|direction|
+--------------------+----+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---------+
|    Barclays Capital|2009| 4.517199993133545| 4.066299915313721|6.7245001792907715|12.666000366210938|15.274999618530273|14.586999893188477|             16.25|19.333999633789062|18.702999114990234|16.5

In [58]:
%%spark

#NO TOUCH

## AUTOGRADER Step 3.4.5: Run this to get your score. ##

stocks_train_sdf.createOrReplaceTempView("test_3_4_5")
test_3_4_5_sdf = spark.sql("SELECT * FROM test_3_4_5 ORDER BY org, year LIMIT 10")
to_submit = pd.read_json(test_3_4_5_sdf.toPandas().to_json())
# spark.catalog.dropTempView("test_3_4_5")

# test_3_4_5_sdf.show(10)
#grader.grade(question_id = "3.4.5", answer = test_3_4_5_sdf.take(10))



In [59]:
%%spark
grader.grade(test_case_id = 'decisive', answer = to_submit)


Correct! You earned 3/3 points. You are a star!

Your submission has been successfully recorded in the gradebook.

### Step 3.5: The C-r-a-z-y Combination

Now that we have created the two halves of our training data individually, we will merge them to create the final training sdf we showed at the beginning of Step 3.

Create an sdf called `training_sdf` in the format of the one shown at the beginning of Step 3. Note that in our definition for the `stock_result` column, the `stock_result` value for a particular year corresponds to the direction of the stock percentage change in the **following** year. For example, the stock_result in the `2008` row for `IBM` will contain the direction of IBM's stock in the first quarter of 2009. The format of the sdf is shown below:
```
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|org |year |jan_hired |   ...   |dec_hired |jan_stock |   ...   |dec_stock |stock_result |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|IBM |2008 |...       |   ...   |...       |...       |   ...   |...       |-1           |
|IBM |2009 |...       |   ...   |...       |...       |   ...   |...       |1            |
|... |...  |...       |   ...   |...       |...       |   ...   |...       |...          |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
```
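The year shift is the tricky part, so here is a minimal sketch of just that join. It again uses Python's built-in `sqlite3` as a stand-in for Spark SQL, and the `features` and `labels` tables and every value in them are hypothetical toy data rather than the homework's real sdfs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE features (org TEXT, year INTEGER, dec_stock REAL)")
conn.execute("CREATE TABLE labels (org TEXT, year INTEGER, direction INTEGER)")

# Hypothetical feature rows and first-quarter directions.
conn.executemany(
    "INSERT INTO features VALUES (?, ?, ?)",
    [("IBM", 2008, 84.0), ("IBM", 2009, 130.0)],
)
conn.executemany(
    "INSERT INTO labels VALUES (?, ?, ?)",
    [("IBM", 2008, 1), ("IBM", 2009, -1), ("IBM", 2010, 1)],
)

# Match each feature year with the label of the FOLLOWING year:
# features.year = labels.year - 1.
rows = conn.execute(
    """
    SELECT f.org, f.year, f.dec_stock, l.direction AS stock_result
    FROM features AS f
    JOIN labels AS l
      ON f.org = l.org AND f.year = l.year - 1
    ORDER BY f.org, f.year
    """
).fetchall()
print(rows)
```

The 2008 feature row picks up the 2009 direction and the 2009 row picks up the 2010 direction. Because this is an inner join, a feature year with no label for the following year is dropped from the result.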

In [61]:
%%spark

# TODO: Create [training_sdf]

# Our merge will consist of two joins. The first will use the filter_3 view
# (registered from filter_3_stocks_sdf in Step 3.4.5) to join the monthly
# hiring rates and closing prices. The next join will be with stocks_train to
# find stock_result. This join matches each hire_train year with the following
# stocks_train year, so that one year's stock_result is the direction of the
# stock in the following year's first quarter.

hire_train_sdf.createOrReplaceTempView("hire_train")
stocks_train_sdf.createOrReplaceTempView("stocks_train")

hiring_stock_sdf = spark.sql(
    '''select hire_train.org, hire_train.year, 
    hire_train.jan_hired,
    hire_train.feb_hired,
    hire_train.mar_hired,
    hire_train.apr_hired,
    hire_train.may_hired,
    hire_train.jun_hired,
    hire_train.jul_hired,
    hire_train.aug_hired,
    hire_train.sep_hired,
    hire_train.oct_hired,
    hire_train.nov_hired,
    hire_train.dec_hired,
    filter_3.jan_stock,
    filter_3.feb_stock,
    filter_3.mar_stock,
    filter_3.apr_stock,
    filter_3.may_stock,
    filter_3.jun_stock,
    filter_3.jul_stock,
    filter_3.aug_stock,
    filter_3.sep_stock,
    filter_3.oct_stock,
    filter_3.nov_stock,
    filter_3.dec_stock from hire_train
    join filter_3 on hire_train.org = filter_3.org and hire_train.year=filter_3.year'''
)

hiring_stock_sdf.createOrReplaceTempView("hiring_stock")
# hiring_stock_sdf.show()

training_sdf = spark.sql(
    '''select hiring_stock.*, stocks_train.direction as stock_result from hiring_stock
    join stocks_train on hiring_stock.org=stocks_train.org and hiring_stock.year = stocks_train.year-1'''
)

training_sdf.show()


+---------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------+
|      org|year|jan_hired|feb_hired|mar_hired|apr_hired|may_hired|jun_hired|jul_hired|aug_hired|sep_hired|oct_hired|nov_hired|dec_hired|         jan_stock|         feb_stock|         mar_stock|         apr_stock|         may_stock|         jun_stock|         jul_stock|         aug_stock|         sep_stock|         oct_stock|         nov_stock|         dec_stock|stock_result|
+---------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+------------------+------------------+------------------+------------------+------------------+-------------

In [62]:
%%spark

#NO TOUCH

## AUTOGRADER Step 3.5: Run this to get your score. ##

training_sdf.createOrReplaceTempView("test_3_5")
test_3_5_sdf = spark.sql("SELECT * FROM test_3_5 ORDER BY org, year LIMIT 10")
to_submit = pd.read_json(test_3_5_sdf.toPandas().to_json())
# spark.catalog.dropTempView("test_3_5")

# test_3_5_sdf.show(10)
#grader.grade(question_id = "3.5", answer = test_3_5_sdf.take(10))



In [63]:
%%spark
grader.grade(test_case_id = 'crazy_craig', answer = to_submit)


Correct! You earned 15/15 points. You are a star!

Your submission has been successfully recorded in the gradebook.

## Step 4: Machine ... Learning?

Well here we go. Who's ready to make some money? Well... it's not gonna happen. We didn't code the random forest model, sorry! The second half of the course will be about scalable machine learning, and we will learn how to take this beautiful data and make billions of dollars.

![Jumping for $$](https://cdn.dribbble.com/users/2749602/screenshots/7065140/shot-cropped-1567085403260.png)

One last thing, as I predicted before, you're a star.


## Optional Extra-Credit Step: Full PageRank on Spark

We've given a basic implementation of MapReduce using Spark's matrix types in:
https://colab.research.google.com/drive/1Mr2zf-Oz6W9kRFrzSN08S1lo14mGYnw2

Your task for extra credit, worth up to 5 points, is to take this basic example and flesh it out:

1. You should write a function called `pagerank` that takes two inputs: (1) a Spark dataframe `df` conforming to the schema of `initial_graph`, (2) an integer `n` specifying the max number of iterations until termination.  It **returns a Spark dataframe** with two columns: `node_id` and `pagerank` (the latter can be of type double), where the latter is the PageRank score after $n$ iterations.

2. Your PageRank algorithm should incorporate the standard "decay factor" as we've described in the lecture slides.  Use the standard value $\alpha=0.85$.  It should use Apache Spark matrices to do the computation.

3. Your PageRank algorithm should remove sinks and self-loops.
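To illustrate the three requirements above, here is a rough plain-Python sketch of the iteration. It is not the expected submission: it uses dictionaries instead of Apache Spark matrices and returns a dict instead of a Spark dataframe. It also assumes the graph is a list of `(from_node, to_node)` pairs (the real schema of `initial_graph` may differ) and uses the `(1 - alpha) / N` form of the decay term, which may not match the lecture slides exactly.

```python
def remove_self_loops_and_sinks(edges):
    """Drop self-loops, then repeatedly drop edges that point at sinks."""
    edges = {(src, dst) for src, dst in edges if src != dst}
    while True:
        sources = {src for src, _ in edges}
        # An edge survives only if its target still has an outgoing edge.
        kept = {(src, dst) for src, dst in edges if dst in sources}
        if kept == edges:
            return kept
        edges = kept


def pagerank(edges, n, alpha=0.85):
    """Run n PageRank iterations and return a {node_id: score} dict."""
    edges = remove_self_loops_and_sinks(edges)
    nodes = sorted({node for edge in edges for node in edge})
    if not nodes:
        return {}
    out_degree = {node: 0 for node in nodes}
    for src, _ in edges:
        out_degree[src] += 1
    # Start from a uniform distribution over the remaining nodes.
    rank = {node: 1.0 / len(nodes) for node in nodes}
    for _ in range(n):
        new_rank = {node: (1.0 - alpha) / len(nodes) for node in nodes}
        for src, dst in edges:
            new_rank[dst] += alpha * rank[src] / out_degree[src]
        rank = new_rank
    return rank


# Toy graph (hypothetical): a 3-cycle plus a self-loop and an edge into sink "d".
toy_edges = [("a", "b"), ("b", "c"), ("c", "a"), ("a", "a"), ("c", "d")]
ranks = pagerank(toy_edges, n=10)
print(ranks)
```

Sink removal is repeated because deleting one sink can turn its predecessor into a new sink. In a Spark version, the inner loop over edges would become a multiplication of the rank vector by a column-normalized adjacency matrix.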

# HW Submission

**Double check that you have the correct PennID (all numbers) in the autograder**. 

Go to the "File" tab at the top left, and click "Download .ipynb". Zip it (name doesn't matter) and submit it to OpenSubmit. 

You must submit your notebook to receive credit.

**On OpenSubmit, go to Settings and make sure to set your Student ID to your PennID (all numbers)**.