## Creating Spark Session

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PysparkPractice").getOrCreate()

### creating dataframe and basic df operation

In [2]:
df = spark.read.csv("test.csv", header=True, inferSchema=True)
df.show()

+-------+----+----+------+
|   name| age| exp|salary|
+-------+----+----+------+
|karthik|  20|   1| 10000|
|  gokul|  35|  10|  NULL|
|   subu|  25|   4| 20000|
|   ram |NULL|   1| 10000|
| muthu |  45|  20| 80000|
|kishore|  32|  10| 25000|
|   john|  27|NULL| 15000|
|aravind|  34|  10| 25000|
|   arun|  26|   5| 10000|
+-------+----+----+------+



In [3]:
type(df)      #instead of pandas.core dataframe it is a sql dataframe

pyspark.sql.dataframe.DataFrame

In [4]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- exp: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [5]:
df.columns

['name', 'age', 'exp', 'salary']

In [6]:
df.select('name').show()        #to select a perticular column

+-------+
|   name|
+-------+
|karthik|
|  gokul|
|   subu|
|   ram |
| muthu |
|kishore|
|   john|
|aravind|
|   arun|
+-------+



In [8]:
df.select('name', 'age').show()    #for multiple colunms

+-------+----+
|   name| age|
+-------+----+
|karthik|  20|
|  gokul|  35|
|   subu|  25|
|   ram |NULL|
| muthu |  45|
|kishore|  32|
|   john|  27|
|aravind|  34|
|   arun|  26|
+-------+----+



In [8]:
#slicing is not supported in pyspark
df.dtypes   #show data types

[('name', 'string'), ('age', 'int'), ('exp', 'int'), ('salary', 'int')]

In [9]:
df.describe().show()  #describe the info about dataframe

+-------+-------+-----------------+-----------------+-----------------+
|summary|   name|              age|              exp|           salary|
+-------+-------+-----------------+-----------------+-----------------+
|  count|      9|                8|                8|                8|
|   mean|   NULL|             30.5|            7.625|          24375.0|
| stddev|   NULL|7.727501906456298|6.300510183423925|23366.26078038895|
|    min|aravind|               20|                1|            10000|
|    max|   subu|               45|               20|            80000|
+-------+-------+-----------------+-----------------+-----------------+



###  Ways to Rename column on DataFrame

pyspark uses __withColumnRenamed__ method to rename column, hence dataframe is immutable it returns a new DataFrame with columns renamed

In [11]:
df.withColumnRenamed('exp','experience').show()

+-------+----+----------+------+
|   name| age|experience|salary|
+-------+----+----------+------+
|karthik|  20|         1| 10000|
|  gokul|  35|        10|  NULL|
|   subu|  25|         4| 20000|
|   ram |NULL|         1| 10000|
| muthu |  45|        20| 80000|
|kishore|  32|        10| 25000|
|   john|  27|      NULL| 15000|
|aravind|  34|        10| 25000|
|   arun|  26|         5| 10000|
+-------+----+----------+------+



### withColumn() 

this is a transformation method

In [12]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- exp: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [16]:
#changing DataType using withColunm()
df.withColumn('exp',df['exp'].cast("String")).printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- exp: string (nullable = true)
 |-- salary: integer (nullable = true)



In [20]:
# updating value of the column
df.withColumn('salary',df['salary']*12).head(5)

[Row(name='karthik', age=20, exp=1, salary=120000),
 Row(name='gokul', age=35, exp=10, salary=None),
 Row(name='subu', age=25, exp=4, salary=240000),
 Row(name='ram ', age=None, exp=1, salary=120000),
 Row(name='muthu ', age=45, exp=20, salary=960000)]

In [21]:
#new column
df.withColumn('exp after 5 yrs', df['exp']+5).head(5)

[Row(name='karthik', age=20, exp=1, salary=10000, exp after 5 yrs=6),
 Row(name='gokul', age=35, exp=10, salary=None, exp after 5 yrs=15),
 Row(name='subu', age=25, exp=4, salary=20000, exp after 5 yrs=9),
 Row(name='ram ', age=None, exp=1, salary=10000, exp after 5 yrs=6),
 Row(name='muthu ', age=45, exp=20, salary=80000, exp after 5 yrs=25)]

In [24]:
#dropping columns
df.drop('exp').head(5)

[Row(name='karthik', age=20, salary=10000),
 Row(name='gokul', age=35, salary=None),
 Row(name='subu', age=25, salary=20000),
 Row(name='ram ', age=None, salary=10000),
 Row(name='muthu ', age=45, salary=80000)]

### filter()

its alternative is where(), both the method supports SQL experssions to filter the rows

In [28]:
df.filter(df['exp'] > 5).show()

+-------+---+---+------+
|   name|age|exp|salary|
+-------+---+---+------+
|  gokul| 35| 10|  NULL|
| muthu | 45| 20| 80000|
|kishore| 32| 10| 25000|
|aravind| 34| 10| 25000|
+-------+---+---+------+



In [30]:
df.filter((df.exp > 5) & (df.exp < 15)).show() #multiple conditions

+-------+---+---+------+
|   name|age|exp|salary|
+-------+---+---+------+
|  gokul| 35| 10|  NULL|
|kishore| 32| 10| 25000|
|aravind| 34| 10| 25000|
+-------+---+---+------+



In [37]:
names = ["gokul", "arun", "aravind"]
df.filter(df['name'].isin(names)).show()  #isin() method allows to filter from a list

+-------+---+---+------+
|   name|age|exp|salary|
+-------+---+---+------+
|  gokul| 35| 10|  NULL|
|aravind| 34| 10| 25000|
|   arun| 26|  5| 10000|
+-------+---+---+------+



In [38]:
df.filter(df['name'].startswith('a')).show()

+-------+---+---+------+
|   name|age|exp|salary|
+-------+---+---+------+
|aravind| 34| 10| 25000|
|   arun| 26|  5| 10000|
+-------+---+---+------+



In [39]:
df.filter(df['name'].contains('a')).show()

+-------+----+---+------+
|   name| age|exp|salary|
+-------+----+---+------+
|karthik|  20|  1| 10000|
|   ram |NULL|  1| 10000|
|aravind|  34| 10| 25000|
|   arun|  26|  5| 10000|
+-------+----+---+------+



### orderby() & sort()

orderby() is the alternative/alias of sort()

In [45]:
df.sort('name').show()

+-------+----+----+------+
|   name| age| exp|salary|
+-------+----+----+------+
|aravind|  34|  10| 25000|
|   arun|  26|   5| 10000|
|  gokul|  35|  10|  NULL|
|   john|  27|NULL| 15000|
|karthik|  20|   1| 10000|
|kishore|  32|  10| 25000|
| muthu |  45|  20| 80000|
|   ram |NULL|   1| 10000|
|   subu|  25|   4| 20000|
+-------+----+----+------+



In [48]:
df.sort(df['name'].desc()).show()

+-------+----+----+------+
|   name| age| exp|salary|
+-------+----+----+------+
|   subu|  25|   4| 20000|
|   ram |NULL|   1| 10000|
| muthu |  45|  20| 80000|
|kishore|  32|  10| 25000|
|karthik|  20|   1| 10000|
|   john|  27|NULL| 15000|
|  gokul|  35|  10|  NULL|
|   arun|  26|   5| 10000|
|aravind|  34|  10| 25000|
+-------+----+----+------+



### Groupby()

it is similar to SQL GROUP BY, it returns GroupedData object

* count()
* mean()
* max
* min()
* sum()
* avg()

In [50]:
df.groupBy('exp').sum('salary').show()

+----+-----------+
| exp|sum(salary)|
+----+-----------+
|NULL|      15000|
|   1|      20000|
|  20|      80000|
|   5|      10000|
|   4|      20000|
|  10|      50000|
+----+-----------+



In [52]:
df.groupBy('exp').count().show()

+----+-----+
| exp|count|
+----+-----+
|NULL|    1|
|   1|    2|
|  20|    1|
|   5|    1|
|   4|    1|
|  10|    3|
+----+-----+



In [70]:
df.groupBy('exp').count().filter(df['exp'] > 5).show()

+---+-----+
|exp|count|
+---+-----+
| 20|    1|
| 10|    3|
+---+-----+



### SQL IN PYSPARK

In [71]:
df.createOrReplaceTempView("PERSON_DATA")
df2 = spark.sql("SELECT * from PERSON_DATA")
df2.printSchema()
df2.show()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- exp: integer (nullable = true)
 |-- salary: integer (nullable = true)

+-------+----+----+------+
|   name| age| exp|salary|
+-------+----+----+------+
|karthik|  20|   1| 10000|
|  gokul|  35|  10|  NULL|
|   subu|  25|   4| 20000|
|   ram |NULL|   1| 10000|
| muthu |  45|  20| 80000|
|kishore|  32|  10| 25000|
|   john|  27|NULL| 15000|
|aravind|  34|  10| 25000|
|   arun|  26|   5| 10000|
+-------+----+----+------+



### when().otherwise()

this is similar to switch case in other programming language

In [74]:
from pyspark.sql.functions import when

df.withColumn("classification", 
               when(df['age'] < 30, "less than 30")
              .when(df['age'] < 40, "less than 40")
              .otherwise(df['age'])).show()

+-------+----+----+------+--------------+
|   name| age| exp|salary|classification|
+-------+----+----+------+--------------+
|karthik|  20|   1| 10000|  less than 30|
|  gokul|  35|  10|  NULL|  less than 40|
|   subu|  25|   4| 20000|  less than 30|
|   ram |NULL|   1| 10000|          NULL|
| muthu |  45|  20| 80000|            45|
|kishore|  32|  10| 25000|  less than 40|
|   john|  27|NULL| 15000|  less than 30|
|aravind|  34|  10| 25000|  less than 40|
|   arun|  26|   5| 10000|  less than 30|
+-------+----+----+------+--------------+



#### Window function

these functions allow us to rank, allocate row number to the rows of dataframe based on given coloumn

In [79]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank
windowSpec = Window.orderBy("salary")

df.withColumn("row_number",row_number().over(windowSpec)).show()
df.withColumn("rank",rank().over(windowSpec)).show()

+-------+----+----+------+----------+
|   name| age| exp|salary|row_number|
+-------+----+----+------+----------+
|  gokul|  35|  10|  NULL|         1|
|karthik|  20|   1| 10000|         2|
|   ram |NULL|   1| 10000|         3|
|   arun|  26|   5| 10000|         4|
|   john|  27|NULL| 15000|         5|
|   subu|  25|   4| 20000|         6|
|kishore|  32|  10| 25000|         7|
|aravind|  34|  10| 25000|         8|
| muthu |  45|  20| 80000|         9|
+-------+----+----+------+----------+

+-------+----+----+------+----+
|   name| age| exp|salary|rank|
+-------+----+----+------+----+
|  gokul|  35|  10|  NULL|   1|
|karthik|  20|   1| 10000|   2|
|   ram |NULL|   1| 10000|   2|
|   arun|  26|   5| 10000|   2|
|   john|  27|NULL| 15000|   5|
|   subu|  25|   4| 20000|   6|
|kishore|  32|  10| 25000|   7|
|aravind|  34|  10| 25000|   7|
| muthu |  45|  20| 80000|   9|
+-------+----+----+------+----+



#### Handling Missing Values 

* we can drop rows with null value
* we can fill the null values with 0, empty string or mean/mode/median

In [80]:
df.na.drop().show()

+-------+---+---+------+
|   name|age|exp|salary|
+-------+---+---+------+
|karthik| 20|  1| 10000|
|   subu| 25|  4| 20000|
| muthu | 45| 20| 80000|
|kishore| 32| 10| 25000|
|aravind| 34| 10| 25000|
|   arun| 26|  5| 10000|
+-------+---+---+------+



In [82]:
df.na.fill(0).show()

+-------+---+---+------+
|   name|age|exp|salary|
+-------+---+---+------+
|karthik| 20|  1| 10000|
|  gokul| 35| 10|     0|
|   subu| 25|  4| 20000|
|   ram |  0|  1| 10000|
| muthu | 45| 20| 80000|
|kishore| 32| 10| 25000|
|   john| 27|  0| 15000|
|aravind| 34| 10| 25000|
|   arun| 26|  5| 10000|
+-------+---+---+------+



#### PySpark UDF(User Defined Function)

PySpark UDF’s are similar to UDF on traditional databases. . UDF’s are once created they can be re-used on several DataFrames


In [85]:
def capitalizeString(string):
    return string.capitalize()

In [102]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
convertUDF = udf(lambda z: capitalizeString(z),StringType())
# df.withColumn("newName",convertUDF(col('name'))).show()

#### isNull()

In [106]:
df.filter(df['age'].isNull()).show()

+----+----+---+------+
|name| age|exp|salary|
+----+----+---+------+
|ram |NULL|  1| 10000|
+----+----+---+------+



#### Concat()

In [110]:
from pyspark.sql.functions import concat_ws
df.select(concat_ws('_',df['name'],df['age'])).show()

+-----------------------+
|concat_ws(_, name, age)|
+-----------------------+
|             karthik_20|
|               gokul_35|
|                subu_25|
|                   ram |
|              muthu _45|
|             kishore_32|
|                john_27|
|             aravind_34|
|                arun_26|
+-----------------------+



#### Pyspark Select Distinct Rows

In [114]:
df.distinct().show()

+-------+----+----+------+
|   name| age| exp|salary|
+-------+----+----+------+
| muthu |  45|  20| 80000|
|karthik|  20|   1| 10000|
|   arun|  26|   5| 10000|
|   subu|  25|   4| 20000|
|kishore|  32|  10| 25000|
|aravind|  34|  10| 25000|
|   john|  27|NULL| 15000|
|  gokul|  35|  10|  NULL|
|   ram |NULL|   1| 10000|
+-------+----+----+------+



#### Colunm and row Count

In [115]:
print("row = ", df.count(), "col = ", len(df.columns))

row =  9 col =  4


#### union()

In [8]:
simpleData = [("James",9,34,10000),("Michael",18,56,20000)]

columns= ["name","exp","age","salary"]
df2 = spark.createDataFrame(data = simpleData, schema = columns)
df2.printSchema()
df2.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- exp: long (nullable = true)
 |-- age: long (nullable = true)
 |-- salary: long (nullable = true)

+-------+---+---+------+
|name   |exp|age|salary|
+-------+---+---+------+
|James  |9  |34 |10000 |
|Michael|18 |56 |20000 |
+-------+---+---+------+



In [9]:
df.union(df2).show()

+-------+----+----+------+
|   name| age| exp|salary|
+-------+----+----+------+
|karthik|  20|   1| 10000|
|  gokul|  35|  10|  NULL|
|   subu|  25|   4| 20000|
|   ram |NULL|   1| 10000|
| muthu |  45|  20| 80000|
|kishore|  32|  10| 25000|
|   john|  27|NULL| 15000|
|aravind|  34|  10| 25000|
|   arun|  26|   5| 10000|
|  James|   9|  34| 10000|
|Michael|  18|  56| 20000|
+-------+----+----+------+

