# Prueba Técnica - ArkonData
## Data Engineer Jr
### René García Bernal

1. Cree una base de datos local con los conjuntos de datos proporcionados.  
Puede utilizar la herramienta de gestión y administración de bases de datos de su preferencia;  
queda a su consideración la gestión de esta base.

In [None]:
# Create a logging decorator used to monitor all the other processes
import logging
from functools import wraps

# Configure the root logger: INFO level, timestamped messages on a StreamHandler
# (stderr by default).
# force=True (Python 3.8+) replaces any handlers already attached to the root logger.
# Without it, basicConfig is silently a no-op whenever logging was configured first
# (e.g. by the notebook kernel, an imported library, or a previous run of this cell),
# and the INFO messages produced by the logger decorator would never be shown.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ],
    force=True,
)

# Decorator that logs the start, the result (or error) and the end of every call
def logger(func):
    """Wrap *func* so that every call is logged.

    Logged in order: the call with its positional and keyword arguments (INFO),
    then either the returned value (INFO) or the raised error with its traceback
    (ERROR), and finally a "Finished execution" line (INFO). Exceptions are
    re-raised unchanged; ``functools.wraps`` keeps the wrapped function's name
    and docstring.
    """

    @wraps(func)
    def _logged_call(*args, **kwargs):
        logging.info(f"Started executing '{func.__name__}' with arguments {args} and {kwargs}")
        try:
            outcome = func(*args, **kwargs)
            logging.info(f"Successfully finished '{func.__name__}' with result: {outcome}")
            return outcome
        except Exception as exc:
            logging.error(f"Error in '{func.__name__}': {exc}", exc_info=True)
            raise  # hand the original exception back to the caller
        finally:
            # Emitted on the success path and on the error path alike
            logging.info(f"Finished execution of '{func.__name__}'")

    return _logged_call

In [None]:
from sqlalchemy import create_engine, text, update, and_
from sqlalchemy.schema import CreateTable, CreateSchema, DropSchema, DropTable, Table, Column, MetaData
from sqlalchemy.types import *
import pandas as pd

class Connector:
    """Thin helper around a SQLAlchemy engine for schema/table management and raw SQL.

    ``credentials`` is any mapping (e.g. a ``configparser`` section) providing the
    keys ``host``, ``port``, ``user``, ``password``, ``database``, ``schema``,
    ``dialect`` and ``driver``.

    ``schema`` is a public attribute. It is the schema :meth:`create_table` writes
    to and the default schema for :meth:`drop_table`, and callers may reassign it
    between calls.

    Most methods catch exceptions and report them through the return value
    (a descriptive error string) instead of raising.
    """

    def __init__(self, credentials):
        self._host = credentials["host"]
        self._port = credentials["port"]
        self._user = credentials["user"]
        self._password = credentials["password"]
        self._database = credentials["database"]
        self.schema = credentials["schema"]
        self._dialect = credentials["dialect"]
        self._driver = credentials["driver"]
        self._engine = self._create_engine()

    def __str__(self):
        # The password is intentionally left out so the object is safe to print/log.
        return f"host: {self._host}. port: {self._port}. user: {self._user}. database: {self._database}."

    def _create_engine(self):
        """Build the SQLAlchemy engine from the stored credentials.

        ``URL.create`` escapes each component, so a password containing characters
        such as ``@``, ``/`` or ``:`` cannot corrupt the connection URL (a plain
        f-string URL does not escape them).
        """
        # Local import keeps the notebook's shared import cell unchanged.
        from sqlalchemy.engine import URL

        url = URL.create(
            drivername=f"{self._dialect}+{self._driver}",
            username=self._user,
            password=self._password,
            host=self._host,
            port=int(self._port),  # configparser values are strings
            database=self._database,
        )
        return create_engine(url)

    @logger
    def test_conection(self):
        """Open and immediately close one connection.

        Returns a success message, or ``"Connection failed: <error>"``.
        """
        try:
            with self._engine.connect():
                pass
        except Exception as e:
            return f"Connection failed: {e}"
        return f"Connection to database {self._database} successful"

    # Correctly spelled alias; the original name is kept for existing callers.
    test_connection = test_conection

    @logger
    def create_schema(self, schema):
        """Create ``schema`` if it does not exist yet.

        Returns the executed ``CreateSchema`` statement, or ``"Exception: <error>"``.
        """
        try:
            stmt = CreateSchema(schema, if_not_exists=True)
            with self._engine.connect() as conn, conn.begin():
                conn.execute(stmt)
        except Exception as e:
            return f"Exception: {e}"
        return stmt

    @logger
    def create_table(self, table_name, table_deff):
        """Create ``table_name`` inside the schema ``self.schema``.

        ``table_deff`` may be:

        * a pandas DataFrame or Series: written with ``to_sql``. An existing table
          with the same name is replaced, and the index is not stored.
        * a dict mapping column name -> ``{"type": <SQLAlchemy type>, "nullable": bool}``,
          for example::

              {
                  "Column1": {"type": INTEGER, "nullable": False},
                  "Column2": {"type": VARCHAR, "nullable": False},
                  "Column3": {"type": VARCHAR, "nullable": True},
              }

          In this case the table is created only if it does not already exist.

        Returns ``"Finished without errors"`` on success, otherwise
        ``"Exception: <error>"``. Any other ``table_deff`` type is reported as an
        error instead of being silently ignored.
        """
        try:
            with self._engine.connect() as conn:
                if isinstance(table_deff, (pd.DataFrame, pd.Series)):
                    table_deff.to_sql(table_name, conn, schema=self.schema, if_exists="replace", index=False)
                elif isinstance(table_deff, dict):
                    columns_list = self._create_columns(table_deff)
                    # Keyword argument: the first positional parameter of MetaData is
                    # `bind` on SQLAlchemy 1.4, not `schema`.
                    metadata = MetaData(schema=self.schema)
                    table = Table(table_name, metadata, *columns_list, schema=self.schema)
                    stmt = CreateTable(table, if_not_exists=True)
                    with conn.begin():
                        conn.execute(stmt)
                else:
                    raise TypeError(
                        "table_deff must be a pandas DataFrame/Series or a dict, "
                        f"got {type(table_deff).__name__}"
                    )
            return "Finished without errors"
        except Exception as e:
            return f"Exception: {e}"

    @logger
    def drop_schema(self, schema):
        """Drop ``schema`` if it exists.

        CASCADE is not requested, so the database may refuse to drop a schema that
        still contains objects. Returns the executed ``DropSchema`` statement, or
        ``"Exception: <error>"``.
        """
        try:
            stmt = DropSchema(schema, if_exists=True)
            with self._engine.connect() as conn, conn.begin():
                conn.execute(stmt)
        except Exception as e:
            return f"Exception: {e}"
        return stmt

    @logger
    def drop_table(self, table_name, schema=None):
        """Drop ``table_name`` from ``schema`` (defaults to ``self.schema``).

        The table is reflected first, so a missing table is reported as an error.
        Returns the executed ``DropTable`` statement, or ``"Exception: <error>"``.
        """
        try:
            with self._engine.connect() as conn:
                # Look the table up in the same schema create_table writes to.
                # Reflect through the engine (its own connection) so that `conn`
                # has not auto-begun a transaction before the explicit begin() below.
                table = Table(
                    table_name,
                    MetaData(),
                    schema=schema or self.schema,
                    autoload_with=self._engine,
                )
                stmt = DropTable(table, if_exists=True)
                with conn.begin():
                    conn.execute(stmt)
                return stmt
        except Exception as e:
            return f"Exception: {e}"

    @logger
    def execute_sql(self, sqlquery):
        """Execute raw SQL in its own transaction and return the fetched rows.

        Returns a list whose first element is the tuple of column names, followed by
        one row per result record. Statements that return no rows (DDL/DML) give an
        empty list. On failure the error is printed and ``None`` is returned.
        """
        try:
            with self._engine.connect() as conn, conn.begin():
                cursor = conn.execute(text(sqlquery))
                if not cursor.returns_rows:
                    return []
                # Fetch while the transaction is still open; reading a result after
                # commit relies on the DBAPI driver having buffered it.
                return [tuple(cursor.keys()), *cursor.fetchall()]
        except Exception as e:
            print(f"Exception: {e}")
        return None

    @logger
    def update(self, schema, table_nm, where, values):
        """Run ``UPDATE schema.table_nm SET <values> WHERE <where>`` and commit.

        :param schema: schema that contains the table (independent of ``self.schema``).
        :param table_nm: table name; its columns are reflected from the database.
        :param where: dict column -> value; all pairs are combined with AND (equality).
        :param values: dict column -> new value.
        :returns: the executed ``Update`` statement, or ``"Error in update statement: <error>"``.
        """
        try:
            if not where:
                # and_() with no criteria renders no WHERE clause, i.e. it would
                # update every row of the table.
                raise ValueError("'where' must contain at least one column condition")
            with self._engine.connect() as conn:
                table = Table(table_nm, MetaData(schema=schema), autoload_with=conn)
                where_clause = and_(*(table.c[key] == value for key, value in where.items()))
                stmt = update(table).where(where_clause).values(values)
                conn.execute(stmt)
                conn.commit()
                return stmt
        except Exception as e:
            return f"Error in update statement: {e}"

    def get_connection(self):
        """Return a new connection from the engine.

        The caller owns it and must close it, preferably with
        ``with connector.get_connection() as conn:``. A connection that is never
        closed stays checked out of the pool.
        """
        return self._engine.connect()

    def _create_columns(self, column_deff):
        """Turn ``{name: {"type": ..., "nullable": ...}}`` into ``Column`` objects.

        ``nullable`` may be omitted, in which case the column is nullable.
        """
        return [
            Column(name, spec["type"], nullable=spec.get("nullable", True))
            for name, spec in column_deff.items()
        ]

In [None]:
import configparser as cnf

config = cnf.ConfigParser()
# ConfigParser.read silently skips files it cannot open and returns the list of
# files actually read. Fail here with a clear message instead of a later
# KeyError('postgresql').
if not config.read('database_credentials.ini'):
    raise FileNotFoundError("Could not read credentials file 'database_credentials.ini'")

# Build the connector from the [postgresql] section and check the database is reachable
connection = Connector(config["postgresql"])
connection.test_conection()


In [None]:
# "staging" holds the raw, untransformed copies of the source files
stg_schema = "staging"
connection.create_schema(stg_schema)  # no-op if the schema already exists

In [None]:
import pandas as pd

# Raw CSV source; the first row holds the column names
data1 = pd.read_csv(filepath_or_buffer='datasets/Data1.csv', header=0)

# Raw Parquet source, read through the pyarrow engine
data2 = pd.read_parquet(path='datasets/data2.parquet', engine='pyarrow')

# First look at both sources: column names and number of rows
print(f"Headers from csv file: {data1.columns}", f"row count: {len(data1.index)}")
print(f"Headers from parquet file: {data2.columns}", f"row count: {len(data2.index)}")

In [None]:
# Staging table names, plus the name of the union table built in step 2
table_csv, table_parquet, table_union = (
    "stg_starwars_csv",
    "stg_starwars_prqt",
    "starwars_union",
)

# create_table writes into connection.schema, so point it at staging first
connection.schema = stg_schema
connection.create_table(table_csv, data1)
connection.create_table(table_parquet, data2)

In [None]:
# NOTE(review): the author marked this cell for deletion. It repeats the staging load
# into the database's default schema (no schema= argument), and the parquet table gets
# a different name than above. It is kept for now, but each connection is now closed
# instead of being leaked.
with connection.get_connection() as conn:
    data1.to_sql("stg_starwars_csv", conn, if_exists="replace", index=False)
    data2.to_sql("stg_starwars_parquet", conn, if_exists="replace", index=False)

2. Cree una tercera tabla en la que unifique ambos conjuntos de datos;  
asegúrese de no tener duplicados.

In [None]:
# "refined" holds the cleaned data; create_table will write into it from now on
rfnd_schema = "refined"
connection.create_schema(rfnd_schema)
connection.schema = rfnd_schema

# Stack both sources (columns aligned by name) and drop rows that are fully identical.
data_union = pd.concat([data1, data2], ignore_index=True, sort=False)
rows_before = len(data_union)
data_union = data_union.drop_duplicates(keep='first')
# Report the effect instead of computing data_union.duplicated() and discarding it.
print(f"Removed {rows_before - len(data_union)} duplicated rows; {len(data_union)} remain")
# NOTE(review): only exact row matches are removed. If the two sources encode a column
# differently (e.g. numbers read as text in one file), the same character can survive
# twice; consider deduplicating on `name` after checking the dtypes.

connection.create_table(table_union, data_union)

3. Generar una consulta con los nombres duplicados y la cantidad de veces que se repiten.

In [None]:
# Solution with pandas: names that appear more than once across both staging tables
# (UNION ALL keeps every occurrence, so repeats inside one file count as well).
sql_query = (
    "SELECT name, count(*) "
    f"FROM ( SELECT name FROM {stg_schema}.{table_csv} UNION ALL "
    f"SELECT name FROM {stg_schema}.{table_parquet}) t "
    "GROUP BY name HAVING COUNT(name) > 1;"
)

# Close the connection afterwards so it goes back to the pool instead of lingering
# (possibly idle inside an open transaction).
with connection.get_connection() as conn:
    repeated_names = pd.read_sql_query(sql_query, conn)

print(repeated_names)



In [None]:
# Solution with Connector object: same duplicated-names query through execute_sql.
# The space after count(*) is required; "count(*)FROM" only parsed because ')' ends a token.
sql_query = (
    "SELECT name, count(*) "
    f"FROM ( SELECT name FROM {stg_schema}.{table_csv} UNION ALL "
    f"SELECT name FROM {stg_schema}.{table_parquet}) t "
    "GROUP BY name HAVING COUNT(name) > 1;"
)

repeated_names = connection.execute_sql(sql_query)
print(repeated_names)

4. Genere una consulta SQL que devuelva los nombres de las personas cuyo height esté entre 180 y 190,  
cuyo gender sea male y cuyo hair_color sea diferente de none o cualquier valor null.

In [None]:
# Solution with pandas: names with height between 180 and 190 (inclusive), male gender
# and a known hair colour.
# NOTE(review): the task asks for gender "male" but the filter uses 'masculine'. This
# assumes the dataset's gender column encodes males that way; confirm against the data.
sql_query = (
    "SELECT name "
    f"FROM {rfnd_schema}.{table_union} "
    "WHERE height BETWEEN 180 AND 190 "
    "AND gender = 'masculine' "
    # Both hair_color conditions must hold (AND). With OR, every non-null value would pass,
    # including the literal 'none'. LOWER() also rejects spellings like 'None'/'NULL'.
    "AND hair_color IS NOT NULL "
    "AND LOWER(hair_color) NOT IN ('', 'null', 'none');"
)

with connection.get_connection() as conn:
    names_filtered_by_hieght = pd.read_sql_query(sql_query, conn)

print(names_filtered_by_hieght)

In [None]:
# Solution with Connector object: same filter as the pandas version.
# NOTE(review): 'masculine' is assumed to be how this dataset encodes gender "male"; confirm.
sql_query = (
    "SELECT name "
    f"FROM {rfnd_schema}.{table_union} "
    "WHERE height BETWEEN 180 AND 190 "
    "AND gender = 'masculine' "
    # AND, not OR: otherwise every non-null hair_color (including 'none') would pass.
    "AND hair_color IS NOT NULL "
    "AND LOWER(hair_color) NOT IN ('', 'null', 'none');"
)

names_filtered_by_hieght = connection.execute_sql(sql_query)
# Print this query's result (the cell previously printed repeated_names by mistake).
print(names_filtered_by_hieght)

5. Escribir una consulta SQL que genere una columna de bandera (flag),  
donde se asigne el valor booleano o integer 1 si el mass es superior al promedio,  
y 0 si el mass es menor o igual al promedio.

In [None]:
# Solution with pandas: flag = 1 when mass is above the average mass of the whole table,
# otherwise 0. AVG ignores NULLs, and a NULL mass makes the comparison unknown, so such
# rows fall into the ELSE branch and get flag 0.
sql_query = (
    f"SELECT *, CASE WHEN mass > (SELECT AVG(mass) FROM {rfnd_schema}.{table_union}) "
    "THEN 1 ELSE 0 END AS flag "
    f"FROM {rfnd_schema}.{table_union};"
)

with connection.get_connection() as conn:
    flag = pd.read_sql_query(sql_query, conn)

print(flag)

In [None]:
# Solution with Connector object: flag = 1 when mass exceeds the table-wide average, else 0
union_table = f"{rfnd_schema}.{table_union}"
sql_query = (
    f"SELECT *, CASE WHEN mass > (SELECT AVG(mass) FROM {union_table}) "
    "THEN 1 ELSE 0 END AS flag "
    f"FROM {union_table};"
)

flag = connection.execute_sql(sql_query)
print(flag)

6. Cree una tabla en la que inserte los valores únicos de la columna Starships.

In [None]:
# Unique values of the starships column, written to <rfnd_schema>.starships
tble_name = "starships"
# Build a new Series rather than dropping duplicates in place on a column of data_union.
starships = data_union["starships"].drop_duplicates(keep='first')
# NOTE(review): if a cell can hold several ships (list/array values), they may need to be
# exploded into one ship per row before de-duplicating; check the column's contents.
with connection.get_connection() as conn:
    starships.to_sql(tble_name, conn, schema=rfnd_schema, if_exists="replace", index=False)

In [None]:
# Solution with Connector object: create_table writes into connection.schema.
connection.schema = rfnd_schema
# create_table handles DataFrames, so pass the Series as a one-column frame
# (the column keeps the name "starships").
connection.create_table(tble_name, starships.to_frame())

In [None]:
# Read the unique-starships table back to check its contents
sql_query = "SELECT * FROM {}.{}".format(rfnd_schema, tble_name)
unique_starships = connection.execute_sql(sql_query)
print(unique_starships)

7. Actualice los registros del tercer dataset (inciso/indicación 2),  
asignando el valor Slave I al campo Starships en todos los registros cuyo campo name sea igual a Jango Fett.

In [None]:
# Step 7: on the union table, set starships = 'Slave I' for every row named 'Jango Fett'
where = {"name": "Jango Fett"}
values = {"starships": "Slave I"}
table_nm = table_union
connection.update(table_nm=table_nm, schema=rfnd_schema, where=where, values=values)

8. Generar una consulta en la que se muestre el conteo de registros agrupados por las columnas skin_color y eye_color.

In [None]:
# Solution with pandas: number of records per (skin_color, eye_color) combination
sql_query = (
    "SELECT skin_color, eye_color, count(*) "
    f"FROM {rfnd_schema}.{table_union} "
    "GROUP BY skin_color, eye_color;"
)

# Close the connection afterwards instead of leaving it checked out of the pool
with connection.get_connection() as conn:
    skin_eye_color = pd.read_sql_query(sql_query, conn)
print(skin_eye_color)

In [None]:
# Solution with Connector object: number of records per (skin_color, eye_color) combination
union_table = f"{rfnd_schema}.{table_union}"
sql_query = (
    "SELECT skin_color, eye_color, count(*) "
    f"FROM {union_table} "
    "GROUP BY skin_color, eye_color;"
)

skin_eye_color = connection.execute_sql(sql_query)
print(skin_eye_color)

9. Escribir una consulta SQL para calcular la altura promedio, la altura máxima y la altura mínima por cada especie (species).

In [None]:
# Solution with pandas: average, maximum and minimum height for each species
sql_query = (
    "SELECT species, AVG(height) avg_height, MAX(height) max_height, MIN(height) min_height "
    f"FROM {rfnd_schema}.{table_union} "
    "GROUP BY species;"
)

# Close the connection afterwards instead of leaving it checked out of the pool
with connection.get_connection() as conn:
    height_metrics = pd.read_sql_query(sql_query, conn)
print(height_metrics)

In [None]:
# Solution with Connector object: average, maximum and minimum height for each species
union_table = f"{rfnd_schema}.{table_union}"
sql_query = (
    "SELECT species, AVG(height) avg_height, MAX(height) max_height, MIN(height) min_height "
    f"FROM {union_table} "
    "GROUP BY species;"
)

height_metrics = connection.execute_sql(sql_query)
print(height_metrics)