In [1]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

In [2]:
#Import the stations data
stationsna = sqlContext.read.format("com.databricks.spark.csv").options(header='true', inferschema='true').load("file:///home/cloudera/Downloads/station.csv")

In [3]:
stationsna.registerTempTable("stations")

In [5]:
#Load the Trips csv and cache the same.
trips = sqlContext.read.format("com.databricks.spark.csv").options(header='true').load("file:///home/cloudera/Downloads/trip.csv")
trips.registerTempTable("trips")


trips.cache()



DataFrame[Trip ID: string, Duration: string, Start Date: string, Start Station: string, Start Terminal: string, End Date: string, End Station: string, End Terminal: string, Bike #: string, Subscriber Type: string, Zip Code: string]

In [6]:
#Build Graph
from pyspark.sql.functions import *
from graphframes import *


In [7]:
#Import the data and create a graph using GraphFrames 
tripVertices = stationsna.withColumnRenamed("name", "id").distinct()
trips =trips.withColumnRenamed("Start Station","src").distinct()

trips =trips.withColumnRenamed("End Station","dst").distinct()

tripEdges = trips.select("Trip ID", "Duration","Start Date", "src" , "Start Terminal", "End Date", "dst" ,"End Terminal")


tripEdges.cache()
tripVertices.cache()

print tripEdges
print tripVertices


tripGraph = GraphFrame(tripVertices, tripEdges)

print tripGraph

DataFrame[Trip ID: string, Duration: string, Start Date: string, src: string, Start Terminal: string, End Date: string, dst: string, End Terminal: string]
DataFrame[station_id: int, id: string, lat: double, long: double, dockcount: int, landmark: string, installation: string]
GraphFrame(v:[id: string, station_id: int, lat: double, long: double, dockcount: int, landmark: string, installation: string], e:[src: string, dst: string, Trip ID: string, Duration: string, Start Date: string, Start Terminal: string, End Date: string, End Terminal: string])


In [8]:
#Find out number of incoming connections and outgoing connections for
#each node and print the top 10 nodes.

tripGraph.edges.groupBy("src", "dst").count().sort(desc("count")).limit(10).show()

+--------------------+--------------------+-----+
|                 src|                 dst|count|
+--------------------+--------------------+-----+
|San Francisco Cal...|     Townsend at 7th| 3748|
|Harry Bridges Pla...|Embarcadero at Sa...| 3145|
|     2nd at Townsend|Harry Bridges Pla...| 2973|
|     Townsend at 7th|San Francisco Cal...| 2734|
|Harry Bridges Pla...|     2nd at Townsend| 2640|
|Embarcadero at Fo...|San Francisco Cal...| 2439|
|   Steuart at Market|     2nd at Townsend| 2356|
|Embarcadero at Sa...|   Steuart at Market| 2330|
|     Townsend at 7th|San Francisco Cal...| 2192|
|Temporary Transba...|San Francisco Cal...| 2184|
+--------------------+--------------------+-----+



In [17]:
#Find out number of incoming connections and outgoing connections for
#each node and print the top 10 nodes.

indeg =tripGraph.inDegrees
indeg.sort(desc("inDegree")).limit(10).show()



+--------------------+--------+
|                  id|inDegree|
+--------------------+--------+
|San Francisco Cal...|   34810|
|San Francisco Cal...|   22523|
|Harry Bridges Pla...|   17810|
|     2nd at Townsend|   15463|
|     Townsend at 7th|   15422|
|Embarcadero at Sa...|   15065|
|   Market at Sansome|   13916|
|   Steuart at Market|   13617|
|Temporary Transba...|   12966|
|  Powell Street BART|   10239|
+--------------------+--------+



In [32]:
outdeg =tripGraph.outDegrees
outdeg.sort(desc("outDegree")).limit(10).show()

+--------------------+---------+
|                  id|outDegree|
+--------------------+---------+
|San Francisco Cal...|    26304|
|San Francisco Cal...|    21758|
|Harry Bridges Pla...|    17255|
|Temporary Transba...|    14436|
|Embarcadero at Sa...|    14158|
|     2nd at Townsend|    14026|
|     Townsend at 7th|    13752|
|   Steuart at Market|    13687|
|      Market at 10th|    11885|
|   Market at Sansome|    11431|
+--------------------+---------+



In [10]:
#Find out which are the most common direct routes that people take and
#print top 10.
motifs = tripGraph.find("(a)-[e]->(b);(b)-[e]->(a)").show(10)

+--------------------+--------------------+--------------------+--------------------+
|                   e|                   a|                   b|                   e|
+--------------------+--------------------+--------------------+--------------------+
|[784348,446,5/28/...|[39,Powell Street...|[70,San Francisco...|[867445,515,7/29/...|
|[784348,446,5/28/...|[39,Powell Street...|[70,San Francisco...|[783782,859,5/28/...|
|[784348,446,5/28/...|[39,Powell Street...|[70,San Francisco...|[705139,814,3/31/...|
|[784348,446,5/28/...|[39,Powell Street...|[70,San Francisco...|[689940,489,3/19/...|
|[784348,446,5/28/...|[39,Powell Street...|[70,San Francisco...|[624428,637,1/29/...|
|[784348,446,5/28/...|[39,Powell Street...|[70,San Francisco...|[609975,371,1/19/...|
|[784348,446,5/28/...|[39,Powell Street...|[70,San Francisco...|[579303,515,12/17...|
|[784348,446,5/28/...|[39,Powell Street...|[70,San Francisco...|[446690,611,9/10/...|
|[784348,446,5/28/...|[39,Powell Street...|[70,San Fra

In [20]:
#From the analysis in b, see which are the stations where people most
#frequently start their trips but do not come back.

inDeg = tripGraph.inDegrees
outDeg = tripGraph.outDegrees

degreeRatio = inDeg.join(outDeg, inDeg.id == outDeg.id).drop(outDeg.id).selectExpr("id", "(inDegree)/(outDegree) as degreeRatio").cache()

degreeRatio.sort(desc("id")).limit(10).show()




+--------------------+------------------+
|                  id|       degreeRatio|
+--------------------+------------------+
|Yerba Buena Cente...|1.1385116784356328|
|Washington at Kearny|1.3086466165413533|
|University and Em...|1.1966942148760331|
|     Townsend at 7th| 1.121436881908086|
|Temporary Transba...|0.8981712385702411|
|   Steuart at Market|0.9948856579235771|
|Stanford in Redwo...|               1.0|
|       St James Park|0.8688915375446961|
|     Spear at Folsom|0.8954072479368497|
|South Van Ness at...|0.8303267528931245|
+--------------------+------------------+



In [11]:
#Find all such patterns where any station a is connected to station b, b is
#connected to c, but c is not directly connected to a.

motifs = tripGraph.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)").show(10)






Py4JJavaError: An error occurred while calling o115.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 77 in stage 27.0 failed 1 times, most recent failure: Lost task 77.0 in stage 27.0 (TID 3085, localhost): java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:345)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at org.xerial.snappy.SnappyOutputStream.dump(SnappyOutputStream.java:297)
	at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:244)
	at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:99)
	at org.apache.spark.io.SnappyOutputStreamWrapper.write(CompressionCodec.scala:202)
	at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1889)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1874)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
	at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1573)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.serializer.SerializationStream.writeKey(Serializer.scala:145)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:203)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
	at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
	at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
	at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
	at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
	at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
	at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
	at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
	at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:345)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at org.xerial.snappy.SnappyOutputStream.dump(SnappyOutputStream.java:297)
	at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:244)
	at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:99)
	at org.apache.spark.io.SnappyOutputStreamWrapper.write(CompressionCodec.scala:202)
	at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1889)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1874)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
	at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1573)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.serializer.SerializationStream.writeKey(Serializer.scala:145)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:203)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	... 1 more


In [8]:
# Run a PageRank algorithm to figure out which is the most important
#station in the entire graph.

ranks = tripGraph.pageRank(resetProbability=0.15, maxIter=5)
ranks.vertices.orderBy(ranks.vertices.pagerank.desc()).limit(20).show()


 

+----------+--------------------+---------+-----------+---------+-------------+------------+------------------+
|station_id|                  id|      lat|       long|dockcount|     landmark|installation|          pagerank|
+----------+--------------------+---------+-----------+---------+-------------+------------+------------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|     San Jose|    8/6/2013| 2.281525822423689|
|        70|San Francisco Cal...|37.776617| -122.39526|       19|San Francisco|   8/23/2013|1.9076925128811353|
|        22|Redwood City Calt...|37.486078|-122.232089|       25| Redwood City|   8/15/2013|1.4225549210068076|
|        28|Mountain View Cal...|37.394358|-122.076713|       23|Mountain View|   8/15/2013|1.3956850803609109|
|        69|San Francisco Cal...|  37.7766| -122.39547|       23|San Francisco|   8/23/2013|1.2880093979600193|
|        50|Harry Bridges Pla...|37.795392|-122.394203|       23|San Francisco|   8/20/2013|1.0394789374