In [0]:
//Here I started streaming the tweets from the EventHub but while streaming it from EventHub it comes in binary format.
%scala
import org.apache.spark.eventhubs._
import com.microsoft.azure.eventhubs._

    // Build connection string with the above information
    val namespaceName = "*********"
    val eventHubName = "**********"
    val sasKeyName = "*************"
    val sasKey = "*********************"
    val connStr = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    val customEventhubParameters =
      EventHubsConf(connStr.toString())
      .setMaxEventsPerTrigger(5)

    val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

    incomingStream.printSchema

    // Sending the incoming stream into the console.
    // Data comes in batches!
    incomingStream.writeStream.outputMode("append").format("console").option("truncate", false).start()

In [0]:
//Converting the binary stream into the readable format and writing it to mounted ADLS gen2.
%scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

    // Event Hub message format is JSON and contains "body" field
    // Body is binary, so we cast it to string to see the actual content of the message
    val messages =
      incomingStream
      .withColumn("Offset", $"offset".cast(LongType))
      .withColumn("TimeReadable", $"enqueuedTime".cast(TimestampType))
      .withColumn("Timestamp", $"enqueuedTime".cast(LongType))
      .withColumn("Body", $"body".cast(StringType))
      .select("Offset", "TimeReadable", "Timestamp", "Body")

    //messages.printSchema

    messages.writeStream.outputMode("append").format("delta").option("checkpointLocation", "/mnt/mymount1/Data").table("USElectionResults")

In [0]:
%sql
select * from uselectionresults;

Offset,TimeReadable,Timestamp,Body
8590992208,2020-11-22T07:21:35.618+0000,1606029695,#ม็อบ17พฤศจิกา #whatishappeninginthailand #ประชุมสภา #แก้รัฐธรรมนูญ #หยุดคุกคามประชาชน #กูสั่งให้มึงอยู่ใต้รัฐธรรมนูญ #ThailandProtest2020 #ม็อบ17พฤศจิกา unknown Giannis Ningning Jrue giselle #SignThePapers #USElectionResults #VarunChakravarthy #whatshappeninginthailand あ https://t.co/DX6BpDM0NF
8591008440,2020-11-22T07:21:40.622+0000,1606029700,#ม็อบ17พฤศจิกา #whatishappeninginthailand #ประชุมสภา #แก้รัฐธรรมนูญ #หยุดคุกคามประชาชน #กูสั่งให้มึงอยู่ใต้รัฐธรรมนูญ #ThailandProtest2020 #ม็อบ17พฤศจิกา unknown Giannis Ningning Jrue giselle #SignThePapers #USElectionResults #VarunChakravarthy #whatshappeninginthailand https://t.co/DX6BpDM0NF
8590993384,2020-11-22T07:22:15.608+0000,1606029735,#ม็อบ17พฤศจิกา #whatishappeninginthailand #ประชุมสภา #แก้รัฐธรรมนูญ #หยุดคุกคามประชาชน #กูสั่งให้มึงอยู่ใต้รัฐธรรมนูญ #ThailandProtest2020 #ม็อบ17พฤศจิกา unknown Giannis Ningning Jrue giselle #SignThePapers #USElectionResults #VarunChakravarthy #whatshappeninginthailand https://t.co/DX6BpDM0NF
8591028688,2020-11-22T07:34:44.645+0000,1606030484,"@samirsinh189 @WalteRiley @ManuelPMicaller @JoshAdams_1 @JessicaBrown53 @AndyMason001 @WetherbeeJon @gakatt @mavawna @RichardRossow @dds_sugano Regional Comprehensive Economic Partnership (#RCEP) is made up of 10 Southeast Asian countries, as well as South Korea, China, Japan, Australia and New Zealand. The pact is seen as an extension of China's influence in the region, #USElectionResults link? And #US pressure?"
8591051344,2020-11-22T07:49:38.850+0000,1606031378,"@GenFlynn @DanScavino @lofly727 @BarbaraRedgate @JosephJFlynn1 @GoJackFlynn @flynn_neill @realDonaldTrump In all honesty and with your hand on your heart, would you say @realDonaldTrump used his Presidency to drain the swamp? To wake people up? To reveal to them the scam that is the global financial system? To reveal the secret societies and their agendas? #USElectionResults"
8591086472,2020-11-22T08:23:33.454+0000,1606033413,@AviFlyGirl @harrisonjaime @Georgia_AG @LindseyGrahamSC Also show up for this if in SC. #LindseyGrahamResign #georgiacheating #GrahamResign #2020Elections #2020Election #USElection2020 #KrakenReleased #USElectionResults #uspoli #USAelection2020 #gop #SouthCarolinaRecountNow #SouthCarolinaRecount #SouthCarolina #LadyG https://t.co/pq8GwtJ22O
8590986808,2020-11-22T07:17:54.125+0000,1606029474,@AviFlyGirl @harrisonjaime @Georgia_AG @LindseyGrahamSC Also show up for this if in SC. #LindseyGrahamResign #georgiacheating #GrahamResign #2020Elections #2020Election #USElection2020 #KrakenReleased #USElectionResults #uspoli #USAelection2020 #gop #SouthCarolinaRecountNow #SouthCarolinaRecount #SouthCarolina #LadyG https://t.co/pq8GwtJ22O
8591010776,2020-11-22T07:33:08.261+0000,1606030388,@MirrorNow @NitishKumar @RJDforIndia @manojkjhadu @scribe_prashant Trump fantasising in #USElectionResults his Country Cousin playing out in #BiharElectionResults #बिहार_मे_लोकतंत्र_कि_हत्या #बिहार_मांगें_रिकॉउंटिंग https://t.co/NvBj51mlcr
8591058016,2020-11-22T08:04:52.609+0000,1606032292,"@ellis_march @mmpadellan Thinking about the #USElectionResults and the refusal to concede defeat. This verse refers to self-harm and harm to those around you (Trump does this). It's relevant to, as Bono refers to him 'the candidate' (He refuses to call him president). #U2songs #Bono #BidenHarris2020 https://t.co/cW9HnSGmhp"
8591084432,2020-11-22T08:22:19.448+0000,1606033339,How to steal an American Election...🤔 ✔️ Rig the polls making your candidate look like a winner ✔️ Use large scale mail-in ballots to hide your fraud ✔️ Alter the ballot counting algorithms for insurance ✔️ Use the media to amplify your claims of legitimacy #USElectionResults https://t.co/WHjPbOfjZn


In [0]:
%sql
SELECT COUNT (Offset)
FROM uselectionresults

count(Offset)
24614
