### Install the Azure Event Hubs client library

In [0]:
pip install azure-eventhub

Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.
Collecting azure-eventhub
  Downloading azure_eventhub-5.11.5-py3-none-any.whl (315 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 315.5/315.5 kB 5.3 MB/s eta 0:00:00
Collecting azure-core<2.0.0,>=1.14.0
  Downloading azure_core-1.29.6-py3-none-any.whl (192 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 192.5/192.5 kB 17.9 MB/s eta 0:00:00
Collecting typing-extensions>=4.0.1
  Downloading typing_extensions-4.9.0-py3-none-any.whl (32 kB)
Collecting anyio<5.0,>=3.0
  Downloading anyio-4.2.0-py3-none-any.whl (85 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 85.5/85.5 kB 10.3 MB/s eta 0:00:00
Collecting sniffio>=1.1
  Downloading sniffio-1.3.0-py3-none-any.whl (10 kB)
Collecting exceptiongroup>=1.0.2
  Downloading exceptiongroup-1.2.0-py3-none-any.whl (16 kB)
Installing collected packages: typing-extensions, sniffio, exceptiongroup, anyio, azure-core, azure-eventhub
  Att

### Create the database

In [0]:
# Database that this notebook creates (if needed) and then works in.
database_name = 'formerSalamendars'

# Build both statements up front so the SQL being issued is easy to read.
create_stmt = f"CREATE DATABASE IF NOT EXISTS {database_name}"
use_stmt = f"USE {database_name}"

# Creating is a no-op when the database already exists.
spark.sql(create_stmt)

# Make it the current database for the rest of the session.
spark.sql(use_stmt)

### Stream, parse, and store logs from Azure Event Hubs

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, date_format, regexp_extract, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
from azure.eventhub import EventHubConsumerClient
from pyspark.sql import functions as F

# Obtain the Spark session. `spark` has already been used by the database cell,
# so getOrCreate() returns that existing session here.
# NOTE(review): spark.jars.packages is normally resolved when a session is first
# launched - confirm the Event Hubs connector is actually installed on the cluster.
spark = (
    SparkSession.builder
    .appName("movies-ratings-app")
    .config("spark.jars.packages", "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.21")
    .getOrCreate()
)

eventHubName = "formersalamandersdata"

# The shared access key must never live in the notebook source. It is read from
# a Databricks secret scope at run time: create the scope/key named below (or
# change the names to match your workspace) before running this cell.
# SECURITY: the key that used to be hard-coded on this line has been exposed and
# should be rotated in the Event Hubs namespace.
EVENTHUB_SECRET_SCOPE = "formersalamanders"
EVENTHUB_SECRET_KEY = "eventhub-shared-access-key"
shared_access_key = dbutils.secrets.get(scope=EVENTHUB_SECRET_SCOPE, key=EVENTHUB_SECRET_KEY)

connection_string = (
    "Endpoint=sb://formersalamanders.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    f"SharedAccessKey={shared_access_key};"
    f"EntityPath={eventHubName}"
)

# The connector is handed the connection string in the encrypted form produced
# by its own EventHubsUtils.encrypt helper on the JVM side.
ehConf = {
    'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string),
}
# Only the connection string is configured; no eventhubs.startingPosition is set.

# Streaming DataFrame over the Event Hub.
df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# Cast the event body to a string so it can be parsed as text
# (the Event Hubs connector exposes `body` as binary).
selected = df.withColumn("body", F.col("body").cast("string"))

# Field names, in the order they appear in a " | "-delimited log line.
columns = [
    "timestamp", "log_level", "request_id", "session_id", "user_id", "action",
    "http_method", "url", "referrer_url", "ip_address", "user_agent", "response_time",
    "product_id", "cart_size", "checkout_status", "token", "auth_method", "auth_level",
    "correlation_id", "server_ip", "port_number", "protocol", "status", "detail"
]

# Split the string-cast body (`selected`, not the raw `df`) on the literal
# " | " delimiter. F.split takes a regex, so the pipe is escaped; the raw string
# keeps the backslash from being read as an (invalid) Python escape sequence.
split_df = selected.select(F.split(F.col("body"), r" \| ").alias("split_data"))

# Promote each positional element of the split array to a named column in a
# single projection. Positions past the end of a short line come out as null.
split_df = split_df.select(
    "split_data",
    *[F.col("split_data")[position].alias(name) for position, name in enumerate(columns)],
)

# Keep only the named fields; the intermediate array column is dropped.
log_df = split_df.select(columns)

# 1. Parse the raw timestamp text, then derive separate date and time strings.
log_df = (
    log_df
    .withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"))
    .withColumn("date", date_format("timestamp", "yyyy-MM-dd"))
    .withColumn("time", date_format("timestamp", "HH:mm:ss.SSS"))
)

# 2. Extract the domain from referrer_url: the text after "//" up to the next
#    "/" or the end of the value. Non-matching values become an empty string.
log_df = log_df.withColumn(
    "referrer_domain",
    regexp_extract(col("referrer_url"), "(?<=//)(.*?)(?=/|$)", 1),
)

# 3. Strip the leading "<name>: " label from the labelled fields, keeping only
#    what follows the first ": ". The list gets its own name so it does not
#    rebind `columns`, and uses 'detail' in the same lower case as the column
#    created upstream so it also resolves when spark.sql.caseSensitive is on.
prefixed_columns = [
    'ip_address', 'user_agent', 'response_time', 'product_id', 'cart_size',
    'checkout_status', 'token', 'auth_method', 'auth_level', 'correlation_id',
    'server_ip', 'port_number', 'protocol', 'detail',
]
for name in prefixed_columns:
    log_df = log_df.withColumn(name, regexp_extract(col(name), r"(?<=:\s)(.*)", 1))

# 4. Drop the columns that were replaced by date/time and referrer_domain.
columns_to_drop = ['timestamp', 'referrer_url']
log_df = log_df.drop(*columns_to_drop)

# 5. Put the columns in their final output order.
desired_columns_order = [
    "date", "time", "log_level", "request_id", "session_id", "user_id", "action",
    "http_method", "url", "referrer_domain", "ip_address", "user_agent", "response_time",
    "product_id", "cart_size", "checkout_status", "token", "auth_method",
    "auth_level", "correlation_id", "server_ip", "port_number", "protocol",
    "status", "detail"
]
log_df = log_df.select(desired_columns_order)

# Destination settings. Note that hive_table_name is only declared here; the
# write below targets table_location and does not reference the table name.
hive_table_name = 'formerSalamendars_logs'
table_location = '/mnt/former_salamenders/logs'

# Start the streaming query: append the parsed log rows as Parquet files under
# table_location, tracking progress in the checkpoint directory.
query = (
    log_df.writeStream
    .format('parquet')
    .outputMode('append')
    .option('path', table_location)
    .option('checkpointLocation', '/mnt/checkpoint/dir')
    .start()
)


In [0]:
%fs
pwd