In [1]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Spark Introduction")
    .config("spark.executor.instances", "2")
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "1g")
    .master("spark://spark-master:7077")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/15 00:59:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

In [4]:
# Emp Data & Schema

emp_data = [
    ["001","101","John Doe","30","Male","50000","2015-01-01"],
    ["002","101","Jane Smith","25","Female","45000","2016-02-15"],
    ["003","102","Bob Brown","35","Male","55000","2014-05-01"],
    ["004","102","Alice Lee","28","Female","48000","2017-09-30"],
    ["005","103","Jack Chan","40","Male","60000","2013-04-01"],
    ["006","103","Jill Wong","32","Female","52000","2018-07-01"],
    ["007","101","James Johnson","42","Male","70000","2012-03-15"],
    ["008","102","Kate Kim","29","Female","51000","2019-10-01"],
    ["009","103","Tom Tan","33","Male","58000","2016-06-01"],
    ["010","104","Lisa Lee","27","Female","47000","2018-08-01"],
    ["011","104","David Park","38","Male","65000","2015-11-01"],
    ["012","105","Susan Chen","31","Female","54000","2017-02-15"],
    ["013","106","Brian Kim","45","Male","75000","2011-07-01"],
    ["014","107","Emily Lee","26","Female","46000","2019-01-01"],
    ["015","106","Michael Lee","37","Male","63000","2014-09-30"],
    ["016","107","Kelly Zhang","30","Female","49000","2018-04-01"],
    ["017","105","George Wang","34","Male","57000","2016-03-15"],
    ["018","104","Nancy Liu","29","Female","50000","2017-06-01"],
    ["019","103","Steven Chen","36","Male","62000","2015-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
]

emp_schema = "employee_id string, department_id string, name string, age string, gender string, salary string, hire_date string"

In [5]:
# Create emp DataFrame

emp = spark.createDataFrame(data=emp_data, schema=emp_schema)

In [5]:
emp.take(20)

                                                                                

[Row(employee_id='001', department_id='101', name='John Doe', age='30', gender='Male', salary='50000', hire_date='2015-01-01'),
 Row(employee_id='002', department_id='101', name='Jane Smith', age='25', gender='Female', salary='45000', hire_date='2016-02-15'),
 Row(employee_id='003', department_id='102', name='Bob Brown', age='35', gender='Male', salary='55000', hire_date='2014-05-01'),
 Row(employee_id='004', department_id='102', name='Alice Lee', age='28', gender='Female', salary='48000', hire_date='2017-09-30'),
 Row(employee_id='005', department_id='103', name='Jack Chan', age='40', gender='Male', salary='60000', hire_date='2013-04-01'),
 Row(employee_id='006', department_id='103', name='Jill Wong', age='32', gender='Female', salary='52000', hire_date='2018-07-01'),
 Row(employee_id='007', department_id='101', name='James Johnson', age='42', gender='Male', salary='70000', hire_date='2012-03-15'),
 Row(employee_id='008', department_id='102', name='Kate Kim', age='29', gender='Female'

In [6]:
# Check number of partitions

emp.rdd.getNumPartitions()

8

In [7]:
from pyspark.sql import functions as F

emp = emp.withColumn("partition_id", F.spark_partition_id())
emp.show()



+-----------+-------------+-------------+---+------+------+----------+------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|partition_id|
+-----------+-------------+-------------+---+------+------+----------+------------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|           0|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|           0|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|           1|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|           1|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|           2|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|           2|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|           3|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|           3|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|      

                                                                                

In [8]:
from pyspark import TaskContext
import os

def get_executor_partition_info(index, iterator):
    context = TaskContext.get()
    partition_id = context.partitionId()
    executor_id = os.getenv("SPARK_EXECUTOR_ID", "unknown")
    
    # Imprimir todas las variables de entorno para ver que SPARK_EXECUTOR_ID existe
    print(os.environ)
    
    return [f"Executor {executor_id} is processing partition {partition_id}"]

info = emp.rdd.mapPartitionsWithIndex(get_executor_partition_info).collect()

for line in info:
    print(line)



Executor 8d4d02b01ea7 is processing partition 0
Executor 8d4d02b01ea7 is processing partition 1
Executor de04a8c09e98 is processing partition 2
Executor de04a8c09e98 is processing partition 3
Executor 8d4d02b01ea7 is processing partition 4
Executor de04a8c09e98 is processing partition 5
Executor de04a8c09e98 is processing partition 6
Executor 8d4d02b01ea7 is processing partition 7


                                                                                

In [10]:
import socket

def get_executor_host(index, iterator):
    # Get the hostname of the machine processing this partition (i.e., the executor)
    executor_host = socket.gethostname()
    
    # Get partition ID from TaskContext
    context = TaskContext.get()
    partition_id = context.partitionId()

    return [f"Executor {executor_host} is processing partition {partition_id}"]

# Apply the function and collect the results
info = emp.rdd.mapPartitionsWithIndex(get_executor_host).collect()

# Print the collected information on the driver
for line in info:
    print(line)



Executor 79fff4e7ee83 is processing partition 0
Executor e5ee7fdde0bb is processing partition 1
Executor e5ee7fdde0bb is processing partition 2
Executor 79fff4e7ee83 is processing partition 3
Executor 79fff4e7ee83 is processing partition 4
Executor e5ee7fdde0bb is processing partition 5
Executor e5ee7fdde0bb is processing partition 6
Executor 79fff4e7ee83 is processing partition 7


                                                                                

In [12]:
emp.rdd.toDebugString()

b'(8) MapPartitionsRDD[17] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[16] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  SQLExecutionRDD[15] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[14] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[3] at map at SerDeUtil.scala:69 []\n |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:117 []\n |  PythonRDD[1] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:287 []'

In [11]:
# Write our first Transformation (EMP salary > 50000)

emp_final1 = emp.where("salary > 50000")
emp_final2 = emp.filter("salary > 50000")
emp_final3 = emp.filter(F.col("salary") > 50000)

In [12]:
emp_final1.explain()

== Physical Plan ==
*(1) Filter (isnotnull(salary#5) AND (cast(salary#5 as int) > 50000))
+- *(1) Project [employee_id#0, department_id#1, name#2, age#3, gender#4, salary#5, hire_date#6, SPARK_PARTITION_ID() AS partition_id#43]
   +- *(1) Scan ExistingRDD[employee_id#0,department_id#1,name#2,age#3,gender#4,salary#5,hire_date#6]




In [None]:
emp_final2.explain()

In [None]:
# Validate number of Partitions

emp_final.rdd.getNumPartitions()

In [None]:
# Write data as CSV output (ACTION)

emp_final.write.format("csv").save("data/output/1/emp.csv")