In [1]:
# Spark Session
from pyspark.sql import SparkSession


# Build the "Spark Introduction" session against the local[*] master;
# getOrCreate() hands back an already-running session if there is one.
spark = SparkSession.builder.appName("Spark Introduction").master("local[*]").getOrCreate()

# Bare last expression so the notebook renders the session object.
spark

In [3]:
# Emp Data & Schema
# Dataset was taken from the internet
# Employee records, one comma-delimited line per employee. Field order:
# employee_id, department_id, name, age, gender, salary, hire_date.
# Every field stays a string; each record is split into a 7-element list.
emp_data = [
    record.split(",")
    for record in (
        "001,101,John Doe,30,Male,50000,2015-01-01",
        "002,101,Jane Smith,25,Female,45000,2016-02-15",
        "003,102,Bob Brown,35,Male,55000,2014-05-01",
        "004,102,Alice Lee,28,Female,48000,2017-09-30",
        "005,103,Jack Chan,40,Male,60000,2013-04-01",
        "006,103,Jill Wong,32,Female,52000,2018-07-01",
        "007,101,James Johnson,42,Male,70000,2012-03-15",
        "008,102,Kate Kim,29,Female,51000,2019-10-01",
        "009,103,Tom Tan,33,Male,58000,2016-06-01",
        "010,104,Lisa Lee,27,Female,47000,2018-08-01",
        "011,104,David Park,38,Male,65000,2015-11-01",
        "012,105,Susan Chen,31,Female,54000,2017-02-15",
        "013,106,Brian Kim,45,Male,75000,2011-07-01",
        "014,107,Emily Lee,26,Female,46000,2019-01-01",
        "015,106,Michael Lee,37,Male,63000,2014-09-30",
        "016,107,Kelly Zhang,30,Female,49000,2018-04-01",
        "017,105,George Wang,34,Male,57000,2016-03-15",
        "018,104,Nancy Liu,29,Female,50000,2017-06-01",
        "019,103,Steven Chen,36,Male,62000,2015-08-01",
        "020,102,Grace Kim,32,Female,53000,2018-11-01",
    )
]

# DDL schema string: every column is declared as `string`, in the same order
# as the fields of each record above.
emp_schema = ", ".join(
    f"{column} string"
    for column in (
        "employee_id",
        "department_id",
        "name",
        "age",
        "gender",
        "salary",
        "hire_date",
    )
)

In [4]:
# Materialise the employee rows as a DataFrame using the DDL schema string.
emp_df = spark.createDataFrame(emp_data, schema=emp_schema)

In [7]:
# Display the schema object of emp_df (shown below as a StructType of StructFields).
emp_df.schema

StructType([StructField('employee_id', StringType(), True), StructField('department_id', StringType(), True), StructField('name', StringType(), True), StructField('age', StringType(), True), StructField('gender', StringType(), True), StructField('salary', StringType(), True), StructField('hire_date', StringType(), True)])

In [21]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
# The same two-column layout written two ways: as a DDL string (kept for
# comparison, not used in this cell) and as an explicit StructType.
schema_string = "name string,age int"
schema_spark = StructType(
    [
        StructField("name", StringType(), False),  # third argument is `nullable`
        StructField("age", IntegerType(), True),
    ]
)

# Two sample rows matching (name, age).
data = [["John", 1], ["Alice", 2]]
data_df = spark.createDataFrame(data, schema=schema_spark)

# Show the resulting schema, then the rows.
data_df.printSchema()
data_df.show()


root
 |-- name: string (nullable = false)
 |-- age: integer (nullable = true)

+-----+---+
| name|age|
+-----+---+
| John|  1|
|Alice|  2|
+-----+---+



In [35]:
# Referring to a column
# Ways to select a column: col("c"), expr("c"), df.c, df["c"], or the bare string "c"
# expr can be used like col, but it also evaluates SQL expressions
from pyspark.sql.functions import col,expr
# One projection that uses every column-reference style side by side.
emp_filetered = emp_df.select(
    col("employee_id"),                                # col() helper
    expr("department_id"),                             # expr() with a plain column name
    emp_df.salary,                                     # attribute access
    expr("salary+10000").alias("incremented_salary"),  # evaluated expression, aliased
    "hire_date",                                       # bare column-name string
    emp_df["age"],                                     # item access
)
# salary is a string column; the output shows the sum as a decimal (e.g. 60000.0).
emp_filetered.show()

+-----------+-------------+------+------------------+----------+---+
|employee_id|department_id|salary|incremented_salary| hire_date|age|
+-----------+-------------+------+------------------+----------+---+
|        001|          101| 50000|           60000.0|2015-01-01| 30|
|        002|          101| 45000|           55000.0|2016-02-15| 25|
|        003|          102| 55000|           65000.0|2014-05-01| 35|
|        004|          102| 48000|           58000.0|2017-09-30| 28|
|        005|          103| 60000|           70000.0|2013-04-01| 40|
|        006|          103| 52000|           62000.0|2018-07-01| 32|
|        007|          101| 70000|           80000.0|2012-03-15| 42|
|        008|          102| 51000|           61000.0|2019-10-01| 29|
|        009|          103| 58000|           68000.0|2016-06-01| 33|
|        010|          104| 47000|           57000.0|2018-08-01| 27|
|        011|          104| 65000|           75000.0|2015-11-01| 38|
|        012|          105| 54000|

In [41]:
    emp_cast = emp_df.select(expr("employee_id as emp_id "),expr("department_id as dept_id "),emp_df.salary,expr("cast(salary+10000 as int)").alias("incremented_salary"))
emp_cast.printSchema()
emp_cast.show()


root
 |-- emp_id: string (nullable = true)
 |-- dept_id: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- incremented_salary: integer (nullable = true)

+------+-------+------+------------------+
|emp_id|dept_id|salary|incremented_salary|
+------+-------+------+------------------+
|   001|    101| 50000|             60000|
|   002|    101| 45000|             55000|
|   003|    102| 55000|             65000|
|   004|    102| 48000|             58000|
|   005|    103| 60000|             70000|
|   006|    103| 52000|             62000|
|   007|    101| 70000|             80000|
|   008|    102| 51000|             61000|
|   009|    103| 58000|             68000|
|   010|    104| 47000|             57000|
|   011|    104| 65000|             75000|
|   012|    105| 54000|             64000|
|   013|    106| 75000|             85000|
|   014|    107| 46000|             56000|
|   015|    106| 63000|             73000|
|   016|    107| 49000|             59000|
|   017|   

In [48]:
# select + expr = selectExpr: every argument is treated as a SQL expression string

# Same projection written with selectExpr: each argument is a SQL expression
# string, so no explicit expr() wrappers are needed.
emp_cast_expr = emp_df.selectExpr(
    "employee_id as emp_id ",
    "department_id as dept_id ",
    "salary",
    "cast(salary+10000 as int) as incremented_salary",
    "cast(age as int)",  # appears as integer column `age` in the schema output
)

# Confirm the column types, then preview the rows.
emp_cast_expr.printSchema()
emp_cast_expr.show()

root
 |-- emp_id: string (nullable = true)
 |-- dept_id: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- incremented_salary: integer (nullable = true)
 |-- age: integer (nullable = true)

+------+-------+------+------------------+---+
|emp_id|dept_id|salary|incremented_salary|age|
+------+-------+------+------------------+---+
|   001|    101| 50000|             60000| 30|
|   002|    101| 45000|             55000| 25|
|   003|    102| 55000|             65000| 35|
|   004|    102| 48000|             58000| 28|
|   005|    103| 60000|             70000| 40|
|   006|    103| 52000|             62000| 32|
|   007|    101| 70000|             80000| 42|
|   008|    102| 51000|             61000| 29|
|   009|    103| 58000|             68000| 33|
|   010|    104| 47000|             57000| 27|
|   011|    104| 65000|             75000| 38|
|   012|    105| 54000|             64000| 31|
|   013|    106| 75000|             85000| 45|
|   014|    107| 46000|             5600

In [54]:
# Keep four columns and only the employees younger than 30.
emp_cast_final = (
    emp_cast_expr
    .select("emp_id", "dept_id", "salary", "age")
    .where("age<30")
)
emp_cast_final.show()

+------+-------+------+---+
|emp_id|dept_id|salary|age|
+------+-------+------+---+
|   002|    101| 45000| 25|
|   004|    102| 48000| 28|
|   008|    102| 51000| 29|
|   010|    104| 47000| 27|
|   014|    107| 46000| 26|
|   018|    104| 50000| 29|
+------+-------+------+---+



In [60]:
# Write the filtered employees as CSV from a single partition.
# mode("overwrite") makes the cell safe to re-run: with the writer's default
# save mode the second execution fails because the target path already exists.
(
    emp_cast_final
    .repartition(1)
    .write
    .format("csv")
    .mode("overwrite")
    .save("data/output/2_1.csv")
)

In [64]:
# How Spark converts schema_string into spark_schema

from pyspark.sql.types import _parse_datatype_string
# DDL-style schema string: comma-separated "<column> <type>" pairs.
schema_string = "name string,age int"

# Parse the DDL string into a schema object.
# NOTE(review): the leading underscore suggests this helper is private to
# pyspark and may change between releases — confirm before using it outside a demo.
spark_schema = _parse_datatype_string(schema_string)

# Display the parsed result (the output below shows a StructType with name/age fields).
spark_schema

StructType([StructField('name', StringType(), True), StructField('age', IntegerType(), True)])