In [0]:
from pyspark.sql.functions import col, cast
from pyspark.sql.types import *

# ---- Event Hubs (consumed through its Kafka-compatible endpoint) ----
EH_CONN_STR = dbutils.secrets.get(scope="eventhub-scope", key="eventhub-conn-str")
EH_NAMESPACE = "eventhub-namespace-twitter-analysis"  # Event Hubs namespace name
EH_KAFKA_TOPIC = "eventhub-twitter-analysis"  # event hub name, used as the Kafka topic
EH_BOOTSTRAP_SERVERS = f"{EH_NAMESPACE}.servicebus.windows.net:9093"
# SASL/PLAIN JAAS entry: the username is the literal "$ConnectionString" and the
# password is the namespace connection string fetched above. The "kafkashaded."
# prefix is presumably the shaded Kafka client packaged by Databricks — confirm
# if moving to another Spark distribution.
EH_SASL_WRITE = f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{EH_CONN_STR}";'

# ---- Azure SQL Database (JDBC sink) ----
SQL_DB_URL = "jdbc:sqlserver://sqlserver-twitter-analysis.database.windows.net:1433;database=db-twitter-analysis"
# Credentials come from the secret scope, username first, then password.
SQL_DB_USERNAME, SQL_DB_PASSWORD = (
    dbutils.secrets.get(scope="sqldb-scope", key=secret_key)
    for secret_key in ("sqldb-username", "sqldb-password")
)


# ---- Kafka source options for spark.readStream ----
# Short aliases kept as notebook-level names.
topic_name, eh_namespace_name = EH_KAFKA_TOPIC, EH_NAMESPACE
eh_sasl, bootstrap_servers = EH_SASL_WRITE, EH_BOOTSTRAP_SERVERS
kafka_options = {
    # Endpoint and transport security.
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    # Client timeouts, in milliseconds.
    "kafka.request.timeout.ms": "60000",
    "kafka.session.timeout.ms": "30000",
    # Where a query with no existing checkpoint starts reading the topic.
    "startingOffsets": "earliest",
    "kafka.sasl.jaas.config": eh_sasl,
    "subscribe": topic_name,
}

In [0]:
%sql

-- Raw landing table that the Kafka stream in this notebook appends into.
-- The first seven columns are intended to line up with Spark's Kafka source
-- schema (key, value, topic, partition, offset, timestamp, timestampType);
-- Value_text holds the message value cast to STRING.
-- NOTE(review): the table name is unqualified, so it lands in the session's
-- current catalog/schema, while later cells read hive_metastore.default.tweets —
-- confirm the workspace default catalog is hive_metastore.
CREATE TABLE IF NOT EXISTS tweets
(
    Key BINARY,
    Value BINARY,
    Topic STRING,
    Partition INT,
    Offset LONG,
    Timestamp TIMESTAMP,
    TimestampType INT,
    Value_text STRING
) using DELTA

In [0]:
# Unbounded stream over the Event Hubs Kafka endpoint. Every Kafka source column
# is kept, and the binary `value` payload is added as a string column `value_text`.
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
    .select("*", col("value").cast("string").alias("value_text"))
)

In [0]:
# Append the raw Kafka records into the Delta table `tweets`, and wait for that
# run to finish before the following cells read the table.
#
# Without a trigger, toTable() returns immediately and the query keeps running in
# the background. The batch read of `tweets` further down would then race the
# stream and could export only part of the topic. trigger(availableNow=True)
# processes everything available in the topic when the query starts and then
# stops. awaitTermination() blocks until that happens and re-raises if the query
# failed. For continuous ingestion instead, remove the trigger and the
# awaitTermination() call.
#
# The checkpoint stores the consumed offsets, so a rerun appends only records
# that arrived after the previous run. startingOffsets applies only to a query
# with no checkpoint.
tweets_stream_query = (
    kafka_df.writeStream.outputMode("append")
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
    .trigger(availableNow=True)
    .toTable("tweets")
)
tweets_stream_query.awaitTermination()

<pyspark.sql.streaming.query.StreamingQuery at 0x7f6e88729ad0>

In [0]:
%sql
-- Peek at the string-decoded Kafka message payloads in the landing table.
SELECT value_text FROM hive_metastore.default.tweets;

value_text
"""{\""id_str\"":601665000948649984,\""created_at\"":\""2015-05-22 08:24:49\"",\""text\"":\""b'Another good riddle: drill a hole thru a sphere such that the height of what remains of the sphere is 10cm. What is its remaining volume?'\""}"""
"""{\""id_str\"":601578147113529344,\""created_at\"":\""2015-05-22 02:39:41\"",\""text\"":\""b'Astronomer Royal Martin Rees: How soon will robots take over the world? - via @Telegraph http:\\/\\/t.co\\/DStxgM0S8F'\""}"""
"""{\""id_str\"":601501265332477952,\""created_at\"":\""2015-05-21 21:34:11\"",\""text\"":\""b'RT @SpaceX: Cargo is offloaded and spacecraft is powered down. #Dragon back in its nest after about 5 weeks at the @Space_Station http:\\/\\/t.\\\\xe2\\\\x80\\\\xa6'\""}"""
"""{\""id_str\"":600784025335136256,\""created_at\"":\""2015-05-19 22:04:08\"",\""text\"":\""b'This is not the full Gigafactory, it is just the pilot plant (1\\/4 size) https:\\/\\/t.co\\/gz2EmJkYtm'\""}"""
"""{\""id_str\"":600549508796190720,\""created_at\"":\""2015-05-19 06:32:15\"",\""text\"":\""b'Fossil fuels subsidised by $10m a minute, says IMF http:\\/\\/t.co\\/c4nsZjXc32'\""}"""
"""{\""id_str\"":599964926992486401,\""created_at\"":\""2015-05-17 15:49:20\"",\""text\"":\""b'RT @johngreen: My brilliant brother has a message for you, future dead person. https:\\/\\/t.co\\/kHy5nRuXxY'\""}"""
"""{\""id_str\"":599391754786181122,\""created_at\"":\""2015-05-16 01:51:45\"",\""text\"":\""b'RT @SpaceX: Tired of the same old travel destinations? Discover the wonder of Mars \\\\nhttp:\\/\\/t.co\\/KpluQw4cow http:\\/\\/t.co\\/vD9oLSuY5r'\""}"""
"""{\""id_str\"":599391737094635520,\""created_at\"":\""2015-05-16 01:51:40\"",\""text\"":\""b'RT @SpaceX: Adventure awaits! Explore Mars\\\\xe2\\\\x80\\\\x99 ultimate vacation destinations \\\\nhttp:\\/\\/t.co\\/KpluQw4cow http:\\/\\/t.co\\/kptj27x1LD'\""}"""
"""{\""id_str\"":598232495537016832,\""created_at\"":\""2015-05-12 21:05:16\"",\""text\"":\""b'@jankenbrandt Hi Julie!'\""}"""
"""{\""id_str\"":598174400404262912,\""created_at\"":\""2015-05-12 17:14:25\"",\""text\"":\""b'RT @TeslaMotors: Agreed. @FTC affirms States to allow consumers to choose how they buy their cars. #Tesla #Michigan http:\\/\\/t.co\\/fT1JHjMpzg'\""}"""


In [0]:
from pyspark.sql.functions import (
    col,
    current_timestamp,
    from_json,
    cast,
    lit,
    from_unixtime,
    to_timestamp,
    sum,
    avg,
    when,
    count,
    round,
    max,
    explode,
    regexp_replace,
    left,
    right,
    expr
)
from pyspark.sql.types import (
    StructField,
    StructType,
    IntegerType,
    StringType,
    DoubleType,
    LongType,
    TimestampType,
)

# Fields expected inside each tweet's JSON payload. All three are declared as
# nullable strings. In the recorded payloads id_str is a bare JSON number;
# reading it as a string presumably keeps every digit of the 64-bit id.
tweets_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("id_str", "created_at", "text")
    ]
)

In [0]:
# Batch read of the landed Kafka records, decoded into one row per tweet with the
# columns id, text and created_at.
#
# Judging by the recorded payloads, each `value_text` is itself a JSON *string
# literal* whose content is the tweet JSON, e.g.
#   "{\"id_str\":601665000948649984,\"created_at\":\"...\",\"text\":\"...\"}"
# Wrapping it as {"v": <value_text>} and extracting `$.v` with get_json_object
# makes Spark's JSON parser remove exactly that outer layer of escaping.
# Deleting every backslash instead would also destroy escapes that belong to the
# inner JSON (e.g. \" inside a tweet). The result would be invalid JSON and
# all-NULL columns.
tweets_df = (
    spark.sql("SELECT * FROM hive_metastore.default.tweets")
    .withColumn(
        "JSON",
        from_json(
            expr("""get_json_object(concat('{"v":', value_text, '}'), '$.v')"""),
            tweets_schema,
        ),
    )
    .select(
        col("JSON.id_str").alias("id"),
        col("JSON.text").alias("text"),
        col("JSON.created_at").alias("created_at"),
    )
    # NOTE(review): this de-duplicates only within the current table snapshot; the
    # JDBC export below appends, so rerunning the notebook re-inserts earlier rows.
    .dropDuplicates()
)

In [0]:
# Render the parsed tweets (columns id, text, created_at) with the notebook's display() helper.
display(tweets_df)

id,text,created_at
5.982324955370168e+17,b'@jankenbrandt Hi Julie!',2015-05-12 21:05:16
8.351878591580201e+17,b'@donbarbieri @GreenCarReports Good chance it will',2017-02-24 18:01:17
1.5276308448898253e+17,"b'SpaceX has Boeing, Lockheed, Europe (Ariane) and Russia (Proton/Soyuz) near checkmate in rocket technology. End game is all about China.'",2011-12-30 14:48:52
3.383671234558853e+17,b'@kn0xvi11ain Very few. Scientific consensus at time of Columbus was world was round http://t.co/V3XIcob4s8',2013-05-25 18:53:03
4.299794899224617e+17,b'Tesla LA to NY Supercharger rally just completed in 76 hours across northern route in dead of winter thru heavy snow!',2014-02-02 14:07:53
3.079581104741048e+17,b'Just received #Dragon docking clearance from @NASA. Will begin orbital maneuvers to Space Station at 11pm Pacific time.',2013-03-02 20:58:29
4.061867924096328e+17,b'15 mins to liftoff http://t.co/3jDrHRSPQZ',2013-11-28 22:24:12
1.7352886769956864e+17,b'Dragon spaceship and Falcon 9 rocket just completed final assembly at Cape Canaveral http://t.co/4GrSr3VU',2012-02-25 22:04:41
4.621046791160504e+17,b'@rocketrepreneur ~40%',2014-05-02 05:41:55
4.727572976853565e+17,b'Recording of Dragon V2 unveil at http://t.co/aBIV4EWOS2',2014-05-31 15:11:37


In [0]:
# Export the parsed tweets to Azure SQL Database (table dbo.tweets) over JDBC.
# coalesce(1) sends the write through a single partition, so Spark opens one
# JDBC connection. mode("append") adds rows to whatever dbo.tweets already holds.
try:
    (
        tweets_df.coalesce(1)
        .write.format("jdbc")
        .option("url", SQL_DB_URL)
        .option("dbtable", "dbo.tweets")
        .option("user", SQL_DB_USERNAME)
        .option("password", SQL_DB_PASSWORD)
        .mode("append")
        .save()
    )

    print("Successfully write data into target SQL database")
except Exception as error:
    print("An exception occurred:", error)
    # Re-raise so a failed export fails this cell (and any job running the
    # notebook) instead of being reported as a normal completion.
    raise

Successfully write data into target SQL database
