In [2]:

from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from gridfs import GridFS
from time import time
from pydub import AudioSegment
import os
import soundfile as sf

In [3]:
def get_id():
    return str(int(time())) # id is timestamp. 

In [15]:
class my_db:
    def __init__(self, user = "admin", password = "FxTrZgPAQCRnMXer"):
        connection_uri = f"mongodb+srv://{user}:{password}@capstone.szoclas.mongodb.net/?retryWrites=true&w=majority"
        print(connection_uri)
        self.db = MongoClient(connection_uri)['Capstone']
        self.files = self.db['files'] # files storage
        self.users = self.db['users'] # user credentials
        self.user_data = self.db['user_data'] # user data - contains id's of base songs and other details
        self.processing_data = self.db['processing_data'] # processing stage of songs will be mentioned here
        os.makedirs('temp_files', exist_ok = True)
    
    def create_user(self, user, password):
        # check if user already exists
        users = self.users.find({'user': user})
        if len(list(users)) > 0:
            return {"status": "failed", "message": f"User '{user}' Already Exists"}
        else:
            user_id = get_id()
            user_record = {
                "user": user,
                "password": password,
                "_id": user_id
            }
            user_data = {
                '_id': user_id,
                'base_audios': [],
                'recorded_audios': [],
            }
            self.users.insert_one(user_record) # insert user's credentials
            self.user_data.insert_one(user_data) # insert user's data - will be updated based on user's usage
            return {"status": "done", "message": f"New User '{user}' Created."}
        
    def validate_user(self, user, password):
        matched_users = list(self.users.find({'user': user}))
        if matched_users:
            if matched_users[0]['password'] == password:
                return {'status': 'done', 'id': matched_users[0]['_id'], 'message': 'User Logged In.'}
            else:
                return {'status': 'failed', 'message': 'Invalid Password.'}
        else:
            return {'status': 'failed', 'message': 'Invalid User.'}
        
    def write_audio(self, file_path):
        # read audio using PyDub
        audio_file = AudioSegment.from_file(file_path)
        id = get_id()
        temp_file = f'temp_files/{id}.mp3'
        audio_file.export(temp_file, format = "mp3")
        file = open(temp_file, 'rb')
        file_contents = file.read()
        file.close()
        record = {
            "_id": id,
            "content": file_contents
        }
        self.files.insert_one(record)
        return id

    def read_audio(self, id, file_path):
        records = list(self.files.find({"_id": id}))
        temp_id = get_id()
        if records:
            with open(f'temp_files/{temp_id}.mp3', 'wb') as file:
                file.write(records[0]['content'])
            audio = AudioSegment.from_file(f'temp_files/{temp_id}.mp3')
            audio.export(f'temp_files/{temp_id}.wav', format = "wav")
            wav_data, sample_rate = sf.read(f'temp_files/{temp_id}.wav')
            sf.write(file_path, wav_data, sample_rate)
            return file_path
        else:
            return False        

    def write_video(self, file_path):
        # generate id
        id = get_id()
        file = open(file_path, 'rb')
        file_contents = file.read()
        file.close()
        record = {
            "_id": id,
            "content": file_contents,
        }
        self.files.insert_one(record)
        return id
    
    def read_video(self, id, file_path = None):
        records = list(self.files.find({"_id": id}))
        if records:
            if file_path:
                with open(file_path, 'wb') as file:
                    file.write(records[0]['content'])
            return records[0]['content']
        else:
            return False
    
    def add_song_data(self, user_id, category, data):
        self.user_data.update_one(
            {'_id': user_id},
            {'$push': {category: data}}
            )
        

    def store_uploaded_song(self, file_path, user_id):
        # store the file in DB and get id
        id = self.write_audio(file_path)
        fname = os.path.split(file_path)[-1]
        name = os.path.splitext(fname)[0]
        duration = AudioSegment.from_file(file_path).duration_seconds


database = my_db()

mongodb+srv://admin:FxTrZgPAQCRnMXer@capstone.szoclas.mongodb.net/?retryWrites=true&w=majority


In [20]:
database.add_song_data('1699180310', 'base_audios', '43052345453')

In [6]:
# user uploads audio - workflow
# 1. user uploads audio
# 2. user

In [21]:
list(database.user_data.find())

[{'_id': '1699180310',
  'base_audios': ['43052345453', '43052345453'],
  'recorded_audios': []}]

In [7]:
# users
# audios
# videos
# all uploaded audios will be converted into vocals and no_vocals and will be stored in DB


In [3]:
# run this to clear all data in database
client = MongoClient("mongodb+srv://admin:FxTrZgPAQCRnMXer@capstone.szoclas.mongodb.net/?retryWrites=true&w=majority")
db = client['Capstone']
collections = db.list_collection_names()
for collection in collections:
    db[collection].drop()
client.close()