
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px; height: 163px">
</div>

# Gain Actionable Insights from Twitter Data

In this capstone project, you will use Structured Streaming to gain insight from streaming Twitter data.

The executive team would like access to some key business metrics, such as:
* the most tweeted hashtag in the last 5-minute window
* a map of where tweets are coming from

## Instructions
* Insert solutions wherever it says `FILL_IN`
* Feel free to copy/paste code from the previous notebooks, where applicable
* Run test cells to verify that your solution is correct

## Audience
* Primary Audience: Data Engineers
* Additional Audiences: Data Analysts and Data Scientists

## Prerequisites
* Web browser: current versions of Google Chrome, Firefox, Safari, Microsoft Edge and 
Internet Explorer 11 on Windows 7, 8, or 10 (see <a href="https://docs.databricks.com/user-guide/supported-browsers.html#supported-browsers#" target="_blank">Supported Web Browsers</a>)
* Databricks Runtime 4.2 or greater
* Completed courses Spark-SQL, DataFrames or ETL-Part 1 from <a href="https://academy.databricks.com/" target="_blank">Databricks Academy</a>, or have similar knowledge
* Completed the rest of this course

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Getting Started</h2>

Run the following cell to configure our "classroom."

In [0]:
%run "./Includes/Classroom-Setup"

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Step 1</h2>
<h3>Read Streaming Data from Input Source</h3>

The input source is a Kafka feed of Twitter data.

For this step you will need to:
0. Use the `format()` operation to specify "kafka" as the type of the stream
0. Specify the location of the Kafka server by setting the option "kafka.bootstrap.servers" with one of the following values (depending on where you are located): 
 * **server1.databricks.training:9092** (US-Oregon)
 * **server2.databricks.training:9092** (Singapore)
0. Indicate which topics to listen to by setting the option "subscribe" to "tweets"
0. Throttle Kafka's processing of the streams
0. Rewind the stream to the beginning whenever the notebook is restarted
0. Load the input data stream in as a DataFrame
0. Select the column `value` - cast it to a `STRING`

In [0]:
# TODO
from pyspark.sql.functions import col

spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

kafkaServer = "server1.databricks.training:9092"   # US (Oregon)
# kafkaServer = "server2.databricks.training:9092" # Singapore

rawDF = (spark.readStream
 .format('kafka')                                 # Specify "kafka" as the type of the stream
 .option('kafka.bootstrap.servers', kafkaServer)  # Set the location of the Kafka server
 .option('subscribe', 'tweets')                   # Indicate which topics to listen to
 .option('maxOffsetsPerTrigger', 1000)            # Throttle Kafka's processing of the streams
 .option('startingOffsets', 'earliest')           # Rewind stream to beginning when we restart notebook
 .load()                                          # Load the input data stream in as a DataFrame
 .select(col('value').cast('STRING'))             # Select the "value" column and cast to a string
)

In [0]:
# TEST - Run this cell to test your solution.
schemaStr = str(rawDF.schema)

dbTest("SS-06-schema-value",     True, "(value,StringType,true)" in schemaStr)
dbTest("SS-06-is-streaming",     True, rawDF.isStreaming)

print("Tests passed!")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Step 2</h2>
<h3>A Schema for parsing JSON</h3>

Because the schema is so complex, it is provided for you.

Simply run the following cell and proceed to the next step.

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, ArrayType

twitSchema = StructType([
  StructField("hashTags", ArrayType(StringType(), False), True),
  StructField("text", StringType(), True),   
  StructField("userScreenName", StringType(), True),
  StructField("id", LongType(), True),
  StructField("createdAt", LongType(), True),
  StructField("retweetCount", IntegerType(), True),
  StructField("lang", StringType(), True),
  StructField("favoriteCount", IntegerType(), True),
  StructField("user", StringType(), True),
  StructField("place", StructType([
    StructField("coordinates", StringType(), True), 
    StructField("name", StringType(), True),
    StructField("placeType", StringType(), True),
    StructField("fullName", StringType(), True),
    StructField("countryCode", StringType(), True)]), 
  True)
])

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Step 3</h2>
<h3>Create a JSON DataFrame</h3>

From `rawDF`, parse out the JSON subfields using `from_json`. Create a DataFrame that has the fields:
* `time`, the `createdAt` value cast to a timestamp
* `json`, a nested field that has all the rest of the data

Then promote all `json` subfields to top-level fields.
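
As a rough illustration of what this step does to a single record, here is a plain-Python sketch (no Spark involved). It assumes, as the solution cell does, that `createdAt` holds epoch milliseconds. The helper name `parse_tweet` and the sample payload are hypothetical and only mimic the shape of `twitSchema`.

```python
import json
from datetime import datetime, timezone

def parse_tweet(raw_value):
    """Parse one raw JSON string and flatten it, loosely mimicking from_json + select."""
    record = json.loads(raw_value)
    place = record.get("place") or {}   # "place" may be missing or null
    return {
        # createdAt is assumed to be epoch milliseconds; divide by 1000 to get seconds
        "time": datetime.fromtimestamp(record["createdAt"] / 1000, tz=timezone.utc),
        "hashTags": record.get("hashTags", []),
        "text": record.get("text"),
        # Nested "place.countryCode" is promoted to a top-level field
        "countryCode": place.get("countryCode"),
    }

# Hypothetical payload shaped like twitSchema (not real feed data)
sample = json.dumps({
    "hashTags": ["ButtaBomma"],
    "text": "example tweet",
    "createdAt": 1583045239000,
    "place": {"countryCode": "IN"},
})

row = parse_tweet(sample)
print(row["time"].isoformat(), row["countryCode"])
```

In the notebook itself, `from_json` and the `cast(... as timestamp)` expression do the same work column-wise on the streaming DataFrame.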

In [0]:
# TODO
from pyspark.sql.functions import from_json, expr

cleanDF = (rawDF
 .withColumn('json', from_json('value', twitSchema))                          # Add the column "json" by parsing the column "value" with "from_json"
 .select(
   expr("cast(cast(json.createdAt as double)/1000 as timestamp) as time"),   # Cast "createdAt" column properly, call it "time"
   col("json.hashTags").alias("hashTags"),                                   # Promote subfields of "json" column e.g. "json.field" to "field"
   col("json.text").alias("text"),                                           # Repeat for each subfield of "json"
   col("json.userScreenName").alias("userScreenName"),
   col("json.id").alias("id"),
   col("json.retweetCount").alias("retweetCount"),
   col("json.lang").alias("lang"),
   col("json.favoriteCount").alias("favoriteCount"),
   col("json.user").alias("user"),
   col("json.place.coordinates").alias("coordinates"),
   col("json.place.name").alias("name"),
   col("json.place.placeType").alias("placeType"),
   col("json.place.fullName").alias("fullName"),
   col("json.place.countryCode").alias("countryCode"),
 )
)

In [0]:
# TEST - Run this cell to test your solution.
schemaStr = str(cleanDF.schema)

dbTest("SS-06-schema-hashTag",  True, "hashTags,ArrayType(StringType,true)" in schemaStr)
dbTest("SS-06-schema-text",  True, "(text,StringType,true)" in schemaStr)
dbTest("SS-06-schema-userScreenName",  True, "(userScreenName,StringType,true)" in schemaStr)
dbTest("SS-06-schema-id",  True, "(id,LongType,true)" in schemaStr)
dbTest("SS-06-schema-time",  True, "(time,TimestampType,true)" in schemaStr)
dbTest("SS-06-schema-retweetCount",  True, "(retweetCount,IntegerType,true)" in schemaStr)
dbTest("SS-06-schema-lang",  True, "(lang,StringType,true)" in schemaStr)
dbTest("SS-06-schema-favoriteCount",  True, "(favoriteCount,IntegerType,true)" in schemaStr)
dbTest("SS-06-schema-user",  True, "(user,StringType,true)" in schemaStr)
dbTest("SS-06-schema-coordinates",  True, "(coordinates,StringType,true)" in schemaStr)
dbTest("SS-06-schema-name",  True, "(name,StringType,true)" in schemaStr)
dbTest("SS-06-schema-placeType",  True, "(placeType,StringType,true)" in schemaStr)
dbTest("SS-06-schema-fullName",  True, "(fullName,StringType,true)" in schemaStr)
dbTest("SS-06-schema-countryCode",  True, "(countryCode,StringType,true)" in schemaStr)

print("Tests passed!")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Step 4</h2>
<h3>Display Twitter Data as a Table</h3>

After running the cell, click the left-most button in the bottom-left corner of the results to view the data as a table.

In [0]:
# TODO
display(cleanDF)  # display "cleanDF"

time,hashTags,text,userScreenName,id,retweetCount,lang,favoriteCount,user,coordinates,name,placeType,fullName,countryCode
2020-03-01T06:47:19.000+0000,List(),WAIT rlly wanna see sum...it kinda messed me up,papistyIes,1234007291331899392,0,en,0,mia,[],,,,
2020-03-01T06:47:19.000+0000,List(),"RT @DrJoeAbah: In the last 2 weeks, I have used the Abuja International Airport twice, London Heathrow Airport four times and Washington Du…",Uc383,1234007291353075714,0,en,0,Uc38,[],,,,
2020-03-01T06:47:19.000+0000,"List(ButtaBomma, samajavaragamana, Mimdblock, ButtaBomma)",RT @ARYANSurya_: 24 hours video song Stats #ButtaBomma Views - 5.4M #samajavaragamana Views -5.1M #Mimdblock views-4.5M #ButtaBomma lik…,ManifanofBunny,1234007291369644032,0,en,0,Mâñî_FáñØf_Bûññy🀄⛎Š♓🀄♈,[],,,,
2020-03-01T06:47:19.000+0000,List(),RT @Terrisrenee: Talk to your girl.. she’s there for you. You can trust her 🤞🏾,trillysssa,1234007291353014272,0,en,0,lys,[],,,,
2020-03-01T06:47:19.000+0000,List(),@LiterateLiberal Plus thoughts.,howellsuzy,1234007291348705280,0,en,0,Suzy Howell,[],,,,
2020-03-01T06:47:19.000+0000,List(),Bae too cold.,BoobieBack,1234007291348779009,0,en,0,Og BoobieP,[],,,,
2020-03-01T06:47:19.000+0000,List(),"@BerniceKing @InactionNever Why are you making this a choice of either/or? I believe, as adults, we can choose what… https://t.co/UHui98W1EX",Polly_Tics,1234007291361280002,0,en,0,Polly_Tics,[],,,,
2020-03-01T06:47:19.000+0000,List(),RT @iTerryTommy: Everybody say thank you to all the Black gay niggas shooting like this for the past like 6 years,Musiclover1789,1234007291344506880,0,en,0,Lyric,[],,,,
2020-03-01T06:47:19.000+0000,List(),Don't talk on my behalf. I'm not proud of TAJ. Instead I'm proud of ancient indian monuments/temples(that the invad… https://t.co/tPRFuIrRcP,accidentalijner,1234007291369861120,0,en,0,🇮🇳 🇮🇳,[],,,,
2020-03-01T06:47:19.000+0000,List(),After a slight break Travis August is back with his new track Birdie https://t.co/TJpSbwl2Hk https://t.co/kIVevUYm1i,GospelHydration,1234007291336114176,0,en,0,gospel hydration,[],,,,


In [0]:
# TEST - Run this cell to test your solution.
dbTest("SS-06-numActiveStreams", True, len(spark.streams.active) > 0)
       
print("Tests passed!")

When you are done, stop the stream:

In [0]:
# TODO
for s in spark.streams.active:
  s.stop()

In [0]:
# TEST - Run this cell to test your solution.
dbTest("SS-06-numActiveStreams1", 0, len(spark.streams.active))

print("Tests passed!")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Step 5</h2>
<h3>Hashtag Processing</h3>

In this exercise, we do ETL processing on the `hashTags` column.

The goal is to convert all hashtags to lower case, then group the tweets by hashtag and count them.

Notice that `hashTags` is an array of hashtags, which you will have to break up using the `explode` function.

The `explode` function splits an array column into multiple rows, copying all the other columns into each new row.
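
To make the explode-then-count idea concrete, here is a small plain-Python sketch of the same logic on an in-memory list. It is only an analogy for the DataFrame operations, not Spark code. The function name `top_hashtags` and the sample tweets are hypothetical.

```python
from collections import Counter

def top_hashtags(tweets, limit=25):
    """Explode each tweet's hashTags list, lower-case the tags, and count them."""
    # One output element per array entry; tweets with an empty list contribute nothing,
    # which matches how explode drops rows whose array is empty.
    exploded = [tag.lower() for tweet in tweets for tag in tweet["hashTags"]]
    # most_common sorts by count, largest first, and keeps at most `limit` entries
    return Counter(exploded).most_common(limit)

# Hypothetical sample tweets
tweets = [
    {"hashTags": ["BTS", "Jimin"]},
    {"hashTags": []},
    {"hashTags": ["bts"]},
]

counts = top_hashtags(tweets)
print(counts)
```

Lower-casing before grouping is what lets `BTS` and `bts` land in the same bucket.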

In [0]:
# TODO
from pyspark.sql.functions import explode, lower

twitCountsDF = (cleanDF                          # Start with "cleanDF"
 .select(explode('hashTags').alias('hashTag'))   # Explode the array "hashTags" into "hashTag"
 .withColumn('hashTag', lower(col('hashTag')))   # Convert "hashTag" to lower case
 .groupBy('hashTag')                             # Aggregate by "hashTag"...
 .count()                                        # For the aggregate, produce a count
 .orderBy('count', ascending=False)              # Sort by "count", largest first
 .limit(25)                                      # Limit the result to 25 records
)

In [0]:
# TEST - Run this cell to test your solution.
schemaStr = str(twitCountsDF.schema)

dbTest("SS-06-schema-hashTag", True, "(hashTag,StringType,true)" in schemaStr)
dbTest("SS-06-schema-count",   True, "(count,LongType,false)" in schemaStr)

print("Tests passed!")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Step 6</h2>
<h3>Plot Counts of Top 25 Most Popular Hashtags</h3>

Under <b>Plot Options</b>, use the following:
* <b>Keys:</b> `hashTag`
* <b>Values:</b> `count`

In <b>Display type</b>, use <b>Pie Chart</b> and click <b>Apply</b>.

Once you apply the plot options, be prepared to increase the size of the plot graphic using the resize widget in the lower right corner of the graphic area.

In [0]:
# TODO
display(twitCountsDF) # display twitCountsDF

hashTag,count
bts,67
secrets_of_vedas,64
iheartawards,58
coronavirus,47
uwmalastingpromise,44
rss_killingmuslims,40
ourheartsidnaaz,35
master,30
letsexposeptm,29
jimin,27


In [0]:
# TEST - Run this cell to test your solution.
dbTest("SS-06-numActiveStreams", True, len(spark.streams.active) > 0)
       
print("Tests passed!")

When you are done, stop the stream:

In [0]:
for streamingQuery in spark.streams.active:
  streamingQuery.stop()

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Step 7</h2>
<h3>Read in File with Two Letter to Three Letter Country Codes</h3>

For this next part, we are going to take a look at the number of tweets per country.

To get started, we first need a lookup table that will give us the three-letter country code.

0. Read in the file at `/mnt/training/countries/ISOCountryCodes/ISOCountryLookup.parquet`
0. We will be interested in the `alpha2Code` and `alpha3Code` fields later

In [0]:
# TODO
countryCodeDF = spark.read.parquet('/mnt/training/countries/ISOCountryCodes/ISOCountryLookup.parquet')

In [0]:
# TEST - Run this cell to test your solution.
schemaStr = str(countryCodeDF.schema)

dbTest("SS-06-schema-1", True, "(EnglishShortName,StringType,true)" in schemaStr)
dbTest("SS-06-schema-2", True, "(alpha2Code,StringType,true)" in schemaStr)
dbTest("SS-06-schema-3", True, "(alpha3Code,StringType,true)" in schemaStr)
dbTest("SS-06-schema-4", True, "(numericCode,StringType,true)" in schemaStr)
dbTest("SS-06-schema-5", True, "(ISO31662SubdivisionCode,StringType,true)" in schemaStr)
dbTest("SS-06-schema-6", True, "(independentTerritory,StringType,true)" in schemaStr)

dbTest("SS-06-streaming-7", False, countryCodeDF.isStreaming)

print("Tests passed!")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Step 8</h2>
<h3>Join Tables &amp; Aggregate By Country</h3>

In `cleanDF`, there is a `countryCode` field. However, it is in the form of a two-letter country code.

The `display` map expects a three-letter country code.

To retrieve tweets with three-letter country codes, we will have to join `cleanDF` with `countryCodeDF`.
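
The join-and-count logic can be sketched in plain Python as a dictionary lookup. This is only an analogy for the DataFrame join. The three-entry `ALPHA2_TO_ALPHA3` table is a hypothetical excerpt standing in for the lookup file, and `tweets_per_country` is an illustrative name.

```python
from collections import Counter

# Hypothetical excerpt of the alpha2Code -> alpha3Code lookup
ALPHA2_TO_ALPHA3 = {"IN": "IND", "AU": "AUS", "DE": "DEU"}

def tweets_per_country(country_codes, lookup):
    """Count tweets per three-letter code, mimicking filter + inner join + groupBy."""
    counts = Counter()
    for code in country_codes:
        if code is None:
            continue            # Filter out nulls for "countryCode"
        alpha3 = lookup.get(code)
        if alpha3 is None:
            continue            # An inner join drops rows with no match in the lookup
        counts[alpha3] += 1
    return dict(counts)

result = tweets_per_country(["IN", None, "AU", "IN", "XX"], ALPHA2_TO_ALPHA3)
print(result)
```

Note that unmatched codes (here `XX`) silently disappear, just as they would in an inner join.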

In [0]:
# TODO
mappedDF = (cleanDF
  .filter(col('countryCode').isNotNull())                            # Filter out any nulls for "countryCode"
  .join(countryCodeDF, cleanDF.countryCode == countryCodeDF.alpha2Code)  # Join the two tables on "countryCode" and "alpha2Code"
  .groupBy('alpha3Code')                            # Aggregate by country, "alpha3Code"
  .count()                            # Produce a count of each aggregate
)

In [0]:
# TEST - Run this cell to test your solution.
schemaStr = str(mappedDF.schema)
print(schemaStr)

dbTest("SS-06-schema-1",  True, "alpha3Code,StringType,true" in schemaStr)
dbTest("SS-06-schema-2",  True, "count,LongType,false" in schemaStr)

print("Tests passed!")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Step 9</h2>
<h3>Plot Tweet Counts on a World Map</h3>

Under <b>Plot Options</b>, use the following:
* <b>Keys:</b> `alpha3Code`
* <b>Values:</b> `count`

In <b>Display type</b>, use <b>World map</b> and click <b>Apply</b>.

<img src="https://files.training.databricks.com/images/eLearning/Structured-Streaming/plot-options-map-06.png"/>

In [0]:
# TODO
display(mappedDF)  # display mappedDF

alpha3Code,count
IND,17
CHL,1
LAO,1
NZL,3
KEN,3
TUR,1
CHE,1
OMN,1
DEU,4
AUS,11


In [0]:
# TEST - Run this cell to test your solution.
dbTest("SS-06-numActiveStreams", True, len(spark.streams.active) > 0)
       
print("Tests passed!")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Step 10: Write Stream</h2>

Write the stream to an in-memory table:
0. Use the appropriate `format`
0. Because `mappedDF` is an aggregation, use the `complete` output mode so that the whole results table is rewritten on each trigger
0. Give the query a name
0. Start the query
0. Assign the query to `mappedQuery`
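
Because `mappedDF` is a streaming aggregation without a watermark, the solution cell uses the `complete` output mode. As a rough mental model only, the plain-Python sketch below simulates that mode: after every micro-batch, the full set of running counts is emitted again. The generator name `run_complete_mode` and the sample batches are hypothetical.

```python
from collections import Counter

def run_complete_mode(micro_batches):
    """Yield a snapshot of the whole aggregated table after each micro-batch."""
    running = Counter()
    for batch in micro_batches:
        running.update(batch)   # Fold the new records into the running counts
        yield dict(running)     # "complete": emit every row, not just the changed ones

# Hypothetical micro-batches of three-letter country codes
batches = [["IND", "AUS"], ["IND"], ["DEU", "IND"]]

snapshots = list(run_complete_mode(batches))
for snapshot in snapshots:
    print(snapshot)
```

Each snapshot replaces the previous contents of the in-memory table, which is why a later SQL query sees one up-to-date row per country.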

In [0]:
# TODO
mappedQuery = (mappedDF
  .writeStream                      # From the DataFrame get the DataStreamWriter
  .format('memory')                 # Specify the sink format as "memory"
  .outputMode('complete')           # Configure the output mode as "complete"
  .queryName('mappedTablePython')   # Name the query "mappedTablePython"; the in-memory table takes this name
  .start()                          # Start the query
)

In [0]:
# TEST  - Run this cell to test your solution.
dbTest("SS-06-isActive", True, mappedQuery.isActive)
dbTest("SS-06-name", "mappedTablePython", mappedQuery.name)

print("Tests passed!")

Wait until the stream is done initializing...

In [0]:
untilStreamIsReady("mappedTablePython")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Step 11: Use SQL Syntax to Display a Few Rows</h2>

Do a basic SQL query to display all columns and, say, 10 rows.

### Why are we doing this?

In [0]:
%sql
--TODO 
SELECT *
FROM mappedTablePython
LIMIT 10

alpha3Code,count
IND,14
NZL,3
KEN,3
TUR,1
CHL,1
CHE,1
OMN,1
DEU,4
AUS,10
NGA,12


In [0]:
# TEST - Run this cell to test your solution.
try: tableExists = (spark.table("mappedTablePython") is not None)
except: tableExists = False
dbTest("SS-06-1", True, tableExists)  

firstRowCol = spark.sql("SELECT * FROM mappedTablePython limit 1").first()[0]
dbTest("SS-06-rowsExist", True, len(firstRowCol) > 0) 

print("Tests passed!")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Step 12: Stop Streaming Jobs</h2>

Before we can conclude, we need to shut down all active streams.

In [0]:
# TODO
for s in spark.streams.active:  # Iterate over all the active streams
  s.stop()                      # Stop the stream

In [0]:
# TEST - Run this cell to test your solution.
dbTest("SS-06-numActiveStreams", 0, len(spark.streams.active))

print("Tests passed!")

Congratulations: ALL DONE!!

Don't forget to complete this short [feedback survey](https://www.surveymonkey.com/r/YCH8FYB).  Your input is extremely important and will shape future development.

&copy; 2019 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>