In [2]:
# %pip install pyspark   # uncomment and run once if pyspark is missing (%pip installs into this kernel's environment)

In [1]:
import os, sys

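# SparkContext - low-level entry point to Spark; used to create and work with RDDs
# SparkSession - unified entry point for the DataFrame and SQL APIs
# - introduced in version 2.0
# - replaces SQLContext and HiveContext; the underlying SparkContext is
#   available as spark.sparkContext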

from pyspark.sql import SparkSession

In [2]:
# Point PySpark at the interpreter that is running this kernel:
# PYSPARK_PYTHON selects the Python executable Spark launches,
# PYSPARK_DRIVER_PYTHON the one used on the driver side.
os.environ.update({
    "PYSPARK_PYTHON": sys.executable,
    "PYSPARK_DRIVER_PYTHON": sys.executable,
})

In [3]:
# master(): tells Spark where the application runs.
#   - on a cluster, pass the cluster manager / master URL (e.g. "yarn" or "spark://host:port")
#   - "local[x]" runs Spark in local mode inside this one process, with x worker threads;
#     with local[1] the RDD created later in this notebook ends up with a single partition
#   - x is usually set to the number of CPU cores available
# appName(): the name the application is registered under.
# getOrCreate(): reuses an existing session if there is one, otherwise builds a new one.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SkillRisersV1")
    .getOrCreate()
)

In [4]:
# print() gives the plain-text repr: the class path and the object's memory address
print(spark)

<pyspark.sql.session.SparkSession object at 0x0000018666D44A60>


In [5]:
# Bare expression as the last line of the cell: the notebook displays the object itself
# (contrast with the print() call in the previous cell)
spark

In [6]:
# get existing SparkSession
# SparkSession.builder.getOrCreate()

In [7]:
# create a new session from an existing one (it shares the same SparkContext)
# spark.newSession()

In [8]:
# enable hive support with SparkSession
# SparkSession.builder.master("local[2]").appName("SparkSessionHive").enableHiveSupport().getOrCreate()

In [9]:
# Sample employee records, one tuple per row: (name, department, salary, incentive).
# Note: ("John", "IT", 45000, ...) appears twice, differing only in the incentive value.
data = [
    ("John", "IT", 45000, 10000),
    ("Max", "IT", 25000, 15000),
    ("Shawn", "HR", 50000, 5000),
    ("John", "IT", 45000, 20000),
    ("Nick", "HR", 40000, 10000),
    ("Sam", "IT", 45000, 10000),
    ("Ricky", "HR", 70000, 35000),
    ("Steve", "IT", 20000, 10000),
    ("Mathew", "IT", 35000, 15000),
]

In [10]:
# Turn the local Python list into an RDD via the session's underlying SparkContext
rdd = spark.sparkContext.parallelize(data)

In [11]:
# The repr only identifies the RDD (its type and id); it does not display the elements
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:287

In [12]:
# No column names given: the columns come out as _1 .. _4 and the types are
# inferred from the Python values (str -> string, int -> bigint)
df = rdd.toDF()

In [13]:
# The DataFrame repr lists column names and types only, not the rows
df

DataFrame[_1: string, _2: string, _3: bigint, _4: bigint]

In [14]:
# show() prints the rows as a text table; the headers are the auto-generated _1 .. _4
df.show()

+------+---+-----+-----+
|    _1| _2|   _3|   _4|
+------+---+-----+-----+
|  John| IT|45000|10000|
|   Max| IT|25000|15000|
| Shawn| HR|50000| 5000|
|  John| IT|45000|20000|
|  Nick| HR|40000|10000|
|   Sam| IT|45000|10000|
| Ricky| HR|70000|35000|
| Steve| IT|20000|10000|
|Mathew| IT|35000|15000|
+------+---+-----+-----+



In [15]:
# One partition here, matching the single worker thread requested with master("local[1]")
rdd.getNumPartitions()

1

In [16]:
# collect() returns every element to the driver as a Python list of tuples —
# fine for these 9 rows, but avoid it on large datasets
rdd.collect()

[('John', 'IT', 45000, 10000),
 ('Max', 'IT', 25000, 15000),
 ('Shawn', 'HR', 50000, 5000),
 ('John', 'IT', 45000, 20000),
 ('Nick', 'HR', 40000, 10000),
 ('Sam', 'IT', 45000, 10000),
 ('Ricky', 'HR', 70000, 35000),
 ('Steve', 'IT', 20000, 10000),
 ('Mathew', 'IT', 35000, 15000)]

In [17]:
# Name the tuple positions so the DataFrame gets real headers instead of _1 .. _4
columns = ["Name", "Dept", "Salary", "Incentive"]
df_2 = rdd.toDF(schema=columns)

In [18]:
# Same rows as before, now with the supplied column names as headers
df_2.show()

+------+----+------+---------+
|  Name|Dept|Salary|Incentive|
+------+----+------+---------+
|  John|  IT| 45000|    10000|
|   Max|  IT| 25000|    15000|
| Shawn|  HR| 50000|     5000|
|  John|  IT| 45000|    20000|
|  Nick|  HR| 40000|    10000|
|   Sam|  IT| 45000|    10000|
| Ricky|  HR| 70000|    35000|
| Steve|  IT| 20000|    10000|
|Mathew|  IT| 35000|    15000|
+------+----+------+---------+



In [19]:
# create dataframe using SparkSession
# df_3 = spark.createDataFrame(rdd).toDF(*columns)

In [20]:
# Only names were supplied, so the types are still inferred:
# the Python ints come out as long and every column is nullable
df_2.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Dept: string (nullable = true)
 |-- Salary: long (nullable = true)
 |-- Incentive: long (nullable = true)



In [21]:
# Create Dataframe with Schema

In [22]:
from pyspark.sql.types import StringType, StructType, IntegerType, StructField

In [23]:
# Explicit schema: field names paired with Spark types (salary/incentive as 32-bit integers).
# Built incrementally with StructType.add(), which returns the StructType so calls can be chained.
schema = (
    StructType()
    .add("EmpName", StringType())
    .add("EmpDept", StringType())
    .add("EmpSal", IntegerType())
    .add("EmpIncentive", IntegerType())
)

In [24]:
# The repr shows each field with nullable=True, the value used when none is specified
schema

StructType([StructField('EmpName', StringType(), True), StructField('EmpDept', StringType(), True), StructField('EmpSal', IntegerType(), True), StructField('EmpIncentive', IntegerType(), True)])

In [25]:
# Build the DataFrame straight from the Python list using the explicit schema.
# NOTE: this rebinds `df`, which until now held the unnamed-column result of rdd.toDF().
df = spark.createDataFrame(data=data, schema=schema)

In [26]:
# The schema's field names (EmpName, EmpDept, EmpSal, EmpIncentive) are now the column headers
df.show()

+-------+-------+------+------------+
|EmpName|EmpDept|EmpSal|EmpIncentive|
+-------+-------+------+------------+
|   John|     IT| 45000|       10000|
|    Max|     IT| 25000|       15000|
|  Shawn|     HR| 50000|        5000|
|   John|     IT| 45000|       20000|
|   Nick|     HR| 40000|       10000|
|    Sam|     IT| 45000|       10000|
|  Ricky|     HR| 70000|       35000|
|  Steve|     IT| 20000|       10000|
| Mathew|     IT| 35000|       15000|
+-------+-------+------+------------+



In [28]:
# number of columns = length of the list of column names
len(df.columns)

4

In [29]:
# number of rows (count() is an action: it runs a job over the data and returns an int)
df.count()

9

In [30]:
# first 3 rows: limit(3) returns a new 3-row DataFrame, which show() then prints in full
# (so there is no "only showing top N rows" footer, unlike show(3))
df.limit(3).show()

+-------+-------+------+------------+
|EmpName|EmpDept|EmpSal|EmpIncentive|
+-------+-------+------+------------+
|   John|     IT| 45000|       10000|
|    Max|     IT| 25000|       15000|
|  Shawn|     HR| 50000|        5000|
+-------+-------+------+------------+



In [31]:
# Convert the (already limited) rows to a pandas DataFrame on the driver;
# keep the limit() so only a small result is brought into local memory
df.limit(3).toPandas()

Unnamed: 0,EmpName,EmpDept,EmpSal,EmpIncentive
0,John,IT,45000,10000
1,Max,IT,25000,15000
2,Shawn,HR,50000,5000


In [32]:
# print some specific columns only, selected by name; show(3) caps the printed rows at 3
df.select("EmpName", "EmpSal").show(3)

+-------+------+
|EmpName|EmpSal|
+-------+------+
|   John| 45000|
|    Max| 25000|
|  Shawn| 50000|
+-------+------+
only showing top 3 rows



In [33]:
# Same selection, referencing the columns as attributes of the DataFrame instead of by name string
df.select(df.EmpName, df.EmpSal).show(3)

+-------+------+
|EmpName|EmpSal|
+-------+------+
|   John| 45000|
|    Max| 25000|
|  Shawn| 50000|
+-------+------+
only showing top 3 rows



In [34]:
# Indexing: df.columns is a plain Python list of names, so slicing it picks columns by position
# (here the first three)
df.select(df.columns[:3]).show(5)

+-------+-------+------+
|EmpName|EmpDept|EmpSal|
+-------+-------+------+
|   John|     IT| 45000|
|    Max|     IT| 25000|
|  Shawn|     HR| 50000|
|   John|     IT| 45000|
|   Nick|     HR| 40000|
+-------+-------+------+
only showing top 5 rows



In [35]:
# Positional slice again: columns at index 1 and 2 (EmpDept, EmpSal)
df.select(df.columns[1:3]).show(5)

+-------+------+
|EmpDept|EmpSal|
+-------+------+
|     IT| 45000|
|     IT| 25000|
|     HR| 50000|
|     IT| 45000|
|     HR| 40000|
+-------+------+
only showing top 5 rows



In [36]:
from pyspark.sql.functions import col, lit

In [38]:
# Derive a new column from an existing one plus a constant.
# withColumn() returns a new DataFrame; the result is only displayed here, `df` itself is unchanged.
df.withColumn("Total Salary", col("EmpSal") + 5000).show()

+-------+-------+------+------------+------------+
|EmpName|EmpDept|EmpSal|EmpIncentive|Total Salary|
+-------+-------+------+------------+------------+
|   John|     IT| 45000|       10000|       50000|
|    Max|     IT| 25000|       15000|       30000|
|  Shawn|     HR| 50000|        5000|       55000|
|   John|     IT| 45000|       20000|       50000|
|   Nick|     HR| 40000|       10000|       45000|
|    Sam|     IT| 45000|       10000|       50000|
|  Ricky|     HR| 70000|       35000|       75000|
|  Steve|     IT| 20000|       10000|       25000|
| Mathew|     IT| 35000|       15000|       40000|
+-------+-------+------+------------+------------+



In [39]:
# perform addition of 2 columns (row-wise: EmpSal + EmpIncentive); again only displayed, not stored
df.withColumn("Total Salary", col("EmpSal") + col("EmpIncentive")).show()

+-------+-------+------+------------+------------+
|EmpName|EmpDept|EmpSal|EmpIncentive|Total Salary|
+-------+-------+------+------------+------------+
|   John|     IT| 45000|       10000|       55000|
|    Max|     IT| 25000|       15000|       40000|
|  Shawn|     HR| 50000|        5000|       55000|
|   John|     IT| 45000|       20000|       65000|
|   Nick|     HR| 40000|       10000|       50000|
|    Sam|     IT| 45000|       10000|       55000|
|  Ricky|     HR| 70000|       35000|      105000|
|  Steve|     IT| 20000|       10000|       30000|
| Mathew|     IT| 35000|       15000|       50000|
+-------+-------+------+------------+------------+



In [41]:
# change data type of a column
# NOTE(review): EmpSal is already IntegerType in the schema this DataFrame was created with,
# so this particular cast changes nothing. show() also does not print types —
# use printSchema() on the result to verify a cast.
df.withColumn("EmpSal", col("EmpSal").cast("Integer")).show()

+-------+-------+------+------------+
|EmpName|EmpDept|EmpSal|EmpIncentive|
+-------+-------+------+------------+
|   John|     IT| 45000|       10000|
|    Max|     IT| 25000|       15000|
|  Shawn|     HR| 50000|        5000|
|   John|     IT| 45000|       20000|
|   Nick|     HR| 40000|       10000|
|    Sam|     IT| 45000|       10000|
|  Ricky|     HR| 70000|       35000|
|  Steve|     IT| 20000|       10000|
| Mathew|     IT| 35000|       15000|
+-------+-------+------+------------+



In [43]:
# Drop a column: returns a new DataFrame without EmpIncentive; `df` itself keeps the column
df.drop("EmpIncentive").show(4)

+-------+-------+------+
|EmpName|EmpDept|EmpSal|
+-------+-------+------+
|   John|     IT| 45000|
|    Max|     IT| 25000|
|  Shawn|     HR| 50000|
|   John|     IT| 45000|
+-------+-------+------+
only showing top 4 rows



In [45]:
# Add a constant-valued column with lit().
# NOTE: unlike the earlier withColumn() cells, the result is assigned back to `df`,
# so every later cell sees the extra EmpGender column.
df = df.withColumn("EmpGender", lit("Male"))

In [46]:
# EmpGender now appears because the previous cell reassigned `df`
df.show()

+-------+-------+------+------------+---------+
|EmpName|EmpDept|EmpSal|EmpIncentive|EmpGender|
+-------+-------+------+------------+---------+
|   John|     IT| 45000|       10000|     Male|
|    Max|     IT| 25000|       15000|     Male|
|  Shawn|     HR| 50000|        5000|     Male|
|   John|     IT| 45000|       20000|     Male|
|   Nick|     HR| 40000|       10000|     Male|
|    Sam|     IT| 45000|       10000|     Male|
|  Ricky|     HR| 70000|       35000|     Male|
|  Steve|     IT| 20000|       10000|     Male|
| Mathew|     IT| 35000|       15000|     Male|
+-------+-------+------+------------+---------+



In [47]:
# Filter data

In [48]:
# Keep only the IT rows; the condition is a Column expression built with col()
df.filter(col("EmpDept") == "IT").show()

+-------+-------+------+------------+---------+
|EmpName|EmpDept|EmpSal|EmpIncentive|EmpGender|
+-------+-------+------+------------+---------+
|   John|     IT| 45000|       10000|     Male|
|    Max|     IT| 25000|       15000|     Male|
|   John|     IT| 45000|       20000|     Male|
|    Sam|     IT| 45000|       10000|     Male|
|  Steve|     IT| 20000|       10000|     Male|
| Mathew|     IT| 35000|       15000|     Male|
+-------+-------+------+------------+---------+



In [49]:
# Same filter, with the column referenced as an attribute of the DataFrame
df.filter(df.EmpDept == "IT").show()

+-------+-------+------+------------+---------+
|EmpName|EmpDept|EmpSal|EmpIncentive|EmpGender|
+-------+-------+------+------------+---------+
|   John|     IT| 45000|       10000|     Male|
|    Max|     IT| 25000|       15000|     Male|
|   John|     IT| 45000|       20000|     Male|
|    Sam|     IT| 45000|       10000|     Male|
|  Steve|     IT| 20000|       10000|     Male|
| Mathew|     IT| 35000|       15000|     Male|
+-------+-------+------+------------+---------+



In [50]:
# Inverse condition: every row whose department is not IT
df.filter(df.EmpDept != "IT").show()

+-------+-------+------+------------+---------+
|EmpName|EmpDept|EmpSal|EmpIncentive|EmpGender|
+-------+-------+------+------------+---------+
|  Shawn|     HR| 50000|        5000|     Male|
|   Nick|     HR| 40000|       10000|     Male|
|  Ricky|     HR| 70000|       35000|     Male|
+-------+-------+------+------------+---------+



In [51]:
# Combine conditions with & (logical AND on Columns). Each comparison must be wrapped in
# parentheses because Python's & binds more tightly than == and >=.
df.filter((df.EmpDept == "IT") & (df.EmpSal >= 40000)).show()

+-------+-------+------+------------+---------+
|EmpName|EmpDept|EmpSal|EmpIncentive|EmpGender|
+-------+-------+------+------------+---------+
|   John|     IT| 45000|       10000|     Male|
|   John|     IT| 45000|       20000|     Male|
|    Sam|     IT| 45000|       10000|     Male|
+-------+-------+------+------------+---------+



In [53]:
# Sort data - by default ascending order (lowest EmpSal first); show(5) prints the first 5 rows
df.sort(col("EmpSal")).show(5)

+-------+-------+------+------------+---------+
|EmpName|EmpDept|EmpSal|EmpIncentive|EmpGender|
+-------+-------+------+------------+---------+
|  Steve|     IT| 20000|       10000|     Male|
|    Max|     IT| 25000|       15000|     Male|
| Mathew|     IT| 35000|       15000|     Male|
|   Nick|     HR| 40000|       10000|     Male|
|   John|     IT| 45000|       10000|     Male|
+-------+-------+------+------------+---------+
only showing top 5 rows



In [55]:
# sort data in desc order: .desc() on the Column flips the direction (highest EmpSal first)
df.sort(col("EmpSal").desc()).show(5)

+-------+-------+------+------------+---------+
|EmpName|EmpDept|EmpSal|EmpIncentive|EmpGender|
+-------+-------+------+------------+---------+
|  Ricky|     HR| 70000|       35000|     Male|
|  Shawn|     HR| 50000|        5000|     Male|
|   John|     IT| 45000|       10000|     Male|
|   John|     IT| 45000|       20000|     Male|
|    Sam|     IT| 45000|       10000|     Male|
+-------+-------+------+------------+---------+
only showing top 5 rows



In [56]:
# Group By: total salary per department; the result column is auto-named sum(EmpSal)
df.groupBy("EmpDept").sum("EmpSal").show()

+-------+-----------+
|EmpDept|sum(EmpSal)|
+-------+-----------+
|     HR|     160000|
|     IT|     215000|
+-------+-----------+



In [57]:
# NOTE(review): importing sum, min and max by name rebinds Python's built-in
# sum()/min()/max() in the notebook namespace from this cell onwards.
# A namespaced import (e.g. `from pyspark.sql import functions as F`) avoids that.
from pyspark.sql.functions import sum, avg, min, max, count

In [58]:
# Several aggregates per department in one pass, each given a readable alias.
# The aggregate functions are taken from the namespaced module so this cell does not
# depend on the names `sum`/`max` having been rebound away from Python's built-ins.
from pyspark.sql import functions as F

(
    df.groupBy("EmpDept")
    .agg(
        F.sum("EmpSal").alias("Total Salary"),
        F.max("EmpSal").alias("Max Salary"),
        F.avg("EmpSal").alias("Average Salary"),
    )
    .show()
)

+-------+------------+----------+------------------+
|EmpDept|Total Salary|Max Salary|    Average Salary|
+-------+------------+----------+------------------+
|     HR|      160000|     70000|53333.333333333336|
|     IT|      215000|     45000|35833.333333333336|
+-------+------------+----------+------------------+

