In [1]:
import os
import findspark

# Point this process at the local Java and Spark installations.
os.environ.update({
    'JAVA_HOME': "C:\\Program Files\\Java\\jdk-19",
    'SPARK_HOME': "C:\\spark\\spark-3.3.0-bin-hadoop3",
})

# Run findspark's initialisation before any pyspark module is imported.
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType
from pyspark.sql.functions import col, to_date
from pyspark.sql.functions import sum, desc
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# Build the SparkSession for this notebook (getOrCreate reuses an existing one).
spark = (
    SparkSession.builder
    .appName("SimpleTest")
    .getOrCreate()
)



In [None]:
# Problem 1
file_location = "C:\\Users\\ershiaa\\Downloads\\thoughtworkd\\customers.csv"
read_cust = spark.read.csv(file_location, header=True)
read_cust.show(3, False)
read_cust.printSchema()

# Number of transactions for each customer (one output row per customer_id).
ttbec = read_cust.groupBy("customer_id").count()
ttbec.show()

# Typed view of the raw CSV, built once and shared by every aggregate below.
# The date pattern must use "MM" (month-of-year); lowercase "mm" means
# minute-of-hour in Spark datetime patterns and mis-parses the month.
new_df = (
    read_cust
    .withColumn("customer_id", F.col("customer_id").cast(IntegerType()))
    .withColumn("transaction_date", F.to_date(F.col("transaction_date"), "yyyy-MM-dd"))
    .withColumn("purchase_amount", F.col("purchase_amount").cast(DoubleType()))
)

# Total purchase amount for each customer.
tpafec = new_df.groupBy("customer_id").agg(
    F.sum("purchase_amount").alias("total_purchase_amount")
)

# Customer with the highest total purchase amount.
# first() is not the last expression of the cell, so it has to be printed
# explicitly or its result is silently discarded.
tpafec_highest = tpafec.orderBy(F.desc("total_purchase_amount"))
print(tpafec_highest.first())

# Average purchase amount per transaction: sum of all amounts divided by the
# number of rows (rows with a null amount still count as transactions).
new_df_sum = new_df.select(F.sum("purchase_amount"))
tot_sum = new_df_sum.collect()[0][0]
tot_cnt = new_df.count()
# The sum is None when there are no non-null amounts, and the count is 0 for
# an empty file; report None instead of raising in either case.
avg1 = tot_sum / tot_cnt if tot_sum is not None and tot_cnt else None
print(avg1)

In [10]:
# Load the employee and department CSVs; every column is read as a string.
emp = "C:\\Users\\ershiaa\\Downloads\\thoughtworkd\\employees.csv"
dept = "C:\\Users\\ershiaa\\Downloads\\thoughtworkd\\dept.csv"
read_employees = spark.read.csv(emp, header="true")
read_dept = spark.read.csv(dept, header="true")

# Cast the id and salary columns to integers.
employees = (
    read_employees
    .withColumn("emp_id", F.col("emp_id").cast(IntegerType()))
    .withColumn("salary", F.col("salary").cast(IntegerType()))
    .withColumn("dept_id", F.col("dept_id").cast(IntegerType()))
)
department = read_dept.withColumn("dept_id", F.col("dept_id").cast(IntegerType()))

# Left join keeps every employee row; dept_id is taken from the employee side
# because both inputs carry a column with that name.
same_department = employees["dept_id"] == department["dept_id"]
emp_dept = (
    employees
    .join(department, same_department, "left")
    .select("emp_id", "emp_name", "salary", employees["dept_id"], "dept_name")
)
emp_dept.printSchema()

# Expose the joined frame to Spark SQL and dense-rank salaries within each department.
emp_dept.createOrReplaceTempView("emp_data1")
emp1 = spark.sql("select emp_id, emp_name,salary, dept_id, dept_name, DENSE_RANK() over(partition by dept_id order by salary desc) as rnk from emp_data1")


root
 |-- emp_id: integer (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- dept_id: integer (nullable = true)
 |-- dept_name: string (nullable = true)



In [25]:
# Ids of the employees ranked first (rnk = 1) in the Sales department.
# NOTE(review): the original note asked for Sales and IT, but only "Sales"
# is filtered here; confirm which is intended.
is_top_ranked = emp1["rnk"] == 1
in_sales = emp1["dept_name"] == "Sales"
emp1_filter = emp1.where(is_top_ranked & in_sales).select("emp_id")
emp1_filter.show()

+------+
|emp_id|
+------+
|    32|
+------+



In [29]:
# Same ranking as the SQL query, expressed with the DataFrame API:
# dense-rank salaries (highest first) within each department.
WindowSpec = Window.partitionBy("dept_id").orderBy(F.desc("salary"))
salary_rank = F.dense_rank().over(WindowSpec)
new_emp = emp_dept.withColumn("rank_normal", salary_rank)
new_emp.show()

+------+----------------+------+-------+---------+-----------+
|emp_id|        emp_name|salary|dept_id|dept_name|rank_normal|
+------+----------------+------+-------+---------+-----------+
|    32|Christopher Hill| 74000|      1|    Sales|          1|
|    22|    Nathan Scott| 71000|      1|    Sales|          2|
|    36|   Joseph Wright| 71000|      1|    Sales|          2|
|    47|  Stephanie Gray| 71000|      1|    Sales|          2|
|     3|     Michael Lee| 70000|      1|    Sales|          3|
|    14|    Andrew Clark| 70000|      1|    Sales|          3|
|    43|   Anna Phillips| 69000|      1|    Sales|          4|
|    10|Ashley Rodriguez| 68000|      1|    Sales|          5|
|    29|   Samantha King| 67000|      1|    Sales|          6|
|    20|William Thompson| 66000|      1|    Sales|          7|
|    25|      Ella Young| 63000|      1|    Sales|          8|
|     6|   Sarah Johnson| 62000|      1|    Sales|          9|
|    17|   Sophia Martin| 62000|      1|    Sales|     

+------+----------------+------+-------+---------------+---+
|emp_id|        emp_name|salary|dept_id|      dept_name|rnk|
+------+----------------+------+-------+---------------+---+
|    32|Christopher Hill| 74000|      1|          Sales|  1|
|    24|     Joshua Hall| 82000|      2|      Marketing|  1|
|    33|    Hailey Allen| 77000|      3|    Engineering|  1|
|    41|   Lauren Carter| 78000|      4|Human Resources|  1|
+------+----------------+------+-------+---------------+---+



In [None]:
# # Define schema
# schema = StructType([
#     StructField("emp_id", IntegerType(), True),
#     StructField("emp_name", StringType(), True),
#     StructField("salary", IntegerType(), True),
#     StructField("dept_id", IntegerType(), True),
#     StructField("dept_name", StringType(), True)
# ])

# # Sample data
# data = [
#     (1, "John Doe", 50000, 101, "HR"),
#     (2, "Jane Smith", 60000, 102, "Finance"),
#     (3, "Sam Brown", 55000, 103, "IT")
# ]

# # Create DataFrame
# emp_dept = spark.createDataFrame(data, schema)
# emp_dept.show()

# # Print schema
# emp_dept.printSchema()