# Spark Dataframe

## Basic Operations
Filtering, comparing and collecting

In [0]:
# Import the session entry point and obtain a SparkSession named 'ops'.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('ops')
    .getOrCreate()
)

In [0]:
# Load the stock CSV: the first line supplies column names and column
# types are inferred from the data.
df = spark.read.csv(
    '/FileStore/tables/appl_stock-1.csv',
    header=True,
    inferSchema=True,
)
df.printSchema()
# Bring the first three rows to the driver and display the first of them.
df.head(3)[0]

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)

Out[12]: Row(Date=datetime.date(2010, 1, 4), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [0]:
# Filter with a SQL expression string: keep rows where Close < 500,
# then show only the Open column.
df.where("Close < 500").select('Open').show()

+------------------+
|              Open|
+------------------+
|        213.429998|
|        214.599998|
|        214.379993|
|            211.75|
|        210.299994|
|212.79999700000002|
|209.18999499999998|
|        207.870005|
|210.11000299999998|
|210.92999500000002|
|        208.330002|
|        214.910006|
|        212.079994|
|206.78000600000001|
|202.51000200000001|
|205.95000100000001|
|        206.849995|
|        204.930004|
|        201.079996|
|192.36999699999998|
+------------------+
only showing top 20 rows



In [0]:
# Filter on two conditions combined with & (and); ~ negates a condition.
# Each comparison is wrapped in parentheses where it is combined or negated,
# because & and ~ bind tighter than < and >.
close_below_200 = df['Close'] < 200
open_not_above_200 = ~(df['Open'] > 200)
df.filter(close_below_200 & open_not_above_200).show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-02-01|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.375532999999997|
|2010-02-03|        195.169994|        200.200003|        194.420004|        199.229994|153832000|25.812148999999998|
|2010-02-04|        196.730003|        198.370001|        191.570005|        192.050003|189413000|         24.881912|
|2010-02-05|192.63000300000002|             196.0|        190.850002|        195.460001|212576700|25.323710000000002|
|2010-02-08|        195.690006|197.88000300000002|      

In [0]:
# To keep a result in a Python variable, use collect(): it returns the
# matching rows as a list of Row objects instead of just printing them.
# NOTE: this is an exact equality match on a double column.
result = df.where(df['Low'] == 197.16).collect()
row = result[0]          # first matching row
row_dict = row.asDict()  # Row -> plain dict keyed by column name
print(row_dict)
print('Volume:', row_dict['Volume'])

{'Date': datetime.date(2010, 1, 22), 'Open': 206.78000600000001, 'High': 207.499996, 'Low': 197.16, 'Close': 197.75, 'Volume': 220441900, 'Adj Close': 25.620401}
Volume: 220441900


## GroupBy and Aggregate Functions
groupBy and agg functions, importing functions, formatting numbers, and orderBy

In [0]:
# Import the session entry point and obtain a SparkSession named 'aggs'.
# getOrCreate() returns an already-running session if one exists.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('aggs')
    .getOrCreate()
)

In [0]:
# Load the sales CSV: the first line supplies column names and column
# types are inferred from the data.
df = spark.read.csv(
    '/FileStore/tables/sales_info.csv',
    header=True,
    inferSchema=True,
)
df.printSchema()
# Bring the first three rows to the driver and display the first of them.
df.head(3)[0]

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)

Out[35]: Row(Company='GOOG', Person='Sam', Sales=200.0)

In [0]:
# Group rows by Company, then average the numeric columns within each group.
by_company = df.groupBy("Company")
by_company.mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [0]:
# Aggregate over the whole DataFrame (no grouping).
# The dict maps a column name to the name of the aggregate to apply to it.
sales_mean_spec = {'Sales': 'mean'}
df.agg(sales_mean_spec).show()

+-----------------+
|       avg(Sales)|
+-----------------+
|360.5833333333333|
+-----------------+



In [0]:
# importing functions
from pyspark.sql.functions import countDistinct, avg, stddev, format_number

In [0]:
# Build aggregate column expressions with the imported functions and pass
# them to select(), instead of using agg() with a dict.
distinct_sales = countDistinct('Sales')              # count distinct sales
average_sales = avg('Sales').alias('Average Sales')  # alias() renames the output column
df.select(distinct_sales).show()
df.select(average_sales).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+

+-----------------+
|    Average Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [0]:
# Compute the standard deviation of Sales, then display it with two decimals.
sales_std = df.select(stddev('Sales').alias('std'))
sales_std.show()
# format_number(col, 2) formats the value with 2 decimal places.
std_two_decimals = format_number('std', 2).alias('std')
sales_std.select(std_two_decimals).show()

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+

+------+
|   std|
+------+
|250.09|
+------+



In [0]:
# orderBy sorts ascending by default; call .desc() on the column to reverse.
sales_ascending = df.orderBy('Sales')
sales_descending = df.orderBy(df['Sales'].desc())
sales_ascending.show()
sales_descending.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



## Missing data
Handling null values with na.drop and na.fill

In [0]:
# Import the session entry point and obtain a SparkSession named 'miss'.
# getOrCreate() returns an already-running session if one exists.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('miss')
    .getOrCreate()
)

In [0]:
# Load the CSV that contains missing values: the first line supplies column
# names and column types are inferred from the data.
df = spark.read.csv(
    '/FileStore/tables/ContainsNull.csv',
    header=True,
    inferSchema=True,
)
df.printSchema()
df.show()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
# using na.drop
# thresh=N keeps a row only if it has at least N non-null values; rows with
# fewer are dropped. thresh defaults to None, in which case `how` decides.
df.na.drop(thresh=2).show()    # keep rows with >= 2 non-null values (thresh default is None)
df.na.drop(how='all').show()   # drop a row only if all its values are null; default is 'any'
df.na.drop(subset=['Sales']).show()   # only consider the Sales col: drop rows where it is null

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
# na.fill matches the fill value's type against the schema: a string value
# fills nulls in string columns, a numeric value fills nulls in numeric columns.
for fill_value in ('FILL INFO', 100):
    df.na.fill(fill_value).show()

+----+---------+-----+
|  Id|     Name|Sales|
+----+---------+-----+
|emp1|     John| null|
|emp2|FILL INFO| null|
|emp3|FILL INFO|345.0|
|emp4|    Cindy|456.0|
+----+---------+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|100.0|
|emp2| null|100.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
# Restrict the fill to specific columns with subset.
df.na.fill(value='No Name', subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [0]:
# import function
from pyspark.sql.functions import mean

In [0]:
# Fill missing Sales with the column mean.
# collect() returns a list with one Row; [0][0] extracts the scalar mean.
mean_val = df.select(mean(df['Sales'])).collect()
mean_sales = mean_val[0][0]
df.na.fill(mean_sales, subset=['Sales']).show()
# The same thing written as a single expression.
df.na.fill(
    df.select(mean(df['Sales'])).collect()[0][0],
    subset=['Sales'],
).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



## Dates and Timestamps

In [0]:
# Import the session entry point and obtain a SparkSession named 'dates'.
# getOrCreate() returns an already-running session if one exists.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('dates')
    .getOrCreate()
)

In [0]:
# Load the stock CSV: the first line supplies column names and column
# types are inferred from the data.
df = spark.read.csv(
    '/FileStore/tables/appl_stock-1.csv',
    header=True,
    inferSchema=True,
)
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [0]:
# import functions
from pyspark.sql.functions import (dayofmonth, hour, dayofyear,
                                   month, year, weekofyear,
                                   format_number, date_format)

In [0]:
# Derive a Year column from the Date column.
newdf = df.withColumn('Year', year(df['Date']))
# Average the numeric columns per year, then keep only Year and the mean Close.
yearly_means = newdf.groupBy('Year').mean()
result = yearly_means.select('Year', 'avg(Close)')
# Give the aggregate column a readable name.
new = result.withColumnRenamed('avg(Close)', 'Average Close Pricing')
# Display the average with two decimal places.
average_close = format_number('Average Close Pricing', 2).alias('Average Close')
new.select('Year', average_close).show()

+----+-------------+
|Year|Average Close|
+----+-------------+
|2015|       120.04|
|2013|       472.63|
|2014|       295.40|
|2012|       576.05|
|2016|       104.60|
|2010|       259.84|
|2011|       364.00|
+----+-------------+

