### **Spark DataFrame Basics**

Spark DataFrames are the workhorse and the main way of working with Spark and Python since Spark 2.0. DataFrames act as powerful versions of tables, with rows and columns, and easily handle large datasets. The shift to DataFrames provides many advantages:

- A much simpler syntax
- The ability to query a DataFrame directly with SQL
- Operations are automatically distributed across the cluster (DataFrames are built on top of RDDs, Resilient Distributed Datasets)

The main advantage of Spark DataFrames over single-machine tools is that Spark distributes the data across a cluster, so it can handle huge datasets that would never fit on a single computer.

**Create a Spark Session**

To work with Spark, the first thing we need to do is **create a Spark session**.

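Since Spark 2.0, `SparkSession` is the unified entry point of a Spark application. It gives access to Spark's functionality through fewer constructs, bundling what previously required separate `SQLContext` and `HiveContext` objects (the underlying `SparkContext` is still available as `spark.sparkContext`).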

In [1]:
import pyspark
import os
from pyspark.sql import SparkSession

os.environ['JAVA_HOME'] = r'C:\Java\jre1.8.0_351'  # workaround for a JAVA_HOME environment-variable problem; must be set before the session starts

spark = SparkSession.builder.getOrCreate()

#### **Read in a dataframe from a JSON file**

In [27]:
path = r'C:\Users\CTW02528\OneDrive - Critical Techworks\Desktop\Data Engineer CTW\4 - Trainning\Python-and-Spark-for-Big-Data-master\Spark_DataFrames\people.json'

df = spark.read.json(path)

In [14]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [15]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [16]:
df.columns

['age', 'name']

In [22]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+
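To see where the `describe()` numbers come from, here is a plain-Python recomputation on the same three records (a sketch that models the statistics, not Spark's implementation). Nulls are skipped, which is why `age` has a count of 2, and the standard deviation is the sample version that divides by n - 1. For the string column only count, min and max are meaningful, with min and max taken in lexicographic order.

```python
import statistics

# Same three records as people.json; None stands in for Spark's null.
ages = [None, 30, 19]
names = ["Michael", "Andy", "Justin"]


def describe_numeric(values):
    """Rough plain-Python model of describe() for one numeric column."""
    present = [v for v in values if v is not None]  # nulls are skipped
    return {
        "count": len(present),
        "mean": statistics.mean(present),
        "stddev": statistics.stdev(present),  # sample stddev (divides by n - 1)
        "min": min(present),
        "max": max(present),
    }


age_stats = describe_numeric(ages)
print(age_stats)

# String column: only count, min and max make sense (lexicographic order).
print(len(names), min(names), max(names))
```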



#### **Manually set the dataframe schema**

In [29]:
from pyspark.sql.types import (StructField,StringType,
                               IntegerType,StructType)

data_schema = [StructField('age',IntegerType(),True),
               StructField('name',StringType(),True)] #True means that the column may have NULL values

final_struct = StructType(fields=data_schema)

df = spark.read.json(path,schema = final_struct)

df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



#### **Grab data from the dataframe**

In [33]:
df.select('Age').show()

+----+
| Age|
+----+
|null|
|  30|
|  19|
+----+



In [37]:
df.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [38]:
df.select(['age','name'])

DataFrame[age: int, name: string]

In [41]:
# Create a new column with double the age (null stays null)
df.withColumn('newage',df['age'] * 2).show()

+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    60|
|  19| Justin|    38|
+----+-------+------+



In [44]:
# Rename a column
df = df.withColumnRenamed('age','new_age')
df.show()

+-------+-------+
|new_age|   name|
+-------+-------+
|   null|Michael|
|     30|   Andy|
|     19| Justin|
+-------+-------+



#### **Use SQL to interact directly with the dataframe**

In [52]:
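# Register the DataFrame as a temporary SQL view named 'people'
df.createOrReplaceTempView('people')
# .show() prints the result and returns None, so there is nothing useful to assign
spark.sql('SELECT * FROM people WHERE new_age = 30').show()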

+-------+----+
|new_age|name|
+-------+----+
|     30|Andy|
+-------+----+
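SQL run against a temporary view follows standard SQL NULL semantics. As an analogue that runs without Spark, the sketch below loads the same renamed data into an in-memory SQLite database with Python's built-in `sqlite3` module (SQLite is only a stand-in here, not what Spark uses): `new_age = 30` never matches Michael's NULL, `IS NULL` is how to find him, and `NULL * 2` stays NULL, mirroring the doubled-age column.

```python
import sqlite3

# In-memory SQLite database used only as a stand-in for Spark SQL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (new_age INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO people VALUES (?, ?)",
    [(None, "Michael"), (30, "Andy"), (19, "Justin")],
)

# NULL = 30 evaluates to NULL (unknown), so Michael's row is not returned.
matches = conn.execute("SELECT * FROM people WHERE new_age = 30").fetchall()

# IS NULL is the correct way to select missing values.
nulls = conn.execute("SELECT name FROM people WHERE new_age IS NULL").fetchall()

# Arithmetic on NULL yields NULL.
doubled = conn.execute(
    "SELECT name, new_age * 2 FROM people ORDER BY name"
).fetchall()
conn.close()

print(matches)
print(nulls)
print(doubled)
```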



#### **Basic Operations**

In [53]:
path = r'C:\Users\CTW02528\OneDrive - Critical Techworks\Desktop\Data Engineer CTW\4 - Trainning\Python-and-Spark-for-Big-Data-master\Spark_DataFrames\appl_stock.csv'

df = spark.read.csv(path,header=True,inferSchema=True)
df.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

#### **Filter data**

In [55]:
# Show the opening price for every row with a closing price below 500, using a SQL-style string condition
df.filter("Close < 500").select('Open').show()

+------------------+
|              Open|
+------------------+
|        213.429998|
|        214.599998|
|        214.379993|
|            211.75|
|        210.299994|
|212.79999700000002|
|209.18999499999998|
|        207.870005|
|210.11000299999998|
|210.92999500000002|
|        208.330002|
|        214.910006|
|        212.079994|
|206.78000600000001|
|202.51000200000001|
|205.95000100000001|
|        206.849995|
|        204.930004|
|        201.079996|
|192.36999699999998|
+------------------+
only showing top 20 rows



In [58]:
# Do the same using DataFrame column syntax
df.filter(df['Close'] < 500).select('Open').show()

+------------------+
|              Open|
+------------------+
|        213.429998|
|        214.599998|
|        214.379993|
|            211.75|
|        210.299994|
|212.79999700000002|
|209.18999499999998|
|        207.870005|
|210.11000299999998|
|210.92999500000002|
|        208.330002|
|        214.910006|
|        212.079994|
|206.78000600000001|
|202.51000200000001|
|205.95000100000001|
|        206.849995|
|        204.930004|
|        201.079996|
|192.36999699999998|
+------------------+
only showing top 20 rows



In [63]:
# Filter on multiple conditions: Close less than 500 AND Open not greater than 200
# Wrap each condition in parentheses and combine them with & (and), | (or), ~ (not)
df.filter( (df['Close'] < 500) & ~(df['Open'] > 200) ).select('Open').show()

+------------------+
|              Open|
+------------------+
|192.36999699999998|
|        195.909998|
|        195.169994|
|        196.730003|
|192.63000300000002|
|        195.690006|
|        196.419996|
|        195.889997|
|        194.880001|
|        198.109995|
|        199.999998|
|        198.229998|
|        197.380005|
|         92.699997|
|         94.730003|
|         94.129997|
|         94.040001|
|         92.199997|
|         91.510002|
|         92.309998|
+------------------+
only showing top 20 rows
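The parentheses in the multi-condition filter are required because Python's `&` binds more tightly than `<` and `>`. The sketch below uses Python's `ast` module to show how the condition would be parsed without them; `close` and `open_` are placeholder names standing in for `df['Close']` and `df['Open']`. Written unparenthesized, the filter becomes a chained comparison, which in PySpark typically fails with a "Cannot convert column into bool" error.

```python
import ast

# Parse (without evaluating) the condition written WITHOUT parentheses.
# "close" and "open_" are placeholders for df['Close'] and df['Open'].
node = ast.parse("close < 500 & open_ > 200", mode="eval").body

# Python sees one chained comparison, close < (500 & open_) > 200,
# because & binds more tightly than < and >.
print(type(node).__name__)                 # Compare
print(type(node.comparators[0]).__name__)  # BinOp (the 500 & open_ part)

# With parentheses, the top-level operation is the intended bitwise AND.
fixed = ast.parse("(close < 500) & ~(open_ > 200)", mode="eval").body
print(type(fixed).__name__, type(fixed.op).__name__)  # BinOp BitAnd
```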



In [66]:
# Collect the rows where the Low price equals 197.16 into a Python list of Row objects
result = df.filter(df['Low'] == 197.16).collect()
result

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [71]:
result[0].asDict()

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

#### **GroupBy and Aggregate Operations**

In [73]:
path = r'C:\Users\CTW02528\OneDrive - Critical Techworks\Desktop\Data Engineer CTW\4 - Trainning\Python-and-Spark-for-Big-Data-master\Spark_DataFrames\sales_info.csv'

df = spark.read.csv(path,header=True,inferSchema=True)
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [81]:
# Show the average Sales per company
df.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [84]:
# Return the total sales across all rows
df.agg({'Sales':'sum'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [86]:
# Show the max sales per company
df.groupBy('Company').agg({'Sales':'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+
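Conceptually, `groupBy` splits the rows into one group per company and an aggregate reduces each group to a single value. This plain-Python sketch (a model of the idea, not how Spark executes it) reproduces the mean, max and total sales shown above from the twelve rows of `sales_info.csv`.

```python
from collections import defaultdict

# The twelve rows of sales_info.csv shown above, as (Company, Sales) pairs.
rows = [
    ("GOOG", 200.0), ("GOOG", 120.0), ("GOOG", 340.0),
    ("MSFT", 600.0), ("MSFT", 124.0), ("MSFT", 243.0),
    ("FB", 870.0), ("FB", 350.0),
    ("APPL", 250.0), ("APPL", 130.0), ("APPL", 750.0), ("APPL", 350.0),
]

# "Group by": collect each company's Sales values into its own list.
groups = defaultdict(list)
for company, sales in rows:
    groups[company].append(sales)

# "Aggregate": reduce every group to one value per aggregate function.
avg_sales = {c: sum(v) / len(v) for c, v in groups.items()}
max_sales = {c: max(v) for c, v in groups.items()}

# agg() without groupBy treats the whole table as a single group.
total_sales = sum(sales for _, sales in rows)

print(avg_sales)
print(max_sales)
print(total_sales)
```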



In [130]:
from pyspark.sql.functions import countDistinct,avg,stddev,mean

In [93]:
# Count the number of distinct companies
df.select(countDistinct('Company').alias('Distinct Companies')).show()

+------------------+
|Distinct Companies|
+------------------+
|                 4|
+------------------+



In [97]:
# Order by Sales (ascending by default)
df.orderBy('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [98]:
# For descending order, call .desc() on the column itself
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+
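Sarah and Chris both have 350.0 in Sales, and Spark does not promise a particular order among rows with equal sort keys. Adding a tie-breaker, for example `df.orderBy(df['Sales'].desc(), df['Person'])`, makes the order deterministic. The plain-Python sketch below models that two-key ordering.

```python
# (Person, Sales) pairs from sales_info.csv; Sarah and Chris tie at 350.0.
people = [
    ("Sam", 200.0), ("Charlie", 120.0), ("Frank", 340.0),
    ("Tina", 600.0), ("Amy", 124.0), ("Vanessa", 243.0),
    ("Carl", 870.0), ("Sarah", 350.0),
    ("John", 250.0), ("Linda", 130.0), ("Mike", 750.0), ("Chris", 350.0),
]

# Primary key: Sales descending (negated); tie-breaker: Person ascending.
ordered = sorted(people, key=lambda p: (-p[1], p[0]))
print([name for name, _ in ordered])
```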



#### **Missing Data**

Data sources are often incomplete, which means we will have missing data. There are three basic options for handling it:

* Just keep the missing data points.
* Drop the missing data points (including the entire row)
* Fill them in with some other value, such as the mean

In [100]:
path = r'C:\Users\CTW02528\OneDrive - Critical Techworks\Desktop\Data Engineer CTW\4 - Trainning\Python-and-Spark-for-Big-Data-master\Spark_DataFrames\ContainsNull.csv'

df = spark.read.csv(path,header=True,inferSchema=True)
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [101]:
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [103]:
# Keep only rows that have at least 2 non-null values (rows with fewer are dropped)
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [104]:
# Drop a row only if all of its values are null
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+
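The three `na.drop` variants differ only in how many non-null values a row needs to survive. Below is a hypothetical `na_drop` helper, a plain-Python model of the documented behaviour rather than Spark's code, that reproduces the three results above; note that `thresh`, when given, overrides `how`. Spark's `subset` parameter is left out for brevity.

```python
# ContainsNull.csv as dicts; None stands in for Spark's null.
rows = [
    {"Id": "emp1", "Name": "John", "Sales": None},
    {"Id": "emp2", "Name": None, "Sales": None},
    {"Id": "emp3", "Name": None, "Sales": 345.0},
    {"Id": "emp4", "Name": "Cindy", "Sales": 456.0},
]


def na_drop(rows, how="any", thresh=None):
    """Hypothetical plain-Python model of DataFrame.na.drop's row filter."""
    kept = []
    for row in rows:
        non_null = sum(v is not None for v in row.values())
        if thresh is not None:
            keep = non_null >= thresh    # thresh overrides how
        elif how == "any":
            keep = non_null == len(row)  # drop if ANY value is null
        else:  # how == "all"
            keep = non_null > 0          # drop only if ALL values are null
        if keep:
            kept.append(row)
    return kept


print([r["Id"] for r in na_drop(rows)])
print([r["Id"] for r in na_drop(rows, thresh=2)])
print([r["Id"] for r in na_drop(rows, how="all")])
```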



In [105]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [113]:
# na.fill only touches columns whose type matches the fill value, so the numeric Sales column is not filled with a string
df.na.fill('FILL NUMBER').show()

+----+-----------+-----+
|  Id|       Name|Sales|
+----+-----------+-----+
|emp1|       John| null|
|emp2|FILL NUMBER| null|
|emp3|FILL NUMBER|345.0|
|emp4|      Cindy|456.0|
+----+-----------+-----+



In [115]:
# Specify the column(s) to fill with subset to make the intent explicit
df.na.fill('No Name',subset='Name').show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+
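`na.fill` matches the fill value's type against each column's type: a string fills only string columns, a number fills only numeric columns, and columns in `subset` whose type does not match are ignored. The hypothetical `na_fill` helper below models that rule in plain Python for the string and numeric cases only (an assumption-level sketch; Spark also supports booleans and a dict of per-column values).

```python
# ContainsNull.csv rows and the column types reported by df.printSchema().
rows = [
    {"Id": "emp1", "Name": "John", "Sales": None},
    {"Id": "emp2", "Name": None, "Sales": None},
    {"Id": "emp3", "Name": None, "Sales": 345.0},
    {"Id": "emp4", "Name": "Cindy", "Sales": 456.0},
]
schema = {"Id": str, "Name": str, "Sales": float}


def na_fill(rows, schema, value, subset=None):
    """Hypothetical model of DataFrame.na.fill for string and numeric values."""
    if isinstance(subset, str):
        subset = [subset]
    columns = subset if subset is not None else list(schema)
    # A string value targets string columns; a number targets numeric columns.
    target = str if isinstance(value, str) else float
    filled = []
    for row in rows:
        new_row = dict(row)  # copy, like Spark returning a new DataFrame
        for col in columns:
            # Columns whose type does not match the value are silently skipped.
            if schema[col] is target and new_row[col] is None:
                new_row[col] = value
        filled.append(new_row)
    return filled


print(na_fill(rows, schema, "FILL NUMBER"))              # Name filled, Sales untouched
print(na_fill(rows, schema, 0.0))                        # Sales filled, Name untouched
print(na_fill(rows, schema, "No Name", subset="Sales"))  # type mismatch: no change
```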



In [141]:
# Fill missing values for Sales with the average Sales

# avg() ignores nulls; collect() returns a list of Row objects, so take row 0, field 0
mean_sales = df.select(avg(df['Sales'])).collect()[0][0]

df.na.fill(mean_sales, subset='Sales').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+
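`avg()` skips nulls, so the mean used above is (345.0 + 456.0) / 2 = 400.5, not the 200.25 you would get by counting the two nulls as zeros. A plain-Python check of that arithmetic:

```python
import statistics

sales = [None, None, 345.0, 456.0]  # Sales column of ContainsNull.csv

non_null = [s for s in sales if s is not None]
mean_sales = statistics.mean(non_null)  # (345.0 + 456.0) / 2
naive = sum(non_null) / len(sales)      # what treating nulls as 0 would give

# Replace each missing value with the null-skipping mean.
filled = [mean_sales if s is None else s for s in sales]
print(mean_sales, naive, filled)
```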



#### **Dates and Timestamps**

In [142]:
path = r'C:\Users\CTW02528\OneDrive - Critical Techworks\Desktop\Data Engineer CTW\4 - Trainning\Python-and-Spark-for-Big-Data-master\Spark_DataFrames\appl_stock.csv'

df = spark.read.csv(path,header=True,inferSchema=True)
df.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [144]:
from pyspark.sql.functions import (dayofmonth,hour,
                                   dayofyear,month,
                                   year,weekofyear,
                                   format_number,date_format)

In [148]:
df.select(['Date'
           ,dayofmonth(df['Date']).alias('Day Of Month')
           ,month(df['Date']).alias('Month')
          ]).show()

+-------------------+------------+-----+
|               Date|Day Of Month|Month|
+-------------------+------------+-----+
|2010-01-04 00:00:00|           4|    1|
|2010-01-05 00:00:00|           5|    1|
|2010-01-06 00:00:00|           6|    1|
|2010-01-07 00:00:00|           7|    1|
|2010-01-08 00:00:00|           8|    1|
|2010-01-11 00:00:00|          11|    1|
|2010-01-12 00:00:00|          12|    1|
|2010-01-13 00:00:00|          13|    1|
|2010-01-14 00:00:00|          14|    1|
|2010-01-15 00:00:00|          15|    1|
|2010-01-19 00:00:00|          19|    1|
|2010-01-20 00:00:00|          20|    1|
|2010-01-21 00:00:00|          21|    1|
|2010-01-22 00:00:00|          22|    1|
|2010-01-25 00:00:00|          25|    1|
|2010-01-26 00:00:00|          26|    1|
|2010-01-27 00:00:00|          27|    1|
|2010-01-28 00:00:00|          28|    1|
|2010-01-29 00:00:00|          29|    1|
|2010-02-01 00:00:00|           1|    2|
+-------------------+------------+-----+
only showing top 20 rows
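The Spark date functions correspond closely to attributes of Python's `datetime` objects. The sketch below maps each imported function to its Python analogue for the first date in `appl_stock.csv` (an analogy, not Spark's code). `weekofyear` follows ISO 8601 weeks, so a date at the very start of January can belong to week 52 or 53 of the previous year.

```python
from datetime import datetime

ts = datetime(2010, 1, 4)  # first Date in appl_stock.csv

print(ts.day)                  # dayofmonth -> 4
print(ts.month)                # month -> 1
print(ts.year)                 # year -> 2010
print(ts.hour)                 # hour -> 0
print(ts.timetuple().tm_yday)  # dayofyear -> 4
print(ts.isocalendar()[1])     # weekofyear (ISO 8601 week) -> 1

# ISO weeks can belong to the previous year: 1 Jan 2010 was a Friday.
print(datetime(2010, 1, 1).isocalendar()[1])  # -> 53
```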

In [183]:
# Show the average closing price per year

df = df.withColumn('Year',year(df['date']))

result = df.groupBy('Year').agg({'Close':'mean'}).orderBy('Year')
result.show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2010| 259.8424600000002|
|2011|364.00432532142867|
|2012| 576.0497195640002|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2015|120.03999980555547|
|2016|104.60400786904763|
+----+------------------+



In [186]:
# Format avg(Close) to two decimal places and give the column a readable name
result.select(['Year'
               ,format_number('avg(Close)',2).alias('Average Closing Price')]).show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|2010|               259.84|
|2011|               364.00|
|2012|               576.05|
|2013|               472.63|
|2014|               295.40|
|2015|               120.04|
|2016|               104.60|
+----+---------------------+
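`format_number(col, d)` rounds to `d` decimal places, inserts thousands separators, and returns a string column, so any sorting or arithmetic after formatting works on text. Python's `,.2f` format specifier produces similar output and makes a quick sanity check on the averages above (a stand-in for illustration, not Spark's formatter):

```python
# avg(Close) values from the yearly result above.
averages = [259.8424600000002, 364.00432532142867, 576.0497195640002,
            472.6348802857143, 295.4023416507935, 120.03999980555547,
            104.60400786904763]

# ",.2f": thousands separator plus two decimals, similar to format_number(col, 2).
formatted = [format(a, ",.2f") for a in averages]
print(formatted)
print(format(1234567.891, ",.2f"))  # '1,234,567.89'
print(type(formatted[0]).__name__)  # 'str': the result is text, not a number
```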

