In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Employee rows in schema order: (id, name, department, managerId).
dataa = [
    (101, 'John', 'A', None),   # managerId is null for this row
    (102, 'Dan', 'A', 101),
    (103, 'James', 'A', 101),
    (104, 'Amy', 'A', 101),
    (105, 'Anne', 'A', 101),
    (106, 'Ron', 'B', 101),
]

# Built field by field; managerId is the only nullable column.
schemaa = (
    StructType()
    .add('id', IntegerType(), False)
    .add('name', StringType(), False)
    .add('department', StringType(), False)
    .add('managerId', IntegerType(), True)
)

df = spark.createDataFrame(dataa, schemaa)
df.show()


+---+-----+----------+---------+
| id| name|department|managerId|
+---+-----+----------+---------+
|101| John|         A|     null|
|102|  Dan|         A|      101|
|103|James|         A|      101|
|104|  Amy|         A|      101|
|105| Anne|         A|      101|
|106|  Ron|         B|      101|
+---+-----+----------+---------+



In [0]:
# Goal: names of the managers who have at least MIN_DIRECT_REPORTS direct reports.
MIN_DIRECT_REPORTS = 5

# Count employees per managerId. Rows whose managerId is null are removed
# before grouping so they never form a group of their own.
variablee = (
    df.filter(col('managerId').isNotNull())
      .groupBy('managerId')
      .agg(count('managerId').alias('countt'))
      .filter(col('countt') >= MIN_DIRECT_REPORTS)
      .select(col('managerId'))
      .collect()
)
# The qualifying ids are pulled back to the driver as a plain Python list.
manager_ids = [ss['managerId'] for ss in variablee]

# Filter on `id` BEFORE projecting down to `name`: once `select('name')` has
# run, `id` is no longer an output column of the DataFrame being filtered.
df.filter(col('id').isin(manager_ids)).select(col('name')).display()

name
John


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Product rows in schema order: (product_id, low_fats, recyclable).
dataa = [
    (0, 'Y', 'N'),
    (1, 'Y', 'Y'),
    (2, 'N', 'Y'),
    (3, 'Y', 'Y'),
    (4, 'N', 'N'),
]

# Every column is declared non-nullable.
schemaa = StructType([
    StructField(column_name, column_type, False)
    for column_name, column_type in (
        ('product_id', IntegerType()),
        ('low_fats', StringType()),
        ('recyclable', StringType()),
    )
])

df = spark.createDataFrame(dataa, schemaa)
df.show()

+----------+--------+----------+
|product_id|low_fats|recyclable|
+----------+--------+----------+
|         0|       Y|         N|
|         1|       Y|         Y|
|         2|       N|         Y|
|         3|       Y|         Y|
|         4|       N|         N|
+----------+--------+----------+



In [0]:
# Ids of the products flagged 'Y' for both low_fats and recyclable.
is_low_fat = col('low_fats') == 'Y'
is_recyclable = col('recyclable') == 'Y'

(
    df
    .where(is_low_fat & is_recyclable)
    .select(col('product_id'))
    .display()
)

product_id
1
3


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Customer rows in schema order: (id, name, referee_id).
# referee_id is None for some rows, so that column must be nullable.
dataa = [
    (1, 'Will', None),
    (2, 'Jane', None),
    (3, 'Alex', 2),
    (4, 'Bill', None),
    (5, 'Zack', 1),
    (6, 'Mark', 2),
]

schemaa = (
    StructType()
    .add('id', IntegerType(), False)
    .add('name', StringType(), False)
    .add('referee_id', IntegerType(), True)
)

df = spark.createDataFrame(dataa, schemaa)
df.show()


+---+----+----------+
| id|name|referee_id|
+---+----+----------+
|  1|Will|      null|
|  2|Jane|      null|
|  3|Alex|         2|
|  4|Bill|      null|
|  5|Zack|         1|
|  6|Mark|         2|
+---+----+----------+



In [0]:
# Names of customers whose referee is not customer 2.
# The null check is explicit: `referee_id != 2` alone evaluates to null (not
# true) for rows with no referee, so those rows would be dropped.
has_no_referee = col('referee_id').isNull()
has_other_referee = col('referee_id') != 2

(
    df
    .where(has_no_referee | has_other_referee)
    .select(col('name'))
    .display()
)

name
Will
Jane
Bill
Zack


📌 datetime and datetime.date — explained simply:
In Python, there is a built-in module called datetime that lets you work with dates and times.

🧱 datetime module:
It contains many classes — two important ones are:

datetime.date — only stores a date (like 2024-08-12)

datetime.datetime — stores a date and time (like 2024-08-12 14:30:00)

✅ datetime.date
This class represents a date (year, month, day) without time.

```python
import datetime

d = datetime.date(2024, 8, 12)
print(d)   # Output: 2024-08-12
```

So in your PySpark schema, when you declare a column as DateType(), it's best to use values of type datetime.date.

🧠 Simple analogy:

| Concept | Example |
|---|---|
| String date | '2024-08-12' |
| datetime.date | datetime.date(2024, 8, 12) |
| datetime.datetime | datetime.datetime(2024, 8, 12, 14, 30) |

📌 Summary:
datetime is the module

datetime.date is the class to create date-only objects

It's the safest format to use for DateType() in PySpark

Let me know if you want a side-by-side code example using all three!

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import datetime

# Article-view rows in schema order: (article_id, author_id, viewer_id, view_date).
# view_date values are datetime.date objects to match the DateType() column.
# The last two rows are intentionally identical (a duplicated view).
dataa = [
    (1, 3, 5, datetime.date(2019, 8, 1)),
    (1, 3, 6, datetime.date(2019, 8, 2)),
    (2, 7, 7, datetime.date(2019, 8, 1)),
    (2, 7, 6, datetime.date(2019, 8, 2)),
    (4, 7, 1, datetime.date(2019, 7, 22)),
    (3, 4, 4, datetime.date(2019, 7, 21)),
    (3, 4, 4, datetime.date(2019, 7, 21)),
]

# The three id columns share one type; all four columns are non-nullable.
id_fields = [
    StructField(column_name, IntegerType(), False)
    for column_name in ('article_id', 'author_id', 'viewer_id')
]
schemaa = StructType(id_fields + [StructField('view_date', DateType(), False)])

df = spark.createDataFrame(dataa, schemaa)
df.show()


+----------+---------+---------+----------+
|article_id|author_id|viewer_id| view_date|
+----------+---------+---------+----------+
|         1|        3|        5|2019-08-01|
|         1|        3|        6|2019-08-02|
|         2|        7|        7|2019-08-01|
|         2|        7|        6|2019-08-02|
|         4|        7|        1|2019-07-22|
|         3|        4|        4|2019-07-21|
|         3|        4|        4|2019-07-21|
+----------+---------+---------+----------+



In [0]:
# Authors who viewed at least one of their own articles, sorted by id ascending.
viewed_own_article = col('author_id') == col('viewer_id')
self_viewers = df.where(viewed_own_article).select(col('author_id'))

# Only author_id is left after the projection, so de-duplicating on that
# column gives the same rows that .distinct() would.
(
    self_viewers
    .dropDuplicates(['author_id'])
    .orderBy(col('author_id').asc())
    .display()
)


author_id
4
7


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Tweet rows in schema order: (tweet_id, content).
dataa = [
    (1, "Let us Code"),
    (2, "More than fifteen chars are here!"),
]

schemaa = (
    StructType()
    .add('tweet_id', IntegerType(), False)
    .add('content', StringType(), False)
)

df = spark.createDataFrame(dataa, schemaa)
# truncate=False prints the full content string instead of cutting it short.
df.show(truncate=False)


+--------+---------------------------------+
|tweet_id|content                          |
+--------+---------------------------------+
|1       |Let us Code                      |
|2       |More than fifteen chars are here!|
+--------+---------------------------------+



In [0]:
# Ids of the tweets whose content is strictly longer than 15 characters.
content_length = length(col('content'))

(
    df
    .where(content_length > 15)
    .select('tweet_id')
    .display()
)

tweet_id
2


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Employees table
# --- Employees table: (id, name) ---
dataa_employees = [
    (1, "Alice"),
    (7, "Bob"),
    (11, "Meir"),
    (90, "Winston"),
    (3, "Jonathan"),
]

schemaa_employees = (
    StructType()
    .add('id', IntegerType(), False)
    .add('name', StringType(), False)
)

df_employees = spark.createDataFrame(dataa_employees, schemaa_employees)
df_employees.show()

# --- EmployeeUNI table: (id, unique_id) ---
# Holds a row for only some of the employee ids above.
dataa_employeeuni = [
    (3, 1),
    (11, 2),
    (90, 3),
]

schemaa_employeeuni = (
    StructType()
    .add('id', IntegerType(), False)
    .add('unique_id', IntegerType(), False)
)

df_employeeuni = spark.createDataFrame(dataa_employeeuni, schemaa_employeeuni)
df_employeeuni.show()


+---+--------+
| id|    name|
+---+--------+
|  1|   Alice|
|  7|     Bob|
| 11|    Meir|
| 90| Winston|
|  3|Jonathan|
+---+--------+

+---+---------+
| id|unique_id|
+---+---------+
|  3|        1|
| 11|        2|
| 90|        3|
+---+---------+



In [0]:
# Show each employee's unique_id next to their name. The left join keeps
# every employee; those with no EmployeeUNI row get a null unique_id.
same_employee = df_employees.id == df_employeeuni.id
employees_with_uni = df_employees.join(df_employeeuni, same_employee, 'left')

employees_with_uni.select(col('unique_id'), col('name')).display()

unique_id,name
,Alice
,Bob
2.0,Meir
3.0,Winston
1.0,Jonathan


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Sales table
# --- Sales table: (sale_id, product_id, year, quantity, price) ---
dataa_sales = [
    (1, 100, 2008, 10, 5000),
    (2, 100, 2009, 12, 5000),
    (7, 200, 2011, 15, 9000),
]

# All five columns are non-nullable integers.
schemaa_sales = StructType([
    StructField(column_name, IntegerType(), False)
    for column_name in ('sale_id', 'product_id', 'year', 'quantity', 'price')
])

df_sales = spark.createDataFrame(dataa_sales, schemaa_sales)
df_sales.show()

# --- Product table: (product_id, product_name) ---
dataa_product = [
    (100, "Nokia"),
    (200, "Apple"),
    (300, "Samsung"),
]

schemaa_product = (
    StructType()
    .add('product_id', IntegerType(), False)
    .add('product_name', StringType(), False)
)

df_product = spark.createDataFrame(dataa_product, schemaa_product)
df_product.show()


+-------+----------+----+--------+-----+
|sale_id|product_id|year|quantity|price|
+-------+----------+----+--------+-----+
|      1|       100|2008|      10| 5000|
|      2|       100|2009|      12| 5000|
|      7|       200|2011|      15| 9000|
+-------+----------+----+--------+-----+

+----------+------------+
|product_id|product_name|
+----------+------------+
|       100|       Nokia|
|       200|       Apple|
|       300|     Samsung|
+----------+------------+



In [0]:
# Product name, year and price for every sale. Sales is the left side of the
# left join, so each sale row is kept.
same_product = df_product.product_id == df_sales.product_id
sales_with_product = df_sales.join(df_product, same_product, 'left')

sales_with_product.select(
    col('product_name'),
    col('year'),
    col('price'),
).display()

product_name,year,price
Nokia,2008,5000
Nokia,2009,5000
Apple,2011,9000


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Visits table
# --- Visits table: (visit_id, customer_id) ---
dataa_visits = [
    (1, 23),
    (2, 9),
    (4, 30),
    (5, 54),
    (6, 96),
    (7, 54),
    (8, 54),
]

schemaa_visits = (
    StructType()
    .add('visit_id', IntegerType(), False)
    .add('customer_id', IntegerType(), False)
)

df_visits = spark.createDataFrame(dataa_visits, schemaa_visits)
df_visits.show()

# --- Transactions table: (transaction_id, visit_id, amount) ---
# One visit can have several transactions (visit 5 has three).
dataa_transactions = [
    (2, 5, 310),
    (3, 5, 300),
    (9, 5, 200),
    (12, 1, 910),
    (13, 2, 970),
]

schemaa_transactions = StructType([
    StructField(column_name, IntegerType(), False)
    for column_name in ('transaction_id', 'visit_id', 'amount')
])

df_transactions = spark.createDataFrame(dataa_transactions, schemaa_transactions)
df_transactions.show()


+--------+-----------+
|visit_id|customer_id|
+--------+-----------+
|       1|         23|
|       2|          9|
|       4|         30|
|       5|         54|
|       6|         96|
|       7|         54|
|       8|         54|
+--------+-----------+

+--------------+--------+------+
|transaction_id|visit_id|amount|
+--------------+--------+------+
|             2|       5|   310|
|             3|       5|   300|
|             9|       5|   200|
|            12|       1|   910|
|            13|       2|   970|
+--------------+--------+------+



In [0]:

# Goal: for each customer, count the visits that produced no transaction.
#
# A left_anti join keeps only the df_visits rows that have NO matching
# visit_id in df_transactions, and returns only df_visits' columns. This
# states "visit without a transaction" directly on the join key instead of
# inferring it from a null in the `amount` payload column after a left join.
visits_without_transactions = df_visits.join(
    df_transactions,
    on='visit_id',
    how='left_anti',
)

# The aggregation already yields exactly (customer_id, count_no_trans), so no
# extra select is needed before sorting by customer_id descending.
(
    visits_without_transactions
    .groupBy(col('customer_id'))
    .agg(count('*').alias('count_no_trans'))
    .sort(col('customer_id').desc())
    .display()
)

customer_id,count_no_trans
96,1
54,2
30,1


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import datetime

# Weather table data
# Weather rows in schema order: (id, recordDate, temperature).
# recordDate values are datetime.date objects to match the DateType() column.
dataa_weather = [
    (1, datetime.date(2015, 1, 1), 10),
    (2, datetime.date(2015, 1, 2), 25),
    (3, datetime.date(2015, 1, 3), 20),
    (4, datetime.date(2015, 1, 4), 30),
]

# All three columns are non-nullable.
schemaa_weather = (
    StructType()
    .add('id', IntegerType(), False)
    .add('recordDate', DateType(), False)
    .add('temperature', IntegerType(), False)
)

df_weather = spark.createDataFrame(dataa_weather, schemaa_weather)
df_weather.show()


+---+----------+-----------+
| id|recordDate|temperature|
+---+----------+-----------+
|  1|2015-01-01|         10|
|  2|2015-01-02|         25|
|  3|2015-01-03|         20|
|  4|2015-01-04|         30|
+---+----------+-----------+



### If we use a self join, the steps below are mandatory:
1) df_weather1=df_weather.alias('a')
2) df_weather2=df_weather.alias('b')

Wherever we refer to columns of the two DataFrames, we should write col('a.recordDate')==col('b.recordDate') instead of df_weather1.recordDate==df_weather2.recordDate (the style we use for ordinary joins between two different DataFrames), because both aliases point at the same underlying DataFrame.

In [0]:
# Self join: alias 'a' is the earlier day, alias 'b' is the day after it.
df_weather1 = df_weather.alias('a')
df_weather2 = df_weather.alias('b')

# Pair each 'a' row with the 'b' row dated exactly one day later ...
is_following_day = col('b.recordDate') == date_add(col('a.recordDate'), 1)
# ... and keep only the pairs where the later day is warmer.
is_warmer = col('b.temperature') > col('a.temperature')

(
    df_weather1
    .join(df_weather2, is_following_day, 'inner')
    .where(is_warmer)
    .select(col('b.id'))
    .display()
)

id
2
4


###Follow these when using window functions

1) These imports are mandatory:
from pyspark.sql.window import Window
from pyspark.sql.functions import *   # provides lead, lag and the other window functions


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *
# Goal: ids of the days that are warmer than the day immediately before them,
# computed with a window function instead of a self join.

# One window spec, defined once: all rows ordered by recordDate.
by_record_date = Window.orderBy(col('recordDate'))

# Values taken from the NEXT row in date order (null on the last row).
next_id = lead(col('id')).over(by_record_date)
next_temperature = lead(col('temperature')).over(by_record_date)
next_record_date = lead(col('recordDate')).over(by_record_date)

# The next row must be exactly one calendar day later. date_add is used
# rather than `col('recordDate') + 1` so the date arithmetic is explicit and
# matches the self-join version of this query.
is_next_day = next_record_date == date_add(col('recordDate'), 1)
is_warmer = next_temperature > col('temperature')

# when(...) with no otherwise() yields null where the condition does not
# hold; those rows are dropped by the isNotNull filter on the aliased column.
(
    df_weather
    .select(when(is_warmer & is_next_day, next_id).alias('id'))
    .filter(col('id').isNotNull())
    .display()
)

id
2
4
