
# Projeto: Análise de Mobilidade Urbana (São Paulo) com PySpark

Este notebook guia você pelo pipeline completo:
1. **ETL batch** com um CSV estático (amostra).
2. **Consultas e agregações** com DataFrames/Spark SQL.
3. **Visualizações** com Matplotlib.
4. **Structured Streaming** com fonte de arquivos JSON simulando telemetria.
5. **Janelas temporais, watermark** e **join** com dados estáticos.

> Observação: Ajuste os caminhos conforme seu ambiente. Este notebook foi preparado para rodar com PySpark 3.x.


In [1]:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Mobilidade_SP") \
    .getOrCreate()

print("Spark Version:", spark.version)


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/17 06:26:48 WARN Utils: Your hostname, mota.local, resolves to a loopback address: 127.0.0.1; using 192.168.0.138 instead (on interface en0)
25/09/17 06:26:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/17 06:26:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Version: 4.0.1


## 1) Leitura e limpeza (batch)

In [5]:
from os import path, getcwd
from pyspark.sql.functions import col, to_timestamp, hour, date_format

# csv_path = "/mnt/data/sptrans_sample_static.csv"
csv_path = path.join(getcwd(), 'sptrans_sample_static.xlsx')
df = spark.read.csv(csv_path, header=True, inferSchema=True)

df = df.withColumn("event_time", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")) \
         .drop("timestamp")

df = df.withColumn("hora", hour(col("event_time"))) \
         .withColumn("dia", date_format(col("event_time"), "yyyy-MM-dd"))

df.show(5)
df.printSchema()


{"ts": "2025-09-17 06:29:21.302", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `timestamp` cannot be resolved. Did you mean one of the following? [`PK\u0003\u0004\u0014\u0000\b\b\b\u0000�\u00121[\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0018\u0000\u0000\u0000xl/drawings/drawing1.xml��]n�0\f\u0007�\u0013�\u000eU�iZ\u0018\u0013C\u0014^�N0\u000e�%n\u001b���\u000e��~�J6i{\u0001\u001em�?���nt��Db\u0013|#�\u0012\u0005z\u0015��]#\u000e�o��(8��``��F\\��n��\u00195�ϼ�\"�{^��\u0011}��ZJV=:�2\f�Ӵ`]. SQLSTATE: 42703", "context": {"file": "line 8 in cell [5]", "line": "", "fragment": "col", "errorClass": "UNRESOLVED_COLUMN.WITH_SUGGESTION"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o69.withColumn.\n: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with 

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `timestamp` cannot be resolved. Did you mean one of the following? [`PK  �1[               xl/drawings/drawing1.xml��]n�0��U�iZC^�N0�%n�����~�J6i{m�?���nt��Db|#�z��]#�o��(8��``��F\��n��5�ϼ�"�{^��}��ZJV=:�2�Ӵ`]. SQLSTATE: 42703;
'Project [PK  �1[               xl/drawings/drawing1.xml��]n�0��U�iZC^�N0�%n�����~�J6i{m�?���nt��Db|#�z��]#�o��(8��`��F\��n��5�ϼ�"�{^��}��ZJV=:�2�Ӵ#17, 'to_timestamp('timestamp, yyyy-MM-dd HH:mm:ss) AS event_time#19]
+- Relation [PK  �1[               xl/drawings/drawing1.xml��]n�0��U�iZC^�N0�%n�����~�J6i{m�?���nt��Db|#�z��]#�o��(8��`��F\��n��5�ϼ�"�{^��}��ZJV=:�2�Ӵ#17] csv


### Agregações por hora e por linha

In [None]:

from pyspark.sql.functions import count, avg, round as rnd

viagens_por_hora = df.groupBy("hora").agg(count("*").alias("total"))
viagens_por_hora.orderBy(col("total").desc()).show()

velocidade_media_por_linha = df.groupBy("line").agg(rnd(avg("speed_kmh"), 2).alias("vel_media"))
velocidade_media_por_linha.orderBy(col("vel_media").desc()).show()


## 2) Spark SQL

In [None]:

df.createOrReplaceTempView("telemetria")

res = spark.sql('''
SELECT line, hora, COUNT(*) as total, ROUND(AVG(speed_kmh),2) AS vel_media
FROM telemetria
GROUP BY line, hora
ORDER BY total DESC
''')
res.show(10)


## 3) Visualizações com Matplotlib

In [None]:

# Atenção às regras do ambiente: usar matplotlib, sem seaborn e sem cores customizadas.
import matplotlib.pyplot as plt

# Converter para pandas para visualização simples
pdf = viagens_por_hora.toPandas().sort_values("hora")

plt.figure()
plt.plot(pdf["hora"], pdf["total"], marker="o")
plt.title("Total de eventos por hora")
plt.xlabel("Hora do dia")
plt.ylabel("Total de eventos")
plt.grid(True)
plt.show()

pdf2 = velocidade_media_por_linha.toPandas()

plt.figure()
plt.bar(pdf2["line"], pdf2["vel_media"])
plt.title("Velocidade média por linha")
plt.xlabel("Linha")
plt.ylabel("Velocidade média (km/h)")
plt.xticks(rotation=45)
plt.grid(True, axis="y")
plt.show()


## 4) Structured Streaming (arquivos JSON simulando telemetria)

In [None]:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
from pyspark.sql.functions import window, watermark

schema = StructType([
    StructField("vehicle_id", IntegerType()),
    StructField("line", StringType()),
    StructField("lat", DoubleType()),
    StructField("lon", DoubleType()),
    StructField("event_time", StringType()),
    StructField("speed_kmh", DoubleType())
])

input_dir = "/mnt/data/streaming/input"

stream_df_raw = spark.readStream \
    .schema(schema) \
    .option("maxFilesPerTrigger", 1) \
    .json(input_dir)

from pyspark.sql.functions import to_timestamp, col

stream_df = stream_df_raw \
    .withColumn("event_ts", to_timestamp(col("event_time"))) \
    .drop("event_time")

# Janela tumbling de 10 minutos com watermark de 15 minutos
agg_df = stream_df \
    .withWatermark("event_ts", "15 minutes") \
    .groupBy(window(col("event_ts"), "10 minutes"), col("line")) \
    .agg(count("*").alias("eventos"), avg("speed_kmh").alias("vel_media"))

query = agg_df.writeStream \
    .format("memory") \
    .queryName("agg_mem") \
    .outputMode("complete") \
    .start()

# Aguarda processamento inicial (pode precisar de awaitTermination em ambientes reais)
import time; time.sleep(5)

spark.sql("SELECT * FROM agg_mem").show(truncate=False)
query.stop()


### Join streaming + referência estática (linhas)

In [None]:

# Tabela estática simples com nome da linha (exemplo)
ref = spark.createDataFrame([
    ("8600-10", "Term. Pq. D. Pedro II - Term. Pirituba"),
    ("477A-10", "Jd. Ibitirama - Pinheiros"),
    ("309-10",  "Term. Bandeira - Vila Maria"),
], ["line","descricao"])

ref.createOrReplaceTempView("ref_linhas")

df.createOrReplaceTempView("batch_lido")

joined = spark.sql('''
SELECT b.line, r.descricao, COUNT(*) as eventos, ROUND(AVG(b.speed_kmh),2) as vel_media
FROM batch_lido b
JOIN ref_linhas r ON b.line = r.line
GROUP BY b.line, r.descricao
ORDER BY eventos DESC
''')
joined.show()
