In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import  pyspark.sql.functions as fun 
import numpy as np
from pyspark.sql.types import *

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
sc = spark.sparkContext

In [5]:
rdd = sc.parallelize(range(1, 50))
print(rdd.collect())


[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]


In [8]:
total = rdd.reduce(lambda x, y: x + y)
print(total)


1225


In [9]:
avg = rdd.reduce(lambda x, y: x + y) / rdd.count()
print(avg)

25.0


In [10]:
rdd.count()


49

In [11]:
rdd.min()


1

In [12]:
rdd.max()

49

In [15]:
even_count = rdd.filter(lambda x: x % 2 == 0).count()
odd_count = rdd.filter(lambda x: x % 2 != 0).count()

print("EVEN numbers:", even_count)
print("ODD numbers:", odd_count)

EVEN numbers: 24
ODD numbers: 25


In [18]:
people_data = [("Nada ", 25), ("Mona", 30), ("Ahmed", 35), ("Khaled", 40),("Ahmed", 35), ('Nada ', 25)]
rdd_people = sc.parallelize(people_data)
rdd_people.collect()

[('Nada ', 25),
 ('Mona', 30),
 ('Ahmed', 35),
 ('Khaled', 40),
 ('Ahmed', 35),
 ('Nada ', 25)]

In [19]:
people_data =  ("Khaled", 40)
rdd_people = sc.parallelize(people_data)
rdd_people.collect()

['Khaled', 40]

In [23]:
people_data = [("Nada", 25), ("Mona", 30), ("Ahmed", 35), ("Khaled", 40), ("Ahmed", 35), ("Nada", 25)]
rdd_people = sc.parallelize(people_data)
ages_rdd = rdd_people.map(lambda x: x[1])
average_age = ages_rdd.mean()
print(average_age)


31.666666666666668


In [22]:
people_data = [("Nada", 25), ("Mona", 30), ("Ahmed", 35), ("Khaled", 40), ("Ahmed", 35), ("Nada", 25)]
rdd_people = sc.parallelize(people_data)
grouped_by_age = rdd_people.map(lambda x: (x[1], x[0])).groupByKey().mapValues(list)
grouped_by_age.collect()


[(40, ['Khaled']),
 (25, ['Nada', 'Nada']),
 (30, ['Mona']),
 (35, ['Ahmed', 'Ahmed'])]

In [25]:
rdd_russia = sc.textFile("/data/russia.txt")
rdd_russia.collect()



['Russia is the largest country in the world by land area',
 'Moscow is the capital city of Russia',
 'The Russian language is one of the most widely spoken languages in the world',
 'Russia is known for its rich history and culture',
 'The Trans-Siberian Railway is the longest railway line in the world',
 'Russia has a strong tradition in literature, music and ballet',
 'The country is famous for its cold winters and vast landscapes',
 'Russia is a major player in global energy production']

In [26]:
total_lines = rdd_russia.count()
print(total_lines)


8


In [27]:
lines_with_russia = rdd_russia.filter(lambda line: "Russia" in line).count()
print(lines_with_russia)


6


In [28]:
words_rdd = rdd_russia.flatMap(lambda line: line.split())
word_counts = words_rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
top5_words = word_counts.takeOrdered(5, key=lambda x: -x[1])
print(top5_words)


[('is', 7), ('the', 7), ('Russia', 5), ('in', 5), ('world', 3)]


In [31]:
import re
words_rdd = rdd_russia.flatMap(lambda line: re.findall(r'\b\w+\b', line.lower()))
print(words_rdd.collect())


['russia', 'is', 'the', 'largest', 'country', 'in', 'the', 'world', 'by', 'land', 'area', 'moscow', 'is', 'the', 'capital', 'city', 'of', 'russia', 'the', 'russian', 'language', 'is', 'one', 'of', 'the', 'most', 'widely', 'spoken', 'languages', 'in', 'the', 'world', 'russia', 'is', 'known', 'for', 'its', 'rich', 'history', 'and', 'culture', 'the', 'trans', 'siberian', 'railway', 'is', 'the', 'longest', 'railway', 'line', 'in', 'the', 'world', 'russia', 'has', 'a', 'strong', 'tradition', 'in', 'literature', 'music', 'and', 'ballet', 'the', 'country', 'is', 'famous', 'for', 'its', 'cold', 'winters', 'and', 'vast', 'landscapes', 'russia', 'is', 'a', 'major', 'player', 'in', 'global', 'energy', 'production']


In [34]:

stopwords = {"a", "the", "is", "to", "in", "of"}

filtered_words = [word for word in words if word not in stopwords]
print(filtered_words)


['russia', 'largest', 'country', 'world', 'by', 'land', 'area', 'moscow', 'capital', 'city', 'russia', 'russian', 'language', 'one', 'most', 'widely', 'spoken', 'languages', 'world', 'russia', 'known', 'for', 'its', 'rich', 'history', 'and', 'culture', 'trans-siberian', 'railway', 'longest', 'railway', 'line', 'world', 'russia', 'has', 'strong', 'tradition', 'literature,', 'music', 'and', 'ballet', 'country', 'famous', 'for', 'its', 'cold', 'winters', 'and', 'vast', 'landscapes', 'russia', 'major', 'player', 'global', 'energy', 'production']


In [37]:
from collections import Counter
stopwords = {"a", "the", "is", "to", "in", "of"}
filtered_words = [word for word in words if word not in stopwords]
word_counts = Counter(filtered_words)
word_counts_list = list(word_counts.items())
print(word_counts_list)


[('russia', 5), ('largest', 1), ('country', 2), ('world', 3), ('by', 1), ('land', 1), ('area', 1), ('moscow', 1), ('capital', 1), ('city', 1), ('russian', 1), ('language', 1), ('one', 1), ('most', 1), ('widely', 1), ('spoken', 1), ('languages', 1), ('known', 1), ('for', 2), ('its', 2), ('rich', 1), ('history', 1), ('and', 3), ('culture', 1), ('trans-siberian', 1), ('railway', 2), ('longest', 1), ('line', 1), ('has', 1), ('strong', 1), ('tradition', 1), ('literature,', 1), ('music', 1), ('ballet', 1), ('famous', 1), ('cold', 1), ('winters', 1), ('vast', 1), ('landscapes', 1), ('major', 1), ('player', 1), ('global', 1), ('energy', 1), ('production', 1)]


In [38]:
schema = 'id integer, name string, age integer, salary integer' 
data = [
    (1, "Ali", 25, 4000),
    (2, "Mariam", 30, 6000),
    (3, "Omar", 35, 7000),
    (4, "Sara", 28, 5000),
    (5, "Omar", 25, 6500),
    (6, "Mariam", 26, 7500)
]

df = spark.createDataFrame(data,schema)

In [41]:
schema = 'id integer, name string, age integer, salary integer' 
data = [
    (1, "Ali", 25, 4000),
    (2, "Mariam", 30, 6000),
    (3, "Omar", 35, 7000),
    (4, "Sara", 28, 5000),
    (5, "Omar", 25, 6500),
    (6, "Mariam", 26, 7500)
]

df = spark.createDataFrame(data, schema)
df.printSchema()
df.show(2)


root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)

+---+------+---+------+
| id|  name|age|salary|
+---+------+---+------+
|  1|   Ali| 25|  4000|
|  2|Mariam| 30|  6000|
+---+------+---+------+
only showing top 2 rows



In [42]:
df.select("name", "salary").show()

+------+------+
|  name|salary|
+------+------+
|   Ali|  4000|
|Mariam|  6000|
|  Omar|  7000|
|  Sara|  5000|
|  Omar|  6500|
|Mariam|  7500|
+------+------+



In [43]:
from pyspark.sql.functions import avg

df.select(avg("salary")).show()


+-----------+
|avg(salary)|
+-----------+
|     6000.0|
+-----------+



In [44]:
df.filter(df.age > 28).show()


+---+------+---+------+
| id|  name|age|salary|
+---+------+---+------+
|  2|Mariam| 30|  6000|
|  3|  Omar| 35|  7000|
+---+------+---+------+



In [45]:
df.select("name").distinct().count()


4

In [57]:
from pyspark.sql.functions import avg

df.groupBy("name").avg("salary").show()


+------+-----------+
|  name|avg(salary)|
+------+-----------+
|   Ali|     4000.0|
|Mariam|     6750.0|
|  Omar|     6750.0|
|  Sara|     5000.0|
+------+-----------+



In [58]:
from pyspark.sql.functions import avg

df.groupBy("name").agg(avg("salary").alias("average_salary")).show()


+------+--------------+
|  name|average_salary|
+------+--------------+
|   Ali|        4000.0|
|Mariam|        6750.0|
|  Omar|        6750.0|
|  Sara|        5000.0|
+------+--------------+



In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Read NullData") \
    .getOrCreate()


In [10]:
df1 = spark.read.csv(r"C:\Users\Khashaba\Desktop\Data Engingeer\Spark\shared\NullData.csv",
                     header=True, inferSchema=True)

df1.show()


IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: C:%5CUsers%5CKhashaba%5CDesktop%5CData%20Engingeer%5CSpark%5Cshared%5CNullData.csv