In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as F

spark = SparkSession.builder\
           .config('spark.jars.packages', 'org.xerial:sqlite-jdbc:3.34.0')\
           .getOrCreate()
sqlc = SQLContext(spark.sparkContext)

spark

In [2]:
# alunos, andares, aulas, blocos, cursos, materias, professores, salas, turmas
df_alunos = spark.read.format('jdbc').options(driver='org.sqlite.JDBC', dbtable='alunos', url='jdbc:sqlite:banco_exemplo.db').load()
df_andares = spark.read.format('jdbc').options(driver='org.sqlite.JDBC', dbtable='andares', url='jdbc:sqlite:banco_exemplo.db').load()
# read data_inicio and data_fim as string
df_aulas = spark.read.format('jdbc').options(driver='org.sqlite.JDBC', dbtable='aulas', url='jdbc:sqlite:banco_exemplo.db').load()
df_blocos = spark.read.format('jdbc').options(driver='org.sqlite.JDBC', dbtable='blocos', url='jdbc:sqlite:banco_exemplo.db').load()
df_cursos = spark.read.format('jdbc').options(driver='org.sqlite.JDBC', dbtable='cursos', url='jdbc:sqlite:banco_exemplo.db').load()
df_materias = spark.read.format('jdbc').options(driver='org.sqlite.JDBC', dbtable='materias', url='jdbc:sqlite:banco_exemplo.db').load()
df_professores = spark.read.format('jdbc').options(driver='org.sqlite.JDBC', dbtable='professores', url='jdbc:sqlite:banco_exemplo.db').load()
df_salas = spark.read.format('jdbc').options(driver='org.sqlite.JDBC', dbtable='salas', url='jdbc:sqlite:banco_exemplo.db').load()
df_turmas = spark.read.format('jdbc').options(driver='org.sqlite.JDBC', dbtable='turmas', url='jdbc:sqlite:banco_exemplo.db').load()

df_alunos.createOrReplaceTempView('alunos')
df_andares.createOrReplaceTempView('andares')
df_aulas.createOrReplaceTempView('aulas')
df_blocos.createOrReplaceTempView('blocos')
df_cursos.createOrReplaceTempView('cursos')
df_materias.createOrReplaceTempView('materias')
df_professores.createOrReplaceTempView('professores')
df_salas.createOrReplaceTempView('salas')
df_turmas.createOrReplaceTempView('turmas')

In [3]:
# select
#        alunos.nome as nome_aluno,
#        aulas.data_inicio as data_inicio_aula,
#        aulas.data_fim as data_fim_aula,
#        salas.numero as numero_da_sala,
#        andares.numero as numero_do_andar,
#        blocos.letra as letra_do_bloco,
#        professores.nome as nome_do_professor,
#        materias.nome as nome_da_materia,
#        cursos.nome as nome_do_curso
# from
#        alunos
#        join turmas on alunos.turma_id = turmas.id
#        join cursos on turmas.curso_id = cursos.id
#        join aulas on turmas.id = aulas.turma_id
#        join salas on aulas.sala_id = salas.id
#        join andares on salas.andar_id = andares.id
#        join blocos on andares.bloco_id = blocos.id
#        join professores on aulas.professor_id = professores.id
#        join materias on aulas.materia_id = materias.id

df_final = (
    df_alunos.join(df_turmas, df_alunos.turma_id == df_turmas.id, "inner")
    .join(df_cursos, df_turmas.curso_id == df_cursos.id, "inner")
    .join(df_aulas, df_turmas.id == df_aulas.turma_id, "inner")
    .join(df_salas, df_aulas.sala_id == df_salas.id, "inner")
    .join(df_andares, df_salas.andar_id == df_andares.id, "inner")
    .join(df_blocos, df_andares.bloco_id == df_blocos.id, "inner")
    .join(df_professores, df_aulas.professor_id == df_professores.id, "inner")
    .join(df_materias, df_aulas.materia_id == df_materias.id, "inner")
    .select(
        df_alunos.nome.alias("nome_aluno"),
        # df_aulas.data_inicio.alias("data_inicio_aula"), incompatible with sqlite
        # df_aulas.data_fim.alias("data_fim_aula"), incompatible with sqlite
        df_salas.numero.alias("numero_da_sala"),
        df_andares.numero.alias("numero_do_andar"),
        df_blocos.letra.alias("letra_do_bloco"),
        df_professores.nome.alias("nome_do_professor"),
        df_materias.nome.alias("nome_da_materia"),
        df_cursos.nome.alias("nome_do_curso"),
    )
)

df_final.show()

df_classes_per_teacher = df_final.groupBy("nome_do_professor").count()
df_classes_per_teacher.show()

+--------------------+--------------+---------------+--------------+-----------------+--------------------+--------------------+
|          nome_aluno|numero_da_sala|numero_do_andar|letra_do_bloco|nome_do_professor|     nome_da_materia|       nome_do_curso|
+--------------------+--------------+---------------+--------------+-----------------+--------------------+--------------------+
|Heitor Oliveira C...|             3|              1|             B| Alessandro Souza|Química de Inteli...|   Ciência Geofísica|
|Rafael Gonçalves ...|             3|              1|             B| Alessandro Souza|Química de Inteli...|   Ciência Geofísica|
|       Eduardo Rocha|             3|              1|             B| Alessandro Souza|Química de Inteli...|   Ciência Geofísica|
|Carlos Melo Carvalho|             3|              1|             B| Alessandro Souza|Química de Inteli...|   Ciência Geofísica|
|Fernando Lima Roc...|             3|              1|             B| Alessandro Souza|Química de 

In [4]:
# save the dfs
# df_final.write.format('csv').save('classes_per_teacher.csv')
# df_classes_per_teacher.write.format('csv').save('classes_per_teacher_count.csv')
df_final.toPandas().to_csv('classes_per_teacher.csv')
df_classes_per_teacher.toPandas().to_csv('classes_per_teacher_count.csv')