In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=46b7dc06b02bb5a2658a0c11d905b1a646642eac88297c49d314656ef5605c7a
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


# Query 1: Map-Reduce Paradigm and Spark Loops

In [None]:
from pyspark import SparkContext, SparkConf

# Initialize SparkContext.
# getOrCreate() hands back the context that is already running (if any), so
# re-executing this cell does not fail with "Cannot run multiple
# SparkContexts at once" the way a second SparkContext(conf=conf) call does.
conf = SparkConf().setAppName("Wikimedia")
sc = SparkContext.getOrCreate(conf=conf)

# Load data: textFile gives an RDD with one element per line of the file
data = sc.textFile("/content/pagecounts-20160101-000000_parsed.out")

# Pull the page size out of a single input line
def parse_page_size(line):
    """Return the fourth whitespace-separated field of ``line`` as a float.

    Returns:
        The parsed float, or None when the line has fewer than four fields
        or the fourth field is not a valid float literal.
    """
    fields = line.split()
    try:
        size_field = fields[3]
        return float(size_field)
    except (ValueError, IndexError):
        # Too few fields, or the field is not numeric
        return None

def parse_codes_titles(line):
    """Return the first two whitespace-separated fields of ``line`` as a tuple.

    Returns:
        ``(fields[0], fields[1])``, or None when the line has fewer than
        two fields.
    """
    tokens = line.split()
    if len(tokens) < 2:
        # Not enough fields to form a pair
        return None
    return tokens[0], tokens[1]

# Keep a handle on the raw lines for the driver-side loop implementations
original_data = data

# The parsed sizes are consumed by several separate actions below (three
# reduce calls and a count). cache() keeps the parsed RDD around after the
# first action so the file is not re-read and re-parsed for each one.
page_sizes = data.map(parse_page_size).filter(lambda x: x is not None).cache()

# Define custom min and max functions
def custom_min(x, y):
    """Return ``x`` when ``x < y`` holds, otherwise ``y``."""
    if x < y:
        return x
    return y

def custom_max(x, y):
    """Return ``x`` when ``x > y`` holds, otherwise ``y``."""
    if x > y:
        return x
    return y

# Query 1: Map-Reduce Paradigm
# Fold all sizes down to one value with the custom comparison functions
min_size_mr = page_sizes.reduce(custom_min)
max_size_mr = page_sizes.reduce(custom_max)

# Mean = sum of the sizes divided by how many sizes there are
total_size_mr = page_sizes.reduce(lambda acc, size: acc + size)
size_count_mr = page_sizes.count()
average_size_mr = total_size_mr / size_count_mr

# Labelled results for the Map-Reduce Paradigm (also written to the results file later)
results_mr = [
    ("Min Size (Map-Reduce):", min_size_mr),
    ("Max Size (Map-Reduce):", max_size_mr),
    ("Average Size (Map-Reduce):", average_size_mr),
]

# Show each label next to its value
for label, value in results_mr:
    print(label, value)

# Spark Loops

# Query 1: Spark Loops
# Running minimum, maximum and sum, plus the number of lines that yielded a size
min_size_loops = float('inf')
max_size_loops = float('-inf')
total_size_loops = 0
valid_size_count_loops = 0

# Iterate over the collected lines on the driver with the custom min and max functions
for line in original_data.collect():
    size = parse_page_size(line)
    if size is not None:
        min_size_loops = custom_min(min_size_loops, size)
        max_size_loops = custom_max(max_size_loops, size)
        total_size_loops += size
        valid_size_count_loops += 1

# Average over the lines that actually contributed to the sum. Dividing by
# original_data.count() would also count lines parse_page_size rejected
# (skewing the mean away from the Map-Reduce result) and would launch an
# extra Spark job. With no valid line at all the mean is undefined -> NaN.
if valid_size_count_loops:
    average_size_loops = total_size_loops / valid_size_count_loops
else:
    average_size_loops = float('nan')

# Labelled results for Spark Loops (also written to the results file later)
results_loops = [
    ("Min Size (Spark Loops):", min_size_loops),
    ("Max Size (Spark Loops):", max_size_loops),
    ("Average Size (Spark Loops):", average_size_loops)
]

# Display results for Spark Loops
for result in results_loops:
    print(result[0], result[1])

# Documenting results: start the results file fresh ("w") with both Query 1 sections
with open("/content/results_document.txt", "w") as file:
    file.write("Results for Map-Reduce Paradigm Query 1:\n")
    file.writelines("{} {}\n".format(label, value) for label, value in results_mr)

    file.write("\nResults for Spark Loops Query 1:\n")
    file.writelines("{} {}\n".format(label, value) for label, value in results_loops)

Py4JError: ignored

# Query 2: Map-Reduce Paradigm and Spark Loops

In [None]:
# Query 2: Map-Reduce Paradigm

# Keep only the lines that parsed into a (code, title) pair.
# parse_codes_titles returns None for lines with fewer than two fields, so the
# test must be on the record itself: a freshly built tuple (x[0], x[1]) is
# never None, and indexing a None record raises TypeError inside the task.
titles = data.map(parse_codes_titles).filter(lambda x: x is not None)

# Filter page titles that start with "The"
the_titles_mr = titles.filter(lambda x: x[1].startswith("The"))

# Check if the RDD is not empty before performing further operations
if not the_titles_mr.isEmpty():
    # Count the number of titles that start with "The"
    count_the_titles_mr = the_titles_mr.map(lambda x: 1).reduce(lambda x, y: x + y)

    # Filter non-English titles
    non_english_the_titles_mr = the_titles_mr.filter(lambda x: x[0] != "en")

    # Aggregate the count with fold() and a zero value rather than reduce():
    # reduce() raises ValueError on an empty RDD, which is what this subset is
    # whenever every "The" title has the code "en".
    count_non_english_the_titles_mr = non_english_the_titles_mr.map(lambda x: 1).fold(0, lambda x, y: x + y)

    # Display results for Map-Reduce Paradigm
    print("Number of 'The' titles (Map-Reduce):", count_the_titles_mr)
    print("Number of 'The' titles not part of the English project (Map-Reduce):", count_non_english_the_titles_mr)

    with open("/content/results_document.txt", "a") as file:
        file.write("\nResults for Map-Reduce Paradigm Query 2:\n")
        file.write("\nNumber of 'The' titles : {}\n".format(count_the_titles_mr))
        file.write("\nNumber of 'The' titles not part of the English project: {}\n".format(count_non_english_the_titles_mr))
else:
    print("No titles start with 'The' in the provided data.")


# Spark Loops

# Query 2: Spark Loops
# Counters: every title starting with "The", and those whose code is not "en"
count_the_titles_loops = 0
count_non_english_the_titles_loops = 0

# Walk over every collected line on the driver
for line in original_data.collect():
    parsed = parse_codes_titles(line)
    if parsed is None:
        continue
    code, page_title = parsed
    if not page_title.startswith("The"):
        continue
    count_the_titles_loops += 1
    if code != "en":
        count_non_english_the_titles_loops += 1

# Display result for Spark Loops
print("Number of 'The' titles(Spark Loops):", count_the_titles_loops)
print("Number of 'The' titles not part of the English project (Spark Loops):", count_non_english_the_titles_loops)

# Append the same two counts to the shared results file
with open("/content/results_document.txt", "a") as file:
    file.write("\nResults for Spark Loops Query 2:\n")
    file.write("\nNumber of 'The' titles : {}\n".format(count_the_titles_loops))
    file.write("\nNumber of 'The' titles not part of the English project: {}\n".format(count_non_english_the_titles_loops))


Number of 'The' titles (Map-Reduce): 1541
Number of 'The' titles not part of the English project (Map-Reduce): 1541
Number of 'The' titles(Spark Loops): 1541
Number of 'The' titles not part of the English project (Spark Loops): 1541


# Query 3: Map-Reduce Paradigm and Spark Loops

In [None]:
# Query 3: Map-Reduce Paradigm

# Extract terms from page titles and normalize (e.g., lowercase)
def extract_and_normalize_terms(title):
    """Split a page title into normalized terms.

    The title is split on "_"; every piece is lower-cased and has leading and
    trailing ".", ",", "?" and "!" characters stripped (other punctuation and
    characters inside a piece are left alone). Pieces that end up empty, which
    happens with consecutive, leading or trailing underscores and with
    punctuation-only pieces, are dropped so the empty string is never
    reported as a term.

    Args:
        title: Page title string. Objects without a ``split`` method
            (e.g. None) are tolerated.

    Returns:
        List of non-empty term strings; an empty list when ``title`` cannot
        be split.
    """
    try:
        pieces = title.split("_")
    except AttributeError:
        # Handle cases where the title is None (or otherwise not a string)
        return []
    normalized = (piece.lower().strip(".,?!") for piece in pieces)
    return [term for term in normalized if term]

# Extract and normalize terms from page titles (second element of each pair)
terms_rdd = titles.flatMap(lambda pair: extract_and_normalize_terms(pair[1]))

# Count the number of unique terms: one 1 per distinct term, summed up
distinct_terms = terms_rdd.distinct()
unique_terms_count_mr = distinct_terms.map(lambda _: 1).reduce(lambda a, b: a + b)

# Print the result
print("Number of unique terms in page titles:", unique_terms_count_mr)

# Query 3: Spark Loops Paradigm
unique_terms_set = set()

# Loop over the collected lines on the driver
for line in original_data.collect():
    title = parse_codes_titles(line)
    if title is None:
        continue
    # extract_and_normalize_terms always returns a list, so it can be fed to update() directly
    unique_terms_set.update(extract_and_normalize_terms(title[1]))

# Calculate the number of unique terms
unique_terms_count = len(unique_terms_set)

print("Number of unique terms in page titles(Spark Loops):", unique_terms_count)



Number of unique terms in page titles: 471446
Number of unique terms in page titles(Spark Loops): 471446


# Query 4: Map-Reduce Paradigm


In [None]:
# Query 4: Map-Reduce Paradigm
# Parse titles from data RDD. parse_codes_titles returns None for lines with
# fewer than two fields, so filter on the record itself; testing x[1] would
# index a None record and raise TypeError inside the Spark task.
titles = data.map(parse_codes_titles).filter(lambda x: x is not None)

# Map each title to (title, 1) for counting
title_counts = titles.map(lambda x: (x[1], 1))

# Reduce by key to get the count for each title
title_counts = title_counts.reduceByKey(lambda x, y: x + y)

# Run the Spark job once and reuse the collected pairs for both the display
# and the file (each collect() call would otherwise recompute the RDD)
title_count_results = title_counts.collect()

# Display results
for result in title_count_results:
    print(result[0], result[1])

# NOTE(review): the file name below carries a stray non-ASCII combining mark
# right before "Query4" (it looks like an accidental keystroke). It is kept
# as is because the Spark Loops cell for Query 4 appends to this same path.
with open("/content/َQuery4_results_document.txt", "w") as file:
    file.write("\nResults for Map-Reduce Paradigm Query 4:\n")
    for result in title_count_results:
        file.write("{} {}\n".format(result[0], result[1]))

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Titisee-Neustadt 1
Tito 1
Tito_Larriva 1
Titoismus 1
Titration 1
Tittmoning 1
Titularnation 1
Titulus 1
Titus_Dittmann 1
Titus_Wouda_Kuipers 1
Titz 1
Tivoli_(Gotha) 1
Tiwaz 1
Tiznit 1
Tjaschinski 1
Tjati 1
Tjeker 1
Tjeld-Klasse 1
Tkinter 1
Tlahuizcalpantecutli 1
Tlaxcala_(Bundesstaat) 1
Tlokweng 1
To-do-Liste 1
To_Be_Contemptuous 1
To_Catch_a_Predator 1
To_Potami 1
To_Rome_With_Love 1
To_Save_a_Life 1
Toad_in_the_hole 1
Toast_Hawai 1
Toast_Hawaii 1
Toaster 1
Toasting 1
Toba-Katastrophentheorie 1
Tobago 1
Tobaj 1
Tobe_Hooper 1
Toben 1
Tobi_Schlegl 1
Tobia_Aun 1
Tobias_Furer 1
Tobias_Gravenhorst 1
Tobias_Kluckert 1
Tobias_Licht 1
Tobias_Menzies 1
Tobias_Moorstedt 1
Tobias_Moretti 1
Tobias_Nath 1
Tobias_Oertel 1
Tobias_Regner 1
Tobias_Reichmann 1
Tobias_Reitz 1
Tobias_Schenke 1
Tobias_Stephan 1
Tobin_Bell 1
Toblacher_Feld 1
Toblerone 1
Toby_Jones 1
Toby_Leonard_Moore 1
Toby_Purnell 1
Toby_Stephens 1
Toccata 1
Toccata_und_Fug

# Query 4: Spark Loops

In [None]:
# Spark Loops

# Query 4: Spark Loops
# Tally of how many lines carry each title
title_counts = {}

# Loop over the collected lines on the driver
for line in original_data.collect():
    title = parse_codes_titles(line)
    if title is None:
        continue
    title_str = title[1]
    title_counts[title_str] = title_counts.get(title_str, 0) + 1

# Display results
for title_str, count in title_counts.items():
    print(title_str, count)

# Write results to a file
# NOTE(review): the file name carries a stray non-ASCII combining mark right
# before "Query4"; it is reproduced unchanged so this cell keeps appending to
# the file the Map-Reduce cell for Query 4 creates.
with open("/content/َQuery4_results_document.txt", "a") as file:
    file.write("\nResults for Spark Loops Query 4:\n")
    file.writelines(
        "{} {}\n".format(title_str, count) for title_str, count in title_counts.items()
    )

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Venkatraman_Ramakrishnan 1
Venla 1
Venla_Niemi 1
Venlafaxin 1
Venlojen_viesti 1
Vennbahn_(Radweg) 1
Vennes 1
Venohr 1
Venole 1
Venom_(Band) 1
Vensys_Energy 1
Vente-privee.com 1
Ventes_immobilires_les_prix_se_ngocient_moins 1
Ventilator 1
Ventilposaune 1
Ventosa_(Candamo) 1
Ventotene 1
Ventry 1
Ventschow 1
Ventura_County 1
Venturi_300_Atlantique 1
Venturi_Automobiles 1
Venus_(Mythologie) 1
Venus_(Planet) 1
Venus_(Shocking-Blue-Lied) 1
Venus_Berlin 1
Venus_im_Pelz_(2013) 1
Venus_in_Seide 1
Venus_und_Adonis 1
Venus_von_Bierden 1
Venus_von_Laussel 1
Venusberg_(Erzgebirge) 1
Venusfliegenfalle 1
Venuskolonisation 1
Venzone 1
Veolia_Environnement 1
Veolia_Transport 1
VeraCrypt 1
Vera_Balser-Eberle 1
Vera_Cruz 1
Vera_Dillier 1
Vera_F._Birkenbihl 1
Vera_Int-Veen 1
Vera_Lengsfeld 1
Vera_Lynn 1
Vera_Menchik 1
Vera_Romanowa 1
Vera_Russwurm 1
Vera_Tschechow 1
Vera_Tschechowa 1
Vera_Viktorowna_Galuschka 1
Verachtung_(Roman) 1
Veracruz_

# Query 5: Map-Reduce Paradigm

In [None]:
def parse_pages(line):
    """Split one input line into a ``(key, fields)`` pair keyed by its second field.

    The line is split on single spaces into at most four parts.

    Args:
        line: One raw text line.

    Returns:
        ``(parts[1], [parts[0], parts[1], parts[2], parts[3]])`` when the line
        yields four parts, otherwise None. The list has four entries because
        the code that consumes it unpacks four values per record.
    """
    # maxsplit=3 allows up to four parts. With the former maxsplit of 2 the
    # split could never produce more than three parts, so the length check
    # below always failed and every line was discarded.
    parts = line.split(" ", 3)
    if len(parts) == 4:
        return parts[1], [parts[0], parts[1], parts[2], parts[3]]
    return None

data = sc.textFile("/content/pagecounts-20160101-000000_parsed.out")

# Use map to parse pages and filter out invalid ones
parsed_data = data.map(parse_pages).filter(lambda x: x is not None)

# Group pages by title
grouped_pages = parsed_data.groupByKey()

# A pair needs at least two records under the same title. Dropping the
# smaller groups on the cluster keeps them from being shipped to the driver
# by collect() only to be skipped there.
repeated_pages = grouped_pages.mapValues(list).filter(lambda kv: len(kv[1]) >= 2)

# Display results
with open("/content/results_document.txt", "a") as file:
    file.write("Results for Map-Reduce Paradigm Query 5:\n")
    for title, data_list in repeated_pages.collect():
        # Pair each record with the one that follows it in the group
        for data1, data2 in zip(data_list, data_list[1:]):
            if len(data1) >= 4 and len(data2) >= 4:
                # Second field repeats the title (the group key), so it is not used here
                page1, _, hits1, size1 = data1
                page2, _, hits2, size2 = data2

                # Print pairs
                print("Same Page Title: {}".format(title))
                print("Pages Code: ({}, {})".format(page1, page2))
                print("Page Hits: ({}, {})".format(hits1, hits2))
                print("Page Size: ({}, {})".format(size1, size2))
                print("----")

                # Print pairs to file
                file.write("Same Page Title: {}\n".format(title))
                file.write("Pages Code: ({}, {})\n".format(page1, page2))
                file.write("Page Hits: ({}, {})\n".format(hits1, hits2))
                file.write("Page Size: ({}, {})\n".format(size1, size2))

                # Add a separator for better readability
                file.write("----\n")

# Print a separator for better visibility
print("=====")


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

# Query 5: Spark Loops

In [None]:
# Initialize variables
grouped_pages = {}    # title -> list of [title, parts[0], parts[2], parts[3]] records
repeated_titles = {}  # subset of grouped_pages whose title has more than one record
data = sc.textFile("/content/pagecounts-20160101-000000_parsed.out")

# Loop over the data using a regular loop
for line in data.collect():
    parts = line.split(" ", 3)  # Split into four parts
    if len(parts) == 4:
        title = parts[1]
        page_data = [title, parts[0], parts[2], parts[3]]

        # Group pages by title
        grouped_pages.setdefault(title, []).append(page_data)

# Keep every title that occurs more than once together with ALL of its
# records. Storing [page_data] here would attach the last record parsed by
# the loop above (whatever its title) to every repeated title.
for title, pages in grouped_pages.items():
    if len(pages) > 1:
        repeated_titles[title] = pages

# Printing the whole dict exceeds the notebook's IOPub data rate limit, so
# report the size and show only a few entries.
print("Titles appearing more than once:", len(repeated_titles))
for title in list(repeated_titles)[:5]:
    print(title, repeated_titles[title])

# # Display results
# with open("/content/results_document.txt", "a") as file:
#     file.write("Results for Regular Loop Query 5:\n")
#     for title, pages in grouped_pages.items():
#         print("Page Title: {}\n".format(title))
#         file.write("Page Title: {}\n".format(title))

#         # Loop over pairs of pages with the same title
#         i = 0
#         for i in range(0, len(pages) - 1, 2):
#             # Print the organized pairs
#             print("Pages Code: ({}, {})\n".format(pages[i][1], pages[i + 1][1]))
#             print("Page Hits: ({}, {})\n".format(pages[i][2], pages[i + 1][2]))
#             print("Page Size: ({}, {})\n".format(pages[i][3], pages[i + 1][3]))
#             print("----\n")

#             # Write to file
#             file.write("Pages Code: ({}, {})\n".format(pages[i][1], pages[i + 1][1]))
#             file.write("Page Hits: ({}, {})\n".format(pages[i][2], pages[i + 1][2]))
#             file.write("Page Size: ({}, {})\n".format(pages[i][3], pages[i + 1][3]))
#             file.write("----\n")

#         # Print the organized pair for the last element if there is an odd number of pages
#         if len(pages) % 2 != 0 and len(pages)>1:
#             print("Pages Code: ({}, {})\n".format(pages[i][1], pages[i][1]))
#             print("Page Hits: ({}, {})\n".format(pages[i][2], pages[i][2]))
#             print("Page Size: ({}, {})\n".format(pages[i][3], pages[i][3]))
#             print("----\n")

#             # Write to file
#             file.write("Pages Code: ({}, {})\n".format(pages[i][1], pages[i][1]))
#             file.write("Page Hits: ({}, {})\n".format(pages[i][2], pages[i][2]))
#             file.write("Page Size: ({}, {})\n".format(pages[i][3], pages[i][3]))
#             file.write("----\n")


IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [None]:
# The full dict is too large for the notebook output channel (printing it
# trips the IOPub data rate limit), so report its size and a few entries.
print("Titles appearing more than once:", len(repeated_titles))
for title in list(repeated_titles)[:5]:
    print(title, repeated_titles[title])


IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [None]:
# Display the title -> records mapping built by the Query 5 Spark Loops cell
# (bare expression: the notebook renders the repr of the whole dict)
grouped_pages

{'271_a.C': [['271_a.C', 'aa', '1', '4675'],
  ['271_a.C', 'az', '1', '6356'],
  ['271_a.C', 'bcl', '1', '5068'],
  ['271_a.C', 'be', '1', '6287']],
 'Category:User_th': [['Category:User_th', 'aa', '1', '4770'],
  ['Category:User_th', 'commons.m', '1', '0']],
 'Chiron_Elias_Krase': [['Chiron_Elias_Krase', 'aa', '1', '4694'],
  ['Chiron_Elias_Krase', 'az', '1', '6374'],
  ['Chiron_Elias_Krase', 'bg', '1', '7468'],
  ['Chiron_Elias_Krase', 'cho', '1', '4684'],
  ['Chiron_Elias_Krase', 'dz', '1', '5435'],
  ['Chiron_Elias_Krase', 'it', '1', '5929']],
 'Dassault_rafaele': [['Dassault_rafaele', 'aa', '2', '9372'],
  ['Dassault_rafaele', 'en', '1', '6649'],
  ['Dassault_rafaele', 'it', '1', '5919']],
 'E.Desv': [['E.Desv', 'aa', '1', '4662'],
  ['E.Desv', 'arc', '1', '5210'],
  ['E.Desv', 'ast', '1', '4825'],
  ['E.Desv', 'fiu-vro', '1', '5237'],
  ['E.Desv', 'fr', '1', '7057'],
  ['E.Desv', 'ik', '1', '4548']],
 'File:Wiktionary-logo-en.png': [['File:Wiktionary-logo-en.png',
   'aa',
   '1'