# Introduction to Data Engineering

In [1]:
import pandas as pd
from sqlalchemy import create_engine
db_engine = create_engine('postgresql+psycopg2://adrik:root1234@localhost:5432/adrik')
table_names = db_engine.table_names()
table_names

  table_names = db_engine.table_names()


['categories',
 'countries',
 'businesses',
 'international_debt',
 'game_sales',
 'reviews',
 'top_critic_years',
 'top_critic_years_more_than_four_games',
 'top_user_years_more_than_four_games',
 'courses',
 'rating']

In [2]:
data = pd.read_sql("""
SELECT "country_code","country", "continent" FROM "countries"
ORDER BY "country"
""", db_engine)


data.head()

Unnamed: 0,country_code,country,continent
0,AFG,Afghanistan,Asia
1,ALB,Albania,Europe
2,DZA,Algeria,Africa
3,AND,Andorra,Europe
4,AGO,Angola,Africa


In [3]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 195 entries, 0 to 194
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   country_code  195 non-null    object
 1   country       195 non-null    object
 2   continent     195 non-null    object
dtypes: object(3)
memory usage: 4.7+ KB


In [4]:
data = pd.read_sql("""
SELECT * FROM "countries"
INNER JOIN "businesses"
ON "countries"."country_code"="businesses"."country_code"
""", db_engine)
data.head()

Unnamed: 0,country_code,country,continent,business,year_founded,category_code,country_code.1
0,DZA,Algeria,Africa,Hamoud Boualem,1878,CAT11,DZA
1,BEN,Benin,Africa,Communauté Électrique du Bénin,1968,CAT10,BEN
2,BWA,Botswana,Africa,Botswana Meat Commission,1965,CAT1,BWA
3,BFA,Burkina Faso,Africa,Air Burkina,1967,CAT2,BFA
4,BDI,Burundi,Africa,Brarudi,1955,CAT9,BDI


## EXTRACT

In [6]:
# Function to extract table to a pandas DataFrame
def extract_table_to_pandas(tablename, db_engine):
    query = "SELECT * FROM {}".format(tablename)
    return pd.read_sql(query, db_engine)

# Connect to the database using the connection URI
connection_uri = "postgresql+psycopg2://adrik:root1234@localhost:5432/adrik" 
db_engine = create_engine(connection_uri)

In [9]:
# Extract the film table into a pandas DataFrame
categories = extract_table_to_pandas("categories", db_engine)
categories.head()

Unnamed: 0,category_code,category
0,CAT1,Agriculture
1,CAT2,Aviation & Transport
2,CAT3,Banking & Finance
3,CAT4,"Cafés, Restaurants & Bars"
4,CAT5,Conglomerate


In [8]:
# Extract the customer table into a pandas DataFrame
extract_table_to_pandas("countries", db_engine)

Unnamed: 0,country_code,country,continent
0,AFG,Afghanistan,Asia
1,AGO,Angola,Africa
2,ALB,Albania,Europe
3,AND,Andorra,Europe
4,ARE,United Arab Emirates,Asia
...,...,...,...
190,YEM,Yemen,Asia
191,ZAF,South Africa,Africa
192,ZMB,Zambia,Africa
193,ZWE,Zimbabwe,Africa


## TRANSFORM
### Splitting columns

In [14]:
# Get the rental rate column as a string
category_code = categories.category_code.astype("str")

# Split up and expand the column
category_code_expanded = category_code.str.split("(\d+)", expand=True)

# Assign the columns
categories_df = categories.assign(
    CAT=category_code_expanded[0],
    NUMBER=category_code_expanded[1],
)

In [15]:
categories_df.head()

Unnamed: 0,category_code,category,CAT,NUMBER
0,CAT1,Agriculture,CAT,1
1,CAT2,Aviation & Transport,CAT,2
2,CAT3,Banking & Finance,CAT,3
3,CAT4,"Cafés, Restaurants & Bars",CAT,4
4,CAT5,Conglomerate,CAT,5


## LOAD

In [16]:
game_sales = extract_table_to_pandas("game_sales", db_engine)
game_sales.head()

Unnamed: 0,game,platform,publisher,developer,games_sold,year
0,7 Days to Die for PC,PC,The Fun Pimps,The Fun Pimps,4.18,2013
1,ARK: Survival Evolved for PC,PC,Studio Wildcard,Studio Wildcard,4.5,2015
2,Age of Empires II: HD Edition for PC,PC,Microsoft Studios,Hidden Path Entertainment,5.82,2013
3,Animal Crossing: City Folk for Wii,Wii,Nintendo,Nintendo EAD,4.32,2008
4,Animal Crossing: New Horizons for NS,NS,Nintendo,Nintendo,13.41,2020


In [30]:
import sqlalchemy

connection_uri = "postgresql+psycopg2://adrik:root1234@localhost:5432/adrik" 
db_engine_dwh = sqlalchemy.create_engine(connection_uri)

# Finish the .to_sql() call to write to store.film
categories_df.to_sql("categories_df", db_engine_dwh, schema="public", if_exists="replace")

# Run the query to fetch the data
pd.read_sql("SELECT * FROM public.categories_df", db_engine_dwh)

Unnamed: 0,index,category_code,category,CAT,NUMBER
0,0,CAT1,Agriculture,CAT,1
1,1,CAT2,Aviation & Transport,CAT,2
2,2,CAT3,Banking & Finance,CAT,3
3,3,CAT4,"Cafés, Restaurants & Bars",CAT,4
4,4,CAT5,Conglomerate,CAT,5
5,5,CAT6,Construction,CAT,6
6,6,CAT7,Consumer Goods,CAT,7
7,7,CAT8,Defense,CAT,8
8,8,CAT9,"Distillers, Vintners, & Breweries",CAT,9
9,9,CAT10,Energy,CAT,10


# Automatization

In [31]:
# Function to extract table to a pandas DataFrame
def extract_table_to_pandas(tablename, db_engine):
    query = "SELECT * FROM {}".format(tablename)
    return pd.read_sql(query, db_engine)

In [32]:
def split_columns_transform(df, column, pat, suffixes):
    # Get the rental rate column as a string
    new_columns = df.column.astype("str")
    # Split up and expand the column
    column_expanded = new_columns.str.split("(\d+)", expand=True)
    # Assign the columns
    new_df = df.assign(
    column1=new_columns[0],
    column2=new_columns[1],
    )

In [33]:
def load_df_into_dwh(df, tablename, schema, db_engine):
    return pd.to_sql(tablename, db_engine, schema=schema, if_exists="replace")

In [34]:
def etl():
    # Extract
    df = extract_table_to_pandas("table", db_engine["public"])
    # Transform
    df = split_columns_transform(df, "rental_rate", ".", ["_cat", "_cents"])
    # Load
    load_df_into_dwh(df, "table", "public", db_engine["dwh"])