In [1]:
from functools import reduce
from pyspark.sql.functions import col, lit, when
from graphframes import *

In [2]:
%sh curl -O 'https://s3.eu-central-1.amazonaws.com/meetupsocialnetwork/groups_nodefile.csv'

In [3]:
%sh curl -O 'https://s3.eu-central-1.amazonaws.com/meetupsocialnetwork/group_nw_.csv'

In [4]:
%sh curl -O 'https://s3.eu-central-1.amazonaws.com/meetupsocialnetwork/venues_nodefile.csv'

In [5]:
%sh curl -O 'https://s3.eu-central-1.amazonaws.com/meetupsocialnetwork/venues_edgefile.csv'

In [6]:
%fs ls "file:/databricks/driver"

path,name,size
file:/databricks/driver/conf/,conf/,4096
file:/databricks/driver/logs/,logs/,4096
file:/databricks/driver/derby.log,derby.log,731
file:/databricks/driver/venues_edgefile.csv,venues_edgefile.csv,243
file:/databricks/driver/eventlogs/,eventlogs/,4096
file:/databricks/driver/group_nw_.csv,group_nw_.csv,1339503326
file:/databricks/driver/groups_nodefile.csv,groups_nodefile.csv,1361227


In [7]:
# Read nodes and edges for group network
group_nodes = spark.read.csv('file:/databricks/driver/groups_nodefile.csv',inferSchema=True, header =True)
group_edges = spark.read.csv('file:/databricks/driver/group_nw_.csv',inferSchema=True, header =True)

print(group_nodes.count()) #16330 nodes
print(group_edges.count()) # 13079090 edges

In [8]:
group_nodes.printSchema()

In [9]:
group_edges.printSchema()

In [10]:
# Change colums name to construct the graph
oldColumns = group_nodes.schema.names
newColumns = ["id", "name", "category_id", "num_members", "city", "date_created", "rating"]

group_nodes = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), group_nodes)
group_nodes.printSchema()

In [11]:
# Change column names to construct the graph
oldColumns = group_edges.schema.names
newColumns = ["id", "edge_id", "src", "dst", "weight", "common_members"]

group_edges = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), group_edges)
group_edges.printSchema()

In [12]:
# Reduce number of edges
from pyspark.sql.functions import col
group_edges_reduced = group_edges.filter(col("weight") >= 5)


In [13]:
print(group_edges.count())
print(group_edges_reduced.count())

In [14]:
# Create the graph
g = GraphFrame(group_nodes, group_edges_reduced)

In [15]:
smaller = g.vertices.groupBy().min("num_members")
display(smaller)

min(num_members)
1


In [16]:
bigger = g.vertices.groupBy().max("num_members")
display(bigger)

max(num_members)
31215


In [17]:
display(g.vertices)

id,name,category_id,num_members,city,date_created,rating
6388,Alternative Health NYC,14,1440,New York,2002-11-21T16:50:46.000+0000,4.39
6510,Alternative Energy Meetup,4,969,New York,2003-05-20T14:48:54.000+0000,4.31
8458,NYC Animal Rights,26,2930,New York,2004-03-27T09:55:41.000+0000,4.84
8940,The New York City Anime Group,29,5080,New York,2002-11-16T04:49:16.000+0000,4.46
10104,NYC Pit Bull Group,26,2097,New York,2003-10-22T21:39:49.000+0000,4.09
10359,NYC International Arabic Language & Culture Club,16,1171,New York,2003-05-22T14:19:52.000+0000,4.58
12111,The New York City American Sign Language Meetup Group,16,140,New York,2002-10-08T17:22:28.000+0000,4.46
12542,The New York City Astrology Meetup,22,1738,New York,2002-10-08T17:22:23.000+0000,4.59
12907,New York City Atheists Meetings,28,2454,New York,2002-10-30T03:09:34.000+0000,4.63
14573,The New York City Ayn Rand Group,13,557,New York,2003-04-14T17:20:41.000+0000,4.56


In [18]:
display(g.edges)

id,edge_id,src,dst,weight,common_members
0,0,490552,1474611,57,"[3, 1418715, 1671282, 2148610, 2319427, 2709298, 2741328, 3169887, 3226548, 3592226, 3775564, 3890522, 4297339, 4331241, 5029229, 5620776, 5789458, 7607137, 8468628, 8693413, 9245645, 9308780, 9504442, 11108096, 11825789, 12093969, 12313224, 13861471, 30204882, 45304242, 45684522, 91319762, 104440312, 115507992, 142427502, 181895502, 183329781, 183757846, 184755010, 184942179, 185540240, 185711418, 185739525, 186244961, 186625433, 186884321, 187850939, 190974951, 192514972, 194186667, 200829702, 201854997, 201940363, 203147732, 208119525, 211275878, 213263083]"
1,1,490552,1490492,66,"[3, 1418715, 1584644, 2148610, 2709298, 3300861, 3592226, 3826156, 3890522, 4023165, 4097057, 4212790, 4340948, 4526750, 5029229, 5620776, 5789458, 5954245, 6431237, 6705140, 6958909, 7217725, 8312903, 8371582, 9131581, 9245645, 9306675, 9504442, 10496458, 10632989, 10750025, 10829602, 11041705, 11108096, 12093969, 13276555, 13566300, 13824655, 13918962, 13975863, 14221662, 14451551, 26187642, 31492942, 37209012, 45684522, 57566832, 64537082, 91318072, 100644052, 112736122, 135580162, 181895502, 182649499, 183329781, 184202937, 184221078, 184348918, 184755010, 185083341, 185963137, 186625433, 186669418, 188796105, 190974951, 191633392]"
2,2,490552,1515830,167,"[3, 1418715, 1597206, 1671282, 2148610, 2220252, 2496160, 2877642, 2972671, 3169887, 3181426, 3226548, 3300861, 3342013, 3592226, 3775564, 3810896, 3826156, 3925169, 4097057, 4212790, 4331241, 4340948, 5029229, 5620776, 5789458, 5826019, 6285042, 6285784, 6295534, 6876644, 6958909, 7217725, 7246838, 7426060, 7472311, 8371582, 8468628, 8693413, 9245645, 9504442, 10366201, 10632989, 11041705, 11192984, 11825789, 11926529, 12093969, 12566946, 13051582, 13329930, 13390033, 13861471, 13918962, 14221662, 14242793, 14451551, 21146231, 30204882, 37756082, 45304242, 57566832, 66214572, 77107442, 96267532, 100644052, 104440312, 112736122, 115507992, 116018732, 116761632, 124786812, 124833932, 132035062, 159155142, 177625822, 180210262, 181895502, 182649499, 183329781, 183417875, 183679635, 184221078, 184295112, 184348918, 184605126, 185343927, 185516598, 185675499, 185711452, 186233159, 186625433, 186669418, 186957479, 187405748, 187850939, 188532151, 189000563, 189021660, 189058705, 189462531, 190036811, 190974951, 191633392, 192514972, 193783737, 193918455, 195103922, 195788678, 197125844, 197417264, 198393427, 198571871, 198611759, 199139615, 201257244, 202686256, 203147732, 205510514, 206337168, 206947318, 207125360, 208119525, 210773539, 210791034, 210904396, 211275878, 211413603, 212486018, 212973561, 213794549, 213970819, 214221841, 214600680, 215095050, 215253724, 218010154, 219445363, 219898810, 220103001, 220288124, 221223822, 222824503, 223326608, 223547550, 223787449, 225382547, 226495961, 227011247, 227765943, 227816999, 228888786, 228966438, 228994890, 229908274, 230493176, 230712669, 230879378, 230982694, 231039234, 231668232, 231948039, 232536441, 232985942, 235064725, 238687841, 239669257]"
3,3,490552,1574965,46,"[3, 1418715, 1597206, 1671282, 2148610, 2496160, 3169887, 3226548, 3483447, 3592226, 3826156, 4097057, 4297339, 4340948, 4526750, 5029229, 5620776, 5789458, 6958909, 7472311, 8468628, 8693413, 9245645, 10289634, 11041705, 11300185, 11825789, 12093969, 13861471, 14221662, 21146231, 30204882, 31492942, 37756082, 45304242, 45684522, 51770172, 67119482, 91318072, 100644052, 135580162, 181895502, 189000563, 190974951, 216349950, 228589723]"
4,4,490552,1669000,62,"[3, 1584644, 1597206, 2684823, 2741328, 3181426, 3361856, 3592226, 3775564, 4273357, 5166001, 5620776, 7472311, 8772513, 9245645, 10750025, 12093969, 12313224, 12559818, 20100751, 57566832, 79068232, 94148492, 95445572, 100644052, 120822842, 124833932, 132979882, 135334272, 181895502, 183329781, 183530787, 185343927, 186002668, 187850939, 190974951, 194186667, 198449617, 198611759, 201233389, 201257244, 201915129, 202686256, 204532506, 208105025, 208119525, 208582566, 212486018, 213806018, 216877845, 219345202, 221223822, 223456125, 227765943, 228242091, 228966438, 228994890, 229908274, 230982694, 231668232, 233514831, 239669257]"
5,5,490552,1751326,52,"[3, 2884419, 2972671, 3169887, 3226548, 3483447, 4273357, 4297339, 5029229, 5620776, 5826019, 6233459, 6431237, 6958909, 7426060, 8116624, 8468628, 8693413, 10040920, 11825789, 12093969, 12313224, 13824655, 13861471, 13975863, 26187642, 30204882, 36040252, 45304242, 64537082, 66214572, 86517282, 89832292, 100644052, 112736122, 115507992, 124786812, 135580162, 141406412, 174222702, 180210262, 181895502, 186669418, 190036811, 190974951, 195753341, 197589056, 212813677, 216002305, 216349950, 218332429, 237479880]"
6,6,490552,1753252,6,"[3, 57566832, 185496514, 193783737, 213813218, 228242091]"
7,7,490552,1761388,10,"[3, 2220252, 3226548, 12093969, 26187642, 38411432, 74161992, 90477942, 135334272, 186002668]"
9,9,1474611,1490492,348,"[3, 67469, 233986, 595073, 773698, 873741, 886630, 973554, 1237529, 1326031, 1418715, 1675958, 1762810, 2057039, 2136899, 2148610, 2269810, 2378358, 2536097, 2588419, 2709298, 2712903, 2750143, 2780572, 2926974, 2928099, 2994667, 3180959, 3379840, 3492592, 3592226, 3613166, 3709204, 3740480, 3747521, 3787888, 3877960, 3890522, 3893585, 3966633, 4118992, 4379102, 4400101, 4848396, 5017205, 5029229, 5132132, 5344588, 5620776, 5647984, 5669029, 5729644, 5789458, 5844008, 5866862, 6206656, 6393334, 6470150, 6744301, 6901233, 6911153, 6937729, 7040591, 7059735, 7193970, 7303510, 7466364, 7506178, 7609755, 7649064, 7690098, 7902155, 7961962, 8088284, 8147769, 8156360, 8359765, 8378458, 8534461, 8577613, 8650271, 8677227, 8755729, 8777464, 8784594, 8822560, 8822825, 8842863, 8895881, 8896141, 9069838, 9192724, 9245645, 9341285, 9504442, 9553349, 9570636, 9632751, 9640749, 9653893, 9679257, 9696313, 9728902, 9812625, 9823402, 9872894, 9877326, 9879800, 9990143, 9993792, 10118910, 10264528, 10323957, 10334896, 10349531, 10543468, 10875182, 10902368, 10949128, 10981710, 11051486, 11108096, 11163201, 11362751, 11370154, 11397105, 11479370, 11482352, 11488723, 11518209, 11592230, 11606557, 11610265, 11618473, 11679676, 11685565, 11695537, 11823588, 11850353, 11868954, 11879893, 11880880, 11920097, 12025329, 12054730, 12065745, 12093969, 12161624, 12161836, 12170832, 12214608, 12237742, 12273335, 12387372, 12388486, 12471547, 12570525, 12599450, 12849841, 12913264, 12973344, 13074265, 13241101, 13303966, 13321463, 13334982, 13338440, 13367944, 13402404, 13492084, 13503474, 13538815, 13574089, 13589026, 13658488, 13743222, 13745970, 13762963, 13799851, 13820656, 13829288, 13840037, 13902388, 13918975, 13985598, 13999926, 14109698, 14135026, 14142022, 14160309, 14216729, 14332414, 14420910, 14566828, 14600823, 14630332, 15843521, 18146331, 18221361, 18889741, 19098391, 19800601, 20589511, 21142031, 21413381, 21464271, 22095461, 22143321, 22270791, 23281521, 24425942, 25303412, 25960282, 26629952, 26723942, 26895352, 32475202, 32478012, 33084932, 33400082, 33483652, 34033302, 34238152, 35244942, 35589622, 37202182, 37343562, 37854842, 39689102, 43007202, 43620852, 44871722, 45684522, 45738492, 46341512, 46610942, 48157942, 48281522, 48901742, 48997662, 50068952, 52741192, 53368692, 55430282, 57767722, 58403962, 60472272, 61328692, 63571122, 66392172, 67502302, 70102122, 71835872, 72658122, 74960932, 79355462, 84829572, 88785122, 89184832, 92373192, 95424702, 100534752, 103721462, 104455842, 108311122, 108991272, 112257212, 114824742, 115067642, 116519472, 118183012, 118532382, 121038132, 121694782, 124809412, 127036372, 127547522, 130942212, 133968282, 134789582, 139908292, 145688572, 147576172, 152242892, 156341242, 159820142, 159898282, 160453852, 162348992, 162516012, 166081692, 171895662, 173113422, 174425442, 178571452, 178729082, 181895502, 182668772, 183001745, 183025105, 183329781, 183499447, 183573871, 183578400, 183972575, 184101613, 184118266, 184323903, 184578871, 184625683, 184755010, 184787032, 184925457, 185654890, 186241038, 186509410, 186512500, 186625433, 187935251, 187960921, 187989145, 188148286, 188325632, 188326292, 188392069, 188577668, 188784427, 188834643, 188885815, 188958262, 189092149, 189215988, 189488477, 189592302, 190010927, 190300190, 190446436, 190503887, 190689350, 190974951, 191097785, 191450426, 191866045, 192256920, 194451019, 195106792, 206394993, 206453612]"
10,10,1474611,1515830,686,"[3, 67469, 92040, 430912, 495008, 523515, 595073, 886630, 1237529, 1326031, 1418715, 1671282, 1675958, 1741064, 1762810, 2027654, 2057039, 2136899, 2148610, 2325854, 2378358, 2481103, 2536097, 2634766, 2669559, 2703097, 2738421, 2750143, 2852710, 2892461, 2994667, 3004670, 3112717, 3169887, 3180959, 3226548, 3287577, 3344144, 3379840, 3515661, 3565452, 3592226, 3593980, 3640972, 3679984, 3709204, 3775564, 3785746, 3787888, 3893585, 3917259, 3955995, 3979785, 4118992, 4221858, 4325727, 4331241, 4379102, 4400101, 4577408, 4848396, 4972475, 5017205, 5029229, 5210987, 5239276, 5472639, 5620776, 5647984, 5721489, 5729644, 5789458, 5813002, 5844008, 5855795, 5866862, 6073434, 6173450, 6298451, 6470150, 6901233, 6924666, 6937729, 7059291, 7106009, 7344758, 7466364, 7595000, 7606394, 7649064, 7690098, 7696181, 7902155, 7934178, 7999036, 8085123, 8088284, 8147769, 8182628, 8378458, 8468628, 8551909, 8577613, 8602722, 8650271, 8673161, 8674724, 8677227, 8690864, 8693413, 8784594, 8814152, 8815126, 8842863, 8846279, 8853462, 8869105, 8895881, 8896141, 8935017, 9134352, 9141038, 9216498, 9245645, 9319398, 9341285, 9350069, 9500770, 9504442, 9551391, 9632751, 9647969, 9654171, 9661491, 9667281, 9696313, 9715620, 9774254, 9816397, 9823402, 9826276, 9888613, 10093761, 10115851, 10129629, 10152481, 10264528, 10323957, 10329797, 10367223, 10372474, 10537997, 10616162, 10692492, 10720103, 10875182, 10894735, 10902368, 10973961, 10979658, 11005245, 11008485, 11107395, 11133623, 11192226, 11362751, 11370154, 11464948, 11518209, 11618473, 11679676, 11729870, 11734148, 11774611, 11809313, 11825789, 11880880, 12067532, 12072094, 12088656, 12093969, 12137143, 12139820, 12161624, 12169484, 12170832, 12220393, 12237742, 12258935, 12335782, 12338601, 12533267, 12570114, 12570525, 12599450, 12606901, 12637979, 12928123, 12989505, 13056730, 13108539, 13139962, 13156343, 13204759, 13218444, 13226353, 13266726, 13281282, 13303966, 13321463, 13338440, 13367944, 13368745, 13368878, 13376443, 13397212, 13402404, 13427539, 13492084, 13510999, 13515246, 13548524, 13572225, 13574089, 13589026, 13614074, 13630843, 13762355, 13799851, 13844405, 13861471, 13877507, 13917157, 13952846, 13976496, 14142022, 14150127, 14197670, 14272599, 14309162, 14332414, 14409167, 14412257, 14420595, 14457319, 14513549, 14526135, 14543499, 14566828, 14651526, 15908931, 16917921, 17210691, 17221631, 17292061, 17349271, 17686301, 17938572, 18146331, 19195991, 19330621, 19513681, 19954761, 20220141, 20589511, 20770951, 21039221, 21280691, 21413381, 22270791, 22404261, 22584561, 23281521, 24086972, 24123312, 24874222, 25077052, 25245322, 25303412, 25960282, 26192242, 27601472, 27938442, 28193852, 28760712, 29395472, 29458712, 30204882, 30497652, 30690892, 30805522, 31251842, 31579052, 31847362, 32478012, 33084932, 33922702, 35230102, 36323532, 36571882, 37343562, 37473582, 38148992, 38623862, 39214352, 39342102, 39560222, 39626652, 39689102, 39983122, 42104172, 43007202, 43234092, 43620852, 44454732, 44871722, 45304242, 45712772, 46341512, 46562632, 46979422, 47415772, 47996912, 48343352, 48462522, 48724492, 48901742, 50618272, 51239782, 51500362, 52090452, 52881432, 53002342, 53737562, 56417612, 58119362, 58403962, 59281792, 59495212, 59602072, 60472272, 60923922, 61328692, 61764532, 62127092, 65246162, 66392172, 67502302, 69233122, 70102122, 70371322, 71835872, 71952242, 72658122, 73233952, 73480452, 74452712, 74960932, 74991542, 77925232, 78327722, 79894892, 80370082, 80656192, 82018542, 83626172, 83934342, 84613052, 84829572, 85077432, 85576442, 86342552, 86380622, 88810932, 89082062, 91717122, 92373192, 93007152, 93463752, 93502712, 93812322, 94232452, 94250432, 94663492, 95424702, 95987632, 97018932, 97540682, 97820532, 98560992, 98939522, 100478672, 101334042, 101697182, 102495002, 104440312, 105989182, 107954232, 108311122, 108396032, 109077662, 109598702, 110396962, 111476352, 111732342, 112045772, 113126222, 113329382, 115507992, 115706482, 116763382, 116890312, 117731462, 117741392, 118183012, 118542652, 118637882, 119291942, 119940372, 120531452, 121038132, 122209112, 122767312, 123315842, 124579442, 125129532, 126257062, 126594342, 126963392, 127544682, 129662342, 130384452, 131621752, 132654862, 133497822, 134323752, 134789582, 135964702, 136092132, 136231122, 136324972, 140819102, 145688572, 146676242, 147576172, 149592472, 149670662, 149785732, 150315152, 150590402, 152242892, 153080132, 153380682, 155172362, 156341242, 157196852, 157492342, 157735782, 159898282, 161388892, 173113422, 173726932, 176122902, 177873872, 178729082, 179204072, 180531372, 180797502, 181895502, 182844537, 183001745, 183290084, 183329781, 183395509, 183404072, 183411958, 183486211, 183568655, 183573871, 183578400, 183674467, 183725948, 183748543, 183774292, 183987357, 183997255, 184035674, 184047266, 184390414, 184416634, 184583232, 184601826, 184605547, 184680645, 184774911, 184787032, 184802623, 184871970, 185070408, 185075906, 185155050, 185187407, 185390514, 185481392, 185654890, 185658325, 185749238, 185869486, 185899365, 185933407, 185957286, 186006878, 186267882, 186269255, 186295815, 186509410, 186512500, 186557584, 186625433, 186772546, 186954472, 187034701, 187170240, 187475264, 187570821, 187605231, 187828424, 187850939, 188117685, 188122298, 188148286, 188370376, 188478465, 188479791, 188501119, 188528378, 188587190, 188658560, 188707383, 188784427, 188804961, 188848182, 188851751, 188882541, 188888202, 188958262, 189004056, 189004691, 189009092, 189092149, 189093640, 189139510, 189215988, 189254492, 189306309, 189345628, 189381565, 189497041, 189549916, 189605369, 189641087, 189881395, 189906631, 189949631, 189950086, 190010927, 190078171, 190113955, 190151590, 190260331, 190315733, 190394896, 190487955, 190503887, 190542559, 190544318, 190556921, 190593929, 190684565, 190689350, 190885746, 190971849, 190974951, 191040488, 191046688, 191074908, 191097785, 191148575, 191172354, 191175952, 191220135, 191435645, 191450426, 191499505, 191499827, 191518997, 191614866, 191674002, 191690517, 191692210, 191801224, 191866045, 191870498, 191893243, 191982451, 192063327, 192096979, 192146307, 192167816, 192208805, 192235543, 192256920, 192265489, 192480838, 192514972, 192530212, 192533837, 192566800, 192663412, 192692300, 192693717, 192758273, 192861989, 192874633, 192879034, 192879614, 192884554, 192900454, 192917480, 192976422, 193054849, 193317419, 193442367, 193839084, 194202450, 194243700, 194451019, 194483324, 194678072, 194699941, 195202633, 195809179, 196891351, 197089410, 197332042, 197557442, 198084896, 199120454, 199992649, 200065042, 200083521, 200658414, 201276153, 201338949, 202673297, 202908240, 203147732, 203511933, 203955894, 204378563, 204409722, 204970784, 205374496, 205660242, 206018404, 206266437, 206286874, 206394993, 207116821, 207951660, 208119525, 208281766, 208425406, 208524941, 209129911, 209496193, 209833579, 211275878, 212330059, 212487346, 213429964, 218419100, 218606528, 218618402, 220076437, 239372435]"


In [19]:
display(g.degrees)

id,degree
21471196,378
1748515,1635
37146,412
18693412,1091
3979102,1935
807758,807
6892592,1537
1539044,689
5087172,1565
1483633,1284


In [20]:
from __future__ import print_function

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.datasets.samples_generator import make_blobs
from pyspark import SparkContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext

In [21]:
df = group_nodes
# Add degree property to each node
df = (g.degrees).join(df, 'id')
display(df)

id,degree,name,category_id,num_members,city,date_created,rating
21471196,378,The Society for Constitutional Protection,13,393,New York,2016-12-05T22:02:59.000+0000,4.81
1748515,1635,LispNYC,34,1548,New York,2010-12-09T14:23:53.000+0000,4.75
37146,412,The Chicago Comic Book Meetup Group,29,731,Chicago,2003-02-02T17:47:47.000+0000,4.6
18693412,1091,Goethe-Institut,16,1124,New York,2015-06-23T15:46:42.000+0000,4.63
3979102,1935,Angular-SF,34,4093,San Francisco,2012-05-31T21:15:29.000+0000,4.64
807758,807,The Miramar Ski Club,23,1063,New York,2007-11-09T18:25:56.000+0000,4.8
6892592,1537,Singles Travel Together,23,6437,Chicago,2013-01-28T15:31:55.000+0000,0.0
1539044,689,Business Intelligence User Group of Chicago,34,793,Chicago,2009-10-12T19:02:57.000+0000,4.88
5087172,1565,New York Haskell Users Group,34,1830,New York,2012-09-24T21:26:02.000+0000,4.84
1483633,1284,Clojure NYC,34,1457,New York,2009-07-03T18:50:28.000+0000,4.57


In [22]:
# Map city to an integer value
cities_mapping = {
"New York": 1,
"Chicago": 2,
"San Francisco": 3,
"South San Francisco": 4,
"West Chicago": 5,
"Chicago Ridge": 6,
"Chicago Heights": 7,
"West New York": 8,
"North Chicago": 9
}
def f(x): 
    return cities_mapping[x]

df = df.withColumn("city", when(col("city")=='New York', 1).when(col("city")=='Chicago', 2)
              .when(col("city")=='San Francisco', 3).when(col("city")=='South San Francisco', 4)
              .when(col("city")=='West Chicago', 5).when(col("city")=='Chicago Ridge', 6)
              .when(col("city")=='Chicago Heights', 7).when(col("city")=='West New York', 8)
              .when(col("city")=='North Chicago', 9).otherwise(0))

df.show()

In [23]:
max_src = g.edges.groupBy("src").max("weight")
min_src = g.edges.groupBy("src").min("weight")
avg_src = g.edges.groupBy("src").avg("weight")

max_dst = g.edges.groupBy("dst").max("weight")
min_dst = g.edges.groupBy("dst").min("weight")
avg_dst = g.edges.groupBy("dst").avg("weight")

In [24]:
# Change column names to construct the graph
oldColumns = max_src.schema.names
newColumns = ["id", "max"]

max_src = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), max_src)


oldColumns = max_dst.schema.names
newColumns = ["id", "max",]

max_dst = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), max_dst)


oldColumns = min_src.schema.names
newColumns = ["id", "min"]

min_src = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), min_src)


oldColumns = min_dst.schema.names
newColumns = ["id", "min" ]

min_dst = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), min_dst)

oldColumns = avg_src.schema.names
newColumns = ["id", "avg"]

avg_src = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), avg_src)


oldColumns = avg_dst.schema.names
newColumns = ["id", "avg" ]

avg_dst = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), avg_dst)



In [25]:
max_weights = max_src.union(max_dst)
min_weights = min_src.union(min_dst)
avg_weights = avg_src.union(avg_dst)

In [26]:
g.degrees.schema

In [27]:
print(max_weights.count())
print(min_weights.count())
print(avg_weights.count())

In [28]:
max_w=max_weights.groupBy("id").sum("max")

In [29]:
min_w=min_weights.groupBy("id").sum("min")
avg_w=avg_weights.groupBy("id").sum("avg")

In [30]:
display(min_w)

In [31]:
df = (df).join(min_w, 'id')
df = (df).join(max_w, 'id')
df = (df).join(avg_w, 'id')

In [32]:
df.count()

In [34]:
#for node in group_nodes.rdd.collect():
 #   print(node["id"])
 #   id = int(node["id"])
 #   tmp = g.edges.filter((col("src") == id) | (col("dst") == id))
 #   max = tmp.groupBy().max("weight").collect()
 #   min = tmp.groupBy().min("weight").collect()
 #   avg = tmp.groupBy().avg("weight").collect()
 #   df = df.withColumn("avg_weight", when(col("id")==id, avg[0][0]))
 #   df = df.withColumn("max_weight", when(col("id")==id, max[0][0]))      
 #   df = df.withColumn("min_weight", when(col("id")==id, min[0][0]))

In [35]:
df.show()

In [36]:
df_clean = df.na.drop()
df_clean.show()

In [37]:
df_clean.count()

In [38]:
results = g.triangleCount()
triangle_count=results.select("id", "count")

In [39]:
df = (df).join(triangle_count, 'id')

In [40]:
oldColumns = df.schema.names
newColumns = ["id", "degree","name","category_id","num_members","city","date_created","rating","min_weight","max_weight","avg_weight", "triangles" ]

df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), df)


In [41]:
display(df)

In [42]:
FEATURES_COL = ['degree','name','category_id', 'num_members', 'city', 'rating',  'min_weight', 'max_weight', 'avg_weight','triangles']

for col in df.columns:
    if col in FEATURES_COL:
        df = df.withColumn(col,df[col].cast('float'))
#df.show()
#df = df.na.drop()
vecAssembler = VectorAssembler(inputCols=FEATURES_COL, outputCol="features")
print(vecAssembler)
df_kmeans = vecAssembler.transform(df).select('id', 'features')
display(df_kmeans)

In [43]:
df.head(9)

In [44]:
df_kmeans.count()

In [45]:
cost = np.zeros(25)
for k in range(2,25):
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    model = kmeans.fit(df_kmeans.sample(False,0.1, seed=42))
    cost[k] = model.computeCost(df_kmeans) # requires Spark 2.0 or later
    print(k)
    print(cost[k])
    print('-------')

In [46]:
from pyspark.sql.types import *
display(spark.createDataFrame(cost[2:25].tolist(), FloatType()).toDF("x"))



x
18069293100.0
10491650000.0
6697594900.0
5735874000.0
3694574080.0
3003258110.0
2682729730.0
2433960700.0
2478032900.0
2169447680.0


In [47]:
# Look like there is very little gain after k=10, so we stick to that choice when processing the full data set

In [48]:
# Try out K Means clustering algorithm
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Trains a k-means model.
kmeans = KMeans().setK(10).setSeed(1)
model = kmeans.fit(df_kmeans)

# Make predictions
predictions = model.transform(df_kmeans)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

In [49]:
predictions = predictions.join(df, 'id')
#display(predictions)

In [50]:
display(predictions.select("id","prediction", "degree","count"))

In [51]:
predictions.select("id","prediction", "degree","count").write.format("com.databricks.spark.csv").save("file:/databricks/driver/data.csv")

In [52]:
# Typically at this point I would need to do something else with the data, which does not require Spark, so let's convert the Spark 
# dataframe to a good old Pandas dataframe for further processing.
import pandas as pd
pddf_pred = predictions.toPandas().set_index('id')
pddf_pred.head()

In [53]:
pddf_pred.count()

In [54]:

threedee = plt.figure(figsize=(12,10)).gca(projection='3d')
threedee.scatter(pddf_pred.category_id, pddf_pred.num_members, pddf_pred.rating, c=pddf_pred.prediction)
threedee.set_xlabel('x')
threedee.set_ylabel('y')
threedee.set_zlabel('z')
display(plt.show())

In [55]:
from keras import optimizers
from gensim.models import KeyedVectors
from sklearn.manifold import TSNE
from nltk.corpus import stopwords
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.neighbors import NearestNeighbors as nn
from itertools import islice
from matplotlib import pylab
import plotly.offline as plt
import plotly.graph_objs as go

tnse = TSNE(random_state=RS).fit_transform(df["features"])
np.set_printoptions(suppress=True)
Y = tsne.fit_transform(arr)
x_coords = Y[:, 0]
y_coords = Y[:, 1]
  
plot = [go.Scatter(x = x_coords,
                    y = y_coords,
                    mode = 'markers',
                    textposition='bottom center',
                    marker=dict(size=5,opacity=0.8))]

layout = go.Layout(title='Clustering')
fig = go.Figure(data=plot, layout=layout)
plt.iplot(fig)