<a href="https://colab.research.google.com/github/Akashnavani/Akashnavani/blob/main/datavilab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Write a PySpark program to create a DataFrame with four columns: "name", "age", "city", and
# "gender" and perform the following operations:
#   * Insert a minimum of 10 values for the given columns.
#   * Filter rows with age greater than 30.
#   * Add a new column named "tax".
#   * Rename the "age" column to "years".
#   * Drop multiple columns from the given DataFrame.
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit


def _print_and_show(title, frame):
    """Print a heading line, then render the first rows of *frame*."""
    print(title)
    frame.show()


# Reuse the active Spark session if one exists; otherwise start an
# application named "PySparkExample" (that is what getOrCreate() does).
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Column names, in the same order as the fields of every record below.
columns = ["name", "age", "city", "gender"]
data = [
    ("Alice", 25, "New York", "F"),
    ("Bob", 35, "Los Angeles", "M"),
    ("Charlie", 32, "Chicago", "M"),
    ("David", 28, "Houston", "M"),
    ("Eva", 45, "Phoenix", "F"),
    ("Frank", 22, "Philadelphia", "M"),
    ("Grace", 30, "San Antonio", "F"),
    ("Helen", 29, "San Diego", "F"),
    ("Ian", 41, "Dallas", "M"),
    ("Jane", 33, "San Jose", "F")
]

df = spark.createDataFrame(data, schema=columns)
_print_and_show("Original DataFrame:", df)

# Keep only people strictly older than 30 (age 30 itself is excluded).
filtered_df = df.where(df["age"] > 30)
_print_and_show("displaying more than age 30", filtered_df)

# Attach the same constant 0.1 "tax" value to every remaining row.
df_with_tax = filtered_df.withColumn("tax", lit(0.1))
_print_and_show("dataframe with new column tax", df_with_tax)

df_renamed = df_with_tax.withColumnRenamed("age", "years")
_print_and_show("dataframe with column renamed", df_renamed)

# Remove several columns with a single drop() call.
dropped_columns = ("city", "gender")
final_df = df_renamed.drop(*dropped_columns)
_print_and_show("After dropping", final_df)

spark.stop()

Original DataFrame:
+-------+---+------------+------+
|   name|age|        city|gender|
+-------+---+------------+------+
|  Alice| 25|    New York|     F|
|    Bob| 35| Los Angeles|     M|
|Charlie| 32|     Chicago|     M|
|  David| 28|     Houston|     M|
|    Eva| 45|     Phoenix|     F|
|  Frank| 22|Philadelphia|     M|
|  Grace| 30| San Antonio|     F|
|  Helen| 29|   San Diego|     F|
|    Ian| 41|      Dallas|     M|
|   Jane| 33|    San Jose|     F|
+-------+---+------------+------+

displaying more than age 30
+-------+---+-----------+------+
|   name|age|       city|gender|
+-------+---+-----------+------+
|    Bob| 35|Los Angeles|     M|
|Charlie| 32|    Chicago|     M|
|    Eva| 45|    Phoenix|     F|
|    Ian| 41|     Dallas|     M|
|   Jane| 33|   San Jose|     F|
+-------+---+-----------+------+

dataframe with new column tax
+-------+---+-----------+------+---+
|   name|age|       city|gender|tax|
+-------+---+-----------+------+---+
|    Bob| 35|Los Angeles|     M|0.1|

In [None]:
# Write a PySpark program to create a DataFrame containing information about various products,
# including ProductID, ProductName, Category, Price, StockQuantity, & Rating and perform the
# following operations:
#
#   * Insert a minimum of 10 values for the given columns.
#   * Sort the DataFrame first by Price in descending order and then by Category in ascending
#     order.
#   * Find the total sales amount for each product by category.
#   * Find the total sales amount and the total quantity sold for each product.
from pyspark.sql import SparkSession
# Spark's `sum` and `round` are aliased so that this cell does not overwrite
# the Python builtins of the same name in the notebook's global namespace
# (later cells would otherwise get the Spark column function for `round(...)`).
from pyspark.sql.functions import col, sum as _sum, round as spark_round

# Initialize the Spark session: creates an application named "ProductDataFrame",
# or reuses the already-active session (getOrCreate()).
spark = SparkSession.builder.appName("ProductDataFrame").getOrCreate()

data = [
    (1, "Laptop", "Electronics", 55000, 25, 4.5),
    (2, "Smartphone", "Electronics", 22000, 50, 4.3),
    (3, "Headphones", "Accessories", 2500, 120, 4.1),
    (4, "Office Chair", "Furniture", 8500, 15, 4.6),
    (5, "Water Bottle", "Home & Kitchen", 500, 200, 4.2),
    (6, "Microwave Oven", "Appliances", 12000, 18, 4.4),
    (7, "Running Shoes", "Footwear", 3500, 40, 4.0),
    (8, "Backpack", "Bags", 1800, 75, 4.3),
    (9, "Wrist Watch", "Accessories", 6000, 30, 4.5),
    (10, "Study Table", "Furniture", 9500, 10, 4.7)
]

columns = ["ProductID", "ProductName", "Category", "Price", "StockQuantity", "Rating"]

# Create DataFrame
df = spark.createDataFrame(data, schema=columns)

print("Original DataFrame:")
df.show()

print("Sorted dataframe (Price desc ,Category asc):")
# Price is the primary sort key (highest first); Category only breaks ties
# between products with equal prices. Swapping the keys would instead group
# rows by category and order by price within each category.
sorted_df = df.orderBy(col("Price").desc(), col("Category").asc())
sorted_df.show()

# NOTE(review): the dataset has no separate "quantity sold" column, so
# StockQuantity is used as the quantity sold: SalesAmount = Price * StockQuantity.
df_with_sales = df.withColumn("SalesAmount", col("Price") * col("StockQuantity"))
print("Total sales amount by Category")
sales_by_category = df_with_sales.groupBy("Category").agg(
    spark_round(_sum("SalesAmount"), 2).alias("TotalSalesAmount")
)
sales_by_category.show()

print("Total sales Amount and Quantity by product:")
sales_by_product = df_with_sales.groupBy("ProductName").agg(
    spark_round(_sum("SalesAmount"), 2).alias("TotalSalesAmount"),
    _sum("StockQuantity").alias("TotalQuantitySold"),
)
sales_by_product.show()
spark.stop()

Original DataFrame:
+---------+--------------+--------------+-----+-------------+------+
|ProductID|   ProductName|      Category|Price|StockQuantity|Rating|
+---------+--------------+--------------+-----+-------------+------+
|        1|        Laptop|   Electronics|55000|           25|   4.5|
|        2|    Smartphone|   Electronics|22000|           50|   4.3|
|        3|    Headphones|   Accessories| 2500|          120|   4.1|
|        4|  Office Chair|     Furniture| 8500|           15|   4.6|
|        5|  Water Bottle|Home & Kitchen|  500|          200|   4.2|
|        6|Microwave Oven|    Appliances|12000|           18|   4.4|
|        7| Running Shoes|      Footwear| 3500|           40|   4.0|
|        8|      Backpack|          Bags| 1800|           75|   4.3|
|        9|   Wrist Watch|   Accessories| 6000|           30|   4.5|
|       10|   Study Table|     Furniture| 9500|           10|   4.7|
+---------+--------------+--------------+-----+-------------+------+

Sorted datafr

In [None]:
# Not in syllabus.
# Write a PySpark program to create a DataFrame containing information about employees in a
# company, including the following columns:
#   EmployeeID, EmployeeName, Department, Salary, Experience, Age
# Perform the following operations:
#   * Insert at least 10 sample records for the given columns.
#   * Sort the DataFrame first by Salary in ascending order and then by Experience in
#     descending order.
#   * Find the average salary for each department.
#   * Find the total years of experience and average age for each department.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum as _sum

# Start (or reuse) the Spark session for this analysis.
spark = SparkSession.builder.appName("EmployeeData").getOrCreate()

# Field order shared by every record in `data`.
columns = ["EmployeeID", "EmployeeName", "Department", "Salary", "Experience", "Age"]
data = [
    (1, "Alice", "HR", 60000, 5, 30),
    (2, "Bob", "IT", 75000, 7, 35),
    (3, "Charlie", "Finance", 80000, 6, 40),
    (4, "David", "IT", 70000, 8, 28),
    (5, "Eva", "HR", 62000, 4, 32),
    (6, "Frank", "Finance", 85000, 9, 45),
    (7, "Grace", "IT", 72000, 3, 26),
    (8, "Hannah", "HR", 58000, 6, 29),
    (9, "Ian", "Finance", 79000, 5, 38),
    (10, "Jane", "IT", 68000, 7, 34),
]

df = spark.createDataFrame(data, schema=columns)
print("Original DataFrame:")
df.show()

# Lowest salaries first; among equal salaries, the most experienced come first.
salary_key = col("Salary").asc()
experience_key = col("Experience").desc()
sorted_df = df.sort(salary_key, experience_key)
print("Sorted DataFrame (Salary ascending, Experience descending):")
sorted_df.show()

# Both per-department summaries aggregate over the same grouping.
by_department = df.groupBy("Department")

avg_salary = by_department.agg(avg("Salary").alias("AvgSalary"))
print("Average Salary per Department:")
avg_salary.show()

dept_stats = by_department.agg(
    _sum("Experience").alias("TotalExperience"),
    avg("Age").alias("AvgAge"),
)
print("Total Experience and Average Age per Department:")
dept_stats.show()

# Release the Spark session's resources.
spark.stop()

Original DataFrame:
+----------+------------+----------+------+----------+---+
|EmployeeID|EmployeeName|Department|Salary|Experience|Age|
+----------+------------+----------+------+----------+---+
|         1|       Alice|        HR| 60000|         5| 30|
|         2|         Bob|        IT| 75000|         7| 35|
|         3|     Charlie|   Finance| 80000|         6| 40|
|         4|       David|        IT| 70000|         8| 28|
|         5|         Eva|        HR| 62000|         4| 32|
|         6|       Frank|   Finance| 85000|         9| 45|
|         7|       Grace|        IT| 72000|         3| 26|
|         8|      Hannah|        HR| 58000|         6| 29|
|         9|         Ian|   Finance| 79000|         5| 38|
|        10|        Jane|        IT| 68000|         7| 34|
+----------+------------+----------+------+----------+---+

Sorted DataFrame (Salary ascending, Experience descending):
+----------+------------+----------+------+----------+---+
|EmployeeID|EmployeeName|Departmen

In [None]:

# Using PySpark, analyze airline flight data (e.g., departure and arrival times, delays,
# carrier information) and perform the following operations:
#   * Load a CSV file containing airline flight data.
#   * Filter flights that were more than 15 minutes delayed.
#   * Analyze whether there is any correlation between the flight length and the
#     likelihood of a delay.
# Dataset: https://drive.google.com/drive/folders/1KTpKBf5w8VOyPNTTIniM9oN-pwaIgjqf?usp=drive_link
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, corr
from pyspark.sql import functions as F


spark = SparkSession.builder.appName("FlightDataAnalysis").getOrCreate()

# NOTE(review): assumes the CSV was uploaded to Colab's /content directory.
file_path = "/content/flights.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Inspect the inferred schema and a sample of rows.
df.printSchema()
df.show(5)

# Flights whose arrival delay exceeds 15 minutes (a null arr_delay never matches).
delayed_flights = df.filter(col("arr_delay") > 15)
print(f"Total flights with arrival delay >15 mins is : {delayed_flights.count()} ")
delayed_flights.show(5)

# Rows missing either the distance or the arrival delay carry no information
# about their relationship. Drop them explicitly instead of relying on how a
# particular Spark version treats nulls inside DataFrame.corr, then use the
# `corr` aggregate (Pearson correlation).
complete_df = df.na.drop(subset=["distance", "arr_delay"])
correlation_value = complete_df.select(corr("distance", "arr_delay")).first()[0]
print(f"correlation of flight distance and arrival delay is:{correlation_value} ")

# 1 when the arrival delay exceeds 15 minutes, 0 otherwise (null if arr_delay is null).
df_with_binary_delay = df.withColumn("IS_DELAYED", (col("arr_delay") > 15).cast("integer"))
df_with_binary_delay.show(5)

# Pearson correlation against a 0/1 indicator (point-biserial correlation)
# measures how the likelihood of a >15 min delay varies with flight length.
correlation_binary = (
    df_with_binary_delay.na.drop(subset=["distance", "IS_DELAYED"])
    .select(corr("distance", "IS_DELAYED"))
    .first()[0]
)
print(f"correlation of flight distance and likelyhood of delay is:{correlation_binary} ")

spark.stop()

root
 |-- id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: double (nullable = true)
 |-- sched_dep_time: integer (nullable = true)
 |-- dep_delay: double (nullable = true)
 |-- arr_time: double (nullable = true)
 |-- sched_arr_time: integer (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: double (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- time_hour: timestamp (nullable = true)
 |-- name: string (nullable = true)

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-----------

In [None]:
# 10. Consider the airline flight data given in the previous question. Perform the following
# operations using PySpark:
#   * Group the data by airline carrier and compute the average delay for each one.
#   * Determine the top five routes (origin-destination) with the highest average delay.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, corr
from pyspark.sql import functions as F


spark = SparkSession.builder.appName("FlightDataAnalysis").getOrCreate()

file_path = "/content/flights.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# "Delay" throughout this cell means the arrival delay (arr_delay); the
# average aggregate skips rows where arr_delay is null.
mean_arr_delay = F.avg("arr_delay").alias("avg_delay")
largest_delay_first = F.col("avg_delay").desc()

print("Average delay of each group ")
avg_delay_by_carrier = df.groupBy("carrier").agg(mean_arr_delay).orderBy(largest_delay_first)
avg_delay_by_carrier.show()

print("Top 5 routes ordered by average delay")
# A route is identified by "ORIGIN-DEST", e.g. "EWR-CAE".
df = df.withColumn("route", F.concat_ws("-", col("origin"), col("dest")))

avg_delay_by_route = df.groupBy("route").agg(mean_arr_delay)
top5_routes = avg_delay_by_route.orderBy(largest_delay_first).limit(5)
top5_routes.show()

spark.stop()


Average delay of each group 
+-------+--------------------+
|carrier|           avg_delay|
+-------+--------------------+
|     F9|  19.791808873720136|
|     OO|                17.0|
|     EV|  16.401793298725813|
|     FL|  12.094907407407407|
|     YV|   8.615062761506277|
|     WN|   7.866318436834978|
|     MQ|   7.437600529851452|
|     B6|  6.4721437855766215|
|     9E|   5.823355601233299|
|     UA|  2.6395006428601056|
|     AA|  0.4358667165855485|
|     US|-0.01197532357566227|
|     DL|  -2.464905544463216|
|     HA|  -4.268115942028985|
|     VX|  -4.894525364138624|
|     AS|  -6.952218430034129|
+-------+--------------------+

Top 5 routes ordered by average delay
+-------+------------------+
|  route|         avg_delay|
+-------+------------------+
|LGA-SBN|40.666666666666664|
|EWR-CAE| 39.51219512195122|
|EWR-TUL| 37.61290322580645|
|EWR-JAC|              35.8|
|EWR-OKC|        34.6484375|
+-------+------------------+



In [None]:
# Given a Movie dataset containing user ratings for movies, using PySpark SQL perform the
# following operations:
#   * Load a CSV file containing movie data.
#   * Create temporary views for movies and ratings.
#   * Write queries to find the top 10 highest-rated movies with at least 10 ratings.
#   * Find the most active users (users who have rated the most movies).
# Dataset: https://drive.google.com/file/d/17PFBafCd0J8brMNjdVV-NyNCBI01-7fr/view?usp=drive_link
from pyspark.sql import SparkSession


# Create (or reuse) the Spark session.
spark = SparkSession.builder.appName("MovieRatingsAnalysis").getOrCreate()

# 1. Load the CSV files (adjust file paths as needed).
movies_df = spark.read.option("header", True).csv("/content/movies.csv", inferSchema=True)
ratings_df = spark.read.option("header", True).csv("/content/ratings.csv", inferSchema=True)

# 2. Create temporary views so the data can be queried with spark.sql.
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")

# Preview both views.
print("***** ***MOVIES AND RATINGS VIENS *********")
spark.sql("SELECT * FROM movies LIMIT 5").show()
spark.sql("SELECT * FROM ratings LIMIT 5").show()

# A view can also expose only a subset of a DataFrame's columns.
movies_selected = movies_df.select("movieId", "title")
movies_selected.createOrReplaceTempView("movies_view")
print("********....................MOVIES VIEW.....")
spark.sql("SELECT * FROM movies_view LIMIT 5").show()

# 3. Top 10 highest-rated movies with at least 10 ratings.
# Group by movieId as well as title: distinct movies can share a title, and
# grouping by title alone would merge their ratings into a single row.
# `title` is the final tie-breaker so LIMIT 10 selects a deterministic set.
top_movies_query = """
SELECT m.title,
       COUNT(r.rating) AS num_ratings,
       ROUND(AVG(r.rating), 2) AS avg_rating
FROM ratings r
JOIN movies m ON r.movieId = m.movieId
GROUP BY m.movieId, m.title
HAVING COUNT(r.rating) >= 10
ORDER BY avg_rating DESC, num_ratings DESC, title
LIMIT 10
"""
top_movies = spark.sql(top_movies_query)
print("==== Top 10 Highest RAted Movies (with at least 10 ratings) =====")
# truncate=False so long movie titles are displayed in full.
top_movies.show(10, truncate=False)

# 4. Most active users: COUNT(movieId) counts the rating rows each user has
# submitted (equal to the number of movies rated, assuming at most one
# rating per user/movie pair — verify against the dataset).
# userId breaks ties so LIMIT 10 is deterministic.
active_users_query = """
SELECT userId, COUNT(movieId) AS total_ratings
FROM ratings
GROUP BY userId
ORDER BY total_ratings DESC, userId
LIMIT 10
"""
active_users = spark.sql(active_users_query)
print("==== Most Active users (user who rated the most movies) =====")
active_users.show(10, truncate=False)

# Persist both results; Spark writes each path as a directory of CSV part files.
top_movies.write.mode("overwrite").csv("output/top_movies.csv", header=True)
active_users.write.mode("overwrite").csv("output/active_users.csv", header=True)

spark.stop()




***** ***MOVIES AND RATINGS VIENS *********
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+

********....................MOVIES VIEW.....
+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|  

In [None]:
# Out of syllabus.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg

# 1. Initialize SparkSession
spark = SparkSession.builder.appName("StudentRecordsAnalysis").getOrCreate()

# 2. Sample data
data = [
    (301, "Alice", 10, 85, 78, 88),
    (302, "Bob", 10, 92, 85, 79),
    (303, "Charlie", 9, 70, 75, 80),
    (304, "David", 9, 88, 82, 85),
    (305, "Eva", 10, 95, 89, 92),
    (306, "Frank", 9, 60, 65, 70),
    (307, "Grace", 10, 78, 80, 76),
    (308, "Henry", 9, 82, 79, 84),
    (309, "Irene", 10, 90, 88, 91),
    (310, "John", 9, 68, 72, 75)
]

# Define schema columns
columns = ["StudentID", "Name", "Class", "MathScore", "ScienceScore", "EnglishScore"]

# Create DataFrame
df = spark.createDataFrame(data, schema=columns)

# 3. Filter students who scored more than 80 in Math
math_above_80_df = df.filter(col("MathScore") > 80)

# 4. Add a new column TotalScore = Math + Science + English
df = df.withColumn("TotalScore", col("MathScore") + col("ScienceScore") + col("EnglishScore"))

# 7. Compute the average score in each subject for all students.
# This runs before step 6 drops EnglishScore; computing it afterwards would
# make the English average impossible to report.
avg_scores = df.select(
    avg("MathScore").alias("AvgMathScore"),
    avg("ScienceScore").alias("AvgScienceScore"),
    avg("EnglishScore").alias("AvgEnglishScore"),
    avg("TotalScore").alias("AvgTotalScore"),
).first()

# 5. Rename column 'Class' to 'Grade'
df = df.withColumnRenamed("Class", "Grade")

# 6. Drop the EnglishScore column (TotalScore already includes it)
df = df.drop("EnglishScore")

# Show results
print("Students with MathScore > 80:")
math_above_80_df.show()

print("DataFrame after adding TotalScore, renaming Class to Grade, and dropping EnglishScore:")
df.show()

print("Average Scores:")
print(f"Average Math Score: {avg_scores['AvgMathScore']:.2f}")
print(f"Average Science Score: {avg_scores['AvgScienceScore']:.2f}")
print(f"Average English Score: {avg_scores['AvgEnglishScore']:.2f}")
print(f"Average Total Score: {avg_scores['AvgTotalScore']:.2f}")

# Stop SparkSession
spark.stop()


Students with MathScore > 80:
+---------+-----+-----+---------+------------+------------+
|StudentID| Name|Class|MathScore|ScienceScore|EnglishScore|
+---------+-----+-----+---------+------------+------------+
|      301|Alice|   10|       85|          78|          88|
|      302|  Bob|   10|       92|          85|          79|
|      304|David|    9|       88|          82|          85|
|      305|  Eva|   10|       95|          89|          92|
|      308|Henry|    9|       82|          79|          84|
|      309|Irene|   10|       90|          88|          91|
+---------+-----+-----+---------+------------+------------+

DataFrame after adding TotalScore, renaming Class to Grade, and dropping EnglishScore:
+---------+-------+-----+---------+------------+----------+
|StudentID|   Name|Grade|MathScore|ScienceScore|TotalScore|
+---------+-------+-----+---------+------------+----------+
|      301|  Alice|   10|       85|          78|       251|
|      302|    Bob|   10|       92|       

In [None]:
# Out of syllabus.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as _sum

# Start (or reuse) the Spark session for the hospital analysis.
spark = SparkSession.builder.appName("HospitalPatientAnalysis").getOrCreate()

# Field order shared by every patient record below.
columns = ["PatientID", "Name", "Age", "Gender", "Department", "VisitCount", "BillAmount"]
data = [
    (501, "Alice", 45, "F", "Cardiology", 3, 5000),
    (502, "Bob", 62, "M", "Orthopedics", 6, 15000),
    (503, "Charlie", 55, "M", "Neurology", 4, 12000),
    (504, "David", 38, "M", "Cardiology", 2, 4000),
    (505, "Eva", 70, "F", "Orthopedics", 7, 20000),
    (506, "Frank", 50, "M", "Neurology", 5, 11000),
    (507, "Grace", 65, "F", "Cardiology", 6, 18000),
    (508, "Henry", 42, "M", "Orthopedics", 3, 8000),
    (509, "Irene", 58, "F", "Neurology", 4, 13000),
    (510, "John", 35, "M", "Cardiology", 2, 300)
]

df = spark.createDataFrame(data, schema=columns)

# Step 3: patients older than 50 OR with more than 5 visits.
older_or_frequent = (col("Age") > 50) | (col("VisitCount") > 5)
filtered_df = df.where(older_or_frequent)

# Steps 4-6: a 10% discount for patients over 60 (0 for everyone else),
# VisitCount renamed to NumberOfVisits, and the Department column removed.
discount_expr = when(col("Age") > 60, col("BillAmount") * 0.10).otherwise(0)
df = (
    df.withColumn("Discount", discount_expr)
    .withColumnRenamed("VisitCount", "NumberOfVisits")
    .drop("Department")
)

# Step 7: total billed amount per gender.
total_bill_per_gender = df.groupBy("Gender").agg(_sum("BillAmount").alias("TotalBillAmount"))

# Step 8: most expensive bills first.
sorted_df = df.sort(col("BillAmount").desc())

# Display every result under its own heading, in order.
for heading, frame in (
    ("Filtered patients (Age > 50 or VisitCount > 5):", filtered_df),
    ("DataFrame after adding Discount, renaming, and dropping Department:", df),
    ("Total Bill Amount per Gender:", total_bill_per_gender),
    ("Patients sorted by BillAmount descending:", sorted_df),
):
    print(heading)
    frame.show()

# Release the Spark session's resources.
spark.stop()


Filtered patients (Age > 50 or VisitCount > 5):
+---------+-------+---+------+-----------+----------+----------+
|PatientID|   Name|Age|Gender| Department|VisitCount|BillAmount|
+---------+-------+---+------+-----------+----------+----------+
|      502|    Bob| 62|     M|Orthopedics|         6|     15000|
|      503|Charlie| 55|     M|  Neurology|         4|     12000|
|      505|    Eva| 70|     F|Orthopedics|         7|     20000|
|      507|  Grace| 65|     F| Cardiology|         6|     18000|
|      509|  Irene| 58|     F|  Neurology|         4|     13000|
+---------+-------+---+------+-----------+----------+----------+

DataFrame after adding Discount, renaming, and dropping Department:
+---------+-------+---+------+--------------+----------+--------+
|PatientID|   Name|Age|Gender|NumberOfVisits|BillAmount|Discount|
+---------+-------+---+------+--------------+----------+--------+
|      501|  Alice| 45|     F|             3|      5000|     0.0|
|      502|    Bob| 62|     M|    