## Spark session

In [1]:
## Initiate spark session
import numpy
from pyspark.ml.feature import Imputer
from pyspark.sql import SparkSession

In [2]:
## Create the SparkSession named 'practise' (getOrCreate reuses an already-running one)
spark = (
    SparkSession.builder
    .appName('practise')
    .getOrCreate()
)

In [3]:
## Inspect the SparkSession object returned by getOrCreate()
spark

In [4]:
## Read the heroes CSV. The 'header' option takes column names from the first row;
## inferSchema=True infers column types (without it every column is read as a string).
## NOTE(review): absolute local path - consider a configurable data directory.
df_pyspark = spark.read.option('header', 'true').csv('D:/pyspark/heroes.csv', inferSchema=True)

In [5]:
## Print the rows (show() defaults: up to 20 rows, long values truncated)
df_pyspark.show(n=20, truncate=True)

+---------+---+----------+
|     name|age|experience|
+---------+---+----------+
| deadpool| 65|         2|
|spiderman| 20|         7|
|  captain|120|         3|
|     hulk| 50|         2|
+---------+---+----------+



In [6]:
## Confirm the reader returned a Spark DataFrame (not a pandas one)
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [7]:
## Glimpse head: take(2) returns the first two rows as a list of Row objects
df_pyspark.take(2)

[Row(name='deadpool', age=65, experience=2),
 Row(name='spiderman', age=20, experience=7)]

In [8]:
## Dataset information: print each column's name, inferred type and nullability as a tree
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)



In [9]:
## Simpler reading style: pass header/inferSchema straight to csv() as keyword arguments
## (this re-reads the file and rebinds df_pyspark)
df_pyspark = spark.read.csv(
    'D:/pyspark/heroes.csv',
    header=True,
    inferSchema=True,
)
df_pyspark.show(n=20, truncate=True)

+---------+---+----------+
|     name|age|experience|
+---------+---+----------+
| deadpool| 65|         2|
|spiderman| 20|         7|
|  captain|120|         3|
|     hulk| 50|         2|
+---------+---+----------+



In [10]:
## Get columns: a plain Python list of column names, in schema order
df_pyspark.columns

['name', 'age', 'experience']

### Selecting Columns

In [11]:
## Selecting columns with select(); the result is a new DataFrame.
## Note: df_pyspark[['name', 'age']] also works in PySpark (a list passed to [] is
## forwarded to select()), whereas df_pyspark['name'] returns a Column, not a DataFrame.
df_pyspark.select(['name','age']).show()

+---------+---+
|     name|age|
+---------+---+
| deadpool| 65|
|spiderman| 20|
|  captain|120|
|     hulk| 50|
+---------+---+



In [12]:
## Check data types: a list of (column name, type name) tuples
df_pyspark.dtypes

[('name', 'string'), ('age', 'int'), ('experience', 'int')]

### Describe

In [13]:
## Summary statistics (count, mean, stddev, min, max) for the numeric and string columns
summary_stats = df_pyspark.describe()
summary_stats.show()

+-------+---------+------------------+------------------+
|summary|     name|               age|        experience|
+-------+---------+------------------+------------------+
|  count|        4|                 4|                 4|
|   mean|     null|             63.75|               3.5|
| stddev|     null|41.907636535600524|2.3804761428476167|
|    min|  captain|                20|                 2|
|    max|spiderman|               120|                 7|
+-------+---------+------------------+------------------+



### Adding Columns

In [14]:
## Derive a column holding experience + 2. withColumn returns a new DataFrame,
## so the result is rebound to df_pyspark.
experience_plus_two = df_pyspark['experience'] + 2
df_pyspark = df_pyspark.withColumn('experience after 2 years', experience_plus_two)

In [15]:
## Show the frame including the derived column
df_pyspark.show(n=20, truncate=True)

+---------+---+----------+------------------------+
|     name|age|experience|experience after 2 years|
+---------+---+----------+------------------------+
| deadpool| 65|         2|                       4|
|spiderman| 20|         7|                       9|
|  captain|120|         3|                       5|
|     hulk| 50|         2|                       4|
+---------+---+----------+------------------------+



### Drop Columns

In [16]:
## Remove the derived column again. drop() is a no-op for names not in the schema,
## so re-running this cell is harmless.
df_pyspark = df_pyspark.drop('experience after 2 years')
df_pyspark.show(n=20, truncate=True)

+---------+---+----------+
|     name|age|experience|
+---------+---+----------+
| deadpool| 65|         2|
|spiderman| 20|         7|
|  captain|120|         3|
|     hulk| 50|         2|
+---------+---+----------+



### Renaming Column

In [17]:
## Rename 'name' -> 'new_name'. The result is bound to a new variable so df_pyspark keeps
## its original 'name' column; rebinding df_pyspark here would make earlier cells such as
## select(['name','age']) fail when the notebook is re-run out of order.
df_renamed = df_pyspark.withColumnRenamed('name', 'new_name')
df_renamed.show()

+---------+---+----------+
| new_name|age|experience|
+---------+---+----------+
| deadpool| 65|         2|
|spiderman| 20|         7|
|  captain|120|         3|
|     hulk| 50|         2|
+---------+---+----------+



### Missing Values

In [18]:
## Second dataset, with nulls in age, experience and salary, for the missing-value demos
df_pyspark2 = spark.read.csv(
    'D:/pyspark/heroes2.csv',
    header=True,
    inferSchema=True,
)
df_pyspark2.show(n=20, truncate=True)

+------------+----+----------+--------+-------+
|        name| age|experience|  salary|company|
+------------+----+----------+--------+-------+
|    deadpool|  65|         2|10000000| marvel|
|   spiderman|  20|         7| 5000000| marvel|
|     captain| 120|         3|20000000| marvel|
|      batman|  50|         2|10000000|     dc|
| black widow|  40|         6|    null| marvel|
|    superman|null|      null| 5000000|     dc|
|womder woman|  35|      null|    null|     dc|
|       wanda|null|         3|10000000| marvel|
+------------+----+----------+--------+-------+



In [19]:
## Drop every row that contains a null in any column (dropna() defaults to how='any')
df_pyspark2.dropna().show()

+---------+---+----------+--------+-------+
|     name|age|experience|  salary|company|
+---------+---+----------+--------+-------+
| deadpool| 65|         2|10000000| marvel|
|spiderman| 20|         7| 5000000| marvel|
|  captain|120|         3|20000000| marvel|
|   batman| 50|         2|10000000|     dc|
+---------+---+----------+--------+-------+



In [20]:
## how='any': drop a row if even one of its values is null (same as the default)
df_pyspark2.dropna(how='any').show()

+---------+---+----------+--------+-------+
|     name|age|experience|  salary|company|
+---------+---+----------+--------+-------+
| deadpool| 65|         2|10000000| marvel|
|spiderman| 20|         7| 5000000| marvel|
|  captain|120|         3|20000000| marvel|
|   batman| 50|         2|10000000|     dc|
+---------+---+----------+--------+-------+



In [21]:
## how='all': drop a row only when every one of its values is null
df_pyspark2.dropna(how='all').show()

+------------+----+----------+--------+-------+
|        name| age|experience|  salary|company|
+------------+----+----------+--------+-------+
|    deadpool|  65|         2|10000000| marvel|
|   spiderman|  20|         7| 5000000| marvel|
|     captain| 120|         3|20000000| marvel|
|      batman|  50|         2|10000000|     dc|
| black widow|  40|         6|    null| marvel|
|    superman|null|      null| 5000000|     dc|
|womder woman|  35|      null|    null|     dc|
|       wanda|null|         3|10000000| marvel|
+------------+----+----------+--------+-------+



In [22]:
## Threshold: keep only rows with at least `thresh` non-null values.
## When thresh is given it overrides `how`, so the previous how='any' had no effect and is omitted.
df_pyspark2.na.drop(thresh=3).show()

+------------+----+----------+--------+-------+
|        name| age|experience|  salary|company|
+------------+----+----------+--------+-------+
|    deadpool|  65|         2|10000000| marvel|
|   spiderman|  20|         7| 5000000| marvel|
|     captain| 120|         3|20000000| marvel|
|      batman|  50|         2|10000000|     dc|
| black widow|  40|         6|    null| marvel|
|    superman|null|      null| 5000000|     dc|
|womder woman|  35|      null|    null|     dc|
|       wanda|null|         3|10000000| marvel|
+------------+----+----------+--------+-------+



In [23]:
## subset: only nulls in the listed columns count, so rows with a null age are dropped
df_pyspark2.dropna(subset=['age']).show()

+------------+---+----------+--------+-------+
|        name|age|experience|  salary|company|
+------------+---+----------+--------+-------+
|    deadpool| 65|         2|10000000| marvel|
|   spiderman| 20|         7| 5000000| marvel|
|     captain|120|         3|20000000| marvel|
|      batman| 50|         2|10000000|     dc|
| black widow| 40|         6|    null| marvel|
|womder woman| 35|      null|    null|     dc|
+------------+---+----------+--------+-------+



In [24]:
## Fill missing values with 0; a numeric fill value is only applied to numeric columns
df_pyspark2.fillna(0).show()

+------------+---+----------+--------+-------+
|        name|age|experience|  salary|company|
+------------+---+----------+--------+-------+
|    deadpool| 65|         2|10000000| marvel|
|   spiderman| 20|         7| 5000000| marvel|
|     captain|120|         3|20000000| marvel|
|      batman| 50|         2|10000000|     dc|
| black widow| 40|         6|       0| marvel|
|    superman|  0|         0| 5000000|     dc|
|womder woman| 35|         0|       0|     dc|
|       wanda|  0|         3|10000000| marvel|
+------------+---+----------+--------+-------+



In [25]:
## Replace missing values with the column mean using pyspark.ml's Imputer
## (imported in the first cell). Each input column gets a companion '<col>_imputed'
## output column; the original columns are kept unchanged.
## For integer columns the mean is cast back to int, e.g. experience: 23/6 ~ 3.83 -> 3.
impute_cols = ['age', 'experience', 'salary']

imputer = Imputer(
    inputCols=impute_cols,
    outputCols=["{}_imputed".format(c) for c in impute_cols],
    strategy='mean',
)


In [26]:
## fit() computes the per-column means; transform() appends the *_imputed columns
imputer_model = imputer.fit(df_pyspark2)
imputer_model.transform(df_pyspark2).show()

+------------+----+----------+--------+-------+-----------+------------------+--------------+
|        name| age|experience|  salary|company|age_imputed|experience_imputed|salary_imputed|
+------------+----+----------+--------+-------+-----------+------------------+--------------+
|    deadpool|  65|         2|10000000| marvel|         65|                 2|      10000000|
|   spiderman|  20|         7| 5000000| marvel|         20|                 7|       5000000|
|     captain| 120|         3|20000000| marvel|        120|                 3|      20000000|
|      batman|  50|         2|10000000|     dc|         50|                 2|      10000000|
| black widow|  40|         6|    null| marvel|         40|                 6|      10000000|
|    superman|null|      null| 5000000|     dc|         55|                 3|       5000000|
|womder woman|  35|      null|    null|     dc|         35|                 3|      10000000|
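|       wanda|null|         3|10000000| marvel|         55|                 3|      10000000|
+------------+----+----------+--------+-------+-----------+------------------+--------------+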

### Filter

In [27]:
## SQL-style string predicate; rows with a null salary do not satisfy the comparison and are dropped
df_pyspark2.where('salary < 10000000').select('name', 'age').show()

+---------+----+
|     name| age|
+---------+----+
|spiderman|  20|
| superman|null|
+---------+----+



In [28]:
## Combine Column conditions with & (each comparison is its own Column expression)
low_salary = df_pyspark2['salary'] < 10000000
under_50 = df_pyspark2['age'] < 50
df_pyspark2.filter(low_salary & under_50).show()

+---------+---+----------+-------+-------+
|     name|age|experience| salary|company|
+---------+---+----------+-------+-------+
|spiderman| 20|         7|5000000| marvel|
+---------+---+----------+-------+-------+



### Group By

In [29]:
## Third dataset (no missing values) used for the group-by examples
df_pyspark3 = spark.read.csv(
    'D:/pyspark/heroes3.csv',
    header=True,
    inferSchema=True,
)
df_pyspark3.show(n=20, truncate=True)

+------------+---+----------+--------+-------+
|        name|age|experience|  salary|company|
+------------+---+----------+--------+-------+
|    deadpool| 65|         2|10000000| marvel|
|   spiderman| 20|         7| 5000000| marvel|
|     captain|120|         3|20000000| marvel|
|      batman| 50|         2|10000000|     dc|
| black widow| 40|         6|15000000| marvel|
|    superman|150|        10| 5000000|     dc|
|womder woman| 35|         4|15000000|     dc|
|       wanda| 30|         3|10000000| marvel|
+------------+---+----------+--------+-------+



In [30]:
## Inspect the inferred schema before aggregating the numeric columns
df_pyspark3.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- company: string (nullable = true)



In [35]:
## Per-company totals; sum() with no arguments aggregates every numeric column
by_company = df_pyspark3.groupby('company')
by_company.sum().show()

+-------+--------+---------------+-----------+
|company|sum(age)|sum(experience)|sum(salary)|
+-------+--------+---------------+-----------+
|     dc|     235|             16|   30000000|
| marvel|     275|             21|   60000000|
+-------+--------+---------------+-----------+



In [34]:
## Number of rows per company
df_pyspark3.groupby('company').count().show()

+-------+-----+
|company|count|
+-------+-----+
|     dc|    3|
| marvel|    5|
+-------+-----+



In [40]:
## Whole-DataFrame aggregate: DataFrame.agg is shorthand for groupBy() with no keys
df_pyspark3.groupBy().agg({'salary': 'sum'}).show()

+-----------+
|sum(salary)|
+-----------+
|   90000000|
+-----------+



In [41]:
## Per-company maximum of every numeric column
df_pyspark3.groupby('company').max().show()

+-------+--------+---------------+-----------+
|company|max(age)|max(experience)|max(salary)|
+-------+--------+---------------+-----------+
|     dc|     150|             10|   15000000|
| marvel|     120|              7|   20000000|
+-------+--------+---------------+-----------+

