## Step 1: Create a Google Drive Service

In [1]:
import os
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

In [2]:
def create_service(token_file_name, api_service_name, api_version, scopes):

    working_dir = os.getcwd()
    token_dir   = 'token_files'
    creds = Credentials.from_authorized_user_file(os.path.join(working_dir, token_dir, token_file_name), scopes)
    
    try:
        service = build(api_service_name, api_version, credentials=creds, static_discovery=False)
        print(api_service_name, api_version, 'service created successfully')
        return service
    except Exception as e:
        print(e)
        print(f'Failed to create service instance for {api_service_name}')
        os.remove(os.path.join(working_dir, token_dir, token_file_name))
        return None

To create a Google Drive service, you need to have the following files:
- token file for Google Drive

If you don't have it, please create one using the `create_token_files.ipynb` routine.

In [3]:
gdrive_service = create_service(
    token_file_name  = 'token_drive_v3_t1.json', 
    api_service_name = 'drive', 
    api_version      = 'v3', 
    scopes           = ['https://www.googleapis.com/auth/drive']
)

drive v3 service created successfully


In [111]:
buckets = {
    'recordings'  : '1z2nxlaCc6QPY9yIwUMIz_58-2DagVSJc',
    'transcripts' : '14c_oSCVHEkSBb59-68qJlJmSu9-WYqfO',
    'feedback'    : '1FKJ5dez8TJAoSIiCtVR6P1rYTrwSE0TT'
}

In [None]:
def check_date_dirs(selected_date, parent_id):
    query= f"parents = '{parent_id}'"
    current_dates = gdrive_service.files().list(supportsAllDrives=True, includeItemsFromAllDrives=True, q=query).execute()
    if selected_date.replace(' ', '-') not in [x['name'] for x in current_dates['files']]:
        print('No directory found for the selected date')
        return True
    else:
        raise Exception('Directory is already present!!!')

In [98]:
def create_date_subfolder(selected_date, parent_id):

    if check_date_dirs(selected_date, parent_id):
        folder_name = selected_date.replace(' ', '-')
        file_metadata = {
            'name': folder_name,
            'mimeType': 'application/vnd.google-apps.folder',
            'parents': [f'{parent_id}']
        }
        gdrive_service.files().create(body=file_metadata, supportsAllDrives=True, fields='id').execute()

In [124]:
def get_dir_id(selected_date, parent_id):

    query= f"parents = '{parent_id}'"
    date_fmt = selected_date.replace(' ', '-')

    folder_elements = (
        gdrive_service
        .files()
        .list(supportsAllDrives=True, includeItemsFromAllDrives=True, q=query)
        .execute()
    )
    dates_dict = {x['name'] : x['id'] for x in folder_elements['files']}

    if date_fmt in [k for k,_ in dates_dict.items()]:
        return dates_dict[date_fmt]
    else:
        raise Exception('Selected date not found in bucket. Please create the directory first')

In [147]:
def check_if_empty(parent_id):

    query= f"parents = '{parent_id}'"
    folder_elements = (
        gdrive_service
        .files()
        .list(supportsAllDrives=True, includeItemsFromAllDrives=True, q=query)
        .execute()
    )
    if folder_elements['files']:
        raise Exception('Directory is not empty. Make sure you are targeting the correct folder ID')
    else:
        return True

## Step 2: Access and download the SurveyCTO data

To access the data, you need the following information:

1. SurveyCTO username
2. SurveyCTO server name
3. SurveyCTO password

Create an environmental file with this information

In [4]:
import pandas as pd
from io import StringIO
from pysurveycto import SurveyCTOObject
from dotenv import load_dotenv

load_dotenv()
scto = SurveyCTOObject(
    server_name = os.getenv("SCTO_server"), 
    username    = os.getenv("SCTO_user"), 
    password    = os.getenv("SCTO_password")
)
form_data = scto.get_form_data(
    form_id     = 'llamadas', 
    format      = 'csv'
)
SCTO_data = pd.read_csv(StringIO(form_data))
SCTO_data['date_short'] = pd.to_datetime(SCTO_data['starttime']).dt.strftime('%B %d')

In [5]:
SCTO_data

Unnamed: 0,SubmissionDate,starttime,endtime,deviceid,devicephonenum,username,device_info,duration,caseid,id_estudiante,nombre,idioma,llamada,instanceID,formdef_version,review_quality,review_status,KEY,date_short
0,"Feb 14, 2025 3:06:31 AM","Feb 14, 2025 3:04:57 AM","Feb 14, 2025 3:06:19 AM",9761fdb37964719f,,laurita.buttner@gmail.com,Redmi|23117RA68G|14|SurveyCTO Collect 2.81.4 (...,82,,,Prueba de tutoría,,https://tutoriastel.surveycto.com/view/submiss...,uuid:775a057e-9673-4c0a-bb4a-9c1336c5554d,2502051553,,APPROVED,uuid:775a057e-9673-4c0a-bb4a-9c1336c5554d,February 14


In [7]:
available_dates = SCTO_data['date_short'].unique()
print(available_dates)

['February 14']


## Step 3: Delimit your cases by date

!!!Select one available date from the list above!!!

In [8]:
selected_date = 'February 14'
filtered_data = SCTO_data[SCTO_data['date_short'] == selected_date]
filtered_data

Unnamed: 0,SubmissionDate,starttime,endtime,deviceid,devicephonenum,username,device_info,duration,caseid,id_estudiante,nombre,idioma,llamada,instanceID,formdef_version,review_quality,review_status,KEY,date_short
0,"Feb 14, 2025 3:06:31 AM","Feb 14, 2025 3:04:57 AM","Feb 14, 2025 3:06:19 AM",9761fdb37964719f,,laurita.buttner@gmail.com,Redmi|23117RA68G|14|SurveyCTO Collect 2.81.4 (...,82,,,Prueba de tutoría,,https://tutoriastel.surveycto.com/view/submiss...,uuid:775a057e-9673-4c0a-bb4a-9c1336c5554d,2502051553,,APPROVED,uuid:775a057e-9673-4c0a-bb4a-9c1336c5554d,February 14


Play an audio sample -> The first recording from your filtered data

In [12]:
import ffmpeg
import tempfile

sample_url = filtered_data['llamada'].iloc[0]
sample_audio_bytes = scto.get_attachment(sample_url)

# Save bytes to a temp file
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
    temp_audio.write(sample_audio_bytes)
    temp_sample_audio_path = temp_audio.name

# Play audio using ffplay (part of ffmpeg)
os.system(f'ffplay -nodisp -autoexit {temp_sample_audio_path}')


ffplay version 7.1 Copyright (c) 2003-2024 the FFmpeg developers
  built with Apple clang version 16.0.0 (clang-1600.0.26.4)
  configuration: --prefix=/opt/homebrew/Cellar/ffmpeg/7.1_4 --enable-shared --enable-pthreads --enable-version3 --cc=clang --host-cflags= --host-ldflags='-Wl,-ld_classic' --enable-ffplay --enable-gnutls --enable-gpl --enable-libaom --enable-libaribb24 --enable-libbluray --enable-libdav1d --enable-libharfbuzz --enable-libjxl --enable-libmp3lame --enable-libopus --enable-librav1e --enable-librist --enable-librubberband --enable-libsnappy --enable-libsrt --enable-libssh --enable-libsvtav1 --enable-libtesseract --enable-libtheora --enable-libvidstab --enable-libvmaf --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxml2 --enable-libxvid --enable-lzma --enable-libfontconfig --enable-libfreetype --enable-frei0r --enable-libass --enable-libopencore-amrnb --enable-libopencore-amrwb --enable-libopenjpeg --enable-libspeex --e

31488

## Step 4: Process the recordings

Processing the recordings will:

1. Upload the recording to Google Drive
2. Send the recording to the Assembly API

In [None]:
# Use the Google Drive service to create a subfolder
create_date_subfolder(selected_date, buckets['recordings'])

In [None]:
from googleapiclient.http import MediaFileUpload
import assemblyai as aai

load_dotenv()
aai.settings.api_key = os.getenv("aai_key")

aai_config = aai.TranscriptionConfig(
    speech_model      = aai.SpeechModel.best,
    speaker_labels    = True,
    # speakers_expected = n_speakers,
    language_code     = "es"
)

transcriber = aai.Transcriber()

In [None]:
def process_recordings(selected_date):

    dir_id = get_dir_id(selected_date, buckets['recordings'])

    if check_if_empty(dir_id):
    
        for i, row in filtered_data.iterrows():
            print(f'Processing recording {i+1} of {len(filtered_data)}')
            audio_url   = row['llamada']
            audio_bytes = scto.get_attachment(audio_url)
            audio_name  = f'{row['deviceid']}_{row['username']}.mp3'

            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
                temp_audio.write(audio_bytes)
                temp_audio_path = temp_audio.name

            # Part 1: uploading recording to Google Drive
            audio_metadata = {
                'name': audio_name,
                'parents': [f'{dir_id}']
            }
            media = MediaFileUpload(temp_audio_path, mimetype='audio/mpeg')
            gdrive_service.files().create(body=audio_metadata, media_body=media, fields='id', supportsAllDrives=True).execute()
            print(f'File {audio_name} uploaded successfully')

            # Part 1: sending recording to AssemblyAI for transcription
            transcript = transcriber.transcribe(
                temp_audio_path,
                config = aai_config
            )
            if transcript.status == aai.TranscriptStatus.completed:
                print(f"Transcription was completed successfully!")
            if transcript.status == aai.TranscriptStatus.error:
                print(f"Transcription failed: {transcript.error}")
            transcript_uts = [f"Speaker {utterance.speaker}: {utterance.text}" for utterance in transcript.utterances]
            full_transcript = "\n".join(transcript_uts)

            os.remove(temp_audio_path)

    else:
        raise Exception('Directory is not empty. Make sure you are targeting the correct folder ID')

In [None]:
# Uploading recordings... make sure to have the correct date selected
save_recordings_to_gdrive(selected_date)

## Step 5: Process the recordings with AssemblyAI API

In [158]:
os.getenv("aai_key")

'b2789cfb44904616b9637bb48974564e'