# Spark DataFrame Intro

In [1]:
import findspark

In [2]:
findspark.init('/home/ubuntu/spark-2.4.4-bin-hadoop2.7/')

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('Basics').getOrCreate()

In [5]:
df = spark.read.json('../spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.json')

In [6]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [8]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [9]:
df.columns

['age', 'name']

In [10]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



Set the schema for the dataset

In [11]:
from pyspark.sql.types import (StructField, StringType, StructType, IntegerType)

In [13]:
data_schema = [StructField('age', IntegerType(), True),
              StructField('name', StringType(), True)]

In [15]:
final_struct = StructType(fields=data_schema)

In [16]:
df = spark.read.json("../spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.json", schema=final_struct)

In [17]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [19]:
#Python Dataframe way will return a column object
df['age'], type(df['age'])

(Column<b'age'>, pyspark.sql.column.Column)

In [22]:
#To get the column values
df.select('age').show(), type(df.select('age'))

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



(None, pyspark.sql.dataframe.DataFrame)

In [25]:
df.head(2)[0], type(df.head(2)[0])

(Row(age=None, name='Michael'), pyspark.sql.types.Row)

In [26]:
df.select(['age', 'name']).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [27]:
#Create new column
df.withColumn('double_age', df['age']*2).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [28]:
#Use SQL with DataFrame
df.createOrReplaceTempView('people')

In [29]:
results = spark.sql("SELECT * from people")

In [30]:
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [31]:
age_results = spark.sql("SELECT * from people where age=30")

In [32]:
age_results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



# Spark DataFrame Basic Operations

In [33]:
from pyspark.sql import SparkSession

In [35]:
spark = SparkSession.builder.appName('ops').getOrCreate()

In [41]:
df = spark.read.csv('data/appl_stock.csv', inferSchema=True, header=True)

In [42]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [46]:
df.head(5)[0]

Row(Date=datetime.datetime(2016, 5, 16, 0, 0), Open=92.389999, High=94.389999, Low=91.650002, Close=93.879997, Volume=61140600, Adj Close=93.879997)

In [48]:
df.filter("Close<500").select(['Open', 'Close']).show()

+----------+----------+
|      Open|     Close|
+----------+----------+
| 92.389999| 93.879997|
|      90.0| 90.519997|
| 92.720001| 90.339996|
| 93.480003| 92.510002|
| 93.330002| 93.419998|
|      93.0| 92.790001|
| 93.370003| 92.720001|
|      94.0| 93.239998|
| 95.199997| 94.190002|
| 94.199997|     95.18|
| 93.970001| 93.639999|
| 93.989998| 93.739998|
| 97.610001| 94.830002|
|      96.0|     97.82|
|103.910004|104.349998|
|     105.0|105.080002|
|105.010002|    105.68|
|    106.93|105.970001|
|106.639999|107.129997|
|107.879997|106.910004|
+----------+----------+
only showing top 20 rows



In [52]:
df.filter(df['Close'] < 500).select(['Open', 'Close', 'Volume']).show()

+----------+----------+---------+
|      Open|     Close|   Volume|
+----------+----------+---------+
| 92.389999| 93.879997| 61140600|
|      90.0| 90.519997| 44188200|
| 92.720001| 90.339996| 76109800|
| 93.480003| 92.510002| 28539900|
| 93.330002| 93.419998| 33592500|
|      93.0| 92.790001| 32855300|
| 93.370003| 92.720001| 43458200|
|      94.0| 93.239998| 35890500|
| 95.199997| 94.190002| 41025500|
| 94.199997|     95.18| 56831300|
| 93.970001| 93.639999| 48160100|
| 93.989998| 93.739998| 68531500|
| 97.610001| 94.830002| 82242700|
|      96.0|     97.82|114602100|
|103.910004|104.349998| 56016200|
|     105.0|105.080002| 28031600|
|105.010002|    105.68| 33683100|
|    106.93|105.970001| 31552500|
|106.639999|107.129997| 30611000|
|107.879997|106.910004| 32384900|
+----------+----------+---------+
only showing top 20 rows



In [54]:
df.filter((df['Close'] < 200) & (df['Open'] > 200)).select(['Open', 'Close', 'Volume']).show()

+----------+----------+---------+
|      Open|     Close|   Volume|
+----------+----------+---------+
|201.079996|192.060003|311488100|
|204.930004|199.289995|293375600|
|206.780006|    197.75|220441900|
|202.239996|196.969995|116440800|
|201.109997|199.909996|106214500|
|201.659996|197.370005|189137900|
|200.599998|198.759996|285259800|
|200.589998|199.829994|174911800|
+----------+----------+---------+



In [56]:
df.filter((df['Close'] < 200) & ~(df['Open'] > 200)).select(['Open', 'Close', 'Volume']).show()

+----------+----------+---------+
|      Open|     Close|   Volume|
+----------+----------+---------+
| 92.389999| 93.879997| 61140600|
|      90.0| 90.519997| 44188200|
| 92.720001| 90.339996| 76109800|
| 93.480003| 92.510002| 28539900|
| 93.330002| 93.419998| 33592500|
|      93.0| 92.790001| 32855300|
| 93.370003| 92.720001| 43458200|
|      94.0| 93.239998| 35890500|
| 95.199997| 94.190002| 41025500|
| 94.199997|     95.18| 56831300|
| 93.970001| 93.639999| 48160100|
| 93.989998| 93.739998| 68531500|
| 97.610001| 94.830002| 82242700|
|      96.0|     97.82|114602100|
|103.910004|104.349998| 56016200|
|     105.0|105.080002| 28031600|
|105.010002|    105.68| 33683100|
|    106.93|105.970001| 31552500|
|106.639999|107.129997| 30611000|
|107.879997|106.910004| 32384900|
+----------+----------+---------+
only showing top 20 rows



In [58]:
df.filter(df['Low'] == 197.16).show()

+-------------------+----------+----------+------+------+---------+---------+
|               Date|      Open|      High|   Low| Close|   Volume|Adj Close|
+-------------------+----------+----------+------+------+---------+---------+
|2010-01-22 00:00:00|206.780006|207.499996|197.16|197.75|220441900|  26.0037|
+-------------------+----------+----------+------+------+---------+---------+



## In real world we do not show we collect

In [59]:
result = df.filter(df['Low'] == 197.16).collect()

In [60]:
result

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.780006, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=26.0037)]

In [62]:
row = result[0]
row.asDict()

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.780006,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 26.0037}

# Groupby and Aggregations

In [63]:
from pyspark.sql import SparkSession

In [64]:
spark = SparkSession.builder.appName('aggs').getOrCreate()

In [65]:
df = spark.read.csv('data/sales_info.csv', inferSchema=True, header=True)

In [66]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [67]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [69]:
df.describe().show()

+-------+-------+-------+------------------+
|summary|Company| Person|             Sales|
+-------+-------+-------+------------------+
|  count|     12|     12|                12|
|   mean|   null|   null| 360.5833333333333|
| stddev|   null|   null|250.08742410799007|
|    min|   APPL|  Chris|             120.0|
|    max|   MSFT|Vanessa|             870.0|
+-------+-------+-------+------------------+



In [70]:
df.groupby('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [71]:
df.groupby('Company').count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [74]:
df.agg({'Sales':'max'}).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [75]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [76]:
df.select(countDistinct('Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [78]:
df.select(avg('Sales').alias('Average_Sales')).show()

+-----------------+
|    Average_Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [80]:
df.select(stddev('Sales').alias('Std_Sales')).show()

+------------------+
|         Std_Sales|
+------------------+
|250.08742410799007|
+------------------+



In [81]:
from pyspark.sql.functions import format_number

In [82]:
sales_std = df.select(stddev('Sales').alias('std'))

In [86]:
sales_std.select(format_number('std', 2).alias('std')).show()

+------+
|   std|
+------+
|250.09|
+------+



In [89]:
#Descending
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [90]:
#Ascending
df.orderBy('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



# Handling Null Values

In [91]:
from pyspark.sql import SparkSession

In [92]:
spark = SparkSession.builder.appName('miss').getOrCreate()

In [93]:
df = spark.read.csv('data/ContainsNull.csv', inferSchema=True, header=True)

In [94]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [96]:
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [97]:
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [98]:
df.na.drop(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [99]:
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [100]:
df.na.drop(subset='Sales').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [102]:
df.na.fill('FILL VALUE').show()

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|FILL VALUE| null|
|emp3|FILL VALUE|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+



In [103]:
df.na.fill(0.0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [104]:
df.na.fill('No name', subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No name| null|
|emp3|No name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [105]:
from pyspark.sql.functions import mean

In [106]:
mean_val = df.select(mean(df['Sales'])).collect()

In [110]:
mean_sales = mean_val[0][0]

In [111]:
mean_sales

400.5

In [112]:
df.na.fill(mean_sales, subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



# Dates and Timestamp

In [113]:
from pyspark.sql import SparkSession

In [114]:
spark = SparkSession.builder.appName('dates').getOrCreate()

In [115]:
df = spark.read.csv('data/appl_stock.csv', inferSchema=True, header=True)

In [117]:
df.head(1)

[Row(Date=datetime.datetime(2016, 5, 16, 0, 0), Open=92.389999, High=94.389999, Low=91.650002, Close=93.879997, Volume=61140600, Adj Close=93.879997)]

In [118]:
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, month, year, weekofyear, format_number, date_format)

In [119]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|              16|
|              13|
|              12|
|              11|
|              10|
|               9|
|               6|
|               5|
|               4|
|               3|
|               2|
|              29|
|              28|
|              27|
|              26|
|              25|
|              22|
|              21|
|              20|
|              19|
+----------------+
only showing top 20 rows



In [120]:
df.select(month(df['Date'])).show()

+-----------+
|month(Date)|
+-----------+
|          5|
|          5|
|          5|
|          5|
|          5|
|          5|
|          5|
|          5|
|          5|
|          5|
|          5|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
|          4|
+-----------+
only showing top 20 rows



In [125]:
#Average closing price per year
df_year = df.withColumn("Year", year(df['Date']))

In [128]:
result = df_year.groupBy("Year").mean().select(["Year", "avg(Close)"])

In [133]:
new = result.withColumnRenamed("avg(Close)", 'Average Closing Price')

In [137]:
new.select(['Year', format_number('Average Closing Price', 2).alias("Avg Close")]).show()

+----+---------+
|Year|Avg Close|
+----+---------+
|1990|    37.56|
|2003|    18.54|
|2007|   128.27|
|2015|   120.04|
|2006|    70.81|
|2013|   472.63|
|1997|    17.97|
|1988|    41.54|
|1994|    34.08|
|2014|   295.40|
|2004|    35.53|
|1991|    52.49|
|1982|    19.14|
|1996|    24.92|
|1989|    41.66|
|1998|    30.56|
|1985|    20.20|
|2012|   576.05|
|1987|    53.89|
|2009|   146.81|
+----+---------+
only showing top 20 rows

