In [17]:
import numpy as np 
import pandas as pd 
from sqlalchemy import create_engine
from fastavro import writer, reader
from fastavro.schema import parse_schema
import datetime

In [25]:
import pandas as pd
from sqlalchemy import create_engine
from fastavro import writer, parse_schema

# Define connection parameters
user_name = 'postgres'
password = 'Ducanh712'
ip = '35.188.10.153'
database_name = 'northwind-db'
connection_string = (
    f"postgresql+psycopg2://{user_name}:{password}@{ip}:5432/{database_name}"
)
engine = create_engine(connection_string)

# Fetch all table names from the PostgreSQL database
def fetch_tables_name(engine):
    with engine.connect() as conn:
        query = "SELECT table_name FROM INFORMATION_SCHEMA.TABLES WHERE table_schema = 'public' AND table_type = 'BASE TABLE';"
        # Write table names to the pandas dataframe
        table_names = pd.read_sql(query, conn)
    return table_names['table_name'].tolist()  # Return as a list

# Function to check if a column is datetime-like
def is_datetime_column(df, col):
    """
    Check if a column is datetime-like. Avoid misidentifying sparse columns as datetime.
    """
    non_null_values = df[col].dropna()  # Ignore nulls during the check
    if non_null_values.empty:  # If all values are null, it can't be datetime
        return False

    # Attempt to convert the non-null values to datetime
    try:
        pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
        return True  # If conversion succeeds, it's a datetime column
    except (ValueError, TypeError):
        return False

# Function to generate Avro schema from a Pandas DataFrame
def get_avro_schema(df, data_type_to_avro, table_name, namespace="Northwind-OLTP", doc="Extracted from OLTP"):
    schema = {
        "doc": doc,                # Description of the schema
        "name": table_name,        # The name of the record
        "namespace": namespace,    # Namespace for the schema
        "type": "record",          # Type of Avro schema: 'record'
        "fields": []               # List of fields
    }

    # Add fields based on the DataFrame columns and their types
    for col in df.columns:
        dtype = df[col].dtype
        avro_type = data_type_to_avro.get(str(dtype), 'string')  # Default to 'string' for unknown types

        # Check if the column has null values
        if df[col].isnull().any():
            # Allow null in the field
            avro_type = ["null", avro_type]  # Avro allows unions of types

        # Check for datetime64 and handle accordingly
        if pd.api.types.is_datetime64_any_dtype(df[col]):
            # For datetime columns, map them to 'string' in Avro (ISO 8601 format)
            avro_type = ["null", "string"]  # Allow null and use string for datetime

        # Special case: handle boolean columns explicitly
        elif pd.api.types.is_bool_dtype(df[col]):
            avro_type = ["null", "boolean"]  # Allow null and map to Avro boolean

        # Append field info to the schema
        schema['fields'].append({
            "name": col,             # Column name
            "type": avro_type        # Mapped Avro type
        })

    # Parse and return the schema using fastavro's parse_schema function
    return parse_schema(schema)

# Function to process the DataFrame and convert datetime-like columns
def process_dataframe(df, original_types):
    for col in df.columns:
        if original_types[col] == 'object':
            # Step 1: Check if the column contains datetime-like data (in object dtype)
            if is_datetime_column(df, col):
                # Convert object column to datetime type if it contains datetime-like values
                df[col] = pd.to_datetime(df[col], errors='coerce')  # Invalid entries become NaT
                # Convert the datetime64 column to string in ISO 8601 format
                df[col] = df[col].dt.strftime('%Y-%m-%dT%H:%M:%S')
                # Replace NaT with None (Avro expects None for missing values)
                df[col] = df[col].where(df[col].notnull(), None)
            else:
                # If not datetime, ensure it's a string and handle missing values (nan, 'None', '')
                df[col] = df[col].astype(str).replace({'nan': None, 'None': None, '': None})
        
        elif original_types[col] == 'datetime64[ns]':
            # Step 2: Handle datetime64 columns, convert to ISO 8601 string format
            df[col] = df[col].dt.strftime('%Y-%m-%dT%H:%M:%S')  # Use ISO 8601 format for datetime values
            # Ensure the datetime is now a string, as Avro expects a string for datetime fields

        # Replace NaT values with None for consistency
        df[col] = df[col].where(pd.notnull(df[col]), None)

# Main function to extract data, process the DataFrame, and write to Avro
def extract_data_to_df(engine, table_names, dtype_to_avro):
    for table_name in table_names:
        print(f"Processing table {table_name}")
        with engine.connect() as conn:
            query = f"SELECT * FROM {table_name}"
            df = pd.read_sql(query, conn)
        
        # Track original types
        original_types = df.dtypes

        # Step 3: Process the dataframe to handle datetimes and convert to strings
        process_dataframe(df, original_types)

        # Generate the Avro schema
        parsed_schema = get_avro_schema(df, dtype_to_avro, table_name)

        # Writing the DataFrame to Avro
        with open(f'{table_name}.avro', 'wb') as out:
            # Convert DataFrame to dictionary and ensure NaT is replaced by None
            records = df.to_dict(orient='records')

            # Ensure NaT and NaN are replaced with None for the whole dataframe
            for record in records:
                for key, value in record.items():
                    if pd.isna(value):  # Catch NaT explicitly using pd.isna()
                        record[key] = None  # Replace NaT with None for Avro writing

            # Write to Avro file
            writer(out, parsed_schema, records)

    return "Writing successfully executed."

In [26]:
# Data type to Avro mapping
dtype_to_avro = {
    'object': 'string',         # 'object' -> string
    'int64': 'long',            # 'int64' -> long
    'float64': 'double',        # 'float64' -> double
    'datetime64[ns]': 'string', # 'datetime64[ns]' -> string (or 'long' if you want Unix timestamp)
    'bool': 'boolean',          # 'bool' -> boolean
    'category': 'string',       # 'category' -> string
}

# Fetch the table names and extract data to Avro
table_names = fetch_tables_name(engine)
result = extract_data_to_df(engine, table_names, dtype_to_avro)

print(result)

Processing table customer_demographics
Processing table customer_customer_demo
Processing table customers


  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates


Processing table employees


  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd

Processing table categories


  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates


Processing table products


  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates


Processing table suppliers


  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates


Processing table orders


  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates


Processing table shippers


  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates


Processing table region


  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates


Processing table territories


  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates


Processing table employee_territories


  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates


Processing table order_details
Processing table us_states
Writing successfully executed.


  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
  pd.to_datetime(non_null_values, errors='raise')  # Raise error for invalid dates
