In [16]:
# import psycopg2
# from dotenv import load_dotenv
# import os

# # Load environment variables from .env
# load_dotenv()

# # Fetch variables
# USER = os.getenv("USER")
# PASSWORD = os.getenv("PASSWORD")
# HOST = os.getenv("HOST")
# PORT = os.getenv("PORT")
# DBNAME = os.getenv("DBNAME")

# # Connect to the database
# try:
#     connection = psycopg2.connect(
#         user=USER, password=PASSWORD, host=HOST, port=PORT, dbname=DBNAME
#     )
#     print(
#         "Connection successful as user: ",
#         USER,
#         "on database: ",
#         DBNAME,
#         "password: ",
#         PASSWORD,
#     )
#     # Create a cursor to execute SQL queries
#     cursor = connection.cursor()
#     # Example query
#     cursor.execute("SELECT NOW();")
#     result = cursor.fetchone()
#     print("Current Time:", result)
#     # Close the cursor and connection
#     cursor.close()
#     connection.close()
#     print("Connection closed.")

# except Exception as e:
#     print(f"Failed to connect: {e}")

In [17]:
# import os
# from supabase import create_client, Client
# from dotenv import load_dotenv

# # Load environment variables from .env
# load_dotenv()

# try:
#     # Initialize Supabase client
#     supabase: Client = create_client(
#         supabase_url=os.getenv("SUPABASE_URL"), supabase_key=os.getenv("SUPABASE_KEY")
#     )

#     # Test the connection with a simple query
#     response = supabase.table("test").select("*").limit(1).execute()
#     print("Connection successful!")
#     print("Sample data:", response.data)

# except Exception as e:
#     print(f"Failed to connect: {e}")

Connection successful!
Sample data: [{'id': 56, 'created_at': '2025-02-14T07:36:15+00:00'}]


In [18]:
%%time
import ijson
import pandas as pd
from pathlib import Path

file_path = f"/Users/krish/Projects/CxC_Data/2024/amplitude_export_chunk_1_anonymized.json"

# non-empty columns
columns_keep = [
    "$insert_id",
    "amplitude_id",
    "app",
    "city",
    "client_event_time",
    "client_upload_time",
    "country",
    "data",
    "data_type",
    "device_family",
    "device_id",
    "device_type",
    "dma",
    "event_id",
    "event_properties",
    "event_time",
    "event_type",
    "language",
    "library",
    "os_name",
    "os_version",
    "platform",
    "processed_time",
    "region",
    "server_received_time",
    "server_upload_time",
    "session_id",
    "user_id",
    "user_properties",
    "uuid",
]

with open(file_path, "r") as f:
    # Get just the first item from the JSON file
    objects = ijson.items(f, "item")
    first_row = next(objects)
    
    # Convert to DataFrame and select desired columns
    df = pd.DataFrame([first_row])
    df = df[columns_keep]
    
    print("First row of data:")
    print(df)

First row of data:
                             $insert_id  amplitude_id     app       city  \
0  b5aa91c6-ac17-4bfe-8def-df50869540b0  857540442426  591532  Covington   

            client_event_time          client_upload_time        country  \
0  2024-06-14 23:06:34.898000  2024-06-14 23:06:35.998000  United States   

                                                data data_type device_family  \
0  {'path': '/2/httpapi', 'group_first_event': {}...     event       Windows   

   ... os_version platform              processed_time   region  \
0  ...        125      Web  2024-06-14 23:06:36.809000  Georgia   

         server_received_time          server_upload_time     session_id  \
0  2024-06-14 23:06:35.998000  2024-06-14 23:06:36.007000  1718399623706   

                                user_id  \
0  01708ccf-437b-44ed-b5a0-2fed8f7761d3   

                                     user_properties  \
0  {'initial_utm_medium': 'EMPTY', 'initial_refer...   

                          

In [24]:
%%time
import ijson
import pandas as pd
import json
from pathlib import Path

file_path = f"/Users/krish/Projects/CxC_Data/2024/amplitude_export_chunk_1_anonymized.json"

# Important columns with their preprocessing requirements
columns_keep = {
    # PDNP (Probably Doesn't Need Preprocessing)
    'uuid': {'type': 'PDNP'},  # Added as primary interaction ID
    'amplitude_id': {'type': 'PDNP'},
    'app': {'type': 'PDNP'},
    'data_type': {'type': 'PDNP'},
    'device_family': {'type': 'PDNP'},
    'device_id': {'type': 'PDNP'},
    'device_type': {'type': 'PDNP'},
    'event_type': {'type': 'PDNP'},
    'language': {'type': 'PDNP'},
    'os_name': {'type': 'PDNP'},
    'platform': {'type': 'PDNP'},
    
    # PNP (Probably Needs Preprocessing)
    'data': {'type': 'PNP'},
    'dma': {'type': 'PNP'},
    'event_id': {'type': 'PNP'},
    'event_properties': {'type': 'PNP'},
    'user_properties': {'type': 'PNP'},
    
    # Location fields
    'city': {'type': 'location'},
    'country': {'type': 'location'},
    'region': {'type': 'location'},
    
    # Time and Session
    'event_time': {'type': 'time'},
    'session_id': {'type': 'session'}
}

def preprocess_row(row):
    # Convert row to DataFrame for easier handling
    df = pd.DataFrame([row])
    
    # 1. Basic null check for PDNP fields
    pdnp_columns = [col for col, info in columns_keep.items() if info['type'] == 'PDNP']
    if df[pdnp_columns].isnull().any(axis=1).iloc[0]:
        print("Row dropped: Missing values in PDNP fields")
        return None
    
    # 2. Validate UUID format
    uuid = df['uuid'].iloc[0]
    if not isinstance(uuid, str) or len(uuid.split('-')) != 5:
        print("Row dropped: Invalid UUID format")
        return None
    
    # 3. Location validation
    if df['country'].iloc[0] == "United States":
        if pd.isnull(df['dma'].iloc[0]):
            print("Row dropped: Missing DMA for US location")
            return None
    
    # 4. Validate event_id is numeric
    try:
        event_id = int(df['event_id'].iloc[0])
        if event_id <= 0:
            print("Row dropped: Invalid event_id")
            return None
    except (ValueError, TypeError):
        print("Row dropped: Non-numeric event_id")
        return None
    
    # 5. Validate event_properties and user_properties are proper JSON
    for prop_field in ['event_properties', 'user_properties']:
        if not isinstance(df[prop_field].iloc[0], dict):
            print(f"Row dropped: Invalid {prop_field} format")
            return None
    
    # 6. Validate data field
    if not isinstance(df['data'].iloc[0], dict):
        print("Row dropped: Invalid data format")
        return None
    
    # 7. Validate session_id is numeric
    try:
        session_id = int(df['session_id'].iloc[0])
        if session_id <= 0:
            print("Row dropped: Invalid session_id")
            return None
    except (ValueError, TypeError):
        print("Row dropped: Non-numeric session_id")
        return None
    
    # 8. Validate event_time format
    try:
        pd.to_datetime(df['event_time'].iloc[0])
    except (ValueError, TypeError):
        print("Row dropped: Invalid event_time format")
        return None
    
    return df[columns_keep.keys()]

# Read and process the first row
with open(file_path, "r") as f:
    objects = ijson.items(f, "item")
    first_row = next(objects)
    
    # Process the row
    processed_df = preprocess_row(first_row)
    
    if processed_df is not None:
        print("\nProcessed row:")
        print(processed_df)
    else:
        print("\nRow was filtered out due to preprocessing criteria")


Processed row:
                                   uuid  amplitude_id     app data_type  \
0  f10e3860-18f7-4e43-93d9-29bda5edb636  857540442426  591532     event   

  device_family                             device_id device_type  \
0       Windows  449d7b98-0a1c-4f2d-bbe6-9b889d4d8e27     Windows   

                     event_type language os_name  ...  \
0  account-lines::widget:render  English  Chrome  ...   

                                                data          dma event_id  \
0  {'path': '/2/httpapi', 'group_first_event': {}...  Atlanta, GA     8247   

                                    event_properties  \
0  {'displayName': 'Policy Detail Card - EX', 'id...   

                                     user_properties       city  \
0  {'initial_utm_medium': 'EMPTY', 'initial_refer...  Covington   

         country   region                  event_time     session_id  
0  United States  Georgia  2024-06-14 23:06:34.898000  1718399623706  

[1 rows x 21 columns]
CPU times

In [31]:
%%time
import ijson
import pandas as pd
import json
from pathlib import Path
from supabase import create_client, Client
from dotenv import load_dotenv
import os

file_path = f"/Users/krish/Projects/CxC_Data/2024/amplitude_export_chunk_1_anonymized.json"

# Important columns with their Supabase types
columns_keep = {
    # UUID fields
    'id': {'type': 'uuid', 'source': 'uuid'},  # Primary key from uuid field
    'device_id': {'type': 'uuid'},
    
    # Integer fields
    'amplitude_id': {'type': 'int8'},
    'app': {'type': 'int8'},
    'event_id': {'type': 'int8'},
    'session_id': {'type': 'int8'},
    
    # Text fields
    'city': {'type': 'text'},
    'country': {'type': 'text'},
    'data_type': {'type': 'text'},
    'device_family': {'type': 'text'},
    'device_type': {'type': 'text'},
    'dma': {'type': 'text'},
    'event_type': {'type': 'text'},
    'language': {'type': 'text'},
    'os_name': {'type': 'text'},
    'platform': {'type': 'text'},
    'region': {'type': 'text'},
    
    # JSON fields
    'data': {'type': 'json'},
    'event_properties': {'type': 'json'},
    'user_properties': {'type': 'json'},
    
    # Timestamp field
    'event_time': {'type': 'timestamp'},
}

def clean_user_properties(props):
    """Convert 'EMPTY' string values to None in user_properties and remove trackingVersion"""
    if not isinstance(props, dict):
        return props
    
    cleaned = {}
    for key, value in props.items():
        # Skip trackingVersion field
        if key == 'trackingVersion':
            continue
            
        if isinstance(value, str) and value == "EMPTY":
            cleaned[key] = None
        elif isinstance(value, list):
            cleaned[key] = value  # Preserve lists (like roles) as is
        else:
            cleaned[key] = value
    return cleaned

def clean_empty_objects(data):
    """Convert empty nested objects to None in data column"""
    if not isinstance(data, dict):
        return data
    
    cleaned = {}
    for key, value in data.items():
        if isinstance(value, dict) and len(value) == 0:
            cleaned[key] = None
        else:
            cleaned[key] = value
    return cleaned

def is_empty_value(val):
    """Check if a value should be considered empty"""
    if pd.isna(val):  # Checks for None and NaN
        return True
    if isinstance(val, str) and val.strip() == "":  # Empty strings or whitespace
        return True
    if val == "None":  # String "None"
        return True
    if isinstance(val, (dict, list)) and len(val) == 0:  # Empty collections
        return True
    return False

def preprocess_row(row):
    # Convert row to DataFrame for easier handling
    df = pd.DataFrame([row])
    
    # Create id from uuid
    df['id'] = df['uuid']

    # Clean user_properties before validation
    if 'user_properties' in df.columns:
        df['user_properties'] = df['user_properties'].apply(clean_user_properties)
    
    # Clean data column before validation
    if 'data' in df.columns:
        df['data'] = df['data'].apply(clean_empty_objects)
    
    # Type validations
    for col, info in columns_keep.items():
        # Get the value for the current column
        val = df[col].iloc[0]
        
        # Check for empty values based on column type
        if is_empty_value(val):
            print(f"Row dropped: Empty value in {col}")
            return None
            
        # Handle special case for id field
        if col == 'id' and info['source'] == 'uuid':
            continue
            
        # UUID validation
        if info['type'] == 'uuid':
            if not isinstance(val, str) or len(val.split('-')) != 5:
                print(f"Row dropped: Invalid UUID format for {col}")
                return None
        
        # Integer validation
        elif info['type'] == 'int8':
            try:
                df[col] = pd.to_numeric(df[col], downcast='integer')
                if df[col].iloc[0] <= 0:
                    print(f"Row dropped: Invalid value for {col}")
                    return None
            except (ValueError, TypeError):
                print(f"Row dropped: Non-numeric {col}")
                return None
        
        # JSON validation
        elif info['type'] == 'json':
            if not isinstance(val, dict):
                print(f"Row dropped: Invalid JSON format for {col}")
                return None
            if len(val) == 0:
                print(f"Row dropped: Empty JSON object in {col}")
                return None
        
        # Text validation
        elif info['type'] == 'text':
            if not isinstance(val, str):
                df[col] = str(val)  # Convert to string if possible
            if len(df[col].iloc[0].strip()) == 0:
                print(f"Row dropped: Empty text in {col}")
                return None
        
        # Timestamp validation
        elif info['type'] == 'timestamp':
            try:
                df[col] = pd.to_datetime(df[col])
            except (ValueError, TypeError):
                print(f"Row dropped: Invalid timestamp for {col}")
                return None
    
    # Location validation for US entries
    if df['country'].iloc[0] == "United States":
        if pd.isnull(df['dma'].iloc[0]) or df['dma'].iloc[0].strip() == "":
            print("Row dropped: Missing DMA for US location")
            return None
    
    return df[columns_keep.keys()]

# Read and process the first row
with open(file_path, "r") as f:
    objects = ijson.items(f, "item")
    first_row = next(objects)
    
    # Process the row
    processed_df = preprocess_row(first_row)
    
    if processed_df is not None:
        print("\nProcessed row:")
        print(processed_df)
        print("\nCleaned user_properties:")
        print(json.dumps(processed_df['user_properties'].iloc[0], indent=2))
        print("\nCleaned data:")
        print(json.dumps(processed_df['data'].iloc[0], indent=2))
    else:
        print("\nRow was filtered out due to preprocessing criteria")


Processed row:
                                     id                             device_id  \
0  f10e3860-18f7-4e43-93d9-29bda5edb636  449d7b98-0a1c-4f2d-bbe6-9b889d4d8e27   

   amplitude_id     app  event_id     session_id       city        country  \
0  857540442426  591532      8247  1718399623706  Covington  United States   

  data_type device_family  ...          dma                    event_type  \
0     event       Windows  ...  Atlanta, GA  account-lines::widget:render   

  language os_name platform   region  \
0  English  Chrome      Web  Georgia   

                                                data  \
0  {'path': '/2/httpapi', 'group_first_event': No...   

                                    event_properties  \
0  {'displayName': 'Policy Detail Card - EX', 'id...   

                                     user_properties              event_time  
0  {'initial_utm_medium': None, 'initial_referrin... 2024-06-14 23:06:34.898  

[1 rows x 21 columns]

Cleaned user_propert

In [32]:
%%time
# Initialize Supabase client
load_dotenv()
supabase: Client = create_client(
    supabase_url=os.getenv("SUPABASE_URL"),
    supabase_key=os.getenv("SUPABASE_KEY")
)

# If we have processed data from previous cell
if processed_df is not None:
    try:
        # Convert DataFrame to dictionary format
        row_dict = processed_df.iloc[0].to_dict()
        
        # Convert timestamp to ISO format string
        row_dict['event_time'] = row_dict['event_time'].isoformat()
        
        # Insert data into Supabase
        response = supabase.table('federato_amplitude_data').insert(row_dict).execute()
        
        print("Data inserted successfully!")
        print("Response:", response)
        
    except Exception as e:
        print(f"Error inserting data: {e}")
else:
    print("No valid data to insert")

Data inserted successfully!
Response: data=[{'id': 'f10e3860-18f7-4e43-93d9-29bda5edb636', 'amplitude_id': 857540442426, 'app': 591532, 'city': 'Covington', 'country': 'United States', 'data': {'path': '/2/httpapi', 'group_first_event': None, 'group_ids': None}, 'data_type': 'event', 'device_family': 'Windows', 'device_id': '449d7b98-0a1c-4f2d-bbe6-9b889d4d8e27', 'device_type': 'Windows', 'dma': 'Atlanta, GA', 'event_id': 8247, 'event_properties': {'displayName': 'Policy Detail Card - EX', 'id': '36c45c49-d9db-4f8e-937c-63ebcd242c35', 'type': 'built-in', 'slug': 'policy-detail-card'}, 'event_time': '2024-06-14T23:06:34.898', 'event_type': 'account-lines::widget:render', 'language': 'English', 'os_name': 'Chrome', 'platform': 'Web', 'region': 'Georgia', 'session_id': 1718399623706, 'user_properties': {'initial_utm_medium': None, 'initial_referring_domain': None, 'initial_utm_content': None, 'roles': ['underwriter'], 'isInternalUser': 'False', 'initial_utm_campaign': None, 'initial_twcli

In [41]:
%%time
import ijson
import pandas as pd
import json
from pathlib import Path
from supabase import create_client, Client
from dotenv import load_dotenv
import os

file_path = f"/Users/krish/Projects/CxC_Data/2024/amplitude_export_chunk_1_anonymized.json"

# Important columns with their Supabase types
columns_keep = {
    # UUID fields
    'id': {'type': 'uuid', 'source': 'uuid', 'nullable': False},  
    'device_id': {'type': 'uuid', 'nullable': False},
    
    # Integer fields
    'amplitude_id': {'type': 'int8', 'nullable': False},
    'app': {'type': 'int8', 'nullable': False},
    'event_id': {'type': 'int8', 'nullable': False},
    'session_id': {'type': 'int8', 'nullable': False},
    
    # Text fields - some allowed to be null
    'city': {'type': 'text', 'nullable': True},
    'country': {'type': 'text', 'nullable': False},
    'data_type': {'type': 'text', 'nullable': False},
    'device_family': {'type': 'text', 'nullable': True},
    'device_type': {'type': 'text', 'nullable': True},
    'dma': {'type': 'text', 'nullable': True},
    'event_type': {'type': 'text', 'nullable': False},
    'language': {'type': 'text', 'nullable': False},
    'os_name': {'type': 'text', 'nullable': False},
    'platform': {'type': 'text', 'nullable': False},
    'region': {'type': 'text', 'nullable': False},
    
    # JSON fields
    'data': {'type': 'json', 'nullable': False},
    'user_properties': {'type': 'json', 'nullable': True},
    
    # Timestamp field
    'event_time': {'type': 'timestamp', 'nullable': False},
}

def clean_user_properties(props):
    """Convert 'EMPTY' string values to None in user_properties and remove trackingVersion"""
    if not isinstance(props, dict):
        return props
    
    cleaned = {}
    for key, value in props.items():
        # Skip trackingVersion field
        if key == 'trackingVersion':
            continue
            
        if isinstance(value, str) and value == "EMPTY":
            cleaned[key] = None
        elif isinstance(value, list):
            cleaned[key] = value  # Preserve lists (like roles) as is
        else:
            cleaned[key] = value
    return cleaned

def clean_empty_objects(data):
    """Convert empty nested objects to None in data column"""
    if not isinstance(data, dict):
        return data
    
    cleaned = {}
    for key, value in data.items():
        if isinstance(value, dict) and len(value) == 0:
            cleaned[key] = None
        else:
            cleaned[key] = value
    return cleaned

def is_empty_value(val):
    """Check if a value should be considered empty"""
    if pd.isna(val):  # Checks for None and NaN
        return True
    if isinstance(val, str) and val.strip() == "":  # Empty strings or whitespace
        return True
    if val == "None":  # String "None"
        return True
    if isinstance(val, (dict, list)) and len(val) == 0:  # Empty collections
        return True
    return False

def preprocess_row(row):
    # Convert row to DataFrame for easier handling
    df = pd.DataFrame([row])
    
    # Create id from uuid
    df['id'] = df['uuid']

    # Clean user_properties before validation
    if 'user_properties' in df.columns:
        df['user_properties'] = df['user_properties'].apply(clean_user_properties)
    
    # Clean data column before validation
    if 'data' in df.columns:
        df['data'] = df['data'].apply(clean_empty_objects)
    
    # Type validations
    for col, info in columns_keep.items():
        # Get the value for the current column
        val = df[col].iloc[0]
        
        # Skip empty value check for nullable fields
        if not info['nullable']:
            if is_empty_value(val):
                print(f"Row dropped: Empty value in non-nullable field {col}")
                return None
            
        # Handle special case for id field
        if col == 'id' and info['source'] == 'uuid':
            continue
            
        # UUID validation
        if info['type'] == 'uuid':
            if not isinstance(val, str) or len(val.split('-')) != 5:
                print(f"Row dropped: Invalid UUID format for {col}")
                return None
        
        # Integer validation
        elif info['type'] == 'int8':
            try:
                df[col] = pd.to_numeric(df[col], downcast='integer')
                if df[col].iloc[0] <= 0:
                    print(f"Row dropped: Invalid value for {col}")
                    return None
            except (ValueError, TypeError):
                print(f"Row dropped: Non-numeric {col}")
                return None
        
        # JSON validation
        elif info['type'] == 'json':
            if not info['nullable']:
                if not isinstance(val, dict):
                    print(f"Row dropped: Invalid JSON format for {col}")
                    return None
                if len(val) == 0:
                    print(f"Row dropped: Empty JSON object in {col}")
                    return None
            elif val is not None:  # For nullable JSON fields, validate only if not None
                if not isinstance(val, dict):
                    print(f"Row dropped: Invalid JSON format for {col}")
                    return None
        
        # Text validation
        elif info['type'] == 'text':
            if not info['nullable']:
                if not isinstance(val, str):
                    df[col] = str(val)  # Convert to string if possible
                if len(df[col].iloc[0].strip()) == 0:
                    print(f"Row dropped: Empty text in non-nullable field {col}")
                    return None
            elif val is not None:  # For nullable text fields, validate only if not None
                if not isinstance(val, str):
                    df[col] = str(val)
        
        # Timestamp validation
        elif info['type'] == 'timestamp':
            try:
                df[col] = pd.to_datetime(df[col])
            except (ValueError, TypeError):
                print(f"Row dropped: Invalid timestamp for {col}")
                return None
    
    # Location validation for US entries
    # if df['country'].iloc[0] == "United States":
    #     if pd.isnull(df['dma'].iloc[0]) or df['dma'].iloc[0].strip() == "":
    #         print("Row dropped: Missing DMA for US location")
    #         return None
    
    return df[columns_keep.keys()]

# Initialize Supabase client
load_dotenv()
supabase: Client = create_client(
    supabase_url=os.getenv("SUPABASE_URL"),
    supabase_key=os.getenv("SUPABASE_KEY")
)

# Process rows in batches
batch_size = 1000  # Adjust based on your needs
processed_rows = []
total_rows = 0
valid_rows = 0
error_rows = 0

with open(file_path, "r") as f:
    objects = ijson.items(f, "item")
    
    for row in objects:
        total_rows += 1
        
        # Process the row
        processed_df = preprocess_row(row)
        
        if processed_df is not None:
            # Convert timestamp to ISO format - fixed version
            processed_df['event_time'] = processed_df['event_time'].iloc[0].strftime('%Y-%m-%dT%H:%M:%S.%fZ')
            
            # Convert row to dictionary
            row_dict = processed_df.iloc[0].to_dict()
            processed_rows.append(row_dict)
            valid_rows += 1
            
            # When batch is full, insert into Supabase
            if len(processed_rows) >= batch_size:
                try:
                    response = supabase.table('federato_amplitude_data').insert(processed_rows).execute()
                    print(f"\nInserted batch of {len(processed_rows)} rows")
                    processed_rows = []  # Clear the batch
                except Exception as e:
                    print(f"\nError inserting batch: {e}")
                    error_rows += len(processed_rows)
                    processed_rows = []  # Clear the batch even if there's an error
        else:
            error_rows += 1
        
        # Print progress
        if total_rows % 1000 == 0:
            print(f"\nProgress: Processed {total_rows} rows")
            print(f"Valid rows: {valid_rows}")
            print(f"Error rows: {error_rows}")

# Insert any remaining rows
if processed_rows:
    try:
        response = supabase.table('federato_amplitude_data').insert(processed_rows).execute()
        print(f"\nInserted final batch of {len(processed_rows)} rows")
    except Exception as e:
        print(f"\nError inserting final batch: {e}")
        error_rows += len(processed_rows)

# Print final statistics
print("\nProcessing complete!")
print(f"Total rows processed: {total_rows}")
print(f"Valid rows inserted: {valid_rows}")
print(f"Error rows: {error_rows}")


Inserted batch of 1000 rows

Progress: Processed 1000 rows
Valid rows: 1000
Error rows: 0

Inserted batch of 1000 rows

Progress: Processed 2000 rows
Valid rows: 2000
Error rows: 0

Inserted batch of 1000 rows

Progress: Processed 3000 rows
Valid rows: 3000
Error rows: 0

Inserted batch of 1000 rows

Progress: Processed 4000 rows
Valid rows: 4000
Error rows: 0

Inserted batch of 1000 rows

Progress: Processed 5000 rows
Valid rows: 5000
Error rows: 0

Inserted batch of 1000 rows

Progress: Processed 6000 rows
Valid rows: 6000
Error rows: 0

Inserted batch of 1000 rows

Progress: Processed 7000 rows
Valid rows: 7000
Error rows: 0

Inserted batch of 1000 rows

Progress: Processed 8000 rows
Valid rows: 8000
Error rows: 0

Inserted batch of 1000 rows

Progress: Processed 9000 rows
Valid rows: 9000
Error rows: 0
Row dropped: Invalid value for event_id
Row dropped: Invalid value for event_id

Progress: Processed 10000 rows
Valid rows: 9998
Error rows: 2

Inserted batch of 1000 rows

Progress

In [42]:
%%time
import ijson
import pandas as pd
import json
from pathlib import Path
from supabase import create_client, Client
from dotenv import load_dotenv
import os
from glob import glob

# Get all chunk files (excluding chunk 1)
base_path = "/Users/krish/Projects/CxC_Data/2024/"
file_pattern = "amplitude_export_chunk_*_anonymized.json"
file_paths = sorted(glob(os.path.join(base_path, file_pattern)))
# Filter out chunk 1
file_paths = [f for f in file_paths if "chunk_1_" not in f]

print(f"Found {len(file_paths)} files to process:")
for file_path in file_paths:
    print(f"- {os.path.basename(file_path)}")

# Important columns with their Supabase types
columns_keep = {
    # UUID fields
    'id': {'type': 'uuid', 'source': 'uuid', 'nullable': False},  
    'device_id': {'type': 'uuid', 'nullable': False},
    
    # Integer fields
    'amplitude_id': {'type': 'int8', 'nullable': False},
    'app': {'type': 'int8', 'nullable': False},
    'event_id': {'type': 'int8', 'nullable': False},
    'session_id': {'type': 'int8', 'nullable': False},
    
    # Text fields - some allowed to be null
    'city': {'type': 'text', 'nullable': True},
    'country': {'type': 'text', 'nullable': False},
    'data_type': {'type': 'text', 'nullable': False},
    'device_family': {'type': 'text', 'nullable': True},
    'device_type': {'type': 'text', 'nullable': True},
    'dma': {'type': 'text', 'nullable': True},
    'event_type': {'type': 'text', 'nullable': False},
    'language': {'type': 'text', 'nullable': False},
    'os_name': {'type': 'text', 'nullable': False},
    'platform': {'type': 'text', 'nullable': False},
    'region': {'type': 'text', 'nullable': False},
    
    # JSON fields
    'data': {'type': 'json', 'nullable': False},
    'user_properties': {'type': 'json', 'nullable': True},
    
    # Timestamp field
    'event_time': {'type': 'timestamp', 'nullable': False},
}

def clean_user_properties(props):
    """Convert 'EMPTY' string values to None in user_properties and remove trackingVersion"""
    if not isinstance(props, dict):
        return props
    
    cleaned = {}
    for key, value in props.items():
        # Skip trackingVersion field
        if key == 'trackingVersion':
            continue
            
        if isinstance(value, str) and value == "EMPTY":
            cleaned[key] = None
        elif isinstance(value, list):
            cleaned[key] = value  # Preserve lists (like roles) as is
        else:
            cleaned[key] = value
    return cleaned

def clean_empty_objects(data):
    """Convert empty nested objects to None in data column"""
    if not isinstance(data, dict):
        return data
    
    cleaned = {}
    for key, value in data.items():
        if isinstance(value, dict) and len(value) == 0:
            cleaned[key] = None
        else:
            cleaned[key] = value
    return cleaned

def is_empty_value(val):
    """Check if a value should be considered empty"""
    if pd.isna(val):  # Checks for None and NaN
        return True
    if isinstance(val, str) and val.strip() == "":  # Empty strings or whitespace
        return True
    if val == "None":  # String "None"
        return True
    if isinstance(val, (dict, list)) and len(val) == 0:  # Empty collections
        return True
    return False

def preprocess_row(row):
    # Convert row to DataFrame for easier handling
    df = pd.DataFrame([row])
    
    # Create id from uuid
    df['id'] = df['uuid']

    # Clean user_properties before validation
    if 'user_properties' in df.columns:
        df['user_properties'] = df['user_properties'].apply(clean_user_properties)
    
    # Clean data column before validation
    if 'data' in df.columns:
        df['data'] = df['data'].apply(clean_empty_objects)
    
    # Type validations
    for col, info in columns_keep.items():
        # Get the value for the current column
        val = df[col].iloc[0]
        
        # Skip empty value check for nullable fields
        if not info['nullable']:
            if is_empty_value(val):
                print(f"Row dropped: Empty value in non-nullable field {col}")
                return None
            
        # Handle special case for id field
        if col == 'id' and info['source'] == 'uuid':
            continue
            
        # UUID validation
        if info['type'] == 'uuid':
            if not isinstance(val, str) or len(val.split('-')) != 5:
                print(f"Row dropped: Invalid UUID format for {col}")
                return None
        
        # Integer validation
        elif info['type'] == 'int8':
            try:
                df[col] = pd.to_numeric(df[col], downcast='integer')
                if df[col].iloc[0] <= 0:
                    print(f"Row dropped: Invalid value for {col}")
                    return None
            except (ValueError, TypeError):
                print(f"Row dropped: Non-numeric {col}")
                return None
        
        # JSON validation
        elif info['type'] == 'json':
            if not info['nullable']:
                if not isinstance(val, dict):
                    print(f"Row dropped: Invalid JSON format for {col}")
                    return None
                if len(val) == 0:
                    print(f"Row dropped: Empty JSON object in {col}")
                    return None
            elif val is not None:  # For nullable JSON fields, validate only if not None
                if not isinstance(val, dict):
                    print(f"Row dropped: Invalid JSON format for {col}")
                    return None
        
        # Text validation
        elif info['type'] == 'text':
            if not info['nullable']:
                if not isinstance(val, str):
                    df[col] = str(val)  # Convert to string if possible
                if len(df[col].iloc[0].strip()) == 0:
                    print(f"Row dropped: Empty text in non-nullable field {col}")
                    return None
            elif val is not None:  # For nullable text fields, validate only if not None
                if not isinstance(val, str):
                    df[col] = str(val)
        
        # Timestamp validation
        elif info['type'] == 'timestamp':
            try:
                df[col] = pd.to_datetime(df[col])
            except (ValueError, TypeError):
                print(f"Row dropped: Invalid timestamp for {col}")
                return None
    
    # Location validation for US entries
    # if df['country'].iloc[0] == "United States":
    #     if pd.isnull(df['dma'].iloc[0]) or df['dma'].iloc[0].strip() == "":
    #         print("Row dropped: Missing DMA for US location")
    #         return None
    
    return df[columns_keep.keys()]

# Initialize Supabase client
load_dotenv()
supabase: Client = create_client(
    supabase_url=os.getenv("SUPABASE_URL"),
    supabase_key=os.getenv("SUPABASE_KEY")
)

# Process files one by one
batch_size = 1000
grand_total_rows = 0
grand_valid_rows = 0
grand_error_rows = 0

for file_path in file_paths:
    print(f"\nProcessing file: {os.path.basename(file_path)}")
    processed_rows = []
    total_rows = 0
    valid_rows = 0
    error_rows = 0

    with open(file_path, "r") as f:
        objects = ijson.items(f, "item")
        
        for row in objects:
            total_rows += 1
            
            # Process the row
            processed_df = preprocess_row(row)
            
            if processed_df is not None:
                # Convert timestamp to ISO format
                processed_df['event_time'] = processed_df['event_time'].iloc[0].strftime('%Y-%m-%dT%H:%M:%S.%fZ')
                
                # Convert row to dictionary
                row_dict = processed_df.iloc[0].to_dict()
                processed_rows.append(row_dict)
                valid_rows += 1
                
                # When batch is full, insert into Supabase
                if len(processed_rows) >= batch_size:
                    try:
                        response = supabase.table('federato_amplitude_data').insert(processed_rows).execute()
                        print(f"Inserted batch of {len(processed_rows)} rows")
                        processed_rows = []  # Clear the batch
                    except Exception as e:
                        print(f"Error inserting batch: {e}")
                        error_rows += len(processed_rows)
                        processed_rows = []  # Clear the batch even if there's an error
            else:
                error_rows += 1
            
            # Print progress
            if total_rows % 1000 == 0:
                print(f"Progress: Processed {total_rows} rows")
                print(f"Valid rows: {valid_rows}")
                print(f"Error rows: {error_rows}")

    # Insert any remaining rows for this file
    if processed_rows:
        try:
            response = supabase.table('federato_amplitude_data').insert(processed_rows).execute()
            print(f"Inserted final batch of {len(processed_rows)} rows")
        except Exception as e:
            print(f"Error inserting final batch: {e}")
            error_rows += len(processed_rows)

    # Print statistics for this file
    print(f"\nFile {os.path.basename(file_path)} complete!")
    print(f"Total rows processed: {total_rows}")
    print(f"Valid rows inserted: {valid_rows}")
    print(f"Error rows: {error_rows}")
    
    # Update grand totals
    grand_total_rows += total_rows
    grand_valid_rows += valid_rows
    grand_error_rows += error_rows

# Print final statistics for all files
print("\nAll files processing complete!")
print(f"Grand total rows processed: {grand_total_rows}")
print(f"Grand total valid rows inserted: {grand_valid_rows}")
print(f"Grand total error rows: {grand_error_rows}")

Found 15 files to process:
- amplitude_export_chunk_10_anonymized.json
- amplitude_export_chunk_11_anonymized.json
- amplitude_export_chunk_12_anonymized.json
- amplitude_export_chunk_13_anonymized.json
- amplitude_export_chunk_14_anonymized.json
- amplitude_export_chunk_15_anonymized.json
- amplitude_export_chunk_16_anonymized.json
- amplitude_export_chunk_2_anonymized.json
- amplitude_export_chunk_3_anonymized.json
- amplitude_export_chunk_4_anonymized.json
- amplitude_export_chunk_5_anonymized.json
- amplitude_export_chunk_6_anonymized.json
- amplitude_export_chunk_7_anonymized.json
- amplitude_export_chunk_8_anonymized.json
- amplitude_export_chunk_9_anonymized.json

Processing file: amplitude_export_chunk_10_anonymized.json
Inserted batch of 1000 rows
Progress: Processed 1000 rows
Valid rows: 1000
Error rows: 0
Inserted batch of 1000 rows
Progress: Processed 2000 rows
Valid rows: 2000
Error rows: 0
Inserted batch of 1000 rows
Progress: Processed 3000 rows
Valid rows: 3000
Error ro

In [43]:
%%time
import ijson
import pandas as pd
import json
from pathlib import Path
from supabase import create_client, Client
from dotenv import load_dotenv
import os

file_path = f"/Users/krish/Projects/CxC_Data/new_amplitude_export_2025.json"

# Important columns with their Supabase types
columns_keep = {
    # UUID fields
    'id': {'type': 'uuid', 'source': 'uuid', 'nullable': False},  
    'device_id': {'type': 'uuid', 'nullable': False},
    
    # Integer fields
    'amplitude_id': {'type': 'int8', 'nullable': False},
    'app': {'type': 'int8', 'nullable': False},
    'event_id': {'type': 'int8', 'nullable': False},
    'session_id': {'type': 'int8', 'nullable': False},
    
    # Text fields - some allowed to be null
    'city': {'type': 'text', 'nullable': True},
    'country': {'type': 'text', 'nullable': False},
    'data_type': {'type': 'text', 'nullable': False},
    'device_family': {'type': 'text', 'nullable': True},
    'device_type': {'type': 'text', 'nullable': True},
    'dma': {'type': 'text', 'nullable': True},
    'event_type': {'type': 'text', 'nullable': False},
    'language': {'type': 'text', 'nullable': False},
    'os_name': {'type': 'text', 'nullable': False},
    'platform': {'type': 'text', 'nullable': False},
    'region': {'type': 'text', 'nullable': False},
    
    # JSON fields
    'data': {'type': 'json', 'nullable': False},
    'user_properties': {'type': 'json', 'nullable': True},
    
    # Timestamp field
    'event_time': {'type': 'timestamp', 'nullable': False},
}

def clean_user_properties(props):
    """Convert 'EMPTY' string values to None in user_properties and remove trackingVersion"""
    if not isinstance(props, dict):
        return props
    
    cleaned = {}
    for key, value in props.items():
        # Skip trackingVersion field
        if key == 'trackingVersion':
            continue
            
        if isinstance(value, str) and value == "EMPTY":
            cleaned[key] = None
        elif isinstance(value, list):
            cleaned[key] = value  # Preserve lists (like roles) as is
        else:
            cleaned[key] = value
    return cleaned

def clean_empty_objects(data):
    """Convert empty nested objects to None in data column"""
    if not isinstance(data, dict):
        return data
    
    cleaned = {}
    for key, value in data.items():
        if isinstance(value, dict) and len(value) == 0:
            cleaned[key] = None
        else:
            cleaned[key] = value
    return cleaned

def is_empty_value(val):
    """Check if a value should be considered empty"""
    if pd.isna(val):  # Checks for None and NaN
        return True
    if isinstance(val, str) and val.strip() == "":  # Empty strings or whitespace
        return True
    if val == "None":  # String "None"
        return True
    if isinstance(val, (dict, list)) and len(val) == 0:  # Empty collections
        return True
    return False

def preprocess_row(row):
    # Convert row to DataFrame for easier handling
    df = pd.DataFrame([row])
    
    # Create id from uuid
    df['id'] = df['uuid']

    # Clean user_properties before validation
    if 'user_properties' in df.columns:
        df['user_properties'] = df['user_properties'].apply(clean_user_properties)
    
    # Clean data column before validation
    if 'data' in df.columns:
        df['data'] = df['data'].apply(clean_empty_objects)
    
    # Type validations
    for col, info in columns_keep.items():
        # Get the value for the current column
        val = df[col].iloc[0]
        
        # Skip empty value check for nullable fields
        if not info['nullable']:
            if is_empty_value(val):
                print(f"Row dropped: Empty value in non-nullable field {col}")
                return None
            
        # Handle special case for id field
        if col == 'id' and info['source'] == 'uuid':
            continue
            
        # UUID validation
        if info['type'] == 'uuid':
            if not isinstance(val, str) or len(val.split('-')) != 5:
                print(f"Row dropped: Invalid UUID format for {col}")
                return None
        
        # Integer validation
        elif info['type'] == 'int8':
            try:
                df[col] = pd.to_numeric(df[col], downcast='integer')
                if df[col].iloc[0] <= 0:
                    print(f"Row dropped: Invalid value for {col}")
                    return None
            except (ValueError, TypeError):
                print(f"Row dropped: Non-numeric {col}")
                return None
        
        # JSON validation
        elif info['type'] == 'json':
            if not info['nullable']:
                if not isinstance(val, dict):
                    print(f"Row dropped: Invalid JSON format for {col}")
                    return None
                if len(val) == 0:
                    print(f"Row dropped: Empty JSON object in {col}")
                    return None
            elif val is not None:  # For nullable JSON fields, validate only if not None
                if not isinstance(val, dict):
                    print(f"Row dropped: Invalid JSON format for {col}")
                    return None
        
        # Text validation
        elif info['type'] == 'text':
            if not info['nullable']:
                if not isinstance(val, str):
                    df[col] = str(val)  # Convert to string if possible
                if len(df[col].iloc[0].strip()) == 0:
                    print(f"Row dropped: Empty text in non-nullable field {col}")
                    return None
            elif val is not None:  # For nullable text fields, validate only if not None
                if not isinstance(val, str):
                    df[col] = str(val)
        
        # Timestamp validation
        elif info['type'] == 'timestamp':
            try:
                df[col] = pd.to_datetime(df[col])
            except (ValueError, TypeError):
                print(f"Row dropped: Invalid timestamp for {col}")
                return None
    
    # Location validation for US entries
    # if df['country'].iloc[0] == "United States":
    #     if pd.isnull(df['dma'].iloc[0]) or df['dma'].iloc[0].strip() == "":
    #         print("Row dropped: Missing DMA for US location")
    #         return None
    
    return df[columns_keep.keys()]

# Initialize Supabase client
load_dotenv()
supabase: Client = create_client(
    supabase_url=os.getenv("SUPABASE_URL"),
    supabase_key=os.getenv("SUPABASE_KEY")
)

# Process rows in batches
batch_size = 1000  # Adjust based on your needs
processed_rows = []
total_rows = 0
valid_rows = 0
error_rows = 0

with open(file_path, "r") as f:
    objects = ijson.items(f, "item")
    
    for row in objects:
        total_rows += 1
        
        # Process the row
        processed_df = preprocess_row(row)
        
        if processed_df is not None:
            # Convert timestamp to ISO format - fixed version
            processed_df['event_time'] = processed_df['event_time'].iloc[0].strftime('%Y-%m-%dT%H:%M:%S.%fZ')
            
            # Convert row to dictionary
            row_dict = processed_df.iloc[0].to_dict()
            processed_rows.append(row_dict)
            valid_rows += 1
            
            # When batch is full, insert into Supabase
            if len(processed_rows) >= batch_size:
                try:
                    response = supabase.table('federato_amplitude_data').insert(processed_rows).execute()
                    print(f"\nInserted batch of {len(processed_rows)} rows")
                    processed_rows = []  # Clear the batch
                except Exception as e:
                    print(f"\nError inserting batch: {e}")
                    error_rows += len(processed_rows)
                    processed_rows = []  # Clear the batch even if there's an error
        else:
            error_rows += 1
        
        # Print progress
        if total_rows % 1000 == 0:
            print(f"\nProgress: Processed {total_rows} rows")
            print(f"Valid rows: {valid_rows}")
            print(f"Error rows: {error_rows}")

# Insert any remaining rows
if processed_rows:
    try:
        response = supabase.table('federato_amplitude_data').insert(processed_rows).execute()
        print(f"\nInserted final batch of {len(processed_rows)} rows")
    except Exception as e:
        print(f"\nError inserting final batch: {e}")
        error_rows += len(processed_rows)

# Print final statistics
print("\nProcessing complete!")
print(f"Total rows processed: {total_rows}")
print(f"Valid rows inserted: {valid_rows}")
print(f"Error rows: {error_rows}")

Row dropped: Invalid value for event_id
Row dropped: Invalid value for event_id
Row dropped: Invalid value for event_id
Row dropped: Invalid value for event_id
Row dropped: Invalid value for event_id

Progress: Processed 1000 rows
Valid rows: 995
Error rows: 5

Error inserting batch: {'message': 'JSON could not be generated', 'code': 507, 'hint': 'Refer to full message for details', 'details': "b'exceeded request buffer limit while retrying upstream'"}

Progress: Processed 2000 rows
Valid rows: 1995
Error rows: 1005

Error inserting batch: {'message': 'JSON could not be generated', 'code': 507, 'hint': 'Refer to full message for details', 'details': "b'exceeded request buffer limit while retrying upstream'"}

Progress: Processed 3000 rows
Valid rows: 2995
Error rows: 2005

Error inserting batch: {'message': 'JSON could not be generated', 'code': 507, 'hint': 'Refer to full message for details', 'details': "b'exceeded request buffer limit while retrying upstream'"}

Progress: Processed 