<a href="https://colab.research.google.com/github/Arun-Alpy/PySpark/blob/main/pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Linux Basic

In [None]:
!git version

git version 2.34.1


In [None]:
print ("PySpark")

PySpark


In [None]:
!whoami

root


In [None]:
!pwd

/content


# PySpark Basics

In [None]:
!pip install pyspark



In [None]:
!pip show pyspark

Name: pyspark
Version: 3.5.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.11/dist-packages
Requires: py4j
Required-by: dataproc-spark-connect


In [None]:
from pyspark.sql import SparkSession

In [None]:
spark=SparkSession.builder.appName('Basics').getOrCreate()

In [None]:
#create dataframe
data=[("Hello","World")]
columns=["Word1","Word2"]
df=spark.createDataFrame(data,columns)

In [None]:
df.show()

+-----+-----+
|Word1|Word2|
+-----+-----+
|Hello|World|
+-----+-----+



## Basic Transformations and Actions

In [None]:
columns=["Name","Department","Salary"]
data = [
    ("John", "Sales", 3000),
    ("Jane", "Finance", 4000),
    ("Mike", "Sales", 3500),
    ("Alice", "Finance", 3800),
    ("Bob", "IT", 4500)
]
df = spark.createDataFrame(data, columns)
df.show()

+-----+----------+------+
| Name|Department|Salary|
+-----+----------+------+
| John|     Sales|  3000|
| Jane|   Finance|  4000|
| Mike|     Sales|  3500|
|Alice|   Finance|  3800|
|  Bob|        IT|  4500|
+-----+----------+------+



In [None]:
#filter:employee with salary >3500
df_filtered=df.filter(df.Salary>3500)
df_filtered.show()

+-----+----------+------+
| Name|Department|Salary|
+-----+----------+------+
| Jane|   Finance|  4000|
|Alice|   Finance|  3800|
|  Bob|        IT|  4500|
+-----+----------+------+



In [None]:
#group by and aggregate: Avg salary by Department
df_grouped=df.groupBy("Department").agg({"Salary":"avg"})
df_grouped.show()

+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|     Sales|     3250.0|
|   Finance|     3900.0|
|        IT|     4500.0|
+----------+-----------+



In [None]:
#group by and aggregate: Avg salary by Department
df_grouped=df.groupBy("Department").avg("Salary")
df_grouped.show()

+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|     Sales|     3250.0|
|   Finance|     3900.0|
|        IT|     4500.0|
+----------+-----------+



In [None]:
#Add a new column: Salary with bonus (10%)
from pyspark.sql.functions import col
exp=col("Salary")*1.1
df_with_bonus=df.withColumn("Salary_10%_bonus",exp)
df_with_bonus.show()

+-----+----------+------+------------------+
| Name|Department|Salary|  Salary_10%_bonus|
+-----+----------+------+------------------+
| John|     Sales|  3000|3300.0000000000005|
| Jane|   Finance|  4000|            4400.0|
| Mike|     Sales|  3500|3850.0000000000005|
|Alice|   Finance|  3800|            4180.0|
|  Bob|        IT|  4500|            4950.0|
+-----+----------+------+------------------+



In [None]:

from pyspark.sql.functions import col,upper,lower,concat_ws,length,when

df.show()

+-----+----------+------+
| Name|Department|Salary|
+-----+----------+------+
| John|     Sales|  3000|
| Jane|   Finance|  4000|
| Mike|     Sales|  3500|
|Alice|   Finance|  3800|
|  Bob|        IT|  4500|
+-----+----------+------+



In [None]:
#Change case transformation
df_upper=df.withColumn("Name_upper",upper(col("Name")))
df_lower=df.withColumn("Name_lower",lower(col("Name")))
df_upper.show()
df_lower.show()

+-----+----------+------+----------+
| Name|Department|Salary|Name_upper|
+-----+----------+------+----------+
| John|     Sales|  3000|      JOHN|
| Jane|   Finance|  4000|      JANE|
| Mike|     Sales|  3500|      MIKE|
|Alice|   Finance|  3800|     ALICE|
|  Bob|        IT|  4500|       BOB|
+-----+----------+------+----------+

+-----+----------+------+----------+
| Name|Department|Salary|Name_lower|
+-----+----------+------+----------+
| John|     Sales|  3000|      john|
| Jane|   Finance|  4000|      jane|
| Mike|     Sales|  3500|      mike|
|Alice|   Finance|  3800|     alice|
|  Bob|        IT|  4500|       bob|
+-----+----------+------+----------+



In [None]:
#concatenate columns
df_concat=df.withColumn("Full_Info",concat_ws(",","Name","Department"))
df_concat.show()

+-----+----------+------+-------------+
| Name|Department|Salary|    Full_Info|
+-----+----------+------+-------------+
| John|     Sales|  3000|   John,Sales|
| Jane|   Finance|  4000| Jane,Finance|
| Mike|     Sales|  3500|   Mike,Sales|
|Alice|   Finance|  3800|Alice,Finance|
|  Bob|        IT|  4500|       Bob,IT|
+-----+----------+------+-------------+



In [None]:
# string length of names in new DF
df_length=df.withColumn("Name_length",length(col("Name")))
df_length.show()

+-----+----------+------+-----------+
| Name|Department|Salary|Name_length|
+-----+----------+------+-----------+
| John|     Sales|  3000|          4|
| Jane|   Finance|  4000|          4|
| Mike|     Sales|  3500|          4|
|Alice|   Finance|  3800|          5|
|  Bob|        IT|  4500|          3|
+-----+----------+------+-----------+



In [None]:
#conditional Columns(Salary category)
df_conditional=df.withColumn("Salary_Category",
                              when(col("Salary")>3500,"High")
                              .when(col("Salary")>3000,"Medium")
                              .otherwise("Low"))
df_conditional.show()

+-----+----------+------+---------------+
| Name|Department|Salary|Salary_Category|
+-----+----------+------+---------------+
| John|     Sales|  3000|            Low|
| Jane|   Finance|  4000|           High|
| Mike|     Sales|  3500|         Medium|
|Alice|   Finance|  3800|           High|
|  Bob|        IT|  4500|           High|
+-----+----------+------+---------------+



In [None]:
# rename column (Salary to Base_Salary)
df_renamed=df.withColumnRenamed("Salary","Base_Salary")
df_renamed.show()

+-----+----------+-----------+
| Name|Department|Base_Salary|
+-----+----------+-----------+
| John|     Sales|       3000|
| Jane|   Finance|       4000|
| Mike|     Sales|       3500|
|Alice|   Finance|       3800|
|  Bob|        IT|       4500|
+-----+----------+-----------+



In [None]:
#group by department and count employees

df.groupBy("Department").count().show()

+----------+-----+
|Department|count|
+----------+-----+
|     Sales|    2|
|   Finance|    2|
|        IT|    1|
+----------+-----+



In [None]:
df.groupBy("Department").agg({"Salary":"sum"}).show()

+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
|     Sales|       6500|
|   Finance|       7800|
|        IT|       4500|
+----------+-----------+



In [None]:
#group by department and calculate multiple Aggregations
from pyspark.sql.functions import avg, max, min
df.groupBy("Department").agg(
    avg("Salary").alias("avg_salary"),
    max("Salary").alias("max_salary"),
    min("Salary").alias("min_salary")
).show()

+----------+----------+----------+----------+
|Department|avg_salary|max_salary|min_salary|
+----------+----------+----------+----------+
|     Sales|    3250.0|      3500|      3000|
|   Finance|    3900.0|      4000|      3800|
|        IT|    4500.0|      4500|      4500|
+----------+----------+----------+----------+



In [None]:
# Create another DataFrame for department info
dept_data = [
    ("Sales", "Building A"),
    ("Finance", "Building B"),
    ("IT", "Building C")
]
dept_columns = ["Department", "Location"]

In [None]:
df.show()

+-----+----------+------+
| Name|Department|Salary|
+-----+----------+------+
| John|     Sales|  3000|
| Jane|   Finance|  4000|
| Mike|     Sales|  3500|
|Alice|   Finance|  3800|
|  Bob|        IT|  4500|
+-----+----------+------+



In [None]:
dept_df=spark.createDataFrame(dept_data,dept_columns)
dept_df.show()


+----------+----------+
|Department|  Location|
+----------+----------+
|     Sales|Building A|
|   Finance|Building B|
|        IT|Building C|
+----------+----------+



In [None]:
#join employees with department info

joined_df=df.join(dept_df,on="Department",how="inner")
joined_df.show()

+----------+-----+------+----------+
|Department| Name|Salary|  Location|
+----------+-----+------+----------+
|   Finance| Jane|  4000|Building B|
|   Finance|Alice|  3800|Building B|
|        IT|  Bob|  4500|Building C|
|     Sales| John|  3000|Building A|
|     Sales| Mike|  3500|Building A|
+----------+-----+------+----------+



In [None]:
# Employee DataFrame
emp_data = [
    (1, "John", "Sales", 3000),
    (2, "Jane", "Finance", 4000),
    (3, "Mike", "Sales", 3500),
    (4, "Alice", "HR", 3800),
    (5, "Bob", "IT", 4500),
    (6, "Sam", "Support", 3200)
]
emp_cols = ["EmpID", "Name", "Department", "Salary"]
emp_df = spark.createDataFrame(emp_data, emp_cols)

# Department DataFrame
dept_data = [
    ("Sales", "Building A"),
    ("Finance", "Building B"),
    ("IT", "Building C"),
    ("Admin", "Building D")
]
dept_cols = ["Department", "Location"]
dept_df = spark.createDataFrame(dept_data, dept_cols)

# Display both
emp_df.show()
dept_df.show()

+-----+-----+----------+------+
|EmpID| Name|Department|Salary|
+-----+-----+----------+------+
|    1| John|     Sales|  3000|
|    2| Jane|   Finance|  4000|
|    3| Mike|     Sales|  3500|
|    4|Alice|        HR|  3800|
|    5|  Bob|        IT|  4500|
|    6|  Sam|   Support|  3200|
+-----+-----+----------+------+

+----------+----------+
|Department|  Location|
+----------+----------+
|     Sales|Building A|
|   Finance|Building B|
|        IT|Building C|
|     Admin|Building D|
+----------+----------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, month


In [None]:
spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()



In [None]:
sales_data = [
    (1001, 101, 501, "2025-07-10 08:23:00", 3, 25.5),
    (1002, 102, 502, "2025-07-11 09:45:00", 2, 15.0),
    (1003, 103, 503, "2025-07-12 10:15:00", 1, 30.0),
    (1004, 101, 504, "2025-07-13 12:20:00", 5, 25.5),
    (1005, 105, 505, "2025-07-14 14:35:00", 10, 45.0),
    (1006, 102, 506, "2025-07-15 16:00:00", 4, 15.0)
]
sales_columns = ["sale_id", "product_id", "customer_id", "sale_date", "quantity", "price"]
sales_df = spark.createDataFrame(sales_data, sales_columns)

In [None]:
# Product Dataset
product_data = [
    (101, "Widget A", "Gadgets"),
    (102, "Widget B", "Gadgets"),
    (103, "Widget C", "Electronics"),
    (104, "Widget D", "Electronics"),
    (105, "Widget E", "Home & Living")
]
product_columns = ["product_id", "product_name", "category"]
product_df = spark.createDataFrame(product_data, product_columns)


In [None]:
customer_data = [
    (501, "Alice", "alice@example.com", "2025-05-20 10:10:00"),
    (502, "Bob", "bob@example.com", "2025-06-15 14:00:00"),
    (503, "Charlie", "charlie@example.com", "2025-04-05 09:50:00"),
    (504, "David", "david@example.com", "2025-07-01 12:25:00"),
    (505, "Emma", "emma@example.com", "2025-07-10 15:30:00"),
    (506, "Frank", "frank@example.com", "2025-03-23 17:00:00")
]
customer_columns = ["customer_id", "customer_name", "email", "join_date"]
customer_df = spark.createDataFrame(customer_data, customer_columns)


In [None]:
total_revenue_per_product = sales_df.withColumn("revenue", col("quantity") * col("price")) \
    .groupBy("product_id") \
    .agg(sum("revenue").alias("total_revenue"))

total_revenue_per_product.show()

+----------+-------------+
|product_id|total_revenue|
+----------+-------------+
|       103|         30.0|
|       101|        204.0|
|       102|         90.0|
|       105|        450.0|
+----------+-------------+



In [None]:
total_quantity_per_customer = sales_df.groupBy("customer_id") \
    .agg(sum("quantity").alias("total_quantity"))

total_quantity_per_customer.show()



+-----------+--------------+
|customer_id|total_quantity|
+-----------+--------------+
|        502|             2|
|        501|             3|
|        503|             1|
|        504|             5|
|        506|             4|
|        505|            10|
+-----------+--------------+



In [None]:
average_revenue_per_customer = sales_df.withColumn("revenue", col("quantity") * col("price")) \
    .groupBy("customer_id") \
    .agg(avg("revenue").alias("average_revenue"))

average_revenue_per_customer.show()

+-----------+---------------+
|customer_id|average_revenue|
+-----------+---------------+
|        502|           30.0|
|        501|           76.5|
|        503|           30.0|
|        504|          127.5|
|        506|           60.0|
|        505|          450.0|
+-----------+---------------+



In [None]:
monthly_sales_total = sales_df.withColumn("month", month(col("sale_date"))) \
    .withColumn("revenue", col("quantity") * col("price")) \
    .groupBy("month") \
    .agg(sum("revenue").alias("total_monthly_revenue"))

monthly_sales_total.show()

+-----+---------------------+
|month|total_monthly_revenue|
+-----+---------------------+
|    7|                774.0|
+-----+---------------------+



In [None]:
sales_with_category = sales_df.join(product_df, "product_id")
sales_per_category = sales_with_category.groupBy("category") \
    .count() \
    .withColumnRenamed("count", "total_sales")

sales_per_category.show()

+-------------+-----------+
|     category|total_sales|
+-------------+-----------+
|  Electronics|          1|
|      Gadgets|          4|
|Home & Living|          1|
+-------------+-----------+



In [None]:
from pyspark.sql.functions import desc

top_3_products = sales_df.withColumn("revenue", col("quantity") * col("price")) \
    .groupBy("product_id") \
    .agg(sum("revenue").alias("total_revenue")) \
    .orderBy(desc("total_revenue")) \
    .limit(3)

top_3_products.show()

+----------+-------------+
|product_id|total_revenue|
+----------+-------------+
|       105|        450.0|
|       101|        204.0|
|       102|         90.0|
+----------+-------------+



In [None]:
sales_with_product_info = sales_df.join(product_df, "product_id")
sales_with_product_info.show()

+----------+-------+-----------+-------------------+--------+-----+------------+-------------+
|product_id|sale_id|customer_id|          sale_date|quantity|price|product_name|     category|
+----------+-------+-----------+-------------------+--------+-----+------------+-------------+
|       101|   1001|        501|2025-07-10 08:23:00|       3| 25.5|    Widget A|      Gadgets|
|       101|   1004|        504|2025-07-13 12:20:00|       5| 25.5|    Widget A|      Gadgets|
|       102|   1002|        502|2025-07-11 09:45:00|       2| 15.0|    Widget B|      Gadgets|
|       102|   1006|        506|2025-07-15 16:00:00|       4| 15.0|    Widget B|      Gadgets|
|       103|   1003|        503|2025-07-12 10:15:00|       1| 30.0|    Widget C|  Electronics|
|       105|   1005|        505|2025-07-14 14:35:00|      10| 45.0|    Widget E|Home & Living|
+----------+-------+-----------+-------------------+--------+-----+------------+-------------+



In [None]:
sales_with_customer_info = sales_df.join(customer_df, "customer_id")
sales_with_customer_info.show()

+-----------+-------+----------+-------------------+--------+-----+-------------+-------------------+-------------------+
|customer_id|sale_id|product_id|          sale_date|quantity|price|customer_name|              email|          join_date|
+-----------+-------+----------+-------------------+--------+-----+-------------+-------------------+-------------------+
|        501|   1001|       101|2025-07-10 08:23:00|       3| 25.5|        Alice|  alice@example.com|2025-05-20 10:10:00|
|        502|   1002|       102|2025-07-11 09:45:00|       2| 15.0|          Bob|    bob@example.com|2025-06-15 14:00:00|
|        503|   1003|       103|2025-07-12 10:15:00|       1| 30.0|      Charlie|charlie@example.com|2025-04-05 09:50:00|
|        504|   1004|       101|2025-07-13 12:20:00|       5| 25.5|        David|  david@example.com|2025-07-01 12:25:00|
|        505|   1005|       105|2025-07-14 14:35:00|      10| 45.0|         Emma|   emma@example.com|2025-07-10 15:30:00|
|        506|   1006|   

In [None]:
inner_join_gadgets = sales_df.join(product_df, "product_id", "inner") \
    .filter(product_df.category == "Gadgets")

inner_join_gadgets.show()

+----------+-------+-----------+-------------------+--------+-----+------------+--------+
|product_id|sale_id|customer_id|          sale_date|quantity|price|product_name|category|
+----------+-------+-----------+-------------------+--------+-----+------------+--------+
|       101|   1001|        501|2025-07-10 08:23:00|       3| 25.5|    Widget A| Gadgets|
|       101|   1004|        504|2025-07-13 12:20:00|       5| 25.5|    Widget A| Gadgets|
|       102|   1002|        502|2025-07-11 09:45:00|       2| 15.0|    Widget B| Gadgets|
|       102|   1006|        506|2025-07-15 16:00:00|       4| 15.0|    Widget B| Gadgets|
+----------+-------+-----------+-------------------+--------+-----+------------+--------+



In [None]:
left_join_sales_product = sales_df.join(product_df, "product_id", "left")
left_join_sales_product.show()

+----------+-------+-----------+-------------------+--------+-----+------------+-------------+
|product_id|sale_id|customer_id|          sale_date|quantity|price|product_name|     category|
+----------+-------+-----------+-------------------+--------+-----+------------+-------------+
|       103|   1003|        503|2025-07-12 10:15:00|       1| 30.0|    Widget C|  Electronics|
|       101|   1001|        501|2025-07-10 08:23:00|       3| 25.5|    Widget A|      Gadgets|
|       102|   1002|        502|2025-07-11 09:45:00|       2| 15.0|    Widget B|      Gadgets|
|       105|   1005|        505|2025-07-14 14:35:00|      10| 45.0|    Widget E|Home & Living|
|       101|   1004|        504|2025-07-13 12:20:00|       5| 25.5|    Widget A|      Gadgets|
|       102|   1006|        506|2025-07-15 16:00:00|       4| 15.0|    Widget B|      Gadgets|
+----------+-------+-----------+-------------------+--------+-----+------------+-------------+



In [None]:
from pyspark.sql.functions import col

sales_alias1 = sales_df.alias("s1")
sales_alias2 = sales_df.alias("s2")

self_join_df = sales_alias1.join(sales_alias2, col("s1.product_id") == col("s2.product_id")) \
    .filter(col("s1.sale_id") < col("s2.sale_id"))

self_join_df.select("s1.product_id", "s1.sale_date", "s2.sale_date").show()

+----------+-------------------+-------------------+
|product_id|          sale_date|          sale_date|
+----------+-------------------+-------------------+
|       101|2025-07-10 08:23:00|2025-07-13 12:20:00|
|       102|2025-07-11 09:45:00|2025-07-15 16:00:00|
+----------+-------------------+-------------------+



In [None]:
full_outer_join_sales_product = sales_df.join(product_df, "product_id", "full")
full_outer_join_sales_product.show()


+----------+-------+-----------+-------------------+--------+-----+------------+-------------+
|product_id|sale_id|customer_id|          sale_date|quantity|price|product_name|     category|
+----------+-------+-----------+-------------------+--------+-----+------------+-------------+
|       101|   1001|        501|2025-07-10 08:23:00|       3| 25.5|    Widget A|      Gadgets|
|       101|   1004|        504|2025-07-13 12:20:00|       5| 25.5|    Widget A|      Gadgets|
|       102|   1002|        502|2025-07-11 09:45:00|       2| 15.0|    Widget B|      Gadgets|
|       102|   1006|        506|2025-07-15 16:00:00|       4| 15.0|    Widget B|      Gadgets|
|       103|   1003|        503|2025-07-12 10:15:00|       1| 30.0|    Widget C|  Electronics|
|       104|   NULL|       NULL|               NULL|    NULL| NULL|    Widget D|  Electronics|
|       105|   1005|        505|2025-07-14 14:35:00|      10| 45.0|    Widget E|Home & Living|
+----------+-------+-----------+------------------

In [None]:
complete_df = sales_df.join(product_df, "product_id") \
    .join(customer_df, "customer_id")

complete_df.show()

+-----------+----------+-------+-------------------+--------+-----+------------+-------------+-------------+-------------------+-------------------+
|customer_id|product_id|sale_id|          sale_date|quantity|price|product_name|     category|customer_name|              email|          join_date|
+-----------+----------+-------+-------------------+--------+-----+------------+-------------+-------------+-------------------+-------------------+
|        502|       102|   1002|2025-07-11 09:45:00|       2| 15.0|    Widget B|      Gadgets|          Bob|    bob@example.com|2025-06-15 14:00:00|
|        501|       101|   1001|2025-07-10 08:23:00|       3| 25.5|    Widget A|      Gadgets|        Alice|  alice@example.com|2025-05-20 10:10:00|
|        503|       103|   1003|2025-07-12 10:15:00|       1| 30.0|    Widget C|  Electronics|      Charlie|charlie@example.com|2025-04-05 09:50:00|
|        504|       101|   1004|2025-07-13 12:20:00|       5| 25.5|    Widget A|      Gadgets|        Davi

In [None]:
from pyspark.sql.functions import col

filtered_sales_df = sales_df.filter(col("sale_date").between("2025-07-10", "2025-07-15"))
filtered_sales_df.show()

+-------+----------+-----------+-------------------+--------+-----+
|sale_id|product_id|customer_id|          sale_date|quantity|price|
+-------+----------+-----------+-------------------+--------+-----+
|   1001|       101|        501|2025-07-10 08:23:00|       3| 25.5|
|   1002|       102|        502|2025-07-11 09:45:00|       2| 15.0|
|   1003|       103|        503|2025-07-12 10:15:00|       1| 30.0|
|   1004|       101|        504|2025-07-13 12:20:00|       5| 25.5|
|   1005|       105|        505|2025-07-14 14:35:00|      10| 45.0|
+-------+----------+-----------+-------------------+--------+-----+



In [None]:
from pyspark.sql.functions import desc

top_5_customers = sales_df.withColumn("total_spend", col("quantity") * col("price")) \
    .join(customer_df, "customer_id") \
    .groupBy("customer_id", "customer_name") \
    .agg(sum("total_spend").alias("total_spending")) \
    .orderBy(desc("total_spending")) \
    .limit(5)

top_5_customers.show()

+-----------+-------------+--------------+
|customer_id|customer_name|total_spending|
+-----------+-------------+--------------+
|        505|         Emma|         450.0|
|        504|        David|         127.5|
|        501|        Alice|          76.5|
|        506|        Frank|          60.0|
|        502|          Bob|          30.0|
+-----------+-------------+--------------+



In [None]:
from pyspark.sql.functions import when

customer_spending = sales_df.withColumn("total_spend", col("quantity") * col("price")) \
    .join(customer_df, "customer_id") \
    .groupBy("customer_id", "customer_name") \
    .agg(sum("total_spend").alias("total_spending"))

categorized_customers = customer_spending.withColumn("spending_category",
    when(col("total_spending") < 100, "Low")
    .when((col("total_spending") >= 100) & (col("total_spending") <= 300), "Medium")
    .otherwise("High")
)

categorized_customers.show()

+-----------+-------------+--------------+-----------------+
|customer_id|customer_name|total_spending|spending_category|
+-----------+-------------+--------------+-----------------+
|        501|        Alice|          76.5|              Low|
|        502|          Bob|          30.0|              Low|
|        503|      Charlie|          30.0|              Low|
|        504|        David|         127.5|           Medium|
|        505|         Emma|         450.0|             High|
|        506|        Frank|          60.0|              Low|
+-----------+-------------+--------------+-----------------+



In [None]:
from pyspark.sql.functions import min, max

customer_purchase_dates = sales_df.groupBy("customer_id") \
    .agg(
        min("sale_date").alias("first_purchase_date"),
        max("sale_date").alias("last_purchase_date")
    )

customer_purchase_dates_with_names = customer_purchase_dates.join(customer_df, "customer_id") \
    .select("customer_id", "customer_name", "first_purchase_date", "last_purchase_date")

customer_purchase_dates_with_names.show()

+-----------+-------------+-------------------+-------------------+
|customer_id|customer_name|first_purchase_date| last_purchase_date|
+-----------+-------------+-------------------+-------------------+
|        502|          Bob|2025-07-11 09:45:00|2025-07-11 09:45:00|
|        501|        Alice|2025-07-10 08:23:00|2025-07-10 08:23:00|
|        503|      Charlie|2025-07-12 10:15:00|2025-07-12 10:15:00|
|        504|        David|2025-07-13 12:20:00|2025-07-13 12:20:00|
|        506|        Frank|2025-07-15 16:00:00|2025-07-15 16:00:00|
|        505|         Emma|2025-07-14 14:35:00|2025-07-14 14:35:00|
+-----------+-------------+-------------------+-------------------+



In [None]:
from pyspark.sql.functions import max as max_

# Find the most recent sale date in the entire dataset
most_recent_date = sales_df.select(max_("sale_date")).first()[0]
print(f"Most recent sale date: {most_recent_date}")

from pyspark.sql.functions import to_date, lit, datediff

# Calculate the date 30 days before the most recent date
thirty_days_before = sales_df.withColumn("date", to_date(lit(most_recent_date))) \
    .selectExpr("date_sub(date, 30) as cutoff_date").first().cutoff_date

print(f"30-day cutoff date: {thirty_days_before}")

# Find the last purchase date for each customer
customer_last_purchase = sales_df.groupBy("customer_id") \
    .agg(max_("sale_date").alias("last_purchase_date"))

# Find customers whose last purchase was before the 30-day cutoff
churned_customers = customer_last_purchase.filter(col("last_purchase_date") < lit(thirty_days_before))

churned_customers_with_names = churned_customers.join(customer_df, "customer_id") \
    .select("customer_id", "customer_name", "last_purchase_date")

churned_customers_with_names.show()

Most recent sale date: 2025-07-15 16:00:00
30-day cutoff date: 2025-06-15
+-----------+-------------+------------------+
|customer_id|customer_name|last_purchase_date|
+-----------+-------------+------------------+
+-----------+-------------+------------------+



# Adv PySpark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("AdvancedOps").getOrCreate()

data = [
    (1, "Alice", 2000, ["math", "science"], {"city": "NYC", "zip": "10001"}),
    (2, "Bob", 1500, ["english"], {"city": "SF", "zip": "94105"}),
    (3, "Charlie", 2200, ["math", "history", "science"], {"city": "NYC", "zip": "10001"}),
    (4, "David", 1200, ["art"], {"city": "LA", "zip": "90001"}),
]

df = spark.createDataFrame(data, schema=["id", "name", "salary", "subjects", "address"])
df.show(truncate=False)

+---+-------+------+------------------------+---------------------------+
|id |name   |salary|subjects                |address                    |
+---+-------+------+------------------------+---------------------------+
|1  |Alice  |2000  |[math, science]         |{zip -> 10001, city -> NYC}|
|2  |Bob    |1500  |[english]               |{zip -> 94105, city -> SF} |
|3  |Charlie|2200  |[math, history, science]|{zip -> 10001, city -> NYC}|
|4  |David  |1200  |[art]                   |{zip -> 90001, city -> LA} |
+---+-------+------+------------------------+---------------------------+



In [5]:
from pyspark.sql.window import Window
window_spec=Window.partitionBy("address.city").orderBy("salary")
df.withColumn("rank",rank().over(window_spec)).show()

+---+-------+------+--------------------+--------------------+----+
| id|   name|salary|            subjects|             address|rank|
+---+-------+------+--------------------+--------------------+----+
|  4|  David|  1200|               [art]|{zip -> 90001, ci...|   1|
|  1|  Alice|  2000|     [math, science]|{zip -> 10001, ci...|   1|
|  3|Charlie|  2200|[math, history, s...|{zip -> 10001, ci...|   2|
|  2|    Bob|  1500|           [english]|{zip -> 94105, ci...|   1|
+---+-------+------+--------------------+--------------------+----+



In [6]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank, max, sum, avg

# Employee Data
data = [
    (1, "John", "Sales", 3000),
    (2, "Jane", "Finance", 4000),
    (3, "Mike", "Sales", 3500),
    (4, "Alice", "Finance", 3800),
    (5, "Bob", "IT", 4500),
    (6, "Tom", "Sales", 3700),
    (7, "Jerry", "Finance", 4200),
    (8, "Sam", "IT", 4700),
    (9, "Steve", "Sales", 3100),
    (10, "Rachel", "IT", 4600)
]
columns = ["EmpID", "Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)

df.show()

+-----+------+----------+------+
|EmpID|  Name|Department|Salary|
+-----+------+----------+------+
|    1|  John|     Sales|  3000|
|    2|  Jane|   Finance|  4000|
|    3|  Mike|     Sales|  3500|
|    4| Alice|   Finance|  3800|
|    5|   Bob|        IT|  4500|
|    6|   Tom|     Sales|  3700|
|    7| Jerry|   Finance|  4200|
|    8|   Sam|        IT|  4700|
|    9| Steve|     Sales|  3100|
|   10|Rachel|        IT|  4600|
+-----+------+----------+------+



In [9]:
windowSpec=Window.partitionBy(("Department").orderBy(col("Salary")).desc())
df.withColumn("Rank",rank().over(windowSpec)).show()

AttributeError: 'str' object has no attribute 'orderBy'

In [13]:
windowSpec=Window.partitionBy("Department")
df.withColumn("Max_Salary",max("Salary").over(windowSpec)).show()

+-----+------+----------+------+----------+
|EmpID|  Name|Department|Salary|Max_Salary|
+-----+------+----------+------+----------+
|    2|  Jane|   Finance|  4000|      4200|
|    4| Alice|   Finance|  3800|      4200|
|    7| Jerry|   Finance|  4200|      4200|
|    5|   Bob|        IT|  4500|      4700|
|    8|   Sam|        IT|  4700|      4700|
|   10|Rachel|        IT|  4600|      4700|
|    1|  John|     Sales|  3000|      3700|
|    3|  Mike|     Sales|  3500|      3700|
|    6|   Tom|     Sales|  3700|      3700|
|    9| Steve|     Sales|  3100|      3700|
+-----+------+----------+------+----------+



In [16]:
windowSpec=Window.partitionBy("Department")
df.withColumn("Max_Salary",max("Salary").over(windowSpec)).select(col("Department"),col("Max_Salary")).distinct().show()

+----------+----------+
|Department|Max_Salary|
+----------+----------+
|   Finance|      4200|
|        IT|      4700|
|     Sales|      3700|
+----------+----------+



In [17]:
windowSpec=Window.partitionBy("Department").orderBy("Salary").rowsBetween(Window.unboundedPreceding,0)
df.withColumn("Cumulative_Salary",sum("Salary").over(windowSpec)).show()

+-----+------+----------+------+-----------------+
|EmpID|  Name|Department|Salary|Cumulative_Salary|
+-----+------+----------+------+-----------------+
|    4| Alice|   Finance|  3800|             3800|
|    2|  Jane|   Finance|  4000|             7800|
|    7| Jerry|   Finance|  4200|            12000|
|    5|   Bob|        IT|  4500|             4500|
|   10|Rachel|        IT|  4600|             9100|
|    8|   Sam|        IT|  4700|            13800|
|    1|  John|     Sales|  3000|             3000|
|    9| Steve|     Sales|  3100|             6100|
|    3|  Mike|     Sales|  3500|             9600|
|    6|   Tom|     Sales|  3700|            13300|
+-----+------+----------+------+-----------------+



In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("AdvancedOps").getOrCreate()

data = [
    (1, "Alice", 2000, ["math", "science"], {"city": "NYC", "zip": "10001"}),
    (2, "Bob", 1500, ["english"], {"city": "SF", "zip": "94105"}),
    (3, "Charlie", 2200, ["math", "history", "science"], {"city": "NYC", "zip": "10001"}),
    (4, "David", 1200, ["art"], {"city": "LA", "zip": "90001"}),
]

df = spark.createDataFrame(data, schema=["id", "name", "salary", "subjects", "address"])
df.show(truncate=False)

+---+-------+------+------------------------+---------------------------+
|id |name   |salary|subjects                |address                    |
+---+-------+------+------------------------+---------------------------+
|1  |Alice  |2000  |[math, science]         |{zip -> 10001, city -> NYC}|
|2  |Bob    |1500  |[english]               |{zip -> 94105, city -> SF} |
|3  |Charlie|2200  |[math, history, science]|{zip -> 10001, city -> NYC}|
|4  |David  |1200  |[art]                   |{zip -> 90001, city -> LA} |
+---+-------+------+------------------------+---------------------------+



In [19]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark Use Case Activities") \
    .getOrCreate()

from pyspark.sql.functions import col, rank, avg, udf, pandas_udf
from pyspark.sql.window import Window
import pandas as pd

employees_data = [
    (1, "Alice", "HR", 3000),
    (2, "Bob", "IT", 4000),
    (3, "Cathy", "HR", 3500),
    (4, "David", "IT", 4500),
    (5, "Eve", "Finance", 5000),
    (6, "Frank", "Finance", 4800),
]

employees_df = spark.createDataFrame(employees_data, ["id", "name", "department", "salary"])
employees_df.show()

departments_data = [
    ("HR", "New York"),
    ("IT", "San Francisco"),
    ("Finance", "Chicago"),
]

departments_df = spark.createDataFrame(departments_data, ["department", "location"])
departments_df.show()

+---+-----+----------+------+
| id| name|department|salary|
+---+-----+----------+------+
|  1|Alice|        HR|  3000|
|  2|  Bob|        IT|  4000|
|  3|Cathy|        HR|  3500|
|  4|David|        IT|  4500|
|  5|  Eve|   Finance|  5000|
|  6|Frank|   Finance|  4800|
+---+-----+----------+------+

+----------+-------------+
|department|     location|
+----------+-------------+
|        HR|     New York|
|        IT|San Francisco|
|   Finance|      Chicago|
+----------+-------------+



In [20]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Define the window spec: partition by department and order by salary descending
windowSpec = Window.partitionBy("department").orderBy(col("salary").desc())

# Apply the rank function using the window spec
ranked_employees_df = employees_df.withColumn("salary_rank", rank().over(windowSpec))

# Show the result
ranked_employees_df.show()


+---+-----+----------+------+-----------+
| id| name|department|salary|salary_rank|
+---+-----+----------+------+-----------+
|  5|  Eve|   Finance|  5000|          1|
|  6|Frank|   Finance|  4800|          2|
|  3|Cathy|        HR|  3500|          1|
|  1|Alice|        HR|  3000|          2|
|  4|David|        IT|  4500|          1|
|  2|  Bob|        IT|  4000|          2|
+---+-----+----------+------+-----------+



In [22]:
# Average Salary Per Department (Using Window)
windowSpec=Window.partitionBy("department")
avg_salary_per_dept_df=employees_df.withColumn("avg_salary",avg("salary").over(windowSpec))
avg_salary_per_dept_df.show()


+---+-----+----------+------+----------+
| id| name|department|salary|avg_salary|
+---+-----+----------+------+----------+
|  5|  Eve|   Finance|  5000|    4900.0|
|  6|Frank|   Finance|  4800|    4900.0|
|  1|Alice|        HR|  3000|    3250.0|
|  3|Cathy|        HR|  3500|    3250.0|
|  2|  Bob|        IT|  4000|    4250.0|
|  4|David|        IT|  4500|    4250.0|
+---+-----+----------+------+----------+



In [23]:
from pyspark.sql.functions import avg
from pyspark.sql.window import Window

# Define window spec to partition by department
windowSpec = Window.partitionBy("department")

# Calculate average salary using window function
employees_with_avg_salary = employees_df.withColumn(
    "avg_salary_per_dept", avg("salary").over(windowSpec)
)

# Show the result
employees_with_avg_salary.show()


+---+-----+----------+------+-------------------+
| id| name|department|salary|avg_salary_per_dept|
+---+-----+----------+------+-------------------+
|  5|  Eve|   Finance|  5000|             4900.0|
|  6|Frank|   Finance|  4800|             4900.0|
|  1|Alice|        HR|  3000|             3250.0|
|  3|Cathy|        HR|  3500|             3250.0|
|  2|  Bob|        IT|  4000|             4250.0|
|  4|David|        IT|  4500|             4250.0|
+---+-----+----------+------+-------------------+



In [24]:
# Average Salary Per Department (Using Window)
windowSpec=Window.partitionBy("department")
avg_salary_per_dept_df=employees_df.withColumn("avg_salary",avg("salary").over(windowSpec))
avg_salary_per_dept_df.show()

+---+-----+----------+------+----------+
| id| name|department|salary|avg_salary|
+---+-----+----------+------+----------+
|  5|  Eve|   Finance|  5000|    4900.0|
|  6|Frank|   Finance|  4800|    4900.0|
|  1|Alice|        HR|  3000|    3250.0|
|  3|Cathy|        HR|  3500|    3250.0|
|  2|  Bob|        IT|  4000|    4250.0|
|  4|David|        IT|  4500|    4250.0|
+---+-----+----------+------+----------+



In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

spark = SparkSession.builder.appName("AdvancedOps").getOrCreate()

data = [
    ("John", ["Python", "Java"]),
    ("Jane", ["SQL", "R", "Scala"]),
    ("Mike", [])
]
columns = ["Name", "Skills"]

df = spark.createDataFrame(data, columns)
df.show(truncate=False)

+----+---------------+
|Name|Skills         |
+----+---------------+
|John|[Python, Java] |
|Jane|[SQL, R, Scala]|
|Mike|[]             |
+----+---------------+



In [26]:
df_exploded=df.withColumn("Skill",explode("Skills"))
df_exploded.show()

+----+---------------+------+
|Name|         Skills| Skill|
+----+---------------+------+
|John| [Python, Java]|Python|
|John| [Python, Java]|  Java|
|Jane|[SQL, R, Scala]|   SQL|
|Jane|[SQL, R, Scala]|     R|
|Jane|[SQL, R, Scala]| Scala|
+----+---------------+------+



In [27]:
df.createOrReplaceTempView("people")
df_lateral=spark.sql("""
select Name,Skill from people lateral view explode(Skills) as Skill
""")
df_lateral.show()

+----+------+
|Name| Skill|
+----+------+
|John|Python|
|John|  Java|
|Jane|   SQL|
|Jane|     R|
|Jane| Scala|
+----+------+



In [28]:
data = [
    ("ProductA", "Jan", 100),
    ("ProductA", "Feb", 150),
    ("ProductA", "Mar", 120),
    ("ProductB", "Jan", 200),
    ("ProductB", "Feb", 230),
    ("ProductB", "Mar", 210),
]
columns = ["Product", "Month", "Sales"]

df = spark.createDataFrame(data, columns)
df.show()

+--------+-----+-----+
| Product|Month|Sales|
+--------+-----+-----+
|ProductA|  Jan|  100|
|ProductA|  Feb|  150|
|ProductA|  Mar|  120|
|ProductB|  Jan|  200|
|ProductB|  Feb|  230|
|ProductB|  Mar|  210|
+--------+-----+-----+



In [31]:
pivot_df=df.groupBy("Product").pivot("Month").sum("Sales")
pivot_df.show()
unpivot_df=pivot_df.selectExpr("Product","stack(3,'Jan',Jan,'Feb',Feb,'Mar',Mar) as (Month,Sales)")
unpivot_df.show()

+--------+---+---+---+
| Product|Feb|Jan|Mar|
+--------+---+---+---+
|ProductB|230|200|210|
|ProductA|150|100|120|
+--------+---+---+---+

+--------+-----+-----+
| Product|Month|Sales|
+--------+-----+-----+
|ProductB|  Jan|  200|
|ProductB|  Feb|  230|
|ProductB|  Mar|  210|
|ProductA|  Jan|  100|
|ProductA|  Feb|  150|
|ProductA|  Mar|  120|
+--------+-----+-----+



#Advanced PySpark For Data Engineer

In [4]:
from pyspark import SparkContext,SparkConf

#intitlise SparkContext and SparkConf
conf=SparkConf().setAppName("AdvancedOps").setMaster("local")
sc=SparkContext(conf=conf)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=AdvancedOps, master=local) created by __init__ at /tmp/ipython-input-2-923876238.py:5 

In [12]:


rdd=sc.parallelize([1,2,3,4,5])
#transformation:Map
rdd_mapped=rdd.map(lambda x:x*2)
print(rdd_mapped.collect())

#transformation:filter
rdd_filtered=rdd.filter(lambda x:x%2==0)
print(rdd_filtered.collect())

#transformation:reduce
rdd_reduced=rdd.reduce(lambda x,y:x+y)
print(rdd_reduced)

#transformation:Flat map
rdd_flatmap=rdd.flatMap(lambda x:range(x,x+3))
print(rdd_flatmap.collect())


rd=sc.parallelize("hello","world")
rd_flatmap=rd.flatMap(lambda x:list(x))
print(rd_flatmap.collect())

[2, 4, 6, 8, 10]
[2, 4]
15
[1, 2, 3, 2, 3, 4, 3, 4, 5, 4, 5, 6, 5, 6, 7]


ValueError: invalid literal for int() with base 10: 'world'

In [14]:
#input:1,2,3,4,5
rdd=sc.parallelize([1,2,3,4,5])
rdd_group=rdd.groupBy(lambda x: "even" if x%2==0 else "odd")
print([(key,list(value))for key,value in rdd_group.collect()])

[('odd', [1, 3, 5]), ('even', [2, 4])]


In [15]:
data = [
    (1, "John", "HR", 5000),
    (2, "Jane", "IT", 8000),
    (3, "Mike", "IT", 6000),
    (4, "Sara", "Finance", 7000),
    (5, "David", "HR", 5500)
]

from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("DataFrameOperations").getOrCreate()

# Define column names
columns = ["ID", "Name", "Department", "Salary"]

# Create a DataFrame from the sample data
df = spark.createDataFrame(data, columns)

# Show the DataFrame
df.show()

+---+-----+----------+------+
| ID| Name|Department|Salary|
+---+-----+----------+------+
|  1| John|        HR|  5000|
|  2| Jane|        IT|  8000|
|  3| Mike|        IT|  6000|
|  4| Sara|   Finance|  7000|
|  5|David|        HR|  5500|
+---+-----+----------+------+



In [16]:
from pyspark.sql.functions import col

df=df.withColumn("Bonus",df["Salary"]*0.1)
df.show()

+---+-----+----------+------+-----+
| ID| Name|Department|Salary|Bonus|
+---+-----+----------+------+-----+
|  1| John|        HR|  5000|500.0|
|  2| Jane|        IT|  8000|800.0|
|  3| Mike|        IT|  6000|600.0|
|  4| Sara|   Finance|  7000|700.0|
|  5|David|        HR|  5500|550.0|
+---+-----+----------+------+-----+



In [17]:
from pyspark.sql.functions import avg

df.groupBy("Department").agg(avg("Salary").alias("Avg_Salary")).show()

+----------+----------+
|Department|Avg_Salary|
+----------+----------+
|        HR|    5250.0|
|   Finance|    7000.0|
|        IT|    7000.0|
+----------+----------+



In [20]:
df.groupBy("Department").count().show()

+----------+-----+
|Department|count|
+----------+-----+
|        HR|    2|
|   Finance|    1|
|        IT|    2|
+----------+-----+



In [21]:
df.sort(df["Salary"].desc()).show()

+---+-----+----------+------+-----+
| ID| Name|Department|Salary|Bonus|
+---+-----+----------+------+-----+
|  2| Jane|        IT|  8000|800.0|
|  4| Sara|   Finance|  7000|700.0|
|  3| Mike|        IT|  6000|600.0|
|  5|David|        HR|  5500|550.0|
|  1| John|        HR|  5000|500.0|
+---+-----+----------+------+-----+



In [22]:
data_list=df.collect()
for row in data_list:
  print(row)

Row(ID=1, Name='John', Department='HR', Salary=5000, Bonus=500.0)
Row(ID=2, Name='Jane', Department='IT', Salary=8000, Bonus=800.0)
Row(ID=3, Name='Mike', Department='IT', Salary=6000, Bonus=600.0)
Row(ID=4, Name='Sara', Department='Finance', Salary=7000, Bonus=700.0)
Row(ID=5, Name='David', Department='HR', Salary=5500, Bonus=550.0)


In [24]:
from spark.sql import row


rdd=sc.parallelize([Row(name="Alice",age=25),Row(name="Bob",age=30)])
dataset=spark.createDataFrame(rdd)
dataset.filter(lambda x:x.age>25)
dataset.select("name")




ModuleNotFoundError: No module named 'spark'

In [25]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQLBasics").getOrCreate()

data = [
    (1, "Alice", "Sales", 3000),
    (2, "Bob", "IT", 4000),
    (3, "Cathy", "HR", 3500),
    (4, "David", "Sales", 4500),
    (5, "Eva", "IT", 4200)
]
columns = ["EmpID", "Name", "Department", "Salary"]

df = spark.createDataFrame(data, columns)
df.show()

+-----+-----+----------+------+
|EmpID| Name|Department|Salary|
+-----+-----+----------+------+
|    1|Alice|     Sales|  3000|
|    2|  Bob|        IT|  4000|
|    3|Cathy|        HR|  3500|
|    4|David|     Sales|  4500|
|    5|  Eva|        IT|  4200|
+-----+-----+----------+------+



In [26]:
# convert to rdd
rdd=df.rdd
print("RDD example:",rdd.map(lambda x:(x.Name,x.Salary)).collect())

RDD example: [('Alice', 3000), ('Bob', 4000), ('Cathy', 3500), ('David', 4500), ('Eva', 4200)]


In [27]:
df.createOrReplaceGlobalTempView("employees")
strSQL="select * from employees"
sql_result=spark.sql(strSQL)
sql_result.show()

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `employees` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [employees], [], false


#data formats

In [28]:
df.write.mode("overwrite").json("/content/json_data")

In [29]:
!ls /content/json_data

part-00000-0e93f718-f88e-468c-8c48-1c66c2224983-c000.json  _SUCCESS


In [30]:
!cat /content/json_data/part-00000-0e93f718-f88e-468c-8c48-1c66c2224983-c000.json

{"EmpID":1,"Name":"Alice","Department":"Sales","Salary":3000}
{"EmpID":2,"Name":"Bob","Department":"IT","Salary":4000}
{"EmpID":3,"Name":"Cathy","Department":"HR","Salary":3500}
{"EmpID":4,"Name":"David","Department":"Sales","Salary":4500}
{"EmpID":5,"Name":"Eva","Department":"IT","Salary":4200}


In [33]:
json_df=spark.read.json("/content/json_data")
json_df.show()


+----------+-----+-----+------+
|Department|EmpID| Name|Salary|
+----------+-----+-----+------+
|     Sales|    1|Alice|  3000|
|        IT|    2|  Bob|  4000|
|        HR|    3|Cathy|  3500|
|     Sales|    4|David|  4500|
|        IT|    5|  Eva|  4200|
+----------+-----+-----+------+



In [34]:
df.show()

+-----+-----+----------+------+
|EmpID| Name|Department|Salary|
+-----+-----+----------+------+
|    1|Alice|     Sales|  3000|
|    2|  Bob|        IT|  4000|
|    3|Cathy|        HR|  3500|
|    4|David|     Sales|  4500|
|    5|  Eva|        IT|  4200|
+-----+-----+----------+------+



In [35]:
strPath="csv_data"
df.write.mode("overwrite").option("header","true").csv(strPath)

In [36]:
csv_df=spark.read.option("header","true").csv(strPath)
csv_df.show()

+-----+-----+----------+------+
|EmpID| Name|Department|Salary|
+-----+-----+----------+------+
|    1|Alice|     Sales|  3000|
|    2|  Bob|        IT|  4000|
|    3|Cathy|        HR|  3500|
|    4|David|     Sales|  4500|
|    5|  Eva|        IT|  4200|
+-----+-----+----------+------+



#Spark Streaming

In [37]:
import random
import csv

# Generate 30 records with random data
names = ["John", "Jane", "Mike", "Sara", "David", "Emily", "George", "Nina", "Tom", "Anna"]
departments = ["Sales", "IT", "HR", "Finance", "Marketing"]
salaries = [3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000]

# Create and open a CSV file for writing
with open('employee_data.csv', mode='w', newline='') as file:
    writer = csv.writer(file)

    # Write the header
    writer.writerow(["ID", "Name", "Department", "Salary"])

    # Write the 30 records
    for i in range(1, 31):
        name = random.choice(names)
        department = random.choice(departments)
        salary = random.choice(salaries)
        writer.writerow([i, name, department, salary])

print("CSV file 'employee_data.csv' has been generated successfully.")

CSV file 'employee_data.csv' has been generated successfully.


In [38]:
!cat /content/employee_data.csv

ID,Name,Department,Salary
1,Mike,Marketing,10000
2,George,HR,8000
3,George,Sales,4000
4,Emily,Finance,3000
5,Nina,Marketing,4000
6,George,Finance,5000
7,David,IT,5000
8,Jane,Marketing,6000
9,George,HR,7000
10,Emily,IT,10000
11,Mike,Sales,9000
12,Mike,HR,7000
13,Anna,Marketing,3000
14,John,Marketing,4000
15,Jane,HR,4000
16,George,Marketing,5000
17,Nina,IT,7000
18,Sara,Sales,10000
19,Jane,Marketing,6000
20,David,Finance,5000
21,Nina,Marketing,10000
22,Mike,HR,9000
23,Jane,IT,7000
24,John,Finance,6000
25,David,HR,9000
26,David,Sales,6000
27,Tom,Marketing,8000
28,David,Marketing,10000
29,George,Marketing,4000
30,Mike,HR,10000


In [39]:
!ls /content/ -a

.   .config   employee_data.csv  sample_data
..  csv_data  json_data		 spark-warehouse


In [40]:
!mkdir /content/empdata/

In [41]:
!mv /content/employee_data.csv /content/empdata/

In [43]:
from pyspark.sql import types as T
from pyspark.sql import functions as F

from pyspark.sql import SparkSession

In [44]:
spark = SparkSession.builder.appName("sparkStraming").getOrCreate()

In [45]:
# schema = T.StructType() \
#     .add("EmpID", T.IntegerType()) \
#     .add("Name", T.StringType()) \
#     .add("Department", T.StringType()) \
#     .add("Salary", T.FloatType())


# Schema Generation
schema = T.StructType([
    T.StructField("EmpID", T.IntegerType()),
    T.StructField("Name", T.StringType()),
    T.StructField("Department", T.StringType()),
    T.StructField("Salary", T.FloatType())
])

In [46]:
stream_df = spark.readStream \
    .option("sep", ',') \
    .schema(schema) \
    .csv("/content/empdata")

In [47]:
transformed_df = stream_df.withColumn("Name_upper", F.upper(F.col("Name")))

In [48]:
query = transformed_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

In [49]:
query.stop()

In [50]:
!ls /content/empdata/

employee_data.csv


In [51]:
%%bash
cat<<EOF > /content/empdata/employee_data2.csv
1,John,Sales,3000
2,Jane,IT,4000
3,Mike,Sales,5000
4,Sara,Finance,6000
5,David,HR,7000
6,Emily,Marketing,6000
7,George,HR,4000
8,Nina,Sales,5000
9,Tom,IT,8000
10,Anna,Marketing,3000
EOF


In [52]:
!cat /content/empdata/employee_data2.csv

1,John,Sales,3000
2,Jane,IT,4000
3,Mike,Sales,5000
4,Sara,Finance,6000
5,David,HR,7000
6,Emily,Marketing,6000
7,George,HR,4000
8,Nina,Sales,5000
9,Tom,IT,8000
10,Anna,Marketing,3000


In [53]:
%%bash
cat<<EOF > /content/empdata/employee_data3.csv
11,John,IT,7000
12,Jane,HR,4000
13,Mike,Finance,5000
14,Sara,Sales,6000
15,David,Marketing,7000
16,Emily,Sales,8000
17,George,Finance,3000
18,Nina,IT,6000
19,Tom,Sales,4000
20,Anna,HR,5000
EOF


In [54]:
%%bash
cat<<EOF > /content/empdata/employee_data4.csv
21,John,Finance,7000
22,Jane,Marketing,6000
23,Mike,HR,8000
24,Sara,Sales,3000
25,David,IT,6000
26,Emily,Finance,5000
27,George,Marketing,4000
28,Nina,HR,7000
29,Tom,Finance,5000
30,Anna,IT,8000
EOF

In [55]:
!cat /content/empdata/employee_data3.csv


11,John,IT,7000
12,Jane,HR,4000
13,Mike,Finance,5000
14,Sara,Sales,6000
15,David,Marketing,7000
16,Emily,Sales,8000
17,George,Finance,3000
18,Nina,IT,6000
19,Tom,Sales,4000
20,Anna,HR,5000


In [56]:
!cat /content/empdata/employee_data4.csv

21,John,Finance,7000
22,Jane,Marketing,6000
23,Mike,HR,8000
24,Sara,Sales,3000
25,David,IT,6000
26,Emily,Finance,5000
27,George,Marketing,4000
28,Nina,HR,7000
29,Tom,Finance,5000
30,Anna,IT,8000


In [57]:
stream_df = spark.readStream \
    .option("sep", ',') \
    .schema(schema) \
    .csv("/content/empdata")

In [58]:
transformed_df = stream_df.withColumn("Name_upper", F.upper(F.col("Name")))

In [59]:
query = transformed_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

In [60]:
query.stop()

In [61]:
!ls

csv_data  empdata  json_data  sample_data  spark-warehouse


In [62]:
!ls empdata/

employee_data2.csv  employee_data3.csv	employee_data4.csv  employee_data.csv


In [69]:
import random
import csv
import time
import os

names = ["John", "Jane", "Mike", "Sara", "David", "Emily", "George", "Nina", "Tom", "Anna"]
departments = ["Sales", "IT", "HR", "Finance", "Marketing"]
salaries = [3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000]

record_batch = []
file_count = 1
record_id = 1

while True:
    # Generate a new record
    name = random.choice(names)
    department = random.choice(departments)
    salary = random.choice(salaries)
    record = [record_id, name, department, salary]

    # Add to batch
    record_batch.append(record)
    print(record)
    time.sleep(1)
    record_id += 1

    # Write to file if batch has 10 records
    if len(record_batch) == 10:
        filename = f"output_{file_count}.csv"
        with open(filename, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(["ID", "Name", "Department", "Salary"])
            writer.writerows(record_batch)
        print(f"Written 10 records to {filename}")

        # Reset batch and increment file counter
        record_batch = []
        file_count += 1


[1, 'George', 'Marketing', 7000]
[2, 'John', 'HR', 7000]
[3, 'Sara', 'Sales', 9000]
[4, 'Tom', 'Sales', 8000]
[5, 'George', 'Marketing', 8000]
[6, 'Sara', 'Marketing', 7000]
[7, 'John', 'IT', 4000]
[8, 'Jane', 'HR', 10000]
[9, 'George', 'HR', 10000]
[10, 'Emily', 'Finance', 10000]
Written 10 records to output_1.csv
[11, 'Mike', 'Finance', 8000]
[12, 'Tom', 'IT', 7000]
[13, 'Anna', 'IT', 3000]
[14, 'George', 'Marketing', 7000]


KeyboardInterrupt: 

In [67]:
!ls


csv_data  json_data	output_2.csv  spark-warehouse
empdata   output_1.csv	sample_data


In [70]:
cat output_1.csv

ID,Name,Department,Salary
1,George,Marketing,7000
2,John,HR,7000
3,Sara,Sales,9000
4,Tom,Sales,8000
5,George,Marketing,8000
6,Sara,Marketing,7000
7,John,IT,4000
8,Jane,HR,10000
9,George,HR,10000
10,Emily,Finance,10000


In [71]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [75]:
import random
import csv
import time
import os

# Path to your folder inside Google Drive
folder_path = "/content/drive/MyDrive/employee_batches"
os.makedirs(folder_path, exist_ok=True)  # create folder if not exist

names = ["John", "Jane", "Mike", "Sara", "David", "Emily", "George", "Nina", "Tom", "Anna"]
departments = ["Sales", "IT", "HR", "Finance", "Marketing"]
salaries = [3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000]

record_batch = []
file_count = 1
record_id = 1

while True:
    # Generate random employee record
    name = random.choice(names)
    department = random.choice(departments)
    salary = random.choice(salaries)
    record = [record_id, name, department, salary]

    record_batch.append(record)
    print(record)
    time.sleep(1)
    record_id += 1

    if len(record_batch) == 10:
        filename = f"output_{file_count}.csv"
        file_path = os.path.join(folder_path, filename)

        # Write CSV to Google Drive folder
        with open(file_path, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(["ID", "Name", "Department", "Salary"])
            writer.writerows(record_batch)

        print(f"Written 10 records to {file_path}")

        # Reset for next batch
        record_batch = []
        file_count += 1


[1, 'Jane', 'Marketing', 7000]
[2, 'David', 'Marketing', 9000]
[3, 'Tom', 'Marketing', 3000]
[4, 'John', 'Marketing', 4000]
[5, 'Jane', 'HR', 4000]
[6, 'Anna', 'Sales', 7000]
[7, 'John', 'HR', 5000]
[8, 'Emily', 'Finance', 6000]
[9, 'Emily', 'Finance', 4000]
[10, 'Anna', 'HR', 9000]
Written 10 records to /content/drive/MyDrive/employee_batches/output_1.csv
[11, 'Mike', 'Marketing', 5000]
[12, 'Mike', 'Finance', 3000]
[13, 'Mike', 'Marketing', 9000]
[14, 'David', 'HR', 8000]
[15, 'David', 'Marketing', 8000]
[16, 'Nina', 'Sales', 10000]
[17, 'Nina', 'HR', 9000]
[18, 'David', 'HR', 9000]
[19, 'George', 'Sales', 8000]
[20, 'George', 'Sales', 3000]
Written 10 records to /content/drive/MyDrive/employee_batches/output_2.csv
[21, 'Emily', 'Marketing', 5000]
[22, 'John', 'Marketing', 4000]
[23, 'Mike', 'Marketing', 8000]


KeyboardInterrupt: 

In [73]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [74]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive

gauth = GoogleAuth()
gauth.LocalWebserverAuth()
drive = GoogleDrive(gauth)




InvalidConfigError: Invalid client secrets file ('Error opening file', 'client_secrets.json', 'No such file or directory', 2)