### ETL to Dashboard: Using Python, PostgreSQL and Power BI to create a Covid Dashboard from up-to-date data.

This project aims to showcase my skills by building a Covid Dashboard from data published by the ECDC (European Centre for Disease Prevention and Control).

In [29]:
#Import relevant libraries
import pandas as pd
import requests
import psycopg2
from sqlalchemy import create_engine

In [30]:
#Setting up the postgres database connection
# NOTE(review): user, password, host, port and database name are all embedded
# in this connection URL.
engine = create_engine(
    'postgresql+psycopg2://postgres:postgres@localhost:5432/CaseDB'
)

In [31]:
################# Getting Data and converting to Dataframe

def JsonUrlToDf(url):
    """Download a JSON document from `url` and return it as a DataFrame.

    Parameters:
        url: address passed to `requests.get`.

    Returns:
        A `pd.DataFrame` built from the decoded JSON payload.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    response = requests.get(url)
    # Fail fast on HTTP errors so an error payload is never parsed as data.
    response.raise_for_status()
    return pd.DataFrame(response.json())

def CsvToDf(url):
    """Read the CSV found at `url` (anything `pd.read_csv` accepts) into a DataFrame."""
    parsed = pd.read_csv(url)
    return pd.DataFrame(parsed)

def ExcelToDf(url):
    """Read the Excel file at `url` (anything `pd.read_excel` accepts) into a DataFrame."""
    parsed = pd.read_excel(url)
    return pd.DataFrame(parsed)

################# SQL related functions

#Bundle the SQL target settings (engine, table, schema, if_exists) into one list

def DBInfo(dbengine,TableName,SchemaName,IfExists):
    """Pack the four SQL target settings into a list.

    The returned list is ordered `[engine, table, schema, if_exists]`, which is
    the positional layout `ToSql` reads from its `args` parameter.
    """
    settings = []
    settings.extend((dbengine, TableName, SchemaName, IfExists))
    return settings

#Export a DataFrame to a SQL table

def ToSql(data,args):
    """Write `data` to a SQL table described by `args`.

    Parameters:
        data: DataFrame to export.
        args: sequence laid out as `[engine, table, schema, if_exists]`
            (the layout produced by `DBInfo`).

    The DataFrame index is not written. Nothing is returned.
    """
    table_name = args[1]
    connection = args[0]
    schema_name = args[2]
    exists_policy = args[3]
    data.to_sql(
        table_name,
        con=connection,
        schema=schema_name,
        if_exists=exists_policy,
        index=False,
    )

def CovidDataToSql(data,args=[engine,"CovidData","CaseSchema","replace"]):
    """Write `data` to the Covid table; by default replaces "CaseSchema"."CovidData".

    `args` uses the `[engine, table, schema, if_exists]` layout. The default
    list is built once, when this function is defined, so it captures whatever
    `engine` was bound to at that moment. Nothing is returned.
    """
    table_name = args[1]
    connection = args[0]
    schema_name = args[2]
    exists_policy = args[3]
    data.to_sql(
        table_name,
        con=connection,
        schema=schema_name,
        if_exists=exists_policy,
        index=False,
    )

def CountriesDataToSql(data,args=[engine,"CountriesData","CaseSchema","replace"]):
    """Write `data` to the countries table; by default replaces "CaseSchema"."CountriesData".

    `args` uses the `[engine, table, schema, if_exists]` layout. The default
    list is built once, when this function is defined, so it captures whatever
    `engine` was bound to at that moment. Nothing is returned.
    """
    table_name = args[1]
    connection = args[0]
    schema_name = args[2]
    exists_policy = args[3]
    data.to_sql(
        table_name,
        con=connection,
        schema=schema_name,
        if_exists=exists_policy,
        index=False,
    )

def QuerySql(sql,dbengine):
    """Run `sql` through `dbengine` and return what `pd.read_sql` yields."""
    result = pd.read_sql(sql, con=dbengine)
    return result
    
#Query data and return as DataFrame

def FromSqltoDf(data,arg):
    """Execute the SQL in `data` over the connection `arg` and return a DataFrame.

    Parameters:
        data: SQL text (or anything `pd.read_sql` accepts as its query).
        arg: engine/connection passed to `pd.read_sql` as `con`.
    """
    fetched = pd.read_sql(data, con=arg)
    return pd.DataFrame(fetched)

################# Cleaning Data

def CleaningCovidData(data):
    """Return a copy of `data` without the 'source', 'country_code' and 'note' columns.

    The input frame is left untouched; a KeyError is raised by pandas if any
    of the three columns is missing.
    """
    unwanted_columns = ['source', 'country_code', 'note']
    return data.drop(columns=unwanted_columns)

def CleaningCountriesData(data):
    """Normalise the country column of `data` in place and return the same frame.

    Renames "Country" to "country", converts the values to strings and removes
    the last character of every value.
    """
    data.rename(columns={"Country": "country"}, inplace=True)
    country_text = data['country'].astype(str)
    # Drops exactly one trailing character from every name.
    # NOTE(review): presumably the source file has a stray trailing character
    # after each country name - confirm against the CSV.
    data['country'] = country_text.str.slice(0, -1)
    return data

################# Pipelines

def PipelineCovidDataToSql(data):
    """Download the Covid JSON at `data`, clean it and write it to SQL.

    Runs `JsonUrlToDf` -> `CleaningCovidData` -> `CovidDataToSql` and returns
    whatever `CovidDataToSql` returns.
    """
    downloaded = JsonUrlToDf(data)
    cleaned = CleaningCovidData(downloaded)
    return CovidDataToSql(cleaned)

def PipelineCovidDataToDf(data):
    """Download the Covid JSON at `data` and return the cleaned DataFrame.

    Runs `JsonUrlToDf` followed by `CleaningCovidData`; nothing is written to SQL.
    """
    downloaded = JsonUrlToDf(data)
    return CleaningCovidData(downloaded)

def PipelineCountriesDataToSql(data):
    """Load the countries CSV at `data`, clean it and write it to SQL.

    Runs `CsvToDf` -> `CleaningCountriesData` -> `CountriesDataToSql` and
    returns whatever `CountriesDataToSql` returns.
    """
    loaded = CsvToDf(data)
    cleaned = CleaningCountriesData(loaded)
    return CountriesDataToSql(cleaned)

def ReturnNonDuplicatesDf(data1,data2):
    """Return the rows of `data2` that are not already present in `data1`.

    Parameters:
        data1: DataFrame of existing rows (e.g. what is already stored).
        data2: DataFrame of candidate rows (e.g. freshly downloaded data).

    Returns:
        The rows of `data2` that do not occur in `data1`, each distinct row
        once, keeping their `data2` index labels.

    The previous implementation used `drop_duplicates(keep=False)` on the
    concatenation, which yields the symmetric difference: rows found only in
    `data1` were returned too (and so re-appended by the caller), and rows
    repeated inside `data2` were discarded altogether.

    Rows are compared by value across all columns, so both frames need to
    represent equal values the same way for a match to be detected.
    """
    combined = pd.concat([data1, data2])
    # True for every row that already occurred earlier in `combined`, i.e. in
    # `data1` or earlier within `data2`.
    seen_before = combined.duplicated(keep="first").to_numpy()
    first_new_row = len(data1)
    # Positional slicing keeps this correct when both inputs share index labels.
    candidates = combined.iloc[first_new_row:]
    return candidates[~seen_before[first_new_row:]]

## Preparing data to be used in Power BI
def VisualData(data,arg):
    """Query SQL and reshape the result into one wide row per country.

    Parameters:
        data: SQL text passed to `FromSqltoDf`.
        arg: engine/connection passed to `FromSqltoDf`.

    Returns:
        A DataFrame pivoted on 'indicator', with the index reset and the
        columns relabelled to Country, Continent, Population, CasesPer100k,
        DeathsPer100k, TotalCases, TotalDeaths.
    """
    queried = FromSqltoDf(data, arg)
    pivoted = queried.pivot_table(
        index=['country', 'continent', 'population'],
        columns='indicator',
        values=['IndicatorCountPer100k', 'cumulative_count'],
    )
    flattened = pivoted.reset_index()
    # Labels are assigned by position, so this needs exactly seven columns.
    # NOTE(review): presumably 'indicator' holds exactly two values (cases and
    # deaths, in that sort order) - confirm against the data.
    final_labels = [
        'Country',
        'Continent',
        'Population',
        'CasesPer100k',
        'DeathsPer100k',
        'TotalCases',
        'TotalDeaths',
    ]
    return flattened.set_axis(final_labels, axis=1)

In [32]:
#Create (or replace) the Covid table in Postgres from the ECDC JSON feed
PipelineCovidDataToSql(
    "https://opendata.ecdc.europa.eu/covid19/nationalcasedeath/json"
)

In [33]:
#Create (or replace) the countries table in Postgres from a local CSV file
#### Improve: get data directly from kaggle using API
# NOTE(review): the path below is specific to one machine.
PipelineCountriesDataToSql(
    "C:/Users/GFreitas/Downloads/Revamped DDEC/datasource2.csv"
)

In [34]:
#Read the stored Covid table, download the feed again, and append whatever
#ReturnNonDuplicatesDf returns for the pair (stored rows, downloaded rows)
## Can be pipelined even more!
SQL = 'select * from "CaseSchema"."CovidData"'
ToSql(
    ReturnNonDuplicatesDf(
        FromSqltoDf(SQL, engine),
        PipelineCovidDataToDf("https://opendata.ecdc.europa.eu/covid19/nationalcasedeath/json"),
    ),
    DBInfo(engine, "CovidData", "CaseSchema", "append"),
)

In [35]:
#Query the data from Postgres, compare it to the new data and append only rows dated after the latest date in the table; also replace old records that share country, date and indicator with a new record.
#
#
#

In [36]:
## Enrich data by comparing it with HDI / IHDI

In [37]:
## Modeling data to be used to create visuals with SQL and Python
# The query joins "CaseSchema"."CovidData" to a subquery that finds the
# max "year_week" per country and indicator, so only the latest row of each
# (country, indicator) pair is kept. It also derives "IndicatorCountPer100k"
# as cumulative_count / population * 100000.
# NOTE(review): if both "cumulative_count" and "population" are stored as
# integer columns, Postgres `/` performs integer division here - confirm the
# column types.
SQL = 'select CV."country", CV."continent", a."max_date", a."indicator",  CV."cumulative_count", CV."population",(CV."cumulative_count"/CV."population"*100000) as "IndicatorCountPer100k"   from "CaseSchema"."CovidData" as CV INNER JOIN(SELECT CVI."country", CVI."indicator", max(CVI."year_week") as "max_date" from "CaseSchema"."CovidData" as CVI group by CVI."country",CVI."indicator" ORDER BY CVI."country")a ON CV."year_week" = a."max_date" WHERE CV."indicator" = a."indicator" and a."country" = CV."country" ORDER BY CV."country"'
# Pivot the query result with VisualData and replace "CaseSchema"."VisualData" with it.
ToSql(VisualData(SQL,engine),DBInfo(engine,"VisualData","CaseSchema","replace"))

## connect Power BI to a view

In [38]:
########################## Generic Formulas


# def ToSql(data,args):
#    data.to_sql(args[1],con=args[0],schema=args[2],if_exists=args[3],index=False)

#def CreateCountPer100kColumn(data,name):
#    data[name] = (pd.to_numeric(data['cumulative_count'],errors='coerce') / data.population)*100000
#   return data

#def SeparateDataByIndicator(data,column,indicator):
#    return data[data[column] == indicator].drop(column,axis=1).reset_index(drop=True)

#def ToCsv(data,arg):
#    data.to_csv(arg+".csv",index=False)

##########################

## Exercise 6 Alternative: Modeling the data to be used to create visuals with Python only.

#def SeparateDataByIndicatorCases(data):
#    return data[data['indicator'] == 'cases'].drop('indicator',axis=1).reset_index(drop=True)

#def SeparateDataByIndicatorDeaths(data):
#   return data[data['indicator'] == 'deaths'].drop('indicator',axis=1).reset_index(drop=True)

#def CreateCountPer100kCases(data):
#    data['CasesPer100k'] = (pd.to_numeric(data['cumulative_count'],errors='coerce') / data.population)*100000
#    return data.rename(columns={'cumulative_count':'CMLCases'})

#def CreateCountPer100kDeaths(data):
#    data['DeathsPer100k'] = (pd.to_numeric(data['cumulative_count'],errors='coerce') / data.population)*100000
#    return data.drop(['continent','population'],axis=1).rename(columns={'cumulative_count':'CMLDeaths'})

#def LatestCovidData(data):
#    data['year_week'] = data['year_week'].str.replace('-','').astype(int)
#    Filtro1 = data['year_week'].loc[data['year_week'].idxmax()]
#    LatestDate = data[data['year_week'] == Filtro1].fillna("null")
#    return LatestDate[LatestDate['country_code'] != "null"].drop(['country_code','weekly_count','year_week','source','rate_14_day','note'],axis=1)

#def PipelineLatestCasesData(data):
#    return (
#    PipelineCovidData(data)
#    .pipe(LatestCovidData)
#    .pipe(SeparateDataByIndicatorCases)
#    .pipe(CreateCountPer100kCases)
#    )

#def PipelineLatestDeathsData(data):
#    return (
#    PipelineCovidData(data)
#    .pipe(LatestCovidData)
#    .pipe(SeparateDataByIndicatorDeaths)
#    .pipe(CreateCountPer100kDeaths)
#    )

#def MergeData(data1,data2,on):
#    return data1.merge(data2,on=on)
