In [1]:
!which python

//anaconda/envs/spark/bin/python


In [2]:
!python --version

Python 2.7.13 :: Continuum Analytics, Inc.


In [3]:
# make sure SparkContext is loaded 
sc

<pyspark.context.SparkContext at 0x108753c90>

In [4]:
# load library 
import pandas as pd, numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext


In [5]:
# load applications  
conf = SparkConf().setAppName("building a warehouse")
#sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)


In [6]:
cd NYC_Taxi_Trip_Duration/spark_/

[Errno 2] No such file or directory: 'NYC_Taxi_Trip_Duration/spark_/'
/Users/yennanliu/notebook


### 1) Read csv via spark SQL

In [7]:
# read csv via spark SQL  
df_train = sqlContext.read.format('com.databricks.spark.csv')\
						 .options(header='true', inferschema='true')\
						 .load('/Users/yennanliu/NYC_Taxi_Trip_Duration/data/train.csv')

In [127]:
type(df_train)

pyspark.sql.dataframe.DataFrame

In [8]:
# check data type
df_train.dtypes

[('id', 'string'),
 ('vendor_id', 'int'),
 ('pickup_datetime', 'timestamp'),
 ('dropoff_datetime', 'timestamp'),
 ('passenger_count', 'int'),
 ('pickup_longitude', 'double'),
 ('pickup_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('dropoff_latitude', 'double'),
 ('store_and_fwd_flag', 'string'),
 ('trip_duration', 'int')]

In [9]:
# check missing values 
df_train.where( df_train['vendor_id'].isNull() ).count()

0

In [10]:
df_train.show()

+---------+---------+--------------------+--------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|       id|vendor_id|     pickup_datetime|    dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+---------+--------------------+--------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|id2875421|        2|2016-03-14 17:24:...|2016-03-14 17:32:...|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|
|id2377394|        1|2016-06-12 00:43:...|2016-06-12 00:54:...|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|          663|
|id3858529|        2|2016-01-19 11:35:...|2016-01-19 12

In [11]:
#df_grp = df_train.groupBy('vendor_id')
#df_grp.sum('pickup_longitude','dropoff_longitude').show()



In [12]:
df_train.take(4)

[Row(id=u'id2875421', vendor_id=2, pickup_datetime=datetime.datetime(2016, 3, 14, 17, 24, 55), dropoff_datetime=datetime.datetime(2016, 3, 14, 17, 32, 30), passenger_count=1, pickup_longitude=-73.9821548461914, pickup_latitude=40.76793670654297, dropoff_longitude=-73.96463012695312, dropoff_latitude=40.765602111816406, store_and_fwd_flag=u'N', trip_duration=455),
 Row(id=u'id2377394', vendor_id=1, pickup_datetime=datetime.datetime(2016, 6, 12, 0, 43, 35), dropoff_datetime=datetime.datetime(2016, 6, 12, 0, 54, 38), passenger_count=1, pickup_longitude=-73.98041534423828, pickup_latitude=40.738563537597656, dropoff_longitude=-73.99948120117188, dropoff_latitude=40.73115158081055, store_and_fwd_flag=u'N', trip_duration=663),
 Row(id=u'id3858529', vendor_id=2, pickup_datetime=datetime.datetime(2016, 1, 19, 11, 35, 24), dropoff_datetime=datetime.datetime(2016, 1, 19, 12, 10, 48), passenger_count=1, pickup_longitude=-73.9790267944336, pickup_latitude=40.763938903808594, dropoff_longitude=-74.

In [142]:
spark

<pyspark.sql.session.SparkSession at 0x108838210>

In [13]:
df_train.registerTempTable("df_train_table")
sqlContext.sql("""
                SELECT id, count(*) 
                FROM df_train_table
                group by 1 
                order by 2 desc 
                limit 10""").show()


+---------+--------+
|       id|count(1)|
+---------+--------+
|id0515898|       1|
|id2677357|       1|
|id0556588|       1|
|id1381256|       1|
|id3524926|       1|
|id0082224|       1|
|id1631034|       1|
|id3829159|       1|
|id3048673|       1|
|id0045059|       1|
+---------+--------+



In [45]:
spark.sql("SELECT count(*) from df_train_table" ).show()

+--------+
|count(1)|
+--------+
| 1458644|
+--------+



### 1-1) window

In [14]:
from pyspark.sql import Window
from pyspark.sql.functions import mean

In [15]:
window = Window.partitionBy('id', 'vendor_id')\
               .orderBy('pickup_datetime')\
               .rowsBetween(-3, 3)

window

<pyspark.sql.window.WindowSpec at 0x10a016350>

In [16]:
moving_avg = mean(df_train['passenger_count']).over(window)
moving_avg

Column<avg(passenger_count) OVER (PARTITION BY id, vendor_id ORDER BY pickup_datetime ASC ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING)>

In [17]:
df_window_ = df_train.withColumn('moving_avg', moving_avg)
df_window_.take(3)

[Row(id=u'id0000015', vendor_id=1, pickup_datetime=datetime.datetime(2016, 5, 17, 9, 6, 59), dropoff_datetime=datetime.datetime(2016, 5, 17, 9, 39, 18), passenger_count=1, pickup_longitude=-73.98369598388672, pickup_latitude=40.780948638916016, dropoff_longitude=-73.95437622070312, dropoff_latitude=40.76417541503906, store_and_fwd_flag=u'N', trip_duration=1939, moving_avg=1.0),
 Row(id=u'id0000023', vendor_id=2, pickup_datetime=datetime.datetime(2016, 5, 28, 4, 34, 47), dropoff_datetime=datetime.datetime(2016, 5, 28, 5, 1, 47), passenger_count=1, pickup_longitude=-73.94206237792969, pickup_latitude=40.817779541015625, dropoff_longitude=-73.7889175415039, dropoff_latitude=40.64738845825195, store_and_fwd_flag=u'N', trip_duration=1620, moving_avg=1.0),
 Row(id=u'id0000250', vendor_id=1, pickup_datetime=datetime.datetime(2016, 3, 30, 8, 38, 35), dropoff_datetime=datetime.datetime(2016, 3, 30, 8, 46, 42), passenger_count=1, pickup_longitude=-73.99744415283203, pickup_latitude=40.7363395690

### 1-2) pivot

In [18]:
df_pivot1 = df_train.groupby('pickup_datetime')\
                    .pivot('id', values=['passenger_count'])\
                    .sum('dropoff_longitude')
        
df_pivot1                                                                                
                                                                                

DataFrame[pickup_datetime: timestamp, passenger_count: double]

In [19]:
df_pivot1.take(2)

[Row(pickup_datetime=datetime.datetime(2016, 6, 30, 18, 23, 16), passenger_count=None),
 Row(pickup_datetime=datetime.datetime(2016, 4, 20, 11, 38, 30), passenger_count=None)]

### 2) basic functions 

In [21]:
df__ = sc.textFile("/Users/yennanliu/NYC_Taxi_Trip_Duration/data/train.csv")
#df__.filter(lambda x: '2124' in x.).collect()

In [22]:
df__.take(5)

[u'id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration',
 u'id2875421,2,2016-03-14 17:24:55,2016-03-14 17:32:30,1,-73.982154846191406,40.767936706542969,-73.964630126953125,40.765602111816406,N,455',
 u'id2377394,1,2016-06-12 00:43:35,2016-06-12 00:54:38,1,-73.980415344238281,40.738563537597656,-73.999481201171875,40.731151580810547,N,663',
 u'id3858529,2,2016-01-19 11:35:24,2016-01-19 12:10:48,1,-73.979026794433594,40.763938903808594,-74.005332946777344,40.710086822509766,N,2124',
 u'id3504673,2,2016-04-06 19:32:31,2016-04-06 19:39:40,1,-74.010040283203125,40.719970703125,-74.01226806640625,40.706718444824219,N,429']

In [2]:
#type(df__)

In [24]:
type(df_train)

pyspark.sql.dataframe.DataFrame

In [23]:
#protocols = df__.map(lambda x: x[4]).distinct()
#protocols.take(10)
df_train.dtypes

[('id', 'string'),
 ('vendor_id', 'int'),
 ('pickup_datetime', 'timestamp'),
 ('dropoff_datetime', 'timestamp'),
 ('passenger_count', 'int'),
 ('pickup_longitude', 'double'),
 ('pickup_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('dropoff_latitude', 'double'),
 ('store_and_fwd_flag', 'string'),
 ('trip_duration', 'int')]

In [25]:
xx = df_train.select('id','vendor_id','pickup_datetime').rdd
#xx = df_train.select('vendor_id').rdd
type(xx)

pyspark.rdd.RDD

In [93]:
# date 
xx.map(lambda x: str(x[2]).split(" ")[0] ).take(4)

['2016-03-14', '2016-06-12', '2016-01-19', '2016-04-06']

In [89]:
# id and vendor id 
xx.map(lambda x: (x[0], x[1]) ).take(4)

[(u'id2875421', 2), (u'id2377394', 1), (u'id3858529', 2), (u'id3504673', 2)]

In [97]:
# filter 

#xx.filter(lambda x : x[0] == 'id2875421').take(10)
xx.filter(lambda x : x[0] == 'id2875421').count()

1

In [102]:
xx.map(lambda x: (x[0:])).take(3)

[(u'id2875421', 2, datetime.datetime(2016, 3, 14, 17, 24, 55)),
 (u'id2377394', 1, datetime.datetime(2016, 6, 12, 0, 43, 35)),
 (u'id3858529', 2, datetime.datetime(2016, 1, 19, 11, 35, 24))]

In [115]:
# flatMap

xx.flatMap(lambda x: (x)).take(10)

[u'id2875421',
 2,
 datetime.datetime(2016, 3, 14, 17, 24, 55),
 u'id2377394',
 1,
 datetime.datetime(2016, 6, 12, 0, 43, 35),
 u'id3858529',
 2,
 datetime.datetime(2016, 1, 19, 11, 35, 24),
 u'id3504673']

In [84]:
#xx
xx.map(lambda x: (x[2]) ).take(4)

[datetime.datetime(2016, 3, 14, 17, 24, 55),
 datetime.datetime(2016, 6, 12, 0, 43, 35),
 datetime.datetime(2016, 1, 19, 11, 35, 24),
 datetime.datetime(2016, 4, 6, 19, 32, 31)]

In [26]:
xx.map(lambda x: x[0] == 'id2875421').take(3)

[True, False, False]

In [27]:
# RDD group 
# https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#groupBy

result = xx.groupBy(lambda x: x[1] % 2).count()

In [28]:
xx.map(lambda x: x[0]).take(3)
#sorted([(x, sorted(y)) for (x, y) in result])

[u'id2875421', u'id2377394', u'id3858529']

In [123]:
xx.map(lambda x: (x[0], x[1],x[2]) ).take(3)

[(u'id2875421', 2, datetime.datetime(2016, 3, 14, 17, 24, 55)),
 (u'id2377394', 1, datetime.datetime(2016, 6, 12, 0, 43, 35)),
 (u'id3858529', 2, datetime.datetime(2016, 1, 19, 11, 35, 24))]

In [126]:
#data_key_2 = xx.map(lambda x: (x[0]) )
#data_key_2.reduceByKey(lambda x: x + x).take(4)

In [122]:
# reduceByKey

#xx.reduceByKey(lambda x, y: x + y).collect()
data_key_ = xx.map(lambda x: (x[0], x[1]) )
data_key_.reduceByKey(lambda x, y: x + y).take(4)

[(u'id0349415', 2), (u'id2017679', 1), (u'id1697645', 2), (u'id0429368', 2)]

In [144]:
spark.createDataFrame(xx)


DataFrame[id: string, vendor_id: bigint, pickup_datetime: timestamp]

In [131]:
# rdd -> spark dataframe 

df_xx = sqlContext.createDataFrame(xx)
df_xx

DataFrame[id: string, vendor_id: bigint, pickup_datetime: timestamp]

In [133]:
type(df_xx)

pyspark.sql.dataframe.DataFrame

In [138]:
df_xx['id'].show

Column<id['show']>

In [140]:
# spark dataframe to spark sql 

df_xx.registerTempTable("df_xx_table")
sqlContext.sql("""
                SELECT id, count(*) 
                FROM df_xx_table
                group by 1 
                order by 2 desc 
                limit 10""").show()


+---------+--------+
|       id|count(1)|
+---------+--------+
|id3013319|       1|
|id1622754|       1|
|id2187774|       1|
|id3921267|       1|
|id2795297|       1|
|id0130048|       1|
|id2088360|       1|
|id0454719|       1|
|id2366364|       1|
|id0187208|       1|
+---------+--------+



In [29]:
xx.take(5)

[Row(id=u'id2875421', vendor_id=2, pickup_datetime=datetime.datetime(2016, 3, 14, 17, 24, 55)),
 Row(id=u'id2377394', vendor_id=1, pickup_datetime=datetime.datetime(2016, 6, 12, 0, 43, 35)),
 Row(id=u'id3858529', vendor_id=2, pickup_datetime=datetime.datetime(2016, 1, 19, 11, 35, 24)),
 Row(id=u'id3504673', vendor_id=2, pickup_datetime=datetime.datetime(2016, 4, 6, 19, 32, 31)),
 Row(id=u'id2181028', vendor_id=2, pickup_datetime=datetime.datetime(2016, 3, 26, 13, 30, 55))]

In [30]:
xx.map(lambda x: x[1]).take(3)

[2, 1, 2]

In [31]:
#xx.take(10)

In [32]:
df__.map(lambda x: x[0:22]).take(10)

[u'id,vendor_id,pickup_da',
 u'id2875421,2,2016-03-14',
 u'id2377394,1,2016-06-12',
 u'id3858529,2,2016-01-19',
 u'id3504673,2,2016-04-06',
 u'id2181028,2,2016-03-26',
 u'id0801584,2,2016-01-30',
 u'id1813257,1,2016-06-17',
 u'id1324603,2,2016-05-21',
 u'id1301050,1,2016-05-27']

In [34]:
# df__.map(lambda x: x[12:22]).take(10)

#df__value = df__.filter(lambda line: line != header)
#result = df__value.groupBy(lambda x : x[0:][12:22]).take(10)

In [35]:
header = df__.first()
df__value = df__.filter(lambda line: line != header)
df__value.map(lambda x : x[0:][12:22]).take(10)
print (df__value.map(lambda x : x[0:][12:22]).take(10))


[u'2016-03-14', u'2016-06-12', u'2016-01-19', u'2016-04-06', u'2016-03-26', u'2016-01-30', u'2016-06-17', u'2016-05-21', u'2016-05-27', u'2016-03-10']


In [None]:
###############

In [36]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.countByKey().items())


[('a', 2), ('b', 1)]

In [37]:
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())


[('a', 2), ('b', 1)]

In [38]:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortByKey().first()
sc.parallelize(tmp).sortByKey(True, 1).collect()
sc.parallelize(tmp).sortByKey(True, 2).collect()
tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()


[('a', 3),
 ('fleece', 7),
 ('had', 2),
 ('lamb', 5),
 ('little', 4),
 ('Mary', 1),
 ('was', 8),
 ('white', 9),
 ('whose', 6)]

In [39]:
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))
x.zip(y).collect()

[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

In [40]:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x % 2).collect()
sorted([(x, sorted(y)) for (x, y) in result])

[(0, [2, 8]), (1, [1, 1, 3, 5])]