<a href="https://colab.research.google.com/github/Hyenni/BDAI-Training/blob/master/sparkLAB3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Use Pair RDDs to Join Two Datasets

In [0]:
# Set the log level to WARN to reduce distracting INFO messages
sc.setLogLevel("WARN")

# Step 1 - Create an RDD based on a subset of weblogs (those ending in digit 2)
logs = sc.textFile("/loudacre/weblogs/*2.log")
# Map each request (line) to a pair (userid, 1), then sum the values per user.
# The user ID is the third whitespace-separated field of each log line.
userreqs = logs \
   .map(lambda line: line.split()) \
   .map(lambda words: (words[2], 1)) \
   .reduceByKey(lambda count1, count2: count1 + count2)

# Step 2 - Show the count frequencies
# Swap each (userid, freq) pair to (freq, userid) so countByKey() tallies how
# many users share each request count. Index-based access replaces the
# tuple-parameter lambda, which is a SyntaxError on Python 3 (PEP 3113).
# countByKey() is an action: freqcount is a local dict, not an RDD.
freqcount = userreqs.map(lambda pair: (pair[1], pair[0])).countByKey()



### `countByKey()` is an action: it returns a local Python `defaultdict`, not an RDD, so the result has no `take()` method!

In [0]:
# NOTE: kept as a demonstration. freqcount is the defaultdict returned by
# countByKey(), not an RDD, so this call raises AttributeError.
freqcount.take(2)

AttributeError: 'collections.defaultdict' object has no attribute 'take'

In [0]:
# Show the whole {request count: number of users} dict. The parenthesized
# single-argument form prints identically on Python 2 and is valid on Python 3.
print(freqcount)

defaultdict(<type 'int'>, {128: 9, 2: 7239, 3: 36, 4: 4155, 5: 26, 6: 2162, 7: 14, 8: 1409, 9: 14, 10: 878, 11: 12, 12: 549, 13: 7, 14: 308, 15: 8, 16: 155, 17: 4, 146: 8, 19: 2, 20: 41, 21: 1, 22: 17, 150: 11, 152: 11, 132: 12, 154: 5, 27: 1, 156: 5, 158: 6, 160: 8, 162: 5, 164: 3, 134: 9, 166: 3, 168: 4, 170: 3, 172: 6, 174: 2, 24: 6, 176: 2, 136: 11, 178: 1, 188: 1, 138: 6, 190: 1, 130: 10, 140: 14, 142: 8, 86: 1, 144: 7, 100: 1, 104: 1, 106: 1, 18: 76, 110: 4, 112: 1, 116: 1, 118: 4, 120: 5, 148: 2, 122: 4, 124: 7, 126: 5})


In [0]:

# Step 3 - Group IPs by user ID
# Each line yields (userid, ip): field 2 is the user ID, field 0 the IP address.
# The chain ends at groupByKey() with no trailing line-continuation backslash,
# so a statement added below cannot be silently absorbed into this expression.
userips = logs \
   .map(lambda line: line.split()) \
   .map(lambda words: (words[2], words[0])) \
   .groupByKey()


In [0]:
# Peek at two (userid, ips) pairs; after groupByKey() each value is a
# ResultIterable rather than a plain list.
userips.take(2)


[(u'3922', <pyspark.resultiterable.ResultIterable at 0x7efd40046a10>),
 (u'104959', <pyspark.resultiterable.ResultIterable at 0x7efd401d5ad0>)]

In [0]:
# Materialize each user's grouped IPs as a plain list so take() shows the
# values instead of ResultIterable objects. Both forms build the same pairs.
mmm = userips.mapValues(list)
# Index-based lambda: tuple-parameter lambdas are a SyntaxError on Python 3.
mmmm = userips.map(lambda kv: (kv[0], list(kv[1])))
mmm.take(2)

[(u'3922',
  [u'195.220.211.104',
   u'195.220.211.104',
   u'138.217.174.182',
   u'138.217.174.182',
   u'138.217.174.182',
   u'138.217.174.182']),
 (u'104959', [u'183.123.205.115', u'183.123.205.115'])]

In [0]:
# Print the first 10 user IDs, each followed by its tab-indented IP list.
# Each line is pre-formatted into a single string so print(...) produces the
# same output on Python 2 and Python 3.
for userid, ips in userips.take(10):
    print("%s :" % userid)
    for ip in ips:
        print("\t%s" % ip)

3922 :
	195.220.211.104
	195.220.211.104
	138.217.174.182
	138.217.174.182
	138.217.174.182
	138.217.174.182
104959 :
	183.123.205.115
	183.123.205.115
90396 :
	191.120.254.24
	191.120.254.24
62733 :
	93.120.232.94
	93.120.232.94
	92.75.142.64
	92.75.142.64
30390 :
	235.242.157.100
	235.242.157.100
84780 :
	148.5.198.57
	148.5.198.57
	148.5.198.57
	148.5.198.57
	240.70.72.108
	240.70.72.108
54217 :
	236.59.12.138
	236.59.12.138
	121.125.136.169
	121.125.136.169
	122.72.182.201
	122.72.182.201
	85.209.207.112
	85.209.207.112
	212.95.104.25
	212.95.104.25
	212.95.104.25
	212.95.104.25
	5.82.216.41
	5.82.216.41
	5.82.216.41
	5.82.216.41
	218.169.205.19
	218.169.205.19
	218.169.205.19
	218.169.205.19
60986 :
	81.217.213.96
	81.217.213.96
	44.89.72.134
	44.89.72.134
	249.13.225.46
	249.13.225.46
	249.13.225.46
	249.13.225.46
	88.110.41.147
	88.110.41.147
44490 :
	83.100.72.186
	83.100.72.186
54604 :
	159.112.48.88
	159.112.48.88
	110.199.28.152
	110.199.28.152
	197.162.156.152
	197.162.156.

In [0]:
# Step 4a - Key each account record by user ID: (userid, [fields...])
accountsdata = "/loudacre/accounts"
accounts = (
    sc.textFile(accountsdata)
    .map(lambda record: record.split(','))
    .keyBy(lambda fields: fields[0])
)

# Step 4b - Join accounts with the per-user hit counts; each result is
# (userid, ([fields...], hitcount))
accounthits = accounts.join(userreqs)


In [0]:
# Inspect two joined records: (userid, ([account fields...], hit count)).
accounthits.take(2)

[(u'89371',
  ([u'89371',
    u'2013-09-08 02:21:15.0',
    u'2014-01-19 12:17:06.0',
    u'Ricky',
    u'Pope',
    u'4535 Highland Drive',
    u'Portland',
    u'OR',
    u'97212',
    u'5033136196',
    u'2014-03-18 13:32:36.0',
    u'2014-03-18 13:32:36.0'],
   4)),
 (u'99996',
  ([u'99996',
    u'2013-03-14 19:19:45.0',
    u'2014-02-07 16:32:29.0',
    u'Garrett',
    u'Allen',
    u'495 Wilson Street',
    u'Prescott',
    u'AZ',
    u'86360',
    u'9280545713',
    u'2014-03-18 13:32:56.0',
    u'2014-03-18 13:32:56.0'],
   2))]

In [0]:
# Step 4c - Display userid, hit count, first name, last name for the first 5 elements
# values[3] is the first name and values[4] the last name of the account record.
# The line is pre-formatted into one string so print(...) gives the same
# space-separated output on Python 2 and Python 3.
for userid, (values, count) in accounthits.take(5):
    print("%s %s %s %s" % (userid, count, values[3], values[4]))


89371 4 Ricky Pope
99996 2 Garrett Allen
69171 6 Richard Tarver
90311 2 David Rosenberg
36848 6 Aaron Hutson


# Bonus

In [0]:
# Quiet the console: only WARN and above are logged
sc.setLogLevel("WARN")

accountsdata = "/loudacre/accounts"

# Bonus 1 - pair every account record with its postal/zip code (field 8)
accountsByPCode = (
    sc.textFile(accountsdata)
    .map(lambda record: record.split(','))
    .keyBy(lambda fields: fields[8])
)


In [0]:
# Inspect two records: (postal code, [account fields...]).
accountsByPCode.take(2)

[(u'94660',
  [u'1',
   u'2008-10-23 16:05:05.0',
   u'\\N',
   u'Donald',
   u'Becton',
   u'2275 Washburn Street',
   u'Oakland',
   u'CA',
   u'94660',
   u'5100032418',
   u'2014-03-18 13:29:47.0',
   u'2014-03-18 13:29:47.0']),
 (u'94171',
  [u'2',
   u'2008-11-12 03:00:01.0',
   u'\\N',
   u'Donna',
   u'Jones',
   u'3885 Elliott Street',
   u'San Francisco',
   u'CA',
   u'94171',
   u'4150835799',
   u'2014-03-18 13:29:47.0',
   u'2014-03-18 13:29:47.0'])]

In [0]:
# Bonus 2 - map account data to lastname,firstname
# Field 4 is the last name and field 3 the first name; groupByKey() then
# collects all names that share a postal code.
namesByPCode = accountsByPCode \
   .mapValues(lambda account: account[4] + ',' + account[3]) \
   .groupByKey()

# Bonus 3 - print the first 5 zip codes and list the names
# Keys are strings, so sortByKey() orders them lexicographically.
# Each line is a single pre-formatted string so print(...) gives the same
# output on Python 2 and Python 3.
for pcode, names in namesByPCode.sortByKey().take(5):
    print("--- %s" % pcode)
    for name in names:
        print(name)

--- 85000
Allen,Harvey
Prinz,Daniel
Pascale,Robert
Brookes,Donna
Mackenzie,James
Chamberlain,Robert
Cunningham,Richard
Sewell,Bailey
Marin,Daniel
--- 85001
Mendelsohn,Frances
Watson,Mary
Brookover,Donald
Hathaway,Brandon
Leonard,Crystal
Moran,Carrie
Kirksey,Marie
Lance,Issac
Barnes,Vesta
Fiore,Eva
Tucker,Keith
Medford,Danielle
Spell,Norman
Soto,Shelley
Frantz,Kathy
Wilkins,Timothy
Snyder,Joseph
Flores,Delbert
Eakes,Gail
Daniels,Bert
Carpenter,Vincent
--- 85002
Whitney,Ruby
Perry,David
James,Marianne
Holiman,Nancy
Roman,Allen
Manus,Donna
Reed,Nancy
Baird,Estella
Gilbert,James
McKay,David
Clark,Laura
Horn,John
Payne,Jessica
Stewart,Bryant
Jones,Jose
Robinson,Wesley
--- 85003
Martin,Mark
Ross,Vivian
Tabor,Harry
Strickland,Kyle
Dvorak,Kevin
Wisniewski,Virginia
Gibson,Catherine
Thies,Lindsey
--- 85004
Kitts,Mary
Viola,Kevin
Meadows,Tonya
Royalty,Sherry
Collins,Greg
Shirley,Joseph
White,Sandra
Stern,Timothy
Johnson,Dominic
Dewitt,Mary
Carpenter,Matthew
Ball,Annie
Pate,Kathleen
Lish,Carrie
