In [0]:
import pandas as pd
import random
from datetime import datetime, timedelta
import os

# Set random seed for reproducibility. Every generator below draws from the
# module-level `random` state, so the mock data depends on this seed and on
# the order in which the generators are called.
random.seed(42)

# Mock data lists used to build people.
# Keep entries distinct: random.choice() draws uniformly over list *entries*,
# so a duplicated value is silently picked twice as often as the others.
first_names = ['John', 'Jane', 'Michael', 'Sarah', 'David', 'Emily', 'Robert', 'Lisa', 'James', 'Maria',
               'William', 'Jennifer', 'Richard', 'Patricia', 'Charles', 'Linda', 'Joseph', 'Elizabeth',
               'Thomas', 'Barbara', 'Christopher', 'Susan', 'Daniel', 'Jessica', 'Matthew', 'Karen',
               'Mark', 'Nancy', 'Paul', 'Betty', 'Donald', 'Helen', 'George', 'Sandra', 'Kenneth', 'Donna',
               'Steven', 'Carol', 'Edward', 'Ruth', 'Brian', 'Sharon', 'Ronald', 'Michelle', 'Anthony', 'Laura',
               'Kevin', 'Jason', 'Kimberly', 'Jeffrey', 'Deborah', 'Ryan', 'Dorothy', 'Jacob', 'Amy',
               'Gary', 'Angela', 'Nicholas', 'Brenda', 'Eric', 'Emma', 'Jonathan', 'Olivia', 'Stephen', 'Cynthia',
               'Larry', 'Marie', 'Justin', 'Janet', 'Scott', 'Catherine', 'Brandon', 'Frances', 'Benjamin', 'Christine']

last_names = ['Smith', 'Johnson', 'Williams', 'Brown', 'Jones', 'Garcia', 'Miller', 'Davis', 'Rodriguez',
              'Martinez', 'Hernandez', 'Lopez', 'Gonzalez', 'Wilson', 'Anderson', 'Thomas', 'Taylor',
              'Moore', 'Jackson', 'Martin', 'Lee', 'Perez', 'Thompson', 'White', 'Harris', 'Sanchez',
              'Clark', 'Ramirez', 'Lewis', 'Robinson', 'Walker', 'Young', 'Allen', 'King', 'Wright',
              'Scott', 'Torres', 'Nguyen', 'Hill', 'Flores', 'Green', 'Adams', 'Nelson', 'Baker',
              'Hall', 'Rivera', 'Campbell', 'Mitchell', 'Carter', 'Roberts', 'Gomez', 'Phillips',
              'Evans', 'Turner', 'Diaz', 'Parker', 'Cruz', 'Edwards', 'Collins', 'Reyes', 'Stewart',
              'Morris', 'Morales', 'Murphy', 'Cook', 'Rogers', 'Gutierrez', 'Ortiz', 'Morgan', 'Cooper']

occupations = ['Software Engineer', 'Teacher', 'Doctor', 'Nurse', 'Accountant', 'Marketing Manager',
               'Sales Representative', 'Data Analyst', 'Project Manager', 'Graphic Designer',
               'Lawyer', 'Consultant', 'Engineer', 'Administrator', 'Researcher', 'Writer',
               'Chef', 'Mechanic', 'Electrician', 'Pharmacist', 'Architect', 'Therapist',
               'Financial Advisor', 'HR Manager', 'Operations Manager', 'Business Analyst',
               'Product Manager', 'UX Designer', 'DevOps Engineer', 'Quality Assurance',
               'Customer Service Rep', 'Real Estate Agent', 'Insurance Agent', 'Banker']

def generate_email(first_name, last_name):
    """Return a mock address of the form ``first.last@domain``.

    Both name parts are lower-cased. The domain is picked uniformly from a
    small fixed list using the module-level ``random`` state (one draw per call).
    """
    user = ".".join([first_name.lower(), last_name.lower()])
    host = random.choice(['gmail.com', 'yahoo.com', 'hotmail.com', 'outlook.com', 'company.com', 'email.com'])
    return user + "@" + host

def generate_dob(start_date=datetime(1950, 1, 1), end_date=datetime(2000, 12, 31)):
    """Generate a random date of birth between ``start_date`` and ``end_date``, inclusive.

    Args:
        start_date: earliest possible date (default 1950-01-01).
        end_date: latest possible date (default 2000-12-31).

    Returns:
        The chosen date formatted as ``'YYYY-MM-DD'``.

    Raises:
        ValueError: if ``end_date`` is earlier than ``start_date``.
    """
    span_days = (end_date - start_date).days
    if span_days < 0:
        raise ValueError(
            f"end_date {end_date:%Y-%m-%d} is before start_date {start_date:%Y-%m-%d}"
        )
    # randint is inclusive on both ends, so end_date itself can be produced
    # (randrange(span_days) would stop one day short of it).
    offset_days = random.randint(0, span_days)
    return (start_date + timedelta(days=offset_days)).strftime('%Y-%m-%d')

def generate_phone():
    """Return a mock number formatted ``+1-NNN-NNN-NNNN``, or ``None`` ~10% of the time.

    The ``None`` result simulates a missing phone number. The two 3-digit groups
    are drawn from 200-999 and the final group from 1000-9999.
    """
    is_missing = random.random() < 0.1  # 10% chance of missing phone
    if is_missing:
        return None
    groups = (random.randint(200, 999), random.randint(200, 999), random.randint(1000, 9999))
    return "+1-%d-%d-%d" % groups

def generate_person():
    """Build one mock person record with randomly missing fields.

    Returns a dict with keys 'Name', 'email', 'DOB', 'occupation' and
    'phone_number'. 'Name' is None ~5% of the time, 'DOB' ~8%, 'occupation'
    ~12%, and 'phone_number' is whatever generate_phone() returns. 'email'
    is always populated, derived from the drawn first/last name even when
    'Name' itself is blanked out.
    """
    first = random.choice(first_names)
    last = random.choice(last_names)

    # 5% chance of a missing name
    name = None if random.random() <= 0.05 else f"{first} {last}"
    email = generate_email(first, last)  # never empty

    dob = None
    if random.random() > 0.08:  # 8% chance of missing DOB
        dob = generate_dob()

    occupation = None
    if random.random() > 0.12:  # 12% chance of missing occupation
        occupation = random.choice(occupations)

    phone = generate_phone()

    return {
        'Name': name,
        'email': email,
        'DOB': dob,
        'occupation': occupation,
        'phone_number': phone,
    }

# Generate a pool of 200 people that can be reused across CSVs.
# People are drawn independently, so the pool is NOT guaranteed to be unique:
# the same name/email can appear more than once. Report the distinct-email
# count alongside the size so the summary doesn't overstate uniqueness.
people_pool = [generate_person() for _ in range(200)]
pool_distinct_emails = len({person['email'] for person in people_pool})

print(f"Generated {len(people_pool)} people for the pool ({pool_distinct_emails} distinct emails)")
print("\nSample person:")
print(people_pool[0])

In [0]:
# Configuration for catalog storage
catalog = "agustin_training_catalog"
schema = "dev_agustin_panizza_sfmc"
table_name = "mock_files"
full_table_name = f"{catalog}.{schema}.{table_name}"

# Directory in the volume where the mock CSV files are written
csv_dir = f"/Volumes/{catalog}/{schema}/mock_files"
os.makedirs(csv_dir, exist_ok=True)

NUM_FILES = 10         # number of CSV files to create
RECORDS_PER_FILE = 30  # rows per CSV file

# Create the CSV files; people shared via people_pool create overlaps between files
csv_files_data = []  # per-file summary statistics
all_file_data = []   # per-file DataFrames (with metadata columns) for the catalog table

for i in range(1, NUM_FILES + 1):
    # 15-25 people come from the shared pool (for overlap); the rest are new
    pool_people_count = random.randint(15, 25)
    csv_data = list(random.sample(people_pool, pool_people_count))
    remaining_count = RECORDS_PER_FILE - len(csv_data)
    for _ in range(remaining_count):
        csv_data.append(generate_person())

    # Shuffle so pool people and new people are interleaved
    random.shuffle(csv_data)

    file_name = f"people_data_{i:02d}.csv"
    csv_filename = f"{csv_dir}/{file_name}"

    # Write the CSV before adding metadata, so the file holds only person columns
    file_df = pd.DataFrame(csv_data)
    file_df.to_csv(csv_filename, index=False)

    # Metadata columns exist only in the copy stored in the catalog table
    file_df['file_name'] = file_name
    file_df['file_id'] = i
    file_df['created_timestamp'] = datetime.now()
    all_file_data.append(file_df)

    csv_files_data.append({
        'filename': csv_filename,
        'file_id': i,
        # len() counts rows; the metadata *columns* added above don't change it
        'records': len(file_df),
        'non_null_emails': file_df['email'].notna().sum(),
        'null_names': file_df['Name'].isna().sum(),
        'null_dobs': file_df['DOB'].isna().sum(),
        'null_occupations': file_df['occupation'].isna().sum(),
        'null_phones': file_df['phone_number'].isna().sum(),
    })

    print(f"Created {csv_filename} with {len(file_df)} records")

# Combine all per-file DataFrames for catalog storage
combined_df = pd.concat(all_file_data, ignore_index=True)

# Store in catalog table. The Spark conversion is inside the try so that a
# failure there also falls back to "local CSV files only" instead of crashing.
print(f"\n📊 Storing all data in catalog table: {full_table_name}")
try:
    spark_df = spark.createDataFrame(combined_df)
    # Drop table if exists and create new one
    spark.sql(f"DROP TABLE IF EXISTS {full_table_name}")
    spark_df.write.mode("overwrite").saveAsTable(full_table_name)
    print(f"✅ Successfully stored {len(combined_df)} records in {full_table_name}")
except Exception as e:
    print(f"❌ Error storing in catalog: {e}")
    print("Will continue with local CSV files only")

print(f"\n✅ Created {len(csv_files_data)} CSV files in {csv_dir}")
print(f"📊 Total records across all files: {sum(info['records'] for info in csv_files_data)}")
print("\n📋 Summary of created files:")
for file_info in csv_files_data:
    print(f"\nFile {file_info['file_id']:2d}: {file_info['filename']}")
    print(f"  - Total records: {file_info['records']}")
    print(f"  - Non-null emails: {file_info['non_null_emails']} (should equal total records)")
    print(f"  - Null names: {file_info['null_names']}")
    print(f"  - Null DOBs: {file_info['null_dobs']}")
    print(f"  - Null occupations: {file_info['null_occupations']}")
    print(f"  - Null phone numbers: {file_info['null_phones']}")

In [0]:
# Display sample data from the catalog table and individual files
print("📋 Sample data from catalog table and individual CSV files:\n")

# Show catalog table summary
try:
    print(f"=== Catalog Table: {full_table_name} ===")
    catalog_df = spark.table(full_table_name)
    print(f"Total records in catalog: {catalog_df.count()}")
    print(f"Unique files: {catalog_df.select('file_id').distinct().count()}")

    print("\nSample records from catalog table:")
    catalog_df.select("Name", "email", "DOB", "occupation", "phone_number", "file_name", "file_id").limit(10).display()

    print("\nRecords per file in catalog:")
    catalog_df.groupBy("file_id", "file_name").count().orderBy("file_id").display()

except Exception as e:
    print(f"❌ Error reading from catalog table: {e}")
    print("Showing local CSV files instead...")

print("\n" + "=" * 80)
print("\n📁 Individual CSV Files Sample Data:\n")

# Display a sample from the first few CSV files
SAMPLE_FILE_COUNT = 3
for file_info in csv_files_data[:SAMPLE_FILE_COUNT]:
    print(f"=== CSV File {file_info['file_id']}: {file_info['filename']} ===")

    # Read the file back and show its first 5 rows
    sample_df = pd.read_csv(file_info['filename'])
    print(f"Shape: {sample_df.shape}")
    print("\nFirst 5 rows:")
    display(sample_df.head())

    # Per-column null counts
    print("\nData Quality Summary:")
    print(sample_df.isnull().sum())
    print("\n" + "=" * 60 + "\n")

if len(csv_files_data) > SAMPLE_FILE_COUNT:
    print(f"... and {len(csv_files_data) - SAMPLE_FILE_COUNT} more files with similar structure")

In [0]:
# Analyze overlaps between CSV files by checking email addresses
all_emails = set()
file_emails = {}          # "CSV_01" -> set of emails in that file
overlapping_emails = set()

print(f"🔍 Analyzing email overlaps between {len(csv_files_data)} CSV files:\n")

for i, file_info in enumerate(csv_files_data, 1):
    emails = set(pd.read_csv(file_info['filename'])['email'].dropna())
    file_emails[f"CSV_{i:02d}"] = emails

    # Emails that already appeared in an earlier file
    current_overlaps = all_emails & emails
    if current_overlaps:
        overlapping_emails.update(current_overlaps)
        print(f"CSV {i:2d} has {len(current_overlaps):2d} overlapping emails with previous files")
        if len(current_overlaps) <= 5:
            print(f"       Overlapping emails: {list(current_overlaps)}")
        else:
            print(f"       Sample overlapping emails: {list(current_overlaps)[:5]}...")
    else:
        print(f"CSV {i:2d} has  0 overlapping emails with previous files")

    all_emails.update(emails)
    print(f"       CSV {i:2d} contains {len(emails):2d} unique emails")
    print()

print(f"\n📊 Overall Statistics:")
print(f"Total unique emails across all files: {len(all_emails)}")
print(f"Total overlapping emails: {len(overlapping_emails)}")
if all_emails:  # guard against ZeroDivisionError when no emails were read
    print(f"Overlap percentage: {len(overlapping_emails)/len(all_emails)*100:.1f}%")
if file_emails:
    print(f"Average emails per file: {sum(len(emails) for emails in file_emails.values()) / len(file_emails):.1f}")

# Pairwise overlap sizes, computed once and reused by both reports below
num_files = len(csv_files_data)
pairwise_overlaps = []  # (i, j, shared_email_count) for every i < j
for i in range(1, num_files + 1):
    for j in range(i + 1, num_files + 1):
        shared = file_emails[f"CSV_{i:02d}"] & file_emails[f"CSV_{j:02d}"]
        pairwise_overlaps.append((i, j, len(shared)))

# Show pairwise overlaps (only significant ones to avoid clutter)
MIN_SIGNIFICANT_OVERLAP = 3
print("\n🔗 Significant pairwise overlaps between files (>= 3 shared emails):")
significant_overlaps = [(i, j, n) for i, j, n in pairwise_overlaps if n >= MIN_SIGNIFICANT_OVERLAP]
for i, j, n in significant_overlaps:
    print(f"CSV {i:2d} ↔ CSV {j:2d}: {n:2d} shared emails")

if not significant_overlaps:
    print("No significant pairwise overlaps found (all overlaps < 3 emails)")

# Show distribution of overlap sizes
print("\n📈 Overlap Distribution:")
overlap_sizes = [n for _, _, n in pairwise_overlaps]

if overlap_sizes:
    print(f"Min overlap: {min(overlap_sizes)} emails")
    print(f"Max overlap: {max(overlap_sizes)} emails")
    print(f"Avg overlap: {sum(overlap_sizes)/len(overlap_sizes):.1f} emails")
    print(f"Total pairwise comparisons: {len(overlap_sizes)}")

# Verify catalog table data if available
try:
    print("\n📊 Catalog Table Verification:")
    catalog_df = spark.table(full_table_name)
    unique_emails_catalog = catalog_df.select("email").distinct().count()
    total_records_catalog = catalog_df.count()
    print(f"Unique emails in catalog: {unique_emails_catalog}")
    print(f"Total records in catalog: {total_records_catalog}")
    if total_records_catalog:  # avoid dividing by zero on an empty table
        print(f"Duplicate email rate: {(total_records_catalog - unique_emails_catalog) / total_records_catalog * 100:.1f}%")
except Exception as e:
    # Catch Exception (not a bare except) so KeyboardInterrupt still stops the cell
    print(f"\n⚠️  Catalog table not available for verification: {e}")

In [0]:
# List all created CSV files and catalog table information
import glob

print("📁 Created CSV files and catalog table ready for Auto Loader processing:\n")

# Show catalog table information first
print(f"📊 Catalog Table: {full_table_name}")
try:
    catalog_df = spark.table(full_table_name)
    catalog_record_count = catalog_df.count()  # computed once and reused below
    total_size_estimate = catalog_record_count * 100  # Rough estimate: 100 bytes per row
    print(f"   ✅ Successfully stored in catalog")
    print(f"   Records: {catalog_record_count:,}")
    print(f"   Files represented: {catalog_df.select('file_id').distinct().count()}")
    print(f"   Columns: {', '.join(catalog_df.columns)}")
    print(f"   Estimated size: ~{total_size_estimate:,} bytes")
    print(f"   Table location: {full_table_name}")
except Exception as e:
    print(f"   ❌ Catalog table not accessible: {e}")

print(f"\n📁 Individual CSV Files:")
# Match only the numbered files from the generation cell. A bare "*.csv" would also
# pick up people_data_historic.csv, which a later cell writes to this same directory.
csv_files = sorted(glob.glob(f"{csv_dir}/people_data_[0-9][0-9].csv"))

total_size = 0
total_records = 0

for i, filepath in enumerate(csv_files, 1):
    file_size = os.path.getsize(filepath)
    file_df = pd.read_csv(filepath)
    total_size += file_size
    total_records += len(file_df)

    print(f"{i:2d}. {filepath}")
    print(f"    Size: {file_size:,} bytes")
    print(f"    Records: {len(file_df):,}")
    print(f"    Columns: {', '.join(file_df.columns)}")
    print()

print(f"✅ Summary:")
print(f"   Total CSV files: {len(csv_files)}")
print(f"   Total records: {total_records:,}")
print(f"   Total size: {total_size:,} bytes ({total_size/1024:.1f} KB)")
if csv_files:  # averages are undefined (ZeroDivisionError) when no files were found
    print(f"   Average records per file: {total_records/len(csv_files):.0f}")
    print(f"   Average file size: {total_size/len(csv_files):.0f} bytes")

print(f"\n💡 Next steps for Auto Loader:")
print(f"1. Option A - Use Catalog Table:")
print(f"   - Query data directly from: {full_table_name}")
print(f"   - Filter by file_id or file_name for specific files")
print(f"   - Use for batch processing or analysis")
print(f"")
print(f"2. Option B - Use CSV Files in Volume:")
print(f"   - Copy CSV files to your Volume paths:")
print(f"     * For ADD operations: /Volumes/{{catalog}}/{{schema}}/{{volume}}/folder*/add/")
print(f"     * For REMOVE operations: /Volumes/{{catalog}}/{{schema}}/{{volume}}/folder*/remove/")
print(f"   - Run Auto Loader to process streaming files")
print(f"")
print(f"3. Data Distribution Recommendation:")
print(f"   - Files 01-05: Use for ADD operations (150 records total)")
print(f"   - Files 06-10: Use for REMOVE operations (150 records total)")
print(f"   - This ensures balanced testing of both ADD and REMOVE streams")

# Show the expected directory structure
print(f"\n📂 Recommended Volume directory structure:")
print(f"/Volumes/{{catalog}}/{{schema}}/{{volume}}/")
print(f"├── batch1/")
print(f"│   └── add/")
for i in range(1, 6):
    # The last entry in each folder uses the closing branch character
    print(f"│       {'└' if i == 5 else '├'}── people_data_{i:02d}.csv")
print(f"├── batch2/")
print(f"│   └── remove/")
for i in range(6, 11):
    print(f"│       {'└' if i == 10 else '├'}── people_data_{i:02d}.csv")
print(f"└── checkpoints/")
print(f"    ├── add_stream/")
print(f"    └── remove_stream/")

# One query string is both printed and executed, so the two cannot drift apart
verification_query = f"""SELECT file_name, file_id, COUNT(*) as record_count
FROM {full_table_name}
GROUP BY file_name, file_id
ORDER BY file_id"""

print(f"\n🔍 Quick verification query for catalog table:")
print(f"{verification_query};")

# Execute the verification query if possible
try:
    print(f"\n📋 Executing verification query:")
    verification_df = spark.sql(verification_query)
    verification_df.display()
except Exception as e:
    print(f"\n⚠️  Could not execute verification query on catalog table: {e}")

In [0]:
import string
import random

# All segment keys "#<letter><two digits>": "#A00", "#A01", ..., "#Z99" (26 * 100 keys)
result = [f"#{letter}{number:02d}" for letter in string.ascii_uppercase for number in range(100)]

def generate_list_to_segment_table(list_of_list_keys):
    """Randomly assign each key to a 'blue', 'red' or 'green' bucket.

    Each key goes into one bucket chosen with ``random.choice``. The bucket
    contents are printed, and a Spark DataFrame with columns ``bucket`` and
    ``key`` is returned, its rows grouped by bucket in blue, red, green order
    (keys keep their input order within a bucket).
    """
    colors = ['blue', 'red', 'green']
    assignments = {color: [] for color in colors}
    for key in list_of_list_keys:
        assignments[random.choice(colors)].append(key)

    for color in colors:
        print(f"{color}: {assignments[color]}")

    # Flatten to (bucket, key) rows for the DataFrame
    rows = []
    for color in colors:
        rows.extend((color, key) for key in assignments[color])
    return spark.createDataFrame(rows, schema=["bucket", "key"])

# Pick 20 random segment keys, bucket them, and persist the bucket assignment.
# `keys` is reused below when assigning list_keys to the historic people.
keys = random.sample(result, 20)
segments_df = generate_list_to_segment_table(keys)

display(segments_df)
# Reuse the catalog/schema configured earlier instead of repeating the literal names
segments_table = f"{catalog}.{schema}.segments_keys"
segments_df.write.mode('overwrite').saveAsTable(segments_table)

In [0]:
# --- Generate 300 unique new people and save to CSV ---
NUM_NEW_PEOPLE = 300

# Precompute the pool's emails once: the previous per-candidate scan over
# people_pool was O(len(people_pool)) for every generated person.
pool_emails = {p['email'] for p in people_pool}

unique_new_people = []
unique_emails = set()
while len(unique_new_people) < NUM_NEW_PEOPLE:
    person = generate_person()
    # Ensure email is unique within this batch and not present in people_pool
    if person['email'] in unique_emails or person['email'] in pool_emails:
        continue
    unique_new_people.append(person)
    unique_emails.add(person['email'])

new_people_df = pd.DataFrame(unique_new_people)

# Add a 15-character random id to the DataFrame using Python
import string
import random

def random_string(length=15):
    """Return ``length`` characters drawn with replacement from ASCII letters and digits."""
    picked = random.choices(string.ascii_letters + string.digits, k=length)
    return "".join(picked)

new_people_df['random_id'] = [random_string(15) for _ in range(len(new_people_df))]

# Each person gets 0-3 distinct segment keys concatenated with no separator;
# every key starts with '#', so a value like "#A12#C07" can still be split on '#'.
new_people_df['list_keys'] = [''.join(random.sample(keys, k=random.randint(0, 3))) for _ in range(len(new_people_df))]

display(new_people_df)

# Write next to the numbered mock files. Reuse csv_dir from the configuration
# cell instead of re-declaring catalog/schema, and make sure the directory exists.
os.makedirs(csv_dir, exist_ok=True)
new_csv_filename = f"{csv_dir}/people_data_historic.csv"

new_people_df.to_csv(new_csv_filename, index=False)

print(f"\n✅ Created new file with {len(new_people_df)} unique people: {new_csv_filename}")
print(f"Shape: {new_people_df.shape}")
print("\nSample records:")
print(new_people_df.head())
print("\nData Quality Summary:")
print(new_people_df.isnull().sum())