## Pyspark Part 1

In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [4]:
# Start (or reuse) the Spark session that every later cell reads through.
spark = (
    SparkSession.builder
    .appName('PractiseSpark')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/31 10:11:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/31 10:11:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
spark

### Trying different ways of reading data from CSV

In [7]:
# No options given: as the output below shows, the header line is read as a
# data row and the columns are auto-named _c0.._c3.
df_pyspark = spark.read.csv('../test1.csv')

                                                                                

In [8]:
df_pyspark.show()

+---------+---+----------+------+
|      _c0|_c1|       _c2|   _c3|
+---------+---+----------+------+
|     Name|age|Experience|Salary|
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [9]:
# header='true' makes the first CSV line supply the column names instead of
# being treated as data.
df_pyspark = spark.read.csv('../test1.csv', header = 'true')

In [10]:
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [13]:
# Same header-aware read, configured through the reader's option() call
# instead of a csv() keyword argument.
df_pyspark = spark.read.option('header','true').csv('../test1.csv')

In [14]:
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [15]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [16]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [17]:
# Also infer column types while reading; with the header option alone every
# column comes back as string (see the earlier printSchema output).
# The option key is spelled 'inferSchema', matching the csv() keyword form.
df_pyspark = spark.read.option('header','true').option('inferSchema','true').csv('../test1.csv')

In [18]:
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [19]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [20]:
# Header handling and schema inference requested directly as csv() keywords.
df_pyspark = spark.read.csv(
    '../test1.csv',
    header='true',
    inferSchema='true',
)

In [21]:
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [22]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [23]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [25]:
# head(3) returns the first three rows as a list of Row objects (see the
# repr below) rather than printing a table.
df_pyspark.head(3)

[Row(Name='Krish', age=31, Experience=10, Salary=30000),
 Row(Name='Sudhanshu', age=30, Experience=8, Salary=25000),
 Row(Name='Sunny', age=29, Experience=4, Salary=20000)]

## Pyspark Part 2

### Selecting columns from spark dataframe

In [26]:
# Project the DataFrame down to the listed columns and display the result.
df_pyspark.select(['Name','age']).show()

+---------+---+
|     Name|age|
+---------+---+
|    Krish| 31|
|Sudhanshu| 30|
|    Sunny| 29|
|     Paul| 24|
|   Harsha| 21|
|  Shubham| 23|
+---------+---+



In [28]:
# Indexing by column name yields a Column expression object, not the
# column's values (see the repr below).
df_pyspark['Experience']

Column<'Experience'>

In [29]:
# dtypes lists (column name, type name) pairs for every column.
df_pyspark.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [30]:
# Summary statistics per column; the non-numeric Name column gets null for
# mean and stddev (see output).
df_pyspark.describe().show()

23/07/31 10:29:40 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 15:>                                                         (0 + 1) / 1]

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+



                                                                                

### Adding columns in data frame

In [31]:
# Show a derived column: current experience plus two years.
# The result is only displayed; df_pyspark itself is not reassigned.
(
    df_pyspark
    .withColumn('Experience after 2 years', df_pyspark['Experience'] + 2)
    .show()
)

+---------+---+----------+------+------------------------+
|     Name|age|Experience|Salary|Experience after 2 years|
+---------+---+----------+------+------------------------+
|    Krish| 31|        10| 30000|                      12|
|Sudhanshu| 30|         8| 25000|                      10|
|    Sunny| 29|         4| 20000|                       6|
|     Paul| 24|         3| 20000|                       5|
|   Harsha| 21|         1| 15000|                       3|
|  Shubham| 23|         2| 18000|                       4|
+---------+---+----------+------+------------------------+



In [36]:
# Add several derived columns in one call by passing a name -> Column mapping.
(
    df_pyspark
    .withColumns({
        'Experience after 2 years': df_pyspark['Experience'] + 2,
        'Salary after 2 years': df_pyspark['Salary'] * 1.2,
    })
    .show()
)

+---------+---+----------+------+------------------------+--------------------+
|     Name|age|Experience|Salary|Experience after 2 years|Salary after 2 years|
+---------+---+----------+------+------------------------+--------------------+
|    Krish| 31|        10| 30000|                      12|             36000.0|
|Sudhanshu| 30|         8| 25000|                      10|             30000.0|
|    Sunny| 29|         4| 20000|                       6|             24000.0|
|     Paul| 24|         3| 20000|                       5|             24000.0|
|   Harsha| 21|         1| 15000|                       3|             18000.0|
|  Shubham| 23|         2| 18000|                       4|             21600.0|
+---------+---+----------+------+------------------------+--------------------+



In [38]:
# Display the frame with the 'Name' column renamed to 'Naam'; the result is
# only shown, not assigned back.
df_pyspark.withColumnRenamed('Name', 'Naam').show()

+---------+---+----------+------+
|     Naam|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



## Pyspark Part 3

### Handling missing values

In [44]:
# Load the dataset that contains missing values (shown as null in the output).
# The option key is written 'inferSchema', matching the csv() keyword spelling.
df_pyspark = spark.read.options(header ='true',inferSchema = 'true').csv('../test2.csv')
df_pyspark.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [45]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [57]:
# Same load with the options given in the opposite order; the result is the same.
# The option key is written 'inferSchema', matching the csv() keyword spelling.
df_pyspark = spark.read.options(inferSchema = 'true', header = 'true').csv('../test2.csv')
df_pyspark.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [59]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [53]:
# Equivalent load using csv() keyword arguments rather than options().
df_pyspark = spark.read.csv(
    '../test2.csv',
    inferSchema='true',
    header='true',
)
df_pyspark.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [58]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [60]:
# Drop the 'age' column for display; rows are untouched, so the nulls in the
# other columns remain.
df_pyspark.drop('age').show()

+---------+----------+------+
|     Name|Experience|Salary|
+---------+----------+------+
|    Krish|        10| 30000|
|Sudhanshu|         8| 25000|
|    Sunny|         4| 20000|
|     Paul|         3| 20000|
|   Harsha|         1| 15000|
|  Shubham|         2| 18000|
|   Mahesh|      null| 40000|
|     null|        10| 38000|
|     null|      null|  null|
+---------+----------+------+



In [62]:
# With no arguments, na.drop() removes every row that contains a null
# (the three incomplete rows disappear in the output).
df_pyspark.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [68]:
# how='any' drops a row if any of its columns is null; the output is the same
# as for the argument-less na.drop() call.
df_pyspark.na.drop(how = 'any').show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [64]:
# how='all' drops a row only when every column is null; no row here
# qualifies, so all nine rows remain.
df_pyspark.na.drop(how = 'all').show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [65]:
# thresh=3: keep only rows that have at least 3 non-null values.
# `how` is left out because thresh takes precedence over it when both are given.
df_pyspark.na.drop(thresh = 3).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+



In [66]:
# thresh=2: keep only rows that have at least 2 non-null values.
# `how` is left out because thresh takes precedence over it when both are given.
df_pyspark.na.drop(thresh = 2).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
+---------+----+----------+------+



In [67]:
# thresh=1: keep every row that has at least 1 non-null value.
# `how` is left out because thresh takes precedence over it when both are given.
df_pyspark.na.drop(thresh = 1).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [73]:
# Restrict the null check to the 'age' column: only rows with a null age are
# dropped, nulls elsewhere are tolerated.
df_pyspark.na.drop(how = 'any', subset = 'age').show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
|     null| 36|      null|  null|
+---------+---+----------+------+



In [71]:
# Drop rows where either 'age' or 'Name' is null.
df_pyspark.na.drop(how = 'any', subset = ['age', 'Name']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [72]:
# Drop rows where either 'age' or 'Experience' is null.
df_pyspark.na.drop(how = 'any', subset = ['age', 'Experience']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+



In [77]:
# Replace nulls in the 'Name' column with a placeholder string; the other
# columns are left as they are.
df_pyspark.na.fill('Missing values',subset='Name').show()

+--------------+----+----------+------+
|          Name| age|Experience|Salary|
+--------------+----+----------+------+
|         Krish|  31|        10| 30000|
|     Sudhanshu|  30|         8| 25000|
|         Sunny|  29|         4| 20000|
|          Paul|  24|         3| 20000|
|        Harsha|  21|         1| 15000|
|       Shubham|  23|         2| 18000|
|        Mahesh|null|      null| 40000|
|Missing values|  34|        10| 38000|
|Missing values|  36|      null|  null|
+--------------+----+----------+------+



In [82]:
# Replace nulls with 0 in the two listed numeric columns only.
# Column names are spelled exactly as in the schema rather than relying on
# case-insensitive name resolution.
df_pyspark.na.fill(0, subset = ['Salary','Experience']).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|         0| 40000|
|     null|  34|        10| 38000|
|     null|  36|         0|     0|
+---------+----+----------+------+



In [84]:
# Chain two fills: the string fill reaches only the string column (Name),
# then 0 is filled into the listed numeric columns; age keeps its null.
# Column names are spelled exactly as in the schema.
df_pyspark.na.fill('Missing').na.fill(0, subset = ['Salary','Experience']).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|         0| 40000|
|  Missing|  34|        10| 38000|
|  Missing|  36|         0|     0|
+---------+----+----------+------+



In [85]:
# Without a subset the numeric fill applies to every numeric column, so the
# null age becomes 0 as well (see the Mahesh row).
df_pyspark.na.fill('Missing').na.fill(0).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh|  0|         0| 40000|
|  Missing| 34|        10| 38000|
|  Missing| 36|         0|     0|
+---------+---+----------+------+



In [86]:
# Same chained fill, with the column list passed positionally instead of via
# subset=. Column names are spelled exactly as in the schema.
df_pyspark.na.fill('Missing').na.fill(0,['Salary','Experience']).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|         0| 40000|
|  Missing|  34|        10| 38000|
|  Missing|  36|         0|     0|
+---------+----+----------+------+



In [90]:
# Fill nulls with 0 in Experience and Salary only; the nulls in Name and age
# are left in place.
df_pyspark.na.fill(0,['Experience','Salary']).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|         0| 40000|
|     null|  34|        10| 38000|
|     null|  36|         0|     0|
+---------+----+----------+------+



In [92]:
# Re-read with only header=True (no schema inference). The string fill in the
# next cell lands in Experience/Salary, which indicates they are string
# columns in this frame.
df_pyspark = spark.read.csv('../test2.csv', header = True)

In [93]:
# Fill nulls in Experience and Salary with a placeholder string; this takes
# effect here because the frame was just re-read without schema inference.
df_pyspark.na.fill('Missing values',['Experience','Salary']).show()

+---------+----+--------------+--------------+
|     Name| age|    Experience|        Salary|
+---------+----+--------------+--------------+
|    Krish|  31|            10|         30000|
|Sudhanshu|  30|             8|         25000|
|    Sunny|  29|             4|         20000|
|     Paul|  24|             3|         20000|
|   Harsha|  21|             1|         15000|
|  Shubham|  23|             2|         18000|
|   Mahesh|null|Missing values|         40000|
|     null|  34|            10|         38000|
|     null|  36|Missing values|Missing values|
+---------+----+--------------+--------------+



### Using imputer to impute missing values

In [102]:
# Reload with schema inference before imputing.
# Both settings are passed as csv() keywords (instead of mixing option() with
# a keyword), using the same inferSchema spelling as the other csv() calls.
df_pyspark = spark.read.csv('../test2.csv', header = True, inferSchema = True)

In [103]:
from pyspark.ml.feature import Imputer

In [106]:
# Mean imputation for the three numeric columns; each result is written to a
# new '<name>_col' column next to the original.
imputer = Imputer(
    strategy='mean',
    inputCols=['age', 'Experience', 'Salary'],
    outputCols=list(map('{}_col'.format, ['age', 'Experience', 'Salary'])),
)

In [107]:
# Fit the imputer on the data and display the frame with the imputed
# '<name>_col' columns appended; df_pyspark itself is not reassigned.
imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+----+----------+------+-------+--------------+----------+
|     Name| age|Experience|Salary|age_col|Experience_col|Salary_col|
+---------+----+----------+------+-------+--------------+----------+
|    Krish|  31|        10| 30000|     31|            10|     30000|
|Sudhanshu|  30|         8| 25000|     30|             8|     25000|
|    Sunny|  29|         4| 20000|     29|             4|     20000|
|     Paul|  24|         3| 20000|     24|             3|     20000|
|   Harsha|  21|         1| 15000|     21|             1|     15000|
|  Shubham|  23|         2| 18000|     23|             2|     18000|
|   Mahesh|null|      null| 40000|     28|             5|     40000|
|     null|  34|        10| 38000|     34|            10|     38000|
|     null|  36|      null|  null|     36|             5|     25750|
+---------+----+----------+------+-------+--------------+----------+



In [109]:
df_pyspark.describe().show()

+-------+------+------------------+------------------+-----------------+
|summary|  Name|               age|        Experience|           Salary|
+-------+------+------------------+------------------+-----------------+
|  count|     7|                 8|                 7|                8|
|   mean|  null|              28.5| 5.428571428571429|          25750.0|
| stddev|  null|5.3718844791323335|3.8234863173611093|9361.776388210581|
|    min|Harsha|                21|                 1|            15000|
|    max| Sunny|                36|                10|            40000|
+-------+------+------------------+------------------+-----------------+



In [116]:
# describe() limited to a single column.
df_pyspark.describe('age').show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 8|
|   mean|              28.5|
| stddev|5.3718844791323335|
|    min|                21|
|    max|                36|
+-------+------------------+



In [117]:
# Same column mapping as the mean imputer, but filling with each column's median.
imputer = Imputer(
    strategy='median',
    inputCols=['age', 'Experience', 'Salary'],
    outputCols=list(map('{}_col'.format, ['age', 'Experience', 'Salary'])),
)

In [112]:
# Fit and apply the current imputer; the filled values in the '<name>_col'
# columns differ from the mean-based run (e.g. 29 instead of 28 for age).
imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+----+----------+------+-------+--------------+----------+
|     Name| age|Experience|Salary|age_col|Experience_col|Salary_col|
+---------+----+----------+------+-------+--------------+----------+
|    Krish|  31|        10| 30000|     31|            10|     30000|
|Sudhanshu|  30|         8| 25000|     30|             8|     25000|
|    Sunny|  29|         4| 20000|     29|             4|     20000|
|     Paul|  24|         3| 20000|     24|             3|     20000|
|   Harsha|  21|         1| 15000|     21|             1|     15000|
|  Shubham|  23|         2| 18000|     23|             2|     18000|
|   Mahesh|null|      null| 40000|     29|             4|     40000|
|     null|  34|        10| 38000|     34|            10|     38000|
|     null|  36|      null|  null|     36|             4|     20000|
+---------+----+----------+------+-------+--------------+----------+



## Pyspark Part 4

### Filter operations

In [120]:
# Back to the complete dataset (no missing values) for the filter examples.
df_pyspark = spark.read.csv('../test1.csv', inferSchema = True, header = True)

In [121]:
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [124]:
# Filter with a Column expression: keep rows whose Experience is below 10.
df_pyspark.filter(df_pyspark['Experience']< 10 ).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [125]:
# The same condition written as a SQL expression string; the output is identical.
df_pyspark.filter('Experience < 10' ).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [126]:
# Filter first, then keep only the Name and age columns of the matching rows.
df_pyspark.filter('Experience > 3').select(['Name','age']).show()

+---------+---+
|     Name|age|
+---------+---+
|    Krish| 31|
|Sudhanshu| 30|
|    Sunny| 29|
+---------+---+



In [131]:
# Combine conditions with & (logical AND); each comparison sits in its own
# parentheses because & binds tighter than the comparison operators.
df_pyspark.filter(
    (df_pyspark['Experience'] > 3)
    & (df_pyspark['age'] < 30)
).show()

+-----+---+----------+------+
| Name|age|Experience|Salary|
+-----+---+----------+------+
|Sunny| 29|         4| 20000|
+-----+---+----------+------+



In [132]:
# Combine conditions with | (logical OR): a row passes if either test holds.
df_pyspark.filter(
    (df_pyspark['Experience'] > 3)
    | (df_pyspark['age'] < 30)
).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [133]:
# ~ negates the condition: keep rows where Experience is NOT greater than 3.
df_pyspark.filter(~(df_pyspark['Experience'] > 3)).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



## Pyspark Part 5

### Group by and aggregate functions

In [134]:
# Load the per-department salary records used for the grouping examples and
# preview them.
df_pyspark = spark.read.csv('../test3.csv', inferSchema=True, header = True)
df_pyspark.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [135]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



In [137]:
# Total per Name; sum() is called without column arguments and the output
# contains the one numeric column, salary.
df_pyspark.groupBy('Name').sum().show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [139]:
# Average salary per Name; note the result column is labelled avg(salary).
df_pyspark.groupBy('Name').mean().show()

+---------+------------------+
|     Name|       avg(salary)|
+---------+------------------+
|Sudhanshu|11666.666666666666|
|    Sunny|            6000.0|
|    Krish| 6333.333333333333|
|   Mahesh|            3500.0|
+---------+------------------+



In [140]:
# avg() produces the same output as mean() on this grouping.
df_pyspark.groupBy('Name').avg().show()

+---------+------------------+
|     Name|       avg(salary)|
+---------+------------------+
|Sudhanshu|11666.666666666666|
|    Sunny|            6000.0|
|    Krish| 6333.333333333333|
|   Mahesh|            3500.0|
+---------+------------------+



In [141]:
# Number of rows per Name.
df_pyspark.groupBy('Name').count().show()

+---------+-----+
|     Name|count|
+---------+-----+
|Sudhanshu|    3|
|    Sunny|    2|
|    Krish|    3|
|   Mahesh|    2|
+---------+-----+



In [143]:
# Number of rows per department.
df_pyspark.groupBy('Departments').count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



In [144]:
# Average salary per department.
df_pyspark.groupBy('Departments').avg().show()

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [145]:
# Total salary per department.
df_pyspark.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [146]:
# Highest salary within each department.
df_pyspark.groupBy('Departments').max().show()

+------------+-----------+
| Departments|max(salary)|
+------------+-----------+
|         IOT|      10000|
|    Big Data|       5000|
|Data Science|      20000|
+------------+-----------+



In [147]:
# Highest salary recorded for each Name.
df_pyspark.groupBy('Name').max().show()

+---------+-----------+
|     Name|max(salary)|
+---------+-----------+
|Sudhanshu|      20000|
|    Sunny|      10000|
|    Krish|      10000|
|   Mahesh|       4000|
+---------+-----------+



In [149]:
# Whole-frame aggregation via a {column: function} mapping.
# The key 'Salary' differs in case from the schema's 'salary' yet still
# resolves; the output header echoes the spelling used here.
df_pyspark.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



In [151]:
df_pyspark.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [150]:
# Median of the salary column over the whole frame.
df_pyspark.agg({'Salary':'median'}).show()

+--------------+
|median(Salary)|
+--------------+
|        5000.0|
+--------------+



In [153]:
# Sort by salary, highest first.
df_pyspark.sort('Salary', ascending = False).show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|Sudhanshu|Data Science| 20000|
|    Krish|Data Science| 10000|
|Sudhanshu|         IOT| 10000|
|    Sunny|Data Science| 10000|
|    Krish|         IOT|  5000|
|Sudhanshu|    Big Data|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [154]:
# Standard deviation of salary; the value matches the stddev row reported by
# describe() for this frame.
df_pyspark.agg({'Salary':'stddev'}).show()

+-----------------+
|   stddev(Salary)|
+-----------------+
|5396.500923952689|
+-----------------+



In [157]:
# Mean salary over the whole frame; the result column is labelled avg(Salary).
df_pyspark.agg({'Salary':'mean'}).show()

+-----------+
|avg(Salary)|
+-----------+
|     7300.0|
+-----------+



In [156]:
df_pyspark.describe().show()

+-------+-----+-----------+-----------------+
|summary| Name|Departments|           salary|
+-------+-----+-----------+-----------------+
|  count|   10|         10|               10|
|   mean| null|       null|           7300.0|
| stddev| null|       null|5396.500923952689|
|    min|Krish|   Big Data|             2000|
|    max|Sunny|        IOT|            20000|
+-------+-----+-----------+-----------------+



## Pyspark Part 6: Pyspark ML

### Examples of Pyspark ML

In [159]:
# Read the dataset used for the regression example, taking column names from
# the header row and inferring column types.
training = spark.read.csv('../test1.csv',header=True,inferSchema=True)

In [160]:
training.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



### Using Vector Assembler

In [168]:
training.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [161]:
from pyspark.ml.feature import VectorAssembler

In [164]:
# Assembler that packs the age and Experience columns into a single
# feature-vector column named 'Independent variables'.
featureassembler = VectorAssembler(
    inputCols=['age', 'Experience'],
    outputCol='Independent variables',
)

In [166]:
# Append the assembled feature-vector column to the training frame.
output = featureassembler.transform(training)

In [167]:
output.show()

+---------+---+----------+------+---------------------+
|     Name|age|Experience|Salary|Independent variables|
+---------+---+----------+------+---------------------+
|    Krish| 31|        10| 30000|          [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|           [30.0,8.0]|
|    Sunny| 29|         4| 20000|           [29.0,4.0]|
|     Paul| 24|         3| 20000|           [24.0,3.0]|
|   Harsha| 21|         1| 15000|           [21.0,1.0]|
|  Shubham| 23|         2| 18000|           [23.0,2.0]|
+---------+---+----------+------+---------------------+



In [169]:
# Keep only the feature vector and the Salary label column, then preview.
finalized_data = output.select('Independent variables', 'Salary')
finalized_data.show()

+---------------------+------+
|Independent variables|Salary|
+---------------------+------+
|          [31.0,10.0]| 30000|
|           [30.0,8.0]| 25000|
|           [29.0,4.0]| 20000|
|           [24.0,3.0]| 20000|
|           [21.0,1.0]| 15000|
|           [23.0,2.0]| 18000|
+---------------------+------+



In [170]:
from pyspark.ml.regression import LinearRegression

# Train/test split. A fixed seed makes the split -- and therefore the fitted
# coefficients and the evaluation metrics in the following cells -- repeatable
# for the same input data instead of changing on every run.
# NOTE: the frame shown above has only six rows, so either side of a 75/25
# split can be tiny; treat the resulting metrics as illustrative only.
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=42)

# Keep the unfitted estimator and the fitted model under separate names.
# `linear_regression` remains the fitted model, as later cells expect.
salary_lr = LinearRegression(featuresCol='Independent variables', labelCol='Salary')
linear_regression = salary_lr.fit(train_data)

23/07/31 12:14:45 WARN Instrumentation: [854b892f] regParam is zero, which might cause numerical instability and overfitting.
23/07/31 12:14:45 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/07/31 12:14:45 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [171]:
# Fitted coefficients, one per entry of the 'Independent variables' vector
# (assembled from 'age' and 'Experience', in that order).
linear_regression.coefficients

DenseVector([-90.5483, 1608.7819])

In [172]:
# Intercept term of the fitted model.
linear_regression.intercept

16079.136690647425

In [173]:
# Prediction: evaluate the fitted model on the held-out split.
pred_results = linear_regression.evaluate(test_data)

In [175]:
# Show the test rows alongside the model's 'prediction' column.
pred_results.predictions.show()

+---------------------+------+-----------------+
|Independent variables|Salary|       prediction|
+---------------------+------+-----------------+
|           [23.0,2.0]| 18000|17214.09079632846|
+---------------------+------+-----------------+



In [176]:
# Error metrics on the test split: (mean absolute error, mean squared error).
(
    pred_results.meanAbsoluteError,
    pred_results.meanSquaredError,
)

(785.909203671541, 617653.2764156357)

## Pyspark Part 7

In [193]:
# Load the tips dataset; the first line is the header and column types are
# inferred from the data rather than read as strings.
df = spark.read.csv(
    '../tips.csv',
    header=True,
    inferSchema=True,
)
df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [178]:
# List the column names of the loaded frame.
df.columns

['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [179]:
### Handling Categorical Features
from pyspark.ml.feature import StringIndexer

In [194]:
# Map the string column 'sex' to a numeric index column 'sex_indexed'.
indexer = StringIndexer(inputCol='sex', outputCol='sex_indexed')

# Fit on the frame, then preview the transformed result without storing it.
sex_index_model = indexer.fit(df)
sex_index_model.transform(df).show()

+----------+----+------+------+---+------+----+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|
+----------+----+------+------+---+------+----+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|        0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|        0.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|        0.0|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|        1.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        0.0|
|     18.43| 3.0|  Male|    No|Sun|Dinne

In [195]:
# Fit the current `indexer` on df and keep the transformed frame as df_r.
fitted_indexer = indexer.fit(df)
df_r = fitted_indexer.transform(df)

In [196]:
# Preview the frame with the added 'sex_indexed' column.
df_r.show()

+----------+----+------+------+---+------+----+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|
+----------+----+------+------+---+------+----+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|        0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|        0.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|        0.0|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|        1.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        0.0|
|     18.43| 3.0|  Male|    No|Sun|Dinne

In [197]:
# Index the remaining categorical columns in a single pass.
categorical_cols = ['smoker', 'day', 'time']
indexed_cols = ['{}_indexed'.format(c) for c in categorical_cols]
indexer = StringIndexer(inputCols=categorical_cols, outputCols=indexed_cols)

# Drop any previously generated output columns first (a no-op on the first
# run). Without this, re-executing the cell fails because the indexer refuses
# to write to output columns that already exist in df_r.
df_to_index = df_r.drop(*indexed_cols)
df_r = indexer.fit(df_to_index).transform(df_to_index)

In [198]:
# Preview the first five rows with all four indexed columns.
df_r.show(5)

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
only showing top 5 rows



In [199]:
# Confirm the original columns plus the four '*_indexed' columns are present.
df_r.columns

['total_bill',
 'tip',
 'sex',
 'smoker',
 'day',
 'time',
 'size',
 'sex_indexed',
 'smoker_indexed',
 'day_indexed',
 'time_indexed']

In [200]:
from pyspark.ml.feature import VectorAssembler

# Combine the numeric columns and the indexed categorical columns into one
# feature vector; 'tip' is left out because it is the label.
assembler = VectorAssembler(
    inputCols=[
        'total_bill',
        'size',
        'sex_indexed',
        'smoker_indexed',
        'day_indexed',
        'time_indexed',
    ],
    outputCol='Independent Features',
)

In [201]:
# Apply the assembler to the indexed frame; the result adds the
# 'Independent Features' vector column.
output = assembler.transform(df_r)
output.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|Independent Features|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|[16.99,2.0,1.0,0....|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|[10.34,3.0,0.0,0....|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|[21.01,3.0,0.0,0....|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|[23.68,2.0,0.0,0....|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|[24.59,4.0,1.0,0....|
|     25.29|4.71|  Male|    No|S

In [202]:
# Look at the assembled vectors on their own. Some rows print in the sparse
# form (size, [indices], [values]) instead of a dense list.
output.select('Independent Features').show()

+--------------------+
|Independent Features|
+--------------------+
|[16.99,2.0,1.0,0....|
|[10.34,3.0,0.0,0....|
|[21.01,3.0,0.0,0....|
|[23.68,2.0,0.0,0....|
|[24.59,4.0,1.0,0....|
|[25.29,4.0,0.0,0....|
|[8.77,2.0,0.0,0.0...|
|[26.88,4.0,0.0,0....|
|[15.04,2.0,0.0,0....|
|[14.78,2.0,0.0,0....|
|[10.27,2.0,0.0,0....|
|[35.26,4.0,1.0,0....|
|[15.42,2.0,0.0,0....|
|[18.43,4.0,0.0,0....|
|[14.83,2.0,1.0,0....|
|[21.58,2.0,0.0,0....|
|[10.33,3.0,1.0,0....|
|[16.29,3.0,0.0,0....|
|[16.97,3.0,1.0,0....|
|(6,[0,1],[20.65,3...|
+--------------------+
only showing top 20 rows



In [203]:
# Keep only the feature vector and the label ('tip') for modelling.
model_columns = ['Independent Features', 'tip']
finalized_data = output.select(model_columns)
finalized_data.show()

+--------------------+----+
|Independent Features| tip|
+--------------------+----+
|[16.99,2.0,1.0,0....|1.01|
|[10.34,3.0,0.0,0....|1.66|
|[21.01,3.0,0.0,0....| 3.5|
|[23.68,2.0,0.0,0....|3.31|
|[24.59,4.0,1.0,0....|3.61|
|[25.29,4.0,0.0,0....|4.71|
|[8.77,2.0,0.0,0.0...| 2.0|
|[26.88,4.0,0.0,0....|3.12|
|[15.04,2.0,0.0,0....|1.96|
|[14.78,2.0,0.0,0....|3.23|
|[10.27,2.0,0.0,0....|1.71|
|[35.26,4.0,1.0,0....| 5.0|
|[15.42,2.0,0.0,0....|1.57|
|[18.43,4.0,0.0,0....| 3.0|
|[14.83,2.0,1.0,0....|3.02|
|[21.58,2.0,0.0,0....|3.92|
|[10.33,3.0,1.0,0....|1.67|
|[16.29,3.0,0.0,0....|3.71|
|[16.97,3.0,1.0,0....| 3.5|
|(6,[0,1],[20.65,3...|3.35|
+--------------------+----+
only showing top 20 rows



In [204]:
# Train/test split
from pyspark.ml.regression import LinearRegression

# A fixed seed keeps the 75/25 split -- and with it the fitted coefficients
# and metrics in the following cells -- repeatable for the same input data.
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=42)
train_data.show(5)

+--------------------+----+
|Independent Features| tip|
+--------------------+----+
|(6,[0,1],[9.55,2.0])|1.45|
|(6,[0,1],[10.51,2...|1.25|
|(6,[0,1],[12.02,2...|1.97|
|(6,[0,1],[12.69,2...| 2.0|
|(6,[0,1],[13.28,2...|2.72|
+--------------------+----+
only showing top 5 rows



In [206]:
# Configure the estimator, then fit it on the training split.
# `regressor` ends up bound to the fitted model, which later cells read.
tip_estimator = LinearRegression(
    featuresCol='Independent Features',
    labelCol='tip',
)
regressor = tip_estimator.fit(train_data)

23/07/31 12:46:12 WARN Instrumentation: [6b337c7a] regParam is zero, which might cause numerical instability and overfitting.


In [207]:
# Fitted coefficients, one per entry of the 'Independent Features' vector, in
# the assembler's input order: total_bill, size, sex_indexed, smoker_indexed,
# day_indexed, time_indexed.
regressor.coefficients

DenseVector([0.0846, 0.259, 0.1048, -0.2639, 0.1106, -0.0793])

In [208]:
# Intercept term of the fitted tip model.
regressor.intercept

0.5259924328512966

In [209]:
# Predictions: evaluate the fitted model on the held-out test split.
pred_results = regressor.evaluate(test_data)

In [211]:
# Show the test rows with the actual 'tip' next to the model's 'prediction'.
pred_results.predictions.show()

+--------------------+----+------------------+
|Independent Features| tip|        prediction|
+--------------------+----+------------------+
|(6,[0,1],[10.07,2...|1.25|1.8959600776065515|
|(6,[0,1],[10.77,2...|1.47|1.9551812899749916|
|(6,[0,1],[11.61,2...|3.39|2.0262467448171195|
|(6,[0,1],[13.37,2...| 2.0|2.1751457930577685|
|(6,[0,1],[14.0,2.0])| 3.0| 2.228444884189365|
|(6,[0,1],[17.81,4...|2.34| 3.068805686906856|
|(6,[0,1],[17.92,2...|4.08| 2.560083673452629|
|(6,[0,1],[20.08,3...|3.15|3.0018375164598776|
|(6,[0,1],[21.7,2.0])| 4.3|2.8798782202422046|
|(6,[0,1],[29.03,3...|5.92|3.7590230174563604|
|(6,[0,1],[48.27,4...|6.73| 5.645774442253545|
|[7.25,2.0,0.0,1.0...|5.15|1.5040948684431161|
|[7.51,2.0,0.0,0.0...| 2.0|1.8213165024695182|
|[8.58,1.0,0.0,1.0...|1.92|1.4995379286260653|
|[10.33,3.0,1.0,0....|1.67|2.3923469108181363|
|[11.17,2.0,1.0,0....| 1.5|2.2357153796966105|
|[11.24,2.0,0.0,1....|1.76|1.7310360362966575|
|[11.35,2.0,1.0,1....| 2.5| 2.176957993023504|
|[11.59,2.0,0

In [212]:
# Performance metrics on the test split, in order:
# R^2, mean absolute error, mean squared error.
pred_results.r2, pred_results.meanAbsoluteError, pred_results.meanSquaredError

(0.3789598349999539, 0.8982608566010509, 1.5796298027410625)