# Spark DataFrame Basics

In [76]:
from pyspark.sql import SparkSession

In [77]:
# Reuse the active Spark session if there is one; otherwise start one named 'Basics'.
spark = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)

In [78]:
# Load people.json and let Spark infer the column types from the data.
df = spark.read.format('json').load('people.json')

## 1. Basic Methods and Attributes

In [79]:
# Print up to 20 rows, truncating long cell values (these are show()'s defaults).
df.show(n=20, truncate=True)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [80]:
# Print each column's name, inferred type and nullability.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [81]:
# Column names, returned as a plain Python list.
df.columns

['age', 'name']

In [82]:
# describe() builds a DataFrame of count/mean/stddev/min/max per column.
summary_stats = df.describe()
summary_stats.show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



## 2. Changing Type of Columns (Optional)

Use this part if Spark cannot infer the correct column types by itself.

In [83]:
from pyspark.sql.types import (StructField, StringType, 
                               IntegerType, StructType)

In [84]:
# Each StructField takes:
#   1. the column name,
#   2. the Spark data type to read that column as,
#   3. whether the column may contain null values.
data_schema = [
    StructField(name='age', dataType=IntegerType(), nullable=True),
    StructField(name='name', dataType=StringType(), nullable=True),
]

In [85]:
# Combine the per-column fields into a full schema.
final_struc = StructType(data_schema)

In [86]:
# Re-read people.json, this time with the explicit schema instead of inference.
df = spark.read.schema(final_struc).json('people.json')

In [87]:
# With the explicit schema, age is now an integer instead of the inferred long.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



## 3. Select Values

In [88]:
# df.age (same as df['age']) is a Column expression, not the data itself,
# so there are no rows to display here; only its type is shown.
type(df.age)

pyspark.sql.column.Column

In [89]:
# select() returns a DataFrame (unlike df['age']), so .show() can print its rows.
df.select(df['age']).show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [90]:
# select() also accepts the column names as separate arguments.
df.select('age', 'name').show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [91]:
# first() returns only the first Row; head(2)[0] fetched two rows to use one.
df.first()

Row(age=None, name='Michael')

In [92]:
 type(df.first())  # a single Row object; no need to fetch two rows and index one

pyspark.sql.types.Row

Spark has many specialized types, such as **pyspark.sql.column.Column** and **pyspark.sql.types.Row**. In typical Python projects, the types involved are much simpler, such as lists.

The main reason is that Spark reads data from many distributed data sources and then performs the computation across distributed machines, so it needs its own representations for columns and rows.

## 4. Adding New Data

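Neither example below changes the original DataFrame. `withColumn` and `withColumnRenamed` each return a new DataFrame, which is why `df.show()` still prints the original columns afterwards.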

In [93]:
# Show a derived column; withColumn returns a new DataFrame and leaves df unchanged.
df.withColumn('double_age', df.age * 2).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [94]:
# df still has only its original columns: the previous cell did not modify it.
df.show(n=20, truncate=True)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [95]:
# Show df with 'age' renamed; again a new DataFrame is returned.
df.withColumnRenamed(existing='age', new='new_age').show()

+-------+-------+
|new_age|   name|
+-------+-------+
|   null|Michael|
|     30|   Andy|
|     19| Justin|
+-------+-------+



In [96]:
# The column is still called 'age' in df itself.
df.show(n=20, truncate=True)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



## 5. Filtering Data

In [97]:
# Load the stock prices, using the first line as the header and inferring types.
df2 = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('appl_stock.csv')
)

In [98]:
# Column types inferred from the CSV (inferSchema=True).
df2.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [99]:
# first() fetches just the first Row instead of pulling three rows and keeping one.
df2.first()

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [100]:
# where() is an alias of filter(); the condition here is a SQL expression string.
df2.where("Close < 500").select("Open").show(n=3)

+----------+
|      Open|
+----------+
|213.429998|
|214.599998|
|214.379993|
+----------+
only showing top 3 rows



In [101]:
# Same filter as above, written as a Column expression instead of a SQL string.
# The column name matches the schema's exact case ('Close'); 'CLose' only worked
# because Spark resolves column names case-insensitively by default.
df2.filter(df2['Close'] < 500).select('Open').show(3)

+----------+
|      Open|
+----------+
|213.429998|
|214.599998|
|214.379993|
+----------+
only showing top 3 rows



### 5.1 Filtering on multiple conditions

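**df2.filter(df2['Close'] < 200 & df2['Open'] > 200).show()** raises an error (here, a Py4JError).
Be careful with parentheses: `&` binds more tightly than `<` and `>`, so each comparison must be wrapped in its own parentheses. Also, Python's `and`, `or`, and `not` keywords do not work in filters; use the `&`, `|`, and `~` operators instead.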

In [102]:
# Name each condition, then combine them with & (and) and ~ (not).
close_below_200 = df2['Close'] < 200
open_above_200 = df2['Open'] > 200
df2.filter(close_below_200 & ~open_above_200).show(2)

+-------------------+------------------+----------+------------------+----------+---------+------------------+
|               Date|              Open|      High|               Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+------------------+----------+---------+------------------+
|2010-02-01 00:00:00|192.36999699999998|     196.0|191.29999899999999|194.729998|187469100|         25.229131|
|2010-02-02 00:00:00|        195.909998|196.319994|193.37999299999998|195.859997|174585600|25.375532999999997|
+-------------------+------------------+----------+------------------+----------+---------+------------------+
only showing top 2 rows



### 5.2 Access values of data

The examples above only display the matching data. To use the data in further calculations, you need to access it directly by bringing the rows back as Python objects with the **collect** method.

In [103]:
# collect() returns the matching rows as a Python list of Row objects.
# NOTE: exact equality on a double column requires the stored value to be exactly
# 197.16; for computed floats, prefer a range such as Column.between(...).
result = df2.where(df2['Low'] == 197.16).collect()

In [104]:
# First matching row; raises IndexError if the filter above matched nothing.
row = result[0]

In [105]:
# `row` is a pyspark.sql.types.Row. Its fields can be read directly by name
# (row['Volume'] or row.Volume) or by position (row[5]); no conversion is needed.
# Use row.asDict() only when an actual Python dict is required.
row['Volume']

220441900

## 6. GroupBy and Aggregate

In [106]:
# Load the sales data with a header row and inferred column types.
df3 = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('sales_info.csv')
)

In [107]:
# Display the sales rows (up to show()'s default of 20).
df3.show(n=20, truncate=True)

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [108]:
# Sales is inferred as double; Company and Person as strings.
df3.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [109]:
# max() with no arguments takes the maximum of every numeric column per group.
company_groups = df3.groupBy("Company")
company_groups.max().show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



You can also write the code above like this.

In [110]:
# Group once, then pass the aggregation as a {column: function-name} mapping.
grouped_data = df3.groupBy("Company")
grouped_data.agg(dict(Sales='max')).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [111]:
# Without groupBy, the aggregation runs over the whole DataFrame (one output row).
df3.groupBy().agg(dict(Sales='max')).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [112]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [113]:
# Number of distinct Sales values.
df3.select(countDistinct(df3['Sales'])).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [114]:
# Mean of Sales, renamed to a readable column header.
df3.select(avg(df3['Sales']).alias('Average Sales')).show()

+-----------------+
|    Average Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [115]:
# Single-row DataFrame holding the standard deviation of Sales in column 'std'.
sales_std = df3.agg(stddev('Sales').alias('std'))

As you can see, the average has many decimal places, and so does the standard deviation. To format these values, use the `format_number` function shown below.

In [116]:
from pyspark.sql.functions import format_number

In [117]:
# Round the standard deviation to 2 decimal places (format_number returns a string).
sales_std.select(format_number(sales_std['std'], 2).alias('std')).show()

+------+
|   std|
+------+
|250.09|
+------+



ORDER BY ASCENDING

In [118]:
# Ascending is orderBy's default; .asc() just makes it explicit.
df3.orderBy(df3['Sales'].asc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



ORDER BY DESCENDING

In [119]:
# Highest sales first.
df3.orderBy('Sales', ascending=False).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



## 7. Missing Data

In [120]:
# Load a small dataset that contains null values.
df4 = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('ContainsNull.csv')
)

In [121]:
# Nulls appear in both Name and Sales.
df4.show(n=20, truncate=True)

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [122]:
# dropna() is the same as df4.na.drop(); by default it drops any row containing a null.
df4.dropna().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



### 7.1 Drop missing values

In [123]:
# Keep only rows that have at least 2 non-null values.
df4.dropna(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [124]:
# how='all' drops a row only when every column is null (the default, how='any',
# drops a row if any column is null). No row here is entirely null, so all are kept.
df4.dropna(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [125]:
# Only nulls in the Sales column decide which rows are dropped.
df4.dropna(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### 7.2 Fill missing values

In [126]:
# Id and Name are strings and Sales is a double; fill values are matched against these types.
df4.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [127]:
# A string fill value only applies to string columns (Id, Name); Sales keeps its nulls.
df4.fillna('FILL VALUE').show()

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|FILL VALUE| null|
|emp3|FILL VALUE|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+



In [128]:
# A numeric fill value only applies to numeric columns (Sales); Name keeps its nulls.
df4.fillna(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



<font color='RED'>IT IS GOOD PRACTICE TO SPECIFY THE COLUMNS WHEN FILLING VALUES, RATHER THAN RELYING ON SPARK TO PICK COLUMNS BY THE FILL VALUE'S TYPE.</font> 

In [129]:
# A {column: value} mapping fills only the named column.
df4.fillna({'Name': 'No Name'}).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



<font color='RED'>ANOTHER COMMON PRACTICE IS TO FILL THE VALUES WITH MEAN VALUE.</font> 

In [130]:
from pyspark.sql.functions import mean

In [131]:
# A list holding one Row; its single field is the mean of Sales (nulls are ignored).
mean_val = df4.agg(mean('Sales')).collect()

In [132]:
# Unpack the scalar mean (first row, first field) and use it to fill Sales nulls.
sales_mean = mean_val[0][0]
df4.fillna(sales_mean, subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



## 8. Spark SQL 

**createOrReplaceTempView** creates a lazily evaluated "view" (or replaces it if a view with that name already exists). You can then query it like a Hive table in Spark SQL. The view is not persisted in memory unless you cache the dataset underlying it.

In [133]:
# Register df under the name 'people' so SQL queries can refer to it.
df.createOrReplaceTempView(name='people')

In [134]:
# Plain SQL against the temporary view; the result is another DataFrame.
query = "SELECT * FROM people WHERE age=30"
results = spark.sql(query)

In [135]:
# Rows returned by the SQL query.
results.show(n=20, truncate=True)

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



## 9. Dates and Timestamps

This section reuses the stock dataset loaded into `df2` in Section 5.

In [136]:
# A one-element list of Rows (head(n) is implemented via take(n)).
df2.take(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [137]:
from pyspark.sql.functions import (dayofmonth, year, month, format_number, date_format)

In [138]:
# Extract the day-of-month part of each Date.
df2.select(dayofmonth('Date')).show(n=5)

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
+----------------+
only showing top 5 rows



In [139]:
# Add a Year column derived from Date; df2 itself is unchanged.
new_df = df2.withColumn("Year", year('Date'))

In [140]:
# Average closing price per year, as columns ['Year', 'avg(Close)'].
# Aggregating only Close avoids computing (and then discarding) the mean of
# every other numeric column, which groupBy(...).mean() would do.
result = new_df.groupBy('Year').agg(avg('Close'))

In [143]:
# Round the yearly average to 2 decimals under a readable header.
avg_close_fmt = format_number('avg(Close)', 2).alias("Avg Close")
result.select('Year', avg_close_fmt).show()

+----+---------+
|Year|Avg Close|
+----+---------+
|2015|   120.04|
|2013|   472.63|
|2014|   295.40|
|2012|   576.05|
|2016|   104.60|
|2010|   259.84|
|2011|   364.00|
+----+---------+

