In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 40 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 47.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=9c7971e2a0f0f3bc9b13742d06650cb6e400e15636d852807d7e87f01452c0df
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [None]:
from pyspark.sql.types import StringType, StructField, IntegerType, StructType
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum,min,max,avg,col,lit,when
from pyspark.sql.functions import *

**Creating Spark Session**

In [None]:
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()

**DataFrame creation using createDataFrame()**

In [None]:
data = [('A', 'B', '', '123456', 'M', 10000),
        ('P', '', 'R', '745126', 'M', 10500),
        ('Q', 'B', 'S', '9632541', 'F', 20000),
        ('W', 'D', 'G', '78541203', 'F', 19000),
        ('M', '', 'N', '456789', 'M', 1200)]

column = ['FirstName', 'MiddleName', 'lastName', 'EMP_Id', 'Gender', 'Salary']

df = spark.createDataFrame(data=data, schema=column)

df.printSchema()
df.show()

root
 |-- FirstName: string (nullable = true)
 |-- MiddleName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- EMP_Id: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Salary: long (nullable = true)

+---------+----------+--------+--------+------+------+
|FirstName|MiddleName|lastName|  EMP_Id|Gender|Salary|
+---------+----------+--------+--------+------+------+
|        A|         B|        |  123456|     M| 10000|
|        P|          |       R|  745126|     M| 10500|
|        Q|         B|       S| 9632541|     F| 20000|
|        W|         D|       G|78541203|     F| 19000|
|        M|          |       N|  456789|     M|  1200|
+---------+----------+--------+--------+------+------+



**Creating empty data frame**

In [None]:
empty_schema = StructType([
    StructField('first_name', StringType(), True),
    StructField('middle_name', StringType(), True),
    StructField('surname', StringType(), True)
])

df2 = spark.createDataFrame([], empty_schema)
df2.printSchema()

root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- surname: string (nullable = true)



**Creating DataFrame from CSV**

In [None]:
df3 = spark.read.csv('emp_data.csv',header= True)
df3.printSchema()
df3.show(10)

root
 |-- emp_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- salary: string (nullable = true)

+------+----------+---------+-----------+----------------+------+
|emp_id|first_name|last_name|     gender|     designation|salary|
+------+----------+---------+-----------+----------------+------+
|     1|Washington| Clifford|       Male|  Human Resource| 34544|
|     2|     Letti|Rosekilly|     Female|Testing engineer| 26411|
|     3|      Gray| Lavielle|       Male|            null| 23445|
|     4|      Ulla|    Bound|     Female|   Data Engineer|  null|
|     5|    Korney|   Lomath|     Female|   Data Engineer| 41360|
|     6|     Wanda|     null|    Agender|           Sales| 29984|
|     7|     Denis| Cleveley|       Male|Testing engineer| 47485|
|     8|   Artemas| Bolsover|       Male|            null| 41185|
|     9|   Kerstin| De Meyer

**Select Single & Multiple Columns From PySpark**

In [None]:
df3.select(df3['designation']).show(5)
df3.select('first_name','last_name').show(5)

+----------------+
|     designation|
+----------------+
|  Human Resource|
|Testing engineer|
|            null|
|   Data Engineer|
|   Data Engineer|
+----------------+
only showing top 5 rows

+----------+---------+
|first_name|last_name|
+----------+---------+
|Washington| Clifford|
|     Letti|Rosekilly|
|      Gray| Lavielle|
|      Ulla|    Bound|
|    Korney|   Lomath|
+----------+---------+
only showing top 5 rows



**Select All Columns**

In [None]:
df3.select('*').show(10)

+------+----------+---------+-----------+----------------+------+
|emp_id|first_name|last_name|     gender|     designation|salary|
+------+----------+---------+-----------+----------------+------+
|     1|Washington| Clifford|       Male|  Human Resource| 34544|
|     2|     Letti|Rosekilly|     Female|Testing engineer| 26411|
|     3|      Gray| Lavielle|       Male|            null| 23445|
|     4|      Ulla|    Bound|     Female|   Data Engineer|  null|
|     5|    Korney|   Lomath|     Female|   Data Engineer| 41360|
|     6|     Wanda|     null|    Agender|           Sales| 29984|
|     7|     Denis| Cleveley|       Male|Testing engineer| 47485|
|     8|   Artemas| Bolsover|       Male|            null| 41185|
|     9|   Kerstin| De Meyer|Genderqueer|   Data Engineer| 34899|
|    10|    Dulsea|  Esilmon|     Female|   Data Engineer| 29043|
+------+----------+---------+-----------+----------------+------+
only showing top 10 rows



**Select Columns by Index**

In [None]:
df3.select(df3.columns[2:6]).show(5)

+---------+------+----------------+------+
|last_name|gender|     designation|salary|
+---------+------+----------------+------+
| Clifford|  Male|  Human Resource| 34544|
|Rosekilly|Female|Testing engineer| 26411|
| Lavielle|  Male|            null| 23445|
|    Bound|Female|   Data Engineer|  null|
|   Lomath|Female|   Data Engineer| 41360|
+---------+------+----------------+------+
only showing top 5 rows



**Collect() – Retrieve data from DataFrame**

In [None]:
df3.collect()[6][4]

'Testing engineer'

**Change DataType using PySpark withColumn()**

In [None]:
df3 = df3.withColumn('emp_id', df3['emp_id'].cast('integer'))
df3 = df3.withColumn('salary', df3['salary'].cast('integer'))
df3.printSchema()

root
 |-- emp_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- salary: integer (nullable = true)



**Update The Value of an Existing Column**

In [None]:
df3 = df3.withColumn("emp_id",df3["emp_id"]+100)
df3.show(5)

+------+----------+---------+------+----------------+------+
|emp_id|first_name|last_name|gender|     designation|salary|
+------+----------+---------+------+----------------+------+
|   101|Washington| Clifford|  Male|  Human Resource| 34544|
|   102|     Letti|Rosekilly|Female|Testing engineer| 26411|
|   103|      Gray| Lavielle|  Male|            null| 23445|
|   104|      Ulla|    Bound|Female|   Data Engineer|  null|
|   105|    Korney|   Lomath|Female|   Data Engineer| 41360|
+------+----------+---------+------+----------------+------+
only showing top 5 rows



**DataFrame filter() with Column Condition**

In [None]:
df3.filter((df3.designation == 'Human Resource') & (df3.gender == 'Male')).show()

+------+----------+---------+------+--------------+------+
|emp_id|first_name|last_name|gender|   designation|salary|
+------+----------+---------+------+--------------+------+
|   101|Washington| Clifford|  Male|Human Resource| 34544|
|   111|     Torey|     null|  Male|Human Resource| 44046|
|   113|      Redd|    Rosel|  Male|Human Resource| 45594|
|   126|     Byrom|    Omand|  Male|Human Resource| 46385|
|   142|   Hershel|   Monsey|  Male|Human Resource| 16789|
|   145|    Luther|     null|  Male|Human Resource| 40264|
|   147|   Umberto|  Fearney|  Male|Human Resource| 35256|
|   150|    Cesare|  Dionisi|  Male|Human Resource| 37793|
|   151|   Cheston|   Glasby|  Male|Human Resource| 35908|
|   169|     Penny|   Creebo|  Male|Human Resource| 15699|
|   171|  Vincenty|   Bordes|  Male|Human Resource| 49213|
|   197|    Hastie|  Sprowle|  Male|Human Resource| 48848|
+------+----------+---------+------+--------------+------+



**Filter based on given list**

In [None]:
department_list = ['Sales', 'Marketing']
df3.filter(df3.designation.isin(department_list)).show()

+------+----------+----------+----------+-----------+------+
|emp_id|first_name| last_name|    gender|designation|salary|
+------+----------+----------+----------+-----------+------+
|   106|     Wanda|      null|   Agender|      Sales| 29984|
|   112|   Clarita| Grollmann|    Female|      Sales| 12377|
|   117|      Dolf| Dumpleton|      Male|      Sales| 20597|
|   129|    Easter|   Pautard|    Female|      Sales| 39305|
|   132|  Cristina|     Skypp|    Female|      Sales|  null|
|   134|      Mada|    Hoggan|    Female|      Sales|  null|
|   136|   Willdon|      null|      Male|      Sales| 49289|
|   137|     Tania| Habergham|    Female|      Sales| 14313|
|   153|     Vanni|   Grinyer|Non-binary|      Sales| 17509|
|   156|     Angil|   Shimman|    Female|      Sales| 40145|
|   157|   Maurits|  Lerhinan|      Male|      Sales| 33757|
|   181|      Toni|    Ruffey|    Female|      Sales| 11198|
|   184|  Alasdair|   Campion|      Male|      Sales| 31193|
|   187|   Krystle|Giova

**Filter based on starts with**

In [None]:
df3.filter(df3.first_name.startswith('A')).show()

+------+-----------+---------+-----------+----------------+------+
|emp_id| first_name|last_name|     gender|     designation|salary|
+------+-----------+---------+-----------+----------------+------+
|   108|    Artemas| Bolsover|       Male|            null| 41185|
|   115|     Antoni|Beveridge|       Male|   Data Engineer| 20837|
|   124|     Angela|Kirkbride|     Female|  Human Resource| 43482|
|   127|    Alameda|  MacNeil|     Female|           Admin| 10540|
|   130|       Aime|   MacIan|     Female|Testing engineer|  null|
|   156|      Angil|  Shimman|     Female|           Sales| 40145|
|   160|     Austin|Tincknell|     Female|            null| 45436|
|   170|      Addia|     null|Genderfluid|           Admin| 16910|
|   178|Alexandrina| Tregidgo|     Female|   Data Engineer| 37395|
|   184|   Alasdair|  Campion|       Male|           Sales| 31193|
|   196|      Adams|     Yaus|       Male|            null| 23976|
+------+-----------+---------+-----------+----------------+---

**Filter based on ends with**

In [None]:
df3.filter(df3.last_name.endswith('n')).show()

+------+----------+---------+------+----------------+------+
|emp_id|first_name|last_name|gender|     designation|salary|
+------+----------+---------+------+----------------+------+
|   110|    Dulsea|  Esilmon|Female|   Data Engineer| 29043|
|   112|   Clarita|Grollmann|Female|           Sales| 12377|
|   114| Christean|     Senn|Female|           Admin| 22343|
|   117|      Dolf|Dumpleton|  Male|           Sales| 20597|
|   122|     Dredi|  Jeandon|Female|           Admin| 32729|
|   128|     Grete|   Beaven|Female|   Data Engineer| 23312|
|   130|      Aime|   MacIan|Female|Testing engineer|  null|
|   134|      Mada|   Hoggan|Female|           Sales|  null|
|   144|   Sherrie|   Harron|Female|   Data Engineer| 18994|
|   154|  Madeline|  Wagenen|Female|           Admin| 11224|
|   156|     Angil|  Shimman|Female|           Sales| 40145|
|   157|   Maurits| Lerhinan|  Male|           Sales| 33757|
|   164|   Chelsea|   Dixson|Female|Testing engineer| 12369|
|   183|     Odele| Dumm

**Filter based on contains**

In [None]:
df3.filter(df3.designation.contains('Engineer')).show()

+------+-----------+---------+-----------+-------------+------+
|emp_id| first_name|last_name|     gender|  designation|salary|
+------+-----------+---------+-----------+-------------+------+
|   104|       Ulla|    Bound|     Female|Data Engineer|  null|
|   105|     Korney|   Lomath|     Female|Data Engineer| 41360|
|   109|    Kerstin| De Meyer|Genderqueer|Data Engineer| 34899|
|   110|     Dulsea|  Esilmon|     Female|Data Engineer| 29043|
|   115|     Antoni|Beveridge|       Male|Data Engineer| 20837|
|   128|      Grete|   Beaven|     Female|Data Engineer| 23312|
|   133|   Lorianne|   Dooney|     Female|Data Engineer| 16747|
|   139|     Patton|     null|       Male|Data Engineer| 36191|
|   141|   Waldemar|   Krabbe|       Male|Data Engineer| 49930|
|   143|       Dene|   Pottie|       Male|Data Engineer| 13218|
|   144|    Sherrie|   Harron|     Female|Data Engineer| 18994|
|   148|    Othello|  Darnody|       Male|Data Engineer| 26357|
|   149|     Odella|  Carhart|     Femal

**Filter like()**

In [None]:
df3.filter(df3.first_name.like('%on%')).show()

+------+----------+---------+------+----------------+------+
|emp_id|first_name|last_name|gender|     designation|salary|
+------+----------+---------+------+----------------+------+
|   101|Washington| Clifford|  Male|  Human Resource| 34544|
|   115|    Antoni|Beveridge|  Male|   Data Engineer| 20837|
|   136|   Willdon|     null|  Male|           Sales| 49289|
|   139|    Patton|     null|  Male|   Data Engineer| 36191|
|   151|   Cheston|   Glasby|  Male|  Human Resource| 35908|
|   174|    Donnie|     null|Female|Testing engineer| 46181|
|   181|      Toni|   Ruffey|Female|           Sales| 11198|
+------+----------+---------+------+----------------+------+



**Filter rlike()**

In [None]:
df3.filter(df3.first_name.rlike('(?i)^*on$')).show()

+------+----------+---------+------+--------------+------+
|emp_id|first_name|last_name|gender|   designation|salary|
+------+----------+---------+------+--------------+------+
|   101|Washington| Clifford|  Male|Human Resource| 34544|
|   136|   Willdon|     null|  Male|         Sales| 49289|
|   139|    Patton|     null|  Male| Data Engineer| 36191|
|   151|   Cheston|   Glasby|  Male|Human Resource| 35908|
+------+----------+---------+------+--------------+------+



**Data frame sorting using sort()**

In [None]:
df3.sort('first_name', 'designation').show()

+------+-----------+---------+-----------+----------------+------+
|emp_id| first_name|last_name|     gender|     designation|salary|
+------+-----------+---------+-----------+----------------+------+
|   196|      Adams|     Yaus|       Male|            null| 23976|
|   170|      Addia|     null|Genderfluid|           Admin| 16910|
|   130|       Aime|   MacIan|     Female|Testing engineer|  null|
|   127|    Alameda|  MacNeil|     Female|           Admin| 10540|
|   184|   Alasdair|  Campion|       Male|           Sales| 31193|
|   178|Alexandrina| Tregidgo|     Female|   Data Engineer| 37395|
|   124|     Angela|Kirkbride|     Female|  Human Resource| 43482|
|   156|      Angil|  Shimman|     Female|           Sales| 40145|
|   115|     Antoni|Beveridge|       Male|   Data Engineer| 20837|
|   108|    Artemas| Bolsover|       Male|            null| 41185|
|   160|     Austin|Tincknell|     Female|            null| 45436|
|   119|      Baxie|  Smewing|       Male|           Admin| 45

**Data frame sorting using orderBy() in ascending**

In [None]:
df3.orderBy(df3.gender.asc(),df3.designation.asc()).show(5)

+------+----------+---------+--------+----------------+------+
|emp_id|first_name|last_name|  gender|     designation|salary|
+------+----------+---------+--------+----------------+------+
|   106|     Wanda|     null| Agender|           Sales| 29984|
|   182|     Grata|    Drife| Agender|Testing engineer| 46602|
|   162|   Lindsay|    Semor|Bigender|   Data Engineer| 17544|
|   160|    Austin|Tincknell|  Female|            null| 45436|
|   118|      Reba|   Cleall|  Female|            null| 19583|
+------+----------+---------+--------+----------------+------+
only showing top 5 rows



**Data frame sorting using orderBy in descending**

In [None]:
df3.orderBy(df3.gender.asc(),df3.designation.desc()).show(5)

+------+----------+---------+--------+----------------+------+
|emp_id|first_name|last_name|  gender|     designation|salary|
+------+----------+---------+--------+----------------+------+
|   182|     Grata|    Drife| Agender|Testing engineer| 46602|
|   106|     Wanda|     null| Agender|           Sales| 29984|
|   162|   Lindsay|    Semor|Bigender|   Data Engineer| 17544|
|   152|    Rebeka|    Goard|  Female|Testing engineer| 12481|
|   102|     Letti|Rosekilly|  Female|Testing engineer| 26411|
+------+----------+---------+--------+----------------+------+
only showing top 5 rows



**groupBy() on Data frame**

In [None]:
df3.groupBy('designation').agg(sum('salary').alias('Total Salary')).show()

+----------------+------------+
|     designation|Total Salary|
+----------------+------------+
|           Sales|      365781|
|            null|      451944|
|           Admin|      348253|
|  Human Resource|      573320|
|Testing engineer|      499253|
|   Data Engineer|      471702|
+----------------+------------+



In [None]:
df3.groupBy('designation').count().show()

+----------------+-----+
|     designation|count|
+----------------+-----+
|           Sales|   16|
|            null|   15|
|           Admin|   15|
|  Human Resource|   16|
|Testing engineer|   19|
|   Data Engineer|   19|
+----------------+-----+



In [None]:
df3.groupBy('designation').agg(min('salary').alias('Minimum Salary')).show()

+----------------+--------------+
|     designation|Minimum Salary|
+----------------+--------------+
|           Sales|         11198|
|            null|         19583|
|           Admin|         10540|
|  Human Resource|         15699|
|Testing engineer|         10018|
|   Data Engineer|         13218|
+----------------+--------------+



In [None]:
df3.groupBy('designation').agg(max('salary').alias('Maximum Salary')).show()

+----------------+--------------+
|     designation|Maximum Salary|
+----------------+--------------+
|           Sales|         49289|
|            null|         49985|
|           Admin|         48677|
|  Human Resource|         49213|
|Testing engineer|         47485|
|   Data Engineer|         49930|
+----------------+--------------+



**filter() on data frame**

In [None]:
df3.filter((df3.designation == 'Human Resource')).show(5)

+------+----------+---------+------+--------------+------+
|emp_id|first_name|last_name|gender|   designation|salary|
+------+----------+---------+------+--------------+------+
|   101|Washington| Clifford|  Male|Human Resource| 34544|
|   111|     Torey|     null|  Male|Human Resource| 44046|
|   113|      Redd|    Rosel|  Male|Human Resource| 45594|
|   124|    Angela|Kirkbride|Female|Human Resource| 43482|
|   126|     Byrom|    Omand|  Male|Human Resource| 46385|
+------+----------+---------+------+--------------+------+
only showing top 5 rows



**filter() to show null values**

In [None]:
df3.filter('salary is null').show(50)

+------+----------+----------+------+----------------+------+
|emp_id|first_name| last_name|gender|     designation|salary|
+------+----------+----------+------+----------------+------+
|   104|      Ulla|     Bound|Female|   Data Engineer|  null|
|   130|      Aime|    MacIan|Female|Testing engineer|  null|
|   132|  Cristina|     Skypp|Female|           Sales|  null|
|   134|      Mada|    Hoggan|Female|           Sales|  null|
|   135|  Gamaliel|Catcheside|  Male|            null|  null|
|   155|     Onfre|  Matteoli|  Male|           Admin|  null|
|   172|     Smith|   Tilsley|  Male|   Data Engineer|  null|
|   183|     Odele|  Dummigan|Female|            null|  null|
|   186| Francoise|      Deme|Female|           Admin|  null|
|   187|   Krystle|Giovanardi|Female|           Sales|  null|
+------+----------+----------+------+----------------+------+



In [None]:
emp = [(1,"A",-1,"2018","10","M",50000), \
    (2,"B",1,"2010","10","M",20000), \
    (3,"C",1,"2010","20","M",4000), \
    (4,"D",2,"2005","20","F",2000), \
    (5,"E",2,"2010","20","",2000), \
      (6,"F",2,"2010","20","",2000), \
       (7,"g",2,"2010","20","",1000),\
       (6,"h",2,"2010","30","",4500),\
       (6,"i",2,"2010","30","",3000),\
       (6,"j",2,"2010","30","",2500),\
       (6,"k",2,"2010","30","",2500),\
       (6,"l",2,"2010","30","",2000),\
       (6,"k",2,"2010","40","",5000),\
       (6,"m",2,"2010","50","",8000)
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]
empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.show()

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.show()

+------+----+---------------+-----------+-----------+------+------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+----+---------------+-----------+-----------+------+------+
|     1|   A|             -1|       2018|         10|     M| 50000|
|     2|   B|              1|       2010|         10|     M| 20000|
|     3|   C|              1|       2010|         20|     M|  4000|
|     4|   D|              2|       2005|         20|     F|  2000|
|     5|   E|              2|       2010|         20|      |  2000|
|     6|   F|              2|       2010|         20|      |  2000|
|     7|   g|              2|       2010|         20|      |  1000|
|     6|   h|              2|       2010|         30|      |  4500|
|     6|   i|              2|       2010|         30|      |  3000|
|     6|   j|              2|       2010|         30|      |  2500|
|     6|   k|              2|       2010|         30|      |  2500|
|     6|   l|              2|       2010|       

**Inner join dataframe**

In [None]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner").show()

+------+----+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+----+---------------+-----------+-----------+------+------+---------+-------+
|     1|   A|             -1|       2018|         10|     M| 50000|  Finance|     10|
|     2|   B|              1|       2010|         10|     M| 20000|  Finance|     10|
|     3|   C|              1|       2010|         20|     M|  4000|Marketing|     20|
|     4|   D|              2|       2005|         20|     F|  2000|Marketing|     20|
|     5|   E|              2|       2010|         20|      |  2000|Marketing|     20|
|     6|   F|              2|       2010|         20|      |  2000|Marketing|     20|
|     7|   g|              2|       2010|         20|      |  1000|Marketing|     20|
|     6|   h|              2|       2010|         30|      |  4500|    Sales|     30|
|     6|   i|              2|       2010|         30| 

**Outer join data frame**

In [None]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"outer").show()

+------+----+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+----+---------------+-----------+-----------+------+------+---------+-------+
|     1|   A|             -1|       2018|         10|     M| 50000|  Finance|     10|
|     2|   B|              1|       2010|         10|     M| 20000|  Finance|     10|
|     3|   C|              1|       2010|         20|     M|  4000|Marketing|     20|
|     4|   D|              2|       2005|         20|     F|  2000|Marketing|     20|
|     5|   E|              2|       2010|         20|      |  2000|Marketing|     20|
|     6|   F|              2|       2010|         20|      |  2000|Marketing|     20|
|     7|   g|              2|       2010|         20|      |  1000|Marketing|     20|
|     6|   h|              2|       2010|         30|      |  4500|    Sales|     30|
|     6|   i|              2|       2010|         30| 

**Full Join Dataframe**

In [None]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"full").show()

+------+----+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+----+---------------+-----------+-----------+------+------+---------+-------+
|     1|   A|             -1|       2018|         10|     M| 50000|  Finance|     10|
|     2|   B|              1|       2010|         10|     M| 20000|  Finance|     10|
|     3|   C|              1|       2010|         20|     M|  4000|Marketing|     20|
|     4|   D|              2|       2005|         20|     F|  2000|Marketing|     20|
|     5|   E|              2|       2010|         20|      |  2000|Marketing|     20|
|     6|   F|              2|       2010|         20|      |  2000|Marketing|     20|
|     7|   g|              2|       2010|         20|      |  1000|Marketing|     20|
|     6|   h|              2|       2010|         30|      |  4500|    Sales|     30|
|     6|   i|              2|       2010|         30| 

**Fullouter Join Datframe**

In [None]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter").show()

+------+----+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+----+---------------+-----------+-----------+------+------+---------+-------+
|     1|   A|             -1|       2018|         10|     M| 50000|  Finance|     10|
|     2|   B|              1|       2010|         10|     M| 20000|  Finance|     10|
|     3|   C|              1|       2010|         20|     M|  4000|Marketing|     20|
|     4|   D|              2|       2005|         20|     F|  2000|Marketing|     20|
|     5|   E|              2|       2010|         20|      |  2000|Marketing|     20|
|     6|   F|              2|       2010|         20|      |  2000|Marketing|     20|
|     7|   g|              2|       2010|         20|      |  1000|Marketing|     20|
|     6|   h|              2|       2010|         30|      |  4500|    Sales|     30|
|     6|   i|              2|       2010|         30| 

**Left Join Dataframe**

In [None]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left").show()

+------+----+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+----+---------------+-----------+-----------+------+------+---------+-------+
|     6|   m|              2|       2010|         50|      |  8000|     null|   null|
|     1|   A|             -1|       2018|         10|     M| 50000|  Finance|     10|
|     2|   B|              1|       2010|         10|     M| 20000|  Finance|     10|
|     6|   h|              2|       2010|         30|      |  4500|    Sales|     30|
|     6|   i|              2|       2010|         30|      |  3000|    Sales|     30|
|     6|   j|              2|       2010|         30|      |  2500|    Sales|     30|
|     6|   k|              2|       2010|         30|      |  2500|    Sales|     30|
|     6|   l|              2|       2010|         30|      |  2000|    Sales|     30|
|     3|   C|              1|       2010|         20| 

**Leftouter Join Dataframe**

In [None]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftouter").show()

+------+----+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+----+---------------+-----------+-----------+------+------+---------+-------+
|     6|   m|              2|       2010|         50|      |  8000|     null|   null|
|     1|   A|             -1|       2018|         10|     M| 50000|  Finance|     10|
|     2|   B|              1|       2010|         10|     M| 20000|  Finance|     10|
|     6|   h|              2|       2010|         30|      |  4500|    Sales|     30|
|     6|   i|              2|       2010|         30|      |  3000|    Sales|     30|
|     6|   j|              2|       2010|         30|      |  2500|    Sales|     30|
|     6|   k|              2|       2010|         30|      |  2500|    Sales|     30|
|     6|   l|              2|       2010|         30|      |  2000|    Sales|     30|
|     3|   C|              1|       2010|         20| 

**Right Join Dataframe**

In [None]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id,"right").show()

+------+----+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+----+---------------+-----------+-----------+------+------+---------+-------+
|     2|   B|              1|       2010|         10|     M| 20000|  Finance|     10|
|     1|   A|             -1|       2018|         10|     M| 50000|  Finance|     10|
|     6|   l|              2|       2010|         30|      |  2000|    Sales|     30|
|     6|   k|              2|       2010|         30|      |  2500|    Sales|     30|
|     6|   j|              2|       2010|         30|      |  2500|    Sales|     30|
|     6|   i|              2|       2010|         30|      |  3000|    Sales|     30|
|     6|   h|              2|       2010|         30|      |  4500|    Sales|     30|
|     7|   g|              2|       2010|         20|      |  1000|Marketing|     20|
|     6|   F|              2|       2010|         20| 

**Right Outer join Dataframe**

In [None]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'rightouter').show()

+------+----+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+----+---------------+-----------+-----------+------+------+---------+-------+
|     2|   B|              1|       2010|         10|     M| 20000|  Finance|     10|
|     1|   A|             -1|       2018|         10|     M| 50000|  Finance|     10|
|     6|   l|              2|       2010|         30|      |  2000|    Sales|     30|
|     6|   k|              2|       2010|         30|      |  2500|    Sales|     30|
|     6|   j|              2|       2010|         30|      |  2500|    Sales|     30|
|     6|   i|              2|       2010|         30|      |  3000|    Sales|     30|
|     6|   h|              2|       2010|         30|      |  4500|    Sales|     30|
|     7|   g|              2|       2010|         20|      |  1000|Marketing|     20|
|     6|   F|              2|       2010|         20| 

**Left Semi join Dataframe**

In [None]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'leftsemi').show()
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'inner').show()

+------+----+---------------+-----------+-----------+------+------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+----+---------------+-----------+-----------+------+------+
|     1|   A|             -1|       2018|         10|     M| 50000|
|     2|   B|              1|       2010|         10|     M| 20000|
|     3|   C|              1|       2010|         20|     M|  4000|
|     4|   D|              2|       2005|         20|     F|  2000|
|     5|   E|              2|       2010|         20|      |  2000|
|     6|   F|              2|       2010|         20|      |  2000|
|     7|   g|              2|       2010|         20|      |  1000|
|     6|   h|              2|       2010|         30|      |  4500|
|     6|   i|              2|       2010|         30|      |  3000|
|     6|   j|              2|       2010|         30|      |  2500|
|     6|   k|              2|       2010|         30|      |  2500|
|     6|   l|              2|       2010|       

**Left Anti Join Dataframe**

In [None]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'leftsemi').show()
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'leftanti').show()

+------+----+---------------+-----------+-----------+------+------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+----+---------------+-----------+-----------+------+------+
|     1|   A|             -1|       2018|         10|     M| 50000|
|     2|   B|              1|       2010|         10|     M| 20000|
|     3|   C|              1|       2010|         20|     M|  4000|
|     4|   D|              2|       2005|         20|     F|  2000|
|     5|   E|              2|       2010|         20|      |  2000|
|     6|   F|              2|       2010|         20|      |  2000|
|     7|   g|              2|       2010|         20|      |  1000|
|     6|   h|              2|       2010|         30|      |  4500|
|     6|   i|              2|       2010|         30|      |  3000|
|     6|   j|              2|       2010|         30|      |  2500|
|     6|   k|              2|       2010|         30|      |  2500|
|     6|   l|              2|       2010|       

**Self Join Dataframe**

In [None]:
empDF.alias('tbl1').join(empDF.alias('tbl2'), col('tbl1.superior_emp_id') == col('tbl2.emp_id'),"inner") \
      .select(col('tbl1.emp_id'),col('tbl1.name'), \
      col('tbl2.emp_id').alias('Manager_id'),col('tbl2.name').alias('Manager_name')) \
   .show()

+------+----+----------+------------+
|emp_id|name|Manager_id|Manager_name|
+------+----+----------+------------+
|     2|   B|         1|           A|
|     3|   C|         1|           A|
|     4|   D|         2|           B|
|     5|   E|         2|           B|
|     6|   F|         2|           B|
|     7|   g|         2|           B|
|     6|   h|         2|           B|
|     6|   i|         2|           B|
|     6|   j|         2|           B|
|     6|   k|         2|           B|
|     6|   l|         2|           B|
|     6|   k|         2|           B|
|     6|   m|         2|           B|
+------+----+----------+------------+



**Merge two data frame having same schema using Union()**

In [None]:
empDF_1 = spark.read.csv('empDF_1.csv',header= True)
empDF_2 = spark.read.csv('empDF_2.csv',header= True)

In [None]:
unionDF = empDF_1.union(empDF_2)
unionDF.show()

+---+----------+---------+--------------------+--------+
| id|first_name|last_name|               email|  gender|
+---+----------+---------+--------------------+--------+
|  1|     Lyman|    Juett|     ljuett0@tiny.cc|Bigender|
|  2|  Caterina|    Ruoss|cruoss1@theglobea...|  Female|
|  3| Guglielma|     Robb|grobb2@deviantart...|  Female|
|  4|       Evy|    Duthy|   eduthy3@wired.com|  Female|
|  5|       Jen|   Synnot|    jsynnot4@wix.com|  Female|
|  6|       Ted| Bouchard|tbouchard5@paypal...|    Male|
|  7|     Lenka|Rosenfeld|lrosenfeld6@123-r...|  Female|
|  8|       Vic|  Redshaw|vredshaw7@freeweb...|    Male|
|  9|    Martha| Forseith|mforseith8@auda.o...|  Female|
| 10|    Austin| Tweddell|atweddell9@friend...|  Female|
|  1|    Leonid|    Alwin|  lalwin0@utexas.edu|    Male|
|  2|     Drusy|Duckhouse|dduckhouse1@wikis...|  Female|
|  3|    Gareth|   Porter|gporter2@oaic.gov.au|    Male|
|  4|  Teresita| Yeabsley|tyeabsley3@canalb...|  Female|
|  5|    Burlie|   McBean|bmcbe

**sample() on data frame**

In [None]:
print(empDF.sample(0.5).collect())

[Row(emp_id=1, name='A', superior_emp_id=-1, year_joined='2018', emp_dept_id='10', gender='M', salary=50000), Row(emp_id=2, name='B', superior_emp_id=1, year_joined='2010', emp_dept_id='10', gender='M', salary=20000), Row(emp_id=3, name='C', superior_emp_id=1, year_joined='2010', emp_dept_id='20', gender='M', salary=4000), Row(emp_id=6, name='j', superior_emp_id=2, year_joined='2010', emp_dept_id='30', gender='', salary=2500), Row(emp_id=6, name='k', superior_emp_id=2, year_joined='2010', emp_dept_id='30', gender='', salary=2500), Row(emp_id=6, name='l', superior_emp_id=2, year_joined='2010', emp_dept_id='30', gender='', salary=2000), Row(emp_id=6, name='m', superior_emp_id=2, year_joined='2010', emp_dept_id='50', gender='', salary=8000)]


In [None]:
print(empDF.sample(0.5, 10).collect())

[Row(emp_id=1, name='A', superior_emp_id=-1, year_joined='2018', emp_dept_id='10', gender='M', salary=50000), Row(emp_id=5, name='E', superior_emp_id=2, year_joined='2010', emp_dept_id='20', gender='', salary=2000), Row(emp_id=6, name='F', superior_emp_id=2, year_joined='2010', emp_dept_id='20', gender='', salary=2000), Row(emp_id=6, name='j', superior_emp_id=2, year_joined='2010', emp_dept_id='30', gender='', salary=2500), Row(emp_id=6, name='l', superior_emp_id=2, year_joined='2010', emp_dept_id='30', gender='', salary=2000), Row(emp_id=6, name='k', superior_emp_id=2, year_joined='2010', emp_dept_id='40', gender='', salary=5000)]


**when() function on data frame**

In [None]:
empDF.withColumn('level', when(empDF.salary < 2500, 'Level_1')
                         .when((empDF.salary >= 2500) & (empDF.salary <= 4000), 'Level_2')
                         .when(empDF.salary > 4000, 'Level_3')).show(50)

+------+----+---------------+-----------+-----------+------+------+-------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|  level|
+------+----+---------------+-----------+-----------+------+------+-------+
|     1|   A|             -1|       2018|         10|     M| 50000|Level_3|
|     2|   B|              1|       2010|         10|     M| 20000|Level_3|
|     3|   C|              1|       2010|         20|     M|  4000|Level_2|
|     4|   D|              2|       2005|         20|     F|  2000|Level_1|
|     5|   E|              2|       2010|         20|      |  2000|Level_1|
|     6|   F|              2|       2010|         20|      |  2000|Level_1|
|     7|   g|              2|       2010|         20|      |  1000|Level_1|
|     6|   h|              2|       2010|         30|      |  4500|Level_3|
|     6|   i|              2|       2010|         30|      |  3000|Level_2|
|     6|   j|              2|       2010|         30|      |  2500|Level_2|
|     6|   k

**SQL CASE WHEN with expr()**

In [None]:
from pyspark.sql.functions import expr

empDF.withColumn('dept_name', expr("CASE WHEN emp_dept_id = 10 THEN 'Admin'"
                                   "WHEN emp_dept_id = 20 THEN 'Sales'"
                                   "WHEN emp_dept_id = 30 THEN 'Tech'"
                                   "ELSE emp_dept_id END")).show()

+------+----+---------------+-----------+-----------+------+------+---------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|
+------+----+---------------+-----------+-----------+------+------+---------+
|     1|   A|             -1|       2018|         10|     M| 50000|    Admin|
|     2|   B|              1|       2010|         10|     M| 20000|    Admin|
|     3|   C|              1|       2010|         20|     M|  4000|    Sales|
|     4|   D|              2|       2005|         20|     F|  2000|    Sales|
|     5|   E|              2|       2010|         20|      |  2000|    Sales|
|     6|   F|              2|       2010|         20|      |  2000|    Sales|
|     7|   g|              2|       2010|         20|      |  1000|    Sales|
|     6|   h|              2|       2010|         30|      |  4500|     Tech|
|     6|   i|              2|       2010|         30|      |  3000|     Tech|
|     6|   j|              2|       2010|         30|      |  25

**regexp_replace() function on data frame**

In [None]:
from pyspark.sql.functions import regexp_replace

unionDF.withColumn('email',when(unionDF.email.endswith('cc'),regexp_replace(unionDF.email,'cc','com')) \
                          .when(unionDF.email.endswith('edu'),regexp_replace(unionDF.email,'edu','com')) \
                          .otherwise(unionDF.email)).show()

+---+----------+---------+--------------------+--------+
| id|first_name|last_name|               email|  gender|
+---+----------+---------+--------------------+--------+
|  1|     Lyman|    Juett|    ljuett0@tiny.com|Bigender|
|  2|  Caterina|    Ruoss|cruoss1@theglobea...|  Female|
|  3| Guglielma|     Robb|grobb2@deviantart...|  Female|
|  4|       Evy|    Duthy|   eduthy3@wired.com|  Female|
|  5|       Jen|   Synnot|    jsynnot4@wix.com|  Female|
|  6|       Ted| Bouchard|tbouchard5@paypal...|    Male|
|  7|     Lenka|Rosenfeld|lrosenfeld6@123-r...|  Female|
|  8|       Vic|  Redshaw|vredshaw7@freeweb...|    Male|
|  9|    Martha| Forseith|mforseith8@auda.o...|  Female|
| 10|    Austin| Tweddell|atweddell9@friend...|  Female|
|  1|    Leonid|    Alwin|  lalwin0@utexas.com|    Male|
|  2|     Drusy|Duckhouse|dduckhouse1@wikis...|  Female|
|  3|    Gareth|   Porter|gporter2@oaic.gov.au|    Male|
|  4|  Teresita| Yeabsley|tyeabsley3@canalb...|  Female|
|  5|    Burlie|   McBean|bmcbe

**array function to create data frame**

In [None]:
details_array = [('A',['12','qwe_RD','asd'],['python','c++']),
                 ('B',['10','wer_RD','wer'],['java','c++']),
                 ('C',['14','tyu_RD','qwe'],['c++','spark'])]

details_array_df = spark.createDataFrame(data=details_array, schema = ['name', 'address', 'subjects'])

details_array_df.show()

+----+-----------------+-------------+
|name|          address|     subjects|
+----+-----------------+-------------+
|   A|[12, qwe_RD, asd]|[python, c++]|
|   B|[10, wer_RD, wer]|  [java, c++]|
|   C|[14, tyu_RD, qwe]| [c++, spark]|
+----+-----------------+-------------+



**concat_ws() on data frame**

In [None]:
from pyspark.sql.functions import col, concat_ws

details_array_df.withColumn('address',concat_ws(',',col('address'))).show()

+----+-------------+-------------+
|name|      address|     subjects|
+----+-------------+-------------+
|   A|12,qwe_RD,asd|[python, c++]|
|   B|10,wer_RD,wer|  [java, c++]|
|   C|14,tyu_RD,qwe| [c++, spark]|
+----+-------------+-------------+



**explode() function data frame**

In [None]:
from pyspark.sql.functions import explode

details_array_df.select(details_array_df.name,explode(details_array_df.subjects)).show()

+----+------+
|name|   col|
+----+------+
|   A|python|
|   A|   c++|
|   B|  java|
|   B|   c++|
|   C|   c++|
|   C| spark|
+----+------+



**array_contains() on data frame**

In [None]:
from pyspark.sql.functions import array_contains

details_array_df.select(details_array_df.name,array_contains(details_array_df.subjects,'python').alias('name_with_python')).show()

+----+----------------+
|name|name_with_python|
+----+----------------+
|   A|            true|
|   B|           false|
|   C|           false|
+----+----------------+



**partitionBy() function**

In [None]:
df3.write.option('header',True).partitionBy('designation').mode('overwrite').csv('designation_partition')

In [None]:
df=spark.createDataFrame(
        data = [ ("1","2019-06-24 12:01:19.000")],
        schema=["id","input_timestamp"])

df.withColumn("date_type",to_date("input_timestamp")) \
  .show(truncate=False)

+---+-----------------------+----------+
|id |input_timestamp        |date_type |
+---+-----------------------+----------+
|1  |2019-06-24 12:01:19.000|2019-06-24|
+---+-----------------------+----------+



In [None]:
df3.createOrReplaceTempView('EMP')
spark.sql("SELECT * FROM EMP").show()

+------+----------+---------+-----------+----------------+------+
|emp_id|first_name|last_name|     gender|     designation|salary|
+------+----------+---------+-----------+----------------+------+
|   101|Washington| Clifford|       Male|  Human Resource| 34544|
|   102|     Letti|Rosekilly|     Female|Testing engineer| 26411|
|   103|      Gray| Lavielle|       Male|            null| 23445|
|   104|      Ulla|    Bound|     Female|   Data Engineer|  null|
|   105|    Korney|   Lomath|     Female|   Data Engineer| 41360|
|   106|     Wanda|     null|    Agender|           Sales| 29984|
|   107|     Denis| Cleveley|       Male|Testing engineer| 47485|
|   108|   Artemas| Bolsover|       Male|            null| 41185|
|   109|   Kerstin| De Meyer|Genderqueer|   Data Engineer| 34899|
|   110|    Dulsea|  Esilmon|     Female|   Data Engineer| 29043|
|   111|     Torey|     null|       Male|  Human Resource| 44046|
|   112|   Clarita|Grollmann|     Female|           Sales| 12377|
|   113|  

In [None]:
df3.filter('salary is null').show(50)

+------+----------+----------+------+----------------+------+
|emp_id|first_name| last_name|gender|     designation|salary|
+------+----------+----------+------+----------------+------+
|   104|      Ulla|     Bound|Female|   Data Engineer|  null|
|   130|      Aime|    MacIan|Female|Testing engineer|  null|
|   132|  Cristina|     Skypp|Female|           Sales|  null|
|   134|      Mada|    Hoggan|Female|           Sales|  null|
|   135|  Gamaliel|Catcheside|  Male|            null|  null|
|   155|     Onfre|  Matteoli|  Male|           Admin|  null|
|   172|     Smith|   Tilsley|  Male|   Data Engineer|  null|
|   183|     Odele|  Dummigan|Female|            null|  null|
|   186| Francoise|      Deme|Female|           Admin|  null|
|   187|   Krystle|Giovanardi|Female|           Sales|  null|
+------+----------+----------+------+----------------+------+



In [None]:
df3.withColumn('salary', expr("CASE WHEN designation = 'Data Engineer' AND salary = 'NULL' THEN '1000'"
                                   "WHEN designation = 'Testing Engineer' THEN '2000'"
                                   "WHEN designation = 'Sales' THEN '3000'"
                                   "WHEN designation = 'Admin' THEN '4000'"
                                   "ELSE designation END")).show()

+------+----------+---------+-----------+----------------+----------------+
|emp_id|first_name|last_name|     gender|     designation|          salary|
+------+----------+---------+-----------+----------------+----------------+
|   101|Washington| Clifford|       Male|  Human Resource|  Human Resource|
|   102|     Letti|Rosekilly|     Female|Testing engineer|Testing engineer|
|   103|      Gray| Lavielle|       Male|            null|            null|
|   104|      Ulla|    Bound|     Female|   Data Engineer|   Data Engineer|
|   105|    Korney|   Lomath|     Female|   Data Engineer|   Data Engineer|
|   106|     Wanda|     null|    Agender|           Sales|            3000|
|   107|     Denis| Cleveley|       Male|Testing engineer|Testing engineer|
|   108|   Artemas| Bolsover|       Male|            null|            null|
|   109|   Kerstin| De Meyer|Genderqueer|   Data Engineer|   Data Engineer|
|   110|    Dulsea|  Esilmon|     Female|   Data Engineer|   Data Engineer|
|   111|    

In [None]:
path = ['emp_data.csv', 'empDF_1.csv','empDF_2.csv']
df10 = spark.read.csv(['emp_data.csv','empDF_1.csv','empDF_2.csv'], header = True)
df10.show(200)

AnalysisException: ignored

In [None]:
df10.printSchema()

root
 |-- emp_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- salary: string (nullable = true)

