In [4]:
# %pip install pyspark   # uncomment and run once if PySpark is not installed in this kernel's environment

# Part 1

In [5]:
import pyspark

In [6]:
# Quick look at the raw CSV with pandas before loading the same file with Spark.
import pandas as pd 
pd.read_csv('test1.csv')

Unnamed: 0,name,age
0,dave,31
1,phil,30
2,bob,29


In [7]:
# Before loading data with Spark we first need to start a SparkSession.

In [8]:
from pyspark.sql import SparkSession

In [9]:
# Build the session (or reuse one that is already running) under the app name 'Practice'.
spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

In [10]:
spark

In [11]:
# No header option here, so the header line is read as a data row and the
# columns get the default names _c0 and _c1 (see the output below).
df = spark.read.csv('test1.csv')

In [12]:
df

DataFrame[_c0: string, _c1: string]

In [13]:
df.show()

+-----+---+
|  _c0|_c1|
+-----+---+
| name|age|
|dave | 31|
|phil | 30|
|  bob| 29|
+-----+---+



In [14]:
# With header='true' the first line of the file supplies the column names.
spark.read.option('header', 'true').csv('test1.csv').show()

+-----+---+
| name|age|
+-----+---+
|dave | 31|
|phil | 30|
|  bob| 29|
+-----+---+



In [15]:
df = spark.read.option('header', 'true').csv('test1.csv')

In [16]:
type(df)

pyspark.sql.dataframe.DataFrame

In [17]:
df.head(3)

[Row(name='dave ', age='31'),
 Row(name='phil ', age='30'),
 Row(name='bob', age='29')]

In [18]:
# Every column is reported as string because the file was read without schema inference.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)



# Part 2

In [19]:
from pyspark.sql import SparkSession

In [20]:
# NOTE(review): a session named 'Practice' was already created earlier in this
# notebook, and getOrCreate() returns the existing session when there is one,
# so the 'Dataframe' app name presumably does not take effect here; confirm.
spark = SparkSession.builder.appName('Dataframe').getOrCreate()

In [21]:
spark

In [22]:
# When running on a single machine (e.g. my PC), the master is shown as 'local', i.e. one node.

In [23]:
## read the data set

spark.read.option('header', 'true').csv('test2.csv')

DataFrame[name: string, age: string, experience: string]

In [24]:
spark.read.option('header', 'true').csv('test2.csv').show()

+-----+---+----------+
| name|age|experience|
+-----+---+----------+
|dave | 31|        10|
|phil | 30|         8|
|  bob| 29|         4|
+-----+---+----------+



In [25]:
df = spark.read.option('header', 'true').csv('test2.csv')

In [26]:
### check the schema

df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- experience: string (nullable = true)



In [27]:
# by default every column is read in as a string

# to change this, pass inferSchema=True to the reader: .csv(path, inferSchema=True)

In [28]:
# inferSchema=True makes Spark detect column types instead of reading everything as string.
# NOTE(review): this reads test1.csv although the surrounding cells in Part 2
# work with test2.csv; confirm that this is intentional.
df = spark.read.option('header', 'true').csv('test1.csv',inferSchema = True)

In [29]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [30]:
# Another way of reading the data: pass header and inferSchema directly to csv().

In [31]:
# Load test2.csv, passing header and inferSchema directly to csv().
df = spark.read.csv(
    'test2.csv',
    header=True,
    inferSchema=True,
)
df.show()

+-----+---+----------+
| name|age|experience|
+-----+---+----------+
|dave | 31|        10|
|phil | 30|         8|
|  bob| 29|         4|
+-----+---+----------+



In [34]:
type(df)

pyspark.sql.dataframe.DataFrame

In [35]:
# i.e. what is a DataFrame? It is a data structure organised into rows and columns.

In [36]:
# how to select columns

In [37]:
df.columns

['name', 'age', 'experience']

In [38]:
df.head(3)

[Row(name='dave ', age=31, experience=10),
 Row(name='phil ', age=30, experience=8),
 Row(name='bob', age=29, experience=4)]

In [39]:
df.show()

+-----+---+----------+
| name|age|experience|
+-----+---+----------+
|dave | 31|        10|
|phil | 30|         8|
|  bob| 29|         4|
+-----+---+----------+



In [40]:
df.select('name')

DataFrame[name: string]

In [41]:
df.select('name').show()

+-----+
| name|
+-----+
|dave |
|phil |
|  bob|
+-----+



In [42]:
type(df.select('name'))

pyspark.sql.dataframe.DataFrame

In [43]:
df.select('name', 'age')

DataFrame[name: string, age: int]

In [44]:
df.select(['name', 'age']).show()

+-----+---+
| name|age|
+-----+---+
|dave | 31|
|phil | 30|
|  bob| 29|
+-----+---+



In [45]:
# how to check data types 

In [46]:
df.dtypes

[('name', 'string'), ('age', 'int'), ('experience', 'int')]

In [47]:
df.describe().show()

+-------+-----+----+-----------------+
|summary| name| age|       experience|
+-------+-----+----+-----------------+
|  count|    3|   3|                3|
|   mean| NULL|30.0|7.333333333333333|
| stddev| NULL| 1.0|3.055050463303893|
|    min|  bob|  29|                4|
|    max|phil |  31|               10|
+-------+-----+----+-----------------+



In [48]:
# adding columns

In [50]:
df.withColumn('experience after 2 years', df['experience']+2)

DataFrame[name: string, age: int, experience: int, experience after 2 years: int]

In [51]:
df.withColumn('experience after 2 years', df['experience']+2).show()

+-----+---+----------+------------------------+
| name|age|experience|experience after 2 years|
+-----+---+----------+------------------------+
|dave | 31|        10|                      12|
|phil | 30|         8|                      10|
|  bob| 29|         4|                       6|
+-----+---+----------+------------------------+



In [54]:
# withColumn returns a new DataFrame, so assign the result back to df to keep the added column.
df = df.withColumn('experience after 2 years', df['experience']+2)

In [55]:
df.show()

+-----+---+----------+------------------------+
| name|age|experience|experience after 2 years|
+-----+---+----------+------------------------+
|dave | 31|        10|                      12|
|phil | 30|         8|                      10|
|  bob| 29|         4|                       6|
+-----+---+----------+------------------------+



In [56]:
# drop columns 

In [57]:
df.drop('experience after 2 years').show()

+-----+---+----------+
| name|age|experience|
+-----+---+----------+
|dave | 31|        10|
|phil | 30|         8|
|  bob| 29|         4|
+-----+---+----------+



In [59]:
# drop also returns a new DataFrame; reassign df so the column is actually removed from it.
df = df.drop('experience after 2 years')

In [60]:
df.show()

+-----+---+----------+
| name|age|experience|
+-----+---+----------+
|dave | 31|        10|
|phil | 30|         8|
|  bob| 29|         4|
+-----+---+----------+



In [61]:
# rename the columns 

In [62]:
df.withColumnRenamed('name', 'new name').show()

+--------+---+----------+
|new name|age|experience|
+--------+---+----------+
|   dave | 31|        10|
|   phil | 30|         8|
|     bob| 29|         4|
+--------+---+----------+



# Part 3

In [63]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Practice').getOrCreate()

In [64]:
spark.read.csv('test3.csv', header = True, inferSchema = True)

DataFrame[name: string, age: int, experience : int, salary: int]

In [65]:
df = spark.read.csv('test3.csv', header = True, inferSchema = True)

In [66]:
df.show()

+-----+----+-----------+------+
| name| age|experience |salary|
+-----+----+-----------+------+
| dave|  31|         10| 30000|
|  bob|  30|          8| 25000|
| phil|  29|          4| 20000|
|gemma|  24|       NULL| 20000|
|  rob|  51|       NULL| 15000|
|  gab|  28|       NULL| 18000|
| toby|NULL|       NULL| 40000|
| NULL|  34|         10| 38000|
| NULL|  36|       NULL|  NULL|
+-----+----+-----------+------+



In [67]:
# drop the columns

In [68]:
df.drop('name').show()

+----+-----------+------+
| age|experience |salary|
+----+-----------+------+
|  31|         10| 30000|
|  30|          8| 25000|
|  29|          4| 20000|
|  24|       NULL| 20000|
|  51|       NULL| 15000|
|  28|       NULL| 18000|
|NULL|       NULL| 40000|
|  34|         10| 38000|
|  36|       NULL|  NULL|
+----+-----------+------+



In [69]:
# drop rows depending on nulls

In [70]:
# where there is a null, the whole row is dropped 

df.na.drop().show()

+----+---+-----------+------+
|name|age|experience |salary|
+----+---+-----------+------+
|dave| 31|         10| 30000|
| bob| 30|          8| 25000|
|phil| 29|          4| 20000|
+----+---+-----------+------+



In [71]:
# the 'how' parameter

In [72]:
# how='all': a row is dropped only if ALL of its values are null.
# No row in this data is entirely null, so nothing is dropped here.

df.na.drop(how = 'all').show()

+-----+----+-----------+------+
| name| age|experience |salary|
+-----+----+-----------+------+
| dave|  31|         10| 30000|
|  bob|  30|          8| 25000|
| phil|  29|          4| 20000|
|gemma|  24|       NULL| 20000|
|  rob|  51|       NULL| 15000|
|  gab|  28|       NULL| 18000|
| toby|NULL|       NULL| 40000|
| NULL|  34|         10| 38000|
| NULL|  36|       NULL|  NULL|
+-----+----+-----------+------+



In [73]:
# how='any' (the default): a row is dropped if ANY of its values is null.

df.na.drop(how = 'any').show()

+----+---+-----------+------+
|name|age|experience |salary|
+----+---+-----------+------+
|dave| 31|         10| 30000|
| bob| 30|          8| 25000|
|phil| 29|          4| 20000|
+----+---+-----------+------+



In [74]:
# the 'thresh' parameter (minimum number of non-null values a row needs in order to be kept)

In [75]:
# thresh=2 keeps only rows that have at least 2 non-null values.
# When thresh is given it overrides `how`, so `how` is not passed.
df.na.drop(thresh = 2).show()

+-----+----+-----------+------+
| name| age|experience |salary|
+-----+----+-----------+------+
| dave|  31|         10| 30000|
|  bob|  30|          8| 25000|
| phil|  29|          4| 20000|
|gemma|  24|       NULL| 20000|
|  rob|  51|       NULL| 15000|
|  gab|  28|       NULL| 18000|
| toby|NULL|       NULL| 40000|
| NULL|  34|         10| 38000|
+-----+----+-----------+------+

