In [78]:
# Basic imports

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import *


# Obtain the Spark session for this notebook (getOrCreate) and load the student
# CSV, using its header row for column names and inferring the column types.
spark = (
    SparkSession.builder
    .appName('Basics_pyspark_df')
    .getOrCreate()
)
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('data/student_data.csv')
)

In [38]:
# Preview the first 5 rows of the loaded student data.
df.show(5)

+-------+-------+-----+-----+-------+-------+
|   Name|Roll_No|Marks|Class|Subject|Section|
+-------+-------+-----+-----+-------+-------+
|  Avind|      1|   92|    9|  Maths|      A|
| Aditya|      2|   87|    9|  Maths|      A|
|   John|      3|   23|    9|  Maths|      B|
|   Mary|      4|   45|    9|  Maths|      B|
|Nicolas|      5|   67|    9|English|      A|
+-------+-------+-----+-----+-------+-------+
only showing top 5 rows



### Select vs Collect

`select()` is a transformation: it returns a new DataFrame that holds only the selected columns. `collect()` is an action: it returns the entire dataset to the driver as a Python list of `Row` objects.

`df.collect()` retrieves every row of the DataFrame to the driver node as a list of `Row` objects. Because `collect()` is an action, it does not return a DataFrame; it returns the data itself. Once the rows are in a list, you can process them further with an ordinary Python `for` loop.

In [39]:
# Action: bring every row back to the driver as a list of Row objects.
df.collect()

[Row(Name='Avind', Roll_No=1, Marks=92, Class=9, Subject='Maths', Section='A'),
 Row(Name='Aditya', Roll_No=2, Marks=87, Class=9, Subject='Maths', Section='A'),
 Row(Name='John', Roll_No=3, Marks=23, Class=9, Subject='Maths', Section='B'),
 Row(Name='Mary', Roll_No=4, Marks=45, Class=9, Subject='Maths', Section='B'),
 Row(Name='Nicolas', Roll_No=5, Marks=67, Class=9, Subject='English', Section='A'),
 Row(Name='Jonny', Roll_No=6, Marks=100, Class=9, Subject='English', Section='A'),
 Row(Name='Tom', Roll_No=7, Marks=55, Class=9, Subject='English', Section='B'),
 Row(Name='Yash', Roll_No=4, Marks=32, Class=9, Subject='English', Section='B'),
 Row(Name='Pushkar', Roll_No=8, Marks=30, Class=9, Subject='Science', Section='B'),
 Row(Name='Parth', Roll_No=9, Marks=76, Class=9, Subject='Science', Section='A'),
 Row(Name='Piyush', Roll_No=10, Marks=86, Class=9, Subject='Science', Section='A'),
 Row(Name='Zodiac', Roll_No=11, Marks=65, Class=9, Subject='Computer', Section='B')]

In [40]:
# collect() hands back a plain Python list, not a DataFrame.
type(df.collect())

list

In [41]:
# Pull all rows to the driver once, then print "<Name>_<Section>" for each Row.
dataCollect = df.collect()
for row in dataCollect:
    print("_".join((row['Name'], row['Section'])))

Avind_A
Aditya_A
John_B
Mary_B
Nicolas_A
Jonny_A
Tom_B
Yash_B
Pushkar_B
Parth_A
Piyush_A
Zodiac_B


In [42]:
# select('*') returns a DataFrame with every column; the displayed repr lists
# only the column names and types, not the rows.
df.select('*')

DataFrame[Name: string, Roll_No: int, Marks: int, Class: int, Subject: string, Section: string]

In [43]:
# Unlike collect(), select() gives back a DataFrame.
type(df.select('*'))

pyspark.sql.dataframe.DataFrame

In [44]:
# Call show() on the selected DataFrame to actually print its first 5 rows.
df.select('*').show(5)

+-------+-------+-----+-----+-------+-------+
|   Name|Roll_No|Marks|Class|Subject|Section|
+-------+-------+-----+-----+-------+-------+
|  Avind|      1|   92|    9|  Maths|      A|
| Aditya|      2|   87|    9|  Maths|      A|
|   John|      3|   23|    9|  Maths|      B|
|   Mary|      4|   45|    9|  Maths|      B|
|Nicolas|      5|   67|    9|English|      A|
+-------+-------+-----+-----+-------+-------+
only showing top 5 rows



In [74]:
# Selecting from a nested (struct) column.
#
# Each record is ((firstname, middlename, lastname), id, gender, salary).
# NOTE(review): the values in the `id` position are floats (one is np.nan)
# while the schema below declares `id` as StringType - confirm this is intended.
structureData = [
    (("John", "", "Don"), 36636.334, "M", 3000),
    (("Mahesh", "Raj", ""), 40288.101, "M", 4000),
    (("Robert", "Dorney", "Junior"), 42114.99, "M", 4000),
    (("May", "", "Jones"), 3919.00123, "F", 4000),
    (("Nick", "Mary", "Brown"), np.nan, "F", -1000),
]

# `name` is itself a struct of three string fields; all fields are nullable
# (StructField's default).
structureSchema = StructType([
    StructField('name', StructType([
        StructField('firstname', StringType()),
        StructField('middlename', StringType()),
        StructField('lastname', StringType()),
    ])),
    StructField('id', StringType()),
    StructField('gender', StringType()),
    StructField('salary', IntegerType()),
])

df2 = spark.createDataFrame(structureData, structureSchema)
df2.printSchema()

# The whole struct as a single column.
df2.select("name").show(truncate=False)

# Individual struct fields, addressed with dot notation.
df2.select(["name.firstname", "name.lastname"]).show(truncate=False)

# "name.*" expands the struct into one column per field.
df2.select("name.*").show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+------------------------+
|name                    |
+------------------------+
|{John, , Don}           |
|{Mahesh, Raj, }         |
|{Robert, Dorney, Junior}|
|{May, , Jones}          |
|{Nick, Mary, Brown}     |
+------------------------+

+---------+--------+
|firstname|lastname|
+---------+--------+
|John     |Don     |
|Mahesh   |        |
|Robert   |Junior  |
|May      |Jones   |
|Nick     |Brown   |
+---------+--------+

+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
|John     |          |Don     |
|Mahesh   |Raj       |        |
|Robert   |Dorney    |Junior  |
|May      |          |Jones   |
|Nick     |Mary      |Brown   |
+---------+--------

### Columns

In [45]:
# Indexing a DataFrame by column name gives a Column object, not the column's data.
df['Name']

Column<'Name'>

In [46]:
# Confirm the type of a single indexed column.
type(df['Name'])

pyspark.sql.column.Column

In [47]:
# select() with a list of column names returns a DataFrame.
type(df.select(['Name','Roll_No']))

pyspark.sql.dataframe.DataFrame

In [48]:
# Print the first 5 rows of just the Name and Roll_No columns.
df.select(['Name','Roll_No']).show(5)

+-------+-------+
|   Name|Roll_No|
+-------+-------+
|  Avind|      1|
| Aditya|      2|
|   John|      3|
|   Mary|      4|
|Nicolas|      5|
+-------+-------+
only showing top 5 rows



### Row

In [49]:
# head(n) returns the first n rows as a list of Row objects.
df.head(2)

[Row(Name='Avind', Roll_No=1, Marks=92, Class=9, Subject='Maths', Section='A'),
 Row(Name='Aditya', Roll_No=2, Marks=87, Class=9, Subject='Maths', Section='A')]

In [50]:
# head() with no argument returns a single Row rather than a list.
type(df.head())

pyspark.sql.types.Row

In [51]:
df.head(2)[0]    # index the list returned by head(2) to get a single Row

Row(Name='Avind', Roll_No=1, Marks=92, Class=9, Subject='Maths', Section='A')

### withColumn

PySpark `withColumn()` is a DataFrame transformation that returns a new DataFrame. It can be used to change the values of an existing column, convert an existing column's data type, or create a new column.

In [52]:
# Cast the integer `Class` column to string and store the result in a new
# `Class_changed_dtype` column.
# NOTE(review): this rebinds `df`, so when the notebook is run top to bottom
# every later cell sees the extra column.
df = df.withColumn(
    "Class_changed_dtype",
    df["Class"].cast("String"),
)

In [54]:
# Print the column names and types of `df` to inspect the current schema.
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Roll_No: integer (nullable = true)
 |-- Marks: integer (nullable = true)
 |-- Class: integer (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Section: string (nullable = true)
 |-- Class_changed_dtype: string (nullable = true)



In [58]:
# Derive a new `Marks_Percent` column as Marks / 150 * 100 and display the
# first 5 rows. The result is not assigned, so `df` is left as it was.
# NOTE(review): 150 is presumably the maximum attainable mark - confirm.
df.withColumn(
    "Marks_Percent",
    (col("Marks") / 150) * 100,
).show(5)

+-------+-------+-----+-----+-------+-------+-------------------+------------------+
|   Name|Roll_No|Marks|Class|Subject|Section|Class_changed_dtype|     Marks_Percent|
+-------+-------+-----+-----+-------+-------+-------------------+------------------+
|  Avind|      1|   92|    9|  Maths|      A|                  9| 61.33333333333333|
| Aditya|      2|   87|    9|  Maths|      A|                  9| 57.99999999999999|
|   John|      3|   23|    9|  Maths|      B|                  9|15.333333333333332|
|   Mary|      4|   45|    9|  Maths|      B|                  9|              30.0|
|Nicolas|      5|   67|    9|English|      A|                  9|44.666666666666664|
+-------+-------+-----+-----+-------+-------+-------------------+------------------+
only showing top 5 rows



In [64]:
# lit() turns a constant into a column value, here used to add the same
# 'India' value to every row. withColumn() calls can be chained to add
# several columns in a single expression.
(
    df
    .withColumn('Country', lit('India'))
    .withColumn('New_col2', col('Class') + 1)
    .show(5)
)

+-------+-------+-----+-----+-------+-------+-------+--------+
|   Name|Roll_No|Marks|Class|Subject|Section|Country|New_col2|
+-------+-------+-----+-----+-------+-------+-------+--------+
|  Avind|      1|   92|    9|  Maths|      A|  India|      10|
| Aditya|      2|   87|    9|  Maths|      A|  India|      10|
|   John|      3|   23|    9|  Maths|      B|  India|      10|
|   Mary|      4|   45|    9|  Maths|      B|  India|      10|
|Nicolas|      5|   67|    9|English|      A|  India|      10|
+-------+-------+-----+-----+-------+-------+-------+--------+
only showing top 5 rows



In [81]:
# Build new columns by combining existing ones.

# Numeric product of two columns.
df.withColumn('Marks_mul_class', df['Marks'] * df['Class']).show(5)

# concat() joins the columns with no separator.
df.withColumn('Name_section', concat(df['Name'], df['Section'])).show(5)

# concat_ws() joins multiple string columns into one, inserting the given
# separator ('_' here) between the values.
df.withColumn('Name_section', concat_ws('_', df['Name'], df['Section'])).show(5)

+-------+-------+-----+-----+-------+-------+---------------+
|   Name|Roll_No|Marks|Class|Subject|Section|Marks_mul_class|
+-------+-------+-----+-----+-------+-------+---------------+
|  Avind|      1|   92|    9|  Maths|      A|            828|
| Aditya|      2|   87|    9|  Maths|      A|            783|
|   John|      3|   23|    9|  Maths|      B|            207|
|   Mary|      4|   45|    9|  Maths|      B|            405|
|Nicolas|      5|   67|    9|English|      A|            603|
+-------+-------+-----+-----+-------+-------+---------------+
only showing top 5 rows

+-------+-------+-----+-----+-------+-------+------------+
|   Name|Roll_No|Marks|Class|Subject|Section|Name_section|
+-------+-------+-----+-----+-------+-------+------------+
|  Avind|      1|   92|    9|  Maths|      A|      AvindA|
| Aditya|      2|   87|    9|  Maths|      A|     AdityaA|
|   John|      3|   23|    9|  Maths|      B|       JohnB|
|   Mary|      4|   45|    9|  Maths|      B|       MaryB|
|Nic

In [65]:
# Display the data with the `Marks` column renamed to `Grade`.
# The result is not assigned, so `df` keeps its original column name.
df.withColumnRenamed(existing="Marks", new="Grade").show(5)

+-------+-------+-----+-----+-------+-------+
|   Name|Roll_No|Grade|Class|Subject|Section|
+-------+-------+-----+-----+-------+-------+
|  Avind|      1|   92|    9|  Maths|      A|
| Aditya|      2|   87|    9|  Maths|      A|
|   John|      3|   23|    9|  Maths|      B|
|   Mary|      4|   45|    9|  Maths|      B|
|Nicolas|      5|   67|    9|English|      A|
+-------+-------+-----+-----+-------+-------+
only showing top 5 rows



In [68]:
# Drop columns by name; drop() accepts several column names in one call.
# Note: there is no concept of an axis here, so removing rows is done with
# filter / where instead (covered in the next notebook).
(
    df
    .drop('Roll_No', 'Class')
    .show()
)

+-------+-----+--------+-------+
|   Name|Marks| Subject|Section|
+-------+-----+--------+-------+
|  Avind|   92|   Maths|      A|
| Aditya|   87|   Maths|      A|
|   John|   23|   Maths|      B|
|   Mary|   45|   Maths|      B|
|Nicolas|   67| English|      A|
|  Jonny|  100| English|      A|
|    Tom|   55| English|      B|
|   Yash|   32| English|      B|
|Pushkar|   30| Science|      B|
|  Parth|   76| Science|      A|
| Piyush|   86| Science|      A|
| Zodiac|   65|Computer|      B|
+-------+-----+--------+-------+



In [None]:
#Distinct and Duplicates

In [90]:
# Sample employee records as (Name, Dept, Salary). The last tuple repeats the
# first one, so the data contains one fully duplicated row.
data = [
    ('Ram', 'Dev', 30000),
    ('Shyam', 'Dev', 31000),
    ('Babu', 'Dev', 32000),
    ('Ram', 'Sup', 40000),
    ('Shyam', 'Sup', 31000),
    ('Babu', 'Sup', 32000),
    ('Anuradha', 'Dev', 30000),
    ('Mina', 'Dev', 33000),
    ('Anuradha', 'Sup', 40000),
    ('Mina', 'Sup', 33000),
    ('Ram', 'Dev', 30000),
]
column = ['Name', 'Dept', 'Salary']

# Only column names are supplied; the column types are inferred from the data.
new_df = spark.createDataFrame(data, column)
new_df.printSchema()
new_df.show()

root
 |-- Name: string (nullable = true)
 |-- Dept: string (nullable = true)
 |-- Salary: long (nullable = true)

+--------+----+------+
|    Name|Dept|Salary|
+--------+----+------+
|     Ram| Dev| 30000|
|   Shyam| Dev| 31000|
|    Babu| Dev| 32000|
|     Ram| Sup| 40000|
|   Shyam| Sup| 31000|
|    Babu| Sup| 32000|
|Anuradha| Dev| 30000|
|    Mina| Dev| 33000|
|Anuradha| Sup| 40000|
|    Mina| Sup| 33000|
|     Ram| Dev| 30000|
+--------+----+------+



In [92]:
# The last record is identical to the first one (1 duplicate row).
print("Initial Record Count: ", new_df.count())

# Way 1: distinct() keeps one copy of each fully identical row.
df1 = new_df.distinct()
print(f"Distinct count using distinct: {df1.count()}")

# Way 2: dropDuplicates() called with no arguments.
df2 = new_df.dropDuplicates()
print(f"Distinct count using dropDuplicates: {df2.count()}")

# Either df1 or df2 can be shown here.
df2.show()

Initial Record Count:  11
Distinct count: 10
Distinct count: 10
+--------+----+------+
|    Name|Dept|Salary|
+--------+----+------+
|   Shyam| Dev| 31000|
|     Ram| Dev| 30000|
|    Babu| Dev| 32000|
|   Shyam| Sup| 31000|
|     Ram| Sup| 40000|
|    Babu| Sup| 32000|
|Anuradha| Dev| 30000|
|    Mina| Dev| 33000|
|    Mina| Sup| 33000|
|Anuradha| Sup| 40000|
+--------+----+------+



Note: PySpark's `distinct()` does not accept column names, so it cannot drop rows that are duplicated on only a chosen subset of columns. For that, use the other signature of `dropDuplicates()`, which takes a list of columns and eliminates rows that are duplicates with respect to those columns.

In [94]:
# Drop rows that are duplicates with respect to the (Dept, Salary) pair only;
# rows with different names but the same Dept and Salary collapse into one.
df3 = new_df.dropDuplicates(
    ["Dept", "Salary"]
)
print(f"Distinct count of Dept & Salary : {df3.count()}")
df3.show()

Distinct count of Dept & Salary : 8
+-----+----+------+
| Name|Dept|Salary|
+-----+----+------+
|  Ram| Dev| 30000|
|Shyam| Dev| 31000|
| Babu| Dev| 32000|
| Mina| Dev| 33000|
|Shyam| Sup| 31000|
| Babu| Sup| 32000|
| Mina| Sup| 33000|
|  Ram| Sup| 40000|
+-----+----+------+



In [69]:
# Note: to keep any of these results, assign the returned DataFrame to a variable (either the same name or a new one).