In [1]:
from pyspark.sql import SparkSession

In [2]:
trips=spark.read.csv('datasets/NYC_taxi_trip_records',header=True,
                       inferSchema=True)

In [3]:
trips.createOrReplaceTempView('viajes')

In [4]:
trips.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [5]:
spark.sql('SELECT * FROM trips').show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2018-11-22 22:37:46|  2018-11-22 22:43:06|              2|         1.26|         1|                 N|          43|         237|           2|        6.5|  0.5|    0.5|       0.0|         0.0|                  0.3|         7.8|
|       2| 2018-11-22 22:51:04|  2018-11-22 23:4

In [6]:
df=trips

In [7]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [8]:
df

DataFrame[VendorID: int, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: int, trip_distance: double, RatecodeID: int, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: int, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double]

### A) Contar o número de viaxes entre 00:00 e 01:00 de cada día e dar o total por día (agrupados por día, tódolos días teñen un reconto).

In [9]:
from pyspark.sql.functions import col, date_format
from pyspark.sql import SparkSession
df_filtrado = df.where((date_format(col("tpep_pickup_datetime"), "HH:mm:ss") >= "00:00:00") &
    (date_format(col("tpep_dropoff_datetime"), "HH:mm:ss") < "01:00:00")).groupBy(date_format("tpep_pickup_datetime", "yyyy-MM-dd").alias("data")).count().orderBy("data")
df_filtrado.show()


+----------+-----+
|      data|count|
+----------+-----+
|2008-12-31|    3|
|2009-01-01|   25|
|2018-10-31|  169|
|2018-11-01|10356|
|2018-11-02|10735|
|2018-11-03|14348|
|2018-11-04|12737|
|2018-11-05| 4582|
|2018-11-06| 5603|
|2018-11-07| 7296|
|2018-11-08| 9171|
|2018-11-09|12400|
|2018-11-10|15177|
|2018-11-11|13227|
|2018-11-12| 5100|
|2018-11-13| 6282|
|2018-11-14| 7493|
|2018-11-15| 8978|
|2018-11-16|10524|
|2018-11-17|14927|
+----------+-----+
only showing top 20 rows



### B) Contar o número de viaxes entre 00:00 e 01:00 de cada día e dar o total por mes.

In [10]:
from pyspark.sql.functions import col, date_format
from pyspark.sql import SparkSession
df_filtrado2= df_filtrado.groupBy(date_format("data", "yyyy-MM").alias("mes")).sum("count").orderBy("mes")

df_filtrado2.show()

+-------+----------+
|    mes|sum(count)|
+-------+----------+
|2008-12|         3|
|2009-01|        25|
|2018-10|       169|
|2018-11|    269844|
|2018-12|    298290|
|2019-01|        26|
|2019-02|         1|
|2019-03|         1|
+-------+----------+



### C) A media de viaxes ao mes que fai cada conductor (VendorID).

In [11]:
df_c=spark.sql('''
    SELECT 
        VendorID,
        DATE_FORMAT(tpep_pickup_datetime, 'yyyy-MM') AS mes,
        AVG(COUNT(*)) OVER (PARTITION BY VendorID, DATE_FORMAT(tpep_pickup_datetime, 'yyyy-MM')) AS media_viajes_mes
    FROM 
        trips
    GROUP BY 
        VendorID, DATE_FORMAT(tpep_pickup_datetime, 'yyyy-MM')
''')
df_c.show()

+--------+-------+----------------+
|VendorID|    mes|media_viajes_mes|
+--------+-------+----------------+
|       2|2018-11|       4751768.0|
|       2|2018-10|           269.0|
|       2|2009-01|            85.0|
|       2|2019-05|             1.0|
|       2|2019-01|            38.0|
|       4|2018-11|        130817.0|
|       1|2018-12|       3209235.0|
|       2|2019-02|             6.0|
|       4|2018-12|        112210.0|
|       1|2018-11|       3262928.0|
|       2|2003-12|             1.0|
|       2|2018-12|       4850961.0|
|       2|2019-11|             6.0|
|       2|2020-12|             2.0|
|       2|2008-12|            49.0|
|       2|2019-06|            15.0|
|       2|2019-03|             4.0|
+--------+-------+----------------+



### D) A media de viaxes ao día que fai cada conductor (VendorID).

In [12]:
df_c=spark.sql('''
    SELECT 
        VendorID,
        DATE_FORMAT(tpep_pickup_datetime, 'yyyy-MM-dd') AS mes,
        AVG(COUNT(VendorID)) OVER (PARTITION BY VendorID, DATE_FORMAT(tpep_pickup_datetime, 'yyyy-MM-dd')) AS media_viajes_mes
    FROM 
        trips
    GROUP BY 
        VendorID, DATE_FORMAT(tpep_pickup_datetime, 'yyyy-MM-dd')
''')
df_c.show()

+--------+----------+----------------+
|VendorID|       mes|media_viajes_mes|
+--------+----------+----------------+
|       4|2018-11-19|          3800.0|
|       4|2018-12-21|          3836.0|
|       1|2018-12-19|        120722.0|
|       2|2018-11-06|        162115.0|
|       2|2018-12-09|        155120.0|
|       1|2018-11-30|        124868.0|
|       2|2019-01-12|             3.0|
|       2|2008-12-31|            49.0|
|       1|2018-11-04|         97453.0|
|       4|2018-11-01|          5117.0|
|       4|2018-12-20|          4154.0|
|       1|2018-12-22|         96187.0|
|       4|2018-12-18|          4028.0|
|       4|2018-12-22|          3453.0|
|       1|2018-11-05|        110679.0|
|       2|2018-11-22|        108013.0|
|       2|2020-12-10|             2.0|
|       1|2018-11-12|        104009.0|
|       2|2019-01-18|             1.0|
|       4|2018-11-11|          4450.0|
+--------+----------+----------------+
only showing top 20 rows



### E) Cantos pasaxeiros (número de persoas na viaxe) foron como máximo na primeira semana do mes (nunha viaxe).

In [13]:
max_pas= spark.sql("""SELECT MAX(passenger_count) as maximo
            FROM trips
            WHERE day(tpep_pickup_datetime) <= 7""")
max_pas.show()

+------+
|maximo|
+------+
|     9|
+------+



### F) Cantos pasaxeiros (número de persoas na viaxe) foron como máximo en todo o mes (nunha viaxe).

In [14]:
max_pas= spark.sql("""SELECT MAX(passenger_count) as maximo, month(tpep_pickup_datetime) as mes
            FROM trips
            group by month(tpep_pickup_datetime)
            """)
max_pas.show()

+------+---+
|maximo|mes|
+------+---+
|     9| 12|
|     6|  1|
|     1|  6|
|     5|  3|
|     5|  5|
|     6| 10|
|    96| 11|
|     5|  2|
+------+---+



In [32]:
from pyspark.sql.functions import month, max
max_pas = df.groupBy(month("tpep_pickup_datetime").alias("mes")).agg(max("passenger_count").alias("maximo"))
max_pas.show()

+---+------+
|mes|maximo|
+---+------+
| 12|     9|
|  1|     6|
|  6|     1|
|  3|     5|
|  5|     5|
| 10|     6|
| 11|    96|
|  2|     5|
+---+------+



### G) Cantos cartos costou o percorrido máis caro. supoño que e total_amount

In [15]:
viajes=trips.rdd

viajes.max(lambda x:x[-1])[-1]

325478.98

In [16]:
trips.selectExpr("max(total_amount)").first()[0]


325478.98

### H) Cantos cartos costou o percorrido máis barato.

In [17]:
viajes.min(lambda x:x[-1])[-1]

-450.3

In [18]:
trips.selectExpr("min(total_amount)").first()[0]


-450.3