# Load Data Streaming

Notebook de desenvolvimento responsável por realizar o processo de ingestão de dados em streaming: o Spark Streaming busca os dados de um Event Hub e os escreve em formato Delta Table no
Azure Data Lake Storage.


<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://raw.githubusercontent.com/Foiac/MobileFraudDetectSolution/main/Editaveis/eventhubstreamingingestion.png" alt="SparkStreaming Ingest" style="width: 800px">
</div>

#### Import dependencies

In [None]:
from pyspark.sql.functions import col, cast, explode, from_json, sha2
from pyspark.sql.types import *
import os

#### Path Definitions

In [None]:
# Fixed names identifying the target table inside the lake.
database_name = "bronze_mobile"
table_name = "access"
container_name = "cont-fraud"

# Storage account name, storage account key and Event Hub connection string
# are read from environment variables; os.getenv yields None when one is unset.
storage_account_name = os.getenv("ENV_STORAGE")
storage_account_key = os.getenv("ENV_ADLS_KEY")
# NOTE(review): the original note next to this line reads
# ";EntityPath=fraud-detect" - presumably the suffix the connection string is
# expected to carry; confirm against the Event Hub configuration.
connection_string = os.getenv("ENV_EH_CONNECTION_STRING")

# abfss URI of the Delta table:
#   abfss://<container>@<account>.dfs.core.windows.net/bronze/<database>/<table>
delta_table_path = "abfss://{}@{}.dfs.core.windows.net/bronze/{}/{}".format(
    container_name,
    storage_account_name,
    database_name,
    table_name,
)

In [None]:
# Bare expression: the notebook renders the resolved Delta table path as this cell's output.
delta_table_path

'abfss://contfraud@stacfraud.dfs.core.windows.net/Bronze/mobile/access'

#### Connection String Configuration

In [None]:
# Event Hubs connection settings. The connection string is passed through the
# connector's JVM-side EventHubsUtils.encrypt before being stored in the options.
eh_conf = dict([
    (
        'eventhubs.connectionString',
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string),
    ),
])

#### ADLS Configuration

In [None]:
# Register the storage account key in the Spark session configuration under
# the account-specific "fs.azure.account.key.<account>.dfs.core.windows.net" entry.
spark.conf.set(
    "fs.azure.account.key.{}.dfs.core.windows.net".format(storage_account_name),
    storage_account_key,
)

#### Create Database

In [None]:
# Create the target database only if it does not exist yet.
spark.sql("CREATE DATABASE IF NOT EXISTS {}".format(database_name))

DataFrame[]

#### Read Data Stream

In [None]:
# Open a streaming read from the Event Hub, using the connection options
# prepared in eh_conf.
df = spark.readStream.format("eventhubs").options(**eh_conf).load()

#### Parse JSON payload and hash sensitive fields

In [None]:
# Keys expected in every JSON event, listed in output column order.
# Spelling (e.g. "logintude") is kept exactly, because these are the keys
# looked up in the incoming payload.
_event_fields = [
    "imei",
    "mac",
    "rede",
    "client_ip",
    "latitude",
    "logintude",
    "cpf",
    "senha",
    "transaction",
    "api",
    "endpoint",
    "os",
    "os_version",
    "app_version",
    "erro",
    "timestamp",
]

# Columns that are replaced by their SHA-256 digest before being written.
_hashed_fields = ("imei", "mac", "cpf", "senha")

# JSON schema: every field is a nullable string.
schema = StructType(
    [StructField(name, StringType(), True) for name in _event_fields]
)

# Cast the message body to string, parse it as a JSON array of events, then
# explode the array so each event becomes one row under the "json_data" struct.
df = (
    df.withColumn("body", col("body").cast("string"))
    .withColumn("json_list", from_json(col("body"), ArrayType(schema)))
    .select(explode(col("json_list")).alias("json_data"))
)

# Flatten the struct into one top-level column per field.
df_body = df.select([col(f"json_data.{name}") for name in _event_fields])

# Overwrite each sensitive column, in place, with its SHA-256 hash.
for _hashed in _hashed_fields:
    df_body = df_body.withColumn(_hashed, sha2(col(_hashed), 256))


#### Write data to the Delta table

In [None]:
# Write the parsed events to the Delta table in append mode, triggering a
# micro-batch every 30 seconds. The checkpoint directory is kept under the
# table path itself.
query = (
    df_body.writeStream
    .trigger(processingTime="30 second")
    .start(
        path=delta_table_path,
        format="delta",
        outputMode="append",
        checkpointLocation=f"{delta_table_path}/_checkpoints/",
    )
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

#### Stop Streaming

In [None]:
# import time
# # Let the stream run for 1 more second before stopping it
# time.sleep(1) 

# query.stop()