In [0]:
"""
Task - Write a Spark code snippet to calculate the sum of a column in a DataFrame
"""
from pyspark.sql.functions import col,sum
# Sample employee data
data = [("John Doe", "john@example.com", 50000.0),
    ("Jane Smith", "jane@example.com", 60000.0),
    ("Bob Johnson", "bob@example.com", 55000.0)]


schema="Name string,email string,salary double"
df=spark.createDataFrame(data,schema)
ds2 = df.agg(sum("salary").alias("Total_salary")).first()[0]
display(ds2)

165000.0

In [0]:

"""
Task - Identify rows containing non-numeric values in the "Quantity" column, if any.
"""
from pyspark.sql.types import *
schema = StructType([
  StructField("ProductCode", StringType(), True),
  StructField("Quantity", StringType(), True),
  StructField("UnitPrice", StringType(), True),
  StructField("CustomerID", StringType(), True),
])
 
data = [
  ("P001", 5, 20.0, "C001"),
  ("P002", 3, 15.5, "C002"),
  ("P003", 10, 5.99, "C003"),
  ("P004", 2, 50.0, "C001"),
  ("P005", "eight", 12.75, "C002"),
]
 
df = spark.createDataFrame(data, schema=schema)
df = df.filter(~col("Quantity").rlike('^[a-zA-Z]*$'))
df.show()

+-----------+--------+---------+----------+
|ProductCode|Quantity|UnitPrice|CustomerID|
+-----------+--------+---------+----------+
|       P001|       5|     20.0|      C001|
|       P002|       3|     15.5|      C002|
|       P003|      10|     5.99|      C003|
|       P004|       2|     50.0|      C001|
+-----------+--------+---------+----------+



In [0]:
# Databricks notebook source
#Find out hashtag count for each quote

from pyspark.sql.functions import col,split,size
data = [
    ("Work hard in #silence, let #success make the noise.",),
    ("Be #yourself; everyone else is already taken.",),
    ("The only way to do #greatwork is to #love what you do.",),
    ("#Believe you can and you're #halfway there.",),
    ("The #future belongs to those who #believe in the #beauty of their #dreams.",)
]
df = spark.createDataFrame(data, ["quote"])
df = df.withColumn("count", size(split(col("quote"),"#"))-1)
df.show(truncate=False)

+--------------------------------------------------------------------------+-----+
|quote                                                                     |count|
+--------------------------------------------------------------------------+-----+
|Work hard in #silence, let #success make the noise.                       |2    |
|Be #yourself; everyone else is already taken.                             |1    |
|The only way to do #greatwork is to #love what you do.                    |2    |
|#Believe you can and you're #halfway there.                               |2    |
|The #future belongs to those who #believe in the #beauty of their #dreams.|4    |
+--------------------------------------------------------------------------+-----+



In [0]:
""" Write a PySpark program to select every 3rd (nth) row in the dataset """
from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
# Define schema for the DataFrame
schema = StructType([
 StructField("emp_id", IntegerType(), True),
 StructField("name", StringType(), True),
 StructField("salary", IntegerType(), True)
])

# Sample employee data
data = [
 (1001, "John Doe", 50000),
 (2001, "Jane Smith", 60000),
 (1003, "Michael Johnson", 75000),
 (4000, "Emily Davis", 55000),
 (1005, "Robert Brown", 70000),
 (6000, "Emma Wilson", 80000),
 (1700, "James Taylor", 65000),
 (8000, "Olivia Martinez", 72000),
 (2900, "William Anderson", 68000),
 (3310, "Sophia Garcia", 67000)
]

# Create DataFrame
df = spark.createDataFrame(data, schema)
windowspec = Window.orderBy(lit("1"))
df = df.withColumn("rn",row_number().over(windowspec)).filter(col("rn")%3==0).drop("rn")

# Show DataFrame
df.show()


+------+----------------+------+
|emp_id|            name|salary|
+------+----------------+------+
|  1003| Michael Johnson| 75000|
|  6000|     Emma Wilson| 80000|
|  2900|William Anderson| 68000|
+------+----------------+------+



In [0]:
"""Write a pyspark code to rank the products based on their total sales amount for each month, and return the top product for each month.
"""
from pyspark.sql.functions import *
from pyspark.sql.window import Window
sales_data = [
 {"product_id": 1, "sale_date": "2023-01-05", "amount": 100},
 {"product_id": 2, "sale_date": "2023-01-08", "amount": 150},
 {"product_id": 1, "sale_date": "2023-01-15", "amount": 100},
 {"product_id": 3, "sale_date": "2023-01-20", "amount": 100},
 {"product_id": 2, "sale_date": "2023-02-03", "amount": 180},
 {"product_id": 3, "sale_date": "2023-02-10", "amount": 250},
 {"product_id": 1, "sale_date": "2023-02-15", "amount": 300},
]
schema="product_id int ,sale_date string,amount long"

df=spark.createDataFrame(sales_data,schema)
df = df.withColumn("sale_date",date_format(to_date(col("sale_date")),'yyyy-MM'))
df = df.groupBy("product_id","sale_date").agg(sum("amount").alias("total_sales"))
window_spec = Window.partitionBy("sale_date").orderBy(desc("total_sales"))
df = df.withColumn("rn",row_number().over(window_spec)).filter(col("rn")==1).drop("rn")
df.show()

+----------+---------+-----------+
|product_id|sale_date|total_sales|
+----------+---------+-----------+
|         1|  2023-01|        200|
|         1|  2023-02|        300|
+----------+---------+-----------+



In [0]:

"""SELF JOIN"""
from pyspark.sql.types import *
from pyspark.sql.functions import col
employees_data = [
(1, 'Joe', 70000, 3),
(2, 'Henry', 80000, 4),
(3, 'Sam', 60000, None),
(4, 'Max', 90000, None)
]


employees_schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("salary", IntegerType(), True),
StructField("managerId", IntegerType(), True)])

df_employee=spark.createDataFrame(employees_data,employees_schema)
df_final=df_employee.alias("e1").join(df_employee.alias("e2"),col("e1.managerId")==col("e2.id"),"inner").filter(col("e1.salary")>col("e2.salary"))
df_final.show()

+---+----+------+---------+---+----+------+---------+
| id|name|salary|managerId| id|name|salary|managerId|
+---+----+------+---------+---+----+------+---------+
|  1| Joe| 70000|        3|  3| Sam| 60000|     null|
+---+----+------+---------+---+----+------+---------+



In [0]:
"""FIND DUPS """
# Databricks notebook source
from pyspark.sql.functions import col,count
data = [
(1, 'abc@g.com'),
(2, 'xyz@g.com'),
(3, 'abc@g.com' ),
(4, 'pqr@g.com')
]
schema = "id int,Email string"
df=spark.createDataFrame(data,schema)
df_2 = df.groupBy(col("Email")).agg(count("*").alias("count")).filter(col("count")>1).drop("count")
df_2.show()

+---------+
|    Email|
+---------+
|abc@g.com|
+---------+



In [0]:

"""FIND MISSING NUMBERS in DATA FREAME"""
from pyspark.sql.functions import col,min,max
data = [
 (1, ),
 (2,),
 (3,),
 (6,),
 (7,),
 (8,)]
df = spark.createDataFrame(data).toDF("id")
df_min_max = df.agg(min(col("id")),max(col("id")))
df_final=spark.range(df_min_max.collect()[0][0],df_min_max.collect()[0][1]+1)
df_final = df_final.subtract(df)
df_final.show()

+---+
| id|
+---+
|  4|
|  5|
+---+



In [0]:

"""NUMBER FORMATTING"""
df = spark.createDataFrame([(101, 0.000000987), (102, 0.0000554467), (103, 0.00050345678)], ["observation_id", "result"])
df.show()

from pyspark.sql.functions import col,format_number
df = df.withColumn("NUM",format_number(col("result"),10))
df.show()

+--------------+------------+
|observation_id|      result|
+--------------+------------+
|           101|     9.87E-7|
|           102|  5.54467E-5|
|           103|5.0345678E-4|
+--------------+------------+

+--------------+------------+------------+
|observation_id|      result|         NUM|
+--------------+------------+------------+
|           101|     9.87E-7|0.0000009870|
|           102|  5.54467E-5|0.0000554467|
|           103|5.0345678E-4|0.0005034568|
+--------------+------------+------------+



In [0]:
"""
1. total number of partitions
2. count of rows in each partitions
"""
from pyspark.sql.functions import col,spark_partition_id,count
df=spark.range(1,1000000)
df_final=df.withColumn("spark_partition_id",spark_partition_id())
df_final=df_final.groupBy(col("spark_partition_id")).agg(count("*").alias("total_rows"))
df_final.show(truncate=False)

+------------------+----------+
|spark_partition_id|total_rows|
+------------------+----------+
|0                 |124999    |
|1                 |125000    |
|2                 |125000    |
|3                 |125000    |
|4                 |125000    |
|5                 |125000    |
|6                 |125000    |
|7                 |125000    |
+------------------+----------+

