In [None]:
!pip install pyspark



In [None]:
!pip install graphframes



In [None]:
!curl -L -o "/usr/local/lib/python3.6/dist-packages/pyspark/jars/graphframes-0.8.2-spark3.2-s_2.12.jar" http://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.8.2-spark3.2-s_2.12/graphframes-0.8.2-spark3.2-s_2.12.jar

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100   164  100   164    0     0   6158      0 --:--:-- --:--:-- --:--:--  6307
100   146  100   146    0     0   1405      0 --:--:-- --:--:-- --:--:--  1405
curl: (23) Failure writing output to destination


In [None]:
from tqdm import tqdm
import pandas as pd
import glob
import torch
import torch.nn as nn
from PIL import Image
import torchvision.transforms as transforms
import torchvision.models as models
from google.colab import drive
import os
from graphframes import GraphFrame
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12").getOrCreate()

In [None]:
station_path = "/content/drive/My Drive/bike-data/station_data.csv"
station_data_df = spark.read.csv(station_path, header=True, inferSchema=True)

station_data_df.printSchema()


root
 |-- station_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dockcount: integer (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)



In [None]:
trip_path = "/content/drive/My Drive/bike-data/trip_data.csv"
trip_data_df = spark.read.csv(trip_path, header=True, inferSchema=True)

trip_data_df.printSchema()

root
 |-- Trip ID: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- Start Terminal: integer (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Station: string (nullable = true)
 |-- End Terminal: integer (nullable = true)
 |-- Bike #: integer (nullable = true)
 |-- Subscriber Type: string (nullable = true)
 |-- Zip Code: integer (nullable = true)



In [None]:
station_data_df = station_data_df.withColumnRenamed('name', 'id')


In [None]:
trip_data_df = trip_data_df.withColumnRenamed('Start Station', 'src').withColumnRenamed('End Station', 'dst')

In [None]:
graph = GraphFrame(station_data_df, trip_data_df)



In [None]:
trip_counts = graph.edges.groupBy("src", "dst").count().orderBy("count", ascending=False)

trip_counts.show()


+--------------------+--------------------+-----+
|                 src|                 dst|count|
+--------------------+--------------------+-----+
|San Francisco Cal...|     Townsend at 7th|    4|
|       5th at Howard|San Francisco Cal...|    3|
|San Francisco Cal...|  Powell Street BART|    2|
|     2nd at Townsend|   Market at Sansome|    2|
|     Spear at Folsom|     2nd at Townsend|    2|
|   Market at Sansome|Broadway St at Ba...|    2|
|    Davis at Jackson|Embarcadero at Sa...|    2|
|San Francisco Cal...|   2nd at South Park|    2|
|   Steuart at Market|San Francisco Cal...|    2|
|       Market at 4th|San Francisco Cal...|    2|
|Embarcadero at Fo...|Embarcadero at Sa...|    2|
|Mountain View Cal...|Rengstorff Avenue...|    1|
|    Davis at Jackson|Temporary Transba...|    1|
|       5th at Howard|     Townsend at 7th|    1|
|   Market at Sansome|South Van Ness at...|    1|
|     Beale at Market|Temporary Transba...|    1|
|     2nd at Townsend|Powell at Post (U...|    1|


In [None]:
graph.edges.filter('src = "Townsend at 7th" or dst == "Townsend at 7th"').groupBy("src", "dst").count().orderBy("count", ascending=False)
trip_counts.show()


+--------------------+--------------------+-----+
|                 src|                 dst|count|
+--------------------+--------------------+-----+
|San Francisco Cal...|     Townsend at 7th|    4|
|       5th at Howard|San Francisco Cal...|    3|
|San Francisco Cal...|  Powell Street BART|    2|
|     2nd at Townsend|   Market at Sansome|    2|
|     Spear at Folsom|     2nd at Townsend|    2|
|   Market at Sansome|Broadway St at Ba...|    2|
|    Davis at Jackson|Embarcadero at Sa...|    2|
|San Francisco Cal...|   2nd at South Park|    2|
|   Steuart at Market|San Francisco Cal...|    2|
|       Market at 4th|San Francisco Cal...|    2|
|Embarcadero at Fo...|Embarcadero at Sa...|    2|
|Mountain View Cal...|Rengstorff Avenue...|    1|
|    Davis at Jackson|Temporary Transba...|    1|
|       5th at Howard|     Townsend at 7th|    1|
|   Market at Sansome|South Van Ness at...|    1|
|     Beale at Market|Temporary Transba...|    1|
|     2nd at Townsend|Powell at Post (U...|    1|


In [None]:
# create a new df of trips that start at spear
trips_starting_at_spear = trip_data_df.filter(trip_data_df['src'] == 'Spear at Folsom')

# greate a gf from the trips_starting_at_spear DataFrame
graph_spear = GraphFrame(station_data_df, trips_starting_at_spear)

# get the destinations of all trips starting at spear
destinations_from_spear = graph_spear.edges.select('dst').distinct()

# get the destinations of all trips in the original graph
all_destinations = graph.edges.select('dst').distinct()

# find the destinations that are not in the destinations from spear df
never_destinations = all_destinations.subtract(destinations_from_spear)

# get the vertices that correspond to the never destinations
never_destination_vertices = station_data_df.join(never_destinations, station_data_df.id == never_destinations.dst)

never_destination_vertices.show()


+----------+--------------------+---------+-----------+---------+-------------+------------+--------------------+
|station_id|                  id|      lat|       long|dockcount|     landmark|installation|                 dst|
+----------+--------------------+---------+-----------+---------+-------------+------------+--------------------+
|         8| San Salvador at 1st|37.330165|-121.885831|       15|     San Jose|    8/5/2013| San Salvador at 1st|
|        27|Mountain View Cit...|37.389218|-122.081896|       15|Mountain View|   8/16/2013|Mountain View Cit...|
|        28|Mountain View Cal...|37.394358|-122.076713|       23|Mountain View|   8/15/2013|Mountain View Cal...|
|        32|Castro Street and...|37.385956|-122.083678|       11|Mountain View|  12/31/2013|Castro Street and...|
|        33|Rengstorff Avenue...|37.400241|-122.099076|       15|Mountain View|   8/16/2013|Rengstorff Avenue...|
|        37|Cowper at University|37.448598|-122.159504|       11|    Palo Alto|   8/14/2

In [None]:
graph.inDegrees.orderBy("inDegree", ascending=False).limit(1).show()


+--------------------+--------+
|                  id|inDegree|
+--------------------+--------+
|San Francisco Cal...|       9|
+--------------------+--------+



In [None]:
graph.edges.orderBy('Duration').limit(1).show()

+-------+--------+---------------+---------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|            src|Start Terminal|       End Date|                 dst|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+---------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913449|     126|8/31/2015 22:12|Beale at Market|            56|8/31/2015 22:15|Temporary Transba...|          55|   439|     Subscriber|   94130|
+-------+--------+---------------+---------------+--------------+---------------+--------------------+------------+------+---------------+--------+



In [None]:
graph_townsend = GraphFrame(station_data_df, trip_data_df.filter((trip_data_df['src'] == 'Townsend at 7th') | (trip_data_df['dst'] == 'Townsend at 7th')))


In [None]:
graph.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)").show(5)

+--------------------+--------------------+--------------------+
|                   a|                   b|                   c|
+--------------------+--------------------+--------------------+
|{49, Spear at Fol...|{69, San Francisc...|{61, 2nd at Towns...|
|{49, Spear at Fol...|{69, San Francisc...|{65, Townsend at ...|
|{49, Spear at Fol...|{69, San Francisc...|{64, 2nd at South...|
|{49, Spear at Fol...|{69, San Francisc...|{64, 2nd at South...|
|{49, Spear at Fol...|{69, San Francisc...|{65, Townsend at ...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [None]:
graph_townsend.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(d)").show(5)

+--------------------+--------------------+--------------------+--------------------+
|                   a|                   b|                   c|                   d|
+--------------------+--------------------+--------------------+--------------------+
|{49, Spear at Fol...|{65, Townsend at ...|{49, Spear at Fol...|{65, Townsend at ...|
|{57, 5th at Howar...|{65, Townsend at ...|{49, Spear at Fol...|{65, Townsend at ...|
|{63, Howard at 2n...|{65, Townsend at ...|{49, Spear at Fol...|{65, Townsend at ...|
|{65, Townsend at ...|{49, Spear at Fol...|{65, Townsend at ...|{49, Spear at Fol...|
|{65, Townsend at ...|{49, Spear at Fol...|{65, Townsend at ...|{50, Harry Bridge...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows

