In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt


# Create the Spark session.
# No master is configured here, so this presumably runs in local mode — TODO confirm.
# The driver is pinned to the loopback interface. The previously hard-coded
# driver host (192.168.1.104) was not an address of the machine running the
# notebook, so the 'sparkDriver' service could not bind and session creation
# failed with java.net.BindException ("Cannot assign requested address").
# The same --add-opens JVM flags are passed to both the driver and the executors.
JAVA_ADD_OPENS = (
    "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED "
    "--add-opens=java.base/java.lang=ALL-UNNAMED"
)

spark = (
    SparkSession.builder
    .appName("AnalisisMovilidadCDMX")
    .config("spark.driver.host", "localhost")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.extraJavaOptions", JAVA_ADD_OPENS)
    .config("spark.executor.extraJavaOptions", JAVA_ADD_OPENS)
    .getOrCreate()
)

# Lower the log level to avoid excessive output
spark.sparkContext.setLogLevel("WARN")

25/04/23 18:28:43 WARN Utils: Your hostname, daniel resolves to a loopback address: 127.0.1.1; using 10.73.172.251 instead (on interface wlo1)
25/04/23 18:28:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/23 18:28:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/23 18:28:47 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.
25/04/23 18:28:47 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.
25/04/23 18:28:47 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check 

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.net.BindException: Cannot assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.
	at java.base/sun.nio.ch.Net.bind0(Native Method)
	at java.base/sun.nio.ch.Net.bind(Net.java:459)
	at java.base/sun.nio.ch.Net.bind(Net.java:448)
	at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:227)
	at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:134)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:562)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1334)
	at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:506)
	at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:491)
	at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:973)
	at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:260)
	at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:356)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:829)


# Lectura de los datos

In [None]:
# Explicit schema of the trips CSV, declared as (column name, Spark type)
# pairs; every field is nullable.
columnas = [
    ("id_viaje", IntegerType()),
    ("id_vehiculo", StringType()),
    ("tipo_transporte", StringType()),
    ("fecha_hora_inicio", TimestampType()),
    ("fecha_hora_fin", TimestampType()),
    ("origen_lat", DoubleType()),
    ("origen_lon", DoubleType()),
    ("destino_lat", DoubleType()),
    ("destino_lon", DoubleType()),
    ("duracion_seg", IntegerType()),
    ("distancia_km", DoubleType()),
]
esquema = StructType([StructField(nombre, tipo, True) for nombre, tipo in columnas])

# Read the CSV using its header row and the explicit schema (no inference).
df = (
    spark.read
    .options(header=True, inferSchema=False)
    .schema(esquema)
    .csv("viajes_transporte_cdmx.csv")
)

print("Esquema del DataFrame:")
df.printSchema()
total_registros = df.count()
print(f"Número total de registros: {total_registros}")

# Análisis de datos

In [None]:

# Per transport type: mean and standard deviation of the trip duration
# (seconds) plus the number of trips, sorted by mean duration, longest first.
metricas_duracion = [
    avg("duracion_seg").alias("duracion_promedio_seg"),
    stddev("duracion_seg").alias("desviacion_duracion"),
    count("id_viaje").alias("total_viajes"),
]
congestion_por_transporte = (
    df.groupBy("tipo_transporte")
    .agg(*metricas_duracion)
    .orderBy(desc("duracion_promedio_seg"))
)

print("Rutas con mayor congestión (por duración promedio):")
congestion_por_transporte.show()

congestion_pd = congestion_por_transporte.toPandas()

# Bar chart of the mean duration converted from seconds to minutes.
# The figure is written to a PNG file and then closed.
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(congestion_pd['tipo_transporte'], congestion_pd['duracion_promedio_seg'] / 60)
ax.set_title('Duración promedio de viaje por tipo de transporte (minutos)')
ax.set_xlabel('Tipo de transporte')
ax.set_ylabel('Duración promedio (min)')
plt.setp(ax.get_xticklabels(), rotation=45)
fig.tight_layout()
fig.savefig('duracion_promedio_por_transporte.png')
plt.close(fig)

# Horarios con más viajes

In [None]:

# Add the hour of day extracted from the trip start timestamp.
df = df.withColumn("hora_inicio", hour(col("fecha_hora_inicio")))

# Number of trips per start hour, in chronological order.
viajes_por_hora = (
    df.groupBy("hora_inicio")
    .count()
    .orderBy("hora_inicio")
)

print("Viajes por hora del día:")
viajes_por_hora.show(24)

viajes_por_hora_pd = viajes_por_hora.toPandas()

# Line chart of trips per hour with one tick for each hour 0-23.
# The figure is written to a PNG file and then closed.
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(viajes_por_hora_pd['hora_inicio'], viajes_por_hora_pd['count'], marker='o')
ax.set_title('Número de viajes por hora del día')
ax.set_xlabel('Hora del día')
ax.set_ylabel('Número de viajes')
ax.set_xticks(range(0, 24))
ax.grid(True)
fig.tight_layout()
fig.savefig('viajes_por_hora.png')
plt.close(fig)

# Zonas con más salidas y llegadas

In [None]:


# Bucket every coordinate into a "zone" by rounding it to 3 decimals.
# `round` here is the Spark column function brought in by the star import
# of pyspark.sql.functions, not the Python builtin.
columnas_zona = {
    "origen_zona_lat": "origen_lat",
    "origen_zona_lon": "origen_lon",
    "destino_zona_lat": "destino_lat",
    "destino_zona_lon": "destino_lon",
}
for zona, coordenada in columnas_zona.items():
    df = df.withColumn(zona, round(col(coordenada), 3))

# Ten origin zones with the most trips.
conteo_salidas = df.groupBy("origen_zona_lat", "origen_zona_lon").count()
salidas_por_zona = conteo_salidas.orderBy(col("count").desc()).limit(10)

# Ten destination zones with the most trips.
conteo_llegadas = df.groupBy("destino_zona_lat", "destino_zona_lon").count()
llegadas_por_zona = conteo_llegadas.orderBy(col("count").desc()).limit(10)

print("Top 10 zonas con más salidas de vehículos:")
salidas_por_zona.show()

print("Top 10 zonas con más llegadas de vehículos:")
llegadas_por_zona.show()

# Persist both rankings as CSV with a header row, replacing any previous output.
salidas_por_zona.write.mode("overwrite").option("header", True).csv("salidas_por_zona.csv")
llegadas_por_zona.write.mode("overwrite").option("header", True).csv("llegadas_por_zona.csv")

# Velocidad promedio

In [None]:
# Average speed (km/h) per transport type, fastest first.
# Per-trip speed = distancia_km / (duracion_seg / 3600).
# Trips whose duration is not strictly positive get a null speed instead of
# being divided: a zero duration makes the bare division raise a
# divide-by-zero error when ANSI SQL mode is enabled, and a negative duration
# would produce a meaningless negative speed. avg() ignores the resulting
# nulls, while total_viajes still counts every trip with a non-null id_viaje.
velocidad_kmh = when(
    col("duracion_seg") > 0,
    col("distancia_km") / (col("duracion_seg") / 3600),
)

velocidad_promedio = (
    df.withColumn("velocidad_kmh", velocidad_kmh)
    .groupBy("tipo_transporte")
    .agg(
        avg("velocidad_kmh").alias("velocidad_promedio_kmh"),
        count("id_viaje").alias("total_viajes"),
    )
    .orderBy(desc("velocidad_promedio_kmh"))
)

print("Velocidad promedio por tipo de transporte:")
velocidad_promedio.show()