In [1]:
import pandas as pd
import holoviews as hv
import numpy as np

hv.extension("bokeh")

In [3]:
types = {"fare_amount": np.float16,
        "pickup_longitude": np.float32, 
        "pickup_latitude": np.float32, 
        "dropoff_longitude": np.float32, 
        "dropoff_latitude": np.float32, 
        "passenger_count": np.int8}

test = pd.read_csv("~/work/data/test.csv", dtype=types, parse_dates=["pickup_datetime"])
# nrows limits the number of rows from the training set please remove to get the full dataset
train = pd.read_csv("~/work/data/train.csv", dtype=types, parse_dates=["pickup_datetime"], nrows=100000)
sample_submission = pd.read_csv("~/work/data/sample_submission.csv", dtype=types)

In [4]:
def haversine_np(lat1, lon1, lat2, lon2):
    '''euclidian Km distance between two lat long points'''
    
    # reference - http://rosettacode.org/wiki/Haversine_formula#Python
    
    R = np.int32(6378.137) # Earth radius in kilometers (NASA - https://nssdc.gsfc.nasa.gov/planetary/factsheet/earthfact.html)

    dLat = np.radians(lat2 - lat1)
    dLon = np.radians(lon2 - lon1)
    lat1 = np.radians(lat1)
    lat2 = np.radians(lat2)

    a = np.square(np.sin(dLat/2)) + np.cos(lat1)*np.cos(lat2)*np.square(np.sin(dLon/2))
    c = 2*np.arcsin(np.sqrt(a))

    return R * c

def haversine_manhattan_np(lat1, lon1, lat2, lon2):
    '''manhattan Km distance between two lat long points'''
    
    # reference - http://rosettacode.org/wiki/Haversine_formula#Python
    
    R = np.int32(6378.137) # Earth radius in kilometers (NASA - https://nssdc.gsfc.nasa.gov/planetary/factsheet/earthfact.html)

    dLat = np.radians(lat2 - lat1)
    dLon = np.radians(lon2 - lon1)
    lat1 = np.radians(lat1)
    lat2 = np.radians(lat2)
    
    # adjusted for manhattan distance
    a = np.sin(dLat/2) - np.cos(lat1)*np.cos(lat2)*np.sin(dLon/2)
    c = 2*np.arcsin(np.abs(a))

    return R * c

#haversine_manhattan_np(40.721317, -73.844315, 40.712276, -73.841614)  # example
#haversine_np(40.721317, -73.844315, 40.712276, -73.841614)  # example

In [5]:
# adds manhattan and euclidian distance
train["manhattan_distance"] = train.apply(lambda row: haversine_manhattan_np(row["pickup_latitude"],row["pickup_longitude"],row["dropoff_latitude"],row["dropoff_longitude"]), axis=1)
train["euclidian_distance"] = train.apply(lambda row: haversine_np(row["pickup_latitude"],row["pickup_longitude"],row["dropoff_latitude"],row["dropoff_longitude"]), axis=1)

In [6]:
hv.Scatter(test[["pickup_longitude", "pickup_latitude"]])

In [7]:
hv.Scatter(train[["pickup_longitude", "pickup_latitude"]])

In [8]:
train

Unnamed: 0,key,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,manhattan_distance,euclidian_distance
0,2009-06-15 17:26:21.0000001,4.500000,2009-06-15 17:26:21,-73.844315,40.721317,-73.841614,40.712276,1,1.179114,1.031875
1,2010-01-05 16:52:16.0000002,16.906250,2010-01-05 16:52:16,-74.016045,40.711304,-73.979271,40.782005,1,5.520760,8.459285
2,2011-08-18 00:35:00.00000049,5.699219,2011-08-18 00:35:00,-73.982735,40.761269,-73.991241,40.750561,2,0.648605,1.391159
3,2012-04-21 04:30:42.0000001,7.699219,2012-04-21 04:30:42,-73.987129,40.733143,-73.991570,40.758091,1,3.060861,2.802286
4,2010-03-09 07:51:00.000000135,5.300781,2010-03-09 07:51:00,-73.968094,40.768009,-73.956657,40.783764,1,1.023713,2.001278
5,2011-01-06 09:50:45.0000002,12.101562,2011-01-06 09:50:45,-74.000961,40.731628,-73.972893,40.758232,1,1.168003,3.791279
6,2012-11-20 20:35:00.0000001,7.500000,2012-11-20 20:35:00,-73.980003,40.751663,-73.973801,40.764843,1,1.070971,1.557569
7,2012-01-04 17:22:00.00000081,16.500000,2012-01-04 17:22:00,-73.951302,40.774139,-73.990097,40.751049,1,0.092827,4.160066
8,2012-12-03 13:10:00.000000125,9.000000,2012-12-03 13:10:00,-74.006462,40.726711,-73.993080,40.731628,1,0.308085,1.254558
9,2009-09-02 01:11:00.00000083,8.898438,2009-09-02 01:11:00,-73.980659,40.733871,-73.991539,40.758137,2,3.396267,2.852721


In [114]:
from pyspark import SparkContext, SQLContext
from pyspark.sql.types import *
sc = SparkContext("local", "data explore")
print(pyspark.__version__)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=data explore, master=local) created by __init__ at <ipython-input-4-e0f81e41bcbf>:1 

NameError: name 'pyspark' is not defined

In [9]:
sqlsc = SQLContext(sc)

In [10]:
sqlsc

<pyspark.sql.context.SQLContext at 0x7f6ada8844a8>

In [75]:
# csv_schema = StructType([StructField("key", StringType(), True), \
#            StructField("fare_amount", DecimalType(7, 2), True), \
#            StructField("pickup_datetime", TimestampType(), True), \
#            StructField("pickup_longitude", DecimalType(9, 6), True), \
#            StructField("pickup_latitude", DecimalType(9, 6), True), \
#            StructField("dropoff_longitude", DecimalType(9, 6), True), \
#            StructField("dropoff_latitude", DecimalType(9, 6), True), \
#            StructField("passenger_count", ShortType(), True)])


In [107]:
# csv_schema = StructType(
#         [StructField("key", StringType()), \
#            StructField("fare_amount", DecimalType(7, 2)), \
#            StructField("pickup_datetime", TimestampType()), \
#            StructField("pickup_longitude", DecimalType(9, 6)), \
#            StructField("pickup_latitude", DecimalType(9, 6)), \
#            StructField("dropoff_longitude", DecimalType(9, 6)), \
#            StructField("dropoff_latitude", DecimalType(9, 6)), \
#            StructField("passenger_count", ShortType()) \
#         ])


# csv_schema = StructType(
#         [StructField("key", StringType(), nullable=True), \
#            StructField("fare_amount", DecimalType(7, 2), nullable=True), \
#            StructField("pickup_datetime", TimestampType(), nullable=True), \
#            StructField("pickup_longitude", DecimalType(9, 6), nullable=True), \
#            StructField("pickup_latitude", DecimalType(9, 6), nullable=True), \
#            StructField("dropoff_longitude", DecimalType(9, 6), nullable=True), \
#            StructField("dropoff_latitude", DecimalType(9, 6), nullable=True), \
#            StructField("passenger_count", ShortType(), nullable=True) \
#         ])


# csv_schema = StructType(
#         [StructField("key", StringType(), nullable=True), \
#            StructField("fare_amount", FloatType(), nullable=True), \
# #            StructField("pickup_datetime", TimestampType(), nullable=True), \
#            StructField("pickup_longitude", DoubleType(), nullable=True), \
#            StructField("pickup_latitude", DoubleType(), nullable=True), \
#            StructField("dropoff_longitude", DoubleType(), nullable=True), \
#            StructField("dropoff_latitude", DoubleType(), nullable=True), \
#            StructField("passenger_count", ShortType(), nullable=True) \
#         ])

coordinate_type = DoubleType

csv_schema = StructType(
        [StructField("key", StringType(), nullable=True), \
           StructField("fare_amount", FloatType(), nullable=True), \
           StructField("pickup_datetime", TimestampType(), nullable=True), \
           StructField("pickup_longitude", coordinate_type(), nullable=True), \
           StructField("pickup_latitude", coordinate_type(), nullable=True), \
           StructField("dropoff_longitude", coordinate_type(), nullable=True), \
           StructField("dropoff_latitude", coordinate_type(), nullable=True), \
           StructField("passenger_count", ShortType(), nullable=True) \
        ])




In [108]:
data = sqlsc.read.csv(path="../data/train.csv", header=True, schema=csv_schema, timestampFormat="yyyy-mm-dd HH:mm:ss")
#data = sqlsc.read.csv(path="../data/train.csv", header=True)


In [109]:
temp = data.first()

In [110]:
d = temp["pickup_datetime"]

In [111]:
temp

Row(key='2009-06-15 17:26:21.0000001', fare_amount=4.5, pickup_datetime=datetime.datetime(2009, 1, 15, 17, 26, 21), pickup_longitude=-73.844311, pickup_latitude=40.721319, dropoff_longitude=-73.84161, dropoff_latitude=40.712278, passenger_count=1)

In [113]:
d.second

21

In [86]:
from pyspark.sql.functions import from_unixtime

In [88]:
t = from_unixtime('2009-06-15 17:26:21 UTC')

In [89]:
t

Column<b'from_unixtime(2009-06-15 17:26:21 UTC, yyyy-MM-dd HH:mm:ss)'>

In [85]:
data_temp = data.select(from_utc_timestamp(data.t, 'yyyy-MM-dd HH:mm:ss').alias('dt'))

AttributeError: 'DataFrame' object has no attribute 't'

In [81]:
temp

Row(key='2009-06-15 17:26:21.0000001', fare_amount=4.5, pickup_datetime='2009-06-15 17:26:21 UTC', pickup_longitude=-73.844311, pickup_latitude=40.721319, dropoff_longitude=-73.84161, dropoff_latitude=40.712278, passenger_count=1)

In [113]:
temp

In [114]:
csv_schema

StructType(List(StructField(key,StringType,true),StructField(fare_amount,FloatType,true),StructField(pickup_datetime,TimestampType,true),StructField(pickup_longitude,DoubleType,true),StructField(pickup_latitude,DoubleType,true),StructField(dropoff_longitude,DoubleType,true),StructField(dropoff_latitude,DoubleType,true),StructField(passenger_count,ShortType,true)))

In [115]:
csv_schema["key"]

StructField(key,StringType,true)

In [116]:
data.columns

['key',
 'fare_amount',
 'pickup_datetime',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'passenger_count']

In [117]:
temp.show()

KeyboardInterrupt: 

In [None]:
temp.first()

In [102]:
eval('40.721319')

40.721319

In [103]:
temp.describe()

KeyboardInterrupt: 

In [None]:
temp.describe().show()

In [108]:
temp.dtypes

[('key', 'string'),
 ('fare_amount', 'decimal(7,2)'),
 ('pickup_datetime', 'timestamp'),
 ('pickup_longitude', 'decimal(9,6)'),
 ('pickup_latitude', 'decimal(9,6)'),
 ('dropoff_longitude', 'decimal(9,6)'),
 ('dropoff_latitude', 'decimal(9,6)'),
 ('passenger_count', 'smallint')]

In [109]:
temp.describe().show()

KeyboardInterrupt: 

In [None]:
TimestampType.needConversion()