Transformations to be performed

- [ ] Remove files whose ping time differs from the request time.
- [ ] Adjust the time in SP by subtracting 3 hours.
- [ ] In Curitiba, when the “codigolinha” field is “REC”, the bus is not in operation, so it will be removed.
- [ ] In BSB, a missing value in the “linha” field indicates the bus is not in operation, so it should be removed.
- [ ] Convert time and date fields to ISO 8601, e.g. 2024-02-24T13:05Z.
- [ ] Standardize the line's direction of operation in SP and CWB as integers: 1 = outbound (ida), 2 = return (volta).
- [ ] Standardize the bus identifiers, e.g. CUR_idOnibus.
- [ ] Add the file names to a field, something like "query_timestamp".
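
Two of the rules in the checklist (city-prefixed bus identifiers and integer direction codes) can be sketched in plain Python, outside Spark. The helper names and the `CITY_PREFIXES` table are hypothetical; the prefixes follow the ones the cells in this notebook actually emit (`SPO`, `CWB`, `BSB`, `RJO`), which differ from the `CUR_` example in the checklist.

```python
# Hypothetical lookup tables; the prefixes mirror the ones used later in this notebook.
CITY_PREFIXES = {"sp": "SPO", "cwb": "CWB", "bsb": "BSB", "rj": "RJO"}
DIRECTION_CODES = {"IDA": 1, "VOLTA": 2}


def prefixed_bus_id(city, raw_id):
    """Return the bus id prefixed with the city code, e.g. CWB_JI859."""
    return f"{CITY_PREFIXES[city]}_{raw_id}"


def direction_code(sentido):
    """Map 'IDA' to 1 and 'VOLTA' to 2; any other value (including None) becomes 0."""
    return DIRECTION_CODES.get(sentido, 0)


print(prefixed_bus_id("cwb", "JI859"))
print(direction_code("VOLTA"))
```

Keeping both mappings in one place would let every city share the same helpers instead of redefining them per cell.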


In [1]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Enable Delta Lake support on the Spark session.
builder = (
    SparkSession.builder.appName("topicos")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

INPUT_PATH = "/home/felipe/code/topicos_dados/dados/"
BRONZE_PATH = "/home/felipe/code/topicos_dados/lake/bronze/"
SILVER_PATH = "/home/felipe/code/topicos_dados/lake/silver/"

print(spark.version)


24/03/18 13:54:06 WARN Utils: Your hostname, desktop resolves to a loopback address: 127.0.1.1; using 192.168.0.106 instead (on interface enp6s0)
24/03/18 13:54:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/felipe/.local/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/felipe/.ivy2/cache
The jars for the packages stored in: /home/felipe/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-caab55e4-d394-49e3-828f-8132d27be247;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.1.0 in central
	found io.delta#delta-storage;3.1.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 1031ms :: artifacts dl 30ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.1.0 from central in [default]
	io.delta#delta-storage;3.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   

3.5.0


In [58]:
from pyspark.sql import functions as F

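def checkInvalidTimeRows(df):
    """
    Keep only the rows whose ping time ("updated_at") is less than
    300 seconds (5 minutes) away from the request time ("queried_at").
    """
    df = df.withColumn("updated_at", F.to_timestamp("updated_at"))
    df = df.withColumn("queried_at", F.to_timestamp("queried_at"))

    # Absolute difference, in seconds, between ping and request.
    timediff = F.abs(F.unix_timestamp("updated_at") - F.unix_timestamp("queried_at"))

    return df.filter(timediff < 300)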
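
The same 300-second rule can be checked on plain `datetime` values, which is handy for testing the threshold without a Spark session. This is a minimal sketch; `is_fresh` and `MAX_LAG_SECONDS` are hypothetical names, and the sample times are taken from the São Paulo output further down.

```python
from datetime import datetime

MAX_LAG_SECONDS = 300  # same threshold as checkInvalidTimeRows


def is_fresh(updated_at, queried_at, max_lag=MAX_LAG_SECONDS):
    """True when the ping and the query are less than max_lag seconds apart."""
    lag = abs((updated_at - queried_at).total_seconds())
    return lag < max_lag


queried = datetime(2024, 1, 30, 19, 39, 15)
print(is_fresh(datetime(2024, 1, 30, 19, 39, 9), queried))  # 6 seconds apart
print(is_fresh(datetime(2024, 1, 30, 19, 30, 0), queried))  # 555 seconds apart
```

The comparison is strict, so a ping exactly 300 seconds away from the query is dropped, as in the Spark filter.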


In [65]:
# São Paulo

from pyspark.sql.functions import udf, input_file_name, col
from pyspark.sql.types import StringType, TimestampType
from datetime import datetime, timedelta


# Common functions
def changeBusIdSP(onibus_id):
    """
    Change the column "id_onibus" to the following pattern: CITY_id_onibus

    Example: SPO_71212
    """
    return f"SPO_{onibus_id}"

def changeTimestamp(timestamp):
    """
    Subtract 3 hours from an ISO 8601 timestamp (to GMT-3) and return it
    in ISO 8601 format.
    """
    datetime_object = datetime.fromisoformat(timestamp)
    return (datetime_object - timedelta(hours=3)).isoformat()

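def addTimestampQueryTime(filenames):
    """
    Build the "queried_at" value from the epoch seconds in the input file name.

    file:///home/felipe/code/topicos_dados/dados/cb_micro/1706665101.9679544.parquet -> 1706665101.9679544
    """
    # The slice assumes a 54-character path prefix and the ".parquet" suffix.
    file_name = str(filenames)[54:-8]
    # datetime.fromtimestamp() without tz returns local time, so the "Z" suffix is only accurate on a UTC machine.
    time = (datetime.fromtimestamp(float(file_name))).strftime("%Y-%m-%dT%H:%M:%SZ")
    return time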

# UDFs

udf_transformBusIdSpo = udf(changeBusIdSP, StringType())
udf_changeTimestamp = udf(changeTimestamp, StringType())
udf_addTimestampFile = udf(addTimestampQueryTime, StringType())

# Reading DF
# Parquet files carry their own schema, so no reader options are needed.
sp = (
    spark.read.format("parquet")
    .load(f"{INPUT_PATH}/sp_micro")
    .withColumn("inputFiles", input_file_name())
)

# Changing DF
sp = sp.withColumn("queried_at",udf_addTimestampFile(col("inputFiles")))
sp = sp.withColumn("bus_id",udf_transformBusIdSpo(col("id_onibus")))
sp = sp.withColumn("updated_at",udf_changeTimestamp(col("tempo_captura")))

# Dropping columns
sp = sp.drop("inputFiles","id_onibus","tempo_captura")

# Renaming columns

sp = sp.withColumnsRenamed({"lt0":"station0","lt1":"station1","c":"bus_code"})


# Keep only rows whose ping time is close to the request time

sp = checkInvalidTimeRows(sp)

# Saving
sp.write.format("delta").option("path", f"{SILVER_PATH}silver_sp").saveAsTable("silver_sp")

sp.show()

24/03/18 14:48:56 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers

+-------------------+-------------------+--------+-------------------+--------------------+-------------------+---------+-------------------+
|           latitude|          longitude|bus_code|           station0|            station1|         queried_at|   bus_id|         updated_at|
+-------------------+-------------------+--------+-------------------+--------------------+-------------------+---------+-------------------+
|         -23.552871|-46.647738000000004| 609F-10|TERM. PRINC. ISABEL|       CHÁC. SANTANA|2024-01-30 19:39:15|SPO_71212|2024-01-30 19:39:09|
|        -23.6532175| -46.74036099999999| 609F-10|TERM. PRINC. ISABEL|       CHÁC. SANTANA|2024-01-30 19:39:15|SPO_71739|2024-01-30 19:38:46|
|        -23.6680755|         -46.748797| 609F-10|TERM. PRINC. ISABEL|       CHÁC. SANTANA|2024-01-30 19:39:15|SPO_71361|2024-01-30 19:38:37|
|         -23.534974|        -46.6443435| 609F-10|TERM. PRINC. ISABEL|       CHÁC. SANTANA|2024-01-30 19:39:15|SPO_71740|2024-01-30 19:38:55|
|     
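
The file-name parsing in this cell relies on a fixed slice (`[54:-8]`), which only works for one directory layout. As a hedged alternative, the sketch below extracts the epoch from the file URI with the standard library and formats it explicitly in UTC, so the trailing `Z` is accurate. The function name `query_time_from_file_uri` is hypothetical, and the result differs from the notebook's `addTimestampQueryTime` on any machine whose local time zone is not UTC.

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath
from urllib.parse import urlparse


def query_time_from_file_uri(file_uri):
    """Extract the epoch seconds encoded in the file name and format them as ISO 8601 UTC."""
    # Take the last path component, whatever the directory depth.
    file_name = PurePosixPath(urlparse(file_uri).path).name
    # str.removesuffix requires Python 3.9 or newer.
    epoch_seconds = float(file_name.removesuffix(".parquet"))
    moment = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")


uri = "file:///home/felipe/code/topicos_dados/dados/cb_micro/1706665101.9679544.parquet"
print(query_time_from_file_uri(uri))
```

Because the function no longer depends on the path length, it would keep working if the data directory moved.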

In [66]:
# spark.sql("DROP TABLE IF EXISTS silver_sp")
spark.sql("DROP TABLE IF EXISTS silver_bsb")
spark.sql("DROP TABLE IF EXISTS silver_rj")
# spark.sql("DROP TABLE IF EXISTS silver_cwb")

DataFrame[]

In [52]:
# Curitiba

from pyspark.sql.functions import udf, input_file_name, col
from pyspark.sql.types import StringType, IntegerType, TimestampType, DoubleType
from datetime import datetime, timedelta
from pyspark.sql import functions as F


def changeBusIdCWB(onibus_id):
    """
    Change the column "id_onibus" to the following pattern: CITY_id_onibus

    Example: CWB_JI859
    """
    return f"CWB_{onibus_id}"

def changeTimestampCwb(time,timestamp):
    """
    Build an ISO 8601 timestamp (2024-02-24T13:05Z) for Curitiba.

    The 'tempo_captura' field in Curitiba holds only the time, e.g. "22:40",
    so the date (e.g. "2024-01-30") is taken from the first 10 characters of
    the query timestamp.
    """
    return f"{timestamp[:10]}T{time}:00Z"

def addTimestampQueryTime(filenames):
    """
    Build the "queried_at" value from the epoch seconds in the input file name.

    file:///home/felipe/code/topicos_dados/dados/cb_micro/1706665101.9679544.parquet -> 1706665101.9679544
    """
    # The slice assumes a 54-character path prefix and the ".parquet" suffix.
    file_name = str(filenames)[54:-8]
    time = (datetime.fromtimestamp(float(file_name))).strftime("%Y-%m-%dT%H:%M:%SZ")
    return time

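def changeSentidoField(sentido):
    """
    Map the direction to an integer: 1 = IDA (outbound), 2 = VOLTA (return), 0 = anything else.
    """
    sentidoMap = {
        'IDA': 1,
        'VOLTA': 2
    }

    return sentidoMap.get(sentido, 0)

def removeInactiveBus(df):
    """
    Remove buses that are not in operation (line code "REC").
    """
    filtered_df = df.filter(df['linha']!="REC")
    return filtered_df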



# UDFs

udf_transformBusIdCwb = udf(changeBusIdCWB, StringType())
udf_changeTimestamp = udf(changeTimestampCwb, StringType())
udf_addTimestampFile = udf(addTimestampQueryTime, StringType())
udf_changeSentido = udf(changeSentidoField, IntegerType())

# Reading DF
# Parquet files carry their own schema, so no reader options are needed.
cwb = (
    spark.read.format("parquet")
    .load(f"{INPUT_PATH}/cb_micro")
    .withColumn("inputFiles", input_file_name())
)

# Transformation

cwb = cwb.withColumn("queried_at",udf_addTimestampFile(col("inputFiles")))
cwb = cwb.withColumn("bus_id",udf_transformBusIdCwb(col("id_onibus")))
cwb = cwb.withColumn("updated_at",udf_changeTimestamp(col("tempo_captura"),col('queried_at')))
cwb = cwb.withColumn("bus_direction",udf_changeSentido(col("sentido")))

cwb = removeInactiveBus(cwb)

# Dropping

cwb = cwb.drop("tempo_captura","sentido","inputFiles","id_onibus")

# Renaming

cwb = cwb.withColumnsRenamed({
    "adaptado":"is_adapted",
    "linha":"bus_code",
    "tipo_veiculo":"type_vehicle",
    "situacao":"situation",
    "situacao_2":"situation_2",
    "tabela":"table"
})

cwb = checkInvalidTimeRows(cwb)

# Casting types

cwb = cwb.withColumn("latitude", cwb["latitude"].cast(DoubleType()))
cwb = cwb.withColumn("longitude", cwb["longitude"].cast(DoubleType()))
cwb = cwb.withColumn("is_adapted", cwb["is_adapted"].cast(IntegerType()))

# Saving

cwb.write.format("delta").option("path", f"{SILVER_PATH}silver_cwb").saveAsTable("silver_cwb")

cwb.show()



+----------+----------+--------------+--------+----------+---------------+-----+----------+-------------------+---------+-------------------+-------------+
|  latitude| longitude|  type_vehicle|bus_code| situation|    situation_2|table|is_adapted|         queried_at|   bus_id|         updated_at|bus_direction|
+----------+----------+--------------+--------+----------+---------------+-----+----------+-------------------+---------+-------------------+-------------+
|-25.453483| -49.28948|MICRO ESPECIAL|     762|NO HORÁRIO|REALIZANDO ROTA|  2-2|         1|2024-01-30 19:42:22|CWB_JI859|2024-01-30 19:42:00|            2|
|-25.481223|-49.196893|MICRO ESPECIAL|     463|NO HORÁRIO|REALIZANDO ROTA|    2|         1|2024-01-30 19:42:22|CWB_DN603|2024-01-30 19:38:00|            1|
|-25.432205|  -49.2658|MICRO ESPECIAL|     463|NO HORÁRIO|REALIZANDO ROTA|    4|         1|2024-01-30 19:42:22|CWB_DN608|2024-01-30 19:42:00|            2|
|-25.514096|-49.322986|    ARTICULADO|     040|NO HORÁRIO|REALIZ
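
Because Curitiba reports only "HH:MM", the date has to be borrowed from the query time, and a ping sent just before midnight but queried just after it would receive the wrong date. The sketch below shows one possible way to handle that case on plain `datetime` values. The function name, the 5-minute `tolerance`, and the rule "a ping cannot be much later than the query that returned it" are all assumptions for illustration, not something the notebook implements.

```python
from datetime import datetime, timedelta


def combine_ping_time(ping_hhmm, queried_at, tolerance=timedelta(minutes=5)):
    """Attach the query date to an 'HH:MM' ping, stepping back one day if the result lies in the future."""
    hours, minutes = map(int, ping_hhmm.split(":"))
    candidate = queried_at.replace(hour=hours, minute=minutes, second=0, microsecond=0)
    if candidate > queried_at + tolerance:
        # Assumption: a ping well after the query must belong to the previous day.
        candidate -= timedelta(days=1)
    return candidate


print(combine_ping_time("19:42", datetime(2024, 1, 30, 19, 42, 22)))
print(combine_ping_time("23:59", datetime(2024, 1, 31, 0, 1, 10)))
```

The tolerance keeps small clock differences between the bus feed and the collector from being mistaken for a day change.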

In [67]:
# Brasília

from pyspark.sql.functions import udf, input_file_name, col
from pyspark.sql.types import StringType, IntegerType, TimestampType, DoubleType
from datetime import datetime, timedelta
from pyspark.sql import functions as F


def changeBusIdBSB(onibus_id):
    """
    Change the column "id_onibus" to the following pattern: CITY_id_onibus

    Example: BSB_339261
    """
    return f"BSB_{onibus_id}"

def changeTimestampBsb(timestamp):
    """
    Change timestamp to ISO 8601 (2024-02-24T13:05Z) using GMT-3 for Brasília

    This is the 'tempo_captura' field in Brasília: "1701355514000"
    But the value isn't the same as the timestamp from querying.
    """
    # Keep the first 10 digits (epoch seconds) and drop the milliseconds.
    time = (datetime.fromtimestamp(float(str(timestamp)[:10]))).strftime("%Y-%m-%dT%H:%M:%SZ")
    return time

def addTimestampQueryTime(filenames):
    """
    Build the "queried_at" value from the epoch seconds in the input file name.

    file:///home/felipe/code/topicos_dados/dados/cb_micro/1706665101.9679544.parquet -> 1706665101.9679544
    """
    # The slice assumes a 54-character path prefix and the ".parquet" suffix.
    file_name = str(filenames)[54:-8]
    time = (datetime.fromtimestamp(float(file_name))).strftime("%Y-%m-%dT%H:%M:%SZ")
    return time

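def changeSentidoField(sentido):
    """
    Map the direction to an integer: 1 = IDA (outbound), 2 = VOLTA (return), 0 = anything else.
    """
    sentidoMap = {
        'IDA': 1,
        'VOLTA': 2
    }

    return sentidoMap.get(sentido, 0)

def removeInactiveBus(df):
    """
    Remove buses that are not in operation (empty "linha" field).
    """
    filtered_df = df.filter(df['linha']!="")
    return filtered_df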

# UDFs

udf_transformBusIdBSB = udf(changeBusIdBSB, StringType())
udf_changeTimestamp = udf(changeTimestampBsb, StringType())
udf_addTimestampFile = udf(addTimestampQueryTime, StringType())
udf_changeSentido = udf(changeSentidoField, IntegerType())

# Reading DF
# Parquet files carry their own schema, so no reader options are needed.
bsb = (
    spark.read.format("parquet")
    .load(f"{INPUT_PATH}/df_micro")
    .withColumn("inputFiles", input_file_name())
)


bsb = bsb.withColumn("queried_at",udf_addTimestampFile(col("inputFiles")))
bsb = bsb.withColumn("updated_at",udf_changeTimestamp(col("tempo_captura")))
bsb = bsb.withColumn("bus_id",udf_transformBusIdBSB(col("id_onibus")))
bsb = bsb.withColumn("bus_direction",udf_changeSentido(col("sentido")))

bsb = removeInactiveBus(bsb)
bsb = checkInvalidTimeRows(bsb)

# dropping 

bsb = bsb.drop("tempo_captura","sentido","inputFiles","id_onibus")

# renaming

bsb = bsb.withColumnsRenamed({
    "velocidade":"bus_speed",
    "linha":"bus_code",
    "direcao":"direction"
})

# saving

bsb.write.format("delta").option("path", f"{SILVER_PATH}silver_bsb").saveAsTable("silver_bsb")

bsb.show()


24/03/18 16:19:09 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers

+---------+---------+---------+----------+--------+-------------------+-------------------+----------+-------------+
|longitude| latitude|bus_speed| direction|bus_code|         queried_at|         updated_at|    bus_id|bus_direction|
+---------+---------+---------+----------+--------+-------------------+-------------------+----------+-------------+
|-47.94006|-15.81103|    13.06|12.8996672|   391.3|2024-02-02 04:32:40|2024-02-02 04:27:41|BSB_339261|            1|
|-47.89697|-15.84607|      0.0|17.2043209|   0.822|2024-02-02 04:32:40|2024-02-02 04:27:41|BSB_337005|            1|
|-47.86511|-15.76293|    14.44|304.305342|   0.884|2024-02-02 04:32:40|2024-02-02 04:27:41|BSB_337072|            2|
|-47.93695|-15.86751|    16.11| 253.43652|   0.835|2024-02-02 04:32:40|2024-02-02 04:27:42|BSB_335479|            1|
|-48.06735|-15.87217|     4.72|15.0089136|   0.841|2024-02-02 04:32:40|2024-02-02 04:27:42|BSB_339351|            1|
|-48.10607|-15.88023|      0.0|18.3611965|   367.2|2024-02-02 04
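
Brasília's `tempo_captura` arrives as epoch milliseconds. The conversion can be written without string slicing and with an explicit UTC time zone, as in the sketch below. The function name `epoch_millis_to_iso` is hypothetical, and unlike `changeTimestampBsb` it does not depend on the machine's local time zone, so its output differs from the notebook's on a machine that is not set to UTC.

```python
from datetime import datetime, timezone


def epoch_millis_to_iso(epoch_millis):
    """Convert epoch milliseconds (int or numeric string) to an ISO 8601 UTC string."""
    seconds = int(epoch_millis) / 1000
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")


print(epoch_millis_to_iso("1701355514000"))
```

Dividing by 1000 instead of slicing the first 10 characters also keeps the conversion correct for values with a different number of digits.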

In [68]:
# Rio de Janeiro

from pyspark.sql.functions import udf, input_file_name, col
from pyspark.sql.types import StringType, TimestampType, DoubleType
from datetime import datetime, timedelta
from pyspark.sql import functions as F


def changeBusIdRj(onibus_id):
    """
    Change the column "id_onibus" to the following pattern: CITY_id_onibus

    Example: RJO_A50159
    """
    return f"RJO_{onibus_id}"

def changeTimestampRj(timestamp):
    """
    Convert the timestamp to "YYYY-MM-DD HH:MM:SS" for Rio de Janeiro

    This is the 'tempo_captura' field in Rio de Janeiro: "1701355514000"
    """
    # Drop the last 3 digits (milliseconds) to get epoch seconds.
    time = (datetime.fromtimestamp(float(timestamp[:-3]))).strftime("%Y-%m-%d %H:%M:%S")
    return time

def addTimestampQueryTime(filenames):
    """
    Build the "queried_at" value from the epoch seconds in the input file name.

    file:///home/felipe/code/topicos_dados/dados/cb_micro/1706665101.9679544.parquet -> 1706665101.9679544
    """
    # The slice assumes a 54-character path prefix and the ".parquet" suffix.
    file_name = str(filenames)[54:-8]
    time = (datetime.fromtimestamp(float(file_name))).strftime("%Y-%m-%d %H:%M:%S")
    return time

def changeSeparatorString(values):
    """
    Replace the decimal comma with a dot so the value can be cast to double.
    """
    return values.replace(",",".")

# UDFs

udf_transformBusIdRj = udf(changeBusIdRj, StringType())
udf_changeTimestamp = udf(changeTimestampRj, StringType())
udf_addTimestampFile = udf(addTimestampQueryTime, StringType())
udf_changeSeparator = udf(changeSeparatorString, StringType())

# Reading DF
# Parquet files carry their own schema, so no reader options are needed.
rj = (
    spark.read.format("parquet")
    .load(f"{INPUT_PATH}/rj_micro")
    .withColumn("inputFiles", input_file_name())
)

rj = rj.withColumn("queried_at",udf_addTimestampFile(col("inputFiles")))
rj = rj.withColumn("updated_at",udf_changeTimestamp(col("tempo_captura")))
rj = rj.withColumn("bus_id",udf_transformBusIdRj(col("id_onibus")))
rj = rj.withColumn("latitude",udf_changeSeparator(col("latitude")))
rj = rj.withColumn("longitude",udf_changeSeparator(col("longitude")))

# dropping

rj = rj.drop("tempo_captura","id_onibus","inputFiles")

# Renaming

rj = rj.withColumnsRenamed({
    "velocidade":"bus_speed",
    "linha":"bus_code",
})

rj = checkInvalidTimeRows(rj)

# casting

rj = rj.withColumn("latitude", rj["latitude"].cast(DoubleType()))
rj = rj.withColumn("longitude", rj["longitude"].cast(DoubleType()))
rj = rj.withColumn("bus_speed", rj["bus_speed"].cast(DoubleType()))


# saving
rj.write.format("delta").option("path", f"{SILVER_PATH}silver_rj").saveAsTable("silver_rj")

rj.show()


24/03/18 16:19:26 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers

+---------+---------+---------+--------+-------------------+-------------------+----------+
| latitude|longitude|bus_speed|bus_code|         queried_at|         updated_at|    bus_id|
+---------+---------+---------+--------+-------------------+-------------------+----------+
|-22.94364|-43.25486|      0.0|     220|2024-02-02 08:03:00|2024-02-02 08:01:48|RJO_A50159|
|-23.01045| -43.2975|      8.0|     302|2024-02-02 08:03:00|2024-02-02 08:01:48|RJO_C50118|
|-22.93032|-43.23769|      0.0|     220|2024-02-02 08:03:00|2024-02-02 08:01:48|RJO_A50025|
|-23.00146|-43.36449|     16.0|   SP805|2024-02-02 08:03:00|2024-02-02 08:01:48|RJO_C50047|
|-22.92475|-43.25124|     27.0|     608|2024-02-02 08:03:00|2024-02-02 08:01:49|RJO_A50187|
|-22.92565|-43.24485|     24.0|     608|2024-02-02 08:03:00|2024-02-02 08:01:46|RJO_A50127|
|-22.92509|-43.23414|     18.0|     645|2024-02-02 08:03:00|2024-02-02 08:01:48|RJO_C50116|
|-23.00145|-43.36455|     15.0|   SP805|2024-02-02 08:03:00|2024-02-02 08:01:49|