# library

In [3]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("SparkBasics").getOrCreate()
spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/05 12:57:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 64029)
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/Users/indhra/Machine_learning/pyspark_handson/.venv/lib/python3.10/site-packages/pyspark/accumulators.py", line 299, in handle
    poll(accum_updates)
  File "/Users/indhra/Machine

In [5]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x13811ceb0>


# creating data

In [6]:
# Method 1: Simple list to DataFrame
data = [("Alice", 25, "Engineer"),
        ("Bob", 30, "Manager"), 
        ("Charlie", 35, "Analyst")]

columns = ["name", "age", "role"]
data

[('Alice', 25, 'Engineer'), ('Bob', 30, 'Manager'), ('Charlie', 35, 'Analyst')]

In [7]:
df = spark.createDataFrame(data
                           )
df.show()

                                                                                

+-------+---+--------+
|     _1| _2|      _3|
+-------+---+--------+
|  Alice| 25|Engineer|
|    Bob| 30| Manager|
|Charlie| 35| Analyst|
+-------+---+--------+



In [8]:
df.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)
 |-- _3: string (nullable = true)



In [9]:
df = spark.createDataFrame(data, 
                           columns)
df.show()

+-------+---+--------+
|   name|age|    role|
+-------+---+--------+
|  Alice| 25|Engineer|
|    Bob| 30| Manager|
|Charlie| 35| Analyst|
+-------+---+--------+



In [10]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- role: string (nullable = true)



In [11]:
df.count()

3

In [12]:
df.columns

['name', 'age', 'role']

# explicit data types

In [13]:
from pyspark.sql.types import StringType, IntegerType, StructField,StructType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age",IntegerType(), True),
    StructField("role",StringType(), True)
])
schema

StructType([StructField('name', StringType(), True), StructField('age', IntegerType(), True), StructField('role', StringType(), True)])

In [14]:
df_type = spark.createDataFrame(data, schema)
df_type.show()

+-------+---+--------+
|   name|age|    role|
+-------+---+--------+
|  Alice| 25|Engineer|
|    Bob| 30| Manager|
|Charlie| 35| Analyst|
+-------+---+--------+



In [15]:
df_type.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- role: string (nullable = true)



# selecting, filtering, sorting

In [16]:
df.select('name','age').show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+



In [17]:
df.filter(df.age > 30).show()

+-------+---+-------+
|   name|age|   role|
+-------+---+-------+
|Charlie| 35|Analyst|
+-------+---+-------+



In [18]:
df.filter(df.role =='Manager').show()

+----+---+-------+
|name|age|   role|
+----+---+-------+
| Bob| 30|Manager|
+----+---+-------+



In [19]:
df.orderBy('age').show()

+-------+---+--------+
|   name|age|    role|
+-------+---+--------+
|  Alice| 25|Engineer|
|    Bob| 30| Manager|
|Charlie| 35| Analyst|
+-------+---+--------+



In [20]:
df.orderBy(df.age.desc()).show()

+-------+---+--------+
|   name|age|    role|
+-------+---+--------+
|Charlie| 35| Analyst|
|    Bob| 30| Manager|
|  Alice| 25|Engineer|
+-------+---+--------+



In [21]:
df.orderBy(df.age.asc()).show()

+-------+---+--------+
|   name|age|    role|
+-------+---+--------+
|  Alice| 25|Engineer|
|    Bob| 30| Manager|
|Charlie| 35| Analyst|
+-------+---+--------+



In [22]:
df.orderBy('role').show()

+-------+---+--------+
|   name|age|    role|
+-------+---+--------+
|Charlie| 35| Analyst|
|  Alice| 25|Engineer|
|    Bob| 30| Manager|
+-------+---+--------+



# column operations

In [23]:
from pyspark.sql.functions import col 


In [24]:
df.select('name', (col("age") + 5).alias('age_plus_five')).show()

+-------+-------------+
|   name|age_plus_five|
+-------+-------------+
|  Alice|           30|
|    Bob|           35|
|Charlie|           40|
+-------+-------------+



In [25]:
df1= df.select('name', (col('age')+5.2).alias('age_plus_fiveish'))
df1.show()

+-------+----------------+
|   name|age_plus_fiveish|
+-------+----------------+
|  Alice|            30.2|
|    Bob|            35.2|
|Charlie|            40.2|
+-------+----------------+



In [26]:
df1.printSchema()

root
 |-- name: string (nullable = true)
 |-- age_plus_fiveish: double (nullable = true)



# grouping and aggregations

In [27]:
data = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Raman", "Finance", "CA", 99000, 40, 24000),
    ("Scott", "Finance", "NY", 83000, 36, 19000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000)
]
columns = ["employee_name", "department", "state", "salary", "age", "bonus"]
data

[('James', 'Sales', 'NY', 90000, 34, 10000),
 ('Michael', 'Sales', 'NY', 86000, 56, 20000),
 ('Robert', 'Sales', 'CA', 81000, 30, 23000),
 ('Maria', 'Finance', 'CA', 90000, 24, 23000),
 ('Raman', 'Finance', 'CA', 99000, 40, 24000),
 ('Scott', 'Finance', 'NY', 83000, 36, 19000),
 ('Jen', 'Finance', 'NY', 79000, 53, 15000),
 ('Jeff', 'Marketing', 'CA', 80000, 25, 18000),
 ('Kumar', 'Marketing', 'NY', 91000, 50, 21000)]

In [28]:
df = spark.createDataFrame(data, columns)
df.printSchema()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)



In [29]:
from pyspark.sql.functions import sum, avg, min, max

df.groupBy('state').agg(sum('salary'),max('age')).show()

+-----+-----------+--------+
|state|sum(salary)|max(age)|
+-----+-----------+--------+
|   NY|     429000|      56|
|   CA|     350000|      40|
+-----+-----------+--------+



In [30]:
df.groupBy('department').agg(max('bonus').alias('highest_bonus'), min('bonus').alias('lowest_bonus')).show()

+----------+-------------+------------+
|department|highest_bonus|lowest_bonus|
+----------+-------------+------------+
|     Sales|        23000|       10000|
|   Finance|        24000|       15000|
| Marketing|        21000|       18000|
+----------+-------------+------------+



In [31]:
df.groupBy(
    "state"
).count().show()

+-----+-----+
|state|count|
+-----+-----+
|   NY|    5|
|   CA|    4|
+-----+-----+



# window 

In [32]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank


In [33]:
window_spec = Window.partitionBy('department').orderBy('Salary')
print(window_spec)

<pyspark.sql.classic.window.WindowSpec object at 0x1381bb610>


In [34]:
df.withColumn('salary_rank', rank().over(window_spec)).show()

+-------------+----------+-----+------+---+-----+-----------+
|employee_name|department|state|salary|age|bonus|salary_rank|
+-------------+----------+-----+------+---+-----+-----------+
|          Jen|   Finance|   NY| 79000| 53|15000|          1|
|        Scott|   Finance|   NY| 83000| 36|19000|          2|
|        Maria|   Finance|   CA| 90000| 24|23000|          3|
|        Raman|   Finance|   CA| 99000| 40|24000|          4|
|         Jeff| Marketing|   CA| 80000| 25|18000|          1|
|        Kumar| Marketing|   NY| 91000| 50|21000|          2|
|       Robert|     Sales|   CA| 81000| 30|23000|          1|
|      Michael|     Sales|   NY| 86000| 56|20000|          2|
|        James|     Sales|   NY| 90000| 34|10000|          3|
+-------------+----------+-----+------+---+-----+-----------+



In [35]:
window_spec = Window.partitionBy('department').orderBy(df.age.desc())
df.withColumn('aged_person',rank().over(window_spec)).show()

+-------------+----------+-----+------+---+-----+-----------+
|employee_name|department|state|salary|age|bonus|aged_person|
+-------------+----------+-----+------+---+-----+-----------+
|          Jen|   Finance|   NY| 79000| 53|15000|          1|
|        Raman|   Finance|   CA| 99000| 40|24000|          2|
|        Scott|   Finance|   NY| 83000| 36|19000|          3|
|        Maria|   Finance|   CA| 90000| 24|23000|          4|
|        Kumar| Marketing|   NY| 91000| 50|21000|          1|
|         Jeff| Marketing|   CA| 80000| 25|18000|          2|
|      Michael|     Sales|   NY| 86000| 56|20000|          1|
|        James|     Sales|   NY| 90000| 34|10000|          2|
|       Robert|     Sales|   CA| 81000| 30|23000|          3|
+-------------+----------+-----+------+---+-----+-----------+



# joins

In [36]:
dept_data = [("Finance", 10), ("Marketing", 20), ("Sales", 30)]
dept_columns = ["dept_name", "dept_id"]
dept_df = spark.createDataFrame(dept_data, dept_columns)

emp_data = [(1,"John",10), (2,"Maria",20), (3,"David",10)]
emp_columns = ["emp_id", "name", "emp_dept_id"]
emp_df = spark.createDataFrame(emp_data, emp_columns)
dept_df.show()

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
+---------+-------+



In [37]:
emp_df.show()

+------+-----+-----------+
|emp_id| name|emp_dept_id|
+------+-----+-----------+
|     1| John|         10|
|     2|Maria|         20|
|     3|David|         10|
+------+-----+-----------+



In [38]:
joined = emp_df.join(dept_df, dept_df.dept_id == emp_df.emp_dept_id, how='inner')
joined.show()

+------+-----+-----------+---------+-------+
|emp_id| name|emp_dept_id|dept_name|dept_id|
+------+-----+-----------+---------+-------+
|     1| John|         10|  Finance|     10|
|     3|David|         10|  Finance|     10|
|     2|Maria|         20|Marketing|     20|
+------+-----+-----------+---------+-------+



In [39]:
joined = emp_df.join(dept_df, dept_df.dept_id == emp_df.emp_dept_id, how='left')
joined.show()

+------+-----+-----------+---------+-------+
|emp_id| name|emp_dept_id|dept_name|dept_id|
+------+-----+-----------+---------+-------+
|     1| John|         10|  Finance|     10|
|     2|Maria|         20|Marketing|     20|
|     3|David|         10|  Finance|     10|
+------+-----+-----------+---------+-------+



In [40]:
joined = emp_df.join(dept_df, dept_df.dept_id == emp_df.emp_dept_id, how='right')
joined.show()

+------+-----+-----------+---------+-------+
|emp_id| name|emp_dept_id|dept_name|dept_id|
+------+-----+-----------+---------+-------+
|     3|David|         10|  Finance|     10|
|     1| John|         10|  Finance|     10|
|     2|Maria|         20|Marketing|     20|
|  NULL| NULL|       NULL|    Sales|     30|
+------+-----+-----------+---------+-------+



In [41]:
joined = emp_df.join(dept_df, dept_df.dept_id == emp_df.emp_dept_id, how='outer')
joined.show()

+------+-----+-----------+---------+-------+
|emp_id| name|emp_dept_id|dept_name|dept_id|
+------+-----+-----------+---------+-------+
|     1| John|         10|  Finance|     10|
|     3|David|         10|  Finance|     10|
|     2|Maria|         20|Marketing|     20|
|  NULL| NULL|       NULL|    Sales|     30|
+------+-----+-----------+---------+-------+



# caching

In [42]:
df.printSchema()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)



In [43]:
df.cache()

DataFrame[employee_name: string, department: string, state: string, salary: bigint, age: bigint, bonus: bigint]

In [44]:
df.count()

9

In [45]:
from pyspark import StorageLevel

In [46]:
joined.persist(StorageLevel.MEMORY_AND_DISK)
joined.count()

4

In [47]:
joined.unpersist()

DataFrame[emp_id: bigint, name: string, emp_dept_id: bigint, dept_name: string, dept_id: bigint]

In [48]:
# transformation - lazy
older = df.filter(df.age>30)

# action - triggers job
older.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



# distributed computing

In [49]:
df.rdd.getNumPartitions()

8

In [50]:
# increase partitions  - shuffle

df2 = df.repartition(10)
df2.rdd.getNumPartitions()

10

In [51]:
df3 = df.coalesce(2)
df3.rdd.getNumPartitions()

2

# read, write data, csv, parquet, json

In [52]:
df_csv = spark.read.csv('./employee_data.csv', header=True, inferSchema=True)
df_csv.show()

+-----------+-----------+---------+-----------+-----------+------+---+----------------+-----------------+----------------+------------+-----+----------+---------+---------------+-------------+---------------------+
|employee_id| first_name|last_name| department|  job_title|salary|age|years_experience|performance_score|bonus_percentage|        city|state| hire_date|is_remote|education_level|project_count|customer_satisfaction|
+-----------+-----------+---------+-----------+-----------+------+---+----------------+-----------------+----------------+------------+-----+----------+---------+---------------+-------------+---------------------+
|          1|      Sarah| Martinez|      Sales|     Senior| 78511| 49|               0|             3.94|            1.32|Philadelphia|   AZ|2021-03-14|    false|       Bachelor|            7|                  7.8|
|          2|       John|    Davis|      Legal|Coordinator| 57346| 43|              25|             4.01|            2.26|      Dallas|   TX

In [53]:
# # Fix for Parquet timestamp type compatibility
# spark.conf.set("spark.sql.parquet.int96AsTimestamp", "true")
# spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
# spark.conf.set("spark.sql.parquet.int96RebaseModeInRead", "LEGACY")
# spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")

# df_parq = spark.read.option("mergeSchema", "true").option("timeZone", "UTC").parquet('./employee_data.parquet')
# # Display the contents of the Parquet DataFrame
# df_parq.show()
# # Print the schema for clarity and learning
# df_parq.printSchema()


In [54]:
# df_json = spark.read.json('./employee_data.json')
# df_json.show()

In [55]:
df_csv.write.option('header',True).csv('save1.csv')

In [56]:
from pyspark.sql.functions import sum, avg, min, max, count

df.groupBy("department", "state").agg(
    sum("salary").alias("total_salary"),
    avg("age").alias("avg_age"),
    min("bonus").alias("min_bonus"),
    max("bonus").alias("max_bonus"),
    count("*").alias("emp_count")
).show()


+----------+-----+------------+-------+---------+---------+---------+
|department|state|total_salary|avg_age|min_bonus|max_bonus|emp_count|
+----------+-----+------------+-------+---------+---------+---------+
|     Sales|   NY|      176000|   45.0|    10000|    20000|        2|
|     Sales|   CA|       81000|   30.0|    23000|    23000|        1|
|   Finance|   CA|      189000|   32.0|    23000|    24000|        2|
|   Finance|   NY|      162000|   44.5|    15000|    19000|        2|
| Marketing|   NY|       91000|   50.0|    21000|    21000|        1|
| Marketing|   CA|       80000|   25.0|    18000|    18000|        1|
+----------+-----+------------+-------+---------+---------+---------+



In [57]:
from pyspark.sql.functions import when 

# Add a new column 'age_group' based on age ranges 

In [58]:
df.withColumn(
    "age_group",
    when(df.age<=30, 'young')
    .when(df.age > 60, 'senior')
    .otherwise('middle')
).show()


+-------------+----------+-----+------+---+-----+---------+
|employee_name|department|state|salary|age|bonus|age_group|
+-------------+----------+-----+------+---+-----+---------+
|        James|     Sales|   NY| 90000| 34|10000|   middle|
|      Michael|     Sales|   NY| 86000| 56|20000|   middle|
|       Robert|     Sales|   CA| 81000| 30|23000|    young|
|        Maria|   Finance|   CA| 90000| 24|23000|    young|
|        Raman|   Finance|   CA| 99000| 40|24000|   middle|
|        Scott|   Finance|   NY| 83000| 36|19000|   middle|
|          Jen|   Finance|   NY| 79000| 53|15000|   middle|
|         Jeff| Marketing|   CA| 80000| 25|18000|    young|
|        Kumar| Marketing|   NY| 91000| 50|21000|   middle|
+-------------+----------+-----+------+---+-----+---------+



In [59]:
from pyspark.sql.functions import explode

data = [
    ("James", ["Java", "Scala"]),
    ("Michael", ["Spark", "Java"]),
    ("Robert", ["CSharp"]),
    ("Washington", None),
    ("Jefferson", ["1", "2"])
]
data

[('James', ['Java', 'Scala']),
 ('Michael', ['Spark', 'Java']),
 ('Robert', ['CSharp']),
 ('Washington', None),
 ('Jefferson', ['1', '2'])]

In [60]:
df_arr = spark.createDataFrame(data, ['name','skills'])
df_arr.show()

+----------+-------------+
|      name|       skills|
+----------+-------------+
|     James|[Java, Scala]|
|   Michael|[Spark, Java]|
|    Robert|     [CSharp]|
|Washington|         NULL|
| Jefferson|       [1, 2]|
+----------+-------------+



In [61]:
dfarr1 = df_arr.select('name', explode("skills"))
dfarr1.show()

+---------+------+
|     name|   col|
+---------+------+
|    James|  Java|
|    James| Scala|
|  Michael| Spark|
|  Michael|  Java|
|   Robert|CSharp|
|Jefferson|     1|
|Jefferson|     2|
+---------+------+



In [62]:
df.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [63]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

window_spec = Window.partitionBy("department").orderBy("salary").rowsBetween(-2, 0)
df.withColumn("moving_avg_salary", avg("salary").over(window_spec)).show()


+-------------+----------+-----+------+---+-----+-----------------+
|employee_name|department|state|salary|age|bonus|moving_avg_salary|
+-------------+----------+-----+------+---+-----+-----------------+
|          Jen|   Finance|   NY| 79000| 53|15000|          79000.0|
|        Scott|   Finance|   NY| 83000| 36|19000|          81000.0|
|        Maria|   Finance|   CA| 90000| 24|23000|          84000.0|
|        Raman|   Finance|   CA| 99000| 40|24000|90666.66666666667|
|         Jeff| Marketing|   CA| 80000| 25|18000|          80000.0|
|        Kumar| Marketing|   NY| 91000| 50|21000|          85500.0|
|       Robert|     Sales|   CA| 81000| 30|23000|          81000.0|
|      Michael|     Sales|   NY| 86000| 56|20000|          83500.0|
|        James|     Sales|   NY| 90000| 34|10000|85666.66666666667|
+-------------+----------+-----+------+---+-----+-----------------+



# UDF and pandas udf

In [65]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def label_bonus(bonus):
    return "High" if bonus >= 20000 else "Low"

bonus_udf = udf(label_bonus, StringType())
df.withColumn("bonus_label", bonus_udf(df.bonus)).show()



+-------------+----------+-----+------+---+-----+-----------+
|employee_name|department|state|salary|age|bonus|bonus_label|
+-------------+----------+-----+------+---+-----+-----------+
|        James|     Sales|   NY| 90000| 34|10000|        Low|
|      Michael|     Sales|   NY| 86000| 56|20000|       High|
|       Robert|     Sales|   CA| 81000| 30|23000|       High|
|        Maria|   Finance|   CA| 90000| 24|23000|       High|
|        Raman|   Finance|   CA| 99000| 40|24000|       High|
|        Scott|   Finance|   NY| 83000| 36|19000|        Low|
|          Jen|   Finance|   NY| 79000| 53|15000|        Low|
|         Jeff| Marketing|   CA| 80000| 25|18000|        Low|
|        Kumar| Marketing|   NY| 91000| 50|21000|       High|
+-------------+----------+-----+------+---+-----+-----------+



In [68]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Sample data
data = [
    ("IN1001", 950, "US", "regular"),
    ("IN1002", 3500, "IN", "vip"),
    ("IN1003", 12000, "US", "blocked"),
    ("IN1004", 9000, "UK", "regular"),
    ("IN1005", 250, "FR", "vip")
]
columns = ["order_id", "amount", "country", "customer_status"]

spark = SparkSession.builder.appName("UDFMultiColExample").getOrCreate()
df = spark.createDataFrame(data, columns)

# Advanced UDF using multiple columns
def risk_label(amount, country, status):
    if status == "blocked":
        return "high"
    if amount > 10000:
        return "high"
    if amount > 2000 and country == "IN":
        return "medium"
    if (country in ["US", "UK"]) and status == "vip":
        return "low"
    return "normal"

risk_udf = udf(risk_label, StringType())

df = df.withColumn("risk", risk_udf(df.amount, df.country, df.customer_status))
df.show()


+--------+------+-------+---------------+------+
|order_id|amount|country|customer_status|  risk|
+--------+------+-------+---------------+------+
|  IN1001|   950|     US|        regular|normal|
|  IN1002|  3500|     IN|            vip|medium|
|  IN1003| 12000|     US|        blocked|  high|
|  IN1004|  9000|     UK|        regular|normal|
|  IN1005|   250|     FR|            vip|normal|
+--------+------+-------+---------------+------+



In [69]:
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("string")
def fast_risk_label(amount: pd.Series, country: pd.Series, status: pd.Series) -> pd.Series:
    # pandas magic for batch processing
    cond_high = (status == "blocked") | (amount > 10000)
    cond_medium = (amount > 2000) & (country == "IN")
    cond_low = (country.isin(["US","UK"])) & (status == "vip")
    return pd.Series(
        ["high" if h else "medium" if m else "low" if l else "normal"
         for h, m, l in zip(cond_high, cond_medium, cond_low)]
    )

df = df.withColumn("risk_pandas", fast_risk_label(df.amount, df.country, df.customer_status))
df.show()


+--------+------+-------+---------------+------+-----------+
|order_id|amount|country|customer_status|  risk|risk_pandas|
+--------+------+-------+---------------+------+-----------+
|  IN1001|   950|     US|        regular|normal|     normal|
|  IN1002|  3500|     IN|            vip|medium|     medium|
|  IN1003| 12000|     US|        blocked|  high|       high|
|  IN1004|  9000|     UK|        regular|normal|     normal|
|  IN1005|   250|     FR|            vip|normal|     normal|
+--------+------+-------+---------------+------+-----------+



# pivot and unpivot

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, first, last

spark = SparkSession.builder.getOrCreate()

data = [
    ("North", "Jan", 200),
    ("North", "Feb", 150),
    ("South", "Jan", 50),
    ("South", "Feb", 120)
]
columns = ["region", "month", "sales"]
df = spark.createDataFrame(data, columns)
df.show()

+------+-----+-----+
|region|month|sales|
+------+-----+-----+
| North|  Jan|  200|
| North|  Feb|  150|
| South|  Jan|   50|
| South|  Feb|  120|
+------+-----+-----+



In [72]:

pivoted = df.groupBy("region").pivot("month").agg(sum("sales"))
pivoted.show()


+------+---+---+
|region|Feb|Jan|
+------+---+---+
| South|120| 50|
| North|150|200|
+------+---+---+



In [75]:
df.printSchema()

root
 |-- region: string (nullable = true)
 |-- month: string (nullable = true)
 |-- sales: long (nullable = true)



In [74]:
from pyspark.sql.functions import first,last

# Example: pivot with string values, extracting the first string per group
pivoted_str = df.groupBy("region").pivot("month").agg(last("sales"))
pivoted_str.show()

+------+---+---+
|region|Feb|Jan|
+------+---+---+
| South|120| 50|
| North|150|200|
+------+---+---+



In [76]:
# PySpark (3.4+)
melted = pivoted.melt(ids=["region"], values=["Jan", "Feb"], 
                      variableColumnName="month", valueColumnName="sales")
melted.show()


+------+-----+-----+
|region|month|sales|
+------+-----+-----+
| South|  Jan|   50|
| South|  Feb|  120|
| North|  Jan|  200|
| North|  Feb|  150|
+------+-----+-----+



In [78]:
from pyspark.sql.functions import explode

data = [("Anna", ["AI", "ML"]), ("Bala", ["DB", "Web"]), ("Cara", None)]
df2 = spark.createDataFrame(data, ["name", "skills"])
df_exploded = df.select("name", explode("skills").alias("skill"))
df_exploded.show()
    

+----+-----+
|name|skill|
+----+-----+
|Anna|   AI|
|Anna|   ML|
|Bala|   DB|
|Bala|  Web|
+----+-----+



In [80]:
from pyspark.sql.functions import avg, count, max

data = [
    ("Red", 2019, 100),
    ("Red", 2020, 120),
    ("Blue", 2019, 90),
    ("Blue", 2020, 110)
]
cols = ["color", "year", "score"]
df = spark.createDataFrame(data, cols)

agg_df = df.groupBy("color").agg(
    avg("score").alias("avg_score"),
    count("*").alias("n_samples"),
    max("score").alias("max_score")
)
agg_df.show()

+-----+---------+---------+---------+
|color|avg_score|n_samples|max_score|
+-----+---------+---------+---------+
|  Red|    110.0|        2|      120|
| Blue|    100.0|        2|      110|
+-----+---------+---------+---------+



In [81]:

agg_df.filter("avg_score > 100").show()



+-----+---------+---------+---------+
|color|avg_score|n_samples|max_score|
+-----+---------+---------+---------+
|  Red|    110.0|        2|      120|
+-----+---------+---------+---------+



# pyspark.pandas

In [83]:
# Disable ANSI mode for pandas-on-Spark compatibility
spark.conf.set("spark.sql.ansi.enabled", "false")

import pyspark.pandas as ps

pdf = ps.DataFrame({
    'animal': ['cat', 'dog', 'cat', 'dog'],
    'score': [5, 7, 9, 1]
})

pdf_grouped = pdf.groupby('animal').mean()
pdf_grouped = pdf_grouped.sort_values('score', ascending=False)
print(pdf_grouped)

        score
animal       
cat       7.0
dog       4.0


In [84]:
pdf.explode('score')


Unnamed: 0,animal,score
0,cat,5
1,dog,7
2,cat,9
3,dog,1


In [85]:
pdf.melt(id_vars='animal', value_vars=['score'])

Unnamed: 0,animal,variable,value
0,cat,score,5
1,dog,score,7
2,cat,score,9
3,dog,score,1


In [88]:
left = spark.createDataFrame(
    [("A", 10), ("B", 20)], ["key", "val1"]
)
right = spark.createDataFrame(
    [("A", 99), ("C", 88)], ["key", "val2"]
)
left.show()
right.show()

+---+----+
|key|val1|
+---+----+
|  A|  10|
|  B|  20|
+---+----+

+---+----+
|key|val2|
+---+----+
|  A|  99|
|  C|  88|
+---+----+



In [87]:

inner = left.join(right, on="key", how="inner")
left_outer = left.join(right, on="key", how="left_outer")
full_outer = left.join(right, on="key", how="outer")
inner.show()
left_outer.show()
full_outer.show()


+---+----+----+
|key|val1|val2|
+---+----+----+
|  A|  10|  99|
+---+----+----+

+---+----+----+
|key|val1|val2|
+---+----+----+
|  A|  10|  99|
|  B|  20|NULL|
+---+----+----+

+---+----+----+
|key|val1|val2|
+---+----+----+
|  A|  10|  99|
|  B|  20|NULL|
|  C|NULL|  88|
+---+----+----+



In [89]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnull, when, count

spark = SparkSession.builder.getOrCreate()
data = [("Alice", 54, None), ("Bob", None, "A"), ("Cara", 33, "B"), ("Dan", None, None)]
cols = ["name", "age", "grade"]
df = spark.createDataFrame(data, cols)
df.show()

# Count nulls per column
df.select([count(when(isnull(c), c)).alias(c + "_nulls") for c in df.columns]).show()

# Drop rows with any nulls
df_drop = df.dropna()
df_drop.show()

# Fill nulls with default values
df_fill = df.fillna({"age": 0, "grade": "Unknown"})
df_fill.show()


+-----+----+-----+
| name| age|grade|
+-----+----+-----+
|Alice|  54| NULL|
|  Bob|NULL|    A|
| Cara|  33|    B|
|  Dan|NULL| NULL|
+-----+----+-----+

+----------+---------+-----------+
|name_nulls|age_nulls|grade_nulls|
+----------+---------+-----------+
|         0|        2|          2|
+----------+---------+-----------+

+----+---+-----+
|name|age|grade|
+----+---+-----+
|Cara| 33|    B|
+----+---+-----+

+-----+---+-------+
| name|age|  grade|
+-----+---+-------+
|Alice| 54|Unknown|
|  Bob|  0|      A|
| Cara| 33|      B|
|  Dan|  0|Unknown|
+-----+---+-------+



In [92]:
from pyspark.sql.functions import lower, trim

result = (
    df.fillna({"age": 100})
      .filter(df.age > 30)
      .withColumn("grade_clean", lower(trim(df.grade)))
)
result.show()


+-----+---+-----+-----------+
| name|age|grade|grade_clean|
+-----+---+-----+-----------+
|Alice| 54| NULL|       NULL|
| Cara| 33|    B|          b|
+-----+---+-----+-----------+



In [93]:
from pyspark.sql.functions import sum, avg, countDistinct

data = [("A", 100), ("A", 200), ("B", 50), ("B", None), ("C", 30)]
df = spark.createDataFrame(data, ["cat", "val"])

aggd = df.groupBy("cat").agg(
    sum("val").alias("total"),
    avg("val").alias("mean"),
    countDistinct("val").alias("n_unique")
)
aggd.show()


+---+-----+-----+--------+
|cat|total| mean|n_unique|
+---+-----+-----+--------+
|  B|   50| 50.0|       1|
|  C|   30| 30.0|       1|
|  A|  300|150.0|       2|
+---+-----+-----+--------+

