
# Apache Spark for Everyone - PySpark + Python

### [Markdown](https://en.wikipedia.org/wiki/Markdown) blocks communicate text, images + whatever other useful HTML bits you want to share.

### Like TODO lists:
- ~~get [bikes data set](https://archive.ics.uci.edu/ml/datasets/Bike+Sharing+Dataset)~~
- ~~import csv~~
- ~~do some things with pyspark~~
- ~~do some things with python~~
- show a python vis?
- save out file

And code bits:

```python
from pyspark.sql import SQLContext
```

There is a great Markdown cheatsheet on GitHub [here](https://github.com/adam-p/markdown-here/wiki/Markdown-Cheatsheet).

In [1]:
# set your working directory if you want less pathy things later
WORK_DIR = '/Users/amcasari/repos/wwconnect-2016-spark4everyone/'

In [2]:
# create an RDD from bikes data
# sc is an existing SparkContext (initialized when PySpark starts)

bikes = sc.textFile(WORK_DIR + "data/bikes/p*")
bikes.count()

17380

In [3]:
# import SQLContext 
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

In [4]:
# since we are familiar with pandas dataframes, let's convert the RDD to a Spark DataFrame
# we'll try to infer the schema from the files

bikes_df = sqlContext.createDataFrame(bikes)

TypeError: Can not infer schema for type: <type 'unicode'>
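
The error makes sense once you look at what the RDD holds: every element of `bikes` is one raw line of text, and `createDataFrame` can only infer columns from structured records such as tuples, lists, dicts or Rows. A minimal plain-Python sketch of the difference (no Spark needed; the sample line is copied from the `show()` output further down):

```python
# One raw line, the way sc.textFile hands it to us
line = "15086,2012-09-25,4,1,9,18,0,2,1,1,0.64,0.6212,0.41,0.2239,64,758,822"

# A bare string has no column structure to infer from
print(type(line).__name__)

# A tuple of fields does: one element per column
record = tuple(line.split(","))
print(len(record))
```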

In [5]:
# whoops a daisy, let's remove the header, split out the Rows + we can programmatically specify the schema

names = bikes.first().replace('"','')
names

u'instant,dteday,season,yr,mnth,hr,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,casual,registered,cnt'

In [6]:
# remove the header using subtract
bikesHeader = bikes.filter(lambda x: "instant" in x)
bikesFiltered = bikes.subtract(bikesHeader)
bikesFiltered.count()

17379
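
`subtract` compares two RDDs, which involves a shuffle and is likely why the rows in the later `show()` output are no longer in file order. A lighter alternative is a single filtering pass that drops any line equal to the header. Here is a sketch of that idea in plain Python so it runs without Spark; `drop_header` is a hypothetical helper name and `sample_lines` is a tiny stand-in for the RDD:

```python
def drop_header(lines, header):
    """Return the lines that are not identical to the header line."""
    return [line for line in lines if line != header]

# A header like the one in the file plus two data rows copied from the
# show() output later in the notebook.
sample_lines = [
    "instant,dteday,season,yr,mnth,hr,holiday,weekday,workingday,weathersit,"
    "temp,atemp,hum,windspeed,casual,registered,cnt",
    "15086,2012-09-25,4,1,9,18,0,2,1,1,0.64,0.6212,0.41,0.2239,64,758,822",
    "1060,2011-02-16,1,0,2,21,0,3,1,1,0.36,0.3485,0.46,0.194,5,87,92",
]

header = sample_lines[0]  # plays the role of bikes.first()
data_lines = drop_header(sample_lines, header)
print(len(data_lines))
```

In the notebook the equivalent would be `header = bikes.first()` followed by `bikes.filter(lambda line: line != header)`.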

In [7]:
# programmatically specify the schema: one StructField per column name
from pyspark.sql.types import StructField, StructType, StringType

fields = [StructField(field_name, StringType(), False) for field_name in names.split(',')]
fields

[StructField(instant,StringType,false),
 StructField(dteday,StringType,false),
 StructField(season,StringType,false),
 StructField(yr,StringType,false),
 StructField(mnth,StringType,false),
 StructField(hr,StringType,false),
 StructField(holiday,StringType,false),
 StructField(weekday,StringType,false),
 StructField(workingday,StringType,false),
 StructField(weathersit,StringType,false),
 StructField(temp,StringType,false),
 StructField(atemp,StringType,false),
 StructField(hum,StringType,false),
 StructField(windspeed,StringType,false),
 StructField(casual,StringType,false),
 StructField(registered,StringType,false),
 StructField(cnt,StringType,false)]

In [8]:
schema = StructType(fields)
schema

StructType(List(StructField(instant,StringType,false),StructField(dteday,StringType,false),StructField(season,StringType,false),StructField(yr,StringType,false),StructField(mnth,StringType,false),StructField(hr,StringType,false),StructField(holiday,StringType,false),StructField(weekday,StringType,false),StructField(workingday,StringType,false),StructField(weathersit,StringType,false),StructField(temp,StringType,false),StructField(atemp,StringType,false),StructField(hum,StringType,false),StructField(windspeed,StringType,false),StructField(casual,StringType,false),StructField(registered,StringType,false),StructField(cnt,StringType,false)))

In [9]:
# convert each line in the csv to a tuple of its 17 string fields
parts = bikesFiltered.map(lambda l: l.split(","))
bikesSplit = parts.map(tuple)
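
Splitting on a bare comma works for this data set because no field contains a comma. For files with quoted fields (the header here needed `.replace('"','')`), Python's standard `csv` module is a more forgiving way to turn a line into fields. A sketch, where `parse_line` is a hypothetical helper name and the sample lines are made up:

```python
import csv

def parse_line(line):
    """Parse one CSV line into a tuple of fields, honoring double quotes."""
    return tuple(next(csv.reader([line])))

# Quotes are stripped from each field
print(parse_line('"1","2011-01-01","1"'))

# A comma inside quotes stays inside its field, unlike with str.split(",")
print(parse_line('"Washington, D.C.",2011-01-01'))
```

A function like this could be passed to `bikesFiltered.map(...)` in place of the `split` lambda.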

In [10]:
# Apply the schema to the RDD.
bikes_df = sqlContext.createDataFrame(bikesSplit, schema)

In [11]:
bikes_df.show()

+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|instant|    dteday|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered|cnt|
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|  15086|2012-09-25|     4|  1|   9| 18|      0|      2|         1|         1|0.64|0.6212|0.41|   0.2239|    64|       758|822|
|   1060|2011-02-16|     1|  0|   2| 21|      0|      3|         1|         1|0.36|0.3485|0.46|    0.194|     5|        87| 92|
|   8138|2011-12-10|     4|  0|  12| 17|      0|      6|         0|         1|0.28|0.2576|0.36|   0.3284|    34|       151|185|
|  15068|2012-09-25|     4|  1|   9|  0|      0|      2|         1|         1|0.46|0.4545|0.67|   0.1642|     8|        56| 64|
|   6631|2011-10-08|     4|  0|  10| 20|      0|      6|         0|         1|0.52|   0.5|0.77|   0.1045

In [12]:
bikes_df.printSchema()

root
 |-- instant: string (nullable = false)
 |-- dteday: string (nullable = false)
 |-- season: string (nullable = false)
 |-- yr: string (nullable = false)
 |-- mnth: string (nullable = false)
 |-- hr: string (nullable = false)
 |-- holiday: string (nullable = false)
 |-- weekday: string (nullable = false)
 |-- workingday: string (nullable = false)
 |-- weathersit: string (nullable = false)
 |-- temp: string (nullable = false)
 |-- atemp: string (nullable = false)
 |-- hum: string (nullable = false)
 |-- windspeed: string (nullable = false)
 |-- casual: string (nullable = false)
 |-- registered: string (nullable = false)
 |-- cnt: string (nullable = false)
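
Every column shows up as `string` because the schema declared `StringType()` for all fields. One way to get numeric columns is to convert each field before building the DataFrame. A minimal plain-Python sketch, assuming the column order from the header above; `CONVERTERS` and `to_typed` are hypothetical names, and the choice of int versus float per column is my reading of the sample values:

```python
# One converter per column, in header order:
# instant, dteday, season, yr, mnth, hr, holiday, weekday, workingday, weathersit,
# temp, atemp, hum, windspeed, casual, registered, cnt
CONVERTERS = [int, str, int, int, int, int, int, int, int, int,
              float, float, float, float, int, int, int]

def to_typed(fields):
    """Apply the matching converter to each string field of one row."""
    return tuple(convert(value) for convert, value in zip(CONVERTERS, fields))

# Sample row copied from the show() output above
raw = "15086,2012-09-25,4,1,9,18,0,2,1,1,0.64,0.6212,0.41,0.2239,64,758,822"
row = to_typed(raw.split(","))
print(row)
```

In the notebook this would be mapped over `bikesSplit`, paired with a schema that uses `IntegerType()` and `DoubleType()` for the numeric fields. Casting a column after the fact, for example `bikes_df['cnt'].cast('int')`, is another option.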



In [13]:
# now we can look for trends + data quality questions...

# total # of rows in the DataFrame
num_rows = bikes_df.count()

# number of distinct rows in the DataFrame
num_distinct = bikes_df.distinct().count()

# and we can start to see where PySpark returning Python objects can be used locally
print("count() returns a python object of type " + str(type(num_rows)))
print("number of duplicate rows in the DataFrame: " + str(num_rows - num_distinct))

count() returns a python object of type <type 'int'>
number of duplicate rows in the DataFrame: 0


In [14]:
# check out some more df methods
bikes_df.groupBy('holiday').count().show()

+-------+-----+
|holiday|count|
+-------+-----+
|      0|16879|
|      1|  500|
+-------+-----+
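
A quick data quality check on this output: the group counts should add up to the number of data rows we counted after removing the header (17379). The arithmetic, using only the numbers shown in this notebook:

```python
# Counts copied from the groupBy('holiday') output above
holiday_counts = {"0": 16879, "1": 500}

# The groups should cover every data row exactly once
total = sum(holiday_counts.values())
print(total)

# Share of hourly records that fall on a holiday, as a percentage
holiday_share = 100 * holiday_counts["1"] / float(total)
print(round(holiday_share, 1))
```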



In [15]:
# let's look at the July records (each row is one hour of one day)
july_trips = bikes_df.filter(bikes_df['mnth'] == 7)

# since we'll be working over the DAG quite a bit, let's persist the DataFrame (it is cached on the first action)
july_trips.persist()

DataFrame[instant: string, dteday: string, season: string, yr: string, mnth: string, hr: string, holiday: string, weekday: string, workingday: string, weathersit: string, temp: string, atemp: string, hum: string, windspeed: string, casual: string, registered: string, cnt: string]

In [16]:
july_trips.count()

1488
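
That count is what you would expect if the data holds one row per hour for two Julys (the `dteday` values span 2011 and 2012) with no hours missing. The check is simple arithmetic:

```python
years = 2            # 2011 and 2012
days_in_july = 31
hours_per_day = 24

expected_july_rows = years * days_in_july * hours_per_day
print(expected_july_rows)
```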

In [17]:
july_trips.show()

+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|instant|    dteday|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered|cnt|
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|   4631|2011-07-16|     3|  0|   7| 20|      0|      6|         0|         1|0.72|0.6667|0.51|   0.2239|   108|       188|296|
|  13677|2012-07-29|     3|  1|   7|  1|      0|      0|         0|         1|0.66|0.6061|0.83|   0.1045|    49|       109|158|
|  13060|2012-07-03|     3|  1|   7|  8|      0|      2|         1|         1|0.74|0.6818|0.62|   0.0896|    42|       604|646|
|  13219|2012-07-09|     3|  1|   7| 23|      0|      1|         1|         2| 0.7|0.6515|0.65|   0.1045|    22|       109|131|
|  13233|2012-07-10|     3|  1|   7| 13|      0|      2|         1|         1|0.82|0.7273|0.38|    0.194

In [18]:
# what else would you examine here?
# more DataFrame functions can be found in the documentation (listed in Refs below)



In [19]:
# when we are done working with data, remove from memory
july_trips.unpersist()

DataFrame[instant: string, dteday: string, season: string, yr: string, mnth: string, hr: string, holiday: string, weekday: string, workingday: string, weathersit: string, temp: string, atemp: string, hum: string, windspeed: string, casual: string, registered: string, cnt: string]

Refs:
- http://www.nodalpoint.com/spark-data-frames-from-csv-files-handling-headers-column-types/
- https://spark.apache.org/docs/1.6.0/api/python/_modules/pyspark/sql/types.html
- http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame