In [None]:
# Tasks perform in this Notebook taken from various platforms like linkedin

In [1]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("GlobalSparkSession").getOrCreate()

In [2]:
from pyspark.sql import SparkSession
# Alternative Configuration
spark = SparkSession.builder \
    .config("spark.executor.memory", "10g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.execution.arrow.enabled", "true") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .config("spark.python.worker.reuse", "false") \
    .config("spark.network.timeout", "600s") \
    .config("spark.executor.heartbeatInterval", "120s") \
    .appName("Spark Interview Preparation") \
    .getOrCreate()

In [2]:
"""
Task - The dataset includes company_id, company_name, and multiple date columns representing the stock prices of the company. 
The task is to generate a new dataset with only three columns, "company_name", "Date", "Stock_price".
You have to unpivot the data.
"""
data = [
    (1, "Quantum Innovations", 12.5, 14.5, 16.5, 18.5, 20.5),
    (2, "Stellar Solutions", 14.5, 16.5, 18.5, 20.5, 22.5),
    (3, "Nebula Dynamics", 16.5, 18.5, 20.5, 22.5, 24.5),
    (4, "Fusion Enterprises", 18.5, 20.5, 22.5, 24.5, 26.5),
    (5, "Celestial Technologies", 20.5, 22.5, 24.5, 26.5, 28.5),
]
columns=["c_id","Company_Name","2024-02-01","2024-02-02","2024-02-03","2024-02-04","2024-02-05"]
df=spark.createDataFrame(data,columns)
# df.show()

In [78]:
from pyspark.sql.functions import col,when
dates_col=["Dates"]
dates_data=[("2024-02-01",),("2024-02-02",),("2024-02-03",),("2024-02-04",),("2024-02-05",)]
dates_df=spark.createDataFrame(dates_data,dates_col)
resultset=df.crossJoin(dates_df)
final_result=resultset.withColumn("Stock_Price",
                      when(col("Dates")=='2024-02-01',col('2024-02-01'))
                     .when(col("Dates")=='2024-02-02',col('2024-02-02'))
                     .when(col("Dates")=='2024-02-03',col('2024-02-03'))
                     .when(col("Dates")=='2024-02-04',col('2024-02-04'))
                     .when(col("Dates")=='2024-02-05',col('2024-02-05'))
                      )
# use sparksql to select required columns
final_result.select("c_id","Company_Name","Dates","Stock_Price")
# .show()
# dates_df.show()
              

DataFrame[c_id: bigint, Company_Name: string, Dates: string, Stock_Price: double]

In [43]:
# Do the above task using SparkSQL
# create temp sql table for values from dates_df dataframe
columns2=["c_id","Company_Name","feb1","feb2","feb3","feb4","feb5"]
df2=spark.createDataFrame(data,columns2)
temp_values=[("2024-02-01",),("2024-02-02",),("2024-02-03",),("2024-02-04",),("2024-02-05",)]
dates_df.createOrReplaceTempView("temp_values1")
df2.createOrReplaceTempView("temp_data1")
# df2.show()
results=spark.sql(f"select c_id,Company_Name,{'Dates'},case Dates when '{'2024-02-01'}' then feb1 when '{'2024-02-02'}' then feb2 when '{'2024-02-03'}' then feb3 when '{'2024-02-04'}' then feb4 else feb5 end as Stock_Price from temp_values1 cross join temp_data1")


In [1]:
# Write the results into parque file
# results.write.parquet('/home/BigData/Files/stock_price.parquet')
results.write.csv('/home/BigData/Files/stock_price.csv')

# results.show()

In [79]:
# Task :we have a dataset representing books issued by students, and we want to aggregate the books for each student.
# Multiple books should be shown separated by ";"
import pyspark.sql.functions as F

book_issued = [
    (101 ,'Mark' , "White Tiger"),
    (102 ,'Ria' , "The Fountainhead"),
    (102 ,'Ria' , "The Seceret History"),
    (101 ,'Mark' , "Bhagwad Gita"),
    (103 ,'Loi' , "The Fountainhead"),
    ]
books_schema=['s_id','s_name','book_names']
books_df=spark.createDataFrame(book_issued,books_schema)
gp=books_df.groupBy("s_id","s_name")\
.agg(F.concat_ws(';',F.collect_list("book_names")).alias("all_books"))\
# .show(truncate=False)
# print(type(gp))

In [81]:
'''
1.'Arranging a column with null values in ascending or descending order.'
Task - Sort the salary column of the dataframe :
'''
from pyspark.sql.types import *
import pyspark.sql.functions as F
data = [
    (1, "Aditya Sen" , None),
    (2, "Bikramaditya" , 200),
    (3, "Mark T" , 100),
    (4, "D K aditya" ,None),
    (5, "Danny" , 500),
    (6, "Eli" , 300),
    ]
cust_schema=StructType([
    StructField("id",IntegerType(),False), #is Nullable
    StructField("Name",StringType(),False),
    StructField("Amount",IntegerType(),True)
])
cust_df=spark.createDataFrame(data,cust_schema)
# print(cust_df)
# cust_df.show()
# a) in ascending order with NULL values at the top
cust_df.orderBy("Amount")
# .show()
# b) in ascending order with NULL values at the bottom

# c) in descending order with NULL values at the top

# d) in descending order with NULL values at the bottom
cust_df.orderBy(F.desc("Amount"))
# .show()

DataFrame[id: int, Name: string, Amount: int]

In [4]:
'''
Task - In your dataset, the "order_date" column includes a time component set to 00:00:00.
Your objective is to eliminate the time portion from the column, thereby conserving storage space.
'Remove time part from PySpark dataframe date column'
'''
from pyspark.sql.functions import col,slice
data = [(1, "2024-02-01 00:00:00"),
        (2, "2024-02-01 00:00:00"),
        (3, "2024-02-03 00:00:00"),
        (4, "2024-02-04 00:00:00",)]
columns = ["Order_id","Order_date"]
df=spark.createDataFrame(data,columns)
# df.select(df.Order_date[1:10]).show()
# using sparksql
df.createOrReplaceTempView("orders")
# print(df)
removedTime = spark.sql("select Order_id,cast(substring(Order_date,1,10) as date) as order_date from orders")
# print(removedTime)

# using Pyspark
df.withColumn("Order_date2",col("Order_date")[0:10])
# .show()
# removedTime.show()


DataFrame[Order_id: bigint, Order_date: string, Order_date2: string]

In [14]:
'''
case insensitive/ignorant data search'
Task - In the provided dataset, identify the rows where the 'Name' column contains the word 'aditya'.
'''
from pyspark.sql.functions import filter

data = [
    (1, "Aditya Sen" , 100),
    (2, "Bikramaditya" , 200),
    (3, "Mark T" , 100),
    (4, "D K aditya" , 200),
    (5, "Danny" , 500),
    (6, "Eli" , 100),
]
columns = ["id","name","Amount"]
df=spark.createDataFrame(data,columns)
df.createOrReplaceTempView("users")
# spark.sql("select id,name,Amount from users where lower(name) like '%aditya%'").show()
# df.where("lower(name) like '%aditya%'").show()
# df.filter(df["name"]).rlike('aditya').show()

In [11]:
# Task - Show work group wise , average salary for each department
# Suggestion: instead of following sql like approach follow, spark approach

from pyspark.sql.types import ArrayType
from pyspark.sql.types import *
from pyspark.sql.functions import col,when,avg
import pandas as pd
spark = spark.builder.getOrCreate()

schema=StructType([
    StructField("FirstName",ArrayType(StringType()),True),
    StructField("LastName",ArrayType(StringType()),True),
    StructField("Type",ArrayType(StringType()),True),
    StructField("Dept",ArrayType(StringType()),True),
    StructField("Yoe",ArrayType(IntegerType()),True),
    StructField("Sal",ArrayType(IntegerType()),True)
])
data = {'First Name': ['Aryan', 'Rohan', 'Riya', 'Yash', 'Siddhant'],
        'Last Name': ['Singh', 'Agarwal', 'Shah', 'Bhatia', 'Khanna'],
        'Type': ['Full-time Employee', 'Intern', 'Full-time Employee', 'Part-time Employee', 'Full-time Employee'],
        'Department': ['Administration', 'Technical', 'Administration', 'Technical', 'Management'],
        'YoE': [2, 3, 5, 7, 6],
        'Salary': [20000, 5000, 10000, 10000, 20000]
       }

pandas_df=pd.DataFrame(data)
# print(pandas_df)
# data1=(list(data.values()), list(data.keys()))
# df = spark.createDataFrame(list(data.values()), list(data.keys()))

df=spark.createDataFrame(pandas_df);
grouped_data=df.groupBy("Type","Department").agg(avg('Salary').alias("Avg_Salary"))#,min("Department").alias("Department"))
# grouped_data.show()
'''
gd=grouped_data.withColumn("Administration",when(col("Department")=='Administration',col('Avg_Salary')).otherwise(0))\
            .withColumn("Technical",when(col("Department")=='Technical',col('Avg_Salary')).otherwise(0))\
            .withColumn("Management",when(col("Department")=='Management',col('Avg_Salary')).otherwise(0))
# gd.select("Type","Administration","Management","Technical").show()
'''
df.groupBy("Type").pivot("Department").avg("Salary")#.na.fill(0).alias("Avg_Sal")
# .show()

# gd.show()
# print(data1)



+------------------+--------------+----------+---------+
|              Type|Administration|Management|Technical|
+------------------+--------------+----------+---------+
|Full-time Employee|       15000.0|   20000.0|     NULL|
|            Intern|          NULL|      NULL|   5000.0|
|Part-time Employee|          NULL|      NULL|  10000.0|
+------------------+--------------+----------+---------+



                                                                                

In [84]:
'''
Task - You are provided with two datasets, branch1 and branch2 , representing information about students and
their marks in different subjects across different branches. Your goal is to combine these datasets into one final dataset.
Missing text information should be shown as 'unknown' , and missing numerical information should be shown as -9999.
'''
from pyspark.sql.functions import lit
branch1 = spark.createDataFrame([["Delhi", "Neha", 90]], ["Branch", "Student", "Maths_marks"])
branch2 = spark.createDataFrame([["Arav","Kolkata", 79, 83], [None,"Kolkata", 89, 73]],["Student", "Branch", "Science_marks", "Maths_marks"])

branch_v1=branch1.withColumn("Science_marks",lit(-9999)).select("Student","Branch","Science_marks","Maths_marks")
union_data=branch_v1.union(branch2).selectExpr("Branch","case when Student is null then 'Unknown' else Student end as Student","Maths_marks","Science_marks")
# union_data.show()

In [2]:
'''
Task - The dataset contains information about food items in bills.
Your assignment is to determine the frequency of each food item ordered.
'''
from pyspark.sql.types import StructType,StructField,IntegerType,ArrayType,StringType
from pyspark.sql.functions import explode,count
import pandas as pd
data = [
(101, ["dosa", "biriyani", "idli"]),
(102, ["biriyani", "mineral water"]),
(103, ["rice", "mineral water", "poha"]),
(109, ["idli", "biriyani", "poha"]),
]

schema=StructType([
    StructField("id",IntegerType(),True),
    StructField("orders",ArrayType(StringType()),True)
])

df=spark.createDataFrame(data,schema).select(explode("orders").alias("orders")) #.show()
getCount=df.groupBy("orders").count()
getCount.orderBy(["count"],ascending=False)#.show(truncate=False)
# exploded=df.withColumn("items",explode(df.orders))
# exploded.show(truncate=False)
# orders.printSchema()

DataFrame[orders: string, count: bigint]

In [86]:
# Optimized code for the above

df=spark.createDataFrame(data,schema).select(explode("orders").alias("orders")) \
.groupBy("orders") \
.count().orderBy("count",ascending=False) \
# .show()

In [74]:
'''
Task - Transform the input DataFrame into a new DataFrame , where each row represents a unique student
and the columns include the student's name along with the marks for the "math" and "eng" subjects.
'''
from pyspark.sql.types import StructType,StructField,IntegerType,ArrayType,StringType

data=[
('Rudra','math',79),
('Rudra','eng',60),
('Shivu','math', 68),
('Shivu','eng', 59),
('Anu','math', 65),
('Anu','eng',80)
]
schema=StructType([
    StructField("Name",StringType(),True),
    StructField("Sub",StringType(),True),
    StructField("Marks",IntegerType(),True)
])
df=spark.createDataFrame(data,schema)\
# .show()


In [None]:
# Task: Read a table data from the sources like SQL Server/Hive/datalake


In [87]:
"""
Question: Given a list of sales transactions with product names and their corresponding sales amounts,
calculate the total sales for each product and store the results in a dictionary.
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

spark=SparkSession.builder.appName("interview_prep").getOrCreate()
sales = [("Widget", 500), ("Gadget", 750), ("Widget", 1000), ("Doodad", 1200), ("Gadget", 600)]
sales_df=spark.createDataFrame(sales,schema=["product","price"])
sales_df=sales_df.groupBy("product").agg(sum("price").alias("sum_sal"))
# sales_df.show()
newdf=sales_df.select("product","sum_sal")
# newdf.show()
#.\
# product_sales_dict = sales_df.to_dict()
# print(sales_df)

24/03/02 23:39:27 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [126]:
# above problem using RDD
sales = [("Widget", 500), ("Gadget", 750), ("Widget", 1000), ("Doodad", 1200), ("Gadget", 600)]
rdd=sc.parallelize(sales)

# you have to give column index because RDD dont have column names.
# any transformation returns pipelined RDD
groupData=rdd.reduceByKey(lambda x,y:x+y).collectAsMap()
# data=groupData.collect()
print(groupData)

# print(type(data))
# print(groupData)
# unpacked_data = {key: list(value) for key, value in groupData.items()}
# print(unpacked_data)

[Stage 348:>                                                        (0 + 4) / 4]

{'Doodad': 1200, 'Widget': 1500, 'Gadget': 1350}


                                                                                

In [4]:
# Ways to handle duplicate data

from pyspark.sql.functions import expr

data = [("James", "Sales", 3000),("Michael", "Sales", 4600),("Robert", "Sales", 4100),("Maria", "Finance", 3000), \
 ("James", "Sales", 3000),("Scott", "Finance", 3300),("Jen", "Finance", 3900),("Jeff", "Marketing", 3000), \
 ("Kumar", "Marketing", 2000),("Saif", "Sales", 4100)
 ]
column= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = column)
# df.printSchema()
df.orderBy("employee_name",ascending=True)#.show(truncate=False)

#Distinct
distinctDF = df.distinct()
# print("Distinct count: "+str(distinctDF.count()))
distinctDF.orderBy("employee_name",ascending=True)#.show(truncate=False)
#Drop duplicates

df2 = df.dropDuplicates()
df2.orderBy("employee_name",ascending=True)#.show()

"""
print("Distinct count: "+str(df2.count()))
df2.show(truncate=False)
#Drop duplicates on selected columns
dropDisDF = df.dropDuplicates(["department","salary"])
print("Distinct count of department salary :
"+str(dropDisDF.count()))
dropDisDF.show(truncate=False)
"""

'\nprint("Distinct count: "+str(df2.count()))\ndf2.show(truncate=False)\n#Drop duplicates on selected columns\ndropDisDF = df.dropDuplicates(["department","salary"])\nprint("Distinct count of department salary :\n"+str(dropDisDF.count()))\ndropDisDF.show(truncate=False)\n'

In [None]:
# Broadcast variables examples practice

spark=SparkSession.builder.appName('SparkByExample.com').getOrCreate()
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)
data = [("James","Smith","USA","CA"),
 ("Michael","Rose","USA","NY"),
 ("Robert","Williams","USA","CA"),
 ("Maria","Jones","USA","FL")
 ]
rdd = spark.sparkContext.parallelize(data)

def state_convert(code):
 return broadcastState.value[code]
res = rdd.map(lambda a:(a[0],a[1],a[2],state_convert(a{3]))).collect()
print(res)

PySpark DataFrame Broadcast variable example
spark=SparkSession.builder.appName('PySpark broadcastvariable').getOrCreate()
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)
data = [("James","Smith","USA","CA"),("Michael","Rose","USA","NY"), ("Robert","William","USA","CA"),("Maria","Jones","USA","FL")
         ]

columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

def state_convert(code):
    return broadcastState.value[code]
res = df.rdd.map(lambda a:(a[0],a[1],a[2],state_convert(a[3]))).toDF(column)
res.show(truncate=False)

In [None]:
"""
Q2. How will you merge two files – File1 and File2 – into a single DataFrame if they have different schemas?
File -1:
Name|Age
Azarudeen, Shahul|25
Michel, Clarke|26
Virat, Kohli|28
Andrew, Simond|37
File -2:
Name|Age|Gender
Rabindra, Tagore |32|Male
Madona, Laure | 59|Female
Flintoff, David|12|Male
Ammie, James| 20|Female
Answer- import findspark
findspark.init()

"""
from pyspark.sql import SparkSession, types
spark = SparkSession.builder.master("local").appName('Modes of Dataframereader').getorCreate()
sc=spark.sparkContext
df1=spark.read.option("delimiter","|").csv('input.csv')

df2=spark.read.option("delimiter","|").csv("input2.csv",header=True)
from pyspark.sql.functions import lit
df_add=df1.withColumn("Gender",lit("null"))
df_add. union(df2).show()
For the Unionfrom pyspark.sql.types import *
schema=StructType(
 [
 StructField("Name",StringType(), True),
 StructField("Age",StringType(), True),
 StructField("Gender",StringType(),True),
 ]
)
df3=spark.read.option("delimiter","|").csv("input.csv",header=True, schema=schema)
df4=spark.read.option("delimiter","|").csv("input2.csv",header=True, schema=schema)
df3.union(df4).show()

In [None]:
"""
Q3. Examine the following file, which contains some corrupt/bad data. What will you do with such data, and how will
you import them into a Spark Dataframe?

"""
Emp_no, Emp_name, Department
101, Murugan, HealthCare
Invalid Entry, Description: Bad Record entry
102, Kannan, Finance
103, Mani, IT
Connection lost, Description: Poor Connection
104, Pavan, HR
Bad Record, Description: Corrupt record

import findspark
findspark.init()
from pyspark. sql import Sparksession, types
spark = Sparksession.builder.master("local").appName("Modes of Dataframereader").getorcreate()
sc=spark. sparkContext
from pyspark.sql.types import *
schm structiype([
structField("col_1",stringType(), True),
StructField("col_2",stringType(), True),
structrield("col",stringtype(), True),
])

df=spark.read.option("mode","DROPMALFORMED").csv('input1.csv', header=True,schema=schm)
df. show()

In [None]:
"""
Q4. Consider a file containing an Education column that includes an array of elements, as shown below. Using Spark
Dataframe, convert each element in the array to a record.
"""
Name| Age | EducaÈ›ion
Azar|25| MBA,BE,HSC
Hari|32|
Kumar|35|ME,BE,Diploma

import findspark
findspark.init()
from pyspark.sql import SparkSession, types
spark =
SparkSession.builder.master("local").appName('scenario based').getorCreate()
sc=spark.sparkContext
in_df=spark.read.option("delimiter","|").csv("input4.csv",header-True)
in_df.show()
from pyspark.sql.functions import posexplode_outer, split

in_df.withColumn("Qualification",
explode_outer(split("Education",","))).show()
in_df.select("*",posexplode_outer(split("Education",",")))\
     .withColumnRenamed("col", "Qualification")\
     .withColumnRenamed ("pos","Index")\
     .drop(“Education”).show()

In [None]:
Q2. What is SparkConf in PySpark? List a few attributes of SparkConf.
SparkConf aids in the setup and settings needed to execute a spark application locally or in a cluster. To put it another way, it
offers settings for running a Spark application. The following are some of SparkConf's most important features:
• set(key, value): This attribute aids in the configuration property setting.
• setSparkHome(value): This feature allows you to specify the directory where Spark will be installed on worker nodes.
• setAppName(value): This element is used to specify the name of the application.
• setMaster(value): The master URL may be set using this property.
• get(key, defaultValue=None): This attribute aids in the retrieval of a key's configuration value.

In [31]:
# // Create an RDD
# rdd = spark.sparkContext.parallelize(Seq(("A",1),("A",3),("B",4),("B",2),("C",5)))  
# // Get the data in RDD
# rdd_collect = rdd.collect()

In [8]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("RepartitionExample").getOrCreate()

# Create a DataFrame
data = [("Alice", 25), ("Bob", 30), ("Charlie", 22), ("David", 28)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Check the number of partitions before repartitioning
print("Number of partitions before repartitioning:", df.rdd.getNumPartitions())

# Repartition the DataFrame into 2 partitions
df_repartitioned = df.repartition(2)

# Check the number of partitions after repartitioning
print("Number of partitions after repartitioning:", df_repartitioned.rdd.getNumPartitions())


24/03/04 11:06:10 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Number of partitions before repartitioning: 4
Number of partitions after repartitioning: 2




In [9]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("CoalesceExample").getOrCreate()

# Create a DataFrame
data = [("Alice", 25), ("Bob", 30), ("Charlie", 22), ("David", 28)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Check the number of partitions before coalescing
print("Number of partitions before coalescing:", df.rdd.getNumPartitions())

# Coalesce the DataFrame into 2 partitions
df_coalesced = df.coalesce(2)

# Check the number of partitions after coalescing
print("Number of partitions after coalescing:", df_coalesced.rdd.getNumPartitions())


Number of partitions before coalescing: 4
Number of partitions after coalescing: 2


24/03/04 11:12:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("GlobalSparkSession2").getOrCreate()


24/03/06 15:55:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [169]:
"""
Task - Transform the input DataFrame into a new DataFrame , where each row represents a unique student,
and the columns include the student's name along with the marks for the "math" and "eng" subjects.
"""

data=[
('Rudra','math',79),
('Rudra','eng',60),
('Shivu','math', 68),
('Shivu','eng', 59),
('Anu','math', 65),
('Anu','eng',80)
]
schema="Name string,Sub string,Marks int"
df=spark.createDataFrame(data,schema)
df.groupBy("Name").pivot("Sub").max("Marks")\
.show()

+-----+---+----+
| Name|eng|math|
+-----+---+----+
|Shivu| 59|  68|
|Rudra| 60|  79|
|  Anu| 80|  65|
+-----+---+----+



In [195]:
# Task - Find the missing numbers in the column

data = [(1, ),(2,),(3,),(6,),(7,),(8,)]
df = spark.createDataFrame(data,['id'])#.toDF("id")
nums=spark.range(1,9)
nums.join(df,nums.id==df.id,'left').select(nums.id).filter(df.id.isNull()).show()

+---+
| id|
+---+
|  4|
|  5|
+---+



In [29]:
"""Asked in synechron 
return the gap between opened and closed date of a ticket
"""
from pyspark.sql.functions import * #to_date,col

data = [("t1", "01/01/2023", "opened"),
        ("t1", "05/01/2023", "assigned"),
        ("t1", "07/01/2023", "under_review"),
        ("t1", "10/01/2023", "closed"),
        ("t2", "01/01/2023", "opened"),
        ("t2", "05/01/2023", "assigned"),
        ("t2", "07/01/2023", "under_review"),
        ("t2", "10/01/2023", "under_manager_review"),
        ("t2", "14/01/2023", "closed")]

# Create DataFrame
df = spark.createDataFrame(data, ["ticket_id", "date_changed", "status"])
df=df.withColumn("date_changed",to_date(col("date_changed"),'dd/MM/yyyy'))#.show()
# df.printSchema()
df.groupBy("ticket_id").agg(min("date_changed").alias("opened_date"),\
                        max("date_changed").alias("closed_date"))\
.withColumn("diff_days",datediff("closed_date","opened_date")+1)\
.show()

+---------+-----------+-----------+---------+
|ticket_id|opened_date|closed_date|diff_days|
+---------+-----------+-----------+---------+
|       t1| 2023-01-01| 2023-01-10|       10|
|       t2| 2023-01-01| 2023-01-14|       14|
+---------+-----------+-----------+---------+



In [None]:
"""
Q1. What are SparkFiles in Pyspark?
PySpark provides the reliability needed to upload our files to Apache Spark. This is accomplished by using sc.addFile,
where 'sc' stands for SparkContext. We use SparkFiles.net to acquire the directory path.
We use the following methods in SparkFiles to resolve the path to the files added using SparkContext.addFile():
• get(filename),
• getrootdirectory()
"""

In [20]:
"""
Task - Find the top 5 customers with the highest number of clicks.
The click dataset has columns like 'user_id', 'timestamp', 'interaction_type'.
"""
from pyspark.sql.functions import count
data_tuples = [
(1, '2023-01-01 12:00:00', 'click'),
(2, '2023-01-01 12:05:00', 'view'),
(1, '2023-01-01 12:10:00', 'click'),
(3, '2023-01-01 12:15:00', 'view'),
(2, '2023-01-01 12:20:00', 'click'),
(1, '2023-01-01 12:25:00', 'view'),
(3, '2023-01-01 12:30:00', 'click'),
(2, '2023-01-01 12:35:00', 'view'),
(1, '2023-01-01 12:40:00', 'click'),
(3, '2023-01-01 12:45:00', 'view'),
(1, '2023-01-02 12:10:00', 'click'),
(1, '2023-01-03 12:10:00', 'click'),
(1, '2023-01-04 12:10:00', 'view'),(3, '2023-01-01 12:40:00', 'click'),
(3, '2023-01-01 12:45:00', 'view'),
# Add more tuples as needed
]
schema=['user_id', 'timestamp', 'interaction_type']
df=spark.createDataFrame(data_tuples,schema)
df.groupBy("user_id","interaction_type").agg(count("interaction_type").alias("click_count"))\
    .where("interaction_type='click'")\
    .orderBy("click_count",ascending=False)\
    .limit(5)#\.show()

DataFrame[user_id: bigint, interaction_type: string, click_count: bigint]

In [33]:
"""
A PivotTable is especially designed for querying large amounts of data in many user-friendly ways to derive quick insights easily.
Task - Show work group wise , average salary for each department
"""

data = {
        'First Name': ['Aryan', 'Rohan', 'Riya', 'Yash', 'Siddhant'],
        'Last Name': ['Singh', 'Agarwal', 'Shah', 'Bhatia', 'Khanna'],
        'Type': ['Full-time Employee', 'Intern', 'Full-time Employee', 'Part-time Employee', 'Full-time Employee'],
        'Department': ['Administration', 'Technical', 'Administration', 'Technical', 'Management'],
        'YoE': [2, 3, 5, 7, 6],
        'Salary': [20000, 5000, 10000, 10000, 20000]
       }
import pandas as pd
pd_table=pd.DataFrame(data)
df=spark.createDataFrame(pd_table)
df.groupBy('Type').pivot('Department').avg('Salary').na.fill(0)\
# .show()

DataFrame[Type: string, Administration: double, Management: double, Technical: double]

In [1]:
"""
Task 'camelCase to snake_case'
Task - Change the pyspark dataframe column names from camel case to snake case.
"""
schema=["userId","firstName","last_name"]
data = [
(101 , 'Neha' , 'K'),
(102 , 'Mark' , 'T'),
(103 , 'Iram' , 'D')
]
df=spark.createDataFrame(data,schema)
fields=df.columns
def convert_to_snake_case(charArr):
    return ''.join(['_'+char.lower() if char.isupper() else char  for char in charArr])
# snake_columns=list(map(convert_to_snake_case,fields))
snake_columns=[convert_to_snake_case(s) for s in fields]
df=spark.createDataFrame(data,snake_columns)
# df.show()

In [33]:
"""
Find the price difference between the months in the same year.
Pending due to inconsistend data
"""
from pyspark.sql.functions import datediff,to_date,col,year,lag
from pyspark.sql.window import Window
data = [
    (1,'2023-01-01',10),(1,'2023-01-27',13),(1,'2023-02-01',14)
    ,(1,'2023-02-15',15),(1,'2023-03-03',18),(1,'2023-03-27',15)
    ,(1,'2023-04-06',20),(2,'2024-01-01',50),(2,'2024-01-29',100)
    ,(2,'2024-02-01',150)
]
schema = "sku_id int , price_date string , price int"
df=spark.createDataFrame(data,schema)
windowSpec=Window.partitionBy(col("sku_id")).orderBy(col("price"))
getPrice=df.withColumn("month_year",col("price_date")[0:7])\
.groupBy("month_year").sum("price")\
# .show()
   # .withColumn("prev_price",lag("price").over(windowSpec))
# getPrice.withColumn("price_diff",col("price")-col("prev_price"))\


In [56]:
"""
🚀 Your Task:
Implement a PYSPARK program to solve these challenges. Share your code in the comments below and let the coding battle begin! 💻🔥
 
1️⃣ Get only Year part of "JoiningDate"
2️⃣ Get only Month part of "JoiningDate".
3️⃣ Get only date part of "JoiningDate".
4️⃣Get the current system date using DataFrame API
5️⃣Get the current UTC date and time using DataFrame API
"""
from pyspark.sql.types import *
from pyspark.sql.functions import to_date,col,to_timestamp,year,month,day,hour,minute,second,current_date,to_utc_timestamp

data = [
   [1, "Vikas", "Ahlawat", 600000.0, "2013-02-15 11:16:28.290", "IT", "Male"],
   [2, "nikita", "Jain", 530000.0, "2014-01-09 17:31:07.793", "HR", "Female"],
   [3, "Ashish", "Kumar", 1000000.0, "2014-01-09 10:05:07.793", "IT", "Male"],
   [4, "Nikhil", "Sharma", 480000.0, "2014-01-09 09:00:07.793", "HR", "Male"],
   [5, "anish", "kadian", 500000.0, "2014-01-09 09:31:07.793", "Payroll", "Male"],
]
 
# Create a schema for the DataFrame
schema = StructType([
   StructField("EmployeeID", IntegerType(), True),
   StructField("First_Name", StringType(), True),
   StructField("Last_Name", StringType(), True),
   StructField("Salary", DoubleType(), True),
   StructField("Joining_Date", StringType(), True),
   StructField("Department", StringType(), True),
   StructField("Gender", StringType(), True)
])
  
df=spark.createDataFrame(data,schema)
clean_data=df.withColumn("Joining_Date",col("Joining_Date")[0:19])\
.withColumn("Joining_Date",to_timestamp("Joining_Date",'yyyy-MM-dd HH:mm:ss'))\
.withColumn("Year",year("Joining_Date"))\
.withColumn("Month",month("Joining_Date"))\
.withColumn("Day",day("Joining_Date"))\
.withColumn("SystemDate",current_date())\
.withColumn("UTCTime",to_utc_timestamp(current_date(),"PST"))\
# .show(truncate=False)

In [85]:
# Challenge : 𝐅𝐢𝐧𝐝 𝐭𝐡𝐞 𝐭𝐨𝐭𝐚𝐥 𝐜𝐨𝐧𝐬𝐨𝐧𝐚𝐧𝐭𝐬 𝐜𝐨𝐮𝐧𝐭 𝐨𝐟 𝐭𝐡𝐞 𝐦𝐞𝐬𝐬𝐚𝐠𝐞 𝐜𝐨𝐥𝐮𝐦𝐧.
from pyspark.sql.functions import col,udf
from pyspark.sql.types import ArrayType,StringType
data = [(1,'I love to play cricket',),(2,'I am into motorbiking',),(3,'What do you like',)]
schema = ["id", "message"]
df = spark.createDataFrame(data = data , schema = schema)
# vowels=['a','e','i','o','u']
# return the consonant string
def find_consonants(str):
    cons=[]
    for char in str.replace(" ",""):
# use isalpha() method to filter out no-alphabetic characters
        if char.isalpha() and char.lower() not in ['a','e','i','o','u']:
            cons.append(char)
    return ''.join(cons),len(cons)
# register udf
consonant_udf=udf(find_consonants,returnType=ArrayType(StringType()))
apply_udf=df.withColumn("consonant",consonant_udf(col("message")))
apply_udf.select(col("id")
                 ,col("consonant")[0].alias("text")
                 ,col("consonant")[1].alias("length")
                )\
# .show(truncate= False)

DataFrame[id: bigint, text: string, length: string]

In [97]:
"""
Challenge : 𝑭𝒊𝒏𝒅 𝒕𝒉𝒆 𝒔𝒕𝒂𝒓𝒕 𝒂𝒏𝒅 𝒆𝒏𝒅 𝒍𝒐𝒄𝒂𝒕𝒊𝒐𝒏 𝒇𝒐𝒓 𝒆𝒂𝒄𝒉 𝒄𝒖𝒔𝒕𝒐𝒎𝒆𝒓.
df.show()
df.groupBy("customer")\
.agg(collect_list("start_location").alias("start_locations"),
     collect_list("end_location").alias("end_locations"))\
.show(truncate=False)
"""
from pyspark.sql.functions import collect_list

data = [ ('c1', 'New York', 'Lima'),('c1', 'London', 'New York'),
('c1', 'Lima', 'Sao Paulo'),('c1', 'Sao Paulo', 'New Delhi'),
('c2', 'Mumbai', 'Hyderabad'),('c2', 'Surat', 'Pune'),
('c2', 'Hyderabad', 'Surat'),('c3', 'Kochi', 'Kurnool'),
('c3', 'Lucknow', 'Agra'),('c3', 'Agra', 'Jaipur'),('c3', 'Jaipur', 'Kochi')]

schema = "customer string , start_location string , end_location string"
df = spark.createDataFrame(data = data , schema = schema).createOrReplaceTempView("journeys")
spark.sql("""
select
st.customer
,st.start_location as st_start,st.end_location as st_end
,ed.start_location as ed_start,ed.end_location as ed_end

from journeys as st
left join journeys as ed
on st.start_location=ed.end_location
order by st.customer
"""
)\
# .show(truncate=False)

DataFrame[customer: string, st_start: string, st_end: string, ed_start: string, ed_end: string]

In [103]:
"""
Challenge: 𝐅𝐢𝐧𝐝 𝐫𝐞𝐜𝐨𝐫𝐝𝐬 𝐢𝐧 𝐚 𝐝𝐚𝐭𝐚𝐟𝐫𝐚𝐦𝐞 𝐰𝐡𝐢𝐜𝐡 𝐚𝐫𝐞 𝐧𝐨𝐭 𝐩𝐫𝐞𝐬𝐞𝐧𝐭 𝐢𝐧 𝐚𝐧𝐨𝐭𝐡𝐞𝐫 𝐝𝐚𝐭𝐚𝐟𝐫𝐚𝐦𝐞.
"""
data_1 = [
Row(id=1, name='Mumbai'),
Row(id=2, name='Bangalore'),
Row(id=3, name='Delhi')
]

data_2 = [
Row(id=2, name='Bangalore'),
Row(id=1, name='Mumbai'),
Row(id=4, name='Ayodhya')
]

# Creating DataFrame 1
df_1 = spark.createDataFrame(data_1)

# Creating DataFrame 2
df_2 = spark.createDataFrame(data_2)
df_1.exceptAll(df_2)#.show()
# df_2.show()


DataFrame[id: bigint, name: string]

In [121]:
"""
Challenge: How to unpivot a dataframe in PySpark. Explain me with the help of example.
Miscellaneous: What if we have 100+ columns in the dataframe.
"""
from pyspark.sql.functions import lit,when

cricket_data = [
    ("Virat Kohli", 85, 100, 75),("Steve Smith", 90, 105, 80),("Kane Williamson", 88, 95, 70)
               ]

row_df=spark.createDataFrame([("Match1",),("Match2",),("Match3",)],["Matches"])

df = spark.createDataFrame(cricket_data, ["Player", "Match1", "Match2", "Match3"])
df2=df.crossJoin(row_df)\
.withColumn("Scores"
            ,when(col('Matches')=="Match1",col("Match1"))
            .when(col('Matches')=="Match2",col("Match2"))
            .when(col('Matches')=="Match3",col("Match3"))
           ).select("Player","Matches","Scores")\
# .show()

In [157]:
"""
𝑪𝒉𝒂𝒍𝒍𝒆𝒏𝒈𝒆 : Get the name of the 𝐡𝐢𝐠𝐡𝐞𝐬𝐭 𝐬𝐚𝐥𝐚𝐫𝐲 𝐚𝐧𝐝 𝐥𝐨𝐰𝐞𝐬𝐭 𝐬𝐚𝐥𝐚𝐫𝐲 employee name in 𝐞𝐚𝐜𝐡 𝐝𝐞𝐩𝐚𝐫𝐭𝐦𝐞𝐧𝐭.
If the salary is same then return the name of the employee whose name comes first in 𝐥𝐞𝐱𝐢𝐜𝐨𝐠𝐫𝐚𝐩𝐡𝐢𝐜𝐚𝐥 𝐨𝐫𝐝𝐞𝐫.
"""
from pyspark.sql.functions import *
from pyspark.sql.window import Window

emp_data = [ ('Siva',1,30000),('Ravi',2,40000),('Prasad',1,50000),('Arun',1,30000),('Sai',2,20000) ]

emp_schema = "emp_name string , dep_id int , salary long"

# Creating the dataframe
df = spark.createDataFrame(data = emp_data , schema = emp_schema)

windowSpec_min=Window.partitionBy("dep_id").orderBy("salary","emp_name")
windowSpec_max=Window.partitionBy("dep_id").orderBy(col("salary").desc(),"emp_name")

rank_salaries=df.withColumn("min_sal_rank",rank().over(windowSpec_min))\
                .withColumn("max_sal_rank",rank().over(windowSpec_max))\
                .selectExpr("dep_id","case when min_sal_rank=1 then emp_name end as min_sal_emp"\
                          ,"case when max_sal_rank=1 then emp_name end as max_sal_emp")\

rank_salaries.groupBy("dep_id").agg(max("min_sal_emp").alias("min_sal_emp")\
                    ,max("max_sal_emp").alias("max_sal_emp"))\
# .show()

DataFrame[dep_id: int, min_sal_emp: string, max_sal_emp: string]

In [5]:
"""
Challenge : Find the origin and the destination of each customer.
Note : There can be more than 1 stop for the same customer journey.

join(df_flight,(df_flight.cust_id==get_flightIds.cust_id) &
                  ((get_flightIds.min_flight==df_flight.flight_id) | (get_flightIds.max_flight==df_flight.flight_id))
                  )\
.agg(min("flight_id").alias("min_flight"),
                                max("flight_id").alias("max_flight"))
"""
flights_data = [(1,'Flight1' , 'Delhi' , 'Hyderabad'),
(1,'Flight2' , 'Hyderabad' , 'Kochi'),(1,'Flight3' , 'Kochi' , 'Mangalore'),
(2,'Flight1' , 'Mumbai' , 'Ayodhya'),(2,'Flight2' , 'Ayodhya' , 'Gorakhpur')
]

_schema = "cust_id int, flight_id string , origin string , destination string"
df_flight = spark.createDataFrame(data = flights_data , schema= _schema)
# df_flight.show()

# get_flightIds=df_flight.groupBy("cust_id")\
# .withColumn("origin_1",when(col("flight_id")==min("flight_id"),col("origin")))

# get_flightIds\
# # .show()


In [21]:
"""
Challenge:- Read the third quarter (25%) of emp_data table.
"""
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile,col,monotonically_increasing_id
emp_data = [("Alice", 28),
("Bob", 35),("Harvey",45),("Donna",45),
("Charlie", 42),
("David", 25),
("Eva", 31),
("Frank", 38),
("Grace", 45),
("Henry", 29)]

emp_schema = "name string , age int"

df = spark.createDataFrame(data = emp_data , schema = emp_schema)
createId=df.withColumn("mono_id",monotonically_increasing_id())
createId.withColumn("n_parts",ntile(4).over(Window.partitionBy("mono_id").orderBy("name"))).where(col("n_parts")==3)\
.show()

+----+---+-------+-------+
|name|age|mono_id|n_parts|
+----+---+-------+-------+
+----+---+-------+-------+



In [12]:
"""
Task : Retrieve information about consecutive login streaks for employees who have logged in for at least two consecutive days.

For each employee, provide the emp_id , the number of consecutive days logged in 
,the start_date of the streak and end_date of the streak.
"""
from pyspark.sql.functions import col,to_date,lag,date_diff
from pyspark.sql.window import Window

_data = [(101,'02-01-2024','N'),(101,'03-01-2024','Y'),(101,'04-01-2024','N'),
(101,'07-01-2024','Y'),(102,'01-01-2024','N'),(102,'02-01-2024','Y'),(102,'03-01-2024','Y'),
(102,'04-01-2024','N'),(102,'05-01-2024','Y'),(102,'06-01-2024','Y'),(102,'07-01-2024','Y'),
(103,'01-01-2024','N'),(103,'04-01-2024','N'),(103,'05-01-2024','Y'),(103,'06-01-2024','Y'),(103,'07-01-2024','N')
]
_schema = ["emp_id" , "log_date" , "isLogged_in"]

# creating the dataframe
df = spark.createDataFrame(data = _data , schema = _schema)
filter_Yes=df.withColumn("log_date",to_date("log_date","dd-MM-yyyy")).where(col("isLogged_in")=='Y').show()
windowSpec=Window.partitionBy("emp_id").orderBy("log_date")
# calc_diff=filter_Yes.withColumn("diff_days",date_diff(col("log_date"),lag("log_date").over(windowSpec)))
# consecutive_emp=calc_diff.where(col("diff_days")==1).distinct()\
# .show()

+------+----------+-----------+
|emp_id|  log_date|isLogged_in|
+------+----------+-----------+
|   101|2024-01-03|          Y|
|   101|2024-01-07|          Y|
|   102|2024-01-02|          Y|
|   102|2024-01-03|          Y|
|   102|2024-01-05|          Y|
|   102|2024-01-06|          Y|
|   102|2024-01-07|          Y|
|   103|2024-01-05|          Y|
|   103|2024-01-06|          Y|
+------+----------+-----------+



In [54]:
# Flatten the given Json_data:
companies={
        "Institute_Name" : "ABC_Coaching_Center",
        "Course_type" : "Best_seller" ,
        "branches" : [
        {
        "State" : "Maharashtra",
        "City" : "Mumbai",
        "address" : "XYZ"
        },
        {
        "State" : "Gujrat",
        "City" : "Surat",
        "address" : "PQRX"
        }
        ],
        "Head_Office_Contact" : 8787878787
        }
import pandas as pd
from pyspark.sql.functions import col

df_pd=pd.DataFrame(companies)
df=spark.createDataFrame(df_pd)
df.select("Course_type","Head_Office_Contact","Institute_Name",col("branches")["State"].alias("State")
          ,col("branches")["City"].alias("City")
          ,col("branches")["address"].alias("address"))\
# .show(truncate=False)
# print(df_pd)

DataFrame[Course_type: string, Head_Office_Contact: bigint, Institute_Name: string, State: string, City: string, address: string]

In [91]:
"""
Task : Write a sql query to find the products whose total sales revenue has increased every year.
Include the product_id, product_name and category in the result.
"""
from pyspark.sql.window import Window
from pyspark.sql.functions import col,when,collect_list,udf

product_data = [(1, 'Laptops', 'Electronics'),(2, 'Jeans', 'Clothing'),(3, 'Chairs', 'Home Appliances')]

sales_data = [
(1, 2019, 1000.00),
(1, 2020, 1200.00),
(1, 2021, 1100.00),
(2, 2019, 500.00),
(2, 2020, 600.00),
(2, 2021, 900.00),
(3, 2019, 300.00),
(3, 2020, 450.00),
(3, 2021, 400.00)
]

product_schema = ['product_id', 'product_name', 'category']
sales_schema = ['product_id', 'year', 'total_sales_revenue']

products= spark.createDataFrame(data = product_data , schema = product_schema)
sales = spark.createDataFrame(data = sales_data , schema = sales_schema)

joined_data=products.join(sales,products.product_id==sales.product_id)\
        .select(products.product_id,"product_name","category","year","total_sales_revenue")
collect_data=joined_data.groupBy("product_id","product_name","category")\
.agg(collect_list("year").alias("years"),collect_list("total_sales_revenue").alias("total_sales"))\

def find_flag(lst):
    flag="Y"
    for i in range(len(lst)-1):
        if(lst[i+1]<lst[i]):
            flag="N"
            return
    return flag

increase_udf=udf(find_flag)
collect_data.withColumn("increase_flag",increase_udf(col("total_sales"))).filter(col("increase_flag")=='Y')\
.show(truncate=False)

[Stage 143:>                                                        (0 + 4) / 4]

+----------+------------+--------+------------------+---------------------+-------------+
|product_id|product_name|category|years             |total_sales          |increase_flag|
+----------+------------+--------+------------------+---------------------+-------------+
|2         |Jeans       |Clothing|[2019, 2020, 2021]|[500.0, 600.0, 900.0]|Y            |
+----------+------------+--------+------------------+---------------------+-------------+



                                                                                

In [93]:
# How to join more than two dataframes
from pyspark.sql.functions import col

# Example DataFrame 1: Employee data
employee_data = [
    (1, "John", "Manager",1),
    (2, "Alice", "Developer",2),
    (3, "Bob", "Developer",3)
]
employee_schema = ["emp_id", "emp_name", "emp_role","dept_id"]
employee_df = spark.createDataFrame(employee_data, schema=employee_schema)

# Example DataFrame 2: Department data
department_data = [
    (1, "Engineering"),
    (2, "Marketing"),
    (3, "Sales")
]
department_schema = ["dept_id", "dept_name"]
department_df = spark.createDataFrame(department_data, schema=department_schema)

# Example DataFrame 3: Salary data
salary_data = [
    (1, 100000),
    (2, 90000),
    (3, 95000)
]
salary_schema = ["emp_id", "salary"]
salary_df = spark.createDataFrame(salary_data, schema=salary_schema)

# Perform Join/
join_data=(
    employee_df.join(salary_df,employee_df.emp_id==salary_df.emp_id)\
    .join(department_df,employee_df.dept_id==department_df.dept_id,"left")
).show()
                 # department_df,employee_df.


[Stage 154:>                                                        (0 + 4) / 4]

+------+--------+---------+-------+------+------+-------+-----------+
|emp_id|emp_name| emp_role|dept_id|emp_id|salary|dept_id|  dept_name|
+------+--------+---------+-------+------+------+-------+-----------+
|     1|    John|  Manager|      1|     1|100000|      1|Engineering|
|     3|     Bob|Developer|      3|     3| 95000|      3|      Sales|
|     2|   Alice|Developer|      2|     2| 90000|      2|  Marketing|
+------+--------+---------+-------+------+------+-------+-----------+



                                                                                

In [112]:
"""
Problem: You have a dataset of sales transactions, and you want to find the top-selling product for each month.
Write a PySpark code to achieve this using window functions.
"""
from pyspark.sql.types import StructType,StructField,StringType,DateType,DoubleType
import pyspark.sql.functions  as F
from datetime import datetime
from pyspark.sql.window import Window
# Define the schema for the DataFrame
schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("sales_date", DateType(), True),
    StructField("sales_amount", DoubleType(), True)
])

# Sample data
data = [
    ("A", datetime.strptime("2023-01-15", "%Y-%m-%d").date(), 100.0),
    ("B", datetime.strptime("2023-01-20", "%Y-%m-%d").date(), 150.0),
    ("A", datetime.strptime("2023-02-10", "%Y-%m-%d").date(), 120.0),
    ("B", datetime.strptime("2023-02-15", "%Y-%m-%d").date(), 180.0),
    ("C", datetime.strptime("2023-03-05", "%Y-%m-%d").date(), 200.0),
    ("A", datetime.strptime("2023-03-10", "%Y-%m-%d").date(), 250.0)
]
df = spark.createDataFrame(data, schema)
windowSpec=Window.partitionBy(F.year("sales_date"),F.month("sales_date")).orderBy(F.col("sales_amount").desc())
get_max_price=df.withColumn("year",F.year("sales_date")).withColumn("month",F.month("sales_date"))\
                .withColumn("price_rank",F.rank().over(windowSpec)).filter(col("price_rank")==1)\
# .show(truncate=False)


In [131]:
"""
Write a Pyspark code to find primary Department for Each Employee
Employees can belong to multiple departments. When the employee joins other departments,
they need to decide which department is their primary department. Note that when an employee belongs to only one department, 
their primary column is 'N'.
Write a solution to report all the employees with their primary department. For employees who belong to one department,
report their only department.
Return the result in any order.
"""
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
import pyspark.sql.functions  as F
# Define the schema for the Employee table
employee_schema = StructType([
    StructField("employee_id", IntegerType(), True),
    StructField("department_id", IntegerType(), True),
    StructField("primary_flag", StringType(), True)
])

# Create Employee DataFrame
employee_data = [  (1, 1, "N"),  (2, 1, "Y"), (2, 2, "N"), (3, 3, "N"), (4, 2, "N"), (4, 3, "Y"), (4, 4, "N") ]

employee_df = spark.createDataFrame(employee_data, schema=employee_schema)
"""
count_empid=employee_df.groupBy("employee_id").agg(F.count(F.col("employee_id")).alias("count_emp"))
decide_emp=employee_df.join(count_empid,employee_df.employee_id==count_empid.employee_id)\
.withColumn("flag",F.expr("CASE WHEN count_emp >1 and primary_flag ='Y' then 'Y' else 'N' end"))\
.where(F.expr("count_emp=1 or flag='Y'"))\
.show()
"""


'\ncount_empid=employee_df.groupBy("employee_id").agg(F.count(F.col("employee_id")).alias("count_emp"))\ndecide_emp=employee_df.join(count_empid,employee_df.employee_id==count_empid.employee_id).withColumn("flag",F.expr("CASE WHEN count_emp >1 and primary_flag =\'Y\' then \'Y\' else \'N\' end")).where(F.expr("count_emp=1 or flag=\'Y\'")).show()\n'

In [160]:
"""
1. Find City name that starts with vowels [A,I,O,U,E] 🤔.
City names should not repeat in the result dataset using pyspark.
"""

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import *
# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Lat_N", DoubleType(), True),
    StructField("Long_W", DoubleType(), True)
])

# Sample data
data = [
    (1, 'New York', 'NY', 40.7128, -74.006),
    (2, 'Los Angeles', 'CA', 34.0522, -118.2437),
    (3, 'Albany', 'IL', 41.8781, -87.6298),
    (4, 'East China', 'TX', 29.7604, -95.3698),
    (5, 'East China', 'AZ', 33.4484, -112.074),
    (6, 'Philadelphia', 'PA', 39.9526, -75.1652),
    (7, 'San Antonio', 'TX', 29.4241, -98.4936),
    (8, 'San Diego', 'CA', 32.7157, -117.1611),
    (9, 'Oakfield', 'TX', 32.7767, -96.797),
    (10, 'San Jose', 'CA', 37.3382, -121.8863)
]

# Create DataFrame
df = spark.createDataFrame(data=data, schema=schema)
# Filter city names that start with vowels
vowel_cities= df.filter(col("City").rlike('^[AEIOUaeiou]'))\
# .show()

In [1]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("GlobalSparkSession").getOrCreate()

24/03/29 22:03:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
"""
Given an input pyspark dataframe, columns as account_id and income.
Write a solution to calculate the number of bank accounts for each salary category. The salary categories are:
"Low Salary": All the salaries are strictly less than $20000.
"Average Salary": All the salaries in the inclusive range [$20000, $50000].
"High Salary": All the salaries strictly greater than $50000.
The result table must contain all three categories. If there are no accounts in a category, return 0.
Return the result table in any order.
"""

from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import *

# Define the schema for the DataFrame
schema = StructType([
    StructField("account_id", IntegerType(), True),
    StructField("income", IntegerType(), True)
])

# Insert data into the DataFrame
data = [ (3, 108939),   (2, 12747), (8, 87709), (6, 91796) ]

df = spark.createDataFrame(data, schema=schema)
# Define the CTE with the income category logic.

cte = df.withColumn("category",
     when(col("income") < 20000, "Low Salary")
    .when((col("income") >= 20000) & (col("income") <= 50000), "Average Salary")
    .when(col("income") > 50000, "High Salary")
    .otherwise("")
)

# Define the 'salary' DataFrame with distinct categories
salary = spark.createDataFrame([("Low Salary",), ("Average Salary",), ("High Salary",)], ["category"])
cte.join(salary,salary.category==cte.category,"right")\
.groupBy(salary.category).agg(count(cte.category).alias("count"))\
# .show()

DataFrame[category: string, count: bigint]

In [6]:
"""
Task - Leet code 1783. Grand slam titles
Find how many titles did a player won
"""
players_data = [(1, 'Nadal'), (2, 'Federer'), (3, 'Novak')]
championships_data = [(2018, 1, 1, 1, 1), (2019, 1, 1, 2, 2), (2020, 2, 1, 2, 2)]
players_schema=['player_id','player_name']
champions_schema='year int,wimbledon int,Fr_open int,US_open int,AUopen int'
players_df = spark.createDataFrame(players_data, schema=players_schema)
champions_df = spark.createDataFrame(championships_data, schema=champions_schema).createOrReplaceTempView("champions")

# players_df.show()
title_count=spark.sql("""
   select player,count(player) as p_count from (
    select wimbledon as player from champions union all
    select Fr_open from champions union all
    select US_open from champions union all
    select AUopen from champions
    ) group by player
""")

title_count.join(players_df,title_count.player==players_df.player_id)\
.select("player","player_name","p_count")\
# .show()

DataFrame[player: int, player_name: string, p_count: bigint]

In [48]:
"""𝐂𝐡𝐚𝐥𝐥𝐞𝐧𝐠𝐞:
-- Given is user login table for , identify dates where a user has logged in for 5 or more consecutive days.
-- Return the user id, start date, end date and no of consecutive days, sorting based on user id.
-- If a user logged in consecutively 5 or more times but not spanning 5 days then they should be excluded.
"""

data = [
    (1, '2024-03-01'),(1, '2024-03-02'),(1, '2024-03-03'),(1, '2024-03-04'),(1, '2024-03-06'),(1, '2024-03-10'),
    (1, '2024-03-11'),(1, '2024-03-12'),(1, '2024-03-13'),(1, '2024-03-14'),(1, '2024-03-20'),(1, '2024-03-25'),
    (1, '2024-03-26'),(1, '2024-03-27'),(1, '2024-03-28'),(1, '2024-03-29'),(1, '2024-03-30'),(2, '2024-03-01'),
    (2, '2024-03-02'),(2, '2024-03-03'),(2, '2024-03-04'),(3, '2024-03-01'),(3, '2024-03-02'),(3, '2024-03-03'),
    (3, '2024-03-04'),(3, '2024-03-04'),(3, '2024-03-04'),(3, '2024-03-05'),(4, '2024-03-01'),(4, '2024-03-02'),
    (4, '2024-03-03'),(4, '2024-03-04'),(4, '2024-03-04')
]
from pyspark.sql.functions import col,to_date,lag,date_diff,expr,sum
from pyspark.sql.window import Window
schema = "user_id int , login_date string"
diff_window=Window.partitionBy("user_id").orderBy("login_date")
df = spark.createDataFrame(data = data , schema = schema).withColumn("login_date",to_date("login_date","yyyy-MM-dd")).dropDuplicates()
get_diff=df.withColumn("date_diff",date_diff(col("login_date"),lag("login_date").over(diff_window)))
filter_diff=get_diff.selectExpr("user_id","login_date","coalesce(date_diff,0) as diff_days")
filter_ones=filter_diff#.where(expr("diff_days <=1"))

sum_window=Window.partitionBy("user_id","diff_days").orderBy("login_date")
# filter_ones.printSchema()
filter_ones.withColumn("sum_days",sum(col("diff_days")).over(sum_window))\
# .show(n=100)
#where(expr("date_diff<=1"))

DataFrame[user_id: int, login_date: date, diff_days: int, sum_days: bigint]

In [60]:
from pyspark.sql.functions import explode,posexplode,explode_outer\
                                  ,posexplode_outer

data = [('Jaya', '20', ['SQL', 'Data Science']),('Rohit', '19', None)
        ,('Maria', '20', ['DBMS', 'Networking']),
        ('Jay', '22', None),('Milan', '21', ['ML', 'AI'])
       ]
columns = ['Name', 'Age', 'Courses_enrolled']
df = spark.createDataFrame(data, columns)
df.select(df.Name,df.Age,explode(df.Courses_enrolled))\
# .show()

df.select(df.Name,df.Age,posexplode(df.Courses_enrolled))\
# .show()

df.select(df.Name,df.Age,explode_outer(df.Courses_enrolled))\
# .show()

df1=df.select(df.Name,posexplode_outer(df.Courses_enrolled))\
# .show()

In [86]:
"""
Identifying missing values and treating/imputing missing values is an important step in exploratory data analysis.
Let's try it in the PySpark dataframe API.
Task - In a PySpark dataframe,
a) Identify columns that contain no null values.
b) Identify columns where every value is null.
c) Identify columns with at least one null value
"""
data = [
    (1,'Neha' , 30 , None, 'IT'),
    (2,'Mark' , None , None, 'HR'),
    (3,'David' , 25 , None, 'HR'),
    (4,'Carol' , 30 , None, None)
]
schema="id int,Name string,marks int,salary int,dept string"
df=spark.createDataFrame(data,schema)

# df.printSchema()
# a) Identify columns that contain no null values.
from pyspark.sql.functions import col

non_null_columns=[
col_name for col_name in df.columns if df.filter(col(col_name).isNull()).count()==0
]

print("Columns with no null values:", non_null_columns)

# b) Identify columns where every value is null.
"""
null_columns=[ col_name for col_name in df.columns if df.filter(col(col_name).isNull()).count()==df.count() ]
"""
null_columns=[
col_name for col_name in df.columns if df.filter(col(col_name).isNotNull()).count()==0
]
print("Columns with All null values:", null_columns)

# c) Identify columns with at least one null value
one_null_columns=[
col_name for col_name in df.columns if df.filter(col(col_name).isNull()).count()>=1
]

print("Columns with one null values:", one_null_columns)


Columns with no null values: ['id', 'Name']
Columns with All null values: ['salary']
Columns with one null values: ['marks', 'salary', 'dept']


In [2]:
"""
You have a row that is too long to show horizontally, also the texts get truncated. 
How can you show the data vertically, and also show the full text?
"""
data = [
    (1,'Neha' , 30 , None, 'IT',2,'Mark' , None , None, 'HR'),
    (3,'David' , 25 , None, 'HR',4,'Carol' , 30 , None, None)
]
schema="id int,Name string,marks int,salary int,dept string,id2 int,Name2 string,marks2 int,salary2 int,dept2 string"
df=spark.createDataFrame(data,schema)
# df.show()
# df.show(n=2 ,vertical = True , truncate = False)


In [1]:
# i want to return top 3 resturants in the city in the month wise in terms of orders
# month,restaurant,city
from pyspark.sql.functions import *
from pyspark.sql.types import *

data = [
    (1, '2023-01-01', 'Restaurant A', 'New York'),
    (2, '2023-01-02', 'Restaurant B', 'Los Angeles'),
    (3, '2023-01-03', 'Restaurant C', 'Chicago'),
    (4, '2023-01-04', 'Restaurant B', 'Houston'),
    (5, '2023-01-05', 'Restaurant C', 'Phoenix'),
    (6, '2023-01-06', 'Restaurant C', 'Philadelphia'),
    (7, '2023-01-07', 'Restaurant D', 'San Antonio'),
    (8, '2023-01-08', 'Restaurant D', 'San Diego'),
    (9, '2023-01-09', 'Restaurant A', 'Dallas'),
    (10, '2023-01-10', 'Restaurant A', 'Austin'),
    (11, '2023-01-11', 'Restaurant E', 'San Jose'),
    (12, '2023-01-12', 'Restaurant E', 'Jacksonville'),
    (13, '2023-01-13', 'Restaurant D', 'Fort Worth'),
    (14, '2023-01-14', 'Restaurant E', 'Columbus'),
    (15, '2023-01-15', 'Restaurant D', 'Charlotte')
]
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("order_date", StringType(), True),
    StructField("restaurant", StringType(), True),
    StructField("city", StringType(), True)
])
df=spark.createDataFrame(data,schema)
df.withColumn("order_date",to_date("order_date",'yyyy-MM-dd'))\
# .show()

DataFrame[order_id: string, order_date: date, restaurant: string, city: string]

In [13]:
"""
Task - Identify rows containing non-numeric values in the "Quantity" column, if any.
Detecting non-numeric values within columns intended for numerical data is a valuable step in both data exploration and data cleaning processes.
"""
from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = StructType([
StructField("ProductCode", StringType(), True),
StructField("Quantity", StringType(), True),
StructField("UnitPrice", StringType(), True),
StructField("CustomerID", StringType(), True),
])

data = [
("P001", 5, 20.0, "C001"),
("P002", 3, 15.5, "C002"),
("P003", 10, 5.99, "C003"),
("P004", 2, 50.0, "C001"),
("P005", "eight", 12.75, "C002"),
]

df=spark.createDataFrame(data,schema)

df.select("ProductCode","Quantity","UnitPrice","CustomerID").where(col("Quantity").cast("double").isNull()).show()

+-----------+--------+---------+----------+
|ProductCode|Quantity|UnitPrice|CustomerID|
+-----------+--------+---------+----------+
|       P005|   eight|    12.75|      C002|
+-----------+--------+---------+----------+



In [53]:
"""
𝐓𝐡𝐞 𝐂𝐡𝐚𝐥𝐥𝐞𝐧𝐠𝐞: Given a table with 'brands' and missing 'category' values, can you craft an SQL query that fills those
gaps with the last non-null category? (Edit - Lets solve in PySpark)

My PySpark dataframe API solution is in the comment (backward fill concept using last()over()).
Link to one SQL solution in comment
"""
from pyspark.sql.functions import *
from pyspark.sql.window import Window

data =[('chocolates','5-star'),(None,'dairy milk'),(None,'perk')
        ,(None,'eclair'),('Biscuits','britannia'),(None,'good day'),(None,'boost')
      ]

df = spark.createDataFrame(data,["category" , "brand_name"])
windowSpec=Window.orderBy("seq_no")#.rowsBetween(Window.unboundedPreceding,0)

df=df.withColumn('seq_no',row_number().over(Window.orderBy(monotonically_increasing_id())))
df.withColumn("category",last("category",ignorenulls=True).over(windowSpec))\
# .show()

DataFrame[category: string, brand_name: string, seq_no: int]

In [56]:
"""
𝐂𝐡𝐚𝐥𝐥𝐞𝐧𝐠𝐞: Fill the Null values. According to the ouput dataframe.
"""
from pyspark.sql.functions import *
from pyspark.sql.window import Window

job_skills_data = [
 (1, 'Data Engineer', 'SQL'), (2, None, 'Python'), (3, None, 'AWS'),
 (4, None, 'Snowflake'), (5, None, 'Apache Spark'), (6, 'Web Developer', 'Java'),
 (7, None, 'HTML'), (8, None, 'CSS'), (9, 'Data Scientist', 'Python'),
 (10, None, 'Machine Learning'), (11, None, 'Deep Learning'),(12, None, 'Tableau')
]

job_skills_schema = "row_id int , job_role string , skills string"

job_skills_df = spark.createDataFrame(data = job_skills_data , schema = job_skills_schema)
windowSpec=Window.orderBy("row_id")
job_skills_df.withColumn("job_role",last("job_role",ignorenulls=True).over(windowSpec))\
# .show()

DataFrame[row_id: int, job_role: string, skills: string]

In [1]:
"""
Write a solution to find the patient_id, patient_name, and conditions of the patients who have Type I Diabetes.
Type I Diabetes always starts with the DIAB1 prefix. Return the result table in any order.
"""
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
.appName("DiabetesPatients") \
.getOrCreate()

# Sample data
data = [
(1, "Daniel", "YFEV COUGH"),
(2, "Alice", ""),
(3, "Bob", "DIAB100 MYOP"),
(4, "George", "ACNE DIAB100"),
(5, "Alain", "DIAB201")
]

# Create DataFrame
patients_df = spark.createDataFrame(data, ["patient_id", "patient_name", "conditions"])
patients_df.where("conditions like '%DIAB1%'")\
# .show()

24/04/15 09:40:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


DataFrame[patient_id: bigint, patient_name: string, conditions: string]

In [1]:
"""
Task - Find the second highest salary in the Finance department.
"""

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.window import Window
from pyspark.sql.functions import desc,dense_rank

# Define schema
schema = StructType([
StructField("Name", StringType(), True),
StructField("Age", IntegerType(), True),
StructField("Department", StringType(), True),
StructField("Salary", StringType(), True)
])


# Create data
data = [("Mike", 30, "Finance", "50,000"),
("Jim", 35, "Finance", "30,000"),
("Ami", 22, "Finance", "30,000"),
("Laura",29, "Sales", "35,000"),
("Kia", 28, "Sales", "30,000"),
("Megha", 22, "Finance", "20,000")]

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Show DataFrame
windowSpec=Window.partitionBy("Department").orderBy(desc("Salary"))
df=df.where("Department='Finance'")
df.withColumn("sal_rank",dense_rank().over(windowSpec))\
.where("sal_rank=2")\
# .show()

DataFrame[Name: string, Age: int, Department: string, Salary: string, sal_rank: int]

In [11]:
"""
🔑 Step 1️⃣: Defining the Challenge
Your objective is to analyze the top-performing products in each category, along with their previous and subsequent sales trends.
🛠️ Step 2️⃣: PySpark Solution
"""

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Sample sales data
data = [("Electronics", "Laptop", "2024-04-01", 100),
("Electronics", "Laptop", "2024-04-02", 120),
("Electronics", "Laptop", "2024-04-03", 150),
("Electronics", "Phone", "2024-04-01", 200),
("Electronics", "Phone", "2024-04-02", 180),
("Electronics", "Phone", "2024-04-03", 220)]

# Create DataFrame
columns = ["category", "product", "date", "sales"]
df = spark.createDataFrame(data, columns)
# df.show()

# get overall rank of that product,previous price,subsequent ( next) price.
windowSpec=Window.partitionBy("category").orderBy(desc("sales"))

df.withColumn("sal_rank",dense_rank().over(windowSpec))\
.withColumn("prev_sal",F.lag("sales").over(windowSpec))\
.withColumn("subsequent_sal",F.lead("sales").over(windowSpec))\
# .show()

DataFrame[category: string, product: string, date: string, sales: bigint, sal_rank: int, prev_sal: bigint, subsequent_sal: bigint]