# Desafio PasseiDireto - Parte 1
Na primeira parte do desafio, criei a solução em um notebook para melhor explicação das etapas.
É importante ler o Readme primeiro, pois no documento explico minha forma de pensar na arquitetura do CASE.

Importação de bibliotecas e conexão do Banco de dados.

In [1]:
import pandas as pd;
import datetime

In [2]:
from conexao import alchemy_connection
engine = alchemy_connection('CASE_PD')

## Leitura dos DataSets
Leitura dos dataSets para vários DataFrames. Json -> DataFrame

In [3]:
#Leitura Full - Desafio
path_BASEA = "Datasets - Teste Data Engineer - Passei Direto\BASE A"
dfCourses = pd.read_json(f'{path_BASEA}\courses.json')
dfSessions = pd.read_json(f'{path_BASEA}\sessions.json')
dfStudents = pd.read_json(f'{path_BASEA}\students.json')
dfSubjects = pd.read_json(f'{path_BASEA}\subjects.json')
dfSubscriptions = pd.read_json(f'{path_BASEA}\subscriptions.json')
dfUniversities = pd.read_json('Datasets - Teste Data Engineer - Passei Direto\\BASE A\\universities.json')
dfStudentSubject = pd.read_json('Datasets - Teste Data Engineer - Passei Direto\\BASE A\\student_follow_subject.json')

### Leituras de Modo Incremental
Levando em conta a arquitetura do README. Aqui estou fazendo uma leitura incremental M-1.Em um ambiente S3 ou local , como o exemplo atual , eu separaria em "pasta" os datasets de cada mês. O Código está comentado para não impactar no restante.

In [4]:
#Leitura Incremental.

# today = datetime.date.today()
# first = today.replace(day=1)
# lastMonth = first - datetime.timedelta(days=1)
# lastMonthT=lastMonth.strftime("%Y%m")
# path = "Datasets - Teste Data Engineer - Passei Direto\BASE INCR\{lastMonthT}"
# dfCourses = pd.read_json(f'{path}\courses.json')
# dfSessions = pd.read_json(f'{path}/sessions.json')
# dfStudents = pd.read_json(f'{path}/students.json')
# dfSubjects = pd.read_json(f'{path}/subjects.json')
# dfSubscriptions = pd.read_json(f'{path}/subscriptions.json')
# dfUniversities = pd.read_json(f'{path}/universities.json')
# dfStudentSubject = pd.read_json(f'{path}/student_follow_subject.json')


## Tratamento dos dados
1. Primeiro foi feito a modelagem dimensional e logo após inicia-se o tratamento dos dados.
2. Foi encontrado dados de usuários duplicados no DataSet, pois os usuários haviam feito 2 pagamentos no mesmo mês de novembro.O objetivo foi apenas extrair o tipo de plano de cada usuário, então foi retirado a coluna: PaymentDate e as duplicatas.
3. Em seguida, foi criado tabelas Stages para inserir todos os dados Temporariamente.



In [5]:
#Subscription possui valores de Estudantes duplicados.Retirei o PaymentDate pois não será feito anáise em cima desse campo.
dfSubscriptions.drop('PaymentDate',axis='columns',inplace=True)
dfSubscriptions.drop_duplicates(inplace=True)

In [6]:
dfStudentSubject.to_sql(
    name='STG_STUDENT_SUBJECT',
    con=engine,
    schema='CASE_01_STG',
    if_exists='replace',
    index=False
)
dfCourses.to_sql(
    name='STG_CURSO',
    con=engine,
    schema='CASE_01_STG',
    if_exists='replace',
    index=False
)
dfSessions.to_sql(
    name='STG_SESSION',
    con=engine,
    schema='CASE_01_STG',
    if_exists='replace',
    index=False
)
dfStudents.to_sql(
    name='STG_STUDENT',
    con=engine,
    schema='CASE_01_STG',
    if_exists='replace',
    index=False
)
dfSubjects.to_sql(
    name='STG_SUBJECT',
    con=engine,
    schema='CASE_01_STG',
    if_exists='replace',
    index=False
)
dfSubscriptions.to_sql(
    name='STG_SUBSCRIPTION',
    con=engine,
    schema='CASE_01_STG',
    if_exists='replace',
    index=False
)
dfUniversities.to_sql(
    name='STG_UNIVERSITY',
    con=engine,
    schema='CASE_01_STG',
    if_exists='replace',
    index=False
)

## Criação e Tratamento das Dimensões
1. Primeiro extraimos todos os dados da Stage para vários DataFrame.

In [7]:
dfStgCourses = pd.read_sql_table('STG_CURSO',engine,schema='CASE_01_STG')
dfStgSession = pd.read_sql_table('STG_SESSION',engine,schema='CASE_01_STG')
dfStgStudent = pd.read_sql_table('STG_STUDENT',engine,schema='CASE_01_STG')
dfStgSubject = pd.read_sql_table('STG_SUBJECT',engine,schema='CASE_01_STG')
dfStgSubscription = pd.read_sql_table('STG_SUBSCRIPTION',engine,schema='CASE_01_STG')
dfStgUniversity = pd.read_sql_table('STG_UNIVERSITY',engine,schema='CASE_01_STG')


In [8]:
#Verifica se Existe valores duplicados nas Stages
#dfDuplicates=dfUniversities.duplicated(subset=None, keep='first')
#dfSessions.isnull().sum()

2. Seguindo o modelo Star Schema, vamos criar duas dimensões para o nosso case : Dimensão Student e Dimensão Disciplina. A seguir, temos uma junção de vários dados relevantes na dimensão Student: Id,Plan Type,Registered Date, State,City, University, Course, Signup Source, Student Client.


In [9]:
dfStgStudent = pd.merge(dfStgStudent,dfStgUniversity,how='left', left_on='UniversityId', right_on='Id').rename(columns={'Id_x':'IdStudent','Name':'UniversityName'})
dfStgStudent.drop(['Id_y','UniversityId'],axis=1,inplace=True)
dfStgStudent = pd.merge(dfStgStudent,dfStgCourses,how='left', left_on='CourseId', right_on='Id').rename(columns={'Name':'CourseName'})
dfStgStudent.drop(['CourseId','Id'],axis=1,inplace=True)
dfStgStudent = pd.merge(dfStgStudent,dfStgSubscription,how='left',left_on='IdStudent',right_on='StudentId')
dfStgStudent.drop(['StudentId'],axis=1,inplace=True)

3.Foi Identificado que algumas colunas estão com o valor None, sem informação. Portanto foi feito a seguinte tratativa para cada linha: Caso o estado ou cidade estivesse preenchido iriamos repetir o valor no campo faltante.

In [10]:
#Tratando dados de Estado e Cidade
for i,n in dfStgStudent.iterrows():
    if n["City"] is None and n["State"] is not None:
        dfStgStudent.loc[i,"City"] = n['State']
    elif n["State"] is None and n["City"] is not None:
        dfStgStudent.loc[i,"State"] = n['City']

4. Ao fazer a junção dos dados, identificamos que nem todos os usuários possuem plano. Portanto seguindo as boas práticas, preenchemos com o valor "Sem plano" para os usuários que não possuem o plano premium.
5. Logo em seguida, foi preenchido com o valor "Não informado" aqueles campos que o usuário não preencheu ou não contém dados no sistema.
6. Foi identificado que uma amostra de dados possuem Apóstrafo , o que está dificultando a inserção no banco de dados.O correto seria retirar todos os acentos,caracteres especiais para inserir no banco. Foi feito alguns exemplos desses tratamentos.

In [11]:
#Tratando dados Nulos e com Apóstrofos
dfStgStudent['PlanType'].fillna('Sem Plano',inplace=True)
dfStgStudent.fillna('Nao Informado',inplace=True)

dfStgStudent['City']=dfStgStudent['City'].str.replace("'","")
dfStgSubject['Name']=dfStgSubject['Name'].str.replace("'","")

7. Neste tópico, foi criado funções para execução das queries das dimensões e fatos. Temos um exemplo de Insert/Update das Dimensões. Caso haja algum dado novo, iremos inserir. Se não, irá atualizar os dados.
8. Logo em seguida , chamo as funções para criar o nosso modelo Star Schema.

In [12]:
#Criação do cursor
connection =  engine.raw_connection()
cursorteste = connection.cursor()

In [13]:
def iu_subject(dataframe):
    for n,i in dataframe.iterrows():
        query = f"""
        IF EXISTS(SELECT ID FROM [CASE_PD].[CASE_01_DW].[DIM_SUBJECT] WHERE ID = {i[0]}) 
            BEGIN 
                UPDATE [CASE_PD].[CASE_01_DW].[DIM_SUBJECT] set Name=\'{i[1]}\' where ID={i[0]} 
            END 

        ELSE 
            BEGIN 
                insert into [CASE_PD].[CASE_01_DW].[DIM_SUBJECT] values({i[0]},\'{i[1]}\') 
            END
        """
        cursorteste.execute(query)
    cursorteste.commit()

In [14]:
def iu_dimStudent(dataframe):
    for i,n in dataframe.iterrows():
        query = f"""
                      IF EXISTS(SELECT ID FROM [CASE_PD].[CASE_01_DW].[DIM_STUDENT] WHERE ID =\'{n['IdStudent']}\') 
                        BEGIN 
                        UPDATE [CASE_PD].[CASE_01_DW].[DIM_STUDENT] set PLAN_TYPE=\'{n['PlanType']}\',STATE=\'{n['State']}\',CITY=\'{n['City']}\', UNIVERSITY=\'{n['UniversityName']}\',COURSE=\'{n['CourseName']}\' where ID=\'{n['IdStudent']}\'
                        END
                    ELSE 
                        BEGIN 
                            insert into [CASE_PD].[CASE_01_DW].[DIM_STUDENT] (ID,PLAN_TYPE,REGISTERED_DATE,STATE,CITY,UNIVERSITY,COURSE,SIGNUP_SOURCE,STUDENT_CLIENT) values(\'{n['IdStudent']}\',\'{n['PlanType']}\',\'{n['RegisteredDate']}\',\'{n['State']}\',\'{n['City']}\',\'{n['UniversityName']}\',\'{n['CourseName']}\',\'{n['SignupSource']}\',\'{n['StudentClient']}\') 
                        END
        """
        cursorteste.execute(query)
    cursorteste.commit()

In [15]:
def carregar_tabela_fato_session():
    insert="""INSERT INTO [CASE_01_DW].[FATO_SESSION] """
    select="""SELECT IIF(Student.SK_STUDENT is null, -7,Student.SK_STUDENT)
                    ,[SessionStartTime]
                    ,[SessionStartTime]
                    ,[StudentClient]
                FROM [CASE_PD].[CASE_01_STG].[STG_SESSION] Session
                LEFT JOIN [CASE_PD].[CASE_01_DW].[DIM_STUDENT] Student
                ON Session.StudentId = Student.Id"""

    query = insert + select
    cursorteste.execute(query)
    cursorteste.commit()

In [16]:
def carregar_tabela_fato_follow():
    insert="""INSERT INTO [CASE_01_DW].[FATO_FOLLOW_SUBJECT] """
    select="""SELECT IIF(Student.SK_STUDENT is null, -7,Student.SK_STUDENT)
                    ,IIF(SUBJECT.SK_SUBJECT is null, -7,SUBJECT.SK_SUBJECT)
                    ,[FollowDate]
                FROM [CASE_PD].[CASE_01_STG].[STG_STUDENT_SUBJECT] STD_SUBJCT
                LEFT JOIN [CASE_PD].[CASE_01_DW].[DIM_STUDENT] STUDENT
                ON STD_SUBJCT.StudentId = STUDENT.Id
                LEFT JOIN [CASE_PD].[CASE_01_DW].[DIM_SUBJECT] SUBJECT
                ON STD_SUBJCT.SubjectId = SUBJECT.Id"""

    query = insert + select
    cursorteste.execute(query)
    cursorteste.commit()

In [17]:
iu_dimStudent(dfStgStudent)
iu_subject(dfStgSubject)

In [18]:
carregar_tabela_fato_session()
carregar_tabela_fato_follow()