<a href="https://colab.research.google.com/github/SampathK/MyExperimentalNotebooks/blob/main/Streamlit_Experiment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%sh
# -U (upgrade) is an option of the `install` subcommand, so it has to come
# after `install`; pip rejects it when placed before the subcommand.
pip install -Uq streamlit boto3 streamlit_autorefresh


In [None]:
import streamlit as st
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
import pandas as pd
from datetime import datetime
from streamlit_autorefresh import st_autorefresh

# AWS Configuration
# NOTE(review): the values below are placeholders to be filled in per
# deployment; avoid committing real credentials to the notebook.
AWS_ACCESS_KEY_ID = 'your_access_key_id'
AWS_SECRET_ACCESS_KEY = 'your_secret_access_key'
AWS_BUCKET_NAME = 'your_bucket_name'
DYNAMODB_TABLE_NAME = 'your_dynamodb_table_name'
REGION_NAME = 'your_aws_region'

# Initialize Boto3 clients
# The S3 client is pinned to REGION_NAME, exactly like the DynamoDB resource
# below, so uploads and presigned URLs target the configured region instead
# of whatever default boto3 resolves from the environment.
s3_client = boto3.client(
    's3',
    region_name=REGION_NAME,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

dynamodb = boto3.resource(
    'dynamodb',
    region_name=REGION_NAME,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

# Handle to the table that stores per-file processing statuses.
table = dynamodb.Table(DYNAMODB_TABLE_NAME)

# Function to upload file to S3 and update metadata
def upload_file_to_s3(file, bucket_name, file_id, object_name=None):
    """Upload a file-like object to S3 with its file ID stored as metadata.

    Args:
        file: Binary file-like object to upload. Its ``name`` attribute is
            used as the S3 key when ``object_name`` is not given.
        bucket_name: Name of the destination bucket.
        file_id: Identifier stored under the ``file_id`` metadata key.
        object_name: S3 key to write to. Defaults to ``file.name``.

    Returns:
        The S3 key on success, or ``None`` after the failure has been
        reported with ``st.error``.
    """
    # Local imports: both packages are already dependencies of this module.
    from boto3.exceptions import S3UploadFailedError
    from botocore.exceptions import ClientError

    if object_name is None:
        object_name = file.name
    try:
        # Attach the metadata in the upload request itself rather than
        # rewriting the object afterwards with copy_object: one request
        # instead of two, and the object never exists without its metadata
        # if the second call were to fail.
        s3_client.upload_fileobj(
            file,
            bucket_name,
            object_name,
            ExtraArgs={'Metadata': {'file_id': file_id}},
        )
        return object_name
    except NoCredentialsError:
        st.error('Credentials not available')
        return None
    except PartialCredentialsError:
        st.error('Incomplete credentials provided')
        return None
    except (ClientError, S3UploadFailedError) as e:
        # Service-side failures (missing bucket, access denied, ...) are
        # reported in the UI instead of crashing the script run.
        st.error(f'Error uploading file to S3: {e}')
        return None

# Function to insert status into DynamoDB
def insert_status(file_id, status, s3_key_summary=None, error_message=None):
    """Write a status record for ``file_id`` to the DynamoDB table.

    Args:
        file_id: Identifier of the file the status belongs to.
        status: Status text to record.
        s3_key_summary: Optional S3 key of a summary; stored only when truthy.
        error_message: Optional error description; stored only when truthy.
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated. Taking the aware UTC time and dropping
    # the tzinfo yields the same offset-less ISO string as before, so stored
    # timestamps keep comparing consistently as strings.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    item = {
        'file_id': file_id,
        'status_timestamp': timestamp,
        'status': status
    }
    if s3_key_summary:
        item['s3_key_summary'] = s3_key_summary
    if error_message:
        item['error_message'] = error_message

    table.put_item(Item=item)

# Function to get updated file statuses from DynamoDB
def get_updated_statuses(last_check_time):
    """Return all status items whose ``status_timestamp`` is newer than a cutoff.

    Args:
        last_check_time: Timestamp string; only items with a strictly greater
            ``status_timestamp`` are returned.

    Returns:
        A list of item dicts, or an empty list if the scan failed (the error
        is shown with ``st.error``).
    """
    scan_kwargs = {
        'FilterExpression': "status_timestamp > :last_check_time",
        'ExpressionAttributeValues': {":last_check_time": last_check_time},
    }
    items = []
    try:
        # A single scan call returns one page only, and the filter is applied
        # after the page is read, so keep following LastEvaluatedKey until
        # the whole table has been covered.
        while True:
            response = table.scan(**scan_kwargs)
            items.extend(response.get('Items', []))
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            scan_kwargs['ExclusiveStartKey'] = last_key
        return items
    except Exception as e:
        st.error(f"Error fetching updated statuses: {e}")
        return []

# Function to generate presigned URL
def generate_presigned_url(bucket_name, object_key, expiration=3600):
    """Build a presigned ``get_object`` URL for an S3 object.

    Args:
        bucket_name: Bucket that holds the object.
        object_key: Key of the object to expose.
        expiration: Value passed as ``ExpiresIn`` (default 3600).

    Returns:
        The URL, or ``None`` when generation failed (the error is shown with
        ``st.error``).
    """
    request_params = {'Bucket': bucket_name, 'Key': object_key}
    try:
        url = s3_client.generate_presigned_url(
            'get_object',
            Params=request_params,
            ExpiresIn=expiration,
        )
    except Exception as e:
        st.error(f"Error generating presigned URL: {e}")
        return None
    return url

# Streamlit Application
st.title("Audio File Upload and Processing Pipeline")

# Initialize session state for uploaded files and last check time
if 'file_statuses' not in st.session_state:
    st.session_state.file_statuses = {}
if 'last_check_time' not in st.session_state:
    # Smallest representable timestamp, so the first poll matches every record.
    st.session_state.last_check_time = datetime.min.isoformat()

uploaded_file = st.file_uploader("Choose an audio file", type=["mp3", "wav", "flac"])

if uploaded_file is not None:
    # Strip only the final extension. Splitting on the first dot would turn
    # "talk.v1.mp3" and "talk.v2.mp3" into the same file ID.
    file_id = uploaded_file.name.rsplit('.', 1)[0]
    file_details = {
        "Filename": uploaded_file.name,
        "FileType": uploaded_file.type,
        "FileSize": uploaded_file.size
    }
    st.write(file_details)

    # Button to trigger file upload
    if st.button("Upload and Process"):
        with st.spinner('Uploading...'):
            s3_key = upload_file_to_s3(uploaded_file, AWS_BUCKET_NAME, file_id)
            if s3_key:
                st.success(f'File {uploaded_file.name} uploaded successfully!')

                # Insert initial status into DynamoDB
                insert_status(file_id, 'File Received')

                st.info('File uploaded. Waiting for processing to complete...')

# Auto-refresh every 10 seconds to check the status of uploaded files
st_autorefresh(interval=10 * 1000, key="status_autorefresh")

# Check for updated statuses
updated_statuses = get_updated_statuses(st.session_state.last_check_time)
if updated_statuses:
    # Scan results are not guaranteed to arrive in timestamp order. Apply
    # them oldest-first so that, when one poll returns several records for
    # the same file, the newest record is the one that is kept.
    updated_statuses = sorted(updated_statuses, key=lambda item: item['status_timestamp'])
    st.session_state.last_check_time = updated_statuses[-1]['status_timestamp']
    for status in updated_statuses:
        st.session_state.file_statuses[status['file_id']] = status

# Display the status of all uploaded files in a table
st.header("Uploaded Files Status")

if st.session_state.file_statuses:
    df = pd.DataFrame(list(st.session_state.file_statuses.values()))
    st.table(df[['file_id', 'status_timestamp', 'status']])

    # Check if there is a summary file ready for download
    for status in st.session_state.file_statuses.values():
        if status['status'] == 'Summary Created' and 's3_key_summary' in status:
            summary_url = generate_presigned_url(AWS_BUCKET_NAME, status['s3_key_summary'])
            # generate_presigned_url returns None on failure (and has already
            # shown the error); do not render a dead "None" link in that case.
            if summary_url:
                st.markdown(f"[Download Summary for {status['file_id']}]({summary_url})")
else:
    st.write("No files uploaded yet.")

In [None]:
import streamlit as st
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
import pandas as pd
from datetime import datetime
from streamlit_autorefresh import st_autorefresh
from opensearchpy import OpenSearch
from wordcloud import WordCloud
import matplotlib.pyplot as plt

# AWS Configuration
# NOTE(review): the values below are placeholders to be filled in per
# deployment; avoid committing real credentials to the notebook.
AWS_ACCESS_KEY_ID = 'your_access_key_id'
AWS_SECRET_ACCESS_KEY = 'your_secret_access_key'
AWS_BUCKET_NAME = 'your_bucket_name'
DYNAMODB_TABLE_NAME = 'your_dynamodb_table_name'
REGION_NAME = 'your_aws_region'

# OpenSearch Configuration
OPENSEARCH_HOST = 'your_opensearch_host'
OPENSEARCH_PORT = 9200
OPENSEARCH_INDEX = 'your_index_name'

# Initialize Boto3 clients
# The S3 client is pinned to REGION_NAME, exactly like the DynamoDB resource
# below, so uploads and presigned URLs target the configured region instead
# of whatever default boto3 resolves from the environment.
s3_client = boto3.client(
    's3',
    region_name=REGION_NAME,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

dynamodb = boto3.resource(
    'dynamodb',
    region_name=REGION_NAME,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

# Handle to the table that stores per-file processing statuses.
table = dynamodb.Table(DYNAMODB_TABLE_NAME)

# Initialize OpenSearch client
# Connects over TLS with certificate verification and basic authentication.
client = OpenSearch(
    hosts=[{'host': OPENSEARCH_HOST, 'port': OPENSEARCH_PORT}],
    http_auth=('your_username', 'your_password'),
    use_ssl=True,
    verify_certs=True,
    ssl_show_warn=False
)

# Function to upload file to S3 and update metadata
def upload_file_to_s3(file, bucket_name, file_id, object_name=None):
    """Upload a file-like object to S3 with its file ID stored as metadata.

    Args:
        file: Binary file-like object to upload. Its ``name`` attribute is
            used as the S3 key when ``object_name`` is not given.
        bucket_name: Name of the destination bucket.
        file_id: Identifier stored under the ``file_id`` metadata key.
        object_name: S3 key to write to. Defaults to ``file.name``.

    Returns:
        The S3 key on success, or ``None`` after the failure has been
        reported with ``st.error``.
    """
    # Local imports: both packages are already dependencies of this module.
    from boto3.exceptions import S3UploadFailedError
    from botocore.exceptions import ClientError

    if object_name is None:
        object_name = file.name
    try:
        # Attach the metadata in the upload request itself rather than
        # rewriting the object afterwards with copy_object: one request
        # instead of two, and the object never exists without its metadata
        # if the second call were to fail.
        s3_client.upload_fileobj(
            file,
            bucket_name,
            object_name,
            ExtraArgs={'Metadata': {'file_id': file_id}},
        )
        return object_name
    except NoCredentialsError:
        st.error('Credentials not available')
        return None
    except PartialCredentialsError:
        st.error('Incomplete credentials provided')
        return None
    except (ClientError, S3UploadFailedError) as e:
        # Service-side failures (missing bucket, access denied, ...) are
        # reported in the UI instead of crashing the script run.
        st.error(f'Error uploading file to S3: {e}')
        return None

# Function to insert status into DynamoDB
def insert_status(file_id, status, s3_key_summary=None, error_message=None):
    """Write a status record for ``file_id`` to the DynamoDB table.

    Args:
        file_id: Identifier of the file the status belongs to.
        status: Status text to record.
        s3_key_summary: Optional S3 key of a summary; stored only when truthy.
        error_message: Optional error description; stored only when truthy.
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated. Taking the aware UTC time and dropping
    # the tzinfo yields the same offset-less ISO string as before, so stored
    # timestamps keep comparing consistently as strings.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    item = {
        'file_id': file_id,
        'status_timestamp': timestamp,
        'status': status
    }
    if s3_key_summary:
        item['s3_key_summary'] = s3_key_summary
    if error_message:
        item['error_message'] = error_message

    table.put_item(Item=item)

# Function to get updated file statuses from DynamoDB
def get_updated_statuses(last_check_time):
    """Return all status items whose ``status_timestamp`` is newer than a cutoff.

    Args:
        last_check_time: Timestamp string; only items with a strictly greater
            ``status_timestamp`` are returned.

    Returns:
        A list of item dicts, or an empty list if the scan failed (the error
        is shown with ``st.error``).
    """
    scan_kwargs = {
        'FilterExpression': "status_timestamp > :last_check_time",
        'ExpressionAttributeValues': {":last_check_time": last_check_time},
    }
    items = []
    try:
        # A single scan call returns one page only, and the filter is applied
        # after the page is read, so keep following LastEvaluatedKey until
        # the whole table has been covered.
        while True:
            response = table.scan(**scan_kwargs)
            items.extend(response.get('Items', []))
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            scan_kwargs['ExclusiveStartKey'] = last_key
        return items
    except Exception as e:
        st.error(f"Error fetching updated statuses: {e}")
        return []

# Function to generate presigned URL
def generate_presigned_url(bucket_name, object_key, expiration=3600):
    """Build a presigned ``get_object`` URL for an S3 object.

    Args:
        bucket_name: Bucket that holds the object.
        object_key: Key of the object to expose.
        expiration: Value passed as ``ExpiresIn`` (default 3600).

    Returns:
        The URL, or ``None`` when generation failed (the error is shown with
        ``st.error``).
    """
    request_params = {'Bucket': bucket_name, 'Key': object_key}
    try:
        url = s3_client.generate_presigned_url(
            'get_object',
            Params=request_params,
            ExpiresIn=expiration,
        )
    except Exception as e:
        st.error(f"Error generating presigned URL: {e}")
        return None
    return url

# Function to fetch tags from OpenSearch
def fetch_tags_from_opensearch():
    """Collect the ``tags`` values of documents in the configured index.

    Runs a ``match_all`` query limited to 1000 hits and gathers every tag
    into one flat list (duplicates are kept).

    Returns:
        A list of tags; empty when no hit carries any.
    """
    query = {
        "size": 1000,  # Adjust size as needed
        "_source": ["tags"],
        "query": {
            "match_all": {}
        }
    }

    response = client.search(index=OPENSEARCH_INDEX, body=query)
    tags = []
    for hit in response['hits']['hits']:
        # Documents without a "tags" field would otherwise raise KeyError.
        doc_tags = hit.get('_source', {}).get('tags')
        if not doc_tags:
            continue
        if isinstance(doc_tags, str):
            # A single tag stored as a plain string must be appended whole;
            # extend() would split it into individual characters.
            tags.append(doc_tags)
        else:
            tags.extend(doc_tags)

    return tags

# Function to generate word cloud from tags
def generate_word_cloud(tags):
    """Draw a word cloud of ``tags`` on a new matplotlib figure.

    Args:
        tags: Iterable of tag strings; they are joined with spaces and passed
            to ``WordCloud.generate``.

    Returns:
        The figure holding the rendered word cloud. It is also left as
        pyplot's current figure, so callers that render via ``plt`` keep
        working.
    """
    wordcloud = WordCloud(width=800, height=400, background_color='white').generate(' '.join(tags))
    fig = plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    # No plt.show() here: the figure is rendered by Streamlit, and opening a
    # GUI window from the app script is either a no-op or blocks the run.
    return fig

# Streamlit Application
st.title("Audio File Upload and Processing Pipeline")

# Initialize session state for uploaded files and last check time
if 'file_statuses' not in st.session_state:
    st.session_state.file_statuses = {}
if 'last_check_time' not in st.session_state:
    # Smallest representable timestamp, so the first poll matches every record.
    st.session_state.last_check_time = datetime.min.isoformat()

uploaded_file = st.file_uploader("Choose an audio file", type=["mp3", "wav", "flac"])

if uploaded_file is not None:
    # Strip only the final extension. Splitting on the first dot would turn
    # "talk.v1.mp3" and "talk.v2.mp3" into the same file ID.
    file_id = uploaded_file.name.rsplit('.', 1)[0]
    file_details = {
        "Filename": uploaded_file.name,
        "FileType": uploaded_file.type,
        "FileSize": uploaded_file.size
    }
    st.write(file_details)

    # Button to trigger file upload
    if st.button("Upload and Process"):
        with st.spinner('Uploading...'):
            s3_key = upload_file_to_s3(uploaded_file, AWS_BUCKET_NAME, file_id)
            if s3_key:
                st.success(f'File {uploaded_file.name} uploaded successfully!')

                # Insert initial status into DynamoDB
                insert_status(file_id, 'File Received')

                st.info('File uploaded. Waiting for processing to complete...')

# Auto-refresh every 10 seconds to check the status of uploaded files
st_autorefresh(interval=10 * 1000, key="status_autorefresh")

# Check for updated statuses
updated_statuses = get_updated_statuses(st.session_state.last_check_time)
if updated_statuses:
    # Scan results are not guaranteed to arrive in timestamp order. Apply
    # them oldest-first so that, when one poll returns several records for
    # the same file, the newest record is the one that is kept.
    updated_statuses = sorted(updated_statuses, key=lambda item: item['status_timestamp'])
    st.session_state.last_check_time = updated_statuses[-1]['status_timestamp']
    for status in updated_statuses:
        st.session_state.file_statuses[status['file_id']] = status

# Display the status of all uploaded files in a table
st.header("Uploaded Files Status")

if st.session_state.file_statuses:
    df = pd.DataFrame(list(st.session_state.file_statuses.values()))
    st.table(df[['file_id', 'status_timestamp', 'status']])

    # Check if there is a summary file ready for download
    for status in st.session_state.file_statuses.values():
        if status['status'] == 'Summary Created' and 's3_key_summary' in status:
            summary_url = generate_presigned_url(AWS_BUCKET_NAME, status['s3_key_summary'])
            # generate_presigned_url returns None on failure (and has already
            # shown the error); do not render a dead "None" link in that case.
            if summary_url:
                st.markdown(f"[Download Summary for {status['file_id']}]({summary_url})")
else:
    st.write("No files uploaded yet.")

# Section to generate word cloud from OpenSearch tags
st.header("Generate Word Cloud from OpenSearch Tags")

if st.button("Generate Word Cloud"):
    fetch_failed = False
    with st.spinner('Fetching tags from OpenSearch...'):
        try:
            tags = fetch_tags_from_opensearch()
        except Exception as e:
            # Report connection/query problems in the UI, in the same way the
            # S3 and DynamoDB helpers do, instead of aborting the script run.
            st.error(f"Error fetching tags from OpenSearch: {e}")
            tags = []
            fetch_failed = True

    if tags:
        st.success('Tags fetched successfully!')
        st.write("Tags:", tags)

        with st.spinner('Generating word cloud...'):
            generate_word_cloud(tags)
            # Hand Streamlit the figure that was just drawn rather than the
            # pyplot module, then close it so repeated clicks do not pile up
            # open figures.
            fig = plt.gcf()
            st.pyplot(fig)
            plt.close(fig)
    elif not fetch_failed:
        st.warning('No tags found in OpenSearch.')


In [None]:
pip install opensearch-py wordcloud streamlit

In [None]:
Here's the updated Streamlit application code. It now asks for a `file_id` and an `email` before the file is uploaded to S3, and stores both values in the S3 object metadata and in the DynamoDB status record:

```python
import streamlit as st
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
import pandas as pd
from datetime import datetime
from streamlit_autorefresh import st_autorefresh
from opensearchpy import OpenSearch
from wordcloud import WordCloud
import matplotlib.pyplot as plt

# AWS Configuration
# NOTE(review): the values below are placeholders to be filled in per
# deployment; avoid committing real credentials to the notebook.
AWS_ACCESS_KEY_ID = 'your_access_key_id'
AWS_SECRET_ACCESS_KEY = 'your_secret_access_key'
AWS_BUCKET_NAME = 'your_bucket_name'
DYNAMODB_TABLE_NAME = 'your_dynamodb_table_name'
REGION_NAME = 'your_aws_region'

# OpenSearch Configuration
OPENSEARCH_HOST = 'your_opensearch_host'
OPENSEARCH_PORT = 9200
OPENSEARCH_INDEX = 'your_index_name'

# Initialize Boto3 clients
# The S3 client is pinned to REGION_NAME, exactly like the DynamoDB resource
# below, so uploads and presigned URLs target the configured region instead
# of whatever default boto3 resolves from the environment.
s3_client = boto3.client(
    's3',
    region_name=REGION_NAME,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

dynamodb = boto3.resource(
    'dynamodb',
    region_name=REGION_NAME,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

# Handle to the table that stores per-file processing statuses.
table = dynamodb.Table(DYNAMODB_TABLE_NAME)

# Initialize OpenSearch client
# Connects over TLS with certificate verification and basic authentication.
client = OpenSearch(
    hosts=[{'host': OPENSEARCH_HOST, 'port': OPENSEARCH_PORT}],
    http_auth=('your_username', 'your_password'),
    use_ssl=True,
    verify_certs=True,
    ssl_show_warn=False
)

# Function to upload file to S3 and update metadata
def upload_file_to_s3(file, bucket_name, file_id, email, object_name=None):
    """Upload a file-like object to S3 with file ID and email as metadata.

    Args:
        file: Binary file-like object to upload. Its ``name`` attribute is
            used as the S3 key when ``object_name`` is not given.
        bucket_name: Name of the destination bucket.
        file_id: Identifier stored under the ``file_id`` metadata key.
        email: Address stored under the ``email`` metadata key.
        object_name: S3 key to write to. Defaults to ``file.name``.

    Returns:
        The S3 key on success, or ``None`` after the failure has been
        reported with ``st.error``.
    """
    # Local imports: both packages are already dependencies of this module.
    from boto3.exceptions import S3UploadFailedError
    from botocore.exceptions import ClientError

    if object_name is None:
        object_name = file.name
    try:
        # Attach the metadata in the upload request itself rather than
        # rewriting the object afterwards with copy_object: one request
        # instead of two, and the object never exists without its metadata
        # if the second call were to fail.
        s3_client.upload_fileobj(
            file,
            bucket_name,
            object_name,
            ExtraArgs={'Metadata': {'file_id': file_id, 'email': email}},
        )
        return object_name
    except NoCredentialsError:
        st.error('Credentials not available')
        return None
    except PartialCredentialsError:
        st.error('Incomplete credentials provided')
        return None
    except (ClientError, S3UploadFailedError) as e:
        # Service-side failures (missing bucket, access denied, ...) are
        # reported in the UI instead of crashing the script run.
        st.error(f'Error uploading file to S3: {e}')
        return None

# Function to insert status into DynamoDB
def insert_status(file_id, status, email, s3_key_summary=None, error_message=None):
    """Write a status record for ``file_id`` to the DynamoDB table.

    Args:
        file_id: Identifier of the file the status belongs to.
        status: Status text to record.
        email: Email address stored alongside the status.
        s3_key_summary: Optional S3 key of a summary; stored only when truthy.
        error_message: Optional error description; stored only when truthy.
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated. Taking the aware UTC time and dropping
    # the tzinfo yields the same offset-less ISO string as before, so stored
    # timestamps keep comparing consistently as strings.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    item = {
        'file_id': file_id,
        'email': email,
        'status_timestamp': timestamp,
        'status': status
    }
    if s3_key_summary:
        item['s3_key_summary'] = s3_key_summary
    if error_message:
        item['error_message'] = error_message

    table.put_item(Item=item)

# Function to get updated file statuses from DynamoDB
def get_updated_statuses(last_check_time):
    """Return all status items whose ``status_timestamp`` is newer than a cutoff.

    Args:
        last_check_time: Timestamp string; only items with a strictly greater
            ``status_timestamp`` are returned.

    Returns:
        A list of item dicts, or an empty list if the scan failed (the error
        is shown with ``st.error``).
    """
    scan_kwargs = {
        'FilterExpression': "status_timestamp > :last_check_time",
        'ExpressionAttributeValues': {":last_check_time": last_check_time},
    }
    items = []
    try:
        # A single scan call returns one page only, and the filter is applied
        # after the page is read, so keep following LastEvaluatedKey until
        # the whole table has been covered.
        while True:
            response = table.scan(**scan_kwargs)
            items.extend(response.get('Items', []))
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            scan_kwargs['ExclusiveStartKey'] = last_key
        return items
    except Exception as e:
        st.error(f"Error fetching updated statuses: {e}")
        return []

# Function to generate presigned URL
def generate_presigned_url(bucket_name, object_key, expiration=3600):
    """Build a presigned ``get_object`` URL for an S3 object.

    Args:
        bucket_name: Bucket that holds the object.
        object_key: Key of the object to expose.
        expiration: Value passed as ``ExpiresIn`` (default 3600).

    Returns:
        The URL, or ``None`` when generation failed (the error is shown with
        ``st.error``).
    """
    request_params = {'Bucket': bucket_name, 'Key': object_key}
    try:
        url = s3_client.generate_presigned_url(
            'get_object',
            Params=request_params,
            ExpiresIn=expiration,
        )
    except Exception as e:
        st.error(f"Error generating presigned URL: {e}")
        return None
    return url

# Function to fetch tags from OpenSearch
def fetch_tags_from_opensearch():
    """Collect the ``tags`` values of documents in the configured index.

    Runs a ``match_all`` query limited to 1000 hits and gathers every tag
    into one flat list (duplicates are kept).

    Returns:
        A list of tags; empty when no hit carries any.
    """
    query = {
        "size": 1000,  # Adjust size as needed
        "_source": ["tags"],
        "query": {
            "match_all": {}
        }
    }

    response = client.search(index=OPENSEARCH_INDEX, body=query)
    tags = []
    for hit in response['hits']['hits']:
        # Documents without a "tags" field would otherwise raise KeyError.
        doc_tags = hit.get('_source', {}).get('tags')
        if not doc_tags:
            continue
        if isinstance(doc_tags, str):
            # A single tag stored as a plain string must be appended whole;
            # extend() would split it into individual characters.
            tags.append(doc_tags)
        else:
            tags.extend(doc_tags)

    return tags

# Function to generate word cloud from tags
def generate_word_cloud(tags):
    """Draw a word cloud of ``tags`` on a new matplotlib figure.

    Args:
        tags: Iterable of tag strings; they are joined with spaces and passed
            to ``WordCloud.generate``.

    Returns:
        The figure holding the rendered word cloud. It is also left as
        pyplot's current figure, so callers that render via ``plt`` keep
        working.
    """
    wordcloud = WordCloud(width=800, height=400, background_color='white').generate(' '.join(tags))
    fig = plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    # No plt.show() here: the figure is rendered by Streamlit, and opening a
    # GUI window from the app script is either a no-op or blocks the run.
    return fig

# Streamlit Application
st.title("Audio File Upload and Processing Pipeline")

# Initialize session state for uploaded files and last check time
if 'file_statuses' not in st.session_state:
    st.session_state.file_statuses = {}
if 'last_check_time' not in st.session_state:
    # Smallest representable timestamp, so the first poll matches every record.
    st.session_state.last_check_time = datetime.min.isoformat()

# Input fields for file_id and email
# Surrounding whitespace is dropped so that a blank-looking entry counts as empty.
file_id = st.text_input("Enter File ID").strip()
email = st.text_input("Enter Email").strip()

uploaded_file = st.file_uploader("Choose an audio file", type=["mp3", "wav", "flac"])

if uploaded_file is not None:
    file_details = {
        "Filename": uploaded_file.name,
        "FileType": uploaded_file.type,
        "FileSize": uploaded_file.size
    }
    st.write(file_details)

    # Button to trigger file upload
    if st.button("Upload and Process"):
        if not file_id or not email:
            # Both values are written to the S3 metadata and the DynamoDB
            # record, so refuse to upload until they have been filled in.
            st.error('Please enter both a File ID and an Email before uploading.')
        else:
            with st.spinner('Uploading...'):
                s3_key = upload_file_to_s3(uploaded_file, AWS_BUCKET_NAME, file_id, email)
                if s3_key:
                    st.success(f'File {uploaded_file.name} uploaded successfully!')

                    # Insert initial status into DynamoDB
                    insert_status(file_id, 'File Received', email)

                    st.info('File uploaded. Waiting for processing to complete...')

# Auto-refresh every 10 seconds to check the status of uploaded files
st_autorefresh(interval=10 * 1000, key="status_autorefresh")

# Check for updated statuses
updated_statuses = get_updated_statuses(st.session_state.last_check_time)
if updated_statuses:
    # Scan results are not guaranteed to arrive in timestamp order. Apply
    # them oldest-first so that, when one poll returns several records for
    # the same file, the newest record is the one that is kept.
    updated_statuses = sorted(updated_statuses, key=lambda item: item['status_timestamp'])
    st.session_state.last_check_time = updated_statuses[-1]['status_timestamp']
    for status in updated_statuses:
        st.session_state.file_statuses[status['file_id']] = status

# Display the status of all uploaded files in a table
st.header("Uploaded Files Status")

if st.session_state.file_statuses:
    df = pd.DataFrame(list(st.session_state.file_statuses.values()))
    # reindex() instead of df[[...]]: status records that lack one of these
    # attributes (for example "email") then show an empty cell rather than
    # raising KeyError.
    st.table(df.reindex(columns=['file_id', 'email', 'status_timestamp', 'status']))

    # Check if there is a summary file ready for download
    for status in st.session_state.file_statuses.values():
        if status['status'] == 'Summary Created' and 's3_key_summary' in status:
            summary_url = generate_presigned_url(AWS_BUCKET_NAME, status['s3_key_summary'])
            # generate_presigned_url returns None on failure (and has already
            # shown the error); do not render a dead "None" link in that case.
            if summary_url:
                st.markdown(f"[Download Summary for {status['file_id']}]({summary_url})")
else:
    st.write("No files uploaded yet.")

# Section to generate word cloud from OpenSearch tags
st.header("Generate Word Cloud from OpenSearch Tags")

if st.button("Generate Word Cloud"):
    fetch_failed = False
    with st.spinner('Fetching tags from OpenSearch...'):
        try:
            tags = fetch_tags_from_opensearch()
        except Exception as e:
            # Report connection/query problems in the UI, in the same way the
            # S3 and DynamoDB helpers do, instead of aborting the script run.
            st.error(f"Error fetching tags from OpenSearch: {e}")
            tags = []
            fetch_failed = True

    if tags:
        st.success('Tags fetched successfully!')
        st.write("Tags:", tags)

        with st.spinner('Generating word cloud...'):
            generate_word_cloud(tags)
            # Hand Streamlit the figure that was just drawn rather than the
            # pyplot module, then close it so repeated clicks do not pile up
            # open figures.
            fig = plt.gcf()
            st.pyplot(fig)
            plt.close(fig)
    elif not fetch_failed:
        st.warning('No tags found in OpenSearch.')
```

### Explanation:

1. **Input Fields for File ID and Email:**
   - Added text input fields for `file_id` and `email` before the file uploader.

2. **Upload and Process Button:**
   - Updated the `upload_file_to_s3` function to accept `email` in addition to `file_id` and to store both in the S3 object metadata.
   - Updated the `insert_status` function to include `email` in the DynamoDB entry.

3. **Display the Status:**
   - Updated the status table to include `email`.

This setup ensures that the file ID and email are input before uploading the file to S3 and are stored in the S3 metadata and DynamoDB.