In [12]:
import pandas as pd
import mysql.connector
from mysql.connector import Error
from pydantic import BaseModel, Field, ValidationError, EmailStr
from datetime import datetime
from typing import Optional

db_config = {
    'host': 'localhost',  # Your MySQL host (e.g., 'localhost', '127.0.0.1')
    'database': 'data_pipeline',  # The name of your database
    'user': 'pipeline_user',  # Your MySQL username
    'password': 'secure_password'  # Your MySQL password
}

# CSV File Details
csv_file_path = 'user_data.csv'  # Path to your CSV file
TABLE_NAME = 'users'  # The desired name for your MySQL table

In [6]:
# --- 1. Pydantic Model for Data Validation ---
class UserModel(BaseModel):
    """
    Pydantic model for validating user data from CSV before insertion into MySQL.
    Matches the 'users' table schema.
    """
    user_id: int
    username: str 
    email: str
    registration_date: datetime
    last_login: Optional[datetime]
    is_active: bool
    country: Optional[str]
    age: Optional[int]

In [7]:
# --- 2. Database Connection Function ---
def create_db_connection(host_name, user_name, user_password, db_name):
    """
    Establishes a connection to the MySQL database.

    Args:
        host_name (str): The hostname of the MySQL server.
        user_name (str): The username for database access.
        user_password (str): The password for the database user.
        db_name (str): The name of the database to connect to.

    Returns:
        mysql.connector.connection.MySQLConnection or None: The database connection object if successful, else None.
    """
    connection = None
    try:
        connection = mysql.connector.connect(
            host=host_name,
            user=user_name,
            passwd=user_password,
            database=db_name
        )
        if connection.is_connected():
            print(f"Successfully connected to MySQL database '{db_name}'")
        return connection
    except Error as e:
        print(f"Error connecting to MySQL database: {e}")
        return None


In [8]:
# --- 3. Function to Create Table (if not exists) ---
def create_users_table(connection):
    """
    Creates the 'users' table in the database if it doesn't already exist.

    Args:
        connection (mysql.connector.connection.MySQLConnection): The database connection object.
    """
    cursor = connection.cursor()
    try:
        create_table_query = """
        CREATE TABLE IF NOT EXISTS `users` (
            `user_id` INT PRIMARY KEY,
            `username` VARCHAR(255) NOT NULL,
            `email` VARCHAR(255) UNIQUE,
            `registration_date` DATETIME,
            `last_login` DATETIME,
            `is_active` BOOLEAN,
            `country` VARCHAR(100),
            `age` INT
        );
        """
        cursor.execute(create_table_query)
        connection.commit()
        print("Table 'users' checked/created successfully.")
    except Error as e:
        print(f"Error creating/checking table: {e}")
    finally:
        cursor.close()

In [11]:
connection = create_db_connection(
        db_config['host'], db_config['user'], db_config['password'], db_config['database']
    )

Successfully connected to MySQL database 'data_pipeline'


In [13]:
df = pd.read_csv(
    csv_file_path,
    parse_dates=['registration_date', 'last_login'],
    keep_default_na=True
)
df

Unnamed: 0,user_id,username,email,registration_date,last_login,is_active,country,age
0,1001,john_doe,john.doe@example.com,2022-01-15 10:30:00,2023-07-10 09:00:00,True,USA,30
1,1002,jane_smith,jane.smith@example.com,2022-03-20 14:00:00,2023-07-12 11:45:00,True,Canada,24
2,1003,peter_jones,peter.jones@example.com,2022-05-01 08:15:00,2023-07-05 16:20:00,True,UK,45
3,1004,alice_wonder,alice.wonder@example.com,2022-07-10 11:00:00,2023-06-28 10:00:00,False,Australia,28
4,1005,bob_builder,bob.builder@example.com,2022-09-01 09:45:00,2023-07-11 13:10:00,True,Germany,52
5,1006,charlie_chaplin,charlie.c@example.com,2022-11-12 16:00:00,2023-07-09 08:30:00,True,France,39
6,1007,diana_prince,diana.p@example.com,2023-01-01 07:00:00,2023-07-12 10:15:00,True,USA,33
7,1008,eve_adams,eve.a@example.com,2023-02-14 12:00:00,2023-07-08 14:00:00,False,Japan,22
8,1009,frank_sinatra,frank.s@example.com,2023-03-25 10:00:00,2023-07-11 17:00:00,True,Italy,60
9,1010,grace_kelly,grace.k@example.com,2023-04-30 09:00:00,2023-07-10 11:00:00,True,Monaco,29


In [16]:
user_data

UserModel(user_id=1001, username='john_doe', email='john.doe@example.com', registration_date=Timestamp('2022-01-15 10:30:00'), last_login=Timestamp('2023-07-10 09:00:00'), is_active=True, country='USA', age=30)

In [17]:
cursor = connection.cursor()
inserted_count = 0
skipped_count = 0
validation_error_count = 0

for index, row in df.iterrows():
    # Convert pandas NaT (Not a Time) or NaN to None for Pydantic
    row_data = row.where(pd.notna(row), None).to_dict()

    # Pydantic validation
    user_data = UserModel(**row_data)

    # Check if user_id already exists
    check_query = "SELECT user_id FROM users WHERE user_id = %s"
    cursor.execute(check_query, (user_data.user_id,))
    existing_user = cursor.fetchone()

    if existing_user:
        print(f"Skipping user_id {user_data.user_id}: Already exists in database.")
        skipped_count += 1
    else:
        # Construct INSERT query
        insert_query = """
        INSERT INTO users (user_id, username, email, registration_date, last_login, is_active, country, age)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """
        # Prepare data for insertion, converting datetime objects and bool
        data_to_insert = (
            user_data.user_id,
            user_data.username,
            user_data.email,
            user_data.registration_date.strftime('%Y-%m-%d %H:%M:%S'),
            user_data.last_login.strftime('%Y-%m-%d %H:%M:%S'),
            int(user_data.is_active), # MySQL BOOLEAN often expects 0 or 1
            user_data.country,
            user_data.age
        )
        cursor.execute(insert_query, data_to_insert)
        connection.commit()
        inserted_count += 1
        print(f"Inserted user_id: {user_data.user_id}")


Inserted user_id: 1001
Inserted user_id: 1002
Inserted user_id: 1003
Inserted user_id: 1004
Inserted user_id: 1005
Inserted user_id: 1006
Inserted user_id: 1007
Inserted user_id: 1008
Inserted user_id: 1009
Inserted user_id: 1010


In [19]:
with open('user_data.csv', 'r') as f:
    lines = f.readlines()
    for l in lines:
        u = UserModel(*l.split('\n'))
        print(u)

TypeError: BaseModel.__init__() takes 1 positional argument but 3 were given