## 1. Librerías necesarias

In [1]:
from pyspark import SparkContext,HiveContext
from pyspark.sql import DataFrame,SQLContext,SparkSession
from pyspark.sql.functions import udf
import os
import subprocess
import pandas as pd
from datetime import date



## 2. Listado de archivos desde HDFS

In [None]:
# HDFS directory that holds the monthly Chicago taxi-trip CSV files listed below.
ruta = '/user/bigdatita/chicago'
# Alternative location (presumably the warehouse directory of Hive table test.chicago);
# the saved listing output further down was produced with this path.
#ruta = '/user/hive/warehouse/test.db/chicago'

In [None]:
# List the entries under `ruta` with the HDFS CLI and turn them into hdfs:// URIs.
# - Passing an argument list (no shell=True) keeps `ruta` from being interpreted by a shell.
# - universal_newlines=True makes check_output return text; on Python 3 it would otherwise
#   return bytes, and bytes.split('\n') raises TypeError.
cmd = ['hdfs', 'dfs', '-ls', ruta]
salida = subprocess.check_output(cmd, universal_newlines=True)
lst = []
for linea in salida.splitlines():
    # Listing rows have 8 whitespace-separated fields with the path last; maxsplit=7 keeps
    # paths containing spaces intact. The "Found N items" header and blank lines have fewer
    # fields and are skipped (an empty directory previously produced a bogus 'hdfs://').
    campos = linea.split(None, 7)
    if len(campos) == 8:
        lst.append('hdfs://%s' % campos[-1])

In [None]:
# Display the discovered file URIs.
lst

['hdfs:///user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_01.csv',
 'hdfs:///user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_02.csv',
 'hdfs:///user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_03.csv',
 'hdfs:///user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_04.csv',
 'hdfs:///user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_05.csv',
 'hdfs:///user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_06.csv',
 'hdfs:///user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_07.csv',
 'hdfs:///user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_08.csv',
 'hdfs:///user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_09.csv',
 'hdfs:///user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_10.csv',
 'hdfs:///user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_11.csv',
 'hdfs:///user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_12.csv']

## 3. Lectura de archivos desde HDFS a Spark

In [None]:
# Create the Spark session, or reuse the existing one if the kernel was started via `pyspark`.
sparkSession = SparkSession.builder.appName("lectura").getOrCreate()
# Later cells build SQLContext/HiveContext from `sc`. Bind it explicitly so the notebook also
# runs in a plain Jupyter kernel where the pyspark shell has not injected `sc`. If `sc`
# already exists, getOrCreate() returned the session built on it, so this is the same object.
sc = sparkSession.sparkContext

In [None]:
%%time
for i,arch in enumerate(lst):
    print(arch)
    if i == 0:
        df = sparkSession.read.csv(arch,header=True,inferSchema=False)
    else:
        df = df.union( sparkSession.read.csv(arch,header=True,inferSchema=False))

hdfs://localhost:8020/user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_01.csv
hdfs:/user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_02.csv
hdfs:/user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_03.csv
hdfs:/user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_04.csv
hdfs:/user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_05.csv
hdfs:/user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_06.csv
hdfs:/user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_07.csv
hdfs:/user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_08.csv
hdfs:/user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_09.csv
hdfs:/user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_10.csv
hdfs:/user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_11.csv
hdfs:/user/hive/warehouse/test.db/chicago/chicago_taxi_trips_2016_12.csv
CPU times: user 42.7 ms, sys: 1.75 ms, total: 44.5 ms
Wall time: 3.66 s


In [None]:
# Sanity check: all monthly files were loaded into one Spark DataFrame.
type(df)

pyspark.sql.dataframe.DataFrame

In [None]:
# Every column is a string here because the CSVs were read with inferSchema=False.
df.printSchema()

root
 |-- taxi_id: string (nullable = true)
 |-- trip_start_timestamp: string (nullable = true)
 |-- trip_end_timestamp: string (nullable = true)
 |-- trip_seconds: string (nullable = true)
 |-- trip_miles: string (nullable = true)
 |-- pickup_census_tract: string (nullable = true)
 |-- dropoff_census_tract: string (nullable = true)
 |-- pickup_community_area: string (nullable = true)
 |-- dropoff_community_area: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- tips: string (nullable = true)
 |-- tolls: string (nullable = true)
 |-- extras: string (nullable = true)
 |-- trip_total: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- company: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)



In [None]:
from pyspark.sql.functions import to_timestamp

In [None]:
%%time
df = df.withColumn('trip_start_timestamp',to_timestamp(df['trip_start_timestamp']))
df = df.withColumn('trip_end_timestamp',to_timestamp(df['trip_end_timestamp']))

CPU times: user 3.11 ms, sys: 558 µs, total: 3.67 ms
Wall time: 46.7 ms


In [None]:
# Confirm that both trip timestamp columns now have type timestamp.
df.printSchema()

root
 |-- taxi_id: string (nullable = true)
 |-- trip_start_timestamp: timestamp (nullable = true)
 |-- trip_end_timestamp: timestamp (nullable = true)
 |-- trip_seconds: string (nullable = true)
 |-- trip_miles: string (nullable = true)
 |-- pickup_census_tract: string (nullable = true)
 |-- dropoff_census_tract: string (nullable = true)
 |-- pickup_community_area: string (nullable = true)
 |-- dropoff_community_area: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- tips: string (nullable = true)
 |-- tolls: string (nullable = true)
 |-- extras: string (nullable = true)
 |-- trip_total: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- company: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)



## 4. Operaciones básicas

In [None]:
%%time
# Total number of trips. count() is an action, so this cell actually reads the data.
df.count()

CPU times: user 5.16 ms, sys: 195 µs, total: 5.35 ms
Wall time: 17.6 s


19866157

In [None]:
# Peek at the taxi id and parsed start time of the first few rows.
df.select('taxi_id','trip_start_timestamp').show(6)

+-------+--------------------+
|taxi_id|trip_start_timestamp|
+-------+--------------------+
|     85| 2016-01-13 06:15:00|
|   2776| 2016-01-22 09:30:00|
|   3168| 2016-01-31 21:30:00|
|   4237| 2016-01-23 17:30:00|
|   5710| 2016-01-14 05:45:00|
|   1987| 2016-01-08 18:15:00|
+-------+--------------------+
only showing top 6 rows



In [None]:
%%time
df.describe('fare').show()

+-------+------------------+
|summary|              fare|
+-------+------------------+
|  count|          19865857|
|   mean|13.890483490342264|
| stddev|25.389793055250195|
|    min|              0.00|
|    max|           9999.00|
+-------+------------------+

CPU times: user 5.97 ms, sys: 0 ns, total: 5.97 ms
Wall time: 13 s


In [None]:
%%time
df2=df.filter((df.fare>30)&(df['trip_start_timestamp']<date(2016,1,15)))

CPU times: user 880 µs, sys: 3.38 ms, total: 4.26 ms
Wall time: 92.5 ms


In [None]:
%%time
# Number of trips matching the filter above; this action is what actually runs the filter.
df2.count()

CPU times: user 0 ns, sys: 3.95 ms, total: 3.95 ms
Wall time: 11.4 s


82026

In [None]:
%%time
# Preview a few filtered trips with their start and end times.
df2.select('taxi_id','trip_start_timestamp', 'trip_end_timestamp').show(5)

+-------+--------------------+-------------------+
|taxi_id|trip_start_timestamp| trip_end_timestamp|
+-------+--------------------+-------------------+
|   4986| 2016-01-14 04:30:00|2016-01-14 05:00:00|
|   8613| 2016-01-14 14:15:00|2016-01-14 14:45:00|
|   8162| 2016-01-14 18:15:00|2016-01-14 19:00:00|
|   6535| 2016-01-09 11:15:00|2016-01-09 11:30:00|
|   3690| 2016-01-10 19:00:00|2016-01-10 19:00:00|
+-------+--------------------+-------------------+
only showing top 5 rows

CPU times: user 4.3 ms, sys: 757 µs, total: 5.05 ms
Wall time: 1.39 s


## 5. Funciones definidas por el usuario

In [None]:
def _anio_mes(ts):
    """Format a Python datetime as a 'YYYYMM' string (e.g. '201601'); None stays None.

    Null timestamps (e.g. values to_timestamp could not parse) reach the UDF as None.
    Calling strftime on None would raise AttributeError in the Python worker and fail
    the whole Spark job.
    """
    if ts is None:
        return None
    return ts.strftime('%Y%m')


# Python UDF (default return type: string) that derives the year-month key from a timestamp.
f = udf(_anio_mes)

In [None]:
%%time
# Preview the UDF-derived year-month key next to its source timestamp and fare.
# This only displays the column; it does not add it to df.
df.withColumn('fh',f(df['trip_start_timestamp'])).select('fh','trip_start_timestamp','fare').show(5)

+------+--------------------+-----+
|    fh|trip_start_timestamp| fare|
+------+--------------------+-----+
|201601| 2016-01-13 06:15:00| 4.50|
|201601| 2016-01-22 09:30:00| 4.45|
|201601| 2016-01-31 21:30:00|42.75|
|201601| 2016-01-23 17:30:00| 7.00|
|201601| 2016-01-14 05:45:00|10.25|
+------+--------------------+-----+
only showing top 5 rows

CPU times: user 15.3 ms, sys: 3.51 ms, total: 18.8 ms
Wall time: 12.3 s


In [None]:
%%time
# Add the year-month key as column `fh` on df; the group-bys below aggregate on it.
df = df.withColumn('fh',f(df['trip_start_timestamp']))

CPU times: user 2.53 ms, sys: 449 µs, total: 2.98 ms
Wall time: 11.1 ms


In [None]:
from pyspark.sql import functions as F

In [None]:
%%time
df.groupby('fh').agg(F.min('fare'),F.max('fare'),F.mean('fare'),F.stddev('fare'),F.mean('tips')).show()

+------+---------+---------+------------------+------------------+------------------+
|    fh|min(fare)|max(fare)|         avg(fare)| stddev_samp(fare)|         avg(tips)|
+------+---------+---------+------------------+------------------+------------------+
|201607|     0.00|   999.99| 14.04946081363528| 20.58891652949124|1.5933120310403202|
|201611|     0.00|   999.99|14.062723070495885| 23.51303144563972|1.6601316774044181|
|201605|     0.00|  9999.00|14.581178349487448| 29.09179066737618| 1.783741596632322|
|201602|     0.00|   999.99|13.266467420290947| 19.88635909009765|1.5922205744464408|
|201606|     0.00|   999.99|14.570897891073576|23.594892747714532|1.8046511297304668|
|201609|     0.00|   999.99|14.222663623373517|25.177290998835247|1.6526758568247304|
|201610|     0.00|   999.99|14.368423414112398| 25.84362555261587|1.6878759348363959|
|201608|     0.00|   999.99| 13.87390564865237| 28.47597739157845|1.5590011227617797|
|201604|     0.00|   999.99| 14.04663645356125|18.4124

In [None]:
%%time
df.groupby('fh').agg(F.countDistinct('taxi_id')).show()

+------+-----------------------+
|    fh|count(DISTINCT taxi_id)|
+------+-----------------------+
|201607|                   5777|
|201611|                   4493|
|201605|                   6072|
|201602|                   5645|
|201606|                   5971|
|201609|                   4871|
|201610|                   4758|
|201608|                   5139|
|201604|                   5970|
|201601|                   5636|
|201603|                   5736|
|201612|                   4550|
+------+-----------------------+

CPU times: user 64.2 ms, sys: 10.9 ms, total: 75.1 ms
Wall time: 1min 3s


## 6. Uso de SQL 

In [None]:
# SQL entry point built on the kernel's SparkContext `sc`; used to run the query below.
sqlContext = SQLContext(sparkContext=sc)

In [None]:
# Expose df to SQL as the temporary view "taxi_trips".
# registerTempTable is the deprecated Spark 1.x name. createOrReplaceTempView replaces it and
# is also what section 8 of this notebook uses.
df.createOrReplaceTempView("taxi_trips")

In [None]:
# Trips per year-month in Spark SQL. The key is year * 100 + month (e.g. 201601), an integer
# counterpart of the `fh` string key built with the UDF; rows are ordered by that key.
query = """
SELECT 
  year(trip_start_timestamp)* 100 + month(trip_start_timestamp) as fh, 
  count(*) as casos 
from 
  taxi_trips 
group by 
  year(trip_start_timestamp)* 100 + month(trip_start_timestamp) 
order by 
  fh

"""

In [None]:
%%time
# Run the SQL query against the "taxi_trips" temporary view and print the result.
sqlContext.sql(query).show()

+------+-------+
|    fh|  casos|
+------+-------+
|201601|1705805|
|201602|1751192|
|201603|1975108|
|201604|1952152|
|201605|1943584|
|201606|1934479|
|201607|1745387|
|201608|1523947|
|201609|1294020|
|201610|1499771|
|201611|1295000|
|201612|1245712|
+------+-------+

CPU times: user 3.87 ms, sys: 700 µs, total: 4.57 ms
Wall time: 21.1 s


## 7. Leer tabla desde Hive

In [None]:
%%time
# Open the Hive table test.chicago as a Spark DataFrame.
# Assumes `sc` (the SparkContext) is already defined in the kernel.
# NOTE(review): HiveContext is the Spark 1.x entry point; newer Spark versions deprecate it
# in favour of a SparkSession with Hive support. Confirm against the cluster's Spark version.
hive_context = HiveContext(sc)
tabla = hive_context.table("test.chicago")

CPU times: user 3.56 ms, sys: 0 ns, total: 3.56 ms
Wall time: 156 ms


In [None]:
# The Hive table is exposed as a regular Spark DataFrame.
type(tabla)

pyspark.sql.dataframe.DataFrame

In [None]:
%%time
# Row count of the Hive table, to compare with the count of the CSV files read earlier.
tabla.count()

CPU times: user 0 ns, sys: 3.59 ms, total: 3.59 ms
Wall time: 16.3 s


19866169

In [None]:
%%time
tabla = tabla.groupby('trip_start_timestamp').agg({'fare':'mean'})

CPU times: user 3.64 ms, sys: 0 ns, total: 3.64 ms
Wall time: 21 ms


In [None]:
# Preview the per-start-time mean fare.
tabla.show(4)

+--------------------+------------------+
|trip_start_timestamp|         avg(fare)|
+--------------------+------------------+
| 2016-02-22 12:45:00|14.950078528588978|
| 2016-02-12 15:00:00| 12.50338900253206|
| 2016-02-14 15:30:00| 12.20341426649658|
| 2016-02-04 16:45:00|14.139089924542644|
+--------------------+------------------+
only showing top 4 rows



## 8. Escribir en Hive y HDFS

In [None]:
# Register the aggregate as a temporary view so the CREATE TABLE ... AS SELECT below can read it.
tabla.createOrReplaceTempView('grupo_chicago')

In [None]:
%%time
# Materialise the aggregate view as the Hive table test.grupo_chicago.
# NOTE(review): a plain CREATE TABLE fails if the table already exists, so this cell is not
# re-runnable as-is.
hive_context.sql('create table test.grupo_chicago as select * from grupo_chicago')

CPU times: user 3.29 ms, sys: 592 µs, total: 3.88 ms
Wall time: 14.7 s


DataFrame[]

In [None]:
# Base HDFS URI that the export below is written under.
'hdfs://%s'%ruta

'hdfs:///user/bigdatita/chicago'

In [None]:
# Exploration aid: inline help for DataFrameWriter.format (no effect on the analysis).
help(tabla.write.format)

Help on method format in module pyspark.sql.readwriter:

format(self, source) method of pyspark.sql.readwriter.DataFrameWriter instance
    Specifies the underlying output data source.
    
    :param source: string, name of the data source, e.g. 'json', 'parquet'.
    
    >>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
    
    .. versionadded:: 1.4



In [None]:
%%time
tabla.write.format('csv').save('hdfs://%s/agrupado.csv'%ruta)
#tabla.write.format('parquet').save('hdfs://%s/agrupado.parquet'%ruta)

In [None]:
# Round-trip check: read the exported CSV back with a header and inferred types, and count its rows.
df3 = sparkSession.read.csv('hdfs://%s/agrupado.csv'%ruta,header=True,inferSchema=True)
df3.count()

34933

## 9. Cruces

In [None]:
%%time
df =  hive_context.table("test.chicago")
df2=df.filter((df.fare>30)&(df['trip_start_timestamp']<date(2016,1,15)))

CPU times: user 3.26 ms, sys: 624 µs, total: 3.89 ms
Wall time: 57.3 ms


In [None]:
%%time
df.join(df2, df.taxi_id == df2.taxi_id,'inner').count()

CPU times: user 7.41 ms, sys: 0 ns, total: 7.41 ms
Wall time: 25.1 s


328651010

## 10. Convertir a pandas

In [None]:
%%time
df = df.toPandas()

In [None]:
# Summary statistics of the collected pandas frame.
# NOTE(review): by default pandas describe() summarises only numeric columns; if the Hive
# columns arrive as strings, this shows count/unique/top/freq instead. Confirm the table's types.
df.describe()