In [1]:
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time

# Initialize Spark
spark = SparkSession.builder.appName("PySparkAssessment").getOrCreate()



In [3]:
# Define schemas
customers_schema = StructType([
    StructField("CustomerID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Email", StringType(), True),
    StructField("City", StringType(), True),
    StructField("SignupDate", DateType(), True)
])

orders_schema = StructType([
    StructField("OrderID", IntegerType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("Product", StringType(), True),
    StructField("Category", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Price", DoubleType(), True),
    StructField("OrderDate", DateType(), True)
])

# Load data
customers_df = spark.read.csv(
    "customers.csv",
    header=True,
    schema=customers_schema,
    dateFormat="yyyy-MM-dd"
)

orders_df = spark.read.csv(
    "orders.csv",
    header=True,
    schema=orders_schema,
    dateFormat="yyyy-MM-dd"
)

print("‚úÖ Data loaded successfully!")

‚úÖ Data loaded successfully!


In [4]:
# 1.1 Print schemas
print("üìã Customers Schema:")
customers_df.printSchema()
print("\nüìã Orders Schema:")
orders_df.printSchema()

# 1.2 Count records
print(f"\nüë• Total customers: {customers_df.count()}")
print(f"üõí Total orders: {orders_df.count()}")

# 1.3 Distinct cities
print("\nüåÜ Distinct cities:")
customers_df.select("City").distinct().show()

üìã Customers Schema:
root
 |-- CustomerID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- City: string (nullable = true)
 |-- SignupDate: date (nullable = true)


üìã Orders Schema:
root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- OrderDate: date (nullable = true)


üë• Total customers: 5
üõí Total orders: 7

üåÜ Distinct cities:
+---------+
|     City|
+---------+
|Bangalore|
|  Chennai|
|   Mumbai|
|    Delhi|
|Hyderabad|
+---------+



In [5]:
# 2.1 Add TotalAmount
orders_df = orders_df.withColumn("TotalAmount", col("Price") * col("Quantity"))

# 2.2 Add OrderYear
orders_df = orders_df.withColumn("OrderYear", year(col("OrderDate")))

# 2.3 Filter high-value orders
print("\nüí∞ High-value orders (>10,000):")
orders_df.filter(col("TotalAmount") > 10000).show()

# 2.4 Drop Email column
customers_df = customers_df.drop("Email")
print("\nüë§ Customers without email column:")
customers_df.show()


üí∞ High-value orders (>10,000):
+-------+----------+-------+-----------+--------+-------+----------+-----------+---------+
|OrderID|CustomerID|Product|   Category|Quantity|  Price| OrderDate|TotalAmount|OrderYear|
+-------+----------+-------+-----------+--------+-------+----------+-----------+---------+
|      1|       101| Laptop|Electronics|       2|50000.0|2024-01-10|   100000.0|     2024|
|      3|       102| Tablet|Electronics|       1|20000.0|2024-02-01|    20000.0|     2024|
|      7|       102|  Phone|Electronics|       1|30000.0|2024-03-02|    30000.0|     2024|
+-------+----------+-------+-----------+--------+-------+----------+-----------+---------+


üë§ Customers without email column:
+----------+-----+---------+----------+
|CustomerID| Name|     City|SignupDate|
+----------+-----+---------+----------+
|       101|  Ali|   Mumbai|2022-05-10|
|       102| Neha|    Delhi|2023-01-15|
|       103| Ravi|Bangalore|2021-11-01|
|       104|Sneha|Hyderabad|2020-07-22|
|       1

In [6]:
# 3.1 Simulate null and fill
from pyspark.sql import Row
from datetime import date

new_row = Row(CustomerID=106, Name="Test", City=None, SignupDate=date(2023, 1, 1))
customers_with_null = customers_df.union(
    spark.createDataFrame([new_row], schema=customers_df.schema)
)
customers_filled = customers_with_null.fillna("Unknown", subset=["City"])
print("\nüîç Customers with null handling:")
customers_filled.show()

# 3.2 Loyalty label
customers_df = customers_df.withColumn(
    "CustomerType",
    when(col("SignupDate") < "2022-01-01", "Loyal").otherwise("New")
)
print("\nüèÜ Customer loyalty labels:")
customers_df.show()

# 3.3 OrderType classification
orders_df = orders_df.withColumn(
    "OrderType",
    when(col("TotalAmount") < 5000, "Low").otherwise("High")
)
print("\nüìä Orders with type classification:")
orders_df.show()


üîç Customers with null handling:
+----------+-----+---------+----------+
|CustomerID| Name|     City|SignupDate|
+----------+-----+---------+----------+
|       101|  Ali|   Mumbai|2022-05-10|
|       102| Neha|    Delhi|2023-01-15|
|       103| Ravi|Bangalore|2021-11-01|
|       104|Sneha|Hyderabad|2020-07-22|
|       105| Amit|  Chennai|2023-03-10|
|       106| Test|  Unknown|2023-01-01|
+----------+-----+---------+----------+


üèÜ Customer loyalty labels:
+----------+-----+---------+----------+------------+
|CustomerID| Name|     City|SignupDate|CustomerType|
+----------+-----+---------+----------+------------+
|       101|  Ali|   Mumbai|2022-05-10|         New|
|       102| Neha|    Delhi|2023-01-15|         New|
|       103| Ravi|Bangalore|2021-11-01|       Loyal|
|       104|Sneha|Hyderabad|2020-07-22|       Loyal|
|       105| Amit|  Chennai|2023-03-10|         New|
+----------+-----+---------+----------+------------+


üìä Orders with type classification:
+-------+------

In [7]:
# 4.1 Join DataFrames
joined_df = customers_df.join(orders_df, "CustomerID", "left")
print("\nü§ù Joined customers and orders:")
joined_df.show()

# 4.2 Stats per city
city_stats = joined_df.groupBy("City").agg(
    count("OrderID").alias("TotalOrders"),
    sum("TotalAmount").alias("TotalRevenue")
)
print("\nüèôÔ∏è Statistics by city:")
city_stats.show()

# 4.3 Top customers by spend
top_customers = joined_df.groupBy("CustomerID", "Name").agg(
    sum("TotalAmount").alias("TotalSpend")
).orderBy(col("TotalSpend").desc()).limit(3)
print("\nüèÜ Top 3 customers by spend:")
top_customers.show()

# 4.4 Products sold by category
category_stats = orders_df.groupBy("Category").agg(
    sum("Quantity").alias("TotalProductsSold")
)
print("\nüì¶ Products sold by category:")
category_stats.show()


ü§ù Joined customers and orders:
+----------+-----+---------+----------+------------+-------+---------+-----------+--------+-------+----------+-----------+---------+---------+
|CustomerID| Name|     City|SignupDate|CustomerType|OrderID|  Product|   Category|Quantity|  Price| OrderDate|TotalAmount|OrderYear|OrderType|
+----------+-----+---------+----------+------------+-------+---------+-----------+--------+-------+----------+-----------+---------+---------+
|       101|  Ali|   Mumbai|2022-05-10|         New|      2|    Mouse|Electronics|       1| 1200.0|2024-01-15|     1200.0|     2024|      Low|
|       101|  Ali|   Mumbai|2022-05-10|         New|      1|   Laptop|Electronics|       2|50000.0|2024-01-10|   100000.0|     2024|     High|
|       102| Neha|    Delhi|2023-01-15|         New|      7|    Phone|Electronics|       1|30000.0|2024-03-02|    30000.0|     2024|     High|
|       102| Neha|    Delhi|2023-01-15|         New|      3|   Tablet|Electronics|       1|20000.0|2024-02-

In [9]:
# 5.1 Create database and tables
spark.sql("CREATE DATABASE IF NOT EXISTS sales")
spark.sql("USE sales")

# Drop table if it exists to avoid LOCATION_ALREADY_EXISTS error
spark.sql("DROP TABLE IF EXISTS sales.customers")
customers_df.write.mode("overwrite").saveAsTable("sales.customers")

# Drop table if it exists to avoid LOCATION_ALREADY_EXISTS error for orders
spark.sql("DROP TABLE IF EXISTS sales.orders")
orders_df.write.mode("overwrite").saveAsTable("sales.orders")


# 5.2 SQL queries
print("\nüìç Orders from Delhi:")
spark.sql("""
    SELECT o.*
    FROM sales.orders o
    JOIN sales.customers c ON o.CustomerID = c.CustomerID
    WHERE c.City = 'Delhi'
""").show()

print("\nüìä Average order value by category:")
spark.sql("""
    SELECT Category, AVG(TotalAmount) as AvgOrderValue
    FROM sales.orders
    GROUP BY Category
""").show()

# 5.3 Create monthly orders view
spark.sql("""
    CREATE OR REPLACE VIEW sales.monthly_orders AS
    SELECT
        date_format(OrderDate, 'yyyy-MM') as Month,
        SUM(TotalAmount) as TotalAmount
    FROM sales.orders
    GROUP BY date_format(OrderDate, 'yyyy-MM')
""")

print("\nüìÖ Monthly orders summary:")
spark.sql("SELECT * FROM sales.monthly_orders").show()


üìç Orders from Delhi:
+-------+----------+-------+-----------+--------+-------+----------+-----------+---------+---------+
|OrderID|CustomerID|Product|   Category|Quantity|  Price| OrderDate|TotalAmount|OrderYear|OrderType|
+-------+----------+-------+-----------+--------+-------+----------+-----------+---------+---------+
|      3|       102| Tablet|Electronics|       1|20000.0|2024-02-01|    20000.0|     2024|     High|
|      7|       102|  Phone|Electronics|       1|30000.0|2024-03-02|    30000.0|     2024|     High|
+-------+----------+-------+-----------+--------+-------+----------+-----------+---------+---------+


üìä Average order value by category:
+-----------+-------------+
|   Category|AvgOrderValue|
+-----------+-------------+
| Stationery|       2500.0|
|Electronics|      37800.0|
|  Furniture|       3500.0|
| Appliances|       5000.0|
+-----------+-------------+


üìÖ Monthly orders summary:
+-------+-----------+
|  Month|TotalAmount|
+-------+-----------+
|2024-02

In [10]:
# 6.1 Mask emails (reload original data)
customers_with_email = spark.read.csv(
    "customers.csv",
    header=True,
    schema=customers_schema,
    dateFormat="yyyy-MM-dd"
)
customers_masked = customers_with_email.withColumn(
    "MaskedEmail",
    regexp_replace(col("Email"), "(.)(.*)(@.*)", "$1**$3")
)
print("\nüîí Customers with masked emails:")
customers_masked.show()

# 6.2 Concatenate Name and City
customers_df = customers_df.withColumn(
    "NameFromCity",
    concat(col("Name"), lit(" from "), col("City"))
)
print("\nüè∑Ô∏è Name from City format:")
customers_df.show()

# 6.3 Customer age in days
customers_df = customers_df.withColumn(
    "AgeInDays",
    datediff(current_date(), col("SignupDate"))
)
print("\nüìÖ Customer age in days:")
customers_df.show()

# 6.4 Extract month name
orders_df = orders_df.withColumn(
    "MonthName",
    date_format(col("OrderDate"), "MMMM")
)
print("\nüóìÔ∏è Orders with month names:")
orders_df.show()


üîí Customers with masked emails:
+----------+-----+-----------------+---------+----------+---------------+
|CustomerID| Name|            Email|     City|SignupDate|    MaskedEmail|
+----------+-----+-----------------+---------+----------+---------------+
|       101|  Ali|    ali@gmail.com|   Mumbai|2022-05-10|  a**@gmail.com|
|       102| Neha|   neha@yahoo.com|    Delhi|2023-01-15|  n**@yahoo.com|
|       103| Ravi| ravi@hotmail.com|Bangalore|2021-11-01|r**@hotmail.com|
|       104|Sneha|sneha@outlook.com|Hyderabad|2020-07-22|s**@outlook.com|
|       105| Amit|   amit@gmail.com|  Chennai|2023-03-10|  a**@gmail.com|
+----------+-----+-----------------+---------+----------+---------------+


üè∑Ô∏è Name from City format:
+----------+-----+---------+----------+------------+--------------------+
|CustomerID| Name|     City|SignupDate|CustomerType|        NameFromCity|
+----------+-----+---------+----------+------------+--------------------+
|       101|  Ali|   Mumbai|2022-05-10|    

In [11]:
# 7.1 Customer tagging UDF
def customer_tag(total_spend):
    if total_spend > 50000: return "Gold"
    elif total_spend >= 10000: return "Silver"
    else: return "Bronze"

tag_udf = udf(customer_tag, StringType())

customer_spend = joined_df.groupBy("CustomerID", "Name").agg(
    sum("TotalAmount").alias("TotalSpend")
).withColumn("CustomerTag", tag_udf(col("TotalSpend")))

print("\nüèÖ Customer spending tiers:")
customer_spend.show()

# 7.2 Shorten product names
orders_df = orders_df.withColumn(
    "ShortProduct",
    concat(substring(col("Product"), 1, 3), lit("..."))
)
print("\n‚úÇÔ∏è Shortened product names:")
orders_df.show()


üèÖ Customer spending tiers:
+----------+-----+----------+-----------+
|CustomerID| Name|TotalSpend|CustomerTag|
+----------+-----+----------+-----------+
|       105| Amit|    2500.0|     Bronze|
|       104|Sneha|    5000.0|     Bronze|
|       101|  Ali|  101200.0|       Gold|
|       102| Neha|   50000.0|     Silver|
|       103| Ravi|    3500.0|     Bronze|
+----------+-----+----------+-----------+


‚úÇÔ∏è Shortened product names:
+-------+----------+---------+-----------+--------+-------+----------+-----------+---------+---------+---------+------------+
|OrderID|CustomerID|  Product|   Category|Quantity|  Price| OrderDate|TotalAmount|OrderYear|OrderType|MonthName|ShortProduct|
+-------+----------+---------+-----------+--------+-------+----------+-----------+---------+---------+---------+------------+
|      1|       101|   Laptop|Electronics|       2|50000.0|2024-01-10|   100000.0|     2024|     High|  January|      Lap...|
|      2|       101|    Mouse|Electronics|       1| 1

In [12]:
# 8.1 Save joined data as Parquet
joined_df.write.mode("overwrite").parquet("joined_data.parquet")

# 8.2 Read back Parquet
parquet_df = spark.read.parquet("joined_data.parquet")
print("\nüìÑ Parquet file schema:")
parquet_df.printSchema()

# 8.3 Create global temp view
joined_df.createGlobalTempView("global_joined_data")
print("\nüåê Global temp view sample:")
spark.sql("SELECT * FROM global_temp.global_joined_data LIMIT 5").show()

# 8.4 Performance comparison
print("\n‚è±Ô∏è Performance comparison CSV vs Parquet:")

# Write orders to parquet for fair comparison
orders_df.write.mode("overwrite").parquet("orders.parquet")

# Time CSV read
start = time.time()
spark.read.csv("orders.csv", header=True, schema=orders_schema, dateFormat="yyyy-MM-dd").count()
csv_time = time.time() - start

# Time Parquet read
start = time.time()
spark.read.parquet("orders.parquet").count()
parquet_time = time.time() - start

print(f"CSV read time: {csv_time:.4f} seconds")
print(f"Parquet read time: {parquet_time:.4f} seconds")

# Stop Spark
spark.stop()
print("\n‚úÖ All tasks completed successfully!")


üìÑ Parquet file schema:
root
 |-- CustomerID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- SignupDate: date (nullable = true)
 |-- CustomerType: string (nullable = true)
 |-- OrderID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- OrderYear: integer (nullable = true)
 |-- OrderType: string (nullable = true)


üåê Global temp view sample:
+----------+----+---------+----------+------------+-------+---------+-----------+--------+-------+----------+-----------+---------+---------+
|CustomerID|Name|     City|SignupDate|CustomerType|OrderID|  Product|   Category|Quantity|  Price| OrderDate|TotalAmount|OrderYear|OrderType|
+----------+----+---------+----------+------------+-------+---------+-----------+--------+-