## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [2]:
# File location and type
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf
from datetime import datetime

file_location = "/FileStore/tables/hubway_trips.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","


#del df
# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", "true") \
  .option("dateFormat", "MM/dd/yyyy HH:mm:ss") \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

func =  udf (lambda x: datetime.strptime(x, '%m/%d/%Y %H:%M:%S'), DateType())

tripEdges = df.withColumn('start_date', func(col('start_date'))).withColumn('end_date', func(col('end_date'))).withColumnRenamed('strt_statn','src').withColumnRenamed('end_statn','dst')




In [3]:

tripEdges.printSchema()




In [4]:

file_location = "/FileStore/tables/hubway_stations.csv"


# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","


#del df
# The applied options are for CSV files. For other file types, these will be ignored.
stationVerticies = spark.read.format(file_type) \
  .option("inferSchema", "true") \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)





In [5]:
#distance https://medium.com/@nikolasbielski/using-a-custom-udf-in-pyspark-to-compute-haversine-distances-d877b77b4b18


In [6]:
stationVerticies.printSchema()

In [7]:
# Create a view or table

temp_table_name = "hubway_trips_csv"

df.createOrReplaceTempView(temp_table_name)

In [8]:


small = spark.sql('select count(*) from hubway_trips_csv ')
display(small)

count(1)
1579025


## Installs
You need to install

pip install graphframes


and the Graph Jar that corresponds to your spark instance https://spark-packages.org/package/graphframes/graphframes

In [10]:
from functools import reduce
from pyspark.sql.functions import col, lit, when
from graphframes import *

stationGraph = GraphFrame(stationVerticies, tripEdges)

In [11]:
print(stationGraph)

# Basic graph and DataFrame queries
GraphFrames provide several simple graph queries, such as node degree.

In [13]:
display(stationGraph.vertices)

id,terminal,station,municipal,lat,lng,status
3,B32006,Colleges of the Fenway,Boston,42.340021,-71.100812,Existing
4,C32000,Tremont St. at Berkeley St.,Boston,42.345392,-71.069616,Existing
5,B32012,Northeastern U / North Parking Lot,Boston,42.341814,-71.090179,Existing
6,D32000,Cambridge St. at Joy St.,Boston,42.361285,-71.06514,Existing
7,A32000,Fan Pier,Boston,42.353412,-71.044624,Existing
8,A32001,Union Square - Brighton Ave. at Cambridge St.,Boston,42.353334,-71.137313,Existing
9,A32002,Agganis Arena - 925 Comm Ave.,Boston,42.351313,-71.116174,Existing
10,A32003,B.U. Central - 725 Comm. Ave.,Boston,42.350075,-71.105884,Existing
11,A32004,Longwood Ave / Binney St,Boston,42.338629,-71.1065,Existing
12,B32002,Ruggles Station / Columbus Ave.,Boston,42.335911,-71.088496,Existing


In [14]:
display(stationGraph.edges)

seq_id,hubway_id,status,duration,start_date,src,end_date,dst,bike_nr,subsc_type,zip_code,birth_date,gender
1,8,Closed,9,2011-07-28,23,2011-07-28,23,B00468,Registered,'97217,1976.0,Male
2,9,Closed,220,2011-07-28,23,2011-07-28,23,B00554,Registered,'02215,1966.0,Male
3,10,Closed,56,2011-07-28,23,2011-07-28,23,B00456,Registered,'02108,1943.0,Male
4,11,Closed,64,2011-07-28,23,2011-07-28,23,B00554,Registered,'02116,1981.0,Female
5,12,Closed,12,2011-07-28,23,2011-07-28,23,B00554,Registered,'97214,1983.0,Female
6,13,Closed,19,2011-07-28,23,2011-07-28,23,B00456,Registered,'02021,1951.0,Male
7,14,Closed,24,2011-07-28,23,2011-07-28,23,B00554,Registered,'02140,1971.0,Female
8,15,Closed,7,2011-07-28,23,2011-07-28,23,B00554,Registered,'02140,1971.0,Female
9,16,Closed,8,2011-07-28,23,2011-07-28,23,B00554,Registered,'97214,1983.0,Female
10,17,Closed,1108,2011-07-28,47,2011-07-28,40,B00550,Registered,'01867,1994.0,Male


# Verticies Degrees

### Use Cases
##### Network Communicaton
####### Mesh Networks

* Traffic monitoring
* Influence
* Communities

In [16]:
# The incoming degree of the vertices:
display(stationGraph.inDegrees)

id,inDegree
31.0,13389
85.0,364
137.0,6350
65.0,6705
53.0,33369
133.0,4528
78.0,7174
108.0,75
34.0,5243
115.0,4223


In [17]:
#The outgoing degree of the vertices:
display(stationGraph.outDegrees)

id,outDegree
31.0,12492
85.0,345
137.0,5815
65.0,6421
53.0,35440
133.0,4808
78.0,6232
108.0,92
34.0,5061
115.0,4243


In [18]:
# Run simple data frame queries
youngest = stationGraph.edges.groupBy().min("birth_date")
display(youngest)

min(birth_date)
1932


# Motifs

In [20]:
motifs = stationGraph.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

a,e,b,e2
"List(22, A32010, South Station - 700 Atlantic Ave., Boston, 42.352175, -71.055547, Existing)","List(2731, 3008, Closed, 1933, 2011-07-31, 22, 2011-07-31, 3, B00533, Casual, null, null, null)","List(3, B32006, Colleges of the Fenway, Boston, 42.340021, -71.100812, Existing)","List(264, 308, Closed, 1955, 2011-07-28, 3, 2011-07-28, 22, B00071, Registered, '02118, 1967, Male)"
"List(22, A32010, South Station - 700 Atlantic Ave., Boston, 42.352175, -71.055547, Existing)","List(2731, 3008, Closed, 1933, 2011-07-31, 22, 2011-07-31, 3, B00533, Casual, null, null, null)","List(3, B32006, Colleges of the Fenway, Boston, 42.340021, -71.100812, Existing)","List(10009, 10981, Closed, 1553, 2011-08-07, 3, 2011-08-07, 22, B00251, Registered, '02115, 1980, Female)"
"List(22, A32010, South Station - 700 Atlantic Ave., Boston, 42.352175, -71.055547, Existing)","List(2731, 3008, Closed, 1933, 2011-07-31, 22, 2011-07-31, 3, B00533, Casual, null, null, null)","List(3, B32006, Colleges of the Fenway, Boston, 42.340021, -71.100812, Existing)","List(14157, 15523, Closed, 1278, 2011-08-11, 3, 2011-08-11, 22, B00177, Registered, '02118, 1977, Female)"
"List(22, A32010, South Station - 700 Atlantic Ave., Boston, 42.352175, -71.055547, Existing)","List(2731, 3008, Closed, 1933, 2011-07-31, 22, 2011-07-31, 3, B00533, Casual, null, null, null)","List(3, B32006, Colleges of the Fenway, Boston, 42.340021, -71.100812, Existing)","List(53064, 60700, Closed, 1493, 2011-09-12, 3, 2011-09-12, 22, B00050, Registered, '02113, 1986, Female)"
"List(22, A32010, South Station - 700 Atlantic Ave., Boston, 42.352175, -71.055547, Existing)","List(2731, 3008, Closed, 1933, 2011-07-31, 22, 2011-07-31, 3, B00533, Casual, null, null, null)","List(3, B32006, Colleges of the Fenway, Boston, 42.340021, -71.100812, Existing)","List(145028, 163299, Closed, 1109, 2012-03-21, 3, 2012-03-21, 22, B00404, Registered, '02360, 1961, Female)"
"List(22, A32010, South Station - 700 Atlantic Ave., Boston, 42.352175, -71.055547, Existing)","List(2731, 3008, Closed, 1933, 2011-07-31, 22, 2011-07-31, 3, B00533, Casual, null, null, null)","List(3, B32006, Colleges of the Fenway, Boston, 42.340021, -71.100812, Existing)","List(147860, 166839, Closed, 1269, 2012-03-23, 3, 2012-03-23, 22, B00531, Registered, '02360, 1961, Female)"
"List(22, A32010, South Station - 700 Atlantic Ave., Boston, 42.352175, -71.055547, Existing)","List(2731, 3008, Closed, 1933, 2011-07-31, 22, 2011-07-31, 3, B00533, Casual, null, null, null)","List(3, B32006, Colleges of the Fenway, Boston, 42.340021, -71.100812, Existing)","List(154029, 174250, Closed, 1058, 2012-03-30, 3, 2012-03-30, 22, B00012, Registered, '02360, 1961, Female)"
"List(22, A32010, South Station - 700 Atlantic Ave., Boston, 42.352175, -71.055547, Existing)","List(2731, 3008, Closed, 1933, 2011-07-31, 22, 2011-07-31, 3, B00533, Casual, null, null, null)","List(3, B32006, Colleges of the Fenway, Boston, 42.340021, -71.100812, Existing)","List(156465, 177203, Closed, 1142, 2012-04-02, 3, 2012-04-02, 22, B00192, Registered, '02360, 1961, Female)"
"List(22, A32010, South Station - 700 Atlantic Ave., Boston, 42.352175, -71.055547, Existing)","List(2731, 3008, Closed, 1933, 2011-07-31, 22, 2011-07-31, 3, B00533, Casual, null, null, null)","List(3, B32006, Colleges of the Fenway, Boston, 42.340021, -71.100812, Existing)","List(158966, 180154, Closed, 1417, 2012-04-04, 3, 2012-04-04, 22, B00309, Registered, '02045, 1970, Male)"
"List(22, A32010, South Station - 700 Atlantic Ave., Boston, 42.352175, -71.055547, Existing)","List(2731, 3008, Closed, 1933, 2011-07-31, 22, 2011-07-31, 3, B00533, Casual, null, null, null)","List(3, B32006, Colleges of the Fenway, Boston, 42.340021, -71.100812, Existing)","List(159154, 180381, Closed, 1121, 2012-04-04, 3, 2012-04-04, 22, B00208, Registered, '02360, 1961, Female)"


In [21]:
filtered = motifs.filter("e.duration > 2000")
display(filtered)

# Subgraphs
GraphFrames provides APIs for building subgraphs by filtering on edges and vertices.

In [23]:
oldBoston = stationGraph.filterEdges("birth_date < '1940'").filterVertices("municipal = 'Boston'").dropIsolatedVertices()
display(oldBoston.vertices)

id,terminal,station,municipal,lat,lng,status
31,B32014,Seaport Hotel,Boston,42.348833,-71.041747,Existing
44,D32009,Faneuil Hall - Union St. at North St.,Boston,42.360583,-71.056868,Existing
12,B32002,Ruggles Station / Columbus Ave.,Boston,42.335911,-71.088496,Existing
47,D32010,Cross St. at Hanover St.,Boston,42.362811,-71.056067,Existing
6,D32000,Cambridge St. at Joy St.,Boston,42.361285,-71.06514,Existing
16,C32003,Back Bay / South End Station,Boston,42.347433,-71.076163,Existing
3,B32006,Colleges of the Fenway,Boston,42.340021,-71.100812,Existing
20,B32004,Aquarium Station - 200 Atlantic Ave.,Boston,42.35977,-71.051601,Existing
40,D32006,Lewis Wharf - Atlantic Ave.,Boston,42.363871,-71.050877,Existing
54,D32014,Tremont St / West St,Boston,42.354979,-71.063348,Existing


In [24]:
display(oldBoston.edges)

seq_id,hubway_id,status,duration,start_date,src,end_date,dst,bike_nr,subsc_type,zip_code,birth_date,gender
27319,30433,Closed,494,2011-08-22,38,2011-08-22,23,B00508,Registered,'01810,1939,Male
27578,30731,Closed,1247,2011-08-22,25,2011-08-22,16,B00567,Registered,'02118,1938,Male
27682,30845,Closed,277,2011-08-22,16,2011-08-22,50,B00563,Registered,'02118,1938,Male
27755,30923,Closed,577,2011-08-22,50,2011-08-22,25,B00563,Registered,'02118,1938,Male
30314,33676,Closed,505,2011-08-24,38,2011-08-24,23,B00098,Registered,'01810,1939,Male
30544,33920,Closed,839,2011-08-24,44,2011-08-24,38,B00451,Registered,'01810,1939,Male
30637,34017,Closed,587,2011-08-24,40,2011-08-24,31,B00167,Registered,'02118,1938,Male
30730,34115,Closed,431,2011-08-24,48,2011-08-24,47,B00198,Registered,'02118,1938,Male
30749,34135,Closed,307,2011-08-24,47,2011-08-24,40,B00198,Registered,'02118,1938,Male
38225,42233,Closed,446,2011-08-31,38,2011-08-31,23,B00570,Registered,'01810,1939,Male


# Standard graph algorithms
GraphFrames comes with a number of standard graph algorithms built in:

* PageRank (regular and personalized)
* Breadth-first search (BFS)
* Connected components
* Strongly connected components
* Label Propagation Algorithm (LPA)
* Shortest paths
* Triangle count

#Breadth First Search

In [27]:

filteredPaths =stationGraph.bfs(
  fromExpr = "station = 'Seaport Hotel'",
  toExpr = "station == 'Mayor Thomas M. Menino - Government Center'",
  edgeFilter = "birth_date < 1960",
  maxPathLength = 1)
display(filteredPaths)

from,e0,to
"List(31, B32014, Seaport Hotel, Boston, 42.348833, -71.041747, Existing)","List(5194, 5677, Closed, 251, 2011-08-04, 31, 2011-08-04, 23, B00061, Registered, '01810, 1957, Male)","List(23, B32008, Mayor Thomas M. Menino - Government Center, Boston, 42.359677, -71.059364, Removed)"
"List(31, B32014, Seaport Hotel, Boston, 42.348833, -71.041747, Existing)","List(23826, 26339, Closed, 284, 2011-08-19, 31, 2011-08-19, 23, B00424, Registered, '02110, 1955, Male)","List(23, B32008, Mayor Thomas M. Menino - Government Center, Boston, 42.359677, -71.059364, Removed)"
"List(31, B32014, Seaport Hotel, Boston, 42.348833, -71.041747, Existing)","List(136829, 153692, Closed, 307, 2011-11-22, 31, 2011-11-22, 23, B00361, Registered, '02116, 1950, Male)","List(23, B32008, Mayor Thomas M. Menino - Government Center, Boston, 42.359677, -71.059364, Removed)"
"List(31, B32014, Seaport Hotel, Boston, 42.348833, -71.041747, Existing)","List(233349, 267265, Closed, 688, 2012-05-22, 31, 2012-05-22, 23, B00412, Registered, '02446, 1955, Male)","List(23, B32008, Mayor Thomas M. Menino - Government Center, Boston, 42.359677, -71.059364, Removed)"
"List(31, B32014, Seaport Hotel, Boston, 42.348833, -71.041747, Existing)","List(259856, 297215, Closed, 688, 2012-06-06, 31, 2012-06-06, 23, B00104, Registered, '02130, 1954, Male)","List(23, B32008, Mayor Thomas M. Menino - Government Center, Boston, 42.359677, -71.059364, Removed)"
"List(31, B32014, Seaport Hotel, Boston, 42.348833, -71.041747, Existing)","List(316972, 360241, Closed, 1208, 2012-07-04, 31, 2012-07-04, 23, B00089, Registered, '01742, 1957, Female)","List(23, B32008, Mayor Thomas M. Menino - Government Center, Boston, 42.359677, -71.059364, Removed)"
"List(31, B32014, Seaport Hotel, Boston, 42.348833, -71.041747, Existing)","List(495651, 560120, Closed, 945, 2012-09-12, 31, 2012-09-12, 23, B00401, Registered, '02110, 1956, Male)","List(23, B32008, Mayor Thomas M. Menino - Government Center, Boston, 42.359677, -71.059364, Removed)"


#Connected components

In [29]:
#sc.setCheckpointDir("/tmp/graphframes-connected-components")
result = stationGraph.connectedComponents()
display(result)

id,terminal,station,municipal,lat,lng,status,component
3,B32006,Colleges of the Fenway,Boston,42.340021,-71.100812,Existing,3
4,C32000,Tremont St. at Berkeley St.,Boston,42.345392,-71.069616,Existing,3
5,B32012,Northeastern U / North Parking Lot,Boston,42.341814,-71.090179,Existing,3
6,D32000,Cambridge St. at Joy St.,Boston,42.361285,-71.06514,Existing,3
7,A32000,Fan Pier,Boston,42.353412,-71.044624,Existing,3
8,A32001,Union Square - Brighton Ave. at Cambridge St.,Boston,42.353334,-71.137313,Existing,3
9,A32002,Agganis Arena - 925 Comm Ave.,Boston,42.351313,-71.116174,Existing,3
10,A32003,B.U. Central - 725 Comm. Ave.,Boston,42.350075,-71.105884,Existing,3
11,A32004,Longwood Ave / Binney St,Boston,42.338629,-71.1065,Existing,3
12,B32002,Ruggles Station / Columbus Ave.,Boston,42.335911,-71.088496,Existing,3


# Conected Components

In the below example we examine if all nodes are connected

In [31]:
cc = result.select('component').distinct()
display(cc)

component
3


# Label Propagation
Run static Label Propagation Algorithm for detecting communities in networks.

Each node in the network is initially assigned to its own community. At every superstep, nodes send their community affiliation to all neighbors and update their state to the most frequent community affiliation of incoming messages.

LPA is a standard community detection algorithm for graphs. It is very inexpensive computationally, although (1) convergence is not guaranteed and (2) one can end up with trivial solutions (all nodes are identified into a single community).

In [33]:
vertices = sqlContext.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)], ["id", "name", "age"])
edges = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
], ["src", "dst", "relationship"])

g = GraphFrame(vertices, edges)

In [34]:
result = g.labelPropagation(maxIter=5)
display(result)

id,name,age,label
g,Gabby,60,146028888064
b,Bob,36,1047972020224
e,Esther,32,412316860416
a,Alice,34,670014898176
f,Fanny,36,670014898176
d,David,29,670014898176
c,Charlie,30,1382979469312


Strongly connected components

In [36]:
result = g.stronglyConnectedComponents(maxIter=10)
display(result.select("id", "component"))

id,component
g,146028888064
b,1047972020224
e,670014898176
a,670014898176
f,412316860416
d,670014898176
c,1047972020224


#PageRank
Identify important vertices in a graph based on connections.

In [38]:
results = g.pageRank(resetProbability=0.15, tol=0.01)
display(results.vertices)

id,name,age,pagerank
g,Gabby,60,0.1799821386239711
b,Bob,36,2.655507832863289
e,Esther,32,0.3708523318767607
a,Alice,34,0.4491063370653874
f,Fanny,36,0.3283606792049851
d,David,29,0.3283606792049851
c,Charlie,30,2.6878300011606218


In [39]:
display(results.edges)

src,dst,relationship,weight
a,b,friend,0.5
b,c,follow,1.0
e,f,follow,0.5
e,d,friend,0.5
c,b,follow,1.0
a,e,friend,0.5
f,c,follow,1.0
d,a,friend,1.0


# Shortest paths
Computes shortest paths to the given set of landmark vertices, where landmarks are specified by vertex ID.

In [41]:
results = g.shortestPaths(landmarks=["a", "d"])
display(results)

id,name,age,distances
g,Gabby,60,Map()
b,Bob,36,Map()
e,Esther,32,"Map(d -> 1, a -> 2)"
a,Alice,34,"Map(a -> 0, d -> 2)"
f,Fanny,36,Map()
d,David,29,"Map(d -> 0, a -> 1)"
c,Charlie,30,Map()


# Triangle count
Computes the number of triangles passing through each vertex.

In [43]:
results = g.triangleCount()
display(results)

count,id,name,age
0,g,Gabby,60
0,f,Fanny,36
1,e,Esther,32
1,d,David,29
0,c,Charlie,30
0,b,Bob,36
1,a,Alice,34
