**1. Import de Librerías necesarias**

In [1]:
import os
import subprocess
import pandas as pd
from datetime import date

from pyspark import SparkContext, HiveContext
from pyspark.sql import DataFrame, SQLContext, SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import to_timestamp
from pyspark.sql import functions as f
from pyspark.sql.functions import percentile_approx

pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)

In [2]:
ss = SparkSession.builder.appName("bigdatita").getOrCreate()

24/12/11 05:22:59 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


**2. Lectura de los archivos**

In [3]:
!hdfs dfs -ls /user/bigdatita/

Found 2 items
-rw-r--r--   2 cuauhtemocbe hadoop 2689650394 2024-12-07 18:08 /user/bigdatita/taxi_trips_2023.csv
-rw-r--r--   2 cuauhtemocbe hadoop 2253224851 2024-12-11 02:50 /user/bigdatita/taxi_trips_2024.csv


In [4]:
ruta = '/user/bigdatita/'
cmd = f'hdfs dfs -ls {ruta}'
lst = str(subprocess.check_output(cmd, shell=True)).strip().split('\\n')
lst = [x.split(' ')[-1] for x in lst]
lst = ['hdfs://%s'%x for x in lst if x!="items" if x!="'"]
lst

['hdfs:///user/bigdatita/taxi_trips_2023.csv',
 'hdfs:///user/bigdatita/taxi_trips_2024.csv']

**3. Lectura de archivos desde HDFS a Spark**

In [5]:
%%time
for i, file in enumerate(lst):
    print(file)
    if i==0:
        df = ss.read.csv(file, header=True, inferSchema=False)
    else:
        df = df.union(ss.read.csv(file, header=True, inferSchema=False))

hdfs:///user/bigdatita/taxi_trips_2023.csv


                                                                                

hdfs:///user/bigdatita/taxi_trips_2024.csv
CPU times: user 6.54 ms, sys: 7.45 ms, total: 14 ms
Wall time: 8.18 s


In [37]:
df.printSchema()

root
 |-- Trip ID: string (nullable = true)
 |-- Taxi ID: string (nullable = true)
 |-- Trip Start Timestamp: string (nullable = true)
 |-- Trip End Timestamp: string (nullable = true)
 |-- Trip Seconds: string (nullable = true)
 |-- Trip Miles: string (nullable = true)
 |-- Pickup Census Tract: string (nullable = true)
 |-- Dropoff Census Tract: string (nullable = true)
 |-- Pickup Community Area: string (nullable = true)
 |-- Dropoff Community Area: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Tips: string (nullable = true)
 |-- Tolls: string (nullable = true)
 |-- Extras: string (nullable = true)
 |-- Trip Total: string (nullable = true)
 |-- Payment Type: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Pickup Centroid Latitude: string (nullable = true)
 |-- Pickup Centroid Longitude: string (nullable = true)
 |-- Pickup Centroid Location: string (nullable = true)
 |-- Dropoff Centroid Latitude: string (nullable = true)
 |-- Dropoff Centroid

In [38]:
date_format = 'MM/dd/yyyy hh:mm:ss a'

df = df.withColumn('Trip Start Timestamp', to_timestamp(df['Trip Start Timestamp'], date_format).cast(TimestampType()))
df = df.withColumn('Trip End Timestamp', to_timestamp(df['Trip End Timestamp'], date_format).cast(TimestampType()))

## Filtrando información

In [39]:
df = df[(df['Trip Start Timestamp'] <= '2024-08-31')]

In [40]:
df.select(f.max("Trip Start Timestamp")).show()



+-------------------------+
|max(Trip Start Timestamp)|
+-------------------------+
|      2024-08-31 00:00:00|
+-------------------------+



                                                                                

In [42]:
for var in ['Fare', 'Tips', 'Tolls', 'Extras', 'Trip Total']:
    df = df.withColumn(var, df[var].cast('float'))

In [13]:
df.printSchema()

root
 |-- Trip ID: string (nullable = true)
 |-- Taxi ID: string (nullable = true)
 |-- Trip Start Timestamp: timestamp (nullable = true)
 |-- Trip End Timestamp: timestamp (nullable = true)
 |-- Trip Seconds: string (nullable = true)
 |-- Trip Miles: string (nullable = true)
 |-- Pickup Census Tract: string (nullable = true)
 |-- Dropoff Census Tract: string (nullable = true)
 |-- Pickup Community Area: string (nullable = true)
 |-- Dropoff Community Area: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Tips: float (nullable = true)
 |-- Tolls: float (nullable = true)
 |-- Extras: float (nullable = true)
 |-- Trip Total: float (nullable = true)
 |-- Payment Type: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Pickup Centroid Latitude: string (nullable = true)
 |-- Pickup Centroid Longitude: string (nullable = true)
 |-- Pickup Centroid Location: string (nullable = true)
 |-- Dropoff Centroid Latitude: string (nullable = true)
 |-- Dropoff Centroi

**4. Operaciones básicas**

In [45]:
from IPython.display import display

In [46]:
describe_df = df.describe().toPandas()

                                                                                

Unnamed: 0,summary,Trip ID,Taxi ID,Trip Seconds,Trip Miles,Pickup Census Tract,Dropoff Census Tract,Pickup Community Area,Dropoff Community Area,Fare,...,Extras,Trip Total,Payment Type,Company,Pickup Centroid Latitude,Pickup Centroid Longitude,Pickup Centroid Location,Dropoff Centroid Latitude,Dropoff Centroid Longitude,Dropoff Centroid Location
0,count,10755497,10755497,10753418.0,10755450.0,4715559.0,4579195.0,10395504.0,9728878.0,10731465.0,...,10731465.0,10731465.0,10755497,10755497,10400001.0,10400001.0,10400001,9793910.0,9793910.0,9793910
1,mean,,,1253.8432065042018,6.611410067454639,17031507928.872871,17031414968.672115,35.790761563845294,26.23879351760809,22.53805691143093,...,2.274319610613868,28.014873584051116,,,41.90216939498016,-87.70184576171366,,41.89357786782515,-87.66223343453349,
2,stddev,,,1675.5977047159922,7.849773679995616,375165.198951974,344238.5790196729,26.30309864981512,20.95742682723646,26.23894602180289,...,18.14324821362521,38.6419479970113,,,0.0637277383372247,0.1141023449147195,,0.0573247369201358,0.0734826427053273,
3,min,00000019ef3c0b419a9e6b1843b83530b79627a4,000daaa11a2d961100513e232a1ce05391c5d797d2dc56...,0.0,0.0,17031010100.0,17031010100.0,1.0,1.0,0.0,...,0.0,0.0,Cash,2733 - 74600 Benny Jona,41.650221676,-87.531386257,POINT (-87.5313862567 41.7204632831),41.650221676,-87.534902901,POINT (-87.5349029012 41.707311449)
4,max,ffffffdda8f2f9f98cf474cce05b7e5e34dc25e4,ffda53354c610fd3af1aee46d723028a49014e35f7280c...,9999.0,99.99,17031980100.0,17031980100.0,9.0,9.0,9999.75,...,9693.78,9999.75,Unknown,Wolley Taxi,42.021223593,-87.913624596,POINT (-87.913624596 41.9802643146),42.021223593,-87.913624596,POINT (-87.913624596 41.9802643146)


## Análisis exploratorio

In [63]:
subset = [
    
    'summary',
    'Trip Seconds', 'Trip Miles',
    'Pickup Census Tract', 'Dropoff Census Tract', 'Pickup Community Area',
    'Dropoff Community Area', 'Fare', 'Tips', 'Tolls', 'Extras',
    'Trip Total', 'Payment Type', 'Company', 'Pickup Centroid Latitude',
    'Pickup Centroid Longitude', 'Pickup Centroid Location',
    'Dropoff Centroid Latitude', 'Dropoff Centroid Longitude',
    'Dropoff Centroid  Location'
]

display(describe_df[subset].round(2))

Unnamed: 0,summary,Trip Seconds,Trip Miles,Pickup Census Tract,Dropoff Census Tract,Pickup Community Area,Dropoff Community Area,Fare,Tips,Tolls,Extras,Trip Total,Payment Type,Company,Pickup Centroid Latitude,Pickup Centroid Longitude,Pickup Centroid Location,Dropoff Centroid Latitude,Dropoff Centroid Longitude,Dropoff Centroid Location
0,count,10753418.0,10755450.0,4715559.0,4579195.0,10395504.0,9728878.0,10731465.0,10731465.0,10731465.0,10731465.0,10731465.0,10755497,10755497,10400001.0,10400001.0,10400001,9793910.0,9793910.0,9793910
1,mean,1253.8432065042018,6.611410067454639,17031507928.872871,17031414968.672115,35.790761563845294,26.23879351760809,22.53805691143093,2.9734957981665016,0.0495877094770661,2.274319610613868,28.014873584051116,,,41.90216939498016,-87.70184576171366,,41.89357786782515,-87.66223343453349,
2,stddev,1675.5977047159922,7.849773679995616,375165.198951974,344238.5790196729,26.30309864981512,20.95742682723646,26.23894602180289,4.2933656948623,10.821039066251116,18.14324821362521,38.6419479970113,,,0.0637277383372247,0.1141023449147195,,0.0573247369201358,0.0734826427053273,
3,min,0.0,0.0,17031010100.0,17031010100.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,Cash,2733 - 74600 Benny Jona,41.650221676,-87.531386257,POINT (-87.5313862567 41.7204632831),41.650221676,-87.534902901,POINT (-87.5349029012 41.707311449)
4,max,9999.0,99.99,17031980100.0,17031980100.0,9.0,9.0,9999.75,500.0,6666.66,9693.78,9999.75,Unknown,Wolley Taxi,42.021223593,-87.913624596,POINT (-87.913624596 41.9802643146),42.021223593,-87.913624596,POINT (-87.913624596 41.9802643146)


In [66]:
%%time
deciles = df.selectExpr("percentile_approx(Fare, array(0.0, 0.01, 0.02, 0.03, 0.97, 0.98, 0.99, 1.0)) as Percentiles")
deciles.show(truncate=False)

[Stage 35:>                                                         (0 + 1) / 1]

+-----------------------------------------------------+
|Percentiles                                          |
+-----------------------------------------------------+
|[0.0, 3.25, 3.25, 3.25, 58.25, 64.25, 73.25, 9999.75]|
+-----------------------------------------------------+

CPU times: user 29 ms, sys: 15.7 ms, total: 44.7 ms
Wall time: 38.8 s


                                                                                

## Valores nulo

In [77]:
from pyspark.sql.functions import col, isnan, when, count

# Columnas numéricas
numeric_columns = [c for c, t in df.dtypes if t in ["double", "float"]]

# Columnas no numéricas
other_columns = [c for c, t in df.dtypes if t not in ["double", "float"]]

# Contar valores faltantes para columnas numéricas
numeric_missing = [
    count(when(isnan(col(c)) | col(c).isNull(), c)).alias(c) for c in numeric_columns
]

# Contar valores faltantes para columnas no numéricas
other_missing = [
    count(when(col(c).isNull(), c)).alias(c) for c in other_columns
]

# Combinar resultados
missings_df = df.select(numeric_missing + other_missing).toPandas()

                                                                                

In [103]:
df.filter(df["Taxi ID"].isNull() | isnan(df["Taxi ID"])).show()



+-------+-------+--------------------+------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+----+----+-----+------+----------+------------+-------+------------------------+-------------------------+------------------------+-------------------------+--------------------------+--------------------------+
|Trip ID|Taxi ID|Trip Start Timestamp|Trip End Timestamp|Trip Seconds|Trip Miles|Pickup Census Tract|Dropoff Census Tract|Pickup Community Area|Dropoff Community Area|Fare|Tips|Tolls|Extras|Trip Total|Payment Type|Company|Pickup Centroid Latitude|Pickup Centroid Longitude|Pickup Centroid Location|Dropoff Centroid Latitude|Dropoff Centroid Longitude|Dropoff Centroid  Location|
+-------+-------+--------------------+------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+----+----+-----+------+----------+------------+-------+--------------------

                                                                                

In [106]:
df.filter(df["Taxi ID"] == None).show()

+-------+-------+--------------------+------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+----+----+-----+------+----------+------------+-------+------------------------+-------------------------+------------------------+-------------------------+--------------------------+--------------------------+
|Trip ID|Taxi ID|Trip Start Timestamp|Trip End Timestamp|Trip Seconds|Trip Miles|Pickup Census Tract|Dropoff Census Tract|Pickup Community Area|Dropoff Community Area|Fare|Tips|Tolls|Extras|Trip Total|Payment Type|Company|Pickup Centroid Latitude|Pickup Centroid Longitude|Pickup Centroid Location|Dropoff Centroid Latitude|Dropoff Centroid Longitude|Dropoff Centroid  Location|
+-------+-------+--------------------+------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+----+----+-----+------+----------+------------+-------+--------------------

In [108]:
df.filter(df["Taxi ID"] == "nan").show()



+-------+-------+--------------------+------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+----+----+-----+------+----------+------------+-------+------------------------+-------------------------+------------------------+-------------------------+--------------------------+--------------------------+
|Trip ID|Taxi ID|Trip Start Timestamp|Trip End Timestamp|Trip Seconds|Trip Miles|Pickup Census Tract|Dropoff Census Tract|Pickup Community Area|Dropoff Community Area|Fare|Tips|Tolls|Extras|Trip Total|Payment Type|Company|Pickup Centroid Latitude|Pickup Centroid Longitude|Pickup Centroid Location|Dropoff Centroid Latitude|Dropoff Centroid Longitude|Dropoff Centroid  Location|
+-------+-------+--------------------+------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+----+----+-----+------+----------+------------+-------+--------------------

24/12/11 03:55:30 WARN BlockManagerMaster: Failed to remove broadcast 95 with removeFromMaster = true - org.apache.spark.SparkException: Could not find BlockManagerEndpoint1.
	at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:178)
	at org.apache.spark.rpc.netty.Dispatcher.postRemoteMessage(Dispatcher.scala:136)
	at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:683)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelR

In [101]:
df.filter(f.col("Trip ID").isNull() | f.col("Trip ID").isNull()).show()



+-------+-------+--------------------+------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+----+----+-----+------+----------+------------+-------+------------------------+-------------------------+------------------------+-------------------------+--------------------------+--------------------------+
|Trip ID|Taxi ID|Trip Start Timestamp|Trip End Timestamp|Trip Seconds|Trip Miles|Pickup Census Tract|Dropoff Census Tract|Pickup Community Area|Dropoff Community Area|Fare|Tips|Tolls|Extras|Trip Total|Payment Type|Company|Pickup Centroid Latitude|Pickup Centroid Longitude|Pickup Centroid Location|Dropoff Centroid Latitude|Dropoff Centroid Longitude|Dropoff Centroid  Location|
+-------+-------+--------------------+------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+----+----+-----+------+----------+------------+-------+--------------------

24/12/11 03:48:49 WARN BlockManagerMasterEndpoint: No more replicas available for broadcast_77_piece0 !
24/12/11 03:48:49 WARN BlockManagerMaster: Failed to remove broadcast 77 with removeFromMaster = true - org.apache.spark.SparkException: Could not find BlockManagerEndpoint1.
	at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:178)
	at org.apache.spark.rpc.netty.Dispatcher.postRemoteMessage(Dispatcher.scala:136)
	at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:683)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at io.netty.channel.SimpleChannelInboundHandler.channelRea

In [100]:
df.dtypes

[('Trip ID', 'string'),
 ('Taxi ID', 'string'),
 ('Trip Start Timestamp', 'timestamp'),
 ('Trip End Timestamp', 'timestamp'),
 ('Trip Seconds', 'string'),
 ('Trip Miles', 'string'),
 ('Pickup Census Tract', 'string'),
 ('Dropoff Census Tract', 'string'),
 ('Pickup Community Area', 'string'),
 ('Dropoff Community Area', 'string'),
 ('Fare', 'float'),
 ('Tips', 'float'),
 ('Tolls', 'float'),
 ('Extras', 'float'),
 ('Trip Total', 'float'),
 ('Payment Type', 'string'),
 ('Company', 'string'),
 ('Pickup Centroid Latitude', 'string'),
 ('Pickup Centroid Longitude', 'string'),
 ('Pickup Centroid Location', 'string'),
 ('Dropoff Centroid Latitude', 'string'),
 ('Dropoff Centroid Longitude', 'string'),
 ('Dropoff Centroid  Location', 'string')]

In [98]:
other_columns

['Trip ID',
 'Taxi ID',
 'Trip Start Timestamp',
 'Trip End Timestamp',
 'Trip Seconds',
 'Trip Miles',
 'Pickup Census Tract',
 'Dropoff Census Tract',
 'Pickup Community Area',
 'Dropoff Community Area',
 'Payment Type',
 'Company',
 'Pickup Centroid Latitude',
 'Pickup Centroid Longitude',
 'Pickup Centroid Location',
 'Dropoff Centroid Latitude',
 'Dropoff Centroid Longitude',
 'Dropoff Centroid  Location']

In [78]:
display(missings_df.round())

Unnamed: 0,Fare,Tips,Tolls,Extras,Trip Total,Trip ID,Taxi ID,Trip Start Timestamp,Trip End Timestamp,Trip Seconds,Trip Miles,Pickup Census Tract,Dropoff Census Tract,Pickup Community Area,Dropoff Community Area,Payment Type,Company,Pickup Centroid Latitude,Pickup Centroid Longitude,Pickup Centroid Location,Dropoff Centroid Latitude,Dropoff Centroid Longitude,Dropoff Centroid Location
0,24032,24032,24032,24032,24032,0,0,0,121,2079,47,6039938,6176302,359993,1026619,0,0,355496,355496,355496,961587,961587,961587


In [82]:
total = df.count()
missings_df = missings_df.T / total * 100


In [96]:
missings_df.reset_index().sort_values(0, ascending=False)

Unnamed: 0,index,0
12,Dropoff Census Tract,57.424608
11,Pickup Census Tract,56.156754
14,Dropoff Community Area,9.545063
21,Dropoff Centroid Longitude,8.940424
20,Dropoff Centroid Latitude,8.940424
22,Dropoff Centroid Location,8.940424
13,Pickup Community Area,3.347061
19,Pickup Centroid Location,3.305249
18,Pickup Centroid Longitude,3.305249
17,Pickup Centroid Latitude,3.305249


In [19]:
%%time
df2 = df.filter((df['Fare']>=3.25) & (df['Fare']<61) & (df['Trip Start Timestamp']>date(2022,1,1)))

CPU times: user 4.86 ms, sys: 0 ns, total: 4.86 ms
Wall time: 162 ms


In [20]:
%%time
df2.count()



CPU times: user 12.1 ms, sys: 4.03 ms, total: 16.1 ms
Wall time: 11.4 s


                                                                                

6330856

In [21]:
%%time
df2.describe('Fare').show()



+-------+------------------+
|summary|              Fare|
+-------+------------------+
|  count|           6330856|
|   mean|20.933412153155338|
| stddev|15.150629654538148|
|    min|              3.25|
|    max|             60.99|
+-------+------------------+

CPU times: user 22.2 ms, sys: 4.51 ms, total: 26.7 ms
Wall time: 21.1 s


                                                                                

In [22]:
df2.select('Taxi ID', 'Trip Start Timestamp', 'Trip End Timestamp', 'Fare').show(10)

+--------------------+--------------------+-------------------+-----+
|             Taxi ID|Trip Start Timestamp| Trip End Timestamp| Fare|
+--------------------+--------------------+-------------------+-----+
|e2c349c7cbb608d55...| 2023-01-01 00:00:00|2023-01-01 00:15:00|15.75|
|4ab7a7510c1ebcc9b...| 2023-01-01 00:00:00|2023-01-01 00:15:00| 41.5|
|8c76eb82f069c0731...| 2023-01-01 00:00:00|2023-01-01 00:15:00|16.14|
|a688de71e9eb70603...| 2023-01-01 00:00:00|2023-01-01 00:00:00|  5.5|
|8b1a88e5a09cfd55c...| 2023-01-01 00:00:00|2023-01-01 00:15:00| 7.75|
|36add30222345794b...| 2023-01-01 00:00:00|2023-01-01 00:00:00|  5.5|
|a1b762f1aec578cc8...| 2023-01-01 00:00:00|2023-01-01 00:15:00|24.16|
|32995ef9e82cc7265...| 2023-01-01 00:00:00|2023-01-01 00:00:00|  7.0|
|7cb607b940a690e65...| 2023-01-01 00:00:00|2023-01-01 00:30:00| 43.0|
|c797f1560410b9db3...| 2023-01-01 00:00:00|2023-01-01 00:45:00|53.94|
+--------------------+--------------------+-------------------+-----+
only showing top 10 

**5. Funciones definidas por el usuario**

In [23]:
f = udf(lambda x:x.strftime('%Y%m'))

In [24]:
%%time
df.withColumn('Month', f(df['Trip Start Timestamp'])).select('Month', 'Trip Start Timestamp', 'Fare').show(5)

[Stage 19:>                                                         (0 + 1) / 1]

+------+--------------------+-----+
| Month|Trip Start Timestamp| Fare|
+------+--------------------+-----+
|202301| 2023-01-01 00:00:00|15.75|
|202301| 2023-01-01 00:00:00| 41.5|
|202301| 2023-01-01 00:00:00|16.14|
|202301| 2023-01-01 00:00:00|  5.5|
|202301| 2023-01-01 00:00:00| 7.75|
+------+--------------------+-----+
only showing top 5 rows

CPU times: user 13.4 ms, sys: 1.5 ms, total: 14.9 ms
Wall time: 2.9 s


                                                                                

In [25]:
df3 = df2.withColumn('Month', f(df2['Trip Start Timestamp']))

In [26]:
from pyspark.sql import functions as F

In [27]:
%%time
df3.groupby('Month').agg(F.min('Fare'), F.mean('Fare'), F.max('Fare')).orderBy('Month').show()



+------+---------+------------------+---------+
| Month|min(Fare)|         avg(Fare)|max(Fare)|
+------+---------+------------------+---------+
|202301|     3.25|19.614685258580668|    60.98|
|202302|     3.25|19.331680329445366|    60.87|
|202303|     3.25|20.367781575363686|    60.89|
|202304|     3.25| 21.10538537557895|    60.89|
|202305|     3.25|21.823371132602876|    60.98|
|202306|     3.25|21.378843105051427|    60.99|
|202307|     3.25|20.858233512792676|    60.98|
|202308|     3.25|20.688272713133475|    60.87|
|202309|     3.25| 21.85586235420275|    60.87|
|202310|     3.25| 22.14559835620755|    60.98|
|202311|     3.25|21.473220534662975|    60.99|
|202312|     3.25|19.538692389349396|    60.89|
+------+---------+------------------+---------+

CPU times: user 21.6 ms, sys: 15.6 ms, total: 37.2 ms
Wall time: 34.4 s


                                                                                

**6. Uso de SQL**

In [28]:
sqlContext = SQLContext(sc)



In [29]:
df2.registerTempTable("taxi_trips")



In [30]:
query = """
SELECT
    year('Trip Start Timestamp')*100 + month('Trip Start Timestamp') as fh,
    count(*) as casos
FROM
    taxi_trips
GROUP BY
    year('Trip Start Timestamp')*100 + month('Trip Start Timestamp')
ORDER BY
    fh
"""

In [None]:
query = """SELECT * FROM taxi_trips LIMIT 10;"""

In [None]:
%%time
sqlContext.sql(query).show()

**7. Leer tabla desde Hive**

In [None]:
%%time
hive_context = HiveContext(sc)
tabla = hive_context.table("test.chicago")

In [None]:
%%time
tabla.count()

In [None]:
%%time
tabla = tabla.groupby('Company').agg({'fare':'mean'})

In [None]:
tabla.show(4, truncate=False)

**8. Escribir en Hive y HDFS**

In [None]:
tabla.createOrReplaceTempView('grupo_chicago')

In [None]:
%%time
hive_context.sql('create table test.grupo_chicago as select * from grupo_chicago')

In [None]:
f'hdfs://{ruta}'

In [None]:
%%time
tabla.write.format('csv').save(f'hdfs://{ruta}/agrupado.csv')

In [None]:
%%time
tabla.write.parquet(f'hdfs://{ruta}/parquet')

In [None]:
df3 = ss.read.csv(f'hdfs://{ruta}/agrupado.csv', header=False, inferSchema=True)
df3.count()

In [None]:
df3.show(4, truncate=False)

In [None]:
df_parquet = ss.read.parquet(f'hdfs://{ruta}/parquet')
df_parquet.count()

In [None]:
df_parquet.show(4, truncate=False)

**10. Convertir a pandas**

In [None]:
%%time
df = hive_context.table('test.grupo_chicago')

In [None]:
%%time
df_pandas = df.toPandas()

In [None]:
df_pandas.head()