In [None]:
import os
import snowflake.connector
import re
import requests
import json
# Extra JVM artifacts requested through spark-submit's --packages flag:
# the Kafka sources plus the Snowflake JDBC driver and Spark connector.
_SPARK_PACKAGES = (
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.1",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1",
    "net.snowflake:snowflake-jdbc:3.16.0",
    "net.snowflake:spark-snowflake_2.12:2.15.0-spark_3.4",
)
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages " + ",".join(_SPARK_PACKAGES) + " pyspark-shell"
)

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import from_json, col, window 
from pyspark.sql.types import *
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

# Load the passphrase-protected private key from rsa_key.p8.
# The passphrase can be supplied through the PRIVATE_KEY_PASSPHRASE
# environment variable; the previous in-source value is kept only as a
# fallback so existing runs keep working.
# NOTE(review): drop the fallback once the variable is set everywhere.
with open("rsa_key.p8", "rb") as key_file:
    p_key = serialization.load_pem_private_key(
        key_file.read(),
        password=os.environ.get("PRIVATE_KEY_PASSPHRASE", "bigdata").encode(),
        backend=default_backend(),
    )

# Re-serialise the key as unencrypted PKCS#8 PEM text.
pkb = p_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
).decode("UTF-8")

# Strip the BEGIN/END armor lines and every newline so only the key body
# remains as a single-line string.
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n", "", pkb).replace("\n", "")


# Open a Snowflake connector session against the flood-data schema.
# The login can be overridden with SNOWFLAKE_USER / SNOWFLAKE_PASSWORD; the
# previous in-source values remain as fallbacks so existing runs keep working.
# NOTE(review): these credentials are committed in plain text -- rotate them
# and remove the fallbacks once the environment variables are in place.
conn = snowflake.connector.connect(
    user=os.environ.get("SNOWFLAKE_USER", "NADIALEIDEN"),
    password=os.environ.get("SNOWFLAKE_PASSWORD", "Tubesbigdata1"),
    account="kibjnvw-yw34965",
    warehouse="TUBESBIGDATA",
    database="UKFLOODDATA",
    schema="AREAANDSEVERITY",
)

# Create (or reuse) the SparkSession for this job
spark = (
    SparkSession.builder
    .appName("KafkaStructuredStreaming")
    .getOrCreate()
)

# Schema of the nested "floodArea" object; every field is a string
flood_area_schema = (
    StructType()
    .add("@id", StringType())
    .add("county", StringType())
    .add("notation", StringType())
    .add("polygon", StringType())
    .add("riverOrSea", StringType(), True)  # optional field, nullable stated explicitly
)

# Schema of one JSON record, embedding the floodArea struct defined above
schema = (
    StructType()
    .add("@id", StringType())
    .add("description", StringType())
    .add("eaAreaName", StringType())
    .add("eaRegionName", StringType())
    .add("floodArea", flood_area_schema)
    .add("floodAreaID", StringType())
    .add("isTidal", BooleanType())
    .add("message", StringType())
    .add("severity", StringType())
    .add("severityLevel", IntegerType())
    .add("timeMessageChanged", TimestampType())
    .add("timeRaised", TimestampType())
    .add("timeSeverityChanged", TimestampType())
)

# Subscribe to the "uk-flood" topic on the local Kafka broker as a stream
kafkaStreamDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "uk-flood")
    .load()
)

# Decode each Kafka value as a JSON array of records, then emit one row per
# array element with the record's fields as top-level columns
parsedDF = (
    kafkaStreamDF
    .selectExpr("CAST(value AS STRING)")
    .select(F.from_json(F.col("value"), ArrayType(schema)).alias("data"))
    .selectExpr("explode(data) AS items")
    .select("items.*")
)

# Schema applied to the text downloaded from each floodArea.polygon link:
# a top-level "type" plus a "features" array whose items carry a geometry.
# NOTE(review): coordinates are modelled as three nested arrays of floats --
# confirm this matches every geometry type the links can return.
polygon_schema = (
    StructType()
    .add("type", StringType(), True)
    .add(
        "features",
        ArrayType(
            StructType()
            .add("type", StringType(), True)
            .add(
                "geometry",
                StructType()
                .add("type", StringType(), True)
                .add(
                    "coordinates",
                    ArrayType(ArrayType(ArrayType(FloatType()))),
                    True,
                ),
                True,
            )
        ),
        True,
    )
)

def fetch_data_from_link(link, timeout=30):
    """Download the body behind *link* and return it as text.

    Failures are reported with ``print`` and turned into ``None`` instead of
    raising, so one bad link does not abort the caller.

    Args:
        link: URL to fetch. ``None`` or an empty string means there is
            nothing to fetch.
        timeout: Seconds to wait on the server before giving up.

    Returns:
        The response body as a string when the server answers 200,
        otherwise ``None``.
    """
    # Nothing to download for a missing link; skip the request entirely.
    if not link:
        return None
    try:
        # requests never times out on its own, so a stalled server would
        # otherwise block this call indefinitely.
        response = requests.get(link, timeout=timeout)
        if response.status_code == 200:
            return response.text  # Raw body; the caller parses it as JSON
        print(f"Failed to fetch data from {link}. Status code: {response.status_code}")
        return None
    except Exception as e:
        # Deliberately broad: any failure is treated as "no data" (best effort).
        print(f"An error occurred while fetching data from {link}: {str(e)}")
        return None

# Wrap the downloader as a Spark UDF returning a string
fetch_data_from_link_udf = F.udf(fetch_data_from_link, StringType())

# Download the document behind each floodArea.polygon link (one call per row)
fetchedDF = parsedDF.withColumn(
    "fetched_data",
    fetch_data_from_link_udf("floodArea.polygon"),
)

# Parse the downloaded text using the polygon schema
parsedDataDF = fetchedDF.withColumn(
    "parsed_data",
    F.from_json(F.col("fetched_data"), polygon_schema),
)

# Keep only the area name, the link and the 'features' array to explode
arrayToExplodeDF = parsedDataDF.select(
    "eaAreaName",
    "floodArea.polygon",
    "parsed_data.features",
)

# Produce one row per element of the 'features' array
explodedDF = arrayToExplodeDF.select(
    "eaAreaName",
    "polygon",
    F.explode("features").alias("polygon_content"),
)

# Stream the exploded features into the Snowflake table POLYGON_DATA in
# append mode. start() launches the query.
# The login can be overridden with SNOWFLAKE_USER / SNOWFLAKE_PASSWORD; the
# previous in-source values remain as fallbacks so existing runs keep working.
# NOTE(review): these credentials are committed in plain text -- rotate them
# and remove the fallbacks once the environment variables are in place.
query = (
    explodedDF.writeStream
    .format("net.snowflake.spark.snowflake")
    .option("sfUser", os.environ.get("SNOWFLAKE_USER", "NADIALEIDEN"))
    .option("sfPassword", os.environ.get("SNOWFLAKE_PASSWORD", "Tubesbigdata1"))
    .option("sfURL", "https://kibjnvw-yw34965.snowflakecomputing.com")
    .option("sfDatabase", "UKFLOODDATA")
    .option("sfSchema", "AREAANDSEVERITY")
    .option("sfWarehouse", "TUBESBIGDATA")
    .option("dbtable", "POLYGON_DATA")
    .option("sfRole", "ACCOUNTADMIN")
    .option("pem_private_key", pkb)
    .option("streaming_stage", "POLYGONSTAGE")
    .option("checkpointLocation", "/tmp/polygoncheckpoints")
    .outputMode("append")
    .start()
)

# Block the driver until the streaming query stops or fails
query.awaitTermination()