In [0]:
#You are given a dataset of employees in an Indian company with columns: `emp_id`, `name`, `department`, `salary`, and `city`. Write a PySpark program to find the total salary paid in each department.


from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import desc,col
schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", FloatType(), True),
    StructField("city", StringType(), True)
])

dataFrame1 = spark.read.csv('dbfs:/FileStore/psday1_1.csv', header=True, schema=schema)
result = dataFrame1.groupBy('department').agg({'salary':'sum'})#.sum('salary')
result = result.withColumnRenamed('sum(salary)', 'Total')
result = result.withColumn('Total', col('Total').cast('int')).orderBy(desc('Total'))
result.show()


+----------+------+
|department| Total|
+----------+------+
|        IT|233000|
|        HR|122000|
|   Finance| 90000|
+----------+------+



In [0]:
#You are given a dataset containing customer transactions in an Indian e-commerce platform with columns: `cust_id`, `cust_name`, `city`, `purchase_amount`, and `product_category`. Some records have missing `purchase_amount`. Write a PySpark program to fill missing `purchase_amount` values with the average purchase amount of that product category.

from pyspark.sql.types import IntegerType, StringType, FloatType, StructType, StructField
from pyspark.sql.functions import col, round,when

schema = StructType([
    StructField('cust_id', IntegerType(), True),
    StructField('cust_name', StringType(), True),
    StructField('city', StringType(), True),
    StructField('purchase_amount', FloatType(), True),
    StructField('product_category', StringType(), True)
])

Dataframe2 = spark.read.csv('dbfs:/FileStore/psday1_2.csv', header=True, schema=schema)
result = Dataframe2.groupBy('product_category').agg(round(avg('purchase_amount'),2).alias('Cat_Avg'))
joined_df=Dataframe2.join(result,on='product_category',how='left')
filled_df = joined_df.withColumn('purchase_amount', when(col('purchase_amount').isNull(), col('Cat_Avg')).otherwise(col('purchase_amount'))).drop('Cat_Avg')
filled_df.show()


+----------------+-------+---------+----------+---------------+
|product_category|cust_id|cust_name|      city|purchase_amount|
+----------------+-------+---------+----------+---------------+
|     Electronics|    201|     Aman|     Delhi|         1500.0|
|         Fashion|    202|    Kiran|    Mumbai|         2000.0|
|     Electronics|    203|     Ravi| Bangalore|         2000.0|
|         Fashion|    204|   Simran| Hyderabad|         2000.0|
|     Electronics|    205|    Vinay|      Pune|         1800.0|
|         Grocery|    206|    Pooja|   Chennai|         1300.0|
+----------------+-------+---------+----------+---------------+



In [0]:
#You have a dataset of students from different Indian states with columns: `student_id`, `student_name`, `state`, `score`. Write a PySpark program to rank students within each state based on their scores in descending order.
from pyspark.sql.types import IntegerType, StringType, FloatType, StructType, StructField
from pyspark.sql.window import Window
from pyspark.sql.functions import col, desc, rank


schema = StructType([
    StructField('student_id', IntegerType(), True),
    StructField('student_name', StringType(), True),
    StructField('state', StringType(), True),
    StructField('score', FloatType(), True)
])

Dataframe3=spark.read.csv('dbfs:/FileStore/psday2_1.csv',header=True,schema=schema)
window_spec=Window.partitionBy('state').orderBy(desc('score'))
ranked_df = Dataframe3.withColumn("rank", rank().over(window_spec))#.filter('rank==1')
ranked_df.show()


+----------+------------+------------+-----+----+
|student_id|student_name|       state|score|rank|
+----------+------------+------------+-----+----+
|       302|       Sneha|   Karnataka| 92.0|   1|
|       304|       Kunal|   Karnataka| 88.0|   2|
|       306|       Pavan|   Karnataka| 80.0|   3|
|       303|        Amit| Maharashtra| 90.0|   1|
|       301|       Rohit| Maharashtra| 85.0|   2|
|       305|       Nidhi| Maharashtra| 78.0|   3|
+----------+------------+------------+-----+----+



In [0]:

#data = [
#("Alice", "HR", 5000),
#("Bob", "HR", 6000),
#("Charlie", "HR", 7000),
#("David", "IT", 8000),
#("Eve", "IT", 7500),
#("Frank", "IT", 9000)
#]
#columns = ["name", "department", "salary"]
#Dataframe4=spark.createDataFrame(data,columns)
#Dataframe4.show()
#Find the top 2 highest salaries in each department
from pyspark.sql.types import IntegerType, StringType, FloatType, StructType, StructField
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, dense_rank
schema=StructType([
    StructField('name',StringType(),True),
    StructField('department',StringType(),True),
    StructField('salary',FloatType(),True)
])
Dataframe4=spark.read.csv('dbfs:/FileStore/psday2_2.csv',header=True,schema=schema)
window_spec=Window.partitionBy('department').orderBy(desc('salary'))
Ranked_df=Dataframe4.withColumn('rank',dense_rank().over(window_spec)).where('rank<=2')
Ranked_df.show()

+-------+----------+------+----+
|   name|department|salary|rank|
+-------+----------+------+----+
|Charlie|      "HR"|7000.0|   1|
|    Bob|      "HR"|6000.0|   2|
|  Frank|      "IT"|9000.0|   1|
|  David|      "IT"|8000.0|   2|
+-------+----------+------+----+



In [0]:
#You have a PySpark DataFrame containing customer transaction details. Write a PySpark query to find the top 3 customers who have spent the most money, sorted in descending order of total spend. 
from pyspark.sql.types import StringType,StructField,StructType,FloatType
from pyspark.sql.functions import sum
schema=StructType([
    StructField("user_id",StringType(),True),
    StructField("amount",FloatType(),True)
])
DataFrame5=spark.read.csv('dbfs:/FileStore/psday3_1.csv',header=True,schema=schema)
#result1=DataFrame5.groupBy("user_id").agg(sum("amount").alias("total_spent"))
result=DataFrame5.groupBy("user_id").agg(sum("amount").alias("total_spent")).orderBy("total_spent",ascending=False).limit(3)
result.show()

+-------+-----------+
|user_id|total_spent|
+-------+-----------+
|   C002|     1500.0|
|   C003|     1000.0|
|   C001|      800.0|
+-------+-----------+



In [0]:
#You have a PySpark DataFrame containing user login details. Write a PySpark query to find the first and last login timestamps for each user.
from pyspark.sql.types import StructField, StructType, TimestampType, StringType
from pyspark.sql.functions import min, max

schema = StructType([
    StructField('user_id', StringType(), True),
    StructField('timestamp', TimestampType(), True)
])

DataFrame6 = spark.read.csv('dbfs:/FileStore/psday3_2.csv', header=True, schema=schema)

result = DataFrame6.groupBy("user_id").agg(
    min("timestamp").alias("first_login"),
    max("timestamp").alias("last_login")
)
result.show()


+-------+-------------------+-------------------+
|user_id|        first_login|         last_login|
+-------+-------------------+-------------------+
|   U002|2024-03-10 09:30:00|2024-03-10 14:00:00|
|   U003|2024-03-10 11:15:00|2024-03-10 22:00:00|
|   U001|2024-03-10 08:00:00|2024-03-10 18:30:00|
+-------+-------------------+-------------------+



In [0]:
#For each customer, return the latest order amount, the total amount spent, and the number of orders. Output should include the customer's name.

from pyspark.sql.functions import to_date, desc, rank,col,sum
from pyspark.sql.window import Window
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType

schema1 = StructType([
    StructField('customer_id', IntegerType(), True),
    StructField('Customer_name', StringType(), True)
])

schema2 = StructType([
    StructField('order_id', IntegerType(), True),
    StructField('Customer_id', IntegerType(), True),
    StructField('order_date', StringType(), True),
    StructField('amount', FloatType(), True)
])


DataFrame7_1 = spark.read.csv('dbfs:/FileStore/psday4_1.csv', header=True, schema=schema1)
DataFrame7_2 = spark.read.csv('dbfs:/FileStore/psday4_2.csv', header=True, schema=schema2)


DataFrame7_2 = DataFrame7_2.withColumn("order_date", to_date(DataFrame7_2["order_date"], "yyyy-MM-dd"))


window_spec = Window.partitionBy('customer_id').orderBy(desc('order_date'))



result = DataFrame7_2.withColumn('ranker', rank().over(window_spec)).filter(col('ranker') == 1)

result_agg=DataFrame7_2.groupBy('customer_id').agg(sum('amount').alias('total_amount'))
final_result=result.join(result_agg,on='customer_id').select('customer_id','order_date','amount','total_amount')
final_result.show()


+-----------+----------+------+------------+
|customer_id|order_date|amount|total_amount|
+-----------+----------+------+------------+
|        101|2024-02-01| 700.0|      1400.0|
|        103|2024-03-15| 400.0|       500.0|
|        102|2024-01-01| 300.0|       300.0|
|        104|2024-03-10| 100.0|       100.0|
+-----------+----------+------+------------+



In [0]:
#You are given two DataFrames in PySpark:
#employee_df: Contains employee information.
#department_df: Contains department information.
#You need to perform an inner join on these DataFrames to find out which department each employee belongs to
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
schema1=StructType(
    [StructField('employee_id',IntegerType(),True),
     StructField('employee_name',StringType(),True),
     StructField('dept_id',IntegerType(),True)
    ]
)

schema2=StructType(
    [StructField('dept_id',IntegerType(),True),
     StructField('dept_name',StringType(),True),
    ]
)
Dataframe8_1=spark.read.csv('dbfs:/FileStore/psday4_2_1.csv',header=True,schema=schema1)
Dataframe8_2=spark.read.csv('dbfs:/FileStore/psday4_2_2.csv',header=True,schema=schema2)
result=Dataframe8_1.join(Dataframe8_2,on='dept_id').select('employee_id','employee_name','dept_id','dept_name')
result.show()

+-----------+-------------+-------+---------+
|employee_id|employee_name|dept_id|dept_name|
+-----------+-------------+-------+---------+
|          1|        Alice|    101|     'HR'|
|          2|          Bob|    102|'Finance'|
|          3|      Charlie|    103|     'IT'|
|          4|        David|    101|     'HR'|
+-----------+-------------+-------+---------+



In [0]:
#You have data with skillset(list) and date.now you have count the people with number of skills with respective dates
from pyspark.sql.functions import explode,to_date

data=[(['A','B'],'01/11/20'),
(['B','I','R'],'01/11/20'),
(['S','H'],'02/11/20'),
(['A','H','S'],'02/11/20')]

columns=['skillset','date']

Dataframe8_1=spark.createDataFrame(data,columns)
Dataframe8_1_1=Dataframe8_1.withColumn('dates',to_date(Dataframe8_1['date'],'dd/MM/yy'))
result=Dataframe8_1_1.withColumn('skillset',explode('skillset'))
grouped_df=result.groupBy('skillset','dates').count()
pivot_df=grouped_df.groupBy('skillset').pivot('dates',['2020-11-01', '2020-11-02']).sum('count')
final_result=pivot_df.fillna(0)
display(final_result)

skillset,2020-11-01,2020-11-02
B,2,0
A,1,1
S,0,2
R,1,0
I,1,0
H,0,2


In [0]:
#You are given a DataFrame of employee data with the following columns:
#employee_id (int)
#department (string)
#salary (int)
#Write PySpark code to:
#Rank employees within each department based on their salary (highest salary first).
#Add a column that shows the average salary per department.
#Filter to keep only the top 2 highest-paid employees per department.

from pyspark.sql.types import StructType,StructField,StringType,IntegerType,FloatType
from pyspark.sql.window import Window
from pyspark.sql.functions import rank,desc,avg,col,round
schema=StructType([
    StructField('employee_id',IntegerType(),True),
    StructField('department',StringType(),True),
    StructField('salary',FloatType(),True)
])

Dataframe8_2=spark.read.csv('dbfs:/FileStore/psday5_2.csv',header=True,schema=schema)
windows_spec=Window.partitionBy('department').orderBy(desc('salary'))
avg_window=Window.partitionBy('department')
result1=Dataframe8_2.withColumn('ranker',rank().over(windows_spec)).withColumn('Avg_Sal',round(avg('salary').over(avg_window),2))
final_result=result1.filter(col('ranker')<= 2)
display(final_result)


employee_id,department,salary,ranker,Avg_Sal
7,'FINANCE',65000.0,1,64000.0
8,'FINANCE',63000.0,2,64000.0
6,'HR',71000.0,1,60333.33
2,'HR',60000.0,2,60333.33
9,'IT',90000.0,1,69250.0
5,'IT',72000.0,2,69250.0


In [0]:
#You are given a PySpark DataFrame with a column named tags that contains comma-separated string
#Task: Write PySpark code to transform this DataFrame so that each tag appears in its own row, along with the corresponding id,
from pyspark.sql.functions import explode, split, col
Data = [
    (1, "spark,hadoop,hive"),
    (2, "python, flask"),
    (3, "sql")
]

col_names = ['id', 'skillset']
DataFrame9_1 = spark.createDataFrame(Data, col_names)

result = DataFrame9_1.withColumn('Skills', explode(split(col('skillset'),","))).select('id','skills')
display(result)



id,skills
1,spark
1,hadoop
1,hive
2,python
2,flask
3,sql


In [0]:
#How would you add a new column called Country to a PySpark DataFrame based on the values of an existing City column, using different methods in PySpark?
from pyspark.sql.types import StructField,StructType,StringType,IntegerType
schema=StructType([
    StructField('name',StringType(),True),
    StructField('age',IntegerType(),True),
    StructField('city',StringType(),True)
])
schema2=StructType([
    StructField('city',StringType(),True),
    StructField('country',StringType(),True)
])
Dataframe9_2_1=spark.read.csv('dbfs:/FileStore/psday6_1.csv',header=True,schema=schema)
Dataframe9_2_2=spark.read.csv('dbfs:/FileStore/psday6_1_2.csv',header=True,schema=schema2)
result=Dataframe9_2_1.join(Dataframe9_2_2,on='city').select('name','age','city','country')
display(result)


name,age,city,country
'John',28,'New York','USA'
'Sarah',24,'London','UK'
'Micheal',30,'Sydney','Australia'


In [0]:
from pyspark.sql.functions import broadcast
from pyspark.sql.types import StructField,StructType,StringType,IntegerType
schema=StructType([
    StructField('name',StringType(),True),
    StructField('age',IntegerType(),True),
    StructField('city',StringType(),True)
])


mapping_data = [("New York", "USA"), ("London", "UK"), ("Sydney", "Australia")]
mapping_df = spark.createDataFrame(mapping_data, ["City", "Country"])
Dataframe9_2_1=spark.read.csv('dbfs:/FileStore/psday6_1.csv',header=True,schema=schema)


df_with_country = Dataframe9_2_1.join(broadcast(mapping_df), on="City", how="left").select('name','age','city','country')
display(df_with_country)


name,age,city,country
John,28,New York,USA
Sarah,24,London,UK
Micheal,30,Sydney,Australia


In [0]:
#Code to count number of employees in each department.
from pyspark.sql.types import StructField,StructType,StringType
schema=StructType([
    StructField('name',StringType(),True),
    StructField('dept',StringType(),True)
])

DataFrame9_3=spark.read.csv('dbfs:/FileStore/psday6_2.csv',header=True,schema=schema)
result=DataFrame9_3.groupBy('dept').count()
display(result)

dept,count
HR,3
IT,2


In [0]:
#Given employee data, filter out everyone earning ≤50K, then group by department to calculate the average salary.
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
from pyspark.sql.functions import col,avg

schema=StructType([
    StructField('id',IntegerType(),True),
    StructField('name',StringType(),True),
    StructField('dept',StringType(),True),
    StructField('Salary',IntegerType(),True)
])

DataFrame10_1=spark.read.csv('dbfs:/FileStore/psday7_1.csv',header=True,schema=schema)
Result=DataFrame10_1.filter(col('Salary')>50000).groupby('dept').agg(avg('Salary').alias('Avg_sal'))
display(Result)

dept,Avg_sal
Sales,60000.0
HR,76500.0
IT,55000.0


In [0]:
#Remove duplicate rows based on name and department
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
from pyspark.sql.window import Window
from pyspark.sql.functions import desc,rank,col

schema=StructType([
    StructField('id',IntegerType(),True),
    StructField('name',StringType(),True),
    StructField('dept',StringType(),True),
    StructField('Salary',IntegerType(),True)
])

DataFrame10_2=spark.read.csv('dbfs:/FileStore/psday7_2.csv',header=True,schema=schema)
window_spec=Window.partitionBy('name','dept').orderBy(desc('id'))
result=DataFrame10_2.withColumn('ranked',rank().over(window_spec)).filter(col('ranked')==1)
display(result)

id,name,dept,Salary,ranked
7,Alice,HR,80000,1
2,Bob,Sales,48000,1
8,Charlie,IT,85000,1
4,Diana,Sales,60000,1
5,Evan,IT,49000,1
6,Fiona,HR,81000,1


In [0]:
# 𝑪𝒓𝒆𝒂𝒕𝒆 𝒂 𝑷𝒚𝑺𝒑𝒂𝒓𝒌 𝑫𝒂𝒕𝒂𝑭𝒓𝒂𝒎𝒆 𝒘𝒊𝒕𝒉 𝒂 “𝒃𝒐𝒏𝒖𝒔” 𝑪𝒐𝒍𝒖𝒎𝒏 (10% 𝒐𝒇 𝑺𝒂𝒍𝒂𝒓𝒚)

from pyspark.sql.types import StructType,StructField,IntegerType,StringType
from pyspark.sql.functions import col

schema=StructType([
    StructField('id',IntegerType(),True),
    StructField('name',StringType(),True),
    StructField('dept',StringType(),True),
    StructField('Salary',IntegerType(),True)
])

DataFrame10_3=spark.read.csv('dbfs:/FileStore/psday7_3.csv',header=True,schema=schema)
result=DataFrame10_3.withColumn('bonus',col('Salary')*0.10)
display(result)

id,name,dept,Salary,bonus
1,Alice,HR,72000,7200.0
2,Bob,Sales,48000,4800.0
3,Charlie,IT,55000,5500.0
4,Diana,Sales,60000,6000.0
5,Evan,IT,49000,4900.0
6,Fiona,HR,81000,8100.0


In [0]:
#Given a Dataframe with some null values in age column now you have replace it with average values

from pyspark.sql.types import StructType,StructField,IntegerType,StringType
from pyspark.sql.functions import col,avg,coalesce
from pyspark.sql.window import Window

schema=StructType([
    StructField('id',IntegerType(),True),
    StructField('name',StringType(),True),
    StructField('dept',StringType(),True),
    StructField('age',IntegerType(),True)
])

DataFrame10_4=spark.read.csv('dbfs:/FileStore/pyday7_4.csv',header=True,schema=schema)
result=DataFrame10_4.groupBy('dept').agg(avg('age').alias('Avg_age'))
joined_df=DataFrame10_4.join(result,on='dept')
Final_result = joined_df.withColumn("age", coalesce(col("age"), col("Avg_age"))).drop('Avg_age')
display(Final_result)
#second

dept_window = Window.partitionBy("dept")
Final_result1= DataFrame10_4.withColumn(
    "age",
    coalesce(col("age"), avg("age").over(dept_window))
)

display(Final_result1)


dept,id,name,age
HR,1,Alice,25.0
Sales,2,Bob,30.0
IT,3,Charlie,24.0
Sales,4,Diana,30.0
IT,5,Evan,24.0
HR,6,Fiona,25.0


id,name,dept,age
1,Alice,HR,25.0
6,Fiona,HR,25.0
3,Charlie,IT,24.0
5,Evan,IT,24.0
2,Bob,Sales,30.0
4,Diana,Sales,30.0


In [0]:
#Write a solution that selects the team_id, team_name and num_points of each team in the tournament after all described matches. Return the result table ordered by num_points in decreasing order. In case of a tie, order the records by team_id in increasing order.
#You would like to compute the scores of all teams after all matches. Points are awarded as follows:
# A team receives three points if they win a match (i.e., Scored more goals than the opponent team).
# A team receives one point if they draw a match (i.e., Scored the same number of goals as the opponent team).
# A team receives no points if they lose a match (i.e., Scored fewer goals than the opponent team).

from pyspark.sql.types import StructType,StructField,IntegerType,StringType
from pyspark.sql.functions import col,when,sum,desc

schema1=StructType([
    StructField('team_id',IntegerType(),True),
    StructField('team_name',StringType(),True),
    
])

schema2=StructType([
    StructField('match_id',IntegerType(),True),
    StructField('host_team',IntegerType(),True),
    StructField('guest_team',IntegerType(),True),
    StructField('host_goals',IntegerType(),True),
    StructField('guest_goals',IntegerType(),True)
])

Dataframe11_1_1=spark.read.csv('dbfs:/FileStore/pyday8_11.csv',header=True,schema=schema1)
Dataframe11_1_2=spark.read.csv('dbfs:/FileStore/psday8_12.csv',header=True,schema=schema2)

result=Dataframe11_1_1.join(Dataframe11_1_2,(Dataframe11_1_1.team_id==Dataframe11_1_2.host_team)|(Dataframe11_1_1.team_id==Dataframe11_1_2.guest_team),how='left')

final_result=result.withColumn('points', 
    when (
        (col('team_id')==col('host_team')) & (col('host_goals')>col('guest_goals')),3
        ).
    when(
        (col('team_id')==col('guest_team')) & (col('host_goals')<col('guest_goals')),3
        ).
    when(
        (col('host_goals')==col('guest_goals')),1
        ).
    otherwise(0))
final_result_1=final_result.groupBy('team_id','team_name').agg(sum('points').alias('points')).orderBy(desc('points'))

display(final_result_1)



team_id,team_name,points
10,leetcode FC,7
50,Tornto FC,3
20,Newyork FC,3
30,Atlanta FC,1
40,Chicago FC,0


In [0]:
#Problem Statement:
#You're given two DataFrames:
#employees – contains employee details:
#(employee_id, name, salary, department_id)
#departments – contains department metadata:
#(department_id, department_name)
#🎯 Objective:
#Fill in the missing (null) salaries in the employees DataFrame with the average salary of that respective department.

from pyspark.sql.types import StructType,StructField,IntegerType,StringType
from pyspark.sql.functions import col,avg,coalesce,round
from pyspark.sql.window import Window

schema1=StructType([
    StructField('id',IntegerType(),True),
    StructField('name',StringType(),True),
    StructField('salary',IntegerType(),True),
    StructField('dept_id',IntegerType(),True)
])

schema2=StructType([
    StructField('id',IntegerType(),True),
    StructField('dept_name',StringType(),True)
])


Dataframe11_2_1=spark.read.csv('dbfs:/FileStore/psday8_21.csv',header=True,schema=schema1)
Dataframe11_2_2=spark.read.csv('dbfs:/FileStore/psday8_22.csv',header=True,schema=schema2)
windows_spec=Window.partitionBy('dept_id')
Final_result= Dataframe11_2_1.withColumn(
    "Salary",
    coalesce(col("Salary"), round(avg("Salary").over(windows_spec),2))
)
display(Final_result)


id,name,Salary,dept_id
1,Alice,70000.0,10
3,Charlie,80000.0,10
6,Frank,90000.0,10
2,Bob,69333.33,20
5,Eve,75000.0,20
8,Hannah,62000.0,20
10,Jack,71000.0,20
4,David,52000.0,30
7,Grace,52000.0,30
9,Isaac,52000.0,30


In [0]:
#Write pyspark code to display manager of each employees
#How do you retrieve the manager’s name for each employee using PySpark?
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
from pyspark.sql.functions import col
schema=StructType([
    StructField('id',IntegerType(),True),
    StructField('name',StringType(),True),
    StructField('deptid',IntegerType(),True),
    StructField('mgr_id',IntegerType(),True)
])

Dataframe12_1=spark.read.csv('dbfs:/FileStore/psday9_1.csv',header=True,schema=schema)
result = Dataframe12_1.alias('employee') .join(Dataframe12_1.alias('manager'), col('employee.mgr_id') == col('manager.id'), 'left').select(col('employee.id'), col('employee.name'), col('manager.name').alias('manager_name'))

display(result)

id,name,manager_name
1,John,
2,jane,John
3,Sam,John
4,Lucy,jane
5,Mike,Sam
6,Tyson,jane


In [0]:
#Identify accounts that should be banned based on logins from different IP addresses at the same time.
from pyspark.sql.functions import to_timestamp,col,desc,lead
from pyspark.sql.window import Window
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
schema = StructType([
    StructField('acnt_id', StringType(), True),
    StructField('ip_address', StringType(), True),
    StructField('login', StringType(), True),
    StructField('logout', StringType(), True)
])
Dataframe12_2= spark.read.csv('dbfs:/FileStore/psday9_2.csv', header=True, schema=schema)
Dataframe12_2= Dataframe12_2.withColumn('login', to_timestamp('login', 'yy-MM-dd HH:mm:ss')) \
       .withColumn('logout', to_timestamp('logout', 'yy-MM-dd HH:mm:ss'))
window_spec=Window.partitionBy('acnt_id').orderBy('login')
Dup_Dataframe=Dataframe12_2.withColumn('next_login',lead('login').over(window_spec)).filter(col('next_login').isNotNull())
final_df=Dup_Dataframe.filter(col('logout')>col('next_login')).select('acnt_id').distinct()
display(final_df)

acnt_id
A001
A004


In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
from pyspark.sql.functions import explode,col,split
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('Descr', StringType(), True)
])
Dataframe13_1=spark.read.csv('dbfs:/FileStore/psday10_1-2.csv',header=True,schema=schema)
result=Dataframe13_1.withColumn('Descr',split(col('Descr'),',')).withColumn('Descr',explode('Descr'))
final_result=result.groupBy('Descr').count()
display(final_result)

Descr,count
TV,5
Chair,3
Sofa,3


In [0]:
#write a pyspark code to identify the consecutive numbers from the Dataframe
from pyspark.sql.types import StructType,StructField,IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import col,lead,lag
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('num', IntegerType(), True)
])
Dataframe13_2=spark.read.csv('dbfs:/FileStore/psday10_2.csv',header=True,schema=schema)
window_spec=Window.orderBy('id')
Dataframe13_2_1=Dataframe13_2.withColumn('prev',lag('num').over(window_spec)).withColumn('next',lead('num').over(window_spec))
final_result=Dataframe13_2_1.filter((col('num')==col('prev')) & (col('num')==col('next'))).select(col('num').alias('consecutiveNums'))
display(final_result)


consecutiveNums
1
