In [1]:
import os
os.environ["SPARK_HOME"] = "/home/software/spark-3.1.2-bin-hadoop3.2/"
os.environ["HADOOP_CONF_DIR"] = "/usr/local/hadoop/etc/hadoop/"
os.environ["PYSPARK_PYTHON"]= "python3.9"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3.9"

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf


conf = SparkConf().setAll([('spark.app.name', 'Trabajo_taxis_2'), \
                           ('spark.executor.memory', '1g'), \
                           ('spark.executor.instances','10'), \
                           ('spark.executor.cores', '4'), \
                           ('spark.ui.port', '4077'), \
                           ('spark.driver.memory','1g')])

spark = SparkSession.builder.config(conf=conf).master('yarn').getOrCreate()#yarn es gestor de recursos. Nos da los nodos dentro del cluster

sc = spark.sparkContext

# Datos de taxis
#!/usr/local/hadoop/bin/hdfs dfs -put tripdata_2017-01.csv
df = spark.read.csv('tripdata_2017-01.csv', header=True, inferSchema=True)

#Datos de la zona de taxis
#!/usr/local/hadoop/bin/hdfs dfs -put taxi+_zone_lookup.csv  
df1 = spark.read.csv('taxi+_zone_lookup.csv', header=True, inferSchema=True)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

PROBLEMA 1: Velocidad media de los taxis en función de la hora.

In [2]:
# pasamos el tiempo a timestamp, pasamos el timestamp a segundos (unix), dividimos espacio entre tiempo para tener velocidad

import pyspark.sql.functions as F

df.select(
    
        (df['trip_distance']/0.621371 /
        (
            F.unix_timestamp(F.to_timestamp(df['tpep_dropoff_datetime']))/3600  - 
            F.unix_timestamp(F.to_timestamp(df['tpep_pickup_datetime']))/3600
        )
        ).alias('v(km/h)'),
    
    F.concat(F.hour(F.to_timestamp(df['tpep_dropoff_datetime'])),F.lit(':00')).alias('Hour'),
    
    
).groupby('Hour').mean('v(km/h)').orderBy('avg(v(km/h))',ascending=False).toPandas()


                                                                                

Unnamed: 0,Hour,avg(v(km/h))
0,5:00,35.570693
1,6:00,32.195466
2,4:00,31.600243
3,23:00,30.7991
4,3:00,27.70374
5,22:00,27.379489
6,0:00,26.937918
7,16:00,25.944796
8,7:00,24.872721
9,2:00,24.776757


Las horas a las que se conduce más rapido son de madrugada (4:00 , 5:00 y 6:00 A.M.)

PROBLEMA 2: Viajes en taxi más comunes.

In [5]:
import pyspark.sql.functions as F

df.createOrReplaceTempView("df")
df1.createOrReplaceTempView("df1")

loc=spark.sql('select PULocationID as DepartureID, DOLocationID as ArrivalID, count(*) as Total from df group by DepartureID, ArrivalID order by count(*) desc limit 10')#.show() # Si le dejo el .show() pasa a ser de tipo NoneType

loc.createOrReplaceTempView('loc')

join1 = spark.sql('SELECT DepartureID, Zone as Zone_PU from loc join df1 on DepartureID=LocationID')
join2 = spark.sql('select ArrivalID, Zone as Zone_DO from loc join df1 on ArrivalID=LocationID')

join1.createOrReplaceTempView("join1")
join2.createOrReplaceTempView("join2")

join3 = spark.sql('select distinct Zone_PU as Departure_Zone, Total from loc left join join1 on loc.DepartureID=join1.DepartureID')#.show()
join4 = spark.sql('select distinct Zone_DO as Arrival_Zone, Total from loc left join join2 on loc.ArrivalID=join2.ArrivalID')#.show()

join3.createOrReplaceTempView("join3")
join4.createOrReplaceTempView("join4")

result = spark.sql('select Departure_zone, Arrival_zone, j3.Total from join3 as j3 join join4 as j4 on j3.Total=j4.Total where j3.Total != 14776 order by Total desc')#.show()
result.select(result['Departure_zone'], result['Arrival_zone'], (result['Total']).alias('Number_of_trips')).toPandas()

# Hemos quitado la fila con 'Number_of_trips=14776' porque se correspondía a columnas de llegada y salida nulas

Unnamed: 0,Departure_zone,Arrival_zone,Number_of_trips
0,Upper East Side South,Upper East Side North,3433
1,Upper East Side North,Upper East Side North,3174
2,Upper East Side North,Upper East Side South,3011
3,Times Sq/Theatre District,West Chelsea/Hudson Yards,2944
4,East Village,East Village,2939
5,Upper East Side South,Upper East Side South,2744
6,Upper West Side South,Lincoln Square East,2595
7,Upper West Side South,Upper West Side North,2539
8,Lincoln Square East,Upper West Side South,2471


Los viajes en taxi más comunes se producen en la zona de Upper East Side, en el distrito de Manhattan.

PROBLEMA 3: Registros financieros (propinas, personas, etc.)

In [14]:
df.createOrReplaceTempView("Financial_register")

result = spark.sql('SELECT passenger_count as Number_of_passengers, payment_type as Payment_type, extra as Extra_payment, mta_tax as MTA_tax, tip_amount as Credit_card_tip, tolls_amount as Total_payment_toll, improvement_surcharge as Improvement_payment, fare_amount, total_amount as Total_payment FROM Financial_register')
result.show(5)
result.describe().show()

+--------------------+------------+-------------+-------+---------------+------------------+-------------------+-----------+-------------+
|Number_of_passengers|Payment_type|Extra_payment|MTA_tax|Credit_card_tip|Total_payment_toll|Improvement_payment|fare_amount|Total_payment|
+--------------------+------------+-------------+-------+---------------+------------------+-------------------+-----------+-------------+
|                   1|           1|          0.0|    0.5|            2.0|               0.0|                0.3|       12.5|         15.3|
|                   1|           1|          0.0|    0.5|           1.45|               0.0|                0.3|        5.0|         7.25|
|                   1|           1|          0.0|    0.5|            1.0|               0.0|                0.3|        5.5|          7.3|
|                   1|           1|          0.0|    0.5|            1.7|               0.0|                0.3|        6.0|          8.5|
|                   1|     

Se han obtenido registros finacieros con datos como el número máximo de pasajeros, la media de los pagos en peaje, precios de tasas etc.
Aparecen también pagos en negativo que se corresponden a tarjetas rechazadas o viajes en los que el/los pasajero/s no pagaron. Sorprende el pago de más de 538.000 dólares en un solo viaje.

PROBLEMA 4: Calcular la media de las propinas en efectivo.

In [10]:
# Nuestra tabla solo tiene información sobre las propinas con tarjeta. Para calcular las propinas en efectivo, sumamos todos los
# campos que contienen las partes en las que se descompone la factura y les restamos el campo con el pago total realizado. La diferencia
# nos da la propina en efectivo pagada en cada viaje. Para calcularlo, tomamos la tabla 'result' utilizada en el problema anterior.

result.createOrReplaceTempView("result")
p4 = spark.sql("select Extra_payment+MTA_tax+Credit_card_tip+Total_payment_toll+Improvement_payment+fare_amount as Total, Total_payment from result")#.show(5)

p4.createOrReplaceTempView('p4')
sol=spark.sql("select Total, Total_payment, Total_payment-Total as Cash_tip from p4 where Total_payment-Total>=0.01")

sol.describe().show()

+-------+------------------+----------------+------------------+
|summary|             Total|   Total_payment|          Cash_tip|
+-------+------------------+----------------+------------------+
|  count|              1800|            1800|              1800|
|   mean|19.447316666666662|         21.5414| 2.094083333333331|
| stddev|14.161037955042234|14.2536969268421|0.5927880852444508|
|    min|               3.3|            5.75|1.9499999999999886|
|    max|            129.36|          131.31| 4.950000000000003|
+-------+------------------+----------------+------------------+



La media de las propinas en efectivo es de 2.09 dólares.