In [1]:
import os

os.environ["PYSPARK_PYTHON"] = r"C:\Users\Lenovo\.conda\envs\pyspark310\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = r"C:\Users\Lenovo\.conda\envs\pyspark310\python.exe"

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Test") \
    .master("local[*]") \
    .getOrCreate()


In [3]:
# Print Spark Web UI URL
print("\nSpark Web UI running at:")
print(spark.sparkContext.uiWebUrl, "\n")


Spark Web UI running at:
http://host.docker.internal:4040 



RDD Actions

In [4]:
rdd = spark.sparkContext.parallelize([1,2,3,4,5])
print(rdd.count())

5


In [5]:
data=[("Alice",27),("Bob",33),("Charlie",24),("Alice",32)]
rdd=spark.sparkContext.parallelize(data)

In [6]:
#print all elements of data
print(rdd.collect())

[('Alice', 27), ('Bob', 33), ('Charlie', 24), ('Alice', 32)]


In [7]:
first=rdd.first()
print(first)

('Alice', 27)


In [8]:
taken_elements=rdd.take(2)
print(taken_elements)

[('Alice', 27), ('Bob', 33)]


In [9]:
for x in rdd.collect():
    print(x)


('Alice', 27)
('Bob', 33)
('Charlie', 24)
('Alice', 32)


RDD Transformations

In [10]:
#convert name to uppercase
mapped_rdd=rdd.map(lambda x: (x[0].upper(),x[1]))
result=mapped_rdd.collect()
print(result)

[('ALICE', 27), ('BOB', 33), ('CHARLIE', 24), ('ALICE', 32)]


In [11]:
#filter records where age is greater than 30
filtered_rdd=rdd.filter(lambda x: x[1]> 30)
filtered_rdd.collect()

[('Bob', 33), ('Alice', 32)]

In [12]:
#sorting
sorted_rdd=rdd.sortBy(lambda x: x[1],ascending=False)
sorted_rdd.collect()

[('Bob', 33), ('Alice', 32), ('Alice', 27), ('Charlie', 24)]

In [13]:
import shutil
import os

out_path = "file:///D:/sparkapp/output.txt"  #better to use this format as spark does not understand windows directory style
local_path = "D:/sparkapp/output.txt"

# delete local folder if exists
shutil.rmtree(local_path, ignore_errors=True)

rdd.saveAsTextFile(out_path)

In [14]:
rdd_text=spark.sparkContext.textFile("output.txt")
rdd_text.collect()

["('Alice', 27)", "('Bob', 33)", "('Charlie', 24)", "('Alice', 32)"]

DataFrame

In [15]:
#read txt file and count the number of occurance of words
#Using RDD
rdd=spark.sparkContext.textFile("long-doc.txt")
result_rdd=rdd.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b)\
.sortBy(lambda x: x[1],ascending=False)

result_rdd.take(10)

[('Lorem', 2000),
 ('ipsum', 2000),
 ('consectetur', 2000),
 ('elit.', 2000),
 ('dolor', 2000),
 ('sit', 2000),
 ('amet,', 2000),
 ('adipiscing', 2000),
 ('', 2),
 ('sample', 2)]

In [16]:
from pyspark.sql.functions import desc


In [17]:
#using DataFrame
df=spark.read.text("long-doc.txt")

result_df=df.selectExpr("explode(split(value,' ')) as word") \
.groupBy("word").count().orderBy(desc("count"))

result_df.take(10)

[Row(word='sit', count=2000),
 Row(word='consectetur', count=2000),
 Row(word='dolor', count=2000),
 Row(word='Lorem', count=2000),
 Row(word='amet,', count=2000),
 Row(word='ipsum', count=2000),
 Row(word='elit.', count=2000),
 Row(word='adipiscing', count=2000),
 Row(word='sample', count=2),
 Row(word='', count=2)]

Reading csv file from dataframe

In [18]:
# 6608 records

In [19]:
import pandas as pd

df = pd.read_csv("StudentperformanceFactors.csv")
df.head(10)


Unnamed: 0,Hours_Studied,Attendance,Parental_Involvement,Access_to_Resources,Extracurricular_Activities,Sleep_Hours,Previous_Scores,Motivation_Level,Internet_Access,Tutoring_Sessions,Family_Income,Teacher_Quality,School_Type,Peer_Influence,Physical_Activity,Learning_Disabilities,Parental_Education_Level,Distance_from_Home,Gender,Exam_Score
0,23,84,Low,High,No,7,73,Low,Yes,0,Low,Medium,Public,Positive,3,No,High School,Near,Male,67
1,19,64,Low,Medium,No,8,59,Low,Yes,2,Medium,Medium,Public,Negative,4,No,College,Moderate,Female,61
2,24,98,Medium,Medium,Yes,7,91,Medium,Yes,2,Medium,Medium,Public,Neutral,4,No,Postgraduate,Near,Male,74
3,29,89,Low,Medium,Yes,8,98,Medium,Yes,1,Medium,Medium,Public,Negative,4,No,High School,Moderate,Male,71
4,19,92,Medium,Medium,Yes,6,65,Medium,Yes,3,Medium,High,Public,Neutral,4,No,College,Near,Female,70
5,19,88,Medium,Medium,Yes,8,89,Medium,Yes,3,Medium,Medium,Public,Positive,3,No,Postgraduate,Near,Male,71
6,29,84,Medium,Low,Yes,7,68,Low,Yes,1,Low,Medium,Private,Neutral,2,No,High School,Moderate,Male,67
7,25,78,Low,High,Yes,6,50,Medium,Yes,1,High,High,Public,Negative,2,No,High School,Far,Male,66
8,17,94,Medium,High,No,6,80,High,Yes,0,Medium,Low,Private,Neutral,1,No,College,Near,Male,69
9,23,98,Medium,Medium,Yes,8,71,Medium,Yes,0,High,High,Public,Positive,5,No,High School,Moderate,Male,72


In [20]:
path="StudentPerformanceFactors.csv"
df=spark.read.csv(path,header=True)

In [21]:
df.printSchema()

df.show(5)

root
 |-- Hours_Studied: string (nullable = true)
 |-- Attendance: string (nullable = true)
 |-- Parental_Involvement: string (nullable = true)
 |-- Access_to_Resources: string (nullable = true)
 |-- Extracurricular_Activities: string (nullable = true)
 |-- Sleep_Hours: string (nullable = true)
 |-- Previous_Scores: string (nullable = true)
 |-- Motivation_Level: string (nullable = true)
 |-- Internet_Access: string (nullable = true)
 |-- Tutoring_Sessions: string (nullable = true)
 |-- Family_Income: string (nullable = true)
 |-- Teacher_Quality: string (nullable = true)
 |-- School_Type: string (nullable = true)
 |-- Peer_Influence: string (nullable = true)
 |-- Physical_Activity: string (nullable = true)
 |-- Learning_Disabilities: string (nullable = true)
 |-- Parental_Education_Level: string (nullable = true)
 |-- Distance_from_Home: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Exam_Score: string (nullable = true)

+-------------+----------+-----------------

In [22]:
#Every datatype is string which is not expected
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,DoubleType


In [23]:
schema = StructType([
    StructField("Hours_Studied", IntegerType(), True),
    StructField("Attendance", IntegerType(), True),
    StructField("Parental_Involvement", StringType(), True),
    StructField("Access_to_Resources", StringType(), True),
    StructField("Extracurricular_Activities", StringType(), True),
    StructField("Sleep_Hours", IntegerType(), True),
    StructField("Previous_Scores", IntegerType(), True),
    StructField("Motivation_Level", StringType(), True),
    StructField("Internet_Access", StringType(), True),
    StructField("Tutoring_Sessions", IntegerType(), True),
    StructField("Family_Income", StringType(), True),
    StructField("Teacher_Quality", StringType(), True),
    StructField("School_Type", StringType(), True),
    StructField("Peer_Influence", StringType(), True),
    StructField("Physical_Activity", IntegerType(), True),
    StructField("Learning_Disabilities", StringType(), True),
    StructField("Parental_Education_Level", StringType(), True),
    StructField("Distance_from_Home", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Exam_Score", IntegerType(), True)
])

In [24]:
df=spark.read.csv(path,header=True,schema=schema)

In [25]:
#again display with proper datatypes
df.printSchema()

df.show(5)

root
 |-- Hours_Studied: integer (nullable = true)
 |-- Attendance: integer (nullable = true)
 |-- Parental_Involvement: string (nullable = true)
 |-- Access_to_Resources: string (nullable = true)
 |-- Extracurricular_Activities: string (nullable = true)
 |-- Sleep_Hours: integer (nullable = true)
 |-- Previous_Scores: integer (nullable = true)
 |-- Motivation_Level: string (nullable = true)
 |-- Internet_Access: string (nullable = true)
 |-- Tutoring_Sessions: integer (nullable = true)
 |-- Family_Income: string (nullable = true)
 |-- Teacher_Quality: string (nullable = true)
 |-- School_Type: string (nullable = true)
 |-- Peer_Influence: string (nullable = true)
 |-- Physical_Activity: integer (nullable = true)
 |-- Learning_Disabilities: string (nullable = true)
 |-- Parental_Education_Level: string (nullable = true)
 |-- Distance_from_Home: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Exam_Score: integer (nullable = true)

+-------------+----------+----------

In [26]:
#automatically guess the data type of the columns
df=spark.read.csv(path,header=True,inferSchema=True)
df.printSchema()
df.show(5)

root
 |-- Hours_Studied: integer (nullable = true)
 |-- Attendance: integer (nullable = true)
 |-- Parental_Involvement: string (nullable = true)
 |-- Access_to_Resources: string (nullable = true)
 |-- Extracurricular_Activities: string (nullable = true)
 |-- Sleep_Hours: integer (nullable = true)
 |-- Previous_Scores: integer (nullable = true)
 |-- Motivation_Level: string (nullable = true)
 |-- Internet_Access: string (nullable = true)
 |-- Tutoring_Sessions: integer (nullable = true)
 |-- Family_Income: string (nullable = true)
 |-- Teacher_Quality: string (nullable = true)
 |-- School_Type: string (nullable = true)
 |-- Peer_Influence: string (nullable = true)
 |-- Physical_Activity: integer (nullable = true)
 |-- Learning_Disabilities: string (nullable = true)
 |-- Parental_Education_Level: string (nullable = true)
 |-- Distance_from_Home: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Exam_Score: integer (nullable = true)

+-------------+----------+----------

In [27]:
selected_columns=df.select("Sleep_Hours","Attendance")
selected_columns.show(10)

+-----------+----------+
|Sleep_Hours|Attendance|
+-----------+----------+
|          7|        84|
|          8|        64|
|          7|        98|
|          8|        89|
|          6|        92|
|          8|        88|
|          7|        84|
|          6|        78|
|          6|        94|
|          8|        98|
+-----------+----------+
only showing top 10 rows



In [28]:
#filtering based on condition on columns
filtered_data=df.filter(df.Hours_Studied >20)
print(filtered_data.count())
filtered_data.show(10)

3063
+-------------+----------+--------------------+-------------------+--------------------------+-----------+---------------+----------------+---------------+-----------------+-------------+---------------+-----------+--------------+-----------------+---------------------+------------------------+------------------+------+----------+
|Hours_Studied|Attendance|Parental_Involvement|Access_to_Resources|Extracurricular_Activities|Sleep_Hours|Previous_Scores|Motivation_Level|Internet_Access|Tutoring_Sessions|Family_Income|Teacher_Quality|School_Type|Peer_Influence|Physical_Activity|Learning_Disabilities|Parental_Education_Level|Distance_from_Home|Gender|Exam_Score|
+-------------+----------+--------------------+-------------------+--------------------------+-----------+---------------+----------------+---------------+-----------------+-------------+---------------+-----------+--------------+-----------------+---------------------+------------------------+------------------+------+--------

In [29]:
#sorting
sorted_data=df.orderBy("Exam_Score")
sorted_data.show(10)

+-------------+----------+--------------------+-------------------+--------------------------+-----------+---------------+----------------+---------------+-----------------+-------------+---------------+-----------+--------------+-----------------+---------------------+------------------------+------------------+------+----------+
|Hours_Studied|Attendance|Parental_Involvement|Access_to_Resources|Extracurricular_Activities|Sleep_Hours|Previous_Scores|Motivation_Level|Internet_Access|Tutoring_Sessions|Family_Income|Teacher_Quality|School_Type|Peer_Influence|Physical_Activity|Learning_Disabilities|Parental_Education_Level|Distance_from_Home|Gender|Exam_Score|
+-------------+----------+--------------------+-------------------+--------------------------+-----------+---------------+----------------+---------------+-----------------+-------------+---------------+-----------+--------------+-----------------+---------------------+------------------------+------------------+------+----------+
|

In [30]:
#distinct
distnct_rows=df.select("Teacher_Quality").distinct()
distnct_rows.show()

+---------------+
|Teacher_Quality|
+---------------+
|           High|
|            Low|
|         Medium|
|           NULL|
+---------------+



In [31]:
dropped_columns=df.drop("Gender")
dropped_columns.show(10)

+-------------+----------+--------------------+-------------------+--------------------------+-----------+---------------+----------------+---------------+-----------------+-------------+---------------+-----------+--------------+-----------------+---------------------+------------------------+------------------+----------+
|Hours_Studied|Attendance|Parental_Involvement|Access_to_Resources|Extracurricular_Activities|Sleep_Hours|Previous_Scores|Motivation_Level|Internet_Access|Tutoring_Sessions|Family_Income|Teacher_Quality|School_Type|Peer_Influence|Physical_Activity|Learning_Disabilities|Parental_Education_Level|Distance_from_Home|Exam_Score|
+-------------+----------+--------------------+-------------------+--------------------------+-----------+---------------+----------------+---------------+-----------------+-------------+---------------+-----------+--------------+-----------------+---------------------+------------------------+------------------+----------+
|           23|       

In [32]:
#creating new column
new_col=df.withColumn("NEW_SCORE",df.Previous_Scores+df.Exam_Score)
new_col.show(10)

+-------------+----------+--------------------+-------------------+--------------------------+-----------+---------------+----------------+---------------+-----------------+-------------+---------------+-----------+--------------+-----------------+---------------------+------------------------+------------------+------+----------+---------+
|Hours_Studied|Attendance|Parental_Involvement|Access_to_Resources|Extracurricular_Activities|Sleep_Hours|Previous_Scores|Motivation_Level|Internet_Access|Tutoring_Sessions|Family_Income|Teacher_Quality|School_Type|Peer_Influence|Physical_Activity|Learning_Disabilities|Parental_Education_Level|Distance_from_Home|Gender|Exam_Score|NEW_SCORE|
+-------------+----------+--------------------+-------------------+--------------------------+-----------+---------------+----------------+---------------+-----------------+-------------+---------------+-----------+--------------+-----------------+---------------------+------------------------+------------------+

In [33]:
#For SQL, register dataframe as a temporary table
df.createOrReplaceTempView("my_table")
result= spark.sql("SELECT * FROM my_table WHERE Hours_Studied>20")
result.show()

+-------------+----------+--------------------+-------------------+--------------------------+-----------+---------------+----------------+---------------+-----------------+-------------+---------------+-----------+--------------+-----------------+---------------------+------------------------+------------------+------+----------+
|Hours_Studied|Attendance|Parental_Involvement|Access_to_Resources|Extracurricular_Activities|Sleep_Hours|Previous_Scores|Motivation_Level|Internet_Access|Tutoring_Sessions|Family_Income|Teacher_Quality|School_Type|Peer_Influence|Physical_Activity|Learning_Disabilities|Parental_Education_Level|Distance_from_Home|Gender|Exam_Score|
+-------------+----------+--------------------+-------------------+--------------------------+-----------+---------------+----------------+---------------+-----------------+-------------+---------------+-----------+--------------+-----------------+---------------------+------------------------+------------------+------+----------+
|

In [34]:
avg_attendance_by_gender=spark.sql("SELECT Gender,AVG(Attendance) as avg_attendance FROM my_table GROUP BY Gender")
avg_attendance_by_gender.show()

+------+-----------------+
|Gender|   avg_attendance|
+------+-----------------+
|Female|79.86895810955961|
|  Male|80.05689564761406|
+------+-----------------+



In [35]:
spark.stop()