In [None]:
from pydantic import BaseModel
from typing import Dict
from datetime import datetime

from sqlalchemy.orm import Session

# Import database schema (Base, engine, and models)
from DB_scheme import Base, engine, Contact, Event

# Import custom database operation functions from the DB_operations module
from DB_operations import (
    is_event_exists,        # Check if an event already exists in the database
    is_contact_exists,      # Check if a contact already exists in the database
    insert_event,           # Insert a new event into the database
    insert_contact,         # Insert a new contact into the database
    update_contact          # Update an existing contact in the database
)

import json
import pandas as pd

In [None]:
# Pydantic model used to validate and parse each entry of the events JSON file
class EventJson(BaseModel):
    """Schema of a single event record as read from the JSON input.

    Instantiating the model with the keys of one JSON entry validates that
    every field below is present and has (or can be parsed into) the
    annotated type; a record that does not fit raises a Pydantic
    validation error.
    """

    eventId: str                # identifier of the event itself
    eventType: str              # event kind, e.g. 'contact.updated'
    occurredAt: datetime        # event timestamp, parsed into a datetime
    objectId: int               # identifier of the object the event refers to
                                # (presumably the contact id - confirm against the DB helpers)
    properties: Dict[str, str]  # flat string-to-string mapping of event attributes

# Read the raw events from disk. The file is expected to contain a JSON array
# whose items are objects matching the EventJson schema.
with open("eventupdate.json", encoding="utf-8") as f:
    raw = json.load(f)


def _model_to_dict(model):
    """Return the validated model as a plain dict on any Pydantic version.

    Pydantic v2 renamed ``.dict()`` to ``.model_dump()`` and deprecates the
    old name; v1 only has ``.dict()``. Use whichever one is available so the
    cell produces the same result on both, without deprecation warnings.
    """
    if hasattr(model, "model_dump"):
        return model.model_dump()
    return model.dict()


# Validate each JSON entry and convert it back into a plain dict.
# A record that fails validation raises here and stops the load.
events = [_model_to_dict(EventJson(**e)) for e in raw]

# Flatten the nested "properties" mapping into top-level columns named
# "properties_<key>" (one row per event).
df = pd.json_normalize(events, sep='_')

In [None]:
# Strip the "properties_" prefix that flattening put on the property columns.
# Only a leading prefix is removed, so a key that merely contains the same
# text further in keeps its full name.
_PROPERTIES_PREFIX = "properties_"
df.columns = [
    col[len(_PROPERTIES_PREFIX):] if col.startswith(_PROPERTIES_PREFIX) else col
    for col in df.columns
]
print(df.columns)

# Convert 'occurredAt' to datetime; values that cannot be parsed become NaT.
df['occurredAt'] = pd.to_datetime(df['occurredAt'], errors='coerce')

# Text columns to normalise (only the ones present in this batch are touched).
string_cols = ['email', 'firstname', 'lastname', 'phone', 'lifecyclestage',
               'utm_source', 'utm_medium', 'utm_campaign']


def _clean_text(value):
    """Return ``value`` stripped and lower-cased, or ``None`` when it is missing.

    Missing cells (None / NaN / NA) are detected *before* any string
    conversion. Casting first would turn them into the text "nan", which is
    indistinguishable from a real value spelled that way (e.g. the first
    name "Nan") and forces a fragile "nan" -> None replacement afterwards.
    """
    if isinstance(value, str):
        return value.strip().lower()
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return str(value).strip().lower()


# Clean strings: strip and lowercase real values, store missing ones as None.
# dtype=object keeps None as None instead of letting pandas infer another dtype.
for col in string_cols:
    if col in df.columns:
        df[col] = pd.Series(
            [_clean_text(v) for v in df[col]],
            index=df.index,
            dtype=object,
        )

# Keep only rows whose email contains "@"; missing emails are dropped.
# .copy() detaches the result from the unfiltered frame so later column
# assignments (including a re-run of this cell) do not hit SettingWithCopy.
df = df[df['email'].str.contains(r'@', na=False)].copy()

print(df)

In [None]:
# Push every cleaned event into the database, one DataFrame row at a time.
for _, row in df.iterrows():
    # An event that is already stored is skipped completely: neither the
    # event nor its contact data is written again.
    if is_event_exists(row['eventId'], engine):
        print(f"Evento duplicado ignorado: {row['eventId']}")
        continue

    # New event: record it first, then deal with the contact it refers to.
    print('Inserto evento')
    insert_event(row, engine)

    # Unknown contact: create it from this row and move on.
    if not is_contact_exists(row['objectId'], engine):
        print('Contacto insertado')
        insert_contact(row, engine)
        continue

    # Known contact: only "contact.updated" events modify the stored record;
    # any other event type leaves the contact untouched.
    if row['eventType'] == 'contact.updated':
        update_contact(row, engine)
        print(f"Contacto actualizado: {row['objectId']}")
    else:
        print(f"Contacto duplicado ignorado: {row['objectId']}")