![x](https://zdnet4.cbsistatic.com/hub/i/r/2017/12/17/e9b8f576-8c65-4308-93fa-55ee47cdd7ef/resize/370xauto/30f614c5879a8589a22e57b3108195f3/databricks-logo.png)

&copy; 2019 Databricks, Inc. All rights reserved.<br/>

Now that our existing dataset is clean, what can we do to enrich it with other sources of information, so that our Data Science team has everything they need to improve their work?

One option is to leverage other data sources to enrich our existing dataset. While other datasets can be stored in a storage solution such as ADLS or SQL, in some cases, you will have data being streamed in real time through services such as EventHub or Kafka.

In our case, more information about our devices is being streamed into EventHub by a different Data Engineering team. How can we leverage that information and join it with our dataset?

### What is EventHub?

Azure Event Hubs is a big data streaming platform and event ingestion service. It can receive and process millions of events per second. Data sent to an event hub can be transformed and stored by using any real-time analytics provider or batching/storage adapters.

https://azure.microsoft.com/en-gb/services/event-hubs/

## Streaming Concepts

<b>Stream processing</b> is where you continuously incorporate new data into a data lake and compute results.

The data is coming in faster than it can be consumed.

<div><img src="https://files.training.databricks.com/images/eLearning/Delta/firehose.jpeg" style="height: 200px"/></div><br/>

Treat a <b>stream</b> of data as a table to which data is continuously appended.

In this course we are assuming Spark Structured Streaming on Databricks, which uses the DataFrame API.

There are other kinds of streaming systems.

<div><img src="https://files.training.databricks.com/images/eLearning/Delta/stream2rows.png" style="height: 300px"/></div><br/>

Examples are bank card transactions, Internet of Things (IoT) device data, and video game play events. 

Data coming from a stream is not guaranteed to arrive in any particular order.

A streaming system consists of 
* <b>Input sources</b> such as Kafka, Azure Event Hubs, files on a distributed file system, or TCP/IP sockets
* <b>Sinks</b> such as Kafka, Azure Event Hubs, various file formats, `foreach` sinks, console sinks, or memory sinks


In streaming, the problems of traditional data pipelines are exacerbated: metadata refreshes, table repairs and the accumulation of small files happen on a per-second or per-minute basis.

Many small files result because data may be streamed in at low volumes with short triggers.
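To make the small-files point concrete, here is some rough arithmetic as a plain-Python sketch. It assumes (hypothetically) that every micro-batch writes a fixed number of files and that the stream never idles; real file counts depend on the sink, the data volume and how many tasks write data in each batch.

```python
# Rough, illustrative arithmetic for how short triggers multiply file counts.
# Assumption (hypothetical): every micro-batch writes `files_per_batch` files
# and a batch runs at every trigger for the whole day.

SECONDS_PER_DAY = 24 * 60 * 60


def files_per_day(trigger_interval_seconds: int, files_per_batch: int) -> int:
    """Estimate how many files a never-idle stream writes per day."""
    batches_per_day = SECONDS_PER_DAY // trigger_interval_seconds
    return batches_per_day * files_per_batch


if __name__ == "__main__":
    for interval in (1, 10, 60):
        print(f"trigger every {interval:>2}s -> {files_per_day(interval, 4):,} files/day")
```

Compared with a nightly batch job writing a handful of files, even a one-minute trigger produces thousands of files per day under these assumptions.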

### What is Spark Structured Streaming?

<div style="width: 100%">
  <div style="margin: auto; width: 800px">
    <img src="http://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png"/>
  </div>
</div>

Data is appended to the Input Table every _trigger interval_. For instance, if the trigger interval is 1 second, then new data is appended to the Input Table every second. (The trigger interval is analogous to the _batch interval_ in the legacy RDD-based Streaming API.)
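A toy, plain-Python model of the Input Table idea: events are grouped by the trigger interval in which they arrive, and each group is appended to the table as one micro-batch. This only illustrates the concept; it is not how Spark schedules triggers internally, and the event data is made up.

```python
from typing import Dict, List, Tuple

Event = Tuple[float, str]  # (arrival_time_seconds, payload)


def micro_batches(events: List[Event], trigger_interval: float) -> Dict[int, List[str]]:
    """Group events by the trigger interval in which they arrived."""
    batches: Dict[int, List[str]] = {}
    for arrival, payload in sorted(events):
        batch_id = int(arrival // trigger_interval)
        batches.setdefault(batch_id, []).append(payload)
    return batches


def input_table_after_each_trigger(events: List[Event], trigger_interval: float) -> List[List[str]]:
    """Return a snapshot of the (append-only) input table after each micro-batch."""
    table: List[str] = []
    snapshots: List[List[str]] = []
    batches = micro_batches(events, trigger_interval)
    for batch_id in sorted(batches):
        table.extend(batches[batch_id])  # rows are only ever appended, never updated
        snapshots.append(list(table))
    return snapshots
```

With a 1-second interval, events arriving at 0.2s and 0.7s land in the first micro-batch, and an event at 1.5s is appended in the second.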

A great blog post that describes Structured Streaming in Spark: https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html

# Reading Data from EventHub

First, we need to set up an EventHub, then define the connection parameters for it. This is done below.

You can find further information and examples at the following sources:

https://docs.azuredatabricks.net/spark/latest/structured-streaming/streaming-event-hubs.html#  
https://docs.databricks.com/spark/latest/structured-streaming/streaming-event-hubs.html#production-structured-streaming-with-azure-event-hubs  
https://docs.microsoft.com/en-us/azure/azure-databricks/databricks-stream-from-eventhubs  
https://lenadroid.github.io/posts/connecting-spark-and-eventhubs.html

### Setting up EventHub

Please follow the instructions here to set up EventHub:

https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create

Once this is done, please fill in the information below with your EH configuration. 

Furthermore, install the following libraries:

* azure-eventhub via PyPI  
* com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.11 via Maven

In [11]:
import os
import sys
import time
# Legacy azure-eventhub 1.x API: EventHubClient, Receiver and Offset are not part of the 5.x SDK
from azure.eventhub import EventHubClient, Receiver, Offset

#Define EventHub requirements
ADDRESS_READ_v1_eh = 'amqps://workshop-adb-eh-kafka.servicebus.windows.net/ehlifecycle'
# SAS policy and key are not required if they are encoded in the URL
USER_READ_v1_eh = 'RootManageSharedAccessKey'
KEY_READ_v1_eh = str(dbutils.secrets.get("workshop_secrets", "ehKey"))

CONSUMER_GROUP_READ_v1_eh = "$default"
OFFSET_READ_v1_eh = Offset("-1")
PARTITION_READ_v1_eh = "0"

#Alternatively, you can use a connection string
CONNSTRING_READ_v1_eh = "Endpoint=sb://workshop-adb-eh-kafka.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={};EntityPath=ehlifecycle".format(KEY_READ_v1_eh)

NAMESPACE_READ_v1_eh = "workshop-adb-eh-kafka"
NAME_READ_v1_eh = "ehlifecycle"
DIR_READ_v1_eh = "/eh_read_tmp"
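For reference, the connection string above follows a fixed `Key=Value;` layout. The hypothetical helpers below (not part of any Azure SDK) build and split such a string in plain Python; the values in the test are placeholders, and in the notebook the key should keep coming from `dbutils.secrets` as above.

```python
from typing import Dict


def build_eh_connection_string(namespace: str, policy_name: str, key: str, entity_path: str) -> str:
    """Assemble 'Endpoint=...;SharedAccessKeyName=...;SharedAccessKey=...;EntityPath=...'."""
    parts = [
        ("Endpoint", f"sb://{namespace}.servicebus.windows.net/"),
        ("SharedAccessKeyName", policy_name),
        ("SharedAccessKey", key),
        ("EntityPath", entity_path),
    ]
    return ";".join(f"{name}={value}" for name, value in parts)


def parse_eh_connection_string(conn_str: str) -> Dict[str, str]:
    """Split a connection string into {name: value}.

    Splits on the first '=' only, because base64 SAS keys often end in '='.
    """
    result: Dict[str, str] = {}
    for part in conn_str.split(";"):
        if part:
            name, _, value = part.partition("=")
            result[name] = value
    return result
```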

#### Generating Events

We also need to make sure that our EventHub is being populated. To do so, please briefly switch to the following [notebook on sending some data to EH]($./Includes/EH-Send).

Now let's start the background ETL process:

In [14]:
%run ./Includes/EH-Send

In Databricks, you can run notebooks from within notebooks, creating your own data pipelines: https://docs.databricks.com/user-guide/notebooks/notebook-use.html#run-a-notebook-from-another-notebook

In [16]:
#Make sure we don't have anything in the temp folder
dbutils.fs.rm("/eh_read_tmp", True)

In [17]:
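#Read from EH as a stream
# Note: with the azure-eventhubs-spark 2.3.x connector the connection string is what
# identifies the hub; the policy/namespace/partition/maxRate/progress options below
# come from the older spark-eventhubs connector and are likely ignored.
inputStream = (spark.readStream
  .format("eventhubs")
  .option("eventhubs.policyname", USER_READ_v1_eh)
  .option("eventhubs.policykey", KEY_READ_v1_eh)
  .option("eventhubs.namespace", NAMESPACE_READ_v1_eh)
  .option("eventhubs.name", NAME_READ_v1_eh)
  .option("eventhubs.partition.count", "9")
  .option("eventhubs.maxRate", "1000")
  .option("eventhubs.progressTrackingDir", DIR_READ_v1_eh)
  .option("eventhubs.connectionstring", CONNSTRING_READ_v1_eh)
  # Intended to rewind the stream to the beginning when the notebook restarts.
  # "startingOffsets" is a Kafka source option; the Event Hubs connector uses
  # "eventhubs.startingPosition", so this line may have no effect.
  .option("startingOffsets", "earliest")
  .load())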

We should also define a schema for the JSON payload of our incoming stream.

In [19]:
from pyspark.sql.types import *


streamSchema = StructType([
  StructField("MachineIdentifier", StringType(), True),
  StructField("Census_MDC2FormFactor", StringType(), True),
  StructField("Census_DeviceFamily", StringType(), True),
  StructField("Census_OEMNameIdentifier", IntegerType(), True),
  StructField("Census_OEMModelIdentifier", IntegerType(), True),
  StructField("Census_ProcessorCoreCount", IntegerType(), True),
  StructField("Census_ProcessorManufacturerIdentifier", IntegerType(), True),
  StructField("Census_ProcessorModelIdentifier", IntegerType(), True),
  StructField("Census_ProcessorClass", StringType(), True),
  StructField("Census_PrimaryDiskTotalCapacity", StringType(), True),
  StructField("Census_PrimaryDiskTypeName", StringType(), True),
  StructField("Census_SystemVolumeTotalCapacity", IntegerType(), True),
  StructField("Census_HasOpticalDiskDrive", IntegerType(), True),
  StructField("Census_TotalPhysicalRAM", IntegerType(), True),
  StructField("Census_ChassisTypeName", StringType(), True),
  StructField("Census_InternalPrimaryDiagonalDisplaySizeInInches", DoubleType(), True),
  StructField("Census_InternalPrimaryDisplayResolutionHorizontal", IntegerType(), True),
  StructField("Census_InternalPrimaryDisplayResolutionVertical", IntegerType(), True),
  StructField("Census_PowerPlatformRoleName", StringType(), True),
  StructField("Census_InternalBatteryType", StringType(), True),
  StructField("Census_InternalBatteryNumberOfCharges", LongType(), True),
  StructField("Census_OSVersion", StringType(), True),
  StructField("Census_OSArchitecture", StringType(), True),
  StructField("Census_OSBranch", StringType(), True),
  StructField("Census_OSBuildNumber", IntegerType(), True),
  StructField("Census_OSBuildRevision", IntegerType(), True),
  StructField("Census_OSEdition", StringType(), True),
  StructField("Census_OSSkuName", StringType(), True),
  StructField("Census_OSInstallTypeName", StringType(), True),
  StructField("Census_OSInstallLanguageIdentifier", IntegerType(), True),
  StructField("Census_OSUILocaleIdentifier", IntegerType(), True),
  StructField("Census_OSWUAutoUpdateOptionsName", StringType(), True),
  StructField("Census_IsPortableOperatingSystem", IntegerType(), True),
  StructField("Census_GenuineStateName", StringType(), True),
  StructField("Census_ActivationChannel", StringType(), True),
  StructField("Census_IsFlightingInternal", IntegerType(), True),
  StructField("Census_IsFlightsDisabled", IntegerType(), True),
  StructField("Census_FlightRing", StringType(), True),
  StructField("Census_ThresholdOptIn", IntegerType(), True),
  StructField("Census_FirmwareManufacturerIdentifier", IntegerType(), True),
  StructField("Census_FirmwareVersionIdentifier", IntegerType(), True),
  StructField("Census_IsSecureBootEnabled", IntegerType(), True),
  StructField("Census_IsWIMBootEnabled", IntegerType(), True),
  StructField("Census_IsVirtualDevice", IntegerType(), True),
  StructField("Census_IsTouchEnabled", IntegerType(), True),
  StructField("Census_IsPenCapable", IntegerType(), True),
  StructField("Census_IsAlwaysOnAlwaysConnectedCapable", IntegerType(), True)])


Then we should cast the stream body to a string and select the relevant fields.

In [21]:
from pyspark.sql.functions import *

sDf = (inputStream
       # EventHub delivers the payload as binary in the `body` column:
       # cast it to a string and parse the JSON with streamSchema
       .select(from_json(inputStream.body.cast("string"), streamSchema).alias("fields"))
       # "fields.*" expands the struct into one top-level column per field,
       # in streamSchema order (MachineIdentifier ... Census_IsAlwaysOnAlwaysConnectedCapable)
       .select("fields.*"))
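The cast-then-parse step can be pictured with a plain-Python analogue: decode the event body from bytes, parse it as JSON, and keep only the schema's fields in schema order, with missing fields becoming `None`. `FIELDS` is a shortened, hypothetical subset of `streamSchema`; Spark's own handling of malformed records depends on the parse mode and Spark version, so treat this as a sketch of the idea only.

```python
import json
from typing import Dict, List, Optional

# Hypothetical subset of streamSchema, for illustration only
FIELDS: List[str] = [
    "MachineIdentifier",
    "Census_MDC2FormFactor",
    "Census_PrimaryDiskTotalCapacity",
]


def parse_body(body: bytes, fields: List[str] = FIELDS) -> Optional[Dict[str, object]]:
    """Decode one event body and project it onto `fields`; None if it is not a JSON object."""
    try:
        record = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # malformed payload
    if not isinstance(record, dict):
        return None
    # Keep schema fields only, in schema order; absent keys become None
    return {name: record.get(name) for name in fields}
```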

### It's just a DataFrame

We can use normal DataFrame transformations on our streaming DataFrame.

<img src="https://spark.apache.org/docs/latest/img/structured-streaming-example-model.png"/>

Below you can see what your stream should look like.

Please do not run the display function yet, as this will consume the events from EventHub, and we have no storage mechanism in place for the time being.

In [24]:
#display(sDf)

Some further info on the columns in this dataset:

**Census_MDC2FormFactor** - A grouping based on a combination of Device Census level hardware characteristics. The logic used to define Form Factor is rooted in business and industry standards and aligns with how people think about their device. (Examples:   Smartphone, Small Tablet, All in One, Convertible...)  
**Census_DeviceFamily** - AKA DeviceClass. Indicates the type of device that an edition of the OS is intended for. Example values: Windows.Desktop, Windows.Mobile, and iOS.Phone  
**Census_OEMNameIdentifier** - NA  
**Census_OEMModelIdentifier** - NA  
**Census_ProcessorCoreCount** - Number of logical cores in the processor  
**Census_ProcessorManufacturerIdentifier** - NA  
**Census_ProcessorModelIdentifier** - NA  
**Census_ProcessorClass** - A classification of processors into high/medium/low. Initially used for Pricing Level SKU. No longer maintained and updated  
**Census_PrimaryDiskTotalCapacity** - Amount of disk space on primary disk of the machine in MB  
**Census_PrimaryDiskTypeName** - Friendly name of Primary Disk Type - HDD or SSD  
**Census_SystemVolumeTotalCapacity** - The size of the partition that the System volume is installed on in MB  
**Census_HasOpticalDiskDrive** - True indicates that the machine has an optical disk drive (CD/DVD)  
**Census_TotalPhysicalRAM** - Retrieves the physical RAM in MB  
**Census_ChassisTypeName** - Retrieves a numeric representation of what type of chassis the machine has. A value of 0 means xx  
**Census_InternalPrimaryDiagonalDisplaySizeInInches** - Retrieves the physical diagonal length in inches of the primary display  
**Census_InternalPrimaryDisplayResolutionHorizontal** - Retrieves the number of pixels in the horizontal direction of the internal display.  
**Census_InternalPrimaryDisplayResolutionVertical** - Retrieves the number of pixels in the vertical direction of the internal display  
**Census_PowerPlatformRoleName** - Indicates the OEM preferred power management profile. This value helps identify the basic form factor of the device  
**Census_InternalBatteryType** - NA  
**Census_InternalBatteryNumberOfCharges** - NA  
**Census_OSVersion** - Numeric OS version Example - 10.0.10130.0  
**Census_OSArchitecture** - Architecture on which the OS is based. Derived from OSVersionFull. Example - amd64  
**Census_OSBranch** - Branch of the OS extracted from the OsVersionFull. Example - OsBranch = fbl_partner_eeap where OsVersion = 6.4.9813.0.amd64fre.fbl_partner_eeap.140810-0005  
**Census_OSBuildNumber** - OS Build number extracted from the OsVersionFull. Example - OsBuildNumber = 10512 or 10240  
**Census_OSBuildRevision** - OS Build revision extracted from the OsVersionFull. Example - OsBuildRevision = 1000 or 16458  
**Census_OSEdition** - Edition of the current OS. Sourced from HKLM\Software\Microsoft\Windows NT\CurrentVersion@EditionID in registry. Example: Enterprise  
**Census_OSSkuName** - OS edition friendly name (currently Windows only)  
**Census_OSInstallTypeName** - Friendly description of what install was used on the machine i.e. clean  
**Census_OSInstallLanguageIdentifier** - NA  
**Census_OSUILocaleIdentifier** - NA  
**Census_OSWUAutoUpdateOptionsName** - Friendly name of the WindowsUpdate auto-update settings on the machine.  
**Census_IsPortableOperatingSystem** - Indicates whether OS is booted up and running via Windows-To-Go on a USB stick.  
**Census_GenuineStateName** - Friendly name of OSGenuineStateID. 0 = Genuine  
**Census_ActivationChannel** - Retail license key or Volume license key for a machine.  
**Census_IsFlightingInternal** - NA  
**Census_IsFlightsDisabled** - Indicates if the machine is participating in flighting.  
**Census_FlightRing** - The ring that the device user would like to receive flights for. This might be different from the ring of the OS which is currently installed if the user changes the ring after getting a flight from a different ring.  
**Census_ThresholdOptIn** - NA  
**Census_FirmwareManufacturerIdentifier** - NA  
**Census_FirmwareVersionIdentifier** - NA  
**Census_IsSecureBootEnabled** - Indicates if Secure Boot mode is enabled.  
**Census_IsWIMBootEnabled** - NA  
**Census_IsVirtualDevice** - Identifies a Virtual Machine (machine learning model)  
**Census_IsTouchEnabled** - Is this a touch device?  
**Census_IsPenCapable** - Is the device capable of pen input?  
**Census_IsAlwaysOnAlwaysConnectedCapable** - Retrieves information about whether the battery enables the device to be AlwaysOnAlwaysConnected.

It appears that there has been an error in the process that generates the data above.

The column "Census_PrimaryDiskTotalCapacity" appears to have 4 extra "0"s at the end, as well as some "." between the numbers. Let's fix this like we would with a normal dataframe.
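Before the PySpark version, here is the same string fix as a plain-Python sketch: remove every ".", then drop the last four characters (the spurious trailing zeros). The sample inputs in the test are made up to match the description, not taken from the stream.

```python
import re
from typing import Optional


def clean_disk_capacity(raw: Optional[str]) -> Optional[str]:
    """Strip '.' characters, then drop the 4 trailing characters."""
    if raw is None:
        return None  # keep nulls as nulls
    no_dots = re.sub(r"\.", "", raw)  # raw string: the regex "\." matches a literal dot
    # All but the last 4 characters; 4 or fewer characters give an empty string here
    return no_dots[:-4]
```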

In [27]:
#First, let's stop all active streams in this notebook (if we need to)
#for s in spark.streams.active:
  #s.stop()

In [28]:
resolveDF = ...

In [29]:
### Do not unhide
#############################################################################################
#############################################################################################

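# Strip every "." (raw string so the regex escape "\." reaches Spark unchanged),
# then drop the 4 spurious trailing zeros by keeping all but the last 4 characters
solvedDF = (sDf
  .withColumn("Census_PrimaryDiskTotalCapacity", regexp_replace(col("Census_PrimaryDiskTotalCapacity"), r"\.", ""))
  .withColumn("Census_PrimaryDiskTotalCapacity", expr("substring(Census_PrimaryDiskTotalCapacity, 1, length(Census_PrimaryDiskTotalCapacity)-4)")))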

In [30]:
display(solvedDF)

### Backup for EventHub

In case you have any issues setting up EventHub, the snippet below reads the data in a streaming fashion from files instead. It also includes the solution for cleaning the stream.

In [32]:
#Ensure schema is correct for the parquet file

from pyspark.sql.types import *


streamSchema = StructType([
  StructField("MachineIdentifier", StringType(), True),
  StructField("Census_MDC2FormFactor", StringType(), True),
  StructField("Census_DeviceFamily", StringType(), True),
  StructField("Census_OEMNameIdentifier", IntegerType(), True),
  StructField("Census_OEMModelIdentifier", IntegerType(), True),
  StructField("Census_ProcessorCoreCount", IntegerType(), True),
  StructField("Census_ProcessorManufacturerIdentifier", IntegerType(), True),
  StructField("Census_ProcessorModelIdentifier", IntegerType(), True),
  StructField("Census_ProcessorClass", StringType(), True),
  StructField("Census_PrimaryDiskTotalCapacity", StringType(), True),
  StructField("Census_PrimaryDiskTypeName", StringType(), True),
  StructField("Census_SystemVolumeTotalCapacity", IntegerType(), True),
  StructField("Census_HasOpticalDiskDrive", IntegerType(), True),
  StructField("Census_TotalPhysicalRAM", IntegerType(), True),
  StructField("Census_ChassisTypeName", StringType(), True),
  StructField("Census_InternalPrimaryDiagonalDisplaySizeInInches", DoubleType(), True),
  StructField("Census_InternalPrimaryDisplayResolutionHorizontal", IntegerType(), True),
  StructField("Census_InternalPrimaryDisplayResolutionVertical", IntegerType(), True),
  StructField("Census_PowerPlatformRoleName", StringType(), True),
  StructField("Census_InternalBatteryType", StringType(), True),
  StructField("Census_InternalBatteryNumberOfCharges", DecimalType(), True),
  StructField("Census_OSVersion", StringType(), True),
  StructField("Census_OSArchitecture", StringType(), True),
  StructField("Census_OSBranch", StringType(), True),
  StructField("Census_OSBuildNumber", IntegerType(), True),
  StructField("Census_OSBuildRevision", IntegerType(), True),
  StructField("Census_OSEdition", StringType(), True),
  StructField("Census_OSSkuName", StringType(), True),
  StructField("Census_OSInstallTypeName", StringType(), True),
  StructField("Census_OSInstallLanguageIdentifier", IntegerType(), True),
  StructField("Census_OSUILocaleIdentifier", IntegerType(), True),
  StructField("Census_OSWUAutoUpdateOptionsName", StringType(), True),
  StructField("Census_IsPortableOperatingSystem", IntegerType(), True),
  StructField("Census_GenuineStateName", StringType(), True),
  StructField("Census_ActivationChannel", StringType(), True),
  StructField("Census_IsFlightingInternal", IntegerType(), True),
  StructField("Census_IsFlightsDisabled", IntegerType(), True),
  StructField("Census_FlightRing", StringType(), True),
  StructField("Census_ThresholdOptIn", IntegerType(), True),
  StructField("Census_FirmwareManufacturerIdentifier", IntegerType(), True),
  StructField("Census_FirmwareVersionIdentifier", IntegerType(), True),
  StructField("Census_IsSecureBootEnabled", IntegerType(), True),
  StructField("Census_IsWIMBootEnabled", IntegerType(), True),
  StructField("Census_IsVirtualDevice", IntegerType(), True),
  StructField("Census_IsTouchEnabled", IntegerType(), True),
  StructField("Census_IsPenCapable", IntegerType(), True),
  StructField("Census_IsAlwaysOnAlwaysConnectedCapable", IntegerType(), True)])


In [33]:
from pyspark.sql.functions import *

csvStreamInputDF = (spark
  .readStream                                 # Returns an instance of DataStreamReader
  .schema(streamSchema)                         
  .option("maxFilesPerTrigger", 1)            # Treat a sequence of files as a stream, one file at a time
  .parquet("/mnt/databricks-workshop-datasets/End-to-End-ML-Lifecycle/workshop_train_enrich_sample_parquet")                            # Specifies the format, path and returns a DataFrame
)

# Same cleanup as for the EventHub stream: strip the "." characters, then drop the 4 trailing zeros
csvSolvedDF = (csvStreamInputDF
  .withColumn("Census_PrimaryDiskTotalCapacity", regexp_replace(col("Census_PrimaryDiskTotalCapacity"), r"\.", ""))
  .withColumn("Census_PrimaryDiskTotalCapacity", expr("substring(Census_PrimaryDiskTotalCapacity, 1, length(Census_PrimaryDiskTotalCapacity)-4)")))
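`maxFilesPerTrigger` caps how many new files each micro-batch picks up. A toy model of that behaviour, assuming the file list is already in processing order (Spark's file source takes the oldest files first by default; the `latestFirst` option reverses this). The file names in the test are hypothetical.

```python
from typing import List


def plan_file_batches(files: List[str], max_files_per_trigger: int) -> List[List[str]]:
    """Split not-yet-processed files into micro-batches of at most N files each."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    return [
        files[i:i + max_files_per_trigger]
        for i in range(0, len(files), max_files_per_trigger)
    ]
```

With `maxFilesPerTrigger` set to 1, as in the cell above, each parquet part file becomes its own micro-batch, which is what makes a static folder behave like a stream.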

In [34]:
#We're stopping the streams so we don't incur any additional costs for our EH, since we can replicate the stream from the parquet files. The code will be the same.

for s in spark.streams.active:
  s.stop()

In [35]:
#display(csvSolvedDF)

### Streaming Joins

Now that we have cleaned our stream data, how do we join it back to the original dataset?

A stream of device attributes on its own is not that exciting. Let's join the stream with the training dataset we saved earlier:
* Use the join key `MachineIdentifier`
* Hint: both DataFrames have a column named `MachineIdentifier`
* Passing the column name (rather than a join expression) to `on=` keeps a single key column; this is the duplicated columns trick documented here: https://docs.azuredatabricks.net/spark/latest/faq/join-two-dataframes-duplicated-column.html
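Conceptually, `left.join(right, on="MachineIdentifier", how="left")` keeps every left row, adds the right-hand columns where the key matches, keeps the key only once, and fills `None` where there is no match. A plain-Python sketch over lists of dicts; the helper name and sample rows are hypothetical.

```python
from typing import Dict, List

Row = Dict[str, object]


def left_join_on(left: List[Row], right: List[Row], key: str, right_columns: List[str]) -> List[Row]:
    """Left outer join of dict rows on one shared key column (key kept once)."""
    # Index the right side by key; NULL keys never match in SQL, so skip them
    index: Dict[object, List[Row]] = {}
    for rrow in right:
        if rrow[key] is not None:
            index.setdefault(rrow[key], []).append(rrow)

    joined: List[Row] = []
    for lrow in left:
        matches = index.get(lrow[key]) if lrow[key] is not None else None
        if not matches:
            # No match: keep the left row and fill the right-hand columns with None
            joined.append({**lrow, **{c: None for c in right_columns}})
        else:
            # One output row per matching right row, as in SQL
            for rrow in matches:
                joined.append({**lrow, **{c: rrow[c] for c in right_columns}})
    return joined
```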

Remember we wrote a parquet file? Let's read it in.

In [38]:
batchDF = spark.read.parquet("/databricks_workshop/user_workshop_train/")

In [39]:
#joinedDF = batchDF.join(resolveDF, on="MachineIdentifier", how="left")

#Use the below if using the CSV stream
joinedDF = batchDF.join(csvSolvedDF, on="MachineIdentifier", how="left")

In [40]:
display(joinedDF)

Failed? Ok, let's try the other way.

In [42]:
#joinedDF = solvedDF.join(batchDF, on="MachineIdentifier", how="left")

#Use the below if EH was not setup
joinedDF = csvSolvedDF.join(batchDF, on="MachineIdentifier", how="left")

In [43]:
display(joinedDF)

MachineIdentifier,Census_MDC2FormFactor,Census_DeviceFamily,Census_OEMNameIdentifier,Census_OEMModelIdentifier,Census_ProcessorCoreCount,Census_ProcessorManufacturerIdentifier,Census_ProcessorModelIdentifier,Census_ProcessorClass,Census_PrimaryDiskTotalCapacity,Census_PrimaryDiskTypeName,Census_SystemVolumeTotalCapacity,Census_HasOpticalDiskDrive,Census_TotalPhysicalRAM,Census_ChassisTypeName,Census_InternalPrimaryDiagonalDisplaySizeInInches,Census_InternalPrimaryDisplayResolutionHorizontal,Census_InternalPrimaryDisplayResolutionVertical,Census_PowerPlatformRoleName,Census_InternalBatteryType,Census_InternalBatteryNumberOfCharges,Census_OSVersion,Census_OSArchitecture,Census_OSBranch,Census_OSBuildNumber,Census_OSBuildRevision,Census_OSEdition,Census_OSSkuName,Census_OSInstallTypeName,Census_OSInstallLanguageIdentifier,Census_OSUILocaleIdentifier,Census_OSWUAutoUpdateOptionsName,Census_IsPortableOperatingSystem,Census_GenuineStateName,Census_ActivationChannel,Census_IsFlightingInternal,Census_IsFlightsDisabled,Census_FlightRing,Census_ThresholdOptIn,Census_FirmwareManufacturerIdentifier,Census_FirmwareVersionIdentifier,Census_IsSecureBootEnabled,Census_IsWIMBootEnabled,Census_IsVirtualDevice,Census_IsTouchEnabled,Census_IsPenCapable,Census_IsAlwaysOnAlwaysConnectedCapable,ProductName,EngineVersion,AppVersion,AvSigVersion,IsBeta,RtpStateBitfield,IsSxsPassiveMode,DefaultBrowsersIdentifier,AVProductStatesIdentifier,AVProductsInstalled,AVProductsEnabled,HasTpm,CountryIdentifier,CityIdentifier,OrganizationIdentifier,GeoNameIdentifier,LocaleEnglishNameIdentifier,Platform,Processor,OsVer,OsBuild,OsSuite,OsPlatformSubRelease,OsBuildLab,SkuEdition,IsProtected,AutoSampleOptIn,PuaMode,SMode,IeVerIdentifier,SmartScreen,Firewall,UacLuaenable,Wdft_IsGamer,Wdft_RegionIdentifier,HasDetections,serviceDate,recentIncident
268e7dc651edf03a6897123c3122641c,Notebook,Windows.Desktop,525.0,331423.0,4.0,5.0,2998.0,,953.0,HDD,953303.0,0,8192.0,Notebook,17.2,1600.0,900.0,Mobile,,-71.0,10.0.15063.0,amd64,rs2_release,15063,0,Core,CORE,IBSClean,14.0,49,UNKNOWN,0,OFFLINE,OEM:DM,,0.0,Retail,,142.0,70712.0,0,,0.0,0,0,0.0,win8defender,1.1.15100.1,4.18.1807.18075,1.273.1159.0,0,7.0,0,,53447.0,1.0,1.0,1,66,6373.0,27.0,89,88,windows10,x64,10.0.0.0,15063,768,rs2,15063.0.amd64fre.rs2_release.170317-1834,Home,1.0,0,,0.0,105.0,,1.0,1.0,0.0,5.0,1,2018-12-01,2018-01-03 21:42:47
26dbc52fe0c4a560f412c45f5c7f45c2,Notebook,Windows.Desktop,2102.0,245824.0,4.0,5.0,2697.0,,953.0,HDD,307193.0,1,8192.0,Notebook,15.5,1366.0,768.0,Mobile,lion,0.0,10.0.16299.371,amd64,rs3_release,16299,371,CoreSingleLanguage,CORE_SINGLELANGUAGE,Update,8.0,31,Notify,0,IS_GENUINE,OEM:DM,,0.0,Retail,0.0,554.0,33142.0,1,0.0,0.0,0,0,0.0,win8defender,1.1.15200.1,4.18.1807.18075,1.275.1442.0,0,0.0,1,,3371.0,2.0,1.0,1,93,99535.0,27.0,119,64,windows10,x64,10.0.0.0,16299,768,rs3,16299.15.amd64fre.rs3_release.170928-1534,Home,1.0,0,,0.0,117.0,RequireAdmin,1.0,1.0,0.0,8.0,0,2017-09-01,2017-07-21 18:40:27
28a599dc23971c9a3020f171c9e249b6,Notebook,Windows.Desktop,585.0,190276.0,2.0,5.0,1998.0,,29.0,SSD,29206.0,0,2048.0,Notebook,11.6,1366.0,768.0,Mobile,lion,0.0,10.0.10586.1176,amd64,th2_release_sec,10586,1176,CoreSingleLanguage,CORE_SINGLELANGUAGE,Other,9.0,34,Notify,0,IS_GENUINE,OEM:DM,,0.0,Retail,0.0,556.0,63140.0,1,0.0,0.0,0,0,0.0,win8defender,1.1.15100.1,4.9.10586.1106,1.273.461.0,0,7.0,0,,53447.0,1.0,1.0,1,141,147039.0,18.0,167,227,windows10,x64,10.0.0.0,10586,768,th2,10586.1176.amd64fre.th2_release_sec.170913-1848,Home,1.0,0,,0.0,74.0,ExistsNotSet,1.0,1.0,0.0,10.0,1,2018-08-01,2017-08-25 12:23:52
25d86096f455f5b6135dd13aac69f201,Notebook,Windows.Desktop,4142.0,296216.0,2.0,5.0,4335.0,,238.0,HDD,237122.0,0,2048.0,Notebook,15.5,1366.0,768.0,Mobile,,0.0,10.0.17134.228,x86,rs4_release,17134,228,Professional,PROFESSIONAL,Refresh,29.0,125,FullAuto,0,IS_GENUINE,Retail,,0.0,Retail,,807.0,3968.0,0,,0.0,0,0,0.0,win8defender,1.1.15200.1,4.18.1807.18075,1.275.323.0,0,7.0,0,,53447.0,1.0,1.0,1,171,70765.0,,211,182,windows10,x86,10.0.0.0,17134,256,rs4,17134.1.x86fre.rs4_release.180410-1804,Pro,1.0,0,,0.0,137.0,Warn,1.0,0.0,0.0,3.0,0,2017-06-01,2018-08-08 18:19:00
29bc033a56234603bb7f880cf61488ba,Notebook,Windows.Desktop,2102.0,241921.0,4.0,5.0,2405.0,,953.0,HDD,937075.0,0,8192.0,Notebook,15.5,1366.0,768.0,Mobile,,0.0,10.0.16299.547,amd64,rs3_release_svc_escrow,16299,547,CoreSingleLanguage,CORE_SINGLELANGUAGE,Other,10.0,35,UNKNOWN,0,IS_GENUINE,OEM:DM,,0.0,Retail,,554.0,32998.0,1,,0.0,0,0,0.0,win8defender,1.1.15200.1,4.12.16299.15,1.273.1420.0,0,7.0,0,1823.0,53447.0,1.0,1.0,1,141,5299.0,27.0,167,227,windows10,x64,10.0.0.0,16299,768,rs3,16299.431.amd64fre.rs3_release_svc_escrow.180502-1908,Home,1.0,0,,0.0,117.0,,1.0,1.0,0.0,10.0,1,2018-06-01,2018-09-17 18:18:22
261d44c466eec108e65887bd356d13b5,Notebook,Windows.Desktop,2206.0,247520.0,8.0,5.0,2840.0,,953.0,HDD,937299.0,0,4096.0,Notebook,17.3,1600.0,900.0,Mobile,,-1.0,10.0.17134.112,amd64,rs4_release,17134,112,Professional,PROFESSIONAL,UUPUpgrade,9.0,34,UNKNOWN,0,IS_GENUINE,Retail,,0.0,Retail,,500.0,33032.0,0,,0.0,0,0,0.0,win8defender,1.1.15100.1,4.18.1807.18075,1.273.1338.0,0,7.0,0,,53447.0,1.0,1.0,1,60,64627.0,18.0,240,233,windows10,x64,10.0.0.0,17134,256,rs4,17134.1.amd64fre.rs4_release.180410-1804,Pro,1.0,0,,0.0,137.0,RequireAdmin,1.0,1.0,0.0,15.0,1,2017-10-01,2017-01-22 05:28:55
2a29c11f73a574d1928be7573dbdcbde,Notebook,Windows.Desktop,2668.0,116369.0,2.0,5.0,3270.0,,305.0,UNKNOWN,200000.0,0,4096.0,Notebook,15.4,1280.0,800.0,Mobile,,0.0,10.0.17134.48,amd64,rs4_release,17134,48,Professional,PROFESSIONAL,UUPUpgrade,8.0,31,UNKNOWN,0,IS_GENUINE,Retail,,0.0,Retail,,628.0,17510.0,0,,0.0,0,0,0.0,win8defender,1.1.14800.3,4.14.17639.18041,1.267.1804.0,0,7.0,0,,53447.0,1.0,1.0,1,220,50191.0,27.0,237,72,windows10,x64,10.0.0.0,17134,256,rs4,17134.1.amd64fre.rs4_release.180410-1804,Pro,1.0,0,,0.0,137.0,,1.0,1.0,0.0,11.0,0,2017-09-01,2018-07-25 09:20:48
294594dc57fd6888e91139a81402848e,Notebook,Windows.Desktop,585.0,189547.0,2.0,5.0,1992.0,,476.0,HDD,476323.0,0,4096.0,Notebook,15.5,1366.0,768.0,Mobile,lion,0.0,10.0.16299.371,amd64,rs3_release,16299,371,Core,CORE,Upgrade,5.0,26,UNKNOWN,0,IS_GENUINE,OEM:DM,,0.0,Retail,0.0,556.0,63317.0,1,0.0,0.0,0,0,0.0,win8defender,1.1.15100.1,4.18.1807.18075,1.273.1005.0,0,7.0,0,,12202.0,2.0,1.0,1,39,166671.0,27.0,252,106,windows10,x64,10.0.0.0,16299,768,rs3,16299.15.amd64fre.rs3_release.170928-1534,Home,1.0,0,,0.0,117.0,RequireAdmin,1.0,1.0,0.0,15.0,0,2018-06-01,2018-02-22 02:03:44
2a0a5b55d15349c01585967c1c0bab08,Notebook,Windows.Desktop,1443.0,256641.0,4.0,5.0,2908.0,,953.0,HDD,943736.0,0,8192.0,Portable,13.9,1366.0,768.0,Mobile,li-i,0.0,10.0.17134.286,amd64,rs4_release,17134,286,CoreSingleLanguage,CORE_SINGLELANGUAGE,Upgrade,8.0,31,FullAuto,0,IS_GENUINE,Retail,,0.0,Retail,0.0,355.0,19970.0,1,0.0,0.0,0,0,0.0,win8defender,1.1.15200.1,4.18.1807.18075,1.275.1545.0,0,7.0,0,,46413.0,2.0,1.0,1,89,66953.0,,120,75,windows10,x64,10.0.0.0,17134,768,rs4,17134.1.amd64fre.rs4_release.180410-1804,Home,1.0,0,,0.0,137.0,RequireAdmin,1.0,1.0,0.0,1.0,1,2016-02-01,2018-08-01 19:30:01
26e88b44215cb0cabc6d8fe84478c4da,Desktop,Windows.Desktop,1443.0,293178.0,8.0,5.0,3035.0,,1907.0,HDD,1906374.0,0,8192.0,Desktop,23.5,1920.0,1080.0,Desktop,,-1.0,10.0.17134.112,amd64,rs4_release,17134,112,Professional,PROFESSIONAL,UUPUpgrade,8.0,31,Notify,0,IS_GENUINE,OEM:DM,,0.0,Retail,,355.0,9326.0,0,,0.0,0,0,0.0,win8defender,1.1.15200.1,4.16.17656.18052,1.275.319.0,0,7.0,0,,51613.0,2.0,1.0,1,91,145233.0,,125,113,windows10,x64,10.0.0.0,17134,256,rs4,17134.1.amd64fre.rs4_release.180410-1804,Pro,1.0,0,,0.0,137.0,RequireAdmin,0.0,1.0,0.0,11.0,0,2016-05-01,2018-02-07 08:27:15


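So a left outer join only works with the streaming DataFrame on the left and the batch (static) DataFrame on the right, not the other way round. This makes sense: with the static table on the left, Spark would have to emit unmatched static rows, but it can never know that no future stream event will match them. Let's make sure we write this result somewhere.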
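The rule can be written down as a small lookup table. This sketch encodes the stream-static outer-join rules from the Structured Streaming programming guide (Spark 2.3+) for joins where exactly one side is a stream; stream-stream joins and semi/anti joins follow separate rules and are deliberately not modelled.

```python
# Normalise the join-type spellings PySpark accepts for the cases modelled here
_ALIASES = {
    "inner": "inner",
    "left": "left_outer", "leftouter": "left_outer", "left_outer": "left_outer",
    "right": "right_outer", "rightouter": "right_outer", "right_outer": "right_outer",
    "outer": "full_outer", "full": "full_outer", "fullouter": "full_outer", "full_outer": "full_outer",
}

# (left_is_stream, right_is_stream) -> supported join kinds
_SUPPORTED = {
    (True, False): {"inner", "left_outer"},   # stream.join(static, ...)
    (False, True): {"inner", "right_outer"},  # static.join(stream, ...)
}


def stream_static_join_supported(left_is_stream: bool, right_is_stream: bool, how: str) -> bool:
    sides = (left_is_stream, right_is_stream)
    if sides not in _SUPPORTED:
        raise ValueError("this sketch only covers joins with exactly one streaming side")
    kind = _ALIASES.get(how.lower())
    if kind is None:
        raise ValueError(f"join type {how!r} is not modelled in this sketch")
    return kind in _SUPPORTED[sides]
```

In the notebook's terms, `batchDF.join(stream, how="left")` maps to `(False, True, "left")`, which is unsupported, while `stream.join(batchDF, how="left")` maps to `(True, False, "left")`, which is supported.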

### Sidenote - Partitioning

If you’re familiar with big data systems (be it Apache Spark, Hive, Impala, Vertica, etc.), you might already be thinking: (horizontal) partitioning.

Quick reminder: In Spark, just like Hive, partitioning works by having one subdirectory for every distinct value of the partition column(s). Queries with filters on the partition column(s) can then benefit from partition pruning, i.e., avoid scanning any partition that doesn’t satisfy those filters.
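A minimal plain-Python sketch of the Hive-style layout and of partition pruning, assuming a single partition column. The `Platform` column name comes from the dataset above; the rows and the helper names `partition_rows` and `prune` are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List

Row = Dict[str, object]


def partition_rows(rows: List[Row], column: str) -> Dict[str, List[Row]]:
    """Group rows into '<column>=<value>' directories, one per distinct value."""
    layout: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        layout[f"{column}={row[column]}"].append(row)
    return dict(layout)


def prune(layout: Dict[str, List[Row]], column: str, allowed_values: Iterable[object]) -> List[str]:
    """Directories a reader must scan for `WHERE column IN (allowed_values)`."""
    wanted = {f"{column}={v}" for v in allowed_values}
    return sorted(d for d in layout if d in wanted)
```

A filter such as `Platform = 'windows8'` then touches only the `Platform=windows8` directory and never opens the others.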

The main question is: which columns do you partition by? The typical answer is: the ones you’re most likely to filter by in time-sensitive queries. But what if there are several (say, four or more) equally relevant columns?

The problem, in that case, is that you end up with a huge number of unique combinations of values, which means a huge number of partitions and therefore files. Having data split across many small files brings up the following main issues:

- Metadata can become as large as the data itself, causing performance issues for various driver-side operations.
  - In particular, file listing is affected and becomes very slow.
- Compression effectiveness is compromised, leading to wasted space and slower IO.

So while data partitioning in Spark generally works great for dates or low-cardinality categorical columns, it is not well suited for high-cardinality columns and, in practice, it is usually limited to one or two columns at most.
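To see how quickly the number of partitions grows, multiply the cardinalities of the partition columns. The column names (other than `serviceDate`), cardinalities and row count below are hypothetical, and the product is an upper bound, since only value combinations that actually occur in the data produce a directory.

```python
# Back-of-the-envelope estimate of partition directories when partitioning by several columns.
# Hypothetical cardinalities (number of distinct values) per partition column.
from math import prod

cardinalities = {"serviceDate": 36, "device_type": 4, "os_build": 50, "country": 200}
total_rows = 10_000_000  # hypothetical table size

max_partitions = prod(cardinalities.values())  # every combination gets its own directory
avg_rows_per_partition = total_rows / max_partitions

print(max_partitions)                    # 1440000
print(round(avg_rows_per_partition, 1))  # 6.9
```

With only a handful of rows per directory, every write produces tiny files, which is the small-files problem described above.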

In [47]:
%fs ls /databricks_workshop/user_workshop_train/

path,name,size
dbfs:/databricks_workshop/user_workshop_train/_SUCCESS,_SUCCESS,0
dbfs:/databricks_workshop/user_workshop_train/_committed_2235540940205978602,_committed_2235540940205978602,529
dbfs:/databricks_workshop/user_workshop_train/_started_2235540940205978602,_started_2235540940205978602,0
dbfs:/databricks_workshop/user_workshop_train/part-00000-tid-2235540940205978602-bbf26b50-9c47-4efb-a835-ec1c3e40c13c-6734-1-c000.snappy.parquet,part-00000-tid-2235540940205978602-bbf26b50-9c47-4efb-a835-ec1c3e40c13c-6734-1-c000.snappy.parquet,1083875
dbfs:/databricks_workshop/user_workshop_train/part-00001-tid-2235540940205978602-bbf26b50-9c47-4efb-a835-ec1c3e40c13c-6735-1-c000.snappy.parquet,part-00001-tid-2235540940205978602-bbf26b50-9c47-4efb-a835-ec1c3e40c13c-6735-1-c000.snappy.parquet,1083682
dbfs:/databricks_workshop/user_workshop_train/part-00002-tid-2235540940205978602-bbf26b50-9c47-4efb-a835-ec1c3e40c13c-6736-1-c000.snappy.parquet,part-00002-tid-2235540940205978602-bbf26b50-9c47-4efb-a835-ec1c3e40c13c-6736-1-c000.snappy.parquet,1086667
dbfs:/databricks_workshop/user_workshop_train/part-00003-tid-2235540940205978602-bbf26b50-9c47-4efb-a835-ec1c3e40c13c-6737-1-c000.snappy.parquet,part-00003-tid-2235540940205978602-bbf26b50-9c47-4efb-a835-ec1c3e40c13c-6737-1-c000.snappy.parquet,1084288
dbfs:/databricks_workshop/user_workshop_train/part-00004-tid-2235540940205978602-bbf26b50-9c47-4efb-a835-ec1c3e40c13c-6738-1-c000.snappy.parquet,part-00004-tid-2235540940205978602-bbf26b50-9c47-4efb-a835-ec1c3e40c13c-6738-1-c000.snappy.parquet,1084801


In [48]:
%fs ls /mnt/databricks-workshop-datasets/End-to-End-ML-Lifecycle/workshop_train_enrich_sample.csv

path,name,size
dbfs:/mnt/databricks-workshop-datasets/End-to-End-ML-Lifecycle/workshop_train_enrich_sample.csv/_SUCCESS,_SUCCESS,0
dbfs:/mnt/databricks-workshop-datasets/End-to-End-ML-Lifecycle/workshop_train_enrich_sample.csv/_committed_596214098976685127,_committed_596214098976685127,1804
dbfs:/mnt/databricks-workshop-datasets/End-to-End-ML-Lifecycle/workshop_train_enrich_sample.csv/_started_596214098976685127,_started_596214098976685127,0
dbfs:/mnt/databricks-workshop-datasets/End-to-End-ML-Lifecycle/workshop_train_enrich_sample.csv/part-00000-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8701-1-c000.csv,part-00000-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8701-1-c000.csv,1311731
dbfs:/mnt/databricks-workshop-datasets/End-to-End-ML-Lifecycle/workshop_train_enrich_sample.csv/part-00001-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8692-1-c000.csv,part-00001-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8692-1-c000.csv,1314810
dbfs:/mnt/databricks-workshop-datasets/End-to-End-ML-Lifecycle/workshop_train_enrich_sample.csv/part-00002-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8689-1-c000.csv,part-00002-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8689-1-c000.csv,1312926
dbfs:/mnt/databricks-workshop-datasets/End-to-End-ML-Lifecycle/workshop_train_enrich_sample.csv/part-00003-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8690-1-c000.csv,part-00003-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8690-1-c000.csv,1312349
dbfs:/mnt/databricks-workshop-datasets/End-to-End-ML-Lifecycle/workshop_train_enrich_sample.csv/part-00004-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8696-1-c000.csv,part-00004-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8696-1-c000.csv,1311551
dbfs:/mnt/databricks-workshop-datasets/End-to-End-ML-Lifecycle/workshop_train_enrich_sample.csv/part-00005-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8693-1-c000.csv,part-00005-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8693-1-c000.csv,1311329
dbfs:/mnt/databricks-workshop-datasets/End-to-End-ML-Lifecycle/workshop_train_enrich_sample.csv/part-00006-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8706-1-c000.csv,part-00006-tid-596214098976685127-708690ea-d5b5-4bba-83c9-ab29e13696f9-8706-1-c000.csv,1311302


In [49]:
# Remove the checkpoint and output folders left over from any previous run
dbutils.fs.rm("/tmp_ckp_stream", True)
dbutils.fs.rm("/databricks_workshop/user_stream_merged", True)

In [50]:
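# Write the joined stream as Parquet, one subdirectory per serviceDate value; the checkpoint lets the query resume where it left off
query = joinedDF.coalesce(1).writeStream.format("parquet").partitionBy("serviceDate").option("path", "/databricks_workshop/user_stream_merged").option("checkpointLocation", "/tmp_ckp_stream").outputMode("append").start()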

In [51]:
%fs ls /databricks_workshop/user_stream_merged

path,name,size
dbfs:/databricks_workshop/user_stream_merged/_spark_metadata/,_spark_metadata/,0
dbfs:/databricks_workshop/user_stream_merged/serviceDate=2016-01-01/,serviceDate=2016-01-01/,0
dbfs:/databricks_workshop/user_stream_merged/serviceDate=2016-02-01/,serviceDate=2016-02-01/,0
dbfs:/databricks_workshop/user_stream_merged/serviceDate=2016-03-01/,serviceDate=2016-03-01/,0
dbfs:/databricks_workshop/user_stream_merged/serviceDate=2016-04-01/,serviceDate=2016-04-01/,0
dbfs:/databricks_workshop/user_stream_merged/serviceDate=2016-05-01/,serviceDate=2016-05-01/,0
dbfs:/databricks_workshop/user_stream_merged/serviceDate=2016-06-01/,serviceDate=2016-06-01/,0
dbfs:/databricks_workshop/user_stream_merged/serviceDate=2016-07-01/,serviceDate=2016-07-01/,0
dbfs:/databricks_workshop/user_stream_merged/serviceDate=2016-08-01/,serviceDate=2016-08-01/,0
dbfs:/databricks_workshop/user_stream_merged/serviceDate=2016-09-01/,serviceDate=2016-09-01/,0


In [52]:
%fs ls /databricks_workshop/user_stream_merged/serviceDate=2016-12-01/

path,name,size
dbfs:/databricks_workshop/user_stream_merged/serviceDate=2016-12-01/part-00000-055b07eb-83dd-4ed8-9ff4-6e0913486317.c000.snappy.parquet,part-00000-055b07eb-83dd-4ed8-9ff4-6e0913486317.c000.snappy.parquet,38674
dbfs:/databricks_workshop/user_stream_merged/serviceDate=2016-12-01/part-00000-c2791bf3-6f47-4650-8662-34f91a0ba9c9.c000.snappy.parquet,part-00000-c2791bf3-6f47-4650-8662-34f91a0ba9c9.c000.snappy.parquet,39625


Ok, so we can see files being written. Let's create a metastore table on that location.

In [54]:
%sql

DROP TABLE IF EXISTS malware_stream_parquet;
CREATE TABLE malware_stream_parquet
USING PARQUET
LOCATION "/databricks_workshop/user_stream_merged/"
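When the table is created over this location, Spark works out the partition column from the `key=value` directory names written by `partitionBy("serviceDate")`; entries such as `_spark_metadata` are bookkeeping, not partitions. The toy parser below sketches that idea on a few paths from the listing above. It keeps values as strings, whereas Spark may also infer a column type, so treat it as an illustration rather than Spark's actual partition discovery.

```python
# Toy model of partition discovery: recover partition values from "col=value" directory names.
from typing import Dict, List

paths = [
    "dbfs:/databricks_workshop/user_stream_merged/_spark_metadata/",
    "dbfs:/databricks_workshop/user_stream_merged/serviceDate=2016-01-01/",
    "dbfs:/databricks_workshop/user_stream_merged/serviceDate=2016-02-01/",
]

def discover_partitions(paths: List[str]) -> List[Dict[str, str]]:
    """Parse partition values from directory names, skipping hidden/bookkeeping entries."""
    partitions = []
    for path in paths:
        name = path.rstrip("/").rsplit("/", 1)[-1]
        if name.startswith(("_", ".")) or "=" not in name:
            continue  # e.g. _spark_metadata is not a partition directory
        col, value = name.split("=", 1)
        partitions.append({col: value})
    return partitions

print(discover_partitions(paths))
# [{'serviceDate': '2016-01-01'}, {'serviceDate': '2016-02-01'}]
```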

In [55]:
%sql
SELECT COUNT(*) 
FROM malware_stream_parquet

count(1)
8830


What if we refresh the table?

In [57]:
%sql
REFRESH TABLE malware_stream_parquet

-- Alternatively, we could recover partitions that were added after the table was created:
-- MSCK REPAIR TABLE malware_stream_parquet

In [58]:
%sql
SELECT COUNT(*) 
FROM malware_stream_parquet

count(1)
22083


Ok, that works, but each query only sees a snapshot of the files as of the last refresh, so we would have to refresh manually to pick up new data. Not ideal...
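Here is a toy model of the behaviour we just observed, not Spark's actual implementation: the table answers queries from a file listing taken when it was created (or last refreshed), so files the stream writes afterwards stay invisible until we refresh. The class name and per-file row counts are hypothetical.

```python
# Toy model of a table whose file listing is cached until it is explicitly refreshed.
from typing import List

class CachedListingTable:
    """Hypothetical class: answers queries from a cached list of files, i.e. a snapshot."""

    def __init__(self, storage: List[int]):
        self._storage = storage       # shared with the writer; each entry is one file's row count
        self._cached = list(storage)  # snapshot taken when the table is created

    def refresh(self) -> None:
        """Re-list the storage location, similar in spirit to REFRESH TABLE."""
        self._cached = list(self._storage)

    def count(self) -> int:
        return sum(self._cached)

storage: List[int] = [100, 120]  # files already written by the stream
table = CachedListingTable(storage)
storage.append(150)              # the stream keeps writing new files
print(table.count())             # 220: the new file is not visible yet
table.refresh()
print(table.count())             # 370
```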

In [60]:
# Stop any streams that are still active before moving on

for s in spark.streams.active:
  s.stop()

## Next Step

[Managed Delta Lake]($./1-05 Managed Delta Lake)

Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>