In [1]:
import pandas as pd
import os
import sys
import hashlib
import pickle
from datetime import datetime, date, time

cwd = os.getcwd()
# Use whichever separator the cwd actually contains: backslash (Windows) or forward slash.
if "\\" in cwd:
    delimiter = "\\"
else:
    delimiter = "/"

# repoPath is every path component above the "videoProcessing" folder, plus a trailing
# separator. list.index raises ValueError when the notebook is not run from inside it.
_cwdParts = cwd.split(delimiter)
repoPath = delimiter.join(_cwdParts[:_cwdParts.index("videoProcessing")]) + delimiter
del _cwdParts

# Data folders at the repo root (the sub-folder suffixes always use "/").
workingDataPath = f"{repoPath}workingData/"
recentCapturesPath = f"{repoPath}recentCaptures/"
videoDataPath = f"{repoPath}videoData/"

In [2]:
# Pass in the full path where the data is saved, so the function is more flexible.
# For the video data the path would look like:
# homeVideo/deskCam/frameMetaData/
# homeVideo/deskCam/yolo11pose/
# homeVideo/deskCam/yolo11Object/

def getWorkingDf(fullWorkingDataPath):
    """Load every parquet chunk in `fullWorkingDataPath` and return its 'value' column.

    NOTE: a later cell redefines `getWorkingDf(location, interval)`; once that cell has run,
    this version is shadowed.

    Parameters
    ----------
    fullWorkingDataPath : str
        Directory holding the parquet chunks (e.g. ``homeVideo/deskCam/frameMetaData/``).

    Returns
    -------
    pandas.DataFrame | list
        One-column DataFrame (``value``) with duplicate index entries dropped (first
        occurrence wins, files read in sorted-name order) and the index sorted; an empty
        list when the directory contains no files.
    """
    # Sorted so that "first occurrence wins" is deterministic (os.listdir order is arbitrary).
    workingDataFiles = sorted(os.listdir(fullWorkingDataPath))
    if not workingDataFiles:
        print('no files found')
        return []

    # Read everything, then concatenate once instead of re-copying the frame per file.
    dfSoFar = pd.concat(
        [pd.read_parquet(os.path.join(fullWorkingDataPath, name)) for name in workingDataFiles]
    )
    dfSoFar = dfSoFar[~dfSoFar.index.duplicated(keep="first")].sort_index()
    return pd.DataFrame(dfSoFar['value'])


In [31]:
def getWorkingDf(location, interval = (pd.Timestamp.min.tz_localize("UTC"), pd.Timestamp.max.tz_localize("UTC"))):
    """Load the rows stored under ``workingDataPath + location`` whose index lies in `interval`.

    Chunk files are named ``<start>_<end>_<hash>_...`` (as written by `writeDfFile`); only
    files whose name range can overlap `interval` are read.

    Parameters
    ----------
    location : str
        Sub-directory of `workingDataPath` (with trailing separator), e.g.
        ``"homeVideo/deskCam/frameMetaData/"``.
    interval : tuple of pandas.Timestamp
        Inclusive ``(start, end)`` bounds, tz-aware. Defaults to the whole representable range.

    Returns
    -------
    pandas.DataFrame | list
        Copy of the matching rows (duplicate index entries dropped, first file in sorted-name
        order wins, index sorted), or an empty list when no file exists or overlaps.
    """
    fullWorkingDataPath = workingDataPath + location
    # Sorted so duplicate resolution below is deterministic (os.listdir order is arbitrary).
    workingDataFiles = sorted(os.listdir(fullWorkingDataPath))
    if len(workingDataFiles) == 0:
        print('no files found')
        return []

    startTime, endTime = interval
    # Name timestamps are written with whole-second precision, so a file's real last row can
    # be up to (but less than) one second after the end time in its name.
    nameResolution = pd.Timedelta(seconds=1)
    relevantFiles = []
    for wdf in workingDataFiles:
        nameParts = wdf.split("_")
        if len(nameParts) < 2:
            # Not a "<start>_<end>_..." chunk (e.g. a stray probe file); nothing to parse.
            continue
        fileStartTime = pd.to_datetime(nameParts[0])
        fileEndTime = pd.to_datetime(nameParts[1])
        # Inclusive overlap test, matching the inclusive .loc slice at the end.
        if fileEndTime + nameResolution > startTime and fileStartTime <= endTime:
            relevantFiles.append(wdf)

    if len(relevantFiles) == 0:
        print('no relevant files found')
        return []

    # Concatenate once instead of re-copying the growing frame for every file.
    dfSoFar = pd.concat(
        [pd.read_parquet(os.path.join(fullWorkingDataPath, name)) for name in relevantFiles]
    )
    dfSoFar = dfSoFar[~dfSoFar.index.duplicated(keep="first")].sort_index()

    return dfSoFar.loc[startTime:endTime].copy()


In [4]:
# Function to compute a short hash of a Python object
def short_hash(obj, length=8):
    """Return the first `length` hex characters of the MD5 digest of ``pickle.dumps(obj)``.

    The value is only as stable as the pickled bytes, so it may differ between Python or
    library versions (e.g. a different default pickle protocol).
    """
    digest = hashlib.md5(pickle.dumps(obj)).hexdigest()
    return digest[:length]

In [None]:
# this writes a file for a subset of a DF
def writeDfFile(Df, fullWorkingDataPath):
    """Write `Df` as one gzip parquet named after its first/last index timestamps and hash.

    The name is ``<first>_<last>_<hash>_.parquet.gzip``. The timestamps use the format
    ``%Y-%m-%dT%H%M%S%z``, so sub-second precision is dropped. ``<hash>`` is ``short_hash(Df)``.
    Assumes a non-empty frame whose index values support ``strftime`` (timestamps).
    """
    sh = short_hash(Df)
    timeFormat = '%Y-%m-%dT%H%M%S%z'
    firstStamp = Df.index[0].strftime(timeFormat)
    lastStamp = Df.index[-1].strftime(timeFormat)
    parquetName = "_".join([firstStamp, lastStamp, sh, ".parquet.gzip"])

    print(f"saved to a file named {parquetName}")
    print(f"in {fullWorkingDataPath}")
    Df.to_parquet(fullWorkingDataPath + parquetName, compression='gzip')

In [6]:
#takes in a dataframe you want to save and saves it in multiple files
def saveRows(df, fullWorkingDataPath, rows_per_file):
    """Split `df` into consecutive chunks and write each one with `writeDfFile`.

    Every chunk except the last is ``rows_per_file + 1`` rows. The next chunk starts on the
    previous chunk's last row, so adjacent files share one boundary row. When more than one
    chunk is written, the final chunk holds the remainder: between ``rows_per_file + 1`` and
    ``2 * rows_per_file`` rows. A frame of at most ``2 * rows_per_file`` rows is written as a
    single file. An empty frame writes nothing.

    A `rows_per_file` below 1 is treated as 1; otherwise the loop would never make progress.
    """
    if len(df) == 0: return
    rows_per_file = max(1, rows_per_file)

    startRow = 0
    endRow = len(df)
    rows_remaining = endRow - startRow
    while rows_remaining > 2 * rows_per_file:
        chunkEnd = startRow + rows_per_file
        print(f'{rows_remaining} is too many rows writing {startRow} to {chunkEnd}')
        # +1 keeps the boundary row in this chunk as well as at the start of the next one.
        writeDfFile(df.iloc[startRow:chunkEnd + 1], fullWorkingDataPath)
        rows_remaining -= rows_per_file
        startRow += rows_per_file
    writeDfFile(df.iloc[startRow:endRow], fullWorkingDataPath)

In [7]:
# for a given dataframe approximates the number of rows for a parquet of target file size
def rowsPerFile(Df, targetFileSize, fullWorkingDataPath, fileName = 'test.parquet.gzip', sampleRows = 1_000_000):
    """Estimate how many rows of `Df` fit in a gzip parquet of about `targetFileSize` bytes.

    With the default `fileName`, up to `sampleRows` rows of `Df` are written to a temporary
    probe file in `fullWorkingDataPath`. The file is measured and always deleted afterwards.
    Any other `fileName` is taken to be an existing parquet in that directory; its row count
    and on-disk size give the ratio instead.

    Returns
    -------
    int
        ``rows * targetFileSize // file_size``, but never less than 1 (a chunk size of 0
        is never usable when splitting rows into files).
    """
    filePath = os.path.join(fullWorkingDataPath, fileName)
    if fileName == 'test.parquet.gzip':
        # Sample the whole frame when it is smaller than sampleRows. The former len(Df)-1
        # dropped a row and gave a 0-row sample for single-row frames.
        fileRows = min(sampleRows, len(Df))
        try:
            Df.iloc[:fileRows].to_parquet(filePath, compression='gzip')
            file_size = os.path.getsize(filePath)
        finally:
            # Never leave the probe behind: getWorkingDf parses every file name in this folder.
            if os.path.exists(filePath):
                os.remove(filePath)
    else:
        fileRows = len(pd.read_parquet(filePath))
        file_size = os.path.getsize(filePath)

    return max(1, int(fileRows * targetFileSize // file_size))

In [8]:
# Uses the existing file names to split the rows in the df and saves each part.
# Each part's hash is compared with the hash in its file name; when they match
# (no rows in that part changed) the save is skipped.

def writeToExistingFiles(Df, fileNames, fullWorkingDataPath, rows_per_file):
    """Re-save `Df` along the chunk boundaries encoded in the existing `fileNames`.

    File i receives the inclusive slice ``Df.loc[start_i : start_{i+1}]``, where start_i is the
    timestamp at the front of its name. The first file also takes any rows of `Df` earlier
    than its start, and the last file takes everything up to ``Df.index[-1]``. A file is
    deleted and re-saved via `saveRows` only when the hash of its slice differs from the
    hash in its name. A file whose range contains no rows of `Df` is left untouched, and an
    empty `Df` is a no-op.

    Parameters
    ----------
    Df : pandas.DataFrame
        Sorted, datetime-indexed frame to persist (tz taken from its first index value).
    fileNames : list of str
        Existing chunk names ``<start>_<end>_<hash>_...``, sorted by start.
    fullWorkingDataPath : str
        Directory holding the chunks (with trailing separator).
    rows_per_file : int
        Chunk size forwarded to `saveRows`.

    NOTE(review): names keep only whole seconds, and saved chunks overlap by one row. The
    slices built here therefore will likely not reproduce the originally hashed chunk, so
    most files may be rewritten on every call — confirm whether that is intended.
    """
    if len(Df) == 0:
        # Nothing to write (and Df.index[0] below would fail).
        return
    tzi = Df.index[0].tzinfo
    fileStarts = [pd.to_datetime(name.split('_')[0]).tz_convert(tzi) for name in fileNames]
    lastFileNum = len(fileNames) - 1

    for fileNum, fileName in enumerate(fileNames):
        startTime = fileStarts[fileNum]
        if fileNum == 0 and Df.index[0] < startTime:
            startTime = Df.index[0]

        if fileNum == lastFileNum:
            endTime = Df.index[-1]
        else:
            endTime = fileStarts[fileNum + 1]

        fileSlice = Df.loc[startTime:endTime]
        if len(fileSlice) == 0:
            # Df has no rows in this file's range. Deleting the file would lose its data,
            # since saveRows would write nothing in its place.
            print(f'no rows for {fileName}, leaving it untouched')
            continue

        # if the hash doesn't match write a new file
        if short_hash(fileSlice) != fileName.split('_')[2]:
            print("the hashes don't match")
            os.remove(fullWorkingDataPath + fileName)
            saveRows(fileSlice, fullWorkingDataPath, rows_per_file)
        else:
            print(f'hashes match for {fileName}')

In [12]:
# saves a df using existing filenames if available
def writeWorkingDf(location, Df, targetFileSize = 2 * 1024 * 1024):
    """Persist `Df` under ``workingDataPath + location`` as parquet chunks of ~`targetFileSize` bytes.

    The directory is created if missing. If it has no files, the chunk size is estimated from
    a probe write of `Df` and the frame is split fresh by `saveRows`. Otherwise the chunk size
    is estimated from the first existing file (sorted order), and `writeToExistingFiles`
    re-splits `Df` along the existing files' boundaries.
    """
    fullWorkingDataPath = workingDataPath + location
    if not os.path.exists(fullWorkingDataPath):
        os.makedirs(fullWorkingDataPath)
    existingFiles = sorted(os.listdir(fullWorkingDataPath))

    if existingFiles:
        rows_per_file = rowsPerFile(Df, targetFileSize, fullWorkingDataPath, existingFiles[0])
        writeToExistingFiles(Df, existingFiles, fullWorkingDataPath, rows_per_file)
        return

    rows_per_file = rowsPerFile(Df, targetFileSize, fullWorkingDataPath)
    saveRows(Df, fullWorkingDataPath, rows_per_file)


In [14]:
# Load the sample frame-metadata parquet. The part of its name before the first "_"
# ("testvidMetaDataDf") is used as the sub-folder name under homeVideo/.
testParquetName = "testvidMetaDataDf_2024-11-14T181915-001407-0700_2024-11-14T212815-000251-0700.parquet"
testLocation = f"homeVideo/{testParquetName.partition('_')[0]}/frameMetaData/"
testParquet = pd.read_parquet(testParquetName)

testParquet

Unnamed: 0,sampleDT
0,2024-11-14 18:19:15.001407-07:00
1,2024-11-14 18:19:15.100091-07:00
2,2024-11-14 18:19:15.200086-07:00
3,2024-11-14 18:19:15.300096-07:00
4,2024-11-14 18:19:15.400095-07:00
...,...
1795,2024-11-14 21:27:15.000141-07:00
1796,2024-11-14 21:27:30.000141-07:00
1797,2024-11-14 21:27:45.000133-07:00
1798,2024-11-14 21:28:00.000133-07:00


In [17]:
# Move the capture timestamps into the index. The guard keeps this cell idempotent:
# re-running it once "sampleDT" is already the index would otherwise raise KeyError.
if "sampleDT" in testParquet.columns:
    testParquet = testParquet.set_index("sampleDT")

In [18]:
# Display the frame again to check that "sampleDT" is now the index.
testParquet

2024-11-14 18:19:15.001407-07:00
2024-11-14 18:19:15.100091-07:00
2024-11-14 18:19:15.200086-07:00
2024-11-14 18:19:15.300096-07:00
2024-11-14 18:19:15.400095-07:00
...
2024-11-14 21:27:15.000141-07:00
2024-11-14 21:27:30.000141-07:00
2024-11-14 21:27:45.000133-07:00
2024-11-14 21:28:00.000133-07:00
2024-11-14 21:28:15.000251-07:00


In [19]:
# Save the sample under workingData/<testLocation>. An empty folder gets fresh chunk files;
# if chunk files already exist, the frame is re-split along their boundaries instead.
writeWorkingDf(testLocation, testParquet)

saved to a file named 2024-11-14T181915-0700_2024-11-14T212815-0700_fc46e221_.parquet.gzip


In [32]:
# Read the saved chunks back (no interval given, so all rows) to eyeball the round trip.
tp = getWorkingDf(testLocation)
tp

2024-11-14 18:19:15.001407-07:00
2024-11-14 18:19:15.100091-07:00
2024-11-14 18:19:15.200086-07:00
2024-11-14 18:19:15.300096-07:00
2024-11-14 18:19:15.400095-07:00
...
2024-11-14 21:27:15.000141-07:00
2024-11-14 21:27:30.000141-07:00
2024-11-14 21:27:45.000133-07:00
2024-11-14 21:28:00.000133-07:00
2024-11-14 21:28:15.000251-07:00
