In [1]:
import pyspark

# Only needed when running locally.
# getOrCreate() hands back the SparkContext that is already running instead of
# raising "Cannot run multiple SparkContexts at once" when this cell is
# executed a second time in the same kernel.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[1]'))
from pyspark import SQLContext
# SQL entry point used by the following cells to read the CSV into a DataFrame.
sqlContext = SQLContext(sc)

In [2]:
#df = sc.textFile("/Users/pgarias/python_examples/311_Service_Requests_from_2015_to_Present_head_1000_pipe_sep.csv").map(lambda line: line.split("|"))
#df = df.toDF()
#df.show()

In [2]:
# Load the pipe-separated 311 sample: the first line is treated as the header
# and column types are inferred from the data.
df2 = (
    sqlContext.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .option('sep', '|')
    .load('/Users/pgarias/python_examples/311_Service_Requests_from_2015_to_Present_head_1000_pipe_sep.csv')
)

# EMR Notebook variant: same read, but from S3 and through `spark`
# (not defined in this notebook -- presumably supplied by the EMR kernel).
#df2 = spark.read.format('csv') \
#.options(header='true', inferschema='true', sep='|') \
#.load('s3://pgarias-bucket-cloud/311_Service_Requests_from_2015_to_Present_head_1000_pipe_sep.csv')

In [3]:
# Look at just the two date columns, without truncating the cell values.
cols = ['Created Date', 'Closed Date']
df2.select(*cols).show(truncate=False)

+----------------------+----------------------+
|Created Date          |Closed Date           |
+----------------------+----------------------+
|01/01/2015 12:00:50 AM|01/01/2015 02:47:50 AM|
|01/01/2015 12:01:29 AM|01/01/2015 02:42:22 AM|
|01/01/2015 12:01:30 AM|01/01/2015 12:20:33 AM|
|01/01/2015 12:04:28 AM|01/01/2015 02:25:02 AM|
|01/01/2015 12:04:44 AM|01/01/2015 10:22:31 AM|
|01/01/2015 12:04:51 AM|01/01/2015 01:03:07 AM|
|01/01/2015 12:05:00 AM|01/01/2015 12:00:00 PM|
|01/01/2015 12:05:05 AM|01/01/2015 01:22:10 AM|
|01/01/2015 12:05:12 AM|01/01/2015 01:13:15 AM|
|01/01/2015 12:06:02 AM|01/01/2015 12:43:41 AM|
|01/01/2015 12:06:43 AM|01/01/2015 06:05:18 AM|
|01/01/2015 12:07:42 AM|01/01/2015 12:16:24 AM|
|01/01/2015 12:08:02 AM|01/01/2015 01:17:43 AM|
|01/01/2015 12:08:34 AM|01/01/2015 02:42:23 AM|
|01/01/2015 12:09:39 AM|01/01/2015 01:29:36 AM|
|01/01/2015 12:09:40 AM|01/01/2015 07:38:38 AM|
|01/01/2015 12:10:44 AM|01/01/2015 10:30:50 AM|
|01/01/2015 12:11:14 AM|01/01/2015 05:02

In [19]:
from pyspark.sql.functions import from_unixtime,unix_timestamp
from pyspark.sql.types import DateType

In [20]:
# Pattern of the date strings: 12-hour clock followed by an AM/PM marker.
timeFmt = 'MM/dd/yyyy hh:mm:ss a'
# Column expression (nothing is computed yet): closed time minus created time,
# both parsed to Unix seconds, i.e. the request duration in seconds.
timeDiff = unix_timestamp('Closed Date', timeFmt) - unix_timestamp('Created Date', timeFmt)

In [21]:
# Attach the timeDiff expression to the loaded data as a new "Duration" column.
df3 = df2.withColumn("Duration", timeDiff)

In [24]:
# Check the computed Duration next to the two dates it was derived from.
df3.select('Created Date', 'Closed Date', 'Duration').show(truncate=False)

+----------------------+----------------------+--------+
|Created Date          |Closed Date           |Duration|
+----------------------+----------------------+--------+
|01/01/2015 12:00:50 AM|01/01/2015 02:47:50 AM|10020   |
|01/01/2015 12:01:29 AM|01/01/2015 02:42:22 AM|9653    |
|01/01/2015 12:01:30 AM|01/01/2015 12:20:33 AM|1143    |
|01/01/2015 12:04:28 AM|01/01/2015 02:25:02 AM|8434    |
|01/01/2015 12:04:44 AM|01/01/2015 10:22:31 AM|37067   |
|01/01/2015 12:04:51 AM|01/01/2015 01:03:07 AM|3496    |
|01/01/2015 12:05:00 AM|01/01/2015 12:00:00 PM|42900   |
|01/01/2015 12:05:05 AM|01/01/2015 01:22:10 AM|4625    |
|01/01/2015 12:05:12 AM|01/01/2015 01:13:15 AM|4083    |
|01/01/2015 12:06:02 AM|01/01/2015 12:43:41 AM|2259    |
|01/01/2015 12:06:43 AM|01/01/2015 06:05:18 AM|21515   |
|01/01/2015 12:07:42 AM|01/01/2015 12:16:24 AM|522     |
|01/01/2015 12:08:02 AM|01/01/2015 01:17:43 AM|4181    |
|01/01/2015 12:08:34 AM|01/01/2015 02:42:23 AM|9229    |
|01/01/2015 12:09:39 AM|01/01/2

In [29]:
# Print the Duration column on its own (show() stops after the first 20 rows).
df3.select("Duration").show()

+--------+
|Duration|
+--------+
|   10020|
|    9653|
|    1143|
|    8434|
|   37067|
|    3496|
|   42900|
|    4625|
|    4083|
|    2259|
|   21515|
|     522|
|    4181|
|    9229|
|    4797|
|   26938|
|   37206|
|   17488|
|   16017|
|    9950|
+--------+
only showing top 20 rows



In [57]:
from pyspark.sql.functions import udf,col
from pyspark.sql.types import FloatType, IntegerType

In [31]:
def int_to_float(x):
    """Convert ``x`` to a Python float, passing ``None`` through unchanged.

    The function is wrapped as a Spark UDF and applied to the ``Duration``
    column, which can hold nulls; those arrive here as ``None``. Calling
    ``float(None)`` would raise ``TypeError``, so ``None`` is returned as-is.

    Parameters
    ----------
    x : int, str, float or None
        Value to convert.

    Returns
    -------
    float or None
        ``float(x)``, or ``None`` when ``x`` is ``None``.
    """
    if x is None:
        return None
    return float(x)

In [33]:
# The declared return type has to match what int_to_float really returns
# (a Python float). With IntegerType() Spark cannot convert the returned float
# and yields null instead of the value, so declare FloatType().
udfint_to_float = udf(int_to_float, FloatType())

In [39]:
# Add a "Duration (float)" column by applying the udfint_to_float UDF to Duration.
df4 = df3.withColumn("Duration (float)", udfint_to_float("Duration"))

In [55]:
# Preview only: this builds a Column expression (cast to float, aliased) and
# displays its repr; the result is not assigned, so df4 is left unchanged.
df4['Duration'].cast('float').alias("Duration_float")

Column<CAST(Duration AS FLOAT) AS `Duration_float`>

In [58]:
# Keep the original Duration and a float-cast copy of it side by side.
df = df4.select(
    col('Duration'),
    df4['Duration'].cast('float').alias('Duration_float'),
)

In [59]:
# Compare Duration with its float-cast copy (first 20 rows).
df.show()

+--------+--------------+
|Duration|Duration_float|
+--------+--------------+
|   10020|       10020.0|
|    9653|        9653.0|
|    1143|        1143.0|
|    8434|        8434.0|
|   37067|       37067.0|
|    3496|        3496.0|
|   42900|       42900.0|
|    4625|        4625.0|
|    4083|        4083.0|
|    2259|        2259.0|
|   21515|       21515.0|
|     522|         522.0|
|    4181|        4181.0|
|    9229|        9229.0|
|    4797|        4797.0|
|   26938|       26938.0|
|   37206|       37206.0|
|   17488|       17488.0|
|   16017|       16017.0|
|    9950|        9950.0|
+--------+--------------+
only showing top 20 rows



In [68]:
# Summary statistics of Duration_float, pulled to the driver as a list of Rows.
list_collect = df.describe('Duration_float').collect()

In [92]:

def return_stats(list_item, column='Duration_float'):
    """Extract the five ``describe()`` statistics of one column as floats.

    Parameters
    ----------
    list_item : sequence of Row-like mappings
        Collected output of ``DataFrame.describe()``. Each entry exposes the
        statistic's label under ``'summary'`` and its value (a string) under
        ``column``.
    column : str, optional
        Name of the described column to read. Defaults to
        ``'Duration_float'``.

    Returns
    -------
    tuple of float
        ``(count, mean, stddev, min, max)``.

    Raises
    ------
    KeyError
        If a row lacks ``'summary'``/``column`` or one of the five statistics
        is missing.
    """
    # Look statistics up by label rather than by position, so the result does
    # not depend on the order (or number) of summary rows.
    by_label = {row['summary']: row[column] for row in list_item}
    wanted = ('count', 'mean', 'stddev', 'min', 'max')
    return tuple(float(by_label[label]) for label in wanted)
    

In [86]:
# Inspect the collected describe() rows; the statistic values come back as strings.
list_collect

[Row(summary=u'count', Duration_float=u'992'),
 Row(summary=u'mean', Duration_float=u'312167.0816532258'),
 Row(summary=u'stddev', Duration_float=u'3607608.0097006033'),
 Row(summary=u'min', Duration_float=u'0.0'),
 Row(summary=u'max', Duration_float=u'1.07628584E8')]

In [93]:
# Unpack the five statistics that return_stats converts to floats.
count,mean,stdev,min_val,max_val = return_stats(list_collect)

In [94]:
# Display the unpacked statistics as a tuple.
count,mean,stdev,min_val,max_val

(992.0, 312167.0816532258, 3607608.0097006033, 0.0, 107628584.0)

In [9]:
# List the statistic labels that describe() reports.
df3.describe().select("summary").show()

+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    max|
+-------+



In [16]:
# Names of every column in df3, including the derived Duration.
all_cols = df3.columns

In [17]:
# Display the full column list.
all_cols

['Unique Key',
 'Created Date',
 'Closed Date',
 'Agency',
 'Agency Name',
 'Complaint Type',
 'Descriptor',
 'Location Type',
 'Incident Zip',
 'Incident Address',
 'Street Name',
 'Cross Street 1',
 'Cross Street 2',
 'Intersection Street 1',
 'Intersection Street 2',
 'Address Type',
 'City',
 'Landmark',
 'Facility Type',
 'Status',
 'Due Date',
 'Resolution Description',
 'Resolution Action Updated Date',
 'Community Board',
 'BBL',
 'Borough',
 'X Coordinate (State Plane)',
 'Y Coordinate (State Plane)',
 'Open Data Channel Type',
 'Park Facility Name',
 'Park Borough',
 'Vehicle Type',
 'Taxi Company Borough',
 'Taxi Pick Up Location',
 'Bridge Highway Name',
 'Bridge Highway Direction',
 'Road Ramp',
 'Bridge Highway Segment',
 'Latitude',
 'Longitude',
 'Location',
 'Duration']

In [18]:
# Drop 'Location' from the column list. The membership check makes the cell
# safe to re-run: an unconditional remove() raises ValueError once the name
# is already gone.
if 'Location' in all_cols:
    all_cols.remove('Location')
all_cols

['Unique Key',
 'Created Date',
 'Closed Date',
 'Agency',
 'Agency Name',
 'Complaint Type',
 'Descriptor',
 'Location Type',
 'Incident Zip',
 'Incident Address',
 'Street Name',
 'Cross Street 1',
 'Cross Street 2',
 'Intersection Street 1',
 'Intersection Street 2',
 'Address Type',
 'City',
 'Landmark',
 'Facility Type',
 'Status',
 'Due Date',
 'Resolution Description',
 'Resolution Action Updated Date',
 'Community Board',
 'BBL',
 'Borough',
 'X Coordinate (State Plane)',
 'Y Coordinate (State Plane)',
 'Open Data Channel Type',
 'Park Facility Name',
 'Park Borough',
 'Vehicle Type',
 'Taxi Company Borough',
 'Taxi Pick Up Location',
 'Bridge Highway Name',
 'Bridge Highway Direction',
 'Road Ramp',
 'Bridge Highway Segment',
 'Latitude',
 'Longitude',
 'Duration']