In [1]:
#pip install Flask SQLAlchemy fastavro

In [2]:
#pip install --upgrade SQLAlchemy

In [3]:
#from flask import Flask, request, jsonify
from flask import Flask, render_template, request, jsonify, send_file
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy import text, inspect
import csv
import os
import datetime
import traceback
import fastavro

In [4]:
# SQLite database file in the working directory, plus the table metadata
# registry that the three table definitions below attach to.
engine = create_engine('sqlite:///database.db', echo=False)
meta = MetaData()

# Database tables
employees = Table(
    'employees',
    meta,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('datetime', DateTime),
    Column('department_id', Integer),
    Column('job_id', Integer),
)

departments = Table(
    'departments',
    meta,
    Column('id', Integer, primary_key=True),
    Column('department', String),
)

jobs = Table(
    'jobs',
    meta,
    Column('id', Integer, primary_key=True),
    Column('job', String),
)

# Start from empty tables on every run: drop the three tables above if they
# exist, then recreate them.
meta.drop_all(bind=engine, checkfirst=True)
meta.create_all(bind=engine)

# Session factory and the module-level session used by the notebook cells.
Session = sessionmaker(bind=engine)
session = Session()

In [5]:
## Migrate initial Data
# Column layout of each CSV extract, keyed by file stem: migrate_data()
# reads "<key>.csv" and processes the files in this order.
csv_files = dict(
    hired_employees=['id', 'name', 'datetime', 'department_id', 'job_id'],
    departments=['id', 'department'],
    jobs=['id', 'job'],
)

# Function to parse datetime string into datetime object
def parse_datetime(datetime_str):
    """Parse a timestamp such as ``2021-05-30T05:43:46Z``.

    The string must match ``%Y-%m-%dT%H:%M:%SZ`` exactly, including the
    literal trailing ``Z``. The result is a naive ``datetime.datetime``.
    Raises ValueError when the string does not match.
    """
    timestamp_format = '%Y-%m-%dT%H:%M:%SZ'
    parsed = datetime.datetime.strptime(datetime_str, timestamp_format)
    return parsed

# Function to read CSV files and insert data into the database
def migrate_data():
    """Load the initial CSV extracts into the database.

    Every ``<file>.csv`` listed in ``csv_files`` is read from the working
    directory. Its rows are inserted into the matching table. Rows that fail
    validation are reported with ``print`` and skipped:

    * the number of values must equal the number of schema columns;
    * the ``datetime`` value must be accepted by ``parse_datetime``;
    * ``id``, ``department_id`` and ``job_id`` must be whole numbers
      (``"104"`` and ``"104.0"`` pass; ``""`` and ``"1.5"`` do not). They
      are stored as ``int`` to match the Integer columns.

    The valid rows of a file are inserted together and committed once per
    file. On a database error the session is rolled back and the error is
    re-raised.
    """
    # CSV file stem -> destination table
    tables_by_file = {
        'hired_employees': employees,
        'departments': departments,
        'jobs': jobs,
    }
    integer_fields = ('id', 'department_id', 'job_id')

    for file, schema in csv_files.items():
        table = tables_by_file.get(file)
        if table is None:
            continue
        table_name = file
        datetime_index = schema.index('datetime') if 'datetime' in schema else None

        rows_to_insert = []
        # newline='' is the mode the csv module documents for its readers, so
        # quoted fields containing line breaks are parsed correctly.
        with open(file + '.csv', 'r', newline='') as csv_file:
            for row in csv.reader(csv_file):
                # Validate row against schema format
                if len(row) != len(schema):
                    print(f"Ignoring row ({table_name}): {row}. Number of columns doesn't match schema.")
                    continue

                # Convert the hire datetime string to a datetime object
                if datetime_index is not None:
                    try:
                        row[datetime_index] = parse_datetime(row[datetime_index])
                    except ValueError:
                        print(f"Ignoring row ({table_name}): {row}. Invalid datetime format.")
                        continue

                # Integer columns: accept only whole numbers and store them as
                # int. A float such as 1.5 would otherwise be written into an
                # INTEGER column.
                rejection = None
                for index, column in enumerate(schema):
                    if column not in integer_fields:
                        continue
                    try:
                        number = float(row[index])
                    except ValueError:
                        rejection = f"Non-numeric value found in field '{column}'."
                        break
                    if not number.is_integer():  # also rejects nan / inf
                        rejection = f"Non-integer value found in field '{column}'."
                        break
                    row[index] = int(number)
                if rejection is not None:
                    print(f"Ignoring row ({table_name}): {row}. {rejection}")
                    continue

                rows_to_insert.append(dict(zip(schema, row)))

        try:
            if rows_to_insert:
                # One executemany-style insert for all valid rows of this file
                session.execute(table.insert(), rows_to_insert)
            session.commit()
        except SQLAlchemyError:
            # Keep the shared session usable for later cells, then surface the error.
            session.rollback()
            raise


# Migrate data on application startup
migrate_data()

Ignoring row (hired_employees): [2.0, 'Ty Hofer', datetime.datetime(2021, 5, 30, 5, 43, 46), 8.0, '']. Non-numeric value found in field 'job_id'.
Ignoring row (hired_employees): [67.0, 'Thia Morican', datetime.datetime(2021, 3, 10, 19, 27, 10), '', '104']. Non-numeric value found in field 'department_id'.
Ignoring row (hired_employees): [84.0, 'Ludvig Norwood', datetime.datetime(2021, 2, 26, 18, 47, 53), 3.0, '']. Non-numeric value found in field 'job_id'.
Ignoring row (hired_employees): [87.0, 'Cirstoforo Martinetto', datetime.datetime(2021, 10, 15, 9, 19, 20), '', '84']. Non-numeric value found in field 'department_id'.
Ignoring row (hired_employees): [97.0, 'Beltran Natte', datetime.datetime(2021, 11, 1, 5, 12, 1), '', '67']. Non-numeric value found in field 'department_id'.
Ignoring row (hired_employees): [133.0, 'Jennine Wapol', datetime.datetime(2022, 1, 24, 14, 45, 57), '', '49']. Non-numeric value found in field 'department_id'.
Ignoring row (hired_employees): ['157', 'Kellia B

In [6]:
# Sanity-check the migration: list the id and name of every employee row.
sql_query = text("SELECT * FROM employees")
result = session.execute(sql_query)
resultados = result.all()

print("\nHired Employees:")
for r in resultados:
    print(f"ID: {r.id}, Name: {r.name}")


Hired Employees:
ID: 1, Name: Harold Vogt
ID: 3, Name: Lyman Hadye
ID: 4, Name: Lotti Crowthe
ID: 5, Name: Gretna Lording
ID: 6, Name: Marlow Antecki
ID: 7, Name: Joan Rillett
ID: 8, Name: Ulrick Nucciotti
ID: 9, Name: Lucretia Northcote
ID: 10, Name: Arty Giacobo
ID: 11, Name: Libbi Dowtry
ID: 12, Name: Jacky Oldred
ID: 13, Name: Raine Mowett
ID: 14, Name: Melonie Slocomb
ID: 15, Name: Robers Swinden
ID: 16, Name: Bone Serridge
ID: 17, Name: Andee Tillot
ID: 18, Name: Gay Philbin
ID: 19, Name: Loralie Dundin
ID: 20, Name: Tobi Lawton
ID: 21, Name: Mandel Nayer
ID: 22, Name: Toddy Gare
ID: 23, Name: Jeremias Goudy
ID: 24, Name: Lemar Aronsohn
ID: 25, Name: Tanner Jopling
ID: 26, Name: Bartolomeo Fetherstone
ID: 27, Name: Elvina Bycraft
ID: 28, Name: Letitia Bastin
ID: 29, Name: Warner Astbury
ID: 30, Name: Geri Pennings
ID: 31, Name: Jerald Gilder
ID: 32, Name: Wilt Di Francesco
ID: 33, Name: Aldwin Hellmore
ID: 34, Name: Corena Heritege
ID: 35, Name: Malchy Pontain
ID: 36, Name: Gasp

In [7]:
# Sanity-check the migration: list the id and title of every job row.
sql_query = text("SELECT * FROM jobs")
result = session.execute(sql_query)
resultados = result.all()

print("\nJobs:")
for r in resultados:
    print(f"ID: {r.id}, Job: {r.job}")


Jobs:
ID: 1, Job: Marketing Assistant
ID: 2, Job: VP Sales
ID: 3, Job: Biostatistician IV
ID: 4, Job: Account Representative II
ID: 5, Job: VP Marketing
ID: 6, Job: Environmental Specialist
ID: 7, Job: Software Consultant
ID: 8, Job: Office Assistant III
ID: 9, Job: Information Systems Manager
ID: 10, Job: Desktop Support Technician
ID: 11, Job: Financial Advisor
ID: 12, Job: Computer Systems Analyst I
ID: 13, Job: Automation Specialist IV
ID: 14, Job: Help Desk Technician
ID: 15, Job: Office Assistant II
ID: 16, Job: VP Quality Control
ID: 17, Job: Office Assistant IV
ID: 18, Job: Financial Analyst
ID: 19, Job: Electrical Engineer
ID: 20, Job: Chemical Engineer
ID: 21, Job: Social Worker
ID: 22, Job: VP Product Management
ID: 23, Job: Administrative Officer
ID: 24, Job: Paralegal
ID: 25, Job: Actuary
ID: 26, Job: Database Administrator I
ID: 27, Job: Nuclear Power Engineer
ID: 28, Job: Database Administrator II
ID: 29, Job: GIS Technical Architect
ID: 30, Job: Human Resources Assista

In [15]:
# Initialize Flask application.
# The template folder comes from the TEMPLATE_FOLDER environment variable.
# It falls back to ./templates under the current working directory, rather
# than an absolute path that only exists on one machine. It is passed to
# Flask so render_template() actually looks there.
template_folder = os.environ.get('TEMPLATE_FOLDER', os.path.join(os.getcwd(), 'templates'))
app = Flask(__name__, template_folder=template_folder)

# Expected fields of each table; validate_data() checks incoming rows against these.
schemas = {
    'employees': ['id', 'name', 'datetime', 'department_id', 'job_id'],
    'departments': ['id', 'department'],
    'jobs': ['id', 'job']
}
# Table names accepted in the <table_name> URL segment of the endpoints.
tables = list(schemas.keys())


@app.route('/')
def index():
    """Landing page: a plain-text greeting confirming the server is up."""
    return 'Welcome to my Flask application!'

# Menu for Backup and Restore features
#@app.route('/')
#def index():
#    return render_template('menu.html', tables=tables)

# Get table by name
def get_table_by_name(table_name):
    """Return the SQLAlchemy Table registered under ``table_name``.

    Recognised names are 'employees', 'departments' and 'jobs'. Any other
    value yields None.
    """
    candidates = (
        ('employees', employees),
        ('departments', departments),
        ('jobs', jobs),
    )
    for name, table in candidates:
        if table_name == name:
            return table
    return None

# Function to validate data dictionary rules
def validate_data(row, table_name):
    """Validate and coerce one incoming record for ``table_name``.

    Args:
        row: one element of the request's JSON list; expected to be a dict
            whose keys are exactly the table's schema fields.
        table_name: key into ``schemas``.

    Returns:
        ``(clean_row, None)`` when the record is valid. ``clean_row`` is a
        new dict with ``datetime`` parsed via ``parse_datetime`` and the
        integer fields converted to ``int``. Otherwise returns
        ``(None, [reason])``. The caller's ``row`` is never modified.
    """
    schema = schemas[table_name]
    skipped_reasons = []

    if not isinstance(row, dict):
        skipped_reasons.append("Row must be a JSON object.")
        return None, skipped_reasons

    # Verify same number of fields
    if len(row) != len(schema):
        skipped_reasons.append("Number of columns doesn't match schema.")
        return None, skipped_reasons

    # Verify all schema fields are present and non-empty
    for campo in schema:
        if _is_missing_value(row.get(campo)):
            skipped_reasons.append(f"Missing field {campo}.")
            return None, skipped_reasons

    # Work on a copy so the caller's dict (used to report skipped rows) is unchanged.
    clean_row = dict(row)

    # Convert the hire datetime string to a datetime object
    if 'datetime' in schema:
        try:
            clean_row['datetime'] = parse_datetime(clean_row['datetime'])
        except (TypeError, ValueError):  # TypeError: value was not a string
            skipped_reasons.append("Invalid datetime format.")
            return None, skipped_reasons

    # Integer fields: accept whole numbers (as numbers or numeric strings) only.
    for campo in ['id', 'department_id', 'job_id']:
        if campo not in schema:
            continue
        value = clean_row[campo]
        # JSON true would otherwise be accepted as 1.
        if isinstance(value, bool):
            skipped_reasons.append(f"Non-numeric value found in field {campo}.")
            return None, skipped_reasons
        try:
            number = float(value)
        except (TypeError, ValueError, OverflowError):
            skipped_reasons.append(f"Non-numeric value found in field {campo}.")
            return None, skipped_reasons
        if not number.is_integer():  # also rejects nan / inf
            skipped_reasons.append(f"Non-integer value found in field {campo}.")
            return None, skipped_reasons
        clean_row[campo] = int(number)

    return clean_row, None


def _is_missing_value(value):
    """Return True when a field value counts as absent or empty.

    Falsy values (None, "", [], {}, False) are missing. Real numbers are not:
    ``0`` and ``0.0`` are legitimate values for integer columns.
    """
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return False
    return not value


# REST API endpoint to receive new data
@app.route('/insert_data/<table_name>', methods=['POST'])
def insert_data(table_name):
    """Insert a batch of rows into ``table_name``.

    The request body must be a JSON list of objects. Each object is checked
    with ``validate_data``. Valid rows are inserted in one transaction; invalid
    ones are returned under ``skipped_rows``, keyed by their text form.

    Responses:
        201 when at least one row was inserted;
        400 for a malformed/empty body, all rows skipped, or an integrity
            constraint violation;
        404 for an unknown table;
        500 for any other database error.
    """
    if table_name not in tables:
        return jsonify({'error': 'Table not found'}), 404

    # silent=True: a missing or malformed JSON body yields None, which is
    # rejected below with a JSON error instead of Flask's default error page.
    data = request.get_json(silent=True)
    if not isinstance(data, list):
        return jsonify({'error': 'Data must be a list of dictionaries'}), 400
    if not data:
        return jsonify({'error': 'Data list is empty'}), 400

    valid_data = []
    skipped_data = {}
    for row in data:
        validated_row, skipped_reasons = validate_data(row, table_name)
        if validated_row is not None:
            valid_data.append(validated_row)
        else:
            skipped_data[str(row)] = skipped_reasons

    if not valid_data:
        return jsonify({'error': 'All rows skipped', 'skipped_rows': skipped_data}), 400

    table = get_table_by_name(table_name)
    try:
        # engine.begin() commits when the block exits normally and rolls back
        # if the insert raises, so no explicit commit is needed.
        with engine.begin() as conn:
            conn.execute(table.insert(), valid_data)
    except IntegrityError:
        return jsonify({'error': 'Integrity constraint violation'}), 400
    except SQLAlchemyError as e:
        return jsonify({'error': f'Database error while inserting data: {str(e)}'}), 500

    return jsonify({'message': 'Data inserted successfully', 'skipped_rows': skipped_data}), 201


# Backup feature
@app.route('/backup/<table_name>', methods=['POST'])
def backup_table(table_name):
    """Write every row of ``table_name`` to an AVRO file.

    Request body: ``{"name": "<BackupName>"}``. The name may contain only
    letters, digits, '-' and '_'. The file is written to
    ``backup/<table_name>_<BackupName>_backup.avro``, and the ``backup``
    folder is created if needed. Datetimes are stored as strings in
    ``%Y-%m-%dT%H:%M:%SZ`` form. Every AVRO field is a union with "null",
    so NULL column values can be written.

    Responses: 201 with the file path; 400 for a bad body; 404 for an unknown
    table; 500 when the file cannot be written.
    """
    if table_name not in tables:
        return jsonify({'error': 'Table not found'}), 404

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Data must be a dictionary like {"name": "BackupName"}'}), 400

    backup_name = data.get('name')
    if not backup_name:
        return jsonify({'error': 'Missing or empty "name" field in the request body'}), 400

    if not isinstance(backup_name, str):
        return jsonify({'error': 'The value associated with the "name" field must be a string'}), 400

    # The name becomes part of a file path. Allow only characters that cannot
    # form a path separator or "..", so the file cannot escape backup/.
    if not all(ch.isalnum() or ch in '-_' for ch in backup_name):
        return jsonify({'error': 'The "name" field may only contain letters, digits, "-" and "_"'}), 400

    table = get_table_by_name(table_name)
    if table is None:
        return jsonify({'error': 'Table not found'}), 404

    # AVRO schema derived from the table definition
    fields = []
    for column in table.columns:
        python_type = column.type.python_type
        if python_type in (str, datetime.datetime):
            avro_type = 'string'
        elif python_type is bool:
            avro_type = 'boolean'
        elif python_type is int:
            avro_type = 'long'  # SQLite integers can exceed AVRO's 32-bit "int"
        elif python_type is float:
            avro_type = 'double'
        else:
            avro_type = python_type.__name__
        fields.append({'name': column.name, 'type': ['null', avro_type]})
    schema = {'type': 'record', 'name': table_name, 'fields': fields}

    # Use a short-lived connection rather than the module-level Session, which
    # would otherwise be shared by the Flask server's request threads.
    with engine.connect() as conn:
        results = conn.execute(table.select()).fetchall()

    # Convert rows to AVRO records (datetimes -> strings)
    records = []
    for row in results:
        record = {}
        for column in table.columns:
            value = row._mapping[column.name]
            if isinstance(value, datetime.datetime):
                value = value.strftime('%Y-%m-%dT%H:%M:%SZ')
            record[column.name] = value
        records.append(record)

    file_path = f'backup/{table_name}_{backup_name}_backup.avro'
    tmp_path = file_path + '.tmp'
    try:
        os.makedirs('backup', exist_ok=True)
        # Write to a temporary file and swap it into place, so a failed write
        # never leaves a truncated backup under the final name.
        with open(tmp_path, 'wb') as f:
            fastavro.writer(f, schema, records)
        os.replace(tmp_path, file_path)
    except (OSError, ValueError) as e:
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # best-effort cleanup; the original error is reported below
        return jsonify({'error': f'Failed to write backup file {file_path}. Error: {str(e)}'}), 500

    return jsonify({'message': f'Backup created successfully for table {table_name}', 'file_path': file_path}), 201


# Restore feature
@app.route('/restore/<table_name>', methods=['POST'])
def restore_table(table_name):
    """Replace the contents of ``table_name`` with the rows of an AVRO backup.

    Request body: ``{"name": "<BackupName>"}``. The name may contain only
    letters, digits, '-' and '_'. The file read is
    ``backup/<table_name>_<BackupName>_backup.avro``, and its field names
    must equal the table's column names in order. Deleting the current rows
    and inserting the backup rows happen in ONE transaction, so if either
    step fails the table keeps its previous contents.

    Responses: 200 on success; 400 for a bad body, a schema mismatch or a
    database failure; 404 for an unknown table or missing file; 500 for an
    unreadable or corrupt backup.
    """
    if table_name not in tables:
        return jsonify({'error': 'Table not found'}), 404

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Data must be a dictionary like {"name": "BackupName"}'}), 400

    backup_name = data.get('name')
    if not backup_name:
        return jsonify({'error': 'Missing or empty "name" field in the request body'}), 400

    if not isinstance(backup_name, str):
        return jsonify({'error': 'The value associated with the "name" field must be a string'}), 400

    # The name becomes part of a file path. Allow only characters that cannot
    # form a path separator or "..", so the path stays inside backup/.
    if not all(ch.isalnum() or ch in '-_' for ch in backup_name):
        return jsonify({'error': 'The "name" field may only contain letters, digits, "-" and "_"'}), 400

    table = get_table_by_name(table_name)
    file_path = f'backup/{table_name}_{backup_name}_backup.avro'

    # Load the whole backup up front; the file is closed before any DB work.
    try:
        with open(file_path, 'rb') as f:
            avro_reader = fastavro.reader(f)
            backup_columns = [field['name'] for field in avro_reader.writer_schema['fields']]
            avro_data = list(avro_reader)
    except FileNotFoundError:
        return jsonify({'error': f'Backup avro file not found in path {file_path}.'}), 404
    except Exception as e:
        return jsonify({'error': f'An error occurred while restoring backup: {str(e)}'}), 500

    # Validate the backup schema against the table schema
    table_columns = [column.name for column in table.columns]
    if backup_columns != table_columns:
        return jsonify({'error': 'Failed to restore backup. Differences found in the schema of the backup and the table data.'}), 400

    # Datetimes are stored as strings in the backup; NULLs are kept as NULL.
    if 'datetime' in backup_columns:
        try:
            for record in avro_data:
                if record.get('datetime') is not None:
                    record['datetime'] = parse_datetime(record['datetime'])
        except (TypeError, ValueError) as e:
            return jsonify({'error': f'An error occurred while restoring backup: {str(e)}'}), 500

    # Delete + insert in a single transaction: engine.begin() rolls both back
    # if anything raises, so a failed load never leaves the table emptied.
    stage = 'delete'
    try:
        with engine.begin() as conn:
            conn.execute(table.delete())
            stage = 'insert'
            # An empty backup has nothing to insert; skip the INSERT rather
            # than executing it with an empty parameter list.
            if avro_data:
                conn.execute(table.insert(), avro_data)
    except Exception as e:
        if stage == 'delete':
            return jsonify({'error': f'Failed to restore backup. Could not delete the existing table. Error: {str(e)}'}), 400
        return jsonify({'error': f'Failed to restore backup. Could not load avro backup data to the existing table. Error: {str(e)}'}), 400

    return jsonify({'message': f'Data restored successfully for table {table_name} using the backup: {backup_name}'}), 200


# Run App: start Flask's development server. app.run() blocks, so this cell
# keeps running until the server is stopped (CTRL+C / kernel interrupt).
if __name__ == "__main__":
    app.run(debug=False)

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
Press CTRL+C to quit
127.0.0.1 - - [27/Mar/2024 13:56:21] "POST /restore/employees HTTP/1.1" 200 -


### Revisemos si se insertó la data

In [16]:
# Check the restore/insert results: print every column of the employees
# whose id is above 1995, one row per line.
sql_query = text("SELECT * FROM employees WHERE id > 1995")
result = session.execute(sql_query)
resultados = result.fetchall()

print("\nHired Employees:")
for r in resultados:
    row_data = " | ".join(f"{column}: {value}" for column, value in r._mapping.items())
    print(row_data)


Hired Employees:
id: 1996 | name: Cirstoforo Youings | datetime: 2021-04-01 17:48:42.000000 | department_id: 3 | job_id: 23
id: 1997 | name: Wilek Yurkevich | datetime: 2021-05-04 12:19:50.000000 | department_id: 2 | job_id: 132
id: 1998 | name: Jerry Yven | datetime: 2021-10-03 14:12:50.000000 | department_id: 7 | job_id: 100
id: 1999 | name: Jerri Zebedee | datetime: 2022-01-18 10:47:37.000000 | department_id: 8 | job_id: 80
id: 2011 | name: John Doe | datetime: 2024-03-26 12:00:00.000000 | department_id: 1 | job_id: 202
id: 2012 | name: Jane Smith | datetime: 2024-03-26 13:00:00.000000 | department_id: 2 | job_id: 203
