## 1. Extract Monthly Patterns


In [1]:
import sys
from pathlib import Path
import pyarrow.parquet as pq
import pyarrow.dataset as ds

sys.path.append(str(Path.cwd().parent))
from config import PATH_KIOSK_USER_PATTERNS_FOLDER, PATH_KIOSK_USER_PATTERNS_REPO, PATH_SSD_ADVAN_FOLDER

In [2]:
import pandas as pd

execfile(Path(PATH_KIOSK_USER_PATTERNS_REPO, "functions/safe_parse_json.py"))

cols = [
    "PLACEKEY", "LOCATION_NAME", "NAICS_CODE", "LATITUDE", "LONGITUDE",
    "STREET_ADDRESS", "CITY", "REGION", "DATE_RANGE_START", "DATE_RANGE_END",
    "RAW_VISITOR_COUNTS", "RAW_VISIT_COUNTS", "VISITOR_HOME_CBGS"
]
monthly_patterns_files = list((PATH_SSD_ADVAN_FOLDER / "Monthly Patterns" / "Foot Traffic").rglob('**/*.gz'))
len(monthly_patterns_files)

3656

In [3]:
import re

# Extract year from file path using regex (assuming year is a 4-digit number in the path)
monthly_patterns_files_df = pd.DataFrame({
    'file_path': monthly_patterns_files,
    'FILE_NAME': [f.name for f in monthly_patterns_files],
    'YEAR': [int(re.search(r'(\d{4})', str(f)).group(1)) if re.search(r'(\d{4})', str(f)) else None for f in monthly_patterns_files]
})

pd.options.display.max_colwidth = None  # Show full file paths in the DataFrame
monthly_patterns_files_df.head()

Unnamed: 0,file_path,FILE_NAME,YEAR
0,F:\Advan Research\Monthly Patterns\Foot Traffic\2023\data_01bd7440-0105-dcc9-0042-fa0702ed2712_13_3_27.csv.gz,data_01bd7440-0105-dcc9-0042-fa0702ed2712_13_3_27.csv.gz,2023
1,F:\Advan Research\Monthly Patterns\Foot Traffic\2023\data_01bd7440-0105-dcc9-0042-fa0702ed2712_43_1_30.csv.gz,data_01bd7440-0105-dcc9-0042-fa0702ed2712_43_1_30.csv.gz,2023
2,F:\Advan Research\Monthly Patterns\Foot Traffic\2023\data_01bd7440-0105-dcc9-0042-fa0702ed2712_43_1_39.csv.gz,data_01bd7440-0105-dcc9-0042-fa0702ed2712_43_1_39.csv.gz,2023
3,F:\Advan Research\Monthly Patterns\Foot Traffic\2023\data_01bd7440-0105-dcc9-0042-fa0702ed2712_43_7_25.csv.gz,data_01bd7440-0105-dcc9-0042-fa0702ed2712_43_7_25.csv.gz,2023
4,F:\Advan Research\Monthly Patterns\Foot Traffic\2023\data_01bd7440-0105-dcc9-0042-fa0702ed2712_53_2_11.csv.gz,data_01bd7440-0105-dcc9-0042-fa0702ed2712_53_2_11.csv.gz,2023


In [4]:
output_base = PATH_KIOSK_USER_PATTERNS_FOLDER / "working/processed/kupdat03_advan research monthly patterns"



In [5]:
dataset = ds.dataset(output_base, format="parquet", partitioning=["YEAR", "MONTH"])

# Check if the dataset exists and get unique file names
try:
    # Use PyArrow to read the metadata
    table = dataset.to_table(columns=["FILE_NAME","YEAR"])
    # Convert to pandas to get unique values
    unique_files = table.to_pandas().drop_duplicates().reset_index(drop=True)
    # Extract the year as integer from the "YEAR" column (e.g., "YEAR=2023" -> 2023)
    unique_files['YEAR'] = unique_files['YEAR'].str.extract(r'(\d{4})').astype(int)
    print(f"Found {len(unique_files['FILE_NAME'])} unique files in the dataset")

except Exception as e:
    print(f"Error accessing the dataset: {e}")
    # Check if the directory exists
    if not output_base.exists():
        print(f"Directory does not exist: {output_base}")


unique_files.head()

Found 3649 unique files in the dataset


Unnamed: 0,FILE_NAME,YEAR
0,data_01bd7440-0105-dcc9-0042-fa0702ed2712_03_1_24.csv.gz,2019
1,data_01bd7440-0105-dcc9-0042-fa0702ed2712_03_1_20.csv.gz,2019
2,data_01bd7440-0105-dcc9-0042-fa0702ed2712_03_1_19.csv.gz,2019
3,data_01bd7440-0105-dcc9-0042-fa0702ed2712_03_1_5.csv.gz,2019
4,data_01bd7440-0105-dcc9-0042-fa0702ed2712_03_1_27.csv.gz,2019


In [6]:
merged = monthly_patterns_files_df.merge(
        unique_files, 
        left_on=['FILE_NAME', 'YEAR'], 
        right_on=['FILE_NAME', 'YEAR'], 
        how='left',
        indicator=True
    )
    
    # Keep only rows that don't have a match
unprocessed = merged[merged['_merge'] == 'left_only']

unprocessed.shape

(7, 4)

In [7]:
unprocessed.iloc[0]['file_path']

WindowsPath('F:/Advan Research/Monthly Patterns/Foot Traffic/2024/data_01bd7440-0105-dcc9-0042-fa0702ed2712_53_0_54.csv.gz')

In [8]:
error_list = []

import json

for f in unprocessed['file_path']:
    # Read the gzip-compressed CSV file
    print(f"Processing file: {f}")
    
    try:
        df_table = pd.read_csv(f, 
                              compression='gzip', 
                              usecols=cols, 
                              engine='python',
                              on_bad_lines='skip',
                              
                              encoding= 'latin1')

        # Filter rows where LOCATION_NAME contains 'walmart'
        walmart_mask = df_table['LOCATION_NAME'].str.contains('walmart', case=False, na=False)
        df_walmart = (df_table[walmart_mask]
                      .assign(
                          DATE_RANGE_START=lambda x: pd.to_datetime(x['DATE_RANGE_START']))
                      .assign(YEAR =lambda x: x['DATE_RANGE_START'].dt.year,
                              MONTH=lambda x: x['DATE_RANGE_START'].dt.month,
                              FILE_NAME=f.name))



        
        # Expand VISITOR_HOME_CBGS into long format
        df_cbgs = (df_walmart.dropna(subset=['VISITOR_HOME_CBGS']).copy())
        
        df_cbgs = (df_cbgs
                   .assign(VISITOR_HOME_CBGS=df_cbgs['VISITOR_HOME_CBGS'].apply(lambda x: json.loads(x) if isinstance(x, str) else {})
        ).assign(VISITOR_HOME_CBGS=df_cbgs['VISITOR_HOME_CBGS'].apply(safe_parse_json)
        ).assign(VISITOR_HOME_CBGS=df_cbgs['VISITOR_HOME_CBGS'].apply(lambda x: list(x.items()) if isinstance(x, dict) else None))
        )

        # Then explode
        df_long = df_cbgs.explode('VISITOR_HOME_CBGS')
        # df_long = df_long.reset_index(drop=True)

        df_long = (df_long
            .reset_index(drop=True)
            .assign(
                HOME_CBG=lambda x: x['VISITOR_HOME_CBGS'].apply(lambda y: y[0] if isinstance(y, tuple) else None),
                VISITOR_COUNT=lambda x: x['VISITOR_HOME_CBGS'].apply(lambda y: y[1] if isinstance(y, tuple) else None)
            ).drop(columns=['VISITOR_HOME_CBGS'])
        )
        # Save to Parquet with partitioning by YEAR and MONTH
        df_long.to_parquet(
            output_base,
            index=False,
            partition_cols=["YEAR", "MONTH"]
        )
    except Exception as e:
        print(f"Error processing file {f}: {e}")
        error_list.append(f)
        continue








Processing file: F:\Advan Research\Monthly Patterns\Foot Traffic\2024\data_01bd7440-0105-dcc9-0042-fa0702ed2712_53_0_54.csv.gz
Processing file: F:\Advan Research\Monthly Patterns\Foot Traffic\2021\data_01bd7440-0105-dcc9-0042-fa0702ed2712_13_5_49.csv.gz
Processing file: F:\Advan Research\Monthly Patterns\Foot Traffic\2022\data_01bd7440-0105-dcc9-0042-fa0702ed2712_63_2_48.csv.gz
Error processing file F:\Advan Research\Monthly Patterns\Foot Traffic\2022\data_01bd7440-0105-dcc9-0042-fa0702ed2712_63_2_48.csv.gz: Error -3 while decompressing data: invalid stored block lengths
Processing file: F:\Advan Research\Monthly Patterns\Foot Traffic\2022\data_01bd7440-0105-dcc9-0042-fa0702ed2712_13_2_12.csv.gz
Error processing file F:\Advan Research\Monthly Patterns\Foot Traffic\2022\data_01bd7440-0105-dcc9-0042-fa0702ed2712_13_2_12.csv.gz: Compressed file ended before the end-of-stream marker was reached
Processing file: F:\Advan Research\Monthly Patterns\Foot Traffic\2022\data_01bd7440-0105-dcc9-00

In [9]:
pd.Series(error_list).to_csv(PATH_KIOSK_USER_PATTERNS_REPO / "data/kupdat03_advan research monthly patterns errors.csv", index=False)

In [None]:
df_walmart.iloc[0]['FILE_NAME']  # Display the file name of the first row

In [None]:
f

In [None]:
# Convert dictionaries to lists of key-value pairs
df_cbgs['VISITOR_HOME_CBGS'] = df_cbgs['VISITOR_HOME_CBGS'].apply(
    lambda x: list(x.items()) if isinstance(x, dict) else None
)


In [None]:
dataset = ds.dataset(output_base, format="parquet")

df = next(dataset.to_batches(batch_size=10)).to_pandas()
df.head(n = 100)

## 2. Extract Home Panel Summary

In [None]:
home_panel_files = list((PATH_SSD_ADVAN_FOLDER / "Monthly Patterns" / "Home Panel Summary").rglob('**/*.gz'))
len(home_panel_files)

In [None]:
# Extract year from file path using regex (assuming year is a 4-digit number in the path)
home_panel_files_df = pd.DataFrame({
    'file_path': home_panel_files,
    'FILE_NAME': [f.name for f in home_panel_files]
})

pd.options.display.max_colwidth = None  # Show full file paths in the DataFrame
home_panel_files_df.head()

In [None]:
output_base_home_panel = PATH_KIOSK_USER_PATTERNS_FOLDER / "working/processed/kupdat03_advan research home panel summary"


In [None]:
error_list = []


for f in home_panel_files_df['file_path']:
    # Read the gzip-compressed CSV file
    print(f"Processing file: {f}")
    
    try:
        df_table = pd.read_csv(f, 
                              compression='gzip', 
                              engine='python',
                              on_bad_lines='skip')

        # Create a dictionary mapping state abbreviations to full names
        state_abbrev_to_name = {
            'AL': 'Alabama', 'AK': 'Alaska', 'AZ': 'Arizona', 'AR': 'Arkansas', 'CA': 'California',
            'CO': 'Colorado', 'CT': 'Connecticut', 'DE': 'Delaware', 'FL': 'Florida', 'GA': 'Georgia',
            'HI': 'Hawaii', 'ID': 'Idaho', 'IL': 'Illinois', 'IN': 'Indiana', 'IA': 'Iowa',
            'KS': 'Kansas', 'KY': 'Kentucky', 'LA': 'Louisiana', 'ME': 'Maine', 'MD': 'Maryland',
            'MA': 'Massachusetts', 'MI': 'Michigan', 'MN': 'Minnesota', 'MS': 'Mississippi', 'MO': 'Missouri',
            'MT': 'Montana', 'NE': 'Nebraska', 'NV': 'Nevada', 'NH': 'New Hampshire', 'NJ': 'New Jersey',
            'NM': 'New Mexico', 'NY': 'New York', 'NC': 'North Carolina', 'ND': 'North Dakota', 'OH': 'Ohio',
            'OK': 'Oklahoma', 'OR': 'Oregon', 'PA': 'Pennsylvania', 'RI': 'Rhode Island', 'SC': 'South Carolina',
            'SD': 'South Dakota', 'TN': 'Tennessee', 'TX': 'Texas', 'UT': 'Utah', 'VT': 'Vermont',
            'VA': 'Virginia', 'WA': 'Washington', 'WV': 'West Virginia', 'WI': 'Wisconsin', 'WY': 'Wyoming',
            'DC': 'District of Columbia', 'PR': 'Puerto Rico', 'VI': 'Virgin Islands', 'GU': 'Guam',
            'AS': 'American Samoa', 'MP': 'Northern Mariana Islands'
        }
        
        df_usa = (df_table[df_table['ISO_COUNTRY_CODE'] == 'US']
              .rename(columns={'MON': 'MONTH'})
              .assign(FILE_NAME = lambda x: f.name)
              .assign(state_name = lambda x: x['REGION'].map(state_abbrev_to_name)))
        

        
        # Print data summary
        print(f"Found {len(df_usa)} USA records")
        # Save to Parquet with partitioning by YEAR and MONTH
        df_usa.to_parquet(
            output_base_home_panel,
            index=False,
            partition_cols=["YEAR", "MONTH"]
        )
    except Exception as e:
        print(f"Error processing file {f}: {e}")
        error_list.append(f)
        continue