In [1]:
#!pip install pyspark

In [2]:
import pyspark
import pandas as pd

In [3]:
# Load the employee CSV with pandas for a quick look at the first rows.
# Raw string literal: the Windows path contains backslashes ('\P', '\d') that
# are invalid escape sequences in a normal string and trigger a warning on
# current Python versions. The resulting path value is unchanged.
data = pd.read_csv(r'E:\PySpark\data.csv', header=0)
data.head()

Unnamed: 0,EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID
0,198,Donald,OConnell,DOCONNEL,650.507.9833,21-JUN-07,SH_CLERK,2600,-,124,50
1,199,Douglas,Grant,DGRANT,650.507.9844,13-JAN-08,SH_CLERK,2600,-,124,50
2,200,Jennifer,Whalen,JWHALEN,515.123.4444,17-SEP-03,AD_ASST,4400,-,101,10
3,201,Michael,Hartstein,MHARTSTE,515.123.5555,17-FEB-04,MK_MAN,13000,-,100,20
4,202,Pat,Fay,PFAY,603.123.6666,17-AUG-05,MK_REP,6000,-,201,20


## Spark Session

In [4]:
from pyspark.sql import SparkSession

In [5]:
# Create the Spark session used by every later cell (or reuse one that is
# already running under this application name).
spark = (
    SparkSession.builder
    .appName('Practise PySpark')
    .getOrCreate()
)

In [6]:
# Display the session object as the cell's output.
spark

### Read data with spark (1)

In [7]:
# Read the file with Spark's default CSV options: the header line is not
# parsed and no schema is inferred, so the columns are named _c0.._c10, are all
# strings, and the header text shows up as the first data row.
# Raw string literal: '\d' is an invalid escape sequence in a normal string;
# the path value itself is unchanged.
df_pyspark = spark.read.csv(r'.\data.csv')
df_pyspark.show()

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        _c0|       _c1|      _c2|     _c3|         _c4|      _c5|       _c6|   _c7|           _c8|       _c9|         _c10|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|            - |       100|           20|


### Read CSV with Spark (2)

In [8]:
# Second read: use the first line as column names and let Spark infer the
# column types, then print the result (nothing is assigned).
(
    spark.read
    .option('header', 'true')
    .csv('data.csv', inferSchema=True)
    .show()
)

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|            - |       100|           20|
|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP|  6000|            - |       201|           20|


In [9]:
# Confirm that spark.read.csv returned a Spark DataFrame (not a pandas one).
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

### Spark Schema

In [10]:
# Print column names and types; this read did not infer a schema, so every
# column is reported as string.
df_pyspark.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)



In [11]:
# First three rows as Row objects; the file's header line is the first Row.
df_pyspark.head(3)

[Row(_c0='EMPLOYEE_ID', _c1='FIRST_NAME', _c2='LAST_NAME', _c3='EMAIL', _c4='PHONE_NUMBER', _c5='HIRE_DATE', _c6='JOB_ID', _c7='SALARY', _c8='COMMISSION_PCT', _c9='MANAGER_ID', _c10='DEPARTMENT_ID'),
 Row(_c0='198', _c1='Donald', _c2='OConnell', _c3='DOCONNEL', _c4='650.507.9833', _c5='21-JUN-07', _c6='SH_CLERK', _c7='2600', _c8=' - ', _c9='124', _c10='50'),
 Row(_c0='199', _c1='Douglas', _c2='Grant', _c3='DGRANT', _c4='650.507.9844', _c5='13-JAN-08', _c6='SH_CLERK', _c7='2600', _c8=' - ', _c9='124', _c10='50')]

## Selecting Columns and Indexing

In [12]:
# Column names are the auto-generated _c0.._c10 because no header was parsed.
df_pyspark.columns

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10']

In [13]:
# select() returns a new DataFrame; its repr shows only the schema, not rows.
df_pyspark.select('_c1')

DataFrame[_c1: string]

In [14]:
# Selecting a single column still yields a DataFrame.
type(df_pyspark.select('_c1'))

pyspark.sql.dataframe.DataFrame

In [15]:
# Pick several columns by passing a list of names, then print the first 3 rows.
df_pyspark.select(['_c1','_c2']).show(3)

+----------+---------+
|       _c1|      _c2|
+----------+---------+
|FIRST_NAME|LAST_NAME|
|    Donald| OConnell|
|   Douglas|    Grant|
+----------+---------+
only showing top 3 rows



## Checking Data Types

In [16]:
# (column name, type) pairs -- every column is 'string' for this read.
df_pyspark.dtypes

[('_c0', 'string'),
 ('_c1', 'string'),
 ('_c2', 'string'),
 ('_c3', 'string'),
 ('_c4', 'string'),
 ('_c5', 'string'),
 ('_c6', 'string'),
 ('_c7', 'string'),
 ('_c8', 'string'),
 ('_c9', 'string'),
 ('_c10', 'string')]

In [17]:
# Summary statistics per column (count, mean, stddev, min, ...).
df_pyspark.describe().show(5)

+-------+------------------+-------+--------+------+------------+---------+----------+-----------------+--------------+------------------+-----------------+
|summary|               _c0|    _c1|     _c2|   _c3|         _c4|      _c5|       _c6|              _c7|           _c8|               _c9|             _c10|
+-------+------------------+-------+--------+------+------------+---------+----------+-----------------+--------------+------------------+-----------------+
|  count|                51|     51|      51|    51|          51|       51|        51|               51|            51|                51|               51|
|   mean|            134.76|   null|    null|  null|        null|     null|      null|          6182.32|          null|114.83673469387755|             57.6|
| stddev|33.631593504213456|   null|    null|  null|        null|     null|      null|4586.181771631927|          null|20.591611296406914|25.11686968666962|
|    min|               100|   Adam|Atkinson|AFRIPP|515.12

## Adding columns and Dropping columns

In [18]:
## SALARY = '_c7'

# Derive a new column _c11 = SALARY + 2000 and preview it. _c7 is a string
# column, so the arithmetic relies on an implicit cast (the result prints as
# 4600.0 and the header row becomes null). The result is not assigned, so
# df_pyspark itself does not gain the _c11 column.
df_pyspark.withColumn('_c11', df_pyspark['_c7']+2000).show(3)

+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+------+
|        _c0|       _c1|      _c2|     _c3|         _c4|      _c5|     _c6|   _c7|           _c8|       _c9|         _c10|  _c11|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|  null|
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|SH_CLERK|  2600|            - |       124|           50|4600.0|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|SH_CLERK|  2600|            - |       124|           50|4600.0|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+------+
only showing top 3 rows



### Dropping the columns


In [19]:
# drop() returns a new DataFrame without the named column.
# NOTE(review): no cell shown earlier assigns a frame containing '_c11' back to
# df_pyspark, so this looks like a no-op here (drop ignores column names that
# are not in the schema) -- confirm.
df_pyspark = df_pyspark.drop('_c11')

In [20]:
# Preview the first 4 rows after the drop.
df_pyspark.show(4)

+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|        _c0|       _c1|      _c2|     _c3|         _c4|      _c5|     _c6|   _c7|           _c8|       _c9|         _c10|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03| AD_ASST|  4400|            - |       101|           10|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
only showing top

### Renaming the columns

In [21]:
# Rename _c1 to First_Name in the returned DataFrame; df_pyspark is not
# reassigned, so the rename only affects what is printed here.
df_pyspark.withColumnRenamed('_c1','First_Name').show(2)

+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|        _c0|First_Name|      _c2|     _c3|         _c4|      _c5|     _c6|   _c7|           _c8|       _c9|         _c10|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|SH_CLERK|  2600|            - |       124|           50|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
only showing top 2 rows



## Handling Missing Values

In [22]:
## Drop a column: print the frame without _c0 (the result is not assigned)
df_pyspark.drop('_c0').show()

+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|       _c1|      _c2|     _c3|         _c4|      _c5|       _c6|   _c7|           _c8|       _c9|         _c10|
+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|
|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|            - |       124|           50|
|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|            - |       101|           10|
|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|            - |       100|           20|
|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP|  6000|            - |       20

In [25]:
## Dropping null or NA values

# With no arguments, na.drop() removes every row that contains a null.
df_pyspark.na.drop().show()

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        _c0|       _c1|      _c2|     _c3|         _c4|      _c5|       _c6|   _c7|           _c8|       _c9|         _c10|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|            - |       100|           20|


In [27]:
## how='any': drop a row if any of its columns is null
## (how='all' would drop a row only if every column is null)

df_pyspark.na.drop(how='any').show()

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        _c0|       _c1|      _c2|     _c3|         _c4|      _c5|       _c6|   _c7|           _c8|       _c9|         _c10|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|            - |       100|           20|


In [28]:
## Threshold: keep only rows that have at least `thresh` non-null values

# When thresh is given it overrides how, so how='any' has no effect here.
df_pyspark.na.drop(how='any', thresh=2).show()

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        _c0|       _c1|      _c2|     _c3|         _c4|      _c5|       _c6|   _c7|           _c8|       _c9|         _c10|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|            - |       100|           20|


In [31]:
## Subset: drop rows that have a null in specific column(s)

# Only _c8 (COMMISSION_PCT) is checked.
# NOTE(review): the rows shown hold the placeholder string ' - ' in _c8 rather
# than a real null, so they are not dropped by this call.
df_pyspark.na.drop(how='any', subset=['_c8']).show()

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        _c0|       _c1|      _c2|     _c3|         _c4|      _c5|       _c6|   _c7|           _c8|       _c9|         _c10|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|            - |       100|           20|


In [32]:
## Filling the missing values

# Replace null entries with the literal text 'Missing Values' and print the
# result (df_pyspark is not reassigned).
df_pyspark.na.fill('Missing Values').show()

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        _c0|       _c1|      _c2|     _c3|         _c4|      _c5|       _c6|   _c7|           _c8|       _c9|         _c10|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|            - |       100|           20|


In [39]:
## Handling missing values with Mean, Mode, Median

## Using Imputer functions

#from pyspark.ml.feature import Imputer

#imputer = Imputer(
#    inputCols=['_c7','_c8'],
#    outputCols = ["{}_imputed".format(c) for c in ['_c7','_c8'] ]
#).setStrategy("mean")

In [40]:
## Add imputation cols to df
#imputer.fit(df_pyspark).transform(df_pyspark).show()

## Filter Operations

* Important for data preprocessing

In [45]:
## Salary of the people less than or equal to 8000

# The condition is written as a SQL expression string. _c7 is a string column
# but is compared numerically here (13000 is excluded), and the header row
# drops out of the result.
df_pyspark.filter('_c7 <= 8000').show()

+---+-----------+----------+--------+------------+---------+----------+----+---+---+----+
|_c0|        _c1|       _c2|     _c3|         _c4|      _c5|       _c6| _c7|_c8|_c9|_c10|
+---+-----------+----------+--------+------------+---------+----------+----+---+---+----+
|198|     Donald|  OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|2600| - |124|  50|
|199|    Douglas|     Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|2600| - |124|  50|
|200|   Jennifer|    Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|4400| - |101|  10|
|202|        Pat|       Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP|6000| - |201|  20|
|203|      Susan|    Mavris| SMAVRIS|515.123.7777|07-JUN-02|    HR_REP|6500| - |101|  40|
|104|      Bruce|     Ernst|  BERNST|590.423.4568|21-MAY-07|   IT_PROG|6000| - |103|  60|
|105|      David|    Austin| DAUSTIN|590.423.4569|25-JUN-05|   IT_PROG|4800| - |103|  60|
|106|      Valli| Pataballa|VPATABAL|590.423.4560|05-FEB-06|   IT_PROG|4800| - |103|  60|
|107|     

In [46]:
# Same filter, keeping only the first-name column (_c1) of the matching rows.
df_pyspark.filter('_c7 <= 8000').select(['_c1']).show()

+-----------+
|        _c1|
+-----------+
|     Donald|
|    Douglas|
|   Jennifer|
|        Pat|
|      Susan|
|      Bruce|
|      David|
|      Valli|
|      Diana|
|     Ismael|
|Jose Manuel|
|       Luis|
|  Alexander|
|     Shelli|
|      Sigal|
|        Guy|
|      Karen|
|    Matthew|
|      Payam|
|     Shanta|
+-----------+
only showing top 20 rows



In [48]:
# Filter written with a Column expression instead of a SQL string
# (salary <= 5000).
df_pyspark.filter(df_pyspark['_c7'] <= 5000).show()

+---+---------+-----------+--------+------------+---------+--------+----+---+---+----+
|_c0|      _c1|        _c2|     _c3|         _c4|      _c5|     _c6| _c7|_c8|_c9|_c10|
+---+---------+-----------+--------+------------+---------+--------+----+---+---+----+
|198|   Donald|   OConnell|DOCONNEL|650.507.9833|21-JUN-07|SH_CLERK|2600| - |124|  50|
|199|  Douglas|      Grant|  DGRANT|650.507.9844|13-JAN-08|SH_CLERK|2600| - |124|  50|
|200| Jennifer|     Whalen| JWHALEN|515.123.4444|17-SEP-03| AD_ASST|4400| - |101|  10|
|105|    David|     Austin| DAUSTIN|590.423.4569|25-JUN-05| IT_PROG|4800| - |103|  60|
|106|    Valli|  Pataballa|VPATABAL|590.423.4560|05-FEB-06| IT_PROG|4800| - |103|  60|
|107|    Diana|    Lorentz|DLORENTZ|590.423.5567|07-FEB-07| IT_PROG|4200| - |103|  60|
|115|Alexander|       Khoo|   AKHOO|515.127.4562|18-MAY-03|PU_CLERK|3100| - |114|  30|
|116|   Shelli|      Baida|  SBAIDA|515.127.4563|24-DEC-05|PU_CLERK|2900| - |114|  30|
|117|    Sigal|     Tobias| STOBIAS|515.127

In [54]:
# AND: combine Column conditions with & (5000 <= salary <= 15000). Each
# comparison needs its own parentheses because & binds tighter than <= and >=.
df_pyspark.filter((df_pyspark['_c7'] <= 15000) & 
                  (df_pyspark['_c7'] >= 5000)).show()

+---+-----------+---------+--------+------------+---------+----------+-----+---+---+----+
|_c0|        _c1|      _c2|     _c3|         _c4|      _c5|       _c6|  _c7|_c8|_c9|_c10|
+---+-----------+---------+--------+------------+---------+----------+-----+---+---+----+
|201|    Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN|13000| - |100|  20|
|202|        Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP| 6000| - |201|  20|
|203|      Susan|   Mavris| SMAVRIS|515.123.7777|07-JUN-02|    HR_REP| 6500| - |101|  40|
|204|    Hermann|     Baer|   HBAER|515.123.8888|07-JUN-02|    PR_REP|10000| - |101|  70|
|205|    Shelley|  Higgins|SHIGGINS|515.123.8080|07-JUN-02|    AC_MGR|12008| - |101| 110|
|206|    William|    Gietz|  WGIETZ|515.123.8181|07-JUN-02|AC_ACCOUNT| 8300| - |205| 110|
|103|  Alexander|   Hunold| AHUNOLD|590.423.4567|03-JAN-06|   IT_PROG| 9000| - |102|  60|
|104|      Bruce|    Ernst|  BERNST|590.423.4568|21-MAY-07|   IT_PROG| 6000| - |103|  60|
|108|     

In [55]:
## OR: combine Column conditions with |

# Note: every numeric salary is either <= 15000 or >= 5000, so this particular
# pair of conditions matches all data rows.
df_pyspark.filter((df_pyspark['_c7'] <= 15000) |
                  (df_pyspark['_c7'] >= 5000)).show()

+---+---------+---------+--------+------------+---------+----------+-----+---+---+----+
|_c0|      _c1|      _c2|     _c3|         _c4|      _c5|       _c6|  _c7|_c8|_c9|_c10|
+---+---------+---------+--------+------------+---------+----------+-----+---+---+----+
|198|   Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK| 2600| - |124|  50|
|199|  Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK| 2600| - |124|  50|
|200| Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST| 4400| - |101|  10|
|201|  Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN|13000| - |100|  20|
|202|      Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP| 6000| - |201|  20|
|203|    Susan|   Mavris| SMAVRIS|515.123.7777|07-JUN-02|    HR_REP| 6500| - |101|  40|
|204|  Hermann|     Baer|   HBAER|515.123.8888|07-JUN-02|    PR_REP|10000| - |101|  70|
|205|  Shelley|  Higgins|SHIGGINS|515.123.8080|07-JUN-02|    AC_MGR|12008| - |101| 110|
|206|  William|    Gietz|  WGIET

In [56]:
## Inverse Filter Operation

# NOTE(review): ~ binds tighter than |, so only the first comparison is
# negated. This reads NOT(salary <= 15000) OR (salary >= 5000), which is why
# rows such as 6000 and 13000 still appear. To invert the whole combined
# condition, wrap it in one more pair of parentheses: ~((...) | (...)).
df_pyspark.filter(~(df_pyspark['_c7'] <= 15000) |
                  (df_pyspark['_c7'] >= 5000)).show()

+---+-----------+---------+--------+------------+---------+----------+-----+---+---+----+
|_c0|        _c1|      _c2|     _c3|         _c4|      _c5|       _c6|  _c7|_c8|_c9|_c10|
+---+-----------+---------+--------+------------+---------+----------+-----+---+---+----+
|201|    Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN|13000| - |100|  20|
|202|        Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP| 6000| - |201|  20|
|203|      Susan|   Mavris| SMAVRIS|515.123.7777|07-JUN-02|    HR_REP| 6500| - |101|  40|
|204|    Hermann|     Baer|   HBAER|515.123.8888|07-JUN-02|    PR_REP|10000| - |101|  70|
|205|    Shelley|  Higgins|SHIGGINS|515.123.8080|07-JUN-02|    AC_MGR|12008| - |101| 110|
|206|    William|    Gietz|  WGIETZ|515.123.8181|07-JUN-02|AC_ACCOUNT| 8300| - |205| 110|
|100|     Steven|     King|   SKING|515.123.4567|17-JUN-03|   AD_PRES|24000| - | - |  90|
|101|      Neena|  Kochhar|NKOCHHAR|515.123.4568|21-SEP-05|     AD_VP|17000| - |100|  90|
|102|     

## GroupBy And Aggregate Functions

(For Data Preprocessing)

In [60]:
## GroupBy
# Q. What will be the mean avg salary?

# df_pyspark.groupby('_c1').sum().show()

In [61]:
## Group by department
# Q. Which department has the highest salary?

# Column mapping (from the file's header row): _c10 = DEPARTMENT_ID,
# _c7 = SALARY, _c6 = JOB_ID. The original cell grouped by _c6 (job, not
# department) and called sum() on a frame whose columns are all strings, so no
# salary figure was aggregated. SALARY is cast to int first because the
# grouped sum()/max() helpers only work on numeric columns; the cast turns the
# header row's 'SALARY' text into null, which the filter then removes.
max_salary_by_department = (
    df_pyspark
    .withColumn('_c7', df_pyspark['_c7'].cast('int'))
    .filter('_c7 IS NOT NULL')
    .groupBy('_c10')
    .max('_c7')
    .withColumnRenamed('max(_c7)', 'max_salary')
)

# Highest-paying department first.
max_salary_by_department.orderBy('max_salary', ascending=False).show()

Py4JJavaError: An error occurred while calling o308.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 32.0 failed 1 times, most recent failure: Lost task 0.0 in stage 32.0 (TID 31) (PREDATOR executor driver): java.nio.file.NoSuchFileException: C:\Users\Swapnil\AppData\Local\Temp\blockmgr-478d9285-e07a-47ba-9e69-399b5fb8c222\16
	at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
	at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
	at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
	at sun.nio.fs.WindowsFileSystemProvider.createDirectory(Unknown Source)
	at java.nio.file.Files.createDirectory(Unknown Source)
	at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:108)
	at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:126)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.$anonfun$getDataFile$2(IndexShuffleBlockResolver.scala:103)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.getDataFile(IndexShuffleBlockResolver.scala:103)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.getDataFile(IndexShuffleBlockResolver.scala:65)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.<init>(LocalDiskShuffleMapOutputWriter.java:78)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents.createMapOutputWriter(LocalDiskShuffleExecutorComponents.java:71)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:138)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.nio.file.NoSuchFileException: C:\Users\Swapnil\AppData\Local\Temp\blockmgr-478d9285-e07a-47ba-9e69-399b5fb8c222\16
	at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
	at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
	at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
	at sun.nio.fs.WindowsFileSystemProvider.createDirectory(Unknown Source)
	at java.nio.file.Files.createDirectory(Unknown Source)
	at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:108)
	at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:126)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.$anonfun$getDataFile$2(IndexShuffleBlockResolver.scala:103)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.getDataFile(IndexShuffleBlockResolver.scala:103)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.getDataFile(IndexShuffleBlockResolver.scala:65)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.<init>(LocalDiskShuffleMapOutputWriter.java:78)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents.createMapOutputWriter(LocalDiskShuffleExecutorComponents.java:71)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:138)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
