In [3]:
# ! pip install tqdm
# ! pip install natsort

In [4]:
import os
import glob
import json
from tqdm import tqdm
from natsort import natsorted
import pandas as pd
import sqlite3
 
from tqdm.notebook import tqdm
tqdm.pandas()

from datetime import datetime, timedelta

## helper functions

In [23]:
def get_all_files(root_dir, contains=[''], extions=['']):
    found_files = []
    for rt_dir, dirs, files in os.walk(root_dir):
        for ext in extions:
            ext = ext.lower()
            ext_len = len(ext)
            for file in files:
                file_ext = file[-(ext_len):]
                # print(file)
                file_ext = file_ext.lower()
                if file_ext == ext:
                    file_name = os.path.join(rt_dir, file)
                    found_files.append(file_name)
                    # continue                    
                
        for con in contains:
            con = con.lower()
            con_len = len(con)
            for file in files:
                if con in os.path.basename(file):
                    file_name = os.path.join(rt_dir, file)
                    found_files.append(file_name)
    return found_files
 
    
def split_to_county(df, saved_path, column_name='visitor_home_cbgs', file_suffix='', mode='a'):
    if len(file_suffix) > 0:
            file_suffix = "_" +  file_suffix 
            
    df['county_code'] = df[column_name].str.zfill(12).str[:5]
    county_list = df['county_code'].unique()
    
    county_list = [c for c in county_list if c.isnumeric()]
    county_list = sorted(county_list) 
    print("   len of county_list:", len(county_list))

    df_row_cnt = len(df)
    removed_cnt = 0
    for idx, county in tqdm(enumerate(county_list)):  # cannot use tqdm in multiprocessing!     
        idxs = df['county_code'] == county
        county_df = df[idxs]
        
        basename = f'{county}_{column_name}{file_suffix}.csv'
        state_code = basename[:2]
        
        new_name = os.path.join(saved_path, state_code, county, basename)

        dirname = os.path.dirname(new_name)  
        os.makedirs(dirname, exist_ok=True)        
        
        county_df = county_df[[column_name, "placekey", "visits"]].sort_values(column_name)
        county_df.to_csv(new_name, index=False, mode=mode)
        removed_cnt += len(county_df)
        
        df = df[~idxs]

def unfold_df_columns(df, saved_path, file_suffix, columns=['visitor_home_cbgs', 'visitor_daytime_cbgs']):    
    for column in columns:
        pair_list = []
        print(f"   Creating edges for column {column}, {len(df)} POIs...")
        df = df[~df[column].isna()]
        df.progress_apply(unfold_row_dict, args=(pair_list, column), axis=1)
        pair_list_df = pd.DataFrame(pair_list)
        pair_list_df.columns = ["placekey", column, "visits"]

        print(f"   Created {len(pair_list_df)} edges.")

        print(f"   Splitting edges into county level for column {column}...")
        os.makedirs(saved_path, exist_ok=True)
        split_to_county(df=pair_list_df, column_name=column, saved_path = saved_path, file_suffix=file_suffix)
        print("   Finish splitting edges.")

def unfold_row_dict(row, result_list, column_name='visitor_home_cbgs'):

    # if 'safegraph_place_id' in row.index:
    #     placekey = row["safegraph_place_id"]
    # print("column_name:", column_name)
    if 'placekey' in row.index:
        placekey = row["placekey"]
    try:     
        a_dict = json.loads(row[column_name])
        # print(a_dict)
        result_list += list(zip([placekey] * len(a_dict.keys()), a_dict.keys(), a_dict.values()))
    except Exception as e: 
        print("Error in  unfold_row_dict(): e, row[column_name]:", e, row[column_name])
        


def patterns_to_Sqlite(df, sqlite_name):
     
    conn = sqlite3.connect(sqlite_name)
    curs = conn.cursor()
    
    columns = ','.join(df.columns)
    columns.replace('placekey,parent_placekey', 'placekey PRIMARY KEY,parent_placekey')
    
    curs.execute('create table if not exists POI ' +
                f"({','.join(df.columns)})")
    df.to_sql('POI', conn, if_exists='replace', index=False)
    
    sql = f'CREATE INDEX placekey_idx ON POI(placekey);'    
    curs.execute(sql)
    
    conn.close

In [6]:
# '06'.isnumeric()

# Target folder

In [7]:
data_root_dir = r'H:\SafeGraph_monthly_patterns_2018-2022'

all_files = get_all_files(data_root_dir)
print(f"Found files: {len(all_files)}")
print("The top 5 and bottom 5 files:")
all_files[:5] + ['...'] + all_files[-5:]

Found files: 77096
The top 5 and bottom 5 files:


['H:\\SafeGraph_monthly_patterns_2018-2022\\Dewey_safegraphy_download.ipynb',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\.ipynb_checkpoints\\Dewey_safegraphy_download-checkpoint.ipynb',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2018\\01\\01\\CUSTOMWEATHER\\DAILY\\daily.zip',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2018\\01\\01\\CUSTOMWEATHER\\HOURLY\\hourly.zip',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2018\\01\\01\\SAFEGRAPH\\MP\\brand_info.csv',
 '...',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\08\\21\\ADVAN\\WP\\patterns_weekly_000000000496.csv.gz',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\08\\21\\ADVAN\\WP\\patterns_weekly_000000000497.csv.gz',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\08\\21\\ADVAN\\WP\\patterns_weekly_000000000498.csv.gz',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\08\\21\\ADVAN\\WP\\patterns_weekly_000000000499.csv.gz',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\08\\21\\ADVAN\\WP\\visit_panel_summary.csv']

## Filter files

In [8]:
target_years = '2023'
target_months = ['08', '06', '07']
target_dataset = ['WP']
target_names = ['patterns_weekly_']
target_files = []

for file in all_files[:]:
    directories = file.replace(data_root_dir, '').split(os.sep)[1:]
    # print(directories)
    if len(directories) < 5:
        continue
    year = directories[0]
    month = directories[1]
    dataset = directories[-2]
    basename = directories[-1]
    if year in target_years:
        if month in target_months:
            if dataset in target_dataset:
                for target_name in target_names:
                    if target_name in basename:
                        target_files.append(file)

print(f"Found target files: {len(target_files)}")
print("The top 5 and bottom 5 files:")
target_files[:5] + ['...'] + target_files[-5:]

Found target files: 6000
The top 5 and bottom 5 files:


['H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\06\\05\\ADVAN\\WP\\patterns_weekly_000000000000.csv.gz',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\06\\05\\ADVAN\\WP\\patterns_weekly_000000000001.csv.gz',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\06\\05\\ADVAN\\WP\\patterns_weekly_000000000002.csv.gz',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\06\\05\\ADVAN\\WP\\patterns_weekly_000000000003.csv.gz',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\06\\05\\ADVAN\\WP\\patterns_weekly_000000000004.csv.gz',
 '...',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\08\\21\\ADVAN\\WP\\patterns_weekly_000000000495.csv.gz',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\08\\21\\ADVAN\\WP\\patterns_weekly_000000000496.csv.gz',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\08\\21\\ADVAN\\WP\\patterns_weekly_000000000497.csv.gz',
 'H:\\SafeGraph_monthly_patterns_2018-2022\\2023\\08\\21\\ADVAN\\WP\\patterns_weekly_000000000498.csv.gz',
 'H:\\SafeGraph_monthly_patte

## Find dates

In [9]:
target_file_df = pd.DataFrame(target_files, columns=['file'])

In [10]:
year_start_pos = 40
target_file_df['year'] = target_file_df['file'].str[year_start_pos:year_start_pos+4]
target_file_df['month'] = target_file_df['file'].str[year_start_pos+5:year_start_pos+7]
target_file_df['day'] = target_file_df['file'].str[year_start_pos+8:year_start_pos+10]
target_file_df['date'] = target_file_df['year'] + '-' + target_file_df['month'] + '-' + target_file_df['day']
target_file_df 

Unnamed: 0,file,year,month,day,date
0,H:\SafeGraph_monthly_patterns_2018-2022\2023\0...,2023,06,05,2023-06-05
1,H:\SafeGraph_monthly_patterns_2018-2022\2023\0...,2023,06,05,2023-06-05
2,H:\SafeGraph_monthly_patterns_2018-2022\2023\0...,2023,06,05,2023-06-05
3,H:\SafeGraph_monthly_patterns_2018-2022\2023\0...,2023,06,05,2023-06-05
4,H:\SafeGraph_monthly_patterns_2018-2022\2023\0...,2023,06,05,2023-06-05
...,...,...,...,...,...
5995,H:\SafeGraph_monthly_patterns_2018-2022\2023\0...,2023,08,21,2023-08-21
5996,H:\SafeGraph_monthly_patterns_2018-2022\2023\0...,2023,08,21,2023-08-21
5997,H:\SafeGraph_monthly_patterns_2018-2022\2023\0...,2023,08,21,2023-08-21
5998,H:\SafeGraph_monthly_patterns_2018-2022\2023\0...,2023,08,21,2023-08-21


In [20]:
date_strings = target_file_df['date'].unique()
date_strings

array(['2023-06-05', '2023-06-12', '2023-06-19', '2023-06-26',
       '2023-07-03', '2023-07-10', '2023-07-17', '2023-07-24',
       '2023-07-31', '2023-08-07', '2023-08-14', '2023-08-21'],
      dtype=object)

In [None]:
saved_path = r'K:\SafeGraph\Weekly_county_files'
os.makedirs(saved_path, exist_ok=True)

for date_idx, date in enumerate(date_strings[:]):
    print(f'Processing date str: {date}, {date_idx + 1} / {len(date_strings)}')
    date_df = target_file_df.query(f"date == '{date}' ")
    date_csv_files = date_df['file'].to_list()

    unfold_columns=['visitor_home_cbgs', 'visitor_daytime_cbgs']
      
    print(f"    Loading CSV {len(date_csv_files)} files...")
   
    # read all patterns files
    # weekly_df = pd.concat([pd.read_csv(f) for f in tqdm(date_csv_files[:])]) 
    for idx, f in tqdm(enumerate(date_csv_files[:])):
        weekly_df = pd.read_csv(f)
        # clean data
        weekly_df = weekly_df[~weekly_df['date_range_start'].isna()]
        weekly_df = weekly_df[~weekly_df['date_range_end'].isna()] 
        start_date = weekly_df['date_range_start'].min()[:10] # E.g.: 2018-01-15T00:00:00-09:00
        end_date = weekly_df['date_range_end'].max()[:10]
     
        # print(f"   Read {len(date_df)} files. Date range: {start_date} - {end_date}")
        print(f"   Processing {idx + 1} / {len(date_df)} files. Date range: {start_date} - {end_date}, {f}")
        file_suffix = f"{start_date}_To_{end_date}"
    
        # Save POI CSV without the split columns.
        POI_new_name = os.path.join(saved_path, "POI_only", f"POI_{file_suffix}.db")
        os.makedirs(os.path.dirname(POI_new_name), exist_ok=True)
        print(f"   Saving POI files to: {POI_new_name}")
        # POI_drop_columns = ['visitor_home_cbgs', 'visitor_daytime_cbgs']
        POI_drop_columns = unfold_columns
        # weekly_df.drop(columns=POI_drop_columns).to_csv(POI_new_name, index=False)
        
        # print("    Writing Sqlite database...")
        # patterns_to_Sqlite(weekly_df.drop(columns=POI_drop_columns), POI_new_name)
    
        weekly_df = weekly_df[unfold_columns + ['placekey']]
        
        # Unfold_columns    
        
        unfold_df_columns(df=weekly_df, saved_path=saved_path, file_suffix=file_suffix, columns=unfold_columns)                  
        
        
        print()

weekly_df

Processing date str: 2023-06-05, 1 / 12
    Loading CSV 500 files...


0it [00:00, ?it/s]

   Processing 1 / 500 files. Date range: 2023-06-05 - 2023-06-12, H:\SafeGraph_monthly_patterns_2018-2022\2023\06\05\ADVAN\WP\patterns_weekly_000000000000.csv.gz
   Saving POI files to: K:\SafeGraph\Weekly_county_files\POI_only\POI_2023-06-05_To_2023-06-12.db
   Creating edges for column visitor_home_cbgs, 29150 POIs...


  0%|          | 0/14089 [00:00<?, ?it/s]

   Created 245503 edges.
   Splitting edges into county level...
   len of county_list: 3016


0it [00:00, ?it/s]

   Finish splitting edges.
   Creating edges for column visitor_daytime_cbgs, 14089 POIs...


  0%|          | 0/13967 [00:00<?, ?it/s]

   Created 233527 edges.
   Splitting edges into county level...
   len of county_list: 2978


0it [00:00, ?it/s]

   Finish splitting edges.

   Processing 2 / 500 files. Date range: 2023-06-05 - 2023-06-12, H:\SafeGraph_monthly_patterns_2018-2022\2023\06\05\ADVAN\WP\patterns_weekly_000000000001.csv.gz
   Saving POI files to: K:\SafeGraph\Weekly_county_files\POI_only\POI_2023-06-05_To_2023-06-12.db
   Creating edges for column visitor_home_cbgs, 28866 POIs...


  0%|          | 0/14181 [00:00<?, ?it/s]

   Created 246684 edges.
   Splitting edges into county level...
   len of county_list: 3022


0it [00:00, ?it/s]

In [18]:
weekly_df.columns

Index(['visitor_home_cbgs', 'visitor_daytime_cbgs', 'placekey'], dtype='object')

In [None]:

weekly_csv_dir = r'J:\weekly_patterns_20211211\to_cluster2'
weekly_csv_files = get_all_files(root_dir=weekly_csv_dir, contains=['row'], extions=[''])
weekly_csv_files = natsorted(weekly_csv_files, reverse=True)
print(f"Found {len(weekly_csv_files)} files. ")        
weekly_csv_files[0]

In [None]:

# get week strings
week_strings = [os.path.basename(f)[:10] for f in weekly_csv_files]
week_strings = list(set(week_strings))
week_strings = natsorted(week_strings, reverse=True)
print(f"Found {len(week_strings)} weeks.", week_strings[0])

In [None]:
# week_strings

In [None]:
# Start to process each week
print("Start to process each week:\n")
saved_path = r'J:\Safegraph\weekly_county_files\weekly_patterns_2018_2021'



for week_idx, week_str in enumerate(week_strings[:]):
    print(f'Processing week_str: {week_str}, {week_idx + 1} / {len(week_strings)}')
    df_list = []
    
    print(f"   Reading CSV files...")
    for f in weekly_csv_files:        
        if os.path.basename(f).startswith(week_str):
            # print(f)
            df = pd.read_csv(f)
            df_list.append(df)
        # break
            
    weekly_df = pd.concat(df_list).iloc[:]
    start_date = weekly_df['date_range_start'].min()[:10] # E.g.: 2018-01-15T00:00:00-09:00
    end_date = weekly_df['date_range_end'].max()[:10]
    print(f"   Read {len(df_list)} files. Date range: {start_date} - {end_date}")
    file_suffix = f"{start_date}_To_{end_date}"

    # Unfold_columns    
    unfold_columns=['visitor_home_cbgs', 'visitor_daytime_cbgs']
    unfold_df_columns(df=weekly_df, saved_path=saved_path, file_suffix=file_suffix, columns=unfold_columns)                  
    
    # Save POI CSV without the split columns.
    POI_new_name = os.path.join(saved_path, "POI", f"POI_{file_suffix}.csv")
    os.makedirs(os.path.dirname(POI_new_name), exist_ok=True)
    print(f"   Saving POI files to: {POI_new_name}")
    # POI_drop_columns = ['visitor_home_cbgs', 'visitor_daytime_cbgs']
    POI_drop_columns = unfold_columns
    weekly_df.drop(columns=POI_drop_columns).to_csv(POI_new_name, index=False)
    print()

print("All weeks were done.")

In [None]:
json.loads(weekly_df.iloc[8]['visitor_home_cbgs'])

In [None]:
dict_ = json.loads(weekly_df.iloc[8]['visitor_home_cbgs'])
pd.DataFrame.from_dict(dict_, orient='index', columns=[ 'visits']).reset_index()

In [None]:
pair_list = []

weekly_df.apply(unfold_row_dict, args=(pair_list,), axis=1)

pair_list_df = pd.DataFrame(pair_list)
pair_list_df.columns = ["placekey", "visitor_home_cbgs", "visits"]
pair_list_df        

In [None]:
pair_list_df.columns = ["placekey", "visitor_home_cbgs", "visits"]


In [None]:
def split_to_county(df, saved_path):
    df['county_code'] = df['visitor_home_cbgs'].str.zfill(12).str[:5]
    county_list = df['county_code'].unique()

    print("len of county_list:", len(county_list))

    df_row_cnt = len(df)

    removed_cnt = 0

    for idx, county in enumerate(county_list):  # cannot use tqdm in multiprocessing!
        print(idx, county)
        idxs = df['county_code'] == county
        county_df = df[idxs]
        
        #

        # new_name = f'County_{county}_{basename}'
        basename = f'{county}.csv'
        state_code = basename[:2]
        
        print("basename:", basename)
        
        new_name = os.path.join(saved_path, state_code, basename)
        # print("new_name:", new_name)
        # start_date = county_df[''].min()
        
        dirname = os.path.dirname(new_name)  
        os.makedirs(dirname, exist_ok=True)
        
        
        county_df.to_csv(new_name, index=False)
        # print("len of county_df:", len(county_df))

        removed_cnt += len(county_df)

        df = df[~idxs]

split_to_county(df=pair_list_df, saved_path = r'J:\Safegraph\weekly_county_files\weekly_patterns_20211211')

In [None]:
pair_list_df['county_code'] = pair_list_df['visitor_home_cbgs'].str.zfill(12).str[:5]

In [None]:

groups = pair_list_df.groupby('county_code')

In [None]:
# groups = df.groupby('county_code', as_index=False)


In [None]:
county_df

In [None]:
def split_to_county(df, saved_path, column_name='visitor_home_cbgs', file_suffix=''):
    if len(file_suffix) > 0:
            file_suffix = "_" +  file_suffix 
            
    df['county_code'] = df[column_name].str.zfill(12).str[:5]
    county_list = df['county_code'].unique()
    
    county_list = [c for c in county_list if c.isnumeric()]
    county_list = sorted(county_list) 
    

    df_row_cnt = len(df)
    removed_cnt = 0
    
    groups = df.groupby('county_code', as_index=False)
    # print("   len of county_list:", len(county_list))
    processed_county_cnt = 0
    for county, county_df in groups:
        
        if not county.isnumeric():
            continue
        
        basename = f'{county}_{column_name}{file_suffix}.csv'
        state_code = basename[:2]
        
        new_name = os.path.join(saved_path, state_code, county, basename)

        dirname = os.path.dirname(new_name)  
        os.makedirs(dirname, exist_ok=True)        
        
        county_df = county_df[[column_name, "placekey", "visits"]].sort_values([column_name, 'visits'], ascending=[True, False])
        county_df.to_csv(new_name, index=False)
        removed_cnt += len(county_df)

        processed_county_cnt += 1
        if processed_county_cnt % 100 == 0:
            print(f"   PID {os.getpid()} finished {processed_county_cnt} / {len(groups)} counties for in period: {file_suffix}\n")
            

            
split_to_county(df, 
                saved_path=, 
                column_name='visitor_home_cbgs', 
                file_suffix='test'):


In [None]:
# test groupby

weekly_df.groupby('county_code')