# Data synchronization methods

This example demonstrates an incremental synchronization method that updates only the records that have changed since the last synchronization, using a timestamp column to track changeS

In [None]:
#Install the libraries
!pip install pysqlite3

In [2]:
#Import the libraries
import sqlite3
from datetime import datetime, timedelta

# Define the incremental synchronization function to synchronize data from a source database to a target database
def incremental_sync(source_conn, target_conn, table_name, timestamp_column):
    source_cur = source_conn.cursor()
    target_cur = target_conn.cursor()

    # Get the last synced timestamp
    target_cur.execute(f"SELECT MAX({timestamp_column}) FROM {table_name}")
    last_sync = target_cur.fetchone()[0] or datetime.min

    # Fetch new or updated records from source
    source_cur.execute(f'''
    SELECT * FROM {table_name}
    WHERE {timestamp_column} > ?
    ORDER BY {timestamp_column}
    ''', (last_sync,))

    rows = source_cur.fetchall()

    # Insert or update records in target
    for row in rows:
        target_cur.execute(f'''
        INSERT INTO {table_name} (id, data, last_updated) VALUES (?, ?, ?)
        ON CONFLICT(id) DO UPDATE SET
        data=excluded.data, last_updated=excluded.last_updated
        ''', row)

    target_conn.commit()
    print(f"Synchronization completed. {target_cur.rowcount} rows affected.")

# Function to set up the environment and perform synchronization
def setup_and_sync():
    # Create new SQLite databases in memory
    source_conn = sqlite3.connect(':memory:')
    target_conn = sqlite3.connect(':memory:')

    # Create tables in both source and target databases
    source_conn.execute('''
    CREATE TABLE customers (
        id INTEGER PRIMARY KEY,
        data TEXT,
        last_updated TIMESTAMP
    )''')
    target_conn.execute('''
    CREATE TABLE customers (
        id INTEGER PRIMARY KEY,
        data TEXT,
        last_updated TIMESTAMP
    )''')

    # Insert initial data into source database
    source_conn.execute("INSERT INTO customers (id, data, last_updated) VALUES (1, 'Initial data', '2020-01-01 10:00:00')")
    source_conn.execute("INSERT INTO customers (id, data, last_updated) VALUES (2, 'More data', '2020-01-02 12:00:00')")
    source_conn.commit()

    # Perform synchronization
    incremental_sync(source_conn, target_conn, 'customers', 'last_updated')

    # Fetch and print the current state of the target database
    target_rows = target_conn.execute("SELECT * FROM customers")
    print("Contents of the target database:")
    for row in target_rows:
        print(row)

    # Close connections
    source_conn.close()
    target_conn.close()

# Call the setup and sync function
setup_and_sync()

Synchronization completed. 1 rows affected.
Contents of the target database:
(1, 'Initial data', '2020-01-01 10:00:00')
(2, 'More data', '2020-01-02 12:00:00')


# Incremental data loading

This code creates synthetic sales data, saves it to a CSV file, and incrementally loads new records into a SQLite database table based on the latest transaction date.

In [None]:
# Install required packages
!pip install pandas sqlalchemy


In [4]:
# Import required libraries
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
import random

def create_synthetic_data(file_name, num_records):
    start_date = datetime.now() - timedelta(days=30)
    data = {
        'transaction_date': [start_date + timedelta(days=random.randint(0, 30)) for _ in range(num_records)],
        'amount': [random.uniform(10, 1000) for _ in range(num_records)],
        'customer_id': [random.randint(1, 100) for _ in range(num_records)]
    }
    df = pd.DataFrame(data)
    df.to_csv(file_name, index=False)

def incremental_load(source_file, target_table, engine, timestamp_col):
    # Read the entire source file
    df = pd.read_csv(source_file, parse_dates=[timestamp_col])

    # Get the last loaded timestamp from the target table
    last_loaded = pd.read_sql(f"SELECT MAX({timestamp_col}) as last_timestamp FROM {target_table}", engine).iloc[0]['last_timestamp']

    if pd.isna(last_loaded):
        last_loaded = datetime.min

    # Filter for new or updated records
    new_data = df[df[timestamp_col] > last_loaded]

    if not new_data.empty:
        # Load new data into the target table
        new_data.to_sql(target_table, engine, if_exists='append', index=False)
        print(f"Loaded {len(new_data)} new records.")
    else:
        print("No new data to load.")

# Create synthetic data
create_synthetic_data('sales_data.csv', 100)

# Example usage with SQLite
engine = create_engine('sqlite:///datawarehouse.db')

# Create table if it doesn't exist
with engine.connect() as conn:
    conn.execute(text('''
    CREATE TABLE IF NOT EXISTS sales (
        transaction_date TEXT,
        amount REAL,
        customer_id INTEGER
    )
    '''))

incremental_load('sales_data.csv', 'sales', engine, 'transaction_date')


Loaded 100 new records.


# Error handling and recovery in data integration

This code creates synthetic sales data, saves it to a CSV file, and attempts to load the data into a SQLite database table with retry logic and error handling. If the load fails, it retries up to three times with exponential backoff. Logging and custom exceptions are used to handle and log errors

In [None]:
# Install required packages
!pip install pandas sqlalchemy tenacity

In [6]:
# Import required libraries
import pandas as pd
from sqlalchemy import create_engine
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
from datetime import datetime, timedelta
import random

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class IntegrationError(Exception):
    pass

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def load_data(df, table_name, engine):
    try:
        with engine.begin() as connection:
            df.to_sql(table_name, connection, if_exists='append', index=False)
            logger.info(f"Successfully loaded {len(df)} rows into {table_name}")
            print(f"Successfully loaded {len(df)} rows into {table_name}")
    except Exception as e:
        logger.error(f"Error loading data into {table_name}: {str(e)}")
        print(f"Error loading data into {table_name}: {str(e)}")
        raise IntegrationError(f"Failed to load data into {table_name}")

def integrate_data(source_file, target_table, db_url):
    engine = create_engine(db_url)
    try:
        # Read source data
        df = pd.read_csv(source_file)
        print("Source data read successfully.")

        # Perform data validation
        if df.empty:
            raise IntegrationError("Source file is empty")

        # Attempt to load data with retry mechanism
        load_data(df, target_table, engine)
    except IntegrationError as e:
        logger.error(f"Integration error: {str(e)}")
        print(f"Integration error: {str(e)}")
        # Implement recovery logic here (e.g., revert to last known good state)
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        print(f"Unexpected error: {str(e)}")
    finally:
        engine.dispose()
        print("Integration process completed.")

# Create synthetic data for testing
def create_synthetic_data(file_name, num_records):
    start_date = datetime.now() - timedelta(days=30)
    data = {
        'transaction_date': [start_date + timedelta(days=random.randint(0, 30)) for _ in range(num_records)],
        'amount': [random.uniform(10, 1000) for _ in range(num_records)],
        'customer_id': [random.randint(1, 100) for _ in range(num_records)]
    }
    df = pd.DataFrame(data)
    df.to_csv(file_name, index=False)
    print(f"Synthetic data created in {file_name}")

# Create synthetic data
create_synthetic_data('sales_data.csv', 100)

# Example usage with SQLite
integrate_data('sales_data.csv', 'sales', 'sqlite:///datawarehouse.db')

Synthetic data created in sales_data.csv
Source data read successfully.
Successfully loaded 100 rows into sales
Integration process completed.
