# Creating a Star Schema

This notebook explores the creation of a Star Schema from raw data using Apache Spark.

The data used in this notebook comes from the Brazilian Basic Education Census, which is available at the [Brazilian government's open data portal](https://download.inep.gov.br/dados_abertos).

## Creating SparkSession

The first step is to create a SparkSession.

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import psycopg2
from datetime import datetime

from itertools import chain

In [2]:
spark = SparkSession.builder\
        .appName("CensoEscolarStarSchema")\
        .config("spark.sql.shuffle.partitions", "4")\
        .getOrCreate()

## Transforming CSV to Parquet

This transformation aims to increase the speed of data loading by using Parquet files.

In [3]:
# Read CSV data
data_csv = (
    spark
    .read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ";")
    .option("encoding", "latin1")
    .load("./data/*.csv")
)

Write to Parquet file

In [4]:
# Overwrite so the cell can be re-run without a "path already exists" error
data_csv.write.mode("overwrite").parquet("./data/censo_escolar.parquet")

Reading from Parquet file

In [5]:
data = (
    spark
    .read
    .format("parquet")
    .load("./data/censo_escolar.parquet/")
)

## Dimensions

The dimensions are...

---

The code below creates the dimensions based on a configuration dict.

```python
{
    "DIM_NAME":{
        # The fields are the table columns
        "fields":[
            {
                "field":"FIELD_1_NAME", # The column name
                "type":"FIELD_1_TYPE",  # The column type in spark
            },
            {
                "field":"FIELD_2_NAME",
                "type":"FIELD_2_TYPE",
            },
            ...
        ],
    }
}
```

In [6]:
INTEGER_DIMENSIONS = [
    "TP_DEPENDENCIA",           # The school administration (state, city, private)
    "TP_LOCALIZACAO",           # The school location (urban, rural)
    "IN_AGUA_POTAVEL",          # has access to drinkable water
    "IN_ENERGIA_INEXISTENTE",   # has NO access to electricity
    "IN_ESGOTO_INEXISTENTE",    # has NO sewage system
    "IN_BANHEIRO",              # has restroom
    "IN_BIBLIOTECA",            # has library
    "IN_REFEITORIO",            # has canteen
    "IN_COMPUTADOR",            # has computer
    "IN_INTERNET",              # has internet
    "IN_EQUIP_NENHUM"           # no electronic equipment
]

DIMENSION_TABLES_CONFIG = {
    "DIM_LOCAL":{
        "fields": [
            {"field":"NO_UF", "type":"string",},        # State's name 
            {"field":"SG_UF", "type":"string",},        # State's abbreviation
            {"field":"CO_UF", "type":"string",},        # State's code
            {"field":"NO_MUNICIPIO", "type":"string",}, # City's name
            {"field":"CO_MUNICIPIO", "type":"string",}  # City's code
        ]
    },
}

DIMENSION_TABLES_CONFIG.update(
    {
        "DIM_"+dimension.upper():{
            "fields": [
                {"field":dimension, "type":"integer"} 
            ]
        }
        for dimension in INTEGER_DIMENSIONS
    }
)
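
To make the result of the `update` call concrete, the sketch below rebuilds the configuration for a shortened list of two integer columns and prints the generated table names. The shortened lists and the lowercase variable names are only for illustration; the notebook itself uses the full `INTEGER_DIMENSIONS` list.

```python
# Shortened inputs, for illustration only.
integer_dimensions = ["TP_DEPENDENCIA", "IN_INTERNET"]

dimension_tables_config = {
    "DIM_LOCAL": {
        "fields": [
            {"field": "NO_UF", "type": "string"},
            {"field": "NO_MUNICIPIO", "type": "string"},
        ]
    },
}

# One single-column dimension table per integer column, named DIM_<COLUMN>.
dimension_tables_config.update(
    {
        "DIM_" + dimension.upper(): {
            "fields": [{"field": dimension, "type": "integer"}]
        }
        for dimension in integer_dimensions
    }
)

table_names = list(dimension_tables_config)
print(table_names)
# ['DIM_LOCAL', 'DIM_TP_DEPENDENCIA', 'DIM_IN_INTERNET']
```

Every integer column therefore becomes its own small dimension table, while DIM_LOCAL groups the state and city columns together.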

### Creating the dimension tables in Postgres
-----------------------------------------------------------------------------------------------------------------------

Defining the properties of the Postgres Connection

In [7]:
POSTGRES_USER = "censo"
POSTGRES_PASSWORD = "123"
POSTGRES_DB = "censo_escolar"

# JDBC settings used by the Spark session
# to connect to the PostgreSQL database server
POSTGRES_CONFIG = {
    "url":f"jdbc:postgresql://localhost:5432/{POSTGRES_DB}",
    "properties":{
        "user":POSTGRES_USER, 
        "password":POSTGRES_PASSWORD,
        "driver":"org.postgresql.Driver",
    },
}
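
Hard-coded credentials are convenient in a local notebook but easy to leak. As a minimal sketch, the same settings could be read from environment variables instead. The `CENSO_*` variable names and the `build_postgres_config` helper are hypothetical, and the fallbacks simply mirror the values used above.

```python
import os


def build_postgres_config(env=os.environ):
    """Build the Spark JDBC settings from environment variables.

    The CENSO_* variable names are hypothetical; the fallbacks mirror the
    values hard-coded in the notebook.
    """
    host = env.get("CENSO_POSTGRES_HOST", "localhost")
    port = env.get("CENSO_POSTGRES_PORT", "5432")
    db = env.get("CENSO_POSTGRES_DB", "censo_escolar")
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{db}",
        "properties": {
            "user": env.get("CENSO_POSTGRES_USER", "censo"),
            "password": env.get("CENSO_POSTGRES_PASSWORD", "123"),
            "driver": "org.postgresql.Driver",
        },
    }


# Passing an explicit mapping makes the behaviour easy to check.
config = build_postgres_config({"CENSO_POSTGRES_PASSWORD": "s3cret"})
print(config["url"])
# jdbc:postgresql://localhost:5432/censo_escolar
```

The returned dict has the same shape as `POSTGRES_CONFIG`, so it could be unpacked into the JDBC calls in the same way.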

Establishing connection to Postgres

In [8]:
conn = psycopg2.connect(
    host="localhost",
    port="5432",

    dbname=POSTGRES_DB,
    user=POSTGRES_USER,
    password=POSTGRES_PASSWORD
)

The loop below creates one table in Postgres for each entry in DIMENSION_TABLES_CONFIG.

Spark creates each table with the name of the dimension and the configured columns, plus an id column that is then declared as the primary key.

In [9]:
# Write data to Postgres
# Using the configuration in DIMENSION_TABLES_CONFIG
# With id as the primary key

for table_name, table_config in DIMENSION_TABLES_CONFIG.items():
    
    print(f"[{datetime.now()}] Writing {table_name}")
    
    data\
    .select(
        [
            F
            .col(field["field"])
            .cast(field["type"])
            .alias(field["field"])
            
            for field
            in table_config["fields"]
        ]
    )\
    .distinct()\
    .withColumn(
        "id", F.monotonically_increasing_id()
    )\
    .write\
    .jdbc(
        **POSTGRES_CONFIG,
        table=table_name,
        mode="overwrite"
    )
    
    print(f"[{datetime.now()}] Wrote {table_name}")
    # Define id as the primary key
    cursor = conn.cursor()
    cursor.execute(
        f"ALTER TABLE {table_name} ADD PRIMARY KEY (id);"
    )
    cursor.close()
    conn.commit()

    print(f"[{datetime.now()}] Added primary key to {table_name}")
    print(f"[{datetime.now()}] Done")

[2024-04-29 14:50:51.785874] Writing DIM_LOCAL
[2024-04-29 14:50:57.143585] Wrote DIM_LOCAL
[2024-04-29 14:50:57.345720] Added primary key to DIM_LOCAL
[2024-04-29 14:50:57.345720] Done
[2024-04-29 14:50:57.345720] Writing DIM_TP_DEPENDENCIA
[2024-04-29 14:50:58.498398] Wrote DIM_TP_DEPENDENCIA
[2024-04-29 14:50:58.517470] Added primary key to DIM_TP_DEPENDENCIA
[2024-04-29 14:50:58.517470] Done
[2024-04-29 14:50:58.517470] Writing DIM_TP_LOCALIZACAO
[2024-04-29 14:50:59.207641] Wrote DIM_TP_LOCALIZACAO
[2024-04-29 14:50:59.234410] Added primary key to DIM_TP_LOCALIZACAO
[2024-04-29 14:50:59.234410] Done
[2024-04-29 14:50:59.234410] Writing DIM_IN_AGUA_POTAVEL
[2024-04-29 14:50:59.886937] Wrote DIM_IN_AGUA_POTAVEL
[2024-04-29 14:50:59.915226] Added primary key to DIM_IN_AGUA_POTAVEL
[2024-04-29 14:50:59.915336] Done
[2024-04-29 14:50:59.915336] Writing DIM_IN_ENERGIA_INEXISTENTE
[2024-04-29 14:51:00.560334] Wrote DIM_IN_ENERGIA_INEXISTENTE
[2024-04-29 14:51:00.601708] Added primary key
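
The ids produced by `monotonically_increasing_id()` are unique and increasing but not consecutive. The Spark documentation describes the current implementation as placing the partition id in the upper 31 bits and the row number within the partition in the lower 33 bits. The sketch below mimics that layout in plain Python; the `monotonic_id` helper is hypothetical and only illustrates the arithmetic, it is not Spark's code.

```python
def monotonic_id(partition_id: int, row_in_partition: int) -> int:
    """Mimic the documented bit layout of monotonically_increasing_id()."""
    return (partition_id << 33) + row_in_partition


# Two partitions with three rows each.
ids = [monotonic_id(p, r) for p in range(2) for r in range(3)]
print(ids)
# [0, 1, 2, 8589934592, 8589934593, 8589934594]

# Ids from the second partition onward exceed the 32-bit signed INTEGER range.
exceeds_int32 = ids[3] > 2**31 - 1
print(exceeds_int32)
# True
```

This is why the dimension id columns in the facts table are declared as BIGINT rather than INTEGER.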

## Facts table

The definition of the facts table follows a different pattern from the dimensions.

The table schema is defined up front so that the foreign keys can be declared properly.


Defining the facts table schema
Metrics + Facts + Dimensions (Foreign Keys)

In [10]:
FACT_TABLE_NAME = "FACT_CENSO_ESCOLAR"

FACT_COLUMNS = [
    "QT_DOC_BAS",           # Number of teachers in basic education (total)
    "QT_DOC_INF",           # Number of teachers in basic education (early childhood education)
    "QT_DOC_FUND",          # Number of teachers in basic education (elementary education)
    "QT_DOC_MED",           # Number of teachers in basic education (high school)

    "QT_MAT_BAS",           # Number of enrollments in basic education (total)
    "QT_MAT_INF",           # Number of enrollments in basic education (early childhood education)
    "QT_MAT_FUND",          # Number of enrollments in basic education (elementary education)
    "QT_MAT_MED",           # Number of enrollments in basic education (high school)

    "QT_MAT_BAS_ND",        # Number of enrollments in basic education - Skin color/Race Not Declared
    "QT_MAT_BAS_BRANCA",    # Number of enrollments in basic education - Skin color/Race Branca
    "QT_MAT_BAS_PRETA",     # Number of enrollments in basic education - Skin color/Race Preta
    "QT_MAT_BAS_PARDA",     # Number of enrollments in basic education - Skin color/Race Parda
    "QT_MAT_BAS_AMARELA",   # Number of enrollments in basic education - Skin color/Race Amarela
    "QT_MAT_BAS_INDIGENA",  # Number of enrollments in basic education - Skin color/Race Indígena

    "NU_ANO_CENSO"          # Census year
]

FACT_CONFIG = {
    fact:{
        "fields": [
            {"field":fact, "type":"integer"}
        ]
    }
    for fact in FACT_COLUMNS
}

DIMENSION_ID_CONFIG = {
    table_name:[
        field['field'] 
        for field 
        in table_fields['fields']
    ]
    for table_name, table_fields in DIMENSION_TABLES_CONFIG.items()
}

FACT_TABLE_ALL_COLUMNS_ORDERED = FACT_COLUMNS + list(map(lambda col:"ID_"+col, DIMENSION_ID_CONFIG.keys()))

Before inserting the data, the facts table must be created in Postgres.

The code below builds the SQL statement that creates the facts table from FACT_COLUMNS, adding an id column and a foreign key for each dimension in DIMENSION_ID_CONFIG.

In [11]:
# Build the SQL that creates the fact table
# Using the columns in FACT_COLUMNS and the tables in DIMENSION_ID_CONFIG
# With id as the primary key

# Avoid inserting a backslash into an f-string expression
comma_break_line = ",\n\t\t\t"
facts_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {FACT_TABLE_NAME} (
        id SERIAL PRIMARY KEY,
        { 
            comma_break_line.join(
                [
                    f"{field} INTEGER" 
                    for field in FACT_COLUMNS
                ]
                +[
                    f"ID_{dim_table} BIGINT"
                    for dim_table in DIMENSION_ID_CONFIG.keys()
                ]
            )
        }
    );
    
    -- Adding Foreign Keys
    ALTER TABLE {FACT_TABLE_NAME}
    {
        comma_break_line.join(
            [
                f"ADD CONSTRAINT {FACT_TABLE_NAME}_{dim_table}_fk FOREIGN KEY (ID_{dim_table}) REFERENCES {dim_table}(id)"
                for dim_table in DIMENSION_ID_CONFIG.keys()
            ]
        )
    }
"""

In [12]:
print(facts_table_sql)


    CREATE TABLE IF NOT EXISTS FACT_CENSO_ESCOLAR (
        id SERIAL PRIMARY KEY,
        QT_DOC_BAS INTEGER,
			QT_DOC_INF INTEGER,
			QT_DOC_FUND INTEGER,
			QT_DOC_MED INTEGER,
			QT_MAT_BAS INTEGER,
			QT_MAT_INF INTEGER,
			QT_MAT_FUND INTEGER,
			QT_MAT_MED INTEGER,
			QT_MAT_BAS_ND INTEGER,
			QT_MAT_BAS_BRANCA INTEGER,
			QT_MAT_BAS_PRETA INTEGER,
			QT_MAT_BAS_PARDA INTEGER,
			QT_MAT_BAS_AMARELA INTEGER,
			QT_MAT_BAS_INDIGENA INTEGER,
			NU_ANO_CENSO INTEGER,
			ID_DIM_LOCAL BIGINT,
			ID_DIM_TP_DEPENDENCIA BIGINT,
			ID_DIM_TP_LOCALIZACAO BIGINT,
			ID_DIM_IN_AGUA_POTAVEL BIGINT,
			ID_DIM_IN_ENERGIA_INEXISTENTE BIGINT,
			ID_DIM_IN_ESGOTO_INEXISTENTE BIGINT,
			ID_DIM_IN_BANHEIRO BIGINT,
			ID_DIM_IN_BIBLIOTECA BIGINT,
			ID_DIM_IN_REFEITORIO BIGINT,
			ID_DIM_IN_COMPUTADOR BIGINT,
			ID_DIM_IN_INTERNET BIGINT,
			ID_DIM_IN_EQUIP_NENHUM BIGINT
    );
    
    -- Adding Foreign Keys
    ALTER TABLE FACT_CENSO_ESCOLAR
    ADD CONSTRAINT FACT_CENSO_ESCOLAR_DIM_LOCAL_fk FORE
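
Running this SQL a second time is likely to fail at the ALTER TABLE step: CREATE TABLE IF NOT EXISTS is skipped silently, but the named constraints already exist. One possible alternative, sketched below on the assumption that the dimension tables are created before the facts table, declares each foreign key inline in its column definition so the whole statement can be re-run. The `build_fact_table_sql` helper is hypothetical and the column lists are shortened for illustration.

```python
def build_fact_table_sql(fact_table, fact_columns, dim_tables):
    """Build a CREATE TABLE statement with inline foreign keys (illustrative)."""
    column_defs = ["id SERIAL PRIMARY KEY"]
    column_defs += [f"{column} INTEGER" for column in fact_columns]
    # Each dimension id references the id column of its dimension table.
    column_defs += [
        f"ID_{dim} BIGINT REFERENCES {dim}(id)" for dim in dim_tables
    ]
    body = ",\n        ".join(column_defs)
    return f"CREATE TABLE IF NOT EXISTS {fact_table} (\n        {body}\n    );"


sql = build_fact_table_sql(
    "FACT_CENSO_ESCOLAR",
    ["QT_DOC_BAS", "NU_ANO_CENSO"],
    ["DIM_LOCAL", "DIM_IN_INTERNET"],
)
print(sql)
```

With inline references Postgres generates the constraint names itself, so the explicit `_fk` names used in the notebook are lost; that is the trade-off of this variant.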

Executing the statement to create the facts table in Postgres

In [13]:
print(f"[{datetime.now()}] Creating facts table")

cursor = conn.cursor()
try:
    cursor.execute(facts_table_sql)
    cursor.close()
    conn.commit()
except Exception as e:
    print(e)
    conn.rollback()
    cursor.close()
else:
    print(f"[{datetime.now()}] Created facts table")
    print(f"[{datetime.now()}] Done")


[2024-04-29 14:55:36.635973] Creating facts table
[2024-04-29 14:55:36.842779] Created facts table
[2024-04-29 14:55:36.843736] Done
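
The commit-or-rollback pattern used in that cell can also be written with a `with` block on the connection, which psycopg2 documents as committing on success and rolling back when an exception escapes. The sketch below demonstrates the behaviour with the standard-library `sqlite3` module as a stand-in, since its connections follow the same convention and need no server; the `demo` table is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY)")

try:
    # The with-block commits on success and rolls back if an exception escapes.
    with conn:
        conn.execute("INSERT INTO demo (id) VALUES (1)")
        conn.execute("INSERT INTO demo (id) VALUES (1)")  # duplicate key fails
except sqlite3.IntegrityError as error:
    print(f"Rolled back: {error}")

# The first insert was rolled back together with the failing one.
remaining = conn.execute("SELECT COUNT(*) FROM demo").fetchone()[0]
print(remaining)
# 0
```

In both libraries the `with` block ends the transaction but leaves the connection open, so it can be reused by later cells.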


Extracting the data

In [14]:
facts_data = data\
    .select(
        [
            *chain(
                *DIMENSION_ID_CONFIG.values(), 
                FACT_CONFIG.keys()
            )
        ]
    )

Adding the ids for each dimension

In [15]:
# Joining the id of the dimensions

for table_name, table_fields in DIMENSION_ID_CONFIG.items():
    
    # Read the dimension data from Postgres
    dim_table = spark.read\
        .jdbc(
            **POSTGRES_CONFIG,
            table=table_name,
        )\
        .withColumnRenamed("id", f"ID_{table_name}")
    
    # Join the dimension data with the fact data
    facts_data = facts_data\
        .join(
            dim_table,
            on=table_fields,
            how="left"
        )\
        .drop(*table_fields)
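
One caveat with an equality join: if a dimension column contains NULLs, the NULL row written to the dimension table will not match the NULL values in the facts, because NULL = NULL is not true in SQL. Whether the census columns actually contain NULLs is not checked in this notebook, so treat this as a possible issue. The sketch below reproduces the effect with the standard-library `sqlite3` module and toy data; in Spark the null-safe comparison is available as `Column.eqNullSafe` or the `<=>` operator.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_in_internet (id INTEGER PRIMARY KEY, in_internet INTEGER);
    INSERT INTO dim_in_internet VALUES (0, 0), (1, 1), (2, NULL);

    CREATE TABLE facts (qt_doc_bas INTEGER, in_internet INTEGER);
    INSERT INTO facts VALUES (10, 1), (20, NULL);
""")

# Plain equality: the fact with a NULL value gets no dimension id.
plain = conn.execute("""
    SELECT f.qt_doc_bas, d.id
    FROM facts f
    LEFT JOIN dim_in_internet d ON f.in_internet = d.in_internet
    ORDER BY f.qt_doc_bas
""").fetchall()
print(plain)
# [(10, 1), (20, None)]

# Null-safe equality (IS in SQLite): NULL now matches the NULL dimension row.
null_safe = conn.execute("""
    SELECT f.qt_doc_bas, d.id
    FROM facts f
    LEFT JOIN dim_in_internet d ON f.in_internet IS d.in_internet
    ORDER BY f.qt_doc_bas
""").fetchall()
print(null_safe)
# [(10, 1), (20, 2)]
```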

Saving data to Postgres

In [16]:
# Order the columns to match the fact table on postgres
# and save the data
facts_data\
    .select(*FACT_TABLE_ALL_COLUMNS_ORDERED)\
    .write\
    .jdbc(
        **POSTGRES_CONFIG,
        table=FACT_TABLE_NAME,
        mode="append"
    )

## OLAP Functions

Before we can execute queries in Spark SQL, we need to register a temporary view for each table.

In [20]:
facts_data = spark.read.jdbc(**POSTGRES_CONFIG, table='FACT_CENSO_ESCOLAR')
# Register the fact table as a temporary view
facts_data.createOrReplaceTempView('FACT_CENSO_ESCOLAR')

# Loop through dimension tables
for table_name, table_fields in DIMENSION_ID_CONFIG.items():
    
    # Read the dimension data from Postgres
    dim_table = spark.read.jdbc(**POSTGRES_CONFIG, table=table_name)\
                      .withColumnRenamed("id", f"ID_{table_name}")
    
    # Register each dimension table as a temporary view
    dim_table.createOrReplaceTempView(table_name)


### Roll-Up & Drill-Down

Suppose we start with the number of teachers aggregated by state.

In [27]:
result = spark.sql("""
    SELECT NO_UF, SUM(QT_DOC_BAS) AS TOTAL_TEACHERS
    FROM FACT_CENSO_ESCOLAR
    JOIN DIM_LOCAL ON FACT_CENSO_ESCOLAR.ID_DIM_LOCAL = DIM_LOCAL.ID_DIM_LOCAL
    GROUP BY NO_UF
    ORDER BY NO_UF;

""")
result.show()

+-------------------+--------------+
|              NO_UF|TOTAL_TEACHERS|
+-------------------+--------------+
|               Acre|        154260|
|            Alagoas|        496007|
|              Amapá|        140811|
|           Amazonas|        667797|
|              Bahia|       2201321|
|              Ceará|       1374453|
|   Distrito Federal|        388278|
|     Espírito Santo|        679657|
|              Goiás|        857545|
|           Maranhão|       1437367|
|        Mato Grosso|        543119|
| Mato Grosso do Sul|        493332|
|       Minas Gerais|       3327083|
|             Paraná|       2127370|
|            Paraíba|        700280|
|               Pará|       1305632|
|         Pernambuco|       1284867|
|              Piauí|        715725|
|Rio Grande do Norte|        519593|
|  Rio Grande do Sul|       1820483|
+-------------------+--------------+
only showing top 20 rows



If we drill down, we look at the same measure by city.

In [28]:
result = spark.sql("""
    SELECT NO_UF, NO_MUNICIPIO, SUM(QT_DOC_BAS) AS TOTAL_TEACHERS
    FROM FACT_CENSO_ESCOLAR
    JOIN DIM_LOCAL ON FACT_CENSO_ESCOLAR.ID_DIM_LOCAL = DIM_LOCAL.ID_DIM_LOCAL
    GROUP BY NO_UF, NO_MUNICIPIO
    ORDER BY NO_UF, NO_MUNICIPIO;

""")
result.show()

+-----+--------------------+--------------+
|NO_UF|        NO_MUNICIPIO|TOTAL_TEACHERS|
+-----+--------------------+--------------+
| Acre|          Acrelândia|          2113|
| Acre|        Assis Brasil|          2291|
| Acre|           Brasiléia|          3986|
| Acre|              Bujari|          2394|
| Acre|            Capixaba|          1761|
| Acre|     Cruzeiro do Sul|         19748|
| Acre|      Epitaciolândia|          2480|
| Acre|               Feijó|          7902|
| Acre|              Jordão|          2590|
| Acre|       Manoel Urbano|          1710|
| Acre|Marechal Thaumaturgo|          5027|
| Acre|         Mâncio Lima|          4819|
| Acre|   Plácido de Castro|          3204|
| Acre|          Porto Acre|          3230|
| Acre|        Porto Walter|          2862|
| Acre|          Rio Branco|         55949|
| Acre|     Rodrigues Alves|          5423|
| Acre| Santa Rosa do Purus|          1951|
| Acre|      Sena Madureira|          7577|
| Acre|    Senador Guiomard|    

Conversely, moving from the city level back to the state level is a roll-up.
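
A roll-up is just a re-aggregation of the finer level. As a small sketch in plain Python, the city totals below (four rows copied from the query outputs in this notebook, so the state totals cover only those cities) are summed up to the state level.

```python
from collections import defaultdict

# (state, city) -> total teachers; only four cities, for illustration.
city_totals = {
    ("Acre", "Acrelândia"): 2113,
    ("Acre", "Bujari"): 2394,
    ("Bahia", "Abaré"): 3344,
    ("Bahia", "Antas"): 2321,
}

# Roll-up: drop the city level and sum within each state.
state_totals = defaultdict(int)
for (state, _city), teachers in city_totals.items():
    state_totals[state] += teachers

print(dict(state_totals))
# {'Acre': 4507, 'Bahia': 5665}
```

Spark SQL also accepts `GROUP BY ROLLUP(NO_UF, NO_MUNICIPIO)`, which returns the city rows, the state subtotals, and a grand total from a single query.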

### Slice & Dice

Suppose we start by showing the state and the number of teachers for each city.

In [32]:
result = spark.sql("""
    SELECT NO_MUNICIPIO, NO_UF, SUM(QT_DOC_BAS) AS TOTAL_TEACHERS
    FROM FACT_CENSO_ESCOLAR
    JOIN DIM_LOCAL ON FACT_CENSO_ESCOLAR.ID_DIM_LOCAL = DIM_LOCAL.ID_DIM_LOCAL
    GROUP BY NO_MUNICIPIO, NO_UF
    ORDER BY NO_MUNICIPIO;

""")
result.show()

+-------------------+-------------------+--------------+
|       NO_MUNICIPIO|              NO_UF|TOTAL_TEACHERS|
+-------------------+-------------------+--------------+
|    Abadia de Goiás|              Goiás|          1255|
|Abadia dos Dourados|       Minas Gerais|           990|
|          Abadiânia|              Goiás|          1881|
|         Abaetetuba|               Pará|         29058|
|             Abaeté|       Minas Gerais|          3102|
|            Abaiara|              Ceará|          1795|
|              Abaré|              Bahia|          3344|
|             Abatiá|             Paraná|          1573|
|             Abaíra|              Bahia|          1336|
|      Abdon Batista|     Santa Catarina|           641|
|    Abel Figueiredo|               Pará|           954|
|       Abelardo Luz|     Santa Catarina|          3240|
|         Abre Campo|       Minas Gerais|          2419|
|       Abreu e Lima|         Pernambuco|         10593|
|        Abreulândia|          

Then, by slicing, we show only the rows for the state of Bahia.

In [34]:
result = spark.sql("""
    SELECT NO_MUNICIPIO, NO_UF, SUM(QT_DOC_BAS) AS TOTAL_TEACHERS
    FROM FACT_CENSO_ESCOLAR
    JOIN DIM_LOCAL ON FACT_CENSO_ESCOLAR.ID_DIM_LOCAL = DIM_LOCAL.ID_DIM_LOCAL
    WHERE DIM_LOCAL.NO_UF = 'Bahia'
    GROUP BY NO_MUNICIPIO, NO_UF
    ORDER BY NO_MUNICIPIO;

""")
result.show()

+-----------------+-----+--------------+
|     NO_MUNICIPIO|NO_UF|TOTAL_TEACHERS|
+-----------------+-----+--------------+
|            Abaré|Bahia|          3344|
|           Abaíra|Bahia|          1336|
|        Acajutiba|Bahia|          2493|
|         Adustina|Bahia|          2649|
|          Aiquara|Bahia|           907|
|       Alagoinhas|Bahia|         20008|
|         Alcobaça|Bahia|          4099|
|         Almadina|Bahia|          1077|
|         Amargosa|Bahia|          5492|
| Amélia Rodrigues|Bahia|          4152|
|  América Dourada|Bahia|          2421|
|            Anagé|Bahia|          3271|
|          Andaraí|Bahia|          2811|
|        Andorinha|Bahia|          2750|
|          Angical|Bahia|          2703|
|          Anguera|Bahia|          1914|
|            Antas|Bahia|          2321|
|  Antônio Cardoso|Bahia|          1893|
|Antônio Gonçalves|Bahia|          2587|
|            Aporá|Bahia|          3576|
+-----------------+-----+--------------+
only showing top

By dicing, we narrow the dimensions further to only the city of Antas in the state of Bahia.

In [35]:
result = spark.sql("""
    SELECT NO_MUNICIPIO, NO_UF, SUM(QT_DOC_BAS) AS TOTAL_TEACHERS
    FROM FACT_CENSO_ESCOLAR
    JOIN DIM_LOCAL ON FACT_CENSO_ESCOLAR.ID_DIM_LOCAL = DIM_LOCAL.ID_DIM_LOCAL
    WHERE DIM_LOCAL.NO_UF = 'Bahia' AND DIM_LOCAL.NO_MUNICIPIO = 'Antas'
    GROUP BY NO_MUNICIPIO, NO_UF
    ORDER BY NO_MUNICIPIO;

""")
result.show()

+------------+-----+--------------+
|NO_MUNICIPIO|NO_UF|TOTAL_TEACHERS|
+------------+-----+--------------+
|       Antas|Bahia|          2321|
+------------+-----+--------------+
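
In the usage followed by this notebook, a slice fixes one dimension value and a dice fixes several at once. The sketch below expresses both as filters over a few rows copied from the outputs above; the `select` helper is hypothetical.

```python
rows = [
    {"NO_UF": "Acre", "NO_MUNICIPIO": "Bujari", "TOTAL_TEACHERS": 2394},
    {"NO_UF": "Bahia", "NO_MUNICIPIO": "Abaré", "TOTAL_TEACHERS": 3344},
    {"NO_UF": "Bahia", "NO_MUNICIPIO": "Antas", "TOTAL_TEACHERS": 2321},
]


def select(rows, **conditions):
    """Keep the rows whose columns equal every given value."""
    return [
        row for row in rows
        if all(row[column] == value for column, value in conditions.items())
    ]


# Slice: one condition on one dimension attribute.
sliced = select(rows, NO_UF="Bahia")
# Dice: conditions on several attributes at once.
diced = select(rows, NO_UF="Bahia", NO_MUNICIPIO="Antas")

print([row["NO_MUNICIPIO"] for row in sliced])
# ['Abaré', 'Antas']
print(diced)
# [{'NO_UF': 'Bahia', 'NO_MUNICIPIO': 'Antas', 'TOTAL_TEACHERS': 2321}]
```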



## References

https://towardsdatascience.com/explaining-technical-stuff-in-a-non-techincal-way-apache-spark-274d6c9f70e9

https://towardsdatascience.com/adding-sequential-ids-to-a-spark-dataframe-fa0df5566ff6

https://sparkbyexamples.com/pyspark/pyspark-read-and-write-parquet-file/

https://www.psycopg.org/docs/usage.html
