# **Préambule**

In [205]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [206]:
!wget -q https://downloads.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz

In [207]:
!tar xf spark-3.2.3-bin-hadoop3.2.tgz

In [208]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop3.2"
import sys

In [209]:
!pip install pyspark==3.2.3

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [210]:
#import findspark
#findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import *

In [211]:
# Check the pyspark version
import pyspark
print(pyspark.__version__)

3.2.3


In [212]:
spark = SparkSession.builder.master("local[*]").appName("MyFirstProgram").getOrCreate()
sc=spark.sparkContext

# Test the spark
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3, False)

+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows





---


# **Import**

In [213]:
# import for exercise 0

import zipfile
from google.colab import drive

# import for exercise 1

from pyspark import SparkFiles
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import Row
import re
import string

# import for exercise 2

from pyspark.sql.types import *



---


# **Exercise n°0: Download and Prepare your Log File**

In [214]:
drive.mount('/content/drive')
!apt install unzip
!unzip -u "/content/drive/MyDrive/Colab_Notebooks/Big_Data/TP2/pagecounts-20160101-000000_parsed.out.zip" -d "/content/drive/MyDrive/Colab_Notebooks/Big_Data/TP2/"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Reading package lists... Done
Building dependency tree       
Reading state information... Done
unzip is already the newest version (6.0-25ubuntu1.1).
The following package was automatically installed and is no longer required:
  libnvidia-common-510
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 21 not upgraded.
Archive:  /content/drive/MyDrive/Colab_Notebooks/Big_Data/TP2/pagecounts-20160101-000000_parsed.out.zip


In [215]:
wiki_url = "/content/drive/MyDrive/Colab_Notebooks/Big_Data/TP2/pagecounts-20160101-000000_parsed.out"
spark.sparkContext.addFile(wiki_url)
wiki_RDD_v1 = sc.textFile(SparkFiles.get("pagecounts-20160101-000000_parsed.out"))

In [216]:
wiki_RDD_v1.take(10)

['aa 271_a.C 1 4675',
 'aa Category:User_th 1 4770',
 'aa Chiron_Elias_Krase 1 4694',
 'aa Dassault_rafaele 2 9372',
 'aa E.Desv 1 4662',
 'aa File:Wiktionary-logo-en.png 1 10752',
 'aa Indonesian_Wikipedia 1 4679',
 'aa Main_Page 5 266946',
 'aa Requests_for_new_languages/Wikipedia_Banyumasan 1 4733',
 'aa Special:Contributions/203.144.160.245 1 5812']

In [217]:
def separe_elements(ligne):
    elements = ligne.split(" ")
    return (elements[0], elements[1], int(elements[2]), int(elements[3]))

In [218]:
wiki_RDD = wiki_RDD_v1.map(lambda word : separe_elements(word))

In [219]:
wiki_RDD.take(10)

[('aa', '271_a.C', 1, 4675),
 ('aa', 'Category:User_th', 1, 4770),
 ('aa', 'Chiron_Elias_Krase', 1, 4694),
 ('aa', 'Dassault_rafaele', 2, 9372),
 ('aa', 'E.Desv', 1, 4662),
 ('aa', 'File:Wiktionary-logo-en.png', 1, 10752),
 ('aa', 'Indonesian_Wikipedia', 1, 4679),
 ('aa', 'Main_Page', 5, 266946),
 ('aa', 'Requests_for_new_languages/Wikipedia_Banyumasan', 1, 4733),
 ('aa', 'Special:Contributions/203.144.160.245', 1, 5812)]

In [220]:
columns = ["Project", "Page_title","Page_hits","Page_size"]
wiki_DF = spark.createDataFrame(wiki_RDD,columns)  

In [221]:
wiki_DF.show()

+-------+--------------------+---------+---------+
|Project|          Page_title|Page_hits|Page_size|
+-------+--------------------+---------+---------+
|     aa|             271_a.C|        1|     4675|
|     aa|    Category:User_th|        1|     4770|
|     aa|  Chiron_Elias_Krase|        1|     4694|
|     aa|    Dassault_rafaele|        2|     9372|
|     aa|              E.Desv|        1|     4662|
|     aa|File:Wiktionary-l...|        1|    10752|
|     aa|Indonesian_Wikipedia|        1|     4679|
|     aa|           Main_Page|        5|   266946|
|     aa|Requests_for_new_...|        1|     4733|
|     aa|Special:Contribut...|        1|     5812|
|     aa|Special:Contribut...|        1|     5805|
|     aa|Special:Contribut...|        1|     5808|
|     aa|Special:Contribut...|        1|     5812|
|     aa|Special:ListFiles...|        1|     5035|
|     aa|Special:ListFiles...|        1|     5036|
|     aa|Special:ListFiles...|        1|     5032|
|     aa|Special:Log/Md._F...| 

---
# **Exercise n°1 : Explore Web Logs with Spark RDDs**


In [222]:
Log = StructType([
    StructField("project", StringType(), True),
    StructField("title", StringType(), True),
    StructField("hits", IntegerType(), True),
    StructField("size", IntegerType(), True)
])

In [223]:
def separe_elements(ligne):
    elements = ligne.split(" ")
    LogEx = Row(project=elements[0], title=elements[1], hits=int(elements[2]), size=int(elements[3]))
    return LogEx

In [224]:
log_RDD = wiki_RDD_v1.map(lambda word : separe_elements(word))

In [225]:
log_RDD.take(10)

[Row(project='aa', title='271_a.C', hits=1, size=4675),
 Row(project='aa', title='Category:User_th', hits=1, size=4770),
 Row(project='aa', title='Chiron_Elias_Krase', hits=1, size=4694),
 Row(project='aa', title='Dassault_rafaele', hits=2, size=9372),
 Row(project='aa', title='E.Desv', hits=1, size=4662),
 Row(project='aa', title='File:Wiktionary-logo-en.png', hits=1, size=10752),
 Row(project='aa', title='Indonesian_Wikipedia', hits=1, size=4679),
 Row(project='aa', title='Main_Page', hits=5, size=266946),
 Row(project='aa', title='Requests_for_new_languages/Wikipedia_Banyumasan', hits=1, size=4733),
 Row(project='aa', title='Special:Contributions/203.144.160.245', hits=1, size=5812)]

In [226]:
log_RDD_Thirdline = log_RDD.take(3)[-1]

print(log_RDD_Thirdline.project)
print(log_RDD_Thirdline.title)
print(log_RDD_Thirdline.hits)
print(log_RDD_Thirdline.size)

aa
Chiron_Elias_Krase
1
4694


# **Question n°1 :**

In [227]:
def print_record(rdd, number):
  myLine = rdd.take(number)[-1]
  print("Project code: " + myLine.project + "\t Page: " + myLine.title + "\t Page hits: " + str(myLine.hits) + "\t Page size: " + str(myLine.size))

In [228]:
for i in range(1,21):
  print_record(log_RDD, i)

Project code: aa	 Page: 271_a.C	 Page hits: 1	 Page size: 4675
Project code: aa	 Page: Category:User_th	 Page hits: 1	 Page size: 4770
Project code: aa	 Page: Chiron_Elias_Krase	 Page hits: 1	 Page size: 4694
Project code: aa	 Page: Dassault_rafaele	 Page hits: 2	 Page size: 9372
Project code: aa	 Page: E.Desv	 Page hits: 1	 Page size: 4662
Project code: aa	 Page: File:Wiktionary-logo-en.png	 Page hits: 1	 Page size: 10752
Project code: aa	 Page: Indonesian_Wikipedia	 Page hits: 1	 Page size: 4679
Project code: aa	 Page: Main_Page	 Page hits: 5	 Page size: 266946
Project code: aa	 Page: Requests_for_new_languages/Wikipedia_Banyumasan	 Page hits: 1	 Page size: 4733
Project code: aa	 Page: Special:Contributions/203.144.160.245	 Page hits: 1	 Page size: 5812
Project code: aa	 Page: Special:Contributions/5.232.61.79	 Page hits: 1	 Page size: 5805
Project code: aa	 Page: Special:Contributions/Ayarportugal	 Page hits: 1	 Page size: 5808
Project code: aa	 Page: Special:Contributions/Born2bgra

# **Question n°2**

In [229]:
log_RDD_Count = log_RDD.count()

In [230]:
print("In this rdd, there is " + str(log_RDD_Count) + " records.")

In this rdd, there is 3324129 records.


# **Question n°3**

In [231]:
log_RDD_Max = log_RDD.map(lambda x : x[3]).max()

In [232]:
log_RDD_Min = log_RDD.map(lambda x : x[3]).min()

In [233]:
log_RDD_Mean = (log_RDD.map(lambda x : x[3]).sum())/log_RDD_Count

In [234]:
log_RDD_MaxHits = log_RDD.map(lambda x : x[2]).max()

In [235]:
print("About the page size, we know that :")
print("The max is : " + str(log_RDD_Max) + ", the min is : " + str(log_RDD_Min) + " and the mean is : " + str(log_RDD_Mean))

About the page size, we know that :
The max is : 141180155987, the min is : 0 and the mean is : 132239.56957446598


# **Question n°4**

In [236]:
def keepMax(x):
  if x[3]==log_RDD_Max:
    return x

In [237]:
log_RDD_MaxPages = log_RDD.filter(lambda x : keepMax(x))
log_RDD_MaxPages.take(10)

[Row(project='en.mw', title='en', hits=5466346, size=141180155987)]

In [238]:
def keepMin(x):
  if x[3]==log_RDD_Min:
    return x

In [239]:
log_RDD_MinPages = log_RDD.filter(lambda x : keepMin(x))
log_RDD_MinPages.take(10)

[Row(project='af', title='1337', hits=1, size=0),
 Row(project='af', title='1433', hits=1, size=0),
 Row(project='af', title='1498', hits=1, size=0),
 Row(project='af', title='1577', hits=1, size=0),
 Row(project='af', title='1864', hits=1, size=0),
 Row(project='af', title='689', hits=1, size=0),
 Row(project='af', title='Clifton-hangbrug', hits=1, size=0),
 Row(project='af', title='Die_Transvaler', hits=1, size=0),
 Row(project='af', title='Griekse_oergode', hits=1, size=0),
 Row(project='af', title='Kategorie:22ste_eeu', hits=1, size=0)]

# **Question n°5**

In [240]:
print(log_RDD_MaxPages.first().hits)

5466346


In [241]:
def keepMaxHits(x):
  if x[2]==log_RDD_MaxHits:
    return x

In [242]:
log_RDD_MaxHitsDone = log_RDD.filter(lambda x : keepMaxHits(x))
log_RDD_MaxHitsDone.take(10)

[Row(project='en.mw', title='en', hits=5466346, size=141180155987)]

# **Question n°6**

In [243]:
def keepSupAverage(x):
  if x[3]>log_RDD_Mean:
    return x

In [244]:
log_RDD_MaxPages = log_RDD.filter(lambda x : keepSupAverage(x))
log_RDD_MaxPages.take(10)

[Row(project='aa', title='Main_Page', hits=5, size=266946),
 Row(project='ace.mw', title='ace', hits=31, size=827168),
 Row(project='af', title='1859', hits=4, size=219540),
 Row(project='af', title='18_Oktober', hits=4, size=264724),
 Row(project='af', title='1941', hits=4, size=256344),
 Row(project='af', title='2016', hits=5, size=215498),
 Row(project='af', title='4_Januarie', hits=4, size=268828),
 Row(project='af', title='Afrika-unie', hits=1, size=172078),
 Row(project='af', title='Big_Ben', hits=13, size=136201),
 Row(project='af', title='Comrades-maraton', hits=1, size=155180)]

# **Question n°7**

In [245]:
log_RDD_HitsPerProject = log_RDD.map(lambda x : (x[0], x[2])).reduceByKey(lambda x, y : x+y).sortBy(lambda row : row[1], ascending=False)

In [246]:
log_RDD_HitsPerProject.take(10)

[('en.mw', 5466346),
 ('en', 4959090),
 ('es.mw', 695531),
 ('ja.mw', 611443),
 ('de.mw', 572119),
 ('fr.mw', 536978),
 ('ru.mw', 466742),
 ('it.mw', 400297),
 ('de', 315929),
 ('commons.m', 285796)]

In [247]:
log_RDD_HitsPerPage = log_RDD.map(lambda x : (x[1], x[2])).reduceByKey(lambda x, y : x+y).sortBy(lambda row : row[1], ascending=False)

In [248]:
log_RDD_HitsPerPage.take(5)

[('en', 5466350),
 ('es', 695535),
 ('ja', 611451),
 ('de', 572129),
 ('fr', 536978)]

# **Question n°8**

In [249]:
##### Create the cleaning function

def cleaningFunc(text):
  text = text.lower()
  translator = text.maketrans(string.punctuation,"_" * len(string.punctuation))
  text = text.translate(translator)
  text = text.split("_")
  return text

In [250]:
tst = "Requests_for.new:languages/Wikipedia-Banyumasan"
print(cleaningFunc(tst))

['requests', 'for', 'new', 'languages', 'wikipedia', 'banyumasan']


In [251]:
log_RDD_PageName2 = log_RDD.flatMap(lambda x : cleaningFunc(x[1])) #unfold the table

log_RDD_PageNameWordOnly = log_RDD_PageName2.filter(lambda x: x.isalpha()).map(lambda x : (x, 1))

log_RDD_PageNameWithOccurences = log_RDD_PageNameWordOnly.reduceByKey(lambda x, y : x+y)

log_RDD_Page_UniqueWord = log_RDD_PageNameWithOccurences.filter(lambda x : x[1]==1).map(lambda x : x[0])

log_RDD_Page_UniqueWord.take(10)

['ayeuen',
 'jeureuman',
 'blankmapturkeyprovinces',
 'closeter',
 'mmfe',
 'syaitan',
 'seisoen',
 'kampioenskap',
 'drienasiesreeks',
 'mosambiekse']

# **Question n°9**

In [252]:
log_TitleName_Ordered = log_RDD_PageNameWithOccurences.sortBy(lambda row : row[1], ascending=False)

log_TitleName_Ordered.first()

('special', 253531)

---
# **Exercise n°2: Query Web Logs with Spark SQL**





In [253]:
wiki_DF.show()

+-------+--------------------+---------+---------+
|Project|          Page_title|Page_hits|Page_size|
+-------+--------------------+---------+---------+
|     aa|             271_a.C|        1|     4675|
|     aa|    Category:User_th|        1|     4770|
|     aa|  Chiron_Elias_Krase|        1|     4694|
|     aa|    Dassault_rafaele|        2|     9372|
|     aa|              E.Desv|        1|     4662|
|     aa|File:Wiktionary-l...|        1|    10752|
|     aa|Indonesian_Wikipedia|        1|     4679|
|     aa|           Main_Page|        5|   266946|
|     aa|Requests_for_new_...|        1|     4733|
|     aa|Special:Contribut...|        1|     5812|
|     aa|Special:Contribut...|        1|     5805|
|     aa|Special:Contribut...|        1|     5808|
|     aa|Special:Contribut...|        1|     5812|
|     aa|Special:ListFiles...|        1|     5035|
|     aa|Special:ListFiles...|        1|     5036|
|     aa|Special:ListFiles...|        1|     5032|
|     aa|Special:Log/Md._F...| 

# **Question n°3 bis**

In [254]:
wiki_DF.select(max("Page_size")).show()

+--------------+
|max(Page_size)|
+--------------+
|  141180155987|
+--------------+



In [255]:
wiki_DF.select(min("Page_size")).show()

+--------------+
|min(Page_size)|
+--------------+
|             0|
+--------------+



In [256]:
wiki_DF.select(mean("Page_size")).show()

+------------------+
|    avg(Page_size)|
+------------------+
|132239.56957446598|
+------------------+



# **Question n°5 bis**

In [257]:
maxDF = wiki_DF.select(max("Page_size")).collect()[0][0]
minDF = wiki_DF.select(min("Page_size")).collect()[0][0]
meanDF = wiki_DF.select(mean("Page_size")).collect()[0][0]

In [258]:
wiki_DF_MaxPages = wiki_DF.filter(wiki_DF.Page_size==maxDF)

wiki_DF_MaxPages.show()

+-------+----------+---------+------------+
|Project|Page_title|Page_hits|   Page_size|
+-------+----------+---------+------------+
|  en.mw|        en|  5466346|141180155987|
+-------+----------+---------+------------+



In [259]:
maxHitsDF = wiki_DF.select(max("Page_hits")).collect()[0][0]

print(maxHitsDF)

5466346


In [260]:
wiki_DF_MaxHits = wiki_DF.filter(wiki_DF.Page_hits==maxHitsDF)

wiki_DF_MaxHits.show()

+-------+----------+---------+------------+
|Project|Page_title|Page_hits|   Page_size|
+-------+----------+---------+------------+
|  en.mw|        en|  5466346|141180155987|
+-------+----------+---------+------------+



In [261]:
wiki_DF_MinPages = wiki_DF.filter(wiki_DF.Page_size==minDF)

wiki_DF_MinPages.show()

+-------+--------------------+---------+---------+
|Project|          Page_title|Page_hits|Page_size|
+-------+--------------------+---------+---------+
|     af|                1337|        1|        0|
|     af|                1433|        1|        0|
|     af|                1498|        1|        0|
|     af|                1577|        1|        0|
|     af|                1864|        1|        0|
|     af|                 689|        1|        0|
|     af|    Clifton-hangbrug|        1|        0|
|     af|      Die_Transvaler|        1|        0|
|     af|     Griekse_oergode|        1|        0|
|     af| Kategorie:22ste_eeu|        1|        0|
|     af|Kategorie:Geograf...|        2|        0|
|     af|Kategorie:Middeld...|        1|        0|
|     af|  Kategorie:Politiek|        2|        0|
|     af|             Lapland|        2|        0|
|     af|             Oktober|        1|        0|
|     af|Sjabloon:Susterpr...|        2|        0|
|     af|             Skrywer| 

# **Question n°6 bis**

In [262]:
wiki_DF_AboveAverage = wiki_DF.filter(wiki_DF.Page_size>meanDF)

In [263]:
wiki_DF_AboveAverage.show(10)

+-------+----------------+---------+---------+
|Project|      Page_title|Page_hits|Page_size|
+-------+----------------+---------+---------+
|     aa|       Main_Page|        5|   266946|
| ace.mw|             ace|       31|   827168|
|     af|            1859|        4|   219540|
|     af|      18_Oktober|        4|   264724|
|     af|            1941|        4|   256344|
|     af|            2016|        5|   215498|
|     af|      4_Januarie|        4|   268828|
|     af|     Afrika-unie|        1|   172078|
|     af|         Big_Ben|       13|   136201|
|     af|Comrades-maraton|        1|   155180|
+-------+----------------+---------+---------+
only showing top 10 rows



# **Question n°8 bis**

In [264]:
# define with udf a function applicable on a dataframe
clean_udf = udf(cleaningFunc, ArrayType(StringType()))

# we apply our new cleaning function to the title_page column
cleaned_DF = wiki_DF.withColumn("words", clean_udf(wiki_DF["Page_title"]))

# explode: equivalent to flatMap but for dataframe
exploded_DF = cleaned_DF.select(explode(cleaned_DF["words"]).alias("word"))

# we filter to keep only lowercase words
filtered_DF = exploded_DF.filter(regexp_extract(exploded_DF["word"], r"[a-z]+", 0) != "")

# only words that appear once are kept
unique_words_DF = filtered_DF.select("word").distinct()

# on affiche
unique_words_DF.show()

+----------+
|      word|
+----------+
|    trotus|
|     oscar|
|    harder|
|    morant|
|  cheyenne|
|    petrie|
|     pools|
|  guernsey|
|   serebro|
|     fijru|
|    toegra|
|     turks|
|    welkom|
|       art|
|  heinrich|
|      elsa|
|     monte|
|occidental|
|   familia|
|shevchenko|
+----------+
only showing top 20 rows



# **Bonus : Question n°9 bis**

In [265]:
# count() creates a new "count" column with the number of repetitions of the words in the "word" column
count_DF = filtered_DF.groupBy("word").count()

# we order in descending order of the count and display the first line
most_frequent_word = count_DF.orderBy(desc("count")).first()

print(most_frequent_word)

Row(word='special', count=253531)
