Set up

In [1]:
import os
import sys
import json
import datetime
import psycopg2
import configparser

# Load config file (INI format)
CONFIG_PATH = '../config/config.conf'
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; check it so a missing file fails here with a
# clear message instead of an opaque KeyError on the section lookups below.
if not config.read(CONFIG_PATH):
    raise FileNotFoundError(f"Configuration file not found or unreadable: {CONFIG_PATH}")
configs = config['source']  # section that points at the source-database config file
path = config['path']       # section holding the project paths ('root', 'temp')

# Make project-local packages importable; the membership guard keeps the cell
# idempotent so re-running it does not append duplicate entries.
if path['root'] not in sys.path:
    sys.path.append(path['root'])

from utils import (validation as validate,
                   parsing as parse,
                   logging)


# Load configuration file (json format) for source database.
# JSON text is UTF-8 encoded, so decode it explicitly instead of relying on
# the platform's locale default encoding.
with open(configs['config_file'], encoding='utf-8') as json_file:
    source = json.load(json_file)

Test connection

In [2]:
# Establish connection to the PostgreSQL database and run a trivial query.
# Both handles start as None so the cleanup below is safe when connect() itself
# fails (otherwise `cnx`/`cursor` would be unbound, or stale from a previous run).
cnx = None
cursor = None
try:
    cnx = psycopg2.connect(
        database=source['database'],
        user=source['user'],
        password=source['password'],
        # 'db.host'/'db.port' keep their original precedence; fall back to the
        # 'host'/'port' keys that the load cell reads, so this test targets the
        # same server the load will use. Defaults: 'localhost' / '5432'.
        host=source.get('db.host', source.get('host', 'localhost')),
        port=source.get('db.port', source.get('port', '5432'))
    )
    cursor = cnx.cursor()

    # Test query to ensure connection is established
    cursor.execute("SELECT version();")
    record = cursor.fetchone()
    print("You are connected to - ", record, "\n")

except Exception as error:
    print("Error while connecting to PostgreSQL", error)
finally:
    # Close only what was actually opened in this run
    if cursor is not None:
        cursor.close()
    if cnx is not None:
        cnx.close()
        print("PostgreSQL connection is closed")

You are connected to -  ('PostgreSQL 16.0 (Debian 16.0-1.pgdg120+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit',) 

PostgreSQL connection is closed


Transform

In [26]:
import traceback
from utils import validation as validate, parsing as parse, logging


def process_file(filename, cnx):
    """Hand every line of a text file to ``process_line``.

    Args:
        filename: Path of the file to read (opened in text mode).
        cnx: Database connection, passed through unchanged to ``process_line``.
    """
    with open(filename, 'r') as source_file:
        # Iterate the file lazily; each raw line (newline included) goes
        # straight to process_line, which does its own stripping.
        for raw_line in source_file:
            process_line(raw_line, cnx)



def process_line(line, cnx):
    """Validate one comma-separated line and insert it, logging any failure.

    The line is stripped and split on ','. The row is inserted only when the
    layout check passes, field 0 passes the email check and fields 5, 6 and 9
    pass the date check. Any exception (including the ValueError raised for an
    invalid row) is caught and reported through ``logging.cnx_error``; nothing
    propagates to the caller.

    Args:
        line: Raw text line from the input file.
        cnx: Database connection, passed to ``insert_data`` and to the logger.
    """
    row = line.strip().split(',')
    date_columns = [5, 6, 9]
    try:
        # Short-circuit order matters: layout is checked before any indexing.
        row_is_valid = (
            validate.layout(row)
            and validate.email(row[0])
            and all(validate.date(row[idx]) for idx in date_columns)
        )
        if not row_is_valid:
            raise ValueError('Invalid email or date')
        insert_data(row, cnx)
    except Exception as exc:
        # Directly pass the list 'row' and the exception details to the logging function.
        # NOTE(review): both name arguments evaluate to the same exception class
        # name - confirm what logging.cnx_error expects in each position.
        message = str(exc)
        type_name = type(exc).__name__
        class_name = exc.__class__.__name__
        logging.cnx_error(row, message, type_name, class_name, traceback.format_exc(), cnx)


def insert_data(row, cnx):
    """Parse a validated row and insert it into both tables in one transaction.

    The visitor and statistics inserts are committed together. If either insert
    (or the commit) fails, the transaction is rolled back before the exception
    is re-raised: without the rollback the connection would stay in an aborted
    transaction and every later statement on ``cnx`` would fail as well.

    Args:
        row: List of field values for one input line.
        cnx: Open psycopg2 connection.

    Raises:
        Exception: Whatever parsing or the database layer raised, unchanged.
    """
    visitor = parse.visitor(row)
    statistics = parse.statistics(row)
    try:
        with cnx.cursor() as cursor:
            insert_visitor(visitor, cursor)
            insert_statistics(statistics, cursor)
            cnx.commit()
    except Exception:
        # Leave the connection usable for the next row / for error logging.
        cnx.rollback()
        raise

def insert_visitor(visitor, cursor):
    """Execute the INSERT for one ``visitor`` row on the given cursor.

    Args:
        visitor: Mapping providing every key listed in ``fields`` below;
            a missing key raises ``KeyError`` before anything is executed.
        cursor: Cursor whose ``execute(sql, params)`` runs the statement.

    The function does not commit; that is left to the caller.
    """
    # Order must match the column list in the statement.
    fields = (
        'email',
        'fechaPrimeraVisita',
        'fechaUltimaVisita',
        'visitasTotales',
        'visitasAnioActual',
        'visitasMesActual',
    )
    sql = """
    INSERT INTO visitor (email, fechaPrimeraVisita, fechaUltimaVisita, visitasTotales, visitasAnioActual, visitasMesActual)
    VALUES (%s, %s, %s, %s, %s, %s)
    """
    params = tuple(visitor[field] for field in fields)
    cursor.execute(sql, params)

def insert_statistics(statistics, cursor):
    """Execute the INSERT for one ``statistics`` row on the given cursor.

    Args:
        statistics: Mapping providing every key listed in ``fields`` below
            (note that several keys contain a space where the column name has
            an underscore); a missing key raises ``KeyError`` before anything
            is executed.
        cursor: Cursor whose ``execute(sql, params)`` runs the statement.

    The function does not commit; that is left to the caller.
    """
    # Order must match the column list in the statement.
    fields = (
        'email',
        'jyv',
        'Badmail',
        'Baja',
        'Fecha envio',
        'Fecha open',
        'Opens',
        'Opens virales',
        'Fecha click',
        'Clicks',
        'Clicks virales',
        'Links',
        'IPs',
        'Navegadores',
        'Plataformas',
    )
    sql = """
    INSERT INTO statistics (email, jyv, Badmail, Baja, Fecha_envio, Fecha_open, Opens, Opens_virales, Fecha_click, Clicks, Clicks_virales, Links, IPs, Navegadores, Plataformas)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    params = tuple(statistics[field] for field in fields)
    cursor.execute(sql, params)

In [20]:
# Relative directory built by prefixing ".." to the 'temp' entry of the `path`
# config section (presumably that entry starts with a path separator - TODO confirm).
temp_dir = f"..{path['temp']}"

In [27]:
# Connect to Postgres database
cnx = psycopg2.connect(
    database=source['database'],
    user=source['user'],
    password=source['password'],
    host=source.get('host', 'localhost'),  # Use 'localhost' as default if not set
    port=source.get('port', '5432')  # Use '5432' as default if not set
)

# Process every .txt file found in the temp directory, then delete it.
temp_dir = f"..{path['temp']}"
try:
    for filename in os.listdir(temp_dir):
        if filename.endswith('.txt'):
            full_path = os.path.join(temp_dir, filename)
            process_file(full_path, cnx)
            # Delete file once processed (rows that failed validation or
            # insertion were already reported by process_line)
            os.remove(full_path)
finally:
    # Close Postgres connection even when listing, processing or deleting
    # raises, so a failed run does not leak the connection.
    cnx.close()

AttributeError: 'list' object has no attribute 'get'