In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MySession").getOrCreate()
# Your code here
spark.stop()

In [2]:
spark = SparkSession.builder.appName("RDD Demo").getOrCreate()
sc = spark.sparkContext
data = [1,2,3,4,5,6]
rdd = sc.parallelize(data)
mapped_rdd = rdd.map(lambda x:x*2)
filtered_rdd = mapped_rdd.filter(lambda x:x%2==0)
result = filtered_rdd.collect()
print("Final Result:", result)
print("Count of elements", rdd.count())
print("Sum of elements", rdd.reduce(lambda x,y:x+y))
print("First element:", rdd.first())

Final Result: [2, 4, 6, 8, 10, 12]
Count of elements 6
Sum of elements 21
First element: 1


In [3]:
import os
from pyspark.sql import SparkSession

#python_path = r'C:\Users\Dhivya Karthikeyan\AppData\Local\Microsoft\WindowsApps\python.exe'
#os.environ['PYSPARK_PYTHON'] = python_path
#os.environ['PYSPARK_DRIVER_PYTHON'] = python_path


# Initialize Spark session
spark = SparkSession.builder \
    .appName("Simple PySpark Test") \
    .getOrCreate()
    #.config("spark.python.worker.exec", python_path) \


#spark.conf.set("spark.sql.execution.pyspark.udf.faulthandler.enabled", "true")

# Sample data
data = [("Alice", 29), ("Bob", 35), ("Charlie", 30), ("David", 25)]

# Create DataFrame
df = spark.createDataFrame(data, ["Name", "Age"])

# Show the DataFrame
df.show()

# Perform a simple transformation: Filter by age greater than 30
filtered_df = df.filter(df.Age > 30)

# Show the filtered DataFrame
filtered_df.show()

# Stop Spark session
spark.stop()

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 29|
|    Bob| 35|
|Charlie| 30|
|  David| 25|
+-------+---+

+----+---+
|Name|Age|
+----+---+
| Bob| 35|
+----+---+



In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
    .appName("Iris Data Comparison") \
    .getOrCreate()

In [15]:
#/Iris.csv
import pandas as pd

iris_csv_pd = pd.read_csv("/Iris.csv")
iris_json_pd = pd.read_json("/Iris.json", lines=True)
iris_parquet_pd = pd.read_parquet("/Iris.parquet")

print("iris_csv sample: ")
iris_csv_pd.head()


iris_csv sample: 


Unnamed: 0,sepal.length,sepal.width,petal.length,petal.width,variety
0,5.1,3.5,1.4,0.2,Setosa
1,4.9,3.0,1.4,0.2,Setosa
2,4.7,3.2,1.3,0.2,Setosa
3,4.6,3.1,1.5,0.2,Setosa
4,5.0,3.6,1.4,0.2,Setosa


In [18]:
iris_csv_spark = spark.read.csv("/Iris.csv", header=True, inferSchema=True)
iris_json_spark = spark.read.json("/Iris.json")
iris_parquet_spark = spark.read.parquet("/Iris.parquet")

print("iris_csv sample: ")
iris_csv_spark.show()

iris_csv sample: 
+------------+-----------+------------+-----------+-------+
|sepal.length|sepal.width|petal.length|petal.width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
|         5.4|        3.9|         1.7|        0.4| Setosa|
|         4.6|        3.4|         1.4|        0.3| Setosa|
|         5.0|        3.4|         1.5|        0.2| Setosa|
|         4.4|        2.9|         1.4|        0.2| Setosa|
|         4.9|        3.1|         1.5|        0.1| Setosa|
|         5.4|        3.7|         1.5|        0.2| Setosa|
|         4.8|        3.4|         1.6|        0.2| Setosa|
|         4.8|        3.0|         1.4|        0.1| Setosa|
|         4.3|        

In [19]:
iris_csv_spark.printSchema()
#

root
 |-- sepal.length: double (nullable = true)
 |-- sepal.width: double (nullable = true)
 |-- petal.length: double (nullable = true)
 |-- petal.width: double (nullable = true)
 |-- variety: string (nullable = true)



In [20]:
for col_name in iris_csv_spark.columns:
  print(col_name)

sepal.length
sepal.width
petal.length
petal.width
variety


In [22]:
iris_csv_spark = iris_csv_spark.withColumnRenamed("Sepal.width", "Sepal_width")
iris_csv_spark = iris_csv_spark.withColumnRenamed("Petal.width", "Petal_width")
iris_csv_spark.columns

['sepal.length', 'Sepal_width', 'petal.length', 'Petal_width', 'variety']

In [24]:
from pyspark.sql.functions import col, avg

avg_sw_spark = iris_csv_spark.select(avg(col("sepal_width"))).collect()
avg_sw_spark[0][0]

3.057333333333334

In [27]:
filtered_spark = iris_csv_spark.filter(col("sepal_width") > avg_sw_spark[0][0])
filtered_spark.show(5)

+------------+-----------+------------+-----------+-------+
|sepal.length|Sepal_width|petal.length|Petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
|         5.4|        3.9|         1.7|        0.4| Setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows


In [29]:
iris_csv_spark = iris_csv_spark.withColumnRenamed("sepal.length", "sepal_length")
iris_csv_spark = iris_csv_spark.withColumnRenamed("petal.length", "petal_length")
iris_csv_spark = iris_csv_spark.withColumnRenamed("petal.width", "petal_width")
iris_csv_spark.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|Sepal_width|petal_length|Petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows


In [32]:
df = iris_csv_spark
df.show(5)
df.createOrReplaceTempView("Iris")

+------------+-----------+------------+-----------+-------+
|sepal_length|Sepal_width|petal_length|Petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows


In [33]:
filtered = spark.sql("""
SELECT * FROM iris
WHERE petal_length > 1.50
""")
filtered.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|Sepal_width|petal_length|Petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         5.4|        3.9|         1.7|        0.4| Setosa|
|         4.8|        3.4|         1.6|        0.2| Setosa|
|         5.7|        3.8|         1.7|        0.3| Setosa|
|         5.4|        3.4|         1.7|        0.2| Setosa|
|         5.1|        3.3|         1.7|        0.5| Setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows


In [34]:
avg_sepal_width = spark.sql("""
select variety, ROUND(avg(sepal_width), 2) as avg_sepal_width
from iris
group by variety
order by variety
""")
avg_sepal_width.show()


+----------+---------------+
|   variety|avg_sepal_width|
+----------+---------------+
|    Setosa|           3.43|
|Versicolor|           2.77|
| Virginica|           2.97|
+----------+---------------+



In [35]:
def classify_by_petal_length(petal_length):
  if petal_length < 2.0:
    return "Small"
  elif petal_length < 5.0:
    return "Medium"
  else:
    return "Large"

In [36]:
def classify_by_sepal_width(sepal_width):
  if sepal_width < 3.0:
    return "Narrow"
  elif sepal_width < 3.5:
    return "Moderate"
  else:
    return "Wide"


In [37]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

petal_size_udf = udf(classify_by_petal_length, StringType())
sepal_size_udf = udf(classify_by_sepal_width, StringType())

In [38]:
iris_classified = iris_csv_spark.withColumn("petal_size", petal_size_udf("petal_length"))\
.withColumn("sepal_size", sepal_size_udf("sepal_width"))
iris_classified.show(10)

+------------+-----------+------------+-----------+-------+----------+----------+
|sepal_length|Sepal_width|petal_length|Petal_width|variety|petal_size|sepal_size|
+------------+-----------+------------+-----------+-------+----------+----------+
|         5.1|        3.5|         1.4|        0.2| Setosa|     Small|      Wide|
|         4.9|        3.0|         1.4|        0.2| Setosa|     Small|  Moderate|
|         4.7|        3.2|         1.3|        0.2| Setosa|     Small|  Moderate|
|         4.6|        3.1|         1.5|        0.2| Setosa|     Small|  Moderate|
|         5.0|        3.6|         1.4|        0.2| Setosa|     Small|      Wide|
|         5.4|        3.9|         1.7|        0.4| Setosa|     Small|      Wide|
|         4.6|        3.4|         1.4|        0.3| Setosa|     Small|  Moderate|
|         5.0|        3.4|         1.5|        0.2| Setosa|     Small|  Moderate|
|         4.4|        2.9|         1.4|        0.2| Setosa|     Small|    Narrow|
|         4.9|  

In [5]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
    .appName("Global Student Migration") \
    .getOrCreate()

edu = spark.read.csv("/content/global_student_migration.csv", header=True, inferSchema=True)
edu.show(5)


+----------+--------------+-------------------+----------------+--------------------+--------------------+---------------+------------------+--------------------+-------------------+---------------+----------------+-----------------+-----------------+-------------------+------------+--------------------+--------------------+-------------------------+----------+
|student_id|origin_country|destination_country|destination_city|     university_name|         course_name| field_of_study|year_of_enrollment|scholarship_received|  enrollment_reason|graduation_year|placement_status|placement_country|placement_company|starting_salary_usd|gpa_or_score|         visa_status|post_graduation_visa|language_proficiency_test|test_score|
+----------+--------------+-------------------+----------------+--------------------+--------------------+---------------+------------------+--------------------+-------------------+---------------+----------------+-----------------+-----------------+-------------------+-

In [6]:
edu.columns

['student_id',
 'origin_country',
 'destination_country',
 'destination_city',
 'university_name',
 'course_name',
 'field_of_study',
 'year_of_enrollment',
 'scholarship_received',
 'enrollment_reason',
 'graduation_year',
 'placement_status',
 'placement_country',
 'placement_company',
 'starting_salary_usd',
 'gpa_or_score',
 'visa_status',
 'post_graduation_visa',
 'language_proficiency_test',
 'test_score']

In [8]:
edu.scholarship_received

Column<'scholarship_received'>

In [10]:
scholarship = edu.filter(edu["scholarship_received"] == 'Yes')
scholarship.show(5)
scholarship.count()

+----------+--------------+-------------------+----------------+--------------------+--------------------+---------------+------------------+--------------------+-------------------+---------------+----------------+-----------------+-----------------+-------------------+------------+--------------------+--------------------+-------------------------+----------+
|student_id|origin_country|destination_country|destination_city|     university_name|         course_name| field_of_study|year_of_enrollment|scholarship_received|  enrollment_reason|graduation_year|placement_status|placement_country|placement_company|starting_salary_usd|gpa_or_score|         visa_status|post_graduation_visa|language_proficiency_test|test_score|
+----------+--------------+-------------------+----------------+--------------------+--------------------+---------------+------------------+--------------------+-------------------+---------------+----------------+-----------------+-----------------+-------------------+-

2577

In [12]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def get_scholarship_received(scholarship_received):
  if scholarship_received == 'Yes':
    return True
  else:
    return False

#scholarship_udf = udf(get_scholarship_received, StringType())

def get_count(edu):
  scholarship = edu.filter(edu["scholarship_received"] == 'Yes')
  return scholarship.count()




In [16]:
country_usa = edu.filter(edu["destination_country"] == 'USA')
country_usa.show(5)

+----------+--------------+-------------------+----------------+-------------------+-----------+----------------+------------------+--------------------+-------------------+---------------+----------------+-----------------+-----------------+-------------------+------------+------------+--------------------+-------------------------+----------+
|student_id|origin_country|destination_country|destination_city|    university_name|course_name|  field_of_study|year_of_enrollment|scholarship_received|  enrollment_reason|graduation_year|placement_status|placement_country|placement_company|starting_salary_usd|gpa_or_score| visa_status|post_graduation_visa|language_proficiency_test|test_score|
+----------+--------------+-------------------+----------------+-------------------+-----------+----------------+------------------+--------------------+-------------------+---------------+----------------+-----------------+-----------------+-------------------+------------+------------+------------------

In [17]:
edu.createOrReplaceTempView("edu")
avg_gpa = spark.sql("""
    SELECT ROUND(AVG(gpa_or_score), 2) AS avg_gpa
    FROM edu
    where placement_status = 'Placed' and destination_country = 'USA'
""")
avg_gpa.show()

+-------+
|avg_gpa|
+-------+
|   3.26|
+-------+



In [22]:
top5 = spark.sql("""
    SELECT destination_country , count(*) as count
    FROM edu
    group by year_of_enrollment
    order by count desc
    limit 5
""")
top5.show()

AnalysisException: [MISSING_AGGREGATION] The non-aggregating expression "destination_country" is based on columns which are not participating in the GROUP BY clause.
Add the columns or the expression to the GROUP BY, aggregate the expression, or use "any_value(destination_country)" if you do not care which of the values within a group is returned. SQLSTATE: 42803;
GlobalLimit 5
+- LocalLimit 5
   +- Sort [count#471L DESC NULLS LAST], true
      +- Aggregate [year_of_enrollment#24], [destination_country#19, count(1) AS count#471L]
         +- SubqueryAlias edu
            +- View (`edu`, [student_id#17, origin_country#18, destination_country#19, destination_city#20, university_name#21, course_name#22, field_of_study#23, year_of_enrollment#24, scholarship_received#25, enrollment_reason#26, graduation_year#27, placement_status#28, placement_country#29, placement_company#30, starting_salary_usd#31, gpa_or_score#32, visa_status#33, post_graduation_visa#34, language_proficiency_test#35, test_score#36])
               +- Relation [student_id#17,origin_country#18,destination_country#19,destination_city#20,university_name#21,course_name#22,field_of_study#23,year_of_enrollment#24,scholarship_received#25,enrollment_reason#26,graduation_year#27,placement_status#28,placement_country#29,placement_company#30,starting_salary_usd#31,gpa_or_score#32,visa_status#33,post_graduation_visa#34,language_proficiency_test#35,test_score#36] csv


In [25]:
from pyspark.sql import functions as F
def top5(edu, year):
  result = (
    edu.filter(F.col("year_of_enrollment") == year)\
    .groupBy("destination_country")\
    .agg(F.count("*").alias("count"))\
    .orderBy(F.desc("count"))\
    .limit(5)
  )
  return result

top5 = top5(edu, 2019)
top5.show()



+-------------------+-----+
|destination_country|count|
+-------------------+-----+
|                UAE|  117|
|       South Africa|  108|
|             Russia|  107|
|            Germany|  106|
|              India|  101|
+-------------------+-----+



In [26]:
edu.columns

['student_id',
 'origin_country',
 'destination_country',
 'destination_city',
 'university_name',
 'course_name',
 'field_of_study',
 'year_of_enrollment',
 'scholarship_received',
 'enrollment_reason',
 'graduation_year',
 'placement_status',
 'placement_country',
 'placement_company',
 'starting_salary_usd',
 'gpa_or_score',
 'visa_status',
 'post_graduation_visa',
 'language_proficiency_test',
 'test_score']

In [33]:
filtered_df = (
      edu.filter(
          (F.col("university_name").rlike("Technology|Engineering")) &
          (F.col("placement_status") == 'Placed')
      )
  )
result = filtered_df.agg(F.avg("starting_salary_usd").alias("avg_salary")).collect()[0]["avg_salary"]
result

93310.02564102564

In [27]:
def avg_salary(df):
  filtered_df = (
      df.filter(
          (F.col("university_name").rlike("Technology|Engineering")) &
          (F.col("placement_status") == 'Placed')
      )
  )
  result = filtered_df.agg(F.avg("starting_salary_usd").alias("avg_salary")).collect()[0]["avg_salary"]

  return result


In [28]:
avg_salary = avg_salary(edu)
print("Average starting salary:", avg_salary)


Average starting salary: 93310.02564102564


In [34]:
from pyspark.sql import functions as F

def avg_salary_by_university(df):
    """
    Calculates the average starting salary for students who studied at universities
    with 'Technology' or 'Engineering' in their name and got placed,
    grouped by university.

    Parameters:
    df : pyspark.sql.DataFrame
         Must contain columns 'university_name', 'placed', and 'starting_salary'

    Returns:
    pyspark.sql.DataFrame
         DataFrame with university_name and average starting salary
    """

    result = (
        df.filter(
            (F.col("university_name").rlike("(?i)Technology|Engineering")) &
            (F.col("placement_status") == 'Placed')   # or == "Yes" depending on schema
        )
        .groupBy("university_name")
        .agg(F.avg("starting_salary_usd").alias("avg_salary"))
        .orderBy(F.desc("avg_salary"))
    )

    return result

In [35]:
avg_salaries = avg_salary_by_university(edu)
avg_salaries.show()


+--------------------+-----------------+
|     university_name|       avg_salary|
+--------------------+-----------------+
|Moscow Institute ...|93310.02564102564|
+--------------------+-----------------+



In [38]:
edu.filter(F.col("university_name").rlike("(?i)Technology|Engineering")).show()

+----------+--------------+-------------------+----------------+--------------------+--------------------+----------------+------------------+--------------------+-----------------+---------------+----------------+-----------------+-----------------+-------------------+------------+--------------------+--------------------+-------------------------+----------+
|student_id|origin_country|destination_country|destination_city|     university_name|         course_name|  field_of_study|year_of_enrollment|scholarship_received|enrollment_reason|graduation_year|placement_status|placement_country|placement_company|starting_salary_usd|gpa_or_score|         visa_status|post_graduation_visa|language_proficiency_test|test_score|
+----------+--------------+-------------------+----------------+--------------------+--------------------+----------------+------------------+--------------------+-----------------+---------------+----------------+-----------------+-----------------+-------------------+----

In [39]:
edu.columns

['student_id',
 'origin_country',
 'destination_country',
 'destination_city',
 'university_name',
 'course_name',
 'field_of_study',
 'year_of_enrollment',
 'scholarship_received',
 'enrollment_reason',
 'graduation_year',
 'placement_status',
 'placement_country',
 'placement_company',
 'starting_salary_usd',
 'gpa_or_score',
 'visa_status',
 'post_graduation_visa',
 'language_proficiency_test',
 'test_score']

In [40]:
from pyspark.sql import functions as F

def avg_toefl_by_origin(df):
    """
    Returns the average TOEFL score by origin country for students
    who were placed in a different country and took the TOEFL test.

    Parameters:
    df : pyspark.sql.DataFrame
         Must contain columns 'origin_country', 'placement_country',
         'toefl_score', and 'placed'

    Returns:
    pyspark.sql.DataFrame
         DataFrame with origin_country and average TOEFL score
    """

    result = (
        df.filter(
            (F.col("placement_status") == 'Placed') &                # or == "Yes" depending on schema
            (F.col("test_score").isNotNull()) &       # must have taken TOEFL
            (F.col("origin_country") != F.col("destination_country"))  # placed abroad
        )
        .groupBy("origin_country")
        .agg(F.avg("test_score").alias("avg_toefl_score"))
        .orderBy(F.desc("avg_toefl_score"))
    )

    return result

In [41]:
avg_scores = avg_toefl_by_origin(edu)
avg_scores.show()


+--------------+------------------+
|origin_country|   avg_toefl_score|
+--------------+------------------+
|        Canada| 6.175280898876403|
|         India| 5.933877551020407|
|       Germany| 5.781021897810218|
|           USA| 5.767088607594938|
|            UK| 5.613580246913581|
|        Russia| 5.529770992366416|
|       Finland|5.4856540084388214|
|       Ireland| 5.418283582089553|
|  South Africa| 5.403174603174605|
|           UAE| 5.267410714285712|
+--------------+------------------+

