# Preparation for the Moodle exercise in Spark

In this Jupyter notebook we preprocess the dataset used in this week's graded exercise.
1. Change to the exercise08 repository.

2. Start Docker: ```docker-compose up -d```

3. Get the data. The dataset is available at http://data.greatlanguagegame.com.s3.amazonaws.com/confusion-2014-03-02.tbz2. Do the following:
- download the data: ```wget http://data.greatlanguagegame.com.s3.amazonaws.com/confusion-2014-03-02.tbz2```
- extract the data: ```tar -jxvf confusion-2014-03-02.tbz2```

4. Go into the dataset folder: ```cd confusion-2014-03-02```

5. Extract the part of the dataset used in this exercise (the first 3,000,000 lines): ```head -n 3000000 confusion-2014-03-02.json > confusion-part.json```
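
If `head` is not available on your system, the truncation in step 5 can be approximated in Python. This is a minimal sketch and not part of the original instructions; the helper name `take_first_lines` is hypothetical.

```python
from itertools import islice


def take_first_lines(src_path, dst_path, n):
    """Copy the first n lines of src_path to dst_path, similar to `head -n`.

    Returns the number of lines actually written.
    """
    written = 0
    # Binary mode copies the bytes unchanged, so no encoding has to be assumed.
    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        for line in islice(src, n):
            dst.write(line)
            written += 1
    return written


# Usage for this exercise (run inside the confusion-2014-03-02 folder):
# take_first_lines("confusion-2014-03-02.json", "confusion-part.json", 3_000_000)
```

The file is read line by line, so the full dataset is never loaded into memory.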


For more information about the dataset, you can refer to https://lars.yencken.org/datasets/great-language-game/

## Preprocessing commands
In your newly created notebook, run these commands to load the dataset into an RDD:

In [1]:
import json
from pyspark.sql import SparkSession
from pyspark import SparkConf

spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

path = "confusion-2014-03-02/confusion-part.json"
raw_data = sc.textFile(path)
dataset = raw_data.map(json.loads).cache()

After that, you can run the queries for this week's Moodle question. Run all queries on the ```dataset``` RDD. For example, the following command returns one element of the dataset:

In [2]:
dataset.take(1)

[{'guess': 'Norwegian',
  'target': 'Norwegian',
  'country': 'AU',
  'choices': ['Maori', 'Mandarin', 'Norwegian', 'Tongan'],
  'sample': '48f9c924e0d98c959d8a6f1862b3ce9a',
  'date': '2013-08-19'}]
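
Each line of the file is one JSON object, which `json.loads` turns into a Python dict like the one above. The following sketch parses that record from a JSON string without Spark; the key order inside the string is an assumption, since only the parsed dict is shown here.

```python
import json

# One line of the input file, reconstructed from the record shown above
# (the key order in the real file is an assumption).
line = (
    '{"guess": "Norwegian", "target": "Norwegian", "country": "AU", '
    '"choices": ["Maori", "Mandarin", "Norwegian", "Tongan"], '
    '"sample": "48f9c924e0d98c959d8a6f1862b3ce9a", "date": "2013-08-19"}'
)

record = json.loads(line)

# A game counts as correct when the guessed language equals the target language.
is_correct = record["guess"] == record["target"]
```

The fields `guess`, `target`, `country`, `choices`, `sample`, and `date` are the ones the queries below rely on.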

In [2]:
dataset.count()

3000000

In [11]:
# Count the games where both the guess and the target are Norwegian
task1 = dataset.filter(lambda i: i['guess'] == 'Norwegian' and i['target'] == 'Norwegian')
task1.count()

42245

In [44]:
task2 = dataset.map(lambda i: i['guess']).distinct()
task2.collect()
task2.count()

78

In [50]:
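# Keep only the games where the guess is correct
task3 = dataset.filter(lambda i: i['guess'] == i['target'])
# Note: RDD.sortBy takes a single boolean for `ascending`. A non-empty tuple is
# truthy, so this sorts ascending on all three keys, as the output below shows.
task3b = task3.sortBy(lambda x: (x['target'], x['country'],  x['date']), ascending=(False, True, False))
task3b.take(10)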


[{'guess': 'Albanian',
  'target': 'Albanian',
  'country': 'A1',
  'choices': ['Albanian', 'Macedonian'],
  'sample': '00b85faa8b878a14f8781be334deb137',
  'date': '2013-09-04'},
 {'guess': 'Albanian',
  'target': 'Albanian',
  'country': 'A1',
  'choices': ['Albanian', 'Bulgarian', 'Indonesian', 'Portuguese'],
  'sample': 'efcd813daec1c836d9f030b30caa07ce',
  'date': '2013-09-05'},
 {'guess': 'Albanian',
  'target': 'Albanian',
  'country': 'A1',
  'choices': ['Albanian', 'Hindi', 'Swahili'],
  'sample': '13722ceed1eede7ba597ade9b4cb9807',
  'date': '2013-09-08'},
 {'guess': 'Albanian',
  'target': 'Albanian',
  'country': 'A1',
  'choices': ['Albanian', 'Tamil'],
  'sample': 'efcd813daec1c836d9f030b30caa07ce',
  'date': '2013-09-08'},
 {'guess': 'Albanian',
  'target': 'Albanian',
  'country': 'A1',
  'choices': ['Albanian', 'Urdu'],
  'sample': 'efcd813daec1c836d9f030b30caa07ce',
  'date': '2013-09-08'},
 {'guess': 'Albanian',
  'target': 'Albanian',
  'country': 'A1',
  'choices':

In [22]:

# 1. Filter records where the guessed language is correct
correct_guess_rdd = dataset.filter(lambda record: record['guess'] == record['target'])

# 2. Sort the filtered records by (target, country, date).
#    Note: RDD.sortBy takes a single boolean for `ascending`; a non-empty list is
#    truthy, so this sorts ascending on all three keys rather than in mixed order.
sorted_records = correct_guess_rdd.sortBy(lambda record: (record['target'], record['country'], record['date']), ascending=[False, True, False])

# 3. Take the top record and extract the sample ID
top_record = sorted_records.first()
top_sample_id = top_record['sample']

# Print the result
print(f"Sample ID of the top game where the guessed language is correct: {top_sample_id}")


Sample ID of the top game where the guessed language is correct: 00b85faa8b878a14f8781be334deb137
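
`RDD.sortBy` accepts a single boolean for `ascending`, so a mixed order (target descending, country ascending, date descending) needs a different approach. One option, sketched here in plain Python on a few made-up records, relies on stable sorting: sort by the least significant key first and by the most significant key last. The records below are illustrative and not taken from the dataset.

```python
# Illustrative records (not from the dataset), already filtered to correct guesses.
records = [
    {"target": "Albanian", "country": "US", "date": "2013-09-04"},
    {"target": "Zulu", "country": "US", "date": "2013-09-01"},
    {"target": "Zulu", "country": "AU", "date": "2013-09-01"},
    {"target": "Zulu", "country": "AU", "date": "2013-09-05"},
]

# Python's sort is stable, so apply the keys from least to most significant.
ordered = sorted(records, key=lambda r: r["date"], reverse=True)    # date: descending
ordered = sorted(ordered, key=lambda r: r["country"])               # country: ascending
ordered = sorted(ordered, key=lambda r: r["target"], reverse=True)  # target: descending

top = ordered[0]
```

On the full dataset, a comparable result could be obtained by converting the RDD to a DataFrame and ordering by columns with `desc()` and `asc()`; that variant is not shown in this notebook.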


In [6]:
task4 = dataset.map(lambda x: ((x['country'], x['guess']), x['sample'])).groupByKey().mapValues(len)
task4TopTwo = task4.takeOrdered(2, key=lambda x: -x[1])
task4TopTwo

[(('US', 'German'), 20932), (('US', 'French'), 20780)]

In [4]:
mapped_rdd = dataset.map(lambda record: ((record['country'], record['guess']), 1))

# Reduce by key to count the occurrences of each key
counted_rdd = mapped_rdd.reduceByKey(lambda x, y: x + y)

# Sort the results in descending order based on the count
sorted_rdd = counted_rdd.sortBy(lambda x: x[1], ascending=False)

# Take the top two elements
top_two = sorted_rdd.take(2)

# Extract the counts from the result
count1, count2 = [count for (key, count) in top_two]

# Print the result
print(f"{count1},{count2}")

20932,20780
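
Both cells above count the records per `(country, guess)` pair. The `reduceByKey` version sums a 1 per record instead of first collecting every sample ID per key, which generally moves less data between partitions. The counting logic itself can be checked locally with `collections.Counter`; this is a sketch on made-up records, not on the dataset.

```python
from collections import Counter

# Illustrative records (not from the dataset).
records = [
    {"country": "US", "guess": "German"},
    {"country": "US", "guess": "German"},
    {"country": "US", "guess": "French"},
    {"country": "AU", "guess": "German"},
    {"country": "US", "guess": "German"},
    {"country": "US", "guess": "French"},
]

# Count how often each (country, guess) pair occurs.
pair_counts = Counter((r["country"], r["guess"]) for r in records)

# The two most frequent pairs, with their counts.
top_two = pair_counts.most_common(2)
```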


In [11]:
task5a = dataset.filter(lambda i: i['guess'] == i['target'] and i['choices'][0] == i['target']).count()
task5b = dataset.count()
print(task5a/task5b)

0.25385233333333335


In [10]:
# 1. Filter records where the guess is correct
correct_guess_rdd = dataset.filter(lambda record: record['guess'] == record['target'])

# 2. Further filter records where the correct guess is the first choice
correct_first_choice_rdd = correct_guess_rdd.filter(lambda record: record['guess'] == record['choices'][0])

# 3. Compute the share of all games as a fraction (multiply by 100 for a percentage)
total_games = dataset.count()
correct_first_choice_percentage = (correct_first_choice_rdd.count() / total_games)

# Print the result
correct_first_choice_percentage


0.25385233333333335

In [38]:
# Group the guesses by target language; each value is a lazy iterable of guesses
task6b = dataset.map(lambda x: (x['target'], x['guess'])).groupByKey()
task6b.take(10)

[('Maltese', <pyspark.resultiterable.ResultIterable at 0x7f797c3357f0>),
 ('Tagalog', <pyspark.resultiterable.ResultIterable at 0x7f797c335ac0>),
 ('Danish', <pyspark.resultiterable.ResultIterable at 0x7f797c335700>),
 ('Gujarati', <pyspark.resultiterable.ResultIterable at 0x7f797c335940>),
 ('Hungarian', <pyspark.resultiterable.ResultIterable at 0x7f797c335a30>),
 ('Somali', <pyspark.resultiterable.ResultIterable at 0x7f797c277700>),
 ('Khmer', <pyspark.resultiterable.ResultIterable at 0x7f797c277790>),
 ('Latvian', <pyspark.resultiterable.ResultIterable at 0x7f797c2776d0>),
 ('Bulgarian', <pyspark.resultiterable.ResultIterable at 0x7f797c277820>),
 ('Slovak', <pyspark.resultiterable.ResultIterable at 0x7f797c277670>)]

In [41]:
# Filter records where 'guess' is equal to 'target'
correct_guesses = dataset.filter(lambda x: x['guess'] == x['target'])

# Count the number of correct guesses for each language
correct_guess_count = correct_guesses.map(lambda x: (x['guess'], 1)).reduceByKey(lambda a, b: a + b)

# Count the total number of guesses for each language
total_guess_count = dataset.map(lambda x: (x['guess'], 1)).reduceByKey(lambda a, b: a + b)

# Calculate the percentage of correct guesses for each language
percentage_correct = correct_guess_count.join(total_guess_count).map(lambda x: (x[0], x[1][0] / x[1][1]))

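# Sort the languages by increasing percentage of correct guesses,
# so the languages with the lowest percentages come first
sorted_percentage = percentage_correct.sortBy(lambda x: x[1], ascending=True)

# Take the three languages with the lowest percentages
# (the last three of a decreasing ordering, listed lowest first)
last_three_languages = sorted_percentage.take(3)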

# Print the result
for language, percentage in last_three_languages:
    print(f"Language: {language}, Percentage of Correct Guesses: {percentage}")

Language: South Efate, Percentage of Correct Guesses: 0.36936936936936937
Language: Hausa, Percentage of Correct Guesses: 0.390007745933385
Language: Shona, Percentage of Correct Guesses: 0.4151898734177215


In [16]:
# 1. Filter records where the guess is correct
correct_guess_rdd = dataset.filter(lambda record: record['guess'] == record['target'])

# 2. Calculate the overall percentage of correct guesses for each language
language_correct_counts = correct_guess_rdd.map(lambda record: (record['guess'], 1)).reduceByKey(lambda x, y: x + y)

language_total_counts = dataset.map(lambda record: (record['guess'], 1)).reduceByKey(lambda x, y: x + y)

language_percentages = language_correct_counts.join(language_total_counts).map(lambda x: (x[0], x[1][0] / x[1][1] * 100))

# 3. Sort the languages by increasing overall percentage (lowest first)
sorted_languages = language_percentages.sortBy(lambda x: x[1], ascending=True)

# 4. Take the three lowest, i.e. the last three of a decreasing ordering
last_three_languages = sorted_languages.take(3)

# Extract the languages from the result
language1, language2, language3 = [language for (language, percentage) in last_three_languages]

# Print the result
print(f"{language1},{language2},{language3}")

South Efate,Hausa,Shona
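
The per-language ratio above divides the number of correct guesses by the total number of guesses, both keyed by the guessed language. Because `join` is an inner join, a language that was never guessed correctly would be missing from the result. The sketch below reproduces this logic in plain Python on made-up records (not from the dataset).

```python
from collections import Counter

# Illustrative records (not from the dataset).
records = [
    {"guess": "Hausa", "target": "Hausa"},
    {"guess": "Hausa", "target": "Shona"},
    {"guess": "Shona", "target": "Shona"},
    {"guess": "Maori", "target": "Tongan"},  # Maori is never guessed correctly
]

# Total guesses and correct guesses per guessed language.
total = Counter(r["guess"] for r in records)
correct = Counter(r["guess"] for r in records if r["guess"] == r["target"])

# Inner-join semantics: only languages with at least one correct guess appear.
accuracy = {lang: correct[lang] / total[lang] for lang in correct}

# Lowest ratios first, at most three entries.
lowest = sorted(accuracy.items(), key=lambda kv: kv[1])[:3]
```

If languages with zero correct guesses should be reported as 0, a left outer join from the total counts would be one way to keep them.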


In [21]:
# List the distinct dates, latest first
task7a = dataset.map(lambda x: x['date']).distinct().sortBy(lambda x: x, ascending=False)
task7a.collect()

['2013-09-09',
 '2013-09-08',
 '2013-09-07',
 '2013-09-06',
 '2013-09-05',
 '2013-09-04',
 '2013-09-03',
 '2013-09-02',
 '2013-09-01',
 '2013-08-31',
 '2013-08-29',
 '2013-08-28',
 '2013-08-27',
 '2013-08-26',
 '2013-08-25',
 '2013-08-24',
 '2013-08-23',
 '2013-08-22',
 '2013-08-21',
 '2013-08-20',
 '2013-08-19']

In [33]:
# Count the games played on the second-to-latest date in the list above
task7b = dataset.filter(lambda x: x['date'] == '2013-09-08').count()
task7b

430894

In [19]:

# 1. Find distinct dates and sort them in descending order based on the string
distinct_dates = dataset.map(lambda record: record['date']).distinct().sortBy(lambda x: x, ascending=False)

# 2. Take the second date in the sorted list
second_to_latest_date = distinct_dates.take(2)[-1]

# 3. Filter the RDD to include only records with that date
games_on_second_to_latest_day = dataset.filter(lambda record: record['date'] == second_to_latest_date)

# 4. Count the number of games played on the second-to-latest day
number_of_games = games_on_second_to_latest_day.count()

# Print the result
print(f"Number of games played on the second-to-latest day: {number_of_games}")


Number of games played on the second-to-latest day: 430894
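
Sorting the dates as strings works here because dates in `YYYY-MM-DD` form sort lexicographically in chronological order. The same steps can be sketched in plain Python on a few made-up records (not from the dataset):

```python
# Illustrative records (not from the dataset).
records = [
    {"date": "2013-09-09"},
    {"date": "2013-09-08"},
    {"date": "2013-09-08"},
    {"date": "2013-08-31"},
]

# Distinct dates, latest first; string order matches date order for YYYY-MM-DD.
distinct_dates = sorted({r["date"] for r in records}, reverse=True)

# The second entry is the second-to-latest day.
second_to_latest = distinct_dates[1]

# Count the games played on that day.
games = sum(1 for r in records if r["date"] == second_to_latest)
```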
