In [54]:
import dask.dataframe as dd
from datetime import datetime, timedelta

In [76]:
# Inclusive range of dates (YYYY-MM-DD) to split the input by.
startDate = '2019-12-18'
endDate = '2019-12-30'

# Tab-separated input file to read.
inputFile = './data/Santa_Clara_County_Pin_Report_1e6.tsv'

# One output file per date is written as outputDir + outputPrefix + <date> + ".csv".
outputDir = './data/splitted' + '/'
outputPrefix = 'out_'

# Minimum number of rows cached in memory for a date before a regular
# (non-forced) flush writes that date's rows to its file.
flushThresholdPerDate = 1000




In [77]:
def getDateStrings(startDate, endDate):
    """Return every day from startDate through endDate (inclusive) as strings.

    Args:
        startDate: first day, formatted as 'YYYY-MM-DD'.
        endDate: last day, formatted as 'YYYY-MM-DD'.

    Returns:
        List of 'YYYY-MM-DD' strings in chronological order. Empty when
        endDate is earlier than startDate.

    Raises:
        ValueError: if either argument does not match the 'YYYY-MM-DD' format.
    """
    dateFormat = "%Y-%m-%d"
    firstDay = datetime.strptime(startDate, dateFormat)
    lastDay = datetime.strptime(endDate, dateFormat)
    # +1 so that the end date itself is part of the result.
    dayCount = (lastDay - firstDay).days + 1

    return [
        (firstDay + timedelta(days=offset)).strftime(dateFormat)
        for offset in range(dayCount)
    ]
        

In [78]:
def getInitializedCacheRows(dates):
    """Build an empty row cache: one fresh, independent list per date.

    Args:
        dates: iterable of date keys.

    Returns:
        Dict mapping each date to its own empty list.
    """
    return {date: [] for date in dates}

# Flushes cached rows to their per-date files and clears only what was written.
def flushDataToFiles(cachedRows, force=False):
    """Write cached rows to their per-date output files.

    Args:
        cachedRows: dict mapping a date string to the list of rows cached for
            that date. Mutated in place: every date that gets written is reset
            to a new empty list.
        force: when true, every date's cache is written regardless of its
            size; otherwise only dates holding at least
            ``flushThresholdPerDate`` rows are written.

    Dates that are not written keep their cached rows, so nothing is dropped
    before it reaches a file.
    """
    print(f'flushing files, force: {force}')

    for date, rows in cachedRows.items():
        if force or len(rows) >= flushThresholdPerDate:
            writeToFile(date, rows)
            # Reset only the caches that were actually written; clearing the
            # others would silently discard rows that never reached a file.
            # Re-binding an existing key is safe while iterating the dict.
            cachedRows[date] = []


def getOutputFilename(dateStr):
    """Return the output CSV path for dateStr: outputDir + outputPrefix + dateStr + ".csv"."""
    pathParts = (outputDir, outputPrefix, dateStr, ".csv")
    return "".join(pathParts)

def toCsv(rows):
    """Render rows as comma-separated text, one line per row.

    Each row must support lookup by the keys 'pid', 'hid', 'lat', 'lon',
    'ts', 'td', 'dw' and 'tz'; the values are emitted in that order. Every
    line ends with a space followed by a newline. Values are not quoted or
    escaped.

    Returns:
        The concatenated lines, or an empty string when rows is empty.
    """
    lines = [
        f"{row['pid']},{row['hid']},{row['lat']},{row['lon']},{row['ts']},{row['td']},{row['dw']},{row['tz']} \n"
        for row in rows
    ]
    return "".join(lines)

    
def writeToFile(dateStr, rows):
    """Append rows, rendered by toCsv, to the output file for dateStr.

    The file is opened in 'a+' mode: it is created when missing and existing
    content is kept, so repeated calls for the same date accumulate lines.
    """
    targetPath = getOutputFilename(dateStr)
    with open(targetPath, 'a+') as sink:
        sink.write(toCsv(rows))
    
        
    

In [79]:
# Maps the verbose column headers of the input file to the short column
# names used by the rest of the notebook.
columnNamesMap = {
    "Polygon ID": "pid",
    "Hashed Device ID": "hid",
    "Lat of Visit": "lat",
    "Lon of Visit": "lon",
    "Unix Timestamp of Visit": "ts",
    "Date": "date",
    "Time of Day": "td",
    "Day of Week": "dw",
    "Time Zone": "tz",
}

# Every 'YYYY-MM-DD' date string from startDate through endDate (inclusive).
dates = getDateStrings(startDate, endDate)

In [80]:
# Read the tab-separated input with dask. The path comes from the inputFile
# setting so that changing the configuration actually changes what is read.
df = dd.read_csv(inputFile, sep='\t')


In [81]:
# Preview the first five rows with the original column headers.
df.head(5)

Unnamed: 0,Polygon ID,Hashed Device ID,Lat of Visit,Lon of Visit,Unix Timestamp of Visit,Date,Time of Day,Day of Week,Time Zone
0,Santa Clara County,8880011d4969dafb2bff89d4da1a37c994292941,36.900276,-121.581208,1576733932,2019-12-18,21:38:52,Wed,America/Los_Angeles
1,Santa Clara County,8880011d4969dafb2bff89d4da1a37c994292941,36.898245,-121.574142,1576733977,2019-12-18,21:39:37,Wed,America/Los_Angeles
2,Santa Clara County,8880011d4969dafb2bff89d4da1a37c994292941,36.897755,-121.565889,1576734022,2019-12-18,21:40:22,Wed,America/Los_Angeles
3,Santa Clara County,8880011d4969dafb2bff89d4da1a37c994292941,36.900589,-121.560763,1576734067,2019-12-18,21:41:07,Wed,America/Los_Angeles
4,Santa Clara County,8880011d4969dafb2bff89d4da1a37c994292941,36.906981,-121.559551,1576734113,2019-12-18,21:41:53,Wed,America/Los_Angeles


In [82]:
# Replace the verbose column headers with the short names from columnNamesMap,
# then preview the first five rows to confirm the rename.
df = df.rename(columns=columnNamesMap)
df.head(5)

Unnamed: 0,pid,hid,lat,lon,ts,date,td,dw,tz
0,Santa Clara County,8880011d4969dafb2bff89d4da1a37c994292941,36.900276,-121.581208,1576733932,2019-12-18,21:38:52,Wed,America/Los_Angeles
1,Santa Clara County,8880011d4969dafb2bff89d4da1a37c994292941,36.898245,-121.574142,1576733977,2019-12-18,21:39:37,Wed,America/Los_Angeles
2,Santa Clara County,8880011d4969dafb2bff89d4da1a37c994292941,36.897755,-121.565889,1576734022,2019-12-18,21:40:22,Wed,America/Los_Angeles
3,Santa Clara County,8880011d4969dafb2bff89d4da1a37c994292941,36.900589,-121.560763,1576734067,2019-12-18,21:41:07,Wed,America/Los_Angeles
4,Santa Clara County,8880011d4969dafb2bff89d4da1a37c994292941,36.906981,-121.559551,1576734113,2019-12-18,21:41:53,Wed,America/Los_Angeles


In [83]:
# Split the input rows into one output file per date in `dates`.

# Rows are cached per date until flushed to files. keys: dates, values: list of rows
cachedRows = getInitializedCacheRows(dates)
flushSize = 10000  # attempt a flush after parsing this many rows
count = 0  # rows parsed since the last flush attempt

for _, row in df.iterrows():
    # Rows whose date falls outside the requested range are skipped.
    # cachedRows has exactly one key per entry of `dates`.
    if row['date'] in cachedRows:
        cachedRows[row['date']].append(row)

    count += 1
    if count >= flushSize:
        flushDataToFiles(cachedRows)
        # Restart the counter; otherwise every row after the first flushSize
        # rows would trigger another flush.
        count = 0

# Write out whatever is still cached, regardless of the per-date threshold.
flushDataToFiles(cachedRows, force=True)
        
        
