# 1 - Twitter Connectivity Score 1 (Using RDD and then DataFrame)

In [None]:
# Big Data Analysis 

# Number of followings and followers for each user is calculated as N and M
# The number of times the user appeared in the first column is computed as the number of users the user is following
# (k,v) pair is returned having the key as the user, and the value as 1
# Reduce function is used to sum the values to get number of followings (N) for each user
# Next find the followers of each user, by same step on second column 
# Two tables are joined using join action
# Values of N and M are multiplied using mapValues transformation

In [1]:
from pyspark.sql import *
from pyspark import SparkContext
sc =SparkContext()
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

spark = sqlContext.sparkSession
spark

## Part A - Using RDD 

In [2]:
# importing dataset as textfile
rdd=sc.textFile("usergraph.txt") #combined text file imported
rdd.collect()

['user4145    user6371',
 'user9495    user1006',
 'user2250    user1875',
 'user3571    user2175',
 'user1008    user8041',
 'user3977    user1246',
 'user7284    user5102',
 'user6594    user3437',
 'user4719    user798',
 'user4455    user3405',
 'user4004    user1508',
 'user8226    user3913',
 'user3237    user5662',
 'user1651    user5154',
 'user878    user8926',
 'user4964    user3104',
 'user1508    user5809',
 'user7086    user8134',
 'user5566    user3929',
 'user7014    user729',
 'user3657    user8386',
 'user1442    user9989',
 'user8033    user1415',
 'user2478    user3663',
 'user5481    user1692',
 'user3334    user8490',
 'user1453    user2113',
 'user2691    user7067',
 'user4006    user2781',
 'user8550    user5183',
 'user6528    user7910',
 'user9137    user5631',
 'user353    user6165',
 'user9880    user2747',
 'user4559    user4126',
 'user6618    user8496',
 'user560    user9365',
 'user5593    user2816',
 'user4278    user5501',
 'user5094    user5078',
 'use

In [3]:
# create rdd by splitting lines of dataset using first column only
# use map to create (k,v) pairs 
users1=rdd.map(lambda x: (x.split(" ")[0],1))
# users1.getNumPartitions()
users1_p=users1.partitionBy(3).persist()
# reducebykey is performing the sum on values for each key, to find how many times the user occured, which is the values of N for that user (followings)
Followings=users1_p.reduceByKey(lambda x,y:x+y)
Followings.collect()

[('user9495', 510),
 ('user3977', 864),
 ('user4455', 912),
 ('user3237', 525),
 ('user4964', 365),
 ('user5566', 880),
 ('user1442', 881),
 ('user8033', 602),
 ('user3334', 338),
 ('user1453', 608),
 ('user2691', 469),
 ('user4559', 911),
 ('user9290', 689),
 ('user8165', 462),
 ('user1096', 953),
 ('user165', 750),
 ('user6801', 937),
 ('user4782', 288),
 ('user4714', 736),
 ('user9231', 907),
 ('user27', 962),
 ('user4732', 880),
 ('user7385', 690),
 ('user2114', 656),
 ('user4908', 240),
 ('user2495', 610),
 ('user3157', 770),
 ('user4978', 404),
 ('user5400', 659),
 ('user9403', 512),
 ('user8822', 599),
 ('user4442', 939),
 ('user2249', 415),
 ('user2331', 923),
 ('user486', 633),
 ('user8642', 427),
 ('user7151', 814),
 ('user4795', 755),
 ('user4860', 673),
 ('user6011', 992),
 ('user6397', 415),
 ('user3458', 566),
 ('user3931', 934),
 ('user5180', 986),
 ('user5121', 787),
 ('user1587', 930),
 ('user2434', 279),
 ('user7941', 648),
 ('user9105', 893),
 ('user4343', 962),
 ('u

In [4]:
# create rdd by splitting the lines of dataset using data in second column
# map creates (k,v) pairs with citynames as keys and 1 as values
users2=rdd.map(lambda x: (x.split(" ")[4], 1))
# users2.getNumPartitions()
users2_p=users2.partitionBy(3).persist()
# reducebykey is performing the sum on values for each key, to find how many times the cityname occured, which is the values of M for that city (followers)
Followers=users2_p.reduceByKey(lambda x,y: x+y)
Followers.collect()

[('user1006', 524),
 ('user8041', 513),
 ('user1246', 517),
 ('user5102', 557),
 ('user798', 520),
 ('user3913', 477),
 ('user8926', 542),
 ('user3104', 507),
 ('user8386', 505),
 ('user7910', 502),
 ('user5501', 491),
 ('user6120', 484),
 ('user6833', 477),
 ('user5283', 507),
 ('user2533', 507),
 ('user3031', 499),
 ('user5254', 532),
 ('user3132', 504),
 ('user3922', 502),
 ('user1866', 507),
 ('user9729', 497),
 ('user3558', 486),
 ('user6289', 521),
 ('user3063', 485),
 ('user5282', 460),
 ('user3956', 494),
 ('user9432', 528),
 ('user2164', 503),
 ('user4341', 482),
 ('user677', 518),
 ('user3867', 472),
 ('user8954', 515),
 ('user9992', 495),
 ('user1480', 496),
 ('user6600', 518),
 ('user2180', 515),
 ('user762', 518),
 ('user3768', 504),
 ('user5922', 518),
 ('user861', 524),
 ('user7766', 479),
 ('user6886', 517),
 ('user9794', 526),
 ('user6864', 519),
 ('user8281', 529),
 ('user3856', 490),
 ('user5325', 517),
 ('user4665', 463),
 ('user298', 551),
 ('user8576', 488),
 ('us

In [5]:
# the followings and followers are joind givind a tuple of (N,M) values
score=Followings.join(Followers)
score.collect()

[('user9495', (510, 521)),
 ('user3977', (864, 489)),
 ('user4455', (912, 510)),
 ('user3237', (525, 474)),
 ('user4964', (365, 489)),
 ('user5566', (880, 526)),
 ('user1442', (881, 482)),
 ('user8033', (602, 502)),
 ('user3334', (338, 499)),
 ('user1453', (608, 477)),
 ('user2691', (469, 498)),
 ('user4559', (911, 478)),
 ('user9290', (689, 493)),
 ('user8165', (462, 475)),
 ('user1096', (953, 494)),
 ('user165', (750, 517)),
 ('user6801', (937, 550)),
 ('user4782', (288, 530)),
 ('user4714', (736, 536)),
 ('user9231', (907, 543)),
 ('user27', (962, 507)),
 ('user4732', (880, 504)),
 ('user7385', (690, 481)),
 ('user2114', (656, 507)),
 ('user4908', (240, 512)),
 ('user2495', (610, 500)),
 ('user3157', (770, 534)),
 ('user4978', (404, 517)),
 ('user5400', (659, 500)),
 ('user9403', (512, 466)),
 ('user8822', (599, 508)),
 ('user4442', (939, 475)),
 ('user2249', (415, 549)),
 ('user2331', (923, 554)),
 ('user486', (633, 514)),
 ('user8642', (427, 522)),
 ('user7151', (814, 510)),
 ('us

In [6]:
# MapValue is used to numtiply the elements of the values tuple (N*M)
# the score of each user is the product of N and M.
finalscore=score.mapValues(lambda x: (x[0]*x[1]))
finalscore.collect()

[('user9495', 265710),
 ('user3977', 422496),
 ('user4455', 465120),
 ('user3237', 248850),
 ('user4964', 178485),
 ('user5566', 462880),
 ('user1442', 424642),
 ('user8033', 302204),
 ('user3334', 168662),
 ('user1453', 290016),
 ('user2691', 233562),
 ('user4559', 435458),
 ('user9290', 339677),
 ('user8165', 219450),
 ('user1096', 470782),
 ('user165', 387750),
 ('user6801', 515350),
 ('user4782', 152640),
 ('user4714', 394496),
 ('user9231', 492501),
 ('user27', 487734),
 ('user4732', 443520),
 ('user7385', 331890),
 ('user2114', 332592),
 ('user4908', 122880),
 ('user2495', 305000),
 ('user3157', 411180),
 ('user4978', 208868),
 ('user5400', 329500),
 ('user9403', 238592),
 ('user8822', 304292),
 ('user4442', 446025),
 ('user2249', 227835),
 ('user2331', 511342),
 ('user486', 325362),
 ('user8642', 222894),
 ('user7151', 415140),
 ('user4795', 376745),
 ('user4860', 325732),
 ('user6011', 488064),
 ('user6397', 199615),
 ('user3458', 307338),
 ('user3931', 456726),
 ('user5180', 4

## Part B - Using DataFrame

In [7]:
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
import collections

In [18]:
df = sc.textFile("usergraph.txt")

In [19]:
df = df.map(lambda line: line.split(" ")) #rdd is the textFile of usergraphs imported above

In [20]:
df = spark.read.csv(df)
df.show()

+-----------+---+---+---+------------+
|        _c0|_c1|_c2|_c3|         _c4|
+-----------+---+---+---+------------+
|['user4145'| ''| ''| ''| 'user6371']|
|['user9495'| ''| ''| ''| 'user1006']|
|['user2250'| ''| ''| ''| 'user1875']|
|['user3571'| ''| ''| ''| 'user2175']|
|['user1008'| ''| ''| ''| 'user8041']|
|['user3977'| ''| ''| ''| 'user1246']|
|['user7284'| ''| ''| ''| 'user5102']|
|['user6594'| ''| ''| ''| 'user3437']|
|['user4719'| ''| ''| ''|  'user798']|
|['user4455'| ''| ''| ''| 'user3405']|
|['user4004'| ''| ''| ''| 'user1508']|
|['user8226'| ''| ''| ''| 'user3913']|
|['user3237'| ''| ''| ''| 'user5662']|
|['user1651'| ''| ''| ''| 'user5154']|
| ['user878'| ''| ''| ''| 'user8926']|
|['user4964'| ''| ''| ''| 'user3104']|
|['user1508'| ''| ''| ''| 'user5809']|
|['user7086'| ''| ''| ''| 'user8134']|
|['user5566'| ''| ''| ''| 'user3929']|
|['user7014'| ''| ''| ''|  'user729']|
+-----------+---+---+---+------------+
only showing top 20 rows



In [21]:
df2=df.groupBy('_c0')
df2.count().show()

+-----------+-----+
|        _c0|count|
+-----------+-----+
|['user2434'|  279|
|['user8565'|  552|
|['user9237'|  330|
|['user9933'|  679|
|['user5515'|  423|
|['user3431'|  415|
|['user3346'|  271|
|['user3419'|  655|
|['user3535'|  735|
| ['user798'|  419|
| ['user765'|  917|
|['user6485'|  972|
|['user3533'|  324|
| ['user880'|  484|
|['user6091'|  495|
| ['user643'|  794|
|['user8423'|  547|
|['user1178'|  870|
|['user6640'|  967|
|['user5609'|  423|
+-----------+-----+
only showing top 20 rows



In [23]:
df3=df.groupBy('_c4')
df3.count().show()

+------------+-----+
|         _c4|count|
+------------+-----+
| 'user5551']|  500|
| 'user5331']|  485|
| 'user7198']|  476|
| 'user4461']|  512|
| 'user5269']|  500|
|  'user309']|  482|
| 'user5109']|  495|
| 'user3503']|  509|
| 'user7682']|  474|
| 'user8056']|  524|
| 'user9077']|  524|
| 'user5062']|  543|
| 'user8194']|  560|
| 'user1612']|  532|
| 'user6364']|  541|
| 'user5502']|  524|
| 'user5224']|  493|
|  'user112']|  480|
| 'user3854']|  534|
| 'user2103']|  495|
+------------+-----+
only showing top 20 rows

