In [1]:
#imports necessários
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [2]:
format = 'parquet'
inferSchema = 'false'
header = 'false'
multiline = 'false'

In [3]:
#Configurando spark
conf = SparkConf().setAppName('Load').set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("hive.exec.dynamic.partition", "true").set("hive.exec.dynamic.partition.mode", "nonstrict")
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()


In [4]:
#Definindo caminho de arquivos parquet do raw_data para fazer as transformações necessárias
filePath1 = 'C:\\Users\\dbello\Documents\\pd-desafio\\raw_data\\students\\*.parquet'
filePath2 = 'C:\\Users\\dbello\Documents\\pd-desafio\\raw_data\\courses\\*.parquet'
filePath3 = 'C:\\Users\\dbello\Documents\\pd-desafio\\raw_data\\subjects\\*.parquet'
filePath4 = 'C:\\Users\\dbello\Documents\\pd-desafio\\raw_data\\universities\\*.parquet'
filePath5 = 'C:\\Users\\dbello\Documents\\pd-desafio\\raw_data\\subscriptions\\*.parquet'
filePath6 = 'C:\\Users\\dbello\Documents\\pd-desafio\\raw_data\\sessions\\*.parquet'
filePath7 = 'C:\\Users\\dbello\Documents\\pd-desafio\\raw_data\\student_follow_subject\\*.parquet'

In [5]:
#Lendo para dfs
df_students = spark.read.format(format) \
                    .option("inferSchema", inferSchema) \
                    .option("header", header) \
                    .option("multiLine", multiline) \
                    .parquet(filePath1)
df_courses = spark.read.format(format) \
                    .option("inferSchema", inferSchema) \
                    .option("header", header) \
                    .option("multiLine", multiline) \
                    .parquet(filePath2)
df_subjects = spark.read.format(format) \
                    .option("inferSchema", inferSchema) \
                    .option("header", header) \
                    .option("multiLine", multiline) \
                    .parquet(filePath3)
df_universities = spark.read.format(format) \
                    .option("inferSchema", inferSchema) \
                    .option("header", header) \
                    .option("multiLine", multiline) \
                    .parquet(filePath4)
df_subscriptions = spark.read.format(format) \
                    .option("inferSchema", inferSchema) \
                    .option("header", header) \
                    .option("multiLine", multiline) \
                    .parquet(filePath5)
df_sessions = spark.read.format(format) \
                    .option("inferSchema", inferSchema) \
                    .option("header", header) \
                    .option("multiLine", multiline) \
                    .parquet(filePath6)
df_student_follow_subject = spark.read.format(format) \
                    .option("inferSchema", inferSchema) \
                    .option("header", header) \
                    .option("multiLine", multiline) \
                    .parquet(filePath7)

In [9]:
#Criando um df parcial para receber o join entre curso e estudante, tirando o id duplicado de curso e renomeando a coluna de nome de matéria para não ficar confuso
df_parcial = df_students.join(df_courses,df_students.CourseId == df_courses.Id, how='left').drop(df_courses.Id).drop(df_students.CourseId).withColumnRenamed("Name", "CourseName")

In [10]:
#Join entre parcial e universidade para incluir o nome da universidade 
df_parcial2 = df_parcial.join(df_universities, df_parcial.UniversityId == df_universities.Id, how='left').drop(df_universities.Id).drop(df_parcial.UniversityId).withColumnRenamed("Name", "UniversityName").withColumnRenamed("StudentClient", "StudentClientSignUp")

In [11]:
#Join com subscripiton para incluir tipo de plano e data de pagamento
df_parcial3 = df_parcial2.join(df_subscriptions, df_parcial2.Id == df_subscriptions.StudentId, how='leftouter').drop(df_subscriptions.StudentId)

In [12]:
#2 registros a mais
df_parcial3.count()

60002

In [13]:
#Agregando por Id para descobrir qual id esta duplicado
df = df_parcial3.groupBy("Id").agg(count("Id"))


In [14]:
#pegando os Ids duplicados para analisar
df = df.withColumnRenamed("count(Id)", "count")
df.filter("count > 1").collect()

[Row(Id='308a29d8d37388ed84b6b1930bf084f50249c15c244df26a3491ed5751917af2', count=2),
 Row(Id='7b63067f9475c2e824608e412d1014e289997f9730cf918e975237136104016e', count=2)]

In [15]:
#2 datas de pagamento
df_parcial3.select(col('*')).filter(df_parcial3.Id == "7b63067f9475c2e824608e412d1014e289997f9730cf918e975237136104016e").toPandas()

Unnamed: 0,City,Id,RegisteredDate,SignupSource,State,StudentClientSignUp,CourseName,UniversityName,PaymentDate,PlanType
0,Rio de Janeiro,7b63067f9475c2e824608e412d1014e289997f9730cf91...,2017-04-07 09:26:36.423000,Facebook,Rio de Janeiro,,Processos Gerenciais,ESTÁCIO EAD,2017-11-30 08:34:57.729857,Mensal
1,Rio de Janeiro,7b63067f9475c2e824608e412d1014e289997f9730cf91...,2017-04-07 09:26:36.423000,Facebook,Rio de Janeiro,,Processos Gerenciais,ESTÁCIO EAD,2017-11-01 00:30:50.515472,Mensal


In [16]:
#join entre parcial e matérias seguidas
df_parcial4 = df_parcial3.join(df_student_follow_subject, df_parcial3.Id == df_student_follow_subject.StudentId, how='left').drop(df_student_follow_subject.StudentId)

In [17]:
#join com o df de matérias para deixar incluir o nome da matéria seguida
df_parcial5 = df_parcial4.join(df_subjects, df_parcial4.SubjectId == df_subjects.Id, how='left').drop(df_subjects.Id).drop(df_parcial4.SubjectId).withColumnRenamed("Name", "SubjectName")

In [18]:
#join com df de sessão
df_final = df_parcial5.join(df_sessions, df_parcial5.Id == df_sessions.StudentId, how='left').drop(df_sessions.StudentId).withColumnRenamed("StudentClient", "StudentClientSession")

In [19]:
df_final.agg(countDistinct("Id")).show()

+------------------+
|count(DISTINCT Id)|
+------------------+
|             60000|
+------------------+



In [20]:
df_final.printSchema()

root
 |-- City: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- RegisteredDate: string (nullable = true)
 |-- SignupSource: string (nullable = true)
 |-- State: string (nullable = true)
 |-- StudentClientSignUp: string (nullable = true)
 |-- CourseName: string (nullable = true)
 |-- UniversityName: string (nullable = true)
 |-- PaymentDate: string (nullable = true)
 |-- PlanType: string (nullable = true)
 |-- FollowDate: string (nullable = true)
 |-- SubjectName: string (nullable = true)
 |-- SessionStartTime: string (nullable = true)
 |-- StudentClientSession: string (nullable = true)



In [21]:
#Visão de quantos alunos seguem cada matéria 
df = df_final.groupBy("SubjectName").agg(countDistinct("Id")).orderBy('count(DISTINCT Id)').show(n =20580)

+--------------------+------------------+
|         SubjectName|count(DISTINCT Id)|
+--------------------+------------------+
|Desenvolvimento e...|                 1|
|   Agrossilvicultura|                 1|
|Educação Infantil...|                 1|
|Qual É A Resposta...|                 1|
|    Recursos Manuais|                 1|
|Ciência e Poluiçã...|                 1|
|Estrutura e Dinâm...|                 1|
|Política Educacio...|                 1|
|Análise do Transp...|                 1|
|Teoria Política C...|                 1|
|Governança e Plan...|                 1|
|Matemática e Esta...|                 1|
|   Pratica de Textos|                 1|
|Organização e Ava...|                 1|
|Microbiologia e S...|                 1|
|Gestao de Projeto...|                 1|
|Modelagem de Info...|                 1|
|Agressão, Defesa,...|                 1|
|Fundamentos e Mét...|                 1|
|      Tópicos Esp. C|                 1|
|Socioantropologia...|            

In [22]:
#Total de sessões de cada aluno por plataforma 
df = df_final.groupBy("Id", df_final.StudentClientSession).agg(countDistinct("SessionStartTime")).show()

+--------------------+--------------------+--------------------------------+
|                  Id|StudentClientSession|count(DISTINCT SessionStartTime)|
+--------------------+--------------------+--------------------------------+
|ca3889a8e8de284bc...|                 iOS|                              15|
|c00985ba97a7c689c...|             Website|                               2|
|4cffc6a291cc9fb34...|             Website|                               7|
|2803c8592434ffc04...|             Website|                               2|
|1602fce5d5d84f19f...|                 iOS|                             150|
|7e88909913fe8d358...|             Website|                               7|
|7f4b11f1b380ca821...|                 iOS|                              39|
|fc0cc3ecfa0cc8f93...|             Android|                              26|
|ac3d184258e417dda...|             Website|                               2|
|40b5a5f04d84b3c05...|             Website|                               2|

In [23]:
#Sessão por estudante 
df = df_final.groupBy("StudentClientSession").agg(countDistinct("Id")).show()

+--------------------+------------------+
|StudentClientSession|count(DISTINCT Id)|
+--------------------+------------------+
|              Webapp|              6707|
|                 iOS|              3291|
|             Website|             47795|
|             Android|             13564|
+--------------------+------------------+



In [24]:
#Estudantes por estado
df = df_final.groupBy("State").agg(countDistinct("Id")).show()

+-------------------+------------------+
|              State|count(DISTINCT Id)|
+-------------------+------------------+
|     Santa Catarina|               876|
| Mato Grosso do Sul|               421|
|              Goiás|               815|
|        Mato Grosso|               265|
|               null|             35393|
|              Ceará|              1236|
|     Espírito Santo|               595|
|              Piauí|               282|
|             Paraná|              1438|
|            Alagoas|               331|
|              Bahia|              1354|
|            Roraima|                59|
|   Distrito Federal|               385|
|         Pernambuco|               954|
|           Amazonas|               342|
|  Rio Grande do Sul|              1192|
|               Acre|                53|
|Rio Grande do Norte|               367|
|            Sergipe|               427|
|          São Paulo|              3870|
+-------------------+------------------+
only showing top

In [25]:
#Alunos por Universidade
df = df_final.groupBy("UniversityName").agg(countDistinct("Id")).show()

+-----------------+------------------+
|   UniversityName|count(DISTINCT Id)|
+-----------------+------------------+
|            UFABC|                56|
|             IFTO|                18|
|             FJAV|                 4|
|         UNIVILLE|                36|
|             ESUV|                 6|
|             IEST|                 2|
|UNINASSAU CARUARU|                17|
|            ASSER|                 4|
|             FSJT|                11|
|            FAEDA|                 1|
|           IMESSM|                 1|
|             FACH|                 1|
|         FACDELTA|                 5|
|             IFRJ|                50|
|            CEULP|                40|
|              FDC|                 4|
|             UEZO|                13|
|              AMF|                 4|
|     Faculdade JK|                 3|
|           ISETED|                 1|
+-----------------+------------------+
only showing top 20 rows



In [26]:
#Alunos por tipo de plano
df = df_final.groupBy('PlanType').agg(countDistinct("Id")).show()

+--------+------------------+
|PlanType|count(DISTINCT Id)|
+--------+------------------+
|    null|             59203|
|  Mensal|               737|
|   Anual|                60|
+--------+------------------+



In [29]:
#Escrevendo arquivo parquet tratado
df_final.write.parquet('C:\\Users\\dbello\Documents\\pd-desafio\\refined_data\\baseA')

AnalysisException: 'path file:/C:/Users/dbello/Documents/pd-desafio/refined_data/baseA already exists.;'