In [0]:
dbutils.fs.ls('dbfs:/FileStore/tables/nested_json.txt')

Out[321]: [FileInfo(path='dbfs:/FileStore/tables/nested_json.txt', name='nested_json.txt', size=610, modificationTime=1733578814000)]

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import MapType,StructType,StructField,StringType,IntegerType,ArrayType,DecimalType,BooleanType

In [0]:
json_schema=StructType([
    StructField('id',IntegerType(),True),
    StructField('name',StringType(),True),
    StructField('address',StructType([
        StructField('city',StringType(),True),
        StructField('state',StringType(),True)
    ])),
    StructField('phone_numbers',ArrayType(
       StructType([
        StructField('type',StringType(),True),
        StructField('number',StringType(),True)
 ] )
    ))
])

In [0]:
df_json=(spark.read.format('json').
         option('multiline',True).
         option('header',True).
         option('schema',json_schema).
         load('dbfs:/FileStore/tables/nested_json.txt')
)

In [0]:
df_json.show(truncate=False)

+-------------------+---+----+--------------------------------------------+
|address            |id |name|phone_numbers                               |
+-------------------+---+----+--------------------------------------------+
|{New York, NY}     |1  |John|[{123-456-7890, home}, {987-654-3210, work}]|
|{San Francisco, CA}|2  |Jane|[{555-1234, mobile}, {777-4321, work}]      |
+-------------------+---+----+--------------------------------------------+



In [0]:
df_json.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone_numbers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- number: string (nullable = true)
 |    |    |-- type: string (nullable = true)



In [0]:
##explode the dataframe
df_json=(df_json.withColumn('city',col('address').city)
                .withColumn('state',col('address').state)
                .withColumn('ph_num',explode(col('phone_numbers')))
                .drop('address')
                .drop('phone_numbers')
            )   

df_json.show(truncate=False)                    

+---+----+-------------+-----+--------------------+
|id |name|city         |state|ph_num              |
+---+----+-------------+-----+--------------------+
|1  |John|New York     |NY   |{123-456-7890, home}|
|1  |John|New York     |NY   |{987-654-3210, work}|
|2  |Jane|San Francisco|CA   |{555-1234, mobile}  |
|2  |Jane|San Francisco|CA   |{777-4321, work}    |
+---+----+-------------+-----+--------------------+



**RDD Map function**


In [0]:
data1=[(1,"Amir Khan",1000,"IT"),(2,"Habul Basar",1500,"IT"),(3,"Climont Flairy",2500,"IT"),(4,"Debasish Ghosh",3000,"HR"),(5,"Emanule Berrera",2000,"HR")]
schema1=["EmpId","EmpName","Salary","DeptName"]
df=spark.createDataFrame(data1,schema1)
display(df)

EmpId,EmpName,Salary,DeptName
1,Amir Khan,1000,IT
2,Habul Basar,1500,IT
3,Climont Flairy,2500,IT
4,Debasish Ghosh,3000,HR
5,Emanule Berrera,2000,HR


In [0]:
df.rdd.collect()

Out[329]: [Row(EmpId=1, EmpName='Amir Khan', Salary=1000, DeptName='IT'),
 Row(EmpId=2, EmpName='Habul Basar', Salary=1500, DeptName='IT'),
 Row(EmpId=3, EmpName='Climont Flairy', Salary=2500, DeptName='IT'),
 Row(EmpId=4, EmpName='Debasish Ghosh', Salary=3000, DeptName='HR'),
 Row(EmpId=5, EmpName='Emanule Berrera', Salary=2000, DeptName='HR')]

In [0]:
# Refering columns by index.
rdd_new=(
    df.rdd.map(lambda row:(row['EmpId'],row['Salary']*100,row['EmpName'],row['DeptName']))
)

In [0]:
rdd_new.collect()

Out[331]: [(1, 100000, 'Amir Khan', 'IT'),
 (2, 150000, 'Habul Basar', 'IT'),
 (3, 250000, 'Climont Flairy', 'IT'),
 (4, 300000, 'Debasish Ghosh', 'HR'),
 (5, 200000, 'Emanule Berrera', 'HR')]

In [0]:
rdd_new.toDF(['EmpId','Salary','Name','DeptName']).show()

+-----+------+---------------+--------+
|EmpId|Salary|           Name|DeptName|
+-----+------+---------------+--------+
|    1|100000|      Amir Khan|      IT|
|    2|150000|    Habul Basar|      IT|
|    3|250000| Climont Flairy|      IT|
|    4|300000| Debasish Ghosh|      HR|
|    5|200000|Emanule Berrera|      HR|
+-----+------+---------------+--------+



**Window function**

In [0]:
student_data=[(10,"Raj",2001,"M",30000,"1"),
              (12,"Raja",2004,"M",23456,"2"),
              (20,"Ram",2005,"M",38000,None),
                (10,"Modi",2015,"M",30000,"1"),
                (17,"Premi",1999,"F",37000,"2"),
                (22,"Jiyush",2005,"M",37000,"2"),
                (24,"Deba",2008,"M",78000,"3"),
                (34,"Barnali",2012,"F",22000,"4"),
                (38,"Kyle",2115,"M",2000,"2"),
                (30,"Minakshi",20012,"F",7800,None),
                (40,"Deb",2011,"M",50000,"1"),
                (50,"Sultan",2018,"M",10000,"7"),
                (60,"Kumar",2012,"M",20000,None),
                (23,"Jack",2010,"M",100000,"3"),
                (55,"Sayari",2006,"F",16000,"4")  
]

student_schema=StructType(fields=[StructField("student_id",IntegerType(),False),
                                  StructField("student_name",StringType(),False),
                                  StructField("dob",IntegerType(),False),
                                  StructField("gender",StringType(),False),
                                  StructField("salary",IntegerType(),False),
                                  StructField("course_id",StringType(),True)

])

stud_df=spark.createDataFrame(data=student_data,schema=student_schema)
display(stud_df)

student_id,student_name,dob,gender,salary,course_id
10,Raj,2001,M,30000,1.0
12,Raja,2004,M,23456,2.0
20,Ram,2005,M,38000,
10,Modi,2015,M,30000,1.0
17,Premi,1999,F,37000,2.0
22,Jiyush,2005,M,37000,2.0
24,Deba,2008,M,78000,3.0
34,Barnali,2012,F,22000,4.0
38,Kyle,2115,M,2000,2.0
30,Minakshi,20012,F,7800,


In [0]:
stud_df.rdd.getNumPartitions()

Out[334]: 8

In [0]:
from pyspark.sql.functions import spark_partition_id

display(stud_df.withColumn('partition_id',spark_partition_id()).groupBy('partition_id').count())

partition_id,count
0,1
1,2
2,2
3,2
4,2
5,2
6,2
7,2


In [0]:
stud_df=stud_df.repartition(3)

In [0]:
stud_df.rdd.getNumPartitions()

Out[337]: 3

In [0]:
display(stud_df.withColumn('partition_id',spark_partition_id()).groupBy('partition_id').count())

partition_id,count
0,7
1,3
2,5


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank,dense_rank,lead,lag

window_spec=Window.partitionBy('gender').orderBy('salary')
display(stud_df.withColumn('rank',rank().over(window_spec)).withColumn('drank',dense_rank().over(window_spec)))

student_id,student_name,dob,gender,salary,course_id,rank,drank
30,Minakshi,20012,F,7800,,1,1
55,Sayari,2006,F,16000,4.0,2,2
34,Barnali,2012,F,22000,4.0,3,3
17,Premi,1999,F,37000,2.0,4,4
38,Kyle,2115,M,2000,2.0,1,1
50,Sultan,2018,M,10000,7.0,2,2
60,Kumar,2012,M,20000,,3,3
12,Raja,2004,M,23456,2.0,4,4
10,Raj,2001,M,30000,1.0,5,5
10,Modi,2015,M,30000,1.0,5,5


In [0]:
from decimal import Decimal

df_schema = StructType([
    StructField('sno', IntegerType(), False),
    StructField('name', StringType(), False),
    StructField('deptid', IntegerType(), False),
    StructField('year', IntegerType(), False),
    StructField('salary', DecimalType(), False),
])

df_data = [
    (1, 'Patrick', 10, 2015, Decimal(1000.00)),(1, 'Patrick', 10, 2016, Decimal(2000.30)),(1, 'Patrick', 10, 2017, Decimal(2500.00)),(1, 'Patrick', 10, 2018, Decimal(5000.00)),    (2, 'Lisbon', 20, 2015, Decimal(500.00)),(2, 'Lisbon', 20, 2016, Decimal(1000.44)),(2, 'Lisbon', 20, 2017, Decimal(1700.99)),(2, 'Lisbon', 20, 2018, Decimal(5000.00)),    (3, 'Grace', 30, 2015, Decimal(1000.00)),(3, 'Grace', 30, 2016, Decimal(1000.00)),(3, 'Grace', 30, 2017, Decimal(2000.00)),(3, 'Grace', 30, 2018, Decimal(3000.56)),
    (4, 'Rigsby', 40, 2015, Decimal(1000.00)),(4, 'Rigsby', 40, 2016, Decimal(1500.00)),(4, 'Rigsby', 40, 2017, Decimal(2000.00)),(4, 'Rigsby', 40, 2018, Decimal(2000.00)),    (5, 'Cho', 10, 2015, Decimal(1000.00)),(5, 'Cho', 10, 2016, Decimal(1500.00)),(5, 'Cho', 10, 2017, Decimal(3000.00)),(5, 'Cho', 10, 2018, Decimal(3100.88))
]

In [0]:
spark_df = spark.createDataFrame(data = df_data, schema = df_schema)
#show the dataframe
spark_df.show(truncate=False)

+---+-------+------+----+------+
|sno|name   |deptid|year|salary|
+---+-------+------+----+------+
|1  |Patrick|10    |2015|1000  |
|1  |Patrick|10    |2016|2000  |
|1  |Patrick|10    |2017|2500  |
|1  |Patrick|10    |2018|5000  |
|2  |Lisbon |20    |2015|500   |
|2  |Lisbon |20    |2016|1000  |
|2  |Lisbon |20    |2017|1701  |
|2  |Lisbon |20    |2018|5000  |
|3  |Grace  |30    |2015|1000  |
|3  |Grace  |30    |2016|1000  |
|3  |Grace  |30    |2017|2000  |
|3  |Grace  |30    |2018|3001  |
|4  |Rigsby |40    |2015|1000  |
|4  |Rigsby |40    |2016|1500  |
|4  |Rigsby |40    |2017|2000  |
|4  |Rigsby |40    |2018|2000  |
|5  |Cho    |10    |2015|1000  |
|5  |Cho    |10    |2016|1500  |
|5  |Cho    |10    |2017|3000  |
|5  |Cho    |10    |2018|3101  |
+---+-------+------+----+------+



In [0]:
##create dataframe with previous year salary and salary imcrement

wsp=Window.partitionBy('sno','name','deptid').orderBy('year')
display(spark_df.withColumn('prev_yr_sal',lag('salary',1,0).over(wsp)).\
    withColumn('increment',(col('salary')-col('prev_yr_sal')))
)

sno,name,deptid,year,salary,prev_yr_sal,increment
1,Patrick,10,2015,1000,0,1000
1,Patrick,10,2016,2000,1000,1000
1,Patrick,10,2017,2500,2000,500
1,Patrick,10,2018,5000,2500,2500
2,Lisbon,20,2015,500,0,500
2,Lisbon,20,2016,1000,500,500
2,Lisbon,20,2017,1701,1000,701
2,Lisbon,20,2018,5000,1701,3299
3,Grace,30,2015,1000,0,1000
3,Grace,30,2016,1000,1000,0


In [0]:
spark_df.show(3)

+---+-------+------+----+------+
|sno|   name|deptid|year|salary|
+---+-------+------+----+------+
|  1|Patrick|    10|2015|  1000|
|  1|Patrick|    10|2016|  2000|
|  1|Patrick|    10|2017|  2500|
+---+-------+------+----+------+
only showing top 3 rows



In [0]:
## sql lead/lag window function
tempview=spark_df.createOrReplaceTempView('empviewtable')
#spark.sql('select * from empviewtable')

In [0]:
%sql

select *,
  lead(salary,1,0) over(partition by sno,name,deptid order by year asc) as nextyrsal
 from empviewtable

sno,name,deptid,year,salary,nextyrsal
1,Patrick,10,2015,1000,2000
1,Patrick,10,2016,2000,2500
1,Patrick,10,2017,2500,5000
1,Patrick,10,2018,5000,0
2,Lisbon,20,2015,500,1000
2,Lisbon,20,2016,1000,1701
2,Lisbon,20,2017,1701,5000
2,Lisbon,20,2018,5000,0
3,Grace,30,2015,1000,1000
3,Grace,30,2016,1000,2000


In [0]:
#sql rank and dense_rank


In [0]:
%sql
select *,
      rank(salary) over(partition by sno,name,deptid order by year asc) as sal_rank ,
      dense_rank(salary) over(partition by sno,name,deptid order by year asc) as sal_dens_rank
 from empviewtable

sno,name,deptid,year,salary,sal_rank,sal_dens_rank
1,Patrick,10,2015,1000,1,1
1,Patrick,10,2016,2000,2,2
1,Patrick,10,2017,2500,3,3
1,Patrick,10,2018,5000,4,4
2,Lisbon,20,2015,500,1,1
2,Lisbon,20,2016,1000,2,2
2,Lisbon,20,2017,1701,3,3
2,Lisbon,20,2018,5000,4,4
3,Grace,30,2015,1000,1,1
3,Grace,30,2016,1000,2,2


In [0]:
#rolling average

In [0]:
%sql

select *,
      avg(salary) over(partition by sno,name,deptid order by year asc rows between 2 preceding and current row) as roling_avg 
      
 from empviewtable

sno,name,deptid,year,salary,roling_avg
1,Patrick,10,2015,1000,1000.0
1,Patrick,10,2016,2000,1500.0
1,Patrick,10,2017,2500,1833.3333
1,Patrick,10,2018,5000,3166.6667
2,Lisbon,20,2015,500,500.0
2,Lisbon,20,2016,1000,750.0
2,Lisbon,20,2017,1701,1067.0
2,Lisbon,20,2018,5000,2567.0
3,Grace,30,2015,1000,1000.0
3,Grace,30,2016,1000,1000.0


**_Pepsico interview questions_**

**_Question 1_**

In [0]:
data=[(1,'2024-01-01',"I1",10,1000),(2,"2024-01-15","I2",20,2000),(3,"2024-02-01","I3",10,1500),(4,"2024-02-15","I4",20,2500),(5,"2024-03-01","I5",30,3000),(6,"2024-03-10","I6",40,3500),(7,"2024-03-20","I7",20,2500),(8,"2024-03-30","I8",10,1000)]
schema=["SOId","SODate","ItemId","ItemQty","ItemValue"]
df1=spark.createDataFrame(data,schema)
display(df1)

SOId,SODate,ItemId,ItemQty,ItemValue
1,2024-01-01,I1,10,1000
2,2024-01-15,I2,20,2000
3,2024-02-01,I3,10,1500
4,2024-02-15,I4,20,2500
5,2024-03-01,I5,30,3000
6,2024-03-10,I6,40,3500
7,2024-03-20,I7,20,2500
8,2024-03-30,I8,10,1000


In [0]:
df1.printSchema()

root
 |-- SOId: long (nullable = true)
 |-- SODate: string (nullable = true)
 |-- ItemId: string (nullable = true)
 |-- ItemQty: long (nullable = true)
 |-- ItemValue: long (nullable = true)



In [0]:
df1=df1.withColumn('SODate',to_date(col('SODate'),'yyyy-MM-dd'))
df1.show()

+----+----------+------+-------+---------+
|SOId|    SODate|ItemId|ItemQty|ItemValue|
+----+----------+------+-------+---------+
|   1|2024-01-01|    I1|     10|     1000|
|   2|2024-01-15|    I2|     20|     2000|
|   3|2024-02-01|    I3|     10|     1500|
|   4|2024-02-15|    I4|     20|     2500|
|   5|2024-03-01|    I5|     30|     3000|
|   6|2024-03-10|    I6|     40|     3500|
|   7|2024-03-20|    I7|     20|     2500|
|   8|2024-03-30|    I8|     10|     1000|
+----+----------+------+-------+---------+



In [0]:
df1.printSchema()

root
 |-- SOId: long (nullable = true)
 |-- SODate: date (nullable = true)
 |-- ItemId: string (nullable = true)
 |-- ItemQty: long (nullable = true)
 |-- ItemValue: long (nullable = true)



In [0]:
df1=df1.withColumn('year',year(col('SODate'))).\
    withColumn('month',month('SODate')).\
        withColumn('year_month',concat(col('year'),lit('_'),col('month')))

df1.show()        


+----+----------+------+-------+---------+----+-----+----------+
|SOId|    SODate|ItemId|ItemQty|ItemValue|year|month|year_month|
+----+----------+------+-------+---------+----+-----+----------+
|   1|2024-01-01|    I1|     10|     1000|2024|    1|    2024_1|
|   2|2024-01-15|    I2|     20|     2000|2024|    1|    2024_1|
|   3|2024-02-01|    I3|     10|     1500|2024|    2|    2024_2|
|   4|2024-02-15|    I4|     20|     2500|2024|    2|    2024_2|
|   5|2024-03-01|    I5|     30|     3000|2024|    3|    2024_3|
|   6|2024-03-10|    I6|     40|     3500|2024|    3|    2024_3|
|   7|2024-03-20|    I7|     20|     2500|2024|    3|    2024_3|
|   8|2024-03-30|    I8|     10|     1000|2024|    3|    2024_3|
+----+----------+------+-------+---------+----+-----+----------+



In [0]:
df1_mid=df1.groupBy(col('year_month')).agg(sum(col('ItemValue')).alias('Totalsale'))
df1_mid.show()

+----------+---------+
|year_month|Totalsale|
+----------+---------+
|    2024_1|     3000|
|    2024_2|     4000|
|    2024_3|    10000|
+----------+---------+



In [0]:
df1_final=(
    df1_mid.withColumn('percent_diff_prev_month',lag(col('Totalsale'),1,0).over(Window.orderBy(col('year_month'))))
)

In [0]:
df1_final=df1_final.withColumn('prev_month_sal_gap',(col('Totalsale')-col('percent_diff_prev_month')) * 100/col('Totalsale'))

In [0]:
df1_final.show()

+----------+---------+-----------------------+------------------+
|year_month|Totalsale|percent_diff_prev_month|prev_month_sal_gap|
+----------+---------+-----------------------+------------------+
|    2024_1|     3000|                      0|             100.0|
|    2024_2|     4000|                   3000|              25.0|
|    2024_3|    10000|                   4000|              60.0|
+----------+---------+-----------------------+------------------+



**_Question 2_**

In [0]:
data=[(1,'John','ADF'),(1,'John','ADB'),(1,'John','PowerBI'),(2,'Joanne','ADF'),(2,'Joanne','SQL'),(2,'Joanne','Crystal Report'),(3,'Vikas','ADF'),(3,'Vikas','SQL'),(3,'Vikas','SSIS'),(4,'Monu','SQL'),(4,'Monu','SSIS'),(4,'Monu','SSAS'),(4,'Monu','ADF')]
schema=["EmpId","EmpName","Skill"]
df1=spark.createDataFrame(data,schema)
display(df1)

EmpId,EmpName,Skill
1,John,ADF
1,John,ADB
1,John,PowerBI
2,Joanne,ADF
2,Joanne,SQL
2,Joanne,Crystal Report
3,Vikas,ADF
3,Vikas,SQL
3,Vikas,SSIS
4,Monu,SQL


In [0]:
df_new=df1.groupBy('EmpName').agg(collect_list(col('Skill')).alias('Skills'))
df_new.show(truncate=False)

+-------+--------------------------+
|EmpName|Skills                    |
+-------+--------------------------+
|John   |[ADF, ADB, PowerBI]       |
|Joanne |[ADF, SQL, Crystal Report]|
|Vikas  |[ADF, SQL, SSIS]          |
|Monu   |[SQL, SSIS, SSAS, ADF]    |
+-------+--------------------------+



In [0]:
from pyspark.sql.functions import collect_list,concat_ws

df_new=df_new.select(df_new.EmpName,concat_ws(',',df_new.Skills).alias('skills'))
df_new.show(truncate=False)

+-------+----------------------+
|EmpName|skills                |
+-------+----------------------+
|John   |ADF,ADB,PowerBI       |
|Joanne |ADF,SQL,Crystal Report|
|Vikas  |ADF,SQL,SSIS          |
|Monu   |SQL,SSIS,SSAS,ADF     |
+-------+----------------------+



**_Question 3_**

In [0]:
#Employees Salary info
data1=[(100,"Raj",None,1,"01-04-23",50000),
       (200,"Joanne",100,1,"01-04-23",4000),(200,"Joanne",100,1,"13-04-23",4500),(200,"Joanne",100,1,"14-04-23",4020)]
schema1=["EmpId","EmpName","Mgrid","deptid","salarydt","salary"]
df_salary=spark.createDataFrame(data1,schema1)
display(df_salary)
#department dataframe
data2=[(1,"IT"),
       (2,"HR")]
schema2=["deptid","deptname"]
df_dept=spark.createDataFrame(data2,schema2)
display(df_dept)

EmpId,EmpName,Mgrid,deptid,salarydt,salary
100,Raj,,1,01-04-23,50000
200,Joanne,100.0,1,01-04-23,4000
200,Joanne,100.0,1,13-04-23,4500
200,Joanne,100.0,1,14-04-23,4020


deptid,deptname
1,IT
2,HR


In [0]:
emp_df=(
    df_salary.alias('s').join(df_dept.alias('d'), col('s.deptid')==col('d.deptid'),how='left_outer')    
    .withColumn('salarydt',to_date(col('salarydt'),'dd-MM-yy'))
    .withColumn('sal_year',year(col('salarydt')))
    .withColumn('sal_month',month(col('salarydt')))
    .drop(col('s.deptid'),col('d.deptid'),col('salarydt'))
)
display(emp_df)

EmpId,EmpName,Mgrid,salary,deptname,sal_year,sal_month
100,Raj,,50000,IT,2023,4
200,Joanne,100.0,4000,IT,2023,4
200,Joanne,100.0,4500,IT,2023,4
200,Joanne,100.0,4020,IT,2023,4


In [0]:
emp_df_mngr=(
    emp_df.alias('main').join(emp_df.alias('mngr'),col('main.Mgrid')==col('mngr.EmpId'), how='left_outer')
 .select(   col('main.deptname'),
    col('mngr.EmpName').alias('ManagerName'),
    col('main.EmpName'),
    col('main.sal_year'),
    col('main.sal_month'),
    col('main.salary'))
)
display(emp_df_mngr)

deptname,ManagerName,EmpName,sal_year,sal_month,salary
IT,,Raj,2023,4,50000
IT,Raj,Joanne,2023,4,4000
IT,Raj,Joanne,2023,4,4500
IT,Raj,Joanne,2023,4,4020


**_Question 4_**

In [0]:
## Tiger Analytics

#Mentioning the dataframe details here
data = [("John,Smith", "Canada"), ("Mike,David", "USA")]
df = spark.createDataFrame(data,["Names","Country"])

In [0]:
df.show()

+----------+-------+
|     Names|Country|
+----------+-------+
|John,Smith| Canada|
|Mike,David|    USA|
+----------+-------+



In [0]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
df1=df.withColumn('Namesarray',split(col('Names'),',')).drop('Names')
df1.show()

+-------+-------------+
|Country|   Namesarray|
+-------+-------------+
| Canada|[John, Smith]|
|    USA|[Mike, David]|
+-------+-------------+



In [0]:
df1=df1.withColumn('Names',explode(col('Namesarray'))).drop('Namesarray')
df1.show()

+-------+-----+
|Country|Names|
+-------+-----+
| Canada| John|
| Canada|Smith|
|    USA| Mike|
|    USA|David|
+-------+-----+



**_Question 5--ITC Infotech_**

In [0]:
schema = StructType([
    StructField("player", StringType(), True),
    StructField("runs", IntegerType(), True),
    StructField("50s/100s", StringType(), True)
])

#Create a DataFrame with the defined schema
data = [("Sachin-IND", 18694, "93/49"), ("Ricky-AUS", 11274, "66/31"),("Lara-WI", 10222, "45/21"),("Rahul-IND", 10355, "95/11"),("Jhonty-SA", 7051, "43/5"),("Hayden-AUS", 8722, "67/19")]
players_df = spark.createDataFrame(data, schema)

data1 = [("IND", "India"), ("AUS", "Australia"), ("WI", "WestIndies"), ("SA", "SouthAfrica")]
countries_df = spark.createDataFrame(data1,["SRT","country"])

In [0]:
players_df.show()
players_df.printSchema()

+----------+-----+--------+
|    player| runs|50s/100s|
+----------+-----+--------+
|Sachin-IND|18694|   93/49|
| Ricky-AUS|11274|   66/31|
|   Lara-WI|10222|   45/21|
| Rahul-IND|10355|   95/11|
| Jhonty-SA| 7051|    43/5|
|Hayden-AUS| 8722|   67/19|
+----------+-----+--------+

root
 |-- player: string (nullable = true)
 |-- runs: integer (nullable = true)
 |-- 50s/100s: string (nullable = true)



In [0]:
countries_df.show()
countries_df.printSchema()

+---+-----------+
|SRT|    country|
+---+-----------+
|IND|      India|
|AUS|  Australia|
| WI| WestIndies|
| SA|SouthAfrica|
+---+-----------+

root
 |-- SRT: string (nullable = true)
 |-- country: string (nullable = true)



In [0]:
players_df=(
                    players_df.withColumn('playernm',split(col('player'),'-').getItem(0))
                      .withColumn('country',split(col('player'),'-').getItem(1))
                      .drop('player')
)
players_df.show()

+-----+--------+--------+-------+
| runs|50s/100s|playernm|country|
+-----+--------+--------+-------+
|18694|   93/49|  Sachin|    IND|
|11274|   66/31|   Ricky|    AUS|
|10222|   45/21|    Lara|     WI|
|10355|   95/11|   Rahul|    IND|
| 7051|    43/5|  Jhonty|     SA|
| 8722|   67/19|  Hayden|    AUS|
+-----+--------+--------+-------+



In [0]:
players_df=(
                    players_df.withColumn('50s',split(col('50s/100s'),'/').getItem(0))
                      .withColumn('100s',split(col('50s/100s'),'/').getItem(1))
                      .withColumn('sums',(col('50s').cast('int')+col('100s').cast('int')))
                      .filter(col('sums') > 90)
                      .drop('50s/100s','50s','100s')
)
players_df.show()

+-----+--------+-------+----+
| runs|playernm|country|sums|
+-----+--------+-------+----+
|18694|  Sachin|    IND| 142|
|11274|   Ricky|    AUS|  97|
|10355|   Rahul|    IND| 106|
+-----+--------+-------+----+



**Question-6: Tiger Analytics**

In [0]:
data=[
('Rudra','math',79),
('Rudra','eng',60),
('Shivu','math', 68),
('Shivu','eng', 59),
('Anu','math', 65),
('Anu','eng',80)
]
schema="Name string,Sub string,Marks int"
df=spark.createDataFrame(data,schema)
df.show()

+-----+----+-----+
| Name| Sub|Marks|
+-----+----+-----+
|Rudra|math|   79|
|Rudra| eng|   60|
|Shivu|math|   68|
|Shivu| eng|   59|
|  Anu|math|   65|
|  Anu| eng|   80|
+-----+----+-----+



In [0]:
df1=df.groupBy(col('Name')).agg(collect_list(col('Marks')).alias('Marks_list'))
df1.show()

+-----+----------+
| Name|Marks_list|
+-----+----------+
|Rudra|  [79, 60]|
|Shivu|  [68, 59]|
|  Anu|  [65, 80]|
+-----+----------+



In [0]:
df2=df1.select(col('Name'), col('Marks_list')[0].alias('Math'), col('Marks_list')[1].alias('eng'))
df2.show()

+-----+----+---+
| Name|Math|eng|
+-----+----+---+
|Rudra|  79| 60|
|Shivu|  68| 59|
|  Anu|  65| 80|
+-----+----+---+



In [0]:
pivoted_df = df.groupBy("Name").pivot("Sub").agg({"Marks": "first"}).show()

+-----+---+----+
| Name|eng|math|
+-----+---+----+
|Shivu| 59|  68|
|Rudra| 60|  79|
|  Anu| 80|  65|
+-----+---+----+



**Question-7: Walmart**

In [0]:
#Define the schema for transactions
from pyspark.sql.types import FloatType
transaction_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("transaction_type", StringType(), True),
    StructField("transaction_amount", FloatType(), True)
])

transactions_data = [
    (1, "credit", 30.0),
    (1, "debit", 90.0),
    (2, "credit", 50.0),
    (3, "debit", 57.0),
    (2, "debit", 90.0)
]

transactions_df = spark.createDataFrame(transactions_data, schema=transaction_schema)
transactions_df.show()

+-----------+----------------+------------------+
|customer_id|transaction_type|transaction_amount|
+-----------+----------------+------------------+
|          1|          credit|              30.0|
|          1|           debit|              90.0|
|          2|          credit|              50.0|
|          3|           debit|              57.0|
|          2|           debit|              90.0|
+-----------+----------------+------------------+



In [0]:
from pyspark.sql import functions as F

df_trans=transactions_df.groupBy('customer_id').pivot('transaction_type').agg({'transaction_amount':'sum'})
df_trans.show()

+-----------+------+-----+
|customer_id|credit|debit|
+-----------+------+-----+
|          1|  30.0| 90.0|
|          3|  null| 57.0|
|          2|  50.0| 90.0|
+-----------+------+-----+



In [0]:
amount_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("current_amount", FloatType(), True)
])

amounts_data = [
    (1, 1000.0),
    (2, 2000.0),
    (3, 3000.0),
    (4, 4000.0)
]

amounts_df = spark.createDataFrame(amounts_data, schema=amount_schema)

##Show the amounts DataFrame
amounts_df.show()

+-----------+--------------+
|customer_id|current_amount|
+-----------+--------------+
|          1|        1000.0|
|          2|        2000.0|
|          3|        3000.0|
|          4|        4000.0|
+-----------+--------------+



In [0]:
cust_join=df_trans.join(amounts_df,df_trans.customer_id==amounts_df.customer_id,'left').drop(amounts_df.customer_id)
cust_join.show()

+-----------+------+-----+--------------+
|customer_id|credit|debit|current_amount|
+-----------+------+-----+--------------+
|          1|  30.0| 90.0|        1000.0|
|          3|  null| 57.0|        3000.0|
|          2|  50.0| 90.0|        2000.0|
+-----------+------+-----+--------------+



In [0]:
cust_join1=cust_join.withColumn('net',F.abs(col('credit') - col('debit')))
cust_join1.show()


+-----------+------+-----+--------------+----+
|customer_id|credit|debit|current_amount| net|
+-----------+------+-----+--------------+----+
|          1|  30.0| 90.0|        1000.0|60.0|
|          3|  null| 57.0|        3000.0|null|
|          2|  50.0| 90.0|        2000.0|40.0|
+-----------+------+-----+--------------+----+



In [0]:
cust_join1=cust_join1.fillna({'net': 0}).drop('credit','debit')
cust_join1=cust_join1.withColumn('current_amount',(col('current_amount') - col('net'))).drop('net')
cust_join1.show()

+-----------+--------------+
|customer_id|current_amount|
+-----------+--------------+
|          1|         940.0|
|          3|        3000.0|
|          2|        1960.0|
+-----------+--------------+



8. For a given dataframe find out that if its column - "department" has palidrome values or not:**

In [0]:
columns = ["name","department", "salary"]
data = [("Harry", "HR", 12000),
    ("George", "ADA", 23000),
    ("Fred", "TQT", 21000),
    ("Sally", "IT", 25000),
    ("Neel", "SOS", 23000),
    ("Ashish", "IT", 27000)]

df_src = spark.createDataFrame(data=data,schema=columns)
df_src.display()

name,department,salary
Harry,HR,12000
George,ADA,23000
Fred,TQT,21000
Sally,IT,25000
Neel,SOS,23000
Ashish,IT,27000


In [0]:
def check_palindrome(col):
    return col == col[::-1]

In [0]:
# Registering UDF
upper_udf = udf(lambda x: check_palindrome(x), BooleanType()) 

In [0]:
display(df_src.withColumn('is_palindrome',upper_udf(col('department'))))

name,department,salary,is_palindrome
Harry,HR,12000,False
George,ADA,23000,True
Fred,TQT,21000,True
Sally,IT,25000,False
Neel,SOS,23000,True
Ashish,IT,27000,False


9. In the below DF there is a column containing name with some special characters and numbers. Remove these special characters and numbers from name**

In [0]:
columns = ["no","Name"]
data = [("1", "sa$m sm**ith"),
    ("2", "tr@#ce!y s^mith"),
    ("3", "amy%^ & sand(*ers"),
    ("4", "da3@@%^ & sand(*123")]

df_src = spark.createDataFrame(data=data,schema=columns)
df_src.display()

no,Name
1,sa$m sm**ith
2,tr@#ce!y s^mith
3,amy%^ & sand(*ers
4,da3@@%^ & sand(*123


In [0]:
import re

def remove_special_char(x):
    # Remove anything that is not a letter, digit, or space
    x = re.sub(r'[^A-Za-z0-9 ]', '', x)
    return x


In [0]:
regular_exp_udf=udf(lambda x:remove_special_char(x), StringType())

In [0]:
df_src=df_src.withColumn('Newname',regular_exp_udf(col('Name')))
df_src.show(truncate=False)

+---+-------------------+------------+
|no |Name               |Newname     |
+---+-------------------+------------+
|1  |sa$m sm**ith       |sam smith   |
|2  |tr@#ce!y s^mith    |trcey smith |
|3  |amy%^ & sand(*ers  |amy  sanders|
|4  |da3@@%^ & sand(*123|da3  sand123|
+---+-------------------+------------+



In [0]:
# %md
# MAGIC Write pyspark code to display following data from below tables
# MAGIC -------
# MAGIC 1. employeeid
# MAGIC 2. default_number,
# MAGIC 3. total_entry
# MAGIC 4. total_login
# MAGIC 5. total_logout
# MAGIC 6. first_login
# MAGIC 7. first_logout
# MAGIC 8. last_login
# MAGIC 9. last_logout
# MAGIC

In [0]:
dbutils.fs.mkdirs('dbfs:/FileStore/Employeedata')

Out[394]: True

In [0]:
df_emp_logs = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/Employeedata/employee_logs.csv")
df_emp_logs.show(5)
# COMMAND ----------


+-----------+------------+-------------------+
|employee_id|entry_detail|   timestamp_detail|
+-----------+------------+-------------------+
|       1000|       login|2023-06-16 01:00:34|
|       1000|      logout|2023-06-17 01:00:34|
|       1000|       login|2023-06-18 01:00:34|
|       1000|      logout|2023-06-19 01:00:34|
|       1001|       login|2023-06-20 01:00:34|
+-----------+------------+-------------------+
only showing top 5 rows



In [0]:
df_result=(
    df_emp_logs.withColumn('total_login_num',when(col('entry_detail')=='login' ,lit(1)).otherwise(lit(0)))
            .withColumn('total_logout_num',when(col('entry_detail')=='logout' ,lit(1)).otherwise(lit(0)))
            .withColumn('login_time',when(col('entry_detail')=='login' ,col('timestamp_detail')).otherwise(None))
            .withColumn('logout_time',when(col('entry_detail')=='logout' ,col('timestamp_detail')).otherwise(None))
)

df_result.show()

+-----------+------------+-------------------+---------------+----------------+-------------------+-------------------+
|employee_id|entry_detail|   timestamp_detail|total_login_num|total_logout_num|         login_time|        logout_time|
+-----------+------------+-------------------+---------------+----------------+-------------------+-------------------+
|       1000|       login|2023-06-16 01:00:34|              1|               0|2023-06-16 01:00:34|               null|
|       1000|      logout|2023-06-17 01:00:34|              0|               1|               null|2023-06-17 01:00:34|
|       1000|       login|2023-06-18 01:00:34|              1|               0|2023-06-18 01:00:34|               null|
|       1000|      logout|2023-06-19 01:00:34|              0|               1|               null|2023-06-19 01:00:34|
|       1001|       login|2023-06-20 01:00:34|              1|               0|2023-06-20 01:00:34|               null|
|       1001|       login|2023-06-21 01:

In [0]:
display(
    df_result.groupBy('employee_id')
    .agg(sum('total_login_num').alias('total_login'),
        sum('total_logout_num').alias('total_logout'),
        max('login_time').alias('last_login_time'),
        max('logout_time').alias('last_logout_time'))
)

employee_id,total_login,total_logout,last_login_time,last_logout_time
1000,2,2,2023-06-18 01:00:34,2023-06-19 01:00:34
1001,3,1,2023-06-22 01:00:34,2023-06-23 01:00:34
1002,2,1,2023-06-25 01:00:34,2023-06-26 01:00:34
1003,2,1,2023-06-28 01:00:34,2023-06-29 01:00:34
1004,1,1,2023-06-30 01:00:34,2023-07-01 01:00:34


In [0]:
df_emp_ph = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/Employeedata/employee_phone.csv")
df_emp_ph.display()

employee_id,phone,default
1000,9240008920,False
1001,9550002323,False
1001,8580404777,True
1000,9000232350,True
1002,9240008920,True
1003,9450200459,False
1003,8181002222,True
1004,9000232350,False
1004,9000232351,True


In [0]:
df_joined=df_emp_logs.join(df_emp_ph,df_emp_logs.employee_id == df_emp_ph.employee_id).drop(df_emp_ph.employee_id)
df_joined.show(8)

+-----------+------------+-------------------+----------+-------+
|employee_id|entry_detail|   timestamp_detail|     phone|default|
+-----------+------------+-------------------+----------+-------+
|       1000|       login|2023-06-16 01:00:34|9000232350|   TRUE|
|       1000|       login|2023-06-16 01:00:34|9240008920|  FALSE|
|       1000|      logout|2023-06-17 01:00:34|9000232350|   TRUE|
|       1000|      logout|2023-06-17 01:00:34|9240008920|  FALSE|
|       1000|       login|2023-06-18 01:00:34|9000232350|   TRUE|
|       1000|       login|2023-06-18 01:00:34|9240008920|  FALSE|
|       1000|      logout|2023-06-19 01:00:34|9000232350|   TRUE|
|       1000|      logout|2023-06-19 01:00:34|9240008920|  FALSE|
+-----------+------------+-------------------+----------+-------+
only showing top 8 rows



In [0]:
display(
    df_joined.withColumn('first_login')
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-3838360358105518>:2[0m
[1;32m      1[0m display(
[0;32m----> 2[0m     [43mdf_joined[49m[38;5;241;43m.[39;49m[43mwithColumn[49m[43m([49m[38;5;124;43m'[39;49m[38;5;124;43mfirst_login[39;49m[38;5;124;43m'[39;49m[43m)[49m
[1;32m      3[0m )

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m     49[0m     logger[38;5;241m.[39mlog_success(
[1;32m   

**Event status question**

In [0]:
event_data=[(1,'alice'),(2,'bob'),(3,'charlie'),(4,'david'),
            (5,'eve')]
event_schema=['id','student']            

In [0]:
evnt_df=spark.createDataFrame(event_data,event_schema)
evnt_df.show()

+---+-------+
| id|student|
+---+-------+
|  1|  alice|
|  2|    bob|
|  3|charlie|
|  4|  david|
|  5|    eve|
+---+-------+



In [0]:
evnt_df.printSchema()

root
 |-- event_date: string (nullable = true)
 |-- event_status: string (nullable = true)



In [0]:
evnt_df.show()

+---+-------+
| id|student|
+---+-------+
|  1|  alice|
|  2|    bob|
|  3|charlie|
|  4|  david|
|  5|    eve|
+---+-------+



In [0]:
from pyspark.sql.window import Window
#wind_spec=Window.partitionBy('event_status').orderBy('event_date')
wind_spec=Window.orderBy('id')
evnt_df1 =(
     evnt_df
     .withColumn('prev_stud', lag('id').over(wind_spec))
     .withColumn('next_stud', lead('id').over(wind_spec))
)
evnt_df1.show()

+---+-------+---------+---------+
| id|student|prev_stud|next_stud|
+---+-------+---------+---------+
|  1|  alice|     null|        2|
|  2|    bob|        1|        3|
|  3|charlie|        2|        4|
|  4|  david|        3|        5|
|  5|    eve|        4|     null|
+---+-------+---------+---------+



In [0]:
from pyspark.sql.functions import col

from pyspark.sql.functions import col

# Aliases to avoid column ambiguity
evnt_df1_alias = evnt_df1.alias("a")
evnt_df_alias = evnt_df.alias("b")

# Perform the join once, clean and correct
stud_joined_df = (
    evnt_df1_alias
    .join(
        evnt_df_alias,
        col("a.prev_stud") == col("b.id"),
        "left_outer"
    )
    .select(
        col("a.*"),
        col("b.student").alias("prev_student_name")
    )
)

# Show results
display(stud_joined_df)


id,student,prev_stud,next_stud,prev_student_name
1,alice,,2.0,
2,bob,1.0,3.0,alice
3,charlie,2.0,4.0,bob
4,david,3.0,5.0,charlie
5,eve,4.0,,david


In [0]:
stud_joined_alias = stud_joined_df.alias('a')
evnt_df_alias = evnt_df.alias("b")

In [0]:
stud_df=(
    stud_joined_alias
    .join(evnt_df_alias, col('a.next_stud') == col('b.id'),'left_outer')
    .select(stud_joined_alias['*'] ,col('b.student').alias('next_stn_name') )
)

display(stud_df)

id,student,prev_stud,next_stud,prev_student_name,next_stn_name
1,alice,,2.0,,bob
2,bob,1.0,3.0,alice,charlie
3,charlie,2.0,4.0,bob,david
4,david,3.0,5.0,charlie,eve
5,eve,4.0,,david,


In [0]:
  stud_df=  stud_df.withColumn('final_student',
                       when(col('id')%2==0,col('prev_student_name'))
                       .when(col('id')%2!=0,col('next_stn_name'))
                       .otherwise(col('student'))
                       )


In [0]:
display(stud_df.selectExpr('id', 'coalesce(final_student, student) as student_name'))


id,student_name
1,bob
2,alice
3,david
4,charlie
5,eve


**Call duration question**

In [0]:
call_data=[(10,20,58),(20,10,12),(10,30,20),(30,40,100), (30,40,200),(30,40,200),(40,30,500)]
call_schema=['from_id','to_id','duration']   

In [0]:
call_df=spark.createDataFrame(call_data,call_schema)
display(call_df)

from_id,to_id,duration
10,20,58
20,10,12
10,30,20
30,40,100
30,40,200
30,40,200
40,30,500


In [0]:
from pyspark.sql.functions import when, col
from pyspark.sql.types import IntegerType

call_df1 = (
    call_df.withColumn(
    'person1',
    when(col('from_id').cast(IntegerType()) > col('to_id').cast(IntegerType()), col('to_id'))
    .when(col('from_id').cast(IntegerType()) < col('to_id').cast(IntegerType()), col('from_id'))
    .otherwise(col('from_id')) 
).withColumn(
    'person2',
    when(col('to_id').cast(IntegerType()) > col('from_id').cast(IntegerType()), col('to_id'))
    .when(col('to_id').cast(IntegerType()) < col('from_id').cast(IntegerType()), col('from_id'))
    .otherwise(col('to_id'))
   )
    .drop('from_id','to_id')
)



In [0]:
display(call_df1)

duration,person1,person2
58,10,20
12,10,20
20,10,30
100,30,40
200,30,40
200,30,40
500,30,40


In [0]:
display(call_df1.groupBy('person1','person2')
        .agg(sum(col('duration')).alias('total_duration') , count(col('duration')).alias('toatl_calls'))
        )


person1,person2,total_duration,toatl_calls
10,20,70,2
10,30,20,1
30,40,1000,4


**Tennis player grand slam question**

In [0]:
player_data=[(1,'Nadal'),(2,'Federer'),(3,'Novak')]
player_cols=['player_id','player_name'] 

In [0]:
player_df=spark.createDataFrame(player_data,player_cols)
player_df.show()

+---------+-----------+
|player_id|player_name|
+---------+-----------+
|        1|      Nadal|
|        2|    Federer|
|        3|      Novak|
+---------+-----------+



In [0]:
year_data=[(2017,2,1,1,2),(2018,3,1,3,2) , (2019,3,1,1,3)]
year_cols=['year','wimbledon','fr_open' ,'us_open', 'au_open'] 

year_df=spark.createDataFrame(year_data,year_cols)
year_df.show()

+----+---------+-------+-------+-------+
|year|wimbledon|fr_open|us_open|au_open|
+----+---------+-------+-------+-------+
|2017|        2|      1|      1|      2|
|2018|        3|      1|      3|      2|
|2019|        3|      1|      1|      3|
+----+---------+-------+-------+-------+



In [0]:
wim_df=year_df.selectExpr('year','wimbledon as match')
wim_df.show()

+----+-----+
|year|match|
+----+-----+
|2017|    2|
|2018|    3|
|2019|    3|
+----+-----+



In [0]:
fr_open_df=year_df.selectExpr('year','fr_open as match')
fr_open_df.show()

+----+-----+
|year|match|
+----+-----+
|2017|    1|
|2018|    1|
|2019|    1|
+----+-----+



In [0]:
us_open_df=year_df.selectExpr('year','us_open as match')
us_open_df.show()

+----+-----+
|year|match|
+----+-----+
|2017|    1|
|2018|    3|
|2019|    1|
+----+-----+



In [0]:
au_open_df=year_df.selectExpr('year','au_open as match')
au_open_df.show()

+----+-----+
|year|match|
+----+-----+
|2017|    2|
|2018|    2|
|2019|    3|
+----+-----+



In [0]:
union_df = wim_df.union(fr_open_df).union(us_open_df).union(au_open_df)
union_df.show()

+----+-----+
|year|match|
+----+-----+
|2017|    2|
|2018|    3|
|2019|    3|
|2017|    1|
|2018|    1|
|2019|    1|
|2017|    1|
|2018|    3|
|2019|    1|
|2017|    2|
|2018|    2|
|2019|    3|
+----+-----+



In [0]:
palyer_joined_df=    union_df.alias('u').join(player_df.alias('p'),col('u.match')==col('p.player_id'),'inner')

palyer_joined_df.show()

+----+-----+---------+-----------+
|year|match|player_id|player_name|
+----+-----+---------+-----------+
|2017|    1|        1|      Nadal|
|2018|    1|        1|      Nadal|
|2019|    1|        1|      Nadal|
|2017|    1|        1|      Nadal|
|2019|    1|        1|      Nadal|
|2017|    2|        2|    Federer|
|2017|    2|        2|    Federer|
|2018|    2|        2|    Federer|
|2018|    3|        3|      Novak|
|2019|    3|        3|      Novak|
|2018|    3|        3|      Novak|
|2019|    3|        3|      Novak|
+----+-----+---------+-----------+



In [0]:
display(palyer_joined_df.groupBy('player_id','player_name').agg(count(col('match').alias('total_grandslams'))))

player_id,player_name,count(match AS total_grandslams)
1,Nadal,5
2,Federer,3
3,Novak,4
