# Admin

In [1]:
%python
dbutils.fs.ls("/tmp/" + username + "/ipCount.parquet")

%fs head /mnt/training/Chicago-Crimes-2018.csv

println(dbutils.fs.head("/mnt/training/wikipedia/pageviews/pageviews_by_second.tsv", 100))

[36mx[39m: [32mString[39m = [32m"x"[39m

# Reading

## Options

In [None]:

// csv
val path = "/mnt/training/EDGAR-Log-20170329/EDGAR-Log-20170329.csv"

val logDF = spark
  .read
  .option("header", true)
  .csv(path)
  .sample(withReplacement=false, fraction=0.3, seed=3) // using a sample to reduce data size

display(logDF)

// parquet
val ipCountDF2 = (spark
  .read
  .parquet(writePath)
  .orderBy(desc("count"))
)

//tab delim
display(spark.read
  .option("delimiter", "\t")
  .csv("/mnt/training/Chicago-Crimes-2018.csv")
)

display(spark.read
  .option("delimiter", "\t")
  .option("header", true)
  .csv("/mnt/training/Chicago-Crimes-2018.csv")
)

// Inferring the schema.
// The timestamp pattern must use "MM" (month of year). Lowercase "mm" means
// minute of hour, which would make the month digits be read as minutes.
val crimeDF = spark.read
  .option("delimiter", "\t")
  .option("header", true)
  .option("timestampFormat", "MM/dd/yyyy hh:mm:ss a")
  .option("inferSchema", true)
  .csv("/mnt/training/Chicago-Crimes-2018.csv")

display(crimeDF)

## S3

In [None]:
val AccessKey = "AKIAJBRYNXGHORDHZB4A"
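// NOTE(security): avoid hard-coding credentials in notebooks; prefer a secret store such as dbutils.secrets.
// Encode the Secret Key to remove any "/" characters (the key is embedded in the s3a URI below)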
val SecretKey = "a0BzE1bSegfydr3%2FGE3LSPM6uIV5A4hOUfpH8aFF".replace("/", "%2F")
val AwsBucketName = "databricks-corp-training/common"
val MountName = "/mnt/training-%s".format(username)

// unmount the bucket
try {
  // Unmount as needed; a RemoteException is reported as "already unmounted".
  dbutils.fs.unmount(MountName)
} catch {
  case alreadyGone: java.rmi.RemoteException =>
    println(s"$MountName already unmounted")
}

// mount the bucket
try {
  // The access key and secret key are embedded directly in the s3a URI.
  val MountTarget = "s3a://%s:%s@%s".format(AccessKey, SecretKey, AwsBucketName)
  dbutils.fs.mount(MountTarget, MountName)
} catch {
  // Use the `s` string interpolator here: `$"..."` is Spark's Column
  // interpolator and builds a Column rather than a String.
  case ioe: java.rmi.RemoteException => println(s"$MountName already mounted. Run previous cells to unmount first")
}

## JDBC

In [None]:
val jdbcHostname = "server1.databricks.training"
val jdbcPort = 5432
val jdbcDatabase = "training"

val jdbcUrl = s"jdbc:postgresql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "readonly")
connectionProperties.put("password", "readonly")

val accountDF = spark.read.jdbc(url=jdbcUrl, table="Account", properties=connectionProperties)
display(accountDF)

// calculate range of values
import org.apache.spark.sql.functions.{min, max}

val dfMin = accountDF.select(min("insertID")).first()(0).asInstanceOf[Long]
val dfMax = accountDF.select(max("insertID")).first()(0).asInstanceOf[Long]

println(s"DataFrame minimum: $dfMin \nDataFrame maximum: $dfMax")

In [None]:
val accountDFParallel = spark.read.jdbc(
  jdbcUrl,                      // the JDBC URL
  "Account",                    // the name of the table
  """"insertID"""",             // the name of a column of an integral type that will be used for partitioning.
  dfMin,                        // the minimum value of columnName used to decide partition stride.
  dfMax,                        // the maximum value of columnName used to decide partition stride
  12,                           // the number of partitions/connections
  connectionProperties          // the connection properties
)

display(accountDFParallel)

In [None]:
println(accountDF.rdd.getNumPartitions)
// 1
println(accountDFParallel.rdd.getNumPartitions)
// 12

In [None]:
/**
 * Evaluates `op` once and returns the elapsed time in seconds.
 *
 * The result of `op` is discarded; only its duration is reported. Timing uses
 * `System.nanoTime`, which is intended for measuring elapsed time and is not
 * affected by wall-clock adjustments.
 *
 * @param op by-name expression to time; evaluated exactly once
 * @tparam T result type of `op` (unused)
 * @return elapsed time in seconds
 */
def timeIt[T](op: => T): Float = {
  val start = System.nanoTime
  op // force evaluation of the by-name argument
  (System.nanoTime - start) / 1e9f
}

val time1 = timeIt(accountDF.describe())
val time2 = timeIt(accountDFParallel.describe())

println(s"Serial read completed in $time1 seconds vs $time2 seconds for parallel read")

# Writing

In [None]:
serverErrorDF
  .write
  .mode("overwrite") // overwrites a file if it already exists
  .parquet("/tmp/" + username + "/log20170329/transformedLogs.parquet")

val writePath = "/tmp/" + username + "/ipCount.parquet"

ipCountDF
  .write
  .mode("overwrite")
  .parquet(writePath)

In [None]:
crimeRenamedColsDF.write.mode("overwrite").parquet("/tmp/" + username + "/crime.parquet")

# Schemas

In [None]:
// ANSWER

import org.apache.spark.sql.types.{StructType, StructField, StringType}

val schema = StructType(List(
  StructField("SMS", StringType, true)
))

In [None]:
val zipsDF = spark.read.json("/mnt/training/zips.json")
zipsDF.printSchema

val zipsSchema = zipsDF.schema

zipsSchema.foreach(println)

In [None]:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

// define a schema
val zipsSchema2 = StructType(List(
  StructField("city", StringType, true),
  StructField("pop", IntegerType, true)
))

// apply a schema on read
val zipsDF2 = spark.read
  .schema(zipsSchema2)
  .json("/mnt/training/zips.json")

display(zipsDF2)

// nested data types
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, ArrayType, FloatType}

val zipsSchema3 = StructType(List(
  StructField("city", StringType, true), 
  StructField("loc", 
    ArrayType(FloatType, true), true),
  StructField("pop", IntegerType, true)
))

val zipsDF3 = (spark.read
  .schema(zipsSchema3)
  .json("/mnt/training/zips.json")
)
display(zipsDF3)

In [None]:
import org.apache.spark.sql.types.{StructType, StructField, StringType}

val schema2 = StructType(List(
  StructField("SMS", StructType(List(
    StructField("Address", StringType, true),
    StructField("date", StringType, true),
    StructField("metadata", StructType(List(
      StructField("name", StringType, true)
    )), true))
  ), true)
))

val SMSDF2 = spark.read
  .schema(schema2)
  .json("/mnt/training/UbiqLog4UCI/14_F/log*")
  .filter($"SMS".isNotNull)

val cols = SMSDF2.columns(0)
val schemaJson = SMSDF2.schema.json

In [None]:
import org.apache.spark.sql.types.{StructField, StructType, ArrayType, StringType, IntegerType, LongType}

val path = "/mnt/training/twitter/firehose/2018/01/08/18/twitterstream-1-2018-01-08-18-48-00-bcf3d615-9c04-44ec-aac9-25f966490aa4"
// val path = "/mnt/training/twitter/firehose/2018/*/*/*/*"

val fullTweetSchema = StructType(List(
  StructField("id", LongType, true),
  StructField("user", StructType(List(
    StructField("id", LongType, true),
    StructField("screen_name", StringType, true),
    StructField("location", StringType, true),
    StructField("friends_count", IntegerType, true),
    StructField("followers_count", IntegerType, true),
    StructField("description", StringType, true)
  )), true),
  StructField("entities", StructType(List(
    StructField("hashtags", ArrayType(
      StructType(List(
        StructField("text", StringType, true)
      ))
    ), true),
    StructField("urls", ArrayType(
      StructType(List(
        StructField("url", StringType, true),
        StructField("expanded_url", StringType, true),
        StructField("display_url", StringType, true)
      ))
    ), true) 
  )), true),
  StructField("lang", StringType, true),
  StructField("text", StringType, true),
  StructField("created_at", StringType, true)
))

val fullTweetDF = spark.read.schema(fullTweetSchema).json(path)
fullTweetDF.printSchema()
display(fullTweetDF)

import org.apache.spark.sql.functions.col

val schema = fullTweetSchema.fieldNames
scala.util.Sorting.quickSort(schema)
val tweetCount = fullTweetDF.filter(col("id").isNotNull).count



# Corrupt Data

In [None]:
// not recommended

val data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')

val corruptDF = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(sc.parallelize(data))

display(corruptDF)

try {
  val data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')

  val corruptDF = spark.read
    .option("mode", "FAILFAST")
    .json(sc.parallelize(data))

  display(corruptDF)  
  
} catch {
  case e:Exception => print(e)
}

// recommended (bad records path)

val basePath = "%s/etl1s".format(userhome)
val myBadRecords = "%s/badRecordsPath".format(basePath)

println("""Your temp directory is "%s"""".format(myBadRecords))
println("-"*80)

val data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')

val corruptDF = spark.read
  .option("badRecordsPath", myBadRecords)
  .json(sc.parallelize(data))

display(corruptDF)
                                                             
// view records in path
                                                             
val path = "%s/*/*/*".format(myBadRecords)
display(spark.read.json(path))

# Cleaning

In [None]:
// The timestamp pattern uses "MM" (month of year); lowercase "mm" would mean
// minute of hour and make the month digits be read as minutes.
val crimeDF = spark.read
  .option("delimiter", "\t")
  .option("header", true)
  .option("timestampFormat", "MM/dd/yyyy hh:mm:ss a")
  .option("inferSchema", true)
  .csv("/mnt/training/Chicago-Crimes-2018.csv")

// Remove spaces from the column names by converting them to camelCase:
// lowercase the whole name, split on spaces, then capitalize every word
// after the first and concatenate.
val cols = crimeDF.columns
val camelCols = scala.collection.mutable.ListBuffer(
  cols.map(_.toLowerCase.split(" ").reduceLeft(_ + _.capitalize)): _*
)

val crimeRenamedColsDF = crimeDF.toDF(camelCols: _*)
display(crimeRenamedColsDF)

In [None]:
// make lowercase & substitute characters
import org.apache.spark.sql.functions.{col, lower, translate}

val dupedWithColsDF = (dupedDF
  .select(col("*"),
    lower(col("firstName")).alias("lcFirstName"),
    lower(col("lastName")).alias("lcLastName"),
    lower(col("middleName")).alias("lcMiddleName"),
    translate(col("ssn"), "-", "").alias("ssnNums")
))
display(dupedWithColsDF)

# Selecting

## Renaming

In [None]:
// ANSWER
val accountDF = fullTweetFilteredDF.select(col("user.id").alias("userID"), 
    col("user.screen_name").alias("screenName"),
    col("user.location"),
    col("user.friends_count").alias("friendsCount"),
    col("user.followers_count").alias("followersCount"),
    col("user.description")
)

display(accountDF)

## Timestamps

In [None]:
import org.apache.spark.sql.functions.{unix_timestamp, col}
import org.apache.spark.sql.types.TimestampType

val timestampFormat = "EEE MMM dd HH:mm:ss ZZZZZ yyyy"

val tweetDF = fullTweetFilteredDF.select(col("id").alias("tweetID"), 
  col("user.id").alias("userID"), 
  col("lang").alias("language"),
  col("text"),
  unix_timestamp(col("created_at"), timestampFormat).cast(TimestampType).alias("createdAt")
)

# Filtering

In [None]:
//pass columns names with $ sign

val serverErrorDF = logDF
  .filter(($"code" >= 500) && ($"code" < 600))
  .select("date", "time", "extention", "code")

display(serverErrorDF)

## Dropping Nulls

In [None]:
import org.apache.spark.sql.functions.col

val fullTweetFilteredDF = fullTweetDF
  .filter(col("id").isNotNull)

display(fullTweetFilteredDF)

In [None]:
// another dropping null example
val corruptDF = Seq(
  (Some(11), Some(66), Some(5)),
  (Some(12), Some(68), None),
  (Some(1), None, Some(6)),
  (Some(2), Some(72), Some(7))
).toDF("hour", "temperature", "wind")

display(corruptDF)

val corruptDroppedDF = corruptDF.na.drop("any")

display(corruptDroppedDF)

//fillna with HC'd values 
val map = Map("temperature" -> 68, "wind" -> 6)
val corruptImputedDF = corruptDF.na.fill(map)

display(corruptImputedDF)

## De-duping

In [None]:
val duplicateDF = Seq(
  (15342, "Conor", "red"),
  (15342, "conor", "red"),
  (12512, "Dorothy", "blue"),
  (5234, "Doug", "aqua")
  ).toDF("id", "name", "favorite_color")

display(duplicateDF)

val duplicateDedupedDF = duplicateDF.dropDuplicates("id", "favorite_color")

display(duplicateDedupedDF)

# Data Manipulation

In [None]:
// explode: split out nested column components
import org.apache.spark.sql.functions.{explode, col}

val hashtagDF = fullTweetFilteredDF.select(col("id").alias("tweetID"), 
    explode(col("entities.hashtags.text")).alias("hashtag")
)

val urlDF = (fullTweetFilteredDF.select(col("id").alias("tweetID"), 
      explode(col("entities.urls")).alias("urls"))
  .select(
      col("tweetID"),
      col("urls.url").alias("URL"),
      col("urls.display_url").alias("displayURL"),
      col("urls.expanded_url").alias("expandedURL"))
)

hashtagDF.show()
urlDF.show()

In [None]:
// normalizing numeric data

import org.apache.spark.sql.functions.{col, max, min}

val colMin = integerDF.select(min("id")).first()(0).asInstanceOf[Long]
val colMax = integerDF.select(max("id")).first()(0).asInstanceOf[Long]

val normalizedIntegerDF = integerDF
  .withColumn("normalizedValue", (col("id") - colMin) / (colMax - colMin) )

display(normalizedIntegerDF)

In [None]:
// other data manipulation functions
// explode()	Returns a new row for each element in the given array or map
// pivot()	Pivots a column of the current DataFrame and perform the specified aggregation
// cube()	Create a multi-dimensional cube for the current DataFrame using the specified columns, so we can run aggregation on them
// rollup()	Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them

In [None]:
import org.apache.spark.sql.functions.date_format

// timestamp (dow)

// Adds a "dow" (day of week) column derived from the timestamp.
// withColumn already names the new column "dow", so no alias is needed on the
// expression itself.
// NOTE(review): "u" is the SimpleDateFormat day-number-of-week letter; newer
// Spark versions use a different pattern syntax — confirm against the
// cluster's Spark version.
val pageviewsDF = spark.read
  .parquet("/mnt/training/wikipedia/pageviews/pageviews_by_second.parquet/")
  .withColumn("dow", date_format($"timestamp", "u"))

# If/Then Logic

# Sorting

In [None]:
import org.apache.spark.sql.functions.desc

val cols = smartphoneDF.columns.toSet
val sample = smartphoneDF.orderBy(desc("Application")).first().toSeq(0).toString

# Aggregations

In [None]:
import org.apache.spark.sql.functions.{from_utc_timestamp, hour}

val countsDF = serverErrorDF
  .select(hour(from_utc_timestamp($"time", "GMT")).alias("hour"))
  .groupBy($"hour")
  .count()
  .orderBy($"hour")

display(countsDF)


import org.apache.spark.sql.functions.desc

val ipCountDF = logDF
  .select($"ip")
  .groupBy($"ip")
  .count()
  .orderBy(desc("count"))

display(ipCountDF)

In [None]:
val aggregatedDowDF = pageviewsEnhancedDF
  .groupBy($"dow", $"longName", $"abbreviated", $"shortName")  
  .sum("requests")                                             
  .withColumnRenamed("sum(requests)", "Requests")
  .orderBy($"dow")

display(aggregatedDowDF)

# Re-partitioning

In [None]:
// Function	Transformation Type	Use	Evenly distributes data across partitions?
// .coalesce(n)	narrow (does not shuffle data)	reduce the number of partitions	no
// .repartition(n)	wide (includes a shuffle operation)	increase the number of partitions	yes



In [None]:
crimeRenamedColsDF.repartition(1).write.mode("overwrite").parquet("/tmp/" + username + "/crimeRepartitioned.parquet")

In [None]:
val wikiDF = spark.read
  .parquet("/mnt/training/wikipedia/pageviews/pageviews_by_second.parquet")

display(wikiDF)

// show # partitions
val partitions = wikiDF.rdd.getNumPartitions
println(s"Partitions: $partitions")

// re-partition to increase # of partitions
// (the messages below use the `s` string interpolator; `$"..."` is Spark's
// Column interpolator and would build a Column instead of a String)
val repartitionedWikiDF = wikiDF.repartition(16)
println(s"Partitions: ${repartitionedWikiDF.rdd.getNumPartitions}")

// coalesce to reduce # of partitions
val coalescedWikiDF = repartitionedWikiDF.coalesce(2)
println(s"Partitions: ${coalescedWikiDF.rdd.getNumPartitions}")




Spark uses a default value of 200 partitions, which comes from real-world experience by Spark engineers. This is an adjustable configuration setting. Run the following cell to see this value.

In [None]:
spark.conf.get("spark.sql.shuffle.partitions")

// adjust the number of partitions, changes after a shuffle operation

spark.conf.set("spark.sql.shuffle.partitions", "8")


In [None]:
// Count the partitions produced by orderBy. The message uses the `s` string
// interpolator; `$"..."` would build a Spark Column instead of a String.
println(s"Partitions: ${coalescedWikiDF.orderBy("requests").rdd.getNumPartitions}")

// The .orderBy() triggered the repartition of the DataFrame into 8 partitions. 
// Now reset the default value.


In [8]:
spark.conf.set("spark.sql.shuffle.partitions", "200")

cmd8.sc:1: not found: value spark
val res8 = spark.conf.set("spark.sql.shuffle.partitions", "200")
           ^

: 

# Data Manipulation

# UDF (User-Defined Functions)

In [None]:
// define the function
/** Function value that splits its input string on every "e" (passed to `String.split`). */
def manual_split: String => Seq[String] = (text: String) => text.split("e")

manual_split("this is my example string")

// register the function
val manualSplitScalaUDF = spark.udf.register("manualSplitSQLUDF", manual_split)

// create random data
import org.apache.spark.sql.functions.{sha1, rand}

val randomDF = (spark.range(1, 10000 * 10 * 10 * 10)
  .withColumn("random_value", rand(seed=10).cast("string"))
  .withColumn("hash", sha1($"random_value"))
  .drop("random_value")
)

display(randomDF)

// apply the function on the dataframe
val randomAugmentedDF = randomDF.select($"*", manualSplitScalaUDF($"hash").alias("augmented_col"))

display(randomAugmentedDF)

// register the dataframe for sql queries
randomDF.createOrReplaceTempView("randomTable")

%sql
SELECT id,
  hash,
  manualSplitSQLUDF(hash) as augmented_col
FROM
  randomTable

In [None]:
// register a new udf
val plusOneUDF = spark.udf.register("plusOneUDF", (input: Float) => input + 1)

In [None]:
/**
 * Evaluates `op` once and returns the elapsed time in seconds.
 *
 * The result of `op` is discarded; only its duration is reported. Timing uses
 * `System.nanoTime`, which is intended for measuring elapsed time and is not
 * affected by wall-clock adjustments.
 *
 * @param op by-name expression to time; evaluated exactly once
 * @tparam T result type of `op` (unused)
 * @return elapsed time in seconds
 */
def timeIt[T](op: => T): Float = {
  val start = System.nanoTime
  op // force evaluation of the by-name argument
  (System.nanoTime - start) / 1e9f
}

val time1 = timeIt(randomFloatsDF.withColumn("incremented_float", plusOneUDF($"random_float")).count)
val time2 = timeIt(randomFloatsDF.withColumn("incremented_float", $"random_float" + 1).count)

println(s"UDF completed in $time1 seconds vs $time2 seconds for built-in functionality")

In [None]:
// another example
/**
 * Converts a dotted-quad IPv4 string into its numeric value,
 * i.e. a * 256^3 + b * 256^2 + c * 256 + d for the input "a.b.c.d".
 *
 * Computed with integer arithmetic. No per-call debug output is printed,
 * because this function is registered as a UDF and evaluated once per row.
 *
 * @param IPString address in the form "a.b.c.d"
 * @return the combined value, e.g. 16909060 for "1.2.3.4"
 * @throws NumberFormatException if a part is not a number
 * @throws MatchError if the input does not have exactly four dot-separated parts
 */
def IPConvert(IPString: String): Long = {
  val Array(a, b, c, d) = IPString.split("\\.").map(_.toLong)
  ((a * 256 + b) * 256 + c) * 256 + d
}

IPConvert("1.2.3.4") // should equal 16909060

// ANSWER
val IPConvertUDF = spark.udf.register("IPConvertUDF", IPConvert _)
// TEST - Run this cell to test your solution
val testDF = Seq(
  "1.2.3.4",
  "10.10.10.10",
  "23.13.65.23").toDF("ip")

val result = testDF.select(IPConvertUDF($"ip")).collect()

// apply on a DF
val IPDF = Seq("123.123.123.123", "1.2.3.4", "127.0.0.0").toDF("ip")

val IPDFWithParsedIP = IPDF.withColumn("parsedIP", IPConvertUDF($"ip"))

display(IPDFWithParsedIP)

# Adv UDF

## Ex1

In [3]:
/** Returns the sum of `a` and `b`. */
def manual_add(a: Int, b: Int): Int = a + b

manual_add(1, 2)

defined function manual_add
res2_1: Int = 3

In [None]:
val manualAddScalaUDF = spark.udf.register("manualAddSQLUDF", manual_add _)

In [None]:
val integerDF = Seq(
  (1, 2),
  (3, 4),
  (5, 6)
).toDF("col1", "col2")

display(integerDF)

val integerAddDF = integerDF.select($"*", manualAddScalaUDF($"col1", $"col2").alias("sum"))

display(integerAddDF)

## Ex 1

**Complex Output**

Complex outputs are helpful when you need to return multiple values from your UDF. The UDF design pattern involves returning a single column to drill down into, to pull out the desired data.

In [4]:
case class MathOperations(sum: Float, multiplication: Float, division: Float)

defined class MathOperations

In [5]:
/**
 * Function value that computes the sum, product and quotient of two integers
 * and returns them together as a single `MathOperations` value.
 *
 * The quotient is computed in floating point (`b` is converted to Float
 * first), so `manual_math(1, 2)` yields a division of 0.5 rather than the
 * integer result 0. Being a Float division, a zero `b` does not throw.
 */
def manual_math: (Int, Int) => MathOperations =
  (a, b) => MathOperations(a + b, a * b, a / b.toFloat)

//sum,//mult,//divide

manual_math(1, 2)

defined function manual_math
res4_1: MathOperations = MathOperations(3.0F, 2.0F, 0.5F)

In [None]:
// register
val manualMathScalaUDF = spark.udf.register("manualMathSQLUDF", manual_math)
 
// apply
display(integerDF.select($"*", manualMathScalaUDF($"col1", $"col2").alias("sum")))

## Ex 2

In [7]:
// define output class
case class temperature(fahrenheit: Float, celsius: Float, kelvin: Float)

defined class temperature

In [8]:
// define function
/**
 * Expresses a temperature reading in Fahrenheit, Celsius and Kelvin.
 *
 * @param temp the reading to convert
 * @param unit "C" if `temp` is in Celsius; any other value is treated as Fahrenheit
 * @return a `temperature` holding the (fahrenheit, celsius, kelvin) values
 */
def temperatureConverter(temp: Float, unit: String): temperature = unit match {
  case "C" =>
    temperature((temp * 1.8f) + 32, temp, temp + 273.15f)
  case _ =>
    // Fahrenheit input: derive Celsius first, then Kelvin from Celsius.
    val celsius = (temp - 32) * (5f / 9)
    temperature(temp, celsius, celsius + 273.15f)
}

temperatureConverter(10, "C") // should be temperature(50.0, 10, 283.15)
temperatureConverter(10, "F") // should be temperature(10, -12.2, 260.9)

defined function temperatureConverter
res7_1: temperature = temperature(50.0F, 10.0F, 283.15F)
res7_2: temperature = temperature(10.0F, -12.222223F, 260.92776F)

In [None]:
// register udf
val temperatureConverterUDF = udf((temp: Float, unit: String) => temperatureConverter(temp, unit))

In [None]:
// TEST - Run this cell to test your solution
import org.apache.spark.sql.types.{FloatType, StringType}

dbTest("ET2-S-04-04-01", Some(List(FloatType, StringType)), temperatureConverterUDF.inputTypes)

println("Tests passed!")

// apply udf on df
val weatherEnhancedDF = weatherDF.withColumn("TAVGAdjusted", temperatureConverterUDF($"TAVG", $"UNIT"))

display(weatherEnhancedDF)

# Joins

In [None]:
import org.apache.spark.sql.functions.date_format

// Adds a "dow" (day of week) column derived from the timestamp; this is the
// column used to join against labelsDF below.
// withColumn already names the new column "dow", so no alias is needed on the
// expression itself.
// NOTE(review): "u" is the SimpleDateFormat day-number-of-week letter; newer
// Spark versions use a different pattern syntax — confirm against the
// cluster's Spark version.
val pageviewsDF = spark.read
  .parquet("/mnt/training/wikipedia/pageviews/pageviews_by_second.parquet/")
  .withColumn("dow", date_format($"timestamp", "u"))

display(pageviewsDF)

val pageviewsEnhancedDF = pageviewsDF.join(labelsDF, "dow")

display(pageviewsEnhancedDF)

In [None]:
// show explain plan (how join was executed)

aggregatedDowDF.explain()

By default, Spark did a broadcast join rather than a shuffle join. In other words, it broadcast labelsDF to the larger pageviewsDF, replicating the smaller DataFrame on each node of our cluster. This avoided having to move the larger DataFrame across the cluster.

Take a look at the broadcast threshold by accessing the configuration settings.

In [None]:
// Read the current value of the automatic broadcast-join threshold setting.
val threshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
// The `s` prefix is required; without it the literal text "$threshold" is printed.
println(s"Threshold: $threshold")

In [None]:
// disable broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [None]:
// Now notice the lack of broadcast in the query physical plan.

pageviewsDF.join(labelsDF, "dow").explain()

There are two ways of telling Spark to explicitly broadcast tables. The first is to change the Spark configuration, which affects all operations. The second is to declare it using the broadcast() function in the functions package.

In [None]:
// explicitly broadcast join
import org.apache.spark.sql.functions.broadcast

pageviewsDF.join(broadcast(labelsDF), "dow").explain()

In [None]:
// another example
// ANSWER
import org.apache.spark.sql.functions.broadcast

val logWithIPEnhancedDF = (logWithIPDF
  .join(broadcast(countryLookupDF), logWithIPDF.col("IPLookupISO2") === countryLookupDF.col("alpha2Code"))
  .drop("alpha2Code", "alpha3Code", "numericCode", "ISO31662SubdivisionCode", "independentTerritory")
)

display(logWithIPEnhancedDF)

# DB Writes

**Parallel Database Writes**
Database writes are the inverse of what was covered in Lesson 4 of ETL Part 1. In that lesson you defined the number of partitions in the call to the database.

By contrast, when writing to a database, the number of active connections to the database is determined by the number of partitions of the DataFrame.

In [None]:
wikiDF.write.mode("OVERWRITE").parquet(userhome+"/wiki.parquet")

%python
for i in dbutils.fs.ls(userhome+"/wiki.parquet"):
  print(i)

**A Note on Upserts**

Upserts insert a record into a database if it doesn't already exist, and update the existing record if it does. Upserts are not supported in core Spark due to the transactional nature of upserting and the immutable nature of Spark. You can only append or overwrite.

## Ex 1

**Step 1:**

Import Helper Functions and Data
A function is defined for you to print out the number of records in each DataFrame. Run the following cell to define that function.

In [None]:
// Utility method to count & print the number of records in each partition
/**
 * Prints the number of records held by each partition of `df`.
 *
 * Each partition is counted where it lives; only the per-partition counts are
 * collected back to the driver, where they are printed one per line.
 *
 * @param df the DataFrame whose partition sizes should be reported
 */
def printRecordsPerPartition(df:org.apache.spark.sql.Dataset[Row]): Unit = {
  println("Per-Partition Counts:")

  val perPartitionCounts = df.rdd
    .mapPartitions(rows => Iterator.single(rows.size), preservesPartitioning = true)
    .collect()

  for (count <- perPartitionCounts) println("* " + count)
}

In [None]:
// ANSWER
val wikiDF = spark.read
  .parquet("/mnt/training/wikipedia/pageviews/pageviews_by_second.parquet")

display(wikiDF)

In [None]:
printRecordsPerPartition(wikiDF)

**Step 2:**

Repartition the Data
Define three new DataFrames:

wikiDF1Partition: wikiDF with 1 partition
wikiDF16Partition: wikiDF with 16 partitions
wikiDF128Partition: wikiDF with 128 partitions

In [None]:
val wikiDF1Partition = wikiDF.repartition(1)
val wikiDF16Partition = wikiDF.repartition(16)
val wikiDF128Partition = wikiDF.repartition(128)

**Step 3:**

Examine the Distribution of Records
Use printRecordsPerPartition() to examine the distribution of records across the partitions.

In [None]:
printRecordsPerPartition(wikiDF1Partition)
printRecordsPerPartition(wikiDF16Partition)
printRecordsPerPartition(wikiDF128Partition)

**Step 4:**

Coalesce wikiDF16Partition and Examine the Results
Coalesce wikiDF16Partition to 10 partitions, saving the result to wikiDF16PartitionCoalesced.

In [None]:
// Reduce the 16-partition DataFrame to 10 partitions.
val wikiDF16PartitionCoalesced = wikiDF16Partition.coalesce(10)

// Print the partition count after coalescing.
println(wikiDF16PartitionCoalesced.rdd.getNumPartitions)

printRecordsPerPartition(wikiDF16PartitionCoalesced)

# Table Management

**Optimization of Data Storage with Managed and Unmanaged Tables**

A **managed table** is a table that manages both the data itself as well as the metadata. In this case, a DROP TABLE command removes both the metadata for the table as well as the data itself.

**Unmanaged tables** manage the metadata from a table such as the schema and data location, but the data itself sits in a different location, often backed by a blob store such as Azure Blob Storage or S3. Dropping an unmanaged table drops only the metadata associated with the table while the data itself remains in place.

## Writing to unmanaged table

In [None]:
// create df
val df = spark.range(1, 100)

display(df)

In [None]:
// register the table
df.write.mode("OVERWRITE").saveAsTable("myTableManaged")

In [None]:
// describe table 
%sql
DESCRIBE EXTENDED myTableManaged

In [None]:
// writing to unmanaged table
df.write.mode("OVERWRITE").option("path", userhome+"/myTableUnmanaged").saveAsTable("myTableUnmanaged")

In [None]:
%sql
DESCRIBE EXTENDED myTableUnmanaged

## dropping tables

In [None]:
// look at files backing up the managed table

display(dbutils.fs.ls("dbfs:/user/hive/warehouse/" + databaseName + ".db/mytablemanaged"))

In [None]:
// drop the managed table
%sql
DROP TABLE myTableManaged

In [None]:
// after you drop a managed table it will 
// delete the underlying data
// this will throw an error

try {
  display(dbutils.fs.ls("dbfs:/user/hive/warehouse/" + databaseName + ".db/mytablemanaged"))
  
} catch {
  case e:Exception => println(e)
}

display(dbutils.fs.ls("dbfs:/user/" + username + "/myTableUnmanaged"))



In [None]:
// drop the unmanaged table

%sql
DROP TABLE myTableUnmanaged

// the data is still there

%python
dbutils.fs.ls("dbfs:/user/" + username + "/myTableUnmanaged")

# Capstone

In [None]:
import org.apache.spark.sql.types.{StructField, StructType, ArrayType, StringType, IntegerType, LongType}

// define the schema
lazy val fullTweetSchema = StructType(List(
  StructField("id", LongType, true),
  StructField("user", StructType(List(
    StructField("id", LongType, true),
    StructField("screen_name", StringType, true),
    StructField("location", StringType, true),
    StructField("friends_count", IntegerType, true),
    StructField("followers_count", IntegerType, true),
    StructField("description", StringType, true)
  )), true),
  StructField("entities", StructType(List(
    StructField("hashtags", ArrayType(
      StructType(List(
        StructField("text", StringType, true)
      ))
    ), true),
    StructField("urls", ArrayType(
      StructType(List(
        StructField("url", StringType, true),
        StructField("expanded_url", StringType, true),
        StructField("display_url", StringType, true)
      ))
    ), true) 
  )), true),
  StructField("lang", StringType, true),
  StructField("text", StringType, true),
  StructField("created_at", StringType, true)
))


In [None]:
// read and apply
val path = "/mnt/training/twitter/firehose/2018/01/08/18/twitterstream-1-2018-01-08-18-48-00-bcf3d615-9c04-44ec-aac9-25f966490aa4"
// val path = "/mnt/training/twitter/firehose/2018/*/*/*/*" // This imports all of the data
val tweetDF = spark.read
  .schema(fullTweetSchema)
  .json(path)
  .filter($"id".isNotNull)

display(tweetDF)


In [None]:
// write udf to parse urls
/**
 * Extracts the domain from an http(s) URL, without a leading "www.".
 *
 * The domain is everything after the scheme (and optional "www.") up to the
 * first "/", "#" or "?".
 *
 * @param URL the URL to parse; may be null (for example a null column value
 *            when this function is used as a UDF)
 * @return the domain, or an empty string if `URL` is null or contains no
 *         http(s) URL
 */
def getDomain(URL: String) : String = {
  val pattern = """https?://(www\.)?([^/#\?]+)""".r
  // Option(...) turns a null input into None instead of a NullPointerException.
  Option(URL)
    .flatMap(url => pattern.findFirstMatchIn(url))
    .map(_.group(2))
    .getOrElse("")
}

getDomain("https://www.databricks.com/")



In [None]:
// test the UDF
val urls = List(
  "https://www.databricks.com/",
  "https://databricks.com/",
  "https://databricks.com/training-overview/training-self-paced",
  "http://www.databricks.com/",
  "http://databricks.com/",
  "http://databricks.com/training-overview/training-self-paced",
  "http://www.apache.org/",
  "http://spark.apache.org/docs/latest/"
)

urls.map(getDomain).foreach(println)

In [None]:
// register the udf 
import org.apache.spark.sql.functions.udf

val getDomainUDF = sqlContext.udf.register("getDomainUDF", getDomain _)

In [None]:
// apply the udf

// create the udf
import org.apache.spark.sql.functions.explode

val urlDF = (tweetDF
  .withColumn("URL", explode($"entities.urls.expanded_url"))
  .select("URL", "created_at") 
  .withColumn("parsedURL", getDomainUDF($"URL"))
)

display(urlDF)

val cols = urlDF.columns
val sample = urlDF.first


In [None]:
// parse timestamp
import org.apache.spark.sql.functions.{unix_timestamp, hour}
import org.apache.spark.sql.types.TimestampType

val timestampFormat = "EEE MMM dd HH:mm:ss ZZZZZ yyyy"

// Parse the created_at string into a "timestamp" column using timestampFormat,
// drop the raw string, and derive the hour from the parsed timestamp.
// withColumn fixes the new column's name, so no alias is applied to the
// parsed expression.
val urlWithTimestampDF = urlDF
  .withColumn("timestamp", unix_timestamp($"created_at", timestampFormat).cast(TimestampType))
  .drop("created_at")
  .withColumn("hour", hour($"timestamp"))

display(urlWithTimestampDF)

val cols = urlWithTimestampDF.columns
val sample = urlWithTimestampDF.first



In [None]:
// aggregate and sort
import org.apache.spark.sql.functions.desc

val urlTrendsDF = (urlWithTimestampDF
  .groupBy("hour", "parsedURL")
  .count()
  .orderBy($"hour", desc("count"))
  .limit(10)
)

display(urlTrendsDF)

val cols = urlTrendsDF.columns
val sample = urlTrendsDF.first


In [None]:
// join new data

// import new table
val badActorsDF = spark.read.parquet("/mnt/training/twitter/supplemental/badactors.parquet")

display(badActorsDF)

val cols = badActorsDF.columns
val sample = badActorsDF.first

// add a column for bad actors
val tweetWithMaliciousDF = tweetDF
  .join(badActorsDF, tweetDF.col("user.id") === badActorsDF.col("userID"), "left")
  .withColumn("maliciousAcct", $"userID".isNotNull)
  .drop("screen_name", "userID")

display(tweetWithMaliciousDF)

val cols = tweetWithMaliciousDF.columns
val sample = tweetWithMaliciousDF.first




In [None]:
// load records
// ANSWER
urlTrendsDF.repartition(4).write.mode("overwrite").parquet(userhome + "/tmp/urlTrends.parquet")
tweetWithMaliciousDF.repartition(4).write.mode("overwrite").parquet(userhome + "/tmp/tweetWithMaliciousDF.parquet")

val urlTrendsDFTemp = spark.read.parquet(userhome + "/tmp/urlTrends.parquet")
val tweetWithMaliciousDFTemp = spark.read.parquet(userhome + "/tmp/tweetWithMaliciousDF.parquet")


# SQL

## Queries

In [None]:
// select
%sql
SELECT * FROM People10M

// describe
%sql
DESCRIBE People10M

// filter
%sql
SELECT firstName, middleName, lastName, birthDate
FROM People10M
WHERE year(birthDate) > 1990 AND gender = 'F'

%sql
SELECT firstName, middleName, lastName, year(birthDate) as birthYear, salary 
FROM People10M
WHERE year(birthDate) > 1990 AND gender = 'F'

%sql
SELECT year(birthDate) as birthYear, count(*) AS total
FROM People10M
WHERE firstName = 'Mary' AND gender = 'F'
GROUP BY birthYear
ORDER BY birthYear

%sql
SELECT year(birthDate) as birthYear,  firstName, count(*) AS total
FROM People10M
WHERE (firstName = 'Dorothy' or firstName = 'Donna') AND gender = 'F' AND year(birthDate) > 1990
GROUP BY birthYear, firstName



In [None]:
// create temporary view
%sql
CREATE OR REPLACE TEMPORARY VIEW TheDonnas AS
  SELECT * 
  FROM People10M 
  WHERE firstName = 'Donna'

%sql
SELECT * FROM TheDonnas

%sql
CREATE OR REPLACE TEMPORARY VIEW WomenBornAfter1990 AS
  SELECT firstName, middleName, lastName, year(birthDate) AS birthYear, salary 
  FROM People10M
  WHERE year(birthDate) > 1990 AND gender = 'F'

%sql
SELECT birthYear, count(*) 
FROM WomenBornAfter1990 
WHERE firstName = 'Mary' 
GROUP BY birthYear 
ORDER BY birthYear




## Data Access

In [None]:
%sql
CREATE DATABASE IF NOT EXISTS junk;

USE junk;

CREATE TABLE IF NOT EXISTS IPGeocode
  USING parquet
  OPTIONS (
    path "dbfs:/mnt/training/ip-geocode.parquet"
  )

%sql
CREATE TABLE IF NOT EXISTS BikeSharingDay
  USING csv
  OPTIONS (
    path "/mnt/training/bikeSharing/data-001/day.csv",
    inferSchema "true",
    header "true"
  )




## JSON

In [None]:
%sql
CREATE TABLE IF NOT EXISTS DatabricksBlog
  USING json
  OPTIONS (
    path "dbfs:/mnt/training/databricks-blog.json",
    inferSchema "true"
  )

In [None]:
// pulling nested data
%sql
SELECT dates.createdOn, dates.publishedOn 
FROM DatabricksBlog

%sql
SELECT title, 
       cast(dates.publishedOn AS timestamp) AS publishedOn 
FROM DatabricksBlog

%sql
CREATE OR REPLACE TEMPORARY VIEW DatabricksBlog2 AS
  SELECT *, 
         cast(dates.publishedOn AS timestamp) AS publishedOn 
  FROM DatabricksBlog

%sql
SELECT title, 
       date_format(publishedOn, "MMM dd, yyyy") AS date, 
       link 
FROM DatabricksBlog2
WHERE year(publishedOn) = 2013
ORDER BY publishedOn

// col: authors holds the nested list of authors
// col: author holds each individual author from that list
%sql
SELECT title, 
       authors, 
       explode(authors) AS author, 
       link 
FROM DatabricksBlog 
WHERE size(authors) > 1 
ORDER BY title





In [None]:
// lateral views

// use LATERAL VIEW to explode multiple columns at once, in this case, 
// the columns authors and categories
%sql
SELECT dates.publishedOn, title, author, category
FROM DatabricksBlog
LATERAL VIEW explode(authors) exploded_authors_view AS author
LATERAL VIEW explode(categories) exploded_categories AS category
WHERE title = "Apache Spark 1.1: The State of Spark Streaming"
ORDER BY author, category