# Pyspark fundamentals

In [1]:
# A Spark Session is required to execute any code on a Spark Cluster. 
# It's also necessary for working with higher-level APIs like DataFrames and Spark SQL
# It's not always mandatory though
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Basics").getOrCreate()

In [2]:
# Read the json file people that is placed in same path as this notebook. Else specify full path
df = spark.read.json('people.json')
df.show()

+---+---------+------+--------+----------+
|age|firstName|gender|lastName|    number|
+---+---------+------+--------+----------+
| 28|      Joe|  male| Jackson|7349282382|
| 32|    James|  male|   Smith|5678568567|
| 24|    Emily|female|   Jones| 456754675|
+---+---------+------+--------+----------+



In [3]:
# printSchema is used to print the schema of the dataframe. It displays the datatype and if the field is nullable. 
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- number: string (nullable = true)



In [4]:
# Viewing the columns
df.columns

['age', 'firstName', 'gender', 'lastName', 'number']

In [5]:
#  describing the dataframe structure. describe() displays the datatype and field name
# as seen age is string here
df.describe()

DataFrame[summary: string, age: string, firstName: string, gender: string, lastName: string, number: string]

In [6]:
# describe() along with show() displays count, mean, stddev, min and max details. 
df.describe().show()

+-------+----+---------+------+--------+--------------------+
|summary| age|firstName|gender|lastName|              number|
+-------+----+---------+------+--------+--------------------+
|  count|   3|        3|     3|       3|                   3|
|   mean|28.0|     null|  null|    null| 4.494868541333333E9|
| stddev| 4.0|     null|  null|    null|3.5954963302739053E9|
|    min|  24|    Emily|female| Jackson|           456754675|
|    max|  32|      Joe|  male|   Smith|          7349282382|
+-------+----+---------+------+--------+--------------------+



### Creating user defined schema with datatypes. 

In [7]:
# Import the required methods from the package pyspark.sql.types
from pyspark.sql.types import (StructType, StructField, 
                               IntegerType, StringType)

In [8]:
# create a schema with the columns age, firstName, gender, lastName and number with required datatypes. 
# as seen age is defined as Integer now. 

# third argument True() says field can be nullable
data_schema = [StructField (('age'), IntegerType(), True ),
               StructField (('firstName'), StringType(), True ),
               StructField (('gender'), StringType(), True ),
               StructField (('lastName'), StringType(), True ),
               StructField (('number'), StringType(), True )]

In [9]:
# Once the schema is required, create the Structure Type and assign the data_schema that is created. 
final_struc = StructType(fields=data_schema)

In [10]:
# Reading the json file with the user defined file structure. 
df = spark.read.json('people.json', schema=final_struc)

In [11]:
# print the schema with userdefined structure. 
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- number: string (nullable = true)



In [12]:
# print the data
df.show()

+---+---------+------+--------+----------+
|age|firstName|gender|lastName|    number|
+---+---------+------+--------+----------+
| 28|      Joe|  male| Jackson|7349282382|
| 32|    James|  male|   Smith|5678568567|
| 24|    Emily|female|   Jones| 456754675|
+---+---------+------+--------+----------+



In [13]:
# df['age'] displays the Column but not exact data. 
print(df['age'])

# When the type() is displayed, we can see its column.Column
print(type(df['age']))

Column<'age'>
<class 'pyspark.sql.column.Column'>


In [14]:
# To display the data of any column, we need to use .select() function like SQL. 
df.select('age').show()

# type() would display its dataframe.Dataframe
print(type(df.select('age')))

+---+
|age|
+---+
| 28|
| 32|
| 24|
+---+

<class 'pyspark.sql.dataframe.DataFrame'>


In [15]:
# to display multiple columns, specify the column names within select()
df.select('age','gender').show()

+---+------+
|age|gender|
+---+------+
| 28|  male|
| 32|  male|
| 24|female|
+---+------+



In [16]:
# head() is used to Row data as RowData. 
# head(2) displays 2 records as list elements. 
# to select the 1st element from the list here, need to use the [0] and to select 2nd element, use [1], just like accessing list elements

print(df.head(2))
print('------------------------------------')
print(df.head(2)[0])
print('------------------------------------')
print(type((df.head(2)[0])))

[Row(age=28, firstName='Joe', gender='male', lastName='Jackson', number='7349282382'), Row(age=32, firstName='James', gender='male', lastName='Smith', number='5678568567')]
------------------------------------
Row(age=28, firstName='Joe', gender='male', lastName='Jackson', number='7349282382')
------------------------------------
<class 'pyspark.sql.types.Row'>


In [17]:
# to display first two entries in the dataframe use show(2)
df.show(2)

+---+---------+------+--------+----------+
|age|firstName|gender|lastName|    number|
+---+---------+------+--------+----------+
| 28|      Joe|  male| Jackson|7349282382|
| 32|    James|  male|   Smith|5678568567|
+---+---------+------+--------+----------+
only showing top 2 rows



In [18]:
# withColumn adds new column to dataframe. We are adding 'newAge' here with value as twice of 'age'
# !!!!!! Remember its temporary data unless it is assigned to some other dataFrame. !!!!!
df.withColumn('newAge',df['age']*2).show()

+---+---------+------+--------+----------+------+
|age|firstName|gender|lastName|    number|newAge|
+---+---------+------+--------+----------+------+
| 28|      Joe|  male| Jackson|7349282382|    56|
| 32|    James|  male|   Smith|5678568567|    64|
| 24|    Emily|female|   Jones| 456754675|    48|
+---+---------+------+--------+----------+------+



In [19]:
# As seen below, the new column newAge is not really added to dataframe df. 
df.show()

+---+---------+------+--------+----------+
|age|firstName|gender|lastName|    number|
+---+---------+------+--------+----------+
| 28|      Joe|  male| Jackson|7349282382|
| 32|    James|  male|   Smith|5678568567|
| 24|    Emily|female|   Jones| 456754675|
+---+---------+------+--------+----------+



In [20]:
# assigning the df with new column to df_new. 
df_new = df.withColumn('newAge',df['age']*2).show()
df_new

+---+---------+------+--------+----------+------+
|age|firstName|gender|lastName|    number|newAge|
+---+---------+------+--------+----------+------+
| 28|      Joe|  male| Jackson|7349282382|    56|
| 32|    James|  male|   Smith|5678568567|    64|
| 24|    Emily|female|   Jones| 456754675|    48|
+---+---------+------+--------+----------+------+



In [21]:
# withColumnRenamed is used to rename the existing column. 
# !!!!!! Remember its temporary data unless it is assigned to some other dataFrame. !!!!!
df.withColumnRenamed('age','new_age').show()

+-------+---------+------+--------+----------+
|new_age|firstName|gender|lastName|    number|
+-------+---------+------+--------+----------+
|     28|      Joe|  male| Jackson|7349282382|
|     32|    James|  male|   Smith|5678568567|
|     24|    Emily|female|   Jones| 456754675|
+-------+---------+------+--------+----------+



In [22]:
# As seen below, the new column newAge is not really added to dataframe df.
df.show()

+---+---------+------+--------+----------+
|age|firstName|gender|lastName|    number|
+---+---------+------+--------+----------+
| 28|      Joe|  male| Jackson|7349282382|
| 32|    James|  male|   Smith|5678568567|
| 24|    Emily|female|   Jones| 456754675|
+---+---------+------+--------+----------+



In [23]:
# assigning the df with new column to df_new. 
df_new1 = df.withColumnRenamed('age','new_age').show()
df_new1

+-------+---------+------+--------+----------+
|new_age|firstName|gender|lastName|    number|
+-------+---------+------+--------+----------+
|     28|      Joe|  male| Jackson|7349282382|
|     32|    James|  male|   Smith|5678568567|
|     24|    Emily|female|   Jones| 456754675|
+-------+---------+------+--------+----------+



### Creating the Temporary View with the dataframe to access like the DB2/SQL Table

In [24]:
# copying the dataframe data to Temporary View names 'people'
df.createOrReplaceTempView('people')

In [25]:
# the data from 'people' can be queried now like SQL Queries using SELECT as seen below
results = spark.sql("select * from people")
results.show()

+---+---------+------+--------+----------+
|age|firstName|gender|lastName|    number|
+---+---------+------+--------+----------+
| 28|      Joe|  male| Jackson|7349282382|
| 32|    James|  male|   Smith|5678568567|
| 24|    Emily|female|   Jones| 456754675|
+---+---------+------+--------+----------+



In [26]:
# SQL query can contain the conditions as well just like regular SQL queries using WHERE clause
results = spark.sql("select * from people where age > 25")
results.show()

+---+---------+------+--------+----------+
|age|firstName|gender|lastName|    number|
+---+---------+------+--------+----------+
| 28|      Joe|  male| Jackson|7349282382|
| 32|    James|  male|   Smith|5678568567|
+---+---------+------+--------+----------+



# Spark dataFrame Basic Operations

In [27]:
# create a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ops').getOrCreate()

In [28]:
# read th csv file appl_stock into spark session
df= spark.read.csv('appl_stock.csv',inferSchema=True, header=True)

In [29]:
# display the top 10 rows of dataFrame
df.show(10)

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [30]:
# print the schema of the dataFrame
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [31]:
# count() and columns() are used to get the count of the rows and also the column names of the dataframe. 
print("The number of records in the dataframe is :", df.count())
print("The columns of the dataframe are          :",df.columns)

The number of records in the dataframe is : 1762
The columns of the dataframe are          : ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


In [32]:
# write() also can be used similar to filter()
df.where((df['Open']<100) & (df['Close']<92)).select(['Date','Open','Close']).show(5)

# filter() also can be used similar to write()
df.filter((df['Open']<100) & (df['Close']<92)).select(['Date','Open','Close']).show(5)

+----------+---------+-----------------+
|      Date|     Open|            Close|
+----------+---------+-----------------+
|2014-06-13|92.199997|        91.279999|
|2014-06-19|92.290001|        91.860001|
|2014-06-20|91.849998|        90.910004|
|2014-06-23|    91.32|90.83000200000001|
|2014-06-24|    90.75|        90.279999|
+----------+---------+-----------------+
only showing top 5 rows

+----------+---------+-----------------+
|      Date|     Open|            Close|
+----------+---------+-----------------+
|2014-06-13|92.199997|        91.279999|
|2014-06-19|92.290001|        91.860001|
|2014-06-20|91.849998|        90.910004|
|2014-06-23|    91.32|90.83000200000001|
|2014-06-24|    90.75|        90.279999|
+----------+---------+-----------------+
only showing top 5 rows



In [33]:
# Just like SQL Where clause, we can use filter to select the records from dataframe based on condition. 
df.filter("Close < 500").show(5)

+----------+----------+----------+------------------+------------------+---------+------------------+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+----------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows



In [34]:
# filter() to select specific rows that has "Close" < 500. and select() to select specific columns. 
df.filter("Close < 500").select(["Open", "Close"]).show(5)

# df.filter(df['close']>500).select(['Open','Close']).show()     --> Even this can be used

+----------+------------------+
|      Open|             Close|
+----------+------------------+
|213.429998|        214.009998|
|214.599998|        214.379993|
|214.379993|        210.969995|
|    211.75|            210.58|
|210.299994|211.98000499999998|
+----------+------------------+
only showing top 5 rows



In [35]:
# Specifying Multiple conditions using filter(). 
df.filter((df['close'] < 200) & (df['Open'] > 200)).select(['Open','Close']).show()

+------------------+----------+
|              Open|     Close|
+------------------+----------+
|206.78000600000001|    197.75|
|        204.930004|199.289995|
|        201.079996|192.060003|
+------------------+----------+



In [36]:
df.filter(df['Low'] == 197.16).show()    # --> show() would display the resultant like SQL output in tabluar format. 

df.filter(df['Low'] == 197.16).collect() # --> collect() would display the resultant in List/row format. Each data can then accessed like list elements. 

+----------+------------------+----------+------+------+---------+---------+
|      Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+----------+------------------+----------+------+------+---------+---------+
|2010-01-22|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+----------+------------------+----------+------+------+---------+---------+



[Row(Date=datetime.date(2010, 1, 22), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [37]:
# Capturing the data that has 'Low' creater than 690. 
result = df.filter((df['Low'] > 690.0)).collect()
result

[Row(Date=datetime.date(2012, 9, 17), Open=699.349998, High=699.799995, Low=694.6100230000001, Close=699.7800219999999, Volume=99507800, Adj Close=91.052448),
 Row(Date=datetime.date(2012, 9, 18), Open=699.879997, High=702.329987, Low=696.4199980000001, Close=701.910004, Volume=93375800, Adj Close=91.329593),
 Row(Date=datetime.date(2012, 9, 19), Open=700.259979, High=703.989998, Low=699.569977, Close=702.100021, Volume=81718700, Adj Close=91.35431700000001),
 Row(Date=datetime.date(2012, 9, 20), Open=699.1599809999999, High=700.059975, Low=693.619987, Close=698.6999969999999, Volume=84142100, Adj Close=90.91192),
 Row(Date=datetime.date(2012, 9, 21), Open=702.409988, High=705.070023, Low=699.3599849999999, Close=700.089989, Volume=142897300, Adj Close=91.09278)]

In [38]:
# from the above result, capturing the 1st occurence i.e., 0th element and then accessing "Volume" field
row=result[0]
row.asDict()['Volume']

99507800

## Transforming and writing into CSV

In [39]:
df.show(5)

+----------+----------+----------+------------------+------------------+---------+------------------+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+----------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows



In [40]:
from pyspark.sql.functions import round, col

df=df.withColumn("Open",round(col('Open'),2))
df=df.withColumn("High",round(col('High'),2))
df=df.withColumn("Low",round(col('Low'),2))
df=df.withColumn("Close",round(col('Close'),2))
df=df.withColumn("Adj Close",round(col('Adj Close'),2))
df.show(5)

+----------+------+------+------+------+---------+---------+
|      Date|  Open|  High|   Low| Close|   Volume|Adj Close|
+----------+------+------+------+------+---------+---------+
|2010-01-04|213.43| 214.5|212.38|214.01|123432400|    27.73|
|2010-01-05| 214.6|215.59|213.25|214.38|150476200|    27.77|
|2010-01-06|214.38|215.23|210.75|210.97|138040000|    27.33|
|2010-01-07|211.75| 212.0|209.05|210.58|119282800|    27.28|
|2010-01-08| 210.3| 212.0|209.06|211.98|111902700|    27.46|
+----------+------+------+------+------+---------+---------+
only showing top 5 rows



In [41]:
# Finding the average of the day
df=df.withColumn("Day_Avg", round(((col("High") + col("Low"))/2),2))

df.show(5)

+----------+------+------+------+------+---------+---------+-------+
|      Date|  Open|  High|   Low| Close|   Volume|Adj Close|Day_Avg|
+----------+------+------+------+------+---------+---------+-------+
|2010-01-04|213.43| 214.5|212.38|214.01|123432400|    27.73| 213.44|
|2010-01-05| 214.6|215.59|213.25|214.38|150476200|    27.77| 214.42|
|2010-01-06|214.38|215.23|210.75|210.97|138040000|    27.33| 212.99|
|2010-01-07|211.75| 212.0|209.05|210.58|119282800|    27.28| 210.53|
|2010-01-08| 210.3| 212.0|209.06|211.98|111902700|    27.46| 210.53|
+----------+------+------+------+------+---------+---------+-------+
only showing top 5 rows



In [42]:
from pyspark.sql.functions import year

df=df.withColumn("Year",year(df['Date']))

df.show(5)

+----------+------+------+------+------+---------+---------+-------+----+
|      Date|  Open|  High|   Low| Close|   Volume|Adj Close|Day_Avg|Year|
+----------+------+------+------+------+---------+---------+-------+----+
|2010-01-04|213.43| 214.5|212.38|214.01|123432400|    27.73| 213.44|2010|
|2010-01-05| 214.6|215.59|213.25|214.38|150476200|    27.77| 214.42|2010|
|2010-01-06|214.38|215.23|210.75|210.97|138040000|    27.33| 212.99|2010|
|2010-01-07|211.75| 212.0|209.05|210.58|119282800|    27.28| 210.53|2010|
|2010-01-08| 210.3| 212.0|209.06|211.98|111902700|    27.46| 210.53|2010|
+----------+------+------+------+------+---------+---------+-------+----+
only showing top 5 rows



In [43]:
# Exporting the transformed dataframe --- WAS NOT SUCCESSFULL
'''    
df.coalesce(1).write.csv(
    "C:/Users/dilip/Documents/Dilip Studies/pyspark/updated",
    header=True,
    mode="overwrite"
)
'''

'    \ndf.coalesce(1).write.csv(\n    "C:/Users/dilip/Documents/Dilip Studies/pyspark/updated",\n    header=True,\n    mode="overwrite"\n)\n'

## GroupBy and Aggregation

In [44]:
# Creating new spark session
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('aggs').getOrCreate()

In [45]:
# Reading the csv file sales_info
df=spark.read.csv("sales_info.csv",inferSchema = True, header=True)
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [46]:
# display the schema of the dataframe
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [47]:
# Using groupby() on Company to find the aggregations. Here, finding the mean after grouping on Company column. 
df.groupBy("Company").mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [48]:
# Finding the sum of Sales per Company using groupBy() and sum()
df.groupBy("Company").sum().show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [49]:
# groupby() can also be used on other aggregations like count(), min(), max() etc
df.groupBy("Company").count().show()
df.groupBy("Company").min().show()
df.groupBy("Company").max().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [50]:
# groupBy() and agg() can also be combined like below to get the desired result. here alias is used to give new column name
from pyspark.sql.functions import sum 

df_new = df.groupBy('Company').agg(sum('Sales').alias('Company_sales'))
df_new.show(5)

+-------+-------------+
|Company|Company_sales|
+-------+-------------+
|   APPL|       1480.0|
|   GOOG|        660.0|
|     FB|       1220.0|
|   MSFT|        967.0|
+-------+-------------+



In [51]:
# agg() can be used by specifying the parameters like key value pairs
df.agg({'Sales':'sum'}).show()
df.agg({'Sales':'max'}).show()
df.agg({'Sales':'min'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+

+----------+
|min(Sales)|
+----------+
|     120.0|
+----------+



In [52]:
# agg() can also be used on groupBy dataframe itself to find the aggregations on grouped data
group_data = df.groupBy("Company")
group_data.agg({'Sales':'count'}).show()
group_data.agg({'Sales':'max'}).show()
group_data.agg({'Sales':'min'}).show()

+-------+------------+
|Company|count(Sales)|
+-------+------------+
|   APPL|           4|
|   GOOG|           3|
|     FB|           2|
|   MSFT|           3|
+-------+------------+

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+



In [53]:
# using other functions like Distinct count, Average and stad deviations.
from pyspark.sql.functions import countDistinct, avg, stddev
df.select(countDistinct('Sales')).show()
df.select(avg('Sales')).show()
df.select(stddev('Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+

+-----------------+
|       avg(Sales)|
+-----------------+
|360.5833333333333|
+-----------------+

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [54]:
# format_number is used to rounding off the numberic data. In the below code, the Sales std dev is rounded off to 2 digits
from pyspark.sql.functions import format_number
sales_std = df.select(stddev('Sales').alias("Sales_Std_Dev"))

sales_std.select(format_number('Sales_Std_Dev',2).alias('std')).show()

+------+
|   std|
+------+
|250.09|
+------+



In [55]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [56]:
# Dataframe can be sorted using orderBy(), By default the data gets sorted in ascending order of the column specified
df.orderBy("sales").show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [57]:
# To sort in descending, either of the below two code works. desc() or ascending=False would work 
df.orderBy("sales",ascending=False).show()

df.orderBy(df["sales"].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



## Missing Data

In [58]:
# creating the spark session
from pyspark.sql import SparkSession
spark= SparkSession.builder.appName("Miss").getOrCreate()

In [59]:
# reading the ContainsNull csv file. 
df = spark.read.csv("ContainsNull.csv", header=True, inferSchema = True)
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [60]:
# dropping the rows that has Null value. na.drop() would drop all the rows that has even single null value. 
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [61]:
# thresh inside na.drop() is used to specify threshold. thresh=2 mean, drop the records with atleast 2 nulls in it. 
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [62]:
# na.drop(how='any') drops all the rows that has any null value in it. This is default one similar to na.drop()
df.na.drop(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [63]:
# na.drop(how='all') will only drop the rows if all the values are null. 
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [64]:
# subset inside na.drop() used to check null values only in specific column
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [65]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [66]:
#na.fill() is used to fill the null values. By default, spark assigns the string fields if the value specified is string. 
df.na.fill('FILL VALUE').show()

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|FILL VALUE| null|
|emp3|FILL VALUE|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+



In [67]:
#na.fill() is used to fill the null values. By default, spark assigns the numeric fields if the value specified is numeric.
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [68]:
# Way to fill the missing values specific to columns
df.na.fill({'Name':'Missing','Sales':0}).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John|  0.0|
|emp2|Missing|  0.0|
|emp3|Missing|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [69]:
# Best way is to specify the column in subset that needs to be filled and specify the data in na.fill()
df.na.fill('No Name',subset=['Name']).show()
df.na.fill('No Name',['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [70]:
# Finding the mean value using mean()
from pyspark.sql.functions import mean
mean_value = df.select(mean(df['Sales'])).collect()           # collect() takes the value and creates the result like List
display(mean_value)

mean_value1 = df.select(mean(df['Sales']))                    # without collect(), its a table data like query result
mean_value1.show()

[Row(avg(Sales)=400.5)]

+----------+
|avg(Sales)|
+----------+
|     400.5|
+----------+



In [71]:
# Extracting the data from row format
mean_sales = mean_value[0][0]
mean_sales

400.5

In [72]:
# fill null values in Sales column by mean value
df.na.fill(mean_sales,['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [73]:
# the above operation can be done like below in single line
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0],['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



## Dates and Timestamp

In [74]:
# Creating spark session
from pyspark.sql import SparkSession
spark= SparkSession.builder.appName("dates").getOrCreate()

In [75]:
# reading the appl_stock csv file into spark
df=spark.read.csv("appl_stock.csv",header=True, inferSchema = True)
df.show(10)

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [76]:
df.head(5)

[Row(Date=datetime.date(2010, 1, 4), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.date(2010, 1, 5), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002),
 Row(Date=datetime.date(2010, 1, 6), Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004),
 Row(Date=datetime.date(2010, 1, 7), Open=211.75, High=212.000006, Low=209.050005, Close=210.58, Volume=119282800, Adj Close=27.28265),
 Row(Date=datetime.date(2010, 1, 8), Open=210.299994, High=212.000006, Low=209.06000500000002, Close=211.98000499999998, Volume=111902700, Adj Close=27.464034)]

In [77]:
# Selecting 'data' and 'open' columns
df.select('date','Open').show(10)

+----------+------------------+
|      date|              Open|
+----------+------------------+
|2010-01-04|        213.429998|
|2010-01-05|        214.599998|
|2010-01-06|        214.379993|
|2010-01-07|            211.75|
|2010-01-08|        210.299994|
|2010-01-11|212.79999700000002|
|2010-01-12|209.18999499999998|
|2010-01-13|        207.870005|
|2010-01-14|210.11000299999998|
|2010-01-15|210.92999500000002|
+----------+------------------+
only showing top 10 rows



In [78]:
# printing the schema
df.printSchema

<bound method DataFrame.printSchema of DataFrame[Date: date, Open: double, High: double, Low: double, Close: double, Volume: int, Adj Close: double]>

In [79]:
# importing some of the functions of date and timestamp. 
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, month, 
                                 year,weekofyear,format_number,date_format)

In [80]:
# To display the day of the months and month of the year
df.select(dayofmonth(df['date']), month(df['date']), year(df['date'])).show()

+----------------+-----------+----------+
|dayofmonth(date)|month(date)|year(date)|
+----------------+-----------+----------+
|               4|          1|      2010|
|               5|          1|      2010|
|               6|          1|      2010|
|               7|          1|      2010|
|               8|          1|      2010|
|              11|          1|      2010|
|              12|          1|      2010|
|              13|          1|      2010|
|              14|          1|      2010|
|              15|          1|      2010|
|              19|          1|      2010|
|              20|          1|      2010|
|              21|          1|      2010|
|              22|          1|      2010|
|              25|          1|      2010|
|              26|          1|      2010|
|              27|          1|      2010|
|              28|          1|      2010|
|              29|          1|      2010|
|               1|          2|      2010|
+----------------+-----------+----

In [81]:
# copying the dataframe to new dataframe new_df and adding new column "Year" with year data from Date column
new_df = df.withColumn("Year",year(df['Date']))
print(type(new_df))
new_df.show(4)

<class 'pyspark.sql.dataframe.DataFrame'>
+----------+----------+----------+------------------+----------+---------+------------------+----+
|      Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|Year|
+----------+----------+----------+------------------+----------+---------+------------------+----+
|2010-01-04|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|2010|
|2010-01-05|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|2010|
|2010-01-06|214.379993|    215.23|        210.750004|210.969995|138040000|27.333178000000004|2010|
|2010-01-07|    211.75|212.000006|        209.050005|    210.58|119282800|          27.28265|2010|
+----------+----------+----------+------------------+----------+---------+------------------+----+
only showing top 4 rows



In [82]:
# grouping on Year to find the Mean of other columns & assigning it to new variable. 
result = new_df.groupBy("Year").mean().select(['Year', 'avg(Close)'])
result.show()
print(type(result))

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+

<class 'pyspark.sql.dataframe.DataFrame'>


In [83]:
# Renaming the column "avg(Close)" to "Average Closing Price".
new = result.withColumnRenamed('avg(Close)','Average Closing Price')
new.show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|2015|   120.03999980555547|
|2013|    472.6348802857143|
|2014|    295.4023416507935|
|2012|    576.0497195640002|
|2016|   104.60400786904763|
|2010|    259.8424600000002|
|2011|   364.00432532142867|
+----+---------------------+



In [84]:
# formatting the Avg Closing Price and rounding it off to 2 digits. 
new.select(['Year',format_number('Average Closing Price',2).alias('Avg Closing Price')]).show()

+----+-----------------+
|Year|Avg Closing Price|
+----+-----------------+
|2015|           120.04|
|2013|           472.63|
|2014|           295.40|
|2012|           576.05|
|2016|           104.60|
|2010|           259.84|
|2011|           364.00|
+----+-----------------+

