In [1]:
import pyspark
from datetime import datetime, date
from pyspark.sql import Row
from pyspark.sql import Column
from pyspark.sql import SparkSession

<h1><center>Reading and writing a csv file</center></h1>

In [2]:
# Reuse the active SparkSession if one exists; otherwise start a new one
# registered under this application name.
spark = (
    SparkSession.builder
    .appName('Practice Pyspark')
    .getOrCreate()
)

23/07/10 20:44:45 WARN Utils: Your hostname, nMACHINE resolves to a loopback address: 127.0.1.1; using 192.168.0.133 instead (on interface wlp2s0)
23/07/10 20:44:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/07/10 20:44:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the student CSV with one .option() call per setting: the first line is
# the header, column types are inferred, and fields are comma-separated.
df1 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("delimiter", ",")
    .csv('Data/student.csv')
)

In [4]:
# Load the same file through the generic format(...).load(...) reader API,
# passing every option in a single .options() call.
df2 = (
    spark.read
    .format("csv")
    .options(header='True', inferSchema='True', delimiter=',')
    .load('Data/student.csv')
)

In [5]:
# Preview the first two rows of df1.
df1.show(n=2)

+---+--------+-----+----+------+
| id|    name|class|mark|gender|
+---+--------+-----+----+------+
|  1|John Deo| Four|  75|female|
|  2|Max Ruin|Three|  85|  male|
+---+--------+-----+----+------+
only showing top 2 rows



In [6]:
# Preview the first two rows of df2.
df2.show(n=2)

+---+--------+-----+----+------+
| id|    name|class|mark|gender|
+---+--------+-----+----+------+
|  1|John Deo| Four|  75|female|
|  2|Max Ruin|Three|  85|  male|
+---+--------+-----+----+------+
only showing top 2 rows



In [7]:
# Print the column names, types and nullability of df2 (read with inferSchema).
df2.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- class: string (nullable = true)
 |-- mark: integer (nullable = true)
 |-- gender: string (nullable = true)



### `inferSchema` makes Spark read the data an extra time to work out the column types, so supplying a custom schema is recommended

In [8]:
from pyspark.sql.types import StructType, StringType, IntegerType

In [9]:
# Explicit schema for Data/student.csv; every column is declared nullable.
schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("name", StringType(), True)
    .add("class", StringType(), True)
    .add("mark", IntegerType(), True)
    .add("gender", StringType(), True)
)

# The schema is supplied up front, so the inferSchema option is not passed.
df3 = (
    spark.read
    .format("csv")
    .options(header='True', delimiter=',')
    .schema(schema)
    .load('Data/student.csv')
)

In [10]:
# Preview the first two rows of df3.
df3.show(n=2)

+---+--------+-----+----+------+
| id|    name|class|mark|gender|
+---+--------+-----+----+------+
|  1|John Deo| Four|  75|female|
|  2|Max Ruin|Three|  85|  male|
+---+--------+-----+----+------+
only showing top 2 rows



In [11]:
# Print df3's schema; df3 was read with the explicitly supplied `schema`.
df3.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- class: string (nullable = true)
 |-- mark: integer (nullable = true)
 |-- gender: string (nullable = true)



## Other Features while reading csv
- Read Multiple CSV Files ==> pass a list of paths
df = spark.read.csv(["path1", "path2", "path3"])
- Read all CSV Files in a Directory ==> 
df = spark.read.csv("Folder path")
- quote
When a column value contains the delimiter that is used to split the columns,
wrap the value in the quote character; delimiters inside quotes are ignored.
The default quote character is `"`, and the `quote` option lets you set any other character.
df = spark.read.option("quote", "'").csv(csv_path)
- nullValue
Using the `nullValue` option you can specify a string in the CSV that should be read as null. For example,
to have the value "1900-01-01" in a date column loaded as null in the DataFrame:
df = spark.read.option("nullValue", "1900-01-01").csv(csv_path)

## Writing DF to csv

In [12]:
# Write df2 as comma-delimited CSV with a header row, using overwrite mode
# for the output path.
(
    df2.write
    .options(header='True', delimiter=',')
    .mode('overwrite')
    .csv("Data/written_student_csv")
)

### Saving modes
overwrite – replaces any data that already exists at the output path.

append – adds the new data to the data that already exists at the output path.

ignore – silently skips the write when data already exists at the output path.

error (or errorifexists) – the default mode; raises an error when data already exists at the output path.

<h1><center>Filter Operations</center></h1>

In [13]:
# Equality filter using attribute-style column access (df3.gender).
df3.filter(df3.gender == 'male').show(n=2)

+---+--------+-----+----+------+
| id|    name|class|mark|gender|
+---+--------+-----+----+------+
|  2|Max Ruin|Three|  85|  male|
|  3|  Arnold|Three|  55|  male|
+---+--------+-----+----+------+
only showing top 2 rows



In [14]:
# Inequality filter: keep rows whose gender is not 'male'.
df3.filter(df3.gender != 'male').show(n=2)

+---+----------+-----+----+------+
| id|      name|class|mark|gender|
+---+----------+-----+----+------+
|  1|  John Deo| Four|  75|female|
|  4|Krish Star| Four|  60|female|
+---+----------+-----+----+------+
only showing top 2 rows



In [15]:
# Same rows as the != filter, written by negating the equality with `~`.
df3.filter(~(df3.gender == 'male')).show(n=2)

+---+----------+-----+----+------+
| id|      name|class|mark|gender|
+---+----------+-----+----+------+
|  1|  John Deo| Four|  75|female|
|  4|Krish Star| Four|  60|female|
+---+----------+-----+----+------+
only showing top 2 rows



In [16]:
from pyspark.sql.functions import col

# Equality filter built from col(), which refers to the column by name only.
df3.filter(col("gender") == "male").show(n=2)

+---+--------+-----+----+------+
| id|    name|class|mark|gender|
+---+--------+-----+----+------+
|  2|Max Ruin|Three|  85|  male|
|  3|  Arnold|Three|  55|  male|
+---+--------+-----+----+------+
only showing top 2 rows



In [17]:
# Equality filter expressed as a SQL expression string.
df3.filter("gender == 'male'").show(n=2)

+---+--------+-----+----+------+
| id|    name|class|mark|gender|
+---+--------+-----+----+------+
|  2|Max Ruin|Three|  85|  male|
|  3|  Arnold|Three|  55|  male|
+---+--------+-----+----+------+
only showing top 2 rows



In [18]:
# Combine two conditions with `&`; each comparison is parenthesised because
# `&` binds more tightly than `==`. The `class` column uses bracket access
# since `class` is a Python keyword.
df3.filter(
    (df3.gender == 'male') & (df3['class'] == 'Four')
).show(n=2)

+---+---------+-----+----+------+
| id|     name|class|mark|gender|
+---+---------+-----+----+------+
|  6|Alex John| Four|  55|  male|
| 15| Tade Row| Four|  88|  male|
+---+---------+-----+----+------+
only showing top 2 rows



In [19]:
# Keep rows whose class is one of the listed values.
li = ["Three", "Four"]
df3.filter(df3['class'].isin(li)).show(n=2)

+---+--------+-----+----+------+
| id|    name|class|mark|gender|
+---+--------+-----+----+------+
|  1|John Deo| Four|  75|female|
|  2|Max Ruin|Three|  85|  male|
+---+--------+-----+----+------+
only showing top 2 rows



In [20]:
# Apply three string predicates to the `class` column (prefix, suffix,
# substring) and show the full result of each, in that order.
class_col = df3['class']
for predicate in (
    class_col.startswith("T"),
    class_col.endswith("ee"),
    class_col.contains("Th"),
):
    df3.filter(predicate).show()

+---+--------+-----+----+------+
| id|    name|class|mark|gender|
+---+--------+-----+----+------+
|  2|Max Ruin|Three|  85|  male|
|  3|  Arnold|Three|  55|  male|
| 27|Big Nose|Three|  81|female|
+---+--------+-----+----+------+

+---+--------+-----+----+------+
| id|    name|class|mark|gender|
+---+--------+-----+----+------+
|  2|Max Ruin|Three|  85|  male|
|  3|  Arnold|Three|  55|  male|
| 27|Big Nose|Three|  81|female|
+---+--------+-----+----+------+

+---+--------+-----+----+------+
| id|    name|class|mark|gender|
+---+--------+-----+----+------+
|  2|Max Ruin|Three|  85|  male|
|  3|  Arnold|Three|  55|  male|
| 27|Big Nose|Three|  81|female|
+---+--------+-----+----+------+



In [21]:
# SQL LIKE pattern: names containing "Ruin" anywhere.
df3.filter(df3.name.like("%Ruin%")).show(n=2)

+---+--------+-----+----+------+
| id|    name|class|mark|gender|
+---+--------+-----+----+------+
|  2|Max Ruin|Three|  85|  male|
+---+--------+-----+----+------+



In [22]:
# Case-insensitive regex filter for names that end in "ruin".
# rlike performs a partial (find-style) match, so no leading wildcard is
# needed. The previous pattern "(?i)^*ruin$" applied the `*` quantifier to the
# `^` anchor, which is not a meaningful expression; "(?i)ruin$" states the
# intent directly.
df3.filter(df3.name.rlike("(?i)ruin$")).show(2)

+---+--------+-----+----+------+
| id|    name|class|mark|gender|
+---+--------+-----+----+------+
|  2|Max Ruin|Three|  85|  male|
+---+--------+-----+----+------+



In [23]:
# Rows where name is null.
df3.filter(df3.name.isNull()).show(n=2)

+---+----+-----+----+------+
| id|name|class|mark|gender|
+---+----+-----+----+------+
+---+----+-----+----+------+



In [24]:
# Rows where name is present (not null).
df3.filter(df3.name.isNotNull()).show(n=2)

+---+--------+-----+----+------+
| id|    name|class|mark|gender|
+---+--------+-----+----+------+
|  1|John Deo| Four|  75|female|
|  2|Max Ruin|Three|  85|  male|
+---+--------+-----+----+------+
only showing top 2 rows



In [25]:
from pyspark.sql.functions import array_contains
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StructField

# Each record is (name tuple of first/middle/last, list of languages, state, gender).
data = [
    (("James", "", "Smith"), ["Java", "Scala", "C++"], "OH", "M"),
    (("Anna", "Rose", ""), ["Spark", "Java", "C++"], "NY", "F"),
    (("Julia", "", "Williams"), ["CSharp", "VB"], "OH", "F"),
    (("Maria", "Anne", "Jones"), ["CSharp", "VB"], "NY", "M"),
    (("Jen", "Mary", "Brown"), ["CSharp", "VB"], "NY", "M"),
    (("Mike", "Mary", "Williams"), ["Python", "VB"], "OH", "M"),
]

# `name` is a nested struct and `languages` an array of strings.
# NOTE: this rebinds `schema`, which earlier held the CSV schema used for df3.
schema = StructType([
    StructField(
        'name',
        StructType([
            StructField('firstname', StringType(), True),
            StructField('middlename', StringType(), True),
            StructField('lastname', StringType(), True),
        ]),
    ),
    StructField('languages', ArrayType(StringType()), True),
    StructField('state', StringType(), True),
    StructField('gender', StringType(), True),
])

df4 = spark.createDataFrame(data=data, schema=schema)
df4.printSchema()
df4.show(n=2)

# Filter on an array column: rows whose languages array contains "Java".
df4.filter(array_contains(df4.languages, "Java")).show(n=2)

# Filter on a nested struct field.
df4.filter(df4.name.lastname == "Williams").show(n=2)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)



[Stage 22:>                                                         (0 + 1) / 1]                                                                                

+----------------+------------------+-----+------+
|            name|         languages|state|gender|
+----------------+------------------+-----+------+
|{James, , Smith}|[Java, Scala, C++]|   OH|     M|
|  {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
+----------------+------------------+-----+------+
only showing top 2 rows

+----------------+------------------+-----+------+
|            name|         languages|state|gender|
+----------------+------------------+-----+------+
|{James, , Smith}|[Java, Scala, C++]|   OH|     M|
|  {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
+----------------+------------------+-----+------+

+--------------------+------------+-----+------+
|                name|   languages|state|gender|
+--------------------+------------+-----+------+
| {Julia, , Williams}|[CSharp, VB]|   OH|     F|
|{Mike, Mary, Will...|[Python, VB]|   OH|     M|
+--------------------+------------+-----+------+



<h1><center>Add, Rename and Drop Columns, Change datatype of Column</center></h1>

df.withColumn("CopiedColumn",col("salary")* -1).show()

df.withColumn("Country", lit("USA")).show()

df6 = df.withColumn("Country", lit("USA")).withColumn("anotherColumn",lit("anotherValue"))




df.withColumnRenamed("gender","sex").show()




df.drop("salary").show()




df.withColumn("salary",col("salary").cast("Integer")).show()

<h1><center>Join Types</center></h1>

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner").show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"outer").show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"full").show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter").show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left").show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftouter").show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right").show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"rightouter").show(truncate=False)

#### Left Semi Join
leftsemi join is similar to an inner join; the difference is that leftsemi returns only the columns of the left dataset and drops all columns of the right dataset. In other words, it returns the left dataset's rows that have a match in the right dataset on the join expression; rows without a match are dropped.


empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftsemi").show(truncate=False)

#### Left Anti Join
leftanti join does the exact opposite of leftsemi: it returns only the columns of the left dataset, for the rows that have no match in the right dataset.


empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftanti").show(truncate=False)

#### Self Join

empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
   .show(truncate=False)

#### Using SQL Expression

empDF.createOrReplaceTempView("EMP")

deptDF.createOrReplaceTempView("DEPT")

joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")

joinDF.show(truncate=False)

joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")

joinDF2.show(truncate=False)

#### Join on multiple DataFrames

df1.join(df2,df1.id1 == df2.id2,"inner").join(df3,df1.id1 == df3.id3,"inner")

#### Join String	                     ==                       Equivalent SQL Join

inner	            ==                  INNER JOIN

outer, full, fullouter, full_outer	==  FULL OUTER JOIN

left, leftouter, left_outer	     ==     LEFT JOIN

right, rightouter, right_outer	  ==    RIGHT JOIN

cross	                 ==                  CROSS JOIN

anti, leftanti, left_anti	   ==    LEFT ANTI JOIN

semi, leftsemi, left_semi	   ==    LEFT SEMI JOIN

### ALIAS

empDF.alias("emp2")

col("emp2.emp_id").alias("superior_emp_id")

In [26]:
# Employee rows, in the column order given by empColumns.
# year_joined and emp_dept_id are strings here, while dept_id in `dept` is an int.
emp = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1),
]
empColumns = [
    "emp_id", "name", "superior_emp_id", "year_joined",
    "emp_dept_id", "gender", "salary",
]

empDF = spark.createDataFrame(data=emp, schema=empColumns)
empDF.printSchema()
empDF.show(truncate=False)

# Department rows as (dept_name, dept_id).
dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]
deptColumns = ["dept_name", "dept_id"]

deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

In [35]:
# Self-join EMP through SQL to pair each employee with their superior.
empDF.createOrReplaceTempView("EMP")
# Keep the DataFrame itself in joinDF: show() only prints and returns None,
# so chaining it onto the assignment would leave joinDF bound to None.
joinDF = spark.sql("select e1.emp_id, e1.name, e2.emp_id as superior_emp_id, e2.name as superior_name \
                   from EMP e1, EMP e2 where e1.superior_emp_id = e2.emp_id")
joinDF.show(truncate=False)

+------+--------+---------------+-------------+
|emp_id|name    |superior_emp_id|superior_name|
+------+--------+---------------+-------------+
|2     |Rose    |1              |Smith        |
|3     |Williams|1              |Smith        |
|4     |Jones   |2              |Rose         |
|5     |Brown   |2              |Rose         |
|6     |Brown   |2              |Rose         |
+------+--------+---------------+-------------+



<h1><center>Explode nested array into rows AND Flatten array of arrays</center></h1>

In [37]:
# Each record pairs a name with an array of arrays of strings.
arrayArrayData = [
    ("James", [["Java", "Scala", "C++"], ["Spark", "Java"]]),
    ("Michael", [["Spark", "Java", "C++"], ["Spark", "Java"]]),
    ("Robert", [["CSharp", "VB"], ["Spark", "Python"]]),
]

df_explode = spark.createDataFrame(
    data=arrayArrayData,
    schema=['name', 'subjects'],
)
df_explode.printSchema()
df_explode.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- subjects: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

+-------+-----------------------------------+
|name   |subjects                           |
+-------+-----------------------------------+
|James  |[[Java, Scala, C++], [Spark, Java]]|
|Michael|[[Spark, Java, C++], [Spark, Java]]|
|Robert |[[CSharp, VB], [Spark, Python]]    |
+-------+-----------------------------------+



In [40]:
from pyspark.sql.functions import explode

# explode() emits one output row per element of the outer `subjects` array,
# so each inner array lands on its own row. The generated column is not
# aliased here and shows up under the default name `col`.
df_explode.select(
    df_explode.name,
    explode(df_explode.subjects),
).show(truncate=False)

+-------+------------------+
|name   |col               |
+-------+------------------+
|James  |[Java, Scala, C++]|
|James  |[Spark, Java]     |
|Michael|[Spark, Java, C++]|
|Michael|[Spark, Java]     |
|Robert |[CSharp, VB]      |
|Robert |[Spark, Python]   |
+-------+------------------+



In [41]:
from pyspark.sql.functions import flatten

# flatten() merges the array of arrays in `subjects` into a single array per
# row; the row count is unchanged.
df_explode.select(
    df_explode.name,
    flatten(df_explode.subjects),
).show(truncate=False)

+-------+-------------------------------+
|name   |flatten(subjects)              |
+-------+-------------------------------+
|James  |[Java, Scala, C++, Spark, Java]|
|Michael|[Spark, Java, C++, Spark, Java]|
|Robert |[CSharp, VB, Spark, Python]    |
+-------+-------------------------------+

