In [0]:
from pyspark.sql.functions import input_file_name, current_timestamp, col, udf
import pyspark.sql.functions as F 
import openai
import os
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, LongType, TimestampType, DoubleType, FloatType, DecimalType, ArrayType

# Define source paths
src_data_storage_path = "abfss://landing@landing123emhol.dfs.core.windows.net/face_api_json/"
checkpoint_json = "dbfs:/tmp/auto_loader/checkpoint/nottest2"
trg_delta_table = "db_hack.bronze.stg_face_api_json_3"

# Define source delta paths (target path for json)
checkpoint_delta = "dbfs:/tmp/auto_loader/checkpoint/checkpoint_delta_test5"
checkpoint_delta_schema = "dbfs:/tmp/auto_loader/checkpoint/checkpoint_delta_schema_test_5"
source_delta_table = "db_hack.bronze.stg_face_api_json_3"
trg_delta_table_silver = "db_hack.silver.shrek_story_5"

Read adhoc file and infer schema

In [0]:
df = spark.read.option("inferSchema", True).option("cloudFiles.schemaLocation", checkpoint_json).json(src_data_storage_path)
schema = df.schema

Create stream for FaceAPI and write to delta location

In [0]:
spark.readStream.option("inferSchema", True).json(src_data_storage_path, schema=schema).selectExpr("faceId", "faceAttributes.age", "faceAttributes.emotion.anger", "faceAttributes.emotion.contempt", "faceAttributes.emotion.disgust", "faceAttributes.emotion.fear", "faceAttributes.emotion.happiness", "faceAttributes.emotion.neutral", "faceAttributes.emotion.sadness", "faceAttributes.emotion.surprise", "faceAttributes.gender", "capturetime", "current_timestamp() AS load_time").writeStream \
  .format("delta") \
  .option("checkpointLocation", checkpoint_json) \
  .toTable(trg_delta_table)

Take a look at all the data!

In [0]:
%sql
select *, from_unixtime(capturetime / 1000) AS datetime_locally from db_hack.bronze.stg_face_api_json_3

faceId,age,anger,contempt,disgust,fear,happiness,neutral,sadness,surprise,gender,capturetime,load_time,datetime_locally
5e070e51-25fe-419d-a0c3-29ca76e43949,33.0,0.001,0.001,0.001,0.0,0.213,0.778,0.001,0.005,male,1686664001889,2023-06-15T07:58:48.640+0000,2023-06-13 13:46:41
ae19447c-c1f5-4648-b33b-ad7d385685f6,22.0,0.0,0.0,0.0,0.0,0.004,0.59,0.0,0.405,female,1686664001889,2023-06-15T07:58:48.640+0000,2023-06-13 13:46:41
83263eda-55c0-4084-81fb-475a8e80b600,33.0,0.001,0.005,0.001,0.0,0.379,0.613,0.001,0.001,male,1686664014173,2023-06-15T07:58:48.640+0000,2023-06-13 13:46:54
62c5f802-a317-40ce-b7ee-cf4c794a8938,28.0,0.0,0.002,0.0,0.0,0.38,0.585,0.0,0.033,female,1686664014173,2023-06-15T07:58:48.640+0000,2023-06-13 13:46:54
5432ce83-a7a7-48a2-bcfc-712104ae5bb8,26.0,0.0,0.001,0.0,0.0,0.86,0.139,0.0,0.0,male,1686815316856,2023-06-15T07:58:48.640+0000,2023-06-15 07:48:36
7d97b281-0c38-482e-a27b-da3523f84996,32.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,male,1686815316856,2023-06-15T07:58:48.640+0000,2023-06-15 07:48:36
f3e5d1c8-f654-4e3a-a186-8b3922f1788e,33.0,0.0,0.001,0.0,0.0,0.0,0.994,0.005,0.0,male,1686815350405,2023-06-15T07:58:48.640+0000,2023-06-15 07:49:10
c8fe8086-5bc1-4443-8558-3c474be92645,24.0,0.002,0.043,0.001,0.0,0.0,0.368,0.586,0.0,male,1686815350405,2023-06-15T07:58:48.640+0000,2023-06-15 07:49:10
25a07aae-c83f-4942-a793-11c8c7de2762,36.0,0.0,0.003,0.0,0.0,0.001,0.937,0.058,0.0,male,1686651247847,2023-06-15T07:58:48.640+0000,2023-06-13 10:14:07
00574007-e7df-4466-9f97-09800ee9c476,34.0,0.0,0.001,0.0,0.0,0.0,0.631,0.368,0.0,male,1686651259983,2023-06-15T07:58:48.640+0000,2023-06-13 10:14:19


Create stream for OpenAI LLM model after streaming images from FACE API

In [0]:
def openai_api_call(content):
    # Set API variables
    #Note: The openai-python library support for Azure OpenAI is in preview.
    openai.api_type = "azure"
    openai.api_base = "https://hackathon-2023.openai.azure.com/"
    openai.api_version = "2023-03-15-preview"
    openai.api_key = "af8e3b8b773642b4b440ad3fce72416a" #os.getenv("OPENAI_API_KEY")
    #"https://hackathon-2023.openai.azure.com/openai/deployments/hackathon-2023/chat/completions?api-version=2023-03-15-preview"
    
    first_run = True
    message = ""
    
    if first_run:
        message = [{"role":"system","content":"You are an AI storyteller that adjust a story depending on the mood of the reader."},
                    {"role": "user", "content": "Tell me the first 5 sentences of the story about shrek"}]
        first_run = False
    else:
        message = [{"role":"system","content":"You are an AI storyteller that adjust a story depending on the mood of the reader."},
                    {"role": "user", "content": "Tell me the first 5 sentences of the story about shrek but keep in mind that the reader is " + str(content) + ", make the reader change his mood by modiffying the story dramatically"}]

    try:
        response = openai.ChatCompletion.create(
        engine="hackathon-2023",
        messages = message,
        temperature=0.7,
        max_tokens=800,
        top_p=0.95,
        frequency_penalty=0,
        presence_penalty=0,
        stop=None)

        resp_message = response.choices[0].message.content.strip()
        
        # resp_message: role assistant, message: role user
        return resp_message
    except openai.error.InvalidRequestError as e:
        return None

# Register function as UDF
openai_api_call_udf = udf(openai_api_call)

def process_batch(df, epoch_id):
    # Call transribe UDF -> get transciption and add as column
    df = df.withColumn("transcription", openai_api_call_udf(df.leading_emotion))
    # Write the data to Delta table
    df.write.format("delta").mode("append").saveAsTable(trg_delta_table_silver)

df = (spark.readStream \
    .format("delta") \
    .option("inferColumnTypes", "true") \
    .table(source_delta_table) \
    .withColumn("leading_emotion", F.when(F.col("happiness") == F.greatest(F.col("happiness"), F.col("sadness")), F.lit("happy")) \
    .otherwise(F.lit("sad"))))


# Define a streaming query to process the data and wait for the query to terminate
df.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", checkpoint_delta) \
    .start() \
    .awaitTermination()

In [0]:
%sql
select * from db_hack.silver.shrek_story_5 order by load_time desc

faceId,age,anger,contempt,disgust,fear,happiness,neutral,sadness,surprise,gender,capturetime,load_time,leading_emotion,transcription
bb251682-d794-4b16-9448-22adba121f26,33.0,0.025,0.115,0.004,0.0,0.001,0.464,0.391,0.0,male,1686837217229,2023-06-15T13:53:43.863+0000,sad,"Once upon a time, in a faraway land, there lived an ogre named Shrek. He was big, green, and scary, and he lived alone in a swamp. Despite his fearsome appearance, Shrek had a heart of gold and longed for companionship. One day, his peaceful existence was disrupted when a group of fairy tale creatures appeared on his doorstep, seeking refuge from the cruel and vain Lord Farquaad."
7042dfe2-9f18-445a-8bad-1f30c6dc3a5e,31.0,0.0,0.003,0.0,0.0,0.614,0.383,0.0,0.0,male,1686837208369,2023-06-15T13:53:35.203+0000,happy,"Once upon a time, in a faraway land, there lived a big, green, and ugly ogre named Shrek. He lived alone in a swamp, where he enjoyed his solitude and peaceful life. However, his life was soon to be disrupted by a group of unwanted visitors. A group of fairy tale creatures had been banished to his swamp by the evil Lord Farquaad, who ruled the neighboring kingdom."
ce584c81-dc76-4964-baa0-40c4e53414e0,33.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,male,1686836626180,2023-06-15T13:43:52.834+0000,happy,"Once upon a time, in a faraway land, there was a green ogre named Shrek. He lived in a swamp that he called home and enjoyed his solitude. But one day, his peaceful life was disrupted when a group of fairy tale creatures were banished to his swamp by the evil Lord Farquaad. Shrek, being the grumpy ogre that he is, decided to confront Lord Farquaad and demand his swamp back."
2d0806b9-fe29-45ca-aab9-79e1b859bce7,32.0,0.0,0.0,0.003,0.0,0.123,0.0,0.873,0.0,male,1686836616989,2023-06-15T13:43:43.680+0000,sad,"Once upon a time, in a land far, far away, there lived an ogre named Shrek. He was big, green, and had a scowl on his face that would make anyone tremble. Shrek lived in a swamp all by himself and was content with his solitude. But one day, his peaceful existence was disrupted by a group of fairy tale creatures who were forced to flee their homes by the evil Lord Farquaad."
3a0f5dff-d83b-44fb-a31b-a0dded98faf9,28.0,0.0,0.011,0.0,0.0,0.441,0.545,0.003,0.0,male,1686836015274,2023-06-15T13:33:41.839+0000,happy,"Sure! Here's the beginning of the story: Once upon a time, in a faraway land, there lived an ogre named Shrek. He was big and green and had a face only a mother could love. Shrek lived a quiet life in his swamp, far away from the towns and cities where humans lived. But one day, everything changed when a group of fairy tale creatures showed up on his doorstep, seeking refuge from a tyrannical ruler."
50370c14-0de9-4e76-b293-832dfff7c661,33.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,male,1686835126239,2023-06-15T13:18:53.040+0000,happy,"Once upon a time, in a far-off kingdom, there lived an ogre named Shrek. He was big, green, and very grumpy. Shrek lived in a swamp all by himself, and he liked it that way. But one day, his peaceful solitude was interrupted by a group of fairytale creatures who were banished to his swamp by the evil Lord Farquaad."
a422a6aa-53bb-408a-ad32-659c9df9549d,33.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,male,1686834520490,2023-06-15T13:08:47.275+0000,happy,"Once upon a time, in a faraway swamp, there lived an ogre named Shrek. He was big, green and had a very grumpy personality. He loved his solitude, but his peace was disturbed when a group of fairytale creatures moved in and made themselves at home in his swamp. Shrek was not happy about this and decided to go to the ruler of the nearby kingdom, Lord Farquaad, to demand his swamp back."
c35e59bd-e691-46d8-9153-16946d71a9a2,30.0,0.015,0.003,0.0,0.0,0.0,0.678,0.304,0.0,male,1686833909247,2023-06-15T12:58:35.599+0000,sad,"Sure! Here are the first 5 sentences of the story about Shrek: Once upon a time, in a faraway land, there lived an ogre named Shrek. He was big, green, and feared by all the people in the kingdom. But despite his intimidating appearance, Shrek was a kind-hearted creature who enjoyed his solitary life in a swamp. One day, however, his peaceful existence was disrupted when a group of fairytale creatures were banished to his swamp by the evil Lord Farquaad. Not wanting to share his home with anyone, Shrek set out to confront Lord Farquaad and demand that he remove the creatures from his land."
865d82f1-de57-4b65-90b5-ca79d6e73bfa,34.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,male,1686833886381,2023-06-15T12:58:13.732+0000,happy,"Once upon a time in a faraway land, there lived a green ogre named Shrek. He was feared by all the villagers and was known for his grumpy and reclusive nature. Shrek lived in a swamp that he called home, away from the prying eyes of humans and other creatures. He was content with his simple life, spending his days scaring off anyone who dared to venture into his territory. But everything changed when a group of fairy tale creatures showed up in his swamp, seeking refuge from the wrath of a tyrannical ruler."
c214ca24-118d-4fc4-af85-1f4e909ec093,33.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,male,1686833392629,2023-06-15T12:49:59.007+0000,happy,"Once upon a time, there was a green ogre named Shrek who lived in a faraway swamp. He was feared and misunderstood by the nearby villagers, who often judged him for his appearance and reputation. Shrek enjoyed his solitary life and didn't care for the opinions of others, but everything changed when a group of fairy tale creatures invaded his swamp. They were forced to flee their homes by Lord Farquaad, the ruthless ruler of Duloc, who wanted to create a perfect kingdom without any magical beings."
