In [1]:
from pyspark import SparkContext, SparkConf
import re

In [2]:
conf = SparkConf().setMaster("yarn").setAppName("SparkJoin")

In [3]:
sc = SparkContext.getOrCreate(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
sc.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.app.startTime', '1668793344605'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'master'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES

A CSV file is a plain text file in which the fields of each line are separated by commas. We therefore read it as an ordinary text file with sc.textFile(), requesting 200 partitions. The RDD is then transformed by splitting each line into a list of values, and the header row is filtered out.

In [5]:
lineRDD = (
    sc.textFile("hdfs://master:9000/data/weblog_very_large.csv", 200)
    .map(lambda x: x.split(","))       # split each line into its fields
    .filter(lambda x: x[0] != "IP")    # drop the header row
)
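Splitting on every comma is enough for this file, but it would mis-split a record whose URL contains a comma inside quotes. The sketch below shows a more defensive per-line parser built on the standard csv module. The helper name parse_line, the header names after "IP", and the sample lines are hypothetical and only serve the illustration.

```python
import csv

def parse_line(line):
    """Parse one CSV line into a list of fields, honouring quoted commas."""
    # csv.reader accepts any iterable of strings, so wrap the single line in a list.
    return next(csv.reader([line]))

# Illustrative lines only; the header names after "IP" are assumptions.
sample_lines = [
    'IP,Time,URL,Status',
    '10.131.0.1/5647,07/22/2013:08:22:34,POST /archive.php HTTP/1.1,200',
    '10.0.0.1/42,01/01/2020:00:00:00,"GET /search.php?q=a,b HTTP/1.1",200',
]

rows = [parse_line(line) for line in sample_lines]
records = [r for r in rows if r[0] != "IP"]   # drop the header, as in the cell above
for r in records:
    print(len(r), r)
```

With an RDD, the same function could be passed to .map() in place of the lambda that calls split(",").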

# Question 1

In [6]:
print('The number of records in weblogs data = {}'.format(lineRDD.count()))

[Stage 0:>                                                        (0 + 0) / 200]

22/11/18 17:43:22 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources




The number of records in weblogs data = 1000000


                                                                                

### Additional check:

It is important to check the integrity of the data. In a bad case, some fields of a record may be missing, so the record has fewer values than the expected number of fields (4 in this problem).

The code below prints True when every record in lineRDD has exactly 4 fields, which indicates that the CSV file is in the correct format. This is a naive check; in practice validation is much more complicated and needs more extensive methods.

print(lineRDD.filter(lambda line: len(line) == 4).count() == lineRDD.count())
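A True/False answer does not say how the bad records are broken. As a minimal sketch, the check can instead tally how many records have each field count. The helper name field_count_report and the sample records are hypothetical; the sketch runs on a local list, and on the cluster the same tally could presumably be obtained in one job with lineRDD.map(len).countByValue().

```python
from collections import Counter

EXPECTED_FIELDS = 4  # field count stated above for this dataset

def field_count_report(records, expected=EXPECTED_FIELDS):
    """Return (all_ok, tally), where tally maps field count -> number of records."""
    tally = Counter(len(fields) for fields in records)
    all_ok = set(tally) <= {expected}   # True only if no other field count occurs
    return all_ok, dict(tally)

# Illustrative records: the second one has lost its status field.
sample = [
    ["10.131.0.1/5647", "07/22/2013:08:22:34", "POST /archive.php HTTP/1.1", "200"],
    ["10.131.2.1/4219", "02/07/2014:12:37:33", "DELETE /archive.php HTTP/1.1"],
]
print(field_count_report(sample))
```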

In [7]:
lineRDD = lineRDD.filter(lambda line: line[3] == '200')
print('The number of records in weblogs data with successful request = {}'.format(lineRDD.count()))

                                                                                

The number of records in weblogs data with successful request = 717487


# Question 2

This can be done with map-reduce. First, we map each line to a pair (UserID, 1). Then we apply reduceByKey() to sum the pairs per key, which gives the number of times each user accessed the site.

In [8]:
pair_RDD = lineRDD.map(lambda line: (line[0].split("/")[1] , 1))

### Additional check:

In some cases the first element may not have the expected form "IP/UserID". We can check this with:

print(lineRDD.filter(lambda line: len(line[0].split("/")) == 2).count() == pair_RDD.count())
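Note that line[0].split("/")[1] raises an IndexError when the first field contains no "/", so a malformed record would fail the job rather than show up in a count. One possible way to guard against that is sketched below: malformed records map to an empty list, so that a flatMap would simply skip them. The helper names extract_user_id and to_pair and the sample records are hypothetical.

```python
def extract_user_id(first_field):
    """Return the UserID from an 'IP/UserID' field, or None if it is malformed."""
    parts = first_field.split("/")
    if len(parts) != 2 or not parts[1]:
        return None
    return parts[1]

def to_pair(fields):
    """Map a record to [(UserID, 1)], or to [] when the first field is malformed."""
    user_id = extract_user_id(fields[0])
    return [(user_id, 1)] if user_id is not None else []

# Illustrative records: the second one has no "/UserID" part.
sample = [
    ["10.131.0.1/5647", "07/22/2013:08:22:34", "POST /archive.php HTTP/1.1", "200"],
    ["10.131.0.1", "07/22/2013:08:22:35", "GET /home.php HTTP/1.1", "200"],
]
pairs = [pair for fields in sample for pair in to_pair(fields)]
print(pairs)
```

On the cluster this would correspond to lineRDD.flatMap(to_pair) instead of the .map() call in In [8].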

In [9]:
print('Total access time for each user is:')
print(pair_RDD.reduceByKey(lambda a,b : a + b).collect())

Total access time for each user is:


                                                                                

[('7199', 105), ('9024', 51), ('2814', 57), ('196446', 2), ('75517', 2), ('248', 37), ('1729', 50), ('83981', 3), ('7109', 81), ('4511', 47), ('1618', 45), ('107359', 1), ('178464', 3), ('174661', 3), ('5090', 90), ('192066', 4), ('2315', 53), ('132409', 3), ('127192', 3), ('820', 45), ('30753', 2), ('180617', 1), ('6339', 95), ('130672', 4), ('819', 31), ('10', 28), ('107593', 2), ('7627', 56), ('10175', 1), ('125606', 3), ('7866', 44), ('7226', 25), ('81882', 5), ('1938', 55), ('92878', 3), ('4189', 33), ('2940', 59), ('119610', 5), ('187484', 1), ('83531', 3), ('8748', 63), ('89965', 3), ('2394', 28), ('92089', 3), ('4137', 50), ('22947', 2), ('4534', 20), ('118460', 2), ('98158', 3), ('8817', 26), ('52880', 2), ('61979', 2), ('160105', 2), ('73646', 4), ('135495', 4), ('17614', 2), ('57224', 7), ('4463', 19), ('6437', 25), ('174650', 2), ('174411', 1), ('103389', 3), ('5874', 23), ('90868', 3), ('57709', 1), ('7068', 37), ('34985', 2), ('171959', 4), ('490', 52), ('1114', 50), ('32

# Question 3

Use the .histogram() function to get the number of users for each visit count.
The function takes one parameter, buckets. Buckets are intervals: a value falls into a bucket if it is >= the lower bound and < the upper bound, except for the last bucket, which also includes its upper bound.
Buckets must be sorted. The result is a tuple of 2 items: the first is the buckets, the second is the list of counts per bucket.
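The bucket rule can be illustrated locally. The sketch below is a plain-Python stand-in for the bucket assignment, not Spark's own code; the name local_histogram and the sample values are hypothetical. It follows the documented behaviour that every bucket is half-open except the last one, which is closed, and that values outside the range are ignored.

```python
from bisect import bisect_right

def local_histogram(values, buckets):
    """Count values per bucket: [b0, b1), [b1, b2), ..., with the last bucket closed."""
    counts = [0] * (len(buckets) - 1)
    for v in values:
        if v < buckets[0] or v > buckets[-1]:
            continue  # values outside the bucket range are ignored
        # Index of the bucket whose lower bound is the largest one <= v.
        i = bisect_right(buckets, v) - 1
        # A value equal to the final upper bound belongs to the last bucket.
        i = min(i, len(counts) - 1)
        counts[i] += 1
    return buckets, counts

visits = [1, 1, 2, 5, 10, 11, 12]   # illustrative per-user visit counts
print(local_histogram(visits, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]))
```

In this sample both 10 and 11 land in the last bucket. With buckets ending at 11, the last count therefore covers users with 10 or 11 visits; extending the bucket list to 12 would be one way to isolate exactly 10.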

In [10]:
count_visited = pair_RDD.reduceByKey(lambda a,b : a + b).map(lambda line: line[1]).histogram([1,2,3,4,5,6,7,8,9,10,11])
print(count_visited)



([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], [57234, 51254, 30497, 13742, 5086, 1513, 404, 80, 12, 9])


                                                                                

In [11]:
print('Number of users with visited the site 1 time {}'.format(count_visited[1][0]))
print('Number of users with visited the site 5 times {}'.format(count_visited[1][4]))
print('Number of users with visited the site 10 times {}'.format(count_visited[1][9]))

Number of users with visited the site 1 time 57234
Number of users with visited the site 5 times 5086
Number of users with visited the site 10 times 9


To validate the result, run the following code, which counts the users who visited the site exactly 1, 5, and 10 times:

print('Validate number of users with visited the site 1 time {}'.format(pair_RDD.reduceByKey(lambda a,b : a + b).filter(lambda x: x[1] == 1).count()))

print('Validate number of users with visited the site 5 times {}'.format(pair_RDD.reduceByKey(lambda a,b : a + b).filter(lambda x: x[1] == 5).count()))

print('Validate number of users with visited the site 10 times {}'.format(pair_RDD.reduceByKey(lambda a,b : a + b).filter(lambda x: x[1] == 10).count()))
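The three validation lines each recompute reduceByKey() over the whole dataset. A possible alternative is to compute the exact number of users for every visit count in one pass. The sketch below shows the idea on a local list; the name users_per_visit_count and the sample pairs are hypothetical. On the cluster, the equivalent would presumably be pair_RDD.reduceByKey(lambda a, b: a + b).map(lambda kv: kv[1]).countByValue().

```python
from collections import Counter

def users_per_visit_count(pairs):
    """pairs: iterable of (user_id, 1). Returns {visit_count: number_of_users}."""
    visits_per_user = Counter()
    for user_id, n in pairs:
        visits_per_user[user_id] += n               # local stand-in for reduceByKey
    return dict(Counter(visits_per_user.values()))  # local stand-in for countByValue

# Illustrative pairs: users '7199' and '10' visit twice, '248' and '819' once.
sample_pairs = [("7199", 1), ("7199", 1), ("248", 1), ("10", 1), ("10", 1), ("819", 1)]
result = users_per_visit_count(sample_pairs)
print(result.get(1, 0), result.get(2, 0), result.get(10, 0))
```

Because the result is keyed by the exact visit count, no bucket boundaries are involved.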

# Question 4

Use .map() to convert each line to the desired format, (UserID, [IP, Time, URL, Response code]):

In [12]:
pair_set_RDD = lineRDD.map(lambda line: (line[0].split("/")[1], [line[0].split("/")[0], line[1], line[2], line[3]]))
pair_set_RDD.take(4)

[('59380',
  ['10.165.24.1', '03/11/2016:05:07:19', 'GET /compile.php HTTP/1.1', '200']),
 ('5647',
  ['10.131.0.1', '07/22/2013:08:22:34', 'POST /archive.php HTTP/1.1', '200']),
 ('4219',
  ['10.131.2.1',
   '02/07/2014:12:37:33',
   'DELETE /archive.php HTTP/1.1',
   '200']),
 ('62780',
  ['10.95.75.1',
   '07/11/2017:09:38:44',
   'UPDATE /js/vendor/modernizr-2.8.3.min.js HTTP/1.1',
   '200'])]

# Question 5

We join pair_set_RDD with the RDD of per-user access counts, using UserID as the key. After the join, each element has the form:

(UserID,
  ([IP,
    Time,
    URL,
    Response code], count))

To get the desired format, we apply .map() to flatten the nested tuple:

(UserID,
  [IP,
   Time,
   URL,
   Response code], count)
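The reshaping step can be tried on a small local example. The sketch below emulates an inner join on lists of (key, value) pairs and then flattens each result; local_join and flatten are hypothetical helper names, and the records and counts are illustrative rather than taken from the dataset.

```python
from collections import defaultdict

def local_join(left, right):
    """Inner join of two lists of (key, value) pairs, yielding (key, (lv, rv))."""
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]

def flatten(joined_item):
    """(UserID, (record, count)) -> (UserID, record, count)."""
    user_id, (record, count) = joined_item
    return (user_id, record, count)

# Illustrative data: user '1' has two records, user '2' has one.
records = [
    ("1", ["10.0.0.1", "01/01/2020:00:00:00", "GET /a.php HTTP/1.1", "200"]),
    ("1", ["10.0.0.2", "01/02/2020:00:00:00", "GET /b.php HTTP/1.1", "200"]),
    ("2", ["10.0.0.3", "01/03/2020:00:00:00", "GET /c.php HTTP/1.1", "200"]),
]
counts = [("1", 2), ("2", 1)]

for row in map(flatten, local_join(records, counts)):
    print(row[0], row[2], row[1][0], row[1][3])
```

Every record of a user is repeated with that user's total count, which matches the shape of the joined output for a user with several records.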

In [13]:
result_join_RDD = pair_set_RDD.join(pair_RDD.reduceByKey(lambda a,b : a + b)).map(lambda x: (x[0], x[1][0], x[1][1]))
result_list = result_join_RDD.take(6)

                                                                                

In [14]:
result_list

[('55543',
  ['10.105.140.1',
   '07/05/2018:04:02:52',
   'DELETE /css/normalize.css HTTP/1.1',
   '200'],
  5),
 ('55543',
  ['10.108.68.1', '08/24/2019:07:24:10', 'GET /home.php HTTP/1.1', '200'],
  5),
 ('55543',
  ['10.215.103.1',
   '02/29/2012:10:41:18',
   'UPDATE /img/ruet.png HTTP/1.1',
   '200'],
  5),
 ('55543',
  ['10.252.124.1',
   '06/04/2012:06:28:38',
   'DELETE /js/vendor/modernizr-2.8.3.min.js HTTP/1.1',
   '200'],
  5),
 ('55543',
  ['10.170.66.1',
   '06/30/2020:04:10:07',
   'DELETE /details.php?name=Another%20Multiplication%20Game&cod=16 HTTP/1.1',
   '200'],
  5),
 ('60487',
  ['10.27.143.1', '04/18/2016:11:32:22', 'UPDATE / HTTP/1.1', '200'],
  2)]

In [15]:
for line in result_list:
    print('{} {} {} {}'.format(line[0], line[2], line[1][0], line[1][3]))

55543 5 10.105.140.1 200
55543 5 10.108.68.1 200
55543 5 10.215.103.1 200
55543 5 10.252.124.1 200
55543 5 10.170.66.1 200
60487 2 10.27.143.1 200


# 6. Answer the following questions in the last Markdown cell:
### a. Where is the Driver program executed?

Because the deployment mode is client (see the Spark configuration output), the Driver program runs on the local (client) machine from which the application is submitted. In this case, that is the Edge node. The path is /usr/local/spark/ ($SPARK_HOME).
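The deployment mode can be read from the configuration pairs rather than by scanning the printed list. A minimal sketch, assuming the pairs come from sc.getConf().getAll() as in In [4]; the helper name deploy_mode and the sample pairs are hypothetical, while spark.submit.deployMode is the standard Spark property, which defaults to client when it is not set.

```python
def deploy_mode(conf_pairs, default="client"):
    """Look up spark.submit.deployMode in a list of (key, value) pairs."""
    return dict(conf_pairs).get("spark.submit.deployMode", default)

# Illustrative pairs in the shape returned by sc.getConf().getAll().
sample_conf = [
    ("spark.master", "yarn"),
    ("spark.submit.deployMode", "client"),
    ("spark.app.name", "SparkJoin"),
]
print(deploy_mode(sample_conf))
```

In the notebook, the call would be deploy_mode(sc.getConf().getAll()).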

### b. Where does the processing happen?

The processing starts once the Driver program has been initialized. The driver runs on the client and communicates with the Executors on the Worker nodes, which carry out the tasks of each stage.

### c. Where is the result stored?

The logs are stored in the Hadoop logs directory on the master and worker machines: /usr/local/hadoop/logs (master, slave1, slave2). We can also view the logs from the command line:

To view the logs of an application: yarn logs -applicationId <application-id>

To view the logs of a container: yarn logs -containerId <container-id>

The RDD partitions are kept in memory or on disk on the datanodes of the worker (slave) machines.

### d. How many stages and tasks were executed in total? You can retrieve this information from the Spark Master UI.

- stages: 10

- tasks: 1802