# Implementação de um data lakehouse para dados geoespaciais com Delta Lake

In [161]:
import pyspark
from delta import *
import os

# Root folders of the bronze and silver Delta layers. The trailing slash is
# required: table locations are built by plain concatenation, e.g.
# f"{bronze_path}{name}".
bronze_path, silver_path = "./VED-lakehouse/bronze/", "./VED-lakehouse/silver/"

## Configurando Spark

In [162]:
# Register Delta Lake's SQL extension and catalog on the session builder;
# configure_spark_with_delta_pip then adds the Delta package to the session
# before it is created.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Nível Bronze

In [163]:
def _standardize_for_delta_columns(column: str) -> str:
    if 'Air' in column:
        column = column.replace('[', '_')[:-1]
    else:
        column = column.split('(')[0].split('[')[0]

    column = column.replace(' ', '_').lower()

    return(column)
    

In [164]:
def build_bronze(data_dir: str = 'data') -> None:
    """Ingest each raw CSV in ``data_dir`` into its own bronze Delta table.

    The table name is the file name with its first 4 and last 9 characters
    removed. The files are presumably named like ``VED_171101_week.csv``,
    giving table ``171101``; TODO confirm this holds for every file.
    Each table is written to ``{bronze_path}<name>``. Tables that already exist
    are skipped, so re-running the cell only ingests new files.

    Column headers are renamed with ``_standardize_for_delta_columns``. Without
    column mapping enabled, Delta rejects names containing spaces or parentheses.

    Args:
        data_dir: Folder holding the raw CSV files (default ``'data'``).
    """
    # sorted() makes ingestion order deterministic; the suffix filter keeps
    # stray entries (hidden files, checkpoint folders) from being read as CSV.
    csv_files = sorted(f for f in os.listdir(data_dir) if f.lower().endswith('.csv'))

    for csv_file in csv_files:
        table_name = csv_file[4:-9]  # strips the 4-char prefix and 9-char suffix
        table_path = f"{bronze_path}{table_name}"
        # Use the same location for the existence check and the write.
        if os.path.exists(table_path):
            continue

        df = spark.read.format("csv").option("header", True).load(os.path.join(data_dir, csv_file))
        df = df.toDF(*[_standardize_for_delta_columns(c) for c in df.columns])
        df.write.format("delta").save(table_path)

In [165]:
# Ingest the raw CSVs from ./data into bronze Delta tables; tables that already exist are skipped.
build_bronze()

# Nível Prata

In [166]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, sum as _sum, min as _min, max as _max

# Standardised column names that build_silver casts to float and
# min-max normalises (one "<name>_norm" column each).
numeric_columns = [
    'vehicle_speed',
    'maf',
    'engine_rpm',
    'absolute_load',
    'oat',
    'fuel_rate',
    'air_conditioning_power_kw',
    'air_conditioning_power_watts',
    'heater_power',
    'hv_battery_current',
    'hv_battery_soc',
    'hv_battery_voltage',
    'short_term_fuel_trim_bank_1',
    'short_term_fuel_trim_bank_2',
    'long_term_fuel_trim_bank_1',
    'long_term_fuel_trim_bank_2',
]


# Preview one bronze table. Delta tables carry their own schema, so the
# CSV-only "header" reader option is not needed here.
df = spark.read.format('delta').load(f"{bronze_path}171108")
df.show()

+-------------+-----+----+---------+-------------+--------------+-------------+--------------+----------+-------------+----+---------+-------------------------+----------------------------+------------+------------------+--------------+------------------+---------------------------+---------------------------+--------------------------+--------------------------+
|       daynum|vehid|trip|timestamp|     latitude|     longitude|vehicle_speed|           maf|engine_rpm|absolute_load| oat|fuel_rate|air_conditioning_power_kw|air_conditioning_power_watts|heater_power|hv_battery_current|hv_battery_soc|hv_battery_voltage|short_term_fuel_trim_bank_1|short_term_fuel_trim_bank_2|long_term_fuel_trim_bank_1|long_term_fuel_trim_bank_2|
+-------------+-----+----+---------+-------------+--------------+-------------+--------------+----------+-------------+----+---------+-------------------------+----------------------------+------------+------------------+--------------+------------------+-------------

Transformação dos valores string em valores numéricos, remoção de linhas vazias e duplicadas e normalização min-max das colunas numéricas

In [167]:
def _add_min_max_norm_columns(df, columns):
    """Append a min-max normalised ``<name>_norm`` column for each name in ``columns``.

    ``<name>_norm = (<name> - min) / (max - min)``, with min/max taken over ``df``.
    All minima and maxima come from a single aggregation instead of two
    separate Spark jobs per column. A column whose min equals its max gets no
    ``_norm`` column, which avoids a division by zero. This covers constant
    columns and entirely-null columns (where both are None).

    Args:
        df: Spark DataFrame containing every name in ``columns``.
        columns: Numeric column names to normalise, in output order.

    Returns:
        ``df`` with the new columns appended after the existing ones.
    """
    if not columns:
        return df

    stats = df.agg(
        *[_min(c).alias(f"{c}__min") for c in columns],
        *[_max(c).alias(f"{c}__max") for c in columns],
    ).first()

    for c in columns:
        lo, hi = stats[f"{c}__min"], stats[f"{c}__max"]
        if hi != lo:
            df = df.withColumn(f"{c}_norm", (col(c) - lo) / (hi - lo))
    return df


def build_silver():
    """Create a silver Delta table for every bronze table that lacks one.

    Steps per bronze table ``<name>``:
      1. Read ``{bronze_path}<name>`` and pass its column names through
         ``_standardize_for_delta_columns``. This is a no-op for tables written
         by ``build_bronze``, whose names are already standardised.
      2. Cast every column in ``numeric_columns`` to ``float``.
      3. Drop rows in which every column is null, then exact duplicate rows.
      4. Append min-max normalised ``<column>_norm`` columns
         (see ``_add_min_max_norm_columns``).
      5. Write the result to ``{silver_path}<name>`` in overwrite mode.

    Tables already present under ``silver_path`` are skipped, so the cell can
    be re-run to process only new bronze tables.
    """
    for name in sorted(os.listdir(bronze_path)):
        bronze_table = f"{bronze_path}{name}"
        silver_table = f"{silver_path}{name}"
        # Skip stray files next to the table folders and tables already built.
        if not os.path.isdir(bronze_table) or os.path.exists(silver_table):
            continue

        # "header" is a CSV-only reader option, so it is not passed to Delta.
        df = spark.read.format('delta').load(bronze_table)
        df = df.toDF(*[_standardize_for_delta_columns(c) for c in df.columns])

        for column in numeric_columns:
            df = df.withColumn(column, col(column).cast('float'))

        df = df.dropna(how='all').dropDuplicates()
        df = _add_min_max_norm_columns(df, numeric_columns)

        df.write.format("delta").mode("overwrite").save(silver_table)


In [168]:
# Build a silver table for every bronze table that has no silver copy yet.
build_silver()

['171101', '171108', '171115', '171122', '171129', '171206', '171213', '171220', '171227', '180103', '180110', '180117', '180124', '180131', '180207', '180214', '180221', '180228', '180307', '180314', '180321', '180328', '180404', '180411', '180418', '180425', '180502', '180509', '180516', '180523', '180530', '180606', '180613', '180620', '180627', '180704', '180711', '180718', '180725', '180801', '180808', '180815', '180822', '180829', '180905', '180912', '180919', '180926', '181003', '181010', '181017', '181024', '181031', '181107']


In [172]:
# Preview one silver table, including the appended "<column>_norm" columns.
# The CSV-only "header" reader option is not needed for Delta tables.
df = spark.read.format('delta').load(f"{silver_path}171108")
df.show()

+-------------+-----+----+---------+-------------+--------------+-------------+-----+----------+-------------+---+---------+-------------------------+----------------------------+------------+------------------+--------------+------------------+---------------------------+---------------------------+--------------------------+--------------------------+------------------+--------+---------------+------------------+--------+--------------+------------------------------+---------------------------------+-----------------+-----------------------+-------------------+-----------------------+--------------------------------+--------------------------------+-------------------------------+-------------------------------+
|       daynum|vehid|trip|timestamp|     latitude|     longitude|vehicle_speed|  maf|engine_rpm|absolute_load|oat|fuel_rate|air_conditioning_power_kw|air_conditioning_power_watts|heater_power|hv_battery_current|hv_battery_soc|hv_battery_voltage|short_term_fuel_trim_bank_1|sho