In [8]:
import pandas as pd
import numpy as np
from ydata_profiling import ProfileReport
import great_expectations as gx
from great_expectations.dataset import PandasDataset
import psycopg2
from sklearn.preprocessing import LabelEncoder
import sqlalchemy
from sqlalchemy import create_engine 

## Setting up variables and environment

In [9]:
context = gx.get_context()

In [10]:
alzheimers_df = gx.read_csv("Alzheimers.csv", sep=';', encoding='ISO-8859-1')
alzheimers_df.head(2)

Unnamed: 0,RowId,YearStart,YearEnd,LocationAbbr,LocationDesc,Datasource,Class,Topic,Question,Data_Value_Unit,...,Stratification2,Geolocation,ClassID,TopicID,QuestionID,LocationID,StratificationCategoryID1,StratificationID1,StratificationCategoryID2,StratificationID2
0,BRFSS~2022~2022~9004~Q03~TMC01~AGE~GENDER,2022,2022,WEST,West,BRFSS,Mental Health,Frequent mental distress,Percentage of older adults who are experiencin...,%,...,Female,,C05,TMC01,Q03,9004,AGE,5064,GENDER,FEMALE
1,BRFSS~2022~2022~9001~Q03~TMC01~AGE~GENDER,2022,2022,NRE,Northeast,BRFSS,Mental Health,Frequent mental distress,Percentage of older adults who are experiencin...,%,...,Male,,C05,TMC01,Q03,9001,AGE,5064,GENDER,MALE


## Bronze ready
The raw data is loaded and, therefore, the bronze level is achieved and may be uploaded

In [13]:
alzheimers_bronze_df = pd.DataFrame(alzheimers_df)

In [14]:
alzheimers_bronze_df.dtypes

RowId                          object
YearStart                       int64
YearEnd                         int64
LocationAbbr                   object
LocationDesc                   object
Datasource                     object
Class                          object
Topic                          object
Question                       object
Data_Value_Unit                object
DataValueTypeID                object
Data_Value_Type                object
Data_Value                    float64
Data_Value_Alt                float64
Data_Value_Footnote_Symbol     object
Data_Value_Footnote            object
Low_Confidence_Limit          float64
High_Confidence_Limit         float64
StratificationCategory1        object
Stratification1                object
StratificationCategory2        object
Stratification2                object
Geolocation                    object
ClassID                        object
TopicID                        object
QuestionID                     object
LocationID  

In [17]:
create_bronze_table = '''CREATE TABLE alzheimers_bronze_tb (
    RowId VARCHAR(250),
    YearStart BIGINT,
    YearEnd BIGINT,
    LocationAbbr VARCHAR(250),
    LocationDesc VARCHAR(250),
    Datasource VARCHAR(250),
    Class VARCHAR(250),
    Topic VARCHAR(250),
    Question VARCHAR(250),
    Data_Value_Unit VARCHAR(250),
    DataValueTypeID VARCHAR(250),
    Data_Value_Type VARCHAR(250),
    Data_Value DOUBLE PRECISION,
    Data_Value_Alt DOUBLE PRECISION,
    Data_Value_Footnote_Symbol VARCHAR(250),
    Data_Value_Footnote VARCHAR(250),
    Low_Confidence_Limit DOUBLE PRECISION,
    High_Confidence_Limit DOUBLE PRECISION,
    StratificationCategory1 VARCHAR(250),
    Stratification1 VARCHAR(250),
    StratificationCategory2 VARCHAR(250),
    Stratification2 VARCHAR(250),
    Geolocation VARCHAR(250),
    ClassID VARCHAR(250),
    TopicID VARCHAR(250),
    QuestionID VARCHAR(250),
    LocationID BIGINT,
    StratificationCategoryID1 VARCHAR(250),
    StratificationID1 VARCHAR(250),
    StratificationCategoryID2 VARCHAR(250),
    StratificationID2 VARCHAR(250)
    )'''

connection_url = 'postgresql://postgres:root@localhost:5432/alzheimers_db'

conn = psycopg2.connect(connection_url)
cursor = conn.cursor()

cursor.execute(create_bronze_table)

alzheimers_bronze_df.to_sql('alzheimers_bronze_tb', con=conn, if_exists='replace', index=False)

conn.close()

DatabaseError: Execution failed on sql 'SELECT name FROM sqlite_master WHERE type='table' AND name=?;': ERRO:  erro de sintaxe em ou próximo a ";"
LINE 1: ...ELECT name FROM sqlite_master WHERE type='table' AND name=?;
                                                                      ^


## Raw Data Profiling

In [None]:
bronze_gx_profile = alzheimers_df.profile
bronze_gx_profile

In [None]:
bronze_profile_report = ProfileReport(alzheimers_df)
bronze_profile_report

### Repeated columns with different names
Columns such as Class and ClassId are the same but with different names, for an OLAP architecture with a single table this isn't helpful

In [None]:
repeated_cols = ['ClassID','QuestionID','TopicID','Data_Value_Alt','Data_Value_Unit','QuestionID','LocationID','StratificationCategoryID1','StratificationID1','StratificationCategory2','StratificationID2','DataValueTypeID','Data_Value_Footnote_Symbol']
alzheimers_df = alzheimers_df.drop(labels=repeated_cols, axis = 1)
alzheimers_df.head(3)

### StratificationCategory 1 is constant
The column that defines the first stratification is only 'Age group'. Therefore, we may remove the column 'StratificationCategory1' and change the column name where the value of the first stratification is ('Stratification1') to age group.

In [None]:
alzheimers_df = alzheimers_df.drop(labels=['StratificationCategory1'], axis = 1)
alzheimers_df = alzheimers_df.rename(columns={'Stratification1':'age_group'})
alzheimers_df.head(3)

### Stratification 2 can be either 'Gender' or 'Race', we shall inscrease redundancy by separating a column for race and another for gender. This should make the data more usable

In [None]:
ethnicity_categories = ['White, non-Hispanic','Hispanic','Black', 'non-Hispanic','Native Am/Alaskan Native','Asian/Pacific Islander', 'overall']
gender_categories = ['Male', 'Female', 'overall']

alzheimers_df['gender'] = alzheimers_df['Stratification2'].replace(ethnicity_categories, 'overall')
alzheimers_df['ethnicity'] = alzheimers_df['Stratification2'].replace(gender_categories, 'overall')

alzheimers_df = alzheimers_df.drop(labels=['Stratification2', 'StratificationCategoryID2'], axis = 1)
alzheimers_df.head(3)

### The 'Datasource' column values are constant. It's wasteful to have a whole column repeating the same thing but that information is important, so we will add metadata

In [None]:
alzheimers_df.attrs = {'datasource': 'BRFSS'}
print(alzheimers_df.attrs)

In [None]:
alzheimers_df = alzheimers_df.drop(columns=['Datasource'], axis = 1)
alzheimers_df.head(3)

## Silver ready
Now we may validate and create the checkpoint

In [None]:
silver_gx_profile = alzheimers_df.profile
silver_gx_profile

In [None]:
silver_profile_report = ProfileReport(alzheimers_df)
silver_profile_report

In [None]:
alzheimers_silver_df = alzheimers_df.copy()
silver_validator = context.sources.add_pandas("alzheimers_silver_df_source").read_dataframe(alzheimers_silver_df,asset_name="silver_asset")

In [None]:
expected_columns = ['RowId','YearStart','YearEnd','LocationAbbr','LocationDesc','Class','Topic','Question','Data_Value_Type','Data_Value','Data_Value_Footnote','Low_Confidence_Limit','High_Confidence_Limit','age_group','Geolocation','gender','ethnicity']
silver_validator.expect_table_columns_to_match_ordered_list(expected_columns)

In [None]:
silver_validator.save_expectation_suite(discard_failed_expectations=False)

In [None]:
context.list_expectation_suite_names()

In [None]:
checkpoint = context.add_or_update_checkpoint(
    name = "silver_checkpoint",
    #batch_request = my_batch_request,
    expectation_suite_name = "silver_asset",
    validator = silver_validator
)

checkpoint_result = checkpoint.run()

In [None]:
alzheimers_silver_df.dtypes

In [None]:
create_silver_table = '''CREATE TABLE alzheimers_silver_tb (
    RowId VARCHAR(250),
    YearStart numeric,
    YearEnd numeric,
    LocationAbbr VARCHAR(250),
    LocationDesc VARCHAR(250),
    Class VARCHAR(250),
    Topic VARCHAR(250),
    Question VARCHAR(250),
    Data_Value_Type VARCHAR(250),
    Data_Value numeric,
    Data_Value_Footnote VARCHAR(250),
    Low_Confidence_Limit numeric,
    High_Confidence_Limit numeric,
    age_group VARCHAR(250),
    Geolocation VARCHAR(250),
    gender VARCHAR(250),
    ethnicity VARCHAR(250)
    );'''

connection_url = 'postgresql://postgres:root@localhost:5432/alzheimers_db'

conn = psycopg2.connect(connection_url)
cursor = conn.cursor()

cursor.execute(create_silver_table)

conn.close()

alzheimers_silver_df.to_sql('alzheimers_silver_tb', con=conn, if_exists='replace', index=False)

## Preparing data for gold

### The RowId is not unique to each row
For a row ID to serve it's purppose, it needs to uniquely identify each row

In [None]:
alzheimers_df['RowId'] = [x for x in range (len(alzheimers_df['RowId']))]

In [None]:
alzheimers_df.head(3)

### For the data to be useful, 'Data_Value' must not be null, so we will drop all rows with missing data value

In [None]:
alzheimers_df = alzheimers_df.dropna(subset=['Data_Value'])
print('Number of missing values in Data Value after transformaion: ',alzheimers_df['Data_Value'].isna().sum())

### It's possible to assure and expect age group to be category type

In [None]:
alzheimers_df['age_group'] = pd.Categorical(alzheimers_df['age_group'], categories=alzheimers_df['age_group'].unique())
alzheimers_df.head(3)

### Making Ethnicity and Gender into categories

In [None]:
alzheimers_df['gender'] = pd.Categorical(alzheimers_df['gender'], gender_categories)
alzheimers_df['ethnicity'] = pd.Categorical(alzheimers_df['ethnicity'], ethnicity_categories)

## Gold Ready
Time to validate and create the gold checkpoint

In [None]:
alzheimers_gold_df = alzheimers_df.copy()

gold_validator = context.sources.add_pandas("alzheimers_gold_df_source").read_dataframe(alzheimers_gold_df,asset_name="gold_asset")

In [None]:
gold_validator.expect_column_values_to_be_unique('RowId')
gold_validator.expect_column_values_to_not_be_null(column='Data_Value')
gold_validator.expect_column_values_to_be_between(column="Data_Value", min_value=0, max_value=100)
gold_validator.expect_column_values_to_be_between(column="Low_Confidence_Limit", min_value=0, max_value=100)
gold_validator.expect_column_values_to_be_between(column="High_Confidence_Limit", min_value=0, max_value=100)
gold_validator.expect_column_values_to_be_of_type(column='age_group', type_='CategoricalDtypeType')
gold_validator.expect_column_values_to_be_of_type(column='gender', type_='CategoricalDtypeType')
gold_validator.expect_column_values_to_be_of_type(column='ethnicity', type_='CategoricalDtypeType')

In [None]:
gold_validator.save_expectation_suite(discard_failed_expectations=False)

In [None]:
context.list_expectation_suite_names()

In [None]:
checkpoint = context.add_or_update_checkpoint(
    name = "gold_checkpoint",
    #batch_request = my_batch_request,
    expectation_suite_name = "gold_asset",
    validator = gold_validator
)

checkpoint_result = checkpoint.run()

In [None]:
alzheimers_gold_df.dtypes

In [None]:
create_gold_table = '''CREATE TABLE alzheimers_gold_tb (
    RowId numeric,
    YearStart numeric,
    YearEnd numeric,
    LocationAbbr VARCHAR(250),
    LocationDesc VARCHAR(250),
    Class VARCHAR(250),
    Topic VARCHAR(250),
    Question VARCHAR(250),
    Data_Value_Type VARCHAR(250),
    Data_Value numeric,
    Data_Value_Footnote VARCHAR(250),
    Low_Confidence_Limit numeric,
    High_Confidence_Limit numeric,
    age_group VARCHAR(250),
    Geolocation VARCHAR(250),
    gender VARCHAR(250),
    ethnicity VARCHAR(250)
    );'''

connection_url = 'postgresql://postgres:root@localhost:5432/alzheimers_db'

conn = psycopg2.connect(connection_url)
cursor = conn.cursor()

cursor.execute(create_gold_table)

conn.close()

alzheimers_gold_df.to_sql('alzheimers_gold_tb', con=conn, if_exists='replace', index=False)