In [None]:
# Install the Azure Speech SDK (imported later as azure.cognitiveservices.speech for diarized transcription).
# NOTE(review): the package is unpinned; consider pinning a version for reproducible runs.
%pip install azure-cognitiveservices-speech

In [None]:
# Azure Key Vault that stores every secret read by get_secrets_from_kv in this notebook.
# (Template placeholder for this value: 'kv_to-be-replaced'.)
key_vault_name = 'ckmv2-keyvault'

In [None]:
from trident_token_library_wrapper import PyTridentTokenLibrary as tl

def get_secrets_from_kv(kv_name, secret_name):
    """Return the value of ``secret_name`` stored in the Key Vault ``kv_name``.

    A Key Vault access token is requested from
    ``mssparkutils.credentials.getToken("keyvault")`` and handed, together with
    the vault URL derived from ``kv_name``, to
    ``PyTridentTokenLibrary.get_secret_with_token``.
    """
    token = mssparkutils.credentials.getToken("keyvault")
    vault_url = 'https://{}.vault.azure.net/'.format(kv_name)
    return tl.get_secret_with_token(vault_url, secret_name, token)

openai_api_type = "azure"
# Remaining Azure OpenAI settings come from Key Vault; fetched in the order listed.
openai_api_version, openai_api_base, openai_api_key = (
    get_secrets_from_kv(key_vault_name, secret_name)
    for secret_name in ("AZURE-OPENAI-VERSION", "AZURE-OPENAI-ENDPOINT", "AZURE-OPENAI-KEY")
)

In [2]:
# Set AI services variables (endpoint and key come from Key Vault, in this order).
ai_services_endpoint, ai_services_key = (
    get_secrets_from_kv(key_vault_name, secret_name)
    for secret_name in ("COG-SERVICES-ENDPOINT", "COG-SERVICES-KEY")
)
# Region is pinned here; alternatively read it via get_secrets_from_kv(key_vault_name, "AZURE-LOCATION").
ai_services_region = 'eastus'
# NOTE(review): not passed to transcribe_from_file in the visible cells, which sets its own language.
language1 = 'en-US'

StatementMeta(, d029f378-0e5a-41c9-aac6-81abeb63355e, 9, Finished, Available)

In [3]:
# Drop the metadata table if it already exists.
# NOTE(review): the following cell rewrites 'ckm_conv_metadata' with mode('overwrite') and
# overwriteSchema, so this drop may be redundant. It writes the unqualified name, so confirm
# the default lakehouse is ckm_lakehouse or the two cells target different tables.
spark.sql('drop table if exists ckm_lakehouse.ckm_conv_metadata')

StatementMeta(, d029f378-0e5a-41c9-aac6-81abeb63355e, 10, Finished, Available)

DataFrame[]

In [None]:
from pyspark.sql import functions as F

# Parse 12-hour "AM/PM" timestamps with the legacy (SimpleDateFormat-based) parser,
# as the original cell did via `SET spark.sql.legacy.timeParserPolicy=LEGACY`.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Read all the metadata CSV files in the input directory (first line is the header).
df = spark.read.format("csv").option("header", "true").load("Files/data/audio_input/*.csv")

# Convert StartTime and EndTime to timestamps using the CSV's date format.
metadata_timestamp_format = "MM/dd/yyyy h:mm:ss a"
df = (
    df
    .withColumn("StartTime", F.to_timestamp("StartTime", metadata_timestamp_format))
    .withColumn("EndTime", F.to_timestamp("EndTime", metadata_timestamp_format))
)

# Duration in MINUTES (not milliseconds): casting a timestamp to long gives epoch
# seconds, so the difference is in seconds and "/ 60" converts it to minutes.
df = df.withColumn("Duration", (F.col("EndTime").cast("long") - F.col("StartTime").cast("long")) / 60)

# Write the DataFrame to a Delta table, replacing any previous data and schema.
df.write.format('delta').mode('overwrite').option("overwriteSchema", "true").saveAsTable('ckm_conv_metadata')

# # Display the first 2 rows
# display(df.head(2))


In [None]:
# df = spark.sql("SELECT * FROM ckm_lakehouse.ckm_conv_metadata LIMIT 1000")
# display(df)

In [6]:
#https://learn.microsoft.com/en-us/azure/ai-services/speech-service/get-started-stt-diarization?tabs=windows&pivots=programming-language-python
import os
import time
import azure.cognitiveservices.speech as speechsdk
import json

def transcribe_from_file(ai_services_key, ai_services_region, wav_file_path, conversation_id, language="en-US"):
    """Transcribe a WAV file with speaker diarization using the Azure Speech SDK.

    Runs a ``ConversationTranscriber`` over the whole file and keeps every final
    ("transcribed") result whose reason is ``ResultReason.RecognizedSpeech``.

    Args:
        ai_services_key: Azure AI services subscription key.
        ai_services_region: Region of the AI services resource (e.g. 'eastus').
        wav_file_path: Full local path to the WAV file; the Speech SDK reads it directly.
        conversation_id: Value copied into the first field of every returned row.
        language: Speech recognition language; defaults to "en-US", the value that
            was previously hard-coded.

    Returns:
        A list of rows ``[conversation_id, Id, DisplayText, Offset, Duration,
        Channel, Type, SpeakerId]`` taken from each result's JSON payload, in the
        order the SDK delivered them. The list is empty if no speech was recognized.

    Raises:
        RuntimeError: if the service cancels transcription with
            ``CancellationReason.Error`` (e.g. invalid key/region or unreadable audio).
            Previously this case silently returned an empty list.
    """
    all_results = []
    cancellation_error = None

    speech_config = speechsdk.SpeechConfig(subscription=ai_services_key, region=ai_services_region)
    speech_config.speech_recognition_language = language

    audio_config = speechsdk.audio.AudioConfig(filename=wav_file_path)
    conversation_transcriber = speechsdk.transcription.ConversationTranscriber(
        speech_config=speech_config, audio_config=audio_config)

    transcribing_stop = False

    def stop_cb(evt):
        # Fired on session_stopped and canceled; releases the polling loop below.
        nonlocal transcribing_stop
        transcribing_stop = True

    def canceled_cb(evt):
        # Reaching the end of the file is also reported as a cancellation; only an
        # Error reason means the transcription actually failed.
        nonlocal cancellation_error
        details = evt.cancellation_details
        if details.reason == speechsdk.CancellationReason.Error:
            cancellation_error = details.error_details

    def handle_final_result(evt):
        if evt.result.reason == speechsdk.ResultReason.RecognizedSpeech:
            # https://learn.microsoft.com/en-us/python/api/azure-cognitiveservices-speech/azure.cognitiveservices.speech.recognitionresult?view=azure-python
            r = json.loads(evt.result.json)
            all_results.append([conversation_id,
                                r["Id"],
                                r["DisplayText"],
                                r["Offset"],
                                r["Duration"],
                                r["Channel"],
                                r["Type"],
                                r["SpeakerId"]
                                ])

    # Connect callbacks to the events fired by the conversation transcriber.
    # canceled_cb is connected before stop_cb so the error is recorded before the loop is released.
    conversation_transcriber.transcribed.connect(handle_final_result)
    conversation_transcriber.canceled.connect(canceled_cb)
    conversation_transcriber.session_stopped.connect(stop_cb)
    conversation_transcriber.canceled.connect(stop_cb)

    conversation_transcriber.start_transcribing_async()

    # Waits for completion.
    while not transcribing_stop:
        time.sleep(.5)

    # Block until the transcriber has actually stopped before returning.
    conversation_transcriber.stop_transcribing_async().get()

    if cancellation_error is not None:
        raise RuntimeError("Transcription of {} was canceled: {}".format(wav_file_path, cancellation_error))
    return all_results

# try:
#     wav_file_path = "/lakehouse/default/Files/data/audio_input/Travel_20240124183556.wav"
#     r = transcribe_from_file(ai_services_key,ai_services_region,wav_file_path,'f25c4254-1c97-4301-bfaf-d9d20129e67c')
#     print(r)
# except Exception as err:
#     print("Encountered exception. {}".format(err))

StatementMeta(, d029f378-0e5a-41c9-aac6-81abeb63355e, 13, Finished, Available)

In [None]:
# # spark.sql('drop table if exists ckm_lakehouse.ckm_conv_messages')

# from pyspark.sql import SparkSession

# # Create a Spark session
# spark = SparkSession.builder.getOrCreate()

# # Get the schema of the existing table
# schema = spark.table("ckm_lakehouse.ckm_conv_messages").schema

# # Create an empty DataFrame with the same schema
# empty_df = spark.createDataFrame([], schema)

# # Overwrite the existing table with the empty DataFrame
# empty_df.write.mode('overwrite').saveAsTable("ckm_lakehouse.ckm_conv_messages")


In [None]:
from pyspark.sql import functions as F

# Column names for the rows returned by transcribe_from_file, in the same order.
message_columns = ["conversation_id", "Id", "DisplayText", "Offset", "Duration", "Channel", "Type", "SpeakerId"]

for row in df.collect():
    # Skip metadata rows without a usable file name instead of crashing the whole loop
    # (previously .strip() on a null FileName raised outside the try block).
    if row.FileName is None or not row.FileName.strip():
        print("skipping metadata row with empty FileName:", row)
        continue

    # Strip leading and trailing whitespace from the file name
    file_name = row.FileName.strip()
    print(file_name)
    input_path = 'Files/data/audio_input/' + file_name
    processed_path = 'Files/data/audio_processed/' + file_name
    wav_file_path = '/lakehouse/default/' + input_path  # full path is required for speechSDK

    try:
        transcript_rows = transcribe_from_file(ai_services_key, ai_services_region, wav_file_path, row.ConversationId)
        if not transcript_rows:
            # Nothing recognized: leave the audio in place so it can be inspected/retried.
            print("no speech recognized, file not moved:", wav_file_path)
            continue

        df_conv = spark.createDataFrame(data=transcript_rows, schema=message_columns)
        # Single partition => row_id runs 0..n-1 within this conversation (not unique across files).
        df_conv = df_conv.coalesce(1).withColumn("row_id", F.monotonically_increasing_id())

        # overwriteSchema only applies to overwrite-mode writes, so it is not set for this append.
        df_conv.write.format('delta').mode('append').saveAsTable('ckm_conv_messages')

        # Move the processed file to the 'audio_processed' folder (create_path=False, overwrite=True)
        print(input_path, processed_path)
        mssparkutils.fs.mv(input_path, processed_path, False, True)
    except Exception as e:
        print("could not load:", wav_file_path)
        print("An error occurred:", e)  # Print the exception
        # To quarantine failures instead, move the file to the 'audio_failed' folder:
        # mssparkutils.fs.mv(input_path, 'Files/data/audio_failed/' + file_name, False, True)


In [None]:
# df = spark.sql("SELECT * FROM ckm_lakehouse.`ckm_conv_metadata` LIMIT 1000")
# display(df)

StatementMeta(, , , Waiting, )

In [13]:
import os
import shutil

# Directory paths (local mount of the default lakehouse)
input_dir = '/lakehouse/default/Files/data/audio_input/'
processed_dir = '/lakehouse/default/Files/data/audio_processed/'

# shutil.move cannot create a missing destination directory, so ensure it exists first.
os.makedirs(processed_dir, exist_ok=True)

# Regular files ending in .csv (the same pattern the metadata load reads with "*.csv").
csv_files = [
    name for name in os.listdir(input_dir)
    if name.endswith('.csv') and os.path.isfile(os.path.join(input_dir, name))
]

# Move each .csv file to the processed directory.
# NOTE(review): CSVs are moved even if some of the WAV files they list failed to transcribe.
for file_name in csv_files:
    shutil.move(os.path.join(input_dir, file_name), os.path.join(processed_dir, file_name))


StatementMeta(, d029f378-0e5a-41c9-aac6-81abeb63355e, 20, Finished, Available)

In [38]:
# df = spark.sql("SELECT * FROM ckm_lakehouse.ckm_conv_messages LIMIT 10")
# display(df)

StatementMeta(, , , Waiting, )