In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,concat,lit,floor,rand
spark = SparkSession.builder.appName("ETLPractice").getOrCreate()
source_path = "orders.csv"
target_path = "order_result.csv"
load_data = spark.read.csv("orders.csv",header = True, inferSchema = True)

In [3]:
load_data.columns
load_data.show(5)

+-------+----------+----------+----------+-----------+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|
+-------+----------+----------+----------+-----------+
|      1|      john|       doe|         5|     active|
|      2|      jane|     smith|         8|     active|
|      3|   micheal|   jhonson|         3|   inactive|
|      4|      abhi|   wiliams|         1|     active|
|      5|       ram|     brown|         4|   inactive|
+-------+----------+----------+----------+-----------+
only showing top 5 rows


### Transformation 1: Concatenate First and Last Names

In [4]:
 #Transformation 1: Concatenate First and Last Names
load_data = load_data.withColumn('full_name', concat(col('cust_fname'), lit(' '), col('cust_lname')))
load_data.show(10)

+-------+----------+----------+----------+-----------+---------------+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|      full_name|
+-------+----------+----------+----------+-----------+---------------+
|      1|      john|       doe|         5|     active|       john doe|
|      2|      jane|     smith|         8|     active|     jane smith|
|      3|   micheal|   jhonson|         3|   inactive|micheal jhonson|
|      4|      abhi|   wiliams|         1|     active|   abhi wiliams|
|      5|       ram|     brown|         4|   inactive|      ram brown|
|      6|     emily|  anderson|         2|     active| emily anderson|
|      7|   william|     jones|        10|     active|  william jones|
|      8|     susan|     davis|         7|   inactive|    susan davis|
|      9|     david|    miller|         9|     active|   david miller|
|     10|      sara|     moore|         2|   inactive|     sara moore|
+-------+----------+----------+----------+-----------+---------------+
only s

### Transformation 2: Calculate Net Salary (subtract 10% as taxes)

In [5]:
# Transformation 2: Calculate Net Salary (subtract 10% as taxes)
load_data = load_data.withColumn('net_salary', floor(lit(10000) + rand() * lit(50)))
load_data.show(10)

+-------+----------+----------+----------+-----------+---------------+----------+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|      full_name|net_salary|
+-------+----------+----------+----------+-----------+---------------+----------+
|      1|      john|       doe|         5|     active|       john doe|     10008|
|      2|      jane|     smith|         8|     active|     jane smith|     10039|
|      3|   micheal|   jhonson|         3|   inactive|micheal jhonson|     10038|
|      4|      abhi|   wiliams|         1|     active|   abhi wiliams|     10015|
|      5|       ram|     brown|         4|   inactive|      ram brown|     10016|
|      6|     emily|  anderson|         2|     active| emily anderson|     10039|
|      7|   william|     jones|        10|     active|  william jones|     10026|
|      8|     susan|     davis|         7|   inactive|    susan davis|     10015|
|      9|     david|    miller|         9|     active|   david miller|     10037|
|     10|      s

### Adding Age Column

In [6]:
#adding age column
load_data = load_data.withColumn('age', floor(lit(20) + rand() * lit(31)))
load_data.show(10)

+-------+----------+----------+----------+-----------+---------------+----------+---+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|      full_name|net_salary|age|
+-------+----------+----------+----------+-----------+---------------+----------+---+
|      1|      john|       doe|         5|     active|       john doe|     10008| 31|
|      2|      jane|     smith|         8|     active|     jane smith|     10039| 30|
|      3|   micheal|   jhonson|         3|   inactive|micheal jhonson|     10038| 31|
|      4|      abhi|   wiliams|         1|     active|   abhi wiliams|     10015| 36|
|      5|       ram|     brown|         4|   inactive|      ram brown|     10016| 36|
|      6|     emily|  anderson|         2|     active| emily anderson|     10039| 50|
|      7|   william|     jones|        10|     active|  william jones|     10026| 24|
|      8|     susan|     davis|         7|   inactive|    susan davis|     10015| 25|
|      9|     david|    miller|         9|     active|

### Transformation 3: Filter by Age (age >= 30)

In [7]:
# # Transformation 3: Filter by Age (age >= 30)
load_data = load_data.filter(col('age')>= 30)
load_data.show()

+-------+----------+----------+----------+-----------+---------------+----------+---+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|      full_name|net_salary|age|
+-------+----------+----------+----------+-----------+---------------+----------+---+
|      1|      john|       doe|         5|     active|       john doe|     10008| 31|
|      2|      jane|     smith|         8|     active|     jane smith|     10039| 30|
|      3|   micheal|   jhonson|         3|   inactive|micheal jhonson|     10038| 31|
|      4|      abhi|   wiliams|         1|     active|   abhi wiliams|     10015| 36|
|      5|       ram|     brown|         4|   inactive|      ram brown|     10016| 36|
|      6|     emily|  anderson|         2|     active| emily anderson|     10039| 50|
|     10|      sara|     moore|         2|   inactive|     sara moore|     10015| 49|
|     11|     james|    tailor|         5|   inactive|   james tailor|     10043| 36|
|     12|    olivia|    wilson|         3|   inactive|

### Transformation 4: Group by Age and Calculate Average Salary

In [8]:
# Transformation 4: Group by Age and Calculate Average Salary
avg_salary_by_age = load_data.groupBy('age').agg({'net_salary' :'avg'}).withColumnRenamed('avg(salary)', 'avg_salary')
avg_salary_by_age.show()

+---+------------------+
|age|   avg(net_salary)|
+---+------------------+
| 50|10026.666666666666|
| 43|           10019.0|
| 31|           10023.0|
| 37|           10020.0|
| 49|10016.333333333334|
| 36|           10030.5|
| 38|           10012.0|
| 30|           10039.0|
+---+------------------+



### Sorting the Dataframe by AGE

In [9]:
load_data = load_data.orderBy("age")
load_data.show()

+-------+----------+----------+----------+-----------+---------------+----------+---+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|      full_name|net_salary|age|
+-------+----------+----------+----------+-----------+---------------+----------+---+
|      2|      jane|     smith|         8|     active|     jane smith|     10039| 30|
|      1|      john|       doe|         5|     active|       john doe|     10008| 31|
|      3|   micheal|   jhonson|         3|   inactive|micheal jhonson|     10038| 31|
|      4|      abhi|   wiliams|         1|     active|   abhi wiliams|     10015| 36|
|      5|       ram|     brown|         4|   inactive|      ram brown|     10016| 36|
|     11|     james|    tailor|         5|   inactive|   james tailor|     10043| 36|
|     19|chrisopher|      basa|         8|   inactive|chrisopher basa|     10048| 36|
|     12|    olivia|    wilson|         3|   inactive|  olivia wilson|     10020| 37|
|     18|     grace|       lee|         5|     active|

### Save the transformed data to an external CSV file

In [None]:
# Save the transformed data to an external CSV file
load_data.write.csv(target_path, mode='overwrite', header=True)

25/08/05 13:33:36 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 605911 ms exceeds timeout 120000 ms
25/08/05 13:33:36 WARN SparkContext: Killing executors is not supported by current scheduler.
25/08/05 13:33:42 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:81)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:669)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1296)
	at o