In [0]:
# Secret scope and the key under which the Event Hub connection string is stored.
secret_scope = "eventhub-hate-crimes"
secret_name = "connectionString"

# Resolve the Event Hub name, namespace and connection string from the secret scope.
topic_name = dbutils.secrets.get(secret_scope, "eventhubname")
eh_namespace_name = dbutils.secrets.get(secret_scope, "eventhubnamespace")
readConnectionString = dbutils.secrets.get(secret_scope, secret_name)

# JAAS login entry passed as "kafka.sasl.jaas.config": the username is the literal
# "$ConnectionString" and the password is the connection string read above.
eh_sasl = (
    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule'
    f' required username="$ConnectionString" password="{readConnectionString}";'
)

# Kafka endpoint built from the namespace name, on port 9093.
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"

# Options handed to the Spark "kafka" source when the stream is opened.
kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.request.timeout.ms": "60000",
    "kafka.session.timeout.ms": "30000",
    "startingOffsets": "earliest",
    "kafka.sasl.jaas.config": eh_sasl,
    "subscribe": topic_name,
}

In [0]:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType

# Field names expected in each JSON payload, in schema order. Every field is
# declared as a string in the schema built below.
_PAYLOAD_FIELDS = (
    "Month",
    "IncidentNumber",
    "DateofIncident",
    "DayofWeek",
    "NumberoOfVictimsUnder18",
    "NumberOfVictimsOver18",
    "NumberOfOffendersUnder18",
    "NumberOfOffendersOver18",
    "RaceEthnicityofOffenders",
    "Offenses",
    "OffenseLocation",
    "Bias",
    "ZipCode",
    "APDSector",
    "CouncilDistrict",
)

schema = StructType()
for _field_name in _PAYLOAD_FIELDS:
    schema = schema.add(_field_name, StringType())

# Open the Kafka stream with the options prepared earlier in the notebook.
_kafka_events = spark.readStream.format("kafka").options(**kafka_options).load()

# The Kafka `value` column is cast to a string and parsed as JSON with `schema`.
_json_payloads = _kafka_events.selectExpr("cast(value as string) as json_payload")
_parsed_events = _json_payloads.select(
    from_json(col("json_payload"), schema).alias("data")
)

# Flatten the parsed struct so each schema field becomes a top-level column.
streaming_df = _parsed_events.select("data.*")

# Register the result under the view name that the bronze table reads from.
streaming_df.createOrReplaceTempView("hate_crimes_temp_view")


In [0]:
import dlt 

@dlt.table(
    name="hate_crimes_raw",
    comment="Streaming table for raw data from Event Hub",
    table_properties={"quality": "bronze"},
)
def hate_crimes_raw():
    """Bronze table: return the rows exposed by the `hate_crimes_temp_view` temp view."""
    raw_events = spark.read.table("hate_crimes_temp_view")
    return raw_events
    

### Data Cleansing in Silver Layer

In [0]:
from pyspark.sql.functions import *

@dlt.table(
  name="hate_crimes_transformed",
  comment="Hate crime records with placeholder values normalised and columns typed",
  table_properties={
    "quality": "silver"}
  )
def hate_crimes_clean():
    """Normalise placeholder values in `hate_crimes_raw` and cast columns to typed values.

    Transformations applied:
      * Each of the four victim/offender count columns has the string
        "Unknown" replaced by "-1" and is then cast to int.
      * An empty `APDSector` is replaced by "Unknown".
      * `DateofIncident` is parsed to a date from its first 10 characters
        using the `MM/dd/yyyy` pattern.

    Returns:
        DataFrame with the same columns as `hate_crimes_raw`, with the
        columns above rewritten in place.
    """
    count_columns = (
        "NumberoOfVictimsUnder18",
        "NumberOfVictimsOver18",
        "NumberOfOffendersUnder18",
        "NumberOfOffendersOver18",
    )

    crimesDF = spark.read.table("hate_crimes_raw")

    for count_column in count_columns:
        # Each column must fall back to its OWN value. Falling back to
        # NumberoOfVictimsUnder18 would overwrite the other three counts.
        crimesDF = crimesDF.withColumn(
            count_column,
            when(col(count_column) == "Unknown", "-1")
            .otherwise(col(count_column))
            .cast("int"),
        )

    crimesDF = crimesDF.withColumn(
        "APDSector",
        when(col("APDSector") == "", "Unknown").otherwise(col("APDSector")),
    )

    # Only the leading 10 characters are used for the date; anything after
    # them in the source string is ignored.
    crimesDF = crimesDF.withColumn(
        "DateofIncident",
        to_date(substring(col("DateofIncident"), 1, 10), "MM/dd/yyyy"),
    )

    return crimesDF

                             
    


In [0]:
@dlt.table(
    name="hate_crimes_silver",
    comment="Cleansed data",
    table_properties={"quality": "silver"},
)
def hate_crimes_silver():
    """Silver table: rename the incident date column and add `IncidentYear`.

    Reads `hate_crimes_transformed`, renames `DateofIncident` to
    `DateOfIncident`, and adds `IncidentYear` as the year of that date.
    """
    transformed = spark.read.table("hate_crimes_transformed")
    renamed = transformed.withColumnRenamed("DateofIncident", "DateOfIncident")
    with_year = renamed.withColumn("IncidentYear", year("DateOfIncident"))
    return with_year.select("*")

    

In [0]:
@dlt.table(
    name="crimes_by_year_gold",
    comment="Total crimes by year",
    table_properties={"quality": "gold"},
)
def crimes_by_year():
    """Gold table: number of `hate_crimes_silver` rows per `IncidentYear`."""
    silver = spark.read.table("hate_crimes_silver")
    grouped_by_year = silver.groupBy("IncidentYear")
    return grouped_by_year.agg(count("*").alias("TotalCrimes"))


In [0]:
@dlt.table(
    name="crimes_by_apd_sector_gold",
    comment="Total Crimes by APD Sector",
    table_properties={"quality": "gold"},
)
def crimes_by_apd_sector():
    """Gold table: number of `hate_crimes_silver` rows per `APDSector`."""
    silver = spark.read.table("hate_crimes_silver")
    grouped_by_sector = silver.groupBy("APDSector")
    return grouped_by_sector.agg(count("*").alias("TotalCrimes"))

In [0]:
@dlt.table(
    name="crimes_on_minor_gold",
    comment="Incidents on minor",
    table_properties={
    "quality": "gold"}
    )
def crimes_on_minor():
    """Gold table: `hate_crimes_silver` rows with at least one victim under 18.

    Keeps rows where `NumberoOfVictimsUnder18` is greater than 0. Rows where
    that column is 0, negative, or null do not satisfy the predicate and are
    dropped.
    """
    silver = spark.read.table("hate_crimes_silver")
    # The expression ends here: no trailing line continuation after the filter.
    return silver.where(col("NumberoOfVictimsUnder18") > 0)