# **_Introduction To Data_**

This notebook is a teaching example: it walks through a simple **Data Engineering** process using **PostgreSQL** and **Python**.

## **Table of Contents**

1. [Database Connection](#1-Database-Connection)
2. [Load Data](#2-Load-Data)
3. [Modeling Data](#3-Modeling_Data)
4. [Verifying Data](#4-Verifying_Data)

## **Packages used in this notebook**

In [None]:
# Packages used in this notebook
import psycopg2
from psycopg2.extensions import cursor, connection
import pandas as pd
from pandas import DataFrame

## **1. Database Connection**

In [None]:
# Usually stored in environment variables for security and portability reasons
db_configs = {
    "user": "my_user",
    "password": "my_password",
    "dbname": "my_dbname",
    "host": "localhost"
}
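
As the comment in the cell above notes, credentials usually live in environment variables rather than in the notebook itself. A minimal sketch of that idea, assuming the variable names `PGUSER`, `PGPASSWORD`, `PGDATABASE` and `PGHOST` (any naming scheme would work) and a hypothetical helper called `load_db_configs`; the fallback values are illustrative only:

```python
import os


def load_db_configs(env=None) -> dict:
    """Build psycopg2 connection keywords from environment variables.

    The helper name, variable names and fallbacks are assumptions for this sketch.
    """
    # Allow a plain dict to be passed in, which makes the helper easy to try out
    env = os.environ if env is None else env
    return {
        "user": env.get("PGUSER", "my_user"),
        "password": env.get("PGPASSWORD", ""),
        "dbname": env.get("PGDATABASE", "my_dbname"),
        "host": env.get("PGHOST", "localhost"),
    }


db_configs_from_env = load_db_configs()
```

The resulting dictionary has the same keys as `db_configs`, so it could be unpacked into the connection call in the same way.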

In [None]:
def connect_with_database(**db_config: str) -> tuple[connection, cursor]:
    """Connect to a PostgreSQL database

    :param db_config: Keyword arguments with the database configuration
    :returns: A tuple with the database connection and a cursor
    :raises psycopg2.Error: If the connection cannot be established
    """

    try:
        conn = psycopg2.connect(**db_config)
        conn.set_session(autocommit=True)
        cur = conn.cursor()

        return conn, cur

    except psycopg2.Error as e:
        print(
            f"Could not establish connection to postgres database.\n{e}"
        )
        # Re-raise so callers never try to unpack an implicit None
        raise

In [None]:
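# Connecting to the default maintenance database (assumed here to be "postgres"),
# because PostgreSQL cannot drop the database the session is connected to
conn, cur = connect_with_database(dbname="postgres", host="localhost",
                                  password="my_password", user="my_user")

In [None]:
# Recreating the my_dbname database with UTF8 encoding
cur.execute("DROP DATABASE IF EXISTS my_dbname")
cur.execute("CREATE DATABASE my_dbname WITH ENCODING 'UTF8' TEMPLATE template0")

# Closing this connection (autocommit is on, so no explicit commit is needed)
conn.close()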

In [None]:
# Connecting to the newly created database
conn, cur = connect_with_database(**db_configs)

## **2. Load data** 

In [None]:
accounts_data = pd.read_csv("data/Wealth-AccountData.csv")
accounts_data.head()

In [None]:
accounts_country = pd.read_csv("data/Wealth-AccountsCountry.csv")
accounts_country.head()

In [None]:
account_series = pd.read_csv("data/Wealth-AccountSeries.csv")
account_series.head()

### **2.1 Filtering data**

In [None]:
# Columns to keep from each dataframe

accounts_data_selected_columns = ["Country Name", "Country Code", "Series Name", "Series Code", 
                                  "1995 [YR1995]", "2000 [YR2000]", "2005 [YR2005]", "2010 [YR2010]"]

accounts_country_selected_columns = ["Code", "Short Name", "Table Name", "Long Name", "Currency Unit"]

accounts_series_selected_columns = ["Code", "Topic", "Indicator Name", "Long definition"]

In [None]:
# .loc slices by label and includes the end label, so 0:12 keeps the first 13 rows
accounts_country_filtered_columns = accounts_country.loc[0:12, accounts_country_selected_columns]
accounts_country_filtered_columns.head()

In [None]:
accounts_series_filtered_columns = account_series.loc[0:12, accounts_series_selected_columns]
accounts_series_filtered_columns.head()

In [None]:
accounts_data_filtered_columns = accounts_data.loc[0:12, accounts_data_selected_columns]
accounts_data_filtered_columns.head()
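
The `.loc[0:12, ...]` selections above slice by label, and label slices include their end point. A small sketch on a made-up five-row frame (the data is purely illustrative) contrasts this with position-based `.iloc`:

```python
import pandas as pd

# Toy frame with the default 0..4 integer index, as produced by read_csv
df = pd.DataFrame({"Code": list("abcde"), "Topic": range(5)})

by_label = df.loc[0:2, ["Code"]]      # label-based: rows 0, 1 and 2
by_position = df.iloc[0:2][["Code"]]  # position-based: rows 0 and 1

print(len(by_label), len(by_position))
```

With a default integer index, `.loc[0:12]` therefore returns 13 rows, while `.iloc[0:12]` would return 12.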

## **3. Modeling data**

In [None]:
def create_table(table_name: str, table_columns: str) -> None:
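    """Create a table if it does not already exist

    Relies on the module-level ``cur`` and ``conn`` created above.

    :param table_name: Name of the table to be created
    :param table_columns: Comma-separated column definitions (name and type)
    :returns: None
    """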
    
    cur.execute(F"""
                 CREATE TABLE IF NOT EXISTS {table_name} ({table_columns})
                 """)
    
    conn.commit()

In [None]:
accountscountry_columns_with_type_definition = """
                                               country_code VARCHAR PRIMARY KEY,
                                               short_name VARCHAR,
                                               table_name VARCHAR,
                                               long_name VARCHAR,
                                               currency_unit VARCHAR
                                               """

In [None]:
create_table(table_name="accountscountry",
             table_columns=accountscountry_columns_with_type_definition)

In [None]:
accountsdata_columns_with_type_definition = """
                                            country_name VARCHAR,
                                            country_code VARCHAR,
                                            indicator_name VARCHAR,
                                            indicator_code VARCHAR,
                                            year_1995 numeric,
                                            year_2000 numeric,
                                            year_2005 numeric,
                                            year_2010 numeric
                                            """

In [None]:
create_table(table_name="accountsdata",
             table_columns=accountsdata_columns_with_type_definition)

In [None]:
accountsseries_columns_with_type_definition = """
                                              series_code VARCHAR,
                                              topic VARCHAR,
                                              indicator_name VARCHAR,
                                              long_definition VARCHAR
                                              """

In [None]:
create_table(table_name="accountsseries",
             table_columns=accountsseries_columns_with_type_definition)

### **3.1 Inserting data** 

In [None]:
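def insert_in_table(table_name: str, columns: str, df: DataFrame) -> None:
    """Insert DataFrame rows into a table

    :param table_name: Name of the table where the data will be inserted
    :param columns: Comma-separated column names of the target table
    :param df: DataFrame whose rows will be inserted
    :returns: None
    """

    # One %s placeholder per column; psycopg2 quotes and escapes the values
    placeholders = ", ".join(["%s"] * len(df.columns))
    query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders});"

    # Replace NaN with None so missing values are stored as NULL
    rows = df.astype(object).where(df.notna(), None)

    for row in rows.itertuples(index=False, name=None):
        cur.execute(query, row)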
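
Binding values as query parameters, instead of formatting them into the SQL text, keeps inserts working when a value contains a quote or is missing. The sketch below uses the standard-library `sqlite3` module as a stand-in so it runs without a PostgreSQL server; note that `sqlite3` uses `?` placeholders where psycopg2 uses `%s`. The two rows and the `*_demo` names are made up for illustration:

```python
import sqlite3

# In-memory database used only for this demonstration
conn_demo = sqlite3.connect(":memory:")
cur_demo = conn_demo.cursor()
cur_demo.execute("CREATE TABLE accountscountry (country_code TEXT, short_name TEXT)")

# One value with an apostrophe and one missing value
rows = [("CIV", "Côte d'Ivoire"), ("BRA", None)]

# Placeholders keep values out of the SQL text; the driver handles quoting and NULL
cur_demo.executemany(
    "INSERT INTO accountscountry (country_code, short_name) VALUES (?, ?)",
    rows,
)

stored = cur_demo.execute(
    "SELECT country_code, short_name FROM accountscountry ORDER BY country_code"
).fetchall()
conn_demo.close()
```

Formatting the same rows into the statement text would produce invalid SQL for the apostrophe and the literal word `None` for the missing value.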

In [None]:
accountscountry_columns = accountscountry_columns_with_type_definition.replace(" VARCHAR", "") \
                                                                      .replace(" PRIMARY KEY", "")

insert_in_table(table_name="accountscountry",
                columns=accountscountry_columns,
                df=accounts_country_filtered_columns)
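
The chained `.replace` calls above need one call per type or constraint keyword. As a hedged alternative, a hypothetical helper `column_names` could take the first word of each comma-separated definition instead. This sketch assumes no type contains a comma (for example `numeric(10, 2)` would break it):

```python
def column_names(definition: str) -> str:
    """Return the comma-separated column names from a column-definition string."""
    names = []
    for part in definition.split(","):
        tokens = part.split()
        if tokens:  # skip empty fragments such as trailing whitespace
            names.append(tokens[0])
    return ", ".join(names)


example_definition = """
    country_code VARCHAR PRIMARY KEY,
    short_name VARCHAR,
    year_1995 numeric
"""
example_columns = column_names(example_definition)
```

Here `example_columns` holds only the column names, whatever types or constraints follow them.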

In [None]:
accountsdata_columns = accountsdata_columns_with_type_definition.replace(" VARCHAR", "") \
                                                                .replace(" numeric", "")

insert_in_table(table_name="accountsdata",
                columns=accountsdata_columns,
                df=accounts_data_filtered_columns)

In [None]:
accountsseries_columns = accountsseries_columns_with_type_definition.replace(" VARCHAR", "")


insert_in_table(table_name="accountsseries",
                columns=accountsseries_columns,
                df=accounts_series_filtered_columns)

## **4. Verifying data**

In [None]:
for table_name in ("AccountsCountry", "AccountsData", "AccountsSeries"):
    cur.execute(F"SELECT * FROM {table_name} LIMIT 5;")
    rows = cur.fetchall()
    
    print(table_name, *rows, sep="\n")