In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import  pyspark.sql.functions as fun 
import numpy as np
from pyspark.sql.types import *

In [5]:
spark = SparkSession.builder.getOrCreate()

In [4]:
sc = spark.sparkContext

Create an RDD from a list of numbers (1,50) using numpy methods

In [13]:
rdd1 = sc.parallelize(np.arange(1, 50))
rdd1.collect()

[1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49]

Find the sum, average, maximum, minimum, and count

In [17]:
#Sum
sum_rdd = rdd1.sum()
sum_rdd

1225

In [19]:
#avg
avg_rdd = rdd1.mean()
avg_rdd

25.0

In [20]:
#Count
count_rdd = rdd1.count()
count_rdd

49

In [21]:
#Min
min_rdd = rdd1.min()
min_rdd

1

In [22]:
#Max
max_rdd = rdd1.max()
max_rdd

49

Count how many numbers are even vs. odd.

In [24]:
even_count = rdd1.filter(lambda x : x % 2 == 0).count()
odd_count = rdd1.count() - even_count
even_count, odd_count
print(f"Even count: {even_count}, Odd count: {odd_count}")

Even count: 24, Odd count: 25


You have the following data of people info ('Name', 'Age'), answer the following questions

In [26]:
people_data = [("Nada ", 25), ("Mona", 30), ("Ahmed", 35), ("Khaled", 40),("Ahmed", 35), ('Nada ', 25)]
rdd_people = sc.parallelize(people_data)
rdd_people.collect()

[('Nada ', 25),
 ('Mona', 30),
 ('Ahmed', 35),
 ('Khaled', 40),
 ('Ahmed', 35),
 ('Nada ', 25)]

Find the oldest person

In [29]:
max_age = rdd_people.map(lambda x: x[1]).max()
oldest_person = rdd_people.filter(lambda x : x[1] == max_age).collect()
oldest_person

[('Khaled', 40)]

Compute the average age

In [31]:
avg_age = rdd_people.map(lambda x: x[1]).mean()
avg_age

31.666666666666668

Group all the names by their age

In [35]:
grouped_by_age = rdd_people.groupBy(lambda x: x[1]).mapValues(list).collect()
grouped_by_age

[(40, [('Khaled', 40)]),
 (25, [('Nada ', 25), ('Nada ', 25)]),
 (35, [('Ahmed', 35), ('Ahmed', 35)]),
 (30, [('Mona', 30)])]

Take the following text and put it in a text file named russia.txt and load it into rdd

"Russia is the largest country in the world by land area
Moscow is the capital city of Russia
The Russian language is one of the most widely spoken languages in the world
Russia is known for its rich history and culture
The Trans-Siberian Railway is the longest railway line in the world
Russia has a strong tradition in literature, music and ballet
The country is famous for its cold winters and vast landscapes
Russia is a major player in global energy production
"

In [None]:
rdd_russia = sc.textFile("/shared/russia.txt")
rdd_russia.collect()

['Russia is the largest country in the world by land area',
 'Moscow is the capital city of Russia',
 'The Russian language is one of the most widely spoken languages in the world',
 'Russia is known for its rich history and culture',
 'The Trans-Siberian Railway is the longest railway line in the world',
 'Russia has a strong tradition in literature, music and ballet',
 'The country is famous for its cold winters and vast landscapes',
 'Russia is a major player in global energy production']

Count the total number of lines.

In [50]:
count_lines = rdd_russia.count()
count_lines

8

Count how many lines contain the word "Russia"

In [55]:
count_russia = rdd_russia.flatMap(lambda x: x.split(" ")).filter(lambda x: x == "Russia").count()
count_russia

5

Find the most 5 frequent word in the file.

In [60]:
most_frequent_word = rdd_russia.flatMap(lambda x: x.split(" ")) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], ascending=False) \
    .take(5)
most_frequent_word

[('is', 7), ('the', 7), ('Russia', 5), ('in', 5), ('world', 3)]

Tokenize words

In [63]:
tokens = rdd_russia.flatMap(lambda x : x.split(" ")).collect()
tokens

['Russia',
 'is',
 'the',
 'largest',
 'country',
 'in',
 'the',
 'world',
 'by',
 'land',
 'area',
 'Moscow',
 'is',
 'the',
 'capital',
 'city',
 'of',
 'Russia',
 'The',
 'Russian',
 'language',
 'is',
 'one',
 'of',
 'the',
 'most',
 'widely',
 'spoken',
 'languages',
 'in',
 'the',
 'world',
 'Russia',
 'is',
 'known',
 'for',
 'its',
 'rich',
 'history',
 'and',
 'culture',
 'The',
 'Trans-Siberian',
 'Railway',
 'is',
 'the',
 'longest',
 'railway',
 'line',
 'in',
 'the',
 'world',
 'Russia',
 'has',
 'a',
 'strong',
 'tradition',
 'in',
 'literature,',
 'music',
 'and',
 'ballet',
 'The',
 'country',
 'is',
 'famous',
 'for',
 'its',
 'cold',
 'winters',
 'and',
 'vast',
 'landscapes',
 'Russia',
 'is',
 'a',
 'major',
 'player',
 'in',
 'global',
 'energy',
 'production']

Remove stopwords (a, the, is, to, in, of). 

In [64]:
stopwords = ["is", "the", "in", "a", "of"]
filtered_tokens = rdd_russia.flatMap(lambda x : x.split(" ")) \
    .filter(lambda x : x not in stopwords) \
    .collect() 
filtered_tokens

['Russia',
 'largest',
 'country',
 'world',
 'by',
 'land',
 'area',
 'Moscow',
 'capital',
 'city',
 'Russia',
 'The',
 'Russian',
 'language',
 'one',
 'most',
 'widely',
 'spoken',
 'languages',
 'world',
 'Russia',
 'known',
 'for',
 'its',
 'rich',
 'history',
 'and',
 'culture',
 'The',
 'Trans-Siberian',
 'Railway',
 'longest',
 'railway',
 'line',
 'world',
 'Russia',
 'has',
 'strong',
 'tradition',
 'literature,',
 'music',
 'and',
 'ballet',
 'The',
 'country',
 'famous',
 'for',
 'its',
 'cold',
 'winters',
 'and',
 'vast',
 'landscapes',
 'Russia',
 'major',
 'player',
 'global',
 'energy',
 'production']

Count the frequency of each word

In [66]:
words = rdd_russia.flatMap(lambda x: x.split(" ")) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], ascending=True) \
    .collect()
words

[('largest', 1),
 ('by', 1),
 ('land', 1),
 ('area', 1),
 ('capital', 1),
 ('language', 1),
 ('most', 1),
 ('widely', 1),
 ('known', 1),
 ('history', 1),
 ('Trans-Siberian', 1),
 ('Railway', 1),
 ('line', 1),
 ('literature,', 1),
 ('music', 1),
 ('famous', 1),
 ('cold', 1),
 ('winters', 1),
 ('landscapes', 1),
 ('player', 1),
 ('energy', 1),
 ('production', 1),
 ('Moscow', 1),
 ('city', 1),
 ('Russian', 1),
 ('one', 1),
 ('spoken', 1),
 ('languages', 1),
 ('rich', 1),
 ('culture', 1),
 ('longest', 1),
 ('railway', 1),
 ('has', 1),
 ('strong', 1),
 ('tradition', 1),
 ('ballet', 1),
 ('vast', 1),
 ('major', 1),
 ('global', 1),
 ('country', 2),
 ('of', 2),
 ('for', 2),
 ('its', 2),
 ('a', 2),
 ('world', 3),
 ('and', 3),
 ('The', 3),
 ('Russia', 5),
 ('in', 5),
 ('is', 7),
 ('the', 7)]

In [67]:
schema = 'id integer, name string, age integer, salary integer' 
data = [
    (1, "Ali", 25, 4000),
    (2, "Mariam", 30, 6000),
    (3, "Omar", 35, 7000),
    (4, "Sara", 28, 5000),
    (5, "Omar", 25, 6500),
    (6, "Mariam", 26, 7500)
]

df = spark.createDataFrame(data,schema)

Show schema and first 2 rows

In [69]:
df.printSchema()
df.take(2)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)



[Row(id=1, name='Ali', age=25, salary=4000),
 Row(id=2, name='Mariam', age=30, salary=6000)]

Select only name and salary

In [70]:
df.select('name', 'salary').show()

+------+------+
|  name|salary|
+------+------+
|   Ali|  4000|
|Mariam|  6000|
|  Omar|  7000|
|  Sara|  5000|
|  Omar|  6500|
|Mariam|  7500|
+------+------+



Find the average salary

In [82]:
avg_salary = df.agg(fun.avg('salary')).first()[0]
df_with_avg_salary = df.withColumn('avg_salary', fun.lit(avg_salary))
df_with_avg_salary.select('avg_salary').show(1)

+----------+
|avg_salary|
+----------+
|    6000.0|
+----------+
only showing top 1 row



Filter employees older than 28

In [83]:
df.filter(df['age'] >= 28).show()

+---+------+---+------+
| id|  name|age|salary|
+---+------+---+------+
|  2|Mariam| 30|  6000|
|  3|  Omar| 35|  7000|
|  4|  Sara| 28|  5000|
+---+------+---+------+



Count distinct values in the name column

In [84]:
distinct_name_count = df.select('name').distinct().count()
distinct_name_count

4

Group by a the name column and find average salary

In [87]:
group_by_name = df.groupBy('name').agg(fun.avg('salary').alias('avg_salary'))
group_by_name.show()

+------+----------+
|  name|avg_salary|
+------+----------+
|   Ali|    4000.0|
|Mariam|    6750.0|
|  Omar|    6750.0|
|  Sara|    5000.0|
+------+----------+



In [None]:
df1 = spark.read.csv("/shared/NullData.csv", header=True, inferSchema=True) #this file in shared folder
df1.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



Find the avg sales 

In [90]:
avg_sales = df1.agg(fun.avg('Sales')).first()[0]
avg_sales

400.5

Replace null name with 'Unknown' and sales with the avg sales of the column 

In [92]:
df1_filled = df1.na.fill({'Name': 'Unknown', 'Sales': avg_sales})
df1_filled.show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John|400.5|
|emp2|Unknown|400.5|
|emp3|Unknown|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+

