# **RSE/RSA Coding Challengs**

## Exam Instructions

 1. Sign up to this databricks community edition in https://community.cloud.databricks.com. Import this html notebook to your home folder. It will automatically be imported as a Scala notebook.
 1. This exam has various questions in 9 different Sections. Unless otherwise instructed, solve as many of the problems below as you can within the alloted time frame. Some of the challenges are more advanced than others and are expected to take more time to solve. Please note that Section 9 is Mandatory to answer.
 1. You can create as many notebooks as you would like to answer the challenges 
 1. Notebooks should be presentable and should be able to execute succesfully with `Run All`
 1. Notebooks should be idempotent as well. Ideally, you'll also clean up after yourself (i.e. drop your tables)
 1. Once completed, export your notebook(s) as *.html file with full results and email to ``vgiri@databricks.com`` (Giri Varatharajan) cc the Recruiting and Hiring team.
 1. Please note answers won't be accpeted if they were copied as it is from online sources.

# Tips and Instructions
Read through the Databricks Guide for Notebook Usage and Other usage instructions in http://docs.databricks.com/

####  Available  Context objects
spark object is available in databricks notebook by default. Please do not explicitly create sc, spark or sqlContext.

In [0]:
%scala
spark

#### Using SQL in your cells

You can change to native SQL mode in your cells using the `%sql` prefix, demonstrated in the example below. Note that these include visualizations by default.

In [0]:
%sql
--select * from diamonds limit 50

#### Using Python in your cells

In [0]:
%python
import numpy as np

#### Using Scala in your cells

In [0]:
%scala
val a = sc.parallelize(1 to 5).take(5)

#### Creating Visualizations from non-SQL Cells

When you needs to create a visualization from a cell where you are not writing native SQL, use the `display` function, as demonstrated below.

In [0]:

# // scala
# // val same_query_as_above = spark.sql("select cut, count(color) as cnt from diamonds group by cut, color ")
# // display(same_query_as_above)

##                                       Coding Challenges Starts Here!!..Wish you All the Best!!
---

# Section 1 = > TPC-H Dataset
You're provided with a TPCH data set. The data is located in `/databricks-datasets/tpch/data-001/`. You can see the directory structure below:

In [0]:
# enable AQE
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)

In [0]:
%scala
display(dbutils.fs.ls("/databricks-datasets/tpch/data-001/"))

path,name,size,modificationTime
dbfs:/databricks-datasets/tpch/data-001/README.md,README.md,236,1419281876000
dbfs:/databricks-datasets/tpch/data-001/customer/,customer/,0,0
dbfs:/databricks-datasets/tpch/data-001/lineitem/,lineitem/,0,0
dbfs:/databricks-datasets/tpch/data-001/nation/,nation/,0,0
dbfs:/databricks-datasets/tpch/data-001/orders/,orders/,0,0
dbfs:/databricks-datasets/tpch/data-001/part/,part/,0,0
dbfs:/databricks-datasets/tpch/data-001/partsupp/,partsupp/,0,0
dbfs:/databricks-datasets/tpch/data-001/region/,region/,0,0
dbfs:/databricks-datasets/tpch/data-001/supplier/,supplier/,0,0


As you can see above, this dataset consists of 8 different folders with different datasets. The schema of each dataset is demonstrated below:

![test](http://kejser.org/wp-content/uploads/2014/06/image_thumb2.png)

You can take a quick look at each dataset by running the following Spark commmand. Feel free to explore and get familiar with this dataset

In [0]:
%scala
//sc.textFile("/databricks-datasets/tpch/data-001/partsupp/").take(100).foreach(println)

#### **Question #1**: Joins in Core Spark
Pick any two datasets and join them using Spark's API. Feel free to pick any two datasets. For example: `PART` and `PARTSUPP`. The goal of this exercise is not to derive anything meaningful out of this data but to demonstrate how to use Spark to join two datasets. For this problem you're **NOT allowed to use SparkSQL**. You can only use RDD API. You can use either Python or Scala to solve this problem.

In [0]:
# create a sparkContext
sc = spark.sparkContext

**Read the data for the partsupp.tbl**

In [0]:
display(dbutils.fs.ls("/databricks-datasets/tpch/data-001/partsupp"))

path,name,size,modificationTime
dbfs:/databricks-datasets/tpch/data-001/partsupp/partsupp.tbl,partsupp.tbl,599829166,1419285399000


In [0]:
# import Row
from pyspark.sql import Row

# read the file
raw_data = sc.textFile("/databricks-datasets/tpch/data-001/partsupp/partsupp.tbl")
csv_data = raw_data.map(lambda l: l.split("|"))
row_data = csv_data.map(lambda p: Row(
                        part_key        =int(p[0]), 
                        supply_key      =int(p[1]),
                        avail_qty       =int(p[2]),
                        supply_cost     =float(p[3]),
                        part_comment    =p[4]
    )
)

partsupp_df = spark.createDataFrame(row_data)

display(partsupp_df)

part_key,supply_key,avail_qty,supply_cost,part_comment
1,2,3325,771.64,", even theodolites. regular, final theodolites eat after the carefully pending foxes. furiously regular deposits sleep slyly. carefully bold realms above the ironic dependencies haggle careful"
1,12502,8076,993.49,ven ideas. quickly even packages print. pending multipliers must have to are fluff
1,25002,3956,337.09,"after the fluffily ironic deposits? blithely special dependencies integrate furiously even excuses. blithely silent theodolites could have to haggle pending, express requests; fu"
1,37502,4069,357.84,"al, regular dependencies serve carefully after the quickly final pinto beans. furiously even deposits sleep quickly final, silent pinto beans. fluffily reg"
2,3,8895,378.49,"nic accounts. final accounts sleep furiously about the ironic, bold packages. regular, regular accounts"
2,12503,4969,915.27,ptotes. quickly pending dependencies integrate furiously. fluffily ironic ideas impress blithely above the express accounts. furiously even epitaphs need to wak
2,25003,8539,438.37,blithely bold ideas. furiously stealthy packages sleep fluffily. slyly special deposits snooze furiously carefully regular accounts. regular deposits according to the accounts nag carefully slyl
2,37503,3025,306.39,"olites. deposits wake carefully. even, express requests cajole. carefully regular ex"
3,4,4651,920.92,ilent foxes affix furiously quickly unusual requests. even packages across the carefully even theodolites nag above the sp
3,12504,4093,498.13,"ending dependencies haggle fluffily. regular deposits boost quickly carefully regular requests. deposits affix furiously around the pinto beans. ironic, unusual platelets across the p"


**Read the data for the part.tbl**

In [0]:
display(dbutils.fs.ls("/databricks-datasets/tpch/data-001/part"))

path,name,size,modificationTime
dbfs:/databricks-datasets/tpch/data-001/part/part.tbl,part.tbl,121121810,1419285386000


In [0]:
# import Row
from pyspark.sql import Row

# read the file
raw_data = sc.textFile("/databricks-datasets/tpch/data-001/part/part.tbl")
csv_data = raw_data.map(lambda l: l.split("|"))
row_data = csv_data.map(lambda p: Row(
                        part_key           =int(p[0]),
                        part_name          =p[1],
                        part_mfgr          =p[2],
                        part_brand         =p[3], 
                        part_type          =p[4],
                        part_size          =int(p[5]),
                        part_container     =p[6],
                        part_retailprice   =float(p[7]), 
                        part_comment       =p[8]
    )
)

part_df = spark.createDataFrame(row_data)

display(part_df)

part_key,part_name,part_mfgr,part_brand,part_type,part_size,part_container,part_retailprice,part_comment
1,goldenrod lavender spring chocolate lace,Manufacturer#1,Brand#13,PROMO BURNISHED COPPER,7,JUMBO PKG,901.0,ly. slyly ironi
2,blush thistle blue yellow saddle,Manufacturer#1,Brand#13,LARGE BRUSHED BRASS,1,LG CASE,902.0,lar accounts amo
3,spring green yellow purple cornsilk,Manufacturer#4,Brand#42,STANDARD POLISHED BRASS,21,WRAP CASE,903.0,egular deposits hag
4,cornflower chocolate smoke green pink,Manufacturer#3,Brand#34,SMALL PLATED BRASS,14,MED DRUM,904.0,p furiously r
5,forest brown coral puff cream,Manufacturer#3,Brand#32,STANDARD POLISHED TIN,15,SM PKG,905.0,wake carefully
6,bisque cornflower lawn forest magenta,Manufacturer#2,Brand#24,PROMO PLATED STEEL,4,MED BAG,906.0,sual a
7,moccasin green thistle khaki floral,Manufacturer#1,Brand#11,SMALL PLATED COPPER,45,SM BAG,907.0,lyly. ex
8,misty lace thistle snow royal,Manufacturer#4,Brand#44,PROMO BURNISHED TIN,41,LG DRUM,908.0,eposi
9,thistle dim navajo dark gainsboro,Manufacturer#4,Brand#43,SMALL BURNISHED STEEL,12,WRAP CASE,909.0,ironic foxe
10,linen pink saddle puff powder,Manufacturer#5,Brand#54,LARGE BURNISHED STEEL,44,LG CAN,910.01,ithely final deposit


**Join part_df and partsupp_df using pyspark API**

In [0]:
part_partsupp_df = part_df.join(partsupp_df, part_df["part_key"] == partsupp_df["part_key"], how="inner")
display(part_partsupp_df)

part_key,part_name,part_mfgr,part_brand,part_type,part_size,part_container,part_retailprice,part_comment,part_key.1,supply_key,avail_qty,supply_cost,part_comment.1
7,moccasin green thistle khaki floral,Manufacturer#1,Brand#11,SMALL PLATED COPPER,45,SM BAG,907.0,lyly. ex,7,8,7454,763.98,y express tithes haggle furiously even foxes. furiously ironic deposits sleep toward the furiously unusual
7,moccasin green thistle khaki floral,Manufacturer#1,Brand#11,SMALL PLATED COPPER,45,SM BAG,907.0,lyly. ex,7,12508,2770,149.66,hould have to nag after the blithely final asymptotes. fluffily spe
7,moccasin green thistle khaki floral,Manufacturer#1,Brand#11,SMALL PLATED COPPER,45,SM BAG,907.0,lyly. ex,7,25008,3377,68.77,usly against the daring asymptotes. slyly regular platelets sleep quickly blithely regular deposits. boldly regular deposits wake blithely ironic accounts
7,moccasin green thistle khaki floral,Manufacturer#1,Brand#11,SMALL PLATED COPPER,45,SM BAG,907.0,lyly. ex,7,37508,9460,299.58,". furiously final ideas hinder slyly among the ironic, final packages. blithely ironic dependencies cajole pending requests: blithely even packa"
19,chocolate navy tan deep brown,Manufacturer#2,Brand#23,SMALL ANODIZED NICKEL,33,WRAP BOX,919.01,pending acc,19,20,1416,144.8,"o beans. even packages nag boldly according to the bold, special deposits. ironic packages after the pinto beans nag above the quickly ironic requests. bl"
19,chocolate navy tan deep brown,Manufacturer#2,Brand#23,SMALL ANODIZED NICKEL,33,WRAP BOX,919.01,pending acc,19,12520,5467,405.7,nstructions use furiously. fluffily regular excuses wake. slyly special grouches are carefully regular Tiresias. regular requests use about the quickly furio
19,chocolate navy tan deep brown,Manufacturer#2,Brand#23,SMALL ANODIZED NICKEL,33,WRAP BOX,919.01,pending acc,19,25020,8800,635.66,"sual requests sleep carefully. deposits cajole carefully over the regular, regular requests. quickly unusual asymptotes use some"
19,chocolate navy tan deep brown,Manufacturer#2,Brand#23,SMALL ANODIZED NICKEL,33,WRAP BOX,919.01,pending acc,19,37520,1340,346.92,"requests. final, pending realms use carefully; slyly dogged foxes impress fluffily above the blithely regular deposits. ironic, regular courts wake carefully. bold requests impress"
22,medium forest blue ghost black,Manufacturer#4,Brand#43,PROMO POLISHED BRASS,19,LG DRUM,922.02,even p,22,23,4410,786.18,"even accounts. final excuses try to sleep regular, even packages. carefully express dolphins cajole; furiously special pinto bea"
22,medium forest blue ghost black,Manufacturer#4,Brand#43,PROMO POLISHED BRASS,19,LG DRUM,922.02,even p,22,12523,9779,635.84,l instructions cajole across the blithely special deposits. blithely pending accounts use thinly slyly final requests. instructions haggle. pinto beans sleep along the slyly pen


#### **Question #2**: Joins With Spark SQL
Pick any two datasets and join them using SparkSQL API. Feel free to pick any two datasets. For example: PART and PARTSUPP. The goal of this exercise is not to derive anything meaningful out of this data but to demonstrate how to use Spark to join two datasets. For this problem you're **NOT allowed to use the RDD API**. You can only use SparkSQL API. You can use either Python or Scala to solve this problem.

**Create a temporary table for partsupp**

In [0]:
%sql
-- create a temp table for the partupp
CREATE TEMPORARY TABLE partsupp(part_key int, supply_key int, avail_qty int, supply_cost decimal, part_comment string)
USING com.databricks.spark.csv
OPTIONS (header = "false", delimiter = "|")
LOCATION "/databricks-datasets/tpch/data-001/partsupp/partsupp.tbl";

**Create a temporary table for part**

In [0]:
%sql
-- create a temp table for the part dataset
CREATE TEMPORARY TABLE part(part_key int, part_name string, part_mfgr string, part_brand string, part_type string,
                             part_size int, part_container string, part_retailprice double, part_comment string)
USING com.databricks.spark.csv
OPTIONS (header = "false", delimiter  = "|")
LOCATION "/databricks-datasets/tpch/data-001/part/part.tbl";

**Join the part and partsupp tables using SparkSQL**

In [0]:
%sql
SELECT ps.part_key, ps.supply_key, ps.avail_qty, ps.supply_cost, ps.part_comment,
       p.part_name, p.part_mfgr, p.part_brand, p.part_type, p.part_size, p.part_container, p.part_retailprice, p.part_comment
FROM partsupp ps
INNER JOIN part p
ON ps.part_key = p.part_key

part_key,supply_key,avail_qty,supply_cost,part_comment,part_name,part_mfgr,part_brand,part_type,part_size,part_container,part_retailprice,part_comment.1
26,27,5020,684,es. fluffily express deposits kindle slyly accounts. slyly ironic requests wake blithely bold ideas,beige frosted moccasin chocolate snow,Manufacturer#3,Brand#32,SMALL BRUSHED STEEL,32,SM CASE,926.02,instructions i
26,12527,6577,892,riously pending pinto beans. furiously express instructions detect slyly according to the b,beige frosted moccasin chocolate snow,Manufacturer#3,Brand#32,SMALL BRUSHED STEEL,32,SM CASE,926.02,instructions i
26,25027,3499,382,imes even pinto beans among the busily ironic accounts doubt blithely quickly final courts. furiously fluffy packages despite the carefully even plate,beige frosted moccasin chocolate snow,Manufacturer#3,Brand#32,SMALL BRUSHED STEEL,32,SM CASE,926.02,instructions i
26,37527,9702,822,behind the blithely regular courts impress after the silent sheaves. bravely final ideas haggle,beige frosted moccasin chocolate snow,Manufacturer#3,Brand#32,SMALL BRUSHED STEEL,32,SM CASE,926.02,instructions i
27,28,2111,444,"the even, ironic deposits. theodolites along the ironic, final dolphins cajole slyly quickly bold asymptotes. furiously regular theodolites integrate furiously furiously bold requests. carefully",saddle puff beige linen yellow,Manufacturer#1,Brand#14,LARGE ANODIZED TIN,20,MED PKG,927.02,s wake. ir
27,12528,9080,157,"ole express, final requests. carefully regular packages lose about the regular pinto beans. blithely re",saddle puff beige linen yellow,Manufacturer#1,Brand#14,LARGE ANODIZED TIN,20,MED PKG,927.02,s wake. ir
27,25028,3407,151,"ironic theodolites are by the furiously bold ideas. ironic requests shall have to sublate final packages. furiously quick foxes alongside of the express, special deposits was boldly according",saddle puff beige linen yellow,Manufacturer#1,Brand#14,LARGE ANODIZED TIN,20,MED PKG,927.02,s wake. ir
27,37528,4283,349,ound the final foxes detect furiously across the even warhorses. quickly t,saddle puff beige linen yellow,Manufacturer#1,Brand#14,LARGE ANODIZED TIN,20,MED PKG,927.02,s wake. ir
28,29,6643,205,y ironic deposits above the slyly final deposits sleep furiously above the final deposits. quickly even i,navajo yellow drab white misty,Manufacturer#4,Brand#44,SMALL PLATED COPPER,19,JUMBO PKG,928.02,"x-ray pending, iron"
28,12529,2452,745,ully regular theodolites haggle about the blithely pending packages. carefully ironic sentiments use quickly around the blithely silent requests. slyly ironic frays bo,navajo yellow drab white misty,Manufacturer#4,Brand#44,SMALL PLATED COPPER,19,JUMBO PKG,928.02,"x-ray pending, iron"


#### **Question #3**: Alternate Data Formats
The given dataset above is in raw text storage format. What other data storage format can you suggest to optimize the performance of our Spark workload if we were to frequently scan and read this dataset. Please come up with a code example and explain why you decide to go with this approach. Please note that there's no completely correct answer here. We're interested to hear your thoughts and see the implementation details.shell/1282

I will prefer delta as the preferred data storage format in this case. So I can write these data to delta format and subsequently read the data file formats

In [0]:
part_df.write.format("delta").mode("overwrite").saveAsTable("default.part")
partsupp_df.write.format("delta").mode("overwrite").saveAsTable("default.partsupp")

# Section 2 = > Baby Names Dataset

This dataset comes from a website referenced by [Data.gov](http://catalog.data.gov/dataset/baby-names-beginning-2007). It lists baby names used in the state of NY from 2007 to 2012. Use this JSON file as an input and answer the 3 questions accordingly.

https://health.data.ny.gov/api/views/jxy9-yhdk/rows.json

#### **Question #1**: Spark SQL's Native JSON Support
Use Spark SQL's native JSON support to create a temp table you can use to query the data (you'll use the `registerTempTable` operation). Show a simple sample query.

In [0]:
sc = spark.sparkContext

In [0]:
from pyspark.sql import SparkSession, functions as F
from urllib.request import urlopen

url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.json"
jsonData = urlopen(url).read().decode('utf-8')
rdd = sc.parallelize([jsonData])
df = spark.read.json(rdd)

In [0]:
# Select just the data portion of the json file
from pyspark.sql.functions import explode, col
baby_names_raw_df = df.select(explode("data").alias("data"))
# create a temp view for this raw data
baby_names_raw_df.createOrReplaceTempView("baby_names_raw")

In [0]:
%sql
Select * from baby_names_raw

data
"List(row-brkm-7izk-trjm, 00000000-0000-0000-4F52-2DEB83640FD1, 0, 1611674742, null, 1611674742, null, { }, 2018, OLIVIA, Albany, F, 17)"
"List(row-2m5x_rpr2.gwvc, 00000000-0000-0000-1EA7-DD209008AB39, 0, 1611674742, null, 1611674742, null, { }, 2018, AVA, Albany, F, 17)"
"List(row-xcx9~hw65_ib5p, 00000000-0000-0000-B9C1-8C79EAD9373A, 0, 1611674742, null, 1611674742, null, { }, 2018, ISABELLA, Albany, F, 15)"
"List(row-684m~4agu.tevb, 00000000-0000-0000-2B4E-17FEA39BCFA9, 0, 1611674742, null, 1611674742, null, { }, 2018, EMMA, Albany, F, 14)"
"List(row-jgz3~57z7_bxvk, 00000000-0000-0000-148F-D9BDB1F240A9, 0, 1611674742, null, 1611674742, null, { }, 2018, ELLA, Albany, F, 14)"
"List(row-wd5t.hgpt_y3ca, 00000000-0000-0000-B33D-FBFDDD4A2ED9, 0, 1611674742, null, 1611674742, null, { }, 2018, SOPHIA, Albany, F, 13)"
"List(row-nx5n_6sgb-9zw6, 00000000-0000-0000-E6B5-81172AF1194C, 0, 1611674742, null, 1611674742, null, { }, 2018, CHARLOTTE, Albany, F, 13)"
"List(row-a7si_ssat_idv7, 00000000-0000-0000-5404-3A8960FEC50A, 0, 1611674742, null, 1611674742, null, { }, 2018, ABIGAIL, Albany, F, 13)"
"List(row-2wv2_td5s.4ez7, 00000000-0000-0000-28D7-F41FABE3E982, 0, 1611674742, null, 1611674742, null, { }, 2018, EVELYN, Albany, F, 11)"
"List(row-fcck.7stn~2bpy, 00000000-0000-0000-492D-E5EE5BEF7EA9, 0, 1611674742, null, 1611674742, null, { }, 2018, AMELIA, Albany, F, 9)"


#### **Question #2**: Working with Nested Data
What does the nested schema of this dataset look like? How can you bring these nested fields up to the top level in a DataFrame?

In [0]:
baby_names_raw_df.printSchema()

root
 |-- data: array (nullable = true)
 |    |-- element: string (containsNull = true)



Bring these nested fields up to the top level in a dataframe

In [0]:
query = """
        SELECT data[0] AS sid, data[1] AS id, data[2] AS position, data[3] AS created_at, data[4] AS created_meta, 
               data[5] AS updated_at, data[6] AS updated_meta, data[7] AS meta, data[8] AS year, data[9] AS first_name, 
               data[10] AS country, data[11] AS sex, data[12] AS name_count FROM baby_names_raw
"""
baby_names_df = spark.sql(query)
# create a temporary view on the dataframe
baby_names_df.createOrReplaceTempView("baby_names")

In [0]:
%sql
select * from baby_names

sid,id,position,created_at,created_meta,updated_at,updated_meta,meta,year,first_name,country,sex,name_count
row-brkm-7izk-trjm,00000000-0000-0000-4F52-2DEB83640FD1,0,1611674742,,1611674742,,{ },2018,OLIVIA,Albany,F,17
row-2m5x_rpr2.gwvc,00000000-0000-0000-1EA7-DD209008AB39,0,1611674742,,1611674742,,{ },2018,AVA,Albany,F,17
row-xcx9~hw65_ib5p,00000000-0000-0000-B9C1-8C79EAD9373A,0,1611674742,,1611674742,,{ },2018,ISABELLA,Albany,F,15
row-684m~4agu.tevb,00000000-0000-0000-2B4E-17FEA39BCFA9,0,1611674742,,1611674742,,{ },2018,EMMA,Albany,F,14
row-jgz3~57z7_bxvk,00000000-0000-0000-148F-D9BDB1F240A9,0,1611674742,,1611674742,,{ },2018,ELLA,Albany,F,14
row-wd5t.hgpt_y3ca,00000000-0000-0000-B33D-FBFDDD4A2ED9,0,1611674742,,1611674742,,{ },2018,SOPHIA,Albany,F,13
row-nx5n_6sgb-9zw6,00000000-0000-0000-E6B5-81172AF1194C,0,1611674742,,1611674742,,{ },2018,CHARLOTTE,Albany,F,13
row-a7si_ssat_idv7,00000000-0000-0000-5404-3A8960FEC50A,0,1611674742,,1611674742,,{ },2018,ABIGAIL,Albany,F,13
row-2wv2_td5s.4ez7,00000000-0000-0000-28D7-F41FABE3E982,0,1611674742,,1611674742,,{ },2018,EVELYN,Albany,F,11
row-fcck.7stn~2bpy,00000000-0000-0000-492D-E5EE5BEF7EA9,0,1611674742,,1611674742,,{ },2018,AMELIA,Albany,F,9


#### **Question #3**: Executing Full Data Pipelines
Create a second version of the answer to Question 2, and make sure one of your queries makes the original web call every time a query is run, while another version only executes the web call one time.

In [0]:
# url = ""
# response = urlopen(url)
# data = str(response.read())
# json_data = json.loads(data)
# json_string = json.dumps(json_data)
# rdd = sc.parallelize(json_string)
# df = spark.read.json(rdd)

In [0]:
%scala
//Answer here for Section2/Question#3 or create required cells here 

#### **Question #4**: Analyzing the Data

Using the tables you created, create a simple visualization that shows what is the most popular first letters baby names to start with in each year.

In [0]:
%sql
DROP TABLE IF EXISTS popular_first_letter;
-- create a table with first letters of baby names and their count using CTAS

CREATE TABLE IF NOT EXISTS popular_first_letter AS 
(SELECT year, SUBSTR(first_name, 0, 1) first_letters_name, sum(name_count) AS count_names FROM baby_names GROUP BY first_letters_name,year);

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM popular_first_letter

year,first_letters_name,count_names
2015,X,206.0
2016,Y,969.0
2010,P,1208.0
2014,Y,1213.0
2012,L,7238.0
2007,M,11138.0
2008,Y,603.0
2018,X,227.0
2011,A,17132.0
2015,V,1726.0


In [0]:
%sql
-- use window function and CTE in SparkSQL to get the most popular first letter baby names in each year
WITH ranked_first_letters AS
(
  SELECT year, 
         first_letters_name, 
         count_names,
         dense_rank() OVER (PARTITION BY year ORDER BY count_names DESC) AS rank
 FROM popular_first_letter
)
SELECT
 year, first_letters_name, count_names
FROM ranked_first_letters
WHERE
  rank == 1
ORDER BY year

year,first_letters_name,count_names
2007,J,18573.0
2008,J,18301.0
2009,A,17361.0
2010,A,17097.0
2011,A,17132.0
2012,A,16727.0
2013,A,16424.0
2014,A,18833.0
2015,A,16705.0
2016,A,16557.0


In [0]:
query_popular = """
WITH ranked_first_letters AS
(
  SELECT year, 
         first_letters_name, 
         count_names,
         dense_rank() OVER (PARTITION BY year ORDER BY count_names DESC) AS rank
 FROM popular_first_letter
)
SELECT
 year, first_letters_name, count_names
FROM ranked_first_letters
WHERE
  rank == 1
ORDER BY year
"""
popular_first_df = spark.sql(query_popular)

In [0]:
display(popular_first_df)

year,first_letters_name,count_names
2007,J,18573.0
2008,J,18301.0
2009,A,17361.0
2010,A,17097.0
2011,A,17132.0
2012,A,16727.0
2013,A,16424.0
2014,A,18833.0
2015,A,16705.0
2016,A,16557.0


In [0]:
display(popular_first_df)

year,first_letters_name,count_names
2007,J,18573.0
2008,J,18301.0
2009,A,17361.0
2010,A,17097.0
2011,A,17132.0
2012,A,16727.0
2013,A,16424.0
2014,A,18833.0
2015,A,16705.0
2016,A,16557.0


# Section 3 => Log Processing

The following data comes from the _Learning Spark_ book.

In [0]:
%scala
display(dbutils.fs.ls("/databricks-datasets/learning-spark/data-001/fake_logs"))

path,name,size,modificationTime
dbfs:/databricks-datasets/learning-spark/data-001/fake_logs/log1.log,log1.log,958,1418341602000
dbfs:/databricks-datasets/learning-spark/data-001/fake_logs/log2.log,log2.log,193,1418341602000


In [0]:
%scala
println(dbutils.fs.head("/databricks-datasets/learning-spark/data-001/fake_logs/log1.log"))

#### **Question #1**: Parsing Logs
Parse the logs in to a DataFrame/Spark SQL table that can be queried. This should be done using the Dataset API.

Using the pySpark API to analyze the logs

In [0]:
fake_logs_rdd = sc.textFile("/databricks-datasets/learning-spark/data-001/fake_logs/log1.log")
fake_logs_rdd.take(10)

Out[82]: ['66.249.69.97 - - [24/Sep/2014:22:25:44 +0000] "GET /071300/242153 HTTP/1.1" 404 514 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"',
 '71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] "GET /error HTTP/1.1" 404 505 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"',
 '71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] "GET /favicon.ico HTTP/1.1" 200 1713 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"',
 '71.19.157.174 - - [24/Sep/2014:22:26:37 +0000] "GET / HTTP/1.1" 200 18785 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"',
 '71.19.157.174 - - [24/Sep/2014:22:26:37 +0000] "GET /jobmineimg.php?q=m HTTP/1.1" 200 222 "http://www.holdenkarau.com/" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"']

Get the regex pattern

In [0]:
# regex pattern for the logs and the date
fake_logs_regex_pattern = r'(.+)\s+-\s+([\w-]+)\s+\[(.+)\]\s+".+"\s+(\d+)\s+(\d+)'
fake_logs_time_format = '%d/%b/%Y:%H:%M:%S %z'

In [0]:
# function to parse the logs and dates

import re
from datetime import datetime

def parse_date(date_string, time_format):
  """
  function to parse a date_string given a time_format
  
  parameter:
  ---------
    date_string: The date_string you want to parse
    time_format: the format of date you want returned
    
  returns:
  ------
    date: it returns a datetime object according to the time_format given.
  
  """
  return datetime.strptime(date_string, time_format)


def parse_regex(line_to_parse):
  """
  function to parse a given group of words
  
  parameter:
  ---------
    line_to_parse: the words or group of words you want to parse
    pattern: the regex pattern to use in parsing the given words
    
  returns:
  ------
    list: returns a list of certain groups of the matched pattern
    none: returns none if no match is found
  """
  matched = re.match(pattern,line_to_parse)
  if matched:
    return [
      matched.group(1),
      matched.group(2),
      parse_date(matched.group(3), fake_logs_time_format),
      int(matched.group(4))
    ]
  else:
    return None

In [0]:
# get an rdd for the fake logs
pattern = fake_logs_regex_pattern
fake_logs_rdd_parsed = fake_logs_rdd.map(parse_regex)
fake_logs_rdd_parsed.take(10)

Out[85]: [['66.249.69.97',
  '-',
  datetime.datetime(2014, 9, 24, 22, 25, 44, tzinfo=datetime.timezone.utc),
  404],
 ['71.19.157.174',
  '-',
  datetime.datetime(2014, 9, 24, 22, 26, 12, tzinfo=datetime.timezone.utc),
  404],
 ['71.19.157.174',
  '-',
  datetime.datetime(2014, 9, 24, 22, 26, 12, tzinfo=datetime.timezone.utc),
  200],
 ['71.19.157.174',
  '-',
  datetime.datetime(2014, 9, 24, 22, 26, 37, tzinfo=datetime.timezone.utc),
  200],
 ['71.19.157.174',
  '-',
  datetime.datetime(2014, 9, 24, 22, 26, 37, tzinfo=datetime.timezone.utc),
  200]]

In [0]:
# get a dataframe from the rdd
from pyspark.sql import types as T

fake_logs_schema =  T.StructType([
                    T.StructField('ip', T.StringType()),
                    T.StructField('user', T.StringType()),
                    T.StructField('timestamp', T.TimestampType()),
                    T.StructField('status', T.IntegerType())
])

fake_logs_df = fake_logs_rdd_parsed.toDF(fake_logs_schema)
display(fake_logs_df)

ip,user,timestamp,status
66.249.69.97,-,2014-09-24T22:25:44.000+0000,404
71.19.157.174,-,2014-09-24T22:26:12.000+0000,404
71.19.157.174,-,2014-09-24T22:26:12.000+0000,200
71.19.157.174,-,2014-09-24T22:26:37.000+0000,200
71.19.157.174,-,2014-09-24T22:26:37.000+0000,200


In [0]:
fake_logs = fake_logs_df.drop("user")
display(fake_logs)

ip,timestamp,status
66.249.69.97,2014-09-24T22:25:44.000+0000,404
71.19.157.174,2014-09-24T22:26:12.000+0000,404
71.19.157.174,2014-09-24T22:26:12.000+0000,200
71.19.157.174,2014-09-24T22:26:37.000+0000,200
71.19.157.174,2014-09-24T22:26:37.000+0000,200


#### **Question #2**: Analysis
Generate some insights from the log data.

In [0]:
# create a temp view for sql analysis
fake_logs.createOrReplaceTempView("fake_logs")

In [0]:
fake_logs_grouped_status = fake_logs.groupby(["status"]).count()
display(fake_logs_grouped_status)

status,count
404,2
200,3


In [0]:
fake_logs_grouped_ip = fake_logs.groupby(["ip"]).count()
display(fake_logs_grouped_ip)

ip,count
71.19.157.174,4
66.249.69.97,1


In [0]:
%sql
SELECT window.start, status, count(*) as total
FROM fake_logs
GROUP BY window(timestamp, '30 days'), status
ORDER BY total

start,status,total
2014-09-07T00:00:00.000+0000,404,2
2014-09-07T00:00:00.000+0000,200,3


In [0]:
%scala
//Answer here for Section3/Question#2 or create required cells here 

# Section 4 => CSV Parsing
The following examples involove working with simple CSV data

In [0]:
%scala
val full_csv = sc.parallelize(Array(
  "col_1, col_2, col_3",
  "1, ABC, Foo1",
  "2, ABCD, Foo2",
  "3, ABCDE, Foo3",
  "4, ABCDEF, Foo4",
  "5, DEF, Foo5",
  "6, DEFGHI, Foo6",
  "7, GHI, Foo7",
  "8, GHIJKL, Foo8",
  "9, JKLMNO, Foo9",
  "10, MNO, Foo10"))
 


#### **Question #1**: CSV Header Rows
Given the simple RDD `full_csv` below, write the most efficient Spark job you can to remove the header row

In [0]:
%scala
val header = full_csv.first();  //extract header
val data = full_csv.filter(row => row != header) 

#### **Question #2**: SparkSQL Dataframes
Using the `full_csv` RDD above, write code that results in a DataFrame where the schema was created programmatically based on the heard row. Create a second RDD similair to `full_csv` and uses the same function(s) you created in this step to make a Dataframe for it.

In [0]:
%scala
//Answer here for Section4/Question#2 or create required cells here 
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row}

val full_csv = sc.parallelize(Array(
  "col_1, col_2, col_3",
  "1, ABC, Foo1",
  "2, ABCD, Foo2",
  "3, ABCDE, Foo3",
  "4, ABCDEF, Foo4",
  "5, DEF, Foo5",
  "6, DEFGHI, Foo6",
  "7, GHI, Foo7",
  "8, GHIJKL, Foo8",
  "9, JKLMNO, Foo9",
  "10, MNO, Foo10"))

val header = sample_csv.first.split(",").map(_.trim)

val full_csv_DF = spark.createDataFrame(
    full_csv.zipWithIndex
    .filter(_._2 > 0)
    .map{case (str, _) => Row.fromSeq(str.split(",").map(_.trim))}
  ,
    StructType(header.map(c => StructField(c, StringType)))
  )
display(full_csv_DF)

col_1,col_2,col_3
1,ABC,Foo1
2,ABCD,Foo2
3,ABCDE,Foo3
4,ABCDEF,Foo4
5,DEF,Foo5
6,DEFGHI,Foo6
7,GHI,Foo7
8,GHIJKL,Foo8
9,JKLMNO,Foo9
10,MNO,Foo10


#### **Question #3**: Parsing Pairs

Write a Spark job that processes comma-seperated lines that look like the below example to pull out Key Value pairs. 

Given the following data:

~~~
Row-Key-001, K1, 10, A2, 20, K3, 30, B4, 42, K5, 19, C20, 20
Row-Key-002, X1, 20, Y6, 10, Z15, 35, X16, 42
Row-Key-003, L4, 30, M10, 5, N12, 38, O14, 41, P13, 8
~~~

You'll want to create an RDD that contains the following data:

~~~
Row-Key-001, K1
Row-Key-001, A2
Row-Key-001, K3
Row-Key-001, B4
Row-Key-001, K5
Row-Key-001, C20
Row-Key-002, X1
Row-Key-002, Y6
Row-Key-002, Z15
Row-Key-002, X16
Row-Key-003, L4
Row-Key-003, M10
Row-Key-003, N12
Row-Key-003, O14
Row-Key-003, P13
~~~

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
row_key_raw = """
Row-Key-001, K1, 10, A2, 20, K3, 30, B4, 42, K5, 19, C20, 20
Row-Key-002, X1, 20, Y6, 10, Z15, 35, X16, 42
Row-Key-003, L4, 30, M10, 5, N12, 38, O14, 41, P13, 8
"""

# Build an example DataFrame dataset to work with. 
dbutils.fs.rm("/tmp/dataframe_rowkey.csv", True)
dbutils.fs.put("/tmp/dataframe_rowkey.csv", row_key_raw, True)

formatPackage = "csv" if sc.version > '1.6' else "com.databricks.spark.csv"
row_key_df = spark.read.format(formatPackage).options(header='false', delimiter = ',').load("/tmp/dataframe_rowkey.csv")

row_key_df.printSchema()

Wrote 162 bytes.
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)



In [0]:
display(row_key_df)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12
Row-Key-001,K1,10,A2,20,K3,30,B4,42,K5,19.0,C20,20.0
Row-Key-002,X1,20,Y6,10,Z15,35,X16,42,,,,
Row-Key-003,L4,30,M10,5,N12,38,O14,41,P13,8.0,,


In [0]:
%scala

val row_key_raw = sc.parallelize(Array(
"Row-Key-001, K1, 10, A2, 20, K3, 30, B4, 42, K5, 19, C20, 20",
"Row-Key-002, X1, 20, Y6, 10, Z15, 35, X16, 42",
"Row-Key-003, L4, 30, M10, 5, N12, 38, O14, 41, P13, 8"
))
//val lines = sc.parallelize(Array(row_key_raw))
// val pairs = row_key_raw.map(x => (x.split(" ")(0), x))
// //pairs = lines.map(lambda x: (x.split(" ")[0], x))
// pairs.take(15)
val rowKey = row_key_raw.map(_.split(", "))
  .flatMap(x =>  x.tail.grouped(2).map(y => (x.head, y.head)))

// get the schema
val schema = new StructType()
  .add(StructField("key", StringType, true))
  .add(StructField("value", StringType, true))

// create the dataframe
val rowKey_df = rowKey.toDF()
display(rowKey_df)

_1,_2
Row-Key-001,K1
Row-Key-001,A2
Row-Key-001,K3
Row-Key-001,B4
Row-Key-001,K5
Row-Key-001,C20
Row-Key-002,X1
Row-Key-002,Y6
Row-Key-002,Z15
Row-Key-002,X16


In [0]:
%scala
// Add schema to the result
val df_row_key = rowKey.toDF("Key", "Value")
display(df_row_key)

Key,Value
Row-Key-001,K1
Row-Key-001,A2
Row-Key-001,K3
Row-Key-001,B4
Row-Key-001,K5
Row-Key-001,C20
Row-Key-002,X1
Row-Key-002,Y6
Row-Key-002,Z15
Row-Key-002,X16


# Section 5 => Connecting to JDBC Database

Write a Spark job that queries MySQL using its JDBC Driver.

#### Load your JDBC Driver onto Databricks
 * Databricks comes preloaded with JDBC libraries for **mysql**, but you can attach other JDBC libraries and reference them in your code
 * See our [Libraries Notebook](/#workspace/databricks_guide/02 Product Overview/04 Libraries) for instructions on how to install a Java JAR.

In [0]:
# assuming the secrets has been stored in databricks in a scope credential
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
jdbc_url = "jdbc_url"

In [0]:
query = "(select * from table_name where col_name < value) as sample_table"

table_jdbc = (spark.read
  .format("jdbc")
  .option("url", f"{jdbc_url}")
  .option("dbtable", query)
  .option("user", f"{username}")
  .option("password", f"{password}")
  .load()
)

# Section 6 => Create Tables Programmatically And Cache The Table

Create a table using Scala or Python

* Use `CREATE EXTERNAL TABLE` in SQL, or `DataFrame.saveAsTable()` in Scala or Python, to register tables.
* Please refer to the [Accessing Data](/#workspace/databricks_guide/03 Accessing Data/0 Accessing Data) guide for how to import specific data types.

#### Temporary Tables
* Within each Spark cluster, temporary tables registered in the `sqlContext` with `DataFrame.registerTempTable` will also be shared across the notebooks attached to that Databricks cluster.
  * Run `someDataFrame.registerTempTable(TEMP_TABLE_NAME)` to give register a table.
* These tables will not be visible in the left-hand menu, but can be accessed by name in SQL and DataFrames.

In [0]:
display(dbutils.fs.ls("/databricks-datasets/flights"))

path,name,size,modificationTime
dbfs:/databricks-datasets/flights/README.md,README.md,412,1457766852000
dbfs:/databricks-datasets/flights/airport-codes-na.txt,airport-codes-na.txt,11411,1457749605000
dbfs:/databricks-datasets/flights/departuredelays.csv,departuredelays.csv,33396236,1457749605000


In [0]:
# using the departuredelays.csv to demonstrate registering a temporary table
departure_delays_df = spark.read.csv("dbfs:/databricks-datasets/flights/departuredelays.csv", inferSchema=True, header=True)

In [0]:
display(departure_delays_df)

date,delay,distance,origin,destination
1011245,6,602,ABE,ATL
1020600,-8,369,ABE,DTW
1021245,-2,602,ABE,ATL
1020605,-4,602,ABE,ATL
1031245,-4,602,ABE,ATL
1030605,0,602,ABE,ATL
1041243,10,602,ABE,ATL
1040605,28,602,ABE,ATL
1051245,88,602,ABE,ATL
1050605,9,602,ABE,ATL


In [0]:
# register this df as external table
departure_delays_df.write\
                   .format("delta")\
                   .mode("overwrite")\
                   .option("path", "/FileStore/tables/departure_delays")\
                   .saveAsTable("departure_delays")

In [0]:
%sql
SELECT date, delay, distance, origin, destination
FROM departure_delays
WHERE destination = "DFW"
LIMIT 100;

date,delay,distance,origin,destination
1022100,165,195,IAH,DFW
1021112,29,195,IAH,DFW
1031716,17,195,IAH,DFW
1030910,0,195,IAH,DFW
1030822,0,195,IAH,DFW
1031436,40,195,IAH,DFW
1031258,2,195,IAH,DFW
1032100,6,195,IAH,DFW
1031112,53,195,IAH,DFW
1041714,43,195,IAH,DFW


# Section 7 => DataFrame UDFs and DataFrame SparkSQL Functions

Below we've created a small DataFrame. You should use DataFrame API functions and UDFs to accomplish two tasks.

1. You need to parse the State and city into two different columns.
2. You need to get the number of days in between the start and end dates. You need to do this two ways.
  - Firstly, you should use SparkSQL functions to get this date difference.
  - Secondly, you should write a udf that gets the number of days between the end date and the start date.

In [0]:
%python

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Build an example DataFrame dataset to work with. 
dbutils.fs.rm("/tmp/dataframe_sample.csv", True)
dbutils.fs.put("/tmp/dataframe_sample.csv", """id|end_date|start_date|location
1|2015-10-14 00:00:00|2015-09-14 00:00:00|CA-SF
2|2015-10-15 01:00:20|2015-08-14 00:00:00|CA-SD
3|2015-10-16 02:30:00|2015-01-14 00:00:00|NY-NY
4|2015-10-17 03:00:20|2015-02-14 00:00:00|NY-NY
5|2015-10-18 04:30:00|2014-04-14 00:00:00|CA-SD
""", True)

formatPackage = "csv" if sc.version > '1.6' else "com.databricks.spark.csv"
df = spark.read.format(formatPackage).options(header='true', delimiter = '|').load("/tmp/dataframe_sample.csv")

df.printSchema()

Wrote 272 bytes.
root
 |-- id: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- location: string (nullable = true)



In [0]:
display(df)

id,end_date,start_date,location
1,2015-10-14 00:00:00,2015-09-14 00:00:00,CA-SF
2,2015-10-15 01:00:20,2015-08-14 00:00:00,CA-SD
3,2015-10-16 02:30:00,2015-01-14 00:00:00,NY-NY
4,2015-10-17 03:00:20,2015-02-14 00:00:00,NY-NY
5,2015-10-18 04:30:00,2014-04-14 00:00:00,CA-SD


part1 - Split the location column into State and City

In [0]:
# sparse the state and city into two different columns
from pyspark.sql.functions import split
df_split = df.withColumn('State', split(df['location'], '-').getItem(0)) \
             .withColumn('City', split(df['location'], '-').getItem(1))

# display the df
display(df_split)

id,end_date,start_date,location,State,City
1,2015-10-14 00:00:00,2015-09-14 00:00:00,CA-SF,CA,SF
2,2015-10-15 01:00:20,2015-08-14 00:00:00,CA-SD,CA,SD
3,2015-10-16 02:30:00,2015-01-14 00:00:00,NY-NY,NY,NY
4,2015-10-17 03:00:20,2015-02-14 00:00:00,NY-NY,NY,NY
5,2015-10-18 04:30:00,2014-04-14 00:00:00,CA-SD,CA,SD


part2: Find the difference in date

In [0]:
# find difference in date using pyspark
from pyspark.sql.functions import col, datediff
df_date_diff = df_split.withColumn("date_diff", datediff(col("end_date"), col("start_date")).alias("date_diff")).drop("location")
display(df_date_diff)

id,end_date,start_date,State,City,date_diff
1,2015-10-14 00:00:00,2015-09-14 00:00:00,CA,SF,30
2,2015-10-15 01:00:20,2015-08-14 00:00:00,CA,SD,62
3,2015-10-16 02:30:00,2015-01-14 00:00:00,NY,NY,275
4,2015-10-17 03:00:20,2015-02-14 00:00:00,NY,NY,245
5,2015-10-18 04:30:00,2014-04-14 00:00:00,CA,SD,552


In [0]:
# get difference between the end_date and start_date using SparkSQL functions
df_split.createOrReplaceTempView("df_split")

In [0]:
%sql
SELECT id, end_date, start_date, State, City, DATEDIFF(end_date, start_date) AS date_diff
FROM df_split

id,end_date,start_date,State,City,date_diff
1,2015-10-14 00:00:00,2015-09-14 00:00:00,CA,SF,30
2,2015-10-15 01:00:20,2015-08-14 00:00:00,CA,SD,62
3,2015-10-16 02:30:00,2015-01-14 00:00:00,NY,NY,275
4,2015-10-17 03:00:20,2015-02-14 00:00:00,NY,NY,245
5,2015-10-18 04:30:00,2014-04-14 00:00:00,CA,SD,552


part3 - find the date diff using a pandas udf

In [0]:
# # create the function
# def date_diff(end_date,start_date): 
#   """
#   function to find the difference in number of days between two dates
  
#   parameter:
#   ---------
#     end_date: datetime for the end date
#     start_date: datetime for the start date
  
#   returns:
#   -------
#     date_delta: number of days between two dates
#   """
#   from datetime import datetime
#   format = '%Y-%m-%d %H:%M:%S'
#   end = datetime.strptime(end_date, format)
#   start = datetime.strptime(start_date, format)
#   diff = (end - start).total_seconds()
#   date_delta = int(diff/(60*60*24))
#   return date_delta
  
# # register the udf
# spark.udf.register("date_diff", date_diff, IntegerType())

Out[6]: <function __main__.date_diff(end_date, start_date)>

In [0]:
#   from pyspark.sql.functions import to_date, col
#   format = '%Y-%m-%d %H:%M:%S'
#   end = to_date("2015-10-14 00:00:00", format)
#   display(end)
#   date_diff("2015-10-14 00:00:00", "2015-09-14 00:00:00")
#   from datetime import datetime
# ex = datetime.strptime("2015-09-14 00:00:00", '%Y-%m-%d %H:%M:%S')
# ex

Column<'to_date(2015-10-14 00:00:00, %Y-%m-%d %H:%M:%S)'>

In [0]:
# using pandas udf
import pandas as pd
import numpy as np
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import IntegerType

#pd.to_datetime(date, format = '%Y-%m-%d %H:%M:%S'))
# Declare the function and create the UDF
def date_delta(y: pd.Series, x: pd.Series) -> pd.Series:
  """
   function to find the difference in number of days between two dates
  
   parameter:
   ---------
     y: a pandas series of type string for the end date
     x: a pandas series of type string for the start date
  
   returns:
   -------
     date_diff: difference in number of days between two dates
  """
  y = pd.to_datetime(y)
  x = pd.to_datetime(x)
  diff = ((y - x)/np.timedelta64(1, 'D'))
  diff_days = diff.astype(int)
  return diff_days

date_delta = pandas_udf(date_delta, returnType=IntegerType())

In [0]:
# use the registered udf to find the date diff using pyspark
udf_date_diff = df_split.withColumn("date_diff", date_delta(col("end_date"), col("start_date")).alias("date_diff"))
display(udf_date_diff)

id,end_date,start_date,location,State,City,date_diff
1,2015-10-14 00:00:00,2015-09-14 00:00:00,CA-SF,CA,SF,30
2,2015-10-15 01:00:20,2015-08-14 00:00:00,CA-SD,CA,SD,62
3,2015-10-16 02:30:00,2015-01-14 00:00:00,NY-NY,NY,NY,275
4,2015-10-17 03:00:20,2015-02-14 00:00:00,NY-NY,NY,NY,245
5,2015-10-18 04:30:00,2014-04-14 00:00:00,CA-SD,CA,SD,552


# Section 8 => Machine Learning

#### **Question 1:** Demonstrate The Use of a MLlib Algorithm Using the DataFrame Interface(`org.apache.spark.ml`).

Demonstrate use of an MLlib algorithm and show an example of tuning the algorithm to improve prediction accuracy.

Use Decision Tree Example using Databricks MLib.

In [0]:
# import required libraries and datasets
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_iris
import pandas as pd

In [0]:
# load the iris data
iris_raw = load_iris()
df_iris = pd.DataFrame(iris_raw.data, columns=iris_raw.feature_names)
df_iris['label'] = pd.Series(iris_raw.target)
df_iris.head()

Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),label
0,5.1,3.5,1.4,0.2,0
1,4.9,3.0,1.4,0.2,0
2,4.7,3.2,1.3,0.2,0
3,4.6,3.1,1.5,0.2,0
4,5.0,3.6,1.4,0.2,0


In [0]:
iris_raw.feature_names

Out[111]: ['sepal length (cm)',
 'sepal width (cm)',
 'petal length (cm)',
 'petal width (cm)']

In [0]:
# convert the iris data to sparkDF
iris_data = spark.createDataFrame(df_iris)

# get the features name for the iris dataset
features = iris_raw.feature_names

# prepare the data for spark ml
va = VectorAssembler(inputCols = features, outputCol='features')

df_va = va.transform(iris_data)
df_va = df_va.select(['features', 'label'])

df_va.show(10)

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|    0|
|[4.9,3.0,1.4,0.2]|    0|
|[4.7,3.2,1.3,0.2]|    0|
|[4.6,3.1,1.5,0.2]|    0|
|[5.0,3.6,1.4,0.2]|    0|
|[5.4,3.9,1.7,0.4]|    0|
|[4.6,3.4,1.4,0.3]|    0|
|[5.0,3.4,1.5,0.2]|    0|
|[4.4,2.9,1.4,0.2]|    0|
|[4.9,3.1,1.5,0.1]|    0|
+-----------------+-----+
only showing top 10 rows



In [0]:
# train test split
(train, test) = df_va.randomSplit([0.8, 0.2])

# create an instance of the decision tree classifier
model_decision_tree = DecisionTreeClassifier(featuresCol="features", labelCol="label")

# train the model
model_decision_tree = model_decision_tree.fit(train)

# make predictions
predictions = model_decision_tree.transform(test)
predictions.show(10)

+-----------------+-----+--------------+-------------+----------+
|         features|label| rawPrediction|  probability|prediction|
+-----------------+-----+--------------+-------------+----------+
|[4.3,3.0,1.1,0.1]|    0|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[4.4,2.9,1.4,0.2]|    0|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[4.8,3.4,1.6,0.2]|    0|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[5.0,3.6,1.4,0.2]|    0|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[5.4,3.9,1.3,0.4]|    0|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[5.8,4.0,1.2,0.2]|    0|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[4.7,3.2,1.6,0.2]|    0|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[5.1,3.8,1.5,0.3]|    0|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[5.7,3.8,1.7,0.3]|    0|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[4.4,3.2,1.3,0.2]|    0|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
+-----------------+-----+--------------+-------------+----------+
only showing top 10 rows



In [0]:
# Model evaluation
evaluator=MulticlassClassificationEvaluator(predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Prediction Accuracy: ", round(accuracy, 3))

y_pred=predictions.select("prediction").collect()
y_orig=predictions.select("label").collect()

confusionMatrix = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(confusionMatrix)

Prediction Accuracy:  1.0
Confusion Matrix:
[[13  0  0]
 [ 0 11  0]
 [ 0  0  9]]


In [0]:
# combine all the steps above in one cell or script

# import required libraries and datasets
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_iris
import pandas as pd

# load the iris data
iris_raw = load_iris()
df_iris = pd.DataFrame(iris_raw.data, columns=iris_raw.feature_names)
df_iris['label'] = pd.Series(iris_raw.target)
display(df_iris)
print()
# print the feature names
print("Feature Names:")
print(iris_raw.feature_names)
print()
# convert the iris data to sparkDF
iris_data = spark.createDataFrame(df_iris)
# get the features name for the iris dataset
features = iris_raw.feature_names

# prepare the data for spark ml
va = VectorAssembler(inputCols = features, outputCol='features')
df_va = va.transform(iris_data)
df_va = df_va.select(['features', 'label'])
print("vectorized features:")
print(df_va.show(10))
print()

# train test split
(train, test) = df_va.randomSplit([0.8, 0.2])

# create an instance of the decision tree classifier
model_decision_tree = DecisionTreeClassifier(featuresCol="features", labelCol="label")

# train the model
model_decision_tree = model_decision_tree.fit(train)

# make predictions
predictions = model_decision_tree.transform(test)

# Model evaluation
evaluator=MulticlassClassificationEvaluator(predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Prediction Accuracy: ", round(accuracy, 3))
print()

y_pred=predictions.select("prediction").collect()
y_orig=predictions.select("label").collect()

confusionMatrix = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(confusionMatrix)

sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),label
5.1,3.5,1.4,0.2,0
4.9,3.0,1.4,0.2,0
4.7,3.2,1.3,0.2,0
4.6,3.1,1.5,0.2,0
5.0,3.6,1.4,0.2,0
5.4,3.9,1.7,0.4,0
4.6,3.4,1.4,0.3,0
5.0,3.4,1.5,0.2,0
4.4,2.9,1.4,0.2,0
4.9,3.1,1.5,0.1,0



Feature Names:
['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)']

vectorized features:
+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|    0|
|[4.9,3.0,1.4,0.2]|    0|
|[4.7,3.2,1.3,0.2]|    0|
|[4.6,3.1,1.5,0.2]|    0|
|[5.0,3.6,1.4,0.2]|    0|
|[5.4,3.9,1.7,0.4]|    0|
|[4.6,3.4,1.4,0.3]|    0|
|[5.0,3.4,1.5,0.2]|    0|
|[4.4,2.9,1.4,0.2]|    0|
|[4.9,3.1,1.5,0.1]|    0|
+-----------------+-----+
only showing top 10 rows

None

Prediction Accuracy:  0.912

Confusion Matrix:
[[10  0  0]
 [ 0 12  2]
 [ 0  1  9]]


# Section 9 => XML, Sql understanding, Streaming, Debugging Spark

#### 
**Question 1** : Create sample large xml data in your own with namespace and generate dataframe and do some DataSet transformations and actions. Also provide some meaningful insights from it.

instead of creating an xml, I will use the one available in this [link](https://raw.githubusercontent.com/databricks/spark-xml/master/src/test/resources/books.xml)

In [0]:
# the xml file has been uploaded to file store
display(dbutils.fs.ls("FileStore/tables/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/Databricks_Certified_Apache_Spark_Developer-1.dbc,Databricks_Certified_Apache_Spark_Developer-1.dbc,595249,1648212202000
dbfs:/FileStore/tables/Databricks_Certified_Apache_Spark_Developer.dbc,Databricks_Certified_Apache_Spark_Developer.dbc,595249,1648210397000
dbfs:/FileStore/tables/Iris.csv,Iris.csv,5107,1665863184000
dbfs:/FileStore/tables/books-1.xml,books-1.xml,5676,1665860183000
dbfs:/FileStore/tables/books.xml,books.xml,5676,1665859845000
dbfs:/FileStore/tables/departure_delays/,departure_delays/,0,0


In [0]:
%scala
// Infer schema
// val url = "https://raw.githubusercontent.com/databricks/spark-xml/master/src/test/resources/books.xml"
// //Answer here for Section 9/Question 1 or create required cells here 
// val schema = new StructType()
//       .add("_id",StringType)
//       .add("firstname",StringType)
//       .add("middlename",StringType)
//       .add("lastname",StringType)
//       .add("dob_year",StringType)
//       .add("dob_month",StringType)
//       .add("gender",StringType)
//       .add("salary",StringType)
// val df = spark.read
//   .option("rowTag", "book")
//   .schema(schema)
//   .format("com.databricks.spark.xml")
//   .load("dbfs:/FileStore/tables/books.xml")
// df.show()

In [0]:
df_xml = spark.read \
              .format("com.databricks.spark.xml") \
              .option("rootTag", "catalog") \
              .option("rowTag", "book") \
              .load("dbfs:/FileStore/tables/books.xml")

In [0]:
display(df_xml)

_id,author,description,genre,price,publish_date,title
bk101,"Gambardella, Matthew","An in-depth look at creating applications  with XML.This manual describes Oracle XML DB, and how you can use it to store, generate, manipulate, manage,  and query XML data in the database.  After introducing you to the heart of Oracle XML DB, namely the XMLType framework and Oracle XML DB repository,  the manual provides a brief introduction to design criteria to consider when planning your Oracle XML DB  application. It provides examples of how and where you can use Oracle XML DB.  The manual then describes ways you can store and retrieve XML data using Oracle XML DB, APIs for manipulating  XMLType data, and ways you can view, generate, transform, and search on existing XML data. The remainder of  the manual discusses how to use Oracle XML DB repository, including versioning and security,  how to access and manipulate repository resources using protocols, SQL, PL/SQL, or Java, and how to manage  your Oracle XML DB application using Oracle Enterprise Manager. It also introduces you to XML messaging and  Oracle Streams Advanced Queuing XMLType support.",Computer,44.95,2000-10-01,XML Developer's Guide
bk102,"Ralls, Kim","A former architect battles corporate zombies, an evil sorceress, and her own childhood to become queen of the world.",Fantasy,5.95,2000-12-16,Midnight Rain
bk103,"Corets, Eva","After the collapse of a nanotechnology society in England, the young survivors lay the foundation for a new society.",Fantasy,5.95,2000-11-17,Maeve Ascendant
bk104,"Corets, Eva","In post-apocalypse England, the mysterious agent known only as Oberon helps to create a new life for the inhabitants of London. Sequel to Maeve Ascendant.",Fantasy,5.95,2001-03-10,Oberon's Legacy
bk105,"Corets, Eva","The two daughters of Maeve, half-sisters, battle one another for control of England. Sequel to Oberon's Legacy.",Fantasy,5.95,2001-09-10,The Sundered Grail
bk106,"Randall, Cynthia","When Carla meets Paul at an ornithology conference, tempers fly as feathers get ruffled.",Romance,4.95,2000-09-02,Lover Birds
bk107,"Thurman, Paula",A deep sea diver finds true love twenty thousand leagues beneath the sea.,Romance,4.95,2000-11-02,Splish Splash
bk108,"Knorr, Stefan","An anthology of horror stories about roaches,  centipedes, scorpions and other insects.",Horror,4.95,2000-12-06,Creepy Crawlies
bk109,"Kress, Peter","After an inadvertant trip through a Heisenberg  Uncertainty Device, James Salway discovers the problems of being quantum.",Science Fiction,6.95,2000-11-02,Paradox Lost
bk110,"O'Brien, Tim",Microsoft's .NET initiative is explored in detail in this deep programmer's reference.,Computer,36.95,2000-12-09,Microsoft .NET: The Programming Bible


In [0]:
# Find how many distinct authors in each genre
from pyspark.sql.functions import countDistinct, col
authors_genres = df_xml\
                       .groupBy("genre")\
                       .agg(countDistinct("author").alias("total_authors_per_genre"))\
                       .orderBy(col("total_authors_per_genre").desc(), col("genre"))
display(authors_genres)

genre,total_authors_per_genre
Computer,3
Fantasy,2
Romance,2
Horror,1
Science Fiction,1


In [0]:
display(authors_genres)

genre,total_authors_per_genre
Computer,3
Fantasy,2
Romance,2
Horror,1
Science Fiction,1


#### 
**Question 2** : You have the SQL query below to find employees who earn the top three salaries in each of the
department. Please write a pure Java/Scala code equivalent to this SQL code. Do not use direct SQL inside the java/scala code. But we need you to use your programming skills to answer the question

In [0]:
%sql 
WITH order_salary AS
( 
SELECT dept.Name AS DeptName, emp.Name AS EmpName,emp.Salary AS Salary, RANK() OVER (PARTITION BY dept.Id ORDER BY emp.Salary DESC) OrderedRank
FROM Employee AS emp
INNER JOIN Department AS dept on dept.Id = emp.DepartmentId 
)
SELECT DeptName, EmpName, Salary FROM order_salary WHERE OrderedRank <= 3;

In [0]:
# It would have been nice if I had access to this data. However, assuming the Dapartment_df and Employee_df tables are dataframes

In [0]:
# using pyspark
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rank

# # select required columns from the given dataframe and rename them
# Employee = Employee_df.select(col("Name").alias("EmpName"), col("Salary"), col("Id"))
# Department = Department_df.select(col("Name").alias("DeptName"), col("DepartmentId"))


# create the window specifications
window_spec = Window.partitionBy(Department["DepartmentId"]).orderBy(desc(Employee["Salary"]))

# perform the join, select only required columns, and add the rank column
emp_dept_joined_rank = Employee.join(Department, Employee["Id"] == Department["DepartmentId"], how="inner")\
                               .select(Department["Name"].alias("DeptName"), Employee["Name"].alias("EmpName"), Employee["Salary"])\
                               .withColumn("OrderedRank", rank().over(window_spec))

# perform the filter
emp_dept_ranked = emp_dept_joined_rank.where(col("OrderedRank") <= 3)

# display the result
display(emp_dept_ranked)

In [0]:
%scala
// using scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

//row_number
val windowSpec  = Window.partitionBy(Department["DepartmentId"]).orderBy(desc(Employee["Salary"]))

// perform the join, select only required columns, and add the rank column
val emp_dept_joined_rank_scala = Employee.join(Department, Employee["Id"] == Department["DepartmentId"], how="inner")\
                                         .select(Department["Name"].alias("DeptName"), Employee["Name"].alias("EmpName"), Employee["Salary"])\
                                         .withColumn("OrderedRank", rank().over(window_spec))


// perform the filter
val emp_dept_ranked_scala = emp_dept_joined_rank.where(col("OrderedRank") <= 3)

// display the result
display(emp_dept_ranked)

#### 
**Question 3** : There is a customer who reports that one of his spark job stalls at the last executor (399/400) for more than 1hr and executor logs are showing ‘ExecutorLost’ stacktrace and couple of retry attempts. What do you recommend as a next step(s)?

1. This can be due to several causes: For example, using spot instances, high disk utilization resulting in unhealthy nodes, aggresive auto scaling policies, etc.
2. Depending on the problem, I will recommend to the customer to stop using spot instances and use at least on-demand instances, use less aggresive auto scaling policies, or if in the case of high disk utilization, it would be good to delete unsed data from the storage or increase the amount of volume attached to the task nodes.

In [0]:
# 1. I will ask the customer to allocate more memory to the executor and maybe change the memoryOverhead for executors
# 2. Also I will recommend the customer go to the spark UI and try identify that particular task and also find out the size of the partition it is working on as well as if there are any spills to disks.
# 3. Depending on that outcome, the customer might want to set a better shuffle partitions or if that last executor is working on a partition that is a hot key, the customer might want to hint spark that there is a skew. Also, enabling adaptive query execution might be helpful as well. I will encourage the customer to do the above and rerun the job.

#### 
**Question 4** : Consider the following streaming job  What does the below code do?  How long will it retain the state? And why?

```val streamingDf  = readStream()
streamingDf.withWatermark("eventTime","10 seconds").dropDuplicates("guid")
streamingDf.writeStream()```

The code above does the following:
1. Reading the streaming data
2. Define the watermark of the query on the value of the "eventTime", and also define 10 seconds as the threshold of how late the data is allowed to be 
3. Dropping duplicates on "guid" column
4. Writing the stream