## Based on a PluralSight course
https://app.pluralsight.com/library/courses/b8ed343a-d95a-4b7a-a376-e726b867e961/table-of-contents

### Transforming and Cleaning Unstructured Data

In [3]:
# Load the NYC crime dataset as an RDD of raw text lines (each record is one string).
# NOTE(review): `sc` is not defined in the visible cells -- presumably the SparkContext
# provided by the PySpark kernel/shell; confirm before running on a fresh kernel.
# NOTE(review): the path is absolute and user-specific; adjust it for your machine.
path = "file:///Users/apoorvadshenoy/NYC_Crime.csv"
data = sc.textFile(path)

In [4]:
#View the dataset
data.take(10) #currently each record is a string

['CMPLNT_NUM,ADDR_PCT_CD,BORO_NM,CMPLNT_FR_DT,CMPLNT_FR_TM,CMPLNT_TO_DT,CMPLNT_TO_TM,CRM_ATPT_CPTD_CD,HADEVELOPT,JURIS_DESC,KY_CD,LAW_CAT_CD,LOC_OF_OCCUR_DESC,OFNS_DESC,PARKS_NM,PD_CD,PD_DESC,PREM_TYP_DESC,RPT_DT,X_COORD_CD,Y_COORD_CD,Latitude,Longitude,Lat_Lon',
 '939675075,18,MANHATTAN,01/01/2016,00:01:00,05/31/2019,23:59:00,COMPLETED,,N.Y. POLICE DEPT,104,FELONY,INSIDE,RAPE,,157,RAPE 1,COMMERCIAL BUILDING,03/12/2020,988353,217918,40.764818269000045,-73.985189772999945,"(40.764818269000045, -73.98518977299995)"',
 '562697319,18,MANHATTAN,02/09/2016,00:01:00,12/15/2019,23:59:00,COMPLETED,,N.Y. POLICE DEPT,341,MISDEMEANOR,,PETIT LARCENY,,301,"LARCENY,PETIT BY ACQUIRING LOS",STREET,03/11/2020,989419,215672,40.758653017000029,-73.981343278999987,"(40.75865301700003, -73.98134327899999)"',
 '885793834,13,MANHATTAN,03/01/2016,00:00:00,07/01/2016,00:00:00,COMPLETED,,N.Y. POLICE DEPT,116,FELONY,INSIDE,SEX CRIMES,,177,SEXUAL ABUSE,RESIDENCE - APT. HOUSE,03/18/2020,988874,207673,40.73669807000

In [109]:
# collect() brings every record back to the driver. That is fine for this small sample,
# but prefer take(n) when the dataset is large.
data.collect()

['CMPLNT_NUM,ADDR_PCT_CD,BORO_NM,CMPLNT_FR_DT,CMPLNT_FR_TM,CMPLNT_TO_DT,CMPLNT_TO_TM,CRM_ATPT_CPTD_CD,HADEVELOPT,JURIS_DESC,KY_CD,LAW_CAT_CD,LOC_OF_OCCUR_DESC,OFNS_DESC,PARKS_NM,PD_CD,PD_DESC,PREM_TYP_DESC,RPT_DT,X_COORD_CD,Y_COORD_CD,Latitude,Longitude,Lat_Lon',
 '939675075,18,MANHATTAN,01/01/2016,00:01:00,05/31/2019,23:59:00,COMPLETED,,N.Y. POLICE DEPT,104,FELONY,INSIDE,RAPE,,157,RAPE 1,COMMERCIAL BUILDING,03/12/2020,988353,217918,40.764818269000045,-73.985189772999945,"(40.764818269000045, -73.98518977299995)"',
 '562697319,18,MANHATTAN,02/09/2016,00:01:00,12/15/2019,23:59:00,COMPLETED,,N.Y. POLICE DEPT,341,MISDEMEANOR,,PETIT LARCENY,,301,"LARCENY,PETIT BY ACQUIRING LOS",STREET,03/11/2020,989419,215672,40.758653017000029,-73.981343278999987,"(40.75865301700003, -73.98134327899999)"',
 '885793834,13,MANHATTAN,03/01/2016,00:00:00,07/01/2016,00:00:00,COMPLETED,,N.Y. POLICE DEPT,116,FELONY,INSIDE,SEX CRIMES,,177,SEXUAL ABUSE,RESIDENCE - APT. HOUSE,03/18/2020,988874,207673,40.73669807000

High level steps before we can analyze this data

1. Parse and get a structured dataset - functional way (parallel process) vs. imperative way (1 record at a time)
2. Clean and fix missing values etc.
3. Come up with insightful questions
4. Summarize and report metrics for the questions

Functional programming is mainly Filter (drop or keep records), Map (transform in some way), and Reduce (combine).

Of these, Filter and Map are Transformations (lazily evaluated) and Reduce is an Action.

In [5]:
#Extract the first row
header = data.first()

In [9]:
# Keep every line except the one whose text equals the header line
dataWoHeader = data.filter(lambda line: line != header)

In [10]:
dataWoHeader.first()

'939675075,18,MANHATTAN,01/01/2016,00:01:00,05/31/2019,23:59:00,COMPLETED,,N.Y. POLICE DEPT,104,FELONY,INSIDE,RAPE,,157,RAPE 1,COMMERCIAL BUILDING,03/12/2020,988353,217918,40.764818269000045,-73.985189772999945,"(40.764818269000045, -73.98518977299995)"'

In [11]:
header

'CMPLNT_NUM,ADDR_PCT_CD,BORO_NM,CMPLNT_FR_DT,CMPLNT_FR_TM,CMPLNT_TO_DT,CMPLNT_TO_TM,CRM_ATPT_CPTD_CD,HADEVELOPT,JURIS_DESC,KY_CD,LAW_CAT_CD,LOC_OF_OCCUR_DESC,OFNS_DESC,PARKS_NM,PD_CD,PD_DESC,PREM_TYP_DESC,RPT_DT,X_COORD_CD,Y_COORD_CD,Latitude,Longitude,Lat_Lon'

In [21]:
# Naive parse: split each string record on every comma, giving a list of strings.
# As the output below shows, this also splits inside double-quoted fields
# (the quoted PD_DESC value and the Lat_Lon pair are each broken in two),
# so it is not a correct CSV parse.
dataWoHeader.map(lambda x:x.split(",")).take(3)

[['939675075',
  '18',
  'MANHATTAN',
  '01/01/2016',
  '00:01:00',
  '05/31/2019',
  '23:59:00',
  'COMPLETED',
  '',
  'N.Y. POLICE DEPT',
  '104',
  'FELONY',
  'INSIDE',
  'RAPE',
  '',
  '157',
  'RAPE 1',
  'COMMERCIAL BUILDING',
  '03/12/2020',
  '988353',
  '217918',
  '40.764818269000045',
  '-73.985189772999945',
  '"(40.764818269000045',
  ' -73.98518977299995)"'],
 ['562697319',
  '18',
  'MANHATTAN',
  '02/09/2016',
  '00:01:00',
  '12/15/2019',
  '23:59:00',
  'COMPLETED',
  '',
  'N.Y. POLICE DEPT',
  '341',
  'MISDEMEANOR',
  '',
  'PETIT LARCENY',
  '',
  '301',
  '"LARCENY',
  'PETIT BY ACQUIRING LOS"',
  'STREET',
  '03/11/2020',
  '989419',
  '215672',
  '40.758653017000029',
  '-73.981343278999987',
  '"(40.75865301700003',
  ' -73.98134327899999)"'],
 ['885793834',
  '13',
  'MANHATTAN',
  '03/01/2016',
  '00:00:00',
  '07/01/2016',
  '00:00:00',
  'COMPLETED',
  '',
  'N.Y. POLICE DEPT',
  '116',
  'FELONY',
  'INSIDE',
  'SEX CRIMES',
  '',
  '177',
  'SEXUAL AB

In [91]:
import csv
from io import StringIO
from collections import namedtuple

# Column names come from the header line; they become the attribute names of each record.
fields = header.split(",")
print(fields)

# Record type for one parsed row: one named attribute per CSV column.
crime = namedtuple('Crime',fields)

['CMPLNT_NUM', 'ADDR_PCT_CD', 'BORO_NM', 'CMPLNT_FR_DT', 'CMPLNT_FR_TM', 'CMPLNT_TO_DT', 'CMPLNT_TO_TM', 'CRM_ATPT_CPTD_CD', 'HADEVELOPT', 'JURIS_DESC', 'KY_CD', 'LAW_CAT_CD', 'LOC_OF_OCCUR_DESC', 'OFNS_DESC', 'PARKS_NM', 'PD_CD', 'PD_DESC', 'PREM_TYP_DESC', 'RPT_DT', 'X_COORD_CD', 'Y_COORD_CD', 'Latitude', 'Longitude', 'Lat_Lon']


In [92]:
type(crime)

type

In [93]:
def parse(row):
    """Parse one raw CSV line into a ``Crime`` record.

    The line is read through the csv module, so a comma inside a
    double-quoted field stays within that single field.
    """
    values = next(csv.reader(StringIO(row)))
    return crime(*values)

In [94]:
crimes = dataWoHeader.map(parse)

In [95]:
crimes.first()

Crime(CMPLNT_NUM='939675075', ADDR_PCT_CD='18', BORO_NM='MANHATTAN', CMPLNT_FR_DT='01/01/2016', CMPLNT_FR_TM='00:01:00', CMPLNT_TO_DT='05/31/2019', CMPLNT_TO_TM='23:59:00', CRM_ATPT_CPTD_CD='COMPLETED', HADEVELOPT='', JURIS_DESC='N.Y. POLICE DEPT', KY_CD='104', LAW_CAT_CD='FELONY', LOC_OF_OCCUR_DESC='INSIDE', OFNS_DESC='RAPE', PARKS_NM='', PD_CD='157', PD_DESC='RAPE 1', PREM_TYP_DESC='COMMERCIAL BUILDING', RPT_DT='03/12/2020', X_COORD_CD='988353', Y_COORD_CD='217918', Latitude='40.764818269000045', Longitude='-73.985189772999945', Lat_Lon='(40.764818269000045, -73.98518977299995)')

In [97]:
crimes.take(2) #each record is now a 'Crime' class object 

[Crime(CMPLNT_NUM='939675075', ADDR_PCT_CD='18', BORO_NM='MANHATTAN', CMPLNT_FR_DT='01/01/2016', CMPLNT_FR_TM='00:01:00', CMPLNT_TO_DT='05/31/2019', CMPLNT_TO_TM='23:59:00', CRM_ATPT_CPTD_CD='COMPLETED', HADEVELOPT='', JURIS_DESC='N.Y. POLICE DEPT', KY_CD='104', LAW_CAT_CD='FELONY', LOC_OF_OCCUR_DESC='INSIDE', OFNS_DESC='RAPE', PARKS_NM='', PD_CD='157', PD_DESC='RAPE 1', PREM_TYP_DESC='COMMERCIAL BUILDING', RPT_DT='03/12/2020', X_COORD_CD='988353', Y_COORD_CD='217918', Latitude='40.764818269000045', Longitude='-73.985189772999945', Lat_Lon='(40.764818269000045, -73.98518977299995)'),
 Crime(CMPLNT_NUM='562697319', ADDR_PCT_CD='18', BORO_NM='MANHATTAN', CMPLNT_FR_DT='02/09/2016', CMPLNT_FR_TM='00:01:00', CMPLNT_TO_DT='12/15/2019', CMPLNT_TO_TM='23:59:00', CRM_ATPT_CPTD_CD='COMPLETED', HADEVELOPT='', JURIS_DESC='N.Y. POLICE DEPT', KY_CD='341', LAW_CAT_CD='MISDEMEANOR', LOC_OF_OCCUR_DESC='', OFNS_DESC='PETIT LARCENY', PARKS_NM='', PD_CD='301', PD_DESC='LARCENY,PETIT BY ACQUIRING LOS', PRE

In [103]:
crimes.first().BORO_NM

'MANHATTAN'

In [108]:
# Identifying missing values: count how often each CMPLNT_NUM value occurs.
# (This sample file holds only 18 records, so every complaint number appears once.)
crimes.map(lambda x:x.CMPLNT_NUM).countByValue()

# After exploring the dataset this way, use filter() to drop records with missing values etc.

defaultdict(int,
            {'939675075': 1,
             '562697319': 1,
             '885793834': 1,
             '517974194': 1,
             '487346031': 1,
             '367412512': 1,
             '163782558': 1,
             '784475162': 1,
             '577768082': 1,
             '881507062': 1,
             '742167698': 1,
             '617860640': 1,
             '196137002': 1,
             '742841631': 1,
             '375957517': 1,
             '521562187': 1,
             '853412723': 1,
             '986303070': 1})

In [None]:
#You can also chain the operations filter, map etc.
#For example, you can FILTER records for certain type of crime, then use MAP to pick the year column, and use 
#countByValue to see how that crime is doing over the years

In [113]:
# gmplot is a very handy Python package for plotting data on real maps!

### Summarizing Data along Dimensions

In [116]:
# Pair RDDs are useful (key-value) pairs to summarize metrics with dimensions
# To create a pair RDD in Python, make sure each record is a tuple
# Summarize by Keys and Merge by Keys are two types of operations we can do

In [117]:
trafficpath = "file:///Users/apoorvadshenoy/Dodgers.data"
eventspath = "file:///Users/apoorvadshenoy/Dodgers.events"

traffic = sc.textFile(trafficpath)
games = sc.textFile(eventspath)

In [119]:
traffic.take(5)

['4/10/2005 0:00,-1',
 '4/10/2005 0:05,-1',
 '4/10/2005 0:10,-1',
 '4/10/2005 0:15,-1',
 '4/10/2005 0:20,-1']

In [120]:
games.take(5)

['04/12/05,13:10:00,16:23:00,55892,San Francisco,W 9-8�',
 '04/13/05,19:10:00,21:48:00,46514,San Francisco,W 4-1�',
 '04/15/05,19:40:00,21:48:00,51816,San Diego,W 4-0�',
 '04/16/05,19:10:00,21:52:00,54704,San Diego,W 8-3�',
 '04/17/05,13:10:00,15:31:00,53402,San Diego,W 6-0�']

In [125]:
# Creating a pair RDD for the traffic - need a tuple
from datetime import datetime

def parseTraffic(row):
    """Turn one 'M/D/YYYY H:MM,count' text line into a (datetime, int) tuple."""
    parts = row.split(",")
    timestamp = datetime.strptime(parts[0], "%m/%d/%Y %H:%M")
    count = int(parts[1])
    return (timestamp, count)

In [126]:
trafficParsed = traffic.map(parseTraffic)

In [127]:
trafficParsed.take(5)

[(datetime.datetime(2005, 4, 10, 0, 0), -1),
 (datetime.datetime(2005, 4, 10, 0, 5), -1),
 (datetime.datetime(2005, 4, 10, 0, 10), -1),
 (datetime.datetime(2005, 4, 10, 0, 15), -1),
 (datetime.datetime(2005, 4, 10, 0, 20), -1)]

In [None]:
# Summarize a pair RDD with 'reduceByKey'. It is similar to reduce, but it is a transformation (lazy), not an action.
# For each key, the function passed in (e.g. lambda x,y: x+y) is applied to that key's values on each node, and then across nodes, until every distinct key has exactly one record. Any function that combines two values into one can be used.

In [128]:
# Daily commuting trend: re-key each reading by its calendar date, then sum the counts per date.
# NOTE(review): the raw readings include -1 values (the first day sums to -288); these look like
# missing-data markers and are currently added into the totals -- confirm and consider filtering.
dailyTrend = trafficParsed.map(lambda x: (x[0].date(),x[1])).reduceByKey(lambda x,y:x+y)

In [129]:
dailyTrend.take(10)

[(datetime.date(2005, 4, 10), -288),
 (datetime.date(2005, 4, 11), 5062),
 (datetime.date(2005, 4, 14), 6423),
 (datetime.date(2005, 4, 15), 6459),
 (datetime.date(2005, 4, 16), 6002),
 (datetime.date(2005, 4, 17), 5322),
 (datetime.date(2005, 4, 18), 5600),
 (datetime.date(2005, 4, 19), 6049),
 (datetime.date(2005, 4, 21), 5977),
 (datetime.date(2005, 4, 22), 6038)]

In [131]:
dailyTrend.sortBy(lambda x:-x[1]).take(10) #plot top 10 days with highest traffic

[(datetime.date(2005, 7, 28), 7661),
 (datetime.date(2005, 7, 29), 7499),
 (datetime.date(2005, 8, 12), 7287),
 (datetime.date(2005, 7, 27), 7238),
 (datetime.date(2005, 9, 23), 7175),
 (datetime.date(2005, 7, 26), 7163),
 (datetime.date(2005, 5, 20), 7119),
 (datetime.date(2005, 8, 11), 7110),
 (datetime.date(2005, 9, 8), 7107),
 (datetime.date(2005, 9, 7), 7082)]

In [132]:
# To test if game days have the highest traffic, we need to merge with the other dataset
def parseGames(row):
    """Extract a (game date, opponent) pair from one comma-separated events line."""
    columns = row.split(",")
    game_day = datetime.strptime(columns[0], "%m/%d/%y").date()
    return (game_day, columns[4])

In [133]:
gamesParsed = games.map(parseGames)

In [134]:
gamesParsed.take(5)

[(datetime.date(2005, 4, 12), 'San Francisco'),
 (datetime.date(2005, 4, 13), 'San Francisco'),
 (datetime.date(2005, 4, 15), 'San Diego'),
 (datetime.date(2005, 4, 16), 'San Diego'),
 (datetime.date(2005, 4, 17), 'San Diego')]

In [135]:
dailyTrendCombined = dailyTrend.leftOuterJoin(gamesParsed)
dailyTrendCombined.take(10) #key is still the same but the value becomes both traffic + opponent details

[(datetime.date(2005, 4, 11), (5062, None)),
 (datetime.date(2005, 4, 15), (6459, 'San Diego')),
 (datetime.date(2005, 4, 17), (5322, 'San Diego')),
 (datetime.date(2005, 4, 19), (6049, None)),
 (datetime.date(2005, 4, 21), (5977, None)),
 (datetime.date(2005, 4, 22), (6038, None)),
 (datetime.date(2005, 4, 23), (5366, None)),
 (datetime.date(2005, 4, 24), (4319, None)),
 (datetime.date(2005, 4, 25), (6280, 'Arizona')),
 (datetime.date(2005, 4, 30), (6090, 'Colorado'))]

In [137]:
def checkgame(row):
    """Label a joined traffic record as a game day or a regular day.

    Args:
        row: ``(date, (traffic, opponent))`` as produced by the left outer
            join; ``opponent`` is ``None`` when no game matched that date.

    Returns:
        ``(date, opponent, label, traffic)`` where ``label`` is
        ``"regular day"`` when ``opponent`` is ``None`` and ``"game day"``
        otherwise.
    """
    day, joined = row[0], row[1]
    traffic, opponent = joined[0], joined[1]
    # Identity check: None is a singleton, and `== None` can be fooled by a custom __eq__.
    label = "regular day" if opponent is None else "game day"
    return (day, opponent, label, traffic)

In [142]:
dailyTrendChecked = dailyTrendCombined.map(checkgame)

In [143]:
dailyTrendChecked.sortBy(lambda x:-x[3]).take(10) #9/10 highest traffic days are game days!

[(datetime.date(2005, 7, 28), 'Cincinnati', 'game day', 7661),
 (datetime.date(2005, 7, 29), 'St. Louis', 'game day', 7499),
 (datetime.date(2005, 8, 12), 'NY Mets', 'game day', 7287),
 (datetime.date(2005, 7, 27), 'Cincinnati', 'game day', 7238),
 (datetime.date(2005, 9, 23), 'Pittsburgh', 'game day', 7175),
 (datetime.date(2005, 7, 26), 'Cincinnati', 'game day', 7163),
 (datetime.date(2005, 5, 20), 'LA Angels', 'game day', 7119),
 (datetime.date(2005, 8, 11), 'Philadelphia', 'game day', 7110),
 (datetime.date(2005, 9, 8), None, 'regular day', 7107),
 (datetime.date(2005, 9, 7), 'San Francisco', 'game day', 7082)]

In [146]:
# Now we can check the average traffic for game and regular days
# You can do this using reduceByKey but doing it directly will result in incorrect values
# combineByKey is the easiest way to do this 
# https://stackoverflow.com/questions/33937625/who-can-give-a-clear-explanation-for-combinebykey-in-spark

# Average traffic per day type. Each key accumulates a (running_sum, running_count) pair
# that is divided at the end. The chain is wrapped in parentheses because a comment placed
# after a backslash line continuation is a SyntaxError.
(
    dailyTrendChecked
    .map(lambda x: (x[2], x[3]))  # (day type, traffic)
    .combineByKey(
        lambda value: (value, 1),                                   # createCombiner: first value seen for a key
        lambda acc, value: (acc[0] + value, acc[1] + 1),            # mergeValue: fold another value into the pair
        lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),  # mergeCombiners: merge two partial pairs
    )
    .mapValues(lambda x: x[0] / x[1])  # sum / count
    .collect()
)

[('regular day', 5411.329787234043), ('game day', 5948.604938271605)]

### Modeling Relationships in the Marvel Universe

In [147]:
bookpath = "file:///Users/apoorvadshenoy/Books.txt"
edgespath = "file:///Users/apoorvadshenoy/Edges.txt"
characterpath = "file:///Users/apoorvadshenoy/Characters.txt"

books = sc.textFile(bookpath)
edges = sc.textFile(edgespath)
characters = sc.textFile(characterpath)

In [150]:
books.take(5) #vertex name and a book name

['Vertex 6487: AA2 35',
 'Vertex 6488: M/PRM 35',
 'Vertex 6489: M/PRM 36',
 'Vertex 6490: M/PRM 37',
 'Vertex 6491: WI? 9']

In [151]:
characters.take(5) #vertex name and a character name

['Vertex 1: 24-HOUR MAN/EMMANUEL',
 'Vertex 2: 3-D MAN/CHARLES CHANDLER & HAROLD CHANDLER',
 'Vertex 3: 4-D MAN/MERCURIO',
 'Vertex 4: 8-BALL/',
 'Vertex 5: A']

In [160]:
edges.take(10)  #contains lot of unnecessary info; filtering them here

['*Vertices 19428 6486',
 '1 "24-HOUR MAN/EMMANUEL"',
 '2 "3-D MAN/CHARLES CHAN"',
 '3 "4-D MAN/MERCURIO"',
 '4 "8-BALL/"',
 '5 "A"',
 '6 "A\'YIN"',
 '7 "ABBOTT, JACK"',
 '8 "ABCISSA"',
 '9 "ABEL"']

In [157]:
def edgefilter(row):
    """Return True only for lines containing neither an asterisk nor a double quote."""
    return '*' not in row and '"' not in row
edgesFiltered = edges.filter(edgefilter)

In [158]:
edgesFiltered.take(10) #looks good; connect Characters to Books

['1 6487',
 '2 6488 6489 6490 6491 6492 6493 6494 6495 6496',
 '3 6497 6498 6499 6500 6501 6502 6503 6504 6505',
 '4 6506 6507 6508',
 '5 6509 6510 6511',
 '6 6512 6513 6514 6515',
 '7 6516',
 '8 6517 6518',
 '9 6519 6520',
 '10 6521 6522 6523 6524 6525 6526 6527 6528 6529 6530 6531 6532 6533 6534 6535']

In [159]:
# Find the most influential characters

# Split each edge line on whitespace; the first token is the Character and the
# remaining tokens are the Books, kept as a list keyed by that Character.
characterBookMap = (
    edgesFiltered
    .map(lambda line: line.split())
    .map(lambda tokens: (tokens[0], tokens[1:]))
)
characterBookMap.take(10)

[('1', ['6487']),
 ('2',
  ['6488', '6489', '6490', '6491', '6492', '6493', '6494', '6495', '6496']),
 ('3',
  ['6497', '6498', '6499', '6500', '6501', '6502', '6503', '6504', '6505']),
 ('4', ['6506', '6507', '6508']),
 ('5', ['6509', '6510', '6511']),
 ('6', ['6512', '6513', '6514', '6515']),
 ('7', ['6516']),
 ('8', ['6517', '6518']),
 ('9', ['6519', '6520']),
 ('10',
  ['6521',
   '6522',
   '6523',
   '6524',
   '6525',
   '6526',
   '6527',
   '6528',
   '6529',
   '6530',
   '6531',
   '6532',
   '6533',
   '6534',
   '6535'])]

In [161]:
#Creating a lookup table with character ID and name
def charlookup(row):
    """Split a 'Vertex <id>: <name>' line into an (id, name) pair.

    Args:
        row: One line of the characters file, e.g.
            ``'Vertex 1: 24-HOUR MAN/EMMANUEL'``.

    Returns:
        ``(id, name)`` as strings, with surrounding whitespace stripped
        from the name.

    Raises:
        IndexError: If the line contains no ':' separator.
    """
    # Split on the first ':' only, so a name that itself contains ':' is kept whole.
    parts = row.split(":", 1)
    # Drop the 7-character 'Vertex ' prefix to leave just the id.
    return (parts[0][7:], parts[1].strip())

charlookuptable = characters.map(charlookup).collectAsMap() #creates a dictionary of lookup

In [163]:
charlookuptable #has ID + name. Nice!

{'1': '24-HOUR MAN/EMMANUEL',
 '2': '3-D MAN/CHARLES CHANDLER & HAROLD CHANDLER',
 '3': '4-D MAN/MERCURIO',
 '4': '8-BALL/',
 '5': 'A',
 '6': "A'YIN",
 '7': 'ABBOTT, JACK',
 '8': 'ABCISSA',
 '9': 'ABEL',
 '10': 'ABOMINATION/EMIL BLONSKY',
 '11': 'ABOMINATION | MUTANT X-VERSE',
 '12': 'ABOMINATRIX',
 '13': 'ABRAXAS',
 '14': 'ADAM 3,031',
 '15': 'ABSALOM',
 '16': 'ABSORBING MAN/CARL CRUSHER CREEL',
 '17': 'ABSORBING MAN | MUTANT X-VERSE',
 '18': 'ACBA',
 '19': 'ACHEBE, REVEREND DOCTOR MICHAEL IBN AL-HAJJ',
 '20': 'ACHILLES',
 '21': 'ACHILLES II/HELMUT',
 '22': 'ACROBAT/CARL ZANTE',
 '23': 'ADAM X',
 '24': 'ADAMS, CINDY',
 '25': 'ADAMS, CONGRESSMAN HENRY',
 '26': 'ADAMS, GEORGE',
 '27': 'ADAMS, MARTHA',
 '28': 'ADAMS, NICOLE NIKKI',
 '29': 'ADAMSON, JASON',
 '30': 'ADAMSON, REBECCA',
 '31': 'ADMIRAL PROTOCOL/',
 '32': 'ADORA',
 '33': 'ADORA CLONE',
 '34': 'ADRIA',
 '35': 'ADVA',
 '36': 'ADVENT/KYLE GROBE',
 '37': 'ADVERSARY',
 '38': 'AEGIS/TREY ROLLINS',
 '39': 'AENTAROS',
 '40': 'AFTERLI

In [169]:
# To find the most influential characters, count how many books each one is connected to
# mapValues applies a map function to the value part only, leaving the key unchanged
# We need reduceByKey because a single record lists at most 15 books, so one character can span multiple records

# Books per record -> swap the character id for its name -> total per name -> largest first
inf = (
    characterBookMap
    .mapValues(len)
    .map(lambda pair: (charlookuptable[pair[0]], pair[1]))
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: -pair[1])
)

In [170]:
inf.take(10) #spider man FTW

[('SPIDER-MAN/PETER PARKER', 1625),
 ('CAPTAIN AMERICA', 1367),
 ('IRON MAN/TONY STARK', 1168),
 ('THING/BENJAMIN J. GRIMM', 990),
 ('THOR/DR. DONALD BLAKE/SIGURD JARLSON II/JAKE OLSON/LOREN OLSON', 965),
 ('HUMAN TORCH/JOHNNY STORM', 908),
 ('MR. FANTASTIC/REED RICHARDS', 875),
 ('HULK/DR. ROBERT BRUCE BANNER', 841),
 ('WOLVERINE/LOGAN', 820),
 ('INVISIBLE WOMAN/SUE STORM RICHARDS', 782)]

In [171]:
#Not coding the co-occurrence section, but flatMapValues() is a very useful function, and itertools is really handy as well.