In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=b63f051842db588699f173d984aab4bf4a9a8d47aeefbc764372c72b351cacf0
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Product Sales Analysis") \
    .getOrCreate()

# Sample data for products
products = [
    (1, "Laptop", "Electronics", 50000),
    (2, "Smartphone", "Electronics", 30000),
    (3, "Table", "Furniture", 15000),
    (4, "Chair", "Furniture", 5000),
    (5, "Headphones", "Electronics", 2000),
]

# Sample data for sales transactions
sales = [
    (1, 1, 2),
    (2, 2, 1),
    (3, 3, 3),
    (4, 1, 1),
    (5, 4, 5),
    (6, 2, 2),
    (7, 5, 10),
    (8, 3, 1),
]

# Define schema for DataFrames
product_columns = ["ProductID", "ProductName", "Category", "Price"]
sales_columns = ["SaleID", "ProductID", "Quantity"]

# Create DataFrames
product_df = spark.createDataFrame(products, schema=product_columns)
sales_df = spark.createDataFrame(sales, schema=sales_columns)

# Show the DataFrames
print("Products DataFrame:")
product_df.show()

print("Sales DataFrame:")
sales_df.show()

Products DataFrame:
+---------+-----------+-----------+-----+
|ProductID|ProductName|   Category|Price|
+---------+-----------+-----------+-----+
|        1|     Laptop|Electronics|50000|
|        2| Smartphone|Electronics|30000|
|        3|      Table|  Furniture|15000|
|        4|      Chair|  Furniture| 5000|
|        5| Headphones|Electronics| 2000|
+---------+-----------+-----------+-----+

Sales DataFrame:
+------+---------+--------+
|SaleID|ProductID|Quantity|
+------+---------+--------+
|     1|        1|       2|
|     2|        2|       1|
|     3|        3|       3|
|     4|        1|       1|
|     5|        4|       5|
|     6|        2|       2|
|     7|        5|      10|
|     8|        3|       1|
+------+---------+--------+



In [None]:
# Join the product and sales DataFrames on ProductID
combined_df = product_df.join(sales_df, on="ProductID", how="inner")
combined_df.show()


+---------+-----------+-----------+-----+------+--------+
|ProductID|ProductName|   Category|Price|SaleID|Quantity|
+---------+-----------+-----------+-----+------+--------+
|        1|     Laptop|Electronics|50000|     1|       2|
|        1|     Laptop|Electronics|50000|     4|       1|
|        2| Smartphone|Electronics|30000|     2|       1|
|        2| Smartphone|Electronics|30000|     6|       2|
|        3|      Table|  Furniture|15000|     3|       3|
|        3|      Table|  Furniture|15000|     8|       1|
|        4|      Chair|  Furniture| 5000|     5|       5|
|        5| Headphones|Electronics| 2000|     7|      10|
+---------+-----------+-----------+-----+------+--------+



In [None]:
from pyspark.sql.functions import col

# Calculate total sales value
combined_df = combined_df.withColumn("TotalSalesValue", col("Price") * col("Quantity"))
combined_df.show()


+---------+-----------+-----------+-----+------+--------+---------------+
|ProductID|ProductName|   Category|Price|SaleID|Quantity|TotalSalesValue|
+---------+-----------+-----------+-----+------+--------+---------------+
|        1|     Laptop|Electronics|50000|     1|       2|         100000|
|        1|     Laptop|Electronics|50000|     4|       1|          50000|
|        2| Smartphone|Electronics|30000|     2|       1|          30000|
|        2| Smartphone|Electronics|30000|     6|       2|          60000|
|        3|      Table|  Furniture|15000|     3|       3|          45000|
|        3|      Table|  Furniture|15000|     8|       1|          15000|
|        4|      Chair|  Furniture| 5000|     5|       5|          25000|
|        5| Headphones|Electronics| 2000|     7|      10|          20000|
+---------+-----------+-----------+-----+------+--------+---------------+



In [None]:
# Group by Category and calculate total sales value for each category
category_sales_df = combined_df.groupBy("Category").sum("TotalSalesValue")
category_sales_df = category_sales_df.withColumnRenamed("sum(TotalSalesValue)", "TotalSalesValue")
category_sales_df.show()


+-----------+---------------+
|   Category|TotalSalesValue|
+-----------+---------------+
|Electronics|         260000|
|  Furniture|          85000|
+-----------+---------------+



In [None]:
# Group by ProductName and calculate total sales value for each product
product_sales_df = combined_df.groupBy("ProductName").sum("TotalSalesValue")
product_sales_df = product_sales_df.withColumnRenamed("sum(TotalSalesValue)", "TotalSalesValue")

# Find the top-selling product
top_selling_product_df = product_sales_df.orderBy(col("TotalSalesValue").desc()).limit(1)
top_selling_product_df.show()


+-----------+---------------+
|ProductName|TotalSalesValue|
+-----------+---------------+
|     Laptop|         150000|
+-----------+---------------+



In [None]:
# Sort products by total sales value in descending order
sorted_products_df = product_sales_df.orderBy(col("TotalSalesValue").desc())
sorted_products_df.show()


+-----------+---------------+
|ProductName|TotalSalesValue|
+-----------+---------------+
|     Laptop|         150000|
| Smartphone|          90000|
|      Table|          60000|
|      Chair|          25000|
| Headphones|          20000|
+-----------+---------------+



In [None]:
# Count the number of sales transactions for each product
product_sales_count_df = combined_df.groupBy("ProductName").count()
product_sales_count_df.show()


+-----------+-----+
|ProductName|count|
+-----------+-----+
|      Chair|    1|
|     Laptop|    2|
|      Table|    2|
| Smartphone|    2|
| Headphones|    1|
+-----------+-----+



In [None]:
# Filter products with total sales value greater than 50,000
high_value_products_df = product_sales_df.filter(col("TotalSalesValue") > 50000)
high_value_products_df.show()


+-----------+---------------+
|ProductName|TotalSalesValue|
+-----------+---------------+
|     Laptop|         150000|
|      Table|          60000|
| Smartphone|          90000|
+-----------+---------------+



In [None]:
import pandas as pd
from datetime import datetime

# Sample sales data
data = {
    "TransactionID": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    "CustomerID": [101, 102, 103, 101, 104, 102, 103, 104, 101, 105],
    "ProductID": [501, 502, 501, 503, 504, 502, 503, 504, 501, 505],
    "Quantity": [2, 1, 4, 3, 1, 2, 5, 1, 2, 1],
    "Price": [150.0, 250.0, 150.0, 300.0, 450.0, 250.0, 300.0, 450.0, 150.0, 550.0],
    "Date": [
        datetime(2024, 9, 1),
        datetime(2024, 9, 1),
        datetime(2024, 9, 2),
        datetime(2024, 9, 2),
        datetime(2024, 9, 3),
        datetime(2024, 9, 3),
        datetime(2024, 9, 4),
        datetime(2024, 9, 4),
        datetime(2024, 9, 5),
        datetime(2024, 9, 5)
    ]
}

# Create a DataFrame
df = pd.DataFrame(data)

# Save the DataFrame to a CSV file
df.to_csv('/content/sales_data.csv', index=False)

print("Sample sales dataset has been created and saved as 'sales_data.csv'.")


Sample sales dataset has been created and saved as 'sales_data.csv'.


In [None]:
!ls /content



sales_data.csv	sample_data


In [None]:
!pip install pyspark

from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Sales Dataset Analysis") \
    .getOrCreate()




In [None]:
# Load the CSV file into a PySpark DataFrame
df_spark = spark.read.csv("/content/sales_data.csv", header=True)

# Show the first few rows of the DataFrame
df_spark.show()


+-------------+----------+---------+--------+-----+----------+
|TransactionID|CustomerID|ProductID|Quantity|Price|      Date|
+-------------+----------+---------+--------+-----+----------+
|            1|       101|      501|       2|150.0|2024-09-01|
|            2|       102|      502|       1|250.0|2024-09-01|
|            3|       103|      501|       4|150.0|2024-09-02|
|            4|       101|      503|       3|300.0|2024-09-02|
|            5|       104|      504|       1|450.0|2024-09-03|
|            6|       102|      502|       2|250.0|2024-09-03|
|            7|       103|      503|       5|300.0|2024-09-04|
|            8|       104|      504|       1|450.0|2024-09-04|
|            9|       101|      501|       2|150.0|2024-09-05|
|           10|       105|      505|       1|550.0|2024-09-05|
+-------------+----------+---------+--------+-----+----------+



In [None]:
df_spark.printSchema()


root
 |-- TransactionID: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Date: string (nullable = true)



In [None]:
df_spark.show(5)


+-------------+----------+---------+--------+-----+----------+
|TransactionID|CustomerID|ProductID|Quantity|Price|      Date|
+-------------+----------+---------+--------+-----+----------+
|            1|       101|      501|       2|150.0|2024-09-01|
|            2|       102|      502|       1|250.0|2024-09-01|
|            3|       103|      501|       4|150.0|2024-09-02|
|            4|       101|      503|       3|300.0|2024-09-02|
|            5|       104|      504|       1|450.0|2024-09-03|
+-------------+----------+---------+--------+-----+----------+
only showing top 5 rows



In [None]:
df_spark.describe("Quantity", "Price").show()


+-------+-----------------+-----------------+
|summary|         Quantity|            Price|
+-------+-----------------+-----------------+
|  count|               10|               10|
|   mean|              2.2|            300.0|
| stddev|1.398411797560202|141.4213562373095|
|    min|                1|            150.0|
|    max|                5|            550.0|
+-------+-----------------+-----------------+



In [None]:
from pyspark.sql.functions import col

df_spark = df_spark.withColumn("TotalSales", col("Quantity") * col("Price"))
df_spark.show()


+-------------+----------+---------+--------+-----+----------+----------+
|TransactionID|CustomerID|ProductID|Quantity|Price|      Date|TotalSales|
+-------------+----------+---------+--------+-----+----------+----------+
|            1|       101|      501|       2|150.0|2024-09-01|     300.0|
|            2|       102|      502|       1|250.0|2024-09-01|     250.0|
|            3|       103|      501|       4|150.0|2024-09-02|     600.0|
|            4|       101|      503|       3|300.0|2024-09-02|     900.0|
|            5|       104|      504|       1|450.0|2024-09-03|     450.0|
|            6|       102|      502|       2|250.0|2024-09-03|     500.0|
|            7|       103|      503|       5|300.0|2024-09-04|    1500.0|
|            8|       104|      504|       1|450.0|2024-09-04|     450.0|
|            9|       101|      501|       2|150.0|2024-09-05|     300.0|
|           10|       105|      505|       1|550.0|2024-09-05|     550.0|
+-------------+----------+---------+--

In [None]:
total_sales_per_product = df_spark.groupBy("ProductID").sum("TotalSales")
total_sales_per_product = total_sales_per_product.withColumnRenamed("sum(TotalSales)", "TotalSales")
total_sales_per_product.show()


+---------+----------+
|ProductID|TotalSales|
+---------+----------+
|      503|    2400.0|
|      502|     750.0|
|      505|     550.0|
|      501|    1200.0|
|      504|     900.0|
+---------+----------+



In [None]:
top_selling_product = total_sales_per_product.orderBy(col("TotalSales").desc()).first()
print(f"Top-Selling Product: ProductID = {top_selling_product['ProductID']}, TotalSales = {top_selling_product['TotalSales']}")


Top-Selling Product: ProductID = 503, TotalSales = 2400.0


In [None]:
total_sales_by_date = df_spark.groupBy("Date").sum("TotalSales")
total_sales_by_date = total_sales_by_date.withColumnRenamed("sum(TotalSales)", "TotalSales")
total_sales_by_date.show()


+----------+----------+
|      Date|TotalSales|
+----------+----------+
|2024-09-05|     850.0|
|2024-09-04|    1950.0|
|2024-09-02|    1500.0|
|2024-09-03|     950.0|
|2024-09-01|     550.0|
+----------+----------+



In [None]:
high_value_transactions = df_spark.filter(col("TotalSales") > 500)
high_value_transactions.show()


+-------------+----------+---------+--------+-----+----------+----------+
|TransactionID|CustomerID|ProductID|Quantity|Price|      Date|TotalSales|
+-------------+----------+---------+--------+-----+----------+----------+
|            3|       103|      501|       4|150.0|2024-09-02|     600.0|
|            4|       101|      503|       3|300.0|2024-09-02|     900.0|
|            7|       103|      503|       5|300.0|2024-09-04|    1500.0|
|           10|       105|      505|       1|550.0|2024-09-05|     550.0|
+-------------+----------+---------+--------+-----+----------+----------+



In [None]:
from pyspark.sql.functions import count

repeat_customers = df_spark.groupBy("CustomerID").agg(count("TransactionID").alias("PurchaseCount"))
repeat_customers = repeat_customers.filter(col("PurchaseCount") > 1)
repeat_customers.show()


+----------+-------------+
|CustomerID|PurchaseCount|
+----------+-------------+
|       101|            3|
|       104|            2|
|       102|            2|
|       103|            2|
+----------+-------------+



In [None]:
from pyspark.sql.functions import col, sum, avg

avg_price_per_product = df_spark.groupBy("ProductID").agg(
    (sum(col("Price") * col("Quantity")) / sum(col("Quantity"))).alias("AveragePricePerUnit")
)
avg_price_per_product.show()


+---------+-------------------+
|ProductID|AveragePricePerUnit|
+---------+-------------------+
|      503|              300.0|
|      502|              250.0|
|      505|              550.0|
|      501|              150.0|
|      504|              450.0|
+---------+-------------------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
        .appName("pyspark notebook example") \
        .getOrCreate()

data = [
    ("John Doe","Engineering",75000),
    ("John Smith","Marketing",60000),
    ("Sam Brown","Engineering",80000),
    ("Emily Davis","HR",50000),
    ("Michael Johnson","Marketing",70000)
]

columns = ["Name","Departmnet","Salary"]

df = spark.createDataFrame(data,schema=columns)

df.show()

+---------------+-----------+------+
|           Name| Departmnet|Salary|
+---------------+-----------+------+
|       John Doe|Engineering| 75000|
|     John Smith|  Marketing| 60000|
|      Sam Brown|Engineering| 80000|
|    Emily Davis|         HR| 50000|
|Michael Johnson|  Marketing| 70000|
+---------------+-----------+------+



In [None]:
high_salary_df = df.filter(col("Salary")>65000)
print("Employees with salary > 65,000")
high_salary_df.show()

Employees with salary > 65,000
+---------------+-----------+------+
|           Name| Departmnet|Salary|
+---------------+-----------+------+
|       John Doe|Engineering| 75000|
|      Sam Brown|Engineering| 80000|
|Michael Johnson|  Marketing| 70000|
+---------------+-----------+------+



In [None]:
avg_salary_df = df.groupBy("Departmnet").avg("Salary")
print("Average salary by Department")
avg_salary_df.show()

Average salary by Department
+-----------+-----------+
| Departmnet|avg(Salary)|
+-----------+-----------+
|Engineering|    77500.0|
|  Marketing|    65000.0|
|         HR|    50000.0|
+-----------+-----------+



In [None]:
# Install PySpark
!pip install pyspark

# Import required libraries
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("SalesDataAnalysis") \
    .getOrCreate()

# Get the Spark context from the session
sc = spark.sparkContext




In [None]:
# Define sales data
sales_data = [
    ("ProductA", 100),
    ("ProductB", 150),
    ("ProductA", 200),
    ("ProductC", 300),
    ("ProductB", 250),
    ("ProductC", 100)
]

# Create an RDD from the sales data
sales_rdd = sc.parallelize(sales_data)

# Print the first few elements of the RDD
print(sales_rdd.take(3))


[('ProductA', 100), ('ProductB', 150), ('ProductA', 200)]


In [None]:
# Group data by product name
grouped_rdd = sales_rdd.groupByKey()

# Print the grouped data
for product, sales in grouped_rdd.collect():
    print(f"{product}: {list(sales)}")


ProductA: [100, 200]
ProductB: [150, 250]
ProductC: [300, 100]


In [None]:
# Calculate total sales by product
total_sales_rdd = sales_rdd.reduceByKey(lambda x, y: x + y)

# Print total sales for each product
for product, total_sales in total_sales_rdd.collect():
    print(f"{product}: {total_sales}")


ProductA: 300
ProductB: 400
ProductC: 400


In [None]:
# Sort products by total sales in descending order
sorted_sales_rdd = total_sales_rdd.sortBy(lambda x: x[1], ascending=False)

# Print sorted list of products with their sales amounts
for product, total_sales in sorted_sales_rdd.collect():
    print(f"{product}: {total_sales}")


ProductB: 400
ProductC: 400
ProductA: 300


In [None]:
# Filter products with total sales greater than 200
high_sales_rdd = total_sales_rdd.filter(lambda x: x[1] > 200)

# Print the products that meet this condition
for product, total_sales in high_sales_rdd.collect():
    print(f"{product}: {total_sales}")


ProductA: 300
ProductB: 400
ProductC: 400


In [None]:
# Define regional sales data
regional_sales_data = [
    ("ProductA", 50),
    ("ProductC", 150)
]

# Create an RDD from the regional sales data
regional_sales_rdd = sc.parallelize(regional_sales_data)

# Union the two RDDs
combined_rdd = sales_rdd.union(regional_sales_rdd)

# Calculate the new total sales for each product
new_total_sales_rdd = combined_rdd.reduceByKey(lambda x, y: x + y)

# Print the combined sales data
for product, total_sales in new_total_sales_rdd.collect():
    print(f"{product}: {total_sales}")


ProductA: 350
ProductC: 550
ProductB: 400


In [None]:
# Count the number of distinct products
distinct_products_count = sales_rdd.map(lambda x: x[0]).distinct().count()

# Print the count of distinct products
print(f"Number of distinct products: {distinct_products_count}")


Number of distinct products: 3


In [None]:
# Find the product with the maximum total sales
max_sales_product = total_sales_rdd.reduce(lambda a, b: a if a[1] > b[1] else b)

# Print the product name and its total sales amount
print(f"Product with maximum sales: {max_sales_product[0]} with sales amount: {max_sales_product[1]}")


Product with maximum sales: ProductC with sales amount: 400


In [None]:
# Calculate the average sales amount per product
count_rdd = sales_rdd.mapValues(lambda x: (x, 1)) \
                     .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

average_sales_rdd = count_rdd.mapValues(lambda x: x[0] / x[1])

# Print the average sales for each product
for product, average_sales in average_sales_rdd.collect():
    print(f"{product}: {average_sales:.2f}")


ProductA: 150.00
ProductB: 200.00
ProductC: 200.00


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
.appName("Employee Data Analysis") \
.getOrCreate()

data = [
    (1,'Arjun','IT',75000),
    (2,'Vijay','Finance',85000),
    (3,'Shalini','IT',90000),
    (4,'Sneha','HR',50000),
    (5,'Rahul','Finance',60000),
    (6,'Amit','IT',55000),
]

columns = ['EmployeeID','EmployeeName','Department','Salary']

employee_df = spark.createDataFrame(data,columns)

employee_df.show()

+----------+------------+----------+------+
|EmployeeID|EmployeeName|Department|Salary|
+----------+------------+----------+------+
|         1|       Arjun|        IT| 75000|
|         2|       Vijay|   Finance| 85000|
|         3|     Shalini|        IT| 90000|
|         4|       Sneha|        HR| 50000|
|         5|       Rahul|   Finance| 60000|
|         6|        Amit|        IT| 55000|
+----------+------------+----------+------+



In [None]:
filtered_df = employee_df.filter(col('Salary') > 60000)
filtered_df.show()


+----------+------------+----------+------+
|EmployeeID|EmployeeName|Department|Salary|
+----------+------------+----------+------+
|         1|       Arjun|        IT| 75000|
|         2|       Vijay|   Finance| 85000|
|         3|     Shalini|        IT| 90000|
+----------+------------+----------+------+



In [None]:
from pyspark.sql.functions import avg
avg_salary_df = employee_df.groupBy('Department').agg(avg('Salary').alias('AverageSalary'))
avg_salary_df.show()


+----------+-----------------+
|Department|    AverageSalary|
+----------+-----------------+
|   Finance|          72500.0|
|        IT|73333.33333333333|
|        HR|          50000.0|
+----------+-----------------+



In [None]:
sorted_df = employee_df.orderBy(col('Salary').desc())
sorted_df.show()


+----------+------------+----------+------+
|EmployeeID|EmployeeName|Department|Salary|
+----------+------------+----------+------+
|         3|     Shalini|        IT| 90000|
|         2|       Vijay|   Finance| 85000|
|         1|       Arjun|        IT| 75000|
|         5|       Rahul|   Finance| 60000|
|         6|        Amit|        IT| 55000|
|         4|       Sneha|        HR| 50000|
+----------+------------+----------+------+



In [None]:
employee_with_bonus_df = employee_df.withColumn('Bonus', col('Salary') * 0.1)
employee_with_bonus_df.show()


+----------+------------+----------+------+------+
|EmployeeID|EmployeeName|Department|Salary| Bonus|
+----------+------------+----------+------+------+
|         1|       Arjun|        IT| 75000|7500.0|
|         2|       Vijay|   Finance| 85000|8500.0|
|         3|     Shalini|        IT| 90000|9000.0|
|         4|       Sneha|        HR| 50000|5000.0|
|         5|       Rahul|   Finance| 60000|6000.0|
|         6|        Amit|        IT| 55000|5500.0|
+----------+------------+----------+------+------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Advanced DataFrame Operations - Different Dataset") \
    .getOrCreate()

# Create two sample DataFrames for Product Sales
data1 = [
    (1, 'Product A', 'Electronics', 1200, '2022-05-10'),
    (2, 'Product B', 'Clothing', 500, '2022-07-15'),
    (3, 'Product C', 'Electronics', 1800, '2021-11-05')
]

data2 = [
    (4, 'Product D', 'Furniture', 3000, '2022-03-25'),
    (5, 'Product E', 'Clothing', 800, '2022-09-12'),
    (6, 'Product F', 'Electronics', 1500, '2021-10-19')
]

# Define schema (columns)
columns = ['ProductID', 'ProductName', 'Category', 'Price', 'SaleDate']

# Create DataFrames
sales_df1 = spark.createDataFrame(data1, columns)
sales_df2 = spark.createDataFrame(data2, columns)

In [None]:
combined_df = sales_df1.union(sales_df2).dropDuplicates()
combined_df.show()

+---------+-----------+-----------+-----+----------+
|ProductID|ProductName|   Category|Price|  SaleDate|
+---------+-----------+-----------+-----+----------+
|        1|  Product A|Electronics| 1200|2022-05-10|
|        2|  Product B|   Clothing|  500|2022-07-15|
|        3|  Product C|Electronics| 1800|2021-11-05|
|        4|  Product D|  Furniture| 3000|2022-03-25|
|        6|  Product F|Electronics| 1500|2021-10-19|
|        5|  Product E|   Clothing|  800|2022-09-12|
+---------+-----------+-----------+-----+----------+



In [None]:
combined_with_duplicates_df = sales_df1.union(sales_df2)
combined_with_duplicates_df.show()

+---------+-----------+-----------+-----+----------+
|ProductID|ProductName|   Category|Price|  SaleDate|
+---------+-----------+-----------+-----+----------+
|        1|  Product A|Electronics| 1200|2022-05-10|
|        2|  Product B|   Clothing|  500|2022-07-15|
|        3|  Product C|Electronics| 1800|2021-11-05|
|        4|  Product D|  Furniture| 3000|2022-03-25|
|        5|  Product E|   Clothing|  800|2022-09-12|
|        6|  Product F|Electronics| 1500|2021-10-19|
+---------+-----------+-----------+-----+----------+



In [None]:
window_spec = Window.partitionBy('Category').orderBy(F.desc('Price'))
ranked_df = sales_df1.withColumn('Rank', F.row_number().over(window_spec))
ranked_df.show()

+---------+-----------+-----------+-----+----------+----+
|ProductID|ProductName|   Category|Price|  SaleDate|Rank|
+---------+-----------+-----------+-----+----------+----+
|        2|  Product B|   Clothing|  500|2022-07-15|   1|
|        3|  Product C|Electronics| 1800|2021-11-05|   1|
|        1|  Product A|Electronics| 1200|2022-05-10|   2|
+---------+-----------+-----------+-----+----------+----+



In [None]:
cumulative_window_spec = Window.partitionBy('Category').orderBy('SaleDate').rowsBetween(Window.unboundedPreceding, Window.currentRow)
cumulative_df = sales_df1.withColumn('CumulativePrice', F.sum('Price').over(cumulative_window_spec))
cumulative_df.show()


+---------+-----------+-----------+-----+----------+---------------+
|ProductID|ProductName|   Category|Price|  SaleDate|CumulativePrice|
+---------+-----------+-----------+-----+----------+---------------+
|        2|  Product B|   Clothing|  500|2022-07-15|            500|
|        3|  Product C|Electronics| 1800|2021-11-05|           1800|
|        1|  Product A|Electronics| 1200|2022-05-10|           3000|
+---------+-----------+-----------+-----+----------+---------------+



In [None]:
sales_df1 = sales_df1.withColumn('SaleDate', F.to_date('SaleDate', 'yyyy-MM-dd'))
sales_df1.show()

+---------+-----------+-----------+-----+----------+
|ProductID|ProductName|   Category|Price|  SaleDate|
+---------+-----------+-----------+-----+----------+
|        1|  Product A|Electronics| 1200|2022-05-10|
|        2|  Product B|   Clothing|  500|2022-07-15|
|        3|  Product C|Electronics| 1800|2021-11-05|
+---------+-----------+-----------+-----+----------+



In [None]:
sales_df1 = sales_df1.withColumn('DaysSinceSale', F.datediff(F.current_date(), 'SaleDate'))
sales_df1.show()

+---------+-----------+-----------+-----+----------+-------------+
|ProductID|ProductName|   Category|Price|  SaleDate|DaysSinceSale|
+---------+-----------+-----------+-----+----------+-------------+
|        1|  Product A|Electronics| 1200|2022-05-10|          848|
|        2|  Product B|   Clothing|  500|2022-07-15|          782|
|        3|  Product C|Electronics| 1800|2021-11-05|         1034|
+---------+-----------+-----------+-----+----------+-------------+



In [None]:
sales_df1 = sales_df1.withColumn('NextSaleDeadline', F.date_add('SaleDate', 30))
sales_df1.show()

+---------+-----------+-----------+-----+----------+-------------+----------------+
|ProductID|ProductName|   Category|Price|  SaleDate|DaysSinceSale|NextSaleDeadline|
+---------+-----------+-----------+-----+----------+-------------+----------------+
|        1|  Product A|Electronics| 1200|2022-05-10|          848|      2022-06-09|
|        2|  Product B|   Clothing|  500|2022-07-15|          782|      2022-08-14|
|        3|  Product C|Electronics| 1800|2021-11-05|         1034|      2021-12-05|
+---------+-----------+-----------+-----+----------+-------------+----------------+



In [None]:
revenue_df = sales_df1.groupBy('Category').agg(
    F.sum('Price').alias('TotalRevenue'),
    F.avg('Price').alias('AveragePrice')
)

In [None]:
sales_df1 = sales_df1.withColumn('ProductNameLower', F.lower('ProductName'))
sales_df1.show()

+---------+-----------+-----------+-----+----------+-------------+----------------+----------------+
|ProductID|ProductName|   Category|Price|  SaleDate|DaysSinceSale|NextSaleDeadline|ProductNameLower|
+---------+-----------+-----------+-----+----------+-------------+----------------+----------------+
|        1|  Product A|Electronics| 1200|2022-05-10|          848|      2022-06-09|       product a|
|        2|  Product B|   Clothing|  500|2022-07-15|          782|      2022-08-14|       product b|
|        3|  Product C|Electronics| 1800|2021-11-05|         1034|      2021-12-05|       product c|
+---------+-----------+-----------+-----+----------+-------------+----------------+----------------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
.appName("Employee Data Analysis") \
.getOrCreate()

In [None]:
csv_file_path = "/content/sample_data/people.csv"

df_csv = spark.read.format("csv").option("header","true").load(csv_file_path)
df_csv.show()

+----+---+------+
|Name|Age|Gender|
+----+---+------+
|John| 28|  Male|
|Jane| 32|Female|
+----+---+------+

