In [52]:
import pandas as pd
import os


In [53]:
# Bookkeeping files used by the cells below:
#   checkpoint_file - names of parquet files that finished processing
#   master_file     - names of every parquet file found in the input folder
checkpoint_file = "completed_individual_user_parquet_files.txt"
master_file = "individual_user_in_user_ids_file.txt"


In [54]:
# Specify the folder containing the .parquet files
folder_path = "individual_user_extracted"

# List all .parquet files in the folder.
# os.listdir() returns entries in arbitrary order, so sort the names to make
# the master list (and therefore the processing order) reproducible.
parquet_files = sorted(f for f in os.listdir(folder_path) if f.endswith('.parquet'))

# Write one file name per line to the master list.
# Mode 'w' replaces any list left over from a previous run.
with open(master_file, 'w') as file:
    for parquet_file in parquet_files:
        file.write(parquet_file + '\n')

print(f"File names have been written to {master_file}")


File names have been written to individual_user_in_user_ids_file.txt


In [55]:
# Read the master list of parquet file names (one name per line).
# Blank lines are skipped: an empty name would later be joined onto the input
# folder and resolve to the folder itself instead of a single file.
with open(master_file, 'r') as f:
    txt_files = [name for name in (line.strip() for line in f) if name]

# Load the names of files already processed by an earlier run, if a checkpoint
# exists; otherwise start with nothing marked as done.
if os.path.exists(checkpoint_file):
    with open(checkpoint_file, 'r') as f:
        completed_files = {name for name in (line.strip() for line in f) if name}
else:
    completed_files = set()


In [56]:
# Keep, in master-list order, only the names with no checkpoint entry yet
print(f"Found parquet files: {parquet_files}")
files_to_process = list(
    filter(lambda name: name not in completed_files, txt_files)
)
print(f"Files to process: {files_to_process}")

Found parquet files: ['individual_user_in_user_ids_part_001.parquet', 'individual_user_in_user_ids_part_002.parquet', 'individual_user_in_user_ids_part_003.parquet']
Files to process: ['individual_user_in_user_ids_part_001.parquet', 'individual_user_in_user_ids_part_002.parquet', 'individual_user_in_user_ids_part_003.parquet']


In [57]:
# Text file that collects the ids of users that could not be categorized
uncategorized_file = "uncategorized_individual_user.txt"

# Folder that receives the processed parquet files; created on first use,
# left alone when it already exists
result_folder = 'us_individual_user_extracted'
os.makedirs(result_folder, exist_ok=True)



In [58]:
def _us_flag(country):
    """Classify one `user_country` value.

    Returns 1 when the value equals "United States", 0 for any other string,
    and None for anything that is not a string (such rows are treated as
    uncategorized by the loop below).
    """
    if country == "United States":
        return 1
    return 0 if isinstance(country, str) else None


# Process every pending parquet file in turn. Each file is checkpointed only
# after its output has been written, so an interrupted run can be resumed by
# re-running the notebook.
for txt_file in files_to_process:
    file_path = os.path.join(folder_path, txt_file)

    try:
        print(f"Starting to process: {txt_file}")
        # Load parquet file into DataFrame
        df = pd.read_parquet(file_path)

        # Add is_in_us column
        # NOTE(review): the column dtype may differ between files with and
        # without uncategorized rows - confirm downstream readers tolerate it.
        df['is_in_us'] = df['user_country'].apply(_us_flag)

        # Rows whose country could not be classified
        uncategorized_users = df[df['is_in_us'].isnull()]

        # Format the ids up front so a bad id fails before any file is written
        uncategorized_ids = [
            f"{int(user_id)}\n" for user_id in uncategorized_users['user_id']
        ]

        if not uncategorized_users.empty:
            # Drop uncategorized rows
            df = df.dropna(subset=['is_in_us'])

        # Save the result as a .parquet file
        output_file = os.path.join(result_folder, txt_file)
        df.to_parquet(output_file, index=False)
        print(f"Results saved to {output_file}")

        # Log uncategorized ids only once the output exists, right before the
        # checkpoint entry. If the parquet write fails and the file is retried
        # later, its ids are therefore not appended a second time.
        if uncategorized_ids:
            with open(uncategorized_file, 'a') as uncategorized:
                uncategorized.writelines(uncategorized_ids)

        # Update the checkpoint file after successful processing
        with open(checkpoint_file, 'a') as f:
            f.write(txt_file + '\n')
        print(f"Checkpoint updated for {txt_file}. Moving to the next file...\n")

    except Exception as e:
        print(f"Failed to process {txt_file}: {e}")
        break  # Stop the process if an error occurs
else:
    # Runs only when the loop finished without hitting `break`, so the success
    # message is no longer printed after a failure.
    print("All files processed successfully!")

Starting to process: individual_user_in_user_ids_part_001.parquet
Results saved to us_individual_user_extracted\individual_user_in_user_ids_part_001.parquet
Checkpoint updated for individual_user_in_user_ids_part_001.parquet. Moving to the next file...

Starting to process: individual_user_in_user_ids_part_002.parquet
Results saved to us_individual_user_extracted\individual_user_in_user_ids_part_002.parquet
Checkpoint updated for individual_user_in_user_ids_part_002.parquet. Moving to the next file...

Starting to process: individual_user_in_user_ids_part_003.parquet
Results saved to us_individual_user_extracted\individual_user_in_user_ids_part_003.parquet
Checkpoint updated for individual_user_in_user_ids_part_003.parquet. Moving to the next file...

All files processed successfully!
