# **Big Data Analytics [CN7031] CRWK 2023-24**
# **Group ID: [Your Group ID]**
1.   Student 1: Name and ID
2.   Student 2: Name and ID
3.   Student 3: Name and ID
4.   Student 4: Name and ID

---


If you want to add comments on your group work, please write it here for us:


# **Initiate and Configure Spark**

---


In [34]:
!pip3 install pyspark



In [None]:
# linking with Spark


# **Load Unstructured Data and Convert it to Spark DF [10 marks]**

---


In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split


In [36]:
# Load the unstructrued data: (1) drag and drop data here or (2) use Google Drive
spark = SparkSession.builder.appName("WebLogAnalysis").getOrCreate()


In [37]:
log_data_path = "/content/web.log"
raw_data = spark.read.text(log_data_path)


In [38]:
# Convert the unstructrued data to Spark DF
# Assuming common log format: IP_address - - [timestamp] "request" status_code bytes_sent
split_data = split(raw_data['value'], ' ')

# Create columns for different components of the log entry
raw_data = raw_data.withColumn('ip_address', split_data.getItem(0))
raw_data = raw_data.withColumn('timestamp', split_data.getItem(3))
raw_data = raw_data.withColumn('request', split_data.getItem(5))
raw_data = raw_data.withColumn('status_code', split_data.getItem(6))
raw_data = raw_data.withColumn('bytes_sent', split_data.getItem(7))


In [39]:
web_log_df = raw_data.select('ip_address', 'timestamp', 'request', 'status_code', 'bytes_sent')


In [40]:
web_log_df.show()


+---------------+--------------------+--------------------+-----------+----------+
|     ip_address|           timestamp|             request|status_code|bytes_sent|
+---------------+--------------------+--------------------+-----------+----------+
| 88.211.105.115|[04/Mar/2022:14:1...|  /history/missions/|  HTTP/2.0"|       414|
|   144.6.49.142|[02/Sep/2022:15:1...| /security/firewall/| HTTPS/1.0"|       203|
|  231.70.64.145|[19/Jul/2022:01:3...|/web-development/...|  HTTP/1.0"|       201|
| 219.42.234.172|[08/Feb/2022:11:3...|/networking/techn...|  HTTP/1.0"|       415|
| 183.173.185.94|[29/Aug/2023:03:0...| /security/firewall/|  HTTP/2.0"|       205|
|   164.12.8.113|[22/May/2023:09:4...|/web-development/...|  HTTP/1.0"|       200|
|   110.98.7.240|[22/Jan/2023:09:5...|    /history/apollo/|  HTTP/2.0"|       204|
| 27.182.196.243|[28/Mar/2022:05:3...| /history/apollo-11/| HTTPS/1.0"|       414|
|  123.31.25.147|[25/Feb/2023:12:0...|/data-analysis/mi...|  HTTP/1.0"|       204|
|  2

In [None]:
spark.stop()



# **Task 1: Spark SQL [30 marks]**

---



In [41]:
web_log_df.createOrReplaceTempView("web_log_table")

In [42]:
# Student 1 name and ID
# Query 1: Count the total number of log entries
query1 = spark.sql("SELECT COUNT(*) AS total_entries FROM web_log_table")


In [44]:
# Student 1 name and ID
# Query 2: Find the unique IP addresses in the log
query2 = spark.sql("SELECT DISTINCT ip_address FROM web_log_table")

In [43]:
# Student 2 name and ID
# Query 3: Count the number of requests for each status code
query3 = spark.sql("SELECT status_code, COUNT(*) AS count FROM web_log_table GROUP BY status_code")

In [45]:
# Student 2 name and ID

# Query 4: Find the top 10 most requested URLs
query4 = spark.sql("SELECT request, COUNT(*) AS request_count FROM web_log_table GROUP BY request ORDER BY request_count DESC LIMIT 10")


In [46]:
# Student 3 name and ID
# Query 5: Identify the busiest hours based on the number of requests
query5 = spark.sql("SELECT SUBSTRING_INDEX(timestamp, ':', 1) AS hour, COUNT(*) AS request_count FROM web_log_table GROUP BY hour ORDER BY request_count DESC LIMIT 5")

In [47]:
# Student 3 name and ID
# Query 6: Calculate the average bytes sent per request
query6 = spark.sql("SELECT AVG(bytes_sent) AS avg_bytes_sent FROM web_log_table")


In [48]:
# Student 4 name and ID

# Query 7: Find the requests with a status code of 404 (Not Found)
query7 = spark.sql("SELECT * FROM web_log_table WHERE status_code = '404'")

In [49]:
# Student 4 name and ID

# Query 8: Identify the IP addresses with the highest number of requests
query8 = spark.sql("SELECT ip_address, COUNT(*) AS request_count FROM web_log_table GROUP BY ip_address ORDER BY request_count DESC LIMIT 5")

In [50]:
# Show the results of the queries
query1.show()
query2.show()
query3.show()
query4.show()
query5.show()
query6.show()
query7.show()
query8.show()

+-------------+
|total_entries|
+-------------+
|       502788|
+-------------+

+---------------+
|     ip_address|
+---------------+
|230.145.149.229|
| 222.97.216.211|
| 56.252.244.161|
|  74.118.29.164|
| 223.19.252.140|
|  185.89.79.197|
|101.251.172.143|
|  120.127.56.14|
| 11.218.194.190|
| 229.251.98.142|
|     40.90.7.31|
|    29.78.31.21|
|  16.115.44.122|
|    8.36.91.198|
| 206.125.95.176|
| 91.188.228.254|
|227.121.136.126|
|209.243.134.154|
|   3.207.247.51|
|  10.108.187.20|
+---------------+
only showing top 20 rows

+-----------+------+
|status_code| count|
+-----------+------+
|  HTTP/1.0"|167668|
| HTTPS/1.0"|167580|
|  HTTP/2.0"|167539|
|       NULL|     1|
+-----------+------+

+--------------------+-------------+
|             request|request_count|
+--------------------+-------------+
|/data-analysis/ap...|         4394|
|/web-development/...|         4327|
|/software/technol...|         4310|
|/cloud-computing/...|         4289|
|/web-development/...|         42


# **Task 2 - Spark RDD [50 marks]**

---


In [59]:
from pyspark import SparkContext

# Assuming you have a SparkSession named 'spark'
sc = SparkContext.getOrCreate()

# Replace 'your_file_path' with the actual path to your log file
log_data_path = "/content/web.log"

# Read the raw log data into an RDD
web_log_rdd = sc.textFile(log_data_path)

# Now you can perform RDD transformations on 'raw_data_rdd'


In [60]:
# Student 1 name and ID
# Assuming web_log_rdd has already been created

# Query 1: Count the total number of log entries
query1 = web_log_rdd.count()

In [61]:
# Student 1 name and ID

# Query 2: Find the unique IP addresses in the log
query2 = web_log_rdd.map(lambda line: line.split(' ')[0]).distinct()

In [62]:
# Student 1 name and ID
# analysis 3 and result using RDD operators:

query3 = web_log_rdd.map(lambda line: (line.split(' ')[6], 1)).reduceByKey(lambda x, y: x + y)



In [63]:
# Student 2 name and ID
# analysis 1 and result using RDD operators:
query4_rdd = web_log_rdd.map(lambda line: (line.split(' ')[5], 1)).reduceByKey(lambda x, y: x + y)


In [64]:
# Student 2 name and ID
# analysis 2 and result using RDD operators:
query5 = web_log_rdd.map(lambda line: (line.split(' ')[3].split(':')[1], 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], False).take(5)



In [65]:
# Student 2 name and ID
# analysis 3 and result using RDD operators:
query6_rdd = web_log_rdd.map(lambda line: int(line.split(' ')[7]))

In [66]:
# Student 3 name and ID
# analysis 1 and result using RDD operators:
query7 = web_log_rdd.filter(lambda line: line.split(' ')[6] == '404')


In [67]:
# Student 3 name and ID
# analysis 2 and result using RDD operators:

query8 = web_log_rdd.map(lambda line: (line.split(' ')[0], 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], False).take(5)

In [68]:
# Student 3 name and ID
# analysis 3 and result using RDD operators:
query9 = web_log_rdd.map(lambda line: (line.split(' ')[3].split(':')[1], 1)).reduceByKey(lambda x, y: x + y)



In [70]:
# Student 4 name and ID
# analysis 2 and result using RDD operators:
query10 = web_log_rdd.map(lambda line: (line.split(' ')[6], int(line.split(' ')[7]))).combineByKey(
    lambda value: (value, 1),
    lambda x, value: (x[0] + value, x[1] + 1),
    lambda x, y: (x[0] + y[0], x[1] + y[1])
).map(lambda x: (x[0], x[1][0] / x[1][1]))


In [71]:

# Show the results of the queries
print("Query 1:", query1)
print("Query 2:", query2.collect())

print("Query 4:", query4)
print("Query 5:", query5)
print("Query 6:", query6)
print("Query 7:", query7)
print("Query 8:", query8)
print("Query 9:", query9.collect())
print("Query 10:", query10)


Query 1: 502788


IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



Query 9: [('16', 21072), ('23', 20890), ('04', 20833), ('22', 20991), ('06', 21019), ('13', 20994), ('20', 20801), ('15', 20980), ('03', 21000), ('09', 20979), ('12', 20895), ('02', 20982), ('00', 20912), ('18', 20876), ('10', 20844), ('14', 20938), ('01', 21020), ('11', 21161), ('05', 20865), ('07', 20887), ('17', 21056), ('19', 20908), ('08', 21074), ('21', 20811)]
Query 10: PythonRDD[613] at RDD at PythonRDD.scala:53



# **Task 3 - LSEP (legal, social, ethical, and professional) considerations [5 marks]**

---


For all analyses performed, critically analyze the legal, social, ethical, and professional implications associated with the data and the analysis. Consider factors such as `data privacy`,` data protection`, `bias`, `fairness`, `transparency`, and the `potential impact` of the analysis on individuals or society as a whole.

**Each student should take one of these factors as their contribution.**

As a team, discuss and share your individual analyses and LSEP considerations with each other. Learn from each other's perspectives and insights.

## Student 1: chosen factor

*discussion*


## Student 2: chosen factor

*discussion*

## Student 3: chosen factor

*discussion*


## Student 4: chosen factor

*discussion*


In [78]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from pyspark.sql.functions import sha2

# Create a Spark session
spark = SparkSession.builder.appName("WebLogAnalysis").getOrCreate()

# Assuming 'web_log_df' is your DataFrame
# Anonymize the 'ip_address' column using sha2
web_log_df = web_log_df.withColumn('ip_address', F.sha2('ip_address', 256))

# Show the first few rows of the DataFrame
web_log_df.show()


+--------------------+--------------------+--------------------+-----------+----------+
|          ip_address|           timestamp|             request|status_code|bytes_sent|
+--------------------+--------------------+--------------------+-----------+----------+
|ecb86d3e4a7b77d0a...|[04/Mar/2022:14:1...|  /history/missions/|  HTTP/2.0"|       414|
|9dcc436e2a109702e...|[02/Sep/2022:15:1...| /security/firewall/| HTTPS/1.0"|       203|
|c5f7da76e80ecd4aa...|[19/Jul/2022:01:3...|/web-development/...|  HTTP/1.0"|       201|
|97e12c3e576d71893...|[08/Feb/2022:11:3...|/networking/techn...|  HTTP/1.0"|       415|
|9df0073041cee14d3...|[29/Aug/2023:03:0...| /security/firewall/|  HTTP/2.0"|       205|
|28f6ebd4b5dd11196...|[22/May/2023:09:4...|/web-development/...|  HTTP/1.0"|       200|
|b5d2333ec3111b529...|[22/Jan/2023:09:5...|    /history/apollo/|  HTTP/2.0"|       204|
|6ea4e1550f101b498...|[28/Mar/2022:05:3...| /history/apollo-11/| HTTPS/1.0"|       414|
|ab94ce18c4a9eb9d6...|[25/Feb/20

In [79]:
# Anonymizing user data
web_log_df = web_log_df.withColumn('ip_address', sha2('ip_address', 256))


In [82]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when

In [83]:
# Applying fairness-aware techniques
web_log_df = web_log_df.withColumn('request_count', when(web_log_df['status_code'] == '404', 1).otherwise(0))


In [84]:
# Communicating data usage to users
print("We use your data to improve our services. You can manage your privacy settings in the user dashboard.")


We use your data to improve our services. You can manage your privacy settings in the user dashboard.


# **Convert ipynb to HTML for Turnitin submission [5 marks]**

---



In [None]:
# install nbconvert
#!pip3 install nbconvert


# convert ipynb to html
# file name: "Your_Group_ID_CN7031.ipynb
!jupyter nbconvert --to html Your_Group_ID_CN7031.ipynb

This application is used to convert notebook files (*.ipynb)
        to various other formats.


Options
The options below are convenience aliases to configurable class-options,
as listed in the "Equivalent to" description-line of the aliases.
To see all configurable class-options for some <cmd>, use:
    <cmd> --help-all

--debug
    set log level to logging.DEBUG (maximize logging output)
    Equivalent to: [--Application.log_level=10]
--show-config
    Show the application's configuration (human-readable format)
    Equivalent to: [--Application.show_config=True]
--show-config-json
    Show the application's configuration (json format)
    Equivalent to: [--Application.show_config_json=True]
--generate-config
    generate default config file
    Equivalent to: [--JupyterApp.generate_config=True]
-y
    Answer yes to any questions instead of prompting.
    Equivalent to: [--JupyterApp.answer_yes=True]
--execute
    Execute the notebook prior to export.
    Equivalent to: [--ExecutePr