In [1]:
import os
import pandas as pd
import dask.dataframe as dd
import glob

pd.options.mode.chained_assignment = None  # default='warn'

In [2]:
dir_path = "/work/group/oit_research_data/mobility/data/data_DFW_2021/data/DFW_2021_01/"
file_name = "A10_DFW"
ext = ".tsv"
file_path = dir_path+file_name+ext

In [3]:
# Define the start and end hours for filtering
start_hour = 1
end_hour = 4

In [4]:
threshold=1

In [5]:
# Function to convert Unix timestamps to seconds (if not already in seconds)
def convert_timestamp_to_dt(ddf, tz='US/Central'):        
    ddf['DateTime'] = dd.to_datetime(ddf['Unix Timestamp'],unit='s')    
    #ddf['DateTime'] = ddf["DateTime"].round("H")
    ddf.DateTime = ddf.DateTime.dt.tz_localize('UTC').dt.tz_convert(tz)  
    #ddf['DateTime']=ddf['DateTime'].astype(str)
    #ddf.DateTime = ddf.DateTime.str.slice(stop=-6)
    ddf['Date'] = ddf.DateTime.dt.date
    return ddf

In [6]:
def keep_rows_between_hours(df, start_hour, end_hour):
    df = df[(df['DateTime'].dt.hour >= start_hour) & (df['DateTime'].dt.hour <= end_hour)]
    return df

In [8]:
# Read the large TSV file in chunks using Dask
columns=["Polygon_ID","Device_ID","Lat","Lon","Unix Timestamp"]
df = dd.read_csv(file_path, sep='\t', names=columns, header=None,
                 dtype={"Polygon_ID":'object',"Device_ID":'object',"Lat":'float64',"Lon":'float64',"Unix Timestamp":'object'}).compute()

In [9]:
ddf = dd.from_pandas(df, npartitions=256)

In [10]:
convert_timestamp_to_dt(df, tz='US/Central')

Unnamed: 0,Polygon_ID,Device_ID,Lat,Lon,Unix Timestamp,DateTime,Date
0,P2,9a220574a893533efb3773aa3ca3282ba5834775,32.166847,-97.080640,1609459264,2020-12-31 18:01:04-06:00,2020-12-31
1,P2,9a220574a893533efb3773aa3ca3282ba5834775,32.166851,-97.080673,1609460462,2020-12-31 18:21:02-06:00,2020-12-31
2,P2,9a220574a893533efb3773aa3ca3282ba5834775,32.166850,-97.080641,1609461666,2020-12-31 18:41:06-06:00,2020-12-31
3,P2,9a220574a893533efb3773aa3ca3282ba5834775,32.166862,-97.080650,1609462887,2020-12-31 19:01:27-06:00,2020-12-31
4,P2,9a220574a893533efb3773aa3ca3282ba5834775,32.166862,-97.080650,1609462888,2020-12-31 19:01:28-06:00,2020-12-31
...,...,...,...,...,...,...,...
215919,E14,8bbde1b26d7ce211705e6f1cc808d51dcacf5b95,32.227679,-97.009669,1610322995,2021-01-10 17:56:35-06:00,2021-01-10
215920,E14,8bbde1b26d7ce211705e6f1cc808d51dcacf5b95,32.229966,-97.012108,1610323040,2021-01-10 17:57:20-06:00,2021-01-10
215921,D14,8bbde1b26d7ce211705e6f1cc808d51dcacf5b95,32.236599,-97.010914,1610323085,2021-01-10 17:58:05-06:00,2021-01-10
215922,B13,8bbde1b26d7ce211705e6f1cc808d51dcacf5b95,32.243516,-97.014395,1610323130,2021-01-10 17:58:50-06:00,2021-01-10


In [11]:
meta_dict = {"Polygon_ID":'object',"Device_ID":'object',"Lat":'float64',"Lon":'float64',"Unix Timestamp":'object', 'DateTime':'object','Date':'object'}

In [12]:
# Convert Unix timestamps to seconds
ddf = ddf.map_partitions(convert_timestamp_to_dt, meta=meta_dict)

In [13]:
# Drop rows between specific hours
ddf = ddf.map_partitions(keep_rows_between_hours, start_hour=start_hour, end_hour=end_hour,
                        meta=meta_dict)

In [14]:
def chunk(s):
    # for the comments, assume only a single grouping column, the 
    # implementation can handle multiple group columns.
    #
    # s is a grouped series. value_counts creates a multi-series like 
    # (group, value): count
    return s.value_counts()


def agg(s):
#     print('agg',s.apply(lambda s: s.groupby(level=-1).sum()))
    # s is a grouped multi-index series. In .apply the full sub-df will passed
    # multi-index and all. Group on the value level and sum the counts. The
    # result of the lambda function is a series. Therefore, the result of the 
    # apply is a multi-index series like (group, value): count
    #return s.apply(lambda s: s.groupby(level=-1).sum())

    # faster version using pandas internals
    s = s._selected_obj
    return s.groupby(level=list(range(s.index.nlevels))).sum()


def finalize(s):
    # s is a multi-index series of the form (group, value): count. First
    # manually group on the group part of the index. The lambda will receive a
    # sub-series with multi index. Next, drop the group part from the index.
    # Finally, determine the index with the maximum value, i.e., the mode.
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
    )

max_occurence = dd.Aggregation('mode', chunk, agg, finalize)

In [15]:
# Group by timestamp and ID, then aggregate as needed
grouped_df = ddf.groupby(['Device_ID','Date']).agg({
    'Lat': max_occurence,
    'Lon': max_occurence
}).reset_index()

In [16]:
v = grouped_df['Device_ID'].value_counts().compute()
grouped_df = grouped_df[grouped_df['Device_ID'].isin(v.index[v.gt(threshold)])]

In [17]:
# Group by timestamp and ID, then aggregate as needed
grouped_df = grouped_df.groupby(['Device_ID']).agg({
    'Lat': max_occurence,
    'Lon': max_occurence
}).reset_index()

In [18]:
grouped_df.compute().to_csv(file_name+'_output.tsv', sep='\t', index=False)#, single_file=True)

print("Processing complete. Processed file saved as 'processed_output.tsv'.")

Processing complete. Processed file saved as 'processed_output.tsv'.


In [19]:
test = pd.read_csv(file_name+'_output.tsv', sep='\t')
test

Unnamed: 0,Device_ID,Lat,Lon
0,0169a31330d568fbd19b0e10dfc7fed970b18647,32.248650,-97.016680
1,01f7fc93e2a7f945f9796410926791086edd6cab,32.166109,-97.082704
2,021cb3cf2220e4439a0ad2144b7c9f4a84556e09,32.226383,-97.071365
3,081585eb026761641fddc97ff7e470a14648b218,32.229558,-96.997914
4,0dc05346cc569d1accce2ed6d42035f6601d7c05,32.210590,-97.082148
...,...,...,...
88,f48953055864d49633ef11e6345951f96645e2e4,32.158283,-97.038029
89,f561c29493ecffd0343f341c4e368fd6f68fa08e,32.189232,-97.077105
90,faaa4bd8d948d48d7646bc23b2aaf773f2b23a6b,32.182456,-96.984255
91,fecf6ce048e98f683d912941397689ae3079cfdc,32.229660,-96.997900


In [20]:
def custom_mode(series):
    return series.mode().iloc[0] if not series.mode().empty else series.iloc[0]

In [21]:
testgb = df.groupby(['Date', 'Device_ID']).agg({
    'Lat': custom_mode,
    'Lon': custom_mode
}).reset_index()
testgb

Unnamed: 0,Date,Device_ID,Lat,Lon
0,2020-12-31,0091dec73e15a15f183c43d463fda73694f21a54,32.229470,-96.997918
1,2020-12-31,00d33e9d6974e69ceee8149b9181674bac8dad14,32.190241,-97.011447
2,2020-12-31,0169a31330d568fbd19b0e10dfc7fed970b18647,32.248615,-97.016586
3,2020-12-31,03070cde168148a85ebddbfb6fd0bedb1e8cc9a5,32.192838,-97.019206
4,2020-12-31,05119d77c268ff91ce14810bed564956cfbbea47,32.153957,-97.083419
...,...,...,...,...
5171,2021-01-31,f7e3266a538277dfb881b8322205eccf7bfad4af,32.178496,-97.049001
5172,2021-01-31,f82f05cd861234cbf6f9314fe09323e6964b708a,32.153996,-97.081610
5173,2021-01-31,fa15341e695b7ceb0af5fa877184abea1700393b,32.206200,-97.021057
5174,2021-01-31,fecf6ce048e98f683d912941397689ae3079cfdc,32.229650,-96.997900


In [22]:
testgb['Device_ID'].value_counts()

341b40f3b21088cb276b73350e307f12ce548be7    32
2e5a7143bb24dc1094ffb3a102b232ad8314164e    32
9115f80810a344266d027c7f1ef1c79a90437d38    32
961b66385f96094ff34c150ddfd377ca0426ae86    32
974e1c6765054fe74a522ccc7be433fea8358829    32
                                            ..
4d5ac2cd7fbd46c633390b6fa36c9084ad912c59     1
67f21307eade5671e5839265aecbfbed3306cfec     1
689b710db4b7f6a65b8df98ad4f2ce292431037d     1
6b5f96395858a33a4bec9dc3c70bdc769c11695c     1
f7e3266a538277dfb881b8322205eccf7bfad4af     1
Name: Device_ID, Length: 1446, dtype: int64

In [23]:
v = testgb.Device_ID.value_counts()
testgbth = testgb[testgb.Device_ID.isin(v.index[v.gt(threshold)])]

In [24]:
testgbth.groupby(['Device_ID']).agg({
    'Lat': custom_mode,
    'Lon': custom_mode
}).reset_index()

Unnamed: 0,Device_ID,Lat,Lon
0,0091dec73e15a15f183c43d463fda73694f21a54,32.219490,-97.077060
1,00d33e9d6974e69ceee8149b9181674bac8dad14,32.186615,-97.036289
2,0108e1decd9e0843f6d869b7f97de6feb0561852,32.233630,-97.079580
3,0169a31330d568fbd19b0e10dfc7fed970b18647,32.247693,-97.018151
4,01924eca355e4f73f8951dfacaf465b6a7c3c278,32.197014,-97.045866
...,...,...,...
574,feca7e83d4db189a55035d8be649a37d9a098fc9,32.228107,-97.012096
575,fecf6ce048e98f683d912941397689ae3079cfdc,32.229660,-96.997900
576,fefbd65ed48b8fe38c8a588def1ba2d466bd93f7,32.182090,-96.988270
577,ff2079221c8118d2b7eba027ad59d32194211299,32.188329,-97.019285
