# MapReduce program for Instagram filters

### Importing libraries from pyspark to set up the Spark context

In [1]:
from pyspark import SparkContext

#### Setting up spark context and assigning it to variable sc

In [2]:
# Reuse the already-running context if there is one: calling SparkContext()
# a second time (e.g. when this cell is re-run) raises
# "ValueError: Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

#### Checking the version of Spark

In [3]:
sc.version

'2.2.0'

#### Checking the Python version

In [4]:
sc.pythonVer

'3.6'

#### Checking the type of sc variable

In [5]:
type(sc)

pyspark.context.SparkContext

#### Creating an RDD from the CSV file, split into at least 6 partitions

In [6]:
# Read the CSV as an RDD of raw text lines; 6 is the minimum partition count.
RDD1 = sc.textFile('/home/adarsh/Desktop/instagram-micro.csv', minPartitions=6)

### Verifying the created RDD by checking its type

In [7]:
type(RDD1)

pyspark.rdd.RDD

### Checking the first 10 rows of the RDD (or dataset)
#### Here Spark reads each line of the CSV as one row


In [8]:
RDD1.take(10)

['314039335,1228512552531771401_314039335,2016-04-14 21:42:10,Clarendon,25,1',
 '314039335,1227601957213159898_314039335,2016-04-13 15:32:58,X-Pro II,55,9',
 '314039335,1223207551810912967_314039335,2016-04-07 14:02:04,Normal,38,0',
 '314039335,1219306360320278738_314039335,2016-04-02 04:51:06,Normal,67,2',
 '314039335,1218162347915591838_314039335,2016-03-31 14:58:09,Normal,49,5',
 '314039335,1201498852205773160_314039335,2016-03-08 15:10:46,Normal,65,5',
 '314039335,1197917208060346666_314039335,2016-03-03 16:34:41,Clarendon,36,1',
 '314039335,1168884468984037638_314039335,2016-01-23 15:11:48,Lark,31,1',
 '314039335,1146967354622853134_314039335,2015-12-24 09:26:25,Normal,30,0',
 '314039335,1144288392159412265_314039335,2015-12-20 16:43:47,Aden,54,1']

### Now we can split each line on the delimiter ',' using the map function, as shown below
#### The output will be an RDD in which each element is a list of fields, i.e. [userId, photoId, createdTime, filter, likes, comments]

In [9]:
# Break every raw line into its comma-separated string fields
# (plain str.split: quoted commas are not treated specially).
fil_data = RDD1.map(lambda line: line.split(','))

In [10]:
fil_data.take(5)

[['314039335',
  '1228512552531771401_314039335',
  '2016-04-14 21:42:10',
  'Clarendon',
  '25',
  '1'],
 ['314039335',
  '1227601957213159898_314039335',
  '2016-04-13 15:32:58',
  'X-Pro II',
  '55',
  '9'],
 ['314039335',
  '1223207551810912967_314039335',
  '2016-04-07 14:02:04',
  'Normal',
  '38',
  '0'],
 ['314039335',
  '1219306360320278738_314039335',
  '2016-04-02 04:51:06',
  'Normal',
  '67',
  '2'],
 ['314039335',
  '1218162347915591838_314039335',
  '2016-03-31 14:58:09',
  'Normal',
  '49',
  '5']]

### Creating an RDD using map function to map each filter to value 1

In [11]:
# Emit one (filter_name, 1) pair per row; index 3 is the filter column.
fil_data1 = fil_data.map(lambda fields: (fields[3], 1))

In [12]:
# NOTE: collect() returns every element of the RDD to the driver;
# take(n) is the cheaper way to preview just a few pairs.
fil_data1.collect()

[('Clarendon', 1),
 ('X-Pro II', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Clarendon', 1),
 ('Lark', 1),
 ('Normal', 1),
 ('Aden', 1),
 ('Juno', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Ludwig', 1),
 ('Juno', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Hudson', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Normal', 1),
 ('Sutro', 1),
 ('Unknown', 1),
 ('X-Pro II', 1),
 ('Sutro', 1),
 ('Normal', 1),
 ('Lo-fi', 1),
 ('Lo-fi', 1),
 ('Hefe', 1),
 ('Rise', 1),
 ('Normal', 1),
 ('Unknown', 1),
 ('Sutro', 1),
 ('Unknown', 1),
 ('Amaro', 1),
 ('Reyes', 1),
 ('Lark', 1),
 ('Brannan', 1),
 ('Mayfair', 1),
 ('Rise', 1),
 ('Lo-fi', 1),
 ('Lo-fi', 1),
 ('Sierra', 1),
 ('Normal', 1),
 ('Sierra', 1),
 ('Normal', 1),
 ('Earlybird', 1),

### Now by using the reduceByKey function of spark we can reduce the mapped filters by adding their values

In [13]:
# Add together the values emitted for each filter name to get its usage count.
fil_data2 = fil_data1.reduceByKey(lambda left, right: left + right)

### Viewing the first 10 filters (we can see that the output doesn't follow any particular order)

In [14]:
fil_data2.take(10)

[('Sierra', 67928),
 ('Nashville', 60159),
 ('Vesper', 579),
 ('Clarendon', 114401),
 ('Valencia', 158344),
 ('Moon', 23660),
 ('1977', 12164),
 ('Ginza', 618),
 ('Rise', 127262),
 ('Reyes', 26897)]

### counting the number of filters. There are 45 filters in total

In [15]:
fil_data2.count()

45

### Creating an RDD with sorted values in descending order so that we can see which filter is popular

In [16]:
# Order the (filter, count) pairs from the highest count to the lowest.
fil_data3 = fil_data2.sortBy(keyfunc=lambda pair: pair[1], ascending=False)

#### As you can see, the highest-ranked (most used) filter is Normal, with a total count of 2940972

In [17]:
fil_data3.collect()

[('Normal', 2940972),
 ('Amaro', 207543),
 ('X-Pro II', 176632),
 ('Valencia', 158344),
 ('Lo-fi', 141457),
 ('Rise', 127262),
 ('Clarendon', 114401),
 ('Mayfair', 101095),
 ('Hudson', 82386),
 ('Juno', 68941),
 ('Hefe', 68778),
 ('Sierra', 67928),
 ('Lark', 65166),
 ('Ludwig', 63605),
 ('Nashville', 60159),
 ('Earlybird', 57381),
 ('Unknown', 54277),
 ('Inkwell', 50561),
 ('Gingham', 41575),
 ('Walden', 38997),
 ('Crema', 38375),
 ('Willow', 35437),
 ('Brannan', 32159),
 ('Aden', 28457),
 ('Slumber', 28259),
 ('Reyes', 26897),
 ('Sutro', 26813),
 ('Moon', 23660),
 ('Kelvin', 20754),
 ('Perpetua', 15691),
 ('Toaster', 13412),
 ('1977', 12164),
 ('Poprocket', 2637),
 ('Ashby', 1468),
 ('Skyline', 1131),
 ('Stinson', 817),
 ('Gotham', 761),
 ('Dogpatch', 624),
 ('Ginza', 618),
 ('Vesper', 579),
 ('Charmes', 539),
 ('Helena', 424),
 ('Apollo', 326),
 ('Brooklyn', 278),
 ('Maven', 260)]

### To verify the list above, the counts of all the filters should add up to 5 million, as shown below

In [18]:
# Add up the per-filter counts (the second item of every pair).
# Despite its name, fil_data4 holds the resulting number, not an RDD.
fil_data4 = fil_data3.values().sum()

In [19]:
fil_data4

5000000