<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px; height: 163px">
</div>

# CSE Coding Assignment
## Instructions

- Please answer all questions
- You can use any language you wish (e.g. Python, Scala, SQL...)
- Several Markdown cells require completion. Please edit the Markdown cells to include your answer.
- Your final notebook should compile without errors when you click "Run All"

**Please do not publish questions. This is a confidential assignment.**

### Creating a Cluster

You will need to create a Databricks Cluster. More information on this process is available here: https://docs.databricks.com/user-guide/clusters/create.html

## Getting Started

**REQUIRED:** Run the following cells exactly as written to retrieve the necessary Coding Assignment Data Sets from Amazon S3.

In [0]:
%sh curl --remote-name-all 'https://files.training.databricks.com/assessments/cse-take-home/{covertype,kafka,treecover,u.data,u.item}.csv'

In [0]:
dbutils.fs.cp("file:/databricks/driver/covertype.csv", "dbfs:/FileStore/tmp/covertype.csv")
dbutils.fs.cp("file:/databricks/driver/kafka.csv", "dbfs:/FileStore/tmp/kafka.csv")
dbutils.fs.cp("file:/databricks/driver/treecover.csv", "dbfs:/FileStore/tmp/treecover.csv")
dbutils.fs.cp("file:/databricks/driver/u.data.csv", "dbfs:/FileStore/tmp/u.data.csv")
dbutils.fs.cp("file:/databricks/driver/u.item.csv", "dbfs:/FileStore/tmp/u.item.csv")

## Part 1: Reading and Parsing Data

### Question 1:  Code Challenge - Load a CSV

- Load the CSV file at `dbfs:/FileStore/tmp/treecover.csv` into a DataFrame.
- Use Apache Spark to read in the data, assigned to the variable `treeCoverDF`.
- Your method to get the CSV file into Databricks isn't graded. We are only concerned with how you use Spark to parse and load the actual data. 
- Please use the `inferSchema` option.

In [0]:
# YOUR CODE HERE
treeCoverDF = (spark.read.format("csv")
               .option("header", "true")
               .option("inferSchema", "true")  # let Spark detect numeric column types
               .load("dbfs:/FileStore/tmp/treecover.csv"))
treeCoverDF.show()

### Question 2:  Code Challenge - Print the Schema

Use Apache Spark to display the Schema of the `treeCoverDF` Dataframe.

In [0]:
treeCoverDF.printSchema()


### Question 3:  Code Challenge - Rows & Columns

Use Apache Spark to display the number of rows and columns in the DataFrame.

In [0]:
# Number of rows, then number of columns
print(treeCoverDF.count(), len(treeCoverDF.columns))

## Part 2: Analysis

### Question 4:  Code Challenge - Summary Statistics for a Feature

Use Apache Spark to answer these questions about the `treeCoverDF` DataFrame:
- What is the range - minimum and maximum - of values for the feature `elevation`?
- What are the mean and standard deviation of the feature `elevation`?

In [0]:
import math
from pyspark.sql.functions import col  # col is also used by later cells

# Range of Elevation
maximum = treeCoverDF.agg({"Elevation": "max"}).collect()[0][0]  # 3849
minimum = treeCoverDF.agg({"Elevation": "min"}).collect()[0][0]  # 1863
print(minimum, maximum)

# Mean = sum / count
sumElevation = treeCoverDF.agg({"Elevation": "sum"}).collect()[0][0]  # 41569757
countElevation = treeCoverDF.agg({"Elevation": "count"}).collect()[0][0]  # 15120
mean = float(sumElevation / countElevation)
print(mean)  # 2749.3225529100528

# Population variance and standard deviation: mean squared deviation from the mean
df1 = treeCoverDF.withColumn("Elevation Square diff", (col("Elevation") - mean) ** 2)
sumElevationMeanSquared = df1.agg({"Elevation Square diff": "sum"}).collect()[0][0]  # 2637586175.909463
variance = float(sumElevationMeanSquared / countElevation)
print(variance)  # 174443.53015274226
print(math.sqrt(variance))  # 417.6643750102973

### Answer #4:

- Min `elevation`: `1863`
- Max `elevation`: `3849`
- Mean `elevation`: `2749.3225529100528`
- Standard Deviation of `elevation`: `417.6643750102973` (population standard deviation, dividing by the row count)
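
The standard deviation in Answer #4 comes from dividing the summed squared deviations by the row count, which is the population form. As a sanity check on that distinction, here is a minimal plain-Python sketch. The `sample` values are made up for illustration and are not taken from `treecover.csv`.

```python
import math
import statistics

# Hypothetical elevations, for illustration only (not from treecover.csv).
sample = [1863, 2500, 2750, 3000, 3849]

mean = sum(sample) / len(sample)
squared_diffs = [(x - mean) ** 2 for x in sample]

# Population standard deviation: divide by N (the form used in the cell above).
pop_std = math.sqrt(sum(squared_diffs) / len(sample))
# Sample standard deviation: divide by N - 1.
sample_std = math.sqrt(sum(squared_diffs) / (len(sample) - 1))

# The standard library agrees with both hand-written formulas.
assert math.isclose(pop_std, statistics.pstdev(sample))
assert math.isclose(sample_std, statistics.stdev(sample))
print(pop_std, sample_std)
```

In Spark, `stddev_pop` and `stddev_samp` in `pyspark.sql.functions` correspond to these two forms, so the choice of function changes the reported value slightly.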

### Question 5:  Code Challenge - Record Count

Use Apache Spark to answer the following question:
- How many entries in the dataset have an `elevation` greater than or equal to 2749.32 meters **AND** a `Cover_Type` of 1 or 2?

In [0]:
# Compare against numeric literals so Spark does not have to coerce strings.
# Despite its name, whereDF holds a row count, not a DataFrame.
whereDF = treeCoverDF.where((col("Elevation") >= 2749.32) & col("Cover_Type").isin(1, 2)).count()
print(whereDF)

### Question 6: Code Challenge - Compute a Percentage

Use Apache Spark to answer the following question:
- What percentage of entries with `Cover_Type` 1 or 2 have an `elevation` at or above 2749.32 meters?
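
The question asks for a share of the entries with `Cover_Type` 1 or 2, so the denominator is that subset rather than the whole dataset. A minimal plain-Python sketch of the ratio, using made-up rows that are an assumption for illustration only:

```python
# Hypothetical (elevation, cover_type) rows, for illustration only.
rows = [(3000, 1), (2600, 2), (2800, 2), (2900, 3), (2749, 1)]

THRESHOLD = 2749.32

# Denominator: every entry whose cover type is 1 or 2.
in_types = [r for r in rows if r[1] in (1, 2)]
# Numerator: the subset of those entries at or above the threshold.
high = [r for r in in_types if r[0] >= THRESHOLD]

percentage = len(high) / len(in_types) * 100
print(percentage)  # 50.0
```

Note that the row with elevation 2749 is excluded because it falls below 2749.32, and the row with cover type 3 never enters the denominator.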

In [0]:
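# The denominator is the number of entries with Cover_Type 1 or 2, not the full row count
coverType12Count = treeCoverDF.where(col("Cover_Type").isin(1, 2)).count()
print(float(whereDF / coverType12Count * 100))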

### Question 7: Code Challenge - Visualize Feature Distribution

Use any [visualization tool available in the Databricks Runtime](https://docs.databricks.com/user-guide/visualizations/index.html) to generate the following visualization:

- a bar chart that helps visualize the distribution of different Wilderness Areas in our dataset

In [0]:
# Count rows per wilderness area and express each count as a percentage of all rows
total_rows = treeCoverDF.count()
wilderness_distribute = (treeCoverDF.groupBy("Wilderness_Area").count()
                         .withColumnRenamed("count", "cnt_per_group")
                         .withColumn("perc_of_count_total", (col("cnt_per_group") / total_rows) * 100))
# display() renders a table; choose the bar chart option in the plot controls
display(wilderness_distribute)

Wilderness_Area,cnt_per_group,perc_of_count_total
3,6349,41.99074074074074
1,3597,23.789682539682538
4,4675,30.919312169312168
2,499,3.3002645502645507


### Question 8: Code Challenge - Visualize Average Elevation by Cover Type 

Use any [visualization tool available in the Databricks Runtime](https://docs.databricks.com/user-guide/visualizations/index.html) to generate the following visualization:

- a bar chart showing the average elevation of each cover type with string labels for cover type

**NOTE: you will need to match the integer values in the column `treeCoverDF.Cover_Type` to the string values in `dbfs:/FileStore/tmp/covertype.csv` to retrieve the Cover Type Labels. It is recommended to use an Apache Spark join.**

In [0]:
from pyspark.sql.types import DoubleType

# Load the cover type lookup table (cover_type_key, cover_type_label)
coverTypeDF = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tmp/covertype.csv")
# Attach the string label to every row of treeCoverDF
joinDF = treeCoverDF.join(coverTypeDF, coverTypeDF.cover_type_key == treeCoverDF.Cover_Type, "inner")
# Cast Elevation to double so the average is numeric even if the column was read as a string
changedTypedf = joinDF.withColumn("label", joinDF["Elevation"].cast(DoubleType()))
avg_elevation = changedTypedf.groupBy(["cover_type_key", "cover_type_label"]).avg("label")
display(avg_elevation)

cover_type_key,cover_type_label,avg(label)
3,Ponderosa Pine,2398.4231481481484
6,Douglas-fir,2423.276851851852
5,Aspen,2786.801388888889
1,Spruce/Fir,3128.025925925926
4,Cottonwood/Willow,2223.42037037037
2,Lodgepole Pine,2922.5402777777776
7,Krummholz,3362.769907407408


## Part 3: Data Ingestion, Cleansing, and Transformations

## Instructions 

This is a multi-step, data pipeline question in which you need to achieve a few objectives to build a successful job.

### Data Sets

#### `u.data.csv`

- The full u data set, 100000 ratings by 943 users on 1682 items. 
- Each user has rated at least 20 movies.  
- Users and items are numbered consecutively from 1. 
- The data is randomly ordered. 
- This is a tab separated file consisting of four columns: 
   - user id 
   - movie id 
   - rating 
   - date (unix seconds since 1/1/1970 UTC)

#### Desired schema

- `user_id INTEGER`
- `movie_id INTEGER`
- `rating INTEGER`
- `date DATE`
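
The raw `date` column holds Unix seconds, while the desired schema asks for a calendar date. As a plain-Python illustration of that conversion (a sketch, not the Spark solution), the helper `unix_seconds_to_date` and the example timestamp below are assumptions introduced for this example:

```python
from datetime import datetime, timezone

def unix_seconds_to_date(seconds):
    """Return the UTC calendar date for a Unix timestamp given in seconds."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).date()

# Illustrative timestamp (an assumption, not quoted from the data set).
example = unix_seconds_to_date(881250949)
print(example.isoformat())  # 1997-12-04
```

The sketch pins the conversion to UTC. Spark's `from_unixtime` renders timestamps in the session time zone, so dates near midnight may differ if the session is not set to UTC.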

#### `u.item.csv`

- This is a `|` separated file consisting of six columns:
   - movie id
   - movie title
   - release date
   - video release date
   - IMDb URL
   - genre
- movie ids in this file match movie ids in `u.data`.

#### Desired schema

- `movie_id INTEGER`
- `movie_title STRING`

### Question 9:  Code Challenge - Load DataFrames

Use Apache Spark to perform the following:
1. define the correct schemas for each Data Set to be imported as described above  
   **note:** 
      - for `u.data.csv`, `date` *must* be stored using `DateType` with the format `yyyy-MM-dd`
      - you may need to ingest `timestamp` data using `IntegerType`
      - be sure to drop unnecessary columns for `u.item.csv`
1. import the two files as DataFrames named `uDataDF` and `uItemDF` using the schemas you defined and these paths:
   - `dbfs:/FileStore/tmp/u.data.csv`
   - `dbfs:/FileStore/tmp/u.item.csv`
1. order the `uDataDF` DataFrame by the `date` column

**NOTE:** Please display the DataFrames, `uDataDF` and `uItemDF` after loading.

#### `uDataDF`

In [0]:
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType, DateType
from pyspark.sql.functions import *
# u.data.csv has four tab-separated columns; the fourth holds Unix seconds
schema = StructType([StructField("user_id", IntegerType(), False),
                     StructField("movie_id", IntegerType(), False),
                     StructField("rating", IntegerType(), False),
                     StructField("timestamp", IntegerType(), False)
                    ])
uData = spark.read.format("csv").option("delimiter", "\t").schema(schema).load("dbfs:/FileStore/tmp/u.data.csv")
# Convert Unix seconds to a DateType column, then drop the raw timestamp
uData = uData.withColumn("date", from_unixtime("timestamp").cast(DateType())).drop("timestamp")
# Order by date as the question requires; later cells keep using uData
uDataDF = uData.orderBy("date")
uDataDF.printSchema()
display(uDataDF)


#### `uItemDF`

In [0]:
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType, DateType
from pyspark.sql.functions import *

schema = StructType([StructField("movie_id", IntegerType(), False),
                     StructField("movie_title", StringType(), False),
                     StructField("release_date", IntegerType(),False),
                     StructField("video_release_date",IntegerType(),False),
                     StructField("imdb_url", StringType(),False),
                     StructField("genre", IntegerType(),False)
                    ])
uItem = spark.read.format("csv").option("delimiter", "|").schema(schema).load("dbfs:/FileStore/tmp/u.item.csv")
# Keep only the two columns in the desired schema; later cells keep using uItem
uItemDF = uItem.select("movie_id", "movie_title")
uItemDF.printSchema()
display(uItemDF)


### Question 10:  Code Challenge - Perform a Join

Use Apache Spark to do the following:
- join `uDataDF` and `uItemDF` on `movie_id` as a new DataFrame called `uMovieDF`  
   **note:** make sure you do not create duplicate `movie_id` columns
   
**NOTE:** Please display the DataFrame `uMovieDF`.

In [0]:
uMovieDF = uData.join(uItem,uData.movie_id == uItem.movie_id,"inner").drop(uItem.movie_id)
display(uMovieDF)

user_id,movie_id,rating,date,movie_title,release_date,video_release_date,imdb_url,genre
196,242,3,1997-12-04,Kolya (1996),,,http://us.imdb.com/M/title-exact?Kolya%20(1996),0
186,302,3,1998-04-04,L.A. Confidential (1997),,,http://us.imdb.com/M/title-exact?L%2EA%2E+Confidential+(1997),0
22,377,1,1997-11-07,Heavyweights (1994),,,http://us.imdb.com/M/title-exact?Heavyweights%20(1994),0
244,51,2,1997-11-27,Legends of the Fall (1994),,,http://us.imdb.com/M/title-exact?Legends%20of%20the%20Fall%20(1994),0
166,346,1,1998-02-02,Jackie Brown (1997),,,http://us.imdb.com/M/title-exact?imdb-title-119396,0
298,474,4,1998-01-07,Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963),,,http://us.imdb.com/M/title-exact?Dr.%20Strangelove%20or:%20How%20I%20Learned%20to%20Stop%20Worrying%20and%20Love%20the%20Bomb%20(1963),0
115,265,2,1997-12-03,"Hunt for Red October, The (1990)",,,http://us.imdb.com/M/title-exact?Hunt+for+Red+October%2C+The+(1990),0
253,465,5,1998-04-03,"Jungle Book, The (1994)",,,"http://us.imdb.com/M/title-exact?Jungle%20Book,%20The%20(1994)",0
305,451,3,1998-02-01,Grease (1978),,,http://us.imdb.com/M/title-exact?Grease%20(1978),0
6,86,3,1997-12-31,"Remains of the Day, The (1993)",,,"http://us.imdb.com/M/title-exact?Remains%20of%20the%20Day,%20The%20(1993)",0


### Question 11:  Code Challenge - Perform an Aggregation

Use Apache Spark to do the following:
1. create an aggregate DataFrame, `aggDF` by
  1. extracting the year from the `date` (of the review)
  1. getting the average rating of each film per year as a column named `average_rating`
  1. ordering descending by year and average rating
1. write the resulting dataframe to a table named "movie_by_year_average_rating" in the Default database  
   **note:** use `mode(overwrite)` 

#### Desired Schema
The schema of your resulting DataFrame should be:
- `year INTEGER`
- `movie_title STRING`
- `average_rating DOUBLE`

**NOTE:** Please display the DataFrame `aggDF`.

In [0]:
from pyspark.sql.functions import avg, col, year

aggDF = (uMovieDF
         .withColumn("year", year(col("date")))  # year() returns an integer, matching the desired schema
         .groupBy("year", "movie_title")
         .agg(avg("rating").alias("average_rating"))
         .orderBy(col("year").desc(), col("average_rating").desc()))
aggDF.write.mode("overwrite").saveAsTable("movie_by_year_average_rating")
display(aggDF)

## Part 4: Fun with JSON

JSON values are typically passed by message brokers such as Kafka or Kinesis in a string encoding. When consumed by a Spark Structured Streaming application, this JSON must be converted into a nested object before it can be used.

Below is a list of JSON strings that represents how data might be passed from a message broker.

**Note:** Make sure to run the cell below to retrieve the sample data.

In [0]:
%python


sampleJson = [
 ('{"user":100, "ips" : ["191.168.192.101", "191.168.192.103", "191.168.192.96", "191.168.192.99"]}',), 
 ('{"user":101, "ips" : ["191.168.192.102", "191.168.192.105", "191.168.192.103", "191.168.192.107"]}',), 
 ('{"user":102, "ips" : ["191.168.192.105", "191.168.192.101", "191.168.192.105", "191.168.192.107"]}',), 
 ('{"user":103, "ips" : ["191.168.192.96", "191.168.192.100", "191.168.192.107", "191.168.192.101"]}',), 
 ('{"user":104, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.102", "191.168.192.99"]}',), 
 ('{"user":105, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.100", "191.168.192.96"]}',), 
]

### Question 12:  Code Challenge - Count the IPs

Use any coding techniques known to you to parse this list of JSON strings to answer the following question:
- how many occurrences of each IP address are in this list?

#### Desired Output
Your results should be this:


| ip | count |
|:-:|:-:|
| `191.168.192.96` | `3` |
| `191.168.192.99` | `6` |
| `191.168.192.100` | `2` |
| `191.168.192.101` | `3` |
| `191.168.192.102` | `2` |
| `191.168.192.103` | `2` |
| `191.168.192.105` | `3` |
| `191.168.192.107` | `3` |

**NOTE:** The order of your results is not important.
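
Since any technique is allowed, the expected counts can be cross-checked without Spark. The sketch below is one plain-Python way to do that with `json` and `collections.Counter`. The name `sample_json` is introduced here and simply repeats the strings from `sampleJson`.

```python
import json
from collections import Counter

# Same content as sampleJson above: one-element tuples holding JSON strings.
sample_json = [
    ('{"user":100, "ips" : ["191.168.192.101", "191.168.192.103", "191.168.192.96", "191.168.192.99"]}',),
    ('{"user":101, "ips" : ["191.168.192.102", "191.168.192.105", "191.168.192.103", "191.168.192.107"]}',),
    ('{"user":102, "ips" : ["191.168.192.105", "191.168.192.101", "191.168.192.105", "191.168.192.107"]}',),
    ('{"user":103, "ips" : ["191.168.192.96", "191.168.192.100", "191.168.192.107", "191.168.192.101"]}',),
    ('{"user":104, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.102", "191.168.192.99"]}',),
    ('{"user":105, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.100", "191.168.192.96"]}',),
]

# Parse each JSON string and count every IP address across all users.
ip_counts = Counter(
    ip
    for (json_string,) in sample_json
    for ip in json.loads(json_string)["ips"]
)

for ip, count in sorted(ip_counts.items()):
    print(ip, count)
```

This approach pulls everything onto one machine, which is fine for six records. The Spark version is what scales to a real stream.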

In [0]:
from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

# Each tuple in sampleJson becomes one row with a single string column named "json"
json_df = spark.createDataFrame(sampleJson, ["json"])
schema = StructType([
    StructField("user", IntegerType(), True),
    StructField("ips", ArrayType(StringType()), True),
])
# Parse the JSON string into a struct, then flatten it into user and ips columns
people_df = json_df.select(from_json(col("json"), schema).alias("json")).select("json.*")

# Explode the ips array to one row per IP address, then count occurrences
newDF = (people_df
         .select(explode(col("ips")).alias("IPs"))
         .groupBy("IPs")
         .count()
         .withColumnRenamed("count", "Count"))
display(newDF)

IPs,Count
191.168.192.102,2
191.168.192.105,3
191.168.192.103,2
191.168.192.99,6
191.168.192.96,3
191.168.192.101,3
191.168.192.107,3
191.168.192.100,2


## Part 5: The Databricks API

### Question 13: Conceptual Question - the Databricks API

In 4-5 sentences, please explain what the Databricks API is used for at a high-level.

### Answer:

The Databricks REST API 2.0 lets you manage a Databricks deployment programmatically. It exposes services for your Databricks account, clusters, cluster policies, DBFS, Delta Live Tables, global init scripts, groups, pools, instance profiles, IP access lists, jobs, libraries, MLflow experiments and models, permissions, SCIM settings, secrets, tokens, and workspaces.
These APIs can be called from any HTTP client, for example with the curl command, Python, or PowerShell.
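
As a hedged illustration of calling the API from Python, the sketch below builds an authenticated request with the standard library but does not send it. `HOST`, `TOKEN`, and the helper `build_request` are hypothetical names introduced for this example. The `clusters/list` path is used as a representative API 2.0 endpoint.

```python
import json
import urllib.request

# Hypothetical workspace URL and token; replace with real values before use.
HOST = "https://example.cloud.databricks.com"
TOKEN = "dapi-EXAMPLE-TOKEN"

def build_request(path, payload=None):
    """Build (but do not send) an authenticated request to a REST API 2.0 endpoint."""
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    request = urllib.request.Request(
        url=f"{HOST}/api/2.0/{path}",
        data=data,
        method="POST" if payload is not None else "GET",
    )
    request.add_header("Authorization", f"Bearer {TOKEN}")
    if data is not None:
        request.add_header("Content-Type", "application/json")
    return request

list_clusters = build_request("clusters/list")
print(list_clusters.get_method(), list_clusters.full_url)
# Sending it would be: urllib.request.urlopen(list_clusters)
```

Keeping the token out of source code, for example in a secret store or environment variable, is the safer choice for anything beyond a sketch.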

### Question 14: Conceptual Question - Explain an API Call

In 4-5 sentences, please explain what this API call does. Be sure to discuss some key attributes about the cluster.

```
$ curl -n -X POST -H 'Content-Type: application/json' \
  -d '{
    "cluster_name": "high-concurrency-cluster",
    "spark_version": "4.2.x-scala2.11",
    "node_type_id": "i3.xlarge",
    "spark_conf": {
      "spark.databricks.cluster.profile": "serverless",
      "spark.databricks.repl.allowedLanguages": "sql,python,r"
    },
    "aws_attributes": {
      "zone_id": "us-west-2c",
      "first_on_demand": 1,
      "availability": "SPOT_WITH_FALLBACK",
      "spot_bid_price_percent": 100
    },
    "custom_tags": {
      "ResourceClass": "Serverless"
    },
    "autoscale": {
      "min_workers": 1,
      "max_workers": 2
    },
    "autotermination_minutes": 10
  }' https://dogfood.staging.cloud.databricks.com/api/2.0/clusters/create
```

### Answer:

This is a POST request to the `clusters/create` endpoint of the Databricks REST API 2.0, asking Databricks to create a cluster described by the JSON request body. The cluster is named `high-concurrency-cluster`, runs Spark version `4.2.x-scala2.11` on `i3.xlarge` nodes, and its Spark configuration sets the cluster profile to `serverless` and limits the allowed languages to SQL, Python, and R. The AWS attributes place it in availability zone `us-west-2c`, keep the first node on-demand, and use spot instances for the rest with a fallback to on-demand, bidding up to 100% of the on-demand price. Autoscaling lets the cluster grow from 1 to 2 workers as the workload changes, and a custom tag `ResourceClass: Serverless` is attached to its resources. With `autotermination_minutes` set to 10, the cluster terminates itself after sitting idle for 10 minutes.
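
To make the key attributes easier to read off, the request body from Question 14 can be parsed and summarized. This is a minimal plain-Python sketch. The `summary` dictionary and its key names are introduced here for illustration and are not part of the API.

```python
import json

# The request body from the curl command in Question 14.
payload = json.loads("""
{
  "cluster_name": "high-concurrency-cluster",
  "spark_version": "4.2.x-scala2.11",
  "node_type_id": "i3.xlarge",
  "spark_conf": {
    "spark.databricks.cluster.profile": "serverless",
    "spark.databricks.repl.allowedLanguages": "sql,python,r"
  },
  "aws_attributes": {
    "zone_id": "us-west-2c",
    "first_on_demand": 1,
    "availability": "SPOT_WITH_FALLBACK",
    "spot_bid_price_percent": 100
  },
  "custom_tags": {"ResourceClass": "Serverless"},
  "autoscale": {"min_workers": 1, "max_workers": 2},
  "autotermination_minutes": 10
}
""")

# Collect the attributes discussed in the answer into one flat view.
summary = {
    "name": payload["cluster_name"],
    "runtime": payload["spark_version"],
    "node_type": payload["node_type_id"],
    "workers": (payload["autoscale"]["min_workers"], payload["autoscale"]["max_workers"]),
    "availability": payload["aws_attributes"]["availability"],
    "zone": payload["aws_attributes"]["zone_id"],
    "languages": payload["spark_conf"]["spark.databricks.repl.allowedLanguages"].split(","),
    "idle_timeout_minutes": payload["autotermination_minutes"],
}

for key, value in summary.items():
    print(f"{key}: {value}")
```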

## Part 6: Security

### Question 15: Conceptual Question - Security on Databricks

Using the Databricks Documentation, what would you recommend to a Databricks and AWS customer for **securely** storing and accessing their data.

### Answer:

- Use S3 Bucket Policies to restrict access to trusted IPs and VPCs.
- Leverage AWS PrivateLink or Gateway VPC Endpoints to ensure private connectivity between your Databricks clusters and AWS cloud-native data sources. Use VPC Endpoint Policies to strictly enforce which S3 buckets can be accessed from your Customer-managed VPC, ensuring that you also allow read-only access to the S3 buckets that are required by Databricks.
- Deploy Databricks clusters in your private subnets. With secure cluster connectivity, VPCs require no inbound ports to be open, and cluster infrastructure does not require public IPs to interact with the Control Plane.
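
As one possible illustration of the first recommendation, the sketch below assembles an S3 bucket policy that denies requests not arriving through a given VPC endpoint, using the `aws:SourceVpce` condition key. The bucket name and endpoint ID are hypothetical placeholders. A real policy would need to be adapted and reviewed before use.

```python
import json

# Hypothetical bucket name and VPC endpoint ID, for illustration only.
BUCKET = "example-data-bucket"
VPC_ENDPOINT_ID = "vpce-0123456789abcdef0"

bucket_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DenyAccessOutsideVpcEndpoint",
            "Effect": "Deny",
            "Principal": "*",
            "Action": "s3:*",
            "Resource": [
                f"arn:aws:s3:::{BUCKET}",
                f"arn:aws:s3:::{BUCKET}/*",
            ],
            # aws:SourceVpce identifies the VPC endpoint a request arrived through.
            "Condition": {"StringNotEquals": {"aws:SourceVpce": VPC_ENDPOINT_ID}},
        }
    ],
}

print(json.dumps(bucket_policy, indent=2))
```

A blanket deny like this also blocks access from outside the VPC, including the AWS console, so it is usually scoped or paired with explicit exceptions.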

# This is the end of the official test. Bonus below!

## Part 7: Bonus: Data Science & Machine Learning

### Question 16: Conceptual Question - A Skewed Feature

One of these lines is the *mean* of this feature. The other is the *median*. Which of these lines is the **mean** - the red line or the black line?

<img width=400px src=https://www.evernote.com/l/AAEycL6CQ0hLi5V5pIo91Ko-Pfk2i0AnGyMB/image.png>

### Answer:

`EDIT THIS MARKDOWN CELL WITH YOUR REPLY`

### Question 17: Conceptual Question - Exploratory Data Analysis

The plots below show the distribution of home selling prices differentiated by a few categorical features. Based on these plots, **which of these categorical features** - Property Type, Exterior Quality, or Month Sold - would you expect to be most associated with Price? **Why**?

<img width=1600px src=https://www.evernote.com/l/AAHulkcc20hHSJV6D1udKiwSDCN0S6oV_5YB/image.png>

### Answer:

`EDIT THIS MARKDOWN CELL WITH YOUR REPLY`

### Question 18: Conceptual Question - Analyze Model Performance

Consider the following results for a decision tree model against training and testing data sets:

`decision tree regression - train r2 score: 0.9944`  
`decision tree regression - test r2 score:  0.3119`


What is your assessment of this model?

### Answer:

`EDIT THIS MARKDOWN CELL WITH YOUR REPLY`

### Question 19: Conceptual Question - Model Selection

A series of models has been built using the same training data, but each with a subset of features.

Consider the following results for a series of logistic regression models and then answer this question:

- Which model would you choose and why?
- What other things would you want to look at and why?

| model number | feature subset | logistic regression test accuracy|
|:-:|:-:|:-:|
| 1| feat_1 |	 0.631|
| 2| feat_2 |	 0.552|
| 3| feat_3 |	 0.868|
| 4| feat_4 |	 0.868|
| 5| feat_1, feat_2 |	 0.657|
| 6| feat_1, feat_3 |	 0.947|
| 7| feat_1, feat_4 |	 0.921|
| 8| feat_2, feat_3 |	 0.947|
| 9| feat_2, feat_4 |	 0.973|
| 10| feat_3, feat_4 |	 0.947|
| 11| feat_1, feat_2, feat_3 |	 0.947|
| 12| feat_1, feat_2, feat_4 |	 0.947|
| 13| feat_1, feat_3, feat_4 |	 0.947|
| 14| feat_2, feat_3, feat_4 |	 0.973|
| 15| feat_1, feat_2, feat_3, feat_4 |	 0.973|

### Answer:

`EDIT THIS MARKDOWN CELL WITH YOUR REPLY`

&copy; 2019 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>