# freeCodeCamp Course - PySpark Full Course (2 hours)

## Part 1 - Setup

In [1]:
!pip install pyspark



In [3]:
!pip install pandas



In [5]:
import pyspark

In [7]:
import pandas as pd

In [9]:
# Peek at the raw CSV with pandas, for comparison with Spark's reader below.
pd.read_csv(filepath_or_buffer='datas.csv')

Unnamed: 0,name,age,experience
0,akbar,11,2
1,abbas,20,1
2,nasrin,22,2
3,fariborz,40,3


## Part 2 - PySpark DataFrame - Basic APIs

In [11]:
from pyspark.sql import SparkSession

In [13]:
# Start a Spark session, or reuse the one already running in this kernel.
spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/01 10:33:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [15]:
# Read without a header: the first CSV line becomes an ordinary data row and
# the columns get default names _c0, _c1, ... (see the output below).
df_pyspark = spark.read.csv('datas.csv', header=False)

In [17]:
# show() prints at most the first 20 rows and truncates long cell values.
df_pyspark.show(n=20, truncate=True)

+--------+---+----------+
|     _c0|_c1|       _c2|
+--------+---+----------+
|    name|age|experience|
|   akbar| 11|         2|
|   abbas| 20|         1|
|  nasrin| 22|         2|
|fariborz| 40|         3|
+--------+---+----------+



In [19]:
# header=True takes column names from the first line; inferSchema=True scans the
# data to pick column types instead of reading every column as a string.
# (Each option is given once; the original passed inferSchema twice.)
df = spark.read.csv('datas.csv', header=True, inferSchema=True)

In [21]:
# show() prints at most the first 20 rows and truncates long cell values.
df.show(n=20, truncate=True)

+--------+---+----------+
|    name|age|experience|
+--------+---+----------+
|   akbar| 11|         2|
|   abbas| 20|         1|
|  nasrin| 22|         2|
|fariborz| 40|         3|
+--------+---+----------+



In [23]:
# Print each column's name, inferred type and nullability.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)



In [25]:
# take(n) (equivalent to head(n)) collects the first n rows to the driver as Row objects.
df.take(3)

[Row(name='akbar', age=11, experience=2),
 Row(name='abbas', age=20, experience=1),
 Row(name='nasrin', age=22, experience=2)]

In [27]:
# A Spark DataFrame is a different class from a pandas DataFrame.
type(df)

pyspark.sql.dataframe.DataFrame

In [29]:
# Column names in schema order (this is what df.columns returns).
[field.name for field in df.schema.fields]

['name', 'age', 'experience']

In [31]:
# select() returns a new, lazy DataFrame; nothing is computed until an action runs.
# Use the column's exact spelling ('name'): 'Name' only resolves because Spark
# matches column names case-insensitively by default (spark.sql.caseSensitive).
df.select('name')

DataFrame[Name: string]

In [33]:
# show() is an action: it runs the projection and prints the result.
# The column is spelled exactly as in df.columns ('name').
df.select('name').show()

+--------+
|    Name|
+--------+
|   akbar|
|   abbas|
|  nasrin|
|fariborz|
+--------+



In [35]:
# Project several columns at once; spellings match df.columns exactly
# instead of relying on case-insensitive name resolution.
df.select(['name', 'experience'])

DataFrame[Name: string, Experience: int]

In [37]:
# Run the two-column projection and print it (exact column spellings).
df.select(['name', 'experience']).show()

+--------+----------+
|    Name|Experience|
+--------+----------+
|   akbar|         2|
|   abbas|         1|
|  nasrin|         2|
|fariborz|         3|
+--------+----------+



In [41]:
# Indexing a DataFrame yields a Column expression, not data.
# Use the exact column spelling from df.columns.
df['name']

Column<'Name'>

In [46]:
# A Column such as df['Name'] is only an expression, not data, so it has no
# show() of its own; calling it fails (PySpark raises TypeError: 'Column' object
# is not callable). Select the column into a DataFrame first instead, e.g.
# df.select('Name').show().
# df['Name'].show()

In [48]:
# (column name, Spark SQL type) pairs built from the schema, as df.dtypes does.
[(field.name, field.dataType.simpleString()) for field in df.schema.fields]

[('name', 'string'), ('age', 'int'), ('experience', 'int')]

In [50]:
# describe() is lazy: without an action it only shows the schema of the summary
# DataFrame, in which every statistic column is typed as string.
df.describe()

DataFrame[summary: string, name: string, age: string, experience: string]

In [52]:
# Per-column count/mean/stddev/min/max; mean and stddev are NULL for the
# string column 'name'.
df.describe().show(n=20, truncate=True)

+-------+------+------------------+-----------------+
|summary|  name|               age|       experience|
+-------+------+------------------+-----------------+
|  count|     4|                 4|                4|
|   mean|  NULL|             23.25|              2.0|
| stddev|  NULL|12.148388096094616|0.816496580927726|
|    min| abbas|                11|                1|
|    max|nasrin|                40|                3|
+-------+------+------------------+-----------------+



In [62]:
# withColumn returns a new DataFrame with the derived column appended;
# reassigning df keeps the result.
df = df.withColumn('experience after 2 years', df.experience + 2)
df

DataFrame[name: string, age: int, experience: int, experience after 2 years: int]

In [60]:
# Print the frame including the newly derived column (first 20 rows, truncated values).
df.show(n=20, truncate=True)

+--------+---+----------+------------------------+
|    name|age|experience|experience after 2 years|
+--------+---+----------+------------------------+
|   akbar| 11|         2|                       4|
|   abbas| 20|         1|                       3|
|  nasrin| 22|         2|                       4|
|fariborz| 40|         3|                       5|
+--------+---+----------+------------------------+



In [66]:
# drop the column: drop() returns a new DataFrame without the derived column,
# and df is rebound to it; the last line displays its schema
df = df.drop('experience after 2 years')
df

DataFrame[name: string, age: int, experience: int]

In [68]:
# Confirm the derived column is gone (first 20 rows, truncated values).
df.show(n=20, truncate=True)

+--------+---+----------+
|    name|age|experience|
+--------+---+----------+
|   akbar| 11|         2|
|   abbas| 20|         1|
|  nasrin| 22|         2|
|fariborz| 40|         3|
+--------+---+----------+



In [74]:
# rename a column: withColumnRenamed returns a new DataFrame in which
# 'experience' is called 'Experience'; df is rebound to it
df = df.withColumnRenamed('experience', 'Experience')
df

DataFrame[name: string, age: int, Experience: int]

In [76]:
# Print the frame with the renamed column (first 20 rows, truncated values).
df.show(n=20, truncate=True)

+--------+---+----------+
|    name|age|Experience|
+--------+---+----------+
|   akbar| 11|         2|
|   abbas| 20|         1|
|  nasrin| 22|         2|
|fariborz| 40|         3|
+--------+---+----------+



## Part 3 - PySpark DataFrame - Handling Missing Values

In [98]:
from pyspark.sql.functions import rand, round

In [100]:
# Add a seeded pseudo-random salary column, rounded to 0 decimal places.
# The namespaced Spark functions are used so this cell does not depend on
# `from pyspark.sql.functions import round`, which shadows Python's builtin round().
from pyspark.sql import functions as F

df = df.withColumn('salary', F.round(F.rand(seed=3) * 1000, 0))
df

DataFrame[name: string, age: int, Experience: int, salary: double]

In [102]:
# Print the frame with the salary column (first 20 rows, truncated values).
df.show(n=20, truncate=True)

+--------+---+----------+------+
|    name|age|Experience|salary|
+--------+---+----------+------+
|   akbar| 11|         2| 257.0|
|   abbas| 20|         1| 670.0|
|  nasrin| 22|         2| 942.0|
|fariborz| 40|         3| 730.0|
+--------+---+----------+------+



In [118]:
# Load the dataset that contains missing values. header=True takes column names
# from the first line; inferSchema=True scans the data to pick column types.
# (Each option is given once; the original passed inferSchema twice.)
df = spark.read.csv('datas-2.csv', header=True, inferSchema=True)

In [157]:
# Drop only the rows in which every column is null (how='all').
df.na.drop(how='all').show()

+--------+----+----------+
|    name| age|experience|
+--------+----+----------+
|   akbar|  11|         2|
|   abbas|  20|      NULL|
|  nasrin|NULL|         2|
|fariborz|  40|         3|
|    NULL|  90|        10|
| goodarz|NULL|         5|
|shahrooz|  40|         2|
| changiz|  35|      NULL|
|  sohrab|NULL|         6|
|belgheis|  31|      NULL|
+--------+----+----------+



In [159]:
# Drop rows having a null in any of the listed columns (here only 'age').
df.na.drop(how='any', subset=['age']).show()

+--------+---+----------+
|    name|age|experience|
+--------+---+----------+
|   akbar| 11|         2|
|   abbas| 20|      NULL|
|fariborz| 40|         3|
|    NULL| 90|        10|
|shahrooz| 40|         2|
| changiz| 35|      NULL|
|belgheis| 31|      NULL|
+--------+---+----------+



In [161]:
# A string fill value only replaces nulls in string columns; the nulls in the
# int column 'experience' are left untouched.
df.na.fill('NONAME').show()

+--------+----+----------+
|    name| age|experience|
+--------+----+----------+
|   akbar|  11|         2|
|   abbas|  20|      NULL|
|  nasrin|NULL|         2|
|fariborz|  40|         3|
|  NONAME|  90|        10|
| goodarz|NULL|         5|
|shahrooz|  40|         2|
| changiz|  35|      NULL|
|  sohrab|NULL|         6|
|belgheis|  31|      NULL|
+--------+----+----------+



In [165]:
# Fill missing values with a string in selected columns only.
# A string fill value is applied only to string columns; an int column such as
# 'age' in the subset is silently skipped (its nulls stay NULL), so only 'name'
# is listed.
df.fillna('NONAME', subset=['name']).show()

+--------+----+----------+
|    name| age|experience|
+--------+----+----------+
|   akbar|  11|         2|
|   abbas|  20|      NULL|
|  nasrin|NULL|         2|
|fariborz|  40|         3|
|  NONAME|  90|        10|
| goodarz|NULL|         5|
|shahrooz|  40|         2|
| changiz|  35|      NULL|
|  sohrab|NULL|         6|
|belgheis|  31|      NULL|
+--------+----+----------+



In [171]:
# Per-column fill values: 'UNKNOWN' for the string column 'name', 0 for the int column 'age'.
df.fillna({'name': 'UNKNOWN', 'age': 0}).show()

+--------+---+----------+
|    name|age|experience|
+--------+---+----------+
|   akbar| 11|         2|
|   abbas| 20|      NULL|
|  nasrin|  0|         2|
|fariborz| 40|         3|
| UNKNOWN| 90|        10|
| goodarz|  0|         5|
|shahrooz| 40|         2|
| changiz| 35|      NULL|
|  sohrab|  0|         6|
|belgheis| 31|      NULL|
+--------+---+----------+



In [173]:
from pyspark.ml.feature import Imputer

In [177]:
# Mean imputation for the numeric columns; results go to new *_imputed columns
# and the original columns are kept.
imputer = Imputer(
    strategy='mean',
    inputCols=['age', 'experience'],
    outputCols=[f'{name}_imputed' for name in ('age', 'experience')],
)

In [179]:
# Fit the imputer on df (computing each input column's mean), then apply it to df.
(
    imputer
    .fit(df)
    .transform(df)
    .show()
)

+--------+----+----------+-----------+------------------+
|    name| age|experience|age_imputed|experience_imputed|
+--------+----+----------+-----------+------------------+
|   akbar|  11|         2|         11|                 2|
|   abbas|  20|      NULL|         20|                 4|
|  nasrin|NULL|         2|         38|                 2|
|fariborz|  40|         3|         40|                 3|
|    NULL|  90|        10|         90|                10|
| goodarz|NULL|         5|         38|                 5|
|shahrooz|  40|         2|         40|                 2|
| changiz|  35|      NULL|         35|                 4|
|  sohrab|NULL|         6|         38|                 6|
|belgheis|  31|      NULL|         31|                 4|
+--------+----+----------+-----------+------------------+



## Part 4 - PySpark DataFrame - Filter Operations

In [187]:
# where() is an alias of filter(); rows with a null age are not kept.
df.where(df['age'] > 30).show()

+--------+---+----------+
|    name|age|experience|
+--------+---+----------+
|fariborz| 40|         3|
|    NULL| 90|        10|
|shahrooz| 40|         2|
| changiz| 35|      NULL|
|belgheis| 31|      NULL|
+--------+---+----------+



In [205]:
# The same filter written as a SQL expression string.
df.where("age > 30").show()

+--------+---+----------+
|    name|age|experience|
+--------+---+----------+
|fariborz| 40|         3|
|    NULL| 90|        10|
|shahrooz| 40|         2|
| changiz| 35|      NULL|
|belgheis| 31|      NULL|
+--------+---+----------+



In [195]:
# Keep only the rows whose name equals "akbar".
df.where(df['name'] == "akbar").show()

+-----+---+----------+
| name|age|experience|
+-----+---+----------+
|akbar| 11|         2|
+-----+---+----------+



In [199]:
# Projection: keep only the listed columns (passed as separate arguments).
df.select("name", "age").show()

+--------+----+
|    name| age|
+--------+----+
|   akbar|  11|
|   abbas|  20|
|  nasrin|NULL|
|fariborz|  40|
|    NULL|  90|
| goodarz|NULL|
|shahrooz|  40|
| changiz|  35|
|  sohrab|NULL|
|belgheis|  31|
+--------+----+



In [225]:
# AND: each condition needs its own parentheses because & binds tighter than < and >.
df.filter((df.age < 45) & (df.experience > 2)).show()

+--------+---+----------+
|    name|age|experience|
+--------+---+----------+
|fariborz| 40|         3|
+--------+---+----------+



In [231]:
# OR: a row is kept when either condition is true, so a row with a null in one
# column can still pass on the other (e.g. goodarz: age NULL, experience 5).
df.filter((df.age < 45) | (df.experience > 2)).show()

+--------+----+----------+
|    name| age|experience|
+--------+----+----------+
|   akbar|  11|         2|
|   abbas|  20|      NULL|
|fariborz|  40|         3|
|    NULL|  90|        10|
| goodarz|NULL|         5|
|shahrooz|  40|         2|
| changiz|  35|      NULL|
|  sohrab|NULL|         6|
|belgheis|  31|      NULL|
+--------+----+----------+



In [233]:
# NOT: name != "akbar" is the same predicate as ~(name == "akbar"). The row whose
# name is NULL is dropped too, because comparing NULL yields NULL, not true.
df.filter(df.name != "akbar").show()

+--------+----+----------+
|    name| age|experience|
+--------+----+----------+
|   abbas|  20|      NULL|
|  nasrin|NULL|         2|
|fariborz|  40|         3|
| goodarz|NULL|         5|
|shahrooz|  40|         2|
| changiz|  35|      NULL|
|  sohrab|NULL|         6|
|belgheis|  31|      NULL|
+--------+----+----------+



## Part 5 - PySpark DataFrame - GroupBy and Aggregate Functions

In [286]:
# Group by age, then print the row count followed by the mean/sum/max/min of
# every numeric column, one table per aggregation.
for aggregation in ('count', 'mean', 'sum', 'max', 'min'):
    getattr(df.groupBy('age'), aggregation)().show()

+----+-----+
| age|count|
+----+-----+
|  31|    1|
|NULL|    3|
|  20|    1|
|  40|    2|
|  35|    1|
|  90|    1|
|  11|    1|
+----+-----+

+----+--------+-----------------+
| age|avg(age)|  avg(experience)|
+----+--------+-----------------+
|  31|    31.0|             NULL|
|NULL|    NULL|4.333333333333333|
|  20|    20.0|             NULL|
|  40|    40.0|              2.5|
|  35|    35.0|             NULL|
|  90|    90.0|             10.0|
|  11|    11.0|              2.0|
+----+--------+-----------------+

+----+--------+---------------+
| age|sum(age)|sum(experience)|
+----+--------+---------------+
|  31|      31|           NULL|
|NULL|    NULL|             13|
|  20|      20|           NULL|
|  40|      80|              5|
|  35|      35|           NULL|
|  90|      90|             10|
|  11|      11|              2|
+----+--------+---------------+

+----+--------+---------------+
| age|max(age)|max(experience)|
+----+--------+---------------+
|  31|      31|           NULL|


In [288]:
# Aggregate with a {column: function} dict. count(name) is 0 for the NULL-name
# group because count(col) skips nulls. The result's column order is decided by
# Spark, not by the dict's key order (count(name) is printed first here).
df.groupBy("name").agg({"age": "sum", "name": "count"}).show()

+--------+-----------+--------+
|    name|count(name)|sum(age)|
+--------+-----------+--------+
|    NULL|          0|      90|
|   abbas|          1|      20|
|shahrooz|          1|      40|
|  sohrab|          1|    NULL|
|   akbar|          1|      11|
|fariborz|          1|      40|
| changiz|          1|      35|
| goodarz|          1|    NULL|
|belgheis|          1|      31|
|  nasrin|          1|    NULL|
+--------+-----------+--------+



In [278]:
# orderBy() is an alias of sort(); in ascending order the NULL name comes first.
df.orderBy("name").show()

+--------+----+----------+
|    name| age|experience|
+--------+----+----------+
|    NULL|  90|        10|
|   abbas|  20|      NULL|
|   akbar|  11|         2|
|belgheis|  31|      NULL|
| changiz|  35|      NULL|
|fariborz|  40|         3|
| goodarz|NULL|         5|
|  nasrin|NULL|         2|
|shahrooz|  40|         2|
|  sohrab|NULL|         6|
+--------+----+----------+

