# Extract Raw Data from ObserveRTC

The output is a CSV file that is appended to after each run.

A local file (`timeOfLastQuery.txt`) records when the script last ran, so each run only queries data newer than that time (minus a one-hour overlap, to avoid missing late records).

If a local data file already exists, it is loaded, and any freshly fetched rows that duplicate rows already in the file (for example, because of the overlap window) are dropped.

The remaining new entries are then appended. Afterwards, further data processing can be done by reading the CSV file.

In [1]:
import json
import os
from datetime import datetime
from urllib.parse import quote_plus

import pandas as pd
from dotenv import load_dotenv
from pymongo import MongoClient

In [7]:
load_dotenv()

filename = "output_folder/rawData.csv"


# Build the MongoDB connection string from the environment (.env is loaded
# above). User name and password are percent-escaped because the URI format
# reserves characters such as ':', '/', '@' and '%'; left unescaped they make
# MongoClient reject or misparse the address.
address = 'mongodb://{user}:{password}@{host}:{port}'.format(
    user=quote_plus(os.getenv('MONGO_USER', '')),
    password=quote_plus(os.getenv('MONGO_PASSWORD', '')),
    host=os.getenv('MONGO_HOST', 'localhost'),
    port=os.getenv('MONGO_PORT', '27017'),  # 27017 is MongoDB's default port
)
client = MongoClient(address)
database = client["observertc-reports"]

# the collection we want to query
reportsDatabase = database.reports

#read number from text file
timeOfLastQuery = 0
try:
    with open("timeOfLastQuery.txt", "r") as f:
        timeOfLastQuery = int(f.read()) 
        print("Last run on:", datetime.fromtimestamp(timeOfLastQuery/1000), "going to search for new data 1 hour before this." )
        timeOfLastQuery -= 1000 * 60 * 60 # 1 hour before last query
except:
    pass

print("Searching for records since:", datetime.fromtimestamp(timeOfLastQuery/1000))

# Capture the start of this run before querying; it is saved at the end and
# becomes the next run's lower bound.
timeOfCurrentQuery = datetime.now()

# Client extension reports of these four types are the ones fetched.
rtcExtensionTypes = [
    "OUT_BOUND_RTC",
    "IN_BOUND_RTC",
    "REMOTE_OUT_BOUND_RTC",
    "REMOTE_IN_BOUND_RTC",
]
reportFilter = {
    "type": "CLIENT_EXTENSION_DATA",
    "payload.extensionType": {"$in": rtcExtensionTypes},
    # only reports strictly newer than the computed cut-off
    "payload.timestamp": {"$gt": timeOfLastQuery},
}
cursor = reportsDatabase.find(reportFilter)

print("Got data from database, converting to data frame")

# Per-sample metadata copied verbatim from each report's payload.
metaFields = ("timestamp", "callId", "roomId", "clientId", "userId", "sampleSeq")

dataSet = []
for report in cursor:
    reportPayload = report["payload"]
    row = {field: reportPayload[field] for field in metaFields}
    # The extension payload is itself a JSON string. Its "stats" entries are
    # added after the metadata, so a stats key with the same name wins.
    row.update(json.loads(reportPayload["payload"])["stats"])
    dataSet.append(row)


newData = pd.DataFrame(dataSet)

# Identifier columns are compared as text on both sides: when the CSV is read
# back, pandas may infer a numeric dtype for them, and equal ids would then no
# longer compare equal to the freshly fetched values.
idColumns = ["callId", "roomId", "clientId", "userId"]

# A freshly fetched row counts as a duplicate of a stored row only if it
# matches on all of these columns.
on = ['timestamp', 'callId', 'roomId', 'clientId', 'userId', 'sampleSeq',
   'id', 'type', 'codecId', 'kind', 'mediaType', 'ssrc', 'bytesSent',
   'packetsSent', 'headerBytesSent', 'nackCount', 'retransmittedBytesSent',
   'retransmittedPacketsSent', 'remoteId', 'jitter', 'packetsLost',
   'packetsReceived', 'fractionLost', 'localId', 'roundTripTime',
   'roundTripTimeMeasurements', 'totalRoundTripTime', 'firCount',
   'frameHeight', 'frameWidth', 'framesEncoded', 'framesSent',
   'hugeFramesSent', 'pliCount', 'qpSum', 'totalEncodeTime',
   'totalEncodedBytesTarget', 'packetsDiscarded', 'audioLevel',
   'bytesReceived', 'concealedSamples', 'concealmentEvents',
   'fecPacketsDiscarded', 'fecPacketsReceived', 'headerBytesReceived',
   'insertedSamplesForDeceleration', 'jitterBufferDelay',
   'jitterBufferEmittedCount', 'lastPacketReceivedTimestamp',
   'removedSamplesForAcceleration', 'silentConcealedSamples',
   'totalAudioEnergy', 'totalSamplesDuration', 'totalSamplesReceived',
   'remoteTimestamp', 'discardedPackets', 'framesDecoded',
   'framesPerSecond', 'framesReceived', 'totalDecodeTime',
   'totalInterFrameDelay', 'totalProcessingDelay',
   'totalSquaredInterFrameDelay']


def _align_key_dtypes(left, right, columns):
    """Cast *columns* to object on both frames where only one side is numeric.

    ``pd.merge`` refuses to join a numeric key column with a text one. That
    happens, for example, when a stat is empty (all NaN, hence float64) on one
    side but holds strings on the other. Modifies both frames in place.
    """
    for col in columns:
        leftNumeric = pd.api.types.is_numeric_dtype(left[col])
        rightNumeric = pd.api.types.is_numeric_dtype(right[col])
        if leftNumeric != rightNumeric:
            left[col] = left[col].astype(object)
            right[col] = right[col].astype(object)


def _rows_not_in(oldData, newData, keys):
    """Return the rows of *newData* with no exact match in *oldData* on *keys*.

    The result has exactly oldData's columns, in oldData's order, so it can be
    appended to the CSV file oldData was read from. Columns the new batch
    lacks are left empty; columns the file has no header for are dropped.
    """
    newAligned = newData.reindex(columns=oldData.columns)
    oldKeys = oldData[keys].drop_duplicates().copy()
    _align_key_dtypes(oldKeys, newAligned, keys)
    # pandas matches NaN keys with each other, so rows that share the same
    # missing stats are still recognised as duplicates.
    merged = pd.merge(oldKeys, newAligned, indicator=True, how='right', on=keys)
    diff = merged.query('_merge=="right_only"').drop('_merge', axis=1)
    return diff[list(oldData.columns)]


if newData.empty:
    # The query matched nothing: leave the CSV untouched instead of failing
    # on the missing id columns or writing an empty file.
    print("No new records found, leaving", filename, "unchanged")
else:
    newData[idColumns] = newData[idColumns].astype(str)

    try:
        print("Trying to read and load from", filename)
        # low_memory=False infers each column's dtype from the whole file,
        # avoiding mixed-type columns in large CSVs.
        oldData = pd.read_csv(filename, low_memory=False)
    except (FileNotFoundError, pd.errors.EmptyDataError):
        # No usable data file yet: create it (and its folder), with a header.
        print("size of new data: ", newData.shape)
        outputDir = os.path.dirname(filename)
        if outputDir:
            os.makedirs(outputDir, exist_ok=True)
        newData.to_csv(filename, mode='w', header=True, index=False)
    else:
        # Errors from here on propagate instead of reaching the overwrite
        # branch above, which would discard all previously stored data.
        oldData[idColumns] = oldData[idColumns].astype(str)

        print("size of old data: ", oldData.shape)
        print("size of new data: ", newData.shape)

        # Compare only on key columns the file actually has; values for a stat
        # missing from the file header cannot be stored by appending anyway.
        keys = [col for col in on if col in oldData.columns]
        diff = _rows_not_in(oldData, newData, keys)

        print("size of diff: ", diff.shape)

        # append only the genuinely new rows to the CSV file
        diff.to_csv(filename, mode='a', header=False, index=False)
    
# Persist this run's start time (epoch milliseconds); the next run reads it
# back to decide which records are new.
epochMs = int(timeOfCurrentQuery.timestamp() * 1000)
with open("timeOfLastQuery.txt", "w") as lastQueryFile:
    print("Saving timeOfCurrentQuery:", timeOfCurrentQuery, "as epoch:", epochMs)
    lastQueryFile.write(str(epochMs))

    

Searching for records since: 1970-01-01 01:00:00
Got data from database, converting to data frame
Trying to read and load from rawData.csv
size of new data:  (101606, 63)
Saving timeOfCurrentQuery: 2022-12-22 13:57:17.606301 as epoch: 1671713837606
